The Assimilation Project  based on Assimilation version 1.1.7.1474836767
bestpractices.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: smartindent tabstop=4 shiftwidth=4 expandtab number colorcolumn=100
3 #
4 # bestpractices - implementation of best practices evaluation
5 #
6 # This file is part of the Assimilation Project.
7 #
8 # Author: Alan Robertson <alanr@unix.sh>
9 # Copyright (C) 2015 - Assimilation Systems Limited
10 #
11 # Free support is available from the Assimilation Project community
12 # - http://assimproj.org
13 # Paid support is available from Assimilation Systems Limited
14 # - http://assimilationsystems.com
15 #
16 # The Assimilation software is free software: you can redistribute it and/or modify
17 # it under the terms of the GNU General Public License as published by
18 # the Free Software Foundation, either version 3 of the License, or
19 # (at your option) any later version.
20 #
21 # The Assimilation software is distributed in the hope that it will be useful,
22 # but WITHOUT ANY WARRANTY; without even the implied warranty of
23 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
24 # GNU General Public License for more details.
25 #
26 # You should have received a copy of the GNU General Public License
27 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
28 #
29 #
30 '''
31 This module defines some classes related to evaluating best practices based
32 on discovery information
33 '''
34 import os, logging, sys
35 from droneinfo import Drone
36 from consts import CMAconsts
37 from graphnodes import BPRules, BPRuleSet
38 from systemnode import SystemNode
39 from discoverylistener import DiscoveryListener
40 from graphnodeexpression import GraphNodeExpression, ExpressionContext
41 from AssimCclasses import pyConfigContext
42 from store import Store
43 from assimevent import AssimEvent
44 from assimeventobserver import AssimEventObserver
45 
class BestPractices(DiscoveryListener):
    'Base class for evaluating changes against best practices'
    prio = DiscoveryListener.PRI_OPTION     # What priority are we?
    wantedpackets = []      # Used to register ourselves for discovery packets
    eval_objects = {}       # packet type -> list of instantiated evaluator objects
    eval_classes = {}       # packet type -> list of registered evaluator classes
    evaled_classes = {}     # packet type -> {evaluator class: True} once instantiated
    discovery_name = None
    application = 'os'
    BASEURL = 'http://db.ITBestPractices.info:%d'   # '%d' is filled in with the port number
58 
def __init__(self, config, packetio, store, log, debug):
    '''Initialize our BestPractices object.

    Subclass instances stop right after the DiscoveryListener setup.
    Only a direct BestPractices instance goes on to register
    BestPracticesCMA for every type in config['allbpdiscoverytypes'] and
    to create one evaluator object per registered class and packet type.
    '''
    DiscoveryListener.__init__(self, config, packetio, store, log, debug)
    if self.__class__ != BestPractices:
        return
    for bptype in config['allbpdiscoverytypes']:
        BestPractices.register_sensitivity(BestPracticesCMA, bptype)
    for bptype in BestPractices.eval_classes:
        instances = BestPractices.eval_objects.setdefault(bptype, [])
        created = BestPractices.evaled_classes.setdefault(bptype, {})
        for evalcls in BestPractices.eval_classes[bptype]:
            if evalcls in created:
                # This class already has an evaluator object for this type
                continue
            instances.append(evalcls(config, packetio, store, log, debug))
            created[evalcls] = True
77 
78 
@staticmethod
def register(*pkttypes):
    '''Build a class decorator that registers the decorated BestPractices
    subclass as interested in each of the given discovery types.
    Return value: the decorator function.
    '''
    def _register_class(cls):
        '''Register 'cls' for every packet type handed to 'register'.
        Return value: the class itself, unchanged.
        '''
        for wanted in pkttypes:
            BestPractices.register_sensitivity(cls, wanted)
        return cls
    return _register_class
91 
@staticmethod
def register_sensitivity(bpcls, pkttype):
    '''Record that class 'bpcls' wants to see packets of type 'pkttype'.

    The first time a packet type is seen it is added to 'wantedpackets'
    and BestPractices is handed to SystemNode.add_json_processor.
    '''
    wanted = BestPractices.wantedpackets
    if pkttype not in wanted:
        wanted.append(pkttype)
        SystemNode.add_json_processor(BestPractices)
    interested = BestPractices.eval_classes.setdefault(pkttype, [])
    if bpcls not in interested:
        interested.append(bpcls)
103 
@staticmethod
def load_json(store, json, bp_class, rulesetname, basedon=None):
    '''Load JSON for a single JSON ruleset into the database.

    store       -- Store used to load or create the BPRules node
    json        -- JSON text of the rules
    bp_class    -- best-practice class name these rules belong to
    rulesetname -- name of the rule set being loaded
    basedon     -- optional name of the rule set these rules are based on

    Return value: the BPRules object, on every path (previously None was
    returned whenever no basis relationship had to be created).
    '''
    rules = store.load_or_create(BPRules, bp_class=bp_class, json=json,
                                 rulesetname=rulesetname)
    # Link to the basis rules only when a basis was named and the store
    # reports the node as abstract -- presumably "not yet in the database";
    # confirm against Store.is_abstract.
    if basedon is not None and Store.is_abstract(rules):
        parent = store.load(BPRules, bp_class=bp_class, rulesetname=basedon)
        store.relate_new(rules, CMAconsts.REL_basis, parent,
                         properties={'bp_class': bp_class})
    return rules
115 
@staticmethod
def load_from_file(store, filename, bp_class, rulesetname, basedon=None):
    '''Read one ruleset file and load its JSON text into the database.
    Return value: whatever load_json returns for that text.
    '''
    with open(filename, 'r') as rulesfile:
        ruletext = rulesfile.read()
    return BestPractices.load_json(store, ruletext, bp_class, rulesetname, basedon)
122 
@staticmethod
def load_directory(store, directoryname, rulesetname, basedon=None):
    '''
    Generator: load every rules file found in 'directoryname' into the
    database as rule set 'rulesetname', yielding the result of loading
    each file. Files are processed in sorted name order and names that
    start with '.' are skipped. Nothing happens until it is iterated.

    If 'basedon' is not None, then we derive a set of basis ordering
    which we use to compute the precedence of rule sets.

    For the moment, a rule set must contain every rule class that the
    rule set it is based on contains. A class may be empty when there is
    nothing to override, but it has to be present. A dependent rule set
    may add rule classes its basis lacks; the reverse is not allowed.

    Within a rule class, a dependent rule set need not repeat the rules
    of its basis (those simply are not overridden), and it may also hold
    rules that the basis does not have.
    '''
    store.load_or_create(BPRuleSet, rulesetname=rulesetname, basisrules=basedon)
    for entry in sorted(os.listdir(directoryname)):
        if entry.startswith('.'):
            continue
        # The rule class name is the file name with '.json' removed
        yield BestPractices.load_from_file(store,
                                           os.path.join(directoryname, entry),
                                           entry.replace('.json', ''),
                                           rulesetname, basedon)
154 
@staticmethod
def gen_bp_rules_by_ruleset(store, rulesetname):
    '''Return the store's Cypher query result (BPRules nodes) for all the
    best practice rules belonging to the named rule set.
    '''
    query_params = {'rulesetname': rulesetname}
    return store.load_cypher_nodes(CMAconsts.QUERY_RULESET_RULES, BPRules,
                                   params=query_params)
161 
162 
def url(self, drone, ruleid, ruleobj, html=True, port=5000):
    '''
    Return the URL in the IT Best Practices project that goes with this
    particular rule.

    drone   -- object whose jsonval('os') supplies optional OS details
    ruleid  -- rule identifier, sent as 'tipname'
    ruleobj -- the rule; 'category' is required, 'application' optional
    html    -- True for the 'show' page, False for 'showjson'
    port    -- port number substituted into BASEURL

    Emily Ratliff <ejratl@gmail.com> defines the API this way:

    .../v1.0/doquery?app=os&domain=security&class=posix
        &os=linux&osname=redhat&release=6&tipname=nist_V-58901

    Query parameters are emitted in sorted name order.
    '''
    values = {'app': ruleobj.get('application', 'os'),
              'class': 'posix',
              'domain': ruleobj['category'],
              'tipname': ruleid}
    osinfo = drone.jsonval('os')
    if osinfo is not None and 'data' in osinfo:
        osdata = osinfo['data']
        # (query parameter, key in the OS discovery data)
        for param, oskey in (('os', 'kernel-name'),
                             ('osname', 'Distributor ID'),
                             ('release', 'Release')):
            if oskey in osdata:
                values[param] = osdata[oskey].lower()
    # sorted() works on both lists and dict views, unlike keys().sort()
    query = '&'.join('%s=%s' % (name, values[name]) for name in sorted(values))
    page = 'show' if html else 'showjson'
    return '%s/itbp/v1.0/%s?%s' % (self.BASEURL % port, page, query)
196 
def processpkt(self, drone, srcaddr, jsonobj, discoverychanged):
    '''Hand changed discovery data to the rule objects interested in it.

    Nothing is done when 'discoverychanged' is false. Otherwise rules
    registered under the packet's 'instance' name take precedence over
    rules registered under its 'discovertype'; if neither has rules a
    message is written to stderr.
    '''
    if not discoverychanged:
        return
    discovertype = jsonobj['discovertype']
    discoverinstance = jsonobj['instance']
    for candidate in (discoverinstance, discovertype):
        if candidate in BestPractices.eval_objects:
            self._processpkt_by_type(drone, srcaddr, candidate, jsonobj)
            return
    print >> sys.stderr, 'No BP rules for %s/%s' % (discovertype, discoverinstance)
211 
def _processpkt_by_type(self, drone, srcaddr, evaltype, jsonobj):
    '''Run a discovery object through every evaluator registered for
    'evaltype': fetch that evaluator's rules, evaluate them, and log
    the results.
    '''
    for evaluator in BestPractices.eval_objects[evaltype]:
        rules = evaluator.fetch_rules(drone, srcaddr, evaltype)
        outcome = evaluator.evaluate(drone, srcaddr, jsonobj, rules, evaltype)
        self.log_rule_results(pyConfigContext(outcome), drone, srcaddr,
                              jsonobj, evaltype, rules)
224 
@staticmethod
def send_rule_event(oldstat, newstat, drone, ruleid, ruleobj, url):
    '''Create the event that goes with a rule changing status.

    Newstat, ruleid, and ruleobj can never be None; 'ruleobj' is the
    collection of rules, indexed by 'ruleid'.

    A rule that now fails and was not failing before (no previous status,
    'pass', 'ignore' or 'NA') produces an OBJWARN event. A rule that was
    failing and is now 'pass', 'ignore' or 'NA' produces OBJUNWARN.
    Every other transition produces no event.
    '''
    extrainfo = {'ruleid': ruleid, 'category': ruleobj[ruleid]['category'], 'url': url}
    not_failing = ('pass', 'ignore', 'NA')
    if newstat == 'fail':
        if oldstat is None or oldstat in not_failing:
            AssimEvent(drone, AssimEvent.OBJWARN, extrainfo=extrainfo)
    elif oldstat == 'fail' and newstat in not_failing:
        AssimEvent(drone, AssimEvent.OBJUNWARN, extrainfo=extrainfo)
244 
def basic_rule_score_algorithm(self, _drone, rule, status):
    '''A very basic best practice scoring algorithm.

    _drone -- unused
    rule   -- the rule; 'category' defaults to 'security' and 'severity'
              defaults to 'medium'
    status -- evaluation status of the rule

    Return value: 0.0 unless the rule failed and is not in the 'comment'
    category; otherwise the score for the rule's severity. The severity
    map comes from config['score_severity_map'][category] when
    configured, else from the built-in high/medium/low = 3/2/1 map.
    An unknown severity scores as 'medium'.
    '''
    if status != 'fail':
        return 0.0
    category = rule.get('category', 'security')
    if category == 'comment':
        return 0.0
    default_sevmap = {'high': 3.0, 'medium': 2.0, 'low': 1.0}
    sevmap = default_sevmap
    if 'score_severity_map' in self.config:
        sevmap = self.config['score_severity_map'].get(category, default_sevmap)
    # A configured map is allowed to omit 'medium': fall back to the
    # built-in value instead of raising KeyError.
    fallback = sevmap.get('medium', default_sevmap['medium'])
    return sevmap.get(rule.get('severity', 'medium'), fallback)
259 
260  #R0914 -- too many local variables
261  #pylint: disable=R0914
def log_rule_results(self, results, drone, _srcaddr, discoveryobj, discovertype, rulesobj):
    '''Log the results of this set of rule evaluations.

    The previous results are read from the drone attribute named by
    Drone.bp_discoverytype_result_attrname(discovertype), or taken as
    empty if the drone has no such attribute. Each rule whose status
    changed, other than to 'NA', gets a rule event and a log line:
    info for 'pass', warning otherwise. Afterwards the score updates are
    computed and the new results are stored on the drone as a string.
    '''
    all_states = ('pass', 'fail', 'ignore', 'NA')
    status_name = Drone.bp_discoverytype_result_attrname(discovertype)
    if hasattr(drone, status_name):
        previous = getattr(drone, status_name)
    else:
        previous = {'pass': [], 'fail': [], 'ignore': [],
                    'NA': [], 'score': 0.0}
    oldstats = pyConfigContext(previous)
    for stat in all_states:
        logmethod = self.log.warning if stat != 'pass' else self.log.info
        for ruleid in results[stat]:
            # First previous status list containing this rule, if any
            oldstat = next((old for old in all_states if ruleid in oldstats[old]), None)
            if stat == 'NA' or oldstat == stat:
                # No change
                continue
            thisrule = rulesobj[ruleid]
            url = self.url(drone, ruleid, thisrule)
            BestPractices.send_rule_event(oldstat, stat, drone, ruleid, rulesobj, url)
            logmethod('%s %sED %s rule %s: %s [%s]' %
                      (drone, stat.upper(), thisrule['category'], ruleid, url,
                       thisrule['rule']))
    self.compute_score_updates(discoveryobj, drone, rulesobj, results, oldstats)
    setattr(drone, status_name, str(results))
289 
def compute_scores(self, drone, rulesobj, statuses):
    '''Compute the scores from this set of statuses - organized by category.

    drone    -- passed through to the scoring algorithm
    rulesobj -- rules indexed by rule id; each rule has a 'category'
    statuses -- mapping of status name to list of rule ids, or a string
                which is parsed with pyConfigContext; a 'score' key is
                skipped

    Return value: (total score, {category: score},
                   {category: {ruleid: score}})
    '''
    try:
        text_types = (str, unicode)
    except NameError:       # Python 3 has no separate 'unicode' type
        text_types = (str,)
    if isinstance(statuses, text_types):
        statuses = pyConfigContext(statuses)
    totalscore = 0.0
    scores = {}
    rulescores = {}
    for status in statuses:
        if status == 'score':
            continue
        for ruleid in statuses[status]:
            rule = rulesobj[ruleid]
            rulecat = rule['category']
            rulescore = self.basic_rule_score_algorithm(drone, rule, status)
            rulescores.setdefault(rulecat, {})[ruleid] = rulescore
            scores[rulecat] = scores.get(rulecat, 0.0) + rulescore
            totalscore += rulescore
    return totalscore, scores, rulescores
315 
316  #pylint disable=R0914 -- too many local variables
317  #pylint: disable=R0914
def compute_score_updates(self, discovery_json, drone, rulesobj, newstats, oldstats):
    '''We compute the score updates for the rules and results we've been given.

    discovery_json -- discovery object; its 'discovertype' and
                      'description' go into the events
    drone          -- Drone (or host) whose category score attributes
                      are updated
    rulesobj       -- the rules and their categories
    newstats       -- results of the current rule evaluation
    oldstats       -- results of the previous rule evaluation

    For every category whose score changed, the drone attribute named by
    Drone.bp_category_score_attrname(category) is adjusted by the
    difference and an event is created: OBJWARN when the score went up,
    OBJUNWARN when it went down. Categories are handled in sorted order.

    Return value: (new category scores, {category: score difference})

    Note that this can fail if we change our algorithm - because we don't
    know the values the old algorithm gave us, only what the current
    algorithm gives us on the old results.

    @TODO: We eventually want to update the scores for the domain to which
    this drone belongs.
    '''
    _, oldcatscores, _ = self.compute_scores(drone, rulesobj, oldstats)
    _, newcatscores, _ = self.compute_scores(drone, rulesobj, newstats)
    # The original author reported trouble with the one-line union
    # "set(newcatscores) | set(oldcatscores)", so keep the two-step form.
    keys = set(newcatscores)
    keys |= set(oldcatscores)

    diffs = {}
    for category in sorted(keys):
        newscore = newcatscores.get(category, 0.0)
        oldscore = oldcatscores.get(category, 0.0)
        catattr = Drone.bp_category_score_attrname(category)
        # Plain float comparison is fine here: both values come from the
        # same algorithm, which mostly produces whole numbers.
        if newscore == oldscore:
            continue
        diff = newscore - oldscore
        diffs[category] = diff
        eventtype = AssimEvent.OBJWARN if newscore > oldscore else AssimEvent.OBJUNWARN
        extrainfo = {'category': category,
                     'oldscore': str(oldscore),
                     'newscore': str(newscore),
                     'discovery_type': discovery_json['discovertype'],
                     'discovery_description': discovery_json['description'],
                    }
        # POTENTIALCONCURRENCY
        # As long as no one else is updating this attribute for this drone
        # we shouldn't have concurrency problems.
        updated = getattr(drone, catattr, 0.0) + diff
        setattr(drone, catattr, updated)
        # '%s' rather than '%d': scores are floats and must not be truncated
        sys.stderr.write('Setting %s.%s to %s\n' % (drone, catattr, updated))
        AssimEvent(drone, eventtype, extrainfo=extrainfo)
    return newcatscores, diffs
373 
374 
def fetch_rules(self, _drone, _unusedsrcaddr, _discovertype):
    '''Return the rules to evaluate for the given drone and discovery type.
    Fetching rules is kept separate from rule evaluation to simplify
    testing. This base-class version is abstract and always raises
    NotImplementedError.
    '''
    complaint = 'class BestPractices is an abstract class'
    raise NotImplementedError(complaint)
381 
@staticmethod
def evaluate(_unused_drone, _unusedsrcaddr, wholejsonobj, ruleobj, description):
    '''Evaluate our rules given the current/changed data.

    wholejsonobj -- discovery object; rules are evaluated against its
                    'data' and its 'description' is used in diagnostics
    ruleobj      -- rules indexed by rule id, each with a 'rule'
                    expression and a 'category'; if the object has a
                    '_jsonobj' attribute, that is used instead
    description  -- label used in the diagnostic output

    Return value: dict with the rule ids sorted into 'pass', 'fail',
    'ignore' and 'NA' lists, plus a 'score' of 0.0. Rules are evaluated
    in sorted id order:
      - a result of None counts as 'NA'
      - a non-boolean result counts as 'fail'
      - a true result counts as 'pass', unless the rule text starts with
        'IGNORE': then it counts as 'ignore', or is dropped entirely
        when its category starts with 'comment'
      - a false result counts as 'fail'
    One diagnostic line per rule is written to stderr.
    '''
    jsonobj = wholejsonobj['data']
    newcontext = ExpressionContext((jsonobj,))
    if hasattr(ruleobj, '_jsonobj'):
        ruleobj = getattr(ruleobj, '_jsonobj')
    # sorted() copes with both lists and dict views, unlike keys().sort()
    ruleids = sorted(ruleobj.keys())
    statuses = {'pass': [], 'fail': [], 'ignore': [], 'NA': [], 'score': 0.0}
    if not ruleids:
        return statuses
    say = sys.stderr.write
    # NOTE(review): the count shown is one less than the number of entries,
    # presumably to discount a comment entry -- confirm.
    say('\n==== Evaluating %d Best Practice rules on "%s" [%s]\n'
        % (len(ruleids)-1, wholejsonobj['description'], description))
    for ruleid in ruleids:
        ruleinfo = ruleobj[ruleid]
        rule = ruleinfo['rule']
        rulecategory = ruleinfo['category']
        result = GraphNodeExpression.evaluate(rule, newcontext)
        if result is None:
            say('n/a: %s ID %s %s\n' % (rulecategory, ruleid, rule))
            statuses['NA'].append(ruleid)
        elif not isinstance(result, bool):
            say('Rule id %s %s returned %s (%s)\n'
                % (ruleid, rule, result, type(result)))
            statuses['fail'].append(ruleid)
        elif result:
            if rule.startswith('IGNORE'):
                if not rulecategory.lower().startswith('comment'):
                    statuses['ignore'].append(ruleid)
                    say('IGNORE: %s ID %s %s\n' % (rulecategory, ruleid, rule))
            else:
                statuses['pass'].append(ruleid)
                say('PASS: %s ID %s %s\n' % (rulecategory, ruleid, rule))
        else:
            say('FAIL: %s ID %s %s\n' % (rulecategory, ruleid, rule))
            statuses['fail'].append(ruleid)
    return statuses
426 
@BestPractices.register('proc_sys')
@SystemNode.add_json_processor
class BestPracticesCMA(BestPractices):
    'Security Best Practices which are evaluated against various discovery modules'
    application = 'os'
    discovery_name = 'JSON_proc_sys'

    def __init__(self, config, packetio, store, log, debug):
        'Initialize by delegating to BestPractices.__init__'
        BestPractices.__init__(self, config, packetio, store, log, debug)

    def fetch_rules(self, drone, _unusedsrcaddr, discovertype):
        '''Return the rules to evaluate for this drone and discovery type.
        Note that fetch_rules is separate from rule evaluation to
        simplify testing.
        In our case, we ask our Drone to provide us with the merged rule
        sets for the current kind of incoming packet.
        '''
        return drone.get_merged_bp_rules(discovertype)

    @staticmethod
    def configcallback(config, changedname, _unusedargs):
        '''Function called when configuration is updated.
        We use it to make sure we get callbacks for all
        our discovery types.
        This might be overkill, but it's not expensive ;-).
        And, it doesn't do anything useful at the moment...
        '''
        sys.stderr.write('Config Callback for name %s\n' % changedname)
        if changedname in (None, 'allbpdiscoverytypes'):
            for pkttype in config['allbpdiscoverytypes']:
                BestPractices.register_sensitivity(BestPracticesCMA, pkttype)
458 
if __name__ == '__main__':
    # Self-test: evaluate the proc_sys best practice rules against canned
    # discovery data, then check the scoring and the rule-event logic.

    class DebugEventObserver(AssimEventObserver):
        '''
        Event observer for testing the send event code
        '''
        # Expected event type for each temporary rule id used below
        expectResults = {
            'f2p' : AssimEvent.OBJUNWARN,
            'n2f' : AssimEvent.OBJWARN,
            'p2f' : AssimEvent.OBJWARN,
            'i2f' : AssimEvent.OBJWARN,
            'f2i' : AssimEvent.OBJUNWARN,
            'f2na': AssimEvent.OBJUNWARN,
            'na2f': AssimEvent.OBJWARN
        }

        def __init__(self):
            AssimEventObserver.__init__(self, None)

        def notifynewevent(self, event):
            'Check an event against the expected type; exit(1) on a mismatch'
            eventrule = event.extrainfo['ruleid']
            if event.eventtype == DebugEventObserver.expectResults[eventrule]:
                sys.stdout.write("Success Result for %s is correct\n" % eventrule)
            else:
                sys.stdout.write("Failure Result for %s is incorrect\n" % eventrule)
                sys.exit(1)

    #pylint: disable=R0903
    class DummyDrone(object):
        'Really dummy object'
        pass

    JSON_data = '''
{
    "discovertype": "proc_sys",
    "description": "Information derived from /proc/sys",
    "host": "ubuntu72",
    "source": "../../discovery_agents/proc_sys",
    "data": {
        "kernel.core_pattern": "|/usr/share/apport/apport %p %s %c %P",
        "kernel.core_pipe_limit": 0,
        "kernel.core_uses_pid": 0,
        "kernel.ctrl-alt-del": 0,
        "kernel.printk_ratelimit_burst": 10,
        "kernel.pty.max": 4096,
        "kernel.pty.nr": 9,
        "kernel.pty.reserve": 1024,
        "kernel.randomize_va_space": 2,
        "kernel.sysrq": 176,
        "kernel.tainted": 0,
        "kernel.watchdog": 1,
        "kernel.watchdog_thresh": 10,
        "net.core.default_qdisc": "pfifo_fast",
        "net.ipv4.conf.all.accept_redirects": 0,
        "net.ipv4.conf.all.accept_source_route": 0,
        "net.ipv4.conf.all.secure_redirects": 1,
        "net.ipv4.conf.all.send_redirects": 1,
        "net.ipv6.conf.all.accept_redirects": 1,
        "net.ipv6.conf.all.accept_source_route": 0
    }}'''
    rulefile = None
    dummydrone = DummyDrone()
    # Look for the rules file here and up to three directories above
    for dirname in ('.', '..', '../..', '../../..'):
        rulefile = '%s/best_practices/proc_sys.json' % dirname
        if os.access(rulefile, os.R_OK):
            break
    with open(rulefile, 'r') as procsys_file:
        testrules = pyConfigContext(procsys_file.read())
    testjsonobj = pyConfigContext(JSON_data)
    logger = logging.getLogger('BestPracticesTest')
    logger.addHandler(logging.StreamHandler(sys.stderr))
    testconfig = {'allbpdiscoverytypes': ['login_defs', 'pam', 'proc_sys', 'sshd']}
    bpobj = BestPractices(testconfig, None, None, logger, False)
    for procsys in BestPractices.eval_classes['proc_sys']:
        ourstats = procsys.evaluate("testdrone", None, testjsonobj, testrules, 'proc_sys')
        size = sum([len(ourstats[st]) for st in ourstats.keys() if st != 'score'])
        assert size == len(testrules)-1 # One rule is an IGNOREd comment
        assert ourstats['fail'] == ['itbp-00001', 'nist_V-38526', 'nist_V-38601']
        assert len(ourstats['NA']) >= 13
        assert len(ourstats['pass']) >= 3
        assert len(ourstats['ignore']) == 0
        # Nothing -> current results: every failing category shows up as a diff
        score, tstdiffs = bpobj.compute_score_updates(testjsonobj, dummydrone, testrules,
                                                      ourstats, {})
        assert str(pyConfigContext(score)) == '{"networking":1.0,"security":4.0}'
        # pylint: disable=E1101
        assert dummydrone.bp_category_networking_score == 1.0 # should be OK for integer values
        assert dummydrone.bp_category_security_score == 4.0 # should be OK for integer values
        assert isinstance(dummydrone.bp_category_networking_score, float)
        assert isinstance(dummydrone.bp_category_security_score, float)
        assert str(pyConfigContext(tstdiffs)) == '{"networking":1.0,"security":4.0}'
        # Same results again: no differences
        score, tstdiffs = bpobj.compute_score_updates(testjsonobj, dummydrone, testrules,
                                                      ourstats, ourstats)
        assert str(pyConfigContext(score)) == '{"networking":1.0,"security":4.0}'
        assert str(pyConfigContext(tstdiffs)) == '{}'
        # Current results -> nothing: the scores drop back to zero
        score, tstdiffs = bpobj.compute_score_updates(testjsonobj, dummydrone, testrules,
                                                      {}, ourstats)
        assert str(pyConfigContext(tstdiffs)) == '{"networking":-1.0,"security":-4.0}'
        assert dummydrone.bp_category_networking_score == 0.0 # should be OK for integer values
        assert dummydrone.bp_category_security_score == 0.0 # should be OK for integer values

    # NOTE(review): the listing this code was recovered from lacks one line
    # at this point, and DebugEventObserver is never instantiated in the
    # visible text -- confirm whether an observer was created here.
    atestrule = testrules['itbp-00001']
    transitions = ('f2p', 'n2f', 'p2f', 'i2f', 'f2i', 'f2na', 'na2f')
    # Create temporary rules for the send_rule_event tests
    for case in transitions:
        testrules[case] = atestrule
    BestPractices.send_rule_event('fail', 'pass', 'testdrone', 'f2p', testrules, 'https://URL')
    BestPractices.send_rule_event(None, 'fail', 'testdrone', 'n2f', testrules, 'https://URL')
    BestPractices.send_rule_event('pass', 'fail', 'testdrone', 'p2f', testrules, 'https://URL')
    BestPractices.send_rule_event('ignore', 'fail', 'testdrone', 'i2f', testrules, 'https://URL')
    BestPractices.send_rule_event('fail', 'ignore', 'testdrone', 'f2i', testrules, 'https://URL')
    BestPractices.send_rule_event('fail', 'NA', 'testdrone', 'f2na', testrules, 'https://URL')
    BestPractices.send_rule_event('NA', 'fail', 'testdrone', 'na2f', testrules, 'https://URL')
    # Get rid of the temporary rules for the send_rule_event tests
    for case in transitions:
        del testrules[case]

    sys.stdout.write('Results look correct!\n')
def __init__(self, config, packetio, store, log, debug)
def evaluate(_unused_drone, _unusedsrcaddr, wholejsonobj, ruleobj, description)
def send_rule_event(oldstat, newstat, drone, ruleid, ruleobj, url)
def _processpkt_by_type(self, drone, srcaddr, evaltype, jsonobj)
def processpkt(self, drone, srcaddr, jsonobj, discoverychanged)
def fetch_rules(self, _drone, _unusedsrcaddr, _discovertype)
def load_json(store, json, bp_class, rulesetname, basedon=None)
def log_rule_results(self, results, drone, _srcaddr, discoveryobj, discovertype, rulesobj)
def fetch_rules(self, drone, _unusedsrcaddr, discovertype)
def url(self, drone, ruleid, ruleobj, html=True, port=5000)
def __init__(self, config, packetio, store, log, debug)
def gen_bp_rules_by_ruleset(store, rulesetname)
def configcallback(config, changedname, _unusedargs)
def load_from_file(store, filename, bp_class, rulesetname, basedon=None)
def basic_rule_score_algorithm(self, _drone, rule, status)
def compute_scores(self, drone, rulesobj, statuses)
def compute_score_updates(self, discovery_json, drone, rulesobj, newstats, oldstats)
def register_sensitivity(bpcls, pkttype)
def load_directory(store, directoryname, rulesetname, basedon=None)