EMDIS-ECS
view release on metacpan or search on metacpan
script/ecs_amqp_recv.py view on Meta::CPAN
parser.add_option("-a", "--address", default=None,
help="topic or queue from which messages are received (e.g. test_queue)")
parser.add_option("-s", "--truststore", default=None,
help="SSL trust store (e.g. cacert.pem)")
parser.add_option("-c", "--sslcert", default=None,
help="client-side SSL certificate / public key (e.g. user-cert.pem)")
parser.add_option("-k", "--sslkey", default=None,
help="client-side SSL private key (e.g. user-key.pem)")
parser.add_option("-y", "--sslpass", default=None,
help="password for client-side SSL private key (overrides ECS_AMQP_SSLPASS env var)")
parser.add_option("-u", "--username", default=None,
help="username for SASL authentication")
parser.add_option("-p", "--password", default=None,
help="password for SASL authentication (overrides ECS_AMQP_PASSWORD env var)")
parser.add_option("-t", "--timeout", type=int, default=5,
help="inactivity timeout threshold, in seconds (default %default)")
parser.add_option("-o", "--outputdir", action="append", default=None,
help="file system directory for output files")
parser.add_option("-x", "--suffix", default=".amqp.msg",
help="filename suffix for output files (default %default)")
opts, args = parser.parse_args()
return opts
"""
Proton event handler class
Creates an amqp connection using ANONYMOUS or PLAIN authentication.
Then attaches a receiver link to consume messages from the broker.
"""
class Recv(MessagingHandler):
def __init__(self, debug, url, vhost, address, truststore, sslcert, sslkey, sslpass, username, password, timeout, outputdir, suffix):
super(Recv, self).__init__()
self.connection = None
self.receiver = None
# exit status
self.exit_status = 1
# debug output level
self.debug = debug
# amqp broker host url
self.url = url
# amqp node address
self.address = address
# broker virtual host name
self.virtual_host = vhost
# SSL trust store
self.truststore = truststore
# client-side SSL certificate
self.sslcert = sslcert
self.sslkey = sslkey
self.sslpass = sslpass
# authentication credentials
self.username = username
self.password = password
# inactivity timeout
self.inactivity_timestamp = 0
self.inactivity_threshold = timeout
# output dir
self.outputdir = outputdir
self.hextrans = str.maketrans('0123456789abcdef', 'BCDFGHJKLMNPQRST')
self.old_prefix_ts = ''
self.msg_seqnum = 0
# suffix
self.suffix = suffix
def on_start(self, event):
if self.debug >= 1:
print('on_start: {}'.format(event))
ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
# SSL trust store (e.g. PEM file containing trusted CA certificate(s))
if self.truststore:
ssl_domain.set_trusted_ca_db(self.truststore)
if True:
# use trust store to verify peer's (e.g. broker's) SSL certificate
ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
else:
# verify hostname on peer's (e.g. broker's) SSL certificate
ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME)
# client-side certificate
if self.sslkey:
ssl_domain.set_credentials(self.sslcert, self.sslkey, self.sslpass)
if self.username:
# username and password authentication
self.connection = event.container.connect(url=self.url,
ssl_domain=ssl_domain,
user=self.username,
password = self.password,
allow_insecure_mechs=False,
sasl_enabled=True,
allowed_mechs="PLAIN",
virtual_host=self.virtual_host)
else:
# anonymous authentication
self.connection = event.container.connect(url=self.url,
ssl_domain=ssl_domain,
allow_insecure_mechs=False,
sasl_enabled=True,
allowed_mechs="ANONYMOUS",
virtual_host=self.virtual_host)
def on_connection_opened(self, event):
if self.debug >= 1:
print('on_connection_opened: {}'.format(event))
# create receiver link to consume messages
self.receiver = event.container.create_receiver(event.connection, source=self.address)
self.exit_status = 0
def on_connection_closed(self, event):
if self.debug >= 1:
print('on_connection_closed: {}'.format(event))
def on_reactor_quiesced(self, event):
if self.debug >= 1:
print('on_reactor_quiesced [{},{}]: {}'.format(self.inactivity_timestamp, time.time(), event))
# TODO: improve robustness of link/session/connection/container teardown (instead of relying solely on inactivity timeout)
if self.inactivity_timestamp == 0:
self.inactivity_timestamp = time.time()
inactivity_seconds = time.time() - self.inactivity_timestamp
if inactivity_seconds > 0.1 and inactivity_seconds >= self.inactivity_threshold:
if self.connection:
self.connection.close()
else:
event.container.stop()
def on_message(self, event):
if self.debug >= 1:
print('on_message: {}'.format(event))
filename_prefix = int(time.time() * 16).to_bytes(8, byteorder='big').hex().translate(self.hextrans)[8:]
if self.old_prefix_ts != filename_prefix:
self.old_prefix_ts = filename_prefix
# reset msg_seqnum when prefix_ts changes
self.msg_seqnum = 0
self.msg_seqnum += 1
filename_prefix += '%03i.' % self.msg_seqnum
created_filename = None
with tempfile.NamedTemporaryFile(dir=self.outputdir[0], mode='wt', prefix=filename_prefix, suffix=self.suffix, delete=False) as fd:
( run in 1.918 second using v1.01-cache-2.11-cpan-39bf76dae61 )