99 import sys, time, weakref, os
100 sys.path.append(
"../pyclasswrappers")
101 sys.path.append(
"pyclasswrappers")
102 from frameinfo
import FrameTypes, FrameSetTypes
103 from AssimCclasses
import *
107 '''Class defining our Neo4J database.'''
132 if not os.environ.has_key(
'JAVA_HOME'):
133 altjava=
'/etc/alternatives/java'
134 if os.path.islink(altjava):
135 javahome = os.path.dirname(os.path.dirname(os.readlink(altjava)))
136 os.environ[
'JAVA_HOME'] = javahome
138 self.
db = neo4j.GraphDatabase(pathname)
139 indexes = {
'ringindex':
'exact',
'droneindex':
'exact',
'ipindex':
'exact',
'macindex':
'exact'}
140 for idx
in indexes.keys():
141 if not self.db.node.indexes.exists(idx):
142 self.db.node.indexes.create(idx, type=indexes[idx])
145 self.
ipindex = self.db.node.indexes.get(
'ipindex')
146 self.
macindex = self.db.node.indexes.get(
'macindex')
149 'Find a unique ring in the ring index'
151 for ring
in ringhits:
158 'Add this ring node to our ring index'
160 assert node[
'nodetype'] ==
'ring'
161 assert self.
getring(name)
is None
165 'Find a unique drone node in the drone index'
166 dronehits = self.
droneindex[
'designation'][designation]
168 for drone
in dronehits:
171 except RuntimeError
as e:
172 print 'Caught runtime error searching drone index'
177 'Add this drone node to our drone index'
178 designation = drone[
'designation']
179 assert drone[
'nodetype'] ==
'drone'
180 assert self.
getdrone(designation)
is None
181 self.
ringindex[
'designation'][designation] = drone
184 'Find an interface node with a unique MAC address in the MAC address index'
185 machits = self.
macindex[
'macaddr'][address]
193 'Add this NIC node to our MAC address index'
194 mac = node[
'macaddr']
195 assert node[
'nodetype'] ==
'nic'
200 'Find a unique IP address node in our IP address index'
201 iphits = self.
ringindex[
'ipaddr'][address]
209 'Add this IP address node to our IP address index'
211 assert node[
'nodetype'] ==
'ipaddr'
216 'Create a new ring (or return a pre-existing one), and put it in the ring index'
219 print >>sys.stderr,
'Returning pre-existing ring [%s]' % ring.name
221 with self.db.transaction:
222 ring = self.db.node(nodetype=
'ring', name=name, **kw)
224 print >>sys.stderr,
'Creating new ring [%s]' % ring.name
228 'Create a new drone (or return a pre-existing one), and put it in the drone index'
229 print 'Adding drone', designation
231 if drone
is not None:
232 print 'Found drone %s in drone index' % designation
234 with self.db.transaction:
238 drone = self.db.node(nodetype=
'drone', designation=designation, **kw)
243 '''Create a new NIC (or return a pre-existing one), and put it in the mac address index,
244 and point it at its parent drone.'''
245 machits = self.
macindex[
'macaddr'][macaddr]
247 if mac.ipowner[0].outgoing.designation == drone.designation:
251 print "Duplicate MAC address for %s: %s vs %s" % \
252 (macaddr, drone.designation, mac.ipowner[0].outgoing.designation)
254 with self.db.transaction:
255 nic = self.db.node(nodetype=
'nic', macaddr=macaddr, **kw)
258 nic.relationships.create(
'nichost', drone, hostname=drone[
'designation'], reltype=
'nichost')
262 '''Create a new IP address (or return a pre-existing one), and point it at its parent
263 NIC and its grandparent drone'''
264 iphits = self.
ipindex[
'ipaddr'][macaddr]
265 drone = nic.nichost[0].outgoing
268 if ip.iphost[0].outgoing.designation == drone.designation:
272 print "Duplicate IP address for %s: %s vs %s" % \
273 (ipaddr, drone.designation, ip.ipowner[0].outgoing.designation)
275 with self.db.transaction:
276 ip = self.db.node(nodetype=
'ipaddr', ipaddr=ipaddr, **kw)
279 nic.relationships.create(
'ipowner', nic, reltype=
'ipowner')
281 nic.relationships.create(
'iphost', drone, reltype=
'iphost')
286 'Class defining the behavior of a heartbeat ring.'
293 def __init__(self, name, ringtype, parentring=None):
294 'Constructor for a heartbeat ring.'
295 if ringtype < HbRing.SWITCH
or ringtype > HbRing.THEONERING:
296 raise ValueError(
"Invalid ring type [%s]" % str(ringtype))
297 if HbRing.ringnames.has_key(name):
298 raise ValueError(
"ring name [%s] already exists." % str(naem))
299 HbRing.ringnames[name] = self
305 NEO.new_ring(name, ringtype=ringtype)
308 'Add this drone to our ring'
310 if self.members.has_key(drone.designation):
312 raise ValueError(
"Drone %s is already a member of this ring [%s]"
313 % (drone.designation, self.
name))
316 self.
members[drone.designation] = weakref.proxy(drone)
317 drone.ringmemberships[self.
name] = weakref.proxy(self)
321 if partners ==
None:
return
323 drone.start_heartbeat(self, partners[0])
324 partners[0].start_heartbeat(self, drone)
327 partners[0].stop_heartbeat(self, partners[1])
328 partners[1].stop_heartbeat(self, partners[0])
329 drone.start_heartbeat(self, partners[0], partners[1])
330 partners[0].start_heartbeat(self, drone)
331 partners[1].start_heartbeat(self, drone)
334 'Remove a drone from this heartbeat Ring.'
335 if not self.members.has_key(drone.designation):
336 raise ValueError(
"Drone %s is not a member of this ring [%s]"
337 % (drone.designation, self.
name))
340 if self.
memberlist[j].designation == drone.designation:
345 del self.
members[drone.designation]
346 del drone.ringmemberships[self.
name]
350 drone.stop_heartbeat(self, self.
memberlist[0])
351 self.
memberlist[0].stop_heartbeat(self, drone)
355 partner2loc=location-1
363 partner1.stop_heartbeat(self, drone)
364 if partner1loc != partner2loc:
366 partner2.stop_heartbeat(self, drone)
367 partner1.start_heartbeat(self, partner2)
368 partner2.start_heartbeat(self, partner1)
370 drone.stop_heartbeat(self, partner1, partner2)
372 def _findringpartners(self, drone):
373 'Find (one or) two partners for this drone to heartbeat with.'
379 self.memberlist.insert(0, weakref.proxy(drone))
380 nummember = len(self.memberlist)
381 if nummember == 1:
return None
382 if nummember == 2:
return (self.memberlist[1],)
383 return (self.memberlist[1], self.memberlist[nummember-1])
386 'Length function - returns number of members in this ring.'
390 ret =
'Ring("%s", [' % self.
name
392 for drone
in memberlist:
393 ret +=
'%s%s' % (comma, drone)
402 HbRing.ringnames = {}
403 TheOneRing =
HbRing(
'The One Ring', HbRing.THEONERING)
407 'Everything about Drones - endpoints that run our nanoprobes'
417 self.
drone = NEO.new_drone(designation, **kw)
421 DroneInfo.droneset = {}
422 DroneInfo.droneIPs = {}
425 'Record what IPs this drone has - and on what interfaces'
428 self.
addresses[str(addr)] = (addr, ifname)
431 'Process and save away JSON discovery data'
432 jsonobj = pyConfigContext(jsontext)
433 if not jsonobj.has_key(
'discovertype')
or not jsonobj.has_key(
'data'):
434 print >>sys.stderr,
'Invalid JSON discovery packet.'
436 dtype = jsonobj[
'discovertype']
440 if dtype ==
'netconfig':
444 'Save away the network configuration data we got from JSON discovery.'
448 data = jsonobj[
'data']
450 for intf
in data.keys():
452 isprimaryif= ifinfo.has_key(
'default_gw')
453 iptable = ifinfo[
'ipaddrs']
454 for ip
in iptable.keys():
456 if ipinfo[
'scope'] !=
'global':
458 (iponly,mask) = ip.split(
'/')
459 self.addaddr(iponly, intf)
460 DroneInfo.droneIPs[iponly] = self
462 if isprimaryif
and primaryip ==
None:
468 'Select an appropriate IP address for talking to this partner on this ring'
474 return partner.primaryIP
475 except AttributeError
as e:
478 return partner.startaddr
481 '''Send a message with an attached address list and optional port.
482 This is intended primarily for start or stop heartbeating messages.'''
483 fs = pyFrameSet(fstype)
485 if port
is not None and port > 0
and port < 65536:
486 pframe = pyIntFrame(FrameTypes.PORTNUM, intbytes=2, initval=int(port))
487 for addr
in addrlist:
488 if addr
is None:
continue
489 if pframe
is not None:
491 aframe = pyAddrFrame(FrameTypes.IPADDR, addrstring=addr)
493 self.io.sendframesets(dest, (fs,))
496 'Process a death/shutdown report for us. RIP us.'
497 print >>sys.stderr,
"Node %s has been reported as %s by address %s. Reason: %s" \
498 % (self.
designation, status, str(fromaddr), reason)
505 ringlist = self.ringmemberships.keys()
506 for ring
in ringlist:
507 HbRing.ringnames[ring].leave(self)
511 'Start heartbeating to the given partners'
513 partner1ip = self.
select_ip(ring, partner1)
514 if partner2
is not None:
515 partner2ip = self.
select_ip(ring, partner2)
521 self.
ringpeers[partner1.designation] = partner1
522 if partner2
is not None:
525 self.
ringpeers[partner2.designation] = partner2
528 self.
send_hbmsg(ourip, FrameSetTypes.SENDEXPECTHB, 0, (partner1ip, partner2ip))
531 'Stop heartbeating to the given partners.'
533 partner1ip = self.
select_ip(ring, partner1)
534 if partner2
is not None:
535 partner2ip = self.
select_ip(ring, partner2)
544 if partner2
is not None:
550 self.
send_hbmsg(ourip, FrameSetTypes.STOPSENDEXPECTHB, 0, (partner1ip, partner2ip))
553 'Give out our designation'
558 'Find a drone with the given designation or IP address.'
559 if isinstance(designation, str):
560 if DroneInfo.droneset.has_key(designation):
561 return DroneInfo.droneset[designation]
562 elif isinstance(designation, pyNetAddr):
564 saddr = str(designation)
565 if DroneInfo.droneIPs.has_key(saddr):
566 return DroneInfo.droneIPs[saddr]
570 def add(designation, io, reason, status='up'):
571 "Add a drone to our set if it isn't already there."
572 ret = DroneInfo.find(designation);
579 DroneInfo.droneset[designation] = ret
583 '''Base class for handling incoming FrameSets.
584 This base class is designated to handle unhandled FrameSets.
585 All it does is print that we received them.
590 fstype = frameset.get_framesettype()
591 print "Received FrameSet of type [%s] from [%s]" \
592 % (FrameSetTypes.get(fstype)[0], str(origaddr))
593 for frame
in frameset.iter():
594 frametype=frame.frametype()
595 print "\tframe type [%s]: [%s]" \
596 % (FrameTypes.get(frametype)[1], str(frame))
603 'DispatchTarget subclass for handling incoming HBDEAD FrameSets.'
605 'Dispatch function for HBDEAD FrameSets'
607 fstype = frameset.get_framesettype()
608 fromdrone = DroneInfo.find(origaddr)
609 print>>sys.stderr,
"DispatchHBDEAD: received [%s] FrameSet from [%s]" \
610 % (FrameSetTypes.get(fstype)[0], str(origaddr))
611 for frame
in frameset.iter():
612 frametype=frame.frametype()
613 if frametype == FrameTypes.IPADDR:
614 deaddrone = DroneInfo.find(frame.getnetaddr())
615 deaddrone.death_report(
'dead',
'HBDEAD packet', origaddr, frameset)
618 'DispatchTarget subclass for handling incoming STARTUP FrameSets.'
621 fstype = frameset.get_framesettype()
622 print >>sys.stderr,
"DispatchSTARTUP: received [%s] FrameSet from [%s]" \
623 % (FrameSetTypes.get(fstype)[0], str(origaddr))
624 for frame
in frameset.iter():
625 frametype=frame.frametype()
626 if frametype == FrameTypes.HOSTNAME:
627 sysname = frame.getstr()
628 if frametype == FrameTypes.JSDISCOVER:
629 json = frame.getstr()
630 fs = CMAlib.create_setconfig(self.
config)
636 self.io.sendframesets(origaddr, fs)
638 DroneInfo.add(sysname, self.
io,
'STARTUP packet')
639 drone = DroneInfo.find(sysname)
640 drone.startaddr=origaddr
643 TheOneRing.join(drone)
647 'We dispatch incoming messages where they need to go.'
649 'Constructor for MessageDispatcher - requires a dispatch table as a parameter'
654 'Dispatch a frameset where it will get handled.'
655 fstype = frameset.get_framesettype()
656 if self.dispatchtable.has_key(fstype):
659 self.default.dispatch(origaddr, frameset)
662 'Save our configuration away. We need it before we can do anything.'
664 self.default.setconfig(io, config)
665 for msgtype
in self.dispatchtable.keys():
669 'Listen for packets and get them dispatched as any good packet ought to be.'
673 self.
io = pyNetIOudp(config, pyPacketDecoder())
677 dispatch.setconfig(self.
io, config)
679 self.io.bindaddr(config[
"cmainit"])
680 self.io.setblockio(
True)
686 'Listen for packets. Get them dispatched.'
688 (fromaddr, framesetlist) = self.io.recvframesets()
695 for frameset
in framesetlist:
696 self.dispatcher.dispatch(fromaddr, frameset)
698 if __name__ ==
'__main__':
705 TheOneRing =
HbRing(
'The One Ring', HbRing.THEONERING)
706 print 'Ring created!!'
708 print FrameTypes.get(1)[2]
710 OurAddr = pyNetAddr((10,10,10,200),1984)
718 'outsig': pySignFrame(1),
719 'deadtime': 10*1000000,
720 'warntime': 3*1000000,
726 config = pyConfigContext(init=configinit)