All computer source code presented on this page, unless it includes attribution to another author, is provided by Ed Halley under the Artistic License. Use such code freely and without any expectation of support. I would like to know if you make anything cool with the code, or if you need any questions answered.
python/
    bindings.py
    boards.py
    buzz.py
    cache.py
    cards.py
    constraints.py
    english.py
    getopts.py
    gizmos.py
    goals.py
    improv.py
    interpolations.py
    namespaces.py
    nihongo.py
    nodes.py
    octalplus.py
    patterns.py
    persist.py
    physics.py
    pieces.py
    quizzes.py
    recipes.py
    relays.py
    romaji.py
    ropen.py
    sheets.py
    strokes.py
    subscriptions.py
    svgbuild.py
    testing.py
    things.py
    timing.py
    ucsv.py
    useful.py
    uuid.py
    vectors.py
    weighted.py
java/
    GlobFilenameFilter.java
    RegexFilenameFilter.java
    StringBufferOutputStream.java
    ThreadSet.java
    TracingThread.java
    Utf8ConsoleTest.java
perl/
    CVQM.pm
    Kana.pm
    Typo.pm
cxx/
    CCache.h
    equalish.cpp
#!python

import cPickle as pickle
import SocketServer
import threading
import traceback
import logging
import socket
import struct
import time
import sys
import re

#----------------------------------------------------------------------------

class Relay (object):

    '''A general-purpose non-topological TCP/IP network message relay node.

    Unlike most network code which focuses on differences between "servers"
    and "clients," the Relay mechanism just refers to generic nodes
    throughout the network.  These relay nodes can be arranged in any
    sort of topology with simple routing rules.  Any relay is a server to
    many clients; any relay is a client of many servers; a relay can be
    both server and client at the same time.

    From the point of view of any individual Relay, there are two kinds
    of connections: connections we initiated, and connections we
    accepted.  Those that we initiate, we consider to be "above" us.  The
    ones we have accepted are "below" us.  The routing rules use these
    concepts to specify how packets are handled.

    A Relay has a default packet-protocol format, which can be changed to
    any one of several built-in formats.  Custom formatting routines can
    also be supplied for special or external network datagram formats.

    Threads: constructing a Relay starts one daemon thread that accepts
    connections and one that routes queued messages; every open
    connection (Conduit) adds a receiving thread and a transmitting
    thread of its own.

    '''

    # Built-in packet modes; each value indexes the parser/maker tables
    # in Conduit.mode().
    FREE = 0
    LINES = 1
    N8LEN = 2
    N16LEN = 3
    N32LEN = 4
    NVLEN = 5
    PICKLE = 6

    class Conduit (SocketServer.BaseRequestHandler):
        '''Internal packet queuing and parsing management mechanism.'''
        #
        # Not a lot of doc-strings, as Conduit is meant for internal use.
        #
        # The python libs call it a 'request' but it's basically an open
        # socket session, from connection accepted to connection closed.
        #
        # This represents a handler for any socket session in a Relay,
        # providing the queuing of incoming raw data into application
        # packets, and allowing multiple threads to post output packets
        # to send.
        #
        def setup(self):
            # Called once by our base class to perform configuration.
            relay = self.server.relay
            # Build every piece of per-session state BEFORE publishing
            # ourselves in the relay's tables; otherwise another thread
            # could find() this conduit and post() to it while the queue
            # and event do not exist yet.
            self.pending = threading.Event()
            self.since = time.time()
            self.queue = []
            self.data = ''
            self.icount = 0
            self.ocount = 0
            self.quit = False
            # Relay.call() may have requested a specific packet mode for
            # this address; otherwise use the relay-wide default.
            self.mode(relay.dialing.pop(self.client_address, relay.packet))
            relay.conduits[self] = self.client_address
            relay.addresses[self.client_address] = self
        def handle(self):
            # Called once by our base class to perform the whole life cycle
            # of the open socket session.  We make a separate thread for
            # our transmitting duties, so transmitting and receiving are
            # fully asynchronous.
            self.transmitter = threading.Thread(target=self.transmitting)
            self.transmitter.setDaemon(True)
            self.transmitter.start()
            self.receiving()
            self.shutdown()
        def mode(self, packet=None, parse=None, make=None):
            # Set up the available parsing options that can recognize
            # valid raw data as application packets.  An extension to
            # this class may replace or extend this parsing process.
            # Explicit parse/make callables win over the packet index;
            # an absent or out-of-range index leaves the routine as is.
            parsers = [ parse_free, parse_line, parse_n8len,
                        parse_n16len, parse_n32len, parse_nVlen,
                        parse_pickle ]
            makers = [ make_free, make_line, make_n8len,
                       make_n16len, make_n32len, make_nVlen,
                       make_pickle ]
            if packet is not None:
                if parse is None and 0 <= packet < len(parsers):
                    parse = parsers[packet]
                if make is None and 0 <= packet < len(makers):
                    make = makers[packet]
            if parse: self.parse = parse
            if make: self.make = make
        def receiving(self):
            # Called by handle() in the main thread.
            # Whenever a chunk arrives, we see if that completes a packet.
            # Whenever a packet successfully parses, we view() it.
            talk = self.request
            while not self.quit:
                try:
                    chunk = talk.recv(4096)
                except socket.error:
                    # Reset by the peer, or closed by our own shutdown().
                    break
                if len(chunk) == 0:
                    break
                self.data += chunk
                try:
                    (packet, self.data) = self.parse(self.data)
                    while packet is not None:
                        self.view(packet)
                        (packet, self.data) = self.parse(self.data)
                except Exception:
                    address = self.client_address
                    _LOG.error('Malformed packet from %s:%d' % address)
                    _LOG.trace()
                    self.shutdown()
        def transmitting(self):
            # Called by handle() as a separate thread.
            # We just watch our pending queue and send whatever is posted.
            talk = self.request
            while not self.quit:
                self.pending.wait()
                self.pending.clear()
                while self.queue:
                    packet = self.queue.pop(0)
                    try:
                        # sendall(), not send(): send() may transmit only
                        # part of the packet and silently drop the rest.
                        talk.sendall(packet)
                    except Exception:
                        self.shutdown()
                        return
        def shutdown(self):
            # Orderly one-step shutdown including our transmit thread.
            self.quit = True
            try:
                # Wake a recv() blocked in the receiving thread; close()
                # alone does not reliably interrupt it.
                self.request.shutdown(socket.SHUT_RDWR)
            except Exception:
                pass
            try: self.request.close()
            except Exception: pass
            self.pending.set()
            if self.client_address:
                # Clear first so the kill() -> shutdown() re-entry stops.
                self.client_address = None
                self.server.relay.kill(self)
        def view(self, packet):
            # Called whenever we successfully parse raw data into a packet.
            # Queue it for the relay's routing thread.
            self.icount += 1
            self.server.incoming.append( (self.client_address, packet) )
            self.server.pending.set()
        def stats(self):
            # Some debugging or status information formatted for display.
            life = time.time() - self.since
            text = 'i=%d, o=%d, l=%.1f' % (self.icount, self.ocount, life)
            if life >= 1.0:
                text += ', r=%.1f/sec' % ((self.icount+self.ocount) / life)
            return text
        def post(self, packet):
            # Add a packet to be sent in turn.  Serialize first, so a
            # make() failure does not count as an output packet.
            datagram = self.make(packet)
            self.ocount += 1
            self.queue.append(datagram)
            self.pending.set()

    class RelayServer (SocketServer.ThreadingTCPServer):
        '''Internal listening socket and thread mechanism.'''
        #
        # Not a lot of doc-strings, as RelayServer is meant for internal
        # use.
        #
        # We add the concept of 'jilted' or banned addresses.  A Relay
        # can check the list of jilts to see whether a connection should
        # be accepted to form a new connection (a Conduit).
        #
        # We support logging of opened/closed connections.
        #
        allow_reuse_address = 1
        # The TCPServer should use socket.SO_REUSEADDR before bind().
        #
        def __init__(self, address, handler, relay):
            SocketServer.TCPServer.__init__(self, address, handler)
            self.daemon_threads = True
            self.relay = relay
            self.quit = False
            self.incoming = [] # (source address, message) awaiting view
            self.outgoing = [] # (targets, message) awaiting transmission
            self.pending = threading.Event()
        def server_close(self):
            self.quit = True
            SocketServer.ThreadingTCPServer.server_close(self)
            # Wake the routing thread so it can notice the quit flag.
            self.pending.set()
        shutdown = server_close
        def get_request(self):
            # Retrieve the 'request' (the connect session, or Conduit).
            return SocketServer.ThreadingTCPServer.get_request(self)
        def verify_request(self, request, address):
            # New request to connect. Check if we accept it.
            if self.relay.jilted(address):
                _LOG.warning('Connection refused from %s:%d' % address)
                return False
            _LOG.info('Connection accepted from %s:%d' % address)
            self.relay.below.add(address)
            return True
        def finish_request(self, request, address):
            # Actually create the request Conduit. It self-attaches.
            self.RequestHandlerClass(request, address, self)
        def handle_error(self, request, address):
            _LOG.error('Exception handling message from %s:%d' % address)
            _LOG.trace()
        def listening(self):
            # Our lifecycle is a simple loop to listen for new connections.
            # Errors are swallowed on purpose: closing the listening socket
            # during shutdown makes handle_request() fail, and the quit
            # flag then ends the loop.
            while not self.quit:
                try: self.handle_request()
                except Exception: pass
        def routing(self):
            # Drain both queues whenever someone signals pending work.  A
            # failure in one message (e.g. in an overridden Relay.view())
            # is logged rather than being allowed to end this thread.
            while not self.quit:
                self.pending.wait()
                self.pending.clear()
                while self.incoming:
                    envelope = self.incoming.pop(0)
                    try:
                        self.relay.route_incoming(envelope)
                    except Exception:
                        _LOG.error('Exception routing incoming message')
                        _LOG.trace()
                while self.outgoing:
                    envelope = self.outgoing.pop(0)
                    try:
                        self.relay.route_outgoing(envelope)
                    except Exception:
                        _LOG.error('Exception routing outgoing message')
                        _LOG.trace()

    def __init__(self, host='localhost', port=4567, packet=LINES):
        '''Construct a generic message relay node.

        Binds a listening socket at (host, port) and starts the daemon
        threads that accept connections and route messages.  The packet
        argument is the packet mode given to each new conduit.
        '''
        self.address = (host, port)
        self.server = Relay.RelayServer(self.address, Relay.Conduit, self)
        self.jilts = set([]) # 'hostname' we do not accept contact
        self.jiltseen = set([]) # temp 'hostname' we did not accept contact
        self.jiltmasks = set([]) # 'host*' masks we do not accept contact
        self.above = set([]) # (addr,port)
        self.below = set([]) # (addr,port)
        self.conduits = {} # conduit: (addr,port)
        self.addresses = {} # (addr,port): conduit
        self.dialing = {} # (addr,port): packet mode requested by call()
        self.packet = packet # initial assumed packet mode being received
        self.listening = threading.Thread(target=self.server.listening)
        self.listening.setDaemon(True)
        self.listening.start()
        self.routing = threading.Thread(target=self.server.routing)
        self.routing.setDaemon(True)
        self.routing.start()
        _LOG.info('Startup; now listening at %s:%d' % self.address)

    def shutdown(self):
        '''Disconnect from everything and terminate our thread.'''
        _LOG.info('Shutdown; not listening at %s:%d' % self.address)
        # Snapshot the keys: kill() removes entries as we go.
        for conduit in list(self.conduits):
            self.kill(conduit)
        if self.server:
            self.server.shutdown()
        _LOG.info('Shutdown complete')

    def find(self, target):
        '''Given either an address or conduit, return a tuple with both.

        The target may be a conduit, an (addr, port) tuple, or the raw
        socket belonging to a conduit.  Returns (conduit, address), or
        (None, None) when the target is not currently connected.
        '''
        if target is None:
            return (None, None)
        if target in self.conduits:
            return (target, self.conduits[target])
        if target in self.addresses:
            return (self.addresses[target], target)
        # Last resort: the target may be a conduit's underlying socket.
        for candidate in list(self.conduits):
            if candidate.request is target:
                return (candidate, candidate.client_address)
        return (None, None)

    def route_incoming(self, envelope):
        '''The incoming envelope shows the conduit delivering a message.'''
        (source, message) = envelope
        self.view(source, message)

    def route_outgoing(self, envelope):
        '''Outgoing messages must be propagated.

        The envelope is (targets, message), where targets is a single
        target or a list of them.  Targets that are not connected are
        skipped silently.
        '''
        (targets, message) = envelope
        if not isinstance(targets, list):
            targets = [targets]
        for target in targets:
            (conduit, address) = self.find(target)
            if conduit: conduit.post(message)

    def jilted(self, address):
        '''Returns True if calls from the address should not be accepted.'''
        if isinstance(address, tuple):
            address = address[0]
        if address in self.jilts or address in self.jiltseen:
            return True
        for mask in self.jiltmasks:
            if _mask_match(address, mask):
                # Remember the verdict so later checks skip the masks.
                self.jiltseen.add(address)
                return True
        return False

    def jilt(self, address):
        '''Add a new hostname or hostmask to refuse all incoming contact.

        Hostnames can be specific DNS or IPv4/IPv6 addresses.  Hostmasks
        can use '*' wildcards as well.
        '''
        if isinstance(address, tuple):
            address = address[0]
        if '*' in address:
            self.jiltmasks.add(address)
        else:
            self.jilts.add(address)

    def unjilt(self, address):
        '''Remove a hostname or hostmask from the set of jilted addresses.

        Besides an exact hostname, every stored hostmask that falls
        within the given address (itself possibly a mask) is removed.
        '''
        if isinstance(address, tuple):
            address = address[0]
        self.jilts.discard(address)
        self.jiltseen.discard(address)
        # Iterate over a snapshot: we remove from the set inside the loop.
        for mask in list(self.jiltmasks):
            # Treat the stored mask as a literal name ('#' stands in for
            # its wildcard) and test whether it falls within 'address'.
            within = mask.replace('*', '#')
            if _mask_match(within, address):
                self.jiltmasks.remove(mask)
                # jiltseen only caches mask verdicts; entries made by a
                # removed mask would keep those hosts banned, so drop the
                # cache and let jilted() rebuild it on demand.
                self.jiltseen.clear()

    def kill(self, conduit):
        '''If the address is connected, disconnect immediately.

        Accepts a conduit or its (addr, port) address; anything not
        currently connected is ignored.
        '''
        (conduit, address) = self.find(conduit)
        if conduit is None:
            return
        # pop() rather than del: another thread may have removed the
        # entry between the find() above and now.
        address = self.conduits.pop(conduit, None)
        if address is None:
            return
        self.addresses.pop(address, None)
        self.above.discard(address)
        self.below.discard(address)
        _LOG.info('Disconnecting %s:%d (%s)' % (address[0], address[1],
                                               conduit.stats()))
        conduit.shutdown()

    def call(self, address, packet=None):
        '''Initiate contact with a higher Relay or another plain socket.

        The first argument is an address tuple of the following form:

            ('hostname', port)

        The second argument is optional, and will set the packet
        forming/parsing mode for the created conduit.

        Any existing connection to the same address is killed first.
        Socket errors from a failed connect are raised to the caller.
        '''
        (conduit, addressed) = self.find(address)
        if conduit:
            self.kill(conduit)
        if packet is None:
            packet = self.packet
        _LOG.info('Calling %s:%d...' % address)
        talk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            talk.connect(address)
        except socket.error:
            talk.close()
            raise
        # Conduit.setup() picks this up on the handler thread.
        self.dialing[address] = packet
        self.above.add(address)
        self.server.process_request(talk, address)
        _LOG.info('Contacted peer at %s:%d' % address)

    def mode(self, target=None, packet=None, parse=None, make=None):
        '''Adjust the packet parsing and/or making mode.

        The first argument is either a conduit or an address tuple.  To
        apply the same packet mode to all currently connected conduits,
        supply target=None.

        Packet modes may be any of the existing values:
            FREE, LINES, N8LEN, N16LEN, N32LEN, NVLEN, PICKLE

        A custom packet parsing routine can be supplied with parse=P.
        The def P(data) should take one datastring argument, and return a
        two-element tuple (deserialized_value, remainder_of_datastring).
        If the datastring does not contain enough data to deserialize a
        value, return (None, whole_unmodified_datastring).

        A custom packet making routine can be supplied with make=M.  The
        def M(value) should take one value argument, and return the
        serialized datastring that can be parsed by the remote system.
        '''
        if target is None:
            for conduit in list(self.conduits):
                conduit.mode(packet=packet, parse=parse, make=make)
            return
        (conduit, address) = self.find(target)
        if conduit: conduit.mode(packet=packet, parse=parse, make=make)

    def post(self, targets, message):
        '''Accepts a new message into the system.

        The first argument is a single target which must already have
        been connected.  This can be a conduit or an address tuple.
        Alternatively, this argument can be a list of valid targets.
        Each target will receive an identical copy of the message.

        The second argument is the actual data to be made into a data
        packet using the currently selected packet making routine.

        The message is only queued here; the routing thread delivers it.
        '''
        _LOG.debug('Posting to %r: %r' % (targets, message))
        self.server.outgoing.append( (targets, message) )
        self.server.pending.set()

    def view(self, source, message):
        '''Called when receiving a message.

        This method is overridable.  For many Relay-based applications,
        this may be the only method you need to override.

        The "source" argument refers to which conduit of this Relay
        delivered the message.  You can reply to the same conduit by
        using that source value as the target in the post() method.  For
        example, a simple echo server would perform a call like this:

            self.post(source, message)

        The "message" argument is the actual contents of the message.  It
        is the result returned from the selected packet parsing routine,
        so there is no need to manage or understand many underlying
        protocol details.

        Any other tracking information or meta-routing data is not
        supported by Relay itself, and must be encapsulated within the
        message.
        '''
        _LOG.debug('Message from %r: %r' % (source, message))

#----------------------------------------------------------------------------

def parse_free(data):
    '''Passes all available data as a single packet.

    Returns (data, '') when any data is available.  With no data at all
    there is no packet, so (None, data) is returned; returning an empty
    packet instead would make a "parse until None" loop spin forever.
    '''
    if not data:
        return (None, data)
    return (data, '')

def parse_line(data):
    r'''Splits the first CRLF-terminated ('\r\n') line off the data.

    Returns (line_including_CRLF, remainder), or (None, data) while no
    complete line has arrived.  A lone CR or LF is not a terminator; it
    is passed through as part of the packet.
    '''
    (head, terminator, tail) = data.partition('\r\n')
    if not terminator:
        # No CRLF seen yet; keep everything buffered.
        return (None, data)
    return (head + terminator, tail)

def parse_n8len(data):
    '''Parses data packets preceded by 8-bit unsigned packet size.

    Returns (payload, remainder), or (None, data) until the size byte
    and the whole payload have arrived.
    '''
    if not data:
        return (None, data)
    # Offset just past the payload: one size byte plus the payload.
    end = 1 + ord(data[0])
    if end > len(data):
        return (None, data)
    return (data[1:end], data[end:])

def parse_n16len(data):
    '''Parses data packets preceded by 16-bit big-endian packet size.

    Returns (payload, remainder), or (None, data) until the two size
    bytes and the whole payload have arrived.
    '''
    if len(data) < 2: return (None, data)
    # Unpack only the 2-byte prefix: struct.unpack() raises struct.error
    # unless the string length matches the format exactly.
    packet = struct.unpack('!H', data[:2])[0] + 2
    if len(data) < packet: return (None, data)
    return (data[2:packet], data[packet:])

def parse_n32len(data):
    '''Parses data packets preceded by 32-bit big-endian packet size.

    Returns (payload, remainder), or (None, data) until the four size
    bytes and the whole payload have arrived.
    '''
    if len(data) < 4: return (None, data)
    # Unpack only the 4-byte prefix: struct.unpack() raises struct.error
    # unless the string length matches the format exactly.
    packet = struct.unpack('!L', data[:4])[0] + 4
    if len(data) < packet: return (None, data)
    return (data[4:packet], data[packet:])

def parse_nVlen(data):
    '''Parses data packets preceded by variable big-endian packet size.
    The packet size is broken into 7-bit pieces. The most-significant
    7-bits is put into the first byte, and the high bit is set if there
    are more bytes required to describe the packet size. This minimizes
    overhead of packet size descriptors while not limiting packet size.

    Returns (payload, remainder), or (None, data) until the prefix and
    the whole payload have arrived.  Raises ValueError if the size prefix
    runs to more than six bytes.
    '''
    # A leading 0x80 byte is an all-zero group with "more follows" set;
    # it adds nothing to the size, so discard any run of them.
    while data and ord(data[0]) == 128:
        data = data[1:]
    packet = 0
    for v in range(1, 7):
        if len(data) < v: return (None, data)
        byte = ord(data[v-1])
        packet = (packet << 7) | (byte & 0x7F)
        if 0 == (byte & 0x80): break
    else:
        # Six prefix bytes consumed and the last still asks for more.
        raise ValueError('packet length too long')
    packet += v
    if len(data) < packet: return (None, data)
    return (data[v:packet], data[packet:])

def parse_pickle(data):
    '''Parses a length-prefixed python "pickle" datagram into an object.

    The serialized data must carry the variable size prefix understood
    by parse_nVlen(), so an incomplete pickle stream is never handed to
    the unpickler.  Returns (object, remainder), or (None, remainder)
    while the datagram is incomplete.  Any exception raised by the
    unpickler propagates to the caller.

    NOTE: unpickling can execute arbitrary code chosen by the sender, so
    this mode is only suitable between peers that trust each other.
    '''
    (blob, remainder) = parse_nVlen(data)
    if blob is None:
        return (None, remainder)
    return (pickle.loads(blob), remainder)

def make_free(packet):
    '''Returns the packet untouched; in this mode the datagram IS the
    data packet, with no framing added.
    '''
    datagram = packet
    return datagram

def make_line(packet):
    r'''Makes datagram(s) from ASCII/UTF-8 line(s).
    Supplies a terminating CRLF ('\r\n') pair if none is given.
    Removes solitary CR characters and turns solitary LF into CRLF pairs.

    Returns the normalized string; a packet with embedded line breaks
    therefore becomes several CRLF-terminated lines.
    '''
    # Strings are immutable: replace() returns a new string, so the
    # result has to be kept.  Dropping every CR and then expanding every
    # LF leaves existing CRLF pairs intact and repairs the solitary ones.
    packet = packet.replace('\r', '').replace('\n', '\r\n')
    if not packet.endswith('\r\n'):
        packet += '\r\n'
    return packet

def make_n8len(packet):
    '''Makes a small datagram with an 8-bit unsigned packet size prefix.
    It is a ValueError if the packet is too long to fit in this prefix.

    Returns the single size byte followed by the packet.
    '''
    # Call-form raise: valid on every Python version, unlike the old
    # "raise Class, value" statement.
    if len(packet) > 0xFF: raise ValueError('packet length too long')
    return chr(len(packet)) + packet

def make_n16len(packet):
    '''Makes a medium datagram with a 16-bit unsigned packet size prefix.
    It is a ValueError if the packet is too long to fit in this prefix.

    Returns the two big-endian size bytes followed by the packet.
    '''
    # Call-form raise: valid on every Python version, unlike the old
    # "raise Class, value" statement.
    if len(packet) > 0xFFFF: raise ValueError('packet length too long')
    v = struct.pack('!H', len(packet))
    return v + packet

def make_n32len(packet):
    '''Makes a large datagram with a 32-bit unsigned packet size prefix.
    It is a ValueError if the packet is too long to fit in this prefix.

    Returns the four big-endian size bytes followed by the packet.
    '''
    # Call-form raise: valid on every Python version, unlike the old
    # "raise Class, value" statement.
    if len(packet) > 0xFFFFFFFF: raise ValueError('packet length too long')
    v = struct.pack('!L', len(packet))
    return v + packet

def make_nVlen(packet):
    '''Makes a datagram with a variable packet size prefix.
    It is a ValueError if the packet is too long to fit in this prefix.

    The size is written big-endian in 7-bit groups, most significant
    group first, with the high bit set on every prefix byte except the
    last.  This is the format read by parse_nVlen(), which accepts at
    most six prefix bytes (42 bits of size).
    '''
    size = len(packet)
    if size >= (1 << 42): raise ValueError('packet length too long')
    # Least significant group goes last and carries no "more" bit.
    prefix = chr(size & 0x7F)
    size >>= 7
    while size:
        # Each higher group is prepended with the "more follows" bit.
        prefix = chr(0x80 | (size & 0x7F)) + prefix
        size >>= 7
    return prefix + packet

def make_pickle(packet):
    '''Serializes an arbitrary python object into a "pickle" datagram.

    The remote system needs identical definitions of every class and
    datatype contained in the pickle.  The serialized bytes are handed
    to make_nVlen() so they travel behind a length prefix, which keeps
    the parsing side simple.
    '''
    return make_nVlen(pickle.dumps(packet))

#----------------------------------------------------------------------------

def _trace():
    # Log the traceback of the exception currently being handled at
    # debug level, one line per row, with continuation lines indented to
    # sit under the message column of the log format.
    #
    # format_exc() reads the calling thread's own exception state.  The
    # older sys.exc_type / sys.exc_value / sys.exc_traceback variables
    # are process-wide, so another thread could overwrite them.
    text = traceback.format_exc()
    text = text.split('\n')
    if not text[-1]: text.pop()
    _LOG.debug(('\n' + ' '*28 + '| ').join(text))

# Module-level logging setup; runs once, when the module is imported.
if True:
    # One logger, named 'Relay', shared by everything in this module.
    _LOG = logging.getLogger('Relay')
    # Attach the traceback helper so callers can write _LOG.trace().
    _LOG.trace = _trace
    # Level initial, timestamp, then the message after a '|' separator.
    _format = '(%(levelname).1s) %(asctime)-15s | %(message)s'
    # NOTE(review): this configures the root logger at DEBUG level as an
    # import side effect, which also affects the importing application.
    logging.basicConfig(level=logging.DEBUG, format=_format)

def _mask_match(name, mask):
    # hostname string falls within hostmask
    #
    # Returns True when 'name' matches 'mask' in full, where each '*' in
    # the mask stands for any run of characters (including none).  As
    # before, '.' and ':' are treated as the same separator.
    name = name.replace('.', ':')
    pieces = mask.replace('.', ':').split('*')
    # Escape the literal pieces so that only '*' acts as a wildcard;
    # otherwise characters such as '?', '+', '(' or '[' in a mask would
    # be read as regular-expression operators.
    pattern = '.*'.join([re.escape(piece) for piece in pieces]) + r'\Z'
    if re.match(pattern, name):
        return True
    return False

#----------------------------------------------------------------------------

def _test_crashing_parse_line(data):
    # Test parser: behaves like parse_line(), except that once a complete
    # line is buffered it raises ValueError if the data contains a '1'.
    # Lets a test provoke the "malformed packet" path on purpose.
    packet = data.find('\r\n')
    if packet < 0:
        return (None, data)
    # Call-form raise: valid on every Python version.
    if '1' in data: raise ValueError('boom')
    packet += len('\r\n')
    (packet, data) = (data[:packet], data[packet:])
    return (packet, data)

def _test_relay_to_relay():
    # Two relays on localhost: 'r' (port 6600) calls 't' (port 7700),
    # posts three lines to it half a second apart, then both shut down.
    r = Relay(port=6600)
    t = Relay(port=7700)
    time.sleep(0.1)
    #
    a = ('localhost', 7700)
    r.call( a )
    time.sleep(0.5)
    #
    lines = ( 'Hello, World\r\n',
              'Blah blah blah\r\n',
              'Goodbye, Cruel World\r\n' )
    for line in lines:
        r.post( a, line )
        time.sleep(0.5)
    #
    # Leave the link idle for a while before taking it down.
    time.sleep(2)
    r.kill( a )
    time.sleep(0.5)
    #
    r.shutdown()
    t.shutdown()

def _test_socket_to_relay():
    # A plain client socket sends two lines to a relay on port 5500,
    # then closes; the relay is shut down half a second later.
    r = Relay(port=5500)
    #
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(('localhost', 5500))
    for line in ('Hello, World\r\n', 'Blah blah.\r\n'):
        s.send(line)
    #
    pause = 0.5
    time.sleep(pause)
    s.close()
    time.sleep(pause)
    #
    r.shutdown()

class _socker (threading.Thread):
    # Test client thread: connects to 'address', sends 'breadth' numbered
    # lines ('i=0', 'i=1', ...) with short random pauses, then logs every
    # chunk it receives until the connection closes.
    def __init__(self, address, breadth=3):
        threading.Thread.__init__(self)
        self.address = address
        self.breadth = breadth
        self.setDaemon(True)
    def run(self):
        import random
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect(self.address)
            for i in range(self.breadth):
                # Best effort: a failed send is skipped, not fatal.
                try: s.send('i=%d\r\n' % i)
                except socket.error: pass
                time.sleep(0.1 + random.random() * 0.3)
            while True:
                # A receive error is treated like the peer closing.
                try: chunk = s.recv(1024)
                except socket.error: chunk = ''
                if len(chunk) == 0:
                    break
                _LOG.debug(repr(chunk))
        finally:
            # Release the socket even if connect() or the loops fail.
            s.close()

def _test_multi_socket_to_relay():
    # Several client threads talk to one relay on port 4400 at once.
    r = Relay(port=4400)
    #
    width = 3       # number of client threads
    breadth = 3     # lines sent by each client
    target = ('localhost', 4400)
    started = 0
    while started < width:
        _socker( target, breadth ).start()
        started += 1
    #
    # Give the clients one second apiece before shutting down.
    time.sleep(width)
    r.shutdown()

if __name__ == '__main__':
    # Run the self-tests in order, with a dashed rule logged before each.
    for test in ( _test_socket_to_relay,
                  _test_multi_socket_to_relay,
                  _test_relay_to_relay ):
        _LOG.debug('-'*40)
        test()


Contact Ed Halley by email at ed@halley.cc.
Text, code, layout and artwork are Copyright © 1996-2008 Ed Halley.
Copying in whole or in part, with author attribution, is expressly allowed.
Any references to trademarks are illustrative and are controlled by their respective owners.
Make donations with PayPal - it's fast, free and secure!