The Assimilation Project
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
detrius/obsolete_cma.py
Go to the documentation of this file.
1 #
2 #
3 # This file is part of the Assimilation Project.
4 #
5 # Copyright (C) 2011, 2012 - Alan Robertson <alanr@unix.sh>
6 #
7 # The Assimilation software is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # The Assimilation software is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
19 #
20 #
21 #
22 # Design outline:
23 #
24 # All incoming network messages come in and get sent to a client who is a dispatcher.
25 #
26 # The dispatcher looks at the message type and computes which queue to send the
27 # message to based on the message type and contents.
28 #
29 # For death notices, the dispatcher forwards the message to the worker
30 # assigned to the switch the system is on - if known, or the worker
31 # assigned to the subnet.
32 #
33 # Each worker handles one or more rings - probably handling the per-switch rings
34 # for a subnet and the subnet ring as well. It is important to ensure that a ring
35 # is handled by only one worker. This eliminates locking concerns. When a given
36 # worker receives a death notice for a drone that is also in higher-level rings,
37 # it does its work at its level and also forwards the request to the worker handling
38 # the higher level ring as well. The first subnet worker will also handle the work
39 # for the top-level (global) ring.
40 #
41 # Packets are ACKed by workers after all work has been completed. In the case of
42 # a drone on multiple rings, it is only ACKed after both rings have been fully
43 # repaired.
44 #
45 # The reason for this is that until it is fully repaired, the system might crash
46 # before completing its work. Retransmission timeouts will need to be set
47 # accordingly...
48 #
49 # Although congestion is normally very unlikely, this is not true for full
50 # datacenter powerons - where it is reasonably likely - depending on how
51 # quickly one can power on the servers and not pop circuit breakers or
52 # damage UPSes
53 # (it would be good to know how fast hosts can come up worst case).
54 #
55 #
56 # Misc Workers with well-known-names
57 # Request-To-Create-Ring
58 #
59 #
60 # Mappings:
61 #
62 # Drone-related information-------------------------
63 # NetAddr-to-drone-name
64 # drone-name to NetAddr
65 # (drone-name,ifname) to interface-info (including switch info)
66 # drone-neighbor-info:
67 # drone-name-to-neighbor-info (drone-name, NetAddr, ring-name)
68 #
69 # Ring-related information--------------------------
70 # drone-name to ring-name(s)
71 # ring-names to ring-information (level, #members, etc)
72 # ring-links-info ??
73 # Subnet-to-ring-name
74 # Switch-to-ring-name
75 # Global-ring-name [TheOneRing]
76 #
77 # Discovery-related information---------------------
78 # (drone-name, Interface-name) to LLDP/CDP packet
79 # (drone-name, discovery-type) to JSON info
80 #
81 #
82 # Misc Info-----------------------------------------
83 # NetAddr(MAC)-to-NetAddr(IP)
84 #
85 #
86 # Dispatcher logic:
87 # For now sends all requests to TheOneRing because we don't have
88 # a database yet ;-)
89 #
90 # We will need a database RealSoonNow :-D.
91 #
92 ################################################################################
93 #
94 # It is readily observable that the code is headed that way, but is a long
95 # way from that structure...
96 #
97 ################################################################################
98 
99 import sys, time, weakref, os
100 sys.path.append("../pyclasswrappers")
101 sys.path.append("pyclasswrappers")
102 from frameinfo import FrameTypes, FrameSetTypes
103 from AssimCclasses import *
104 
105 
class CMAdb:
    '''Class defining our Neo4J database.

    Wraps a neo4j.GraphDatabase plus the four node indexes used to look up
    rings, drones, IP addresses and NICs (by MAC address).
    '''
    # Indexes:
    #   ringindex  - index of all Ring objects               [nodetype=ring]
    #   droneindex - index of all Drone objects              [nodetype=drone]
    #   ipindex    - index of all IP address objects         [nodetype=ipaddr]
    #   macindex   - index of all interfaces by MAC address  [nodetype=nic]
    #
    # Node types [nodetype enumeration values]:
    #   ring   - heartbeat ring objects
    #   drone  - systems running our nanoprobes
    #   nic    - interfaces on drones
    #   ipaddr - IP addresses (ipv4 or ipv6)
    #
    # Relationship types [reltype enumeration values]
    #   ------------------------------------------
    #   reltype     fromnodetype  tonodetype
    #   --------    ------------  ----------
    #   nichost     nic           drone
    #   iphost      ipaddr        drone
    #   ipowner     ipaddr        nic
    #   ringnext    drone         drone
    #   ringmember  ring          ipaddr

    def __init__(self, pathname):
        '''Open the graph database stored at pathname and make sure that
        all four of our node indexes exist.'''
        # Code for the Java-deficient (like me)
        if 'JAVA_HOME' not in os.environ:
            altjava = '/etc/alternatives/java'
            if os.path.islink(altjava):
                javahome = os.path.dirname(os.path.dirname(os.readlink(altjava)))
                os.environ['JAVA_HOME'] = javahome
        # Imported here (not at file level) so JAVA_HOME is set up first.
        import neo4j
        self.db = neo4j.GraphDatabase(pathname)
        indexes = {'ringindex': 'exact', 'droneindex': 'exact',
                   'ipindex': 'exact', 'macindex': 'exact'}
        for idx, idxtype in indexes.items():
            if not self.db.node.indexes.exists(idx):
                self.db.node.indexes.create(idx, type=idxtype)
        self.ringindex = self.db.node.indexes.get('ringindex')    # Rings
        self.droneindex = self.db.node.indexes.get('droneindex')  # Drones
        self.ipindex = self.db.node.indexes.get('ipindex')        # IP addresses
        self.macindex = self.db.node.indexes.get('macindex')      # MAC addresses

    @staticmethod
    def _first_hit(hits):
        '''Return the first item of an index query result, or None if it is empty.
        The result object is always closed, even if iterating it raises.'''
        try:
            for hit in hits:
                return hit
            return None
        finally:
            hits.close()

    def getring(self, name):
        'Find a unique ring in the ring index'
        return self._first_hit(self.ringindex['name'][name])

    def add_ringindex(self, node):
        'Add this ring node to our ring index'
        name = node['name']
        assert node['nodetype'] == 'ring'
        assert self.getring(name) is None
        self.ringindex['name'][name] = node

    def getdrone(self, designation):
        '''Find a unique drone node in the drone index.
        A RuntimeError raised by the search is reported on stdout and treated as "not found".'''
        dronehits = self.droneindex['designation'][designation]
        try:
            return self._first_hit(dronehits)
        except RuntimeError:
            print('Caught runtime error searching drone index')
            return None

    def add_droneindex(self, drone):
        'Add this drone node to our drone index'
        designation = drone['designation']
        assert drone['nodetype'] == 'drone'
        assert self.getdrone(designation) is None
        # Was stored in ringindex, so getdrone() (which searches droneindex) never found it.
        self.droneindex['designation'][designation] = drone

    def getMACaddr(self, address):
        'Find an interface node with a unique MAC address in the MAC address index'
        return self._first_hit(self.macindex['macaddr'][address])

    def add_MACindex(self, node):
        'Add this NIC node to our MAC address index'
        mac = node['macaddr']
        assert node['nodetype'] == 'nic'
        assert self.getMACaddr(mac) is None     # Probably too strict
        # Was stored in ringindex, so getMACaddr() (which searches macindex) never found it.
        self.macindex['macaddr'][mac] = node

    def getIPaddr(self, address):
        'Find a unique IP address node in our IP address index'
        # Was searching ringindex; new_IPaddr() searches ipindex, so use it here too.
        return self._first_hit(self.ipindex['ipaddr'][address])

    def add_IPindex(self, node):
        'Add this IP address node to our IP address index'
        ip = node['ipaddr']
        assert node['nodetype'] == 'ipaddr'
        assert self.getIPaddr(ip) is None       # Probably too strict
        # Was stored in ringindex rather than ipindex.
        self.ipindex['ipaddr'][ip] = node

    def new_ring(self, name, **kw):
        'Create a new ring (or return a pre-existing one), and put it in the ring index'
        ring = self.getring(name)
        if ring is not None:
            # The ring was found under 'name', so report that rather than
            # relying on attribute access on the database node.
            sys.stderr.write('Returning pre-existing ring [%s]\n' % (name,))
            return ring
        with self.db.transaction:
            ring = self.db.node(nodetype='ring', name=name, **kw)
            self.add_ringindex(ring)
            sys.stderr.write('Creating new ring [%s]\n' % (name,))
        return ring

    def new_drone(self, designation, **kw):
        'Create a new drone (or return a pre-existing one), and put it in the drone index'
        print('Adding drone %s' % (designation,))
        drone = self.getdrone(designation)
        if drone is not None:
            print('Found drone %s in drone index' % (designation,))
            return drone
        with self.db.transaction:
            # Debugging output carried over from the original.
            print(self)
            print(self.db)
            print(self.db.node)
            drone = self.db.node(nodetype='drone', designation=designation, **kw)
            self.add_droneindex(drone)
        return drone

    def new_nic(self, drone, macaddr, **kw):
        '''Create a new NIC (or return a pre-existing one), and put it in the mac address index,
        and point it at its parent drone.'''
        machits = self.macindex['macaddr'][macaddr]
        try:
            for mac in machits:
                # A NIC points at its drone through 'nichost' (see the table above
                # and the create() call below); 'ipowner' runs from ipaddr to nic.
                # NOTE(review): the '<reltype>[0].outgoing' / '.designation' accessor
                # form is carried over from the original unverified - confirm
                # against the neo4j bindings in use.
                owner = mac.nichost[0].outgoing
                if owner.designation == drone.designation:
                    return mac
                print("Duplicate MAC address for %s: %s vs %s"
                      % (macaddr, drone.designation, owner.designation))
        finally:
            machits.close()
        with self.db.transaction:
            nic = self.db.node(nodetype='nic', macaddr=macaddr, **kw)
            self.add_MACindex(nic)
            # Point this NIC at the drone that owns it.
            nic.relationships.create('nichost', drone, hostname=drone['designation'],
                                     reltype='nichost')
        return nic

    def new_IPaddr(self, nic, ipaddr, **kw):
        '''Create a new IP address (or return a pre-existing one), and point it at its parent
        NIC and its grandparent drone'''
        # Was indexed by the undefined name 'macaddr' (NameError on every call).
        iphits = self.ipindex['ipaddr'][ipaddr]
        drone = nic.nichost[0].outgoing
        try:
            for ip in iphits:
                # This works because we point at our grandparent as well as our parent...
                owner = ip.iphost[0].outgoing
                if owner.designation == drone.designation:
                    return ip
                print("Duplicate IP address for %s: %s vs %s"
                      % (ipaddr, drone.designation, owner.designation))
        finally:
            iphits.close()
        with self.db.transaction:
            ip = self.db.node(nodetype='ipaddr', ipaddr=ipaddr, **kw)
            self.add_IPindex(ip)
            # Point this IP address at the NIC that owns it.
            # (Both relationships were previously created from the NIC, not the IP node.)
            ip.relationships.create('ipowner', nic, reltype='ipowner')
            # Also point this IP address at the drone that owns its NIC (our grandparent)
            ip.relationships.create('iphost', drone, reltype='iphost')
        return ip
283 
284 
class HbRing:
    '''Class defining the behavior of a heartbeat ring.

    Members are kept both in a dict keyed by drone designation (self.members)
    and in an ordered list (self.memberlist); neighbours in the list
    heartbeat each other, with the list treated as circular.
    '''
    SWITCH = 1
    SUBNET = 2
    THEONERING = 3  # And The One Ring to rule them all...

    ringnames = {}  # ring name -> HbRing object, for every ring created

    def __init__(self, name, ringtype, parentring=None):
        '''Constructor for a heartbeat ring.
        Raises ValueError for an unknown ring type or an already-used ring name.
        Also records the ring in the database through the module-level NEO object.'''
        if ringtype < HbRing.SWITCH or ringtype > HbRing.THEONERING:
            raise ValueError("Invalid ring type [%s]" % str(ringtype))
        if name in HbRing.ringnames:
            # Was str(naem): a NameError instead of the intended ValueError.
            raise ValueError("ring name [%s] already exists." % str(name))
        HbRing.ringnames[name] = self
        self.members = {}
        self.memberlist = []
        self.ringtype = ringtype
        self.name = str(name)
        self.parentring = parentring
        NEO.new_ring(name, ringtype=ringtype)

    def join(self, drone):
        '''Add this drone to our ring, and start/stop heartbeats as needed.
        Raises ValueError if the drone is already a member.'''
        # Make sure he's not already in our ring according to our 'database'
        if drone.designation in self.members:
            print(self.members)
            raise ValueError("Drone %s is already a member of this ring [%s]"
                             % (drone.designation, self.name))

        # Insert this drone into our 'database', and us into the drone's
        self.members[drone.designation] = weakref.proxy(drone)
        drone.ringmemberships[self.name] = weakref.proxy(self)
        partners = self._findringpartners(drone)  # Also adds drone to memberlist

        if partners is None:
            return          # First member: nobody to talk to yet
        if len(self.memberlist) == 2:
            drone.start_heartbeat(self, partners[0])
            partners[0].start_heartbeat(self, drone)
            return
        if len(self.memberlist) > 3:
            # The two partners were neighbours until now; the new drone goes
            # between them.  (With exactly 3 members everyone talks to
            # everyone, so nothing needs stopping.)
            partners[0].stop_heartbeat(self, partners[1])
            partners[1].stop_heartbeat(self, partners[0])
        drone.start_heartbeat(self, partners[0], partners[1])
        partners[0].start_heartbeat(self, drone)
        partners[1].start_heartbeat(self, drone)

    def leave(self, drone):
        '''Remove a drone from this heartbeat Ring and re-link its former neighbours.
        Raises ValueError if the drone is not a member.'''
        if drone.designation not in self.members:
            raise ValueError("Drone %s is not a member of this ring [%s]"
                             % (drone.designation, self.name))
        location = None
        # list.index() won't work due to weakproxy - compare designations instead
        for idx, member in enumerate(self.memberlist):
            if member.designation == drone.designation:
                location = idx
                break
        # Remove the associations from the 'database'
        del self.memberlist[location]
        del self.members[drone.designation]
        del drone.ringmemberships[self.name]

        if len(self.memberlist) == 0:   # Previous length: 1
            return
        if len(self.memberlist) == 1:   # Previous length: 2
            drone.stop_heartbeat(self, self.memberlist[0])
            self.memberlist[0].stop_heartbeat(self, drone)
            return
        # Previous length had to be >= 3
        # After the deletion, 'location' indexes the member that followed the
        # departed drone and 'location-1' the one that preceded it (circularly).
        partner1loc = location
        partner2loc = location - 1
        if location >= len(self.memberlist):
            partner1loc = 0
        if location == 0:
            partner2loc = len(self.memberlist) - 1

        partner1 = self.memberlist[partner1loc]
        partner2 = None
        partner1.stop_heartbeat(self, drone)
        if partner1loc != partner2loc:
            partner2 = self.memberlist[partner2loc]
            partner2.stop_heartbeat(self, drone)
            partner1.start_heartbeat(self, partner2)
            partner2.start_heartbeat(self, partner1)
        # Poor drone -- all alone in the universe... (maybe even dead...)
        drone.stop_heartbeat(self, partner1, partner2)

    def _findringpartners(self, drone):
        '''Find (one or) two partners for this drone to heartbeat with.
        Inserts the drone at the front of memberlist as a side effect.
        Returns None, a 1-tuple or a 2-tuple of partners.'''
        # It would be nice to not keep updating the drone on the end of the list
        # I suppose walking through the ring would be a good choice
        # or maybe choosing a random insert position.

        # Insert the partner into the 'database'
        self.memberlist.insert(0, weakref.proxy(drone))
        nummember = len(self.memberlist)
        if nummember == 1:
            return None
        if nummember == 2:
            return (self.memberlist[1],)
        return (self.memberlist[1], self.memberlist[nummember - 1])

    def __len__(self):
        'Length function - returns number of members in this ring.'
        return len(self.memberlist)

    def __str__(self):
        'Return the ring name and its members, in memberlist order.'
        # Was iterating the undefined bare name 'memberlist' and never
        # emitted the closing parenthesis.
        members = ','.join(['%s' % (drone,) for drone in self.memberlist])
        return 'Ring("%s", [%s])' % (self.name, members)

    @staticmethod
    def reset():
        'Forget every ring and re-create the global TheOneRing.'
        global TheOneRing
        HbRing.ringnames = {}
        TheOneRing = HbRing('The One Ring', HbRing.THEONERING)
404 
405 
class DroneInfo:
    'Everything about Drones - endpoints that run our nanoprobes'
    droneset = {}   # designation -> DroneInfo
    droneIPs = {}   # IP address (string) -> DroneInfo

    def __init__(self, designation, io, **kw):
        '''Create the in-memory record for a drone and its database node
        (through the module-level NEO object).'''
        self.designation = designation
        self.addresses = {}
        self.jsondiscovery = {}
        self.ringpeers = {}
        self.ringmemberships = {}
        self.io = io
        self.drone = NEO.new_drone(designation, **kw)

    @staticmethod
    def reset():
        'Forget every known drone and drone IP address.'
        DroneInfo.droneset = {}
        DroneInfo.droneIPs = {}

    def addaddr(self, addr, ifname=None):
        'Record what IPs this drone has - and on what interfaces'
        self.addresses[str(addr)] = (addr, ifname)

    def logjson(self, jsontext):
        '''Process and save away JSON discovery data.
        Packets lacking 'discovertype' or 'data' are reported on stderr and dropped.'''
        jsonobj = pyConfigContext(jsontext)
        # jsonobj is a pyConfigContext, not a dict: keep using its has_key().
        if not jsonobj.has_key('discovertype') or not jsonobj.has_key('data'):
            sys.stderr.write('Invalid JSON discovery packet.\n')
            return
        dtype = jsonobj['discovertype']
        self.jsondiscovery[dtype] = jsonobj
        if dtype == 'netconfig':
            self.add_netconfig_addresses(jsonobj)

    def add_netconfig_addresses(self, jsonobj):
        '''Save away the network configuration data we got from JSON discovery.
        Records every 'global' scope address, and remembers the first such
        address found on an interface with a default gateway as primaryIP/primaryIF.'''
        # Ought to protect this code by try blocks...
        # Also ought to figure out which IP is the primary IP for contacting
        # this system
        data = jsonobj['data']  # The data portion of the JSON message
        primaryip = None
        for intf in data.keys():  # List of interfaces just below the data section
            ifinfo = data[intf]
            isprimaryif = ifinfo.has_key('default_gw')
            iptable = ifinfo['ipaddrs']  # look in the 'ipaddrs' section
            for ip in iptable.keys():  # keys are 'ip/mask' in CIDR format
                ipinfo = iptable[ip]
                if ipinfo['scope'] != 'global':
                    continue
                # Only the address part is needed; tolerate a key with no '/mask'.
                iponly = ip.split('/', 1)[0]
                self.addaddr(iponly, intf)
                DroneInfo.droneIPs[iponly] = self

                if isprimaryif and primaryip is None:
                    primaryip = iponly
                    self.primaryIP = iponly
                    self.primaryIF = intf

    def select_ip(self, ring, partner):
        'Select an appropriate IP address for talking to this partner on this ring'
        # Not really good enough for the long term, but good enough for now...
        # In particular, when talking on a particular switch ring, or
        # subnet ring, we want to choose an IP that's on that subnet.
        # For TheOneRing, we want their primary IP address.
        try:
            return partner.primaryIP
        except AttributeError:
            # This shouldn't happen, but it's a reasonable recovery,
            # because we _have_ to know the address they're sending from.
            return partner.startaddr

    def send_hbmsg(self, dest, fstype, port, addrlist):
        '''Send a message with an attached address list and optional port.
        This is intended primarily for start or stop heartbeating messages.
        None entries in addrlist are skipped.'''
        fs = pyFrameSet(fstype)
        pframe = None
        if port is not None and 0 < port < 65536:
            pframe = pyIntFrame(FrameTypes.PORTNUM, intbytes=2, initval=int(port))
        for addr in addrlist:
            if addr is None:
                continue
            if pframe is not None:
                # Was fs.addframe(); made consistent with the append() used below.
                # NOTE(review): confirm pyFrameSet has no separate addframe() method.
                fs.append(pframe)
            aframe = pyAddrFrame(FrameTypes.IPADDR, addrstring=addr)
            fs.append(aframe)
        self.io.sendframesets(dest, (fs,))

    def death_report(self, status, reason, fromaddr, frameset):
        'Process a death/shutdown report for us. RIP us.'
        sys.stderr.write("Node %s has been reported as %s by address %s. Reason: %s\n"
                         % (self.designation, status, str(fromaddr), reason))
        self.status = status
        self.reason = reason
        # There is a need for us to be a little more sophisticated
        # in terms of the number of peers this particular drone had
        # It's here in this place that we will eventually add the ability
        # to distinguish death of a switch or subnet or site from death of a single drone
        #
        # Iterate over a snapshot: HbRing.leave() deletes from ringmemberships.
        for ring in list(self.ringmemberships.keys()):
            HbRing.ringnames[ring].leave(self)

    def start_heartbeat(self, ring, partner1, partner2=None):
        'Start heartbeating to the given partners'
        ourip = self.select_ip(ring, self)
        partner1ip = self.select_ip(ring, partner1)
        partner2ip = None
        self.ringpeers[partner1.designation] = partner1
        if partner2 is not None:
            partner2ip = self.select_ip(ring, partner2)
            self.ringpeers[partner2.designation] = partner2
        self.send_hbmsg(ourip, FrameSetTypes.SENDEXPECTHB, 0, (partner1ip, partner2ip))

    def stop_heartbeat(self, ring, partner1, partner2=None):
        'Stop heartbeating to the given partners.'
        ourip = self.select_ip(ring, self)
        partner1ip = self.select_ip(ring, partner1)
        partner2ip = None
        if partner2 is not None:
            partner2ip = self.select_ip(ring, partner2)
        # Remove partner1 from our 'database'
        del self.ringpeers[partner1.designation]
        if partner2 is not None:
            # Remove partner2 from our 'database'
            del self.ringpeers[partner2.designation]
        self.send_hbmsg(ourip, FrameSetTypes.STOPSENDEXPECTHB, 0, (partner1ip, partner2ip))

    def __str__(self):
        'Give out our designation'
        return 'Drone(%s)' % self.designation

    @staticmethod
    def find(designation):
        '''Find a drone with the given designation or IP address.
        Returns None when there is no match (or for any other argument type).'''
        if isinstance(designation, str):
            if designation in DroneInfo.droneset:
                return DroneInfo.droneset[designation]
        elif isinstance(designation, pyNetAddr):
            # Is there a concern about non-canonical IP address formats?
            saddr = str(designation)
            if saddr in DroneInfo.droneIPs:
                return DroneInfo.droneIPs[saddr]
        return None

    @staticmethod
    def add(designation, io, reason, status='up'):
        '''Add a drone to our set if it isn't already there.
        Returns the (new or pre-existing) DroneInfo object.'''
        ret = DroneInfo.find(designation)
        if ret is not None:
            return ret
        ret = DroneInfo(designation, io)
        ret.reason = reason
        ret.status = status
        DroneInfo.droneset[designation] = ret
        return ret
581 
# NOTE(review): the 'class' line for this block is missing from this listing;
# the name DispatchTarget is taken from the subclass docstrings below
# ("DispatchTarget subclass for ...") - confirm against the original file.
class DispatchTarget:
    '''Base class for handling incoming FrameSets.
    This base class is designated to handle unhandled FrameSets.
    All it does is print that we received them.
    '''
    def __init__(self):
        pass

    def dispatch(self, origaddr, frameset):
        'Print the type of the FrameSet, who sent it, and each frame in it.'
        fstype = frameset.get_framesettype()
        print("Received FrameSet of type [%s] from [%s]"
              % (FrameSetTypes.get(fstype)[0], str(origaddr)))
        for frame in frameset.iter():
            frametype = frame.frametype()
            print("\tframe type [%s]: [%s]"
                  % (FrameTypes.get(frametype)[1], str(frame)))

    def setconfig(self, io, config):
        'Remember the I/O object and configuration for later use.'
        self.io = io
        self.config = config
601 
# NOTE(review): the 'class' line for this block is missing from this listing;
# the name comes from the log message below and the base class from the
# docstring - confirm against the original file.
class DispatchHBDEAD(DispatchTarget):
    'DispatchTarget subclass for handling incoming HBDEAD FrameSets.'
    def dispatch(self, origaddr, frameset):
        '''Dispatch function for HBDEAD FrameSets.
        Every IPADDR frame names a dead drone; each known one gets a death report.'''
        fstype = frameset.get_framesettype()
        sys.stderr.write("DispatchHBDEAD: received [%s] FrameSet from [%s]\n"
                         % (FrameSetTypes.get(fstype)[0], str(origaddr)))
        for frame in frameset.iter():
            if frame.frametype() != FrameTypes.IPADDR:
                continue
            deadaddr = frame.getnetaddr()
            deaddrone = DroneInfo.find(deadaddr)
            if deaddrone is None:
                # find() returns None for unknown addresses; previously this
                # crashed with AttributeError and killed the listener loop.
                sys.stderr.write("DispatchHBDEAD: no known drone at address [%s] - ignored.\n"
                                 % str(deadaddr))
                continue
            deaddrone.death_report('dead', 'HBDEAD packet', origaddr, frameset)
616 
# NOTE(review): the 'class' line for this block is missing from this listing;
# the name comes from the log message below and from its use in the main
# program, the base class from the docstring - confirm against the original file.
class DispatchSTARTUP(DispatchTarget):
    'DispatchTarget subclass for handling incoming STARTUP FrameSets.'
    def dispatch(self, origaddr, frameset):
        '''Handle a STARTUP FrameSet: send our configuration back to the sender,
        register the drone named in the HOSTNAME frame, record any JSON
        discovery data that came along, and add the drone to TheOneRing.'''
        jsontext = None
        sysname = None
        fstype = frameset.get_framesettype()
        sys.stderr.write("DispatchSTARTUP: received [%s] FrameSet from [%s]\n"
                         % (FrameSetTypes.get(fstype)[0], str(origaddr)))
        for frame in frameset.iter():
            frametype = frame.frametype()
            if frametype == FrameTypes.HOSTNAME:
                sysname = frame.getstr()
            if frametype == FrameTypes.JSDISCOVER:
                jsontext = frame.getstr()
        # NOTE(review): CMAlib is not defined in the visible part of this file;
        # presumably it arrives via the AssimCclasses star import - confirm.
        fs = CMAlib.create_setconfig(self.config)
        self.io.sendframesets(origaddr, fs)
        if sysname is None:
            # Without a HOSTNAME frame there is nothing to register; this
            # used to fail with a NameError on the unbound 'sysname'.
            sys.stderr.write("DispatchSTARTUP: no HOSTNAME frame in FrameSet from [%s] - ignored.\n"
                             % str(origaddr))
            return
        # add() returns the new or pre-existing drone, so no second lookup is needed.
        drone = DroneInfo.add(sysname, self.io, 'STARTUP packet')
        drone.startaddr = origaddr
        if jsontext is not None:
            drone.logjson(jsontext)
        TheOneRing.join(drone)
644 
645 
# NOTE(review): the 'class' line for this block is missing from this listing;
# the name comes from the constructor docstring - confirm against the original file.
class MessageDispatcher:
    'We dispatch incoming messages where they need to go.'
    def __init__(self, dispatchtable):
        '''Constructor for MessageDispatcher - requires a dispatch table as a parameter.
        The table maps FrameSet types to objects with dispatch() and setconfig() methods.'''
        self.dispatchtable = dispatchtable
        # dispatch() and setconfig() both use self.default, but the line that
        # set it is missing from this listing.
        # NOTE(review): restored as a plain DispatchTarget (the class documented
        # as handling unhandled FrameSets) - confirm against the original file.
        self.default = DispatchTarget()

    def dispatch(self, origaddr, frameset):
        'Dispatch a frameset where it will get handled.'
        fstype = frameset.get_framesettype()
        if fstype in self.dispatchtable:
            self.dispatchtable[fstype].dispatch(origaddr, frameset)
        else:
            self.default.dispatch(origaddr, frameset)

    def setconfig(self, io, config):
        'Save our configuration away. We need it before we can do anything.'
        self.io = io
        self.default.setconfig(io, config)
        for msgtype in self.dispatchtable.keys():
            self.dispatchtable[msgtype].setconfig(io, config)
667 
# NOTE(review): the 'class' line for this block is missing from this listing;
# the name comes from its use in the main program - confirm against the original file.
class PacketListener:
    'Listen for packets and get them dispatched as any good packet ought to be.'
    def __init__(self, config, dispatch, io=None):
        '''Set up the listener.
        config   - configuration; config["cmainit"] is the address we bind to
        dispatch - object whose setconfig() and dispatch() methods we call
        io       - I/O object to use; a pyNetIOudp is created when None'''
        self.config = config
        if io is None:
            self.io = pyNetIOudp(config, pyPacketDecoder())
        else:
            self.io = io

        dispatch.setconfig(self.io, config)

        self.io.bindaddr(config["cmainit"])
        self.io.setblockio(True)
        self.dispatcher = dispatch

    def listen(self):
        'Listen for packets. Get them dispatched. Never returns.'
        while True:
            (fromaddr, framesetlist) = self.io.recvframesets()
            if fromaddr is None:
                # BROKEN! ought to be able to set blocking mode on the socket...
                time.sleep(1.0)
                continue
            for frameset in framesetlist:
                self.dispatcher.dispatch(fromaddr, frameset)
697 
if __name__ == '__main__':
    #
    # "Main" program starts below...
    #
    # NEO and TheOneRing are deliberately module-level names: HbRing, DroneInfo
    # and DispatchSTARTUP refer to them as globals.

    NEO = CMAdb('/backups/neo1')

    TheOneRing = HbRing('The One Ring', HbRing.THEONERING)
    print('Ring created!!')

    print(FrameTypes.get(1)[2])

    OurAddr = pyNetAddr((10, 10, 10, 200), 1984)
    configinit = {
        'cmainit': OurAddr,     # Initial 'hello' address
        'cmaaddr': OurAddr,     # not sure what this one does...
        'cmadisc': OurAddr,     # Discovery packets sent here
        'cmafail': OurAddr,     # Failure packets sent here
        'cmaport': 1984,
        'hbport': 1984,
        'outsig': pySignFrame(1),
        'deadtime': 10*1000000,
        'warntime': 3*1000000,
        'hbtime': 1*1000000,
    }
    # NOTE(review): the line opening this call is missing from this listing;
    # restored as the construction of 'disp', which PacketListener is given
    # below - confirm against the original file.
    disp = MessageDispatcher(
        {FrameSetTypes.STARTUP: DispatchSTARTUP()
         })
    config = pyConfigContext(init=configinit)
    listener = PacketListener(config, disp)
    listener.listen()