EMDIS-ECS
view release on metacpan or search on metacpan
script/ecs_amqp_send.py view on Meta::CPAN
parser.add_option("-a", "--address", default=None,
help="topic or queue to which message is sent (e.g. test_queue)")
parser.add_option("-s", "--truststore", default=None,
help="SSL trust store (e.g. cacert.pem)")
parser.add_option("-c", "--sslcert", default=None,
help="client-side SSL certificate / public key (e.g. user-cert.pem)")
parser.add_option("-k", "--sslkey", default=None,
help="client-side SSL private key (e.g. user-key.pem)")
parser.add_option("-y", "--sslpass", default=None,
help="password for client-side SSL private key (overrides ECS_AMQP_SSLPASS env var)")
parser.add_option("-u", "--username", default=None,
help="username for SASL authentication")
parser.add_option("-p", "--password", default=None,
help="password for SASL authentication (overrides ECS_AMQP_PASSWORD env var)")
parser.add_option("-i", "--inputfile", default=None,
help="input file containing message to be sent")
parser.add_option("-P", "--property", action="append", default=None,
help="application defined message property and value (e.g. x-emdis-hub-snd=DE)")
parser.add_option("-E", "--encoding", default='UTF-8',
help="message Content-Encoding (default %default)")
parser.add_option("-T", "--type", default='text/plain',
help="message Content-Type (default %default)")
parser.add_option("-S", "--subject", default=None,
help="message Subject (e.g. EMDIS)")
opts, args = parser.parse_args()
return opts
"""
Proton event handler class
Creates an amqp connection and a sender to publish messages.
"""
class Send(MessagingHandler):
def __init__(self, debug, url, vhost, address, truststore, sslcert, sslkey, sslpass, username, password, inputfile, properties, content_encoding, content_type, message_subject):
super(Send, self).__init__()
# exit status
self.exit_status = 1
# debug output level
self.debug = debug
# amqp broker host url
self.url = url
# target amqp node address
self.address = address
# broker virtual host name
self.virtual_host = vhost
# SSL trust store
self.truststore = truststore
# client-side SSL certificate
self.sslcert = sslcert
self.sslkey = sslkey
self.sslpass = sslpass
# authentication credentials
self.username = username
self.password = password
# input file
self.inputfile = inputfile
if self.inputfile == '-':
self.inputfile = sys.stdin.fileno()
# application defined message properties
self.application_defined_message_properties = None
if properties:
self.application_defined_message_properties = dict([prop.split('=', maxsplit=1) for prop in properties])
# standard message properties
self.content_encoding = content_encoding
self.content_type = content_type
self.message_subject = message_subject
# done sending yet?
self.done_sending = False
def on_start(self, event):
if self.debug >= 1:
print('on_start: {}'.format(event))
ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
# SSL trust store (e.g. PEM file containing trusted CA certificate(s))
if self.truststore:
ssl_domain.set_trusted_ca_db(self.truststore)
if True:
# use trust store to verify peer's (e.g. broker's) SSL certificate
ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
else:
# verify hostname on peer's (e.g. broker's) SSL certificate
ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME)
# client-side certificate
if self.sslkey:
ssl_domain.set_credentials(self.sslcert, self.sslkey, self.sslpass)
if self.username:
# basic username and password authentication
event.container.connect(url=self.url,
ssl_domain=ssl_domain,
user=self.username,
password = self.password,
allow_insecure_mechs=False,
sasl_enabled=True,
allowed_mechs="PLAIN",
virtual_host=self.virtual_host)
else:
# anonymous authentication
event.container.connect(url=self.url,
ssl_domain=ssl_domain,
allow_insecure_mechs=False,
sasl_enabled=True,
allowed_mechs="ANONYMOUS",
virtual_host=self.virtual_host)
def on_connection_opened(self, event):
if self.debug >= 1:
print('on_connection_opened: {}'.format(event))
# attaches sender link to transmit messages
event.container.create_sender(event.connection, target=self.address)
def on_sendable(self, event):
if self.debug >= 1:
print('on_sendable: {}'.format(event))
if event.sender.credit and not self.done_sending:
# creates message to send
msg = None
with open(self.inputfile, "r") as fd:
msg = Message(body=fd.read(),
subject=self.message_subject,
content_type=self.content_type,
content_encoding=self.content_encoding,
properties=self.application_defined_message_properties,
durable=True)
self.done_sending = True
if msg:
# send message
event.sender.send(msg)
else:
self.exit_status = 2
event.connection.close()
def on_accepted(self, event):
if self.debug >= 1:
print('on_accepted: {}'.format(event))
print("Message accepted by broker", self.url)
self.exit_status = 0
event.connection.close()
def on_rejected(self, event):
if self.debug >= 1:
print('on_rejected: {}'.format(event))
( run in 0.460 second using v1.01-cache-2.11-cpan-39bf76dae61 )