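We have a front end built with React and a back end built with Django REST Framework and Channels. We use Heroku Redis as our Redis provider. Our users connect to Channels through a ReconnectingWebSocket.
We are using Python 3.6 and Channels 2.4.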
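The problem is that our API calls try to pass information to the sockets, and that information does not always reach the consumer. I logged the steps of the call with prints, printed the channel_name it was about to send to, and confirmed that it is the one returned to the user on connect. Even so, the prints in the consumer are never called, which means the message is never sent to the user.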
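If I increase the number of dynos to roughly one per user connected to the socket, it seems to solve the problem (or at least make it more reliable). From my understanding, one dyno should be able to handle many socket connections. Is there a reason my consumers are not receiving the signals? Is there a reason scaling up the number of dynos would resolve the problem?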
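On connect, I have the user join a group called "u_{their id}", which allows a signal to be sent to multiple computers logged in as the same user. I have tried sending messages both directly to their channel_name and through that group; when messages fail to arrive, neither route gets through. The prints verify that the channel_names are correct, yet the consumer still does not receive the messages. No errors appear to be raised. It may not work, then I refresh the recipient and it works, then I refresh the recipient again and it stops working again.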
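One way to separate a channel layer problem from a consumer problem is to send a message through the layer and read it back without any WebSocket involved. The following is a minimal sketch, assuming a default layer is configured in settings and that it is run from `python manage.py shell`; the function name and the channel name `test_channel` are hypothetical.

```python
def check_channel_layer_roundtrip(channel="test_channel"):
    """Send one message through the default channel layer and read it back."""
    # Imported lazily so this module can be loaded outside a configured
    # Django project; call the function itself from the Django shell.
    from asgiref.sync import async_to_sync
    from channels.layers import get_channel_layer

    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.send)(channel, {"type": "hello"})
    # receive() blocks until a message arrives on the channel.
    received = async_to_sync(channel_layer.receive)(channel)
    return received.get("type") == "hello"
```

If this returns True while consumers still miss messages, the layer can at least deliver within one process, and the cross-dyno path is the more likely place to look.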
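The socket connection is definitely alive: I wrote a simple function on the front end that pings the socket, and it responds even while the consumer is not receiving signals from the API calls.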
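I also noticed that if I restart my dynos, then once they load and the sockets reconnect, signals from API calls work for the first user for a short time and then stop coming through. Likewise, if I leave the sockets unused for a while and then refresh, they seem to work again briefly.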
Procfile
web: daphne doctalk.asgi:application --port $PORT --bind 0.0.0.0
consumers.py
import json
from asgiref.sync import async_to_sync
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from messages.models import Thread
from profile.models import OnlineStatus, DailyOnlineUserActivity
from rest_framework.authtoken.models import Token
from django.contrib.auth.models import AnonymousUser
from .exceptions import ClientError
import datetime
from django.utils import timezone
class HeaderConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.accept()
        await self.send("request_for_token")

    async def continue_connect(self):
        print(self.channel_name)
        print(self.channel_layer)
        await self.send(json.dumps({'your_channel_name': self.channel_name}))
        await self.get_user_from_token(self.scope['token'])
        await self.channel_layer.group_send(
            "online_users",
            {
                "type": "new_user_online",
                "user": self.user,
                "channel_layer": str(self.channel_layer),
                "channel_name": self.channel_name,
            }
        )
        await self.channel_layer.group_add(
            "online_users",
            self.channel_name,
        )
        print("adding to personal group u_%d" % self.user['id'])
        await self.channel_layer.group_add(
            "u_%d" % self.user['id'],
            self.channel_name,
        )
        self.message_threads = set()
        self.message_threads = await self.get_message_ids()
        for thread in self.message_threads:
            await self.monitor_thread(thread)
        self.doa = await self.check_for_or_establish_dailyonlineactivity()
        self.online_status = await self.establish_onlinestatus()
        await self.add_to_online_status_list()
        self.user_id_list = await self.get_online_user_list()
        await self.send_online_user_list()
    async def disconnect(self, code):
        # Leave all the rooms we are still in
        if hasattr(self, 'user'):
            await self.remove_from_dailyonlineactivity()
            try:
                await self.channel_layer.group_discard(
                    "u_%d" % self.user['id'],
                    self.channel_name,
                )
            except Exception as e:
                print("issue with self channel")
                print(e)
            try:
                await self.channel_layer.group_send(
                    "online_users",
                    {
                        "type": "user_went_offline",
                        "message": self.user['id'],
                    }
                )
            except Exception as e:
                print("issue with online_users")
                print(e)
            await self.channel_layer.group_discard(
                "online_users",
                self.channel_name,
            )
            try:
                for thread_id in list(self.message_threads):
                    print("leaving " + str(thread_id))
                    try:
                        self.message_threads.discard(thread_id)
                        await self.channel_layer.group_discard(
                            "m_%d" % thread_id,
                            self.channel_name,
                        )
                    except ClientError:
                        pass
            except Exception as e:
                print("issue with threads")
                print(e)
    async def receive(self, text_data):
        print(text_data)
        text_data_json = json.loads(text_data)
        if 'token' in text_data_json:
            self.scope['token'] = text_data_json['token']
            await self.continue_connect()
        #self.send(text_data=json.dumps({
        #    'message': message
        #}))

    async def new_message(self, event):
        # Send a message down to the client
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "thread": event['thread'],
                "message": event["message"],
            },
        ))

    async def user_went_offline(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def send_call_ring(self, event):
        print("SENDING CALL RING")
        print(event["message"])
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def rejoin_call(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def popup_notification(self, event):
        print("sending popup_notification")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def new_call_participant(self, event):
        print("new_call_participant received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def new_participants_invited(self, event):
        print("new_participants_invited received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def share_document_via_videocall(self, event):
        print("share_document received")
        print(event)
        print(self.channel_name)
        print(self.user['id'])
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_video_share_link(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_video_hand_up(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_video_address_hand_up(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))
    async def you_are_dominant_speaker(self, event):
        # Send a message down to the client
        print("SENDING DOMINANT SPEAKER")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def you_are_no_longer_dominant_speaker(self, event):
        print("SENDING NO LONGER DOMINANT SPEAKER")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_video_screenshare(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_video_reaction(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def video_call_thread(self, event):
        print("sending video call thread")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def video_call_chat_message(self, event):
        print("sending video call chat message")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_chat_message(self, event):
        print("sending event chat message")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def to_next_agenda_item(self, event):
        print("sending to next agenda item")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def mute_all_event_participants(self, event):
        print("sending mute all participants")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_started(self, event):
        print("event started consumer")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_ended(self, event):
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def video_call_reaction(self, event):
        print("sending video call reaction")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def new_user_online(self, event):
        print("user_online received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["user"],
                "channel_layer": event["channel_layer"],
                "channel_name": event["channel_name"],
            },
        ))
    @database_sync_to_async
    def get_message_ids(self):
        return set(Thread.objects.filter(participants__id=self.user['id'], subject="").values_list('id', flat=True))

    async def monitor_thread(self, thread_id):
        print("monitoring thread %d" % thread_id)
        print("on channel %s" % self.channel_name)
        await self.channel_layer.group_add(
            "m_%d" % thread_id,
            self.channel_name,
        )

    @database_sync_to_async
    def get_user_from_token(self, t):
        try:
            print("trying token" + t)
            token = Token.objects.get(key=t)
            self.user = token.user.get_profile.json()
        except Token.DoesNotExist:
            print("failed")
            self.user = AnonymousUser()

    @database_sync_to_async
    def check_for_or_establish_dailyonlineactivity(self):
        doa, created = DailyOnlineUserActivity.objects.get_or_create(date=datetime.date.today())
        if created:
            print("created DOA %d" % doa.id)
        else:
            print("found existing DOA %d" % doa.id)
        return doa

    @database_sync_to_async
    def establish_onlinestatus(self):
        # Close any OnlineStatus rows left open by earlier connections
        old_os = OnlineStatus.objects.filter(user_id=self.user['id'], online_to=None)
        if old_os.exists():
            for os in old_os:
                print("found unclosed OS %d" % os.id)
                os.online_to = timezone.now()
                os.save()
        new_os = OnlineStatus(
            user_id=self.user['id'],
            channel_name=self.channel_name
        )
        new_os.save()
        return new_os

    @database_sync_to_async
    def add_to_online_status_list(self):
        self.doa.currently_active_users.add(self.user['id'])
        self.doa.all_daily_users.add(self.user['id'])
        self.doa.online_log.add(self.online_status)
        self.doa.save()

    @database_sync_to_async
    def remove_from_dailyonlineactivity(self):
        if hasattr(self, 'doa') and self.doa is not None:
            self.doa.currently_active_users.remove(self.user['id'])
        # The attribute name was misspelled as 'onine_status', so this branch never ran
        if hasattr(self, 'online_status') and self.online_status is not None:
            self.online_status.online_to = timezone.now()
            self.online_status.save()

    @database_sync_to_async
    def get_online_user_list(self):
        user_id_list = list(self.doa.currently_active_users.all().values_list('id', flat=True))
        user_id_list.remove(self.user['id'])
        return user_id_list

    async def send_online_user_list(self):
        print("sending online_users")
        await self.send(text_data=json.dumps(
            {
                "type": "online_users",
                "message": self.user_id_list,
            },
        ))
    async def participant_ignored(self, event):
        print("ignored call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def participant_left(self, event):
        print("left call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def participant_joined(self, event):
        print("joined call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def video_screenshare(self, event):
        print("sending screenshare")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))
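Most of the handlers in the consumer forward the same two keys, `type` and `message`, to the client. As a minimal sketch of how that repetition could be factored out, here is a shared helper; the name `forward_payload` is hypothetical and not part of the original consumer.

```python
import json


def forward_payload(event):
    """Build the JSON text that the type/message handlers send to the client."""
    # Only the two keys the client expects are copied; anything else on the
    # channel layer event is dropped.
    return json.dumps({"type": event["type"], "message": event["message"]})


example = forward_payload({"type": "send_call_ring", "message": {"id": 1}})
print(example)
```

Inside a handler this would be used as `await self.send(text_data=forward_payload(event))`. Handlers that send extra keys, such as new_message and new_user_online, would still need their own payloads.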
Django signal that fires when a Profile is added to a VideoRoom:
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
from django.db.models.signals import m2m_changed
from django.dispatch import receiver
from twilio.jwt.access_token import AccessToken
from twilio.jwt.access_token.grants import VideoGrant
# Project model imports (VideoRoom, Profile, VideoAccessToken, OnlineStatus) are not shown.

@receiver(m2m_changed, sender=VideoRoom.invitees.through)
def invitee_added(sender, **kwargs):
    instance = kwargs.pop('instance', None)
    action = kwargs.pop('action', None)
    pk = kwargs.pop('pk_set', None)
    if action == 'post_add':
        if len(pk) > 0:
            user = Profile.objects.get(id=list(pk)[0])
            if instance.initiator.id == user.id:
                return
            identity = "u_%d" % user.id
            # Create access token with credentials
            token = AccessToken(settings.TWILIO_ACCOUNT_SID, settings.TWILIO_API_KEY, settings.TWILIO_API_SECRET,
                                identity=identity, ttl=86399)
            # Create a Video grant and add to token
            video_grant = VideoGrant(room=instance.room_name)
            token.add_grant(video_grant)
            invitee_access_token = VideoAccessToken(user=user, token=token.to_jwt())
            invitee_access_token.save()
            instance.invitee_access_tokens.add(invitee_access_token)
            channel_layer = get_channel_layer()
            print(channel_layer)
            profiles = {"u_%d" % instance.initiator.id: instance.initiator.json()}
            for u in instance.current_participants.all():
                profiles["u_%d" % u.id] = u.json()
            print("instance.type")
            print(instance.type)
            if instance.type != 'event':
                print("sending to existing users")
                # Tell everyone already in the call about the new participant
                for key, value in profiles.items():
                    if value['id'] != user.id:
                        async_to_sync(channel_layer.group_send)(
                            key,
                            {'type': 'new_call_participant',
                             'message': {
                                 'key': "u_%d" % user.id,
                                 'value': user.json()
                             }
                             }
                        )
                # Ring the invitee directly on the channel stored at connect time
                ons = OnlineStatus.objects.get(user=user, online_to=None)
                print("in signal, sending to %s on channel %s" % (user.full_name, ons.channel_name))
                async_to_sync(channel_layer.send)(
                    ons.channel_name,
                    {'type': 'send_call_ring',
                     'message': {
                         'id': instance.id,
                         'room_name': instance.room_name,
                         'identity': "u_%d" % user.id,
                         'profiles': profiles,
                         'token': invitee_access_token.token.decode(),
                         'answered': False,
                         'initiated': False,
                         'caller': instance.initiator.json()
                     }
                     }
                )
Logs when the socket signal does not go through:
2021-03-11T15:16:14.489596+00:00 app[web.1]: pk
2021-03-11T15:16:14.489655+00:00 app[web.1]: {113}
2021-03-11T15:16:14.518051+00:00 app[web.1]: pk
2021-03-11T15:16:14.518058+00:00 app[web.1]: {68}
2021-03-11T15:16:14.786357+00:00 app[web.1]: sending to existing users
2021-03-11T15:16:14.786377+00:00 app[web.1]: u_113
2021-03-11T15:16:14.911441+00:00 app[web.1]: u_68
2021-03-11T15:16:14.915900+00:00 app[web.1]: in signal, sending to John Doe on channel u_68
2021-03-11T15:16:15.228644+00:00 app[web.1]: 10.63.249.212:12999 - - [11/Mar/2021:10:16:15] "POST /api/start-video-chat/" 200 3523
2021-03-11T15:16:15.231562+00:00 heroku[router]: at=info method=POST path="/api/start-video-chat/" host=project-name.herokuapp.com request_id=7ec75a21-c6bd-452b-9517-cd500064d7ee fwd="12.34.56.78" dyno=web.1 connect=3ms service=955ms status=200 bytes=3714 protocol=http
Logs when the call succeeds:
2021-03-11T15:20:50.253243+00:00 app[web.4]: pk
2021-03-11T15:20:50.253248+00:00 app[web.4]: {113}
2021-03-11T15:20:50.280925+00:00 app[web.4]: pk
2021-03-11T15:20:50.280926+00:00 app[web.4]: {68}
2021-03-11T15:20:50.614504+00:00 app[web.4]: sending to existing users
2021-03-11T15:20:50.614527+00:00 app[web.4]: u_113
2021-03-11T15:20:50.713880+00:00 app[web.4]: u_68
2021-03-11T15:20:50.718141+00:00 app[web.4]: in signal, sending to John Doe on channel u_68
2021-03-11T15:20:50.799546+00:00 app[web.2]: CALLING
2021-03-11T15:20:50.801670+00:00 app[web.2]: {'type': 'send_call_ring', 'message': "some payload data"}
2021-03-11T15:20:50.965602+00:00 app[web.4]: 10.11.225.205:25635 - - [11/Mar/2021:10:20:50] "POST /api/start-video-chat/" 200 3533
2021-03-11T15:20:50.964378+00:00 heroku[router]: at=info method=POST path="/api/start-video-chat/" host=project-name.herokuapp.com request_id=2da9918b-b587-4db9-a3c2-9d6dfd55ef42 fwd="12.34.56.78" dyno=web.4 connect=1ms service=888ms status=200 bytes=3724 protocol=http
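Comparing which dynos appear in each excerpt can help when reading logs like these: in the successful call the signal ran on web.4 and the consumer printed on web.2, while the failing excerpt only contains lines from web.1. A minimal sketch for pulling the dyno names out of Heroku application log lines follows; the function name `dynos_seen` is hypothetical and the sample lines are copied from the logs above.

```python
import re

# Matches the dyno name in lines such as "... app[web.4]: message"
DYNO_PATTERN = re.compile(r"app\[(web\.\d+)\]:")


def dynos_seen(log_lines):
    """Return the sorted, de-duplicated dyno names that wrote application log lines."""
    found = set()
    for line in log_lines:
        match = DYNO_PATTERN.search(line)
        if match:
            found.add(match.group(1))
    return sorted(found)


failed = [
    "2021-03-11T15:16:14.786357+00:00 app[web.1]: sending to existing users",
    "2021-03-11T15:16:14.915900+00:00 app[web.1]: in signal, sending to John Doe on channel u_68",
]
succeeded = [
    "2021-03-11T15:20:50.718141+00:00 app[web.4]: in signal, sending to John Doe on channel u_68",
    "2021-03-11T15:20:50.799546+00:00 app[web.2]: CALLING",
]
print(dynos_seen(failed))     # ['web.1']
print(dynos_seen(succeeded))  # ['web.2', 'web.4']
```

This only shows where each line was written; it does not by itself explain why the consumer on the receiving dyno stayed silent in the failing case.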
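The problem ended up being Redis. I switched from channels-redis to channels-rabbitmq and all of my problems went away. I do not know whether the fault was with my Redis provider or with channels-redis, but simply changing the backend resolved all the issues.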
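The settings change itself is not shown above. As a rough sketch of what the switch might look like in settings.py, assuming the channels_rabbitmq package is installed and a RabbitMQ broker is reachable: the environment variable name `CLOUDAMQP_URL` and the local fallback URL are assumptions, not details from the original setup.

```python
import os

# Assumption: the broker URL comes from an environment variable. The name
# CLOUDAMQP_URL is illustrative; use whatever your RabbitMQ provider sets.
AMQP_URL = os.environ.get("CLOUDAMQP_URL", "amqp://guest:guest@127.0.0.1/asgi")

CHANNEL_LAYERS = {
    "default": {
        # Replaces the channels_redis backend with the RabbitMQ one
        "BACKEND": "channels_rabbitmq.core.RabbitmqChannelLayer",
        "CONFIG": {
            "host": AMQP_URL,
        },
    },
}
```

The consumers and the signal handler should not need to change, since they only talk to the layer through get_channel_layer() and self.channel_layer.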