EMDIS-ECS

 view release on metacpan or  search on metacpan

script/ecs_amqp_recv.py  view on Meta::CPAN

    parser.add_option("-a", "--address", default=None,
                  help="topic or queue from which messages are received (e.g. test_queue)")
    parser.add_option("-s", "--truststore", default=None,
                  help="SSL trust store (e.g. cacert.pem)")
    parser.add_option("-c", "--sslcert", default=None,
                  help="client-side SSL certificate / public key (e.g. user-cert.pem)")
    parser.add_option("-k", "--sslkey", default=None,
                  help="client-side SSL private key (e.g. user-key.pem)")
    parser.add_option("-y", "--sslpass", default=None,
                  help="password for client-side SSL private key (overrides ECS_AMQP_SSLPASS env var)")
    parser.add_option("-u", "--username", default=None,
                  help="username for SASL authentication")
    parser.add_option("-p", "--password", default=None,
                  help="password for SASL authentication (overrides ECS_AMQP_PASSWORD env var)")
    parser.add_option("-t", "--timeout", type=int, default=5,
                  help="inactivity timeout threshold, in seconds (default %default)")
    parser.add_option("-o", "--outputdir", action="append", default=None,
                  help="file system directory for output files")
    parser.add_option("-x", "--suffix", default=".amqp.msg",
                  help="filename suffix for output files (default %default)")

    opts, args = parser.parse_args()

    return opts

"""
Proton event handler class
Creates an amqp connection using ANONYMOUS or PLAIN authentication.
Then attaches a receiver link to consume messages from the broker.
"""
class Recv(MessagingHandler):
    def __init__(self, debug, url, vhost, address, truststore, sslcert, sslkey, sslpass, username, password, timeout, outputdir, suffix):
        super(Recv, self).__init__()

        self.connection = None
        self.receiver = None

        # exit status
        self.exit_status = 1

        # debug output level
        self.debug = debug

        # amqp broker host url
        self.url = url

        # amqp node address
        self.address = address

        # broker virtual host name
        self.virtual_host = vhost

        # SSL trust store
        self.truststore = truststore

        # client-side SSL certificate
        self.sslcert = sslcert
        self.sslkey = sslkey
        self.sslpass = sslpass

        # authentication credentials
        self.username = username
        self.password = password

        # inactivity timeout
        self.inactivity_timestamp = 0
        self.inactivity_threshold = timeout

        # output dir
        self.outputdir = outputdir
        self.hextrans = str.maketrans('0123456789abcdef', 'BCDFGHJKLMNPQRST')
        self.old_prefix_ts = ''
        self.msg_seqnum = 0

        # suffix
        self.suffix = suffix

    def on_start(self, event):
        if self.debug >= 1:
            print('on_start:  {}'.format(event))

        ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
        # SSL trust store (e.g. PEM file containing trusted CA certificate(s))
        if self.truststore:
            ssl_domain.set_trusted_ca_db(self.truststore)
        if True:
            # use trust store to verify peer's (e.g. broker's) SSL certificate
            ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
        else:
            # verify hostname on peer's (e.g. broker's) SSL certificate
            ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME)

        # client-side certificate
        if self.sslkey:
            ssl_domain.set_credentials(self.sslcert, self.sslkey, self.sslpass)

        if self.username:
            # username and password authentication
            self.connection = event.container.connect(url=self.url,
                                                      ssl_domain=ssl_domain,
                                                      user=self.username,
                                                      password = self.password,
                                                      allow_insecure_mechs=False,
                                                      sasl_enabled=True,
                                                      allowed_mechs="PLAIN",
                                                      virtual_host=self.virtual_host)
        else:
            # anonymous authentication
            self.connection = event.container.connect(url=self.url,
                                                      ssl_domain=ssl_domain,
                                                      allow_insecure_mechs=False,
                                                      sasl_enabled=True,
                                                      allowed_mechs="ANONYMOUS",
                                                      virtual_host=self.virtual_host)

    def on_connection_opened(self, event):
        if self.debug >= 1:
            print('on_connection_opened:  {}'.format(event))

        # create receiver link to consume messages
        self.receiver = event.container.create_receiver(event.connection, source=self.address)
        self.exit_status = 0

    def on_connection_closed(self, event):
        if self.debug >= 1:
            print('on_connection_closed:  {}'.format(event))

    def on_reactor_quiesced(self, event):
        if self.debug >= 1:
            print('on_reactor_quiesced [{},{}]:  {}'.format(self.inactivity_timestamp, time.time(), event))

        # TODO: improve robustness of link/session/connection/container teardown (instead of relying solely on inactivity timeout)
        if self.inactivity_timestamp == 0:
            self.inactivity_timestamp = time.time()
        inactivity_seconds = time.time() - self.inactivity_timestamp
        if inactivity_seconds > 0.1 and inactivity_seconds >= self.inactivity_threshold:
            if self.connection:
                self.connection.close()
            else:
                event.container.stop()

    def on_message(self, event):
        if self.debug >= 1:
            print('on_message:  {}'.format(event))

        filename_prefix = int(time.time() * 16).to_bytes(8, byteorder='big').hex().translate(self.hextrans)[8:]
        if self.old_prefix_ts != filename_prefix:
            self.old_prefix_ts = filename_prefix
            # reset msg_seqnum when prefix_ts changes
            self.msg_seqnum = 0
        self.msg_seqnum += 1
        filename_prefix += '%03i.' % self.msg_seqnum

        created_filename = None
        with tempfile.NamedTemporaryFile(dir=self.outputdir[0], mode='wt', prefix=filename_prefix, suffix=self.suffix, delete=False) as fd:



( run in 1.918 second using v1.01-cache-2.11-cpan-39bf76dae61 )