|
Programmer's Notebook |
All computer source code presented on this page, unless it includes attribution to another author, is provided by Ed Halley under the Artistic License. Use such code freely and without any expectation of support. I would like to know if you make anything cool with the code, or if you have questions that need answering.
python/ bindings.py boards.py buzz.py cache.py cards.py constraints.py english.py getopts.py gizmos.py goals.py improv.py interpolations.py namespaces.py nihongo.py nodes.py octalplus.py patterns.py persist.py physics.py pieces.py quizzes.py recipes.py relays.py romaji.py ropen.py sheets.py strokes.py subscriptions.py svgbuild.py testing.py things.py timing.py ucsv.py useful.py uuid.py vectors.py weighted.py java/ GlobFilenameFilter.java RegexFilenameFilter.java StringBufferOutputStream.java ThreadSet.java TracingThread.java Utf8ConsoleTest.java perl/ CVQM.pm Kana.pm Typo.pm cxx/ CCache.h equalish.cpp |
# nodes - a dataflow model of processing

'''
The nodes module provides a dataflow model of processing.

ABSTRACT

Each node consists of a set of data inputs called "sources," a set of data
outputs called "results," and a processing routine which can be executed
whenever data is available.

A process is a collection of nodes, with pipeline connections wired up
between each node, from the results of one node to the sources of others.
The process can be stored as a simple configuration list that describes
how the individual nodes are organized.

AUTHOR

Ed Halley (ed@halley.cc) 31 March 2007
'''

import subscriptions
from subscriptions import Subscription

#----------------------------------------------------------------------------

class Node (object):
    '''A node is an *instance* of a given computational component type,
    an instance as used at a given place within a larger system of parts.

    It has a number of ports with names that are unique within the node.
    Source ports accept data of a declared type; result ports publish data
    of a declared type (publish() raises TypeError for anything else).
    When executed, data is parked on the result ports and can be collected
    at any time.

    Every node registers its name in the class-wide namespace Node.names;
    parent/child relationships are recorded by name, not by reference.
    '''

    #     +---------------+
    # (s)-|               |
    #     |               |-(r)
    # (s)-|               |
    #     +---------------+

    import namespaces
    names = namespaces.Namespace()   # registry mapping names to Node instances

    # Port "genders": every port is either an input or an output.
    SOURCE = -1
    RESULT = +1

    DEBUG = False   # class-wide trace switch; self._debug is per-instance

    def debug(self):
        '''Returns true if tracing is on for this node, either through the
        class-wide Node.DEBUG flag or this instance's own _debug flag.'''
        return Node.DEBUG or self._debug

    #---------------
    # Infrastructure

    def __init__(self):
        super(Node, self).__init__()
        self.name = None
        self.rename(self.__class__.__name__)
        self.parent = None    # name (not instance) of the enclosing node
        self.sources = {}     # source port name -> data or Subscription
        self.results = {}     # result port name -> data or Subscription
        self.terms = {}       # locally defined terms, see term()
        self.ports = {}       # port name -> declared data type
        self.gender = {}      # port name -> Node.SOURCE or Node.RESULT
        self.first = True     # True until init() has been called once
        self._debug = False

    def __str__(self):
        return self.name

    def __repr__(self):
        me = self.__class__.__name__ + '():'
        if self.name != self.__class__.__name__:
            me += '\n\t.name = ' + self.name
        if self.parent is not None:
            me += '\n\t.parent = ' + self.parent
        if self.ports:
            me += '\n\t.ports = {'
            labels = { Node.SOURCE: 'source', Node.RESULT: 'result' }
            for each in self.ports:
                kind = self.ports[each]
                if isinstance(kind, type):
                    kind = kind.__name__
                me += '\n\t\t:%s [%s] (%s)' % (each, labels[self.gender[each]],
                                               kind)
            me += '\n\t\t}'
        return me

    #-----------------------------------------
    # Defining Names for Self, Terms and Ports

    def rename(self, name):
        '''Registers this node under (approximately) the given name and
        returns the name actually assigned by the Node.names registry.
        Any previously registered name for this node is unregistered.'''
        if Node.names.object(name) is self:
            return name
        if self.name:
            Node.names.unregister(self, self.name)
        self.name = Node.names.register(self, name)
        return self.name

    def term(self, name, value=None):
        '''Defines and/or looks up a named term.

        If value is not None, it is stored as this node's own term.  The
        lookup checks this node first, then each parent in turn; returns
        None if no node in the chain defines the term.  (A term therefore
        cannot be set to None through this method.)'''
        if value is not None:
            self.terms[name] = value
        if name in self.terms:
            return self.terms[name]
        if self.parent:
            parent = Node.names.object(self.parent)
            return parent.term(name)
        return None

    def port(self, name, gender=None, type=object, subscription=None):
        '''Declares a port, or queries an existing one.

        With gender=None, returns the gender (Node.SOURCE or Node.RESULT)
        of the named port, or None if there is no such port.

        Otherwise declares a port of that gender which carries values of
        the given type.  Redeclaring an existing port with the same gender
        and type does nothing; any other redeclaration raises ValueError.
        An unknown gender raises ValueError.

        For RESULT ports, `subscription` selects how results are held:
        a Subscription instance is used as given; any other true value
        (e.g. True) creates a fresh Subscription; a false value (None,
        False) holds plain values, which setup() clears before each run.
        '''
        if gender is None:
            return self.gender.get(name)
        if gender not in (Node.RESULT, Node.SOURCE):
            raise ValueError('Port must be of type RESULT or SOURCE.')
        if name in self.ports:
            if type is not self.ports[name] or gender != self.gender[name]:
                raise ValueError('Cannot redefine port type after creation.')
            return
        self.ports[name] = type
        self.gender[name] = gender
        if gender != Node.RESULT:
            # source data lives in self.sources, rebuilt by setup() per run;
            # only result ports get a slot in self.results
            return
        if isinstance(subscription, Subscription):
            self.results[name] = subscription
        elif subscription:
            self.results[name] = Subscription()
        else:
            self.results[name] = None

    #--------------------------------
    # Support for Container/Parentage

    def contains(self, node):
        '''Returns False because plain Nodes cannot contain children.
        '''
        return False

    def within(self, node):
        '''Returns True iff the given node (instance or name) is anywhere
        up our parent chain.
        '''
        if self.parent == str(node):
            return True
        if self.parent:
            parent = Node.names.object(self.parent)
            return parent.within(node)
        return False

    #-----------------------------
    # General Processing Callbacks

    def init(self):
        '''Called once, before the first run (or when joined to a parent).
        Subclasses typically declare their ports here.'''
        pass

    def process(self):
        '''Does this node's work; returns a true value on success.
        Subclasses override this to receive() and publish() data.'''
        return True

    def reset(self):
        '''Hook for subclasses to drop internal state; does nothing here.'''
        pass

    #--------------------
    # Process Port Access

    def peek(self, port, count):
        '''Returns the result of peek(count) on the Subscription feeding the
        given source port, if that subscription is ready for this node.
        Returns None for unknown ports, for plain (non-Subscription) source
        data, and when the subscription is not ready.'''
        source = self.sources.get(port)
        if not isinstance(source, Subscription):
            return None
        if not self.ready(port, count):
            return None
        return source.peek(count)

    def receive(self, port):
        '''Returns the data for the given source port.  A Subscription
        source is read on behalf of the reader "<node name>:<port>".
        Returns None for unknown ports.'''
        received = self.sources.get(port)
        if isinstance(received, Subscription):
            received = received.receive(self.name + ':' + port)
        return received

    def publish(self, port, product):
        '''Publishes product on the given result port and returns it (or
        what the port's Subscription.publish() returns).  Raises TypeError
        if product is not an instance of the port's declared type.  Unknown
        ports are ignored and product is returned unchanged.'''
        if port not in self.results:
            return product
        expected = self.ports[port]
        if not isinstance(product, expected):
            # port types may be tuples of types, which have no __name__
            raise TypeError('Cannot publish a %s, expected a %s.'
                            % ( type(product).__name__,
                                getattr(expected, '__name__', str(expected)) ))
        subscription = self.results[port]
        if isinstance(subscription, Subscription):
            product = subscription.publish(product)
        else:
            self.results[port] = product
        return product

    def flush(self):
        '''Clears every plain (non-Subscription) result to None.'''
        for port in self.results:
            if not isinstance(self.results[port], Subscription):
                self.results[port] = None

    #-----------------------------------
    # Processing and Port Infrastructure

    def setup(self, sources=None):
        '''Prepares this node for a run.

        Calls init() the first time only.  Rebuilds self.sources with every
        source port set to None, then overlays the entries of `sources` that
        name source ports; each Subscription among them gets this node
        registered as reader "<node name>:<port>".  Plain result values
        from a previous run are cleared to None.'''
        if sources is None:
            sources = {}
        if self.first:
            self.init()
            self.first = False
        self.sources = {}
        for each in self.ports:
            if self.gender[each] == Node.SOURCE:
                self.sources[each] = None
            elif not isinstance(self.results.get(each), Subscription):
                self.results[each] = None
        for each in sources:
            if self.gender.get(each) != Node.SOURCE:
                continue
            self.sources[each] = sources[each]
            if isinstance(sources[each], Subscription):
                sources[each].register(self.name + ':' + each)

    def ready(self, port=None, count=1):
        '''Returns False if a Subscription source is not ready for this node.

        With port=None, checks every source port.  Plain (non-Subscription)
        source data, including None and ports not yet set up, counts as
        ready.  `count` is only forwarded between the recursive calls here;
        it is not passed on to Subscription.ready().'''
        if port is None:
            for each in self.ports:
                if self.gender[each] == Node.SOURCE:
                    if not self.ready(port=each, count=count):
                        return False
            return True
        source = self.sources.get(port)
        if isinstance(source, Subscription):
            if not source.ready(self.name + ':' + port):
                return False
        return True

    def go(self, sources=None):
        '''Runs this node once with the given {source port: data} dict.
        Returns the results dict, or None if a source was not ready or
        process() returned a false value.'''
        if self.debug():
            print("%s{'%s'}.go(...)" % (self.__class__.__name__, self.name))
        self.setup(sources)
        if not self.ready():
            return None
        if not self.process():
            return None
        return self.results

#----------------------------------------------------------------------------

class Compound (Node):
    '''A compound node may have children nodes joined into it.  There is
    no particular connectivity enforced internally; extensions to this class
    may combine, compare or filter the results internally as desired.
    The compound node can execute the processing capability for any child.
    '''

    #     +---------------+
    # (s)-|               |
    #     |    +---+      |-(r)
    #     |  (s)   (r)    |
    #     |    +---+      |
    #     |    +---+      |
    #     |  (s)   (r)    |
    #     |    +---+      |
    #     |               |
    #     +---------------+

    def __init__(self):
        super(Compound, self).__init__()
        self.children = set()   # names (not instances) of immediate children

    def __repr__(self):
        me = super(Compound, self).__repr__()
        if self.children:
            me += '\n\t.children = [' + ', '.join(self.children) + ']'
        return me

    def child(self, node):
        '''Given a name or a Node instance which is an immediate child,
        returns the Node instance.  Returns None if the given argument is
        not an immediate child.
        '''
        if node in self.children:
            return Node.names.object(node)
        if isinstance(node, Node) and node.name in self.children:
            return node
        return None

    def contains(self, node):
        '''Returns True iff the given node is one of our immediate children.
        '''
        return str(node) in self.children

    def enjoin(self, node):
        '''Joins a node (instance or name), or a list/tuple of them, as
        immediate children; calls init() on a newly joined child that has
        not been initialized yet.  Returns the joined node (or the list).

        Raises TypeError if the argument does not resolve to a Node, and
        ValueError if joining would nest a node in itself or the node
        already has a different parent.  Joining an existing child again
        does nothing.'''
        if isinstance(node, (list, tuple)):
            for each in node:
                self.enjoin(each)
            return node
        if not isinstance(node, Node):
            node = Node.names.object(str(node))
        if not isinstance(node, Node):
            raise TypeError('Must give a Node instance.')
        if node is self:
            raise ValueError('Cannot join a node into itself.')
        if self.within(node):
            # the candidate is one of our ancestors: joining makes a cycle
            raise ValueError('The node cannot be joined within itself.')
        if node.parent == self.name:
            return node
        if node.parent:
            raise ValueError('The node is already enjoined elsewhere.')
        node.parent = self.name
        self.children.add(node.name)
        if node.first:
            node.init()
            node.first = False
        return node

    def unjoin(self, node):
        '''Removes an immediate child (instance or name) and clears its
        parent.  Raises ValueError if it is not an immediate child.'''
        if not self.contains(node):
            raise ValueError('The given node is not an immediate child.')
        if not isinstance(node, Node):
            node = Node.names.object(str(node))
        self.children.discard(node.name)
        node.parent = None

    def execute(self, node, sources=None):
        '''Runs an immediate child once via its go(sources) and returns
        what go() returns.  Raises ValueError for a non-child.'''
        node = self.child(node)
        if node is None:
            raise ValueError('Can only execute an immediate child node.')
        return node.go(sources)

    def reset(self):
        '''Resets this node, then resets and flushes every child.'''
        super(Compound, self).reset()
        for each in self.children:
            node = self.child(each)
            node.reset()
            node.flush()
#----------------------------------------------------------------------------

class Process (Compound):
    '''A process is a compound node, in which a number of internal nodes are
    executed to develop an ultimate set of results.  The internal nodes are
    wired with a directed acyclic graph of connections, and only those
    subnodes which contribute to the desired results are executed.

    Each connection is stored in self.connections as a tuple of names:
        (upstream node, result port, downstream node, source port)
    '''

    #     +--------------------------+
    # (s)-+                          |
    #     |  |   +---+               |
    #     |  +-(s)   (r)-+           |
    #     |      +---+   |   +---+   |
    #     |              +-(s)   (r)-+-(r)
    #     |      +---+   |   +---+   |
    #     |    (s)   (r)-+           |
    #     |      +---+               |
    #     +--------------------------+

    def __init__(self):
        super(Process, self).__init__()
        self.connections = set()

    def __repr__(self):
        me = super(Process, self).__repr__()
        if self.connections:
            me += '\n\t.connections = ['
            for (upstream, result, downstream, source) in self.connections:
                me += '\n\t\t(%s:%s -> %s:%s),' % (upstream, result,
                                                   downstream, source)
            me += '\n\t\t]'
        return me

    #-------------------------------
    # Relationships between Children

    def _linked(self, node, near, far, deep):
        '''Returns the names found at index `far` of every connection whose
        index `near` names the given node; with deep=True, follows those
        links transitively.  Each name appears once, nearest links first.'''
        found = []
        seen = set()
        frontier = [str(node)]
        while frontier:
            current = frontier.pop(0)
            for link in self.connections:
                if link[near] != current or link[far] in seen:
                    continue
                seen.add(link[far])
                found.append(link[far])
                if deep:
                    frontier.append(link[far])
        return found

    def upstream(self, node, deep=False):
        '''Returns a list of the names of nodes upstream of the given node;
        with deep=True, includes all indirect upstream nodes as well.'''
        return self._linked(node, 2, 0, deep)

    def downstream(self, node, deep=False):
        '''Returns a list of the names of nodes downstream of the given
        node; with deep=True, includes all indirect downstream nodes too.'''
        return self._linked(node, 0, 2, deep)

    def original(self, node):
        '''Checks if a given node is original, or has no upstream ports.'''
        node = self.child(node)
        return not node.ports or not self.upstream(node)

    def originals(self):
        '''Returns a list of our nodes which are considered original.'''
        return [ node for node in self.children if self.original(node) ]

    def terminal(self, node):
        '''Checks if a given node is terminal, or has no downstream ports.'''
        node = self.child(node)
        return not node.ports or not self.downstream(node)

    def terminals(self):
        '''Returns a list of our nodes which are considered terminal.'''
        return [ node for node in self.children if self.terminal(node) ]

    #------------------------
    # Connecting the Children

    def connected(self, upstream, result, downstream, source):
        '''Checks if a given connection already exists.'''
        return (str(upstream), result, str(downstream), source) \
               in self.connections

    def connect(self, upstream, result, downstream, source):
        '''Adds a connection from any port to another compatible port.

        A results port can feed into any source port of the same type, as
        long as it does not create a loop (cycle) in the graph of
        connections.  Each source port accepts only one feed.  Redundant
        identical connections are ignored.  Returns True; raises ValueError
        for any connection that cannot be made.'''
        upstream = self.child(upstream)
        downstream = self.child(downstream)

        # every possible reason you can't connect, from easiest to hardest
        if upstream is not None and upstream is downstream:
            raise ValueError('A node cannot source from its own results.')
        if upstream is None:
            raise ValueError('Upstream must be a Node instance or name.')
        if upstream.port(result) is None:
            raise ValueError('Upstream node needs port "%s".' % result)
        if upstream.gender[result] != Node.RESULT:
            raise ValueError('Upstream port "%s" must be output.' % result)
        if downstream is None:
            raise ValueError('Downstream must be a Node instance or name.')
        if downstream.port(source) is None:
            raise ValueError('Downstream node needs port "%s".' % source)
        if downstream.gender[source] != Node.SOURCE:
            raise ValueError('Downstream port "%s" must be input.' % source)
        if upstream.ports[result] != downstream.ports[source]:
            # no type conversion is supported (yet), so the types must match
            raise ValueError('Result type conflicts with source type.')
        # the new link closes a cycle iff downstream already feeds upstream,
        # directly or through any number of intermediate nodes
        if downstream.name in self.upstream(upstream, True):
            raise ValueError('A node result cannot cycle to an upstream node.')

        link = (upstream.name, result, downstream.name, source)
        if link in self.connections:
            return True
        for other in self.connections:
            if other[2:4] == link[2:4]:
                raise ValueError('Downstream port "%s" already connected.'
                                 % source)

        # register the downstream port as a reader of a subscription result
        product = upstream.results.get(result)
        if isinstance(product, Subscription):
            product.register(link[2] + ':' + link[3])
        self.connections.add(link)
        return True

    def disconnect(self, upstream, result, downstream, source):
        '''Removes any existing connection from one port to another.
        Removing a connection that does not exist does nothing; always
        returns True.'''
        link = (str(upstream), result, str(downstream), source)
        self.connections.discard(link)
        # NOTE(review): a Subscription result keeps the reader registered by
        # connect(); nothing here unregisters it -- confirm whether
        # subscriptions.Subscription offers a way to do so.
        return True

    #-------------------------
    # Connections Data Caching

    def feed(self, node):
        '''Gathers available data from connections destined for the source
        ports of a given node.  Returns a sources={} dict if all of the
        source ports have data ready.  Returns None if the node is not yet
        ready after collecting all available data.
        '''
        node = self.child(node)
        sources = {}

        #TODO: propagate from our own source ports to our children's sources

        # propagate from upstream connections
        for (upstream, result, downstream, source) in self.connections:
            if downstream != node.name:
                continue
            product = self.child(upstream).results.get(result)
            if product is None:
                return None
            if isinstance(product, Subscription):
                if not product.ready(downstream + ':' + source):
                    return None
            sources[source] = product

        # see if we're 100% ready
        for port in node.ports:
            if node.gender[port] == Node.SOURCE and port not in sources:
                return None
        return sources

    #----------------
    # Main Processing

    def process(self, targets=None, drain=False):
        '''Processes those child nodes required to fulfill the targets.

        targets -- child nodes (instances or names) whose results are
                   wanted; defaults to the terminal children, or to every
                   child if none is terminal.  The given list is not
                   modified.
        drain   -- if true, original children (no upstream connections)
                   are dropped from the pending set and are not run.

        Returns True once every pending node has run, or False if some
        pending node could not be fed or failed and no further progress
        was possible.
        '''
        # figure out targets (result-most nodes that will demand data)
        if not targets:
            targets = self.terminals()
        if not targets:
            targets = list(self.children)
        targets = [ getattr(each, 'name', each) for each in targets ]

        # all targets and their upstreams are pending
        pending = {}
        for each in targets:
            pending[each] = True
            for node in self.upstream(each, True):
                pending[node] = True
                # a plain result left from a previous run must not be fed
                # downstream again; feed() treats None as "not ready"
                self.child(node).flush()
        if drain:
            for each in list(pending):
                if self.original(each):
                    del pending[each]
        if self.debug():
            print('Process.process() will try %s' % repr(list(pending)))

        # while there is input available,
        if self.ready():
            # try to give every pending node a nudge
            while pending:
                progress = False
                for each in list(pending):
                    # if the node can process, do so
                    sources = self.feed(each)
                    if sources is None:
                        if self.debug():
                            print('Node "%s" not fed, must retry it.' % each)
                        continue
                    results = self.execute(each, sources)
                    if results is None:
                        if self.debug():
                            print('Node "%s" said to fail.' % each)
                        continue
                    if self.debug():
                        print('Node "%s" succeeded.' % each)
                    del pending[each]
                    progress = True
                if not progress:
                    break

        # if any of our targets are stalled, we are stalled
        if pending:
            if self.debug():
                print('Still had %s for pending; stalled for more data.'
                      % list(pending))
            return False

        #TODO: for each original target, post the output as our own results

        if self.debug():
            print('Process.process() success')
        return True

    #----------------
    # Other Overrides

    def unjoin(self, node):
        '''Disconnects and removes a child node from this process node.
        Raises ValueError if the node is not an immediate child.'''
        node = str(node)
        if not self.contains(node):
            raise ValueError('The node is not joined to this process.')
        for link in list(self.connections):
            if link[0] == node or link[2] == node:
                self.connections.discard(link)
        super(Process, self).unjoin(node)

#----------------------------------------------------------------------------

if __name__ == "__main__":

    # Unit tests are run by executing this module directly on command-line.
    # Tests are successful unless you see a python traceback in the output.

    class _Filter (Node):
        def process(self):
            print('_Filter.process()')
            value = self.receive('in')
            self.publish('out', value + 1.1)
            return True

    # construction tests

    a = _Filter()
    a.rename("A")
    a.port("out", Node.RESULT, float)
    a.port("in", Node.SOURCE, float)
    if a.port("out"): print("A has a port 'out'")
    print(repr(a))

    b = Compound()
    b.rename("B")
    b.port("out", Node.RESULT, float)
    b.port("in", Node.SOURCE, float)
    if b.port("in"): print("B has a port 'in'")
    b.port("snork", Node.SOURCE, float)
    if b.port("snork"): print("B has a port 'snork'")
    print(repr(b))

    c = Node()
    c.rename("C")
    c.port("out", Node.RESULT, float)
    c.port("in", Node.SOURCE, float)
    if c.port("out"): print("C has a port 'out'")
    print(repr(c))

    p = Process()
    p.rename("P")
    if not p.contains(a): print("P does not contain A")
    p.enjoin(a)
    if p.contains(a): print("P contains A")
    p.enjoin(b)
    if p.contains(b): print("P contains B")
    p.enjoin(c)
    if p.contains(c): print("P contains C")
    if not a.contains(b): print("A does not contain B")
    if not b.contains(a): print("B does not contain A")

    # connections (each of these must be refused with a ValueError)

    for bad in [ (a, 'nonexistant', a, 'nonexistant'),
                 (a, 'nonexistant', b, 'nonexistant'),
                 (a, 'out', b, 'nonexistant'),
                 (a, 'out', a, 'out'),
                 (a, 'out', a, 'in'),
                 (a, 'out', b, 'out') ]:
        try:
            p.connect(*bad)
        except ValueError as detail:
            print(detail)
    print(repr(p))

    p.connect(a, 'out', b, 'in')
    if p.connected(a, 'out', b, 'in'): print("A.out -> B.in")
    try:
        p.connect(c, 'out', b, 'in')
    except ValueError as detail:
        print(detail)
    try:
        b.enjoin(p)     # P is B's parent, so this must be refused
    except ValueError as detail:
        print(detail)
    p.connect(a, 'out', c, 'in')
    if p.connected(a, 'out', c, 'in'): print("A.out -> C.in")
    print("Upstream from A is %s" % (p.upstream(a),))
    print("Downstream from A is %s" % (p.downstream(a),))
    print("Upstream from B is %s" % (p.upstream(b),))
    print("Downstream from B is %s" % (p.downstream(b),))
    print(repr(p))

    # single-node processing

    print('=====')
    a = _Filter()
    a.rename("A")
    a.port("out", Node.RESULT, float, subscription=False)
    a.port("in", Node.SOURCE, float)
    results = a.go({'in': 2.2})
    print(repr(results))

    print('--')
    a = _Filter()
    a.rename("A")
    a.port("out", Node.RESULT, float, subscription=True)
    a.port("in", Node.SOURCE, float)
    data = Subscription()
    data.publish(1.1)
    data.publish(2.2)
    results = a.go({'in': data})
    print(repr(results))
    results = a.go({'in': data})
    print(repr(results))

    # processing a simple A->B process with a non-subscription connection

    print('=====')

    class _GeneratorFlat (Node):
        def init(self):
            self.port('out', Node.RESULT, float)
            self.foo = 1.0
        def process(self):
            print('_GeneratorFlat.process()')
            self.publish('out', self.foo)
            print(' _generator >  %s' % self.foo)
            self.foo += 1.0
            return True

    class _Reporter (Node):
        def init(self):
            self.port('in', Node.SOURCE, float)
        def process(self):
            print('_Reporter.process()')
            self.foo = self.receive('in')
            print('> _reporter  %s' % self.foo)
            return True

    p = Process()
    a = _GeneratorFlat()
    p.enjoin(a)
    b = _Reporter()
    p.enjoin(b)
    p.connect(a, 'out', b, 'in')
    print(repr(p))
    p.process()
    print('--')
    p.process()
    if b.foo != 2.0:
        raise ValueError('final test did not pipeline a 2.0 result')

    # processing a simple A->B process with a subscription connection

    print('=====')

    class _Generator (Node):
        def init(self):
            self.port('out', Node.RESULT, float, subscription=True)
            self.foo = 1.0
        def process(self):
            print('_Generator.process()')
            self.publish('out', self.foo)
            print(' _generator >  %s' % self.foo)
            self.foo += 1.0
            return True

    p = Process()
    a = _Generator()
    p.enjoin(a)
    b = _Reporter()
    p.enjoin(b)
    p.connect(a, 'out', b, 'in')
    print(repr(p))
    p.process()
    print('--')
    p.process()
    if b.foo != 2.0:
        raise ValueError('final test did not pipeline a 2.0 result')

    # processing a fork with subscription connections

    print('=====')
    p = Process()
    a = _Generator()
    p.enjoin(a)
    b = _Reporter()
    p.enjoin(b)
    c = _Reporter()
    p.enjoin(c)
    p.connect(a, 'out', b, 'in')
    p.connect(a, 'out', c, 'in')
    print(repr(p))
    print('--')
    p.process()
    print('--')
    p.process()
    if b.foo != 2.0:
        raise ValueError('final test on node B did not get a 2.0 result')
    if c.foo != 2.0:
        raise ValueError('final test on node C did not get a 2.0 result')
|
Contact Ed Halley by email at
ed@halley.cc. Text, code, layout and artwork are Copyright © 1996-2008 Ed Halley. Copying in whole or in part, with author attribution, is expressly allowed. Any references to trademarks are illustrative and are controlled by their respective owners. |
|