Alien-SVN

 view release on metacpan or  search on metacpan

src/subversion/tools/server-side/svnpubsub/svnpubsub/client.py  view on Meta::CPAN

#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# Generic client for SvnPubSub
#
# ### usage...
#
#
# EVENTS
#
#   connected: a connection to the server has been opened (though not
#                 necessarily established)
#   closed:    the connection was closed. reconnect will be attempted.
#   error:     an error closed the connection. reconnect will be attempted.
#   ping:      the server has sent a keepalive
#   stale:     no activity has been seen, so the connection will be closed
#                 and reopened
#

import asyncore
import asynchat
import socket
import functools
import time
import json
try:
  import urlparse
except ImportError:
  import urllib.parse as urlparse

# How long the polling loop should wait for activity before returning.
TIMEOUT = 30.0

# Always delay a bit when trying to reconnect. This is not precise, but sets
# a minimum amount of delay. At the moment, there is no further backoff.
RECONNECT_DELAY = 25.0

# If we don't see anything from the server for this amount time, then we
# will drop and reconnect. The TCP connection may have gone down without
# us noticing it somehow.
STALE_DELAY = 60.0


class SvnpubsubClientException(Exception):
  pass

class Client(asynchat.async_chat):

  def __init__(self, url, commit_callback, event_callback):
    asynchat.async_chat.__init__(self)

    self.last_activity = time.time()
    self.ibuffer = []

    self.url = url
    parsed_url = urlparse.urlsplit(url)
    if parsed_url.scheme != 'http':
      raise ValueError("URL scheme must be http: '%s'" % url)
    host = parsed_url.hostname
    port = parsed_url.port
    resource = parsed_url.path
    if parsed_url.query:
      resource += "?%s" % parsed_url.query
    if parsed_url.fragment:
      resource += "#%s" % parsed_url.fragment

    self.event_callback = event_callback

    self.parser = JSONRecordHandler(commit_callback, event_callback)

    # Wait for the end of headers. Then we start parsing JSON.
    self.set_terminator(b'\r\n\r\n')
    self.skipping_headers = True

    self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      self.connect((host, port))
    except:
      self.handle_error()
      return

    self.push(('GET %s HTTP/1.0\r\n\r\n' % resource).encode('ascii'))

  def handle_connect(self):
    self.event_callback('connected', None)

  def handle_close(self):
    self.event_callback('closed', None)
    self.close()

  def handle_error(self):
    self.event_callback('error', None)
    self.close()

  def found_terminator(self):
    if self.skipping_headers:
      self.skipping_headers = False
      # Each JSON record is terminated by a null character
      self.set_terminator(b'\0')
    else:
      record = b"".join(self.ibuffer)
      self.ibuffer = []
      self.parser.feed(record.decode())

  def collect_incoming_data(self, data):
    # Remember the last time we saw activity
    self.last_activity = time.time()

    if not self.skipping_headers:
      self.ibuffer.append(data)


class JSONRecordHandler:
  def __init__(self, commit_callback, event_callback):
    self.commit_callback = commit_callback
    self.event_callback = event_callback

  def feed(self, record):
    obj = json.loads(record)
    if 'svnpubsub' in obj:
      actual_version = obj['svnpubsub'].get('version')
      EXPECTED_VERSION = 1
      if actual_version != EXPECTED_VERSION:
        raise SvnpubsubClientException("Unknown svnpubsub format: %r != %d"
                                       % (actual_format, expected_format))
      self.event_callback('version', obj['svnpubsub']['version'])
    elif 'commit' in obj:
      commit = Commit(obj['commit'])
      self.commit_callback(commit)
    elif 'stillalive' in obj:
      self.event_callback('ping', obj['stillalive'])


class Commit(object):
  def __init__(self, commit):
    self.__dict__.update(commit)


class MultiClient(object):
  def __init__(self, urls, commit_callback, event_callback):
    self.commit_callback = commit_callback
    self.event_callback = event_callback

    # No target time, as no work to do
    self.target_time = 0
    self.work_items = [ ]

    for url in urls:
      self._add_channel(url)

  def _reconnect(self, url, event_name, event_arg):
    if event_name == 'closed' or event_name == 'error':
      # Stupid connection closed for some reason. Set up a reconnect. Note
      # that it should have been removed from asyncore.socket_map already.
      self._reconnect_later(url)

    # Call the user's callback now.
    self.event_callback(url, event_name, event_arg)

  def _reconnect_later(self, url):
    # Set up a work item to reconnect in a little while.
    self.work_items.append(url)

    # Only set a target if one has not been set yet. Otherwise, we could
    # create a race condition of continually moving out towards the future
    if not self.target_time:
      self.target_time = time.time() + RECONNECT_DELAY

  def _add_channel(self, url):
    # Simply instantiating the client will install it into the global map
    # for processing in the main event loop.
    Client(url,
           functools.partial(self.commit_callback, url),
           functools.partial(self._reconnect, url))

  def _check_stale(self):
    now = time.time()
    for client in asyncore.socket_map.values():
      if client.last_activity + STALE_DELAY < now:
        # Whoops. No activity in a while. Signal this fact, Close the
        # Client, then have it reconnected later on.
        self.event_callback(client.url, 'stale', client.last_activity)

        # This should remove it from .socket_map.
        client.close()

        self._reconnect_later(client.url)

  def _maybe_work(self):
    # If we haven't reach the targetted time, or have no work to do,
    # then fast-path exit



( run in 0.863 second using v1.01-cache-2.11-cpan-df04353d9ac )