The Assimilation Project
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
cma.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: smartindent tabstop=4 shiftwidth=4 expandtab number
3 #
4 # This file is part of the Assimilation Project.
5 #
6 # Copyright (C) 2011, 2012 - Alan Robertson <alanr@unix.sh>
7 #
8 # The Assimilation software is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # The Assimilation software is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
20 #
21 '''
22  Design outline:
23 
24  All incoming network messages come in and get sent to a client who is a dispatcher.
25 
26  The dispatcher looks at the message type and computes which queue to send the
27  message to based on the message type and contents.
28 
29  For death notices, the dispatcher forwards the message to the worker
30  assigned to the switch the system is on - if known, or the worker
31  assigned to the subnet.
32 
33  Each worker handles one or more rings - probably handling the per-switch rings
34  for a subnet and the subnet ring as well. It is important to ensure that a ring
35  is handled by only one worker. This eliminates locking concerns. When a given
36  worker receives a death notice for a drone that is also in higher-level rings,
37  it does its work at its level and also forwards the request to the worker handling
38  the higher level ring as well. The first subnet worker will also handle the work
39  for the top-level (global) ring.
40 
41  Packets are ACKed by workers after all work has been completed. In the case of
42  a drone on multiple rings, it is only ACKed after both rings have been fully
43  repaired.
44 
45  The reason for this is that until it is fully repaired, the system might crash
46  before completing its work. Retransmission timeouts will need to be set
47  accordingly...
48 
49  Although congestion is normally very unlikely, this is not true for full
50  datacenter powerons - where it is reasonably likely - depending on how
51  quickly one can power on the servers and not pop circuit breakers or
52  damage UPSes
53  (it would be good to know how fast hosts can come up worst case).
54 
55 
56  Misc Workers with well-known-names
57  Request-To-Create-Ring
58 
59 
60  Mappings:
61 
62  Drone-related information-------------------------
63  NetAddr-to-drone-name
64  drone-name to NetAddr
65  (drone-name,ifname) to interface-info (including switch info)
66  drone-neighbor-info:
67  drone-name-to-neighbor-info (drone-name, NetAddr, ring-name)
68 
69  Ring-related information--------------------------
70  drone-name to ring-name(s)
71  ring-names to ring-information (level, #members, etc)
72  ring-links-info ??
73  Subnet-to-ring-name
74  Switch-to-ring-name
75  Global-ring-name [TheOneRing]
76 
77  Discovery-related information---------------------
78  (drone-name, Interface-name) to LLDP/CDP packet
79  (drone-name, discovery-type) to JSON info
80 
81 
82  Misc Info-----------------------------------------
83  NetAddr(MAC)-to-NetAddr(IP)
84 
85 
86  Dispatcher logic:
87  For now sends all requests to TheOneRing because we need to write more code ;-)
88 
89 
90 ################################################################################
91 #
92 # It is readily observable that the code is headed that way, but is a long
93 # way from that structure...
94 #
95 ################################################################################
96 '''
97 
98 
99 import optparse, time
100 import os, sys, signal
101 import cmainit
102 from assimeventobserver import ForkExecObserver
103 from AssimCtypes import NOTIFICATION_SCRIPT_DIR
104 import importlib
105 #import atexit
106 
107 
# Names of the modules that main() imports (via importlib) once the packet
# listener has been created.  Despite the variable name, the first entry is
# required.
optional_modules = [
    'discoverylistener',    # NOT OPTIONAL(!)
    'linkdiscovery',
    'checksumdiscovery',
    'monitoringdiscovery',
]
113 #
114 # "Main" program starts below...
115 # It is the real CMA intended to run with some real nanoprobes running
116 # somewhere out there...
117 #
118 #pylint: disable=R0914
def _primary_ip_address():
    '''Return the first address printed by the "ip address show" pipeline below.

    The result is the first output line with trailing whitespace removed; it is
    an empty string if the pipeline prints nothing.
    '''
    # VERY Linux-specific - but useful and apparently correct ;-)
    PrimaryIPcmd =   \
    "ip address show primary scope global | grep '^ *inet' | sed -e 's%^ *inet *%%' -e 's%/.*%%'"
    ipfd = os.popen(PrimaryIPcmd, 'r')
    try:
        return ipfd.readline().rstrip()
    finally:
        # Always release the pipe, even if reading from it fails.
        ipfd.close()


def _build_option_parser():
    'Create the optparse parser describing the CMA command line options.'
    parser = optparse.OptionParser(prog='CMA', version='0.0.1',
        description='Collective Management Authority for the Assimilation System',
        usage='cma.py [--bind address:port]')

    parser.add_option('-b', '--bind', action='store', default=None, dest='bind'
    ,   metavar='address:port-to-bind-to'
    ,   help='Address:port to listen to - for nanoprobes to connect to')

    parser.add_option('-d', '--debug', action='count', default=0, dest='debug'
    ,   help='enable debug for CMA and libraries - multiple occurances increase debug value')

    parser.add_option('-s', '--status', action='store_true', default=False, dest='status'
    ,   help='Return status of running CMA')

    parser.add_option('-k', '--kill', action='store_true', default=False, dest='kill'
    ,   help='Shut down running CMA.')

    parser.add_option('-e', '--erasedb', action='store_true', default=False, dest='erasedb'
    ,   help='Erase Neo4J before starting')

    parser.add_option('-f', '--foreground', action='store_true', default=False, dest='foreground'
    ,   help='keep the CMA from going into the background')

    parser.add_option('-p', '--pidfile', action='store', default='/var/run/cma', dest='pidfile'
    ,   metavar='pidfile-pathname'
    ,   help='full pathname of where to locate our pid file')

    parser.add_option('-T', '--trace', action='store_true', default=False, dest='doTrace'
    ,   help='Trace CMA execution')
    return parser


def main():
    '''Main program for the CMA (Collective Management Authority)

    Parses the command line, handles the --status and --kill requests, and
    otherwise daemonizes, loads the monitoring rules, builds the configuration
    handed to nanoprobes, initializes the CMA (retrying while CMAinit raises
    RuntimeError) and finally runs the packet listener.

    Returns:
        the value of pidrunningstat_to_status() for --status;
        1 or 0 for --kill (1 if kill_pid_service() reported failure);
        0 once listener.listen() returns.

    Raises:
        SystemExit(1) if CMAinit has raised RuntimeError more than 300 times.
    '''
    DefaultPort = 1984
    OurAddrStr = ('%s:%d' % (_primary_ip_address(), DefaultPort))

    opt = _build_option_parser().parse_args()[0]

    from AssimCtypes import daemonize_me, assimilation_openlog, are_we_already_running, \
        kill_pid_service, pidrunningstat_to_status, remove_pid_file, rmpid_and_exit_on_signal

    if opt.status:
        return pidrunningstat_to_status(are_we_already_running(opt.pidfile, None))

    if opt.kill:
        # 15 is passed through as the signal number (SIGTERM on Linux).
        if kill_pid_service(opt.pidfile, 15) < 0:
            print >> sys.stderr, "Unable to stop CMA."
            return 1
        return 0

    # This doesn't seem to work no matter where I invoke it...
    # But if we don't fork in daemonize_me() ('C' code), it works great...
#   def cleanup():
#       remove_pid_file(opt.pidfile)
#   atexit.register(cleanup)
#   signal.signal(signal.SIGTERM, lambda sig, stack: sys.exit(0))
#   signal.signal(signal.SIGINT, lambda sig, stack: sys.exit(0))

    daemonize_me(opt.foreground, '/', opt.pidfile)

    rmpid_and_exit_on_signal(opt.pidfile, signal.SIGTERM)

    # Next statement can't appear before daemonize_me() or bind() fails -- not quite sure why...
    assimilation_openlog("cma")
    from packetlistener import PacketListener
    from messagedispatcher import MessageDispatcher
    from dispatchtarget import DispatchTarget
    from cmadb import CMAdb
    from monitoring import MonitoringRule
    from AssimCclasses import pyNetAddr, pySignFrame, pyConfigContext, pyReliableUDP, \
        pyPacketDecoder
    from AssimCtypes import CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR, CONFIGNAME_CMADISCOVER, \
        CONFIGNAME_CMAFAIL, CONFIGNAME_CMAPORT, CONFIGNAME_HBPORT, CONFIGNAME_OUTSIG, \
        CONFIGNAME_DEADTIME, CONFIGNAME_WARNTIME, CONFIGNAME_HBTIME, \
        proj_class_incr_debug, VERSION_STRING, LONG_LICENSE_STRING, MONRULEINSTALL_DIR

    # Raise the library debug level once for each -d given on the command line.
    # NOTE(review): the loop previously had no effect although
    # proj_class_incr_debug was imported; passing None is presumed to mean
    # "all classes" - confirm against the C library documentation.
    for _ in range(opt.debug):
        proj_class_incr_debug(None)

    # Input our monitoring rule templates
    # They only exist in flat files and in memory - they aren't in the database
    MonitoringRule.load_tree(MONRULEINSTALL_DIR)
    print >> sys.stderr, ('Monitoring rules loaded from %s' % MONRULEINSTALL_DIR)

    execobserver_constraints = {
        'nodetype': ['Drone', 'SystemNode', 'IPaddrNode', 'ProcessNode', 'MonitorAction']
    }
    # The observer object is deliberately not kept in a local variable here.
    ForkExecObserver(constraints=execobserver_constraints, scriptdir=NOTIFICATION_SCRIPT_DIR)
    print >> sys.stderr, ('Fork/Event observer dispatching from %s' % NOTIFICATION_SCRIPT_DIR)

    if opt.bind is None:
        # No --bind: listen on every address, advertise the discovered one.
        BindAddrStr = ('0.0.0.0:%d' % DefaultPort)
    else:
        # --bind given: listen on it and advertise it as well.
        BindAddrStr = opt.bind
        OurAddrStr = opt.bind

    OurAddr = pyNetAddr(OurAddrStr)
    BindAddr = pyNetAddr(BindAddrStr)
    if OurAddr.port() == 0:
        OurAddr.setport(DefaultPort)
    OurPort = OurAddr.port()

    configinit = {
        CONFIGNAME_CMAINIT:     BindAddr,   # Initial listening (bind) address
        CONFIGNAME_CMAADDR:     OurAddr,    # not sure what this one does...
        CONFIGNAME_CMADISCOVER: OurAddr,    # Discovery packets sent here
        CONFIGNAME_CMAFAIL:     OurAddr,    # Failure packets sent here
        CONFIGNAME_CMAPORT:     OurPort,
        CONFIGNAME_HBPORT:      OurPort,
        CONFIGNAME_OUTSIG:      pySignFrame(1),
        CONFIGNAME_DEADTIME:    10*1000000,
        CONFIGNAME_WARNTIME:    3*1000000,
        CONFIGNAME_HBTIME:      1*1000000,
    }
    config = pyConfigContext(init=configinit)
    io = pyReliableUDP(config, pyPacketDecoder())

    # Keep calling CMAinit until it stops raising RuntimeError, pausing one
    # second between attempts and giving up after more than 300 failures.
    trycount = 0
    while True:
        try:
            cmainit.CMAinit(io, cleanoutdb=opt.erasedb, debug=(opt.debug > 0))
        except RuntimeError:
            print >> sys.stderr, 'TRYING AGAIN...'
            trycount += 1
            if trycount > 300:
                remove_pid_file(opt.pidfile)
                print >> sys.stderr, ('Neo4j still not started - giving up.')
                CMAdb.log.critical('Neo4j still not started - giving up.')
                raise SystemExit(1)
            if (trycount % 60) == 1:
                # Only report on the first failure and then once every 60 tries.
                print >> sys.stderr, ('Waiting for Neo4j to start.')
                CMAdb.log.warning('Waiting for Neo4j to start.')
            # Let's try again in a second...
            time.sleep(1)
            continue
        # Neo4j started.  All is well with the world.
        break

    CMAdb.log.info('Listening on: %s' % str(config[CONFIGNAME_CMAINIT]))
    CMAdb.log.info('Requesting return packets sent to: %s' % str(OurAddr))
    if CMAdb.debug:
        CMAdb.log.info('TheOneRing created - id = %s' % CMAdb.TheOneRing)
        CMAdb.log.info('Config Object sent to nanoprobes: %s' % config)

    disp = MessageDispatcher(DispatchTarget.dispatchtable)
    CMAdb.log.info('Starting CMA version %s - licensed under %s'
    %   (VERSION_STRING, LONG_LICENSE_STRING))
    if opt.foreground:
        print >> sys.stderr, ('Starting CMA version %s - licensed under %s'
        %   (VERSION_STRING, LONG_LICENSE_STRING))
    # Important to note that we don't want PacketListener to create its own 'io' object
    # or it will screw up the ReliableUDP protocol...
    listener = PacketListener(config, disp, io=io)
    for optional in optional_modules:
        importlib.import_module(optional)
    if opt.doTrace:
        import trace
        tracer = trace.Trace(count=False, trace=True)
        if CMAdb.debug:
            CMAdb.log.debug(
            'Starting up traced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
            'cma: Starting up traced listener.listen() in foreground; debug=%d' % opt.debug)
        # Trace.run() evaluates its command string in the __main__ module
        # namespace, where the local 'listener' does not exist; runfunc()
        # traces the bound method directly instead.
        tracer.runfunc(listener.listen)
    else:
        if CMAdb.debug:
            CMAdb.log.debug(
            'Starting up untraced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
            'cma: Starting up untraced listener.listen() in foreground; debug=%d' % opt.debug)
        listener.listen()
    return 0
305 
if __name__ == '__main__':
    # Run the CMA and pass its result, coerced to an integer, to the shell
    # as the process exit status.
    sys.exit(int(main()))