- Homeless shelter management + cybersecurity threat intelligence.
- Learn CQRS, Event Sourcing, Kafka, K8s (tools may change as real-world lessons are learned)
- System design interview prep
This repository serves as a comprehensive guide for designing, building, and deploying data-intensive applications at scale. Through two real-world example projects, we demonstrate architectural patterns, implementation strategies, and best practices that ensure reliability, maintainability, and performance in production environments.
What makes an application "data-intensive"?
- Data volume, complexity, or velocity is the primary challenge
- Processing requirements exceed simple CRUD operations
- System reliability and data integrity are critical
- Scalability must be designed in from the start
- Multiple data sources and formats require integration
Our implementations follow these foundational principles:
- Data as a First-Class Citizen: Schema design, data flow, and storage strategy drive architectural decisions
- Reliability Through Redundancy: Multi-layer fault tolerance, replication, and graceful degradation
- Scalability by Design: Horizontal scaling patterns with stateless services and partitioned data
- Security in Depth: Encryption, access controls, and audit trails at every layer
- Observable Systems: Comprehensive logging, metrics, and tracing for production operations
- Eventual Consistency Awareness: Embrace distributed systems realities with appropriate consistency models
- Container Orchestration: Kubernetes for deployment and scaling
- Message Queues: Apache Kafka for event streaming, RabbitMQ for task queues
- Caching: Redis for session state and query results
- Monitoring: Prometheus + Grafana for metrics, ELK stack for logs
- Primary Storage: PostgreSQL (OLTP), TimescaleDB (time-series)
- Analytics: ClickHouse (OLAP), Apache Spark for batch processing
- Document Store: MongoDB for semi-structured data
- Graph Database: Neo4j for relationship-heavy queries
- Object Storage: MinIO/S3 for files and backups
- API Framework: FastAPI (Python), Express.js (Node.js)
- Authentication: OAuth 2.0 + JWT, Keycloak for identity management
- API Gateway: Kong for routing, rate limiting, and authentication
Bird's-eye view of the project scaffold.
TerraTest/
├── docs/
│ ├── architecture/ # System design documents
│ ├── data-models/ # ERDs and schema documentation
│ └── deployment/ # Infrastructure and deployment guides
├── shared/
│ ├── common-patterns/ # Reusable architectural components
│ ├── utils/ # Shared utilities and libraries
│ └── monitoring/ # Observability configurations
├── project-1-shelter-management/
│ ├── api/ # REST and GraphQL APIs
│ ├── services/ # Microservices (intake, case-mgmt, reporting)
│ ├── database/ # Migrations and seed data
│ ├── analytics/ # ETL pipelines and dashboards
│ └── infrastructure/ # Terraform/K8s manifests
├── project-2-threat-intelligence/
│ ├── ingestion/ # Data ingestion pipelines
│ ├── correlation-engine/ # Threat matching and scoring
│ ├── ml-models/ # Probability scoring models
│ ├── api/ # Threat intelligence APIs
│ └── dashboards/ # Security operations dashboards
└── benchmarks/ # Performance and load testing
A comprehensive platform for managing multi-location homeless shelter operations, tracking participants from intake through successful housing placement. The system handles sensitive personal information, coordinates services across multiple organizations, and provides real-time visibility into shelter capacity and resource utilization.
Key Challenges:
- High data sensitivity requiring robust security and compliance (HIPAA, GDPR)
- Real-time occupancy tracking across multiple facilities
- Long-term participant histories spanning years and multiple interactions
- Complex reporting for government compliance and grant funding
- Mobile-first access for case workers in the field
graph TB
subgraph "Client Layer"
WebApp[Web Dashboard]
MobileApp[Mobile App]
ReportingUI[Reporting Portal]
end
subgraph "API Gateway"
Gateway[Kong API Gateway]
end
subgraph "Application Services"
IntakeService[Intake Service]
CaseMgmt[Case Management]
OccupancyService[Occupancy Tracking]
ReportingService[Reporting Service]
end
subgraph "Data Layer"
PostgresMain[(PostgreSQL - Primary)]
PostgresReplica[(PostgreSQL - Replica)]
TimescaleDB[(TimescaleDB - Events)]
Redis[(Redis Cache)]
end
subgraph "Event Processing"
Kafka[Kafka Event Stream]
Analytics[Analytics Pipeline]
end
WebApp --> Gateway
MobileApp --> Gateway
Gateway --> IntakeService
Gateway --> CaseMgmt
Gateway --> OccupancyService
Gateway --> ReportingService
IntakeService --> PostgresMain
CaseMgmt --> PostgresMain
OccupancyService --> Redis
OccupancyService --> TimescaleDB
PostgresMain -.Replication.-> PostgresReplica
ReportingService --> PostgresReplica
IntakeService --> Kafka
CaseMgmt --> Kafka
Kafka --> Analytics
Analytics --> ReportingService
Participants Table (PostgreSQL)
CREATE TABLE participants (
participant_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
encrypted_ssn BYTEA, -- Encrypted at application layer
first_name_hash VARCHAR(64), -- Searchable hash
date_of_birth_encrypted BYTEA,
intake_date TIMESTAMP NOT NULL,
current_status VARCHAR(50),
assigned_case_worker_id UUID,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_participant_status ON participants(current_status);
CREATE INDEX idx_participant_case_worker ON participants(assigned_case_worker_id);
Disposition Events (TimescaleDB)
CREATE TABLE disposition_events (
event_id BIGSERIAL,
participant_id UUID NOT NULL,
event_type VARCHAR(50) NOT NULL, -- intake, service, placement, exit
event_timestamp TIMESTAMPTZ NOT NULL,
disposition_category VARCHAR(100), -- housing_placed, employed, program_complete
facility_id UUID,
metadata JSONB,
PRIMARY KEY (event_timestamp, event_id)
);
SELECT create_hypertable('disposition_events', 'event_timestamp');
Occupancy Tracking (Redis + TimescaleDB)
- Redis: Real-time bed availability and reservations (TTL-based locks; see the sketch after this list)
- TimescaleDB: Historical occupancy data for capacity planning
- Encryption Strategy: Field-level encryption for PII using AES-256, keys managed via HashiCorp Vault
- Search Without Exposure: Searchable hashes for names (HMAC-SHA256 with service-specific salt)
- Audit Trail: Every data modification logged to immutable append-only table
- Multi-tenancy: Organization isolation via PostgreSQL Row-Level Security (RLS)
- Mobile Offline Support: CouchDB sync for case worker mobile apps with eventual consistency
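A minimal sketch of the TTL-based bed reservation pattern referenced in the Redis bullet above, using redis-py. The key layout, hold duration, and helper names are illustrative assumptions, not the project's actual schema:
# Illustrative sketch: TTL-based bed holds in Redis (redis-py).
import redis

r = redis.Redis(host="redis", port=6379, decode_responses=True)
RESERVATION_TTL_SECONDS = 15 * 60  # hold a bed for 15 minutes during intake

def reserve_bed(facility_id: str, bed_id: str, participant_id: str) -> bool:
    """Place a hold on a bed; the hold expires automatically if never confirmed."""
    key = f"facility:{facility_id}:bed:{bed_id}:hold"
    # SET ... NX EX acts as a lock with a built-in expiry, so abandoned
    # reservations release themselves without a cleanup job.
    return bool(r.set(key, participant_id, nx=True, ex=RESERVATION_TTL_SECONDS))

def confirm_checkin(facility_id: str, bed_id: str, participant_id: str) -> bool:
    """Convert a hold into an occupied bed and update the live occupancy hash."""
    key = f"facility:{facility_id}:bed:{bed_id}:hold"
    if r.get(key) != participant_id:
        return False  # hold expired or belongs to another participant
    pipe = r.pipeline()
    pipe.delete(key)
    pipe.hset(f"facility:{facility_id}:occupancy", bed_id, participant_id)
    pipe.execute()
    return True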
Prerequisites:
- Docker Desktop or Podman
- Kubernetes cluster (minikube for local dev)
- Terraform >= 1.5
- kubectl >= 1.27
Quick Start:
# Clone repository
git clone https://github.com/Odiambo/TerraTest.git
cd TerraTest/project-1-shelter-management
# Initialize infrastructure
cd infrastructure
terraform init
terraform apply
# Deploy services
kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/secrets.yaml
kubectl apply -f k8s/deployments/
# Run database migrations
kubectl exec -it deployment/api-service -- npm run migrate
# Access dashboard
kubectl port-forward service/web-dashboard 3000:80
Environment Variables:
DATABASE_URL=postgresql://user:pass@postgres:5432/shelter_db
REDIS_URL=redis://redis:6379
KAFKA_BROKERS=kafka:9092
VAULT_ADDR=https://vault:8200
ENCRYPTION_KEY_PATH=secret/data/shelter/encryption-keys
JWT_SECRET=<secure-random-string>
- Write Throughput: 5,000 intake events/hour sustained
- Query Latency: P95 < 200ms for participant searches
- Occupancy Updates: Sub-second real-time updates via WebSocket (see the sketch after this list)
- Report Generation: Complex 12-month analytics < 5 seconds
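A minimal sketch of how the sub-second occupancy updates referenced above could be pushed over WebSocket, pairing FastAPI with Redis pub/sub. The channel name and payload handling are illustrative assumptions, not the service's real contract:
# Sketch: stream occupancy changes to dashboards over WebSocket via Redis pub/sub.
import redis.asyncio as redis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws/occupancy/{facility_id}")
async def occupancy_stream(websocket: WebSocket, facility_id: str):
    await websocket.accept()
    r = redis.Redis(host="redis", port=6379, decode_responses=True)
    pubsub = r.pubsub()
    await pubsub.subscribe(f"occupancy:{facility_id}")
    try:
        # Forward every occupancy event published for this facility to the client.
        async for message in pubsub.listen():
            if message["type"] == "message":
                await websocket.send_text(message["data"])
    except WebSocketDisconnect:
        pass
    finally:
        await pubsub.unsubscribe(f"occupancy:{facility_id}")
        await r.close()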
An automated threat intelligence platform that ingests penetration testing results from multiple security tools, correlates findings against global threat databases, matches attack patterns to known threat actor techniques, and calculates risk probability scores. The system provides real-time alerting for critical threats and generates actionable reports for security teams.
Key Challenges:
- High-velocity data ingestion (millions of events per day)
- Complex correlation across heterogeneous data sources
- Real-time pattern matching against evolving threat signatures
- Probabilistic risk scoring with low false-positive rates
- Integration with 20+ security tools and threat feeds
graph TB
subgraph "Data Sources"
PenTest[Penetration Test Tools]
ThreatFeeds[Threat Intelligence Feeds]
SIEM[SIEM Systems]
Scanners[Vulnerability Scanners]
end
subgraph "Ingestion Layer"
Ingest1[Ingestion Service 1]
Ingest2[Ingestion Service 2]
Ingest3[Ingestion Service N]
Normalizer[Data Normalizer]
end
subgraph "Stream Processing"
KafkaIn[Kafka - Raw Events]
Flink[Apache Flink]
KafkaProcessed[Kafka - Normalized]
end
subgraph "Correlation Engine"
Matcher[Pattern Matcher]
Scorer[Risk Scorer]
MLService[ML Inference]
end
subgraph "Storage"
ClickHouse[(ClickHouse - Events)]
Neo4j[(Neo4j - Relationships)]
Elasticsearch[(Elasticsearch - Search)]
end
subgraph "API & Alerting"
API[FastAPI Service]
AlertEngine[Alert Engine]
Dashboard[Security Dashboard]
end
PenTest --> Ingest1
ThreatFeeds --> Ingest2
SIEM --> Ingest3
Scanners --> Ingest3
Ingest1 --> KafkaIn
Ingest2 --> KafkaIn
Ingest3 --> KafkaIn
KafkaIn --> Flink
Flink --> Normalizer
Normalizer --> KafkaProcessed
KafkaProcessed --> Matcher
Matcher --> Scorer
Scorer --> MLService
MLService --> ClickHouse
MLService --> Neo4j
MLService --> Elasticsearch
ClickHouse --> API
Neo4j --> API
Elasticsearch --> API
API --> Dashboard
Scorer --> AlertEngine
AlertEngine --> Dashboard
Normalized Security Events (ClickHouse)
CREATE TABLE security_events (
event_id UUID,
event_timestamp DateTime64(3),
source_tool String,
event_type LowCardinality(String), -- vuln, exploit, anomaly
severity LowCardinality(String), -- critical, high, medium, low
target_ip IPv4,
target_port UInt16,
attack_technique String, -- MITRE ATT&CK technique ID
raw_signature String,
normalized_signature String,
risk_score Float32,
false_positive_probability Float32,
metadata String -- JSON
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_timestamp)
ORDER BY (event_timestamp, severity, target_ip)
TTL event_timestamp + INTERVAL 2 YEAR;
Threat Actor TTPs (Neo4j Graph)
// Threat Actor Node
CREATE (actor:ThreatActor {
id: 'APT29',
name: 'Cozy Bear',
sophistication: 'Advanced',
last_seen: datetime()
})
// Attack Technique Node
CREATE (technique:Technique {
id: 'T1566.001',
name: 'Spearphishing Attachment',
tactic: 'Initial Access'
})
// Relationship
CREATE (actor)-[:USES {frequency: 0.85, last_observed: date()}]->(technique)
// Query for matching patterns
MATCH path = (e:Event)-[:MATCHES]->(t:Technique)<-[:USES]-(a:ThreatActor)
WHERE e.timestamp > datetime() - duration('P7D')
RETURN a.name, collect(t.name) as techniques, count(*) as occurrences
ORDER BY occurrences DESC
Risk Scoring Model (Feature Store + ML)
# Bayesian Network for Risk Scoring
features = {
    'cve_severity': 0.9,             # CVSS base score
    'exploit_available': 0.8,        # Public exploit exists
    'asset_criticality': 0.95,       # Business impact
    'threat_actor_confidence': 0.7,  # Attribution confidence
    'ttp_frequency': 0.6,            # Recent activity
    'environmental_score': 0.85      # Network exposure
}
# Weighted probability calculation
risk_score = bayesian_network.infer(
    evidence=features,
    query='compromise_probability'
)
- Columnar Storage: ClickHouse for time-series analytics (10x faster than PostgreSQL for our queries)
- Graph for Relationships: Neo4j to model complex threat actor relationships and attack chains
- Stream Processing: Apache Flink for stateful stream processing (pattern detection over time windows)
- ML Pipeline: Feature store with online/offline consistency, model versioning with MLflow
- Deduplication: Bloom filters + consistent hashing to identify duplicate events across tools (see the sketch after this list)
- Partitioning Strategy: Time-based partitioning with 2-year TTL, hot/cold storage tiers
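A rough sketch of the deduplication idea flagged in the list above: a fingerprint built from normalized fields, a fixed modulo partitioner standing in for the consistent-hashing ring, and a per-partition Bloom filter. The fingerprint fields follow the security_events schema; filter sizing is an illustrative assumption:
# Sketch: probabilistic cross-tool deduplication with per-partition Bloom filters.
import hashlib

class BloomFilter:
    def __init__(self, size_bits: int = 1 << 20, num_hashes: int = 5):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits // 8)

    def _positions(self, item: str):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))

NUM_PARTITIONS = 16
filters = [BloomFilter() for _ in range(NUM_PARTITIONS)]

def fingerprint(event: dict) -> str:
    # Normalize the fields most tools agree on; everything else is tool-specific noise.
    return f"{event['target_ip']}|{event['target_port']}|{event['normalized_signature']}"

def is_duplicate(event: dict) -> bool:
    fp = fingerprint(event)
    # The same fingerprint always maps to the same partition, so duplicates
    # arriving from different tools meet in one filter.
    partition = int(hashlib.md5(fp.encode()).hexdigest(), 16) % NUM_PARTITIONS
    if fp in filters[partition]:
        return True
    filters[partition].add(fp)
    return False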
Pattern Matching Pipeline:
def correlate_threat(event):
    # Stage 1: Signature matching
    signatures = match_signatures(event.raw_data, threat_db)
    # Stage 2: Behavioral analysis
    attack_chain = detect_attack_chain(
        event,
        window=timedelta(hours=24)
    )
    # Stage 3: TTP mapping
    techniques = map_to_mitre_attack(signatures, attack_chain)
    # Stage 4: Threat actor attribution
    actors = attribute_to_threat_actors(
        techniques,
        confidence_threshold=0.7
    )
    # Stage 5: Risk scoring
    risk = calculate_risk_score(
        event=event,
        techniques=techniques,
        actors=actors,
        asset_context=get_asset_criticality(event.target_ip)
    )
    return ThreatIntelligence(
        event_id=event.id,
        matched_signatures=signatures,
        techniques=techniques,
        attributed_actors=actors,
        risk_score=risk.score,
        confidence=risk.confidence
    )
Prerequisites:
- Kubernetes cluster (AWS EKS, GCP GKE, or Azure AKS recommended)
- Apache Kafka cluster (Confluent Cloud or self-hosted)
- MinIO or S3 for object storage
- GPU nodes for ML inference (optional, improves performance 3x)
Quick Start:
cd TerraTest/project-2-threat-intelligence
# Deploy infrastructure
cd infrastructure
terraform init
terraform apply -var-file=production.tfvars
# Deploy Kafka connectors
kubectl apply -f k8s/kafka-connect/
# Deploy stream processing jobs
flink run -d ./flink-jobs/threat-correlator.jar
# Deploy API and dashboards
helm install threat-intel ./helm/threat-intelligence-platform
# Load threat intelligence feeds
kubectl exec -it deployment/data-loader -- python load_feeds.py \
--mitre-attack \
--cve-database \
--threat-actors
# Access security dashboard
kubectl port-forward service/dashboard 8080:80
Configuration:
# config/correlation-engine.yaml
ingestion:
  batch_size: 1000
  flush_interval: 5s
correlation:
  matching_algorithms:
    - signature_hash
    - fuzzy_match
    - behavioral_pattern
  confidence_thresholds:
    signature_match: 0.95
    behavioral_match: 0.75
    ml_inference: 0.80
alerting:
  critical_threshold: 0.9
  high_threshold: 0.7
  notification_channels:
    - slack
    - pagerduty
    - email
ml_models:
  risk_scorer:
    version: "v2.3.1"
    endpoint: "http://mlflow:5000/models/risk-scorer"
    fallback: "rule_based"
- Ingestion Rate: 50,000 events/second sustained
- Correlation Latency: P99 < 500ms from event to correlated threat
- Query Performance: Complex threat hunting queries < 2 seconds over 90 days of data
- Alert Latency: Critical alerts delivered within 10 seconds of detection
- Storage Efficiency: 20:1 compression ratio on raw event data
Both projects use event sourcing for audit trails and system consistency:
# shared/common-patterns/event_sourcing.py
from functools import reduce

class EventStore:
    def append_event(self, aggregate_id, event_type, payload):
        event = Event(
            aggregate_id=aggregate_id,
            event_type=event_type,
            payload=payload,
            timestamp=utcnow(),
            sequence=self.get_next_sequence(aggregate_id)
        )
        self.store.write(event)
        self.publish_to_stream(event)

    def rebuild_state(self, aggregate_id):
        events = self.store.read_events(aggregate_id)
        return reduce(self.apply_event, events, initial_state)
Prevents cascading failures when external APIs are degraded:
# shared/common-patterns/circuit_breaker.py
import requests
from pybreaker import CircuitBreaker

threat_feed_breaker = CircuitBreaker(
    fail_max=5,        # open the circuit after 5 consecutive failures
    reset_timeout=60,  # attempt a half-open retry after 60 seconds
)
# Every exception raised inside counts as a failure (use exclude=[...] to ignore some).

@threat_feed_breaker
def fetch_threat_intelligence(indicator):
    response = requests.get(f"{THREAT_API}/indicator/{indicator}")
    response.raise_for_status()
    return response.json()
Separate read and write paths for optimized performance:
Write Path: API → Command Handler → PostgreSQL (Primary) → Event Bus
Read Path: API → Query Handler → PostgreSQL (Replica) → Cache → Response
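A minimal sketch of that split, where `primary`, `replica`, `cache`, and `event_bus` are placeholder clients (for example a psycopg connection, a Redis client, and a Kafka producer) and the columns follow the participants schema above:
# Sketch: separate command and query handlers; client objects are placeholders.
import json

class CommandHandler:
    def __init__(self, primary, event_bus):
        self.primary = primary
        self.event_bus = event_bus

    def create_participant(self, data: dict) -> str:
        # Writes always hit the primary, then emit an event for read models.
        with self.primary.cursor() as cur:
            cur.execute(
                "INSERT INTO participants (intake_date, current_status) "
                "VALUES (%s, %s) RETURNING participant_id",
                (data["intake_date"], data["current_status"]),
            )
            participant_id = cur.fetchone()[0]
        self.primary.commit()
        self.event_bus.publish("participant.created", {"participant_id": str(participant_id)})
        return str(participant_id)

class QueryHandler:
    def __init__(self, replica, cache):
        self.replica = replica
        self.cache = cache

    def get_participant(self, participant_id: str) -> dict:
        # Reads never touch the primary: cache first, then the read replica.
        cached = self.cache.get(f"participant:{participant_id}")
        if cached:
            return json.loads(cached)
        with self.replica.cursor() as cur:
            cur.execute(
                "SELECT participant_id, current_status FROM participants "
                "WHERE participant_id = %s",
                (participant_id,),
            )
            row = cur.fetchone()
        result = {"participant_id": str(row[0]), "current_status": row[1]}
        self.cache.set(f"participant:{participant_id}", json.dumps(result), ex=300)
        return result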
Reusable encryption service for sensitive data:
# shared/utils/encryption.py
import hashlib
import hmac

from cryptography.fernet import Fernet

class FieldEncryption:
    def __init__(self, key_service):
        self.key = key_service.get_key('field-encryption')
        self.salt = key_service.get_key('search-salt')  # service-specific salt (bytes); key name illustrative
        self.cipher = Fernet(self.key)

    def encrypt(self, plaintext: str) -> bytes:
        return self.cipher.encrypt(plaintext.encode())

    def decrypt(self, ciphertext: bytes) -> str:
        return self.cipher.decrypt(ciphertext).decode()

    def searchable_hash(self, value: str) -> str:
        # HMAC-SHA256 with a service-specific salt, matching the design decision above
        return hmac.new(self.salt, value.encode(), hashlib.sha256).hexdigest()
Protects APIs from abuse:
# shared/common-patterns/rate_limiter.py
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/api/participants")
@limiter.limit("100/minute")
async def get_participants(request: Request):
    return {"participants": [...]}
- Load Testing: Locust for distributed load generation (see the sketch after this list)
- Metrics Collection: Prometheus + custom exporters
- Database Profiling: pg_stat_statements, EXPLAIN ANALYZE
- Infrastructure: AWS EKS, r5.2xlarge nodes
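As referenced in the list above, a sketch of what the Locust load generation could look like; the endpoint paths, task weights, and wait times are illustrative assumptions:
# locustfile.py sketch for the shelter-management load tests.
from locust import HttpUser, task, between

class CaseWorkerUser(HttpUser):
    wait_time = between(1, 3)  # simulated think time between requests

    @task(5)
    def check_occupancy(self):
        # Hot path: Redis-backed occupancy lookups dominate traffic.
        self.client.get("/api/occupancy/facility-1")

    @task(2)
    def search_participants(self):
        self.client.get("/api/participants", params={"status": "active"})

    @task(1)
    def create_intake(self):
        self.client.post("/api/participants", json={
            "intake_date": "2026-01-30T09:00:00Z",
            "current_status": "intake",
        })
Run with, for example, locust -f locustfile.py --host http://localhost:3000 and ramp users from the Locust web UI.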
| Operation | Throughput | Latency (P95) | Notes |
|---|---|---|---|
| Participant Intake | 10/hour | 120ms | Including encryption |
| Case Updates | 1200/day | 80ms | Cached case worker data |
| Occupancy Check | 1,000/sec | 15ms | Redis-backed |
| Complex Report | N/A | 4.5s | 12-month analytics |
| Concurrent Users | 4 | N/A | Sustained for 8 hours |
| Operation | Throughput | Latency (P95) | Notes |
|---|---|---|---|
| Event Ingestion | 20,000/sec | N/A | Kafka buffered |
| Threat Correlation | 12,000/sec | 450ms | ML inference included |
| Graph Query (3-hop) | 800/sec | 280ms | Neo4j, complex patterns |
| Time-Series Query | 200/sec | 1.8s | 90-day window (estimated event volume) |
| Alert Processing | 1,500/sec | 95ms | Critical path optimized |
Horizontal Scaling (Threat Intelligence):
2 nodes: 25,000 events/sec
4 nodes: 48,000 events/sec (96% linear)
8 nodes: 91,000 events/sec (91% linear)
16 nodes: 165,000 events/sec (82% linear)
Database Scaling (Shelter Management):
Single PostgreSQL: 3,500 writes/sec
+ Read Replicas (3): 3,500 writes/sec, 15,000 reads/sec
+ Connection Pooling: 5,200 writes/sec, 22,000 reads/sec
+ Partitioning: 7,800 writes/sec, 28,000 reads/sec
These scaling figures assume deployment in the test cities within the test Midwestern state. As of 2025, that state has roughly 8,300 known homeless adults and about 12,000 unhoused people on any given evening. In data terms this is low intensity, but it still works as a learning model because of the prevalence and incidence tracking involved. Furthermore, human data showing disposition, migration, and economic participation is valuable for subsidy wrapping and population management.
For instance, over 1.2 million citizens in the test state spend more than 50% of their income on housing, leaving them at risk of homelessness. It has not yet been calculated how many people are cyclically homeless (lacking an adequate place to sleep for more than 30 days within a twelve-month period, or lacking housing after a lease termination or eviction). Temporary homelessness can last up to two years (nationwide). Being out of the housing cycle also leaves people prone to poor-quality housing and disproportionate move-in costs; both of these challenges can lead to housing insecurity (cyclical homelessness) within the next 12 months.
Properly tracking the causes and consequences is worth the data-performance exercise undertaken in this project.
Local Kubernetes (minikube)
- Single-node PostgreSQL
- Redis standalone
- Kafka single broker
- No replication
- Hot reload enabled
Cloud Kubernetes (3 nodes)
- PostgreSQL primary + 1 replica
- Redis Sentinel (3 nodes)
- Kafka cluster (3 brokers)
- Load balancer
- Metrics collection
Cloud Kubernetes (10+ nodes, auto-scaling)
- PostgreSQL HA (Patroni, 3 nodes)
- Redis Cluster (6 nodes, 3 shards)
- Kafka cluster (5 brokers, RF=3)
- Multi-AZ deployment
- Full observability stack
- Automated backups
- Disaster recovery
- OAuth 2.0 + OpenID Connect via Keycloak
- JWT tokens with short expiration (15 min access, 7 day refresh); see the token sketch after this list
- Role-Based Access Control (RBAC) with fine-grained permissions
- API Key rotation every 90 days
- Encryption at rest: AES-256 for all databases
- Encryption in transit: TLS 1.3 for all communications
- Key management: HashiCorp Vault with auto-rotation
- Field-level encryption: PII encrypted at application layer
- Secure deletion: Cryptographic erasure (key deletion)
- Network policies: Kubernetes NetworkPolicy for service isolation
- Zero-trust networking: Mutual TLS between services
- API Gateway: Rate limiting, IP whitelisting, DDoS protection
- Secrets management: Never store secrets in code or environment variables
- HIPAA (Shelter Management): BAA agreements, audit logs, access controls
- GDPR: Right to erasure, data portability, consent management
- SOC 2 Type II: Continuous compliance monitoring
- PCI DSS (if payment processing): Tokenization, secure transmission
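As referenced in the authentication list above, a sketch of the 15-minute access / 7-day refresh token split using PyJWT. Claim names and secret handling are simplified assumptions; in this architecture token issuance is delegated to Keycloak:
# Sketch: short-lived access tokens plus longer-lived refresh tokens (PyJWT).
from datetime import datetime, timedelta, timezone
import jwt

SECRET = "replace-with-a-vault-managed-secret"  # placeholder, never hard-code

def issue_tokens(user_id: str, roles: list[str]) -> dict:
    now = datetime.now(timezone.utc)
    access = jwt.encode(
        {"sub": user_id, "roles": roles, "iat": now, "exp": now + timedelta(minutes=15)},
        SECRET,
        algorithm="HS256",
    )
    refresh = jwt.encode(
        {"sub": user_id, "type": "refresh", "iat": now, "exp": now + timedelta(days=7)},
        SECRET,
        algorithm="HS256",
    )
    return {"access_token": access, "refresh_token": refresh}

def verify(token: str) -> dict:
    # Raises jwt.ExpiredSignatureError / jwt.InvalidTokenError on bad tokens;
    # expiration is checked automatically during decode.
    return jwt.decode(token, SECRET, algorithms=["HS256"])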
# Key metrics tracked
- request_duration_seconds (histogram)
- request_total (counter)
- active_connections (gauge)
- queue_depth (gauge)
- error_rate (counter)
- data_ingestion_rate (gauge)
- database_query_duration (histogram)
{
"timestamp": "2026-01-30T10:15:30.123Z",
"level": "INFO",
"service": "intake-service",
"trace_id": "a3f2b1c9-...",
"span_id": "7d8e9f10-...",
"user_id": "hashed_user_id",
"action": "participant_created",
"duration_ms": 145,
"metadata": {
"facility_id": "uuid",
"case_worker_id": "uuid"
}
}
- Distributed tracing across microservices (see the sketch after this list)
- Visualize request flows and bottlenecks
- Identify slow database queries
- Track external API latencies
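As referenced in the tracing list above, a minimal sketch of wrapping a request in OpenTelemetry spans. Exporter setup (for example to Jaeger) is omitted, and the span names, attributes, and query_replica helper are illustrative assumptions:
# Sketch: spans around a request and its database call (OpenTelemetry API only).
from opentelemetry import trace

tracer = trace.get_tracer("intake-service")

def find_participant(participant_id: str) -> dict:
    # One span per handler; child spans mark the slow pieces (DB queries,
    # external API calls) so bottlenecks show up in the trace view.
    with tracer.start_as_current_span("participant.lookup") as span:
        span.set_attribute("participant.id_hash", participant_id)
        with tracer.start_as_current_span("db.query"):
            return query_replica(participant_id)

def query_replica(participant_id: str) -> dict:
    # Placeholder standing in for the real read-replica query.
    return {"participant_id": participant_id, "current_status": "active"}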
# Critical Alerts (PagerDuty)
- High error rate (>5% for 5 minutes)
- Database replication lag (>30 seconds)
- Service down (health check failing)
- Disk usage >90%
# Warning Alerts (Slack)
- Elevated latency (P95 >500ms for 10 minutes)
- High memory usage (>80%)
- Certificate expiring (<30 days)
We welcome contributions! Please follow these guidelines:
- Fork the repository and create a feature branch
- Follow coding standards: Use linters (Black, ESLint) and type hints
- Write tests: Minimum 80% code coverage required
- Document changes: Update relevant README sections and inline comments
- Submit PR: Include description, tests, and documentation
# Python
black .
mypy .
pytest --cov=. --cov-report=html
# JavaScript/TypeScript
npm run lint
npm run test
npm run type-check
Follow conventional commits:
feat: add real-time occupancy WebSocket endpoint
fix: resolve race condition in threat correlation
docs: update deployment instructions for AWS
perf: optimize participant search query
- Tests pass locally
- Code coverage maintained or improved
- Documentation updated
- No security vulnerabilities (Snyk scan)
- Performance impact assessed
- Breaking changes documented
- Designing Data-Intensive Applications by Martin Kleppmann (foundational)
- Database Internals by Alex Petrov
- Streaming Systems by Tyler Akidau
- Building Microservices by Sam Newman
- Kafka: A Distributed Messaging System
- The Google File System
- Dynamo: Amazon's Highly Available Key-value Store
- MITRE ATT&CK Framework
We may not use a given tool per se, but concepts from its design papers were applied. For instance, the Apache tools and New Relic are not part of the in-the-wild application tooling.
- Data Pipeline: Apache Airflow, Prefect, Dagster
- Stream Processing: Apache Flink, Kafka Streams, Apache Beam
- Monitoring: Prometheus, Grafana, Datadog, New Relic
- Testing: Locust, K6, Apache JMeter
- Security: OWASP ZAP, Snyk, Trivy
MIT License - see LICENSE file for details.
This project incorporates patterns and practices from:
- The SRE community at Google
- CNCF projects (Kubernetes, Prometheus, Jaeger)
- MITRE ATT&CK Framework contributors
- Open-source security tool maintainers
AI Disclosure: Architecture patterns and code examples generated with assistance from OpenAI GPT-4 and Anthropic Claude. All implementations reviewed and tested by human engineers.
Maintainers: @Odiambo
Last Updated: 2026-01-30
Version: 1.5.0