# rdio-sock - Rdio WebSocket Library
# Copyright (C) 2013 fzza- <fzzzzzzzza@gmail.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from ws4py.client.threadedclient import WebSocketClient
from rdiosock.exceptions import RdioException, RdioApiError
from rdiosock.logr import Logr
from rdiosock.utils import update_attrs, randint, random_id, EventHook
[docs]class RdioPubSub:
"""PubSub client"""
def __init__(self, sock):
self._sock = sock
self.ws = None
self.name = "_web_" + str(random_id())
# Events
self.on_connected = EventHook()
# { "<topic>" : [<callbacks] }
self._subscription_callbacks = {}
# PubSub info
self.topic = None
self.token = None
self.servers = None
[docs] def connect(self, update=True):
"""Connect to PubSub server
:param update: Force pubsub info update (pubsubInfo)
:type update: bool
"""
if self._sock.user.authorization_key is None or \
self._sock.user.session_cookie is None:
raise RdioException()
if update:
self.update_info() # Update our PubSub info
if self.token is None or self.token == '':
raise RdioApiError()
# Start up the WebSocket client
self.ws = RdioPubSubClient(self, self.received_message, self.reconnect)
self.ws.connect()
def update_info(self):
info_result = self._sock._api_post('pubsubInfo')
if info_result['status'] == 'error':
raise RdioApiError(info_result)
Logr.debug('----------- PubSub ------------')
update_attrs(self, info_result['result'], trace=True)
[docs] def publish(self, topic, data):
"""Publish PubSub message
:param topic: Topic Name
:type topic: str
:param data: json serializable object
:type data: object
"""
self.ws.send_message(RdioPubSubMessage(
RdioPubSubMessage.METHOD_PUB, topic, data
))
[docs] def subscribe(self, service, target=None):
"""Subscribe RdioService into pubsub messages
:param service: RdioService instance
:type service: :class:`rdiosock.services.RdioService`
:param target: Target (User, Playlist) or None to indicate current user
:type target: str or None
"""
if target is None:
target = self._sock.user.key
service.__subscribe__(self, target)
[docs] def subscribe_topic(self, topic, callback, target=None):
""" Subscribe to pubsub topic
:param topic: Topic Name
:type topic: str
:param callback: callback(message) will be called when messages are received
:type callback: function
"""
topic = topic.strip('/')
if '/' not in topic:
if target is not None:
topic = target + '/' + topic
else:
raise ValueError()
if topic not in self._subscription_callbacks:
self._subscription_callbacks[topic] = []
if callback not in self._subscription_callbacks[topic]:
self._subscription_callbacks[topic].append(callback)
self.ws.send_message(RdioPubSubMessage(
RdioPubSubMessage.METHOD_SUB, topic
))
Logr.info("subscribed to %s", topic)
def received_message(self, message):
Logr.debug("received_message: %s", message)
if message.method == RdioPubSubMessage.METHOD_CONNECTED:
self.on_connected()
elif message.method == RdioPubSubMessage.METHOD_PUB:
# Send message to subscribed callbacks
if message.topic in self._subscription_callbacks:
for callback in self._subscription_callbacks[message.topic]:
callback(message)
else:
Logr.warning("Unhandled message topic: %s", message.topic)
def reconnect(self):
self.connect(False)
class RdioPubSubClient(WebSocketClient):
def __init__(self, pubsub, received_message, reconnect):
"""
:type pubsub: RdioPubSub
"""
self.pubsub = pubsub
self._received_message = received_message
self._reconnect = reconnect
super(RdioPubSubClient, self).__init__(self._select_random_server(False))
def _select_random_server(self, parse=True):
server_id = randint(0, len(self.pubsub.servers) - 1)
self.url = 'ws://' + self.pubsub.servers[server_id]
if parse:
self._parse_url()
Logr.info("selected server : %s", self.url)
return self.url
def send_message(self, message):
Logr.debug("send_message %s", message)
self.send(str(message))
def opened(self):
Logr.debug("opened")
self.send_message(RdioPubSubMessage(
RdioPubSubMessage.METHOD_CONNECT, self.pubsub.token, {
'chat': {
'canChat': True
},
'player': {
'canRemote': True,
'name': self.pubsub.name
}
}))
def closed(self, code, reason=None):
Logr.info("closed: (%s) %s", code, reason)
if code == 1006:
Logr.info("reconnecting")
self._reconnect()
def received_message(self, message):
self._received_message(RdioPubSubMessage.parse(str(message)))
class RdioPubSubMessage:
METHOD_CONNECT = 'CONNECT'
METHOD_CONNECTED = 'CONNECTED'
METHOD_PUB = 'PUB'
METHOD_SUB = 'SUB'
METHODS = (
METHOD_CONNECT,
METHOD_CONNECTED,
METHOD_PUB,
METHOD_SUB
)
SNIP_MIN_LENGTH = 100
SNIP_STRING = "[...snip...]"
def __init__(self, method, topic, data=None):
self.method = method
self.topic = topic
self.data = data
@staticmethod
def parse(message):
message = message.split(' ', 1)
if len(message) != 2:
Logr.warning("invalid message")
Logr.debug("len(message)= %s", len(message))
Logr.debug(message)
return None
method, data = message
# Check if method is valid
if method not in RdioPubSubMessage.METHODS:
Logr.warning('invalid message method "%s"', method)
return None
if '|' not in data:
return RdioPubSubMessage(method, data)
topic, data = data.split('|', 1)
return RdioPubSubMessage(method, topic, json.loads(data))
def __str__(self):
if self.data:
data_str = json.dumps(self.data)
if len(data_str) >= self.SNIP_MIN_LENGTH:
part_length = (self.SNIP_MIN_LENGTH / 2) - (len(self.SNIP_STRING) / 2)
data_str = "".join([
data_str[:part_length],
self.SNIP_STRING,
data_str[-part_length:]
])
return self.method + ' ' + self.topic + '|' + data_str
else:
return self.method + ' ' + self.topic