Async gRPC Support

High-concurrency async gRPC server with streaming support.

What's Changed

Async Server (v1.5.8+)

The gRPC server now uses grpc.aio (asyncio-based) instead of the synchronous grpc.server() backed by a ThreadPoolExecutor:

| Before (Sync) | Now (Async) |
|---|---|
| grpc.server() | grpc.aio.server() |
| ThreadPoolExecutor | asyncio event loop |
| ~10 concurrent requests | 1000+ concurrent requests |
| No streaming support | Full streaming support |

How It Works

┌─────────────────────────────────────┐
│  python manage.py rungrpc           │
│                                     │
│  ┌─────────────────────────────┐    │
│  │ grpc.aio.server() [ASYNC]   │    │
│  │ └─ asyncio event loop       │    │
│  │    ├─ API Key Auth (async)  │    │
│  │    ├─ Logging (async)       │    │
│  │    └─ Your Services         │    │
│  └─────────────────────────────┘    │
└─────────────────────────────────────┘

Key Points:

  • Server runs async automatically
  • API key auth works async
  • Request logging works async
  • Django ORM wrapped in asyncio.to_thread()

Configuration

Enable Async Options

# api/config.py
from django_cfg import DjangoConfig, GRPCConfig, GRPCServerConfig

class MyConfig(DjangoConfig):
    grpc = GRPCConfig(
        enabled=True,
        server=GRPCServerConfig(
            host="[::]",
            port=50051,

            # Async options
            max_concurrent_streams=1000,  # Concurrent streams per connection
            asyncio_debug=False,  # Debug mode (dev only)

            # Standard options still work
            enable_reflection=True,
            enable_health_check=True,
        ),
    )

Async Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| max_concurrent_streams | int | None | Max concurrent streams per connection (None = unlimited) |
| asyncio_debug | bool | False | Enable asyncio debug mode (shows warnings) |

Writing Services

Most services work as-is - no changes needed:

from django_cfg.apps.integrations.grpc.services import BaseService

class UserService(BaseService):
    def GetUser(self, request, context):
        # Standard sync code - works on async server
        user = self.require_user(context)
        db_user = User.objects.get(id=request.user_id)
        return UserResponse(id=db_user.id, name=db_user.name)

Why it works: the async server wraps sync methods automatically.
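One way an async server can host sync handlers is to check whether a handler is a coroutine function and, if it is not, run it in a worker thread. The sketch below illustrates that idea with the standard library only. `adapt_handler` and the two stand-in handlers are hypothetical names, and this is an assumption about the general technique, not necessarily how django-cfg implements it.

```python
import asyncio
import inspect
import threading


def adapt_handler(handler):
    """Return an async callable for either a sync or an async handler.

    Hypothetical helper for illustration; not part of django-cfg or grpc.
    """
    if inspect.iscoroutinefunction(handler):
        return handler  # already async, call it directly

    async def wrapper(request, context):
        # Run the blocking handler in a worker thread so the event loop stays free.
        return await asyncio.to_thread(handler, request, context)

    return wrapper


def sync_get_user(request, context):
    # Stand-in for a sync gRPC method; reports which thread ran it.
    return {"id": request, "thread": threading.current_thread().name}


async def async_get_user(request, context):
    # Stand-in for an async gRPC method; runs on the event loop thread.
    return {"id": request, "thread": threading.current_thread().name}


async def main():
    sync_result = await adapt_handler(sync_get_user)(1, None)
    async_result = await adapt_handler(async_get_user)(2, None)
    return sync_result, async_result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The sync handler reports a worker thread name, while the async handler reports the thread that runs the event loop.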

Async Services (Advanced)

For I/O-bound operations or streaming:

import asyncio
from django_cfg.apps.integrations.grpc.services import BaseService

class UserService(BaseService):
    async def GetUser(self, request, context):
        # Async method
        user = self.require_user(context)

        # Wrap Django ORM in asyncio.to_thread
        db_user = await asyncio.to_thread(
            User.objects.get,
            id=request.user_id
        )

        return UserResponse(id=db_user.id, name=db_user.name)

    async def StreamUsers(self, request, context):
        # Server-side streaming
        users = await asyncio.to_thread(
            lambda: list(User.objects.all()[:100])
        )

        for user in users:
            yield UserResponse(id=user.id, name=user.name)
            await asyncio.sleep(0)  # Yield control to event loop

Streaming Examples

Server-Side Streaming

import asyncio

class EventService(BaseService):
    async def StreamEvents(self, request, context):
        """Stream events to client."""
        while not context.cancelled():
            # Get events from DB. list() forces the lazy QuerySet to be
            # evaluated inside the worker thread, not on the event loop.
            events = await asyncio.to_thread(
                lambda: list(Event.objects.filter(user_id=request.user_id)[:10])
            )

            for event in events:
                yield EventResponse(
                    id=event.id,
                    type=event.type,
                    data=event.data,
                )

            await asyncio.sleep(1)  # Poll and stream every 1 second
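The polling loop above can be exercised without a running gRPC server by passing in a stand-in context. This is a minimal sketch: `FakeContext`, `fetch_events`, `stream_events`, and `collect` are hypothetical names, and only the `cancelled()` method of the real context is modelled.

```python
import asyncio


class FakeContext:
    """Minimal stand-in for the gRPC context; only cancelled() is modelled."""

    def __init__(self):
        self._cancelled = False

    def cancel(self):
        self._cancelled = True

    def cancelled(self):
        return self._cancelled


def fetch_events(batch):
    # Stand-in for a blocking ORM query; returns a fully evaluated list.
    return [f"event-{batch}-{i}" for i in range(2)]


async def stream_events(context, interval=0.01):
    batch = 0
    while not context.cancelled():
        events = await asyncio.to_thread(fetch_events, batch)
        for event in events:
            yield event
        batch += 1
        await asyncio.sleep(interval)


async def collect(limit):
    context = FakeContext()
    received = []
    async for event in stream_events(context):
        received.append(event)
        if len(received) >= limit:
            context.cancel()  # simulate the client going away
    return received


if __name__ == "__main__":
    print(asyncio.run(collect(4)))
```

Because cancellation is only checked at the top of the `while` loop, a batch that has already been fetched is sent in full before the stream stops.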

Client-Side Streaming

import asyncio

class LogService(BaseService):
    async def UploadLogs(self, request_iterator, context):
        """Receive stream of logs from client."""
        count = 0
        async for log_entry in request_iterator:
            # Process each log entry
            await asyncio.to_thread(
                Log.objects.create,
                message=log_entry.message,
                level=log_entry.level,
            )
            count += 1

        return UploadResponse(logs_received=count)
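In a client-streaming method, `request_iterator` behaves like any async iterator, so the handler logic can be driven by an async generator in a test. The sketch below uses only the standard library; `log_entries`, `save_log`, and `upload_logs` are hypothetical stand-ins, with a plain list taking the place of the database.

```python
import asyncio


async def log_entries(messages):
    # Plays the role of request_iterator: an async iterator of incoming messages.
    for message in messages:
        yield message
        await asyncio.sleep(0)  # give other tasks a chance to run


def save_log(storage, message):
    # Stand-in for a blocking Log.objects.create(...) call.
    storage.append(message)


async def upload_logs(request_iterator, storage):
    count = 0
    async for entry in request_iterator:
        # Offload the blocking write, then count the entry once it is stored.
        await asyncio.to_thread(save_log, storage, entry)
        count += 1
    return count


async def main():
    storage = []
    count = await upload_logs(log_entries(["a", "b", "c"]), storage)
    return count, storage


if __name__ == "__main__":
    print(asyncio.run(main()))
```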

Bidirectional Streaming

import asyncio
import time

class ChatService(BaseService):
    async def Chat(self, request_iterator, context):
        """Bidirectional chat stream."""
        async for message in request_iterator:
            # Process incoming message
            await asyncio.to_thread(
                Message.objects.create,
                text=message.text,
                user_id=message.user_id,
            )

            # Send response
            yield ChatResponse(
                text=f"Received: {message.text}",
                timestamp=int(time.time()),
            )
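What distinguishes a bidirectional stream is that requests and responses interleave: the client can wait for a reply before sending its next message. The following sketch models that with an `asyncio.Queue` as the incoming side. `requests_from`, `chat`, and the `None` end-of-stream marker are assumptions made for illustration, not part of gRPC or django-cfg.

```python
import asyncio


async def requests_from(queue):
    # Async iterator over queued messages; None marks the end of the stream.
    while True:
        text = await queue.get()
        if text is None:
            return
        yield text


async def chat(request_iterator):
    # Same shape as the Chat method above: one reply per incoming message.
    async for text in request_iterator:
        yield f"Received: {text}"


async def main():
    queue = asyncio.Queue()
    replies = chat(requests_from(queue))
    transcript = []
    for text in ["hi", "how are you?"]:
        await queue.put(text)  # client sends a message
        transcript.append(await replies.__anext__())  # and reads the reply first
    await queue.put(None)  # client closes its side of the stream
    leftover = [reply async for reply in replies]
    return transcript, leftover


if __name__ == "__main__":
    print(asyncio.run(main()))
```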

Performance

Concurrency Comparison

| Metric | Sync Server | Async Server |
|---|---|---|
| Max concurrent requests | ~10 | 1000+ |
| Memory per request | 2-4 MB | 50-100 KB |
| Streaming support | Limited | Full support |
| I/O-bound workloads | Slow | Fast |

When to Use Async

Use async for:

  • High-concurrency scenarios (100+ concurrent clients)
  • Server-side streaming
  • Bidirectional streaming
  • I/O-bound operations (external APIs, long DB queries)

Sync is fine for:

  • Low-medium concurrency (under 50 clients)
  • Simple CRUD operations
  • CPU-bound workloads
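The concurrency benefit for I/O-bound work can be seen in a small standard-library sketch. Here `asyncio.sleep` stands in for waiting on a network call, and `handle_request` and `serve_concurrently` are hypothetical names. This is not a benchmark of the gRPC server itself.

```python
import asyncio
import time


async def handle_request(request_id, delay=0.05):
    await asyncio.sleep(delay)  # stand-in for waiting on an external API
    return request_id


async def serve_concurrently(n):
    start = time.perf_counter()
    # All n waits overlap on one event loop instead of running back to back.
    results = await asyncio.gather(*(handle_request(i) for i in range(n)))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(serve_concurrently(200))
    print(f"{len(results)} requests in {elapsed:.2f}s")
```

Handled one after another, 200 waits of 0.05 s would take about 10 s. Overlapped on the event loop, they finish in roughly the time of a single wait. CPU-bound handlers gain nothing from this, since they never yield to the loop.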

Django ORM with Async

The Problem

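The standard Django ORM query methods are synchronous. They return plain values, not awaitables, and Django raises SynchronousOnlyOperation when they are called from a thread that is running an event loop: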

# ❌ Won't work
user = await User.objects.get(id=1)

The Solution

Wrap ORM calls in asyncio.to_thread():

# ✅ Works
import asyncio
user = await asyncio.to_thread(User.objects.get, id=1)
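The reason this matters is that a blocking call made directly in a coroutine stalls every other request on the event loop. The sketch below shows the loop staying responsive while a blocking call runs in a worker thread. `slow_query` and `heartbeat` are hypothetical stand-ins, with `time.sleep` playing the role of a slow ORM query.

```python
import asyncio
import contextlib
import time


def slow_query():
    time.sleep(0.2)  # stand-in for a blocking ORM call
    return "row"


async def heartbeat(ticks):
    # Records a tick every 10 ms for as long as the event loop is free to run it.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    result = await asyncio.to_thread(slow_query)  # the loop keeps running meanwhile
    beat.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await beat
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If `slow_query()` were called directly inside `main()`, the heartbeat would not tick at all during those 0.2 seconds.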

Helper Pattern

Create a helper for cleaner code:

class UserService(BaseService):
    async def _get_user_async(self, user_id):
        """Helper to get user async."""
        return await asyncio.to_thread(
            User.objects.get,
            id=user_id
        )

    async def GetUser(self, request, context):
        user = await self._get_user_async(request.user_id)
        return UserResponse(...)
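If many methods need the same treatment, the helper can be generalized into a decorator. This is a sketch using only the standard library: `in_thread` is a hypothetical name, and a dictionary stands in for the database table so the example runs on its own.

```python
import asyncio
import functools


def in_thread(func):
    """Wrap a blocking function so it can be awaited (hypothetical helper)."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


USERS = {1: "alice", 2: "bob"}  # stand-in for a database table


@in_thread
def get_user(user_id):
    # Raises KeyError for a missing id, much as a missing row raises DoesNotExist.
    return USERS[user_id]


async def main():
    found = await get_user(1)
    try:
        await get_user(99)
    except KeyError:
        missing = True  # exceptions from the worker thread propagate to the caller
    else:
        missing = False
    return found, missing


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Exceptions raised in the worker thread surface at the `await`, so existing error handling around ORM calls keeps working.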

Running the Server

Same command - async enabled automatically:

# Development
python manage.py rungrpc

# Production
python manage.py rungrpc --host 0.0.0.0 --port 50051

Debugging

Enable asyncio debug mode:

# api/config.py
server=GRPCServerConfig(
    asyncio_debug=True,  # Shows async warnings
)

Or via command line:

python manage.py rungrpc --asyncio-debug
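Assuming `asyncio_debug` switches on asyncio's standard debug mode (an assumption, since the option's implementation is not shown here), one of the warnings to expect is the slow-callback report: asyncio logs any step that blocks the event loop for longer than `loop.slow_callback_duration`, which defaults to 0.1 seconds. The sketch below triggers and captures that warning with the standard library; `ListHandler`, `blocking_handler`, and `run_with_debug` are hypothetical names.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects log messages in a list so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def blocking_handler():
    time.sleep(0.2)  # blocks the event loop; debug mode flags this
    return asyncio.get_running_loop().get_debug()


def run_with_debug():
    handler = ListHandler()
    logger = logging.getLogger("asyncio")  # asyncio reports debug warnings here
    logger.addHandler(handler)
    try:
        debug_enabled = asyncio.run(blocking_handler(), debug=True)
    finally:
        logger.removeHandler(handler)
    return debug_enabled, handler.messages


if __name__ == "__main__":
    enabled, messages = run_with_debug()
    print(enabled)
    for message in messages:
        print(message)
```

A warning of this kind in the server log usually points at a blocking call, such as an unwrapped ORM query, inside an async method.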

Migration from Old Code

No Changes Needed

If you have existing sync services - they work as-is:

# Old sync code - still works!
class UserService(BaseService):
    def GetUser(self, request, context):
        return UserResponse(...)

Optional: Convert to Async

Only convert if you need streaming or high concurrency:

# New async version
class UserService(BaseService):
    async def GetUser(self, request, context):
        user = await asyncio.to_thread(...)
        return UserResponse(...)

Common Issues

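Issue: "RuntimeError: no running event loop"

Cause: Using await or loop-bound asyncio calls (such as asyncio.create_task()) in a sync method. Python rejects await inside a plain def with a SyntaxError, and asyncio calls that need a running loop raise this RuntimeError when made from sync code.

Solution: Make the method async def:

# Before
def GetUser(self, request, context):
    user = await something()  # ❌ Error

# After
async def GetUser(self, request, context):
    user = await something()  # ✅ Works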
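The error message itself is easy to reproduce with the standard library, which can help confirm the diagnosis. In this sketch, `from_sync_code` and `from_async_code` are hypothetical names. The same `asyncio.get_running_loop()` call fails in plain sync code and succeeds inside a coroutine.

```python
import asyncio


def from_sync_code():
    # No event loop is running in ordinary sync code, so this call raises.
    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        return str(exc)
    return None


async def from_async_code():
    # Inside a coroutine the loop is running, so the same call succeeds.
    return asyncio.get_running_loop().is_running()


if __name__ == "__main__":
    print(from_sync_code())
    print(asyncio.run(from_async_code()))
```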

Issue: "This query is synchronous"

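Cause: Calling the Django ORM directly from async code. Adding await does not help, because the query still runs synchronously on the event loop thread.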

Solution: Wrap in asyncio.to_thread():

# Before
user = await User.objects.get(id=1) # ❌ Error

# After
user = await asyncio.to_thread(User.objects.get, id=1) # ✅ Works

Best Practices

  1. Keep sync by default - Only use async when needed
  2. Wrap ORM calls - Always use asyncio.to_thread() for Django ORM
  3. Yield control - Use await asyncio.sleep(0) in tight loops
  4. Handle cancellation - Check context.cancelled() in streams
  5. Test thoroughly - Async bugs are harder to debug
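For practice 4, polling is not the only option: a streaming generator can also release resources in a `finally` block, which runs when the task driving the stream is cancelled. The sketch below simulates a disconnect with `task.cancel()`. It assumes the server cancels the handler task when the client goes away, and `stream_numbers` and `consume` are hypothetical names.

```python
import asyncio


async def stream_numbers(log):
    try:
        n = 0
        while True:
            yield n
            n += 1
            await asyncio.sleep(0.01)
    finally:
        log.append("cleanup")  # runs when the stream is cancelled or closed


async def consume(log, received):
    async for n in stream_numbers(log):
        received.append(n)


async def main():
    log, received = [], []
    task = asyncio.create_task(consume(log, received))
    await asyncio.sleep(0.05)
    task.cancel()  # simulate the client disconnecting mid-stream
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, received


if __name__ == "__main__":
    print(asyncio.run(main()))
```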

Summary: Async is enabled automatically. Write services normally (sync), or use async def for streaming/high-concurrency. Wrap Django ORM in asyncio.to_thread().