The Assimilation Project  based on Assimilation version 1.1.7.1474836767
packetlistener.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: smartindent number tabstop=4 shiftwidth=4 expandtab colorcolumn=100
3 #
4 # This file is part of the Assimilation Project.
5 #
6 # Copyright (C) 2011, 2012, 2013 - Alan Robertson <alanr@unix.sh>
7 #
8 # The Assimilation software is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # The Assimilation software is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
20 #
21 
22 '''
23 This implements the PacketListener class - which listens to packets then
24 dispatches them.
25 '''
26 
27 import traceback, os
28 import time
29 import sys
30 from AssimCclasses import pyReliableUDP, pyPacketDecoder, pyNetAddr, pyCryptFrame
31 from AssimCtypes import CMAADDR, CONFIGNAME_CMAINIT
32 from frameinfo import FrameSetTypes
33 from cmadb import CMAdb
34 #try:
35  #gi.repository confuses pylint...
36  #pylint: disable=E0611
37  #from gi.repository import GLib as glib
38 #except ImportError:
39  #pylint: disable=F0401
40  #import gobject as glib
41 import assimglib as glib # We've replaced gi.repository and gobject with our own 'glib' module
42 
43 
44 callback_save = []
45 
46 # R0903 is too few public methods
47 #pylint: disable=R0903
48 class PacketListener(object):
49  'Listen for packets and get them dispatched as any good packet ought to be.'
50  PRIO_ZERO = 0
51  PRIO_ONE = 1
52  PRIO_TWO = 2
53  PRIO_THREE = 3
54 
55  DEFAULT_PRIO = PRIO_THREE
56  LOWEST_PRIO = PRIO_THREE
57 
58  prio_map = { # Our map of packet type priorities
59  FrameSetTypes.CONNSHUT: PRIO_ZERO, # High priority and cheap
60  FrameSetTypes.HBSHUTDOWN: PRIO_ZERO,
61  FrameSetTypes.HBDEAD: PRIO_ZERO,
62  FrameSetTypes.PING: PRIO_ZERO,
63  FrameSetTypes.PONG: PRIO_ZERO,
64  FrameSetTypes.RSCOPREPLY: PRIO_ZERO,
65 
66  FrameSetTypes.STARTUP: PRIO_ONE, # High priority but sometimes expensive
67 
68  FrameSetTypes.SWDISCOVER: PRIO_TWO, # Not priority, often expensive
69  FrameSetTypes.JSDISCOVERY: PRIO_TWO,
70 
71  FrameSetTypes.HBLATE: PRIO_THREE, # Not terribly important
72  FrameSetTypes.HBBACKALIVE: PRIO_THREE,
73  FrameSetTypes.HBMARTIAN: PRIO_THREE,
74  }
75 
76  unencrypted_fstypes = {
77  FrameSetTypes.STARTUP
78  }
79 
80  def __init__(self, config, dispatch, io=None, encryption_required=True):
81  self.config = config
82  self.encryption_required=encryption_required
83  if io is None:
84  self.io = pyReliableUDP(config, pyPacketDecoder())
85  else:
86  self.io = io
87  dispatch.setconfig(self.io, config)
88 
89  if not self.io.bindaddr(config[CONFIGNAME_CMAINIT]):
90  raise NameError('Cannot bind to address %s' % (str(config[CONFIGNAME_CMAINIT])))
91  if not self.io.mcastjoin(pyNetAddr(CMAADDR)):
92  CMAdb.log.warning('Failed to join multicast at %s' % CMAADDR)
93  self.io.setblockio(False)
94  #print "IO[socket=%d,maxpacket=%d] created." \
95  #% (self.io.fileno(), self.io.getmaxpktsize())
96  self.dispatcher = dispatch
97  self.iowatch = None
98  self.mainloop = glib.MainLoop()
99  #print >> sys.stderr, ('self.mainloop %s, self.mainloop.mainloop: %s'
100  # % (self.mainloop, self.mainloop.mainloop))
101  # W0612: unused variable j
102  # pylint: disable=W0612
103  self.prio_queues = [[] for j in range(PacketListener.LOWEST_PRIO+1)]
104  self.queue_addrs = {} # Indexed by IP addresses - which queue is this IP in?
105 
106  @staticmethod
107  def frameset_prio(frameset):
108  'Return the priority of a frameset'
109  fstype = frameset.get_framesettype()
110  return PacketListener.prio_map.get(fstype, PacketListener.DEFAULT_PRIO)
111 
112  def enqueue_frameset(self, frameset, fromaddr):
113  '''Enqueue (read in) a frameset to our frameset queue system
114  This queue system has a queue of frameset queues - one per priority level
115  Each frameset queue consists of three elements:
116  'addr' the IP address of the far-end
117  'Q' a queue of framesets from address 'addr'
118  'prio' the priority of the highest priority packet in the queue
119  Every frameset in a given frameset queue came from the same address...
120 
121  When we read in a new packet, we append it to the appropriate frameset
122  queue - creating it if need be. If the new packet raises the priority
123  of the queue, then we move that frameset queue to the appropriate priority queue
124 
125  We keep a separate hash table (queue_addrs) which associates frameset queues with
126  the corresponding IP addresses.
127  '''
128  prio = self.frameset_prio(frameset)
129  if fromaddr not in self.queue_addrs:
130  # Then we need to create a new frameset queue for it
131  queue = {'addr': fromaddr, 'Q': [frameset,], 'prio': prio}
132  self.queue_addrs[fromaddr] = queue
133  self.prio_queues[prio].append(queue)
134  else:
135  # The frameset queue exists. Append our frameset to the queue
136  queue = self.queue_addrs[fromaddr]
137  queue['Q'].append(frameset)
138  oldprio = queue['prio']
139  # Do we need to move the frameset queue to a different priority queue?
140  if prio < oldprio:
141  queue['prio'] = prio
142  self.prio_queues[oldprio].remove(queue)
143  self.prio_queues[prio].append(queue)
144 
146  '''Read a frameset from our frameset queue system in priority order
147  We read from the highest priority queues first, moving down the
148  priority scheme if there are no higher priority queues with packets to read.
149  '''
150  for prio_queue in self.prio_queues:
151  if len(prio_queue) == 0:
152  continue
153  frameset_queue = prio_queue.pop(0)
154  frameset = frameset_queue['Q'].pop(0)
155  fromaddr = frameset_queue['addr']
156  # Was that the last packet from this address?
157  if len(frameset_queue['Q']) > 0:
158  # Nope. We still have more to read.
159  newprio = min([self.frameset_prio(fs) for fs in frameset_queue['Q']])
160  # Appending the frameset queue to the end => fairness under load
161  self.prio_queues[newprio].append(frameset_queue)
162  frameset_queue['prio'] = newprio
163  else:
164  # Frameset queue is now empty
165  del self.queue_addrs[fromaddr]
166  if CMAdb.debug:
167  CMAdb.log.debug('dequeue_a_frameset: RETURNING (%s, %s)'
168  % (fromaddr, str(frameset)[:80]))
169  return fromaddr, frameset
170  return None, None
171 
172 
173  @staticmethod
175  '''Handle an unexpected exception.
176  Under us, the MessageDispatcher code will catch *most* exceptions.
177  But those that result from badly formatted/encrypted messages
178  that keep us from understanding or trusting the message
179  will get caught here.
180  '''
181  # Put everything useful into the logs in a legible way
182  trace = sys.exc_info()[2] # Ignore the etype and evalue from sys.exc_info
183  tblist = traceback.extract_tb(trace, 20)
184  CMAdb.log.critical('PacketListener exception [%s] occurred' % e)
185  CMAdb.log.info('======== Begin %s PacketListener Exception Traceback ========' % e)
186  for tb in tblist:
187  (filename, lineno, funcname, text) = tb
188  filename = os.path.basename(filename)
189  CMAdb.log.info('%s.%s:%s: %s'% (filename, lineno, funcname, text))
190  CMAdb.log.info('======== End %s PacketListener Exception Traceback ========' % (e))
191 
192  @staticmethod
193  def mainloop_callback(unusedsource, cb_condition, listener):
194  'Function to be called back by the Python Glib mainloop hooks'
195  #make pylint happy
196  unusedsource = unusedsource
197  if cb_condition == glib.IO_IN or cb_condition == glib.IO_PRI:
198  #print >> sys.stderr, ('Calling %s.listenonce' % listener), type(listener)
199  #listener.listenonce() ##OLD CODE
200  # W0703 == Too general exception catching...
201  # pylint: disable=W0703
202  try:
203  listener.queueanddispatch()
204  except Exception as e:
205  # Illegitimi non carborundum
206  PacketListener.process_pkt_exception(e)
207  # Just keep on keepin' on...
208  else:
209  if cb_condition == glib.IO_ERR:
210  cond = 'IO_ERR'
211  elif cb_condition == glib.IO_OUT:
212  cond = 'IO_OUT'
213  elif cb_condition == glib.IO_HUP:
214  cond = 'IO_HUP'
215  else:
216  cond = '(0x%08x??)' % (int(cb_condition))
217  CMAdb.log.warning('mainloop_callback: Received Unexpected I/O condition: %s '
218  % (cond))
219  CMAdb.log.warning('mainloop_callback: Called with (%s, %s, %s)'
220  % (unusedsource, cb_condition, listener))
221  return True
222  #print >> sys.stderr, 'RETURNING True'
223  return True
224 
225  def listen(self):
226  'Listen for packets. Get them dispatched.'
227  self.iowatch = glib.IOWatch(self.io.fileno(), glib.IO_IN | glib.IO_PRI
228  , PacketListener.mainloop_callback, self)
229  #print >> sys.stderr, 'listen: self.iowatch = %s' % str(self.iowatch)
230  #print >> sys.stderr, 'calling self.mainloop.run()'
231  self.mainloop.run()
232 
233  # Clean up before returning [if we ever do ;-)]
234  self.iowatch = None
235  self.mainloop = None
236 
237  def OLDlisten(self):
238  'Listen for packets. Get them dispatched.'
239  while True:
240  self.listenonce()
241  time.sleep(0.5)
242 
243  def listenonce(self):
244  'Process framesets received as a single packet'
245  while True:
246  (fromaddr, framesetlist) = self.io.recvframesets()
247  if fromaddr is None:
248  # Must have read an ACK or something...
249  return
250  else:
251  fromstr = repr(fromaddr)
252  if CMAdb.debug:
253  CMAdb.log.debug("listenonce: Received FrameSets from str([%s], [%s])" \
254  % (str(fromaddr), fromstr))
255  #print >> sys.stderr, ("Received FrameSet from str([%s], [%s])" \
256  #% (str(fromaddr), fromstr))
257 
258  for frameset in framesetlist:
259  if CMAdb.debug:
260  CMAdb.log.debug("listenonce: FrameSet Gotten ([%s]: [%s])" \
261  % (str(fromaddr), frameset))
262  self.dispatcher.dispatch(fromaddr, frameset)
263  def _read_all_available(self):
264  'Read All available framesets into our queue system'
265  while True:
266  (fromaddr, framesetlist) = self.io.recvframesets()
267  if fromaddr is None:
268  break
269  else:
270  fromstr = repr(fromaddr)
271  if CMAdb.debug:
272  CMAdb.log.debug("_read_all_available: Received FrameSet from str([%s], [%s])" \
273  % (str(fromaddr), fromstr))
274  #print >> sys.stderr, ("Received FrameSet from str([%s], [%s])" \
275  #% (str(fromaddr), fromstr))
276 
277  for frameset in framesetlist:
278  if CMAdb.debug:
279  CMAdb.log.debug("FrameSet Gotten ([%s]: [%s])" \
280  % (str(fromaddr), frameset))
281  self.enqueue_frameset(frameset, fromaddr)
282 
283  def queueanddispatch(self):
284  'Queue and dispatch all available framesets in priority order'
285  while True:
286  self._read_all_available()
287  fromaddr, frameset = self.dequeue_a_frameset()
288  if fromaddr is None:
289  return
290  fstype = frameset.get_framesettype()
291  key_id=frameset.sender_key_id()
292  if key_id is not None:
293  if CMAdb.debug:
294  CMAdb.log.debug('SETTING KEY(%s, %s) from fstype %s'
295  % (fromaddr, key_id, frameset.fstypestr()))
296  pyCryptFrame.dest_set_key_id(fromaddr, key_id)
297  elif (self.encryption_required and
298  fstype not in PacketListener.unencrypted_fstypes):
299  fsstr = str(frameset)
300  if len(fsstr) > 100:
301  fsstr = fsstr[0:90] + '...'
302  raise ValueError('Unencrypted %s frameset received from %s: frameset is %s'
303  % (frameset.fstypestr(), fromaddr, fsstr))
304  self.dispatcher.dispatch(fromaddr, frameset)
def __init__(self, config, dispatch, io=None, encryption_required=True)
def enqueue_frameset(self, frameset, fromaddr)
def mainloop_callback(unusedsource, cb_condition, listener)