EMDIS-ECS

 view release on metacpan or  search on metacpan

script/ecs_amqp_send.py  view on Meta::CPAN

    parser.add_option("-a", "--address", default=None,
                  help="topic or queue to which message is sent (e.g. test_queue)")
    parser.add_option("-s", "--truststore", default=None,
                  help="SSL trust store (e.g. cacert.pem)")
    parser.add_option("-c", "--sslcert", default=None,
                  help="client-side SSL certificate / public key (e.g. user-cert.pem)")
    parser.add_option("-k", "--sslkey", default=None,
                  help="client-side SSL private key (e.g. user-key.pem)")
    parser.add_option("-y", "--sslpass", default=None,
                  help="password for client-side SSL private key (overrides ECS_AMQP_SSLPASS env var)")
    parser.add_option("-u", "--username", default=None,
                  help="username for SASL authentication")
    parser.add_option("-p", "--password", default=None,
                  help="password for SASL authentication (overrides ECS_AMQP_PASSWORD env var)")
    parser.add_option("-i", "--inputfile", default=None,
                  help="input file containing message to be sent")
    parser.add_option("-P", "--property", action="append", default=None,
                  help="application defined message property and value (e.g. x-emdis-hub-snd=DE)")
    parser.add_option("-E", "--encoding", default='UTF-8',
                  help="message Content-Encoding (default %default)")
    parser.add_option("-T", "--type", default='text/plain',
                  help="message Content-Type (default %default)")
    parser.add_option("-S", "--subject", default=None,
                  help="message Subject (e.g. EMDIS)")
    opts, args = parser.parse_args()
    return opts



"""
Proton event handler class
Creates an amqp connection and a sender to publish messages.
"""
class Send(MessagingHandler):
    def __init__(self, debug, url, vhost, address, truststore, sslcert, sslkey, sslpass, username, password, inputfile, properties, content_encoding, content_type, message_subject):
        """Store the connection, credential and message settings for one send.

        Args:
            debug: debug output level; the event handlers print a trace
                line when it is >= 1.
            url: amqp broker host url.
            vhost: broker virtual host name.
            address: target amqp node address.
            truststore: SSL trust store, or None.
            sslcert: client-side SSL certificate, or None.
            sslkey: client-side SSL private key, or None.
            sslpass: password for the client-side SSL private key, or None.
            username: username for authentication; a falsy value selects
                anonymous authentication in on_start.
            password: password used together with username.
            inputfile: path of the file holding the message body; '-'
                selects standard input.
            properties: iterable of 'name=value' strings defining the
                application message properties, or None/empty for none.
            content_encoding: message Content-Encoding.
            content_type: message Content-Type.
            message_subject: message Subject.

        Raises:
            ValueError: if an entry of properties contains no '='.
        """
        super(Send, self).__init__()

        # exit status (stays 1 unless an event handler changes it)
        self.exit_status = 1

        # debug output level
        self.debug = debug

        # amqp broker host url
        self.url = url

        # target amqp node address
        self.address = address

        # broker virtual host name
        self.virtual_host = vhost

        # SSL trust store
        self.truststore = truststore

        # client-side SSL certificate
        self.sslcert = sslcert
        self.sslkey = sslkey
        self.sslpass = sslpass

        # authentication credentials
        self.username = username
        self.password = password

        # input file ('-' means standard input, opened later by descriptor)
        self.inputfile = inputfile
        if self.inputfile == '-':
            self.inputfile = sys.stdin.fileno()

        # application defined message properties
        self.application_defined_message_properties = None
        if properties:
            parsed_properties = {}
            for prop in properties:
                # split on the first '=' only, so values may contain '='
                name, separator, value = prop.partition('=')
                if not separator:
                    # fail with a message naming the offending property
                    # instead of dict()'s "update sequence element" error
                    raise ValueError(
                        "invalid message property {!r}: expected name=value".format(prop))
                parsed_properties[name] = value
            self.application_defined_message_properties = parsed_properties

        # standard message properties
        self.content_encoding = content_encoding
        self.content_type = content_type
        self.message_subject = message_subject

        # done sending yet?
        self.done_sending = False

    def on_start(self, event):
        """Build the client SSL domain and open the connection to the broker.

        The trust store and the client-side credentials are applied only
        when they were supplied.  A username selects PLAIN authentication
        with username and password; otherwise ANONYMOUS is requested.
        """
        if self.debug >= 1:
            print('on_start:  {}'.format(event))

        domain = SSLDomain(SSLDomain.MODE_CLIENT)

        # SSL trust store (e.g. PEM file containing trusted CA certificate(s))
        if self.truststore:
            domain.set_trusted_ca_db(self.truststore)

        # Hard-wired switch between the two peer verification modes:
        #   False -> use trust store to verify peer's (e.g. broker's) SSL certificate
        #   True  -> verify hostname on peer's (e.g. broker's) SSL certificate
        check_peer_name = False
        if check_peer_name:
            domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME)
        else:
            domain.set_peer_authentication(SSLDomain.VERIFY_PEER)

        # client-side certificate
        if self.sslkey:
            domain.set_credentials(self.sslcert, self.sslkey, self.sslpass)

        # settings shared by both authentication styles
        connect_args = {
            'url': self.url,
            'ssl_domain': domain,
            'allow_insecure_mechs': False,
            'sasl_enabled': True,
            'virtual_host': self.virtual_host,
        }
        if self.username:
            # basic username and password authentication
            connect_args['user'] = self.username
            connect_args['password'] = self.password
            connect_args['allowed_mechs'] = "PLAIN"
        else:
            # anonymous authentication
            connect_args['allowed_mechs'] = "ANONYMOUS"

        event.container.connect(**connect_args)

    def on_connection_opened(self, event):
        """Create the sender on the newly opened connection.

        The sender targets self.address.
        """
        if self.debug >= 1:
            trace = 'on_connection_opened:  {}'.format(event)
            print(trace)

        # attaches sender link to transmit messages
        container = event.container
        container.create_sender(event.connection, target=self.address)

    def on_sendable(self, event):
        """Send the contents of the input file as one durable message.

        Nothing happens unless the sender has credit and no send has been
        attempted yet.  The message body is the whole input file; subject,
        content type, content encoding and application properties come from
        the values given to the constructor.

        If the input file cannot be read, the error is reported on standard
        error, exit_status is set to 2 and the connection is closed, so the
        failure is not raised into the event loop.
        """
        if self.debug >= 1:
            print('on_sendable:  {}'.format(event))

        if not event.sender.credit or self.done_sending:
            return

        # exactly one attempt is made, whether or not it succeeds
        self.done_sending = True

        try:
            with open(self.inputfile, "r") as fd:
                body = fd.read()
        except (OSError, UnicodeError) as err:
            # unreadable or undecodable input: give up cleanly
            print('Unable to read message from input file {}: {}'.format(self.inputfile, err),
                  file=sys.stderr)
            self.exit_status = 2
            event.connection.close()
            return

        # creates and sends the message
        msg = Message(body=body,
                      subject=self.message_subject,
                      content_type=self.content_type,
                      content_encoding=self.content_encoding,
                      properties=self.application_defined_message_properties,
                      durable=True)
        event.sender.send(msg)

    def on_accepted(self, event):
        """Report the accepted message, record success and close the connection.

        Sets exit_status to 0.
        """
        if self.debug >= 1:
            trace = 'on_accepted:  {}'.format(event)
            print(trace)

        broker = self.url
        print("Message accepted by broker", broker)

        # success
        self.exit_status = 0

        connection = event.connection
        connection.close()

    def on_rejected(self, event):
        """Trace the on_rejected event when debug output is enabled."""
        if self.debug >= 1:
            print('on_rejected:  {}'.format(event))



( run in 0.460 second using v1.01-cache-2.11-cpan-39bf76dae61 )