The Assimilation Monitoring Project
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
cma.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: smartindent tabstop=4 shiftwidth=4 expandtab number
3 #
4 # This file is part of the Assimilation Project.
5 #
6 # Copyright (C) 2011, 2012 - Alan Robertson <alanr@unix.sh>
7 #
8 # The Assimilation software is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # The Assimilation software is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
20 #
21 #
22 #
23 #
24 # Design outline:
25 #
26 # All incoming network messages come in and get sent to a client who is a dispatcher.
27 #
28 # The dispatcher looks at the message type and computes which queue to send the
29 # message to based on the message type and contents.
30 #
31 # For death notices, the dispatcher forwards the message to the worker
32 # assigned to the switch the system is on - if known, or the worker
33 # assigned to the subnet.
34 #
35 # Each worker handles one or more rings - probably handling the per-switch rings
36 # for a subnet and the subnet ring as well. It is important to ensure that a ring
37 # is handled by only one worker. This eliminates locking concerns. When a given
38 # worker receives a death notice for a drone that is also in higher-level rings,
39 # it does its work at its level and also forwards the request to the worker
40 # the higher level ring as well. The first subnet worker will also handle the work
41 # for the top-level (global) ring.
42 #
43 # Packets are ACKed by workers after all work has been completed. In the case of
44 # a drone on multiple rings, it is only ACKed after both rings have been fully
45 # repaired.
46 #
47 # The reason for this is that until it is fully repaired, the system might crash
48 # before completing its work. Retransmission timeouts will need to be set
49 # accordingly...
50 #
51 # Although congestion is normally very unlikely, this is not true for full
52 # datacenter powerons - where it is reasonably likely - depending on how
53 # quickly one can power on the servers and not pop circuit breakers or
54 # damage UPSes
55 # (it would be good to know how fast hosts can come up worst case).
56 #
57 #
58 # Misc Workers with well-known-names
59 # Request-To-Create-Ring
60 #
61 #
62 # Mappings:
63 #
64 # Drone-related information-------------------------
65 # NetAddr-to-drone-name
66 # drone-name to NetAddr
67 # (drone-name,ifname) to interface-info (including switch info)
68 # drone-neighbor-info:
69 # drone-name-to-neighbor-info (drone-name, NetAddr, ring-name)
70 #
71 # Ring-related information--------------------------
72 # drone-name to ring-name(s)
73 # ring-names to ring-information (level, #members, etc)
74 # ring-links-info ??
75 # Subnet-to-ring-name
76 # Switch-to-ring-name
77 # Global-ring-name [TheOneRing]
78 #
79 # Discovery-related information---------------------
80 # (drone-name, Interface-name) to LLDP/CDP packet
81 # (drone-name, discovery-type) to JSON info
82 #
83 #
84 # Misc Info-----------------------------------------
85 # NetAddr(MAC)-to-NetAddr(IP)
86 #
87 #
88 # Dispatcher logic:
89 # For now sends all requests to TheOneRing because we need to write more code ;-)
90 #
91 #
92 ################################################################################
93 #
94 # It is readily observable that the code is headed that way, but is a long
95 # way from that structure...
96 #
97 ################################################################################
98 
99 
100 if __name__ == '__main__':
101  import optparse, atexit, time
102  #
103  # "Main" program starts below...
104  # It is a the real CMA intended to run with some real nanoprobes running
105  # somewhere out there...
106  #
107  DefaultPort = 1984
108  import os, sys, signal
109  # VERY Linux-specific - but useful and apparently correct ;-)
110  PrimaryIPcmd = \
111  "ip address show primary scope global | grep '^ *inet' | sed -e 's%^ *inet *%%' -e 's%/.*%%'"
112  ipfd = os.popen(PrimaryIPcmd, 'r')
113  OurAddrStr=('%s:%d' % (ipfd.readline().rstrip(), DefaultPort))
114  ipfd.close()
115 
116  OurPort = None
117 
118  parser = optparse.OptionParser(prog='CMA', version='0.0.1',
119  description='Collective Management Authority for the Assimilation System',
120  usage='cma.py [--bind address:port]')
121 
122  parser.add_option('-b', '--bind', action='store', default=None, dest='bind'
123  , metavar='address:port-to-bind-to'
124  , help='Address:port to listen to - for nanoprobes to connect to')
125 
126  parser.add_option('-d', '--debug', action='count', default=0, dest='debug'
127  , help='enable debug for CMA and libraries - multiple occurances increase debug value')
128 
129  parser.add_option('-s', '--status', action='store_true', default=False, dest='status'
130  , help='Return status of running CMA')
131 
132  parser.add_option('-k', '--kill', action='store_true', default=False, dest='kill'
133  , help='Shut down running CMA.')
134 
135  parser.add_option('-e', '--erasedb', action='store_true', default=False, dest='erasedb'
136  , help='Erase Neo4J before starting')
137 
138  parser.add_option('-f', '--foreground', action='store_true', default=False, dest='foreground'
139  , help='keep the CMA from going into the background')
140 
141  parser.add_option('-p', '--pidfile', action='store', default='/var/run/cma', dest='pidfile'
142  , metavar='pidfile-pathname'
143  , help='full pathname of where to locate our pid file')
144 
145  parser.add_option('-T', '--trace', action='store_true', default=False, dest='doTrace'
146  , help='Trace CMA execution')
147 
148 
149  opt, args = parser.parse_args()
150 
151  from AssimCtypes import daemonize_me, assimilation_openlog, are_we_already_running, \
152  kill_pid_service, pidrunningstat_to_status, remove_pid_file, rmpid_and_exit_on_signal
153 
154 
155  if opt.status:
157  os._exit(rc)
158 
159  if opt.kill:
160  if kill_pid_service(opt.pidfile, 15) < 0:
161  print >> sys.stderr, "Unable to stop CMA."
162  os._exit(1)
163  os._exit(0)
164 
165 
166  # This doesn't seem to work no matter where I invoke it...
167  # But if we don't fork in daemonize_me() ('C' code), it works great...
168 # def cleanup():
169 # remove_pid_file(opt.pidfile)
170 # atexit.register(cleanup)
171 # signal.signal(signal.SIGTERM, lambda sig, stack: sys.exit(0))
172 # signal.signal(signal.SIGINT, lambda sig, stack: sys.exit(0))
173 
174  daemonize_me(opt.foreground, '/', opt.pidfile)
175 
176  rmpid_and_exit_on_signal(opt.pidfile, signal.SIGTERM)
177 
178  # Next statement can't appear before daemonize_me() or bind() fails -- not quite sure why...
179  assimilation_openlog("cma")
180  from packetlistener import PacketListener
181  from messagedispatcher import MessageDispatcher
182  from dispatchtarget import DispatchSTARTUP, DispatchHBDEAD, DispatchJSDISCOVERY, \
183  DispatchSWDISCOVER, DispatchHBSHUTDOWN
184  import cmadb
185  from AssimCclasses import pyNetAddr, pySignFrame, pyConfigContext, pyReliableUDP, \
186  pyPacketDecoder
187  from AssimCtypes import CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR, CONFIGNAME_CMADISCOVER, \
188  CONFIGNAME_CMAFAIL, CONFIGNAME_CMAPORT, CONFIGNAME_HBPORT, CONFIGNAME_OUTSIG, \
189  CONFIGNAME_DEADTIME, CONFIGNAME_WARNTIME, CONFIGNAME_HBTIME, CONFIGNAME_OUTSIG,\
190  proj_class_incr_debug, VERSION_STRING, LONG_LICENSE_STRING
191  from frameinfo import FrameTypes, FrameSetTypes
192  import py2neo
193  for debug in range(opt.debug):
195 
196  if opt.bind is None:
197  BindAddrStr = ('0.0.0.0:%d' % DefaultPort)
198  else:
199  BindAddrStr = opt.bind
200  OurAddrStr = opt.bind
201 
202  OurAddr = pyNetAddr(OurAddrStr)
203  BindAddr = pyNetAddr(BindAddrStr)
204  if OurAddr.port() == 0:
205  OurAddr.setport(DefaultPort)
206  OurPort = OurAddr.port()
207 
208 
209  configinit = {
210  CONFIGNAME_CMAINIT: BindAddr, # Initial listening (bind) address
211  CONFIGNAME_CMAADDR: OurAddr, # not sure what this one does...
212  CONFIGNAME_CMADISCOVER: OurAddr,# Discovery packets sent here
213  CONFIGNAME_CMAFAIL: OurAddr, # Failure packets sent here
214  CONFIGNAME_CMAPORT: OurPort,
215  CONFIGNAME_HBPORT: OurPort,
216  CONFIGNAME_OUTSIG: pySignFrame(1),
217  CONFIGNAME_DEADTIME: 10*1000000,
218  CONFIGNAME_WARNTIME: 3*1000000,
219  CONFIGNAME_HBTIME: 1*1000000,
220  CONFIGNAME_OUTSIG: pySignFrame(1),
221  }
222  config = pyConfigContext(init=configinit)
223  io = pyReliableUDP(config, pyPacketDecoder(0))
224  trycount=0
225  while True:
226  try:
227  cmadb.CMAdb.initglobal(io, cleanoutdb=opt.erasedb, debug=(opt.debug > 0))
228  except py2neo.rest.SocketError:
229  trycount+=1
230  if trycount > 300:
231  remove_pid_file(opt.pidfile)
232  print >>sys.stderr, ('Neo4j still not started - giving up.')
233  cmadb.CMAdb.log.critical('Neo4j still not started - giving up.')
234  raise SystemExit(1)
235  if (trycount % 60) == 1:
236  print >>sys.stderr, ('Waiting for Neo4j to start.')
237  cmadb.CMAdb.log.warning('Waiting for Neo4j to start.')
238  # Let's try again in a second...
239  time.sleep(1)
240  continue
241  # Neo4j started. All is well with the world.
242  break
243 
244  cmadb.CMAdb.log.info('Listening on: %s' % str(config[CONFIGNAME_CMAINIT]))
245  cmadb.CMAdb.log.info('Requesting return packets sent to: %s' % str(OurAddr))
246  if cmadb.CMAdb.debug:
247  cmadb.CMAdb.log.info('TheOneRing created - id = %d' % cmadb.CMAdb.TheOneRing.node.id)
248  cmadb.CMAdb.log.info('Config Object sent to nanoprobes: %s' % config)
249 
250  print FrameTypes.get(1)[2]
251  disp = MessageDispatcher(
252  { FrameSetTypes.STARTUP: DispatchSTARTUP(),
253  FrameSetTypes.HBDEAD: DispatchHBDEAD(),
254  FrameSetTypes.JSDISCOVERY: DispatchJSDISCOVERY(),
255  FrameSetTypes.SWDISCOVER: DispatchSWDISCOVER(),
256  FrameSetTypes.HBSHUTDOWN: DispatchHBSHUTDOWN()
257  })
258  cmadb.CMAdb.log.info('Starting CMA version %s - licensed under %s'
259  % (VERSION_STRING, LONG_LICENSE_STRING))
260  if opt.foreground:
261  print >>sys.stderr, ('Starting CMA version %s - licensed under %s'
262  % (VERSION_STRING, LONG_LICENSE_STRING))
263  # Important to note that we don't want PacketListener to create its own 'io' object
264  # or it will screw up the ReliableUDP protocol...
265  listener = PacketListener(config, disp, io=io)
266  if opt.doTrace:
267  import trace
268  tracer = trace.Trace(count=False, trace=True)
269  if cmadb.CMAdb.debug: cmadb.CMAdb.log.debug(
270  'Starting up traced listener.listen(); debug=%d' % opt.debug)
271  if opt.foreground: print >>sys.stderr, (
272  'cma: Starting up traced listener.listen() in foreground; debug=%d' % opt.debug)
273  tracer.run('listener.listen()')
274  else:
275  if cmadb.CMAdb.debug: cmadb.CMAdb.log.debug(
276  'Starting up untraced listener.listen(); debug=%d' % opt.debug)
277  if opt.foreground: print >>sys.stderr, (
278  'cma: Starting up untraced listener.listen() in foreground; debug=%d' % opt.debug)
279  listener.listen()