The Assimilation Project
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
cma.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: smartindent tabstop=4 shiftwidth=4 expandtab number
3 #
4 # This file is part of the Assimilation Project.
5 #
6 # Copyright (C) 2011, 2012 - Alan Robertson <alanr@unix.sh>
7 #
8 # The Assimilation software is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # The Assimilation software is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
20 #
21 '''
22  Design outline:
23 
24  All incoming network messages come in and get sent to a client who is a dispatcher.
25 
26  The dispatcher looks at the message type and computes which queue to send the
27  message to based on the message type and contents.
28 
29  For death notices, the dispatcher forwards the message to the worker
30  assigned to the switch the system is on - if known, or the worker
31  assigned to the subnet.
32 
33  Each worker handles one or more rings - probably handling the per-switch rings
34  for a subnet and the subnet ring as well. It is important to ensure that a ring
35  is handled by only one worker. This eliminates locking concerns. When a given
36  worker receives a death notice for a drone that is also in higher-level rings,
37  it does its work at its level and also forwards the request to the worker handling
38  the higher level ring as well. The first subnet worker will also handle the work
39  for the top-level (global) ring.
40 
41  Packets are ACKed by workers after all work has been completed. In the case of
42  a drone on multiple rings, it is only ACKed after both rings have been fully
43  repaired.
44 
45  The reason for this is that until it is fully repaired, the system might crash
46  before completing its work. Retransmission timeouts will need to be set
47  accordingly...
48 
49  Although congestion is normally very unlikely, this is not true for full
50  datacenter power-ons - where it is reasonably likely - depending on how
51  quickly one can power on the servers and not pop circuit breakers or
52  damage UPSes
53  (it would be good to know how fast hosts can come up worst case).
54 
55 
56  Misc Workers with well-known-names
57  Request-To-Create-Ring
58 
59 
60  Mappings:
61 
62  Drone-related information-------------------------
63  NetAddr-to-drone-name
64  drone-name to NetAddr
65  (drone-name,ifname) to interface-info (including switch info)
66  drone-neighbor-info:
67  drone-name-to-neighbor-info (drone-name, NetAddr, ring-name)
68 
69  Ring-related information--------------------------
70  drone-name to ring-name(s)
71  ring-names to ring-information (level, #members, etc)
72  ring-links-info ??
73  Subnet-to-ring-name
74  Switch-to-ring-name
75  Global-ring-name [TheOneRing]
76 
77  Discovery-related information---------------------
78  (drone-name, Interface-name) to LLDP/CDP packet
79  (drone-name, discovery-type) to JSON info
80 
81 
82  Misc Info-----------------------------------------
83  NetAddr(MAC)-to-NetAddr(IP)
84 
85 
86  Dispatcher logic:
87  For now sends all requests to TheOneRing because we need to write more code ;-)
88 
89 
90 ################################################################################
91 #
92 # It is readily observable that the code is headed that way, but is a long
93 # way from that structure...
94 #
95 ################################################################################
96 '''
97 
98 
99 import optparse, time
100 import os, sys, signal
101 import cmainit
102 from assimeventobserver import ForkExecObserver
103 from AssimCtypes import NOTIFICATION_SCRIPT_DIR, CMAINITFILE, CMAUSERID
104 import AssimCtypes
105 from AssimCclasses import pyCompressFrame
106 from cmaconfig import ConfigFile
107 import importlib
108 #import atexit
109 import getent
110 import py2neo
111 
112 
# Names of discovery/listener modules known to the CMA.
# Despite the variable name, the first entry is mandatory: main() always
# imports 'discoverylistener' itself.
# NOTE(review): main() imports from config['optional_modules'] rather than
# from this list; presumably this is the default set -- confirm with cmaconfig.
optional_modules = [
    'discoverylistener',    # NOT OPTIONAL(!)
    'linkdiscovery',
    'checksumdiscovery',
    'monitoringdiscovery',
    'arpdiscovery',
]
119 #
120 # "Main" program starts below...
121 # It is the real CMA, intended to run with some real nanoprobes running
122 # somewhere out there...
123 #
124 # 912: Too many branches, 914: too many local variables, 915: too many statements
125 #pylint: disable=R0912,R0914,R0915
def main():
    '''Main program for the CMA (Collective Management Authority).

    Parses the command line and then either:
      - reports the status of a running CMA (--status),
      - asks a running CMA to stop (--kill), or
      - starts the CMA: creates the pid-file directory, drops privileges to
        --user, calls daemonize_me(), loads the monitoring rule templates,
        builds the configuration sent to nanoprobes, retries
        cmainit.CMAinit() once a second until it stops raising RuntimeError
        (i.e. until Neo4j is up; gives up after 300 retries), and finally
        runs the packet listener (under the 'trace' module if --trace).

    Returns the process exit code: the status code for --status, 0/1 for
    --kill, and 0 after the listener returns.
    '''
    DefaultPort = 1984
    # VERY Linux-specific - but useful and apparently correct ;-)
    # Finds our primary global IP address; it becomes our return address
    # unless --bind overrides it below.
    PrimaryIPcmd = \
        "ip address show primary scope global | grep '^ *inet' | sed -e 's%^ *inet *%%' -e 's%/.*%%'"
    ipfd = os.popen(PrimaryIPcmd, 'r')
    OurAddrStr = ('%s:%d' % (ipfd.readline().rstrip(), DefaultPort))
    ipfd.close()

    parser = optparse.OptionParser(prog='CMA', version=AssimCtypes.VERSION_STRING,
        description='Collective Management Authority for the Assimilation System',
        usage='cma.py [--bind address:port]')

    parser.add_option('-b', '--bind', action='store', default=None, dest='bind'
        , metavar='address:port-to-bind-to'
        , help='Address:port to listen to - for nanoprobes to connect to')

    parser.add_option('-d', '--debug', action='store', default=0, dest='debug'
        , help='enable debug for CMA and libraries - value is debug level for C libraries.')

    parser.add_option('-s', '--status', action='store_true', default=False, dest='status'
        , help='Return status of running CMA')

    parser.add_option('-k', '--kill', action='store_true', default=False, dest='kill'
        , help='Shut down running CMA.')

    parser.add_option('-e', '--erasedb', action='store_true', default=False, dest='erasedb'
        , help='Erase Neo4J before starting')

    parser.add_option('-f', '--foreground', action='store_true', default=False, dest='foreground'
        , help='keep the CMA from going into the background')

    parser.add_option('-p', '--pidfile', action='store', default='/var/run/assimilation/cma'
        , dest='pidfile', metavar='pidfile-pathname'
        , help='full pathname of where to locate our pid file')

    parser.add_option('-T', '--trace', action='store_true', default=False, dest='doTrace'
        , help='Trace CMA execution')

    parser.add_option('-u', '--user', action='store', default=CMAUSERID, dest='userid'
        , metavar='userid'
        , help='userid to run the CMA as')

    opt = parser.parse_args()[0]

    from AssimCtypes import daemonize_me, assimilation_openlog, are_we_already_running, \
        kill_pid_service, pidrunningstat_to_status, remove_pid_file, rmpid_and_exit_on_signal

    if opt.status:
        rc = pidrunningstat_to_status(are_we_already_running(opt.pidfile, None))
        return rc

    if opt.kill:
        # A negative return means the pid-file process could not be stopped.
        if kill_pid_service(opt.pidfile, signal.SIGTERM) < 0:
            print >> sys.stderr, "Unable to stop CMA."
            return 1
        return 0

    opt.debug = int(opt.debug)

    # NOTE: registering an atexit()/SIGTERM handler from Python to remove the
    # pid file did not work once daemonize_me() (C code) forks, so pid-file
    # removal on SIGTERM is delegated to rmpid_and_exit_on_signal() below.

    from cmadb import CMAdb
    CMAdb.running_under_docker()
    # Create the pid directory while we (presumably) still have the privileges to.
    make_pid_dir(opt.pidfile, opt.userid)
    drop_privileges_permanently(opt.userid)
    daemonize_me(opt.foreground, '/', opt.pidfile)

    rmpid_and_exit_on_signal(opt.pidfile, signal.SIGTERM)

    # Next statement can't appear before daemonize_me() or bind() fails -- not quite sure why...
    assimilation_openlog("cma")
    from packetlistener import PacketListener
    from messagedispatcher import MessageDispatcher
    from dispatchtarget import DispatchTarget
    from monitoring import MonitoringRule
    from AssimCclasses import pyNetAddr, pySignFrame, pyReliableUDP, \
        pyPacketDecoder
    from AssimCtypes import CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR, CONFIGNAME_CMADISCOVER, \
        CONFIGNAME_CMAFAIL, CONFIGNAME_CMAPORT, CONFIGNAME_OUTSIG, CONFIGNAME_COMPRESSTYPE, \
        CONFIGNAME_COMPRESS, \
        proj_class_incr_debug, LONG_LICENSE_STRING, MONRULEINSTALL_DIR

    if opt.debug:
        print >> sys.stderr, ('Setting debug to %s' % opt.debug)

    # Raise the C-library debug level once per unit of --debug.
    for _ in range(opt.debug):
        print >> sys.stderr, ('Incrementing C-level debug by one.')
        # NOTE(review): this call was lost from the Doxygen listing (orig line 225);
        # restored from the message above and the proj_class_incr_debug import.
        # Confirm the argument against the real cma.py.
        proj_class_incr_debug(None)

    # Input our monitoring rule templates
    # They only exist in flat files and in memory - they aren't in the database
    MonitoringRule.load_tree(MONRULEINSTALL_DIR)
    print >> sys.stderr, ('Monitoring rules loaded from %s' % MONRULEINSTALL_DIR)

    execobserver_constraints = {
        'nodetype': ['Drone', 'SystemNode', 'IPaddrNode', 'ProcessNode', 'MonitorAction']
    }
    ForkExecObserver(constraints=execobserver_constraints, scriptdir=NOTIFICATION_SCRIPT_DIR)
    print >> sys.stderr, ('Fork/Event observer dispatching from %s' % NOTIFICATION_SCRIPT_DIR)

    if opt.bind is not None:
        OurAddrStr = opt.bind

    OurAddr = pyNetAddr(OurAddrStr)
    if OurAddr.port() == 0:
        OurAddr.setport(DefaultPort)

    try:
        configinfo = ConfigFile(filename=CMAINITFILE)
    except IOError:
        configinfo = ConfigFile()
    if opt.bind is not None:
        bindaddr = pyNetAddr(opt.bind)
        if bindaddr.port() == 0:
            # Previously subscripted the ConfigFile *class* (ConfigFile[...]).
            # Read the configured port from our instance, else use DefaultPort.
            if CONFIGNAME_CMAPORT in configinfo:
                bindaddr.setport(configinfo[CONFIGNAME_CMAPORT])
            else:
                bindaddr.setport(DefaultPort)
        configinfo[CONFIGNAME_CMAINIT] = bindaddr
    configinfo[CONFIGNAME_CMADISCOVER] = OurAddr
    configinfo[CONFIGNAME_CMAFAIL] = OurAddr
    configinfo[CONFIGNAME_CMAADDR] = OurAddr
    if CONFIGNAME_COMPRESSTYPE in configinfo:
        configinfo[CONFIGNAME_COMPRESS] \
            = pyCompressFrame(compression_method=configinfo[CONFIGNAME_COMPRESSTYPE])
    config = configinfo.complete_config()
    config[CONFIGNAME_OUTSIG] = pySignFrame(1)

    # Every CMA address we advertise uses the port we actually listen on.
    addr = config[CONFIGNAME_CMAINIT]
    if addr.port() == 0:
        addr.setport(DefaultPort)
    ourport = addr.port()
    for elem in (CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR,
                 CONFIGNAME_CMADISCOVER, CONFIGNAME_CMAFAIL):
        if elem in config:
            config[elem] = pyNetAddr(str(config[elem]), port=ourport)
    io = pyReliableUDP(config, pyPacketDecoder())

    # CMAinit raises RuntimeError until Neo4j is reachable; retry once a second.
    trycount = 0
    while True:
        try:
            cmainit.CMAinit(io, cleanoutdb=opt.erasedb, debug=(opt.debug > 0))
        except RuntimeError:
            print >> sys.stderr, 'TRYING AGAIN...'
            trycount += 1
            if trycount > 300:
                remove_pid_file(opt.pidfile)
                print >> sys.stderr, ('Neo4j still not started - giving up.')
                CMAdb.log.critical('Neo4j still not started - giving up.')
                raise SystemExit(1)
            if (trycount % 60) == 1:
                print >> sys.stderr, ('Waiting for Neo4j to start.')
                CMAdb.log.warning('Waiting for Neo4j to start.')
            # Let's try again in a second...
            time.sleep(1)
            continue
        # Neo4j started. All is well with the world.
        break

    CMAdb.log.info('Listening on: %s' % str(config[CONFIGNAME_CMAINIT]))
    CMAdb.log.info('Requesting return packets sent to: %s' % str(OurAddr))
    if CMAdb.debug:
        CMAdb.log.debug('C-library Debug was set to %s' % opt.debug)
        CMAdb.log.debug('TheOneRing created - id = %s' % CMAdb.TheOneRing)
        CMAdb.log.debug('Config Object sent to nanoprobes: %s' % config)

    # First line of 'java -version' output; strip its newline so log lines stay single-line.
    jvmfd = os.popen('java -version 2>&1')
    jvers = jvmfd.readline().rstrip()
    jvmfd.close()
    disp = MessageDispatcher(DispatchTarget.dispatchtable)
    startmsg = ('Starting CMA version %s - licensed under %s'
                % (AssimCtypes.VERSION_STRING, LONG_LICENSE_STRING))
    versionmsg = ('Neo4j version %s // py2neo version %s // Python version %s // %s'
                  % (('%s.%s.%s%s' % CMAdb.cdb.db.neo4j_version)
                     , str(py2neo.__version__)
                     , ('%s.%s.%s' % sys.version_info[0:3])
                     , jvers))
    CMAdb.log.info(startmsg)
    CMAdb.log.info(versionmsg)
    if opt.foreground:
        print >> sys.stderr, startmsg
        print >> sys.stderr, versionmsg
    # Important to note that we don't want PacketListener to create its own 'io' object
    # or it will screw up the ReliableUDP protocol...
    listener = PacketListener(config, disp, io=io)
    mandatory_modules = ['discoverylistener']
    for mandatory in mandatory_modules:
        importlib.import_module(mandatory)
    for optional in config['optional_modules']:
        importlib.import_module(optional)
    if opt.doTrace:
        import trace
        tracer = trace.Trace(count=False, trace=True)
        if CMAdb.debug:
            CMAdb.log.debug(
                'Starting up traced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
                'cma: Starting up traced listener.listen() in foreground; debug=%d' % opt.debug)
        # Trace.run('...') execs its string in __main__'s globals, where this
        # function's local 'listener' is not visible (NameError); runfunc()
        # calls the bound method directly under the tracer.
        tracer.runfunc(listener.listen)
    else:
        if CMAdb.debug:
            CMAdb.log.debug(
                'Starting up untraced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
                'cma: Starting up untraced listener.listen() in foreground; debug=%d' % opt.debug)
        listener.listen()
    return 0
347 
def supplementary_groups_for_user(userid, groupdb=None):
    '''Return the supplementary groups userid would belong to at login.

    The result is a tuple (groupnamelist, gidlist), built from every group
    entry whose member list contains userid, in enumeration order.  A group
    appears only if it lists userid explicitly as a member, so the user's
    primary group is included only when it does so.

    groupdb: optional iterable of group entries (objects with .name, .gid and
             .members attributes).  Defaults to getent.group(), the group
             enumeration the CMA has always used; passing it explicitly lets
             callers supply another source (e.g. in tests).
    '''
    if groupdb is None:
        groupdb = getent.group()
    # Iterate the group source exactly once; it may be a one-shot iterator.
    memberships = [entry for entry in groupdb if userid in entry.members]
    namelist = [entry.name for entry in memberships]
    gidlist = [entry.gid for entry in memberships]
    return (namelist, gidlist)
359 
360 
def drop_privileges_permanently(userid):
    '''Permanently become userid, with the credentials it would get at login.

    The supplementary group list, gid and uid are all set to userid's values
    and then verified, so that we end up with exactly userid's permissions.
    This only succeeds when started as root or when already running as
    userid; otherwise OSError is raised.

    Raises OSError if userid is unknown, or if the resulting real/effective
    uid, gid or supplementary groups do not match what was requested.
    '''
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    # pylint is confused about the getent.passwd object
    # pylint: disable=E1101
    target_uid, target_gid = userinfo.uid, userinfo.gid
    wanted_groups = supplementary_groups_for_user(userid)[1]

    # Order matters: groups first, then gid, then uid -- once the uid is
    # dropped we would no longer be allowed to change the others.
    try:
        os.setgroups(wanted_groups)
        os.setgid(target_gid)
        os.setuid(target_uid)
    except OSError:
        # Deliberately ignored: if we are already running as userid these
        # calls may be refused; the checks below decide success either way.
        pass

    uids_ok = os.getuid() == target_uid and os.geteuid() == target_uid
    gids_ok = os.getgid() == target_gid and os.getegid() == target_gid
    if not (uids_ok and gids_ok):
        raise OSError('Could not set user/group ids to user "%s" [uid:%s, gid:%s].'
                      % (userid, os.getuid(), os.getgid()))

    # Group order is not necessarily preserved and duplicates are harmless, so
    # compare as sets.  The primary gid may also appear in getgroups(); allow it.
    current = set(os.getgroups())
    wanted = set(wanted_groups)
    if (wanted - current) or (current - wanted - set([target_gid])):
        raise OSError('Could not set auxiliary groups for user "%s"' % userid)
    # Hurray! Everything worked!
406 
def make_pid_dir(pidfile, userid):
    '''Make a suitable directory for the pidfile.

    If pidfile has no directory part (it lives in the current directory), or
    its directory already exists, nothing is done: we assume it has been set
    up suitably.  Otherwise the directory is created with mode 0755 (subject
    to the process umask) and chowned to userid's uid and gid.

    Raises OSError if userid is unknown, or if mkdir/chown fail.
    '''
    piddir = os.path.dirname(pidfile)
    if not piddir or os.path.isdir(piddir):
        # Assume it's been set up suitably
        return
    # Look the user up *before* creating anything, so an unknown userid does
    # not leave behind a directory with the wrong owner.
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise OSError('Userid "%s" is unknown.' % userid)
    # 0o755 is valid octal syntax on Python 2.6+ and 3 (bare 0755 is Python 2 only).
    os.mkdir(piddir, 0o755)
    # pylint doesn't understand about getent...
    # pylint: disable=E1101
    os.chown(piddir, userinfo.uid, userinfo.gid)
420 
if __name__ == '__main__':
    # The CMA is Python 2 code and requires Python 2.7 or newer within 2.x.
    major, minor = sys.version_info[0], sys.version_info[1]
    if not (major == 2 and minor >= 7):
        raise RuntimeError('Must be run using python 2.x where x >= 7')
    sys.exit(int(main()))
def make_pid_dir
Definition: cma.py:407
def drop_privileges_permanently
Definition: cma.py:361
void assimilation_openlog(const char *logname)
Open logs in our style (syslog)
Definition: misc.c:192
int kill_pid_service(const char *pidfile, int signal)
kill the service that goes with our current pid file - return negative iff pidfile pid is running and...
Definition: misc.c:470
def supplementary_groups_for_user
Definition: cma.py:348
guint pidrunningstat_to_status(PidRunningStat stat)
Convert PidRunningStat to an exit code for status.
Definition: misc.c:520
void proj_class_incr_debug(const char *Cclass)
Increment debug level for this class and all its subclasses by one.
Definition: proj_classes.c:140
void rmpid_and_exit_on_signal(const char *pidfile, int signal_in)
Remove PID file and exit when a signal is received.
Definition: misc.c:493
void daemonize_me(gboolean stay_in_foreground, const char *dirtorunin, char *pidfile)
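Put this process into the background (cma.py's comments note that it forks) unless stay_in_foreground is set; dirtorunin and pidfile are presumably the directory to run in and the pid file to manage. (The previous brief, "Function to get system name", was attached to the wrong function.)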
Definition: misc.c:90
void remove_pid_file(const char *pidfile)
Remove the pid file that goes with this service iff we created one during this invocation.
Definition: misc.c:461
PidRunningStat are_we_already_running(const char *pidfile, int *pidarg)
See if the pid file suggests we are already running or not.
Definition: misc.c:298
def main
Definition: cma.py:126