The Assimilation Project  based on Assimilation version 1.1.7.1474836767
discoverylistener.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: smartindent tabstop=4 shiftwidth=4 expandtab number colorcolumn=100
3 #
4 # This file is part of the Assimilation Project.
5 #
6 # Author: Alan Robertson <alanr@unix.sh>
7 # Copyright (C) 2013,2014 - Assimilation Systems Limited
8 #
9 # Free support is available from the Assimilation Project community - http://assimproj.org
10 # Paid support is available from Assimilation Systems Limited - http://assimilationsystems.com
11 #
12 # The Assimilation software is free software: you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation, either version 3 of the License, or
15 # (at your option) any later version.
16 #
17 # The Assimilation software is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
21 #
22 # You should have received a copy of the GNU General Public License
23 # along with the Assimilation Project software. If not, see http://www.gnu.org/licenses/
24 #
25 #
26 
27 '''
28 Discovery Listener infrastructure
29 This is the base class for code that wants to hear about various
30 discovery packets as they arrive.
31 
32 More details are documented in the DiscoveryListener class
33 '''
34 import re, sys, os
35 from droneinfo import Drone
36 from consts import CMAconsts
37 from store import Store
38 from AssimCtypes import CONFIGNAME_TYPE, CONFIGNAME_INSTANCE
39 from AssimCclasses import pyNetAddr, pyConfigContext
40 from systemnode import ChildSystem
41 
42 from graphnodes import NICNode, IPaddrNode, ProcessNode, IPtcpportNode, GraphNode
43 
class DiscoveryListener(object):
    '''Base class for code that wants to be told about discovery packets.

    A listener advertises two things through class attributes:
      prio          -- one of the PRI_* categories below (see priority())
      wantedpackets -- the packets it wants to be called for (see desiredpackets())
    and does its real work in processpkt(), which this base class leaves abstract.
    '''

    # The three categories/priorities of discovery actions
    PRI_CORE = 0                    # Plugin is part of the core system
    PRI_OPTION = 1                  # Optional capability that comes with the system
    PRI_CONTRIB = 2                 # Contributed (and optional) capability
    PRI_LIMIT = 1 + PRI_CONTRIB     # One past the largest category value

    # Defaults for listeners that don't say otherwise
    wantedpackets = None
    prio = PRI_CONTRIB

    def __init__(self, config, packetio, store, log, debug):
        '''Remember the five collaborators we were handed; nothing else happens here.'''
        self.config = config
        self.debug = debug
        self.log = log
        self.store = store
        self.packetio = packetio

    @classmethod
    def priority(cls):
        '''Return the priority (ordering) this listener should be invoked at,
        i.e. the class attribute "prio".'''
        return cls.prio

    @classmethod
    def desiredpackets(cls):
        '''Return the packets this listener wants to be called for,
        i.e. the class attribute "wantedpackets".'''
        return cls.wantedpackets

    def processpkt(self, drone, srcaddr, json, discoverychanged):
        '''A desired packet has been received - process it.
        Abstract here: always raises NotImplementedError.'''
        raise NotImplementedError('Abstract class - processpkt()')
79 
80 
81 @Drone.add_json_processor
83  'Class for updating our agent cache when we get new monitoringagents information'
84 
85  prio = DiscoveryListener.PRI_CORE
86  wantedpackets = ('monitoringagents',)
87 
def processpkt(self, drone, srcaddr, jsonobj, discoverychanged):
    '''Refresh the Drone's agent cache from a new 'monitoringagents' packet.

    When discoverychanged is true, jsonobj['data'] is stored on the drone as its
    '_agentcache' attribute.  When it is false nothing is touched.
    srcaddr is not used.
    '''
    if discoverychanged:
        agents = jsonobj['data']
        # setattr() rather than a plain assignment, as in the original code
        setattr(drone, '_agentcache', agents)
94 
95 
96 @Drone.add_json_processor
98  'Class for discovering audit permissions'
99 
100  prio = DiscoveryListener.PRI_CORE
101  wantedpackets = ('auditd_conf',)
102 
def processpkt(self, drone, srcaddr, jsonobj, discoverychanged):
    '''Request 'fileattrs' discovery of the directory holding the auditd log.

    The directory is the dirname of the 'log_file' entry in jsonobj['data'] when
    there is one, and '/var/log/audit/' otherwise; either way it ends in a slash.
    Nothing is requested unless discoverychanged is true.  srcaddr is not used.
    The results are meant to be evaluated by auditd best practice rules.
    '''
    if not discoverychanged:
        return
    auditconf = jsonobj['data']     # The data portion of the JSON message

    request = pyConfigContext()
    request['parameters'] = pyConfigContext()
    request[CONFIGNAME_TYPE] = 'fileattrs'
    request[CONFIGNAME_INSTANCE] = 'auditd_fileattrs'

    if 'log_file' in auditconf:
        logdir = os.path.dirname(auditconf['log_file']) + '/'
    else:
        logdir = '/var/log/audit/'
    # The same directory is supplied under both parameter names
    for paramname in ('filelist', 'ASSIM_filelist'):
        request['parameters'][paramname] = logdir

    # repeat, warn, and interval are automatically added
    drone.request_discovery((request,))
123 
124 # R0912 -- too many branches
125 # R0914 -- too many local variables
126 #pylint: disable=R0914,R0912
127 @Drone.add_json_processor
129  'Class for the (initial) netconfig discovery packet'
130 
131  prio = DiscoveryListener.PRI_CORE
132  wantedpackets = ('netconfig',)
133 
def processpkt(self, drone, _srcaddr, jsonobj, discoverychanged):
    '''Save away the network configuration data we got from netconfig JSON discovery.
    This includes our NICs, their MAC addresses, and their IP addresses.

    Graph layout maintained here:
        Drone --REL_nicowner--> NICNode --REL_ipowner--> IPaddrNode
    NICs and IP addresses that were related to the Drone before but are absent from
    this packet are separated from their owner and deleted.

    Parameters:
      drone            -- the Drone this packet describes; must already be in self.store
      _srcaddr         -- unused
      jsonobj          -- the discovery message; jsonobj['data'] is indexed by
                          interface name
      discoverychanged -- if false, the data is unchanged and we do nothing

    Also sets drone.primary_ip_addr when a primary address can be identified
    (see the FIXME below for how that is decided).
    '''

    assert self.store.has_node(drone)
    if not discoverychanged:
        return
    data = jsonobj['data']  # The data portion of the JSON message

    # currmacs: the NICNode objects currently belonging to this host, by MAC address
    currmacs = {}
    for nic in self.store.load_related(drone, CMAconsts.REL_nicowner, NICNode):
        currmacs[nic.macaddr] = nic

    primaryifname = None
    # newmacs: the NICNode objects found/created by this discovery, by MAC address
    newmacs = {}
    for ifname in data.keys():  # List of interfaces just below the data section
        ifinfo = data[ifname]
        if 'address' not in ifinfo:
            # No MAC address reported for this interface - nothing to record
            continue
        macaddr = str(ifinfo['address'])
        newnic = self.store.load_or_create(NICNode, domain=drone.domain,
                                           macaddr=macaddr, ifname=ifname,
                                           json=str(ifinfo))
        newmacs[macaddr] = newnic
        # The first interface seen with a default gateway is taken as primary
        if 'default_gw' in ifinfo and primaryifname is None:
            primaryifname = ifname

    # Now compare the two sets of MAC addresses (old and new), updating each "old"
    # NIC with info from the new discovery and deleting any NICs we no longer have.
    # We walk a snapshot of the keys because entries are removed as we go.
    for macaddr in list(currmacs.keys()):
        currmac = currmacs[macaddr]
        if macaddr in newmacs:
            # This MAC may need updating
            newmacs[macaddr] = currmac.update_attributes(newmacs[macaddr])
        else:
            # This MAC has disappeared.  NICs are attached to the Drone with
            # REL_nicowner (see the load_related() above and relate_new() below),
            # so that is the relationship which has to be removed here.
            self.store.separate(drone, CMAconsts.REL_nicowner, currmac)
            # @TODO Needs to be a 'careful, complete' reference count deletion...
            self.store.delete(currmac)
            del currmacs[macaddr]

    # Create REL_nicowner relationships for any newly created NIC nodes
    for macaddr in newmacs.keys():
        nic = newmacs[macaddr]
        self.store.relate_new(drone, CMAconsts.REL_nicowner, nic, {'causes': True})

    # Now newmacs contains all the updated info about our current NICs
    # Let's figure out what's happening with our IP addresses...

    primaryip = None

    for macaddr in newmacs.keys():
        mac = newmacs[macaddr]
        ifname = mac.ifname
        iptable = data[str(ifname)]['ipaddrs']

        # currips: the IPaddrNode objects currently owned by this NIC, by address
        currips = {}
        for ip in self.store.load_related(mac, CMAconsts.REL_ipowner, IPaddrNode):
            currips[ip.ipaddr] = ip

        # newips: the IPaddrNode objects found/created by this discovery, by address
        newips = {}
        for ip in iptable.keys():  # keys are 'ip/mask' in CIDR format
            ipname = ':::INVALID:::'
            ipinfo = iptable[ip]
            if 'name' in ipinfo:
                ipname = ipinfo['name']
            if ipinfo['scope'] != 'global':
                continue
            iponly, cidrmask = ip.split('/')
            netaddr = pyNetAddr(iponly).toIPv6()
            if netaddr.islocal():  # We ignore loopback addresses - might be wrong...
                continue
            ipnode = self.store.load_or_create(IPaddrNode, domain=drone.domain,
                                               ipaddr=str(netaddr), cidrmask=cidrmask)
            ## FIXME: Not an ideal way to determine primary (preferred) IP address...
            ## it's a bit idiosyncratic to Linux...
            ## A better way would be to use their 'startaddr' (w/o the port)
            ## This uses the IP address they used to talk to us.
            if ifname == primaryifname and primaryip is None and ipname == ifname:
                primaryip = ipnode
                drone.primary_ip_addr = str(primaryip.ipaddr)
            newips[str(netaddr)] = ipnode

        # Compare the two sets of IP addresses (old and new); again we walk a
        # snapshot of the keys because entries are removed as we go.
        for ipaddr in list(currips.keys()):
            currip = currips[ipaddr]
            if ipaddr in newips:
                newips[ipaddr] = currip.update_attributes(newips[ipaddr])
            else:
                self.log.debug('Deleting address %s from MAC %s' % (currip, macaddr))
                self.log.debug('currip:%s, currips:%s' % (str(currip), str(currips)))
                self.store.separate(mac, rel_type=CMAconsts.REL_ipowner, obj=currip)
                # @TODO Needs to be a 'careful, complete' reference count deletion...
                self.store.delete(currip)
                del currips[ipaddr]

        # Create REL_ipowner relationships for all the newly created IP nodes
        for ipaddr in newips.keys():
            ip = newips[ipaddr]
            self.store.relate_new(mac, CMAconsts.REL_ipowner, ip, {'causes': True})
251 
252 @Drone.add_json_processor
254  'Class for TCP discovery handling'
255 
256  prio = DiscoveryListener.PRI_CORE
257  wantedpackets = ('tcpdiscovery',)
258  netstatipportpat = re.compile('(.*):([^:]*)$')
259 
260  # disable=R0914 means too many local variables...
261  # disable=R0912 means too many branches
262  # pylint: disable=R0914,R0912
def processpkt(self, drone, _srcaddr, jsonobj, discoverychanged):
    '''Add TCP listeners and clients.

    Works in three passes over the discovery data:
      1. Load or create a ProcessNode for every process named in the packet, and give
         the Drone the server and/or client role as warranted.
      2. Look at the ProcessNodes the Drone was already hosting; those not named in
         this packet lose the roles granted in pass 1, and are separated from the
         Drone and deleted if delrole() then returns something empty.
      3. For every process in the packet, record the addresses it listens on and
         the addresses it is a client of.

    Parameters:
      drone            -- the Drone this packet describes; must not be abstract
      _srcaddr         -- unused
      jsonobj          -- the discovery message; jsonobj['data'] is indexed by the
                          nanoprobe-assigned process name
      discoverychanged -- if false, the data is unchanged and we do nothing
    '''
    if not discoverychanged:
        return
    data = jsonobj['data']  # The data portion of the JSON message
    if self.debug:
        self.log.debug('_add_tcplisteners(data=%s)' % data)

    assert not Store.is_abstract(drone)
    allourips = drone.get_owned_ips()
    if self.debug:
        self.log.debug('Processing keys(%s)' % data.keys())
    newprocs = {}        # This packet's ProcessNodes, by their 'processname' attribute
    newprocmap = {}      # The same ProcessNodes, by the name used in the packet
    discoveryroles = {}  # The roles this packet caused us to give the Drone
    for procname in data.keys():  # List of nanoprobe-assigned names of processes...
        procinfo = data[procname]
        if 'listenaddrs' in procinfo:
            if CMAconsts.ROLE_server not in discoveryroles:
                discoveryroles[CMAconsts.ROLE_server] = True
                drone.addrole(CMAconsts.ROLE_server)
        if 'clientaddrs' in procinfo:
            if CMAconsts.ROLE_client not in discoveryroles:
                discoveryroles[CMAconsts.ROLE_client] = True
                drone.addrole(CMAconsts.ROLE_client)
        processproc = self.store.load_or_create(
            ProcessNode, domain=drone.domain,
            processname=procname,
            host=drone.designation,
            pathname=procinfo.get('exe', 'unknown'),
            argv=procinfo.get('cmdline', 'unknown'),
            uid=procinfo.get('uid', 'unknown'),
            gid=procinfo.get('gid', 'unknown'),
            cwd=procinfo.get('cwd', '/'))
        assert hasattr(processproc, '_Store__store_node')
        processproc.procinfo = str(procinfo)

        newprocs[processproc.processname] = processproc
        newprocmap[procname] = processproc
        if self.store.is_abstract(processproc):
            self.store.relate(drone, CMAconsts.REL_hosting, processproc,
                              {'causes': True})
        if self.debug:
            self.log.debug('procinfo(%s) - processproc created=> %s'
                           % (procinfo, processproc))

    # Several kinds of nodes have the same relationship to the host...
    for proc in self.store.load_related(drone, CMAconsts.REL_hosting,
                                        GraphNode.factory):
        if not isinstance(proc, ProcessNode):
            continue
        assert hasattr(proc, '_Store__store_node')
        procname = proc.processname
        if procname in newprocs:
            continue
        # NOTE(review): presumably delrole() returns the roles the process still
        # has afterwards - confirm against ProcessNode
        if len(proc.delrole(discoveryroles.keys())) == 0:
            assert not Store.is_abstract(proc)
            self.store.separate(drone, CMAconsts.REL_hosting, proc)
            # @TODO Needs to be a 'careful, complete' reference count deletion...
            # These diagnostics go through our logger like the rest of this
            # method, rather than being written straight to stderr.
            if self.debug:
                self.log.debug('TRYING TO DELETE node %s' % (procname))
                for newprocname in newprocs:
                    self.log.debug('*** new procs: proc.procname %s'
                                   % (str(newprocname)))
                self.log.debug('*** DELETING proc: proc.procname %s: proc=%s'
                               % (str(procname), str(proc)))
            self.store.delete(proc)

    for procname in data.keys():  # List of names of processes...
        processnode = newprocmap[procname]
        procinfo = data[procname]
        if self.debug:
            self.log.debug('Processing key(%s): proc: %s' % (procname, processnode))
        if 'listenaddrs' in procinfo:
            srvportinfo = procinfo['listenaddrs']
            processnode.addrole(CMAconsts.ROLE_server)
            for srvkey in srvportinfo.keys():
                # Keys are 'address:port'; the port is whatever follows the last colon
                match = TCPDiscoveryListener.netstatipportpat.match(srvkey)
                (ip, port) = match.groups()
                self._add_serveripportnodes(drone, ip, int(port), processnode,
                                            allourips)
        if 'clientaddrs' in procinfo:
            clientinfo = procinfo['clientaddrs']
            processnode.addrole(CMAconsts.ROLE_client)
            for clientkey in clientinfo.keys():
                match = TCPDiscoveryListener.netstatipportpat.match(clientkey)
                (ip, port) = match.groups()
                self._add_clientipportnode(drone, ip, int(port), processnode)
346 
def _add_clientipportnode(self, drone, ipaddr, servport, processnode):
    '''Add the information for a single client IPtcpportNode to the database.

    'ipaddr' and 'servport' identify the service being talked to.  The address is
    converted to IPv6 form, an IPaddrNode and an IPtcpportNode are loaded or created
    for it in the drone's domain, and the port node is then related to the address
    node (REL_baseip) and to 'processnode' (REL_tcpclient).
    '''
    service_addr = str(pyNetAddr(ipaddr).toIPv6())
    addrnode = self.store.load_or_create(IPaddrNode, domain=drone.domain,
                                         ipaddr=service_addr)
    portnode = self.store.load_or_create(IPtcpportNode, domain=drone.domain,
                                         ipaddr=service_addr, port=servport)
    links = (
        (CMAconsts.REL_baseip, addrnode),
        (CMAconsts.REL_tcpclient, processnode),
    )
    for reltype, target in links:
        self.store.relate_new(portnode, reltype, target)
355 
def _add_serveripportnodes(self, drone, ip, port, processnode, allourips):
    '''We create tcpipports objects that correspond to the given listening address
    in the context of the set of IP addresses that we support - including support
    for the ANY ipv4 and ipv6 addresses.

    Parameters:
      drone       -- the Drone the process runs on (supplies the domain)
      ip          -- the address being listened on; converted with str() before use
      port        -- the port being listened on
      processnode -- the process doing the listening
      allourips   -- list of the IP address nodes we own.  NOTE: if 'ip' is a
                     specific address not found in this list, a node for it is
                     created and appended to the caller's list.

    A local address (islocal()) is logged and ignored.  The ANY address produces a
    port node on every address in allourips; a specific address produces one port
    node, on the matching address.
    '''
    netaddr = pyNetAddr(str(ip)).toIPv6()
    if netaddr.islocal():
        self.log.warning('add_serveripportnodes("%s"): address is local' % netaddr)
        return
    addr = str(netaddr)
    # Were we given the ANY address?
    anyaddr = netaddr.isanyaddr()
    for ipaddr in allourips:
        if not anyaddr and str(ipaddr.ipaddr) != addr:
            continue
        ip_port = self.store.load_or_create(IPtcpportNode, domain=drone.domain,
                                            ipaddr=ipaddr.ipaddr, port=port)
        assert hasattr(ip_port, '_Store__store_node')
        self.store.relate_new(processnode, CMAconsts.REL_tcpservice, ip_port)
        assert hasattr(ipaddr, '_Store__store_node')
        self.store.relate_new(ip_port, CMAconsts.REL_baseip, ipaddr)
        if not anyaddr:
            # A specific address can only match once - we're done
            return
    if anyaddr:
        return

    # We were given a specific address which isn't (yet) one of ours.
    # Report it through our logger, as the local-address case above does.  The
    # comprehension variable is deliberately not called 'ip': in Python 2 it
    # would rebind our 'ip' parameter.
    self.log.warning('LOOKING FOR %s (%s, %s) in: %s'
                     % (netaddr, type(ip), type(netaddr),
                        [str(owned.ipaddr) for owned in allourips]))
    # Must not have been discovered yet. Hopefully discovery will come along and
    # fill in the cidrmask, and create the NIC relationship ;-)
    ipnode = self.store.load_or_create(IPaddrNode, domain=drone.domain, ipaddr=addr)
    allourips.append(ipnode)
    # Try again now that the address is in the list
    self._add_serveripportnodes(drone, addr, port, processnode, allourips)
388 
389 @Drone.add_json_processor
391  'Listening for subsystem discovery results'
392 
393  prio = DiscoveryListener.PRI_CORE
394  wantedpackets = ('vagrant', 'docker')
395 
def processpkt(self, drone, _unused_srcaddr, jsonobj, discoverychanged):
    '''Kick off discovery for a Docker or vagrant instance - as though it were a
    real Drone.

    For each entry in jsonobj['data']['containers'] a child system is obtained from
    ChildSystem.childfactory().  Children for which Store.is_abstract() is false are
    skipped.  Each remaining child is related to its parent Drone (REL_parentsys) and
    asked to perform every discovery type listed in
    self.config['containers'][<discovertype>]['initial_discovery'].

    Nothing happens if discoverychanged is false or the data has no 'containers'.
    '''
    if not discoverychanged:
        return

    payload = jsonobj['data']
    if 'containers' not in payload:
        return
    childtype = jsonobj['discovertype']
    children = payload['containers']
    initial_types = self.config['containers'][childtype]['initial_discovery']

    # NOTE(review): values are spliced into this JSON text unescaped, so a value
    # containing a double quote or backslash would presumably yield malformed JSON
    # - confirm whether such values can occur
    request_format = '{"%s": "%s", "%s": "%s",%s "parameters":{"%s": "%s"}}'

    for sysid in children:
        child = ChildSystem.childfactory(drone, childtype, sysid, children[sysid])
        if not Store.is_abstract(child):
            continue
        # Connect it to its parent system
        self.store.relate_new(child, CMAconsts.REL_parentsys, drone)

        # Optional user/group to run the discovery as
        runspec = ''
        if child.runas_user is not None:
            runspec = ' "runas_user": "%s",' % child.runas_user
        if child.runas_group is not None:
            runspec += ' "runas_group": "%s",' % child.runas_group

        # One discovery request per initial discovery type
        requests = [
            pyConfigContext(request_format % (
                CONFIGNAME_TYPE, dtype,
                CONFIGNAME_INSTANCE, '_init_%s_%s' % (dtype, child.childpath),
                runspec,
                'ASSIM_PROXY_PATH', child.childpath))
            for dtype in initial_types
        ]
        # kick off discovery...
        child.request_discovery(requests)
436 
def processpkt(self, drone, srcaddr, jsonobj, discoverychanged)
def __init__(self, config, packetio, store, log, debug)
def processpkt(self, drone, srcaddr, jsonobj, discoverychanged)
def _add_serveripportnodes(self, drone, ip, port, processnode, allourips)
def processpkt(self, drone, srcaddr, json, discoverychanged)
def processpkt(self, drone, _srcaddr, jsonobj, discoverychanged)
def processpkt(self, drone, _unused_srcaddr, jsonobj, discoverychanged)
def processpkt(self, drone, _srcaddr, jsonobj, discoverychanged)
def _add_clientipportnode(self, drone, ipaddr, servport, processnode)