mirror of git://git.psyced.org/git/pypsyc
134 lines
3.4 KiB
Python
134 lines
3.4 KiB
Python
"""
|
|
:copyright: 2010 by Manuel Jacob
|
|
:license: MIT
|
|
"""
|
|
from functools import wraps
|
|
from traceback import format_exc
|
|
import sys
|
|
|
|
from greenlet import getcurrent
|
|
from mock import Mock
|
|
import twisted.internet
|
|
|
|
from pypsyc.core.mmp import Header
|
|
|
|
|
|
# keep reactor from being installed
|
|
|
|
assert not 'twisted.internet.reactor' in sys.modules
|
|
twisted.internet.reactor = sys.modules['twisted.internet.reactor'] = None
|
|
|
|
|
|
# random helpers
|
|
|
|
def inited_header(*args, **kwds):
|
|
header = Header(*args, **kwds)
|
|
header._init()
|
|
return header
|
|
|
|
def _make_send_function(other_circuit):
|
|
def f(header, content):
|
|
try:
|
|
other_circuit.packet_received(inited_header(header), content)
|
|
except:
|
|
raise Exception("%s.packet_received raised exception:\n%s" %
|
|
(other_circuit, format_exc()))
|
|
return f
|
|
|
|
def connect_circuits(c1, c2):
|
|
"""Make a virtual connection between two curcuits."""
|
|
c1.send = c1.psyc._send = _make_send_function(c2)
|
|
c2.send = c2.psyc._send = _make_send_function(c1)
|
|
return c1, c2
|
|
|
|
|
|
class iter_(object):
|
|
def __init__(self, *iter_):
|
|
self.iter_ = iter_
|
|
|
|
def __eq__(self, iter_):
|
|
return tuple(iter_) == self.iter_
|
|
|
|
def rendered(packet):
|
|
return iter_(*packet.render())
|
|
|
|
|
|
def check_success(f):
|
|
"""
|
|
Wrap `f` so that it checks if :attr:`TestCase.success` is ``True``. This is
|
|
useful to check if an asynchronous test has passed completely.
|
|
"""
|
|
@wraps(f)
|
|
def _wrapper(self):
|
|
self.success = False
|
|
f(self)
|
|
assert self.success
|
|
del self.success
|
|
return _wrapper
|
|
|
|
|
|
class AsyncMethod(object):
|
|
def __call__(self, *args, **kwds):
|
|
self.child = getcurrent()
|
|
return self.child.parent.switch()
|
|
|
|
def callback(self, *args, **kwds):
|
|
self.child.switch(*args, **kwds)
|
|
|
|
def errback(self, *args, **kwds):
|
|
self.child.throw(*args, **kwds)
|
|
|
|
|
|
def mockify(obj, attrs):
|
|
"""Replace attrs of obj with mocks. Return the original attributes."""
|
|
originals = {}
|
|
for attr in attrs:
|
|
orig = originals[attr] = getattr(obj, attr)
|
|
mock = Mock(spec=orig)
|
|
setattr(obj, attr, mock)
|
|
return originals
|
|
|
|
class mockified(object):
|
|
"""
|
|
Wrap a function so that it 'mockifies' and resets `obj`. If `obj` is a
|
|
string, mockify the module named like it.
|
|
Can also be used as a context manager.
|
|
"""
|
|
def __init__(self, obj, attrs):
|
|
if isinstance(obj, basestring):
|
|
self.obj = __import__(obj, globals(), locals(), attrs)
|
|
else:
|
|
self.obj = obj
|
|
self.attrs = attrs
|
|
|
|
def __enter__(self):
|
|
self.originals = mockify(self.obj, self.attrs)
|
|
return (getattr(self.obj, attr) for attr in self.attrs)
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
for attr, original in self.originals.iteritems():
|
|
setattr(self.obj, attr, original)
|
|
|
|
def __call__(self, f):
|
|
@wraps(f)
|
|
def _wrapper(*args, **kwds):
|
|
with self:
|
|
kwds.update((attr, getattr(self.obj, attr)) for attr in self.attrs)
|
|
return f(*args, **kwds)
|
|
return _wrapper
|
|
|
|
|
|
class PlaceHolder(object):
|
|
"""
|
|
An object that be used as a placeholder for comparisons. The object on the
|
|
other side of the comparison is saved as attribute `obj`.
|
|
"""
|
|
|
|
def __eq__(self, other):
|
|
self.obj = other
|
|
return True
|
|
|
|
|
|
class StubException(Exception):
|
|
pass
|