说明: channels是异步的,不能用同步的方式推送消息。
1、单通道推送
说明 :单通道推送指的是服务器向某一个特定的WebSocket连接推送消息。这种方式通常用于一对一的通信场景,例如私人聊天、个性化通知等。在这种方式中,每个客户端与服务器之间建立独立的WebSocket连接,服务器可以向该特定连接推送消息。
- 场景 :用户A与服务器建立WebSocket连接,服务器通过这个连接向用户A推送个人消息。
- 优点 :精确控制,确保消息只发送给指定用户。
- 缺点 :需要为每个用户维护独立的连接,可能会导致资源消耗较大。
服务器端代码示例:这里我们的需求时连接建立后不断地向客户端推送消息。
import asyncio
from channels.generic.websocket import AsyncWebsocketConsumer
import json
class AlertConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.accept()
self.send_alert_task = asyncio.create_task(self.send_alerts())
async def disconnect(self, close_code):
self.send_alert_task.cancel()
async def send_alerts(self):
while True:
await self.send(text_data=json.dumps({
'message': 'Alert!'
}))
await asyncio.sleep(1)
- 建立连接时
connect
,创建一个异步任务来推送消息 - 连接断开时
disconnect
,取消该异步任务。 send_alerts
调用send
来往该通道推送消息。
2、组内推送
说明 :组内推送指的是服务器向一组客户端推送消息。这种方式适用于一对多或多对多的通信场景,例如群聊、群体通知等。在这种方式中,多个客户端加入同一个组,服务器可以向该组内的所有连接推送消息。
- 场景 :用户A、用户B和用户C都加入了同一个群组,服务器向这个群组推送消息,所有群组内的用户都会收到该消息。
- 优点 :高效推送,可以一次性向多个客户端发送消息。
- 缺点 :需要管理群组内的成员,确保消息能够正确发送给组内所有成员。
要求: 这里比较重要!
- 第一个连接建立,该连接加入到组内后开启消息推送异步任务;后续建立的连接仅加入到组中,不再开启消息推送异步任务,否则就会重复。
- 推送过程中任意连接断开后不会影响推送任务,即不会导致推送中断。比如,按照
1
中的描述可以看到消息推送任务是第一个连接开启的,那么当第一个连接断开后,它的消息推送任务也应该停止。但是还有建立的连接,应该继续往这些连接中推送消息,所以这里需要采取些机制来保证。 - 组内没有连接后,停止向组内推送消息,即终止任务。
服务器端代码示例:这里我们的需求时连接建立后不断地向客户端推送消息。
import datetime
import json
import asyncio
from django.core.cache import cache
from channels.generic.websocket import AsyncWebsocketConsumer
# 报警推送
ALARM_GROUP_NAME = "alerts_group"
ALARM_SENDER_KEY = f"{ALARM_GROUP_NAME}_sender"
ALARM_MEMBERS_KEY = f"{ALARM_GROUP_NAME}_members"
class AlertConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.accept()
await self.add_to_group()
await self.check_and_start_alerts()
async def disconnect(self, close_code):
await self.remove_from_group()
await self.check_and_stop_alerts()
async def add_to_group(self):
members = cache.get(ALARM_MEMBERS_KEY, set())
members.add(self.channel_name)
cache.set(ALARM_MEMBERS_KEY, members, None)
await self.channel_layer.group_add(ALARM_GROUP_NAME, self.channel_name)
async def remove_from_group(self):
members = cache.get(ALARM_MEMBERS_KEY, set())
members.discard(self.channel_name)
cache.set(ALARM_MEMBERS_KEY, members, None)
await self.channel_layer.group_discard(ALARM_GROUP_NAME, self.channel_name)
async def check_and_start_alerts(self):
sender_channel = cache.get(ALARM_SENDER_KEY)
if not sender_channel:
cache.set(ALARM_SENDER_KEY, self.channel_name, None)
self.send_alert_task = asyncio.create_task(self.send_alerts())
async def check_and_stop_alerts(self):
sender_channel = cache.get(ALARM_SENDER_KEY)
if sender_channel == self.channel_name:
cache.delete(ALARM_SENDER_KEY)
if hasattr(self, 'send_alert_task'):
self.send_alert_task.cancel()
new_sender_channel = await self.get_new_sender()
if new_sender_channel:
cache.set(ALARM_SENDER_KEY, new_sender_channel, None)
await self.channel_layer.send(new_sender_channel, {
'type': 'start_sending_alerts',
})
else:
cache.delete(ALARM_MEMBERS_KEY)
async def get_new_sender(self):
members = cache.get(ALARM_MEMBERS_KEY, set())
if members:
return members.pop()
return None
async def send_alerts(self):
while True:
await self.channel_layer.group_send(
ALARM_GROUP_NAME,
{
'type': 'alert_message',
'message': f'Alert{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
)
await asyncio.sleep(1)
async def alert_message(self, event):
await self.send(text_data=json.dumps({
'message': event['message']
}))
async def start_sending_alerts(self, event):
self.send_alert_task = asyncio.create_task(self.send_alerts())
可以看到比单连接推送复杂很多,当然,这是我设计的,我只是一个刚满一年的菜鸟,应该会有更简单的方式。
类的主要功能:
- 连接管理 :处理 WebSocket 连接的建立和断开。
- 组管理 :将客户端加入或移除一个组,并缓存这些成员。
- 警报发送管理 :启动和停止警报消息的发送。
连接和断开
connect
方法:- 接受 WebSocket 连接。
- 将当前连接加入警报组。
- 检查并启动警报发送任务(如果当前没有发送者)。
disconnect
方法:- 从警报组中移除当前连接。
- 检查并停止警报发送任务(如果当前发送者是该连接)。
- 如果当前发送者断开连接,选择新的发送者并发送消息通知其开始发送警报。
组管理
add_to_group
方法:- 从缓存中获取当前警报组成员,将当前连接加入该组,并更新缓存。
- 使用
channel_layer
将当前连接添加到警报组中。
remove_from_group
方法:- 从缓存中获取当前警报组成员,将当前连接从该组移除,并更新缓存。
- 使用
channel_layer
将当前连接从警报组中移除。
警报发送管理
check_and_start_alerts
方法:- 检查缓存中是否已有警报发送者。
- 如果没有,则将当前连接设置为发送者,并启动警报发送任务。
check_and_stop_alerts
方法:- 检查当前发送者是否是当前连接。
- 如果是,则删除发送者缓存,并取消警报发送任务。
- 选择新的发送者,并通知其开始发送警报。如果没有新的发送者,则清空成员缓存。
get_new_sender
方法:- 从缓存中获取当前警报组成员并返回一个新的发送者(如果有的话)。
send_alerts
方法:- 无限循环中,每秒钟向警报组发送一次包含当前时间的警报消息。
消息处理
alert_message
方法:- 处理来自组内的警报消息,并将其发送到 WebSocket 客户端。
start_sending_alerts
方法:- 通过向新的发送者的频道发送一个事件,通知它开始发送报警消息。
重点:通过维护一个共享的 Redis 集合来跟踪组中的所有连接,并且当一个连接断开时,从集合中选择一个新的连接作为发送者。为了防止后端挂掉不会调用disconnect方法,请在启动django时清空redis库。可以指定一个redis存放key
缺陷:可以看到组名等是在全局变量中写死的,也就是说这个消费者只能处理一类任务。 虽然我没测试过,但是通过客户端传递flag,后端根据flag动态更新组名,通过判断决定发送什么消息,可以实现该消费者处理多类任务。
评论列表,共 0 条评论
暂无评论