Django WebSocket 系列(五):结合 asyncio 的 WebSocket 消息推送示例

说明: channels是异步的,不能用同步的方式推送消息。

1、单通道推送

说明 :单通道推送指的是服务器向某一个特定的WebSocket连接推送消息。这种方式通常用于一对一的通信场景,例如私人聊天、个性化通知等。在这种方式中,每个客户端与服务器之间建立独立的WebSocket连接,服务器可以向该特定连接推送消息。

  • 场景 :用户A与服务器建立WebSocket连接,服务器通过这个连接向用户A推送个人消息。
  • 优点 :精确控制,确保消息只发送给指定用户。
  • 缺点 :需要为每个用户维护独立的连接,可能会导致资源消耗较大。

服务器端代码示例:这里我们的需求时连接建立后不断地向客户端推送消息。

import asyncio
from channels.generic.websocket import AsyncWebsocketConsumer
import json

class AlertConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.accept()
        self.send_alert_task = asyncio.create_task(self.send_alerts())

    async def disconnect(self, close_code):
        self.send_alert_task.cancel()

    async def send_alerts(self):
        while True:
            await self.send(text_data=json.dumps({
                'message': 'Alert!'
            }))
            await asyncio.sleep(1)
  1. 建立连接时connect,创建一个异步任务来推送消息
  2. 连接断开时disconnect,取消该异步任务。
  3. send_alerts调用send来往该通道推送消息。

2、组内推送

说明 :组内推送指的是服务器向一组客户端推送消息。这种方式适用于一对多或多对多的通信场景,例如群聊、群体通知等。在这种方式中,多个客户端加入同一个组,服务器可以向该组内的所有连接推送消息。

  • 场景 :用户A、用户B和用户C都加入了同一个群组,服务器向这个群组推送消息,所有群组内的用户都会收到该消息。
  • 优点 :高效推送,可以一次性向多个客户端发送消息。
  • 缺点 :需要管理群组内的成员,确保消息能够正确发送给组内所有成员。

要求: 这里比较重要!

  1. 第一个连接建立,该连接加入到组内后开启消息推送异步任务;后续建立的连接仅加入到组中,不再开启消息推送异步任务,否则就会重复。
  2. 推送过程中任意连接断开后不会影响推送任务,即不会导致推送中断。比如,按照1中的描述可以看到消息推送任务是第一个连接开启的,那么当第一个连接断开后,它的消息推送任务也应该停止。但是还有建立的连接,应该继续往这些连接中推送消息,所以这里需要采取些机制来保证。
  3. 组内没有连接后,停止向组内推送消息,即终止任务。

服务器端代码示例:这里我们的需求时连接建立后不断地向客户端推送消息。

import datetime
import json
import asyncio

from django.core.cache import cache
from channels.generic.websocket import AsyncWebsocketConsumer

# 报警推送
ALARM_GROUP_NAME = "alerts_group"
ALARM_SENDER_KEY = f"{ALARM_GROUP_NAME}_sender"
ALARM_MEMBERS_KEY = f"{ALARM_GROUP_NAME}_members"


class AlertConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.accept()
        await self.add_to_group()
        await self.check_and_start_alerts()

    async def disconnect(self, close_code):
        await self.remove_from_group()
        await self.check_and_stop_alerts()

    async def add_to_group(self):
        members = cache.get(ALARM_MEMBERS_KEY, set())
        members.add(self.channel_name)
        cache.set(ALARM_MEMBERS_KEY, members, None)
        await self.channel_layer.group_add(ALARM_GROUP_NAME, self.channel_name)

    async def remove_from_group(self):
        members = cache.get(ALARM_MEMBERS_KEY, set())
        members.discard(self.channel_name)
        cache.set(ALARM_MEMBERS_KEY, members, None)
        await self.channel_layer.group_discard(ALARM_GROUP_NAME, self.channel_name)

    async def check_and_start_alerts(self):
        sender_channel = cache.get(ALARM_SENDER_KEY)
        if not sender_channel:
            cache.set(ALARM_SENDER_KEY, self.channel_name, None)
            self.send_alert_task = asyncio.create_task(self.send_alerts())

    async def check_and_stop_alerts(self):
        sender_channel = cache.get(ALARM_SENDER_KEY)
        if sender_channel == self.channel_name:
            cache.delete(ALARM_SENDER_KEY)
            if hasattr(self, 'send_alert_task'):
                self.send_alert_task.cancel()

            new_sender_channel = await self.get_new_sender()
            if new_sender_channel:
                cache.set(ALARM_SENDER_KEY, new_sender_channel, None)
                await self.channel_layer.send(new_sender_channel, {
                    'type': 'start_sending_alerts',
                })
            else:
                cache.delete(ALARM_MEMBERS_KEY)

    async def get_new_sender(self):
        members = cache.get(ALARM_MEMBERS_KEY, set())
        if members:
            return members.pop()
        return None

    async def send_alerts(self):
        while True:
            await self.channel_layer.group_send(
                ALARM_GROUP_NAME,
                {
                    'type': 'alert_message',
                    'message': f'Alert{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
                }
            )
            await asyncio.sleep(1)

    async def alert_message(self, event):
        await self.send(text_data=json.dumps({
            'message': event['message']
        }))

    async def start_sending_alerts(self, event):
        self.send_alert_task = asyncio.create_task(self.send_alerts())

可以看到比单连接推送复杂很多,当然,这是我设计的,我只是一个刚满一年的菜鸟,应该会有更简单的方式。

类的主要功能:

  1. 连接管理 :处理 WebSocket 连接的建立和断开。
  2. 组管理 :将客户端加入或移除一个组,并缓存这些成员。
  3. 警报发送管理 :启动和停止警报消息的发送。

连接和断开

  • connect 方法:
    • 接受 WebSocket 连接。
    • 将当前连接加入警报组。
    • 检查并启动警报发送任务(如果当前没有发送者)。
  • disconnect 方法:
    • 从警报组中移除当前连接。
    • 检查并停止警报发送任务(如果当前发送者是该连接)。
    • 如果当前发送者断开连接,选择新的发送者并发送消息通知其开始发送警报。

组管理

  • add_to_group 方法:
    • 从缓存中获取当前警报组成员,将当前连接加入该组,并更新缓存。
    • 使用 channel_layer 将当前连接添加到警报组中。
  • remove_from_group 方法:
    • 从缓存中获取当前警报组成员,将当前连接从该组移除,并更新缓存。
    • 使用 channel_layer 将当前连接从警报组中移除。

警报发送管理

  • check_and_start_alerts 方法:
    • 检查缓存中是否已有警报发送者。
    • 如果没有,则将当前连接设置为发送者,并启动警报发送任务。
  • check_and_stop_alerts 方法:
    • 检查当前发送者是否是当前连接。
    • 如果是,则删除发送者缓存,并取消警报发送任务。
    • 选择新的发送者,并通知其开始发送警报。如果没有新的发送者,则清空成员缓存。
  • get_new_sender 方法:
    • 从缓存中获取当前警报组成员并返回一个新的发送者(如果有的话)。
  • send_alerts 方法:
    • 无限循环中,每秒钟向警报组发送一次包含当前时间的警报消息。

消息处理

  • alert_message 方法:
    • 处理来自组内的警报消息,并将其发送到 WebSocket 客户端。
  • start_sending_alerts 方法:
    • 通过向新的发送者的频道发送一个事件,通知它开始发送报警消息。

重点:通过维护一个共享的 Redis 集合来跟踪组中的所有连接,并且当一个连接断开时,从集合中选择一个新的连接作为发送者。为了防止后端挂掉不会调用disconnect方法,请在启动django时清空redis库。可以指定一个redis存放key

缺陷:可以看到组名等是在全局变量中写死的,也就是说这个消费者只能处理一类任务。 虽然我没测试过,但是通过客户端传递flag,后端根据flag动态更新组名,通过判断决定发送什么消息,可以实现该消费者处理多类任务。


发表评论

评论列表,共 0 条评论

    暂无评论