Django WebSocket 系列(六):结合 Celery 的 WebSocket 消息推送示例

说明: channels是异步的,不能用同步的方式推送消息。

1、单通道推送

说明 :单通道推送指的是服务器向某一个特定的WebSocket连接推送消息。这种方式通常用于一对一的通信场景,例如私人聊天、个性化通知等。在这种方式中,每个客户端与服务器之间建立独立的WebSocket连接,服务器可以向该特定连接推送消息。

  • 场景 :用户A与服务器建立WebSocket连接,服务器通过这个连接向用户A推送个人消息。
  • 优点 :精确控制,确保消息只发送给指定用户。
  • 缺点 :需要为每个用户维护独立的连接,可能会导致资源消耗较大。

步骤:

  1. 定义websocket消费者
  2. 定义celery任务,用于往连接(通道)中推送消息。
  3. 建立连接后websocket消费者调用celery任务

服务器端代码示例:这里我们的需求时连接建立后不断地向客户端推送消息。

websocket消费者:

import datetime
import json

from channels.generic.websocket import AsyncWebsocketConsumer
from celery_tasks.tasks import alarm_task


class AlarmConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.result = alarm_task.delay(self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.result.revoke(terminate=True)

    async def send_message(self, event):
        message = event['message']
        await self.send(text_data=json.dumps(message))

    async def receive(self, text_data=None, bytes_data=None):
        if text_data == 'ping':
            await self.send(text_data=json.dumps(
                {"type": "ping", "data": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}))
        else:
            data = json.loads(text_data)
            message = data.get('message')
            await self.send(text_data=json.dumps(message))
  1. connect时创建celery异步任务
  2. disconnect时撤销该异步任务

celery任务:

import time

from celery import shared_task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from datetime import datetime


@shared_task
def alarm_task(channel_name):
    channel_layer = get_channel_layer()
    while True:
        async_to_sync(channel_layer.send)(channel_name, {
            'type': 'send.message',
            'message': {"type": "result", "data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
        })
        time.sleep(60)
  1. 注意这些需要安装asgiref
  2. 通过Channels 的通道层(channel layer)实例往指定通道内推送消息。

2、Channels 的通道层

这个知识点很重要,很抱歉一直到最后一篇才讲到它。

通道层是 Channels 的核心概念之一,负责处理消息的发送和接收。在 Django Channels 中,通道层是一个用于在不同的消费者(consumers)之间传递消息的抽象层。通过通道层,可以实现 WebSocket、HTTP 请求以及其他类型的消息传递。

  1. 获取通道层get_channel_layer 是 Django Channels 提供的一个函数,用于获取当前的通道层(channel layer)实例。

  2. 往指定通道推送消息:

    async_to_sync(channel_layer.send)( 'specific_channel_name', # 通道名称 { 'type': 'chat.message', # 自定义的消息类型 'message': 'Hello, World!', # 发送的消息内容 } )

  3. 注意我们这里用到的通道层的send方法

  4. async_to_sync:使用 async_to_sync 是为了在同步上下文中调用异步代码。因为 Django 视图和 Celery 任务通常是同步函数,而 Channels 的通道层操作是异步的,所以需要使用 async_to_sync 将异步代码同步化,从而在同步上下文中调用异步函数。
  5. type:指示 Django Channels消费者在接收到消息时应该调用哪个方法来处理该消息。
    • 指定处理方法type 字段的值是一个字符串,对应于消费者类中的一个方法名。当消息被发送到组或通道时,Django Channels 会根据 type 字段的值来确定应该调用哪个方法来处理该消息。
    • 消息分派 :当消息到达时,Django Channels 会自动调用与 type 字段值相对应的方法。这种方式允许你在一个消费者中定义多个处理方法,每个方法处理不同类型的消息。
    • 这里type是chat.message,那么你的消费者中该有一个chat_message的方法用来推送消息。
  6. message:推送的数据

3、组内推送

说明 :组内推送指的是服务器向一组客户端推送消息。这种方式适用于一对多或多对多的通信场景,例如群聊、群体通知等。在这种方式中,多个客户端加入同一个组,服务器可以向该组内的所有连接推送消息。

  • 场景 :用户A、用户B和用户C都加入了同一个群组,服务器向这个群组推送消息,所有群组内的用户都会收到该消息。
  • 优点 :高效推送,可以一次性向多个客户端发送消息。
  • 缺点 :需要管理群组内的成员,确保消息能够正确发送给组内所有成员。

服务器端代码示例:这里我们的需求时连接建立后不断地向客户端推送消息。

消费者:

import json

from channels.generic.websocket import AsyncWebsocketConsumer
from utils import create_alarm_task_if_not_existsif_stop_task

class AlarmConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.group_name = f'alarm'
        await self.channel_layer.group_add(
            self.group_name,
            self.channel_name
        )
        try:
            create_alarm_task_if_not_exists(self.group_name)
            await self.accept()
        except Exception:
            return

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(
            self.group_name,
            self.channel_name
        )
        if_stop_task(self.group_name)

    async def send_message(self, event):
        message = event['message']
        await self.send(text_data=json.dumps(message))
  1. 建立连接将该连接加入到组内,检查是否创建了celery任务,如果没有就创建任务,开启推送,否则不创建。
  2. 断开连接时从组内移除掉该连接(可以称为通道),检查是否需要停止任务,因为当组内没有通道时就没有必要往该组发送消息了。

utils: 这里的两个方法很重要,判断何时需要创建任务,合适撤销任务

from django.core.cache import caches
from .tasks import group_alarm_task
from .celery import app as celery_app

# ws多分组celery创建
def create_alarm_task_if_not_exists(project_id, group_name):
    task_key = f"celery_task_{group_name}"
    lock_key = f"lock_{group_name}"
    cache = caches['redis_15']
    with cache.lock(lock_key, timeout=10):
        if cache.get(group_name) is None:
            cache.set(group_name, 0, None)
        cache.incr(group_name)
        if not cache.get(task_key):
            task = group_alarm_task.delay(group_name, project_id)
            cache.set(task_key, task.id, None)


# ws分组celery任务停止
def if_stop_task(group_name):
    task_key = f"celery_task_{group_name}"
    lock_key = f"lock_{group_name}"
    cache = caches['redis_15']
    with cache.lock(lock_key, timeout=10):
        task_id = cache.get(task_key)
        count = cache.decr(group_name)
        if count == 0:
            cache.delete(task_key)
            cache.delete(group_name)
            if task_id:
                celery_app.control.revoke(task_id, terminate=True)
  1. redis_15:采用了特定的库来存储key,因为需要在django启动时清空这个库避免一些问题。
  2. create_alarm_task_if_not_exists :在某个 WebSocket 组有新的连接时调用。确保只为每个组创建一个 Celery 任务,并记录当前组的连接数量。
  3. if_stop_task :在某个 WebSocket 组有连接断开时调用。减少当前组的连接数量,如果没有连接时,停止对应的 Celery 任务。

task:

import time

from celery import shared_task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from datetime import datetime


@shared_task
def group_alarm_task(group_name):
    channel_layer = get_channel_layer()
    while True:
        async_to_sync(channel_layer.group_send)(group_name, {
            'type': 'send.message',
            'message': {"type": "result", "data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
        })
        time.sleep(60)
  1. 这里需要注意的就是使用group_send方法来向组中发送消息,不同于通道的send

4、我的最终选择

我可能不会采用celery,而是选择asyncio,原因看这篇文章《celery 系列:我实际开发过程中遇到的问题(6)》


发表评论

评论列表,共 0 条评论

    暂无评论