import asyncio
import aioredis
import logging
from . import base
log = logging.getLogger(__name__)
[docs]class RedisMessagesCenter(base.BaseMessagesCenter):
"""Messages center on top of Redis LPUSH / BRPOP pattern.
After creating messages center instance `init()` coroutine
also must be called.
:param loop: asyncio event loop
:param conn_params: Redis connection params as dict
:param reliable: Use reliable queue or simple queue, default is ``False``
"""
def __init__(self, loop=None, conn_params=None, reliable=False):
self.conn_params = conn_params
queue_cls = ReliableRedisQueue if reliable else RedisQueue
queue = queue_cls(loop=loop, conn_params=conn_params)
super().__init__(loop=loop, queue=queue)
@asyncio.coroutine
[docs] def init(self):
"""Setup main Redis connection for queue."""
yield from self.queue.connect()
@asyncio.coroutine
def done(self):
yield from self.queue.close()
[docs]class RedisQueue(base.BaseQueue):
"""Messages queue on top of Redis LPUSH / BRPOP pattern.
Schema description:
1. Messages are LPUSH'ed to list with "{channel}" key.
2. Receiver just listens for "{channel}" list updates
with BRPOP.
:param loop: asyncio event loop
:param websocket: WebSocket handler instance
:param conn_params: Redis connection params
"""
CLOSE_COMMAND = '!'
def __init__(self, loop=None, conn_params=None):
self.loop = loop
self.conn_params = conn_params
[docs] def add_socket(self, channel, websocket, proto=None):
"""Register WebSocket for receiving messages from channel.
:param channel: String channel identifier, based on user id or some hash string
:param websocket: Tornado WebSocket handler instance
:param proto: Messages protocol instance
"""
websocket.is_closed = False
ready_message = proto.make_message(type=proto.TRANS_READY)
websocket.write_message(proto.dump_message(ready_message))
self.loop.create_task(self.listen_queue(channel, websocket))
[docs] def del_socket(self, channel, websocket, proto=None):
"""Unregister WebSocket from receiving messages from channel.
:param channel: String channel identifier, based on user id or some hash string
:param websocket: Tornado WebSocket handler instance
:param proto: Messages protocol instance
"""
websocket.is_closed = True
self.loop.create_task(self.put_message(
[channel], self.CLOSE_COMMAND, proto=proto))
@asyncio.coroutine
[docs] def connect(self):
"""Setup main Redis connection."""
self.conn = yield from aioredis.create_redis(
loop=self.loop, **self.conn_params)
@asyncio.coroutine
def close(self):
self.conn.close()
@asyncio.coroutine
[docs] def put_message(self, channels, message, proto=None, from_channel=None):
"""Put messages on queue.
:param channels: List of destination channels
:param message: Message dict object
:param proto: Messages protocol instance
:param from_channel: Message from channel
"""
if isinstance(message, str):
raw_message = message
else:
raw_message = proto.dump_message(message)
for ch in channels:
yield from self.conn.lpush(ch, raw_message)
@asyncio.coroutine
[docs] def listen_queue(self, channel, websocket):
"""Start queue listener for channel and WebSocket connection."""
redis_conn = yield from aioredis.create_redis(
loop=self.loop, **self.conn_params)
while True:
val = yield from redis_conn.brpop(channel, timeout=10)
log.debug("listen_queue: %s" % val)
if websocket.is_closed:
redis_conn.close()
return
elif val:
websocket.write_message(val[1])
class ReliableRedisQueue(RedisQueue):
"""Reliable messages queue on top of Redis BRPOPLPUSH pattern.
Storage schema description:
1. Every message is stored as single value with key
"{channel}:{message id}" with expire time.
2. Incoming messages queue is represented via list of messages
ids: [{message id 1}, {message id 2}, ...]
3. Incoming queue key is "{channel}", waiting delivery confirmation
queue key is "{channel}:wait"
:param loop: asyncio event loop
:param websocket: WebSocket handler instance
:param conn_params: Redis connection params
"""
@asyncio.coroutine
def put_message(self, channels, message, proto=None, from_channel=None):
"""Put messages on queue.
:param channels: List of destination channels
:param message: Message dict object
:param proto: Messages protocol instance
:param from_channel: Message from channel
"""
if isinstance(message, dict):
message_dump = proto.dump_message(message)
else:
message_dump = None
for channel in channels:
# Store every message which has ID within separate list,
# also store from channel with message as 2-nd list item.
if message_dump and ('id' in message):
message_key = '%s:%s' % (channel, message['id'])
queue_data = message_key
values = (message_dump, from_channel or '')
yield from self.conn.rpush(message_key, *values)
# If message has no ID or is not dict itself,
# then just pass it as is.
else:
queue_data = message_dump or message
# Put message ID or raw message on queue
yield from self.conn.lpush(channel, queue_data)
@asyncio.coroutine
def check_delivered(self, channel, message_id):
"""Check if message is delivered.
"""
message_key = '%s:%s' % (channel, message_id)
llen = yield from self.conn.llen(message_key)
return llen == 0
@asyncio.coroutine
def pop_delivered(self, channel, message_id, proto=None):
"""Pop delivered message by ID.
Remove message by key "{channel}:{message id}" and remove
that key from waiting queue.
:param channel: Channel reveived message
:param message_id: Message ID
:param proto: Messages protocol instance
:return: Tuple (delivered message, from channel)
"""
message_key = '%s:%s' % (channel, message_id)
wait_queue = '%s:wait' % channel
raw_message = yield from self.conn.lpop(message_key)
from_channel = yield from self.conn.lpop(message_key)
yield from self.conn.lrem(wait_queue, 1, message_key)
if raw_message:
message = proto.load_message(raw_message.decode('utf-8'))
return message, from_channel.decode('utf-8')
@asyncio.coroutine
def listen_queue(self, channel, websocket):
"""Start queue listener for channel and WebSocket connection.
Send wait queue first to deliver messages that was not delivered
on previous session. Then start listening for new messages. Every
message with ID is send through WebSocket and put on wait queue.
After delivery confirmation message is removed from wait queue,
see :meth:`.pop_delivered` method.
"""
redis_conn = yield from aioredis.create_redis(
loop=self.loop, **self.conn_params)
# Send wait queue first
wait_queue = '%s:wait' % channel
yield from self._send_wait_queue(
wait_queue, redis_conn, channel, websocket)
# Wait for new messages and send
while True:
raw = yield from redis_conn.brpoplpush(
channel, wait_queue)
log.debug("listen_queue: %s" % raw)
if not raw:
continue
val = raw.decode('utf-8')
if val == self.CLOSE_COMMAND:
yield from redis_conn.lrem(wait_queue, 0, self.CLOSE_COMMAND)
redis_conn.close()
return
else:
pop_wait = yield from self._write_message(
redis_conn, val, channel, websocket)
if pop_wait:
yield from redis_conn.lpop(wait_queue)
@asyncio.coroutine
def _send_wait_queue(self, wait_queue, redis_conn, channel, websocket):
"""Send messages from waiting queue.
:param wait_queue: Wait queue key
:param redis_conn: Redis connection
:param channel: Message channel
:param websocket: WebSocket connection
"""
wait_messages = yield from redis_conn.lrange(wait_queue, 0, -1)
for raw in reversed(wait_messages):
val = raw.decode('utf-8')
if val.startswith(channel):
yield from self._write_message(
redis_conn, val, channel, websocket)
else:
# Actually we should not be here, if everything works fine!
# But due to [old] bugs there could be trash messages on wait
# queue, so we just clean them up.
# TODO: place WARNING here
yield from self.conn.lrem(wait_queue, 1, val)
@asyncio.coroutine
def _write_message(self, redis_conn, msg_or_id, channel, websocket):
"""Write message to WebSocket output by ID or raw value.
Messages with confirmation are stored separatelly
and only their id is passed to queue. Messages
without delivery confirmation are just sent as is.
:param redis_conn: Redis connection
:param msg_or_id: Message ID or dump to str
:param channel: Message channel
:param websocket: WebSocket connection
:return: `True` if message should be removed from wait
queue, because doesn't need confirmation.
"""
# get by id and send
if msg_or_id.startswith(channel):
message_dump = yield from redis_conn.lindex(msg_or_id, 0)
if message_dump:
websocket.write_message(message_dump)
# just send
else:
websocket.write_message(msg_or_id)
return True