The Assimilation Project  based on Assimilation version 1.1.7.1474836767
cma.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: smartindent tabstop=4 shiftwidth=4 expandtab number colorcolumn=100
3 #
4 # This file is part of the Assimilation Project.
5 #
6 # Copyright (C) 2011, 2012 - Alan Robertson <alanr@unix.sh>
7 #
8 # The Assimilation software is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # The Assimilation software is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
20 #
21 '''
22  Design outline:
23 
24  All incoming network messages come in and get sent to a client who is a dispatcher.
25 
26  The dispatcher looks at the message type and computes which queue to send the
27  message to based on the message type and contents.
28 
29  For death notices, the dispatcher forwards the message to the worker
30  assigned to the switch the system is on - if known, or the worker
31  assigned to the subnet.
32 
33  Each worker handles one or more rings - probably handling the per-switch rings
34  for a subnet and the subnet ring as well. It is important to ensure that a ring
35  is handled by only one worker. This eliminates locking concerns. When a given
36  worker receives a death notice for a drone that is also in higher-level rings,
37  it does its work at its level and also forwards the request to the worker handling
38  the higher level ring as well. The first subnet worker will also handle the work
39  for the top-level (global) ring.
40 
41  Packets are ACKed by workers after all work has been completed. In the case of
42  a drone on multiple rings, it is only ACKed after both rings have been fully
43  repaired.
44 
45  The reason for this is that until it is fully repaired, the system might crash
46  before completing its work. Retransmission timeouts will need to be set
47  accordingly...
48 
49  Although congestion is normally very unlikely, this is not true for full
50  datacenter powerons - where it is reasonably likely - depending on how
51  quickly one can power on the servers and not pop circuit breakers or
52  damage UPSes
53  (it would be good to know how fast hosts can come up worst case).
54 
55 
56  Misc Workers with well-known-names
57  Request-To-Create-Ring
58 
59 
60  Mappings:
61 
62  Drone-related information-------------------------
63  NetAddr-to-drone-name
64  drone-name to NetAddr
65  (drone-name,ifname) to interface-info (including switch info)
66  drone-neighbor-info:
67  drone-name-to-neighbor-info (drone-name, NetAddr, ring-name)
68 
69  Ring-related information--------------------------
70  drone-name to ring-name(s)
71  ring-names to ring-information (level, #members, etc)
72  ring-links-info ??
73  Subnet-to-ring-name
74  Switch-to-ring-name
75  Global-ring-name [TheOneRing]
76 
77  Discovery-related information---------------------
78  (drone-name, Interface-name) to LLDP/CDP packet
79  (drone-name, discovery-type) to JSON info
80 
81 
82  Misc Info-----------------------------------------
83  NetAddr(MAC)-to-NetAddr(IP)
84 
85 
86  Dispatcher logic:
87  For now sends all requests to TheOneRing because we need to write more code ;-)
88 
89 
90 ################################################################################
91 #
92 # It is readily observable that the code is headed that way, but is a long
93 # way from that structure...
94 #
95 ################################################################################
96 '''
97 import os
98 # This works around a bug in the glib library...
99 os.environ['G_SLICE'] = 'always-malloc'
100 # This works around a stupidity in the glib library...
101 os.environ['G_MESSAGES_DEBUG'] = 'all'
102 # The environment assignments above *must* come before the imports below.
103 # It *might* be sufficient to put them before AssimCtypes, but that would also make pylint bitch...
104 # pylint: disable=C0413
105 import sys
106 import signal
107 import optparse, traceback
108 import importlib
109 #import atexit
110 import getent
111 import py2neo
112 import cmainit
113 from assimeventobserver import ForkExecObserver
114 from AssimCtypes import NOTIFICATION_SCRIPT_DIR, CMAINITFILE, CMAUSERID, CRYPTKEYDIR, CMA_KEY_PREFIX
115 import AssimCtypes
116 from AssimCclasses import pyCompressFrame, pyCryptCurve25519, pyCryptFrame
117 from cmaconfig import ConfigFile
118 from bestpractices import BestPractices
119 
# Versions of the interpreter and of our database stack that this CMA accepts.
SUPPORTED_PYTHON_VERSIONS = ('2.7',)
SUPPORTED_PY2NEO_VERSIONS = (2,)
SUPPORTED_NEO4J_VERSIONS = (2, 3)

# Refuse to load at all under an interpreter version we do not support.
PYTHON_VERSION = '%s.%s' % (sys.version_info[0], sys.version_info[1])
if PYTHON_VERSION not in SUPPORTED_PYTHON_VERSIONS:
    raise EnvironmentError('Python Version %s not supported' % PYTHON_VERSION)


optional_modules = [
    'discoverylistener',  # NOT OPTIONAL(!)
    'linkdiscovery',
    'checksumdiscovery',
    'monitoringdiscovery',
    'arpdiscovery',
]
# Kept as a string; main() extracts and checks the major version number.
PY2NEO_VERSION = py2neo.__version__
136 #
137 # "Main" program starts below...
138 # It is the real CMA intended to run with some real nanoprobes running
139 # somewhere out there...
140 #
141 # 912: Too many branches, 914: too many local variables, 915: too many statements
142 #pylint: disable=R0912,R0914,R0915
def _make_option_parser():
    'Build the optparse parser describing the CMA command line.'
    parser = optparse.OptionParser(prog='CMA', version=AssimCtypes.VERSION_STRING,
        description='Collective Management Authority for the Assimilation System',
        usage='cma.py [--bind address:port]')

    parser.add_option('-b', '--bind', action='store', default=None, dest='bind'
        , metavar='address:port-to-bind-to'
        , help='Address:port to listen to - for nanoprobes to connect to')

    parser.add_option('-d', '--debug', action='store', default=0, dest='debug'
        , help='enable debug for CMA and libraries - value is debug level for C libraries.')

    parser.add_option('-s', '--status', action='store_true', default=False, dest='status'
        , help='Return status of running CMA')

    parser.add_option('-k', '--kill', action='store_true', default=False, dest='kill'
        , help='Shut down running CMA.')

    parser.add_option('-e', '--erasedb', action='store_true', default=False, dest='erasedb'
        , help='Erase Neo4J before starting')

    parser.add_option('-f', '--foreground', action='store_true', default=False, dest='foreground'
        , help='keep the CMA from going into the background')

    parser.add_option('-p', '--pidfile', action='store', default='/var/run/assimilation/cma'
        , dest='pidfile', metavar='pidfile-pathname'
        , help='full pathname of where to locate our pid file')

    parser.add_option('-T', '--trace', action='store_true', default=False, dest='doTrace'
        , help='Trace CMA execution')

    parser.add_option('-u', '--user', action='store', default=CMAUSERID, dest='userid'
        , metavar='userid'
        , help='userid to run the CMA as')
    return parser


def _first_output_line(command):
    'Run a shell command and return the first line of its output (newline included).'
    pipe = os.popen(command, 'r')
    try:
        return pipe.readline()
    finally:
        # Always reap the child, even if reading from the pipe fails.
        pipe.close()


def _format_neo4j_version(version):
    '''Render a Neo4j version tuple as text: the first three elements are joined
    with dots and any further elements (e.g. a beta tag) are appended as-is.
    '''
    dotted = '.'.join([str(part) for part in version[0:3]])
    return dotted + ''.join([str(part) for part in version[3:]])


#pylint: disable=R0912,R0914,R0915
def main():
    '''Main program for the CMA (Collective Management Authority)

    Parses the command line, then either reports status (--status), stops a
    running CMA (--kill), or starts a new CMA: it prepares the pid and key
    directories, loads crypto keys, daemonizes, builds the configuration,
    opens the reliable-UDP transport, drops privileges, initializes the
    database and finally runs the packet listener.

    Returns the status code for --status, 0 or 1 for --kill, and 0 once the
    listener returns.
    Raises EnvironmentError for an unsupported py2neo or Neo4j version, and
    re-raises a RuntimeError from CMAinit after removing the pid file.
    '''
    py2neo_major_version = int(PY2NEO_VERSION.partition('.')[0])
    if py2neo_major_version not in SUPPORTED_PY2NEO_VERSIONS:
        raise EnvironmentError('py2neo version %s not supported' % PY2NEO_VERSION)
    DefaultPort = 1984
    # VERY Linux-specific - but useful and apparently correct ;-)
    PrimaryIPcmd = \
    "ip address show primary scope global | grep '^ *inet' | sed -e 's%^ *inet *%%' -e 's%/.*%%'"
    OurAddrStr = '%s:%d' % (_first_output_line(PrimaryIPcmd).rstrip(), DefaultPort)

    opt = _make_option_parser().parse_args()[0]

    from AssimCtypes import daemonize_me, assimilation_openlog, are_we_already_running, \
        kill_pid_service, pidrunningstat_to_status, remove_pid_file, rmpid_and_exit_on_signal

    if opt.status:
        return pidrunningstat_to_status(are_we_already_running(opt.pidfile, None))

    if opt.kill:
        # 15 is the signal number handed to kill_pid_service (SIGTERM on Linux).
        if kill_pid_service(opt.pidfile, 15) < 0:
            print >> sys.stderr, "Unable to stop CMA."
            return 1
        return 0

    opt.debug = int(opt.debug)

    # Registering an atexit handler to remove the pid file doesn't seem to work
    # no matter where it is invoked...
    # But if we don't fork in daemonize_me() ('C' code), it works great...

    from cmadb import CMAdb
    CMAdb.running_under_docker()
    make_pid_dir(opt.pidfile, opt.userid)
    make_key_dir(CRYPTKEYDIR, opt.userid)
    cryptwarnings = pyCryptCurve25519.initkeys()
    for warn in cryptwarnings:
        print >> sys.stderr, ("WARNING: %s" % warn)
    keyids = pyCryptFrame.get_key_ids()
    keyids.sort()
    for keyid in keyids:
        if keyid.startswith(CMA_KEY_PREFIX):
            continue
        try:
            # @FIXME This is not an ideal way to associate identities with hosts
            # in a multi-tenant environment
            # @FIXME - don't think I need to do the associate_identity at all any more...
            # Unpacking raises ValueError for key ids without an '@@' separator.
            hostname, _ = keyid.split('@@', 1)
            pyCryptFrame.associate_identity(hostname, keyid)
        except ValueError:
            pass

    daemonize_me(opt.foreground, '/', opt.pidfile, 20)

    rmpid_and_exit_on_signal(opt.pidfile, signal.SIGTERM)

    # Next statement can't appear before daemonize_me() or bind() fails -- not quite sure why...
    assimilation_openlog("cma")
    from packetlistener import PacketListener
    from messagedispatcher import MessageDispatcher
    from dispatchtarget import DispatchTarget
    from monitoring import MonitoringRule
    from AssimCclasses import pyNetAddr, pySignFrame, pyReliableUDP, \
        pyPacketDecoder
    from AssimCtypes import CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR, CONFIGNAME_CMADISCOVER, \
        CONFIGNAME_CMAFAIL, CONFIGNAME_CMAPORT, CONFIGNAME_OUTSIG, CONFIGNAME_COMPRESSTYPE, \
        CONFIGNAME_COMPRESS, proj_class_incr_debug, LONG_LICENSE_STRING, MONRULEINSTALL_DIR

    if opt.debug:
        print >> sys.stderr, ('Setting debug to %s' % opt.debug)

    for _ in range(opt.debug):
        print >> sys.stderr, ('Incrementing C-level debug by one.')
        # NOTE(review): call restored from the import above and the message it
        # follows; None presumably selects every class -- confirm.
        proj_class_incr_debug(None)

    # Input our monitoring rule templates
    # They only exist in flat files and in memory - they aren't in the database
    MonitoringRule.load_tree(MONRULEINSTALL_DIR)
    print >> sys.stderr, ('Monitoring rules loaded from %s' % MONRULEINSTALL_DIR)

    execobserver_constraints = {
        'nodetype': ['Drone',
                     'IPaddrNode',
                     'MonitorAction',
                     'NICNode',
                     'ProcessNode',
                     'SystemNode',
                    ]
    }
    ForkExecObserver(constraints=execobserver_constraints, scriptdir=NOTIFICATION_SCRIPT_DIR)
    print >> sys.stderr, ('Fork/Event observer dispatching from %s' % NOTIFICATION_SCRIPT_DIR)

    if opt.bind is not None:
        OurAddrStr = opt.bind

    OurAddr = pyNetAddr(OurAddrStr)
    if OurAddr.port() == 0:
        OurAddr.setport(DefaultPort)

    try:
        configinfo = ConfigFile(filename=CMAINITFILE)
    except IOError:
        configinfo = ConfigFile()
    if opt.bind is not None:
        bindaddr = pyNetAddr(opt.bind)
        if bindaddr.port() == 0:
            # Look the port up in the configuration *instance*: subscripting the
            # ConfigFile class itself raises TypeError.
            if CONFIGNAME_CMAPORT in configinfo:
                bindaddr.setport(configinfo[CONFIGNAME_CMAPORT])
            else:
                bindaddr.setport(DefaultPort)
        configinfo[CONFIGNAME_CMAINIT] = bindaddr
    configinfo[CONFIGNAME_CMADISCOVER] = OurAddr
    configinfo[CONFIGNAME_CMAFAIL] = OurAddr
    configinfo[CONFIGNAME_CMAADDR] = OurAddr
    if CONFIGNAME_COMPRESSTYPE in configinfo:
        configinfo[CONFIGNAME_COMPRESS] \
            = pyCompressFrame(compression_method=configinfo[CONFIGNAME_COMPRESSTYPE])
    configinfo[CONFIGNAME_OUTSIG] = pySignFrame(1)
    config = configinfo.complete_config()

    addr = config[CONFIGNAME_CMAINIT]
    # pylint is confused: addr is a pyNetAddr, not a pyConfigContext
    # pylint: disable=E1101
    if addr.port() == 0:
        addr.setport(DefaultPort)
    ourport = addr.port()
    # Every address we advertise is rewritten to use the port we listen on.
    for elem in (CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR
                 , CONFIGNAME_CMADISCOVER, CONFIGNAME_CMAFAIL):
        if elem in config:
            config[elem] = pyNetAddr(str(config[elem]), port=ourport)
    io = pyReliableUDP(config, pyPacketDecoder())
    io.setrcvbufsize(10*1024*1024) # No harm in asking - it will get us the best we can get...
    io.setsendbufsize(1024*1024) # Most of the traffic volume is inbound from discovery
    drop_privileges_permanently(opt.userid)
    try:
        cmainit.CMAinit(io, cleanoutdb=opt.erasedb, debug=(opt.debug > 0))
    except RuntimeError:
        remove_pid_file(opt.pidfile)
        raise
    for warn in cryptwarnings:
        CMAdb.log.warning(warn)
    neovers = CMAdb.cdb.db.neo4j_version
    # Format once, tolerating version tuples that carry a fourth (beta tag) element.
    neoversstring = _format_neo4j_version(neovers)
    if neovers[0] not in SUPPORTED_NEO4J_VERSIONS:
        raise EnvironmentError('Neo4j version %s not supported' % neoversstring)
    CMAdb.log.info('Listening on: %s' % str(config[CONFIGNAME_CMAINIT]))
    CMAdb.log.info('Requesting return packets sent to: %s' % str(OurAddr))
    CMAdb.log.info('Socket input buffer size: %d' % io.getrcvbufsize())
    CMAdb.log.info('Socket output buffer size: %d' % io.getsendbufsize())
    keyids = pyCryptFrame.get_key_ids()
    keyids.sort()
    for keyid in keyids:
        CMAdb.log.info('KeyId %s Identity %s' % (keyid, pyCryptFrame.get_identity(keyid)))
    if CMAdb.debug:
        CMAdb.log.debug('C-library Debug was set to %s' % opt.debug)
        CMAdb.log.debug('TheOneRing created - id = %s' % CMAdb.TheOneRing)
        CMAdb.log.debug('Config Object sent to nanoprobes: %s' % config)

    jvers = _first_output_line('java -version 2>&1')
    disp = MessageDispatcher(DispatchTarget.dispatchtable)

    CMAdb.log.info('Starting CMA version %s - licensed under %s'
                   % (AssimCtypes.VERSION_STRING, LONG_LICENSE_STRING))
    CMAdb.log.info('Neo4j version %s // py2neo version %s // Python version %s // %s'
                   % (neoversstring
                      , str(py2neo.__version__)
                      , ('%s.%s.%s' % sys.version_info[0:3])
                      , jvers))
    if opt.foreground:
        print >> sys.stderr, ('Starting CMA version %s - licensed under %s'
                              % (AssimCtypes.VERSION_STRING, LONG_LICENSE_STRING))
        print >> sys.stderr, ('Neo4j version %s // py2neo version %s // Python version %s // %s'
                              % (neoversstring
                                 , PY2NEO_VERSION
                                 , ('%s.%s.%s' % sys.version_info[0:3])
                                 , jvers))
    if len(neovers) > 3:
        CMAdb.log.warning('Neo4j version %s is beta code - results not guaranteed.'
                          % str(neovers))

    # Important to note that we don't want PacketListener to create its own 'io' object
    # or it will screw up the ReliableUDP protocol...
    listener = PacketListener(config, disp, io=io)
    mandatory_modules = ['discoverylistener']
    for mandatory in mandatory_modules:
        importlib.import_module(mandatory)
    #pylint is confused here...
    # pylint: disable=E1133
    for optional in config['optional_modules']:
        importlib.import_module(optional)
    if opt.doTrace:
        import trace
        tracer = trace.Trace(count=False, trace=True)
        if CMAdb.debug:
            CMAdb.log.debug(
                'Starting up traced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
                'cma: Starting up traced listener.listen() in foreground; debug=%d' % opt.debug)
        # Trace.run() evaluates its string in __main__'s globals, where our local
        # 'listener' does not exist; runfunc() calls the bound method directly.
        tracer.runfunc(listener.listen)
    else:
        if CMAdb.debug:
            CMAdb.log.debug(
                'Starting up untraced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
                'cma: Starting up untraced listener.listen() in foreground; debug=%d' % opt.debug)

        # This is kind of a kludge, we should really look again
        # at initialization and so on.
        # This module *ought* to be optional.
        # that would involve adding some Drone callbacks for creation of new Drones
        BestPractices(config, io, CMAdb.store, CMAdb.log, opt.debug)
        listener.listen()
    return 0
408 
def supplementary_groups_for_user(userid):
    '''Return the list of supplementary groups to which this member
    would belong if they logged in as a tuple of (groupnamelist, gidlist)

    Every group entry reported by getent.group() whose member list contains
    'userid' contributes its name and gid, at matching positions in the two lists.
    '''
    memberships = [(entry.name, entry.gid) for entry in getent.group()
                   if userid in entry.members]
    namelist = [name for name, _ in memberships]
    gidlist = [gid for _, gid in memberships]
    return (namelist, gidlist)
420 
421 
def drop_privileges_permanently(userid):
    '''
    Drop our privileges permanently and run as the given user with
    the privileges to which they would be entitled if they logged in.
    That is, the uid, gid, and supplementary group list are all set correctly.
    We are careful to make sure we have exactly the permissions we need
    as 'userid'.
    Either we need to be started as root or as 'userid' or this function
    will fail with an OSError.

    Raises OSError if 'userid' is unknown, or if after the attempt our
    uid/gid/supplementary groups are not the ones 'userid' should have.
    '''
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    #pylint is confused about the getent.passwd object
    #pylint: disable=E1101
    newuid = userinfo.uid
    newgid = userinfo.gid
    auxgroups = supplementary_groups_for_user(userid)[1]
    # Need to set supplementary groups, then group id then user id in that order.
    try:
        os.setgroups(auxgroups)
        os.setgid(newgid)
        os.setuid(newuid)
    except OSError:
        # We let this fail if it wants to and catch it below.
        # This allows this to work if we're already running as that user id...
        pass
    # Let's see if everything wound up as it should...
    ids_are_right = (os.getuid() == newuid and os.geteuid() == newuid
                     and os.getgid() == newgid and os.getegid() == newgid)
    if not ids_are_right:
        raise OSError('Could not set user/group ids to user "%s" [uid:%s, gid:%s].'
                      % (userid, os.getuid(), os.getgid()))
    # Checking groups is a little more complicated - order is potentially not preserved...
    # Comparing as sets also allows for the case where there might be dups
    # (which shouldn't happen?)
    wanted = set(auxgroups)
    current = set(os.getgroups())
    # I don't think the default gid is supposed to be in the current group list...
    # but it is in my tests... It should be harmless...
    unexpected = current - wanted - set([newgid])
    if not wanted.issubset(current) or unexpected:
        raise OSError('Could not set auxiliary groups for user "%s"' % userid)
    # Hurray! Everything worked!
467 
def make_pid_dir(pidfile, userid):
    '''Make a suitable directory for the pidfile

    Does nothing when the directory part of 'pidfile' already exists, or when
    'pidfile' has no directory part at all (nothing to create).
    Otherwise the directory is created with mode 0755 and given to 'userid'.
    Raises OSError if 'userid' is unknown or the directory cannot be set up.
    '''
    piddir = os.path.dirname(pidfile)
    if not piddir or os.path.isdir(piddir):
        # Assume it's been set up suitably
        return
    # Look the user up *before* creating anything: otherwise an unknown userid
    # leaves behind a directory with the wrong owner, which every later run
    # would then accept as already set up.
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    os.mkdir(piddir, 0o755)
    # pylint doesn't understand about getent...
    # pylint: disable=E1101
    os.chown(piddir, userinfo.uid, userinfo.gid)
481 
def make_key_dir(keydir, userid):
    '''Make a suitable directory for us to store our keys in

    Does nothing when 'keydir' already exists. Otherwise it is created with
    mode 0700 and given to 'userid'.
    Raises OSError if 'userid' is unknown or the directory cannot be set up.
    '''
    if os.path.isdir(keydir):
        # Assume it's been set up suitably
        return
    # Validate the user first so that a bad userid cannot leave a key directory
    # behind with the wrong owner (later runs would assume it is fine).
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    os.mkdir(keydir, 0o700)
    # pylint doesn't understand about getent...
    # pylint: disable=E1101
    os.chown(keydir, userinfo.uid, userinfo.gid)
494 
def logger(msg):
    '''Log a message to syslog using logger

    The message is handed to the 'logger' command as a single argument rather
    than being pasted into a shell command line, so quotes and other shell
    metacharacters in it (common in exception text) are logged literally.
    Logging is best-effort: failure to run 'logger' is ignored.
    '''
    import subprocess
    try:
        # '--' keeps a message that starts with '-' from being read as an option.
        subprocess.call(['logger', '-s', '--', '%s' % msg])
    except OSError:
        # No usable 'logger' command - there is nowhere else to report this.
        pass
498 
def process_main_exception(ex):
    '''Process an uncaught exception outside our event loop

    Logs 'ex' followed by up to 20 entries of the traceback of the exception
    currently being handled, so it must be called from inside an 'except' block.
    '''
    tblist = traceback.extract_tb(sys.exc_info()[2], 20)
    # Put our traceback into the logs in a legible way
    logger('Got an exception in Main [%s]' % str(ex))
    logger('======== Begin Main Exception Traceback ========')
    for (filename, line, funcname, text) in tblist:
        logger('%s.%s:%s: %s' % (os.path.basename(filename), line, funcname, text))
    logger('======== End Main Exception Traceback ========')
511 
if __name__ == '__main__':
    pyversion = sys.version_info
    if pyversion[0] != 2 or pyversion[1] < 7:
        raise RuntimeError('Must be run using python 2.x where x >= 7')
    # Exit status stays 1 unless main() returns normally with its own code.
    exitrc = 1
    # W0703 == Too general exception catching...
    # pylint: disable=W0703
    try:
        exitrc = main()
    except Exception as e:
        # Last-resort handler: get the traceback into the logs, then exit non-zero.
        process_main_exception(e)

    sys.exit(int(exitrc))
def logger(msg)
Definition: cma.py:495
def make_pid_dir(pidfile, userid)
Definition: cma.py:468
def make_key_dir(keydir, userid)
Definition: cma.py:482
void assimilation_openlog(const char *logname)
Open logs in our style (syslog)
Definition: misc.c:225
int kill_pid_service(const char *pidfile, int signal)
kill the service that goes with our current pid file - return negative iff pidfile pid is running and...
Definition: misc.c:503
guint pidrunningstat_to_status(PidRunningStat stat)
Convert PidRunningStat to an exit code for status.
Definition: misc.c:553
void daemonize_me(gboolean stay_in_foreground, const char *dirtorunin, char *origpidfile, int minclosefd)
Function to get system name (uname -n in UNIX terms) - and return in lower case
Definition: misc.c:106
void proj_class_incr_debug(const char *Cclass)
Increment debug level for this class and all its subclasses by one.
Definition: proj_classes.c:140
void rmpid_and_exit_on_signal(const char *pidfile, int signal_in)
Remove PID file and exit when a signal is received.
Definition: misc.c:526
void remove_pid_file(const char *pidfile)
Remove the pid file that goes with this service iff we created one during this invocation.
Definition: misc.c:494
def main()
Definition: cma.py:143
def process_main_exception(ex)
Definition: cma.py:499
PidRunningStat are_we_already_running(const char *pidfile, int *pidarg)
See if the pid file suggests we are already running or not.
Definition: misc.c:331
def supplementary_groups_for_user(userid)
Definition: cma.py:409
def drop_privileges_permanently(userid)
Definition: cma.py:422