The Assimilation Project
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
cma.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: smartindent tabstop=4 shiftwidth=4 expandtab number
3 #
4 # This file is part of the Assimilation Project.
5 #
6 # Copyright (C) 2011, 2012 - Alan Robertson <alanr@unix.sh>
7 #
8 # The Assimilation software is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # The Assimilation software is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
20 #
21 '''
22  Design outline:
23 
24  All incoming network messages come in and get sent to a client who is a dispatcher.
25 
26  The dispatcher looks at the message type and computes which queue to send the
27  message to based on the message type and contents.
28 
29  For death notices, the dispatcher forwards the message to the worker
30  assigned to the switch the system is on - if known, or the worker
31  assigned to the subnet.
32 
33  Each worker handles one or more rings - probably handling the per-switch rings
34  for a subnet and the subnet ring as well. It is important to ensure that a ring
35  is handled by only one worker. This eliminates locking concerns. When a given
36  worker receives a death notice for a drone that is also in higher-level rings,
37  it does its work at its level and also forwards the request to the worker handling
38  the higher level ring as well. The first subnet worker will also handle the work
39  for the top-level (global) ring.
40 
41  Packets are ACKed by workers after all work has been completed. In the case of
42  a drone on multiple rings, it is only ACKed after both rings have been fully
43  repaired.
44 
45  The reason for this is that until it is fully repaired, the system might crash
46  before completing its work. Retransmission timeouts will need to be set
47  accordingly...
48 
49  Although congestion is normally very unlikely, this is not true for full
50  datacenter powerons - where it is reasonably likely - depending on how
51  quickly one can power on the servers and not pop circuit breakers or
52  damage UPSes
53  (it would be good to know how fast hosts can come up worst case).
54 
55 
56  Misc Workers with well-known-names
57  Request-To-Create-Ring
58 
59 
60  Mappings:
61 
62  Drone-related information-------------------------
63  NetAddr-to-drone-name
64  drone-name to NetAddr
65  (drone-name,ifname) to interface-info (including switch info)
66  drone-neighbor-info:
67  drone-name-to-neighbor-info (drone-name, NetAddr, ring-name)
68 
69  Ring-related information--------------------------
70  drone-name to ring-name(s)
71  ring-names to ring-information (level, #members, etc)
72  ring-links-info ??
73  Subnet-to-ring-name
74  Switch-to-ring-name
75  Global-ring-name [TheOneRing]
76 
77  Discovery-related information---------------------
78  (drone-name, Interface-name) to LLDP/CDP packet
79  (drone-name, discovery-type) to JSON info
80 
81 
82  Misc Info-----------------------------------------
83  NetAddr(MAC)-to-NetAddr(IP)
84 
85 
86  Dispatcher logic:
87  For now sends all requests to TheOneRing because we need to write more code ;-)
88 
89 
90 ################################################################################
91 #
92 # It is readily observable that the code is headed that way, but is a long
93 # way from that structure...
94 #
95 ################################################################################
96 '''
97 
98 
99 import optparse, time
100 import os, sys, signal
101 import cmainit
102 from assimeventobserver import ForkExecObserver
103 from AssimCtypes import NOTIFICATION_SCRIPT_DIR, CMAINITFILE, CMAUSERID
104 import AssimCtypes
105 from AssimCclasses import pyCompressFrame, pyCryptCurve25519
106 from cmaconfig import ConfigFile
107 import importlib
108 #import atexit
109 import getent
110 import py2neo
111 
112 
# Discovery modules known to the CMA, listed by module name.
optional_modules = [
    'discoverylistener',    # NOT OPTIONAL(!)
    'linkdiscovery',
    'checksumdiscovery',
    'monitoringdiscovery',
    'arpdiscovery',
]
119 #
120 # "Main" program starts below...
121 # It is the real CMA intended to run with some real nanoprobes running
122 # somewhere out there...
123 #
124 # 912: Too many branches, 914: too many local variables, 915: too many statements
125 #pylint: disable=R0912,R0914,R0915
def main():
    '''Main program for the CMA (Collective Management Authority)

    Parses the command line and then does one of three things:
      --status: returns the exit status describing whether a CMA is running
      --kill:   asks the running CMA to stop (returns 0 on success, 1 on failure)
      otherwise: creates the pid directory, drops privileges, daemonizes,
                 builds the configuration sent to nanoprobes, waits for Neo4j
                 to come up, and runs the packet listener.

    Returns the exit code for the process.
    Raises SystemExit(1) if Neo4j has not started after 300 retries.
    '''
    DefaultPort = 1984
    # This works around a bug in the glib library...
    os.environ['G_SLICE'] = 'always-malloc'
    # This works around a stupidity in the glib library...
    os.environ['G_MESSAGES_DEBUG'] = 'all'
    # VERY Linux-specific - but useful and apparently correct ;-)
    PrimaryIPcmd = \
    "ip address show primary scope global | grep '^ *inet' | sed -e 's%^ *inet *%%' -e 's%/.*%%'"
    ipfd = os.popen(PrimaryIPcmd, 'r')
    try:
        OurAddrStr = ('%s:%d' % (ipfd.readline().rstrip(), DefaultPort))
    finally:
        # Don't leak the pipe if reading from it fails
        ipfd.close()

    opt = _build_option_parser().parse_args()[0]

    from AssimCtypes import daemonize_me, assimilation_openlog, are_we_already_running, \
        kill_pid_service, pidrunningstat_to_status, remove_pid_file, rmpid_and_exit_on_signal

    if opt.status:
        return pidrunningstat_to_status(are_we_already_running(opt.pidfile, None))

    if opt.kill:
        if kill_pid_service(opt.pidfile, 15) < 0:
            print >> sys.stderr, "Unable to stop CMA."
            return 1
        return 0

    opt.debug = int(opt.debug)

    # NOTE: an atexit/signal-handler based pid file cleanup written in Python
    # was tried here and did not work when daemonize_me() ('C' code) forks,
    # so pid file removal is delegated to rmpid_and_exit_on_signal() below.

    from cmadb import CMAdb
    CMAdb.running_under_docker()
    make_pid_dir(opt.pidfile, opt.userid)
    drop_privileges_permanently(opt.userid)
    cryptwarnings = pyCryptCurve25519.initkeys()
    for warn in cryptwarnings:
        print >> sys.stderr, ("WARNING: %s" % warn)

    daemonize_me(opt.foreground, '/', opt.pidfile)

    rmpid_and_exit_on_signal(opt.pidfile, signal.SIGTERM)

    # Next statement can't appear before daemonize_me() or bind() fails -- not quite sure why...
    assimilation_openlog("cma")
    from packetlistener import PacketListener
    from messagedispatcher import MessageDispatcher
    from dispatchtarget import DispatchTarget
    from monitoring import MonitoringRule
    from AssimCclasses import pyNetAddr, pySignFrame, pyReliableUDP, \
        pyPacketDecoder
    from AssimCtypes import CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR, CONFIGNAME_CMADISCOVER, \
        CONFIGNAME_CMAFAIL, CONFIGNAME_CMAPORT, CONFIGNAME_OUTSIG, CONFIGNAME_COMPRESSTYPE, \
        CONFIGNAME_COMPRESS, \
        proj_class_incr_debug, LONG_LICENSE_STRING, MONRULEINSTALL_DIR

    if opt.debug:
        print >> sys.stderr, ('Setting debug to %s' % opt.debug)

    for _ in range(opt.debug):
        print >> sys.stderr, ('Incrementing C-level debug by one.')
        # NOTE(review): this call was missing from the listing this block was
        # recovered from; None presumably means "all C classes" - confirm.
        proj_class_incr_debug(None)

    # Input our monitoring rule templates
    # They only exist in flat files and in memory - they aren't in the database
    MonitoringRule.load_tree(MONRULEINSTALL_DIR)
    print >> sys.stderr, ('Monitoring rules loaded from %s' % MONRULEINSTALL_DIR)

    execobserver_constraints = {
        'nodetype': ['Drone', 'SystemNode', 'IPaddrNode', 'ProcessNode', 'MonitorAction']
    }
    ForkExecObserver(constraints=execobserver_constraints, scriptdir=NOTIFICATION_SCRIPT_DIR)
    print >> sys.stderr, ('Fork/Event observer dispatching from %s' % NOTIFICATION_SCRIPT_DIR)

    if opt.bind is not None:
        OurAddrStr = opt.bind

    OurAddr = pyNetAddr(OurAddrStr)
    if OurAddr.port() == 0:
        OurAddr.setport(DefaultPort)

    try:
        configinfo = ConfigFile(filename=CMAINITFILE)
    except IOError:
        # No readable init file: fall back to a default configuration
        configinfo = ConfigFile()
    if opt.bind is not None:
        bindaddr = pyNetAddr(opt.bind)
        if bindaddr.port() == 0:
            # Look the port up in the configuration *instance*;
            # the ConfigFile class itself cannot be subscripted.
            bindaddr.setport(configinfo[CONFIGNAME_CMAPORT])
        configinfo[CONFIGNAME_CMAINIT] = bindaddr
    configinfo[CONFIGNAME_CMADISCOVER] = OurAddr
    configinfo[CONFIGNAME_CMAFAIL] = OurAddr
    configinfo[CONFIGNAME_CMAADDR] = OurAddr
    if CONFIGNAME_COMPRESSTYPE in configinfo:
        configinfo[CONFIGNAME_COMPRESS] \
            = pyCompressFrame(compression_method=configinfo[CONFIGNAME_COMPRESSTYPE])
    config = configinfo.complete_config()
    config[CONFIGNAME_OUTSIG] = pySignFrame(1)

    addr = config[CONFIGNAME_CMAINIT]
    if addr.port() == 0:
        addr.setport(DefaultPort)
    ourport = addr.port()
    # Make every CMA address we advertise use the port we listen on
    for elem in (CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR
                 , CONFIGNAME_CMADISCOVER, CONFIGNAME_CMAFAIL):
        if elem in config:
            config[elem] = pyNetAddr(str(config[elem]), port=ourport)
    io = pyReliableUDP(config, pyPacketDecoder())
    trycount = 0
    while True:
        try:
            cmainit.CMAinit(io, cleanoutdb=opt.erasedb, debug=(opt.debug > 0))
        except RuntimeError:
            print >> sys.stderr, 'TRYING AGAIN...'
            trycount += 1
            if trycount > 300:
                remove_pid_file(opt.pidfile)
                print >> sys.stderr, ('Neo4j still not started - giving up.')
                CMAdb.log.critical('Neo4j still not started - giving up.')
                raise SystemExit(1)
            if (trycount % 60) == 1:
                print >> sys.stderr, ('Waiting for Neo4j to start.')
                CMAdb.log.warning('Waiting for Neo4j to start.')
            # Let's try again in a second...
            time.sleep(1)
            continue
        # Neo4j started. All is well with the world.
        break
    # Repeat the key warnings now that the CMA log is available
    for warn in cryptwarnings:
        CMAdb.log.warning(warn)

    CMAdb.log.info('Listening on: %s' % str(config[CONFIGNAME_CMAINIT]))
    CMAdb.log.info('Requesting return packets sent to: %s' % str(OurAddr))
    if CMAdb.debug:
        CMAdb.log.debug('C-library Debug was set to %s' % opt.debug)
        CMAdb.log.debug('TheOneRing created - id = %s' % CMAdb.TheOneRing)
        CMAdb.log.debug('Config Object sent to nanoprobes: %s' % config)

    jvmfd = os.popen('java -version 2>&1')
    try:
        jvers = jvmfd.readline()
    finally:
        jvmfd.close()
    disp = MessageDispatcher(DispatchTarget.dispatchtable)
    # Build the startup banner once; it goes to the log and (in foreground) to stderr
    startmsg = ('Starting CMA version %s - licensed under %s'
                % (AssimCtypes.VERSION_STRING, LONG_LICENSE_STRING))
    versionmsg = ('Neo4j version %s // py2neo version %s // Python version %s // %s'
                  % (('%s.%s.%s%s' % CMAdb.cdb.db.neo4j_version)
                     , str(py2neo.__version__)
                     , ('%s.%s.%s' % sys.version_info[0:3])
                     , jvers))
    CMAdb.log.info(startmsg)
    CMAdb.log.info(versionmsg)
    if opt.foreground:
        print >> sys.stderr, startmsg
        print >> sys.stderr, versionmsg
    # Important to note that we don't want PacketListener to create its own 'io' object
    # or it will screw up the ReliableUDP protocol...
    listener = PacketListener(config, disp, io=io)
    mandatory_modules = ['discoverylistener']
    for mandatory in mandatory_modules:
        importlib.import_module(mandatory)
    for optional in config['optional_modules']:
        importlib.import_module(optional)
    if opt.doTrace:
        import trace
        tracer = trace.Trace(count=False, trace=True)
        if CMAdb.debug:
            CMAdb.log.debug(
                'Starting up traced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
                'cma: Starting up traced listener.listen() in foreground; debug=%d' % opt.debug)
        # Trace.run() evaluates its command string in __main__'s globals, where
        # our local 'listener' does not exist - so trace the bound method directly.
        tracer.runfunc(listener.listen)
    else:
        if CMAdb.debug:
            CMAdb.log.debug(
                'Starting up untraced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
                'cma: Starting up untraced listener.listen() in foreground; debug=%d' % opt.debug)
        listener.listen()
    return 0


def _build_option_parser():
    'Return the optparse parser describing the CMA command line options'
    parser = optparse.OptionParser(prog='CMA', version=AssimCtypes.VERSION_STRING,
        description='Collective Management Authority for the Assimilation System',
        usage='cma.py [--bind address:port]')

    parser.add_option('-b', '--bind', action='store', default=None, dest='bind'
        , metavar='address:port-to-bind-to'
        , help='Address:port to listen to - for nanoprobes to connect to')

    parser.add_option('-d', '--debug', action='store', default=0, dest='debug'
        , help='enable debug for CMA and libraries - value is debug level for C libraries.')

    parser.add_option('-s', '--status', action='store_true', default=False, dest='status'
        , help='Return status of running CMA')

    parser.add_option('-k', '--kill', action='store_true', default=False, dest='kill'
        , help='Shut down running CMA.')

    parser.add_option('-e', '--erasedb', action='store_true', default=False, dest='erasedb'
        , help='Erase Neo4J before starting')

    parser.add_option('-f', '--foreground', action='store_true', default=False, dest='foreground'
        , help='keep the CMA from going into the background')

    parser.add_option('-p', '--pidfile', action='store', default='/var/run/assimilation/cma'
        , dest='pidfile', metavar='pidfile-pathname'
        , help='full pathname of where to locate our pid file')

    parser.add_option('-T', '--trace', action='store_true', default=False, dest='doTrace'
        , help='Trace CMA execution')

    parser.add_option('-u', '--user', action='store', default=CMAUSERID, dest='userid'
        , metavar='userid'
        , help='userid to run the CMA as')
    return parser
358 
def supplementary_groups_for_user(userid):
    '''Return the list of supplementary groups to which this member
    would belong if they logged in as a tuple of (groupnamelist, gidlist)

    Every group known to getent.group() whose member list contains 'userid'
    contributes its name to the first list and its gid to the second,
    in the same order.
    '''
    memberships = [entry for entry in getent.group() if userid in entry.members]
    namelist = [entry.name for entry in memberships]
    gidlist = [entry.gid for entry in memberships]
    return (namelist, gidlist)
370 
371 
def drop_privileges_permanently(userid):
    '''
    Drop our privileges permanently and run as the given user with
    the privileges to which they would be entitled if they logged in.
    That is, the uid, gid, and supplementary group list are all set correctly.
    We are careful to make sure we have exactly the permissions we need
    as 'userid'.
    Either we need to be started as root or as 'userid' or this function
    will fail by raising OSError.

    Raises OSError if 'userid' is unknown, or if the resulting uid, gid or
    supplementary group list is not what was requested.
    '''
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    #pylint is confused about the getent.passwd object
    #pylint: disable=E1101
    newuid = userinfo.uid
    #pylint: disable=E1101
    newgid = userinfo.gid
    auxgroups = supplementary_groups_for_user(userid)[1]
    # Need to set supplementary groups, then group id then user id in that order.
    try:
        os.setgroups(auxgroups)
        os.setgid(newgid)
        os.setuid(newuid)
    except OSError:
        # We let this fail if it wants to and catch it below.
        # This allows this to work if we're already running as that user id...
        pass
    # Let's see if everything wound up as it should...
    if (os.getuid() != newuid or os.geteuid() != newuid
            or os.getgid() != newgid or os.getegid() != newgid):
        raise OSError('Could not set user/group ids to user "%s" [uid:%s, gid:%s].'
                      % (userid, os.getuid(), os.getgid()))
    # Checking groups is a little more complicated - order is potentially not preserved...
    # This also allows for the case where there might be dups (which shouldn't happen?)
    curgroups = os.getgroups()
    # Every group we asked for must be present...
    for elem in auxgroups:
        if elem not in curgroups:
            raise OSError('Could not set auxiliary groups for user "%s"' % userid)
    # ...and we must not have kept any group we did not ask for.
    for elem in curgroups:
        # I don't think the default gid is supposed to be in the current group list...
        # but it is in my tests... It should be harmless...
        if elem not in auxgroups and elem != newgid:
            raise OSError('Could not set auxiliary groups for user "%s"' % userid)
    # Hurray! Everything worked!
417 
def make_pid_dir(pidfile, userid):
    '''Make a suitable directory for the pidfile

    pidfile: pathname of the pid file whose parent directory is wanted
    userid:  name of the user who should own a newly-created directory

    Nothing is done if the pid file has no directory component or if its
    directory already exists. Otherwise the directory is created with
    mode 0755 and given to 'userid' (uid and primary gid).

    Raises OSError if 'userid' is unknown or the directory can't be
    created or chowned.
    '''
    piddir = os.path.dirname(pidfile)
    if not piddir or os.path.isdir(piddir):
        # Either the pid file lives in the current directory (nothing to
        # create - os.mkdir('') would just fail), or the directory already
        # exists and we assume it's been set up suitably
        return
    # Validate the user *before* creating anything, so an unknown userid
    # doesn't leave a directory behind with the wrong owner.
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    os.mkdir(piddir, 0o755)
    # pylint doesn't understand about getent...
    # pylint: disable=E1101
    os.chown(piddir, userinfo.uid, userinfo.gid)
431 
if __name__ == '__main__':
    # The CMA only runs under Python 2, and needs at least version 2.7
    major, minor = sys.version_info[0], sys.version_info[1]
    if not (major == 2 and minor >= 7):
        raise RuntimeError('Must be run using python 2.x where x >= 7')
    sys.exit(int(main()))
def make_pid_dir
Definition: cma.py:418
def drop_privileges_permanently
Definition: cma.py:372
void assimilation_openlog(const char *logname)
Open logs in our style (syslog)
Definition: misc.c:192
int kill_pid_service(const char *pidfile, int signal)
kill the service that goes with our current pid file - return negative iff pidfile pid is running and...
Definition: misc.c:470
def supplementary_groups_for_user
Definition: cma.py:359
guint pidrunningstat_to_status(PidRunningStat stat)
Convert PidRunningStat to an exit code for status.
Definition: misc.c:520
void proj_class_incr_debug(const char *Cclass)
Increment debug level for this class and all its subclasses by one.
Definition: proj_classes.c:140
void rmpid_and_exit_on_signal(const char *pidfile, int signal_in)
Remove PID file and exit when a signal is received.
Definition: misc.c:493
void daemonize_me(gboolean stay_in_foreground, const char *dirtorunin, char *pidfile)
Function to get system name (uname -n in UNIX terms)
Definition: misc.c:90
void remove_pid_file(const char *pidfile)
Remove the pid file that goes with this service iff we created one during this invocation.
Definition: misc.c:461
PidRunningStat are_we_already_running(const char *pidfile, int *pidarg)
See if the pid file suggests we are already running or not.
Definition: misc.c:298
def main
Definition: cma.py:126