The Assimilation Project  based on Assimilation version 1.1.7.1474836767
transaction.py
#!/usr/bin/env python
# vim: smartindent tabstop=4 shiftwidth=4 expandtab number colorcolumn=100
#
# This file is part of the Assimilation Project.
#
# Author: Alan Robertson <alanr@unix.sh>
# Copyright (C) 2013 - Assimilation Systems Limited
#
# Free support is available from the Assimilation Project community - http://assimproj.org/
# Paid support is available from Assimilation Systems Limited - http://assimilationsystems.com
#
# The Assimilation software is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The Assimilation software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
#
#
'''
This file implements the Transaction class - a class which encapsulates a description of a
database transaction and a corresponding set of network operations on nanoprobes. Together,
these two things constitute the transaction. These transactions are meant to be idempotent -
that is, they describe enough of the update that they can be executed multiple times in a row
without any harm to the nanoprobe configuration or the data in the database.

The be-all-and-end-all of these transactions is JSON - that is, the transactions are described in
terms of JSON and they are ultimately expressed and persisted as JSON before being committed.

How they are persisted is something which has generated a little controversy in the project.
Purists say (quite rightly) that persisting transactions is what messaging systems are designed
for. More pragmatic people don't want to bring in a large and complex messaging system for what
is a relatively simple job. Personally, I agree with both of them.

So, initially, we will simply persist the transactions to flat files. If we need messaging for
(horizontal) scaling, or other features of a messaging system, then we will switch to a messaging
system.

In either case, this class won't be directly affected, since it only stores and executes
transactions - it does not worry about how they ought to be persisted.
'''
import sys
from datetime import datetime, timedelta
from AssimCclasses import pyNetAddr, pyConfigContext, pyFrameSet, pyIntFrame, pyCstringFrame, \
    pyIpPortFrame, pyCryptFrame
from frameinfo import FrameSetTypes, FrameTypes
from assimjson import JSONtree

class Transaction(object):
    '''This class implements database/nanoprobe transactions.

    The nanoprobe portions of the transaction support the following operations:
        Start sending heartbeats
        Stop sending heartbeats
        Start listening for heartbeats
        Stop listening for heartbeats
        Start sending/receiving heartbeats
        Stop sending/receiving heartbeats
        Start a monitoring action
        Stop a monitoring action
        Start a discovery action
        Stop a discovery action

    The database portions of the transaction support the following operations:
        Insert a node, possibly including a subtree to be inserted
        Replace a node, possibly including owned subtrees to be replaced
        Delete a node and possibly its owned subtrees

    The semantics of the database updates are worth describing in further detail.
    An "owned" subtree means that the node in question has ownership of all nodes
    related to it by the given relationship types, and if the node is deleted
    then all the things it owns should also be deleted.

    If it is replaced, then all of the nodes related to it by the given
    relationship types should be replaced by the nodes described in
    the transaction - and no other nodes related to it by these relationship
    types should exist. If they do, they need to be deleted.

    Is this too complex? Should the originator of the request be responsible for knowing
    which nodes it needs to delete?
    If this is eventually going to be executed by a plugin into the database engine,
    then the more work you leave to the executor of the transaction, the faster this
    code will run. On the other hand, it delays the mess until transaction execution
    time and makes the code for creating the transaction simpler...
    I'm gonna opt for complexity in the processing of the transaction rather than
    complexity in all the places where people might add things to the transaction.


    @NOTE AND WARNING:
    Transactions need to be somehow repeatable... This means that if this transaction
    was committed, we need to <i>not</i> repeat it - or make sure it's idempotent.
    Neither of those is true at the moment.
    '''

    def __init__(self, encryption_required=False):
        'Constructor for a combined database/network transaction.'
        self.tree = {'packets': []} # 'tree' cannot be pyConfigContext: we append to its array
        self.namespace = {}
        self.created = []
        self.sequence = None
        self.stats = {'lastcommit': timedelta(0), 'totaltime': timedelta(0)}
        self.encryption_required = encryption_required

    def __str__(self):
        'Convert our internal tree to JSON.'
        return str(JSONtree(self.tree))

###################################################################################################
#
# This collection of member functions accumulates work to be done for our Transaction
#
###################################################################################################

    def add_packet(self, destaddr, action, frames, frametype=None):
        '''Append a packet to the packet list for this transaction.
        In effect, this queues the packet for sending when this transaction is committed.

        Parameters
        ----------
        destaddr : pyNetAddr
            The address to send this packet to - the address that will get this packet
        action : int
            What action to ask the destaddr to perform on our behalf
        frames : [several possible types]
            A single frame, a list of frames, or (with frametype) a list of frame values
        frametype : [int], optional
            If present, it is the frame type for all the frame <i>values</i> in the 'frames' array
        '''

        # Note that we don't do this as a ConfigContext - it doesn't support modifying arrays.
        # On the other hand, our JSON converts nicely into a ConfigContext - because it converts
        # arrays correctly from JSON
        #print >> sys.stderr, 'ADDING THESE FRAMES: %s' % str(frames)

        if self.encryption_required and pyCryptFrame.get_dest_identity(destaddr) is None:
            raise ValueError('Destaddr %s has no identity: key id is %s'
                             % (destaddr, pyCryptFrame.get_dest_key_id(destaddr)))

        # Allow 'frames' to be a single frame
        if not isinstance(frames, (list, tuple)):
            frames = [frames]
        # Allow 'frames' to be a list of frame <i>values</i> - if they're all the same frametype
        # (None values are silently dropped)
        if frametype is not None:
            newframes = []
            for thing in frames:
                if thing is not None:
                    newframes.append({'frametype': frametype, 'framevalue': thing})
            frames = newframes
        self.tree['packets'].append({'action': int(action), 'destaddr': destaddr, 'frames': frames})


###################################################################################################
#
# Code from here to the end has to do with committing our transactions...
#
###################################################################################################

    def _commit_network_trans(self, io):
        '''
        Commit the network portion of our transaction - that is, send the packets!
        One interesting thing - we should probably not consider this transaction fully
        completed until we decide each destination is dead, or until its packets are all ACKed.

        @TODO: We don't yet cope with the CMA crashing after packets are sent but before they
        are all received -- that is, if they get lost between sending by the CMA and receiving
        by the nanoprobes.
        This argues for doing the network portion of the transaction first - presuming we do the
        db and network portions sequentially -- Of course, no transaction can start until
        the previous one is finished.
        '''
        #print >> sys.stderr, "PACKET JSON IS >>>%s<<<" % self.tree['packets']
        #print >> sys.stderr, 'COMMITTING THESE FRAMES: %s' % str(self.tree['packets'])
        # pylint is confused here - self.tree['packets'] _is_ very much iterable...
        # pylint: disable=E1133
        for packet in self.tree['packets']:
            dest = packet['destaddr']
            fs = pyFrameSet(packet['action'])
            if packet['action'] == FrameSetTypes.STARTUP:
                raise ValueError('Packet is a STARTUP packet %s to %s' % (str(packet), dest))
            #from cmadb import CMAdb
            #CMAdb.log.info('SENDING PACKET: %s' % str(packet))
            for frame in packet['frames']:
                ftype = frame['frametype']
                fvalue = frame['framevalue']
                # The number of cases below will have to grow over time,
                # but this code is pretty simple so far...

                if ftype == FrameTypes.IPPORT:
                    # Addresses arrive as strings after the JSON round trip
                    if isinstance(fvalue, (str, unicode)):
                        fvalue = pyNetAddr(fvalue)
                    aframe = pyIpPortFrame(ftype, fvalue)
                    fs.append(aframe)

                elif ftype in (FrameTypes.DISCNAME, FrameTypes.DISCJSON,
                               FrameTypes.CONFIGJSON, FrameTypes.RSCJSON):
                    sframe = pyCstringFrame(ftype)
                    sframe.setvalue(str(fvalue))
                    fs.append(sframe)

                elif ftype == FrameTypes.DISCINTERVAL:
                    nframe = pyIntFrame(ftype, intbytes=4, initval=int(fvalue))
                    fs.append(nframe)
                else:
                    raise ValueError('Unrecognized frame type [%s]: %s' % (ftype, frame))
            # In theory we could optimize multiple FrameSets in a row being sent to the
            # same address, but we can always do that later...
            io.sendreliablefs(dest, (fs, ))

    def commit_trans(self, io):
        'Commit our transaction'
        # This is just to test that our tree serializes successfully - before we
        # persist it on disk later. Once we're doing that, this will be
        # unnecessary...
        #print >> sys.stderr, "HERE IS OUR TREE:"
        #print >> sys.stderr, str(self)
        #print >> sys.stderr, "CONVERTING BACK TO TREE"
        self.tree = pyConfigContext(str(self))
        if len(self.tree['packets']) > 0:
            start = datetime.now()
            self._commit_network_trans(io)
            end = datetime.now()
            diff = end - start
            self.stats['lastcommit'] = diff
            self.stats['totaltime'] += diff
        else:
            self.stats['lastcommit'] = timedelta(0)
        self.abort_trans()

    def abort_trans(self):
        'Forget everything about this transaction.'
        self.tree = {'packets': []}
        self.namespace = {}

if __name__ == '__main__':

    def testme():
        'Queue two packets, check the JSON round trip, then commit the transaction.'
        from AssimCtypes import CONFIGNAME_OUTSIG
        from AssimCclasses import pyReliableUDP, pyPacketDecoder, pySignFrame

        config = pyConfigContext(init={CONFIGNAME_OUTSIG: pySignFrame(1)})
        io = pyReliableUDP(config, pyPacketDecoder())
        trans = Transaction(encryption_required=False)
        destaddr = pyNetAddr('10.10.10.1:1984')
        addresses = (pyNetAddr('10.10.10.5:1984'), pyNetAddr('10.10.10.6:1984'))

        trans.add_packet(destaddr, FrameSetTypes.SENDEXPECTHB,
                         addresses, frametype=FrameTypes.IPPORT)
        assert len(trans.tree['packets']) == 1

        trans.add_packet(pyNetAddr('10.10.10.1:1984'),
                         FrameSetTypes.SENDEXPECTHB,
                         (pyNetAddr('10.10.10.5:1984'),
                          pyNetAddr('10.10.10.6:1984')),
                         frametype=FrameTypes.IPPORT)
        assert len(trans.tree['packets']) == 2

        print >> sys.stderr, 'JSON: %s\n' % str(trans)
        print >> sys.stderr, 'JSON: %s\n' % str(pyConfigContext(str(trans)))
        trans.commit_trans(io)
        assert len(trans.tree['packets']) == 0
    testme()
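
The frame handling in add_packet can be tried without the Assimilation C classes. The following is a minimal standalone sketch, not part of transaction.py: normalize_frames and queue_packet are hypothetical helper names, the standard json module stands in for JSONtree and pyConfigContext, addresses are plain strings rather than pyNetAddr objects, and the two numeric constants are arbitrary placeholders rather than real FrameSetTypes or FrameTypes values.

```python
import json

# Arbitrary placeholder values, NOT the real FrameSetTypes/FrameTypes constants.
EXAMPLE_ACTION = 1
EXAMPLE_FRAMETYPE = 2


def normalize_frames(frames, frametype=None):
    """Hypothetical helper mirroring the frame handling in add_packet."""
    # Accept a single frame as well as a list or tuple of frames.
    if not isinstance(frames, (list, tuple)):
        frames = [frames]
    # With a frametype, treat the items as bare values and wrap each one,
    # dropping None values just as add_packet does.
    if frametype is not None:
        frames = [{'frametype': frametype, 'framevalue': value}
                  for value in frames if value is not None]
    return list(frames)


def queue_packet(tree, destaddr, action, frames, frametype=None):
    """Append one packet description to a plain dict tree."""
    tree['packets'].append({
        'action': int(action),
        'destaddr': destaddr,
        'frames': normalize_frames(frames, frametype),
    })


tree = {'packets': []}
queue_packet(tree, '10.10.10.1:1984', EXAMPLE_ACTION,
             ('10.10.10.5:1984', None, '10.10.10.6:1984'),
             frametype=EXAMPLE_FRAMETYPE)

# The tree is plain lists and dicts, so it survives a JSON round trip unchanged.
roundtrip = json.loads(json.dumps(tree))
```

Keeping the tree as ordinary lists and dicts is what lets packets be appended freely and serialized only at commit time, which is the reason the source gives for not using a pyConfigContext while the transaction is being built.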
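
The module docstring says transactions will initially be persisted to flat files, and the class docstring warns that a committed transaction must not be repeated. One possible way to combine the two is sketched below. This is an assumption for illustration only, not the project's implementation: the function names, the one-file-per-sequence-number layout, and the ".done" rename convention are all hypothetical.

```python
import json
import os
import tempfile


def persist_transaction(directory, sequence, tree):
    """Atomically write a transaction tree to <directory>/<sequence>.json."""
    final_path = os.path.join(directory, '%08d.json' % sequence)
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix='.tmp')
    with os.fdopen(fd, 'w') as handle:
        json.dump(tree, handle)
        handle.flush()
        os.fsync(handle.fileno())
    # The rename is atomic, so a crash never leaves a half-written .json file.
    os.replace(tmp_path, final_path)
    return final_path


def mark_committed(path):
    """Rename a persisted transaction so that it is never replayed."""
    os.replace(path, path + '.done')


def pending_transactions(directory):
    """Yield (path, tree) for uncommitted transactions in sequence order."""
    names = sorted(name for name in os.listdir(directory)
                   if name.endswith('.json'))
    for name in names:
        path = os.path.join(directory, name)
        with open(path) as handle:
            yield path, json.load(handle)


# Usage: persist two transactions, commit the first, then see what is left.
workdir = tempfile.mkdtemp()
first = persist_transaction(workdir, 1, {'packets': []})
persist_transaction(workdir, 2, {'packets': [{'action': 7, 'frames': []}]})
mark_committed(first)
remaining = [tree for _path, tree in pending_transactions(workdir)]
```

After a restart, only the files still ending in ".json" would be replayed. That avoids repeating transactions that were marked committed, but a crash between sending the packets and the rename would still cause one replay, so the operations themselves would also need to be idempotent.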