Backend Developer Guide
Learn how to create type-safe RPC handlers using Pydantic models and the @websocket_rpc decorator.
Creating Your First Handler
1. Define Pydantic Models
Create request and response models using Pydantic v2:
# core/centrifugo_handlers.py
from pydantic import BaseModel, Field
from typing import List, Optional
class TaskStatsParams(BaseModel):
"""Request parameters for task statistics."""
user_id: str = Field(..., description="User ID to fetch stats for")
include_completed: bool = Field(True, description="Include completed tasks")
class TaskStatsResult(BaseModel):
"""Task statistics response."""
total: int = Field(..., description="Total number of tasks")
completed: int = Field(..., description="Number of completed tasks")
pending: int = Field(..., description="Number of pending tasks")
user_id: str = Field(..., description="User ID")
2. Create RPC Handler
Use the @websocket_rpc decorator to register your handler:
from django_cfg.apps.centrifugo.decorators import websocket_rpc
@websocket_rpc("tasks.get_stats")
async def get_task_stats(conn, params: TaskStatsParams) -> TaskStatsResult:
"""
Get task statistics for a user.
This handler retrieves comprehensive task statistics including
total, completed, and pending task counts.
"""
from apps.tasks.models import Task
# Use params as Pydantic model
user_id = params.user_id
# Query database
total = Task.objects.filter(user_id=user_id).count()
completed = Task.objects.filter(user_id=user_id, status='completed').count()
pending = total - completed
# Return Pydantic model
return TaskStatsResult(
total=total,
completed=completed,
pending=pending,
user_id=user_id
)
3. Register Handlers
Import handlers in your AppConfig.ready():
# core/apps.py
from django.apps import AppConfig
class CoreConfig(AppConfig):
name = "core"
default_auto_field = "django.db.models.BigAutoField"
def ready(self):
"""Import handlers to register them with the router."""
from . import centrifugo_handlers
Handler Best Practices
Use Type Hints
Type hints are required - they drive code generation:
# ✅ GOOD - Pydantic models with type hints
@websocket_rpc("user.get")
async def get_user(conn, params: GetUserParams) -> UserResult:
...
# ❌ BAD - No type hints
@websocket_rpc("user.get")
async def get_user(conn, params):
...
Add Field Descriptions
Descriptions appear in generated client documentation:
class UserParams(BaseModel):
user_id: str = Field(..., description="Unique user identifier")
include_profile: bool = Field(
True,
description="Include full profile data in response"
)
Generates TypeScript documentation:
interface UserParams {
/** Unique user identifier */
user_id: string;
/** Include full profile data in response */
include_profile?: boolean;
}
Write Handler Docstrings
Docstrings appear in README files:
@websocket_rpc("tasks.create")
async def create_task(conn, params: CreateTaskParams) -> TaskResult:
"""
Create a new task.
Creates a task for the specified user with the given title and description.
Automatically sets created_at timestamp and initializes status to 'pending'.
Returns the created task with its generated ID.
"""
...
Use Async Handlers
All handlers must be async:
# ✅ GOOD
@websocket_rpc("tasks.list")
async def list_tasks(conn, params: ListParams) -> ListResult:
tasks = await Task.objects.filter(user_id=params.user_id).alist()
return ListResult(tasks=[...])
# ❌ BAD - sync handler
@websocket_rpc("tasks.list")
def list_tasks(conn, params: ListParams) -> ListResult:
...
Naming Conventions
Method Names
Use dot notation for namespacing:
@websocket_rpc("tasks.list") # List tasks
@websocket_rpc("tasks.create") # Create task
@websocket_rpc("tasks.update") # Update task
@websocket_rpc("tasks.delete") # Delete task
@websocket_rpc("users.get") # Get user
@websocket_rpc("users.profile") # Get user profile
@websocket_rpc("system.health") # System health check
@websocket_rpc("system.stats") # System statistics
This generates organized client methods:
# Python
api.tasks_list(...)
api.tasks_create(...)
api.users_get(...)
api.system_health(...)
// TypeScript
api.tasksList(...)
api.tasksCreate(...)
api.usersGet(...)
api.systemHealth(...)
// Go
api.TasksList(ctx, ...)
api.TasksCreate(ctx, ...)
api.UsersGet(ctx, ...)
api.SystemHealth(ctx, ...)
Model Names
Follow these conventions:
# Request models: <Action><Entity>Params
class CreateTaskParams(BaseModel): ...
class UpdateUserParams(BaseModel): ...
class ListTasksParams(BaseModel): ...
# Response models: <Entity>Result or <Action><Entity>Result
class TaskResult(BaseModel): ...
class UserResult(BaseModel): ...
class TaskListResult(BaseModel): ...
Advanced Patterns
Optional Parameters
Use Optional for optional fields:
from typing import Optional
class SearchParams(BaseModel):
query: str = Field(..., description="Search query")
limit: Optional[int] = Field(None, description="Max results")
offset: Optional[int] = Field(None, description="Result offset")
Generates:
interface SearchParams {
query: string;
limit?: number | null;
offset?: number | null;
}
Lists and Nested Models
Use List for arrays and nest models:
from typing import List
class Tag(BaseModel):
name: str
color: str
class Task(BaseModel):
id: int
title: str
tags: List[Tag]
class TaskListResult(BaseModel):
tasks: List[Task]
total: int
Generates:
interface Tag {
name: string;
color: string;
}
interface Task {
id: number;
title: string;
tags: Tag[];
}
interface TaskListResult {
tasks: Task[];
total: number;
}
Using Connection Object
The conn parameter provides access to connection metadata:
@websocket_rpc("user.profile")
async def get_profile(conn, params: ProfileParams) -> ProfileResult:
# Access user from connection
user_id = conn.user_id # From JWT token
# Use in business logic
profile = await UserProfile.objects.get(user_id=user_id)
return ProfileResult(
username=profile.username,
email=profile.email
)
Error Handling
Raise exceptions for error cases:
from django.core.exceptions import ObjectDoesNotExist
@websocket_rpc("tasks.get")
async def get_task(conn, params: GetTaskParams) -> TaskResult:
try:
task = await Task.objects.aget(id=params.task_id)
return TaskResult(...)
except ObjectDoesNotExist:
raise ValueError(f"Task {params.task_id} not found")
Clients receive error responses:
try:
result = await api.tasks_get(GetTaskParams(task_id=999))
except Exception as e:
print(f"Error: {e}") # "Task 999 not found"
Database Queries
Use Django ORM async methods:
@websocket_rpc("tasks.list")
async def list_tasks(conn, params: ListParams) -> ListResult:
# Async query
tasks = await Task.objects.filter(
user_id=params.user_id
).order_by('-created_at').alist()
# Transform to Pydantic
task_list = [
TaskItem(id=t.id, title=t.title, status=t.status)
for t in tasks
]
return ListResult(tasks=task_list, total=len(task_list))
Testing Handlers
Unit Tests
Test handlers directly:
# core/tests/test_centrifugo_handlers.py
import pytest
from core.centrifugo_handlers import get_task_stats, TaskStatsParams
@pytest.mark.asyncio
async def test_get_task_stats():
# Create test data
user = await User.objects.acreate(username="test")
await Task.objects.acreate(user=user, status="completed")
await Task.objects.acreate(user=user, status="pending")
# Call handler
result = await get_task_stats(
conn=None, # Mock connection
params=TaskStatsParams(user_id=user.id)
)
# Assert results
assert result.total == 2
assert result.completed == 1
assert result.pending == 1
Integration Tests
Test with generated clients:
@pytest.mark.asyncio
async def test_rpc_client_integration():
from opensdk.python import CentrifugoRPCClient, APIClient
# Setup client
rpc = CentrifugoRPCClient(
url=settings.CENTRIFUGO_URL,
token=generate_test_token(),
user_id="test-user"
)
await rpc.connect()
api = APIClient(rpc)
# Call RPC method
result = await api.tasks_get_stats(TaskStatsParams(user_id="test-user"))
# Verify
assert result.total >= 0
assert result.completed >= 0
Complete Example
Here's a complete CRUD example for a Todo app:
# core/centrifugo_handlers.py
from pydantic import BaseModel, Field
from typing import List, Optional
from django_cfg.apps.centrifugo.decorators import websocket_rpc
from apps.todos.models import Todo
# ========================================
# Models
# ========================================
class TodoItem(BaseModel):
id: int
title: str
completed: bool
created_at: str
class CreateTodoParams(BaseModel):
title: str = Field(..., description="Todo title")
class UpdateTodoParams(BaseModel):
id: int = Field(..., description="Todo ID")
title: Optional[str] = Field(None, description="New title")
completed: Optional[bool] = Field(None, description="Completion status")
class DeleteTodoParams(BaseModel):
id: int = Field(..., description="Todo ID to delete")
class ListTodosParams(BaseModel):
completed: Optional[bool] = Field(None, description="Filter by completion")
limit: Optional[int] = Field(10, description="Max results")
class TodoResult(BaseModel):
todo: TodoItem
class TodoListResult(BaseModel):
todos: List[TodoItem]
total: int
class SuccessResult(BaseModel):
success: bool
message: str
# ========================================
# Handlers
# ========================================
@websocket_rpc("todos.create")
async def create_todo(conn, params: CreateTodoParams) -> TodoResult:
"""Create a new todo item."""
todo = await Todo.objects.acreate(
user_id=conn.user_id,
title=params.title,
completed=False
)
return TodoResult(
todo=TodoItem(
id=todo.id,
title=todo.title,
completed=todo.completed,
created_at=todo.created_at.isoformat()
)
)
@websocket_rpc("todos.update")
async def update_todo(conn, params: UpdateTodoParams) -> TodoResult:
"""Update an existing todo item."""
todo = await Todo.objects.aget(id=params.id, user_id=conn.user_id)
if params.title is not None:
todo.title = params.title
if params.completed is not None:
todo.completed = params.completed
await todo.asave()
return TodoResult(
todo=TodoItem(
id=todo.id,
title=todo.title,
completed=todo.completed,
created_at=todo.created_at.isoformat()
)
)
@websocket_rpc("todos.delete")
async def delete_todo(conn, params: DeleteTodoParams) -> SuccessResult:
"""Delete a todo item."""
await Todo.objects.filter(id=params.id, user_id=conn.user_id).adelete()
return SuccessResult(
success=True,
message=f"Todo {params.id} deleted"
)
@websocket_rpc("todos.list")
async def list_todos(conn, params: ListTodosParams) -> TodoListResult:
"""List todos with optional filtering."""
query = Todo.objects.filter(user_id=conn.user_id)
if params.completed is not None:
query = query.filter(completed=params.completed)
query = query.order_by('-created_at')[:params.limit]
todos = await query.alist()
todo_items = [
TodoItem(
id=t.id,
title=t.title,
completed=t.completed,
created_at=t.created_at.isoformat()
)
for t in todos
]
return TodoListResult(
todos=todo_items,
total=len(todo_items)
)
Next Steps
- Client Generation - Generate type-safe clients
- Frontend Guide - Use clients in your frontend
- API Reference - Complete API documentation
Before generating clients:
- ✅ All handlers use Pydantic models
- ✅ Type hints on all parameters and returns
- ✅ Descriptive field descriptions
- ✅ Handler docstrings written
- ✅ Handlers are async
- ✅ Handlers registered in apps.py