Source code

Revision control

Copy as Markdown

Other Tools

#!/usr/bin/env python
import asyncio
import json
import aioredis
import django
import websockets
django.setup()
from django.contrib.contenttypes.models import ContentType
from sesame.utils import get_user
from websockets.frames import CloseCode
CONNECTIONS = {}
def get_content_types(user):
"""Return the set of IDs of content types visible by user."""
# This does only three database queries because Django caches
# all permissions on the first call to user.has_perm(...).
return {
ct.id
for ct in ContentType.objects.all()
if user.has_perm(f"{ct.app_label}.view_{ct.model}")
or user.has_perm(f"{ct.app_label}.change_{ct.model}")
}
async def handler(websocket):
"""Authenticate user and register connection in CONNECTIONS."""
sesame = await websocket.recv()
user = await asyncio.to_thread(get_user, sesame)
if user is None:
await websocket.close(CloseCode.INTERNAL_ERROR, "authentication failed")
return
ct_ids = await asyncio.to_thread(get_content_types, user)
CONNECTIONS[websocket] = {"content_type_ids": ct_ids}
try:
await websocket.wait_closed()
finally:
del CONNECTIONS[websocket]
async def process_events():
"""Listen to events in Redis and process them."""
redis = aioredis.from_url("redis://127.0.0.1:6379/1")
pubsub = redis.pubsub()
await pubsub.subscribe("events")
async for message in pubsub.listen():
if message["type"] != "message":
continue
payload = message["data"].decode()
# Broadcast event to all users who have permissions to see it.
event = json.loads(payload)
recipients = (
websocket
for websocket, connection in CONNECTIONS.items()
if event["content_type_id"] in connection["content_type_ids"]
)
websockets.broadcast(recipients, payload)
async def main():
async with websockets.serve(handler, "localhost", 8888):
await process_events() # runs forever
if __name__ == "__main__":
asyncio.run(main())