Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions example/tx_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,15 @@
import random
import time

from twisted.python import failure
from twisted.internet import defer
from twisted.internet import protocol
from twisted.internet import reactor
from twisted.internet import endpoints
from txmsgpack.protocol import MsgpackClientFactory
from txmsgpack.client import connect

ITERATIONS=3000


def create_client():
point = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8007)
d = point.connect(MsgpackClientFactory())
d = connect("localhost", 8007)
d.addCallback(test_client)

def compare(result, item):
Expand Down
40 changes: 40 additions & 0 deletions example/tx_rpc_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from twisted.internet import defer, protocol, reactor
from twisted.protocols import memcache
from txmsgpack.server import MsgpackRPCServer

class EchoRPC(MsgpackRPCServer):
def __init__(self, memcacheClient):
self.memcacheClient = memcacheClient

def remote_echo(self, value, msgid=None):
df = defer.Deferred()
df.callback(value)
return df

def remote_bounce(self, one, two, three):
if self.memcacheClient is None:
return self.remote_echo((one, two, three))
df = self.memcacheClient.set(one, "%s:%s" % (two, three))
df.addCallback(lambda x:(one, two, three))
return df

@defer.inlineCallbacks
def main():
try:
client = yield (protocol.ClientCreator(reactor, memcache.MemCacheProtocol)
.connectTCP("localhost", memcache.DEFAULT_PORT))
except Exception:
client = None

if client is None:
print "Memcache is not avaiable"
else:
print "WARNING: THIS SERVER WILL ADD DATA TO YOUR MEMCACHED SERVICE"
print "memcache_client: %s" % (client,)

server = EchoRPC(client)
server.serve(8007)

if __name__ == '__main__':
reactor.callWhenRunning(main)
reactor.run()
10 changes: 3 additions & 7 deletions lib/python/txmsgpack/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
from protocol import Msgpack
from protocol import MsgpackError
from protocol import MsgpackServerFactory
from protocol import MsgpackClientFactory
from protocol import MSGTYPE_REQUEST
from protocol import MSGTYPE_RESPONSE
from protocol import MSGTYPE_NOTIFICATION
import client
import protocol
import server
17 changes: 17 additions & 0 deletions lib/python/txmsgpack/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

def connect(host, port, timeout=None, reactor=None):
if reactor is None:
from twisted.internet import reactor

from twisted.internet import endpoints
from txmsgpack.protocol import MsgpackClientFactory

kwargs = {}

if timeout is not None:
kwargs['timeout'] = timeout

factory = MsgpackClientFactory()

point = endpoints.TCP4ClientEndpoint(reactor, host, port, **kwargs)
return point.connect(factory)
62 changes: 62 additions & 0 deletions lib/python/txmsgpack/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@

import inspect
import sys

from txmsgpack.protocol import Msgpack, MsgpackServerFactory


class MsgpackRPCServer(object):

def _buildFactory(self):
factory = MsgpackServerFactory()
factory.server = self
factory.protocol = self._buildProtocol()

return factory

def _buildProtocol(self):
methods = {}

for name, member in inspect.getmembers(self):
if name.startswith('remote_') and inspect.ismethod(member):
closure = self._createClosure(member)
methods[name] = closure

protocol_name = self.__class__.__name__ + '_protocol'

protocol = type(protocol_name, (Msgpack, object), methods)
protocol.server = self

return protocol

def _createClosure(self, method):
def protocol_method(proto, *args, **kwargs):
return method(*args, **kwargs)

protocol_method.func_name = method.im_func.func_name
protocol_method.func_doc = getattr(method.im_func, 'func_doc', None)
protocol_method.func_dict = getattr(method.im_func, 'func_dict', {})
protocol_method.func_defaults = getattr(method.im_func, 'func_defaults', ())
callermodule = sys._getframe(3).f_globals.get('__name__', '?')
protocol_method.__module__ = getattr(method.im_func, 'module', callermodule)

return protocol_method

def serve(self, port, backlog=None, interface=None, reactor=None):
if reactor is None:
from twisted.internet import reactor

kwargs = {}

if backlog is not None:
kwargs['backlog'] = backlog

if interface is not None:
kwargs['interface'] = interface

factory = self._buildFactory()

return reactor.listenTCP(port, factory, **kwargs)


__all__ = ['MsgpackRPCServer']
14 changes: 7 additions & 7 deletions lib/python/txmsgpack/test/test_protocol.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@

import msgpack
from txmsgpack import Msgpack
from txmsgpack import MsgpackClientFactory
from txmsgpack import MsgpackServerFactory
from txmsgpack.protocol import Msgpack
from txmsgpack.protocol import MsgpackClientFactory
from txmsgpack.protocol import MsgpackServerFactory
from twisted.trial import unittest
from twisted.test import proto_helpers
from twisted.internet import defer
from twisted.internet import protocol

from txmsgpack import MSGTYPE_REQUEST
from txmsgpack import MSGTYPE_RESPONSE
from txmsgpack import MSGTYPE_NOTIFICATION
from txmsgpack.protocol import MSGTYPE_REQUEST
from txmsgpack.protocol import MSGTYPE_RESPONSE
from txmsgpack.protocol import MSGTYPE_NOTIFICATION


class Echo(Msgpack):
Expand Down Expand Up @@ -91,7 +91,7 @@ def _test_request(self, operation=MSGTYPE_REQUEST, method="echo", value="", expe

self.assertEqual(return_value, packed_response)

unpacked_response = msgpack.unpacks(return_value)
unpacked_response = msgpack.loads(return_value)
(msgType, msgid, methodName, params) = unpacked_response

self.assertEqual(msgType, MSGTYPE_RESPONSE)
Expand Down
115 changes: 115 additions & 0 deletions lib/python/txmsgpack/test/test_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from twisted.internet import defer
from twisted.test import proto_helpers
from twisted.trial import unittest

import msgpack
from txmsgpack.protocol import MSGTYPE_REQUEST
from txmsgpack.protocol import MSGTYPE_RESPONSE
from txmsgpack.protocol import MSGTYPE_NOTIFICATION
from txmsgpack.server import MsgpackRPCServer


class TestServer(MsgpackRPCServer):
def __init__(self):
self.storage = {}

def remote_insert_key(self, value, msgid=None):
value["new_key"]=1
return self.remote_echo(value, msgid)

def remote_echo(self, value, msgid=None):
return value

def remote_notify(self, value):
return

def remote_sum(self, args):
lhs, rhs = args
df = defer.Deferred()
df.callback(lhs + rhs)
return df

def remote_store(self, key, value):
self.storage[key] = value

def remote_load(self, key):
return self.storage[key]


class MsgpackRPCServerTestCase(unittest.TestCase):

request_index=0

def setUp(self):
self.server = TestServer()
factory = self.server._buildFactory()

self.proto = factory.buildProtocol(("127.0.0.1", 0))
self.transport = proto_helpers.StringTransport()
self.proto.makeConnection(self.transport)
self.packer = msgpack.Packer(encoding="utf-8")

def test_request_string(self):
arg = "SIMON SAYS"
return self._test_request(method="echo", param=arg, expected_result=arg, expected_error=None)

def test_request_dict(self):
arg = {"A":1234}
ret = {"A":1234, "new_key":1}
return self._test_request(method="insert_key", param=arg, expected_result=ret, expected_error=None)

def test_notify(self):
arg = "NOTIFICATION"
return self._test_notification(method="notify", value=arg)

def test_sum(self):
args = (2,5)
ret = 7
return self._test_request(method="sum", param=args, expected_result=ret, expected_error=None)

def test_store_load(self):
key = 'foo'
value = 'bar'
args = (key, value)

self._test_request(method="store", params=args)

self.assertEqual(self.server.storage, {key: value})

self.transport.clear()
self._test_request(method="load", param=key, expected_result=value)

def _test_notification(self, method="notify", value=""):
message = (MSGTYPE_NOTIFICATION, method, (value,))
packed_message = self.packer.pack(message)
self.proto.dataReceived(packed_message)
return_value = self.transport.value()
self.assertEqual(return_value, "")

def _test_request(self, method, param=None, params=None, expected_result=None, expected_error=None):
index = MsgpackRPCServerTestCase.request_index
MsgpackRPCServerTestCase.request_index += 1

if params is not None:
args = params
else:
args = (param,)

message = (MSGTYPE_REQUEST, index, method, args)
packed_message = self.packer.pack(message)

response = (MSGTYPE_RESPONSE, index, expected_error, expected_result)
packed_response = self.packer.pack(response)

self.proto.dataReceived(packed_message)
return_value = self.transport.value()

self.assertEqual(return_value, packed_response)

unpacked_response = msgpack.loads(return_value)
(msgType, msgid, methodName, args) = unpacked_response

self.assertEqual(msgType, MSGTYPE_RESPONSE)
self.assertEqual(msgid, index)
self.assertEqual(methodName, None)
self.assertEqual(args, expected_result)