EMDIS-ECS
view release on metacpan or search on metacpan
docker/qpid-broker-python/pybroker.py view on Meta::CPAN
def unsubscribe(self, consumer):
"""
:return: True if the queue is to be deleted
"""
if consumer in self.consumers:
self.consumers.remove(consumer)
return len(self.consumers) == 0 and (self.dynamic or len(self.queue) == 0)
def publish(self, message):
self.queue.append(message)
self.dispatch()
def dispatch(self, consumer=None):
if consumer:
c = [consumer]
else:
c = self.consumers
while self._deliver_to(c):
pass
def _deliver_to(self, consumers):
try:
result = False
for c in consumers:
if c.credit:
c.send(self.queue.popleft())
result = True
return result
except IndexError: # no more messages
return False
class Broker(MessagingHandler):
def __init__(self, debug, url, truststore, sslcert, sslkey, sslpass, sasl_config_path, sasl_config_name):
super(Broker, self).__init__()
self.debug = debug
self.url = url
# SSL trust store
self.truststore = truststore
# SSL certificate
self.sslcert = sslcert
self.sslkey = sslkey
self.sslpass = sslpass
# SASL configuration
self.sasl_config_path = sasl_config_path
self.sasl_config_name = sasl_config_name
# queues
self.queues = {}
def on_start(self, event):
if self.debug >= 1:
print('on_start: {}'.format(event))
ssl_domain = SSLDomain(SSLDomain.MODE_SERVER)
# SSL trust store (e.g. PEM file containing trusted CA certificate(s))
if self.truststore:
ssl_domain.set_trusted_ca_db(self.truststore)
# server certificate
if self.sslkey:
ssl_domain.set_credentials(self.sslcert, self.sslkey, self.sslpass)
self.acceptor = event.container.listen(url=self.url, ssl_domain=ssl_domain)
def _queue(self, address):
if address not in self.queues:
self.queues[address] = Queue()
return self.queues[address]
def on_connection_init(self, event):
if self.debug >= 1:
print('on_connection_init: {}'.format(event))
# set SASL configuration file
sasl = event.connection.transport.sasl()
sasl.config_path(self.sasl_config_path)
sasl.config_name(self.sasl_config_name)
def on_link_opening(self, event):
if self.debug >= 1:
print('on_link_opening: {}'.format(event))
if event.link.is_sender:
if event.link.remote_source.dynamic:
address = str(uuid.uuid4())
event.link.source.address = address
q = Queue(True)
self.queues[address] = q
q.subscribe(event.link)
elif event.link.remote_source.address:
event.link.source.address = event.link.remote_source.address
self._queue(event.link.source.address).subscribe(event.link)
elif event.link.remote_target.address:
event.link.target.address = event.link.remote_target.address
def _unsubscribe(self, link):
if link.source.address in self.queues and self.queues[link.source.address].unsubscribe(link):
del self.queues[link.source.address]
def on_link_closing(self, event):
if self.debug >= 1:
print('on_link_closing: {}'.format(event))
if event.link.is_sender:
self._unsubscribe(event.link)
def on_connection_closing(self, event):
if self.debug >= 1:
print('on_connection_closing: {}'.format(event))
self.remove_stale_consumers(event.connection)
def on_disconnected(self, event):
if self.debug >= 1:
print('on_disconnected: {}'.format(event))
self.remove_stale_consumers(event.connection)
def remove_stale_consumers(self, connection):
link = connection.link_head(Endpoint.REMOTE_ACTIVE)
while link:
if link.is_sender:
self._unsubscribe(link)
link = link.next(Endpoint.REMOTE_ACTIVE)
def on_sendable(self, event):
( run in 0.586 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )