Security Model
Corgi implements a comprehensive defense-in-depth security architecture that protects user data, prevents common attacks, and maintains system integrity through multiple layers of security controls. This document details how each layer contributes to the overall security posture.
Defense-in-Depth Architecture
graph TB
subgraph External ["External Layer"]
USER[User/Client]
MASTODON[Mastodon Instance]
ATTACKER[Potential Attacker]
end
subgraph Gateway ["Gateway Layer"]
CORS[CORS Protection]
RATE[Rate Limiting]
SIZE[Size Limits]
end
subgraph Application ["Application Layer"]
AUTH[Authentication]
VALID[Input Validation]
CSRF[CSRF Protection]
HEADERS[Security Headers]
end
subgraph Data ["Data Layer"]
ENCRYPT[Encryption]
PRIVACY[Privacy Protection]
ANON[Anonymization]
end
subgraph Agent ["Agent Layer"]
SECURITY[SecurityHealingAgent]
MONITOR[Security Monitoring]
HEAL[Auto-Remediation]
end
USER --> CORS
MASTODON --> CORS
ATTACKER -.->|Blocked| CORS
CORS --> RATE
RATE --> SIZE
SIZE --> AUTH
AUTH --> VALID
VALID --> CSRF
CSRF --> HEADERS
HEADERS --> ENCRYPT
ENCRYPT --> PRIVACY
PRIVACY --> ANON
ANON --> SECURITY
SECURITY --> MONITOR
MONITOR --> HEAL
%% Styling
classDef gateway fill:#2d3748,stroke:#4299e1,stroke-width:2px,color:#e2e8f0
classDef app fill:#2b6cb0,stroke:#63b3ed,stroke-width:2px,color:#e2e8f0
classDef data fill:#2c5282,stroke:#90cdf4,stroke-width:2px,color:#e2e8f0
classDef agent fill:#1a365d,stroke:#bee3f8,stroke-width:2px,color:#e2e8f0
classDef external fill:#4a5568,stroke:#cbd5e0,stroke-width:2px,color:#e2e8f0
class CORS,RATE,SIZE gateway
class AUTH,VALID,CSRF,HEADERS app
class ENCRYPT,PRIVACY,ANON data
class SECURITY,MONITOR,HEAL agent
class USER,MASTODON,ATTACKER external
Gateway Layer Security
The gateway layer provides the first line of defense against external threats.
CORS Protection
Implementation:
# app.py
CORS(app, origins=[
"https://elk.zone",
"https://*.elk.zone"
], supports_credentials=True)
Threat Mitigation: Prevents unauthorized cross-origin requests, protecting against CSRF attacks from malicious websites.
Rate Limiting
Implementation:
# routes/recommendations.py
@limiter.limit("100 per minute")
@limiter.limit("1000 per hour")
@limiter.limit("10000 per day")
def get_recommendations():
# Protected endpoint
Threat Mitigation: Prevents DoS attacks, brute force attempts, and resource exhaustion.
Request Size Limits
Implementation:
# app.py
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max
Threat Mitigation: Prevents memory exhaustion attacks and malicious large payloads.
Application Layer Security
The application layer enforces authentication, validates input, and protects against web vulnerabilities.
Authentication System
OAuth2 Flow:
# utils/auth.py
def validate_token(token: str, instance: str) -> Optional[Dict]:
"""Validates Mastodon OAuth token."""
headers = {"Authorization": f"Bearer {token}"}
try:
# Verify with Mastodon instance
response = requests.get(
f"https://{instance}/api/v1/accounts/verify_credentials",
headers=headers,
timeout=5
)
if response.status_code == 200:
return response.json()
except Exception as e:
logger.error(f"Token validation failed: {e}")
return None
Security Features: - Token validation with source instance - Timeout protection against slow attacks - No token storage (stateless) - Secure error handling
Input Validation and Sanitization
Implementation:
# utils/input_sanitization.py
def sanitize_mastodon_content(content: str) -> str:
"""Sanitizes HTML content from Mastodon posts."""
if not content:
return ""
# Parse HTML
soup = BeautifulSoup(content, 'html.parser')
# Remove script tags and event handlers
for script in soup(['script', 'style']):
script.decompose()
# Remove dangerous attributes
for tag in soup.find_all(True):
tag.attrs = {
key: value for key, value in tag.attrs.items()
if not key.startswith('on') # Remove onclick, onload, etc.
}
return str(soup)
Threat Mitigation: Prevents XSS attacks, SQL injection, and code injection.
CSRF Protection
Implementation:
# app.py
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
app.config['SESSION_COOKIE_SECURE'] = True
app.config['SESSION_COOKIE_HTTPONLY'] = True
Additional Protection: - SameSite cookies prevent CSRF - Secure flag ensures HTTPS-only transmission - HttpOnly prevents JavaScript access
Security Headers
Implementation:
# app.py
@app.after_request
def set_security_headers(response):
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
response.headers['Content-Security-Policy'] = "default-src 'self'"
return response
Protection Matrix:
| Header | Threat Mitigated |
|---|---|
| X-Content-Type-Options | MIME type sniffing attacks |
| X-Frame-Options | Clickjacking attacks |
| X-XSS-Protection | Reflected XSS (legacy browsers) |
| HSTS | Protocol downgrade attacks |
| CSP | XSS, data injection attacks |
Data Layer Security
The data layer ensures information is protected at rest and in transit.
Encryption
Database Encryption:
# config.py
SQLALCHEMY_DATABASE_URI = os.getenv(
'DATABASE_URL',
'postgresql://user:pass@localhost/corgi?sslmode=require'
)
Features: - TLS encryption for database connections - Encrypted backups - No sensitive data in logs
Privacy Protection
User Aliasing:
# utils/privacy.py
def generate_user_alias(username: str, instance: str) -> str:
"""Generates a privacy-preserving user alias."""
# Combine username and instance
identifier = f"{username}@{instance}"
# Generate consistent hash
hash_object = hashlib.sha256(identifier.encode())
hash_hex = hash_object.hexdigest()
# Create readable alias
adjectives = ["happy", "clever", "swift", "gentle", "brave"]
animals = ["corgi", "fox", "owl", "dolphin", "eagle"]
# Use hash to select consistently
adj_index = int(hash_hex[:8], 16) % len(adjectives)
animal_index = int(hash_hex[8:16], 16) % len(animals)
number = int(hash_hex[16:20], 16) % 10000
return f"{adjectives[adj_index]}-{animals[animal_index]}-{number}"
Privacy Features: - No PII storage - Consistent anonymization - Instance-specific aliases - No reverse mapping
Data Minimization
Interaction Storage:
# routes/interactions.py
def store_interaction(user_id: str, post_id: str, action: str):
"""Stores minimal interaction data."""
interaction = {
'user_alias': generate_user_alias(user_id),
'post_id': post_id,
'action': action,
'timestamp': datetime.utcnow(),
# No IP addresses, user agents, or tracking data
}
db.session.add(Interaction(**interaction))
Agent Layer Security
The agent layer provides automated security monitoring and remediation.
SecurityHealingAgent
Core Capabilities:
# agents/security_healing_agent.py
class SecurityHealingAgent(BaseAgent):
def scan_for_vulnerabilities(self):
"""Performs automated security scans."""
vulnerabilities = []
# Check for SQL injection patterns
sql_patterns = [
r"SELECT.*FROM.*WHERE",
r"UPDATE.*SET.*WHERE",
r"DELETE.*FROM.*WHERE"
]
for file_path in self.get_python_files():
content = self.read_file(file_path)
for pattern in sql_patterns:
if re.search(pattern, content, re.IGNORECASE):
if not self.uses_parameterized_queries(content):
vulnerabilities.append({
'type': 'SQL_INJECTION',
'file': file_path,
'severity': 'HIGH'
})
return vulnerabilities
Automated Remediation:
def auto_fix_vulnerability(self, vulnerability):
"""Automatically fixes detected vulnerabilities."""
if vulnerability['type'] == 'SQL_INJECTION':
# Convert to parameterized query
self.convert_to_parameterized_query(
vulnerability['file'],
vulnerability['line']
)
# Verify fix
if self.verify_fix(vulnerability):
self.log_remediation(vulnerability)
return True
return False
Security Monitoring
Real-time Threat Detection:
def monitor_authentication_failures(self):
"""Monitors for authentication attack patterns."""
failure_threshold = 5
time_window = 300 # 5 minutes
failures = self.get_recent_auth_failures(time_window)
for ip, count in failures.items():
if count >= failure_threshold:
self.block_ip_temporarily(ip, duration=3600)
self.alert_security_team(f"Blocked IP {ip} - {count} auth failures")
Best Practices
Secure Development
- Code Reviews: All security-related changes require review
- Dependency Scanning: Automated vulnerability scanning in CI/CD
- Security Testing: Regular penetration testing and security audits
- Least Privilege: Minimal permissions for all components
Incident Response
- Detection: Automated monitoring and alerting
- Containment: Automatic IP blocking and rate limiting
- Remediation: SecurityHealingAgent fixes vulnerabilities
- Documentation: All incidents logged for analysis
Compliance
GDPR Compliance: - Right to deletion implemented - Data portability supported - Privacy by design principles - Transparent data processing
Security Standards: - OWASP Top 10 protection - NIST guidelines followed - Regular security assessments - Continuous improvement
Security Checklist
When implementing new features, ensure:
- [ ] Input validation on all user data
- [ ] Authentication required for sensitive operations
- [ ] Rate limiting on resource-intensive endpoints
- [ ] Sanitization of all output
- [ ] Logging without sensitive data
- [ ] Error messages don't leak information
- [ ] Dependencies are up to date
- [ ] Security headers are maintained
- [ ] Privacy implications considered
- [ ] SecurityHealingAgent can monitor the feature
Conclusion
Corgi's security model provides comprehensive protection through multiple defensive layers, automated monitoring, and proactive remediation. The combination of traditional security controls with intelligent agents creates a robust security posture that adapts to emerging threats while maintaining user privacy and system performance.