52 lines
1.3 KiB
Plaintext
52 lines
1.3 KiB
Plaintext
|
=Publish / Subscribe DP Tests=
|
||
|
|
||
|
Copyright (c) 2008-2012, David P. D. Moss. All rights reserved.
|
||
|
|
||
|
Basic Publisher and Subscriber object tests.
|
||
|
|
||
|
{{{
|
||
|
|
||
|
>>> from netaddr.core import Publisher, Subscriber, PrettyPrinter
|
||
|
|
||
|
>>> class Subject(Publisher):
|
||
|
... pass
|
||
|
|
||
|
|
||
|
>>> class Observer(Subscriber):
|
||
|
... def __init__(self, id):
|
||
|
... self.id = id
|
||
|
...
|
||
|
... def update(self, data):
|
||
|
... print(repr(self), data)
|
||
|
...
|
||
|
... def __repr__(self):
|
||
|
... return '%s(%r)' % (self.__class__.__name__, self.id)
|
||
|
...
|
||
|
|
||
|
>>> s = Subject()
|
||
|
|
||
|
>>> s.attach(Observer('foo'))
|
||
|
>>> s.attach(Observer('bar'))
|
||
|
|
||
|
#FIXME: >>> pp = PrettyPrinter()
|
||
|
#FIXME: >>> s.attach(pp)
|
||
|
|
||
|
>>> data = {'foo': 42, 'list': [1,'2', list(range(10))], 'strings': ['foo', 'bar', 'baz', 'quux']}
|
||
|
>>> s.notify(data)
|
||
|
Observer('foo') {'foo': 42, 'list': [1, '2', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]], 'strings': ['foo', 'bar', 'baz', 'quux']}
|
||
|
Observer('bar') {'foo': 42, 'list': [1, '2', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]], 'strings': ['foo', 'bar', 'baz', 'quux']}
|
||
|
|
||
|
#FIXME: >>> s.detach(pp)
|
||
|
>>> s.notify(['foo', 'bar', 'baz'])
|
||
|
Observer('foo') ['foo', 'bar', 'baz']
|
||
|
Observer('bar') ['foo', 'bar', 'baz']
|
||
|
|
||
|
>>> s.attach('foo')
|
||
|
Traceback (most recent call last):
|
||
|
...
|
||
|
TypeError: 'foo' does not support required interface!
|
||
|
|
||
|
>>> s.detach('foo')
|
||
|
|
||
|
}}}
|