The Assimilation Project  based on Assimilation version 1.1.7.1474836767
assimeventobserver.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: smartindent tabstop=4 shiftwidth=4 expandtab number colorcolumn=100
3 #
4 # This file is part of the Assimilation Project.
5 #
6 # Author: Alan Robertson <alanr@unix.sh>
7 # Copyright (C) 2014 - Assimilation Systems Limited
8 #
9 # Free support is available from the Assimilation Project community - http://assimproj.org
10 # Paid support is available from Assimilation Systems Limited - http://assimilationsystems.com
11 #
12 # The Assimilation software is free software: you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation, either version 3 of the License, or
15 # (at your option) any later version.
16 #
17 # The Assimilation software is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
21 #
22 # You should have received a copy of the GNU General Public License
23 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
24 #
25 #
26 '''
27 This module implements observer classes associated with Events in the Assimilation Project.
28 The base class of these various classes is the abstract class AssimEventObserver.
29 '''
30 
31 import os, signal, subprocess, sys, fcntl, tempfile
32 from AssimCtypes import NOTIFICATION_SCRIPT_DIR, setpipebuf
33 from AssimCclasses import pyConfigContext, pyNetAddr
34 from assimevent import AssimEvent
35 from assimjson import JSONtree
36 
# Module-wide tracing switch: set to True to get verbose diagnostics on stderr.
# (The original assigned True and then immediately overwrote it with False.)
DEBUG = False
39 
40 #R0903: 35,0:AssimEventObserver: Too few public methods (1/2)
41 # pylint: disable=R0903
class AssimEventObserver(object):
    '''This class is an abstract base class which is all about observing AssimEvents.
    Our subclasses presumably know what to do with these events: they implement
    notifynewevent(), typically after filtering events through is_interesting().
    '''

    # Strings implement __contains__ (substring search), but as constraints they are
    # scalars and must be compared for equality.
    # This is (str, unicode) on Python 2 and (str, bytes) on Python 3.
    _STRINGTYPES = (str, bytes, type(u''))

    def __init__(self, constraints):
        '''Initializer for AssimEventObserver class.

        Parameters:
        -----------
        constraints: dict-like *or* callable() returning bool *or* None
            A dict describing our desired events. The constraints in the dict are
            effectively ANDed together. Each key is an attribute name in either
            the event itself or its associated object. The value associated with
            each attribute is either a scalar value (strings count as scalars) or
            a list/tuple/set/anything else implementing __contains__.
            A scalar value implies that the attribute *must* have exactly
            that value, otherwise it must be *in* the associated list/tuple/etc.

            This should be able to constrain the type of event we're looking at, the
            type of event-object we're looking at, and the domain of the event-object -
            and lots of other potentially useful things.

            See the "is_interesting" method below for implementation details...

            If 'constraints' is a callable (that is, callable(constraints) is True), then
            we will just call constraints(event) to see if the event is interesting to
            this observer. Whatever 'constraints' returns will be interpreted in a
            boolean context - so returning a bool would be a good idea...

            If 'constraints' is None, every event is considered interesting.
        '''
        self.constraints = constraints
        AssimEvent.registerobserver(self)

    def notifynewevent(self, event):
        '''We get called when a new AssimEvent has occurred that we might want to observe.
        But we are an abstract base class so we error out with NotImplementedError every time!
        '''
        raise NotImplementedError('AssimEventObserver is an abstract base class')

    # [R0912:AssimEventObserver.is_interesting] Too many branches (13/12)
    # This is triggered largely by the if DEBUG statements...
    # pylint: disable=R0912
    def is_interesting(self, event):
        '''Return True if the given event conforms to our constraints. That is, would it
        be interesting to our observers?

        Parameters:
        -----------
        event: AssimEvent
            The event we're evaluating to see if our listeners want to hear about it.

        Returns:
        --------
        True when there are no constraints or every constraint is satisfied, False when
        any constraint is violated. For callable constraints, whatever the callable returns.
        '''
        if DEBUG:
            print >> sys.stderr, 'is_interesting(%s, %s)?' % (self.constraints, event.eventtype)
        if self.constraints is None:
            return True
        if callable(self.constraints):
            return self.constraints(event)
        for attr in self.constraints:
            value = AssimEventObserver.getvalue(event, attr)
            if DEBUG:
                print >> sys.stderr, 'VALUE of attr %s is %s' % (attr, value)
            if value is None:
                # @FIXME: Is this the right treatment of no-such-value (None)?
                # As written, a missing (or None) attribute never disqualifies an event.
                continue
            constraint = self.constraints[attr]
            if DEBUG:
                # The one-element tuple keeps a tuple-valued constraint from being
                # mistaken for multiple format arguments.
                print >> sys.stderr, 'CONSTRAINT is %s' % (constraint,)
            is_collection = (hasattr(constraint, '__contains__')
                             and not isinstance(constraint, self._STRINGTYPES))
            if is_collection:
                if value not in constraint:
                    if DEBUG:
                        print >> sys.stderr, 'Event is not interesting(1)', value, constraint
                    return False
            elif value != constraint:
                if DEBUG:
                    print >> sys.stderr, 'Event is not interesting(2)', value, constraint
                return False
        if DEBUG:
            print >> sys.stderr, 'Event %s IS interesting' % event.eventtype
        return True

    @staticmethod
    def getvalue(event, attr):
        '''Helper function to return the value of a constraint expression.

        Lookup order: an attribute of the event itself, then an attribute of
        event.associatedobject, then event.associatedobject.get(attr).
        Returns None if none of these produces a value.
        '''
        if hasattr(event, attr):
            return getattr(event, attr)
        aobj = event.associatedobject
        if hasattr(aobj, attr):
            return getattr(aobj, attr)
        try:
            return aobj.get(attr)
        except AttributeError:
            # The associated object has no usable get() - treat as "no such value"
            return None
136 
137 
class FIFOEventObserver(AssimEventObserver):
    '''Objects in this class send JSON messages to a FIFO when events they are interested in
    are observed. Each message encapsulates a single event, and is followed by a single
    NUL (zero) byte. If the len(JSON) is 100, then 101 bytes are written to the
    FIFO, with the last being a single NUL byte (as noted in the previous sentence).
    '''

    NULstr = chr(0) # Message terminator; notifynewevent() converts to bytes when needed

    def __init__(self, FIFOwritefd, constraints=None, maxerrcount=None):
        '''Initializer for FIFO EventObserver class.

        Parameters:
        -----------
        FIFOwritefd: int
            a UNIX file descriptor pointing to the FIFO where event observers are listening...
        constraints: dict-like, callable or None
            Same as AssimEventObserver's constraints parameter.
        maxerrcount: int or None
            If not None, the base-class ioerror() unregisters this observer once more than
            this many consecutive write errors have occurred.
        '''
        self.FIFOwritefd = FIFOwritefd
        self.constraints = constraints
        self.errcount = 0
        self.maxerrcount = maxerrcount
        # We want a big buffer in the FIFO between us and our clients - they might be slow
        # 4 MB ought to be plenty. Most events are only a few hundred bytes...
        # NOTE(review): setpipebuf() is treated as returning the resulting buffer size.
        pipebufsize = setpipebuf(FIFOwritefd, 4096*1024)
        if pipebufsize < (1024*1024):
            pipebufsize = setpipebuf(FIFOwritefd, 1024*1024)
        # Complain if we don't have at least 1 MB
        if pipebufsize < 1024*1024:
            print ('WARNING: pipe buffer size is only %s bytes' % pipebufsize)
        self.pipebufsize = pipebufsize
        # We don't want to hang around if we can't send out an event
        ndelay = getattr(os, 'O_NDELAY', None)
        if ndelay is None:
            ndelay = getattr(os, 'FNDELAY', None)
        if ndelay is not None:
            # OR the flag into the current status flags rather than overwriting them
            curflags = fcntl.fcntl(FIFOwritefd, fcntl.F_GETFL)
            fcntl.fcntl(FIFOwritefd, fcntl.F_SETFL, curflags | ndelay)
        AssimEventObserver.__init__(self, constraints)

    def notifynewevent(self, event):
        '''We get called when a new AssimEvent has occurred that we might want to observe.
        When we get the call, we write a NUL-terminated JSON blob to our FIFO file descriptor.

        A successful write resets our consecutive-error count; an OSError increments it
        and hands the event to ioerror().
        '''
        # @TODO add the host name that's reporting the problem if it's a monitor action
        # We have the address the report came from, but it's an IP address, not a host name
        if not self.is_interesting(event):
            return

        json = str(JSONtree(event))
        jsonlen = len(json)
        json += FIFOEventObserver.NULstr
        if not isinstance(json, bytes):
            # os.write() needs bytes; this is a no-op on Python 2 where str is bytes
            json = json.encode('utf-8')
        try:
            if DEBUG:
                print >> sys.stderr, '*************SENDING EVENT (%d bytes)' % (jsonlen+1)
            # NOTE(review): the FIFO is non-blocking, so a short write is possible for
            # very large events; the byte count returned by os.write() is not checked.
            os.write(self.FIFOwritefd, json)
            self.errcount = 0
            if DEBUG:
                print >> sys.stderr, '*************EVENT SENT (%d bytes)' % (jsonlen+1)
        except OSError as e:
            if DEBUG:
                print >> sys.stderr, '+++++++++++++++++EVENT FIFO write error: %s' % str(e)
            self.errcount += 1
            self.ioerror(event)

    def ioerror(self, _unusedevent):
        '''This function gets called when we get an I/O error writing to the FIFO.
        This is likely an EPIPE (broken pipe) error.
        We stop observing once more than 'maxerrcount' consecutive errors have occurred
        (never, if maxerrcount is None).
        '''
        if self.maxerrcount is not None and self.errcount > self.maxerrcount:
            AssimEvent.unregisterobserver(self)
207 
class ForkExecObserver(FIFOEventObserver):
    '''Objects in this class execute scripts when events they are interested in
    are observed. Note that these events come to us through a pipe
    that we create, but is written to by our base class FIFOEventObserver...
    The scripts are run by a forked child process which reads the other end of the pipe.
    '''

    def __init__(self, constraints=None, scriptdir=None):
        '''Initializer for ForkExecObserver class.

        Parameters:
        -----------
        constraints: dict
            Same as AssimEventObserver's constraints parameter.
        scriptdir: str
            The directory where our scripts are found. We execute them all whenever an
            event of the selected type occurs. Defaults to NOTIFICATION_SCRIPT_DIR.

        Raises:
        -------
        ValueError if scriptdir is not a directory.
        '''
        if scriptdir is None:
            scriptdir = NOTIFICATION_SCRIPT_DIR
        if not os.path.isdir(scriptdir):
            raise ValueError('Script directory [%s] is not a directory' % scriptdir)
        self.scriptdir = scriptdir
        pipefds = os.pipe()
        self.FIFOreadfd = pipefds[0]
        FIFOEventObserver.__init__(self, pipefds[1], constraints)
        self.childpid = os.fork()
        if self.childpid == 0:
            # Child: dispatch events to scripts; listenforevents() does not return
            self.listenforevents()
        else:
            # Parent: we only ever write to the pipe
            os.close(self.FIFOreadfd)
            self.FIFOreadfd = -1

    def ioerror(self, event):
        '''Re-initialize (respawn) our child in response to an I/O error, then retry
        sending the event once.'''

        if DEBUG:
            print >> sys.stderr, '**********Reinitializing child EVENT process'
        if self.childpid > 0:
            try:
                os.kill(self.childpid, signal.SIGKILL)
                # Reap the child so respawns don't accumulate zombie processes
                os.waitpid(self.childpid, 0)
            except OSError:
                # The child is already gone, or was reaped by someone else
                pass
            self.childpid = 0
        if self.FIFOwritefd >= 0:
            os.close(self.FIFOwritefd)
            self.FIFOwritefd = -1
        # __init__ resets errcount to zero. Preserve it across the respawn, otherwise the
        # retry limit below can never trigger and persistent failures recurse forever.
        # NOTE(review): __init__ also calls AssimEvent.registerobserver(self) again;
        # this assumes re-registering an existing observer is harmless.
        errcount = self.errcount
        self.__init__(self.constraints, self.scriptdir)
        self.errcount = errcount

        if self.errcount < 2:
            # Try to keep from losing this event
            self.notifynewevent(event)
        else:
            print >> sys.stderr, 'Reinitialization of ForkExecObserver may have failed.'

    def __del__(self):
        '''Close our end of the pipe and ask our child (if any) to terminate.'''
        # __init__ may have failed before these attributes were ever set
        childpid = getattr(self, 'childpid', 0)
        if childpid > 0:
            writefd = getattr(self, 'FIFOwritefd', -1)
            if writefd >= 0:
                try:
                    os.close(writefd)
                except OSError:
                    pass
                self.FIFOwritefd = -1
            try:
                os.kill(childpid, signal.SIGTERM)
            except OSError:
                # Child already exited
                pass
            self.childpid = 0

    def listenforevents(self):
        '''Listen for JSON events terminated by a FIFOEventObserver.NULstr.
        This runs in the forked child and never returns: it exits the process on
        end-of-file (the parent closed its end of the pipe) or on KeyboardInterrupt.
        '''
        os.close(self.FIFOwritefd)
        fcntl.fcntl(self.FIFOreadfd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
        # Keep the file descriptors we inherited from our parent away from the scripts
        for fd in range(3, 1024):
            try:
                fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
            except IOError:
                # Not an open file descriptor
                pass
        nulstr = FIFOEventObserver.NULstr
        currentbuf = ''
        while True:
            try:
                if DEBUG:
                    print >> sys.stderr, 'ISSUING EVENT READ...'
                chunk = os.read(self.FIFOreadfd, 4096)
                if DEBUG:
                    print >> sys.stderr, 'EVENT READ returned %d bytes' % (len(chunk))
                if len(chunk) == 0:
                    # End of file. Test the data just read, not the accumulated buffer:
                    # a partial message left in the buffer must not keep us spinning.
                    # We don't want any kind of python cleanup going on here...
                    # so we access the 'protected' member _exit of os, and irritate pylint
                    # pylint: disable=W0212
                    os._exit(0)
                currentbuf += chunk
                while nulstr in currentbuf:
                    (jsonstr, currentbuf) = currentbuf.split(nulstr, 1)
                    try:
                        self.processJSONevent(jsonstr)
                    # W0703: catching too general exception Exception
                    # pylint: disable=W0703
                    except Exception as e:
                        # One bad event must not cost us the others already buffered
                        print >> sys.stderr, ('ForkExecObserver Got exception in child process: %s'
                            % str(e))
            except KeyboardInterrupt:
                # As above: no python cleanup (atexit handlers, etc.) in the forked child
                # pylint: disable=W0212
                os._exit(0)
            # pylint: disable=W0703
            except Exception as e:
                print >> sys.stderr, ('ForkExecObserver Got exception in child process: %s'
                    % str(e))
                currentbuf = ''

    @staticmethod
    def _JSONevent_env(eventobj):
        '''Create the environment for our child processes.

        The result is a copy of our own environment plus one ASSIM_<name> variable for
        each scalar-valued item in the event's 'extrainfo' (if any) and in its
        'associatedobject'. Associated-object values win over 'extrainfo' on a name clash.
        '''
        scalars = (str, unicode, int, float, long, bool, pyNetAddr)
        aobj = eventobj['associatedobject']
        # Initialize the child environment with our current environment
        env = dict(os.environ)
        # Add in things in 'extrainfo' (if any)
        if 'extrainfo' in eventobj and eventobj['extrainfo'] is not None:
            extrastuff = eventobj['extrainfo']
            for extra in extrastuff.keys():
                evextra = extrastuff[extra]
                if isinstance(evextra, scalars):
                    env['ASSIM_%s' % extra] = str(evextra)
        # Add all the scalars in the associated object
        for attr in aobj.keys():
            avalue = aobj[attr]
            if isinstance(avalue, scalars):
                env['ASSIM_%s' % attr] = str(avalue)
        return env

    def processJSONevent(self, jsonstr):
        '''Process a single JSON event from our input stream.

        Every script from listscripts() is run, one after another, as
            script eventtype associated-object-nodetype
        with the environment from _JSONevent_env() and the full event JSON on stdin.
        '''
        eventobj = pyConfigContext(jsonstr)
        aobjclass = eventobj['associatedobject']['nodetype']
        eventtype = AssimEvent.eventtypenames[eventobj['eventtype']]
        childenv = self._JSONevent_env(eventobj)
        scripts = self.listscripts()

        # It's an event we want our scripts to know about...
        # So, let them know!
        if DEBUG:
            print >> sys.stderr, 'TO RUN: %s' % (str(scripts))
        # Put the full JSON in a temporary file, so our scripts can read it from stdin
        jsontmpfile = tempfile.TemporaryFile()
        try:
            jsontmpfile.write(str(eventobj))
            jsontmpfile.flush()
            for script in scripts:
                # The scripts share our file offset, so we have to rewind before each
                # one - otherwise every script after the first sees an empty stdin.
                os.lseek(jsontmpfile.fileno(), 0, os.SEEK_SET)
                args = [script, eventtype, aobjclass]
                if DEBUG:
                    print >> sys.stderr, 'STARTING EVENT SCRIPT: %s' % (str(args))
                try:
                    subprocess.call(args, env=childenv, stdin=jsontmpfile)
                except OSError as e:
                    # A script we can't start shouldn't keep the others from running
                    print >> sys.stderr, ('ForkExecObserver: cannot run %s: %s'
                        % (script, str(e)))
                if DEBUG:
                    print >> sys.stderr, 'EVENT SCRIPT %s IS NOW DONE' % (str(args))
        finally:
            jsontmpfile.close()

    def listscripts(self):
        '''Return the sorted list of pathnames to execute when we get notified of an event:
        every executable regular file in our script directory.'''
        retval = []
        for script in os.listdir(self.scriptdir):
            path = os.path.join(self.scriptdir, script)
            if os.path.isfile(path) and os.access(path, os.X_OK):
                retval.append(path)
        retval.sort()
        return retval
WINEXPORT gsize setpipebuf(int fd, gsize bufsize)
Set the buffer size of a pipe (if possible)
Definition: misc.c:717
def __init__(self, constraints=None, scriptdir=None)
def __init__(self, FIFOwritefd, constraints=None, maxerrcount=None)