# Database Exploration Assignment

## Beyond SQLite and FAISS

Explore different databases for different use cases.
## Assignment Overview

### What You'll Build
A comprehensive database exploration system that:

- Compares different databases - SQLite, PostgreSQL, MongoDB, Elasticsearch
- Optimizes for different use cases - OLTP, OLAP, search, analytics
- Implements data pipelines - ETL processes and data synchronization
- Benchmarks performance - speed, scalability, and resource usage
- Handles different data types - structured, semi-structured, and unstructured
- Provides data insights - analytics and reporting capabilities
## Problem Statement

### Database Limitations
The current FastOpp system uses SQLite and FAISS, which have limitations:

- Scalability - SQLite doesn't scale well for large datasets
- Concurrency - limited concurrent read/write operations
- Search capabilities - only basic text search and vector similarity
- Data types - limited support for complex data structures
- Performance - not optimized for specific use cases
- Analytics - limited analytical and reporting capabilities
## Your Solution

### Multi-Database Architecture
Create a comprehensive database system that addresses these limitations:
- Database Selection - Choose the right database for each use case
- Data Modeling - Design optimal schemas for different databases
- Performance Optimization - Indexing, query optimization, and caching
- Data Synchronization - Keep multiple databases in sync
- Analytics Pipeline - Extract insights from your data
- Monitoring - Track performance and usage patterns
## Technical Requirements

### Tech Stack
- PostgreSQL - Advanced relational database
- MongoDB - Document-based NoSQL database
- Elasticsearch - Full-text search and analytics
- Redis - In-memory caching and session storage
- ClickHouse - Columnar database for analytics
- Neo4j - Graph database for relationships
- Apache Kafka - Stream processing and data pipelines
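Before installing anything, it helps to decide how each service will be addressed. The sketch below centralizes connection strings in one dict; every host, port, database name, and credential is an assumption for a local development setup, not a required value:

```python
# Hypothetical connection settings for a local development setup.
# Adjust hosts, ports, credentials, and database names to your environment.
CONNECTIONS = {
    "postgresql": "postgresql://app:secret@localhost:5432/fastopp",
    "mongodb": "mongodb://localhost:27017/fastopp",
    "elasticsearch": "http://localhost:9200",
    "redis": "redis://localhost:6379/0",
    "clickhouse": "http://localhost:8123",   # ClickHouse HTTP interface
    "neo4j": "bolt://localhost:7687",        # Neo4j Bolt protocol
    "kafka": "localhost:9092",               # Kafka bootstrap server
}
```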
## Database Comparison

### Use Case Matrix
| Database | Best For | Strengths | Weaknesses |
|---|---|---|---|
| SQLite | Development, small apps | Simple, embedded, no setup | Limited concurrency, no network access |
| PostgreSQL | Complex queries, ACID | Advanced SQL, JSON support | Memory usage, operational complexity |
| MongoDB | Document storage, flexibility | Schema flexibility, horizontal scaling | Limited joins ($lookup only), eventual consistency on secondary reads |
| Elasticsearch | Search, analytics | Full-text search, aggregations | Not ACID, resource intensive |
| Redis | Caching, sessions | Speed, rich data structures | Dataset limited by RAM; persistence (RDB/AOF) optional and weaker than disk-first stores |
| ClickHouse | Analytics, OLAP | Columnar storage, speed | Not for OLTP, limited update/delete support |
| Neo4j | Graph relationships | Graph queries, traversals | Not suited to tabular data |
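The matrix also suggests combining stores rather than picking a single winner. A common example is cache-aside: serve hot reads from Redis and fall back to PostgreSQL as the system of record. A minimal sketch, assuming a `users` table and already-open connections:

```python
import json

import asyncpg
import redis.asyncio as redis


async def get_user(user_id: int, pg: asyncpg.Connection, cache: redis.Redis) -> dict:
    """Cache-aside read: Redis first, PostgreSQL on a miss."""
    key = f"user:{user_id}"
    cached = await cache.get(key)
    if cached:                       # cache hit: skip the database entirely
        return json.loads(cached)
    row = await pg.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    user = dict(row) if row else {}
    # Populate the cache with a short TTL so updates eventually show through
    await cache.set(key, json.dumps(user, default=str), ex=300)
    return user
```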
## Core Components

### 1. Database Connectors
```python
# src/databases/connectors.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

import asyncpg
import motor.motor_asyncio
import redis.asyncio as redis
from elasticsearch import AsyncElasticsearch

# ClickHouse (clickhouse_connect) and Neo4j (neo4j.AsyncGraphDatabase)
# connectors follow the same pattern and are left for you to implement.


class BaseDatabase(ABC):
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.client = None

    @abstractmethod
    async def connect(self) -> bool:
        """Connect to the database."""

    @abstractmethod
    async def disconnect(self) -> None:
        """Disconnect from the database."""

    @abstractmethod
    async def query(self, *args: Any, **kwargs: Any) -> Any:
        """Execute a query and return results.

        Signatures differ per backend (SQL string, Mongo filter,
        Elasticsearch body, Redis command), so the base contract is loose.
        """


class PostgreSQLConnector(BaseDatabase):
    async def connect(self) -> bool:
        try:
            self.client = await asyncpg.connect(self.connection_string)
            return True
        except Exception as e:
            print(f"PostgreSQL connection failed: {e}")
            return False

    async def disconnect(self) -> None:
        if self.client:
            await self.client.close()

    async def query(self, query: str,
                    params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
        if not self.client:
            raise ValueError("Not connected to database")
        # asyncpg uses positional $1, $2, ... placeholders, so params
        # are passed as an ordered list.
        rows = await self.client.fetch(query, *(params or []))
        return [dict(row) for row in rows]


class MongoDBConnector(BaseDatabase):
    async def connect(self) -> bool:
        try:
            self.client = motor.motor_asyncio.AsyncIOMotorClient(self.connection_string)
            await self.client.admin.command('ping')
            return True
        except Exception as e:
            print(f"MongoDB connection failed: {e}")
            return False

    async def disconnect(self) -> None:
        if self.client:
            self.client.close()

    async def query(self, collection: str, filter: Optional[Dict[str, Any]] = None,
                    projection: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        if not self.client:
            raise ValueError("Not connected to database")
        # get_default_database() requires a database name in the URI
        db = self.client.get_default_database()
        cursor = db[collection].find(filter or {}, projection)
        return await cursor.to_list(length=None)


class ElasticsearchConnector(BaseDatabase):
    async def connect(self) -> bool:
        try:
            self.client = AsyncElasticsearch([self.connection_string])
            await self.client.ping()
            return True
        except Exception as e:
            print(f"Elasticsearch connection failed: {e}")
            return False

    async def disconnect(self) -> None:
        if self.client:
            await self.client.close()

    async def query(self, index: str, body: Dict[str, Any]) -> List[Dict[str, Any]]:
        if not self.client:
            raise ValueError("Not connected to database")
        response = await self.client.search(index=index, body=body)
        return [hit['_source'] for hit in response['hits']['hits']]


class RedisConnector(BaseDatabase):
    async def connect(self) -> bool:
        try:
            self.client = redis.from_url(self.connection_string)
            await self.client.ping()
            return True
        except Exception as e:
            print(f"Redis connection failed: {e}")
            return False

    async def disconnect(self) -> None:
        if self.client:
            await self.client.close()

    async def query(self, command: str, *args: Any) -> Any:
        if not self.client:
            raise ValueError("Not connected to database")
        return await self.client.execute_command(command, *args)
```
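A quick usage sketch for the connectors above; the connection string is a placeholder for your local setup:

```python
import asyncio


async def main() -> None:
    pg = PostgreSQLConnector("postgresql://app:secret@localhost:5432/fastopp")
    if await pg.connect():
        rows = await pg.query("SELECT version() AS pg_version")
        print(rows[0]["pg_version"])   # e.g. "PostgreSQL 16.x ..."
        await pg.disconnect()


asyncio.run(main())
```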
## Core Components

### 2. Data Synchronization
```python
# src/databases/sync.py
import json
from datetime import datetime
from typing import Any, Dict, List

from .connectors import (
    BaseDatabase,
    ElasticsearchConnector,
    MongoDBConnector,
    RedisConnector,
)


class DatabaseSynchronizer:
    def __init__(self, source_db: BaseDatabase, target_dbs: List[BaseDatabase]):
        self.source_db = source_db
        self.target_dbs = target_dbs
        self.sync_log = []

    async def sync_table(self, table_name: str,
                         sync_strategy: str = 'full') -> Dict[str, Any]:
        """Synchronize a table from the source to all target databases."""
        sync_start = datetime.now()
        try:
            # Pull data from the source
            if sync_strategy == 'full':
                data = await self._full_sync(table_name)
            elif sync_strategy == 'incremental':
                data = await self._incremental_sync(table_name)
            else:
                raise ValueError(f"Unknown sync strategy: {sync_strategy}")

            # Push to each target database
            sync_results = {}
            for target_db in self.target_dbs:
                result = await self._sync_to_target(target_db, table_name, data)
                sync_results[target_db.__class__.__name__] = result

            sync_duration = (datetime.now() - sync_start).total_seconds()
            sync_info = {
                'table': table_name,
                'strategy': sync_strategy,
                'records_synced': len(data),
                'duration_seconds': sync_duration,
                'target_results': sync_results,
                'timestamp': sync_start.isoformat(),
            }
            self.sync_log.append(sync_info)
            return sync_info
        except Exception as e:
            error_info = {
                'table': table_name,
                'strategy': sync_strategy,
                'error': str(e),
                'timestamp': sync_start.isoformat(),
            }
            self.sync_log.append(error_info)
            raise

    async def _full_sync(self, table_name: str) -> List[Dict[str, Any]]:
        """Copy every row. table_name is interpolated into SQL, so it
        must come from trusted code, never from user input."""
        return await self.source_db.query(f"SELECT * FROM {table_name}")

    async def _incremental_sync(self, table_name: str) -> List[Dict[str, Any]]:
        """Copy only rows modified since the last successful sync."""
        last_sync = await self._get_last_sync_timestamp(table_name)
        query = f"""
            SELECT * FROM {table_name}
            WHERE updated_at > $1
            ORDER BY updated_at
        """
        return await self.source_db.query(query, [last_sync])

    async def _get_last_sync_timestamp(self, table_name: str) -> datetime:
        """Most recent successful sync of this table, from the in-memory log."""
        for entry in reversed(self.sync_log):
            if entry.get('table') == table_name and 'error' not in entry:
                return datetime.fromisoformat(entry['timestamp'])
        return datetime.min  # never synced before: take everything

    async def _sync_to_target(self, target_db: BaseDatabase, table_name: str,
                              data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Sync data to a specific target database."""
        start_time = datetime.now()
        try:
            if isinstance(target_db, MongoDBConnector):
                result = await self._sync_to_mongodb(target_db, table_name, data)
            elif isinstance(target_db, ElasticsearchConnector):
                result = await self._sync_to_elasticsearch(target_db, table_name, data)
            elif isinstance(target_db, RedisConnector):
                result = await self._sync_to_redis(target_db, table_name, data)
            else:
                result = await self._sync_to_sql(target_db, table_name, data)

            duration = (datetime.now() - start_time).total_seconds()
            return {
                'success': True,
                'records_processed': len(data),
                'duration_seconds': duration,
                'result': result,
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'records_processed': 0,
            }

    async def _sync_to_mongodb(self, target_db: MongoDBConnector, table_name: str,
                               data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Replace-all strategy: drop existing documents, insert fresh copies."""
        db = target_db.client.get_default_database()
        collection = db[table_name]
        await collection.delete_many({})
        if data:
            await collection.insert_many(data)
        return {'operation': 'insert_many', 'count': len(data)}

    async def _sync_to_elasticsearch(self, target_db: ElasticsearchConnector,
                                     table_name: str,
                                     data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Bulk-index documents, creating the index on first use."""
        index_name = table_name.lower()
        if not await target_db.client.indices.exists(index=index_name):
            await target_db.client.indices.create(index=index_name)
        if not data:
            return {'operation': 'bulk_insert', 'count': 0}
        bulk_body = []
        for doc in data:
            bulk_body.append({
                'index': {
                    '_index': index_name,
                    '_id': doc.get('id', doc.get('_id')),
                }
            })
            bulk_body.append(doc)
        response = await target_db.client.bulk(body=bulk_body)
        return {
            'operation': 'bulk_insert',
            'count': len(data),
            'errors': response.get('errors', False),
        }

    async def _sync_to_redis(self, target_db: RedisConnector, table_name: str,
                             data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Store each record as a JSON string under table:id keys."""
        key_prefix = f"{table_name}:"
        # KEYS is O(N); acceptable for an assignment, prefer SCAN in production
        keys = await target_db.client.keys(f"{key_prefix}*")
        if keys:
            await target_db.client.delete(*keys)
        for doc in data:
            key = f"{key_prefix}{doc.get('id', doc.get('_id'))}"
            # default=str handles datetimes and other non-JSON types
            await target_db.client.set(key, json.dumps(doc, default=str))
        return {'operation': 'set', 'count': len(data)}

    async def _sync_to_sql(self, target_db: BaseDatabase, table_name: str,
                           data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Append rows to a SQL target, creating a TEXT-typed table if needed."""
        if not data:
            return {'operation': 'insert', 'count': 0}
        columns = list(data[0].keys())
        placeholders = ', '.join(f'${i + 1}' for i in range(len(columns)))
        # All columns are TEXT for simplicity; map real types in production
        create_table_query = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                {', '.join(f'{col} TEXT' for col in columns)}
            )
        """
        await target_db.query(create_table_query)
        insert_query = f"""
            INSERT INTO {table_name} ({', '.join(columns)})
            VALUES ({placeholders})
        """
        # One round-trip per row; batch these for large tables
        for doc in data:
            await target_db.query(insert_query, [doc.get(col) for col in columns])
        return {'operation': 'insert', 'count': len(data)}
```
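Putting the synchronizer to work might look like this; the connection strings and the `products` table are assumptions for illustration:

```python
import asyncio


async def main() -> None:
    source = PostgreSQLConnector("postgresql://app:secret@localhost:5432/fastopp")
    mongo = MongoDBConnector("mongodb://localhost:27017/fastopp")
    es = ElasticsearchConnector("http://localhost:9200")
    for db in (source, mongo, es):
        await db.connect()

    syncer = DatabaseSynchronizer(source, [mongo, es])
    info = await syncer.sync_table("products", sync_strategy="full")
    print(f"{info['records_synced']} rows copied in {info['duration_seconds']:.2f}s")


asyncio.run(main())
```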
## Core Components

### 3. Performance Benchmarking
```python
# src/databases/benchmark.py
import statistics
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List

import psutil  # listed in requirements.txt; used for memory/CPU sampling

from .connectors import BaseDatabase, ElasticsearchConnector, MongoDBConnector


@dataclass
class BenchmarkResult:
    operation: str
    database: str
    records: int
    duration_seconds: float
    records_per_second: float
    memory_usage_mb: float
    cpu_usage_percent: float


class DatabaseBenchmark:
    def __init__(self, databases: Dict[str, BaseDatabase]):
        self.databases = databases
        self.results = []
        self._process = psutil.Process()

    async def benchmark_read_operations(
        self, table_name: str,
        record_counts: List[int] = [100, 1000, 10000],
    ) -> List[BenchmarkResult]:
        """Benchmark read operations across databases.

        Assumes the table/collection has been populated, e.g. by running
        the write benchmark first.
        """
        results = []
        for db_name, db in self.databases.items():
            for record_count in record_counts:
                test_data = await self._generate_test_data(record_count)
                result = await self._benchmark_read(db, table_name, test_data)
                result.database = db_name
                result.records = record_count
                results.append(result)
        return results

    async def benchmark_write_operations(
        self, table_name: str,
        record_counts: List[int] = [100, 1000, 10000],
    ) -> List[BenchmarkResult]:
        """Benchmark write operations across databases."""
        results = []
        for db_name, db in self.databases.items():
            for record_count in record_counts:
                test_data = await self._generate_test_data(record_count)
                result = await self._benchmark_write(db, table_name, test_data)
                result.database = db_name
                result.records = record_count
                results.append(result)
        return results

    async def benchmark_search_operations(
        self, table_name: str,
        search_queries: List[str],
    ) -> List[BenchmarkResult]:
        """Benchmark search operations across databases."""
        results = []
        for db_name, db in self.databases.items():
            for query in search_queries:
                result = await self._benchmark_search(db, table_name, query)
                result.database = db_name
                results.append(result)
        return results

    async def _generate_test_data(self, record_count: int) -> List[Dict[str, Any]]:
        """Build simple synthetic records (id + content) for benchmarking."""
        return [
            {'id': str(uuid.uuid4()), 'content': f'test record {i}'}
            for i in range(record_count)
        ]

    def _get_memory_usage(self) -> float:
        """Resident memory of this process, in MB."""
        return self._process.memory_info().rss / (1024 * 1024)

    def _get_cpu_usage(self) -> float:
        """Process CPU percent since the previous call to this method."""
        return self._process.cpu_percent(interval=None)

    async def _benchmark_read(self, db: BaseDatabase, table_name: str,
                              test_data: List[Dict[str, Any]]) -> BenchmarkResult:
        """Time a full-table read against one database."""
        start_time = time.time()
        start_memory = self._get_memory_usage()
        self._get_cpu_usage()  # prime the counter; the next call measures this window

        if isinstance(db, MongoDBConnector):
            await db.query(table_name, {})
        elif isinstance(db, ElasticsearchConnector):
            await db.query(table_name, {'query': {'match_all': {}}})
        else:
            await db.query(f"SELECT * FROM {table_name}")

        duration = time.time() - start_time
        memory_usage = self._get_memory_usage() - start_memory
        cpu_usage = self._get_cpu_usage()

        return BenchmarkResult(
            operation='read',
            database='',  # filled in by the caller
            records=len(test_data),
            duration_seconds=duration,
            records_per_second=len(test_data) / duration if duration > 0 else 0,
            memory_usage_mb=memory_usage,
            cpu_usage_percent=cpu_usage,
        )

    async def _benchmark_write(self, db: BaseDatabase, table_name: str,
                               test_data: List[Dict[str, Any]]) -> BenchmarkResult:
        """Time inserting the test records into one database."""
        start_time = time.time()
        start_memory = self._get_memory_usage()
        self._get_cpu_usage()  # prime the counter

        if isinstance(db, MongoDBConnector):
            collection = db.client.get_default_database()[table_name]
            await collection.insert_many(test_data)
        elif isinstance(db, ElasticsearchConnector):
            bulk_body = []
            for doc in test_data:
                bulk_body.append({'index': {'_index': table_name}})
                bulk_body.append(doc)
            await db.client.bulk(body=bulk_body)
        elif test_data:
            # SQL insert; the table must already exist with matching columns
            columns = list(test_data[0].keys())
            placeholders = ', '.join(f'${i + 1}' for i in range(len(columns)))
            insert_query = f"""
                INSERT INTO {table_name} ({', '.join(columns)})
                VALUES ({placeholders})
            """
            for doc in test_data:
                await db.query(insert_query, [doc.get(col) for col in columns])

        duration = time.time() - start_time
        memory_usage = self._get_memory_usage() - start_memory
        cpu_usage = self._get_cpu_usage()

        return BenchmarkResult(
            operation='write',
            database='',
            records=len(test_data),
            duration_seconds=duration,
            records_per_second=len(test_data) / duration if duration > 0 else 0,
            memory_usage_mb=memory_usage,
            cpu_usage_percent=cpu_usage,
        )

    async def _benchmark_search(self, db: BaseDatabase, table_name: str,
                                query: str) -> BenchmarkResult:
        """Time a text search against one database."""
        start_time = time.time()
        start_memory = self._get_memory_usage()
        self._get_cpu_usage()  # prime the counter

        if isinstance(db, MongoDBConnector):
            # Requires a text index on the collection
            await db.query(table_name, {'$text': {'$search': query}})
        elif isinstance(db, ElasticsearchConnector):
            search_body = {
                'query': {
                    'multi_match': {
                        'query': query,
                        'fields': ['*'],
                    }
                }
            }
            await db.query(table_name, search_body)
        else:
            # ILIKE is PostgreSQL-specific
            await db.query(f"""
                SELECT * FROM {table_name}
                WHERE content ILIKE $1
            """, [f'%{query}%'])

        duration = time.time() - start_time
        memory_usage = self._get_memory_usage() - start_memory
        cpu_usage = self._get_cpu_usage()

        return BenchmarkResult(
            operation='search',
            database='',
            records=0,  # result counts vary per query
            duration_seconds=duration,
            records_per_second=0,
            memory_usage_mb=memory_usage,
            cpu_usage_percent=cpu_usage,
        )

    def generate_report(self, results: List[BenchmarkResult]) -> str:
        """Generate a Markdown report grouped by operation and database."""
        report = "# Database Performance Benchmark Report\n\n"

        # Group results by operation
        operations = {}
        for result in results:
            operations.setdefault(result.operation, []).append(result)

        for operation, op_results in operations.items():
            report += f"## {operation.title()} Operations\n\n"

            # Group by database
            databases = {}
            for result in op_results:
                databases.setdefault(result.database, []).append(result)

            for db_name, db_results in databases.items():
                report += f"### {db_name}\n\n"
                durations = [r.duration_seconds for r in db_results]
                throughputs = [r.records_per_second for r in db_results]
                memory_usage = [r.memory_usage_mb for r in db_results]
                cpu_usage = [r.cpu_usage_percent for r in db_results]

                report += f"- **Average Duration**: {statistics.mean(durations):.4f}s\n"
                report += f"- **Average Throughput**: {statistics.mean(throughputs):.2f} records/s\n"
                report += f"- **Average Memory Usage**: {statistics.mean(memory_usage):.2f} MB\n"
                report += f"- **Average CPU Usage**: {statistics.mean(cpu_usage):.2f}%\n\n"

        return report
```
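A benchmark run could be wired up like this; connection strings and the `bench_records` table are placeholders, and the table is created up front so the SQL write path has somewhere to insert:

```python
import asyncio


async def main() -> None:
    pg = PostgreSQLConnector("postgresql://app:secret@localhost:5432/fastopp")
    mongo = MongoDBConnector("mongodb://localhost:27017/fastopp")
    for db in (pg, mongo):
        await db.connect()

    # Match the shape of _generate_test_data (id + content)
    await pg.query("CREATE TABLE IF NOT EXISTS bench_records (id TEXT, content TEXT)")

    bench = DatabaseBenchmark({"postgresql": pg, "mongodb": mongo})
    results = await bench.benchmark_write_operations("bench_records", [100, 1000])
    results += await bench.benchmark_read_operations("bench_records", [100, 1000])
    print(bench.generate_report(results))


asyncio.run(main())
```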
## Success Criteria

### Must-Have Features
- Multiple Database Support - Connect to at least 3 different databases
- Data Synchronization - Keep multiple databases in sync
- Performance Benchmarking - Compare database performance
- Data Modeling - Design optimal schemas for each database
- Query Optimization - Optimize queries for each database type
- Monitoring - Track database performance and usage
- Documentation - Comprehensive documentation of findings
- Testing - Unit tests and integration tests (a minimal test sketch follows this list)
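For the testing requirement, a small integration test can exercise a connector end to end. A sketch using plain pytest; the connection string is an assumption and a local PostgreSQL must be running:

```python
# tests/test_connectors.py
import asyncio

from src.databases.connectors import PostgreSQLConnector


def test_postgres_roundtrip():
    async def scenario():
        db = PostgreSQLConnector("postgresql://app:secret@localhost:5432/fastopp")
        assert await db.connect()
        rows = await db.query("SELECT 1 AS ok")
        assert rows == [{"ok": 1}]
        await db.disconnect()

    # asyncio.run keeps the test free of extra pytest plugins
    asyncio.run(scenario())
```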
## Bonus Challenges

### Advanced Features
- Data Pipeline - Implement ETL processes
- Real-time Sync - Stream data between databases (see the Kafka sketch after this list)
- Data Quality - Implement data validation and cleaning
- Backup Strategy - Implement backup and recovery
- Security - Implement proper authentication and authorization
- Scalability - Test with large datasets
- Analytics - Extract insights from your data
- Visualization - Create dashboards for monitoring
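For the real-time sync challenge, one approach is to publish change events to Kafka and apply them on the consumer side. A minimal sketch with kafka-python (already in requirements.txt); the topic name, event shape, and broker address are assumptions:

```python
import json

from kafka import KafkaConsumer, KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v, default=str).encode("utf-8"),
)


def publish_change(table: str, row: dict) -> None:
    """Emit one change event; call this wherever the source table is written."""
    producer.send("db-changes", {"table": table, "row": row})


def consume_changes() -> None:
    """Apply change events to a target store as they arrive."""
    consumer = KafkaConsumer(
        "db-changes",
        bootstrap_servers="localhost:9092",
        value_deserializer=lambda b: json.loads(b.decode("utf-8")),
    )
    for message in consumer:
        event = message.value
        # Replace this print with an upsert into the target database
        print("apply", event["table"], event["row"])
```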
## Getting Started

### Setup Instructions
1. Set up databases - install and configure each database (a connectivity check is sketched below)
2. Implement connectors - create database connection classes
3. Design schemas - create optimal schemas for each database
4. Build sync system - implement data synchronization
5. Create benchmarks - build a performance testing framework
6. Run experiments - test different scenarios and configurations
7. Analyze results - compare performance and identify best practices
8. Document findings - create comprehensive documentation
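Once the services are running, a small smoke test confirms step 1 before you move on; the connection strings are assumptions for a local setup:

```python
import asyncio

from src.databases.connectors import (
    ElasticsearchConnector,
    MongoDBConnector,
    PostgreSQLConnector,
    RedisConnector,
)


async def check_setup() -> None:
    candidates = {
        "postgresql": PostgreSQLConnector("postgresql://app:secret@localhost:5432/fastopp"),
        "mongodb": MongoDBConnector("mongodb://localhost:27017/fastopp"),
        "elasticsearch": ElasticsearchConnector("http://localhost:9200"),
        "redis": RedisConnector("redis://localhost:6379/0"),
    }
    for name, db in candidates.items():
        ok = await db.connect()
        print(f"{name}: {'ok' if ok else 'FAILED'}")
        if ok:
            await db.disconnect()


asyncio.run(check_setup())
```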
## Dependencies

### requirements.txt

```text
asyncpg>=0.28.0
motor>=3.3.0
elasticsearch>=8.8.0
redis>=4.6.0
clickhouse-connect>=0.6.0
neo4j>=5.12.0
kafka-python>=2.0.2
pandas>=2.0.0
matplotlib>=3.7.0
seaborn>=0.12.0
psutil>=5.9.0
pytest>=7.0.0
```
## Resources

### Helpful Links
- PostgreSQL - https://www.postgresql.org/
- MongoDB - https://www.mongodb.com/
- Elasticsearch - https://www.elastic.co/elasticsearch/
- Redis - https://redis.io/
- ClickHouse - https://clickhouse.com/
- Neo4j - https://neo4j.com/
- Apache Kafka - https://kafka.apache.org/
## Let's Explore Databases!

### Ready to Start?
This assignment will teach you:

- Different database types and their use cases
- Data modeling and schema design
- Performance optimization and benchmarking
- Data synchronization and ETL processes
- Database monitoring and maintenance
- Best practices for database selection
Start with one database and build up to a comprehensive multi-database system!
## Next Steps

### After Completing This Assignment
- Deploy your system - Set up production databases
- Monitor performance - Track database performance in production
- Share your findings - Document your database comparison results
- Contribute to open source - Share your database connectors
- Move to the next track - Try machine learning pipelines or data visualization next!
Happy database exploring! 🚀