Architecture Deep Dive
Welcome to the comprehensive technical architecture documentation for the Corgi Recommender Service. This document provides an in-depth look at the system's inner workings, design decisions, and implementation details.
Table of Contents
- High-Level System Overview
- Service Architecture & Orchestration
- Core Application Layer
- Asynchronous Worker Queue System
- RAG (Retrieval-Augmented Generation) System
- Agent Framework
- Recommendation Engine
- API Layer & Authentication
- Database Architecture
- ELK Integration Layer
- Frontend Integration
- Security Architecture
- Performance & Monitoring
High-Level System Overview
The Corgi Recommender Service is a privacy-first, multi-layered recommendation platform designed for the Fediverse. It combines a multi-algorithm ranking engine with LLM-backed capabilities: an agent framework and a RAG system, both of which call the Claude API.
graph TB
subgraph External [External Services]
ELK[ELK Client]
MAST[Mastodon Instances]
CLAUDE[Claude API]
end
subgraph Gateway [API Gateway Layer]
PROXY[Corgi Proxy]
API[Core API]
end
subgraph Processing [Processing Layer]
AGENTS[Agent Framework]
RAG[RAG System]
RECS[Recommendation Engine]
end
subgraph Storage [Storage Layer]
POSTGRES[(PostgreSQL)]
REDIS[(Redis)]
EMBEDDINGS[(Vector Store)]
end
subgraph Workers [Background Processing]
CELERY[Celery Workers]
TASKS[Scheduled Tasks]
end
ELK --> PROXY
PROXY --> API
API --> AGENTS
API --> RAG
API --> RECS
AGENTS --> CLAUDE
RAG --> EMBEDDINGS
RECS --> POSTGRES
CELERY --> MAST
CELERY --> POSTGRES
CELERY --> REDIS
classDef external fill:#e1f5fe,stroke:#0277bd,color:#000
classDef gateway fill:#f3e5f5,stroke:#7b1fa2,color:#000
classDef processing fill:#e8f5e8,stroke:#388e3c,color:#000
classDef storage fill:#fff3e0,stroke:#f57c00,color:#000
classDef workers fill:#fce4ec,stroke:#c2185b,color:#000
class ELK,MAST,CLAUDE external
class PROXY,API gateway
class AGENTS,RAG,RECS processing
class POSTGRES,REDIS,EMBEDDINGS storage
class CELERY,TASKS workers
Core Design Principles
Privacy First: User data never leaves the processing pipeline without explicit consent. All recommendation processing happens server-side with minimal data retention.
Modularity: Each component is designed as an independent service that can be scaled, updated, or replaced without affecting others.
Extensibility: The agent framework allows for easy integration of new AI capabilities and recommendation strategies.
Performance: Multi-layered caching and asynchronous processing are intended to keep most operations under 100 ms.
Service Architecture & Orchestration
The Corgi platform uses Docker Compose for service orchestration, providing a consistent development and deployment environment.
Service Topology
# docker-compose.yml structure (service bodies omitted)
services:
  corgi-api:       # Core Flask application
  corgi-proxy:     # Request routing and caching
  elk-client:      # ELK integration service
  postgres:        # Primary database
  redis:           # Caching and session storage
  celery-worker:   # Background task processing
  celery-beat:     # Scheduled task coordinator
  flower:          # Celery monitoring
Key Infrastructure Components
Corgi API Server (corgi-api)
- Flask-based REST API server
- Port: 5002 (configurable via CORGI_PORT)
- Handles all recommendation logic, user management, and agent coordination
Corgi Proxy (corgi-proxy)
- Nginx-based reverse proxy
- Port: 5003 (configurable via CORGI_PROXY_PORT)
- Provides SSL termination, request routing, and response caching
ELK Client Service (elk-client)
- Dedicated service for ELK integration
- Port: 5314 (configurable via ELK_PORT)
- Handles seamless timeline enhancement and user detection
Network Architecture
# Network configuration example
CORGI_INTERNAL_NETWORK = "corgi-internal"
EXTERNAL_PORTS = {
"api": 5002,
"proxy": 5003,
"elk": 5314,
"flower": 5555
}
All services are attached to a dedicated Docker network (corgi-internal). Containers reach each other by service name (for example redis:6379, as used in the Celery broker URL), and only the ports in EXTERNAL_PORTS need to be published to the host.
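The external ports can be resolved from the environment variables named in the component list (CORGI_PORT, CORGI_PROXY_PORT, ELK_PORT), falling back to the documented defaults. A minimal sketch under that assumption; FLOWER_PORT and both function names are hypothetical:

```python
import os
from typing import Mapping

# Documented defaults, each overridable by the matching environment variable.
DEFAULT_PORTS = {
    "api": ("CORGI_PORT", 5002),
    "proxy": ("CORGI_PROXY_PORT", 5003),
    "elk": ("ELK_PORT", 5314),
    "flower": ("FLOWER_PORT", 5555),  # hypothetical variable name
}


def resolve_ports(env: Mapping[str, str] = os.environ) -> dict[str, int]:
    """Return the published port for each service, honouring env overrides."""
    ports = {}
    for service, (var, default) in DEFAULT_PORTS.items():
        raw = env.get(var)
        ports[service] = int(raw) if raw else default
    return ports


def internal_url(service: str, port: int) -> str:
    """URL for container-to-container calls; `port` is the container's own port."""
    return f"http://{service}:{port}"
```

Inside corgi-internal the hostname is simply the Compose service name, so no IP addresses need to be configured.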
Core Application Layer
The heart of Corgi is a Flask-based application (app.py) that orchestrates all recommendation services and API endpoints.
Application Structure
# app.py - Core application initialization
from flask import Flask, request, jsonify
from flask_cors import CORS
from utils.auth import require_auth
from routes import register_all_routes
app = Flask(__name__)
CORS(app, origins=['*'])  # Configured for development
# Route registration
register_all_routes(app)
# Health check endpoint
@app.route('/health')
def health_check():
    return jsonify({"status": "healthy", "version": "2.0.0"})
Request Processing Pipeline
- Authentication Layer: Validates Bearer tokens and Mastodon instance headers
- Rate Limiting: Prevents abuse and ensures fair resource allocation
- Route Dispatch: Directs requests to appropriate handlers
- Business Logic: Executes recommendation algorithms or data operations
- Response Formation: Formats and returns JSON responses
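The five stages behave like a chain in which any stage may stop the request early with an error response. The framework-free sketch below models that flow; the stage functions, the in-memory counter, and the request dict shape are hypothetical stand-ins for the Flask decorators and Flask-Limiter:

```python
from typing import Callable, Optional

Request = dict
Response = tuple[dict, int]
# A stage returns None to continue, or a (body, status) tuple to stop early.
Stage = Callable[[Request], Optional[Response]]


def authenticate(req: Request) -> Optional[Response]:
    headers = req.get("headers", {})
    if not headers.get("Authorization", "").startswith("Bearer "):
        return {"error": "Authentication required"}, 401
    if "X-Mastodon-Instance" not in headers:
        return {"error": "Mastodon instance required"}, 400
    return None


def make_rate_limiter(max_requests: int) -> Stage:
    counts: dict[str, int] = {}

    def rate_limit(req: Request) -> Optional[Response]:
        key = req.get("remote_addr", "unknown")
        counts[key] = counts.get(key, 0) + 1
        if counts[key] > max_requests:
            return {"error": "Too many requests"}, 429
        return None

    return rate_limit


def run_pipeline(req: Request, stages: list[Stage],
                 handlers: dict[str, Callable[[Request], dict]]) -> Response:
    # Stages 1-2: authentication, then rate limiting
    for stage in stages:
        early = stage(req)
        if early is not None:
            return early
    # Stage 3: route dispatch
    handler = handlers.get(req.get("path", ""))
    if handler is None:
        return {"error": "Not found"}, 404
    # Stages 4-5: business logic, then a JSON-serialisable response
    return handler(req), 200
```

Putting authentication first means unauthenticated requests never consume rate-limit budget.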
Key Application Components
Route Management (routes/)
- recommendations.py: Core recommendation endpoints
- interactions.py: User interaction tracking
- timeline.py: Timeline enhancement and filtering
- analytics.py: A/B testing and performance metrics
Utility Layer (utils/)
- auth.py: Authentication and authorization
- recommendation_engine.py: Core recommendation logic
- ab_testing.py: Experimental framework
- privacy.py: Data protection and anonymization
Asynchronous Worker Queue System
Corgi uses Celery with Redis as the message broker for handling time-intensive operations asynchronously.
Worker Architecture
# utils/celery_app.py
from celery import Celery
from kombu import Queue
celery_app = Celery('corgi')
celery_app.conf.update(
broker_url='redis://redis:6379/0',
result_backend='redis://redis:6379/0',
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
)
# Task routing
celery_app.conf.task_routes = {
'tasks.content_crawler.*': {'queue': 'crawler'},
'tasks.analytics_tasks.*': {'queue': 'analytics'},
'tasks.recommendation_tasks.*': {'queue': 'recommendations'},
}
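Celery treats the keys of task_routes as glob patterns matched against the full task name. The sketch below mimics that lookup with fnmatch to show which queue a task lands on; it is an illustration rather than Celery's own router, and the fallback is Celery's default queue name, "celery":

```python
from fnmatch import fnmatchcase

TASK_ROUTES = {
    'tasks.content_crawler.*': {'queue': 'crawler'},
    'tasks.analytics_tasks.*': {'queue': 'analytics'},
    'tasks.recommendation_tasks.*': {'queue': 'recommendations'},
}


def queue_for(task_name: str, routes: dict = TASK_ROUTES, default: str = 'celery') -> str:
    """Return the queue of the first pattern matching task_name, else the default."""
    for pattern, options in routes.items():
        if fnmatchcase(task_name, pattern):
            return options['queue']
    return default
```

A queue is only drained if some worker subscribes to it, for example celery -A utils.celery_app:celery_app worker -Q crawler,analytics,recommendations.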
Task Categories
Content Crawling (tasks/content_crawler.py)
- Fetches fresh content from Mastodon instances
- Processes and enriches posts with metadata
- Handles rate limiting and error recovery
Analytics Processing (tasks/analytics_tasks.py)
- Computes recommendation performance metrics
- Processes A/B test results and statistical analysis
- Generates user engagement reports
Recommendation Tasks (tasks/recommendation_tasks.py)
- Pre-computes recommendation vectors for active users
- Handles batch processing of user preference updates
- Manages recommendation cache invalidation
Task Execution Flow
# Example task execution
from tasks.content_crawler import fetch_timeline_posts
# Asynchronous execution
result = fetch_timeline_posts.delay(
instance_url="mastodon.social",
user_token="bearer_token",
limit=100
)
# Result retrieval
posts = result.get(timeout=30)
RAG (Retrieval-Augmented Generation) System
The RAG system provides intelligent code analysis and documentation capabilities, powering the development workflow and user support.
RAG Architecture
graph TB
subgraph Input [Input Processing]
CODE[Code Repository]
DOCS[Documentation]
LOGS[Development Logs]
end
subgraph Processing [RAG Pipeline]
CHUNK[Content Chunking]
EMBED[Embedding Generation]
STORE[Vector Storage]
QUERY[Query Processing]
end
subgraph AI [AI Integration]
CLAUDE[Claude API]
RESPONSE[Response Generation]
end
CODE --> CHUNK
DOCS --> CHUNK
LOGS --> CHUNK
CHUNK --> EMBED
EMBED --> STORE
QUERY --> STORE
STORE --> CLAUDE
CLAUDE --> RESPONSE
classDef input fill:#e3f2fd,stroke:#1976d2,color:#000
classDef processing fill:#f1f8e9,stroke:#689f38,color:#000
classDef ai fill:#fce4ec,stroke:#c2185b,color:#000
class CODE,DOCS,LOGS input
class CHUNK,EMBED,STORE,QUERY processing
class CLAUDE,RESPONSE ai
Core RAG Components
Knowledge Base Builder (agents/rag_core.py)
class RAGKnowledgeBase:
    def __init__(self, embedding_model="all-MiniLM-L6-v2"):
        self.embedding_model = embedding_model
        self.vector_store = PostgresVectorStore()
    def ingest_codebase(self, repo_path):
        """Process and index entire codebase"""
        chunks = self.chunk_code_files(repo_path)
        embeddings = self.generate_embeddings(chunks)
        self.vector_store.store_embeddings(embeddings)
Query Interface (scripts/cursor_rag_query.py)
def query_knowledge_base(query: str, limit: int = 5):
    """Query the RAG system for relevant information"""
    query_embedding = generate_embedding(query)
    similar_chunks = vector_store.similarity_search(
        query_embedding, limit=limit
    )
    context = "\n".join([chunk.content for chunk in similar_chunks])
    response = claude_client.generate_response(query, context)
    return {
        "answer": response,
        "sources": [chunk.metadata for chunk in similar_chunks]
    }
RAG Data Sources
- Codebase Analysis: All Python files, configurations, and scripts
- Documentation: Markdown files, API docs, and user guides
- Development History: Commit messages, pull requests, and development logs
- Test Coverage: Unit tests, integration tests, and test reports
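The chunk, embed, store, and search steps can be illustrated end to end without a model or database. This sketch swaps in a bag-of-words vector for the all-MiniLM-L6-v2 embeddings and an in-memory list for the Postgres vector store; the chunk sizes and all names are assumptions:

```python
import math
import re
from collections import Counter


def chunk_text(text: str, max_lines: int = 20, overlap: int = 5) -> list[str]:
    """Split text into windows of max_lines lines that overlap by `overlap` lines."""
    lines = text.splitlines()
    step = max_lines - overlap
    return ["\n".join(lines[i:i + max_lines])
            for i in range(0, max(len(lines) - overlap, 1), step)]


def embed(text: str) -> Counter:
    """Toy embedding: lower-cased token counts (stand-in for a sentence transformer)."""
    return Counter(re.findall(r"[a-z0-9_]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


class InMemoryStore:
    def __init__(self) -> None:
        self.items: list[tuple[Counter, str, dict]] = []

    def add(self, content: str, metadata: dict) -> None:
        self.items.append((embed(content), content, metadata))

    def similarity_search(self, query: str, limit: int = 5) -> list[tuple[str, dict]]:
        """Return (content, metadata) pairs, most similar to the query first."""
        q = embed(query)
        ranked = sorted(self.items, key=lambda item: cosine(q, item[0]), reverse=True)
        return [(content, meta) for _, content, meta in ranked[:limit]]
```

The overlap keeps a function that straddles a chunk boundary visible in at least one chunk; the retrieved contents are then joined into the context string passed to Claude, as in query_knowledge_base.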
Agent Framework
The agent framework is an orchestration layer in which a ManagerAgent routes each task to one of several specialized agents.
Agent Architecture
# agents/manager_agent.py
class ManagerAgent:
    def __init__(self):
        self.agents = {
            'recommendation': RecommendationAgent(),
            'content_analysis': ContentAnalysisAgent(),
            'privacy_compliance': PrivacyAgent(),
            'performance_optimization': PerformanceAgent()
        }
    def delegate_task(self, task_type, context):
        """Route tasks to appropriate specialized agents"""
        agent = self.agents.get(task_type)
        if agent:
            return agent.execute(context)
        else:
            raise ValueError(f"No agent available for task: {task_type}")
Specialized Agents
Recommendation Agent
- Analyzes user behavior patterns
- Generates personalized content recommendations
- Adapts to user feedback in real time
Content Analysis Agent
- Processes and categorizes social media content
- Identifies trending topics and sentiment
- Filters inappropriate or spam content
Privacy Compliance Agent
- Ensures all data processing meets privacy standards
- Handles data anonymization and retention policies
- Monitors for potential privacy violations
Performance Optimization Agent
- Analyzes system performance metrics
- Suggests optimization strategies
- Manages resource allocation and caching
Agent Communication Protocol
# agents/agent_config.py
from datetime import datetime, timezone
AGENT_COMMUNICATION_PROTOCOL = {
    "message_format": "json",
    "timeout": 30,
    "retry_attempts": 3,
    "fallback_strategy": "graceful_degradation"
}
class AgentMessage:
    def __init__(self, sender, recipient, task_type, payload):
        self.sender = sender
        self.recipient = recipient
        self.task_type = task_type
        self.payload = payload
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        self.timestamp = datetime.now(timezone.utc)
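AGENT_COMMUNICATION_PROTOCOL names a retry count and a graceful_degradation fallback but not how they combine. One plausible reading, sketched with hypothetical names: retry the agent call up to retry_attempts times, then return a caller-supplied fallback instead of raising. The 30-second timeout is assumed to be enforced by the agent client and is not handled here.

```python
import time
from typing import Any, Callable


def call_with_fallback(call: Callable[[], Any], fallback: Any,
                       retry_attempts: int = 3, backoff_seconds: float = 0.0) -> Any:
    """Try `call` up to retry_attempts times; return `fallback` if every attempt fails."""
    for attempt in range(retry_attempts):
        try:
            return call()
        except Exception:
            # Linear backoff between attempts; no sleep after the final failure.
            if attempt < retry_attempts - 1:
                time.sleep(backoff_seconds * (attempt + 1))
    # Graceful degradation: hand back a safe default instead of propagating the error.
    return fallback
```

For example, call_with_fallback(lambda: manager.delegate_task('recommendation', context), fallback=[]) would let a timeline render without recommendations rather than fail outright.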
Recommendation Engine
The recommendation engine is the core intelligence of Corgi, combining multiple algorithms to provide personalized content recommendations.
Recommendation Algorithm Stack
# core/ranking_algorithm.py
class CoreRankingAlgorithm:
    def __init__(self):
        self.algorithms = {
            'semantic_similarity': SemanticSimilarityAlgorithm(),
            'engagement_prediction': EngagementPredictionAlgorithm(),
            'temporal_relevance': TemporalRelevanceAlgorithm(),
            'social_graph': SocialGraphAlgorithm()
        }
    def compute_recommendations(self, user_profile, candidate_posts):
        """Multi-algorithm recommendation computation"""
        scores = {}
        for algorithm_name, algorithm in self.algorithms.items():
            algorithm_scores = algorithm.score_posts(user_profile, candidate_posts)
            scores[algorithm_name] = algorithm_scores
        # Weighted combination of scores
        final_scores = self.combine_scores(scores)
        return self.rank_posts(candidate_posts, final_scores)
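combine_scores is described only as a weighted combination. A minimal sketch, assuming each algorithm returns a {post_id: score} dict on a 0 to 1 scale; the weights are illustrative, not values from the codebase, and rank_post_ids is a simplified hypothetical stand-in for rank_posts:

```python
# Illustrative weights only; the production values are not documented here.
DEFAULT_WEIGHTS = {
    'semantic_similarity': 0.4,
    'engagement_prediction': 0.3,
    'temporal_relevance': 0.2,
    'social_graph': 0.1,
}


def combine_scores(scores: dict[str, dict[str, float]],
                   weights: dict[str, float] = DEFAULT_WEIGHTS) -> dict[str, float]:
    """Weighted average of per-algorithm scores for every post seen by any algorithm.

    A post missing from one algorithm's output counts as 0 for that algorithm.
    """
    # Normalise by the weights of the algorithms that actually ran
    total_weight = sum(weights.get(name, 0.0) for name in scores) or 1.0
    post_ids = set().union(*(algo_scores.keys() for algo_scores in scores.values()))
    return {
        post_id: sum(weights.get(name, 0.0) * algo_scores.get(post_id, 0.0)
                     for name, algo_scores in scores.items()) / total_weight
        for post_id in post_ids
    }


def rank_post_ids(final_scores: dict[str, float]) -> list[str]:
    """Post IDs ordered from highest to lowest combined score."""
    return sorted(final_scores, key=final_scores.get, reverse=True)
```

Normalising by the weights of the algorithms present keeps scores comparable when one algorithm is disabled, for example during an A/B test.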
Algorithm Components
Semantic Similarity
- Uses sentence transformers for content understanding
- Computes cosine similarity between user preferences and post content
- Handles multilingual content with language detection
Engagement Prediction
- Predicts likelihood of user interaction (like, boost, reply)
- Based on historical interaction patterns
- Considers temporal patterns and user activity cycles
Temporal Relevance
- Balances fresh content with evergreen posts
- Considers posting time relative to the user's timezone
- Implements decay functions for time-sensitive content
Social Graph Analysis
- Analyzes the user's social connections and their activity
- Identifies influential users in the user's network
- Recommends content from trusted sources
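Temporal Relevance mentions decay functions without naming one. Exponential half-life decay is a common choice and is shown here as an assumption; the 6-hour half-life and the evergreen floor of 0.1 are illustrative values:

```python
from datetime import datetime, timedelta, timezone


def temporal_relevance(created_at: datetime, now: datetime,
                       half_life_hours: float = 6.0, floor: float = 0.1) -> float:
    """Score in [floor, 1] that halves its distance to `floor` every half_life_hours.

    The floor keeps evergreen posts from dropping out of ranking entirely.
    """
    age_hours = max((now - created_at).total_seconds() / 3600.0, 0.0)
    decay = 0.5 ** (age_hours / half_life_hours)
    return floor + (1.0 - floor) * decay


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    # A 12-hour-old post is two half-lives old
    print(temporal_relevance(now - timedelta(hours=12), now))
```

Both datetimes must be timezone-aware (or both naive); mixing the two raises TypeError on subtraction.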
Recommendation Pipeline
def generate_recommendations(user_id, limit=20):
    """Main recommendation generation pipeline"""
    # 1. User Profile Retrieval
    user_profile = get_user_profile(user_id)
    # 2. Candidate Post Generation
    candidate_posts = get_candidate_posts(user_profile, limit * 10)
    # 3. Algorithm Execution
    scored_posts = ranking_algorithm.compute_recommendations(
        user_profile, candidate_posts
    )
    # 4. Diversity and Freshness Filters
    filtered_posts = apply_diversity_filters(scored_posts)
    # 5. Privacy and Content Filtering
    safe_posts = apply_content_filters(filtered_posts, user_profile)
    # 6. Final Ranking and Limiting
    return safe_posts[:limit]
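apply_diversity_filters is not specified further. One simple interpretation, sketched as an assumption, caps how many posts from the same author survive while preserving score order; the cap of 2 and the post dict shape are hypothetical:

```python
from collections import defaultdict


def apply_diversity_filters(scored_posts: list[dict], max_per_author: int = 2) -> list[dict]:
    """Keep score order but drop posts beyond max_per_author for any single author."""
    seen: dict[str, int] = defaultdict(int)
    kept = []
    for post in scored_posts:  # assumed already sorted by score, highest first
        author = post["author_id"]
        if seen[author] < max_per_author:
            kept.append(post)
            seen[author] += 1
    return kept
```

Because step 2 over-fetches limit * 10 candidates, a cap like this is intended to leave enough posts to fill the final limit.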
API Layer & Authentication
The API layer provides a RESTful interface for all Corgi services, with comprehensive authentication and authorization mechanisms.
Authentication System
# utils/auth.py
from functools import wraps
from flask import request, jsonify
from jwt import InvalidTokenError  # raised by verify_mastodon_token for a bad token
def require_auth(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Bearer token validation
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return jsonify({'error': 'Authentication required'}), 401
        token = auth_header.split(' ', 1)[1]
        # Mastodon instance validation
        instance_header = request.headers.get('X-Mastodon-Instance')
        if not instance_header:
            return jsonify({'error': 'Mastodon instance required'}), 400
        # Token verification (verify_mastodon_token is defined elsewhere)
        try:
            decoded_token = verify_mastodon_token(token, instance_header)
        except InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401
        # Expose the verified identity to the wrapped view
        request.user_context = decoded_token
        return f(*args, **kwargs)
    return decorated_function
API Endpoints
Core Recommendation Endpoints
# routes/recommendations.py
import time
from flask import request, jsonify
from utils.auth import require_auth
@app.route('/api/v1/recommendations', methods=['GET'])
@require_auth
def get_recommendations():
    """Get personalized recommendations for authenticated user"""
    start_time = time.perf_counter()
    user_id = request.user_context['user_id']
    limit = request.args.get('limit', 20, type=int)
    recommendations = generate_recommendations(user_id, limit)
    return jsonify({
        'recommendations': recommendations,
        'metadata': {
            'total_count': len(recommendations),
            'generation_time': time.perf_counter() - start_time,  # seconds
            'algorithm_version': '2.0.0'
        }
    })
Analytics and Feedback Endpoints
# routes/interactions.py
from datetime import datetime, timezone
from flask import request, jsonify
from utils.auth import require_auth
@app.route('/api/v1/interactions', methods=['POST'])
@require_auth
def record_interaction():
    """Record user interaction with recommended content"""
    # silent=True returns None instead of raising on a missing or malformed JSON body
    interaction_data = request.get_json(silent=True)
    # Validate interaction data
    if not validate_interaction_data(interaction_data):
        return jsonify({'error': 'Invalid interaction data'}), 400
    user_id = request.user_context['user_id']
    # Store interaction
    interaction_id = store_interaction(
        user_id=user_id,
        post_id=interaction_data['post_id'],
        interaction_type=interaction_data['type'],
        timestamp=datetime.now(timezone.utc)
    )
    # Queue an asynchronous preference update (Celery task) for this user
    update_user_preferences.delay(user_id)
    return jsonify({'interaction_id': interaction_id}), 201
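record_interaction calls validate_interaction_data without showing it. A minimal sketch, assuming it only needs to check the two fields the handler reads (post_id and type) and to accept the interaction types listed in InteractionSchema further down; the real helper may check more:

```python
from typing import Any

# Mirrors the OneOf list in InteractionSchema
ALLOWED_INTERACTION_TYPES = {"like", "boost", "reply", "bookmark"}


def validate_interaction_data(data: Any) -> bool:
    """True only for a JSON object with a non-empty string post_id and a known type."""
    if not isinstance(data, dict):
        return False  # covers None from get_json(silent=True)
    post_id = data.get("post_id")
    if not isinstance(post_id, str) or not post_id.strip():
        return False
    return data.get("type") in ALLOWED_INTERACTION_TYPES
```

Rejecting unknown types at the edge keeps the interactions table free of values the ranking code does not recognize.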
Rate Limiting and Security
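# utils/rate_limiting.py
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# Flask-Limiter 3.x takes key_func as the first positional argument and app as a keyword.
# Shared Redis storage makes limits apply across all API processes (the default
# in-memory storage is per process); this needs the limits[redis] extra.
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="redis://redis:6379"
)
# Endpoint-specific rate limits (@limiter.limit goes below @app.route)
@app.route('/api/v1/recommendations')
@limiter.limit("30 per minute")
def get_recommendations():
    # Implementation
    pass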
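Flask-Limiter's default strategy is a fixed window: each key may make N hits per window, and the count resets when the window rolls over. The in-memory sketch below illustrates that idea for "30 per minute"; it is not Flask-Limiter's implementation and, unlike Redis-backed storage, is not shared between processes:

```python
import time
from typing import Callable, Optional


class FixedWindowLimiter:
    """Allow `limit` hits per key in each fixed window of `window_seconds`."""

    def __init__(self, limit: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock
        self.current_window: Optional[int] = None
        self.counters: dict[str, int] = {}

    def hit(self, key: str) -> bool:
        """Record a hit for key; return False once the current window is exhausted."""
        window = int(self.clock() // self.window_seconds)
        if window != self.current_window:
            # Windows are aligned for all keys, so every counter resets together.
            self.counters.clear()
            self.current_window = window
        self.counters[key] = self.counters.get(key, 0) + 1
        return self.counters[key] <= self.limit
```

A fixed window is cheap but allows a burst of up to 2N hits straddling a boundary; Flask-Limiter also offers moving-window strategies when that matters.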
Database Architecture
Corgi uses PostgreSQL as the primary database with Redis for caching and session management.
Database Schema
-- pgvector provides the VECTOR type and the ivfflat index method
CREATE EXTENSION IF NOT EXISTS vector;
-- Core user management
CREATE TABLE user_identities (
    id SERIAL PRIMARY KEY,
    mastodon_user_id VARCHAR(255) NOT NULL,
    instance_url VARCHAR(255) NOT NULL,
    access_token_hash VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(mastodon_user_id, instance_url)
);
-- Post storage and metadata
CREATE TABLE posts (
    id SERIAL PRIMARY KEY,
    mastodon_post_id VARCHAR(255) NOT NULL,
    instance_url VARCHAR(255) NOT NULL,
    content TEXT,
    author_id VARCHAR(255),
    created_at TIMESTAMP,
    engagement_score FLOAT DEFAULT 0.0,
    content_vector VECTOR(384), -- Embedding storage (384 = all-MiniLM-L6-v2 output size)
    UNIQUE(mastodon_post_id, instance_url)
);
-- User interaction tracking
CREATE TABLE interactions (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES user_identities(id),
    post_id INTEGER REFERENCES posts(id),
    interaction_type VARCHAR(50) NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    context_data JSONB
);
-- Recommendation cache
CREATE TABLE recommendation_cache (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES user_identities(id),
    post_id INTEGER REFERENCES posts(id),
    score FLOAT NOT NULL,
    algorithm_version VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP
);
Database Optimization
Indexing Strategy
-- Performance indexes
CREATE INDEX idx_user_interactions ON interactions(user_id, timestamp);
CREATE INDEX idx_post_engagement ON posts(engagement_score DESC);
CREATE INDEX idx_recommendation_cache_user ON recommendation_cache(user_id, expires_at);
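-- Vector similarity index for embeddings (build it after posts has data:
-- IVFFlat derives its list centroids from the rows present at build time)
CREATE INDEX idx_posts_vector ON posts USING ivfflat (content_vector vector_cosine_ops);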
Connection Pooling
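# db/connection_pool.py
import os
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# Assumes the connection string is supplied via the environment
DATABASE_URL = os.environ["DATABASE_URL"]
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=10,        # connections kept open per process
    max_overflow=20,     # extra connections allowed under burst load
    pool_pre_ping=True,  # test each connection before use to discard stale ones
    pool_recycle=3600    # replace connections older than one hour
)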
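With pool_size=10 and max_overflow=20, every process that creates this engine can open up to 30 connections. Multiplied by the number of API and worker processes, that worst case must stay below PostgreSQL's max_connections (100 by default, with 3 slots reserved for superusers by default). A small helper for the arithmetic; the process counts in the checks are hypothetical:

```python
def max_connections_needed(processes: int, pool_size: int = 10, max_overflow: int = 20) -> int:
    """Worst-case connections when every process exhausts its pool and overflow."""
    return processes * (pool_size + max_overflow)


def fits(processes: int, server_max_connections: int = 100, reserved: int = 3) -> bool:
    """True if the worst case fits in the slots left after superuser reservations."""
    return max_connections_needed(processes) <= server_max_connections - reserved
```

Celery prefork workers count once per child process, since each child builds its own pool; at four processes the defaults already overrun a stock PostgreSQL server.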
ELK Integration Layer
The ELK integration provides seamless enhancement of Mastodon timelines with AI-powered recommendations.
Integration Architecture
// integrations/elk/corgi-seamless.ts
export class CorgiSeamlessIntegration {
private apiClient: CorgiApiClient;
private userDetector: UserDetector;
private cacheManager: CacheManager;
constructor(config: CorgiConfig) {
this.apiClient = new CorgiApiClient(config);
this.userDetector = new UserDetector();
this.cacheManager = new CacheManager(config.cacheTimeout);
}
async enhanceTimeline(posts: MastodonPost[]): Promise<EnhancedPost[]> {
// 1. User detection and privacy check
const userContext = await this.userDetector.detectUser();
if (!userContext.hasOptedIn) {
return posts; // No enhancement without consent
}
// 2. Get recommendations from Corgi API
const recommendations = await this.apiClient.getRecommendations({
limit: 10,
timelineContext: posts.map(p => p.id)
});
// 3. Merge recommendations with timeline
return this.mergeRecommendations(posts, recommendations);
}
}
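mergeRecommendations is not shown. The merge logic is language-neutral, so here it is sketched in Python under assumed rules: insert one recommendation after every N timeline posts and skip any recommendation already present in the timeline. The interval of 5, the is_recommendation flag, and the id-keyed dict shape are hypothetical; the ELK client itself would implement this in TypeScript.

```python
def merge_recommendations(posts: list[dict], recommendations: list[dict],
                          every: int = 5) -> list[dict]:
    """Interleave unseen recommendations into the timeline, one after every `every` posts."""
    timeline_ids = {p["id"] for p in posts}
    # Drop duplicates of posts the user will see anyway, and tag the rest.
    pending = [dict(r, is_recommendation=True)
               for r in recommendations if r["id"] not in timeline_ids]
    merged: list[dict] = []
    for index, post in enumerate(posts, start=1):
        merged.append(post)
        if index % every == 0 and pending:
            merged.append(pending.pop(0))
    # Recommendations left over once the timeline ends are not appended.
    return merged
```

Tagging merged items lets the UI mark recommended posts visibly, which fits the consent-first check in enhanceTimeline.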
User Detection System
// Eight-method user detection for privacy compliance
class UserDetector {
private detectionMethods = [
'localStorage_check',
'sessionStorage_check',
'url_parameter_analysis',
'dom_element_inspection',
'api_response_analysis',
'user_agent_fingerprinting',
'timing_analysis',
'behavioral_pattern_recognition'
];
async detectUser(): Promise<UserContext> {
const results = await Promise.all(
this.detectionMethods.map(method => this[method]())
);
return this.consolidateResults(results);
}
}
Timeline Enhancement
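// Vue component integration
import { defineComponent, ref } from 'vue';
export default defineComponent({
name: 'TimelineCorgi',
// An async setup() only renders when the component is placed inside <Suspense>
async setup() {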
const corgiIntegration = new CorgiSeamlessIntegration({
apiEndpoint: 'https://corgi.social/api/v1',
cacheTimeout: 300000, // 5 minutes
privacyMode: 'strict'
});
const enhancedPosts = ref<EnhancedPost[]>([]);
const loadTimeline = async () => {
const posts = await fetchTimelinePosts();
enhancedPosts.value = await corgiIntegration.enhanceTimeline(posts);
};
return {
enhancedPosts,
loadTimeline
};
}
});
Frontend Integration
The frontend provides a modern, responsive interface for recommendation management and analytics.
Technology Stack
- Framework: Next.js 13 with App Router
- Language: TypeScript
- Styling: Tailwind CSS
- State Management: React Query + Context API
- Components: Custom component library with shadcn/ui base
Key Frontend Components
Dashboard Overview
// src/components/dashboard/DashboardOverview.tsx
export function DashboardOverview() {
const { data: metrics, isLoading } = useQuery({
queryKey: ['dashboard-metrics'],
queryFn: fetchDashboardMetrics
});
// metrics is undefined while loading and after a failed request
if (isLoading || !metrics) return <DashboardSkeleton />;
return (
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-6">
<MetricCard
title="Total Recommendations"
value={metrics.totalRecommendations}
trend={metrics.recommendationTrend}
/>
<MetricCard
title="User Engagement"
value={`${metrics.engagementRate}%`}
trend={metrics.engagementTrend}
/>
<MetricCard
title="Active Experiments"
value={metrics.activeExperiments}
trend={metrics.experimentTrend}
/>
<MetricCard
title="System Health"
value={metrics.healthScore}
trend={metrics.healthTrend}
/>
</div>
);
}
A/B Testing Interface
// src/components/dashboard/ABTestingExperiments.tsx
export function ABTestingExperiments() {
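// Server state comes from React Query so that refetchExperiments exists
// (fetchABTestExperiments is an assumed companion to createABTestExperiment)
const { data: experiments = [], refetch: refetchExperiments } = useQuery({
queryKey: ['ab-experiments'],
queryFn: fetchABTestExperiments
});
const [createModalOpen, setCreateModalOpen] = useState(false);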
const { mutate: createExperiment } = useMutation({
mutationFn: createABTestExperiment,
onSuccess: () => {
setCreateModalOpen(false);
refetchExperiments();
}
});
return (
<div className="space-y-6">
<div className="flex justify-between items-center">
<h2 className="text-2xl font-bold">A/B Testing Experiments</h2>
<Button onClick={() => setCreateModalOpen(true)}>
Create Experiment
</Button>
</div>
<ExperimentList experiments={experiments} />
<ExperimentCreationModal
isOpen={createModalOpen}
onClose={() => setCreateModalOpen(false)}
onSubmit={createExperiment}
/>
</div>
);
}
API Integration
// src/lib/api-client.ts
export class CorgiApiClient {
private baseUrl: string;
private authToken: string;
constructor(config: ApiConfig) {
this.baseUrl = config.baseUrl;
this.authToken = config.authToken;
}
async getRecommendations(params: RecommendationParams): Promise<Recommendation[]> {
// fetch() has no "params" option, so encode the query string into the URL itself
const url = new URL(`${this.baseUrl}/api/v1/recommendations`);
for (const [key, value] of Object.entries(params)) {
url.searchParams.set(key, String(value));
}
const response = await fetch(url.toString(), {
method: 'GET',
headers: {
'Authorization': `Bearer ${this.authToken}`
}
});
if (!response.ok) {
throw new Error(`API Error: ${response.status}`);
}
return response.json();
}
}
Security Architecture
Corgi implements multiple layers of security to protect user data and ensure system integrity.
Authentication & Authorization
# utils/security.py
import bcrypt
from cryptography.fernet import Fernet
class SecurityManager:
    def __init__(self, secret_key: str):
        # Fernet needs a 32-byte url-safe base64-encoded key, such as the output of
        # Fernet.generate_key(); an arbitrary passphrase raises ValueError here.
        self.secret_key = secret_key
        self.cipher = Fernet(secret_key.encode())
    def hash_password(self, password: str) -> str:
        """Secure password hashing using bcrypt (the salt is embedded in the hash)"""
        return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
    def verify_password(self, password: str, hashed: str) -> bool:
        """Verify password against hash"""
        return bcrypt.checkpw(password.encode(), hashed.encode())
    def encrypt_sensitive_data(self, data: str) -> str:
        """Encrypt sensitive data for storage"""
        return self.cipher.encrypt(data.encode()).decode()
    def decrypt_sensitive_data(self, encrypted_data: str) -> str:
        """Decrypt sensitive data"""
        return self.cipher.decrypt(encrypted_data.encode()).decode()
Data Protection
Privacy-First Design
- All user data is encrypted at rest
- Minimal data collection and retention
- Automatic data anonymization after configurable periods
- No cross-user data sharing without explicit consent
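Automatic anonymization after a configurable period can be implemented in several ways. One option, sketched here as an assumption, replaces the user ID on interactions older than the retention window with a keyed HMAC-SHA256 pseudonym, so aggregate statistics keep working but the row no longer names the user. While the key exists this is pseudonymization rather than anonymization in GDPR terms; the 90-day default, the key, and the row shape are hypothetical:

```python
import hashlib
import hmac
from datetime import datetime, timedelta, timezone


def pseudonymize(user_id: int, key: bytes) -> str:
    """Deterministic keyed pseudonym; cannot be recomputed without the key."""
    return hmac.new(key, str(user_id).encode(), hashlib.sha256).hexdigest()


def anonymize_expired(interactions: list[dict], key: bytes, now: datetime,
                      retention_days: int = 90) -> int:
    """Replace user_id on rows older than retention_days; return how many changed."""
    cutoff = now - timedelta(days=retention_days)
    changed = 0
    for row in interactions:
        # Integer IDs are still raw; strings were already pseudonymized by a past run.
        if row["timestamp"] < cutoff and isinstance(row["user_id"], int):
            row["user_id"] = pseudonymize(row["user_id"], key)
            changed += 1
    return changed


if __name__ == "__main__":
    rows = [{"user_id": 7, "timestamp": datetime(2020, 1, 1, tzinfo=timezone.utc)}]
    anonymize_expired(rows, key=b"rotate-me", now=datetime.now(timezone.utc))
    print(rows[0]["user_id"])  # 64-character hex pseudonym
```

Destroying or rotating the key later severs the remaining link between pseudonyms and accounts.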
GDPR Compliance
# utils/gdpr.py
import logging
logger = logging.getLogger(__name__)
class GDPRCompliance:
    def __init__(self, db_session):
        self.db_session = db_session
    def export_user_data(self, user_id: int) -> dict:
        """Export all data held about a user (GDPR right of access)"""
        user_data = {
            'profile': self.get_user_profile(user_id),
            'interactions': self.get_user_interactions(user_id),
            'preferences': self.get_user_preferences(user_id),
            'recommendations': self.get_user_recommendations(user_id)
        }
        return user_data
    def delete_user_data(self, user_id: int) -> bool:
        """Erase the user's profile and cached recommendations and anonymize
        their interactions, all in one transaction."""
        try:
            # Clear rows that reference user_identities before deleting the profile,
            # since the foreign keys have no ON DELETE CASCADE
            self.anonymize_interactions(user_id)
            self.clear_recommendation_cache(user_id)
            self.delete_user_profile(user_id)
            self.db_session.commit()
            return True
        except Exception:
            # Roll back so a failure leaves no partially deleted state
            self.db_session.rollback()
            logger.exception("Error deleting user data")
            return False
Input Validation & Sanitization
# utils/validation.py
from marshmallow import Schema, fields, validate
class RecommendationRequestSchema(Schema):
limit = fields.Integer(
validate=validate.Range(min=1, max=100),
missing=20
)
instance_url = fields.Url(required=True)
user_token = fields.String(required=True, validate=validate.Length(min=10))
class InteractionSchema(Schema):
post_id = fields.String(required=True)
interaction_type = fields.String(
required=True,
validate=validate.OneOf(['like', 'boost', 'reply', 'bookmark'])
)
timestamp = fields.DateTime(required=True)
Performance & Monitoring
Corgi implements comprehensive monitoring and performance optimization strategies.
Performance Optimization
Caching Strategy
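# utils/caching.py
import hashlib
import json
from functools import wraps
import redis
redis_client = redis.Redis(host='redis', port=6379, db=0)
def cache_recommendations(timeout=300):
    def decorator(func):
        @wraps(func)
        def wrapper(user_id, *args, **kwargs):
            # Built-in hash() of a str is randomized per process (PYTHONHASHSEED), so
            # use a stable digest that every worker and restart computes identically
            arg_digest = hashlib.sha256(
                json.dumps([args, kwargs], sort_keys=True, default=str).encode()
            ).hexdigest()
            cache_key = f"recommendations:{user_id}:{arg_digest}"
            # Try to get from cache (an empty cached list is still a hit)
            cached_result = redis_client.get(cache_key)
            if cached_result is not None:
                return json.loads(cached_result)
            # Compute and cache result for `timeout` seconds
            result = func(user_id, *args, **kwargs)
            redis_client.setex(cache_key, timeout, json.dumps(result, default=str))
            return result
        return wrapper
    return decorator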
Database Query Optimization
# utils/database_optimization.py
from sqlalchemy import text
class OptimizedQueries:
    @staticmethod
    def get_user_recommendations_batch(session, user_ids: list, limit: int = 20):
        """Fetch up to `limit` unexpired recommendations per user in one query.
        A single LIMIT over the whole result would let users with many cached rows
        crowd out others, so rows are numbered per user with ROW_NUMBER().
        """
        query = text("""
            SELECT user_id, post_id, score, content, author_id, created_at
            FROM (
                SELECT
                    r.user_id,
                    r.post_id,
                    r.score,
                    p.content,
                    p.author_id,
                    p.created_at,
                    ROW_NUMBER() OVER (
                        PARTITION BY r.user_id ORDER BY r.score DESC
                    ) AS rn
                FROM recommendation_cache r
                JOIN posts p ON r.post_id = p.id
                WHERE r.user_id = ANY(:user_ids)
                  AND r.expires_at > NOW()
            ) ranked
            WHERE rn <= :limit
            ORDER BY user_id, score DESC
        """)
        return session.execute(query, {
            'user_ids': user_ids,
            'limit': limit
        }).fetchall()
Monitoring & Observability
Health Checks
# utils/health_checks.py
from datetime import datetime, timezone
class HealthChecker:
    def __init__(self):
        # Each check is an async method returning a dict with at least {'healthy': bool}
        self.checks = [
            self.check_database_connection,
            self.check_redis_connection,
            self.check_claude_api,
            self.check_recommendation_engine,
            self.check_celery_workers
        ]
    async def run_health_checks(self):
        """Run all health checks and return status"""
        results = {}
        overall_status = "healthy"
        for check in self.checks:
            try:
                result = await check()
                results[check.__name__] = result
                if not result['healthy']:
                    overall_status = "unhealthy"
            except Exception as e:
                results[check.__name__] = {
                    'healthy': False,
                    'error': str(e)
                }
                overall_status = "unhealthy"
        return {
            'status': overall_status,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'checks': results
        }
Metrics Collection
# utils/metrics.py
from prometheus_client import Counter, Histogram, Gauge
# Define metrics
recommendation_requests = Counter(
    'corgi_recommendation_requests_total',
    'Total recommendation requests',
    ['endpoint', 'status']
)
recommendation_latency = Histogram(
    'corgi_recommendation_latency_seconds',
    'Recommendation generation latency',
    ['algorithm']
)
active_users = Gauge(
    'corgi_active_users',
    'Number of active users',
    ['timeframe']
)
# Usage example: metrics declared with labels must be bound to concrete label
# values via .labels() before their decorators can be used (values here are
# illustrative). count_exceptions() increments only when the function raises.
@recommendation_requests.labels(endpoint='/api/v1/recommendations', status='error').count_exceptions()
@recommendation_latency.labels(algorithm='core_ranking').time()
def generate_recommendations(user_id, limit=20):
    # Implementation
    pass
Automated Monitoring
# monitoring/health_monitor.py
import asyncio
import time
from datetime import datetime, timezone
import aiohttp
class HealthMonitor:
    def __init__(self, endpoints, check_interval=30, request_timeout=10):
        self.endpoints = endpoints  # list of {'name': ..., 'url': ...}
        self.check_interval = check_interval
        self.timeout = aiohttp.ClientTimeout(total=request_timeout)
        self.running = False
    def stop(self):
        self.running = False
    async def monitor_endpoints(self):
        """Continuously monitor endpoint health"""
        self.running = True
        # Reuse one session (and its connection pool) for every check
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            while self.running:
                for endpoint in self.endpoints:
                    try:
                        start = time.perf_counter()  # monotonic, unaffected by clock changes
                        async with session.get(endpoint['url']) as response:
                            health_data = {
                                'endpoint': endpoint['name'],
                                'url': endpoint['url'],
                                'status_code': response.status,
                                'response_time': time.perf_counter() - start,
                                'timestamp': datetime.now(timezone.utc).isoformat()
                            }
                        # log_health_data and log_error are reporting hooks defined elsewhere
                        self.log_health_data(health_data)
                    except Exception as e:
                        self.log_error(endpoint, str(e))
                await asyncio.sleep(self.check_interval)
Summary
The Corgi Recommender Service represents a sophisticated, privacy-first recommendation platform that combines traditional machine learning with modern AI capabilities. Its modular architecture ensures scalability, maintainability, and extensibility while maintaining high performance and security standards.
Key architectural strengths:
- Modularity: Each component can be developed, tested, and deployed independently
- Scalability: Horizontal scaling through containerization and microservices
- Privacy: Built-in data protection and GDPR compliance
- Performance: Multi-layer caching and async processing, targeting sub-100ms responses for most requests
- Extensibility: Agent framework allows for easy integration of new AI capabilities
This architecture provides a solid foundation for building advanced recommendation systems while maintaining user privacy and system reliability.
For implementation details and code examples, refer to the specific component documentation in the Reference section.