The Assimilation Project  based on Assimilation version 1.1.7.1474836767
messagedispatcher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: smartindent tabstop=4 shiftwidth=4 expandtab number
3 #
4 # This file is part of the Assimilation Project.
5 #
6 # Copyright (C) 2011, 2012 - Alan Robertson <alanr@unix.sh>
7 #
8 # The Assimilation software is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # The Assimilation software is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
20 #
21 '''
22 This is the overall message dispatcher - it receives incoming messages as they arrive
23 and dispatches each one so that it gets handled.
24 '''
25 
26 import os, sys, traceback
27 import gc
28 from datetime import datetime
29 from cmadb import CMAdb
30 from transaction import Transaction
31 from dispatchtarget import DispatchTarget
32 from frameinfo import FrameSetTypes
33 from AssimCtypes import proj_class_live_object_count, proj_class_max_object_count
34 from AssimCclasses import pyAssimObj, dump_c_objects
35 
class MessageDispatcher(object):
    'We dispatch incoming messages where they need to go.'

    def __init__(self, dispatchtable, logtimes=False, encryption_required=True):
        '''Constructor for MessageDispatcher - requires a dispatch table as a parameter.

        dispatchtable: mapping from FrameSet type to the object whose dispatch()
            method handles FrameSets of that type. Types not found in the table
            are handed to a default DispatchTarget.
        logtimes: when true, timing information is logged for every dispatch.
            It is also turned on if CMAdb.debug is set when we are constructed.
        encryption_required: passed along to each Transaction we create.
        '''
        self.dispatchtable = dispatchtable
        self.default = DispatchTarget()
        # self.io stays None until setconfig() is called.
        self.io = None
        self.dispatchcount = 0
        self.logtimes = logtimes or CMAdb.debug
        self.encryption_required = encryption_required

    def dispatch(self, origaddr, frameset):
        '''Dispatch a Frameset where it will get handled.

        A fresh Transaction is installed in CMAdb.transaction for each message.
        Any exception raised while handling the message is logged and swallowed,
        and the message is ACKed whether or not handling succeeded.
        '''
        self.dispatchcount += 1
        CMAdb.transaction = Transaction(encryption_required=self.encryption_required)
        # W0703 == Too general exception catching...
        # pylint: disable=W0703
        try:
            self._try_dispatch_action(origaddr, frameset)
            # Only look at memory usage on every hundredth message
            if (self.dispatchcount % 100) == 1:
                self._check_memory_usage()
        except Exception as e:
            self._process_exception(e, origaddr, frameset)
        # We want to ack the packet even in the failed case - retries are unlikely to help
        # and we need to avoid getting stuck in a loop retrying it forever...
        if CMAdb.debug:
            fstypename = FrameSetTypes.get(frameset.get_framesettype())[0]
            CMAdb.log.debug('MessageDispatcher - ACKing %s message from %s'
                            % (fstypename, origaddr))
        self.io.ackmessage(origaddr, frameset)

    # [R0912:MessageDispatcher._try_dispatch_action] Too many branches (13/12)
    # pylint: disable=R0912
    def _try_dispatch_action(self, origaddr, frameset):
        '''Core code to actually dispatch the Frameset.
        It should be run inside a try/except construct so that anything
        we barf up won't cause the CMA to die.

        The order of operations is: run the handler, commit the network
        transaction, commit (or abort) the database transaction, and finally
        send any packets that were queued to go out after the transaction.
        '''
        fstype = frameset.get_framesettype()
        #print >>sys.stderr, 'Got frameset of type %s [%s]' % (fstype, frameset)
        dispatchstart = datetime.now()
        if fstype in self.dispatchtable:
            self.dispatchtable[fstype].dispatch(origaddr, frameset)
        else:
            self.default.dispatch(origaddr, frameset)
        dispatchend = datetime.now()
        if self.logtimes:
            CMAdb.log.info('Initial dispatch time for %s frameset: %s'
                           % (fstype, dispatchend-dispatchstart))
        # Commit the network transaction here
        CMAdb.transaction.commit_trans(CMAdb.io)
        if self.logtimes:
            CMAdb.log.info('Network transaction time: %s'
                           % (str(CMAdb.transaction.stats['lastcommit'])))

        # Commit the database transaction here
        if CMAdb.store.transaction_pending:
            result = CMAdb.store.commit()
            if self.logtimes or CMAdb.debug:
                CMAdb.log.info('Neo4j transaction time: %s'
                               % (str(CMAdb.store.stats['lastcommit'])))
            if CMAdb.debug:
                CMAdb.log.debug('Commit results follow:')
                for line in str(result).splitlines():
                    CMAdb.log.debug(line.expandtabs())
                CMAdb.log.debug('end of commit results.')
                # This is a VERY expensive call...
                # Good thing we only do it when debug is enabled...
                CMAdb.TheOneRing.AUDIT()
        else:
            if CMAdb.debug:
                CMAdb.log.debug('No database changes this time')
            CMAdb.store.abort()

        # Now send the packets that had to wait until the transaction was done
        for pkttype in CMAdb.transaction.post_transaction_packets:
            CMAdb.transaction.add_packet(origaddr, pkttype, [])
        if CMAdb.transaction.post_transaction_packets:
            CMAdb.transaction.commit_trans(CMAdb.io)
            # Empty the queue we just flushed. This used to assign to
            # CMAdb.post_transaction_packets, which left the transaction's
            # own list untouched.
            CMAdb.transaction.post_transaction_packets = []
        dispatchend = datetime.now()
        if self.logtimes or CMAdb.debug:
            CMAdb.log.info('Total dispatch time for %s frameset: %s'
                           % (fstype, dispatchend-dispatchstart))

    @staticmethod
    def _process_exception(e, origaddr, frameset):
        '''Handle an exception from our message dispatcher.

        Must be called from inside the "except" clause which caught 'e', since
        the traceback is taken from sys.exc_info(). We log the exception, the
        contents of the offending FrameSet and up to 20 traceback entries, then
        abort the database transaction and discard the network transaction.
        '''
        # Darn! Got an exception - let's try and put everything useful into the
        # logs in a legible way
        trace = sys.exc_info()[2]
        # we ignore the etype and evalue returns from sys.exc_info
        tblist = traceback.extract_tb(trace, 20)
        fstype = frameset.get_framesettype()
        fstypename = FrameSetTypes.get(fstype)[0]

        # sys.stderr.write() produces the same output as the Python 2 only
        # "print >> sys.stderr" and also works under Python 3.
        sys.stderr.write('MessageDispatcher exception [%s] occurred\n' % (e,))
        CMAdb.log.critical('MessageDispatcher exception [%s] occurred while'
                           ' handling [%s] FrameSet from %s' % (e, fstypename, origaddr))
        lines = str(frameset).splitlines()
        CMAdb.log.info('FrameSet Contents follows (%d lines):' % len(lines))
        for line in lines:
            CMAdb.log.info(line.expandtabs())
        CMAdb.log.info('======== Begin %s Message %s Exception Traceback ========'
                       % (fstypename, e))
        for tb in tblist:
            (filename, line, funcname, text) = tb
            filename = os.path.basename(filename)
            CMAdb.log.info('%s.%s:%s: %s'% (filename, line, funcname, text))
        CMAdb.log.info('======== End %s Message %s Exception Traceback ========'
                       % (fstypename, e))
        if CMAdb.store is not None:
            CMAdb.log.critical("Aborting Neo4j transaction %s" % CMAdb.store)
            CMAdb.store.abort()
        if CMAdb.transaction is not None:
            CMAdb.log.critical("Aborting network transaction %s" % CMAdb.transaction.tree)
            CMAdb.transaction = None

    @staticmethod
    def _check_memory_usage():
        'Check to see if we have too many objects outstanding right now'
        gccount = gc.get_count()
        gctotal = sum(gccount)
        CMAdb.log.info('Total allocated Objects: %s. gc levels: %s'
                       % (gctotal, str(gccount)))
        cobjcount = proj_class_live_object_count()
        CMAdb.log.info('Total/max allocated C-Objects: %s/%s'
                       % (cobjcount, proj_class_max_object_count()))
        if gctotal < 20 and cobjcount > 5000:
            # NOTE(review): the body of this branch is missing from the listing
            # this file was recovered from. dump_c_objects() is imported and not
            # used anywhere else, so it is presumably what belongs here -
            # confirm against the upstream source.
            dump_c_objects()

        if CMAdb.debug:
            # Another very expensive set of debug-only calls
            assimcount = sum(1 for obj in gc.get_objects()
                             if isinstance(obj, pyAssimObj))
            CMAdb.log.info('Total allocated C-Objects: %s' % assimcount)

    def setconfig(self, io, config):
        '''Save our configuration away. We need it before we can do anything.

        The same io and config are handed to the default target and to every
        target in the dispatch table.
        '''
        self.io = io
        self.default.setconfig(io, config)
        for target in self.dispatchtable.values():
            target.setconfig(io, config)
183 
guint32 proj_class_max_object_count(void)
Return the maximum number of live C class objects that we've ever had.
Definition: proj_classes.c:426
def _try_dispatch_action(self, origaddr, frameset)
def _process_exception(e, origaddr, frameset)
guint32 proj_class_live_object_count(void)
Return the count of live C class objects.
Definition: proj_classes.c:406
def __init__(self, dispatchtable, logtimes=False, encryption_required=True)
def dispatch(self, origaddr, frameset)