Architecture
This page describes the Centrifugo WebSocket RPC architecture and how its components work together.
Overview
┌──────────────┐         WebSocket          ┌──────────────┐
│              │◄──────────────────────────►│              │
│   Frontend   │   Correlation ID Pattern   │  Centrifugo  │
│    Client    │                            │    Server    │
│              │                            │              │
└──────────────┘                            └──────┬───────┘
                                                   │
                                            pub/sub channels
                                                   │
                                            ┌──────▼───────┐
                                            │              │
                                            │    Django    │
                                            │   Handlers   │
                                            │              │
                                            └──────────────┘
Core Components
1. Pydantic Models (Single Source of Truth)
All types are defined once using Pydantic:
from pydantic import BaseModel, Field

class TaskStatsParams(BaseModel):
    user_id: str = Field(..., description="User ID")

class TaskStatsResult(BaseModel):
    total: int = Field(..., description="Total tasks")
These models drive:
- ✅ Runtime validation (Python)
- ✅ Code generation (TypeScript, Go)
- ✅ API documentation
- ✅ Type safety across all languages
2. @websocket_rpc Decorator
Registers handlers with the router:
@websocket_rpc("tasks.get_stats")
async def get_task_stats(conn, params: TaskStatsParams) -> TaskStatsResult:
    return TaskStatsResult(total=100)
What it does:
- Extracts type hints (Pydantic models)
- Registers with MessageRouter (runtime)
- Registers with RPCRegistry (codegen)
- Validates that the handler is async
- Stores docstring and metadata
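The decorator's responsibilities listed above can be sketched with the standard library alone. This is a minimal illustration, not the library's actual implementation: `websocket_rpc_sketch`, `HandlerInfo`, and `REGISTRY` are hypothetical names, a single dictionary stands in for both MessageRouter and RPCRegistry, and plain `dict` annotations replace the Pydantic models.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional, get_type_hints


@dataclass
class HandlerInfo:
    """Hypothetical record of what the decorator captures about a handler."""
    name: str
    handler: Callable[..., Any]
    params_type: Optional[type]
    result_type: Optional[type]
    docstring: str


# Hypothetical stand-in for both MessageRouter and RPCRegistry registration.
REGISTRY: Dict[str, HandlerInfo] = {}


def websocket_rpc_sketch(name: str):
    """Register an async handler under `name`, recording its type hints."""
    def decorator(func):
        # Reject sync functions at decoration time rather than at call time.
        if not inspect.iscoroutinefunction(func):
            raise TypeError(f"RPC handler {func.__name__!r} must be async")
        hints = get_type_hints(func)
        REGISTRY[name] = HandlerInfo(
            name=name,
            handler=func,
            params_type=hints.get("params"),   # annotation of the `params` argument
            result_type=hints.get("return"),   # return annotation
            docstring=inspect.getdoc(func) or "",
        )
        return func
    return decorator


@websocket_rpc_sketch("tasks.get_stats")
async def get_task_stats(conn, params: dict) -> dict:
    """Return task statistics."""
    return {"total": 100}
```

Because the handler is returned unchanged, it can still be called and tested directly; the registry only holds metadata about it.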
3. MessageRouter (Runtime)
Routes incoming RPC calls to handlers:
router = MessageRouter()
# Decorator registers handler
router.register_handler("tasks.get_stats", handler_func)
# Runtime call
result = await router.handle_message("tasks.get_stats", params)
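To make the routing step concrete, here is a minimal sketch of a router exposing the same two calls. `MiniRouter` is a hypothetical stand-in; the optional `conn` argument and the exception types are assumptions, and the real MessageRouter may validate params or behave differently.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Any, dict], Awaitable[Any]]


class MiniRouter:
    """Hypothetical minimal router: maps method names to async handlers."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register_handler(self, method: str, handler: Handler) -> None:
        # Refuse duplicates so two handlers cannot silently shadow each other.
        if method in self._handlers:
            raise ValueError(f"handler already registered for {method!r}")
        self._handlers[method] = handler

    async def handle_message(self, method: str, params: dict, conn: Any = None) -> Any:
        handler = self._handlers.get(method)
        if handler is None:
            raise KeyError(f"unknown RPC method {method!r}")
        return await handler(conn, params)


async def get_task_stats(conn, params: dict) -> dict:
    return {"total": 100, "user_id": params["user_id"]}


router = MiniRouter()
router.register_handler("tasks.get_stats", get_task_stats)
result = asyncio.run(router.handle_message("tasks.get_stats", {"user_id": "456"}))
```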
4. Code Generators
Generate clients from registered handlers:
Discovery:
methods = discover_rpc_methods_from_router(router)
# Returns list of RPCMethodInfo with:
# - method name
# - params model
# - result model
# - docstring
Generation:
# Python
generator = PythonThinGenerator(methods)
generator.generate(output_dir)
# TypeScript
generator = TypeScriptThinGenerator(methods)
generator.generate(output_dir)
# Go
generator = GoThinGenerator(methods)
generator.generate(output_dir)
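As a rough idea of what a thin generator does internally, the sketch below renders a typed Python client from a list of method descriptions using a string template. `MethodInfo`, `METHOD_TEMPLATE`, and `render_python_client` are hypothetical names, and model names are passed as plain strings; the actual generators may use a template engine and richer metadata.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class MethodInfo:
    """Hypothetical, simplified stand-in for RPCMethodInfo."""
    name: str            # RPC method name, e.g. "tasks.get_stats"
    params_model: str    # name of the params model class
    result_model: str    # name of the result model class
    docstring: str = ""


METHOD_TEMPLATE = '''\
    async def {py_name}(self, params: {params_model}) -> {result_model}:
        """{docstring}"""
        result = await self.rpc.call("{name}", params.model_dump())
        return {result_model}(**result)
'''


def render_python_client(methods: List[MethodInfo]) -> str:
    """Render the source of a typed client class, one method per RPC endpoint."""
    parts = [
        "class APIClient:\n",
        "    def __init__(self, rpc):\n",
        "        self.rpc = rpc\n",
    ]
    for m in methods:
        parts.append("\n")
        parts.append(METHOD_TEMPLATE.format(
            py_name=m.name.replace(".", "_"),   # "tasks.get_stats" -> "tasks_get_stats"
            name=m.name,
            params_model=m.params_model,
            result_model=m.result_model,
            docstring=m.docstring or f"Call {m.name}.",
        ))
    return "".join(parts)


source = render_python_client([
    MethodInfo("tasks.get_stats", "TaskStatsParams", "TaskStatsResult", "Get task statistics."),
])
```

The rendered `source` string is what a generator would write into the output directory; the model classes it references would be emitted alongside it.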
Thin Wrapper Pattern
Two-Layer Architecture
Layer 1: Base RPC Client
- Handles WebSocket connection
- Implements correlation ID pattern
- Manages pub/sub subscriptions
- Matches responses to pending requests by correlation ID
# Base layer (generic RPC)
class CentrifugoRPCClient:
    async def call(self, method: str, params: dict) -> dict:
        correlation_id = generate_uuid()
        await self.publish("rpc.requests", {
            "method": method,
            "params": params,
            "correlation_id": correlation_id,
            "reply_to": f"user#{self.user_id}"
        })
        return await self.wait_for_response(correlation_id)
Layer 2: Typed API Client
- Type-safe methods
- One method per RPC endpoint
- Thin wrapper (no business logic)
# Typed layer (specific methods)
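class APIClient:
    def __init__(self, rpc: CentrifugoRPCClient):
        self.rpc = rpc  # Layer 1 client that handles the transport

    async def tasks_get_stats(
        self,
        params: TaskStatsParams
    ) -> TaskStatsResult:
        result = await self.rpc.call(
            "tasks.get_stats",
            params.model_dump()
        )
        return TaskStatsResult(**result)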
Benefits
- ✅ Small code size - Thin wrappers, not fat SDKs
- ✅ Maintainability - Separation of concerns
- ✅ Flexibility - Easy to extend base client
- ✅ Type safety - Full typing at API layer
Correlation ID Pattern
Request-response over pub/sub:
1. Client generates correlation_id = "uuid-123"
   ↓
2. Client publishes to "rpc.requests":
   {
     method: "tasks.get_stats",
     params: {...},
     correlation_id: "uuid-123",
     reply_to: "user#456"
   }
   ↓
3. Server receives message on "rpc.requests"
   ↓
4. Server calls handler: get_task_stats(params)
   ↓
5. Server publishes to "user#456":
   {
     correlation_id: "uuid-123",
     result: {...}
   }
   ↓
6. Client receives on "user#456"
   ↓
7. Client matches by correlation_id
   ↓
8. Client returns result to caller
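The eight steps above can be exercised end to end without a Centrifugo server. The sketch below is an in-memory illustration under stated assumptions: an `asyncio.Queue` stands in for the "rpc.requests" channel, the toy server calls the client's response handler directly instead of publishing to "user#456", and `InMemoryRPCClient` and the `timeout` parameter are hypothetical additions.

```python
import asyncio
import uuid
from typing import Any, Dict


class InMemoryRPCClient:
    """Hypothetical client: correlates responses to requests via futures."""

    def __init__(self, requests: asyncio.Queue, user_id: str) -> None:
        self.requests = requests                 # stands in for "rpc.requests"
        self.reply_channel = f"user#{user_id}"
        self.pending: Dict[str, asyncio.Future] = {}

    async def call(self, method: str, params: dict, timeout: float = 5.0) -> Any:
        correlation_id = str(uuid.uuid4())                      # step 1
        future = asyncio.get_running_loop().create_future()
        self.pending[correlation_id] = future                   # register before publishing
        await self.requests.put({                               # step 2
            "method": method,
            "params": params,
            "correlation_id": correlation_id,
            "reply_to": self.reply_channel,
        })
        try:
            return await asyncio.wait_for(future, timeout)      # step 8
        finally:
            # Always drop the entry so timed-out calls do not leak.
            self.pending.pop(correlation_id, None)

    def handle_response(self, message: dict) -> None:           # steps 6 and 7
        future = self.pending.get(message["correlation_id"])
        if future is not None and not future.done():
            future.set_result(message["result"])


async def server(requests: asyncio.Queue, client: InMemoryRPCClient) -> None:
    """Toy server: handles one request and delivers the reply (steps 3 to 5)."""
    message = await requests.get()
    result = {"total": 100} if message["method"] == "tasks.get_stats" else None
    client.handle_response({"correlation_id": message["correlation_id"], "result": result})


async def main() -> dict:
    requests: asyncio.Queue = asyncio.Queue()
    client = InMemoryRPCClient(requests, user_id="456")
    server_task = asyncio.create_task(server(requests, client))
    result = await client.call("tasks.get_stats", {"user_id": "456"})
    await server_task
    return result


result = asyncio.run(main())
```

Registering the pending future before publishing avoids a race in which a fast reply arrives before the client is ready to match it, and the timeout keeps a lost reply from blocking the caller forever.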
Implementation
Client Side:
private pendingCalls = new Map<string, (result: any) => void>();

async call(method: string, params: any): Promise<any> {
  const correlationId = generateUUID();

  // Create promise
  const promise = new Promise((resolve) => {
    this.pendingCalls.set(correlationId, resolve);
  });

  // Publish request
  await this.client.publish('rpc.requests', {
    method,
    params,
    correlation_id: correlationId,
    reply_to: `user#${this.userId}`
  });

  return promise;
}

private handleResponse(message: any) {
  const correlationId = message.correlation_id;
  const callback = this.pendingCalls.get(correlationId);
  if (callback) {
    callback(message.result);
    this.pendingCalls.delete(correlationId);
  }
}
Server Side:
async def handle_rpc_request(message):
    method = message["method"]
    params = message["params"]
    correlation_id = message["correlation_id"]
    reply_to = message["reply_to"]

    # Call handler
    result = await router.handle_message(method, params)

    # Publish response
    await centrifugo.publish(reply_to, {
        "correlation_id": correlation_id,
        "result": result
    })
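The server snippet above covers only the success path; if a handler raises, the client never receives a reply for that correlation ID. One possible way to handle this is sketched below. The `error` field, its `code` values, `fake_publish`, and the `HANDLERS` table are all assumptions made for illustration and are not part of the protocol described in this document.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

published: List[Tuple[str, dict]] = []   # records (channel, payload) for the demo


async def fake_publish(channel: str, payload: dict) -> None:
    """Hypothetical stand-in for centrifugo.publish."""
    published.append((channel, payload))


async def get_task_stats(params: dict) -> dict:
    return {"total": 100}


async def failing(params: dict) -> dict:
    raise ValueError("boom")


# Hypothetical handler table standing in for the router.
HANDLERS: Dict[str, Callable[[dict], Awaitable[Any]]] = {
    "tasks.get_stats": get_task_stats,
    "tasks.fail": failing,
}


async def handle_rpc_request(message: dict) -> None:
    """Always publish exactly one reply per request, carrying result or error."""
    reply: dict = {"correlation_id": message["correlation_id"]}
    handler = HANDLERS.get(message["method"])
    if handler is None:
        reply["error"] = {"code": "method_not_found", "message": message["method"]}
    else:
        try:
            reply["result"] = await handler(message["params"])
        except Exception as exc:
            reply["error"] = {"code": "handler_error", "message": str(exc)}
    await fake_publish(message["reply_to"], reply)


async def demo() -> None:
    base = {"params": {}, "reply_to": "user#456"}
    await handle_rpc_request({**base, "method": "tasks.get_stats", "correlation_id": "a"})
    await handle_rpc_request({**base, "method": "tasks.fail", "correlation_id": "b"})
    await handle_rpc_request({**base, "method": "nope", "correlation_id": "c"})


asyncio.run(demo())
```

A client using this convention would check for `error` before `result` when matching a reply.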
Type Conversion System
Pydantic → TypeScript
# Type converter (is_list, is_optional and is_basemodel are helper predicates, not shown)
from typing import get_args

def pydantic_to_typescript(field_type):
    if field_type == str:
        return "string"
    if field_type == int or field_type == float:
        return "number"
    if field_type == bool:
        return "boolean"
    if is_list(field_type):
        inner = get_args(field_type)[0]
        return f"{pydantic_to_typescript(inner)}[]"
    if is_optional(field_type):
        inner = get_args(field_type)[0]
        return f"{pydantic_to_typescript(inner)} | null"
    if is_basemodel(field_type):
        return field_type.__name__  # Interface name
    # Fail loudly instead of silently returning None
    raise TypeError(f"Unsupported field type: {field_type!r}")
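The converter above relies on helper predicates that are not shown. A self-contained sketch of how they might look, built on `typing.get_origin` and `typing.get_args`, follows. The helper bodies are assumptions, a dataclass stands in for a Pydantic model so the example needs no third-party package, and only `typing.Optional[...]` is handled (not the `X | None` syntax).

```python
from dataclasses import dataclass, is_dataclass
from typing import List, Optional, Union, get_args, get_origin


@dataclass
class TaskStats:
    """Stand-in for a Pydantic model; real code would test issubclass(t, BaseModel)."""
    total: int


def is_list(t) -> bool:
    return get_origin(t) is list


def is_optional(t) -> bool:
    # Optional[X] is represented as Union[X, None].
    return get_origin(t) is Union and type(None) in get_args(t)


def to_typescript(t) -> str:
    if t is bool:
        return "boolean"
    if t is str:
        return "string"
    if t in (int, float):
        return "number"
    if is_list(t):
        inner = to_typescript(get_args(t)[0])
        # Parenthesize unions so "string | null" becomes "(string | null)[]".
        return f"({inner})[]" if " | " in inner else f"{inner}[]"
    if is_optional(t):
        members = [a for a in get_args(t) if a is not type(None)]
        return " | ".join(to_typescript(a) for a in members) + " | null"
    if isinstance(t, type) and is_dataclass(t):
        return t.__name__   # interface name
    raise TypeError(f"unsupported type: {t!r}")


examples = {
    "name": to_typescript(str),
    "scores": to_typescript(List[int]),
    "nickname": to_typescript(Optional[str]),
    "aliases": to_typescript(List[Optional[str]]),
    "stats": to_typescript(Optional[List[TaskStats]]),
}
```

The parenthesization in the list branch matters because `string | null[]` in TypeScript means "string, or an array of null", which is not what a list of optional strings should map to.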
Pydantic → Go
from typing import get_args

def pydantic_to_go(field_type):
    if field_type == str:
        return "string"
    if field_type == int:
        return "int64"
    if field_type == float:
        return "float64"
    if field_type == bool:
        return "bool"
    if is_list(field_type):
        inner = get_args(field_type)[0]
        return f"[]{pydantic_to_go(inner)}"
    if is_optional(field_type):
        inner = get_args(field_type)[0]
        return f"*{pydantic_to_go(inner)}"  # Pointer
    if is_basemodel(field_type):
        return field_type.__name__  # Struct name
    # Fail loudly instead of silently returning None
    raise TypeError(f"Unsupported field type: {field_type!r}")
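To show where such a converter ends up, the sketch below renders a whole Go struct from a model's fields. It is an illustration under assumptions: a dataclass stands in for a Pydantic model, `to_go` and `render_go_struct` are hypothetical names, and the field naming and `json` tags are one plausible convention rather than the generator's confirmed output.

```python
from dataclasses import dataclass, is_dataclass
from typing import List, Optional, Union, get_args, get_origin, get_type_hints

GO_SCALARS = {str: "string", int: "int64", float: "float64", bool: "bool"}


def to_go(t) -> str:
    if t in GO_SCALARS:
        return GO_SCALARS[t]
    origin = get_origin(t)
    if origin is list:
        return "[]" + to_go(get_args(t)[0])
    if origin is Union and type(None) in get_args(t):
        members = [a for a in get_args(t) if a is not type(None)]
        if len(members) != 1:
            raise TypeError(f"only Optional[X] is supported, got {t!r}")
        return "*" + to_go(members[0])     # optional values become pointers
    if isinstance(t, type) and is_dataclass(t):
        return t.__name__                  # struct name
    raise TypeError(f"unsupported type: {t!r}")


def render_go_struct(model) -> str:
    """Render a Go struct with exported field names and json tags."""
    lines = [f"type {model.__name__} struct {{"]
    for name, t in get_type_hints(model).items():
        go_name = "".join(part.capitalize() for part in name.split("_"))
        lines.append(f'\t{go_name} {to_go(t)} `json:"{name}"`')
    lines.append("}")
    return "\n".join(lines)


@dataclass
class TaskStatsResult:
    """Stand-in for the Pydantic result model, with two extra illustrative fields."""
    total: int
    owner_id: Optional[str]
    tags: List[str]


go_source = render_go_struct(TaskStatsResult)
```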
Scalability
Horizontal Scaling
Centrifugo scales horizontally: several nodes run behind a load balancer, so connection capacity grows as nodes are added:
┌─────────┐     ┌─────────┐     ┌─────────┐
│ Client  │     │ Client  │     │ Client  │
│ 1-100k  │     │ 100k+   │     │ 200k+   │
└────┬────┘     └────┬────┘     └────┬────┘
     │               │               │
     ▼               ▼               ▼
┌─────────────────────────────────────────┐
│          Load Balancer (nginx)          │
└─────────┬───────────────────┬───────────┘
          │                   │
     ┌────▼─────┐        ┌────▼─────┐
     │ Centri 1 │        │ Centri 2 │
     └────┬─────┘        └────┬─────┘
          │                   │
          └─────────┬─────────┘
                    │
              ┌─────▼──────┐
              │   Django   │
              │  Handlers  │
              └────────────┘
Redis Broker
Centrifugo uses Redis for pub/sub across instances. In centrifugo.json:
{
  "engine": "redis",
  "redis_address": "redis://localhost:6379"
}
Next Steps
- API Reference - Complete API documentation
- Setup Guide - Configuration details