说明: channels是异步的,不能用同步的方式推送消息。
1、单通道推送
说明 :单通道推送指的是服务器向某一个特定的WebSocket连接推送消息。这种方式通常用于一对一的通信场景,例如私人聊天、个性化通知等。在这种方式中,每个客户端与服务器之间建立独立的WebSocket连接,服务器可以向该特定连接推送消息。
- 场景 :用户A与服务器建立WebSocket连接,服务器通过这个连接向用户A推送个人消息。
- 优点 :精确控制,确保消息只发送给指定用户。
- 缺点 :需要为每个用户维护独立的连接,可能会导致资源消耗较大。
步骤:
- 定义websocket消费者
- 定义celery任务,用于往连接(通道)中推送消息。
- 建立连接后websocket消费者调用celery任务
服务器端代码示例:这里我们的需求时连接建立后不断地向客户端推送消息。
websocket消费者:
import datetime
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from celery_tasks.tasks import alarm_task
class AlarmConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.result = alarm_task.delay(self.channel_name)
await self.accept()
async def disconnect(self, close_code):
await self.result.revoke(terminate=True)
async def send_message(self, event):
message = event['message']
await self.send(text_data=json.dumps(message))
async def receive(self, text_data=None, bytes_data=None):
if text_data == 'ping':
await self.send(text_data=json.dumps(
{"type": "ping", "data": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}))
else:
data = json.loads(text_data)
message = data.get('message')
await self.send(text_data=json.dumps(message))
connect
时创建celery异步任务disconnect
时撤销该异步任务
celery任务:
import time
from celery import shared_task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from datetime import datetime
@shared_task
def alarm_task(channel_name):
channel_layer = get_channel_layer()
while True:
async_to_sync(channel_layer.send)(channel_name, {
'type': 'send.message',
'message': {"type": "result", "data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
})
time.sleep(60)
- 注意这些需要安装
asgiref
- 通过Channels 的通道层(channel layer)实例往指定通道内推送消息。
2、Channels 的通道层
这个知识点很重要,很抱歉一直到最后一篇才讲到它。
通道层是 Channels 的核心概念之一,负责处理消息的发送和接收。在 Django Channels 中,通道层是一个用于在不同的消费者(consumers)之间传递消息的抽象层。通过通道层,可以实现 WebSocket、HTTP 请求以及其他类型的消息传递。
-
获取通道层 :
get_channel_layer
是 Django Channels 提供的一个函数,用于获取当前的通道层(channel layer)实例。 -
往指定通道推送消息:
async_to_sync(channel_layer.send)( 'specific_channel_name', # 通道名称 { 'type': 'chat.message', # 自定义的消息类型 'message': 'Hello, World!', # 发送的消息内容 } )
-
注意我们这里用到的通道层的
send
方法 async_to_sync
:使用async_to_sync
是为了在同步上下文中调用异步代码。因为 Django 视图和 Celery 任务通常是同步函数,而 Channels 的通道层操作是异步的,所以需要使用async_to_sync
将异步代码同步化,从而在同步上下文中调用异步函数。type
:指示Django Channels
消费者在接收到消息时应该调用哪个方法来处理该消息。- 指定处理方法 :
type
字段的值是一个字符串,对应于消费者类中的一个方法名。当消息被发送到组或通道时,Django Channels 会根据type
字段的值来确定应该调用哪个方法来处理该消息。 - 消息分派 :当消息到达时,
Django Channels
会自动调用与type
字段值相对应的方法。这种方式允许你在一个消费者中定义多个处理方法,每个方法处理不同类型的消息。 - 这里type是
chat.message
,那么你的消费者中该有一个chat_message
的方法用来推送消息。
- 指定处理方法 :
message
:推送的数据
3、组内推送
说明 :组内推送指的是服务器向一组客户端推送消息。这种方式适用于一对多或多对多的通信场景,例如群聊、群体通知等。在这种方式中,多个客户端加入同一个组,服务器可以向该组内的所有连接推送消息。
- 场景 :用户A、用户B和用户C都加入了同一个群组,服务器向这个群组推送消息,所有群组内的用户都会收到该消息。
- 优点 :高效推送,可以一次性向多个客户端发送消息。
- 缺点 :需要管理群组内的成员,确保消息能够正确发送给组内所有成员。
服务器端代码示例:这里我们的需求时连接建立后不断地向客户端推送消息。
消费者:
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from utils import create_alarm_task_if_not_existsif_stop_task
class AlarmConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.group_name = f'alarm'
await self.channel_layer.group_add(
self.group_name,
self.channel_name
)
try:
create_alarm_task_if_not_exists(self.group_name)
await self.accept()
except Exception:
return
async def disconnect(self, close_code):
await self.channel_layer.group_discard(
self.group_name,
self.channel_name
)
if_stop_task(self.group_name)
async def send_message(self, event):
message = event['message']
await self.send(text_data=json.dumps(message))
- 建立连接将该连接加入到组内,检查是否创建了celery任务,如果没有就创建任务,开启推送,否则不创建。
- 断开连接时从组内移除掉该连接(可以称为通道),检查是否需要停止任务,因为当组内没有通道时就没有必要往该组发送消息了。
utils: 这里的两个方法很重要,判断何时需要创建任务,合适撤销任务
from django.core.cache import caches
from .tasks import group_alarm_task
from .celery import app as celery_app
# ws多分组celery创建
def create_alarm_task_if_not_exists(project_id, group_name):
task_key = f"celery_task_{group_name}"
lock_key = f"lock_{group_name}"
cache = caches['redis_15']
with cache.lock(lock_key, timeout=10):
if cache.get(group_name) is None:
cache.set(group_name, 0, None)
cache.incr(group_name)
if not cache.get(task_key):
task = group_alarm_task.delay(group_name, project_id)
cache.set(task_key, task.id, None)
# ws分组celery任务停止
def if_stop_task(group_name):
task_key = f"celery_task_{group_name}"
lock_key = f"lock_{group_name}"
cache = caches['redis_15']
with cache.lock(lock_key, timeout=10):
task_id = cache.get(task_key)
count = cache.decr(group_name)
if count == 0:
cache.delete(task_key)
cache.delete(group_name)
if task_id:
celery_app.control.revoke(task_id, terminate=True)
redis_15
:采用了特定的库来存储key,因为需要在django启动时清空这个库避免一些问题。create_alarm_task_if_not_exists
:在某个 WebSocket 组有新的连接时调用。确保只为每个组创建一个 Celery 任务,并记录当前组的连接数量。if_stop_task
:在某个 WebSocket 组有连接断开时调用。减少当前组的连接数量,如果没有连接时,停止对应的 Celery 任务。
task:
import time
from celery import shared_task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from datetime import datetime
@shared_task
def group_alarm_task(group_name):
channel_layer = get_channel_layer()
while True:
async_to_sync(channel_layer.group_send)(group_name, {
'type': 'send.message',
'message': {"type": "result", "data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
})
time.sleep(60)
- 这里需要注意的就是使用
group_send
方法来向组中发送消息,不同于通道的send
。
4、我的最终选择
我可能不会采用celery,而是选择asyncio,原因看这篇文章《celery 系列:我实际开发过程中遇到的问题(6)》
评论列表,共 0 条评论
暂无评论