The Assimilation Project
cma.py
#!/usr/bin/env python
# vim: smartindent tabstop=4 shiftwidth=4 expandtab number
#
# This file is part of the Assimilation Project.
#
# Copyright (C) 2011, 2012 - Alan Robertson <alanr@unix.sh>
#
# The Assimilation software is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The Assimilation software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
#
'''
 Design outline:

 All incoming network messages are sent to a client that acts as a dispatcher.

 The dispatcher examines each message and computes which queue to send it to,
 based on the message type and contents.

 For death notices, the dispatcher forwards the message to the worker
 assigned to the switch the system is on, if that switch is known;
 otherwise it forwards it to the worker assigned to the subnet.

 Each worker handles one or more rings, probably the per-switch rings
 for a subnet plus the subnet ring itself. It is important to ensure that a ring
 is handled by only one worker; this eliminates locking concerns. When a given
 worker receives a death notice for a drone that is also in higher-level rings,
 it does its work at its own level and also forwards the request to the worker
 handling the higher-level ring. The first subnet worker also handles the work
 for the top-level (global) ring.

 Packets are ACKed by workers only after all work has been completed. For
 a drone on multiple rings, the packet is ACKed only after both rings have been
 fully repaired.

 The reason for this is that until the rings are fully repaired, the system
 might crash before completing its work. Retransmission timeouts will need to
 be set accordingly...

 Although congestion is normally very unlikely, it is reasonably likely during
 a full datacenter power-on, depending on how quickly the servers can be powered
 on without tripping circuit breakers or damaging UPSes
 (it would be good to know how fast hosts can come up in the worst case).


 Misc workers with well-known names:
    Request-To-Create-Ring


 Mappings:

    Drone-related information-------------------------
    NetAddr-to-drone-name
    drone-name to NetAddr
    (drone-name,ifname) to interface-info (including switch info)
    drone-neighbor-info:
        drone-name-to-neighbor-info (drone-name, NetAddr, ring-name)

    Ring-related information--------------------------
    drone-name to ring-name(s)
    ring-names to ring-information (level, #members, etc)
    ring-links-info ??
    Subnet-to-ring-name
    Switch-to-ring-name
    Global-ring-name [TheOneRing]

    Discovery-related information---------------------
    (drone-name, Interface-name) to LLDP/CDP packet
    (drone-name, discovery-type) to JSON info


    Misc Info-----------------------------------------
    NetAddr(MAC)-to-NetAddr(IP)


 Dispatcher logic:
    For now, it sends all requests to TheOneRing because we need to write more code ;-)


################################################################################
#
# It is readily observable that the code is headed in that direction, but it is
# still a long way from that structure...
#
################################################################################
'''


import optparse, time
import os, sys, signal
import cmainit
from assimeventobserver import ForkExecObserver
from AssimCtypes import NOTIFICATION_SCRIPT_DIR, CMAINITFILE, CMAUSERID
from AssimCclasses import pyCompressFrame
from cmaconfig import ConfigFile
import importlib
#import atexit
import getent


optional_modules = [    'discoverylistener'     # NOT OPTIONAL(!)
                    ,   'linkdiscovery'
                    ,   'checksumdiscovery'
                    ,   'monitoringdiscovery'
                    ,   'arpdiscovery'
                    ]
#
# "Main" program starts below...
# It is the real CMA, intended to run with real nanoprobes running
# somewhere out there...
#
# 912: Too many branches, 914: too many local variables, 915: too many statements
#pylint: disable=R0912,R0914,R0915
def main():
    'Main program for the CMA (Collective Management Authority)'
    DefaultPort = 1984
    # VERY Linux-specific - but useful and apparently correct ;-)
    PrimaryIPcmd = \
        "ip address show primary scope global | grep '^ *inet' | sed -e 's%^ *inet *%%' -e 's%/.*%%'"
    ipfd = os.popen(PrimaryIPcmd, 'r')
    OurAddrStr = ('%s:%d' % (ipfd.readline().rstrip(), DefaultPort))
    ipfd.close()

    parser = optparse.OptionParser(prog='CMA', version='0.0.1',
        description='Collective Management Authority for the Assimilation System',
        usage='cma.py [--bind address:port]')

    parser.add_option('-b', '--bind', action='store', default=None, dest='bind'
    ,   metavar='address:port-to-bind-to'
    ,   help='Address:port to listen to - for nanoprobes to connect to')

    parser.add_option('-d', '--debug', action='count', default=0, dest='debug'
    ,   help='enable debug for CMA and libraries - multiple occurrences increase debug value')

    parser.add_option('-s', '--status', action='store_true', default=False, dest='status'
    ,   help='Return status of running CMA')

    parser.add_option('-k', '--kill', action='store_true', default=False, dest='kill'
    ,   help='Shut down running CMA.')

    parser.add_option('-e', '--erasedb', action='store_true', default=False, dest='erasedb'
    ,   help='Erase Neo4J before starting')

    parser.add_option('-f', '--foreground', action='store_true', default=False, dest='foreground'
    ,   help='keep the CMA from going into the background')

    parser.add_option('-p', '--pidfile', action='store', default='/var/run/assimilation/cma'
    ,   dest='pidfile', metavar='pidfile-pathname'
    ,   help='full pathname of where to locate our pid file')

    parser.add_option('-T', '--trace', action='store_true', default=False, dest='doTrace'
    ,   help='Trace CMA execution')

    parser.add_option('-u', '--user', action='store', default=CMAUSERID, dest='userid'
    ,   metavar='userid'
    ,   help='userid to run the CMA as')


    opt = parser.parse_args()[0]

    from AssimCtypes import daemonize_me, assimilation_openlog, are_we_already_running, \
        kill_pid_service, pidrunningstat_to_status, remove_pid_file, rmpid_and_exit_on_signal


    if opt.status:
        rc = pidrunningstat_to_status(are_we_already_running(opt.pidfile, None))
        return rc

    if opt.kill:
        if kill_pid_service(opt.pidfile, 15) < 0:
            print >> sys.stderr, "Unable to stop CMA."
            return 1
        return 0


    # This doesn't seem to work no matter where I invoke it...
    # But if we don't fork in daemonize_me() ('C' code), it works great...
#    def cleanup():
#        remove_pid_file(opt.pidfile)
#    atexit.register(cleanup)
#    signal.signal(signal.SIGTERM, lambda sig, stack: sys.exit(0))
#    signal.signal(signal.SIGINT, lambda sig, stack: sys.exit(0))

    make_pid_dir(opt.pidfile, opt.userid)
    drop_privileges_permanently(opt.userid)
    daemonize_me(opt.foreground, '/', opt.pidfile)

    rmpid_and_exit_on_signal(opt.pidfile, signal.SIGTERM)


    # Next statement can't appear before daemonize_me() or bind() fails -- not quite sure why...
    assimilation_openlog("cma")
    from packetlistener import PacketListener
    from messagedispatcher import MessageDispatcher
    from dispatchtarget import DispatchTarget
    from cmadb import CMAdb
    from monitoring import MonitoringRule
    from AssimCclasses import pyNetAddr, pySignFrame, pyReliableUDP, \
        pyPacketDecoder
    from AssimCtypes import CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR, CONFIGNAME_CMADISCOVER, \
        CONFIGNAME_CMAFAIL, CONFIGNAME_CMAPORT, CONFIGNAME_OUTSIG, CONFIGNAME_COMPRESSTYPE, \
        CONFIGNAME_COMPRESS, \
        proj_class_incr_debug, VERSION_STRING, LONG_LICENSE_STRING, MONRULEINSTALL_DIR
    for debug in range(opt.debug):
        debug = debug
        proj_class_incr_debug(None)

    # Input our monitoring rule templates
    # They only exist in flat files and in memory - they aren't in the database
    MonitoringRule.load_tree(MONRULEINSTALL_DIR)
    print >> sys.stderr, ('Monitoring rules loaded from %s' % MONRULEINSTALL_DIR)

    execobserver_constraints = {
        'nodetype': ['Drone', 'SystemNode', 'IPaddrNode', 'ProcessNode', 'MonitorAction']
    }
    ForkExecObserver(constraints=execobserver_constraints, scriptdir=NOTIFICATION_SCRIPT_DIR)
    print >> sys.stderr, ('Fork/Event observer dispatching from %s' % NOTIFICATION_SCRIPT_DIR)


    if opt.bind is not None:
        OurAddrStr = opt.bind

    OurAddr = pyNetAddr(OurAddrStr)
    if OurAddr.port() == 0:
        OurAddr.setport(DefaultPort)

    try:
        configinfo = ConfigFile(filename=CMAINITFILE)
    except IOError:
        configinfo = ConfigFile()
    if opt.bind is not None:
        bindaddr = pyNetAddr(opt.bind)
        if bindaddr.port() == 0:
            bindaddr.setport(configinfo[CONFIGNAME_CMAPORT])
        configinfo[CONFIGNAME_CMAINIT] = bindaddr
    configinfo[CONFIGNAME_CMADISCOVER] = OurAddr
    configinfo[CONFIGNAME_CMAFAIL] = OurAddr
    configinfo[CONFIGNAME_CMAADDR] = OurAddr
    if CONFIGNAME_COMPRESSTYPE in configinfo:
        configinfo[CONFIGNAME_COMPRESS] \
        =   pyCompressFrame(compression_method=configinfo[CONFIGNAME_COMPRESSTYPE])
    config = configinfo.complete_config()
    config[CONFIGNAME_OUTSIG] = pySignFrame(1)

    addr = config[CONFIGNAME_CMAINIT]
    if addr.port() == 0:
        addr.setport(DefaultPort)
    ourport = addr.port()
    for elem in (CONFIGNAME_CMAINIT, CONFIGNAME_CMAADDR
    ,           CONFIGNAME_CMADISCOVER, CONFIGNAME_CMAFAIL):
        if elem in config:
            config[elem] = pyNetAddr(str(config[elem]), port=ourport)
    io = pyReliableUDP(config, pyPacketDecoder())
    trycount = 0
    while True:
        try:
            cmainit.CMAinit(io, cleanoutdb=opt.erasedb, debug=(opt.debug > 0))
        except RuntimeError:
            print >> sys.stderr, 'TRYING AGAIN...'
            trycount += 1
            if trycount > 300:
                remove_pid_file(opt.pidfile)
                print >> sys.stderr, ('Neo4j still not started - giving up.')
                CMAdb.log.critical('Neo4j still not started - giving up.')
                raise SystemExit(1)
            if (trycount % 60) == 1:
                print >> sys.stderr, ('Waiting for Neo4j to start.')
                CMAdb.log.warning('Waiting for Neo4j to start.')
            # Let's try again in a second...
            time.sleep(1)
            continue
        # Neo4j started. All is well with the world.
        break

    CMAdb.log.info('Listening on: %s' % str(config[CONFIGNAME_CMAINIT]))
    CMAdb.log.info('Requesting return packets sent to: %s' % str(OurAddr))
    if CMAdb.debug:
        CMAdb.log.info('TheOneRing created - id = %s' % CMAdb.TheOneRing)
        CMAdb.log.info('Config Object sent to nanoprobes: %s' % config)

    disp = MessageDispatcher(DispatchTarget.dispatchtable)
    CMAdb.log.info('Starting CMA version %s - licensed under %s'
    %   (VERSION_STRING, LONG_LICENSE_STRING))
    if opt.foreground:
        print >> sys.stderr, ('Starting CMA version %s - licensed under %s'
        %   (VERSION_STRING, LONG_LICENSE_STRING))
    # Important to note that we don't want PacketListener to create its own 'io' object
    # or it will screw up the ReliableUDP protocol...
    listener = PacketListener(config, disp, io=io)
    mandatory_modules = [ 'discoverylistener' ]
    for mandatory in mandatory_modules:
        importlib.import_module(mandatory)
    for optional in config['optional_modules']:
        importlib.import_module(optional)
    if opt.doTrace:
        import trace
        tracer = trace.Trace(count=False, trace=True)
        if CMAdb.debug:
            CMAdb.log.debug(
            'Starting up traced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
            'cma: Starting up traced listener.listen() in foreground; debug=%d' % opt.debug)
        # trace.Trace.run() evaluates its string in __main__'s namespace, where the
        # local 'listener' is not visible; runfunc() calls the bound method directly.
        tracer.runfunc(listener.listen)
    else:
        if CMAdb.debug:
            CMAdb.log.debug(
            'Starting up untraced listener.listen(); debug=%d' % opt.debug)
        if opt.foreground:
            print >> sys.stderr, (
            'cma: Starting up untraced listener.listen() in foreground; debug=%d' % opt.debug)
        listener.listen()
    return 0

def supplementary_groups_for_user(userid):
    '''Return the supplementary groups to which this user would belong
    if they logged in, as a tuple of (groupnamelist, gidlist)
    '''
    namelist = []
    gidlist = []
    for entry in getent.group():
        if userid in entry.members:
            namelist.append(entry.name)
            gidlist.append(entry.gid)
    return (namelist, gidlist)


def drop_privileges_permanently(userid):
    '''
    Drop our privileges permanently and run as the given user with
    the privileges to which they would be entitled if they logged in.
    That is, the uid, gid, and supplementary group list are all set correctly.
    We are careful to make sure we have exactly the permissions we need
    as 'userid'.
    We must be started either as root or as 'userid'; otherwise this function
    raises OSError, which exits the program.
    '''
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise(OSError('Userid "%s" is unknown.' % userid))
    #pylint is confused about the getent.passwd object
    #pylint: disable=E1101
    newuid = userinfo.uid
    #pylint: disable=E1101
    newgid = userinfo.gid
    auxgroups = supplementary_groups_for_user(userid)[1]
    # Need to set supplementary groups, then group id, then user id, in that order.
    try:
        os.setgroups(auxgroups)
        os.setgid(newgid)
        os.setuid(newuid)
    except OSError:
        # We let this fail if it wants to and catch it below.
        # This allows this to work if we're already running as that user id...
        pass
    # Let's see if everything wound up as it should...
    if (os.getuid() != newuid or os.geteuid() != newuid
        or os.getgid() != newgid or os.getegid() != newgid):
        raise OSError('Could not set user/group ids to user "%s" [uid:%s, gid:%s].'
            %   (userid, os.getuid(), os.getgid()))
    # Checking groups is a little more complicated - order is potentially not preserved...
    # This also allows for the case where there might be dups (which shouldn't happen?)
    curgroups = os.getgroups()
    for elem in auxgroups:
        if elem not in curgroups:
            raise OSError('Could not set auxiliary groups for user "%s"' % userid)
    for elem in curgroups:
        # I don't think the default gid is supposed to be in the current group list...
        # but it is in my tests... It should be harmless...
        if elem not in auxgroups and elem != newgid:
            raise OSError('Could not set auxiliary groups for user "%s"' % userid)
    # Hurray! Everything worked!

def make_pid_dir(pidfile, userid):
    'Make a suitable directory for the pidfile'
    piddir = os.path.dirname(pidfile)
    if os.path.isdir(piddir):
        # Assume it's been set up suitably
        return
    os.mkdir(piddir, 0755)
    userinfo = getent.passwd(userid)
    if userinfo is None:
        raise(OSError('Userid "%s" is unknown.' % userid))
    # pylint doesn't understand about getent...
    # pylint: disable=E1101
    os.chown(piddir, userinfo.uid, userinfo.gid)

if __name__ == '__main__':
    pyversion = sys.version_info
    if pyversion[0] != 2 or pyversion[1] < 7:
        raise RuntimeError('Must be run using python 2.x where x >= 7')
    exitrc = main()
    sys.exit(int(exitrc))
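
The design outline's routing rule for death notices (prefer the worker that owns the drone's switch ring, otherwise fall back to the subnet worker) can be pictured as a small lookup table. This is a minimal sketch only: `RingWorkers`, `route_death_notice`, and the string worker names are hypothetical and do not exist in the CMA, which, per the outline, currently sends everything to TheOneRing.

```python
class RingWorkers(object):
    '''Hypothetical routing table in which each ring has exactly one owner.

    Giving every ring a single owning worker is what lets the design avoid
    locking: only that worker ever modifies the ring.
    '''

    def __init__(self, switch_workers, subnet_workers):
        self.switch_workers = dict(switch_workers)  # switch id -> worker name
        self.subnet_workers = dict(subnet_workers)  # subnet -> worker name

    def route_death_notice(self, switch, subnet):
        '''Return the worker that should receive a death notice first.

        Use the switch's worker when the drone's switch is known and has an
        assigned worker; otherwise use the worker assigned to the subnet.
        '''
        if switch is not None and switch in self.switch_workers:
            return self.switch_workers[switch]
        return self.subnet_workers[subnet]
```

In the outlined design, escalation to higher-level rings is then the receiving worker's job (it forwards the request upward), not a second decision made by the dispatcher.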
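
The outline's ACK rule (a death notice for a drone on several rings is acknowledged only after every one of those rings has been repaired) amounts to tracking the set of rings still outstanding, so that a crash part-way through leaves the packet un-ACKed and subject to retransmission. A minimal sketch; `PendingAck` and the ring names used with it are hypothetical.

```python
class PendingAck(object):
    '''Hypothetical tracker for one death notice that spans several rings.'''

    def __init__(self, rings):
        self.outstanding = set(rings)  # rings not yet repaired

    def ring_repaired(self, ring):
        '''Record that one ring has been repaired.

        Returns True only for the report that clears the last outstanding
        ring, so the caller sends exactly one ACK.
        '''
        if ring not in self.outstanding:
            return False  # unknown or already-reported ring
        self.outstanding.remove(ring)
        return not self.outstanding
```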
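
main() finds its default address by piping `ip address show primary scope global` through grep and sed and keeping the first line of output. The same extraction can be done in Python on the command's text output. This sketch assumes the usual iproute2 `inet ADDR/PREFIX ...` line format, and `primary_addr_string` is a hypothetical helper, not part of the CMA. Unlike the `'^ *inet'` grep pattern, the regular expression requires whitespace after `inet`, so `inet6` lines are skipped.

```python
import re

# Assumed iproute2 line format: "    inet 192.0.2.10/24 brd ... scope global eth0".
# Requiring whitespace after "inet" keeps "inet6" lines from matching.
_INET_LINE = re.compile(r'^\s*inet\s+([0-9.]+)')


def primary_addr_string(ip_output, default_port=1984):
    '''Return "address:port" built from the first IPv4 "inet" line, or None.'''
    for line in ip_output.splitlines():
        match = _INET_LINE.match(line)
        if match:
            return '%s:%d' % (match.group(1), default_port)
    return None
```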
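
The wait-for-Neo4j loop in main() retries `cmainit.CMAinit()` once a second, warns on the 1st, 61st, 121st, ... failure, and gives up once more than 300 attempts have failed. Factoring that policy into a function with injectable `warn` and `sleep` callables makes it testable. A sketch under those assumptions: `call_with_retries` is a hypothetical name, and unlike the CMA it does not remove the pid file or log before exiting.

```python
import time


def call_with_retries(init, max_failures=300, warn_every=60,
                      warn=None, sleep=time.sleep):
    '''Call init() until it returns without raising RuntimeError.

    Mirrors the CMA's wait-for-Neo4j policy: sleep one second between
    attempts, call warn(failures) on the 1st, (warn_every + 1)th, ...
    failure, and raise SystemExit(1) once more than max_failures attempts
    have failed. Returns the number of failures before success.
    '''
    failures = 0
    while True:
        try:
            init()
        except RuntimeError:
            failures += 1
            if failures > max_failures:
                raise SystemExit(1)
            if warn is not None and failures % warn_every == 1:
                warn(failures)
            sleep(1)  # try again in a second
            continue
        return failures
```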
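
supplementary_groups_for_user() relies on the non-standard `getent` module. With only the standard library, the same scan can be made over `grp.getgrall()`, whose entries expose `gr_name`, `gr_gid`, and `gr_mem`. A sketch: the `groups` parameter and the `GroupEntry` record type are hypothetical additions so it can be tested without the system group database.

```python
import grp
from collections import namedtuple

# Hypothetical stand-in record with the same field names as grp.struct_group.
GroupEntry = namedtuple('GroupEntry', 'gr_name gr_gid gr_mem')


def supplementary_groups(userid, groups=None):
    '''Return (names, gids) for every group that lists userid as a member.

    Reads the system group database via grp.getgrall() unless `groups`
    is supplied. Only explicit memberships count; the user's primary
    group from the passwd entry is not added.
    '''
    if groups is None:
        groups = grp.getgrall()
    names, gids = [], []
    for entry in groups:
        if userid in entry.gr_mem:
            names.append(entry.gr_name)
            gids.append(entry.gr_gid)
    return names, gids
```

On Python 3.3 and later, `os.getgrouplist(user, group)` returns a comparable list of gids, but it also includes the `group` argument itself, which is normally the user's primary gid.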
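
drop_privileges_permanently() calls `os.setgroups()`, `os.setgid()`, and `os.setuid()` in that order, because once the uid is dropped an unprivileged process can no longer change its groups; it then verifies the result. That verification is easier to read and test as a pure function over the observed ids. `identity_problems` is a hypothetical helper applying the same rules: real and effective uid/gid must match the target, every supplementary gid must be present, and nothing else may be present except the primary gid.

```python
def identity_problems(newuid, newgid, auxgroups, real_ids, effective_ids, curgroups):
    '''List what does not match the target identity (empty list means success).

    real_ids and effective_ids are (uid, gid) tuples. curgroups is the
    current supplementary group list; order and duplicates do not matter,
    and the primary gid newgid is tolerated in it.
    '''
    problems = []
    if real_ids != (newuid, newgid) or effective_ids != (newuid, newgid):
        problems.append('uid/gid')
    wanted = set(auxgroups)
    current = set(curgroups)
    # Every wanted group must be present, and no unexpected group may remain.
    if not wanted <= current or not (current - set([newgid])) <= wanted:
        problems.append('groups')
    return problems
```

In the CMA, the observed values would come from `(os.getuid(), os.getgid())`, `(os.geteuid(), os.getegid())`, and `os.getgroups()`.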
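
make_pid_dir() creates the pid directory with `os.mkdir(piddir, 0755)`. The mode passed to mkdir is filtered through the process umask, so a restrictive umask yields tighter permissions than requested. A sketch that applies the mode explicitly with `os.chmod()`; `ensure_pid_dir` and its return values are hypothetical, and it takes a numeric uid/gid instead of looking up a user name.

```python
import os
import stat


def ensure_pid_dir(pidfile, uid, gid, mode=0o755):
    '''Create the directory that will hold pidfile, if it does not exist.

    Returns None when the directory already exists (it is left as-is,
    like make_pid_dir()); otherwise returns the permission bits it ended up with.
    '''
    piddir = os.path.dirname(pidfile)
    if os.path.isdir(piddir):
        return None
    os.mkdir(piddir, mode)
    os.chmod(piddir, mode)      # mkdir's mode is filtered by the umask; chmod is not
    os.chown(piddir, uid, gid)  # hand the directory to the (unprivileged) CMA user
    return stat.S_IMODE(os.stat(piddir).st_mode)
```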