Architecture Deep Dive

Welcome to the comprehensive technical architecture documentation for the Corgi Recommender Service. This document provides an in-depth look at the system's inner workings, design decisions, and implementation details.

Table of Contents

  1. High-Level System Overview
  2. Service Architecture & Orchestration
  3. Core Application Layer
  4. Asynchronous Worker Queue System
  5. RAG (Retrieval-Augmented Generation) System
  6. Agent Framework
  7. Recommendation Engine
  8. API Layer & Authentication
  9. Database Architecture
  10. ELK Integration Layer
  11. Frontend Integration
  12. Security Architecture
  13. Performance & Monitoring

High-Level System Overview

The Corgi Recommender Service is a privacy-first, multi-layered recommendation platform designed for the Fediverse. The system combines traditional recommendation algorithms with modern AI capabilities through a sophisticated agent framework.

graph TB
    subgraph External [External Services]
        ELK[ELK Client]
        MAST[Mastodon Instances]
        CLAUDE[Claude API]
    end

    subgraph Gateway [API Gateway Layer]
        PROXY[Corgi Proxy]
        API[Core API]
    end

    subgraph Processing [Processing Layer]
        AGENTS[Agent Framework]
        RAG[RAG System]
        RECS[Recommendation Engine]
    end

    subgraph Storage [Storage Layer]
        POSTGRES[(PostgreSQL)]
        REDIS[(Redis)]
        EMBEDDINGS[(Vector Store)]
    end

    subgraph Workers [Background Processing]
        CELERY[Celery Workers]
        TASKS[Scheduled Tasks]
    end

    ELK --> PROXY
    PROXY --> API
    API --> AGENTS
    API --> RAG
    API --> RECS

    AGENTS --> CLAUDE
    RAG --> EMBEDDINGS
    RECS --> POSTGRES

    CELERY --> MAST
    CELERY --> POSTGRES
    CELERY --> REDIS

    classDef external fill:#e1f5fe,stroke:#0277bd,color:#000
    classDef gateway fill:#f3e5f5,stroke:#7b1fa2,color:#000
    classDef processing fill:#e8f5e8,stroke:#388e3c,color:#000
    classDef storage fill:#fff3e0,stroke:#f57c00,color:#000
    classDef workers fill:#fce4ec,stroke:#c2185b,color:#000

    class ELK,MAST,CLAUDE external
    class PROXY,API gateway
    class AGENTS,RAG,RECS processing
    class POSTGRES,REDIS,EMBEDDINGS storage
    class CELERY,TASKS workers

Core Design Principles

Privacy First: User data never leaves the processing pipeline without explicit consent. All recommendation processing happens server-side with minimal data retention.

Modularity: Each component is designed as an independent service that can be scaled, updated, or replaced without affecting others.

Extensibility: The agent framework allows for easy integration of new AI capabilities and recommendation strategies.

Performance: Multi-layered caching and asynchronous processing are designed to keep response times under 100 ms for most operations.

Service Architecture & Orchestration

The Corgi platform uses Docker Compose for service orchestration, providing a consistent development and deployment environment.

Service Topology

# docker-compose.yml structure
services:
  corgi-api:          # Core Flask application
  corgi-proxy:        # Request routing and caching
  elk-client:         # ELK integration service
  postgres:           # Primary database
  redis:              # Caching and session storage
  celery-worker:      # Background task processing
  celery-beat:        # Scheduled task coordinator
  flower:             # Celery monitoring

Key Infrastructure Components

Corgi API Server (corgi-api)

  • Flask-based REST API server
  • Port: 5002 (configurable via CORGI_PORT)
  • Handles all recommendation logic, user management, and agent coordination

Corgi Proxy (corgi-proxy)

  • Nginx-based reverse proxy
  • Port: 5003 (configurable via CORGI_PROXY_PORT)
  • Provides SSL termination, request routing, and response caching

ELK Client Service (elk-client)

  • Dedicated service for ELK integration
  • Port: 5314 (configurable via ELK_PORT)
  • Handles seamless timeline enhancement and user detection

Network Architecture

# Network configuration example
CORGI_INTERNAL_NETWORK = "corgi-internal"
EXTERNAL_PORTS = {
    "api": 5002,
    "proxy": 5003,
    "elk": 5314,
    "flower": 5555
}

All services communicate through a dedicated Docker network, ensuring isolation and security while maintaining high-performance inter-service communication.
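As a rough illustration of the port configuration described above, the sketch below resolves each service's port from its environment variable and falls back to the documented default. `CORGI_PORT`, `CORGI_PROXY_PORT`, and `ELK_PORT` come from the service descriptions; the helper name `resolve_ports` and the `PORT_ENV_VARS` table are assumptions made for this example, not part of the Corgi codebase.

```python
import os
from typing import Mapping

# Documented defaults and the environment variables that override them.
PORT_ENV_VARS = {
    "api": ("CORGI_PORT", 5002),
    "proxy": ("CORGI_PROXY_PORT", 5003),
    "elk": ("ELK_PORT", 5314),
}


def resolve_ports(env: Mapping[str, str] = os.environ) -> dict[str, int]:
    """Return the effective port per service (hypothetical helper)."""
    ports = {}
    for service, (var_name, default) in PORT_ENV_VARS.items():
        raw = env.get(var_name)
        # An unset or empty variable falls back to the documented default.
        ports[service] = int(raw) if raw else default
    return ports
```

Passing the environment as a mapping keeps the helper easy to test without touching the real process environment.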

Core Application Layer

The heart of Corgi is a Flask-based application (app.py) that orchestrates all recommendation services and API endpoints.

Application Structure

# app.py - Core application initialization
from flask import Flask, request, jsonify
from flask_cors import CORS
from utils.auth import require_auth
from routes import register_all_routes

app = Flask(__name__)
CORS(app, origins=['*'])  # Wildcard origins are for development only; restrict them in production

# Route registration
register_all_routes(app)

# Health check endpoint
@app.route('/health')
def health_check():
    return jsonify({"status": "healthy", "version": "2.0.0"})

Request Processing Pipeline

  1. Authentication Layer: Validates Bearer tokens and Mastodon instance headers
  2. Rate Limiting: Prevents abuse and ensures fair resource allocation
  3. Route Dispatch: Directs requests to appropriate handlers
  4. Business Logic: Executes recommendation algorithms or data operations
  5. Response Formation: Formats and returns JSON responses
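The five stages above can be pictured as a chain in which the early stages may short-circuit with an error response. The following is a framework-free sketch of that idea, not the service's actual code: in the real application Flask and decorators such as `require_auth` play these roles, and every name here (`authenticate`, `make_rate_limiter`, `handle`) is hypothetical.

```python
from typing import Callable, Optional

Request = dict
Response = tuple[dict, int]  # (JSON body, HTTP status)
Guard = Callable[[Request], Optional[Response]]


def authenticate(req: Request) -> Optional[Response]:
    # Stage 1: require a Bearer token and a Mastodon instance header.
    headers = req.get("headers", {})
    if not headers.get("Authorization", "").startswith("Bearer "):
        return {"error": "Authentication required"}, 401
    if "X-Mastodon-Instance" not in headers:
        return {"error": "Mastodon instance required"}, 400
    return None


def make_rate_limiter(max_requests: int) -> Guard:
    seen: dict[str, int] = {}

    def check(req: Request) -> Optional[Response]:
        # Stage 2: count requests per client (no time window in this sketch).
        client = req.get("client", "unknown")
        seen[client] = seen.get(client, 0) + 1
        if seen[client] > max_requests:
            return {"error": "Rate limit exceeded"}, 429
        return None

    return check


def handle(req: Request, guards: list[Guard], routes: dict) -> Response:
    for guard in guards:               # stages 1-2 may reject the request
        rejection = guard(req)
        if rejection is not None:
            return rejection
    handler = routes.get(req["path"])  # stage 3: route dispatch
    if handler is None:
        return {"error": "Not found"}, 404
    return handler(req), 200           # stages 4-5: business logic, JSON response
```

Ordering the guards before dispatch means an unauthenticated request never reaches business logic or consumes rate-limit budget.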

Key Application Components

Route Management (routes/)

  • recommendations.py: Core recommendation endpoints
  • interactions.py: User interaction tracking
  • timeline.py: Timeline enhancement and filtering
  • analytics.py: A/B testing and performance metrics

Utility Layer (utils/)

  • auth.py: Authentication and authorization
  • recommendation_engine.py: Core recommendation logic
  • ab_testing.py: Experimental framework
  • privacy.py: Data protection and anonymization

Asynchronous Worker Queue System

Corgi uses Celery with Redis as the message broker for handling time-intensive operations asynchronously.

Worker Architecture

# utils/celery_app.py
from celery import Celery
from kombu import Queue

celery_app = Celery('corgi')
celery_app.conf.update(
    broker_url='redis://redis:6379/0',
    result_backend='redis://redis:6379/0',
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

# Task routing
celery_app.conf.task_routes = {
    'tasks.content_crawler.*': {'queue': 'crawler'},
    'tasks.analytics_tasks.*': {'queue': 'analytics'},
    'tasks.recommendation_tasks.*': {'queue': 'recommendations'},
}
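To make the routing table concrete, here is a small stand-alone sketch of how glob-style patterns map a task name to a queue. It imitates the matching with the standard library's `fnmatch` rather than calling Celery; `resolve_queue` is a hypothetical helper, and the fallback value `"celery"` is Celery's default queue name.

```python
from fnmatch import fnmatch

# Same patterns as the task_routes configuration above.
TASK_ROUTES = {
    "tasks.content_crawler.*": {"queue": "crawler"},
    "tasks.analytics_tasks.*": {"queue": "analytics"},
    "tasks.recommendation_tasks.*": {"queue": "recommendations"},
}


def resolve_queue(task_name: str, default: str = "celery") -> str:
    """Return the queue for a task name, using the first matching pattern."""
    for pattern, route in TASK_ROUTES.items():
        if fnmatch(task_name, pattern):
            return route["queue"]
    return default
```

Tasks that match no pattern land on the default queue, so a worker must consume that queue too or those tasks will wait indefinitely.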

Task Categories

Content Crawling (tasks/content_crawler.py)

  • Fetches fresh content from Mastodon instances
  • Processes and enriches posts with metadata
  • Handles rate limiting and error recovery

Analytics Processing (tasks/analytics_tasks.py)

  • Computes recommendation performance metrics
  • Processes A/B test results and statistical analysis
  • Generates user engagement reports

Recommendation Tasks (tasks/recommendation_tasks.py)

  • Pre-computes recommendation vectors for active users
  • Handles batch processing of user preference updates
  • Manages recommendation cache invalidation

Task Execution Flow

# Example task execution
from tasks.content_crawler import fetch_timeline_posts

# Asynchronous execution
result = fetch_timeline_posts.delay(
    instance_url="https://mastodon.social",
    user_token="bearer_token",
    limit=100
)

# Result retrieval
posts = result.get(timeout=30)

RAG (Retrieval-Augmented Generation) System

The RAG system provides intelligent code analysis and documentation capabilities, powering the development workflow and user support.

RAG Architecture

graph TB
    subgraph Input [Input Processing]
        CODE[Code Repository]
        DOCS[Documentation]
        LOGS[Development Logs]
    end

    subgraph Processing [RAG Pipeline]
        CHUNK[Content Chunking]
        EMBED[Embedding Generation]
        STORE[Vector Storage]
        QUERY[Query Processing]
    end

    subgraph AI [AI Integration]
        CLAUDE[Claude API]
        RESPONSE[Response Generation]
    end

    CODE --> CHUNK
    DOCS --> CHUNK
    LOGS --> CHUNK

    CHUNK --> EMBED
    EMBED --> STORE

    QUERY --> STORE
    STORE --> CLAUDE
    CLAUDE --> RESPONSE

    classDef input fill:#e3f2fd,stroke:#1976d2,color:#000
    classDef processing fill:#f1f8e9,stroke:#689f38,color:#000
    classDef ai fill:#fce4ec,stroke:#c2185b,color:#000

    class CODE,DOCS,LOGS input
    class CHUNK,EMBED,STORE,QUERY processing
    class CLAUDE,RESPONSE ai

Core RAG Components

Knowledge Base Builder (agents/rag_core.py)

class RAGKnowledgeBase:
    def __init__(self, embedding_model="all-MiniLM-L6-v2"):
        self.embedding_model = embedding_model
        self.vector_store = PostgresVectorStore()

    def ingest_codebase(self, repo_path):
        """Process and index entire codebase"""
        chunks = self.chunk_code_files(repo_path)
        embeddings = self.generate_embeddings(chunks)
        self.vector_store.store_embeddings(embeddings)
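The source does not show how `chunk_code_files` splits its input. One common approach, sketched here as an assumption rather than as the project's method, is fixed-size windows with a small overlap so that text cut at a boundary still appears whole in one chunk. The function name `chunk_text` and its default sizes are illustrative.

```python
def chunk_text(text: str, max_chars: int = 800, overlap: int = 100) -> list[str]:
    """Split text into fixed-size windows that overlap by `overlap` characters."""
    if max_chars <= 0 or not 0 <= overlap < max_chars:
        raise ValueError("need max_chars > 0 and 0 <= overlap < max_chars")
    step = max_chars - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + max_chars])
        if start + max_chars >= len(text):
            break  # this window already reached the end of the text
    return chunks
```

For source code, splitting on function or class boundaries usually retrieves better than raw character windows; the fixed-size version is shown only because it is the simplest to reason about.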

Query Interface (scripts/cursor_rag_query.py)

def query_knowledge_base(query: str, limit: int = 5):
    """Query the RAG system for relevant information"""
    query_embedding = generate_embedding(query)
    similar_chunks = vector_store.similarity_search(
        query_embedding, limit=limit
    )

    context = "\n".join([chunk.content for chunk in similar_chunks])
    response = claude_client.generate_response(query, context)

    return {
        "answer": response,
        "sources": [chunk.metadata for chunk in similar_chunks]
    }
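The retrieval step in `query_knowledge_base` depends on ranking stored chunks by similarity to the query embedding. A brute-force version of that ranking is sketched below with plain Python lists. It assumes cosine similarity as the metric and is only an illustration: a production vector store would normally use an index rather than scanning every embedding.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def similarity_search(query: list[float],
                      indexed: list[tuple[str, list[float]]],
                      limit: int = 5) -> list[str]:
    """Return the ids of the `limit` chunks most similar to the query."""
    ranked = sorted(
        indexed,
        key=lambda item: cosine_similarity(query, item[1]),
        reverse=True,
    )
    return [chunk_id for chunk_id, _ in ranked[:limit]]
```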

RAG Data Sources

  1. Codebase Analysis: All Python files, configurations, and scripts
  2. Documentation: Markdown files, API docs, and user guides
  3. Development History: Commit messages, pull requests, and development logs
  4. Test Coverage: Unit tests, integration tests, and test reports

Agent Framework

The agent framework provides a sophisticated AI orchestration system that coordinates multiple specialized agents for different tasks.

Agent Architecture

# agents/manager_agent.py
class ManagerAgent:
    def __init__(self):
        self.agents = {
            'recommendation': RecommendationAgent(),
            'content_analysis': ContentAnalysisAgent(),
            'privacy_compliance': PrivacyAgent(),
            'performance_optimization': PerformanceAgent()
        }

    def delegate_task(self, task_type, context):
        """Route tasks to appropriate specialized agents"""
        agent = self.agents.get(task_type)
        if agent:
            return agent.execute(context)
        else:
            raise ValueError(f"No agent available for task: {task_type}")

Specialized Agents

Recommendation Agent

  • Analyzes user behavior patterns
  • Generates personalized content recommendations
  • Adapts to user feedback in real-time

Content Analysis Agent

  • Processes and categorizes social media content
  • Identifies trending topics and sentiment
  • Filters inappropriate or spam content

Privacy Compliance Agent

  • Ensures all data processing meets privacy standards
  • Handles data anonymization and retention policies
  • Monitors for potential privacy violations

Performance Optimization Agent

  • Analyzes system performance metrics
  • Suggests optimization strategies
  • Manages resource allocation and caching

Agent Communication Protocol

# agents/agent_config.py
from datetime import datetime, timezone

AGENT_COMMUNICATION_PROTOCOL = {
    "message_format": "json",
    "timeout": 30,
    "retry_attempts": 3,
    "fallback_strategy": "graceful_degradation"
}

class AgentMessage:
    def __init__(self, sender, recipient, task_type, payload):
        self.sender = sender
        self.recipient = recipient
        self.task_type = task_type
        self.payload = payload
        self.timestamp = datetime.now(timezone.utc)  # timezone-aware UTC timestamp
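The `retry_attempts` and `fallback_strategy` settings suggest a retry-then-degrade pattern. The sketch below shows one way that could look, under two assumptions: `retry_attempts` is read as the total number of attempts, and "graceful degradation" means returning a simpler fallback result instead of raising. `call_with_retries` is a hypothetical helper, and the `timeout` setting is not modelled here.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(task: Callable[[], T],
                      fallback: Callable[[], T],
                      retry_attempts: int = 3,
                      base_delay: float = 0.0) -> T:
    """Try `task` up to `retry_attempts` times, then degrade to `fallback`."""
    for attempt in range(retry_attempts):
        try:
            return task()
        except Exception:
            # Back off exponentially before the next attempt
            # (no sleep after the final failure).
            if attempt < retry_attempts - 1:
                time.sleep(base_delay * (2 ** attempt))
    return fallback()
```

A fallback for the recommendation agent might, for example, return a chronological timeline when the AI-backed ranking is unavailable.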

Recommendation Engine

The recommendation engine is the core intelligence of Corgi, combining multiple algorithms to provide personalized content recommendations.

Recommendation Algorithm Stack

# core/ranking_algorithm.py
class CoreRankingAlgorithm:
    def __init__(self):
        self.algorithms = {
            'semantic_similarity': SemanticSimilarityAlgorithm(),
            'engagement_prediction': EngagementPredictionAlgorithm(),
            'temporal_relevance': TemporalRelevanceAlgorithm(),
            'social_graph': SocialGraphAlgorithm()
        }

    def compute_recommendations(self, user_profile, candidate_posts):
        """Multi-algorithm recommendation computation"""
        scores = {}

        for algorithm_name, algorithm in self.algorithms.items():
            algorithm_scores = algorithm.score_posts(user_profile, candidate_posts)
            scores[algorithm_name] = algorithm_scores

        # Weighted combination of scores
        final_scores = self.combine_scores(scores)
        return self.rank_posts(candidate_posts, final_scores)
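The source does not define `combine_scores` or `rank_posts`. A minimal sketch of a weighted combination follows, assuming each algorithm returns a mapping from post id to a score on a comparable scale (for instance 0 to 1). The weights and the use of post ids instead of post objects are assumptions for illustration.

```python
def combine_scores(scores: dict[str, dict[str, float]],
                   weights: dict[str, float]) -> dict[str, float]:
    """Weighted average of per-algorithm scores, keyed by post id."""
    total_weight = sum(weights.get(name, 0.0) for name in scores)
    if total_weight <= 0:
        raise ValueError("at least one algorithm needs a positive weight")
    combined: dict[str, float] = {}
    for name, post_scores in scores.items():
        share = weights.get(name, 0.0) / total_weight
        for post_id, score in post_scores.items():
            combined[post_id] = combined.get(post_id, 0.0) + share * score
    return combined


def rank_posts(post_ids: list[str], final_scores: dict[str, float]) -> list[str]:
    """Order post ids by descending combined score."""
    return sorted(post_ids, key=lambda pid: final_scores.get(pid, 0.0), reverse=True)
```

Normalizing by the total weight keeps the combined score on the same scale as the inputs, so changing one weight does not silently rescale every result.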

Algorithm Components

Semantic Similarity

  • Uses sentence transformers for content understanding
  • Computes cosine similarity between user preferences and post content
  • Handles multilingual content with language detection

Engagement Prediction

  • Predicts likelihood of user interaction (like, boost, reply)
  • Based on historical interaction patterns
  • Considers temporal patterns and user activity cycles

Temporal Relevance

  • Balances fresh content with evergreen posts
  • Considers posting time relative to user timezone
  • Implements decay functions for time-sensitive content

Social Graph Analysis

  • Analyzes user's social connections and their activity
  • Identifies influential users in user's network
  • Recommends content from trusted sources
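The decay functions mentioned under Temporal Relevance are not specified in the source. Exponential decay with a half-life is a common choice and is sketched here as an assumption; the function name and the 24-hour default are illustrative.

```python
import math
from datetime import datetime, timedelta, timezone


def temporal_relevance(created_at: datetime, now: datetime,
                       half_life_hours: float = 24.0) -> float:
    """Exponential decay: 1.0 for a brand-new post, 0.5 after one half-life."""
    # Clamp negative ages (clock skew, future-dated posts) to zero.
    age_hours = max((now - created_at).total_seconds() / 3600.0, 0.0)
    return math.pow(0.5, age_hours / half_life_hours)
```

A longer half-life favours evergreen posts, while a shorter one pushes the ranking toward fresh content.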

Recommendation Pipeline

def generate_recommendations(user_id, limit=20):
    """Main recommendation generation pipeline"""

    # 1. User Profile Retrieval
    user_profile = get_user_profile(user_id)

    # 2. Candidate Post Generation
    candidate_posts = get_candidate_posts(user_profile, limit * 10)

    # 3. Algorithm Execution
    scored_posts = ranking_algorithm.compute_recommendations(
        user_profile, candidate_posts
    )

    # 4. Diversity and Freshness Filters
    filtered_posts = apply_diversity_filters(scored_posts)

    # 5. Privacy and Content Filtering
    safe_posts = apply_content_filters(filtered_posts, user_profile)

    # 6. Final Ranking and Limiting
    return safe_posts[:limit]
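Step 4 of the pipeline is where the oversized candidate pool (`limit * 10`) pays off, because filtering can discard posts without starving the final list. The source does not define `apply_diversity_filters`; the sketch below assumes one simple rule, a cap on posts per author, and assumes each post is a dict with an `author_id` key.

```python
from collections import defaultdict


def apply_diversity_filters(scored_posts: list[dict],
                            max_per_author: int = 2) -> list[dict]:
    """Keep ranking order but allow at most `max_per_author` posts per author."""
    seen: dict[str, int] = defaultdict(int)
    kept = []
    for post in scored_posts:  # assumed to be sorted best-first
        author = post["author_id"]
        if seen[author] < max_per_author:
            seen[author] += 1
            kept.append(post)
    return kept
```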

API Layer & Authentication

The API layer provides a RESTful interface for all Corgi services, with comprehensive authentication and authorization mechanisms.

Authentication System

# utils/auth.py
from functools import wraps
from flask import request, jsonify
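from jwt import InvalidTokenError  # PyJWT's base validation error; assumed to be what verify_mastodon_token raises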

def require_auth(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Bearer token validation
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return jsonify({'error': 'Authentication required'}), 401

        token = auth_header.split(' ')[1]

        # Mastodon instance validation
        instance_header = request.headers.get('X-Mastodon-Instance')
        if not instance_header:
            return jsonify({'error': 'Mastodon instance required'}), 400

        # Token verification
        try:
            decoded_token = verify_mastodon_token(token, instance_header)
            request.user_context = decoded_token
        except InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401

        return f(*args, **kwargs)
    return decorated_function

API Endpoints

Core Recommendation Endpoints

# routes/recommendations.py
import time

@app.route('/api/v1/recommendations', methods=['GET'])
@require_auth
def get_recommendations():
    """Get personalized recommendations for authenticated user"""
    start_time = time.time()  # start of the window reported as generation_time
    user_id = request.user_context['user_id']
    limit = request.args.get('limit', 20, type=int)

    recommendations = generate_recommendations(user_id, limit)

    return jsonify({
        'recommendations': recommendations,
        'metadata': {
            'total_count': len(recommendations),
            'generation_time': time.time() - start_time,
            'algorithm_version': '2.0.0'
        }
    })

Analytics and Feedback Endpoints

# routes/interactions.py
from datetime import datetime

@app.route('/api/v1/interactions', methods=['POST'])
@require_auth
def record_interaction():
    """Record user interaction with recommended content"""
    interaction_data = request.json

    # Validate interaction data
    if not validate_interaction_data(interaction_data):
        return jsonify({'error': 'Invalid interaction data'}), 400

    # Store interaction
    interaction_id = store_interaction(
        user_id=request.user_context['user_id'],
        post_id=interaction_data['post_id'],
        interaction_type=interaction_data['type'],
        timestamp=datetime.utcnow()
    )

    # Trigger recommendation model update
    update_user_preferences.delay(request.user_context['user_id'])

    return jsonify({'interaction_id': interaction_id}), 201

Rate Limiting and Security

# utils/rate_limiting.py
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

# Flask-Limiter 3.x takes key_func first and the app as a keyword argument
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["200 per day", "50 per hour"]
)

# Endpoint-specific rate limits
@app.route('/api/v1/recommendations')
@limiter.limit("30 per minute")
def get_recommendations():
    # Implementation
    pass
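To show what a limit such as "30 per minute" means in practice, here is a small fixed-window limiter written from scratch. It is not Flask-Limiter's implementation, only an illustration of the counting idea; the class name and the injectable `clock` parameter are assumptions made for the example, and old windows are never purged.

```python
import time
from collections import defaultdict


class FixedWindowLimiter:
    """Allow at most `limit` hits per client in each `window_seconds` window."""

    def __init__(self, limit: int, window_seconds: float, clock=time.monotonic):
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock
        self.counters = defaultdict(int)  # (client, window index) -> hits

    def allow(self, client: str) -> bool:
        # All timestamps within the same window share one counter.
        window = int(self.clock() // self.window_seconds)
        key = (client, window)
        if self.counters[key] >= self.limit:
            return False
        self.counters[key] += 1
        return True
```

Fixed windows are cheap but allow a burst of up to twice the limit across a window boundary; sliding-window strategies trade some bookkeeping for smoother enforcement.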

Database Architecture

Corgi uses PostgreSQL as the primary database with Redis for caching and session management.

Database Schema

-- Core user management
CREATE TABLE user_identities (
    id SERIAL PRIMARY KEY,
    mastodon_user_id VARCHAR(255) NOT NULL,
    instance_url VARCHAR(255) NOT NULL,
    access_token_hash VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(mastodon_user_id, instance_url)
);

-- Post storage and metadata
CREATE TABLE posts (
    id SERIAL PRIMARY KEY,
    mastodon_post_id VARCHAR(255) NOT NULL,
    instance_url VARCHAR(255) NOT NULL,
    content TEXT,
    author_id VARCHAR(255),
    created_at TIMESTAMP,
    engagement_score FLOAT DEFAULT 0.0,
    content_vector VECTOR(384),  -- Embedding storage (pgvector type; 384 dimensions matches all-MiniLM-L6-v2)
    UNIQUE(mastodon_post_id, instance_url)
);

-- User interaction tracking
CREATE TABLE interactions (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES user_identities(id),
    post_id INTEGER REFERENCES posts(id),
    interaction_type VARCHAR(50) NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    context_data JSONB
);

-- Recommendation cache
CREATE TABLE recommendation_cache (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES user_identities(id),
    post_id INTEGER REFERENCES posts(id),
    score FLOAT NOT NULL,
    algorithm_version VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP
);
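The `access_token_hash` column implies that raw Mastodon tokens are not stored. The source does not say how the hash is computed; one reasonable approach, shown here purely as an assumption, is a keyed HMAC-SHA256 digest compared in constant time. The function names and the `pepper` parameter (a server-side secret) are hypothetical.

```python
import hashlib
import hmac


def hash_access_token(token: str, pepper: bytes) -> str:
    """Return an HMAC-SHA256 hex digest (64 characters) of the token."""
    return hmac.new(pepper, token.encode("utf-8"), hashlib.sha256).hexdigest()


def token_matches(token: str, stored_hash: str, pepper: bytes) -> bool:
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(hash_access_token(token, pepper), stored_hash)
```

A 64-character hex digest fits comfortably within the `VARCHAR(255)` column defined above.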

Database Optimization

Indexing Strategy

-- Performance indexes
CREATE INDEX idx_user_interactions ON interactions(user_id, timestamp);
CREATE INDEX idx_post_engagement ON posts(engagement_score DESC);
CREATE INDEX idx_recommendation_cache_user ON recommendation_cache(user_id, expires_at);

-- Vector similarity index for embeddings
CREATE INDEX idx_posts_vector ON posts USING ivfflat (content_vector vector_cosine_ops);

Connection Pooling

# db/connection_pool.py
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
    pool_recycle=3600
)

ELK Integration Layer

The ELK integration provides seamless enhancement of Mastodon timelines with AI-powered recommendations.

Integration Architecture

// integrations/elk/corgi-seamless.ts
export class CorgiSeamlessIntegration {
    private apiClient: CorgiApiClient;
    private userDetector: UserDetector;
    private cacheManager: CacheManager;

    constructor(config: CorgiConfig) {
        this.apiClient = new CorgiApiClient(config);
        this.userDetector = new UserDetector();
        this.cacheManager = new CacheManager(config.cacheTimeout);
    }

    async enhanceTimeline(posts: MastodonPost[]): Promise<EnhancedPost[]> {
        // 1. User detection and privacy check
        const userContext = await this.userDetector.detectUser();
        if (!userContext.hasOptedIn) {
            return posts; // No enhancement without consent
        }

        // 2. Get recommendations from Corgi API
        const recommendations = await this.apiClient.getRecommendations({
            limit: 10,
            timelineContext: posts.map(p => p.id)
        });

        // 3. Merge recommendations with timeline
        return this.mergeRecommendations(posts, recommendations);
    }
}

User Detection System

// Eight-method user detection for privacy compliance
class UserDetector {
    private detectionMethods = [
        'localStorage_check',
        'sessionStorage_check',
        'url_parameter_analysis',
        'dom_element_inspection',
        'api_response_analysis',
        'user_agent_fingerprinting',
        'timing_analysis',
        'behavioral_pattern_recognition'
    ];

    async detectUser(): Promise<UserContext> {
        const results = await Promise.all(
            this.detectionMethods.map(method => this[method]())
        );

        return this.consolidateResults(results);
    }
}

Timeline Enhancement

// Vue component integration
export default defineComponent({
    name: 'TimelineCorgi',
    async setup() {
        const corgiIntegration = new CorgiSeamlessIntegration({
            apiEndpoint: 'https://corgi.social/api/v1',
            cacheTimeout: 300000, // 5 minutes
            privacyMode: 'strict'
        });

        const enhancedPosts = ref<EnhancedPost[]>([]);

        const loadTimeline = async () => {
            const posts = await fetchTimelinePosts();
            enhancedPosts.value = await corgiIntegration.enhanceTimeline(posts);
        };

        return {
            enhancedPosts,
            loadTimeline
        };
    }
});

Frontend Integration

The frontend provides a modern, responsive interface for recommendation management and analytics.

Technology Stack

  • Framework: Next.js 13 with App Router
  • Language: TypeScript
  • Styling: Tailwind CSS
  • State Management: React Query + Context API
  • Components: Custom component library with shadcn/ui base

Key Frontend Components

Dashboard Overview

// src/components/dashboard/DashboardOverview.tsx
export function DashboardOverview() {
    const { data: metrics, isLoading } = useQuery({
        queryKey: ['dashboard-metrics'],
        queryFn: fetchDashboardMetrics
    });

    if (isLoading) return <DashboardSkeleton />;

    return (
        <div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-6">
            <MetricCard
                title="Total Recommendations"
                value={metrics.totalRecommendations}
                trend={metrics.recommendationTrend}
            />
            <MetricCard
                title="User Engagement"
                value={`${metrics.engagementRate}%`}
                trend={metrics.engagementTrend}
            />
            <MetricCard
                title="Active Experiments"
                value={metrics.activeExperiments}
                trend={metrics.experimentTrend}
            />
            <MetricCard
                title="System Health"
                value={metrics.healthScore}
                trend={metrics.healthTrend}
            />
        </div>
    );
}

A/B Testing Interface

// src/components/dashboard/ABTestingExperiments.tsx
export function ABTestingExperiments() {
    const [experiments, setExperiments] = useState<Experiment[]>([]);
    const [createModalOpen, setCreateModalOpen] = useState(false);

    const { mutate: createExperiment } = useMutation({
        mutationFn: createABTestExperiment,
        onSuccess: () => {
            setCreateModalOpen(false);
            refetchExperiments();
        }
    });

    return (
        <div className="space-y-6">
            <div className="flex justify-between items-center">
                <h2 className="text-2xl font-bold">A/B Testing Experiments</h2>
                <Button onClick={() => setCreateModalOpen(true)}>
                    Create Experiment
                </Button>
            </div>

            <ExperimentList experiments={experiments} />

            <ExperimentCreationModal
                isOpen={createModalOpen}
                onClose={() => setCreateModalOpen(false)}
                onSubmit={createExperiment}
            />
        </div>
    );
}

API Integration

// src/lib/api-client.ts
export class CorgiApiClient {
    private baseUrl: string;
    private authToken: string;

    constructor(config: ApiConfig) {
        this.baseUrl = config.baseUrl;
        this.authToken = config.authToken;
    }

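    async getRecommendations(params: RecommendationParams): Promise<Recommendation[]> {
        // fetch() has no `params` option, so the query string is built into the URL
        const query = new URLSearchParams(
            Object.entries(params).map(([key, value]) => [key, String(value)])
        );
        const response = await fetch(`${this.baseUrl}/api/v1/recommendations?${query}`, {
            method: 'GET',
            headers: {
                'Authorization': `Bearer ${this.authToken}`,
                'Content-Type': 'application/json'
            }
        });

        if (!response.ok) {
            throw new Error(`API Error: ${response.status}`);
        }

        // The endpoint wraps results as { recommendations, metadata }
        const body = await response.json();
        return body.recommendations;
    }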
}

Security Architecture

Corgi implements multiple layers of security to protect user data and ensure system integrity.

Authentication & Authorization

# utils/security.py
import bcrypt
import jwt
from cryptography.fernet import Fernet

class SecurityManager:
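    def __init__(self, secret_key: str):
        # secret_key must be a URL-safe base64-encoded 32-byte key,
        # such as one produced by Fernet.generate_key()
        self.secret_key = secret_key
        self.cipher = Fernet(secret_key.encode())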

    def hash_password(self, password: str) -> str:
        """Secure password hashing using bcrypt"""
        return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()

    def verify_password(self, password: str, hashed: str) -> bool:
        """Verify password against hash"""
        return bcrypt.checkpw(password.encode(), hashed.encode())

    def encrypt_sensitive_data(self, data: str) -> str:
        """Encrypt sensitive data for storage"""
        return self.cipher.encrypt(data.encode()).decode()

    def decrypt_sensitive_data(self, encrypted_data: str) -> str:
        """Decrypt sensitive data"""
        return self.cipher.decrypt(encrypted_data.encode()).decode()

Data Protection

Privacy-First Design

  • All user data is encrypted at rest
  • Minimal data collection and retention
  • Automatic data anonymization after configurable periods
  • No cross-user data sharing without explicit consent

GDPR Compliance

# utils/gdpr.py
import logging

logger = logging.getLogger(__name__)

class GDPRCompliance:
    def __init__(self, db_session):
        self.db_session = db_session

    def export_user_data(self, user_id: int) -> dict:
        """Export all user data for GDPR compliance"""
        user_data = {
            'profile': self.get_user_profile(user_id),
            'interactions': self.get_user_interactions(user_id),
            'preferences': self.get_user_preferences(user_id),
            'recommendations': self.get_user_recommendations(user_id)
        }
        return user_data

    def delete_user_data(self, user_id: int) -> bool:
        """Complete user data deletion"""
        try:
            self.anonymize_interactions(user_id)
            self.delete_user_profile(user_id)
            self.clear_recommendation_cache(user_id)
            self.db_session.commit()
            return True
        except Exception as e:
            self.db_session.rollback()
            logger.error(f"Error deleting user data: {e}")
            return False

Input Validation & Sanitization

# utils/validation.py
from marshmallow import Schema, fields, validate

class RecommendationRequestSchema(Schema):
    limit = fields.Integer(
        validate=validate.Range(min=1, max=100),
        load_default=20  # `missing=` is deprecated since marshmallow 3.13
    )
    instance_url = fields.Url(required=True)
    user_token = fields.String(required=True, validate=validate.Length(min=10))

class InteractionSchema(Schema):
    post_id = fields.String(required=True)
    interaction_type = fields.String(
        required=True,
        validate=validate.OneOf(['like', 'boost', 'reply', 'bookmark'])
    )
    timestamp = fields.DateTime(required=True)

Performance & Monitoring

Corgi implements comprehensive monitoring and performance optimization strategies.

Performance Optimization

Caching Strategy

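# utils/caching.py
import hashlib
import json
import redis
from functools import wraps

redis_client = redis.Redis(host='redis', port=6379, db=0)

def cache_recommendations(timeout=300):
    def decorator(func):
        @wraps(func)
        def wrapper(user_id, *args, **kwargs):
            # Built-in hash() is salted per process for strings, so derive a
            # stable digest that every worker computes identically
            arg_digest = hashlib.sha256(
                repr((args, sorted(kwargs.items()))).encode()
            ).hexdigest()
            cache_key = f"recommendations:{user_id}:{arg_digest}"

            # Try to get from cache
            cached_result = redis_client.get(cache_key)
            if cached_result is not None:
                return json.loads(cached_result)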

            # Compute and cache result
            result = func(user_id, *args, **kwargs)
            redis_client.setex(
                cache_key,
                timeout,
                json.dumps(result, default=str)
            )

            return result
        return wrapper
    return decorator

Database Query Optimization

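# utils/database_optimization.py
from sqlalchemy import text

class OptimizedQueries:
    @staticmethod
    def get_user_recommendations_batch(session, user_ids: list, limit: int = 20):
        """Optimized batch retrieval: top `limit` cached recommendations per user"""
        # session: an active SQLAlchemy session supplied by the caller.
        # ROW_NUMBER() caps rows per user; a single global LIMIT would let
        # the first users in the sort order crowd out the rest.
        query = text("""
            SELECT user_id, post_id, score, content, author_id, created_at
            FROM (
                SELECT
                    r.user_id,
                    r.post_id,
                    r.score,
                    p.content,
                    p.author_id,
                    p.created_at,
                    ROW_NUMBER() OVER (
                        PARTITION BY r.user_id ORDER BY r.score DESC
                    ) AS rank_in_user
                FROM recommendation_cache r
                JOIN posts p ON r.post_id = p.id
                WHERE r.user_id = ANY(:user_ids)
                AND r.expires_at > NOW()
            ) ranked
            WHERE rank_in_user <= :limit
            ORDER BY user_id, score DESC
        """)

        return session.execute(query, {
            'user_ids': user_ids,
            'limit': limit
        }).fetchall()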

Monitoring & Observability

Health Checks

# utils/health_checks.py
from datetime import datetime

class HealthChecker:
    def __init__(self):
        self.checks = [
            self.check_database_connection,
            self.check_redis_connection,
            self.check_claude_api,
            self.check_recommendation_engine,
            self.check_celery_workers
        ]

    async def run_health_checks(self):
        """Run all health checks and return status"""
        results = {}
        overall_status = "healthy"

        for check in self.checks:
            try:
                result = await check()
                results[check.__name__] = result
                if not result['healthy']:
                    overall_status = "unhealthy"
            except Exception as e:
                results[check.__name__] = {
                    'healthy': False,
                    'error': str(e)
                }
                overall_status = "unhealthy"

        return {
            'status': overall_status,
            'timestamp': datetime.utcnow().isoformat(),
            'checks': results
        }

Metrics Collection

# utils/metrics.py
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge

# Define metrics
recommendation_requests = Counter(
    'corgi_recommendation_requests_total',
    'Total recommendation requests',
    ['endpoint', 'status']
)

recommendation_latency = Histogram(
    'corgi_recommendation_latency_seconds',
    'Recommendation generation latency',
    ['algorithm']
)

active_users = Gauge(
    'corgi_active_users',
    'Number of active users',
    ['timeframe']
)

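# Usage example: labelled metrics must be bound with .labels() before use
# (the label values below are illustrative)
@recommendation_requests.labels(endpoint='recommendations', status='error').count_exceptions()
@recommendation_latency.labels(algorithm='core_ranking').time()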
def generate_recommendations(user_id, limit=20):
    # Implementation
    pass

Automated Monitoring

# monitoring/health_monitor.py
import asyncio
import time
import aiohttp
from datetime import datetime

class HealthMonitor:
    def __init__(self, endpoints, check_interval=30):
        self.endpoints = endpoints
        self.check_interval = check_interval
        self.running = False

    async def monitor_endpoints(self):
        """Continuously monitor endpoint health"""
        self.running = True

        # Reuse a single session so connections are pooled across checks
        async with aiohttp.ClientSession() as session:
            while self.running:
                for endpoint in self.endpoints:
                    try:
                        # Monotonic clock: unaffected by system clock adjustments
                        start_time = time.perf_counter()
                        async with session.get(endpoint['url']) as response:
                            response_time = time.perf_counter() - start_time

                            health_data = {
                                'endpoint': endpoint['name'],
                                'url': endpoint['url'],
                                'status_code': response.status,
                                'response_time': response_time,
                                'timestamp': datetime.now().isoformat()
                            }

                            self.log_health_data(health_data)

                    except Exception as e:
                        self.log_error(endpoint, str(e))

                await asyncio.sleep(self.check_interval)


Summary

The Corgi Recommender Service is a privacy-first recommendation platform that combines traditional recommendation algorithms with AI capabilities coordinated through an agent framework. Its modular architecture supports scalability, maintainability, and extensibility while keeping performance and security in focus.

Key architectural strengths:

  • Modularity: Each component can be developed, tested, and deployed independently
  • Scalability: Horizontal scaling through containerization and microservices
  • Privacy: Built-in data protection and GDPR compliance
  • Performance: Multi-layer caching and async processing, targeting sub-100ms responses for most operations
  • Extensibility: Agent framework allows for easy integration of new AI capabilities

This architecture provides a solid foundation for building advanced recommendation systems while maintaining user privacy and system reliability.

For implementation details and code examples, refer to the specific component documentation in the Reference section.