API Reference
Complete API documentation for Centrifugo WebSocket RPC integration.
Python API
Decorator
@websocket_rpc(method_name: str)
Registers an async function as an RPC handler.
Parameters:
method_name(str): RPC method name (e.g.,"tasks.get_stats")
Example:
@websocket_rpc("tasks.get_stats")
async def get_task_stats(conn, params: TaskStatsParams) -> TaskStatsResult:
return TaskStatsResult(total=100)
Handler Signature:
async def handler_name(
conn: ConnectionContext,
params: ParamsModel
) -> ResultModel:
...
Client Classes
CentrifugoRPCClient
Base RPC client handling WebSocket and correlation IDs.
Constructor:
CentrifugoRPCClient(
url: str,
token: str,
user_id: str
)
Methods:
async connect() -> None
Establishes WebSocket connection and subscribes to user channel.
Example:
await rpc.connect()
async disconnect() -> None
Closes WebSocket connection.
Example:
await rpc.disconnect()
async call(method: str, params: dict) -> dict
Calls an RPC method with correlation ID pattern.
Parameters:
method(str): RPC method nameparams(dict): Request parameters
Returns:
dict: Response data
Example:
result = await rpc.call("tasks.get_stats", {"user_id": "123"})
APIClient
Type-safe wrapper with generated methods.
Constructor:
APIClient(rpc_client: CentrifugoRPCClient)
Generated Methods:
Each RPC handler generates a typed method:
async def <method_name>(
self,
params: <ParamsModel>
) -> <ResultModel>
Example:
api = APIClient(rpc)
result = await api.tasks_get_stats(TaskStatsParams(user_id="123"))
Configuration
DjangoCfgCentrifugoConfig
Configuration model for Centrifugo integration.
Parameters:
enabled(bool): Enable Centrifugo integrationwrapper_url(str): Django wrapper service URLwrapper_api_key(str): Wrapper API keycentrifugo_url(str): Centrifugo WebSocket URLcentrifugo_api_url(str): Centrifugo HTTP API URLcentrifugo_api_key(str): Centrifugo API keycentrifugo_token_hmac_secret(str): HMAC secret for JWTack_timeout(int, optional): RPC timeout in seconds (default: 30)log_level(str, optional): Logging level (default: "INFO")log_all_calls(bool, optional): Log all RPC calls (default: True)log_only_with_ack(bool, optional): Log only ack calls (default: False)
Example:
centrifugo: DjangoCfgCentrifugoConfig = DjangoCfgCentrifugoConfig(
enabled=True,
wrapper_url="http://localhost:8001",
wrapper_api_key="secret-key",
centrifugo_url="ws://localhost:8000/connection/websocket",
centrifugo_api_url="http://localhost:8000/api",
centrifugo_api_key="api-key",
centrifugo_token_hmac_secret="hmac-secret",
ack_timeout=60,
log_level="DEBUG"
)
TypeScript API
Client Classes
CentrifugoRPCClient
Base RPC client.
Constructor:
constructor(
url: string,
token: string,
userId: string
)
Methods:
async connect(): Promise<void>
Connects to Centrifugo.
Example:
await rpc.connect();
disconnect(): void
Disconnects from Centrifugo.
Example:
rpc.disconnect();
async call(method: string, params: any): Promise<any>
Calls RPC method.
Parameters:
method(string): RPC method nameparams(any): Request parameters
Returns:
Promise<any>: Response data
Example:
const result = await rpc.call('tasks.get_stats', { user_id: '123' });
APIClient
Type-safe API wrapper.
Constructor:
constructor(rpc: CentrifugoRPCClient)
Generated Methods:
async <methodName>(params: <ParamsType>): Promise<ResultType>
Example:
const api = new APIClient(rpc);
const result = await api.tasksGetStats({ user_id: '123' });
Type Interfaces
Generated TypeScript interfaces from Pydantic models:
interface TaskStatsParams {
user_id: string;
include_completed?: boolean;
}
interface TaskStatsResult {
total: number;
completed: number;
pending: number;
}
Go API
Client Struct
APIClient
Combined RPC and API client.
Constructor:
func NewAPIClient(url, token, userID string) *APIClient
Methods:
func (c *APIClient) Connect(ctx context.Context) error
Connects to Centrifugo.
Example:
err := client.Connect(ctx)
func (c *APIClient) Disconnect()
Disconnects from Centrifugo.
Example:
defer client.Disconnect()
func (c *APIClient) Call(ctx context.Context, method string, params interface{}) ([]byte, error)
Calls RPC method.
Parameters:
ctx(context.Context): Context for timeout/cancellationmethod(string): RPC method nameparams(interface): Request parameters
Returns:
[]byte: Response JSONerror: Error if any
Example:
result, err := client.Call(ctx, "tasks.get_stats", params)
Generated Methods
Each handler generates a typed method:
func (api *APIClient) <MethodName>(
ctx context.Context,
params <ParamsStruct>
) (*<ResultStruct>, error)
Example:
result, err := api.TasksGetStats(ctx, TaskStatsParams{
UserId: "123",
IncludeCompleted: true,
})
Type Structs
Generated Go structs from Pydantic models:
type TaskStatsParams struct {
UserId string `json:"user_id"`
IncludeCompleted bool `json:"include_completed"`
}
type TaskStatsResult struct {
Total int64 `json:"total"`
Completed int64 `json:"completed"`
Pending int64 `json:"pending"`
}
Management Commands
generate_centrifugo_clients
Generates type-safe clients from RPC handlers.
Usage:
python manage.py generate_centrifugo_clients [OPTIONS]
Options:
--output DIR: Output directory (required)--all: Generate all clients (Python, TypeScript, Go)--python: Generate Python client--typescript: Generate TypeScript client--go: Generate Go client--verbose: Verbose output
Examples:
# All clients
python manage.py generate_centrifugo_clients --output ./opensdk --all
# Specific languages
python manage.py generate_centrifugo_clients --output ./opensdk --python --typescript
# With verbose output
python manage.py generate_centrifugo_clients --output ./opensdk --all --verbose
Error Handling
Python Exceptions
# Validation errors
from pydantic import ValidationError
try:
params = TaskStatsParams(user_id=123) # Wrong type
except ValidationError as e:
print(e)
# Connection errors
try:
await rpc.connect()
except ConnectionError as e:
print(f"Failed to connect: {e}")
# RPC errors
try:
result = await api.tasks_get_stats(params)
except ValueError as e:
print(f"RPC error: {e}")
TypeScript Errors
// Connection errors
try {
await rpc.connect();
} catch (error) {
console.error('Connection failed:', error);
}
// RPC errors
try {
const result = await api.tasksGetStats(params);
} catch (error) {
if (error.code) {
console.error(`RPC error ${error.code}: ${error.message}`);
}
}
Go Errors
// Connection errors
if err := client.Connect(ctx); err != nil {
log.Fatalf("Connection failed: %v", err)
}
// RPC errors
result, err := api.TasksGetStats(ctx, params)
if err != nil {
if rpcErr, ok := err.(*RPCError); ok {
log.Printf("RPC error %d: %s", rpcErr.Code, rpcErr.Message)
} else {
log.Printf("Error: %v", err)
}
}
Environment Variables
Recommended environment variable names:
# Centrifugo server
CENTRIFUGO_URL=ws://localhost:8000/connection/websocket
CENTRIFUGO_API_URL=http://localhost:8000/api
CENTRIFUGO_API_KEY=your-api-key
CENTRIFUGO_TOKEN_HMAC_SECRET=your-hmac-secret
# Django wrapper
CENTRIFUGO_WRAPPER_URL=http://localhost:8001
CENTRIFUGO_WRAPPER_API_KEY=your-wrapper-key
# Behavior
CENTRIFUGO_ACK_TIMEOUT=30
CENTRIFUGO_LOG_LEVEL=INFO
CENTRIFUGO_LOG_ALL_CALLS=true
Next Steps
- Setup Guide - Installation and configuration
- Backend Guide - Create RPC handlers
- Frontend Guide - Use clients in your app