The Assimilation Project
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
pcap+mainloop.c
Go to the documentation of this file.
1 
37 #include <projectcommon.h>
38 #include <stdlib.h>
39 #include <stdio.h>
40 #include <string.h>
41 #ifdef HAVE_MCHECK_H
42 # include <mcheck.h>
43 #endif
44 #include <framesettypes.h>
45 #include <frameset.h>
46 #include <ctype.h>
47 #include <netgsource.h>
48 #include <reliableudp.h>
49 #include <netaddr.h>
50 #include <authlistener.h>
51 #include <signframe.h>
52 #include <cryptframe.h>
53 #include <compressframe.h>
54 #include <intframe.h>
55 #include <addrframe.h>
56 #include <cstringframe.h>
57 #include <frametypes.h>
58 #include <nanoprobe.h>
59 #include <resourcecmd.h>
60 #include <cmalib.h>
61 
62 
// Port number that main() copies into 'testport' and then uses for every
// address and port setting this test program creates.
63 #define TESTPORT 1984
64 
// Symbols owned by another translation unit: imported with WINIMPORT on
// WIN32 builds, declared directly otherwise.
//   errcount     - incremented and returned (capped) as the exit status by main()
//   nano_hbstats - its heartbeat_count field is polled by timeout_agent()
//   mainloop     - the GMainLoop created, run and released by main()
65 #ifdef WIN32
66 WINIMPORT int errcount;
67 WINIMPORT NanoHbStats nano_hbstats;
68 WINIMPORT GMainLoop* mainloop;
69 #else
70 extern int errcount;
// NOTE(review): unlike errcount, no 'extern' is visible on the next line, and
// the listing skips number 71 (nano_hbstats has no visible non-WIN32
// declaration). Confirm both against the original source file.
72 GMainLoop* mainloop;
73 #endif
74 
// Packet limit: defaults to "effectively unlimited" and may be overridden by
// argv[1] in main(). Compared against wirepktcount in gotnetpkt() and against
// the heartbeat count in timeout_agent().
76 gint64 maxpkts = G_MAXINT64;
// Not referenced by any code visible in this listing.
77 gint64 pktcount = 0;
78 GMainLoop* mainloop;
// NOTE(review): the listing skips numbers 79-84. The file-scope names used
// later without a local declaration (nettransport, netpkt, destaddr,
// otheraddr, otheraddr2, anyaddr) are presumably declared there - confirm.
// Count of FrameSets delivered to gotnetpkt(); printed by main() at shutdown.
85 int wirepktcount = 0;
86 
// Forward declarations for the callbacks and helpers of this test program.
// gotnetpkt() is installed by main() as the got_frameset handler of the
// catch-all listener.
87 gboolean gotnetpkt(Listener*, FrameSet* fs, NetAddr* srcaddr);
// NOTE(review): no definition of got_heartbeat() or got_heartbeat2() is
// visible in this listing - confirm whether they are still needed.
88 void got_heartbeat(HbListener* who);
89 void got_heartbeat2(HbListener* who);
// Logs and counts the JSON discovery frames found in a FrameSet.
90 void check_JSON(FrameSet* fs);
91 
// Periodic callback registered by main() with g_timeout_add_seconds().
93 gboolean timeout_agent(gpointer ignored);
94 
97  {0, NULL},
98 };
99 
100 
102 
// Scan every frame of a FrameSet, and for each frame of type
// FRAMETYPE_JSDISCOVER report whether its JSON text parsed.
//
// The FrameSet is referred to as 'fs' in the body. Each frame's type is
// logged at debug level; frames of any other type are skipped. A summary
// line with the number of JSON frames seen and the number of failures is
// logged at the end. Nothing is returned to the caller.
//
// NOTE(review): the listing skips number 104, so the line carrying the
// function name and parameter list is not visible here; the prototype
// 'void check_JSON(FrameSet* fs)' and the call sites identify it.
103 void
105 {
106  GSList* fptr;
107  int jsoncount = 0;
// NOTE(review): this local hides the file-scope 'errcount' that main()
// returns as the exit status, so failures counted here only appear in the
// summary message below - confirm that is intended.
108  int errcount = 0;
109 
110  g_debug("Frameset type is: %d", fs->fstype);
111  for (fptr=fs->framelist; fptr; fptr=fptr->next) {
112  Frame* frame = CASTTOCLASS(Frame, fptr->data);
113  CstringFrame* csf;
// NOTE(review): the listing skips number 114; the declaration of 'config'
// used below is presumably there.
115  g_debug("Frame type is: %d", frame->type);
116  if (frame->type != FRAMETYPE_JSDISCOVER) {
117  continue;
118  }
119  ++jsoncount;
120  // Ahh! JSON data. Let's parse it!
121  csf = CASTTOCLASS(CstringFrame, frame);
// NOTE(review): the listing skips number 122; 'config' is presumably
// assigned there from the frame's text (csf->baseclass.value) - confirm.
// A NULL 'config' is treated as a parse failure.
123  if (config == NULL) {
124  g_warning("JSON text did not parse correctly [%s]"
125  , (char*)csf->baseclass.value);
126  ++errcount;
127  }else{
// Print the parsed object's string form, then release both the string
// and the parsed object.
128  char * tostr = config->baseclass.toString(config);
129  g_message("PARSED JSON: %s", tostr);
130  g_free(tostr); tostr = NULL;
131  UNREF(config);
132  }
133  }
134  g_message("%d JSON strings parsed. %d errors.", jsoncount, errcount);
135 }
136 
// Handler for FrameSets delivered to the catch-all listener (main() stores
// this function in the listener's got_frameset member).
//
// l       - listener that received the FrameSet; its transport is used to
//           acknowledge the message
// fs      - the received FrameSet; this function drops a reference to it
//           with UNREF() before returning
// srcaddr - address the FrameSet came from; passed to ackmessage()
//
// Counts the packet in wirepktcount, logs a message chosen by fs->fstype
// (JSON discovery data is additionally handed to check_JSON()), acknowledges
// the message and releases 'fs'.
//
// Returns FALSE once wirepktcount has reached maxpkts, TRUE otherwise.
138 gboolean
139 gotnetpkt(Listener* l,
140  FrameSet* fs,
141  NetAddr* srcaddr
142  )
143 {
// NOTE(review): both parameters are in fact used further down (ackmessage
// call), so these unused-parameter casts are redundant.
144  (void)l; (void)srcaddr;
145  ++wirepktcount;
146  switch(fs->fstype) {
147  case FRAMESETTYPE_HBDEAD:
148  g_message("CMA Received dead host notification (type %d) over the 'wire'."
149  , fs->fstype);
150  break;
// NOTE(review): the listing skips number 151; the case label for switch
// discovery data is presumably there.
152  g_message("CMA Received switch discovery data (type %d) over the 'wire'."
153  , fs->fstype);
154  break;
// NOTE(review): the listing skips number 155; the case label for JSON
// discovery data is presumably there.
156  g_message("CMA Received JSON discovery data (type %d) over the 'wire'."
157  , fs->fstype);
158  check_JSON(fs);
159  break;
// Any other type: log the FrameSet's string form, then free that string.
160  default:{
161  char * fsstr = fs->baseclass.toString(&fs->baseclass);
162  g_message("CMA Received a FrameSet of type %d [%s] over the 'wire'."
163  , fs->fstype, fsstr);
164  FREE(fsstr); fsstr = NULL;
165  }
166  }
167 
// Acknowledge the message before the reference to 'fs' is dropped.
168  l->transport->_netio->ackmessage(l->transport->_netio, srcaddr, fs);
169  UNREF(fs);
170  if (wirepktcount >= maxpkts) {
171  g_message("QUITTING NOW - wirepktcount!");
// NOTE(review): the listing skips number 172; the statement that actually
// stops the main loop is presumably there - confirm.
173  return FALSE;
174  }
175  return TRUE;
176 }
177 
// Periodic check, registered by main() via g_timeout_add_seconds(1, ...),
// that ends the test once enough heartbeats have been counted.
//
// ignored - callback user data; not used
//
// When nano_hbstats.heartbeat_count exceeds the limit derived from maxpkts,
// logs a message, asks the reliable-UDP protocol layer to close all of its
// connections and returns FALSE. Otherwise returns TRUE. (For a GLib timeout
// callback, FALSE removes the timeout and TRUE keeps it scheduled.)
179 gboolean
180 timeout_agent(gpointer ignored)
181 {
182  ReliableUDP* io = CASTTOCLASS(ReliableUDP, nettransport);
183 
184  (void)ignored;
// NOTE(review): maxpkts is a gint64 (default G_MAXINT64); the (unsigned)
// cast narrows it, so large or negative limits are compared as a different
// value. Confirm the type of heartbeat_count and compare in 64 bits if
// that is what is intended.
185  if (nano_hbstats.heartbeat_count > (unsigned)maxpkts) {
186  g_message("QUITTING NOW! (heartbeat count)");
187  io->_protocol->closeall(io->_protocol);
// NOTE(review): the listing skips number 188; a statement (presumably the
// one that stops the main loop) is not visible here.
189  return FALSE;
190  }
191  return TRUE;
192 }
193 
// Compile-time construction of JSON request strings for resource operations.
// Each macro below expands to adjacent string literals that the compiler
// concatenates: a quoted field name (taken from the REQ*NAMEFIELD macros
// defined elsewhere) followed by ": " and a fixed value.
194 #define OCFCLASS "\"" REQCLASSNAMEFIELD "\": \"ocf\""
195 #define HBPROVIDER "\"" REQPROVIDERNAMEFIELD "\": \"heartbeat\""
196 #define DUMMYTYPE "\"" REQTYPENAMEFIELD "\": \"Dummy\""
197 #define STARTOP "\"" REQOPERATIONNAMEFIELD "\": \"start\""
198 #define STOPOP "\"" REQOPERATIONNAMEFIELD "\": \"stop\""
199 #define MONITOROP "\"" REQOPERATIONNAMEFIELD "\": \"monitor\""
200 #define METADATAOP "\"" REQOPERATIONNAMEFIELD "\": \"meta-data\""
201 #define RESOURCENAME "\"" REQRSCNAMEFIELD "\": \"DummyTestGTest01\""
202 #define NULLPARAMS "\"" REQENVIRONNAMEFIELD "\": {}"
// Field separator. NOTE(review): a one-letter macro name replaces every
// later identifier 'C' in this translation unit - keep that in mind when
// adding code below.
203 #define C ","
// The next three stringize their argument (#id etc.), so the value is
// emitted as an unquoted JSON token.
204 #define REQID(id) "\"" REQIDENTIFIERNAMEFIELD "\": " #id
205 #define REPEAT(repeat) "\"" REQREPEATNAMEFIELD "\": " #repeat
206 #define INITDELAY(delay) "\"" REQINITDELAYNAMEFIELD "\": " #delay
// Fields shared by every request: class, provider, type, resource name and
// an empty environment object.
207 #define COMMREQUEST OCFCLASS C HBPROVIDER C DUMMYTYPE C RESOURCENAME C NULLPARAMS
// One complete JSON object: the common fields plus the operation ('type'),
// request id, repeat value and initial delay.
208 #define REQUEST(type,id, repeat,delay) \
209  "{" COMMREQUEST C type C REQID(id) C REPEAT(repeat) C INITDELAY(delay)"}"
210 #define START REQUEST(STARTOP, 1, 0, 0) // One shot - no delay
211 #define MONITOR REQUEST(MONITOROP, 2, 0, 0) // repeat=0, delay=0. NOTE(review): this comment used to say "Repeat every second", but the repeat argument is 0 like the one-shot requests - confirm which is intended
212 #define STOP REQUEST(STOPOP, 3, 0, 5) // No repeat - 5 second delay
213 
// Plays the CMA's part when a nanoprobe announces itself: logs the sender,
// checks any JSON in the incoming FrameSet, then sends a series of FrameSets
// back to the nanoprobe over the reliable transport.
//
// NOTE(review): the listing skips number 216, so the function name and
// parameter list are not visible here. The body uses three names that are
// not declared locally and are presumably the parameters:
//   auth     - listener whose baseclass.transport is used for all replies
//   ifs      - the incoming FrameSet (passed to check_JSON())
//   nanoaddr - address of the nanoprobe; destination of every reply
// Nothing is returned.
215 void
217 {
218  FrameSet* pkt;
// NOTE(review): main() uses a file-scope 'netpkt'; this local of the same
// name takes precedence inside this function.
219  NetGSource* netpkt = auth->baseclass.transport;
220  char * nanostr = nanoaddr->baseclass.toString(nanoaddr);
221 
// NOTE(review): 'ifs' is used by check_JSON() below, so this
// unused-parameter cast is redundant.
222  (void)ifs;
223  g_message("CMA received startup message from nanoprobe at address %s/%d."
224  , nanostr, nanoaddr->port(nanoaddr));
225  g_free(nanostr); nanostr = NULL;
226  check_JSON(ifs);
227 
228  // Send the configuration data to our new "client"
229  pkt = create_setconfig(nanoconfig);
230  netpkt->_netio->sendareliablefs(netpkt->_netio, nanoaddr, DEFAULT_FSP_QID, pkt);
231  UNREF(pkt);
232 
233  // Now tell them to send/expect heartbeats to various places
// NOTE(review): the listing skips numbers 234, 238 and 242. Each of the
// three sends below follows an UNREF(pkt), so 'pkt' must be assigned a
// fresh FrameSet on those missing lines - confirm against the original.
235  netpkt->_netio->sendareliablefs(netpkt->_netio, nanoaddr, DEFAULT_FSP_QID, pkt);
236  UNREF(pkt);
237 
239  netpkt->_netio->sendareliablefs(netpkt->_netio, nanoaddr, DEFAULT_FSP_QID, pkt);
240  UNREF(pkt);
241 
243  netpkt->_netio->sendareliablefs(netpkt->_netio, nanoaddr, DEFAULT_FSP_QID, pkt);
244  UNREF(pkt);
245 
246  {
// The three JSON request strings built by the START/MONITOR/STOP macros.
247  const char * json[] = { START, MONITOR, STOP};
248  unsigned j;
249  // Create a frameset for a few resource operations
// NOTE(review): the listing skips numbers 250 and 252; the creation of
// 'pkt' for this batch and the declaration/creation of 'csf' are
// presumably there.
251  for (j=0; j < DIMOF(json); j++) {
// The frame receives its own heap copy of the string (including the
// terminating NUL) together with g_free as the function to release it.
253  csf->baseclass.setvalue(&csf->baseclass, g_strdup(json[j])
254  , strlen(json[j])+1, g_free);
255  frameset_append_frame(pkt, &csf->baseclass);
// Drop the local reference once the frame has been appended.
256  UNREF2(csf);
257  }
258  netpkt->_netio->sendareliablefs(netpkt->_netio, nanoaddr, DEFAULT_FSP_QID, pkt);
259  UNREF(pkt);
260  }
261 }
262 
// Test driver that plays both the nanoprobe and the CMA inside one process.
//
// argc/argv - if an argument is given, argv[1] is converted with atol() and
//             stored in maxpkts (the packet/heartbeat limit that ends the run)
//
// Steps, in order: create the signature frame and transport, fill in the
// configuration sent to the nanoprobe, build the addresses (all on
// TESTPORT), bind to the "any" address, join the multicast group, hook the
// transport into the GLib main loop, install the catch-all and the
// authenticated listeners, start the nanoprobe, run the main loop, then
// release everything and check that no project objects are left alive.
//
// Returns errcount capped at 127. Setup failures go through
// g_return_val_if_fail() with the values 2, 3, 4, 5, 16 or 17.
// NOTE(review): g_return_val_if_fail() reports at CRITICAL level and CRITICAL
// is made fatal below, so a failed check may abort instead of returning.
//
// NOTE(review): this listing skips numbers 283, 309 and 432-433, so one
// declaration, the 'if' that opens the dual-stack check, and the tail of the
// final g_warning() call are not visible here.
271 int
272 main(int argc, char **argv)
273 {
274  const guint8 loopback[] = CONST_IPV6_LOOPBACK;
275  const guint8 mcastaddrstring[] = CONST_ASSIM_DEFAULT_V4_MCAST;
276  NetAddr* mcastaddr;
277  const guint8 otheradstring[] = {127,0,0,1};
278  const guint8 otheradstring2[] = {10,10,10,4};
279  const guint8 anyadstring[] = {0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0};
280  guint16 testport = TESTPORT;
281  SignFrame* signature = signframe_new(G_CHECKSUM_SHA256, 0);
282  Listener* otherlistener;
284  PacketDecoder* decoder = nano_packet_decoder();
285  AuthListener* listentonanoprobes;
286 
287 #if 0
288 # ifdef HAVE_MCHECK_PEDANTIC
289  g_assert(mcheck_pedantic(NULL) == 0);
290 # else
291 # ifdef HAVE_MCHECK
292  g_assert(mcheck(NULL) == 0);
293 # endif
294 # endif
295 #endif
296  g_setenv("G_MESSAGES_DEBUG", "all", TRUE);
297 #if 0
298  proj_class_incr_debug(NULL);
299  proj_class_incr_debug(NULL);
300  proj_class_incr_debug(NULL);
301 #endif
302  g_log_set_fatal_mask(NULL, G_LOG_LEVEL_ERROR|G_LOG_LEVEL_CRITICAL);
303 
304  if (argc > 1) {
305  maxpkts = atol(argv[1]);
306  g_debug("Max packet count is "FMT_64BIT"d", maxpkts);
307  }
308 
310  g_message("Our OS supports dual ipv4/v6 sockets. Hurray!");
311  }else{
312  g_warning("Our OS DOES NOT support dual ipv4/v6 sockets - this may not work!!");
313  }
314 
315 
316  config->setframe(config, CONFIGNAME_OUTSIG, &signature->baseclass);
317 
318  // Create a network transport object for normal UDP packets
319  nettransport = &(reliableudp_new(0, config, decoder, 0)->baseclass.baseclass);
320  g_return_val_if_fail(NULL != nettransport, 2);
321 
322  // Set up the parameters the 'CMA' is going to send to our 'nanoprobe'
323  // in response to their request for configuration data.
324  nanoconfig = configcontext_new(0);
325  nanoconfig->setint(nanoconfig, CONFIGNAME_HBPORT, testport);
326  nanoconfig->setint(nanoconfig, CONFIGNAME_HBTIME, 1000000);
327  nanoconfig->setint(nanoconfig, CONFIGNAME_DEADTIME, 3*1000000);
328  nanoconfig->setint(nanoconfig, CONFIGNAME_CMAPORT, testport);
329 
330 
331  // Construct the NetAddr we'll talk to (i.e., ourselves) and listen from
332  destaddr = netaddr_ipv6_new(loopback, testport);
333  g_return_val_if_fail(NULL != destaddr, 3);
334  config->setaddr(config, CONFIGNAME_CMAINIT, destaddr);
335  nanoconfig->setaddr(nanoconfig, CONFIGNAME_CMAADDR, destaddr);
336  nanoconfig->setaddr(nanoconfig, CONFIGNAME_CMAFAIL, destaddr);
337  nanoconfig->setaddr(nanoconfig, CONFIGNAME_CMADISCOVER, destaddr);
338 
339 
340  // Construct another couple of NetAddrs to talk to and listen from
341  // for good measure...
342  otheraddr = netaddr_ipv4_new(otheradstring, testport);
343  g_return_val_if_fail(NULL != otheraddr, 4);
344  otheraddr2 = netaddr_ipv4_new(otheradstring2, testport);
345  g_return_val_if_fail(NULL != otheraddr2, 4);
346 
347  // Construct another NetAddr to bind to (anything)
348  anyaddr = netaddr_ipv6_new(anyadstring, testport);
// Check the address that was just created (this used to re-test destaddr,
// which had already been verified above, leaving anyaddr unchecked).
349  g_return_val_if_fail(NULL != anyaddr, 5);
350 
351  // Bind to ANY address (as noted above)
352  g_return_val_if_fail(nettransport->bindaddr(nettransport, anyaddr, FALSE),16);
353  //g_return_val_if_fail(nettransport->bindaddr(nettransport, destaddr),16);
354 
355  g_message("Joining multicast address.");
356  mcastaddr = netaddr_ipv4_new(mcastaddrstring, testport);
357  g_return_val_if_fail(nettransport->mcastjoin(nettransport, mcastaddr, NULL), 17);
358  UNREF(mcastaddr);
359  g_message("multicast join succeeded.");
360 
361  // Connect up our network transport into the g_main_loop paradigm
362  // so we get dispatched when packets arrive
363  netpkt = netgsource_new(nettransport, NULL, G_PRIORITY_HIGH, FALSE, NULL, 0, NULL);
364 
365  // Set up so that we can observe all unclaimed packets
366  otherlistener = listener_new(config, 0);
367  otherlistener->got_frameset = gotnetpkt;
368  netpkt->addListener(netpkt, 0, otherlistener);
369  otherlistener->associate(otherlistener,netpkt);
370 
371  // Unref the "other" listener - we hold other references to it
372  UNREF(otherlistener);
373 
374  // Pretend to be the CMA...
375  // Listen for packets from our nanoprobes - scattered throughout space...
376  listentonanoprobes = authlistener_new(0, cmalist, config, TRUE);
377  listentonanoprobes->baseclass.associate(&listentonanoprobes->baseclass, netpkt);
378 
379  nano_start_full("netconfig", 900, netpkt, config);
380 
// Poll once per second for the heartbeat-count exit condition.
381  g_timeout_add_seconds(1, timeout_agent, NULL);
382  mainloop = g_main_loop_new(g_main_context_default(), TRUE);
383 
384  /********************************************************************
385  * Start up the main loop - run our test program...
386  * (the one pretending to be both the nanoprobe and the CMA)
387  ********************************************************************/
388  g_main_loop_run(mainloop);
389 
390  /********************************************************************
391  * We exited the main loop. Shut things down.
392  ********************************************************************/
393 
394  nano_shutdown(TRUE); // Tell it to shutdown and print stats
395  g_message("Count of 'other' pkts received:\t%d", wirepktcount);
396 
397  UNREF(nettransport);
398 
399  // Main loop is over - shut everything down, free everything...
400  g_main_loop_unref(mainloop); mainloop=NULL;
401 
402 
403  // Unlink misc dispatcher - this should NOT be necessary...
404  netpkt->addListener(netpkt, 0, NULL);
405 
406  // Dissociate packet actions from the packet source.
407  listentonanoprobes->baseclass.dissociate(&listentonanoprobes->baseclass);
408 
409  // Unref the AuthListener object
410  UNREF2(listentonanoprobes);
411 
412  g_source_destroy(&netpkt->baseclass);
413  g_source_unref(&netpkt->baseclass);
414  //g_main_context_unref(g_main_context_default());
415 
416  // Free signature frame
417  UNREF2(signature);
418 
419  // Free misc addresses
420  UNREF(destaddr);
421  UNREF(otheraddr);
422  UNREF(otheraddr2);
423  UNREF(anyaddr);
424 
425  // Free config object
426  UNREF(config);
427  UNREF(nanoconfig);
428 
429  // At this point - nothing should show up - we should have freed everything
430  if (proj_class_live_object_count() > 0) {
431  g_warning("Too many objects (%d) alive at end of test.",
434  ++errcount;
435  }else{
436  g_message("No objects left alive. Awesome!");
437  }
438  proj_class_finalize_sys(); // Shut down object system to make valgrind happy :-D
// Keep the exit status within the range a shell reports reliably.
439  return(errcount <= 127 ? errcount : 127);
440 }
440 }