The Assimilation Project  based on Assimilation version 0.5.1427325140
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Modules Pages
cma.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: smartindent tabstop=4 shiftwidth=4 expandtab number
3 #
4 # This file is part of the Assimilation Project.
5 #
6 # Copyright (C) 2011, 2012 - Alan Robertson <alanr@unix.sh>
7 #
8 # The Assimilation software is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # The Assimilation software is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
20 #
21 '''
22  Design outline:
23 
24  All incoming network messages come in and get sent to a client who is a dispatcher.
25 
26  The dispatcher looks at the message type and computes which queue to send the
27  message to based on the message type and contents.
28 
29  For death notices, the dispatcher forwards the message to the worker
30  assigned to the switch the system is on - if known, or the worker
31  assigned to the subnet.
32 
33  Each worker handles one or more rings - probably handling the per-switch rings
34  for a subnet and the subnet ring as well. It is important to ensure that a ring
35  is handled by only one worker. This eliminates locking concerns. When a given
36  worker receives a death notice for a drone that is also in higher-level rings,
37  it does its work at its level and also forwards the request to the worker handling
38  the higher level ring as well. The first subnet worker will also handle the work
39  for the top-level (global) ring.
40 
41  Packets are ACKed by workers after all work has been completed. In the case of
42  a drone on multiple rings, it is only ACKed after both rings have been fully
43  repaired.
44 
45  The reason for this is that until it is fully repaired, the system might crash
46  before completing its work. Retransmission timeouts will need to be set
47  accordingly...
48 
49  Although congestion is normally very unlikely, this is not true for full
50  datacenter powerons - where it is reasonably likely - depending on how
51  quickly one can power on the servers and not pop circuit breakers or
52  damage UPSes
53  (it would be good to know how fast hosts can come up worst case).
54 
55 
56  Misc Workers with well-known-names
57  Request-To-Create-Ring
58 
59 
60  Mappings:
61 
62  Drone-related information-------------------------
63  NetAddr-to-drone-name
64  drone-name to NetAddr
65  (drone-name,ifname) to interface-info (including switch info)
66  drone-neighbor-info:
67  drone-name-to-neighbor-info (drone-name, NetAddr, ring-name)
68 
69  Ring-related information--------------------------
70  drone-name to ring-name(s)
71  ring-names to ring-information (level, #members, etc)
72  ring-links-info ??
73  Subnet-to-ring-name
74  Switch-to-ring-name
75  Global-ring-name [TheOneRing]
76 
77  Discovery-related information---------------------
78  (drone-name, Interface-name) to LLDP/CDP packet
79  (drone-name, discovery-type) to JSON info
80 
81 
82  Misc Info-----------------------------------------
83  NetAddr(MAC)-to-NetAddr(IP)
84 
85 
86  Dispatcher logic:
87  For now sends all requests to TheOneRing because we need to write more code ;-)
88 
89 
90 ################################################################################
91 #
92 # It is readily observable that the code is headed that way, but is a long
93 # way from that structure...
94 #
95 ################################################################################
96 '''
97 
98 
99 import optparse, traceback
100 import os, sys, signal
101 import cmainit
102 from assimeventobserver import ForkExecObserver
103 from AssimCtypes import NOTIFICATION_SCRIPT_DIR, CMAINITFILE, CMAUSERID, CRYPTKEYDIR, CMA_KEY_PREFIX
104 import AssimCtypes
105 from AssimCclasses import pyCompressFrame, pyCryptCurve25519, pyCryptFrame
106 from cmaconfig import ConfigFile
107 import importlib
108 #import atexit
109 import getent
110 import py2neo
111 
112 
113 optional_modules = [ 'discoverylistener' # NOT OPTIONAL(!)
114  , 'linkdiscovery'
115  , 'checksumdiscovery'
116  , 'monitoringdiscovery'
117  , 'arpdiscovery'
118  ]
119 #
120 # "Main" program starts below...
121 # It is the real CMA, intended to run with some real nanoprobes running
122 # somewhere out there...
123 #
124 # 912: Too many branches, 914: too many local variables, 915: too many statements
125 #pylint: disable=R0912,R0914,R0915
def main():
    '''Main program for the CMA (Collective Management Authority).

    Parses the command line and handles --status / --kill directly.
    Otherwise it:
      - prepares the pid and key directories and daemonizes;
      - builds the CMA configuration and its ReliableUDP socket;
      - drops privileges to the --user account and runs cmainit.CMAinit();
      - runs the packet listener, under the 'trace' module when --trace is given.

    Returns the process exit code.
    '''
    DefaultPort = 1984
    # This works around a bug in the glib library...
    os.environ['G_SLICE'] = 'always-malloc'
    # This works around a stupidity in the glib library...
    os.environ['G_MESSAGES_DEBUG'] = 'all'
    # VERY Linux-specific - but useful and apparently correct ;-)
    # Only the first address printed by this pipeline is used.
    PrimaryIPcmd = \
        "ip address show primary scope global | grep '^ *inet' | sed -e 's%^ *inet *%%' -e 's%/.*%%'"
    ipfd = os.popen(PrimaryIPcmd, 'r')
    try:
        OurAddrStr = ('%s:%d' % (ipfd.readline().rstrip(), DefaultPort))
    finally:
        ipfd.close()

    parser = optparse.OptionParser(prog='CMA', version=AssimCtypes.VERSION_STRING,
        description='Collective Management Authority for the Assimilation System',
        usage='cma.py [--bind address:port]')

    parser.add_option('-b', '--bind', action='store', default=None, dest='bind'
        , metavar='address:port-to-bind-to'
        , help='Address:port to listen to - for nanoprobes to connect to')

    parser.add_option('-d', '--debug', action='store', default=0, dest='debug'
        , help='enable debug for CMA and libraries - value is debug level for C libraries.')

    parser.add_option('-s', '--status', action='store_true', default=False, dest='status'
        , help='Return status of running CMA')

    parser.add_option('-k', '--kill', action='store_true', default=False, dest='kill'
        , help='Shut down running CMA.')

    parser.add_option('-e', '--erasedb', action='store_true', default=False, dest='erasedb'
        , help='Erase Neo4J before starting')

    parser.add_option('-f', '--foreground', action='store_true', default=False, dest='foreground'
        , help='keep the CMA from going into the background')

    parser.add_option('-p', '--pidfile', action='store', default='/var/run/assimilation/cma'
        , dest='pidfile', metavar='pidfile-pathname'
        , help='full pathname of where to locate our pid file')

    parser.add_option('-T', '--trace', action='store_true', default=False, dest='doTrace'
        , help='Trace CMA execution')

    parser.add_option('-u', '--user', action='store', default=CMAUSERID, dest='userid'
        , metavar='userid'
        , help='userid to run the CMA as')

    opt = parser.parse_args()[0]

    from AssimCtypes import daemonize_me, assimilation_openlog, are_we_already_running, \
        kill_pid_service, pidrunningstat_to_status, remove_pid_file, rmpid_and_exit_on_signal

    if opt.status:
        return pidrunningstat_to_status(are_we_already_running(opt.pidfile, None))

    if opt.kill:
        if kill_pid_service(opt.pidfile, signal.SIGTERM) < 0:
            print >> sys.stderr, "Unable to stop CMA."
            return 1
        return 0

    opt.debug = int(opt.debug)

    # NOTE: Cleaning up the pid file from Python (atexit, or SIGTERM/SIGINT
    # handlers calling sys.exit) did not work once daemonize_me() ('C' code)
    # forks. SIGTERM cleanup is therefore done by rmpid_and_exit_on_signal()
    # below.

    from cmadb import CMAdb
    CMAdb.running_under_docker()
    make_pid_dir(opt.pidfile, opt.userid)
    make_key_dir(CRYPTKEYDIR, opt.userid)
    cryptwarnings = pyCryptCurve25519.initkeys()
    for warn in cryptwarnings:
        print >> sys.stderr, ("WARNING: %s" % warn)
    for keyid in sorted(pyCryptFrame.get_key_ids()):
        if keyid.startswith(CMA_KEY_PREFIX):
            continue
        try:
            # @FIXME This is not an ideal way to associate identities with hosts
            # in a multi-tenant environment
            # @FIXME - don't think I need to do the associate_identity at all any more...
            # Non-CMA key ids look like 'hostname@@...'.
            # Unpacking raises ValueError when there is no '@@'.
            hostname, _ = keyid.split('@@', 1)
            pyCryptFrame.associate_identity(hostname, keyid)
        except ValueError:
            pass

    daemonize_me(opt.foreground, '/', opt.pidfile, 20)

    rmpid_and_exit_on_signal(opt.pidfile, signal.SIGTERM)

    # Next statement can't appear before daemonize_me() or bind() fails -- not quite sure why...
    assimilation_openlog("cma")
    from packetlistener import PacketListener
    from messagedispatcher import MessageDispatcher
    from dispatchtarget import DispatchTarget
    from monitoring import MonitoringRule
    from AssimCclasses import pyNetAddr, pySignFrame, pyReliableUDP, \
        pyPacketDecoder
    from AssimCtypes import CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR, CONFIGNAME_CMADISCOVER, \
        CONFIGNAME_CMAFAIL, CONFIGNAME_CMAPORT, CONFIGNAME_OUTSIG, CONFIGNAME_COMPRESSTYPE, \
        CONFIGNAME_COMPRESS, \
        proj_class_incr_debug, LONG_LICENSE_STRING, MONRULEINSTALL_DIR

    if opt.debug:
        print >> sys.stderr, ('Setting debug to %s' % opt.debug)

    for _ in range(opt.debug):
        print >> sys.stderr, ('Incrementing C-level debug by one.')
        # None (NULL) presumably means "every registered C class" - confirm in proj_classes.c
        proj_class_incr_debug(None)

    # Input our monitoring rule templates
    # They only exist in flat files and in memory - they aren't in the database
    MonitoringRule.load_tree(MONRULEINSTALL_DIR)
    print >> sys.stderr, ('Monitoring rules loaded from %s' % MONRULEINSTALL_DIR)

    execobserver_constraints = {
        'nodetype': ['Drone', 'SystemNode', 'IPaddrNode', 'ProcessNode', 'MonitorAction']
    }
    ForkExecObserver(constraints=execobserver_constraints, scriptdir=NOTIFICATION_SCRIPT_DIR)
    print >> sys.stderr, ('Fork/Event observer dispatching from %s' % NOTIFICATION_SCRIPT_DIR)

    if opt.bind is not None:
        OurAddrStr = opt.bind

    OurAddr = pyNetAddr(OurAddrStr)
    if OurAddr.port() == 0:
        OurAddr.setport(DefaultPort)

    try:
        configinfo = ConfigFile(filename=CMAINITFILE)
    except IOError:
        configinfo = ConfigFile()
    if opt.bind is not None:
        bindaddr = pyNetAddr(opt.bind)
        if bindaddr.port() == 0:
            # Take the port from this config *instance* if it has one
            # (indexing the ConfigFile class itself fails); else use our default.
            if CONFIGNAME_CMAPORT in configinfo:
                bindaddr.setport(configinfo[CONFIGNAME_CMAPORT])
            else:
                bindaddr.setport(DefaultPort)
        configinfo[CONFIGNAME_CMAINIT] = bindaddr
    configinfo[CONFIGNAME_CMADISCOVER] = OurAddr
    configinfo[CONFIGNAME_CMAFAIL] = OurAddr
    configinfo[CONFIGNAME_CMAADDR] = OurAddr
    if CONFIGNAME_COMPRESSTYPE in configinfo:
        configinfo[CONFIGNAME_COMPRESS] \
            = pyCompressFrame(compression_method=configinfo[CONFIGNAME_COMPRESSTYPE])
    config = configinfo.complete_config()
    config[CONFIGNAME_OUTSIG] = pySignFrame(1)

    addr = config[CONFIGNAME_CMAINIT]
    if addr.port() == 0:
        addr.setport(DefaultPort)
    ourport = addr.port()
    # Every CMA address present in the config is rewritten to use the port we listen on.
    for elem in (CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR,
                 CONFIGNAME_CMADISCOVER, CONFIGNAME_CMAFAIL):
        if elem in config:
            config[elem] = pyNetAddr(str(config[elem]), port=ourport)
    io = pyReliableUDP(config, pyPacketDecoder())
    io.setrcvbufsize(10*1024*1024) # No harm in asking - it will get us the best we can get...
    io.setsendbufsize(1024*1024) # Most of the traffic volume is inbound from discovery
    drop_privileges_permanently(opt.userid)
    try:
        cmainit.CMAinit(io, cleanoutdb=opt.erasedb, debug=(opt.debug > 0))
    except RuntimeError:
        remove_pid_file(opt.pidfile)
        raise
    for warn in cryptwarnings:
        CMAdb.log.warning(warn)

    CMAdb.log.info('Listening on: %s' % str(config[CONFIGNAME_CMAINIT]))
    CMAdb.log.info('Requesting return packets sent to: %s' % str(OurAddr))
    CMAdb.log.info('Socket input buffer size: %d' % io.getrcvbufsize())
    CMAdb.log.info('Socket output buffer size: %d' % io.getsendbufsize())
    for keyid in sorted(pyCryptFrame.get_key_ids()):
        CMAdb.log.info('KeyId %s Identity %s' % (keyid, pyCryptFrame.get_identity(keyid)))
    if CMAdb.debug:
        CMAdb.log.debug('C-library Debug was set to %s' % opt.debug)
        CMAdb.log.debug('TheOneRing created - id = %s' % CMAdb.TheOneRing)
        CMAdb.log.debug('Config Object sent to nanoprobes: %s' % config)

    jvmfd = os.popen('java -version 2>&1')
    try:
        # Strip the trailing newline so it doesn't end up inside the log line.
        jvers = jvmfd.readline().rstrip()
    finally:
        jvmfd.close()
    disp = MessageDispatcher(DispatchTarget.dispatchtable)
    startmsg = ('Starting CMA version %s - licensed under %s'
                % (AssimCtypes.VERSION_STRING, LONG_LICENSE_STRING))
    versionmsg = ('Neo4j version %s // py2neo version %s // Python version %s // %s'
                  % (('%s.%s.%s%s' % CMAdb.cdb.db.neo4j_version)
                     , str(py2neo.__version__)
                     , ('%s.%s.%s' % sys.version_info[0:3])
                     , jvers))
    CMAdb.log.info(startmsg)
    CMAdb.log.info(versionmsg)
    if opt.foreground:
        print >> sys.stderr, startmsg
        print >> sys.stderr, versionmsg
    # Important to note that we don't want PacketListener to create its own 'io' object
    # or it will screw up the ReliableUDP protocol...
    listener = PacketListener(config, disp, io=io)
    mandatory_modules = ['discoverylistener']
    for mandatory in mandatory_modules:
        importlib.import_module(mandatory)
    for optional in config['optional_modules']:
        importlib.import_module(optional)
    if opt.doTrace:
        import trace
        tracer = trace.Trace(count=False, trace=True)
        if CMAdb.debug:
            CMAdb.log.debug(
                'Starting up traced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
                'cma: Starting up traced listener.listen() in foreground; debug=%d' % opt.debug)
        # Trace.run() evaluates its string in __main__'s globals, where this
        # function's local 'listener' does not exist. runfunc() traces the
        # bound method directly.
        tracer.runfunc(listener.listen)
    else:
        if CMAdb.debug:
            CMAdb.log.debug(
                'Starting up untraced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
                'cma: Starting up untraced listener.listen() in foreground; debug=%d' % opt.debug)
        listener.listen()
    return 0
367 
def supplementary_groups_for_user(userid):
    '''Find every group whose member list names 'userid'.

    Walks all group entries from getent.group(), in the order they are
    reported. Returns a tuple (groupnamelist, gidlist): the names and gids of
    the groups that list 'userid' as a member. These are the supplementary
    groups the user would have after logging in.
    '''
    # Name and gid are read in the same step as the membership test.
    matches = [(grp.name, grp.gid) for grp in getent.group() if userid in grp.members]
    groupnames = [name for name, _ in matches]
    groupids = [gid for _, gid in matches]
    return (groupnames, groupids)
377  gidlist.append(entry.gid)
378  return (namelist, gidlist)
379 
380 
def drop_privileges_permanently(userid):
    '''
    Irrevocably switch this process to 'userid'.

    The supplementary group list, gid and uid are set to what 'userid' would
    get at login, and the result is then verified. This only works if we were
    started as root or are already running as 'userid'. Otherwise OSError is
    raised. OSError is also raised when 'userid' is unknown.
    '''
    pwentry = getent.passwd(userid)
    if pwentry is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    #pylint is confused about the getent.passwd object
    #pylint: disable=E1101
    target_uid = pwentry.uid
    target_gid = pwentry.gid
    wanted_gids = supplementary_groups_for_user(userid)[1]
    # Order matters: supplementary groups, then gid, then uid. Once the uid
    # is no longer root we can't change the other two.
    try:
        os.setgroups(wanted_gids)
        os.setgid(target_gid)
        os.setuid(target_uid)
    except OSError:
        # Tolerated on purpose (e.g. already running as 'userid').
        # The checks below decide whether we ended up where we need to be.
        pass
    ids_are_right = (os.getuid() == target_uid and os.geteuid() == target_uid
                     and os.getgid() == target_gid and os.getegid() == target_gid)
    if not ids_are_right:
        raise OSError('Could not set user/group ids to user "%s" [uid:%s, gid:%s].'
                      % (userid, os.getuid(), os.getgid()))
    # Compare as sets because getgroups() order isn't guaranteed and dups don't
    # matter. The primary gid may also appear in the current list; that's allowed.
    wanted = set(wanted_gids)
    actual = set(os.getgroups())
    missing = wanted - actual
    unexpected = actual - wanted - set([target_gid])
    if missing or unexpected:
        raise OSError('Could not set auxiliary groups for user "%s"' % userid)
425  # Hurray! Everything worked!
426 
def make_pid_dir(pidfile, userid):
    '''Make sure the directory that will hold 'pidfile' exists.

    An existing directory is assumed to be set up suitably and is left alone.
    A bare file name (no directory part) refers to the current directory.
    Otherwise the directory is created with mode 0755 and chowned to
    'userid' and that user's primary group.

    Raises OSError if 'userid' is unknown or the directory can't be created.
    '''
    piddir = os.path.dirname(pidfile) or os.curdir
    if os.path.isdir(piddir):
        # Assume it's been set up suitably
        return
    # Look the user up *before* creating anything. Failing after mkdir() would
    # leave a root-owned directory behind that later runs would trust.
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    os.mkdir(piddir, 0o755)
    # mkdir()'s mode is filtered through the umask - apply it explicitly.
    os.chmod(piddir, 0o755)
    # pylint doesn't understand about getent...
    # pylint: disable=E1101
    os.chown(piddir, userinfo.uid, userinfo.gid)
440 
def make_key_dir(keydir, userid):
    '''Make sure the directory we store our keys in exists.

    An existing directory is assumed to be set up suitably and is left alone.
    An empty path refers to the current directory. Otherwise the directory
    is created private (mode 0700) and chowned to 'userid' and that user's
    primary group.

    Raises OSError if 'userid' is unknown or the directory can't be created.
    '''
    keydir = keydir or os.curdir
    if os.path.isdir(keydir):
        # Assume it's been set up suitably
        return
    # Look the user up *before* creating anything. Failing after mkdir() would
    # leave a root-owned directory behind that later runs would trust.
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    os.mkdir(keydir, 0o700)
    # mkdir()'s mode is filtered through the umask - apply it explicitly.
    os.chmod(keydir, 0o700)
    # pylint doesn't understand about getent...
    # pylint: disable=E1101
    os.chown(keydir, userinfo.uid, userinfo.gid)
453 
def logger(msg):
    '''Log 'msg' to syslog (and stderr) using logger(1) with -s.

    The message is passed as a single argument and no shell is involved.
    Quotes or shell metacharacters in it therefore can't break the command or
    be executed; exception text and traceback source lines often contain
    such characters. If the logger program can't be run, the message is
    written to stderr instead.
    '''
    import subprocess
    try:
        # '--' keeps a message starting with '-' from being parsed as an option.
        subprocess.call(['logger', '-s', '--', msg])
    except OSError:
        sys.stderr.write('%s\n' % (msg,))
457 
def process_main_exception(ex):
    '''Log an exception that escaped main(), followed by its traceback.

    The traceback comes from sys.exc_info(), so call this while 'ex' is being
    handled. At most 20 frames are logged, one logger() call per frame.
    '''
    frames = traceback.extract_tb(sys.exc_info()[2], 20)
    # Put our traceback into the logs in a legible way
    logger('Got an exception in Main [%s]' % str(ex))
    logger('======== Begin Main Exception Traceback ========')
    for (srcpath, lineno, function, sourceline) in frames:
        logger('%s.%s:%s: %s' % (os.path.basename(srcpath), lineno, function, sourceline))
    logger('======== End Main Exception Traceback ========')
470 
if __name__ == '__main__':
    pyversion = sys.version_info
    if pyversion[0] != 2 or pyversion[1] < 7:
        raise RuntimeError('Must be run using python 2.x where x >= 7')
    exitrc = 1
    # W0703 == Too general exception catching...
    # pylint: disable=W0703
    try:
        exitrc = main()
    except Exception as e:
        # Top-level boundary: log the failure with its traceback, then exit
        # with the default non-zero status.
        process_main_exception(e)
    sys.exit(int(exitrc))
def process_main_exception
Definition: cma.py:458
def make_key_dir
Definition: cma.py:441
def logger
Definition: cma.py:454
def make_pid_dir
Definition: cma.py:427
def drop_privileges_permanently
Definition: cma.py:381
void assimilation_openlog(const char *logname)
Open logs in our style (syslog)
Definition: misc.c:197
int kill_pid_service(const char *pidfile, int signal)
kill the service that goes with our current pid file - return negative iff pidfile pid is running and...
Definition: misc.c:475
def supplementary_groups_for_user
Definition: cma.py:368
guint pidrunningstat_to_status(PidRunningStat stat)
Convert PidRunningStat to an exit code for status.
Definition: misc.c:525
void proj_class_incr_debug(const char *Cclass)
Increment debug level for this class and all its subclasses by one.
Definition: proj_classes.c:140
void rmpid_and_exit_on_signal(const char *pidfile, int signal_in)
Remove PID file and exit when a signal is received.
Definition: misc.c:498
void remove_pid_file(const char *pidfile)
Remove the pid file that goes with this service iff we created one during this invocation.
Definition: misc.c:466
PidRunningStat are_we_already_running(const char *pidfile, int *pidarg)
See if the pid file suggests we are already running or not.
Definition: misc.c:303
void daemonize_me(gboolean stay_in_foreground, const char *dirtorunin, char *pidfile, int minclosefd)
Fork into the background (unless stay_in_foreground is set) and run in dirtorunin, using pidfile as the service's pid file.
Definition: misc.c:91
def main
Definition: cma.py:126