"""Main messages router."""
import asyncio
from . import proto as base_proto
class BaseRoute:
    """Base messages route class.

    Routes are registered in the messages center. Every route is
    responsible for delivering a single type of message, e.g. direct
    user messages, group chat messages, system notifications, etc.
    """

    # NOTE(review): generator-based coroutines (``asyncio.coroutine``) were
    # deprecated in Python 3.8 and removed in 3.11 -- confirm the supported
    # interpreter range before upgrading.

    @asyncio.coroutine
    def process(self, message, websocket=None, proto=None):
        """Process message and return receiver channel for the message.

        Scenarios:

        - If method returns ``None``, message is simply passed to the next
          route in the chain.
        - If method returns a non-empty channel string, then message will
          be sent to that channel.
        - If method returns ``True``, then the routing chain should be
          stopped for this message.

        :param message: Arbitrary message object.
        :param websocket: WebSocket the message was received on, or ``None``
        :param proto: Messages protocol instance
        :returns: String channel identifier or ``True`` or ``None``
        :raises NotImplementedError: Always, in this base implementation;
                                     subclasses have to override it.
        """
        raise NotImplementedError

    @asyncio.coroutine
    def post_process(self, message, to_channel=None, queue=None):
        """Post process message after putting it on the delivery queue.

        Scenario examples:

        - Test if message was delivered after a certain timeout and
          notify via another channel (email, APNS, SMS) if not.
        - Send an extra service message right after the main message.
        - etc.

        The default implementation does nothing.

        :param message: Message that has been put on the queue
        :param to_channel: Channel returned by :meth:`process` for
                           this message
        :param queue: Messages queue instance
        """
        pass
class BaseQueue:
    """Base messages queue class."""

    # NOTE(review): generator-based coroutines (``asyncio.coroutine``) were
    # deprecated in Python 3.8 and removed in 3.11 -- confirm the supported
    # interpreter range before upgrading.

    def add_socket(self, channel, websocket, proto=None):
        """Register WebSocket for receiving messages from channel.

        Method implementation has to write a 'ready' transport message
        on success or close the WebSocket connection.

        :param channel: String channel identifier, based on user id or some
                        hash string
        :param websocket: Tornado WebSocket handler instance
        :param proto: Messages protocol instance
        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def del_socket(self, channel, websocket, proto=None):
        """Unregister WebSocket from receiving messages from channel.

        :param channel: String channel identifier, based on user id or some
                        hash string
        :param websocket: Tornado WebSocket handler instance
        :param proto: Messages protocol instance
        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @asyncio.coroutine
    def put_message(self, channels, message, proto=None, from_channel=None):
        """Put message on the queue.

        :param channels: List of destination channels
        :param message: Message dict object
        :param proto: Messages protocol instance
        :param from_channel: Channel the message comes from
        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @asyncio.coroutine
    def check_delivered(self, channel, message_id):
        """Check if message is delivered.

        Default implementation always returns ``True``, assuming
        no messages tracking is implemented initially.

        :param channel: Channel the message was sent to
        :param message_id: Message ID
        :returns: ``True``
        """
        return True

    @asyncio.coroutine
    def pop_delivered(self, channel, message_id, proto=None):
        """Mark message as delivered by ID.

        Default implementation is empty and returns ``None``.
        :class:`BaseMessagesCenter` treats a truthy result as a sequence
        whose first two items are the delivered message and the channel
        it was sent from.

        :param channel: Channel that received the message
        :param message_id: Message ID
        :param proto: Messages protocol instance
        """
        pass
class BaseMessagesCenter:
    """Messages center provides top-level messages routing.

    :param loop: asyncio event loop, the current event loop is used
                 by default
    :param queue: Messages queue instance (required)
    :param proto: Messages protocol instance or :class:`.BaseProtocol`
                  instance will be used by default

    Attributes:

    - `.proto`: :class:`BaseProtocol` or subclass instance
    - `.loop`: asyncio event loop
    - `.queue`: :class:`BaseQueue` subclass instance
    - `.routes`: routes objects list
    """

    # NOTE(review): generator-based coroutines (``asyncio.coroutine`` with
    # ``yield from``) were deprecated in Python 3.8 and removed in 3.11 --
    # confirm the supported interpreter range before upgrading.

    def __init__(self, loop=None, proto=None, queue=None):
        # Compare against ``None`` explicitly: a queue object which defines
        # ``__len__`` is falsy while empty, yet it is a valid argument.
        assert queue is not None, "Error, queue argument not specified."
        self.loop = loop or asyncio.get_event_loop()
        self.proto = proto or base_proto.BaseProtocol()
        self.queue = queue
        self.routes = []

    def add_socket(self, channel, websocket):
        """Register WebSocket for receiving messages from channel.

        Delegates to the queue, passing the center's protocol instance.

        :param channel: String channel identifier, based on user id or
                        some hash string
        :param websocket: Tornado WebSocket handler instance
        """
        self.queue.add_socket(channel, websocket, proto=self.proto)

    def del_socket(self, channel, websocket):
        """Unregister WebSocket from receiving messages from channel.

        Delegates to the queue, passing the center's protocol instance.

        :param channel: String channel identifier, based on user id or
                        some hash string
        :param websocket: Tornado WebSocket handler instance
        """
        self.queue.del_socket(channel, websocket, proto=self.proto)

    def add_route(self, route):
        """Add messages route to routing chain, see :class:`.BaseRoute`

        Message routes are processed in the same order as they are added.

        :param route: Route instance
        :raises AssertionError: If the route is already registered.
        """
        assert route not in self.routes, (
            "Error, route %s is already added." % route)
        self.routes.append(route)

    def del_route(self, route):
        """Remove message route, see :class:`.BaseRoute`

        :param route: Route instance
        :raises ValueError: If the route is not registered.
        """
        self.routes.remove(route)

    @asyncio.coroutine
    def transport(self, message=None, websocket=None):
        """Process transport layer for messages.

        Performs reliable delivery process, notifies sender on successful
        message delivery.

        :param message: Data or transport message to process
        :param websocket: WebSocket which has received message to process
        :raises AssertionError: If message or websocket is missing.
        """
        assert message and websocket, (
            "Error, message and websocket arguments are required!")

        message_type = message['type']
        if message_type not in self.proto.TRANS_TYPES:
            # Data message: respond to the sender, delivery starts.
            yield from self._transport_start(message, websocket)
        elif message_type == self.proto.TRANS_PING:
            # Ping (type=1001), say 'pong'.
            yield from self._transport_ping(message, websocket)
        elif message_type == self.proto.TRANS_RECV_GOT_IT:
            # Received (type=200).
            yield from self._transport_gotit(message, websocket)
        # Any other transport type needs no action here.

    @asyncio.coroutine
    def _transport_ping(self, message, websocket):
        """Process ping type=1001 transport message by writing 'pong'."""
        pong = self.proto.make_message(type=self.proto.TRANS_PONG)
        websocket.write_message(pong)

    @asyncio.coroutine
    def _transport_start(self, message, websocket):
        """Respond to sender on message delivery start.

        When the server first gets a message for delivery, it says
        'Got It' type=100 and echoes the message ID back as data.
        """
        got_it = self.proto.make_message(
            type=self.proto.TRANS_SERV_GOT_IT, data=message['id'])
        websocket.write_message(got_it)

    @asyncio.coroutine
    def _transport_gotit(self, message, websocket):
        """Process 'Got It' type=200 transport message.

        Pop and mark message as delivered by ID, and then
        notify message sender on successful delivery.
        """
        message_id = message['data']
        channel = websocket.get_channel()
        delivered = yield from self.queue.pop_delivered(
            channel, message_id, proto=self.proto)
        if not delivered:
            # Queue has nothing to report for this ID, nobody to notify.
            return

        # First two items: the delivered message and its sender channel.
        delivered_message, from_channel = delivered[0], delivered[1]
        notify_message = self.proto.make_message(
            type=self.proto.TRANS_DELIVERED,
            data=delivered_message['id'])
        yield from self.queue.put_message(
            [from_channel], notify_message, proto=self.proto)

    @asyncio.coroutine
    def process(self, raw_or_message, websocket=None):
        """Process message to routing chain and send to WebSockets.

        Message is passed to registered routes, they return receivers
        channels and then message is sent to those channels.

        :param raw_or_message: Raw message string or message dict
        :param websocket: WebSocket connection handler which received the
                          new message, this is an optional parameter,
                          because a message could also be created at
                          the server internally
        """
        if isinstance(raw_or_message, str):
            # Only decoding of a raw string is guarded against format errors.
            try:
                message = self.proto.load_message(raw_or_message)
            except ValueError as e:
                # TODO: handle message format errors
                print("Error,", e)
                return
        else:
            message = raw_or_message

        # Transport layer
        if websocket:
            yield from self.transport(message=message, websocket=websocket)
            # Stop processing if it's a transport message
            if message['type'] in self.proto.TRANS_TYPES:
                return

        # Data message: collect (route, channel) pairs from the chain.
        destinations = []
        for route in self.routes:
            to_channel = yield from route.process(
                message, websocket, proto=self.proto)
            if to_channel is True:
                # Route asked to stop the routing chain.
                break
            if isinstance(to_channel, str):
                destinations.append((route, to_channel))

        if not destinations:
            return

        # Put on delivery queue
        from_channel = websocket.get_channel() if websocket else None
        to_channels = [channel for _, channel in destinations]
        yield from self.queue.put_message(
            to_channels, message,
            proto=self.proto, from_channel=from_channel)

        # Post process message: scheduled as tasks, not waited for here.
        for route, to_channel in destinations:
            self.loop.create_task(
                route.post_process(message, to_channel, queue=self.queue))