The Assimilation Project
cma.py
#!/usr/bin/env python
# vim: smartindent tabstop=4 shiftwidth=4 expandtab number
#
# This file is part of the Assimilation Project.
#
# Copyright (C) 2011, 2012 - Alan Robertson <alanr@unix.sh>
#
# The Assimilation software is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The Assimilation software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
#
'''
    Design outline:

    All incoming network messages come in and get sent to a client who is a dispatcher.

    The dispatcher looks at the message type and computes which queue to send the
    message to based on the message type and contents.

        For death notices, the dispatcher forwards the message to the worker
        assigned to the switch the system is on - if known, or the worker
        assigned to the subnet.

    Each worker handles one or more rings - probably handling the per-switch rings
    for a subnet and the subnet ring as well. It is important to ensure that a ring
    is handled by only one worker. This eliminates locking concerns. When a given
    worker receives a death notice for a drone that is also in higher-level rings,
    it does its work at its level and also forwards the request to the worker handling
    the higher-level ring. The first subnet worker will also handle the work
    for the top-level (global) ring.

    Packets are ACKed by workers after all work has been completed. In the case of
    a drone on multiple rings, it is only ACKed after both rings have been fully
    repaired.

    The reason for this is that until it is fully repaired, the system might crash
    before completing its work. Retransmission timeouts will need to be set
    accordingly...

    Although congestion is normally very unlikely, this is not true for full
    datacenter powerons - where it is reasonably likely - depending on how
    quickly one can power on the servers and not pop circuit breakers or
    damage UPSes
    (it would be good to know how fast hosts can come up worst case).


    Misc Workers with well-known-names
        Request-To-Create-Ring


    Mappings:

    Drone-related information-------------------------
        NetAddr-to-drone-name
        drone-name to NetAddr
        (drone-name,ifname) to interface-info (including switch info)
        drone-neighbor-info:
            drone-name-to-neighbor-info (drone-name, NetAddr, ring-name)

    Ring-related information--------------------------
        drone-name to ring-name(s)
        ring-names to ring-information (level, #members, etc)
        ring-links-info ??
        Subnet-to-ring-name
        Switch-to-ring-name
        Global-ring-name [TheOneRing]

    Discovery-related information---------------------
        (drone-name, Interface-name) to LLDP/CDP packet
        (drone-name, discovery-type) to JSON info


    Misc Info-----------------------------------------
        NetAddr(MAC)-to-NetAddr(IP)


    Dispatcher logic:
        For now sends all requests to TheOneRing because we need to write more code ;-)


################################################################################
#
# It is readily observable that the code is headed that way, but is a long
# way from that structure...
#
################################################################################
'''


import optparse, time
import os, sys, signal
import cmainit
from assimeventobserver import ForkExecObserver
from AssimCtypes import NOTIFICATION_SCRIPT_DIR, CMAINITFILE, CMAUSERID
from AssimCclasses import pyCompressFrame
from cmaconfig import ConfigFile
import importlib
#import atexit
import getent


optional_modules = [    'discoverylistener' # NOT OPTIONAL(!)
                   ,    'linkdiscovery'
                   ,    'checksumdiscovery'
                   ,    'monitoringdiscovery'
                   ,    'arpdiscovery'
                   ]
#
# "Main" program starts below...
# It is the real CMA, intended to run with some real nanoprobes running
# somewhere out there...
#
# 912: Too many branches, 914: too many local variables, 915: too many statements
#pylint: disable=R0912,R0914,R0915
def main():
    'Main program for the CMA (Collective Management Authority)'
    DefaultPort = 1984
    # VERY Linux-specific - but useful and apparently correct ;-)
    PrimaryIPcmd = \
    "ip address show primary scope global | grep '^ *inet' | sed -e 's%^ *inet *%%' -e 's%/.*%%'"
    ipfd = os.popen(PrimaryIPcmd, 'r')
    OurAddrStr = ('%s:%d' % (ipfd.readline().rstrip(), DefaultPort))
    ipfd.close()

    parser = optparse.OptionParser(prog='CMA', version='0.0.1',
        description='Collective Management Authority for the Assimilation System',
        usage='cma.py [--bind address:port]')

    parser.add_option('-b', '--bind', action='store', default=None, dest='bind'
    ,   metavar='address:port-to-bind-to'
    ,   help='Address:port to listen to - for nanoprobes to connect to')

    parser.add_option('-d', '--debug', action='count', default=0, dest='debug'
    ,   help='enable debug for CMA and libraries - multiple occurrences increase debug value')

    parser.add_option('-s', '--status', action='store_true', default=False, dest='status'
    ,   help='Return status of running CMA')

    parser.add_option('-k', '--kill', action='store_true', default=False, dest='kill'
    ,   help='Shut down running CMA.')

    parser.add_option('-e', '--erasedb', action='store_true', default=False, dest='erasedb'
    ,   help='Erase Neo4J before starting')

    parser.add_option('-f', '--foreground', action='store_true', default=False, dest='foreground'
    ,   help='keep the CMA from going into the background')

    parser.add_option('-p', '--pidfile', action='store', default='/var/run/assimilation/cma'
    ,   dest='pidfile', metavar='pidfile-pathname'
    ,   help='full pathname of where to locate our pid file')

    parser.add_option('-T', '--trace', action='store_true', default=False, dest='doTrace'
    ,   help='Trace CMA execution')

    parser.add_option('-u', '--user', action='store', default=CMAUSERID, dest='userid'
    ,   metavar='userid'
    ,   help='userid to run the CMA as')


    opt = parser.parse_args()[0]

    from AssimCtypes import daemonize_me, assimilation_openlog, are_we_already_running, \
        kill_pid_service, pidrunningstat_to_status, remove_pid_file, rmpid_and_exit_on_signal


    if opt.status:
        rc = pidrunningstat_to_status(are_we_already_running(opt.pidfile, None))
        return rc

    if opt.kill:
        if kill_pid_service(opt.pidfile, 15) < 0:
            print >> sys.stderr, "Unable to stop CMA."
            return 1
        return 0


    # This doesn't seem to work no matter where I invoke it...
    # But if we don't fork in daemonize_me() ('C' code), it works great...
#    def cleanup():
#        remove_pid_file(opt.pidfile)
#    atexit.register(cleanup)
#    signal.signal(signal.SIGTERM, lambda sig, stack: sys.exit(0))
#    signal.signal(signal.SIGINT, lambda sig, stack: sys.exit(0))

    from cmadb import CMAdb
    CMAdb.running_under_docker()
    make_pid_dir(opt.pidfile, opt.userid)
    drop_privileges_permanently(opt.userid)
    daemonize_me(opt.foreground, '/', opt.pidfile)

    rmpid_and_exit_on_signal(opt.pidfile, signal.SIGTERM)


    # Next statement can't appear before daemonize_me() or bind() fails -- not quite sure why...
    assimilation_openlog("cma")
    from packetlistener import PacketListener
    from messagedispatcher import MessageDispatcher
    from dispatchtarget import DispatchTarget
    from monitoring import MonitoringRule
    from AssimCclasses import pyNetAddr, pySignFrame, pyReliableUDP, \
        pyPacketDecoder
    from AssimCtypes import CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR, CONFIGNAME_CMADISCOVER, \
        CONFIGNAME_CMAFAIL, CONFIGNAME_CMAPORT, CONFIGNAME_OUTSIG, CONFIGNAME_COMPRESSTYPE, \
        CONFIGNAME_COMPRESS, \
        proj_class_incr_debug, VERSION_STRING, LONG_LICENSE_STRING, MONRULEINSTALL_DIR
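    for debug in range(opt.debug):
        debug = debug
        # Raise the debug level once per -d flag.
        # The None argument is an assumption here (taken to mean "every class").
        proj_class_incr_debug(None)
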
    # Input our monitoring rule templates
    # They only exist in flat files and in memory - they aren't in the database
    MonitoringRule.load_tree(MONRULEINSTALL_DIR)
    print >> sys.stderr, ('Monitoring rules loaded from %s' % MONRULEINSTALL_DIR)

    execobserver_constraints = {
        'nodetype': ['Drone', 'SystemNode', 'IPaddrNode', 'ProcessNode', 'MonitorAction']
    }
    ForkExecObserver(constraints=execobserver_constraints, scriptdir=NOTIFICATION_SCRIPT_DIR)
    print >> sys.stderr, ('Fork/Event observer dispatching from %s' % NOTIFICATION_SCRIPT_DIR)


    if opt.bind is not None:
        OurAddrStr = opt.bind

    OurAddr = pyNetAddr(OurAddrStr)
    if OurAddr.port() == 0:
        OurAddr.setport(DefaultPort)

    try:
        configinfo = ConfigFile(filename=CMAINITFILE)
    except IOError:
        configinfo = ConfigFile()
    if opt.bind is not None:
        bindaddr = pyNetAddr(opt.bind)
        if bindaddr.port() == 0:
            # Look the port up in the config instance; the ConfigFile class
            # itself cannot be subscripted.
            bindaddr.setport(configinfo[CONFIGNAME_CMAPORT])
        configinfo[CONFIGNAME_CMAINIT] = bindaddr
    configinfo[CONFIGNAME_CMADISCOVER] = OurAddr
    configinfo[CONFIGNAME_CMAFAIL] = OurAddr
    configinfo[CONFIGNAME_CMAADDR] = OurAddr
    if (CONFIGNAME_COMPRESSTYPE in configinfo):
        configinfo[CONFIGNAME_COMPRESS] \
        =   pyCompressFrame(compression_method=configinfo[CONFIGNAME_COMPRESSTYPE])
    config = configinfo.complete_config()
    config[CONFIGNAME_OUTSIG] = pySignFrame(1)

    addr = config[CONFIGNAME_CMAINIT]
    if addr.port() == 0:
        addr.setport(DefaultPort)
    ourport = addr.port()
    for elem in (CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR
    ,       CONFIGNAME_CMADISCOVER, CONFIGNAME_CMAFAIL):
        if elem in config:
            config[elem] = pyNetAddr(str(config[elem]), port=ourport)
    io = pyReliableUDP(config, pyPacketDecoder())
    trycount = 0
    while True:
        try:
            cmainit.CMAinit(io, cleanoutdb=opt.erasedb, debug=(opt.debug > 0))
        except RuntimeError:
            print >> sys.stderr, 'TRYING AGAIN...'
            trycount += 1
            if trycount > 300:
                remove_pid_file(opt.pidfile)
                print >> sys.stderr, ('Neo4j still not started - giving up.')
                CMAdb.log.critical('Neo4j still not started - giving up.')
                raise SystemExit(1)
            if (trycount % 60) == 1:
                print >> sys.stderr, ('Waiting for Neo4j to start.')
                CMAdb.log.warning('Waiting for Neo4j to start.')
            # Let's try again in a second...
            time.sleep(1)
            continue
        # Neo4j started. All is well with the world.
        break

    CMAdb.log.info('Listening on: %s' % str(config[CONFIGNAME_CMAINIT]))
    CMAdb.log.info('Requesting return packets sent to: %s' % str(OurAddr))
    if CMAdb.debug:
        CMAdb.log.info('TheOneRing created - id = %s' % CMAdb.TheOneRing)
        CMAdb.log.info('Config Object sent to nanoprobes: %s' % config)

    disp = MessageDispatcher(DispatchTarget.dispatchtable)
    CMAdb.log.info('Starting CMA version %s - licensed under %s'
    %   (VERSION_STRING, LONG_LICENSE_STRING))
    if opt.foreground:
        print >> sys.stderr, ('Starting CMA version %s - licensed under %s'
        %   (VERSION_STRING, LONG_LICENSE_STRING))
    # Important to note that we don't want PacketListener to create its own 'io' object
    # or it will screw up the ReliableUDP protocol...
    listener = PacketListener(config, disp, io=io)
    mandatory_modules = [ 'discoverylistener' ]
    for mandatory in mandatory_modules:
        importlib.import_module(mandatory)
    for optional in config['optional_modules']:
        importlib.import_module(optional)
    if opt.doTrace:
        import trace
        tracer = trace.Trace(count=False, trace=True)
        if CMAdb.debug:
            CMAdb.log.debug(
            'Starting up traced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
            'cma: Starting up traced listener.listen() in foreground; debug=%d' % opt.debug)
        # runfunc() calls the bound method directly; run() would evaluate its
        # command string in __main__, where the local 'listener' is not visible.
        tracer.runfunc(listener.listen)
    else:
        if CMAdb.debug:
            CMAdb.log.debug(
            'Starting up untraced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
            'cma: Starting up untraced listener.listen() in foreground; debug=%d' % opt.debug)
        listener.listen()
    return 0

def supplementary_groups_for_user(userid):
    '''Return the list of supplementary groups to which this member
    would belong if they logged in as a tuple of (groupnamelist, gidlist)
    '''
    namelist=[]
    gidlist=[]
    for entry in getent.group():
        if userid in entry.members:
            namelist.append(entry.name)
            gidlist.append(entry.gid)
    return (namelist, gidlist)


def drop_privileges_permanently(userid):
    '''
    Drop our privileges permanently and run as the given user with
    the privileges to which they would be entitled if they logged in.
    That is, the uid, gid, and supplementary group list are all set correctly.
    We are careful to make sure we have exactly the permissions we need
    as 'userid'.
    Either we need to be started as root or as 'userid' or this function
    will fail and exit the program.
    '''
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise(OSError('Userid "%s" is unknown.' % userid))
    #pylint is confused about the getent.passwd object
    #pylint: disable=E1101
    newuid = userinfo.uid
    #pylint: disable=E1101
    newgid = userinfo.gid
    auxgroups = supplementary_groups_for_user(userid)[1]
    # Need to set supplementary groups, then group id then user id in that order.
    try:
        os.setgroups(auxgroups)
        os.setgid(newgid)
        os.setuid(newuid)
    except OSError:
        # We let this fail if it wants to and catch it below.
        # This allows this to work if we're already running as that user id...
        pass
    # Let's see if everything wound up as it should...
    if (os.getuid() != newuid or os.geteuid() != newuid
    or  os.getgid() != newgid or os.getegid() != newgid):
        raise OSError('Could not set user/group ids to user "%s" [uid:%s, gid:%s].'
        %   (userid, os.getuid(), os.getgid()))
    # Checking groups is a little more complicated - order is potentially not preserved...
    # This also allows for the case where there might be dups (which shouldn't happen?)
    curgroups = os.getgroups()
    for elem in auxgroups:
        if elem not in curgroups:
            raise OSError('Could not set auxiliary groups for user "%s"' % userid)
    for elem in curgroups:
        # I don't think the default gid is supposed to be in the current group list...
        # but it is in my tests... It should be harmless...
        if elem not in auxgroups and elem != newgid:
            raise OSError('Could not set auxiliary groups for user "%s"' % userid)
    # Hurray! Everything worked!

def make_pid_dir(pidfile, userid):
    'Make a suitable directory for the pidfile'
    piddir = os.path.dirname(pidfile)
    if os.path.isdir(piddir):
        # Assume it's been set up suitably
        return
    os.mkdir(piddir, 0755)
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise(OSError('Userid "%s" is unknown.' % userid))
    # pylint doesn't understand about getent...
    # pylint: disable=E1101
    os.chown(piddir, userinfo.uid, userinfo.gid)

if __name__ == '__main__':
    pyversion = sys.version_info
    if pyversion[0] != 2 or pyversion[1] < 7:
        raise RuntimeError('Must be run using python 2.x where x >= 7')
    exitrc = main()
    sys.exit(int(exitrc))
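
The design outline in the module docstring describes a dispatcher that sends each death notice to the worker owning the drone's switch ring if the switch is known, and otherwise to the worker owning its subnet ring. The listing itself still sends everything to TheOneRing, so the following is only a minimal sketch of that routing rule. The class `RingDispatcher`, its methods, and the dictionary-shaped messages with `switch` and `subnet` keys are all hypothetical names invented for illustration; none of them come from the Assimilation code.

```python
from collections import deque


class RingDispatcher(object):
    """Hypothetical sketch: route each message to the one worker that owns its ring."""

    def __init__(self):
        self.switch_worker = {}  # switch name -> worker name
        self.subnet_worker = {}  # subnet name -> worker name
        self.queues = {}         # worker name -> queue of pending messages

    def assign(self, worker, switches=(), subnets=()):
        """Record which switch and subnet rings a worker is responsible for."""
        self.queues.setdefault(worker, deque())
        for switch in switches:
            self.switch_worker[switch] = worker
        for subnet in subnets:
            self.subnet_worker[subnet] = worker

    def route(self, msg):
        """Prefer the switch worker when the switch is known, else the subnet worker."""
        worker = self.switch_worker.get(msg.get('switch'))
        if worker is None:
            worker = self.subnet_worker.get(msg.get('subnet'))
        if worker is None:
            raise KeyError('no worker assigned for message %r' % (msg,))
        return worker

    def dispatch(self, msg):
        """Queue the message for its worker and return that worker's name."""
        worker = self.route(msg)
        self.queues[worker].append(msg)
        return worker
```

Because every switch and subnet maps to exactly one worker, no two workers ever touch the same ring, which is the property the outline relies on to avoid locking.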
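
The Neo4j startup loop in main() retries CMAinit once a second and gives up after more than 300 failures. The same bounded-retry pattern can be isolated as a small helper, sketched below. The function name `retry_until_ready` and its parameters are hypothetical; the injectable `sleep` argument is an addition made here so the loop can be exercised without real delays.

```python
import time


def retry_until_ready(start, max_tries=300, delay=1.0, sleep=time.sleep):
    """Call start() until it stops raising RuntimeError.

    Returns the number of failed attempts before success. Once more than
    max_tries attempts have failed, the last RuntimeError is re-raised.
    """
    failures = 0
    while True:
        try:
            start()
        except RuntimeError:
            failures += 1
            if failures > max_tries:
                raise
            # Wait a little, then try again.
            sleep(delay)
            continue
        return failures
```

A caller would pass something like `lambda: cmainit.CMAinit(io, ...)` as `start` and handle the final RuntimeError by removing the pid file and exiting, as main() does.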