The Assimilation Project  based on Assimilation version 1.1.2.1454953989
cma.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: smartindent tabstop=4 shiftwidth=4 expandtab number colorcolumn=100
3 #
4 # This file is part of the Assimilation Project.
5 #
6 # Copyright (C) 2011, 2012 - Alan Robertson <alanr@unix.sh>
7 #
8 # The Assimilation software is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # The Assimilation software is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
20 #
21 '''
22  Design outline:
23 
24  All incoming network messages come in and get sent to a client who is a dispatcher.
25 
26  The dispatcher looks at the message type and computes which queue to send the
27  message to based on the message type and contents.
28 
29  For death notices, the dispatcher forwards the message to the worker
30  assigned to the switch the system is on - if known, or the worker
31  assigned to the subnet.
32 
33  Each worker handles one or more rings - probably handling the per-switch rings
34  for a subnet and the subnet ring as well. It is important to ensure that a ring
35  is handled by only one worker. This eliminates locking concerns. When a given
36  worker receives a death notice for a drone that is also in higher-level rings,
37  it does its work at its level and also forwards the request to the worker handling
38  the higher level ring as well. The first subnet worker will also handle the work
39  for the top-level (global) ring.
40 
41  Packets are ACKed by workers after all work has been completed. In the case of
42  a drone on multiple rings, it is only ACKed after both rings have been fully
43  repaired.
44 
45  The reason for this is that until it is fully repaired, the system might crash
46  before completing its work. Retransmission timeouts will need to be set
47  accordingly...
48 
49  Although congestion is normally very unlikely, this is not true for full
50  datacenter power-ons - where it is reasonably likely - depending on how
51  quickly one can power on the servers and not pop circuit breakers or
52  damage UPSes
53  (it would be good to know how fast hosts can come up worst case).
54 
55 
56  Misc Workers with well-known-names
57  Request-To-Create-Ring
58 
59 
60  Mappings:
61 
62  Drone-related information-------------------------
63  NetAddr-to-drone-name
64  drone-name to NetAddr
65  (drone-name,ifname) to interface-info (including switch info)
66  drone-neighbor-info:
67  drone-name-to-neighbor-info (drone-name, NetAddr, ring-name)
68 
69  Ring-related information--------------------------
70  drone-name to ring-name(s)
71  ring-names to ring-information (level, #members, etc)
72  ring-links-info ??
73  Subnet-to-ring-name
74  Switch-to-ring-name
75  Global-ring-name [TheOneRing]
76 
77  Discovery-related information---------------------
78  (drone-name, Interface-name) to LLDP/CDP packet
79  (drone-name, discovery-type) to JSON info
80 
81 
82  Misc Info-----------------------------------------
83  NetAddr(MAC)-to-NetAddr(IP)
84 
85 
86  Dispatcher logic:
87  For now sends all requests to TheOneRing because we need to write more code ;-)
88 
89 
90 ################################################################################
91 #
92 # It is readily observable that the code is headed that way, but is a long
93 # way from that structure...
94 #
95 ################################################################################
96 '''
SUPPORTED_PYTHON_VERSIONS = ('2.7',)
SUPPORTED_PY2NEO_VERSIONS = (2,)
SUPPORTED_NEO4J_VERSIONS = (2,)
import sys
# "major.minor" of the running interpreter, e.g. '2.7'.
PYTHON_VERSION = '.'.join([str(part) for part in sys.version_info[:2]])
# Refuse to even load under an interpreter version we do not support.
if PYTHON_VERSION not in SUPPORTED_PYTHON_VERSIONS:
    raise EnvironmentError('Python Version %s not supported' % PYTHON_VERSION)
104 
105 import os, signal
106 import optparse, traceback
107 import cmainit
108 from assimeventobserver import ForkExecObserver
109 from AssimCtypes import NOTIFICATION_SCRIPT_DIR, CMAINITFILE, CMAUSERID, CRYPTKEYDIR, CMA_KEY_PREFIX
110 import AssimCtypes
111 from AssimCclasses import pyCompressFrame, pyCryptCurve25519, pyCryptFrame
112 from cmaconfig import ConfigFile
113 from bestpractices import BestPractices
114 import importlib
115 #import atexit
116 import getent
117 import py2neo
118 
119 
# Discovery-related modules the CMA knows about.
# NOTE(review): this list is not referenced elsewhere in this file - main()
# imports config['optional_modules'] instead; confirm whether anything else uses it.
optional_modules = [
    'discoverylistener',    # NOT OPTIONAL(!)
    'linkdiscovery',
    'checksumdiscovery',
    'monitoringdiscovery',
    'arpdiscovery',
]
# Version string of the installed py2neo library (checked in main()).
PY2NEO_VERSION = py2neo.__version__
127 #
128 # "Main" program starts below...
129 # It is the real CMA, intended to run with some real nanoprobes running
130 # somewhere out there...
131 #
132 # 912: Too many branches, 914: too many local variables, 915: too many statements
133 #pylint: disable=R0912,R0914,R0915
def main():
    '''Main program for the CMA (Collective Management Authority).

    Parses the command line and handles --status and --kill.
    Otherwise it:
    - prepares the pid-file and key directories, then daemonizes;
    - builds the configuration sent to nanoprobes;
    - drops privileges and initializes the database;
    - runs the packet listener.

    Returns the process exit code: the pid-file status for --status,
    0 or 1 for --kill, and 0 after the listener returns.
    Raises EnvironmentError if the py2neo or Neo4j version is unsupported.
    '''
    py2neo_major_version = int(PY2NEO_VERSION.partition('.')[0])
    if py2neo_major_version not in SUPPORTED_PY2NEO_VERSIONS:
        raise EnvironmentError('py2neo version %s not supported' % PY2NEO_VERSION)
    DefaultPort = 1984
    # This works around a bug in the glib library...
    os.environ['G_SLICE'] = 'always-malloc'
    # This works around a stupidity in the glib library...
    os.environ['G_MESSAGES_DEBUG'] = 'all'
    # VERY Linux-specific - but useful and apparently correct ;-)
    # Our default address is the first primary, globally-scoped IP address.
    PrimaryIPcmd = \
        "ip address show primary scope global | grep '^ *inet' | sed -e 's%^ *inet *%%' -e 's%/.*%%'"
    ipfd = os.popen(PrimaryIPcmd, 'r')
    OurAddrStr = ('%s:%d' % (ipfd.readline().rstrip(), DefaultPort))
    ipfd.close()

    parser = optparse.OptionParser(prog='CMA', version=AssimCtypes.VERSION_STRING,
        description='Collective Management Authority for the Assimilation System',
        usage='cma.py [--bind address:port]')

    parser.add_option('-b', '--bind', action='store', default=None, dest='bind'
        , metavar='address:port-to-bind-to'
        , help='Address:port to listen to - for nanoprobes to connect to')

    parser.add_option('-d', '--debug', action='store', default=0, dest='debug'
        , help='enable debug for CMA and libraries - value is debug level for C libraries.')

    parser.add_option('-s', '--status', action='store_true', default=False, dest='status'
        , help='Return status of running CMA')

    parser.add_option('-k', '--kill', action='store_true', default=False, dest='kill'
        , help='Shut down running CMA.')

    parser.add_option('-e', '--erasedb', action='store_true', default=False, dest='erasedb'
        , help='Erase Neo4J before starting')

    parser.add_option('-f', '--foreground', action='store_true', default=False, dest='foreground'
        , help='keep the CMA from going into the background')

    parser.add_option('-p', '--pidfile', action='store', default='/var/run/assimilation/cma'
        , dest='pidfile', metavar='pidfile-pathname'
        , help='full pathname of where to locate our pid file')

    parser.add_option('-T', '--trace', action='store_true', default=False, dest='doTrace'
        , help='Trace CMA execution')

    parser.add_option('-u', '--user', action='store', default=CMAUSERID, dest='userid'
        , metavar='userid'
        , help='userid to run the CMA as')

    opt = parser.parse_args()[0]

    from AssimCtypes import daemonize_me, assimilation_openlog, are_we_already_running, \
        kill_pid_service, pidrunningstat_to_status, remove_pid_file, rmpid_and_exit_on_signal

    if opt.status:
        rc = pidrunningstat_to_status(are_we_already_running(opt.pidfile, None))
        return rc

    if opt.kill:
        if kill_pid_service(opt.pidfile, 15) < 0:
            print >> sys.stderr, "Unable to stop CMA."
            return 1
        return 0

    opt.debug = int(opt.debug)

    # NOTE: registering an atexit() / Python signal-handler cleanup of the pid file
    # did not work once daemonize_me() ('C' code) forks, so the pid file is instead
    # removed on SIGTERM via rmpid_and_exit_on_signal() below.

    from cmadb import CMAdb
    CMAdb.running_under_docker()
    make_pid_dir(opt.pidfile, opt.userid)
    make_key_dir(CRYPTKEYDIR, opt.userid)
    cryptwarnings = pyCryptCurve25519.initkeys()
    for warn in cryptwarnings:
        print >> sys.stderr, ("WARNING: %s" % warn)
    keyids = pyCryptFrame.get_key_ids()
    keyids.sort()
    for keyid in keyids:
        if keyid.startswith(CMA_KEY_PREFIX):
            continue
        try:
            # @FIXME This is not an ideal way to associate identities with hosts
            # in a multi-tenant environment
            # @FIXME - don't think I need to do the associate_identity at all any more...
            # Non-CMA key ids look like 'hostname@@...'; ids without '@@' are skipped.
            hostname, _ = keyid.split('@@', 1)
            pyCryptFrame.associate_identity(hostname, keyid)
        except ValueError:
            pass

    daemonize_me(opt.foreground, '/', opt.pidfile, 20)

    rmpid_and_exit_on_signal(opt.pidfile, signal.SIGTERM)

    # Next statement can't appear before daemonize_me() or bind() fails -- not quite sure why...
    assimilation_openlog("cma")
    from packetlistener import PacketListener
    from messagedispatcher import MessageDispatcher
    from dispatchtarget import DispatchTarget
    from monitoring import MonitoringRule
    from AssimCclasses import pyNetAddr, pySignFrame, pyReliableUDP, \
        pyPacketDecoder
    from AssimCtypes import CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR, CONFIGNAME_CMADISCOVER, \
        CONFIGNAME_CMAFAIL, CONFIGNAME_CMAPORT, CONFIGNAME_OUTSIG, CONFIGNAME_COMPRESSTYPE, \
        CONFIGNAME_COMPRESS, \
        proj_class_incr_debug, LONG_LICENSE_STRING, MONRULEINSTALL_DIR

    if opt.debug:
        print >> sys.stderr, ('Setting debug to %s' % opt.debug)

    # Each increment raises the C-library debug level by one (None == all classes).
    for _ in range(opt.debug):
        print >> sys.stderr, ('Incrementing C-level debug by one.')
        proj_class_incr_debug(None)

    # Input our monitoring rule templates
    # They only exist in flat files and in memory - they aren't in the database
    MonitoringRule.load_tree(MONRULEINSTALL_DIR)
    print >> sys.stderr, ('Monitoring rules loaded from %s' % MONRULEINSTALL_DIR)

    execobserver_constraints = {
        'nodetype': ['Drone',
                     'IPaddrNode',
                     'MonitorAction',
                     'NICNode',
                     'ProcessNode',
                     'SystemNode',
                    ]
    }
    ForkExecObserver(constraints=execobserver_constraints, scriptdir=NOTIFICATION_SCRIPT_DIR)
    print >> sys.stderr, ('Fork/Event observer dispatching from %s' % NOTIFICATION_SCRIPT_DIR)

    if opt.bind is not None:
        OurAddrStr = opt.bind

    OurAddr = pyNetAddr(OurAddrStr)
    if OurAddr.port() == 0:
        OurAddr.setport(DefaultPort)

    try:
        configinfo = ConfigFile(filename=CMAINITFILE)
    except IOError:
        configinfo = ConfigFile()
    if opt.bind is not None:
        bindaddr = pyNetAddr(opt.bind)
        if bindaddr.port() == 0:
            # Use the configured CMA port (from our config instance - not the
            # ConfigFile class) and fall back to our default port.
            bindaddr.setport(configinfo[CONFIGNAME_CMAPORT]
                             if CONFIGNAME_CMAPORT in configinfo else DefaultPort)
        configinfo[CONFIGNAME_CMAINIT] = bindaddr
    configinfo[CONFIGNAME_CMADISCOVER] = OurAddr
    configinfo[CONFIGNAME_CMAFAIL] = OurAddr
    configinfo[CONFIGNAME_CMAADDR] = OurAddr
    if CONFIGNAME_COMPRESSTYPE in configinfo:
        configinfo[CONFIGNAME_COMPRESS] \
            = pyCompressFrame(compression_method=configinfo[CONFIGNAME_COMPRESSTYPE])
    configinfo[CONFIGNAME_OUTSIG] = pySignFrame(1)
    config = configinfo.complete_config()

    addr = config[CONFIGNAME_CMAINIT]
    if addr.port() == 0:
        addr.setport(DefaultPort)
    ourport = addr.port()
    # Every CMA address we hand out uses the same port we listen on.
    for elem in (CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR
                 , CONFIGNAME_CMADISCOVER, CONFIGNAME_CMAFAIL):
        if elem in config:
            config[elem] = pyNetAddr(str(config[elem]), port=ourport)
    io = pyReliableUDP(config, pyPacketDecoder())
    io.setrcvbufsize(10*1024*1024) # No harm in asking - it will get us the best we can get...
    io.setsendbufsize(1024*1024) # Most of the traffic volume is inbound from discovery
    drop_privileges_permanently(opt.userid)
    try:
        cmainit.CMAinit(io, cleanoutdb=opt.erasedb, debug=(opt.debug > 0))
    except RuntimeError:
        remove_pid_file(opt.pidfile)
        raise
    for warn in cryptwarnings:
        CMAdb.log.warning(warn)
    neovers = CMAdb.cdb.db.neo4j_version
    # Formatted safely for both 3-element and longer (beta) version sequences.
    neoversstring = _neo4j_version_string(neovers)
    if neovers[0] not in SUPPORTED_NEO4J_VERSIONS:
        raise EnvironmentError('Neo4j version %s not supported' % neoversstring)
    CMAdb.log.info('Listening on: %s' % str(config[CONFIGNAME_CMAINIT]))
    CMAdb.log.info('Requesting return packets sent to: %s' % str(OurAddr))
    CMAdb.log.info('Socket input buffer size: %d' % io.getrcvbufsize())
    CMAdb.log.info('Socket output buffer size: %d' % io.getsendbufsize())
    keyids = pyCryptFrame.get_key_ids()
    keyids.sort()
    for keyid in keyids:
        CMAdb.log.info('KeyId %s Identity %s' % (keyid, pyCryptFrame.get_identity(keyid)))
    if CMAdb.debug:
        CMAdb.log.debug('C-library Debug was set to %s' % opt.debug)
        CMAdb.log.debug('TheOneRing created - id = %s' % CMAdb.TheOneRing)
        CMAdb.log.debug('Config Object sent to nanoprobes: %s' % config)

    jvmfd = os.popen('java -version 2>&1')
    jvers = jvmfd.readline().rstrip()
    jvmfd.close()
    disp = MessageDispatcher(DispatchTarget.dispatchtable)
    pyversstring = '%s.%s.%s' % sys.version_info[0:3]

    CMAdb.log.info('Starting CMA version %s - licensed under %s'
                   % (AssimCtypes.VERSION_STRING, LONG_LICENSE_STRING))
    CMAdb.log.info('Neo4j version %s // py2neo version %s // Python version %s // %s'
                   % (neoversstring, str(PY2NEO_VERSION), pyversstring, jvers))
    if opt.foreground:
        print >> sys.stderr, ('Starting CMA version %s - licensed under %s'
                              % (AssimCtypes.VERSION_STRING, LONG_LICENSE_STRING))
        print >> sys.stderr, ('Neo4j version %s // py2neo version %s // Python version %s // %s'
                              % (neoversstring, PY2NEO_VERSION, pyversstring, jvers))
    if len(neovers) > 3:
        CMAdb.log.warning('Neo4j version %s is beta code - results not guaranteed.'
                          % str(neovers))

    # Important to note that we don't want PacketListener to create its own 'io' object
    # or it will screw up the ReliableUDP protocol...
    listener = PacketListener(config, disp, io=io)
    mandatory_modules = ['discoverylistener']
    for mandatory in mandatory_modules:
        importlib.import_module(mandatory)
    for optional in config['optional_modules']:
        importlib.import_module(optional)

    # This is kind of a kludge, we should really look again at
    # at initialization and so on.
    # This module *ought* to be optional.
    # that would involve adding some Drone callbacks for creation of new Drones
    # Created before either listen path so tracing does not change what the CMA does.
    BestPractices(config, io, CMAdb.store, CMAdb.log, opt.debug)
    if opt.doTrace:
        import trace
        tracer = trace.Trace(count=False, trace=True)
        if CMAdb.debug:
            CMAdb.log.debug(
                'Starting up traced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
                'cma: Starting up traced listener.listen() in foreground; debug=%d' % opt.debug)
        # Trace.run('...') evaluates its string in __main__'s namespace, where our
        # local 'listener' is not visible - call the bound method directly instead.
        tracer.runfunc(listener.listen)
    else:
        if CMAdb.debug:
            CMAdb.log.debug(
                'Starting up untraced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
                'cma: Starting up untraced listener.listen() in foreground; debug=%d' % opt.debug)
        listener.listen()
    return 0

def _neo4j_version_string(neovers):
    '''Format a Neo4j version sequence for display.

    The first three elements are joined with dots. If there is a fourth element
    (longer versions are treated as beta releases by main()), it is appended with
    no separator, e.g. (2, 3, 0) -> '2.3.0' and (2, 3, 0, '-M01') -> '2.3.0-M01'.
    '''
    neovers = tuple(neovers)
    if len(neovers) > 3:
        return '%s.%s.%s%s' % neovers[0:4]
    return '%s.%s.%s' % neovers[0:3]
400 
def supplementary_groups_for_user(userid):
    '''Return the supplementary groups to which this member
    would belong if they logged in, as a tuple of (groupnamelist, gidlist).

    A group is included when 'userid' appears in its member list. The user's
    primary (passwd) group is therefore only included if they are also listed
    as an explicit member of it.
    '''
    namelist = []
    gidlist = []
    for entry in getent.group():
        if userid in entry.members:
            namelist.append(entry.name)
            gidlist.append(entry.gid)
    return (namelist, gidlist)
412 
413 
def drop_privileges_permanently(userid):
    '''
    Drop our privileges permanently and run as the given user with
    the privileges to which they would be entitled if they logged in.
    That is, the uid, gid, and supplementary group list are all set correctly.
    We are careful to make sure we have exactly the permissions we need
    as 'userid'.
    Either we need to be started as root or as 'userid' or this function
    will fail and exit the program.

    Raises OSError if 'userid' is unknown, or if afterwards the real/effective
    uid and gid, or the supplementary group list, are not exactly those of 'userid'.
    '''
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    #pylint is confused about the getent.passwd object
    #pylint: disable=E1101
    newuid = userinfo.uid
    newgid = userinfo.gid
    auxgroups = supplementary_groups_for_user(userid)[1]
    # Need to set supplementary groups, then group id, then user id - in that order:
    # giving up the uid first would remove the privilege needed to change the groups.
    try:
        os.setgroups(auxgroups)
        os.setgid(newgid)
        os.setuid(newuid)
    except OSError:
        # Deliberately ignored: this allows us to work if we're already running
        # as that user id. Any real failure is caught by the checks below.
        pass
    # Let's see if everything wound up as it should...
    if (os.getuid() != newuid or os.geteuid() != newuid
            or os.getgid() != newgid or os.getegid() != newgid):
        raise OSError('Could not set user/group ids to user "%s" [uid:%s, gid:%s].'
                      % (userid, os.getuid(), os.getgid()))
    # Group order is not necessarily preserved and duplicates are possible, so
    # compare as sets. Every supplementary group must be present; the only extra
    # group tolerated is our primary gid (which shows up in the list in practice).
    curgroups = set(os.getgroups())
    wanted = set(auxgroups)
    if not wanted.issubset(curgroups) or (curgroups - wanted - set([newgid])):
        raise OSError('Could not set auxiliary groups for user "%s"' % userid)
    # Hurray! Everything worked!
459 
def make_pid_dir(pidfile, userid):
    '''Make a suitable directory for the pidfile.

    If the directory containing 'pidfile' already exists it is assumed to have
    been set up suitably and is left alone. The same applies when 'pidfile' has
    no directory component (it lives in the current directory). Otherwise the
    directory is created with mode 0755 (subject to the umask) and chowned to
    'userid' and that user's primary group.

    The user is looked up *before* the directory is created. An unknown user
    therefore never leaves behind a root-owned directory that later runs would
    trust via the early return above.

    Raises OSError if 'userid' is unknown or the directory cannot be created.
    '''
    piddir = os.path.dirname(pidfile)
    if not piddir or os.path.isdir(piddir):
        # Assume it's been set up suitably
        return
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    os.mkdir(piddir, 0o755)
    # pylint doesn't understand about getent...
    # pylint: disable=E1101
    os.chown(piddir, userinfo.uid, userinfo.gid)
473 
def make_key_dir(keydir, userid):
    '''Make a suitable directory for us to store our keys in.

    If 'keydir' already exists it is assumed to have been set up suitably and is
    left alone. Otherwise it is created with mode 0700 (subject to the umask)
    and chowned to 'userid' and that user's primary group.

    The user is looked up *before* the directory is created. An unknown user
    therefore never leaves behind a root-owned 0700 key directory that later runs
    would trust but could not read after dropping privileges.

    Raises OSError if 'userid' is unknown or the directory cannot be created.
    '''
    if os.path.isdir(keydir):
        # Assume it's been set up suitably
        return
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    os.mkdir(keydir, 0o700)
    # pylint doesn't understand about getent...
    # pylint: disable=E1101
    os.chown(keydir, userinfo.uid, userinfo.gid)
486 
def logger(msg):
    '''Log a message to syslog (and stderr) using the logger(1) command.

    The message is passed to logger as a single argument without going through
    a shell. Quotes or other shell metacharacters in 'msg' (common in exception
    text and tracebacks) therefore cannot break or inject into the command.
    If the logger command cannot be run at all, the message goes to stderr so
    it is not lost.
    '''
    import subprocess
    text = '%s' % (msg,)
    try:
        subprocess.call(['logger', '-s', text])
    except OSError:
        sys.stderr.write('%s\n' % text)
490 
def process_main_exception(ex):
    '''Process an uncaught exception outside our event loop.

    Logs the exception and its traceback to syslog through logger(), one line
    per frame, limited to the 20 frames nearest the failure. This must be called
    from inside the 'except' clause that caught 'ex', because the traceback is
    read from sys.exc_info().
    '''
    tb_obj = sys.exc_info()[2]
    # extract_tb(tb, limit) keeps the *outermost* 'limit' frames; the frames
    # closest to where the exception was raised are the useful ones, so keep
    # the last 20 instead.
    tblist = traceback.extract_tb(tb_obj)[-20:]
    # Put our traceback into the logs in a legible way
    logger('Got an exception in Main [%s]' % str(ex))
    logger('======== Begin Main Exception Traceback ========')
    for (filename, line, funcname, text) in tblist:
        logger('%s.%s:%s: %s' % (os.path.basename(filename), line, funcname, text))
    logger('======== End Main Exception Traceback ========')
503 
if __name__ == '__main__':
    pyversion = sys.version_info
    if pyversion[0] != 2 or pyversion[1] < 7:
        raise RuntimeError('Must be run using python 2.x where x >= 7')
    exitrc = 1
    # W0703 == Too general exception catching...
    # pylint: disable=W0703
    try:
        exitrc = main()
    except Exception as e:
        # Top-level boundary: log the failure and traceback to syslog; the exit
        # code stays 1 so callers see the failure.
        process_main_exception(e)
    sys.exit(int(exitrc))
def logger(msg)
Definition: cma.py:487
def make_pid_dir(pidfile, userid)
Definition: cma.py:460
def make_key_dir(keydir, userid)
Definition: cma.py:474
void assimilation_openlog(const char *logname)
Open logs in our style (syslog)
Definition: misc.c:211
int kill_pid_service(const char *pidfile, int signal)
kill the service that goes with our current pid file - return negative iff pidfile pid is running and...
Definition: misc.c:489
guint pidrunningstat_to_status(PidRunningStat stat)
Convert PidRunningStat to an exit code for status.
Definition: misc.c:539
void proj_class_incr_debug(const char *Cclass)
Increment debug level for this class and all its subclasses by one.
Definition: proj_classes.c:140
void rmpid_and_exit_on_signal(const char *pidfile, int signal_in)
Remove PID file and exit when a signal is received.
Definition: misc.c:512
void remove_pid_file(const char *pidfile)
Remove the pid file that goes with this service iff we created one during this invocation.
Definition: misc.c:480
def main()
Definition: cma.py:134
def process_main_exception(ex)
Definition: cma.py:491
PidRunningStat are_we_already_running(const char *pidfile, int *pidarg)
See if the pid file suggests we are already running or not.
Definition: misc.c:317
def supplementary_groups_for_user(userid)
Definition: cma.py:401
def drop_privileges_permanently(userid)
Definition: cma.py:414
void daemonize_me(gboolean stay_in_foreground, const char *dirtorunin, char *pidfile, int minclosefd)
Function to get system name (uname -n in UNIX terms) - and return in lower case
Definition: misc.c:105