The Assimilation Monitoring Project
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
clientlib/nanoprobe.c
Go to the documentation of this file.
1 
25 #include <projectcommon.h>
26 #include <string.h>
27 #include <frameset.h>
28 #include <framesettypes.h>
29 #include <frametypes.h>
30 #include <compressframe.h>
31 #include <cryptframe.h>
32 #include <intframe.h>
33 #include <cstringframe.h>
34 #include <addrframe.h>
35 #include <ipportframe.h>
36 #include <seqnoframe.h>
37 #include <packetdecoder.h>
38 #include <netgsource.h>
39 #include <reliableudp.h>
40 #include <authlistener.h>
41 #include <nvpairframe.h>
42 #include <hblistener.h>
43 #include <hbsender.h>
44 #include <configcontext.h>
45 #include <pcap_min.h>
46 #include <jsondiscovery.h>
47 #include <switchdiscovery.h>
48 #include <fsprotocol.h>
49 #include <resourcecmd.h>
50 #include <resourcequeue.h>
51 #include <misc.h>
52 #include <nanoprobe.h>
53 
54 
60 void (*nanoprobe_warntime_agent)(HbListener*, guint64 howlate) = NULL;
61 void (*nanoprobe_comealive_agent)(HbListener*, guint64 howlate) = NULL;
62 WINEXPORT NanoHbStats nano_hbstats = {0U, 0U, 0U, 0U, 0U};
63 gboolean nano_connected = FALSE;
65 WINEXPORT GMainLoop* mainloop = NULL;
66 
74 FSTATIC void nanoobey_change_debug(gint plusminus, AuthListener*, FrameSet*, NetAddr*);
80 FSTATIC void _nano_send_rscexitstatus(ConfigContext* request, gpointer user_data
81 , enum HowDied reason, int rc, int signal, gboolean core_dumped
82 , const char * stringresult);
84 FSTATIC void nano_schedule_discovery(const char *name, guint32 interval,const char* json
85  , ConfigContext*, NetGSource* transport, NetAddr* fromaddr);
86 FSTATIC void nano_stop_discovery(const char * discoveryname, NetGSource*, NetAddr*);
87 FSTATIC gboolean nano_startupidle(gpointer gcruft);
88 FSTATIC gboolean nano_reqconfig(gpointer gcruft);
91 FSTATIC void _real_warntime_agent(HbListener* who, guint64 howlate);
92 FSTATIC void _real_comealive_agent(HbListener* who, guint64 howlate);
95 FSTATIC gboolean _nano_final_shutdown(gpointer unused);
96 FSTATIC gboolean shutdown_when_outdone(gpointer unused);
97 
98 HbListener* (*nanoprobe_hblistener_new)(NetAddr*, ConfigContext*) = _real_hblistener_new;
99 
100 gboolean nano_shutting_down = FALSE;
101 const char * procname = "nanoprobe";
102 
103 static NetAddr* nanofailreportaddr = NULL;
104 static NetGSource* nanotransport = NULL;
105 static guint idle_shutdown_gsource = 0;
106 static ResourceQueue* RscQ = NULL;
107 
109 
114 {
115  return hblistener_new(addr, context, 0);
116 }
117 
119 void
120 nanoprobe_report_upstream(guint16 reporttype
121 , NetAddr* who
122 , const char * systemnm
123 , guint64 howlate)
124 
125 {
126  FrameSet* fs;
127 
128  if (nano_shutting_down || NULL == nanofailreportaddr) {
129  DEBUGMSG("%s.%d: Ignoring request to send fstype=%d message upstream [%s]."
130  , __FUNCTION__, __LINE__, reporttype
131  , (nano_shutting_down ? "shutting down" : "not connected to CMA"));
132  return;
133  }
134 
135  fs = frameset_new(reporttype);
136  // Construct and send a frameset reporting this event...
137  if (howlate > 0) {
139  lateframe->setint(lateframe, howlate);
140  frameset_append_frame(fs, &lateframe->baseclass);
141  UNREF2(lateframe);
142  }
143  // Add the address - if any...
144  if (who != NULL) {
146  frameset_append_frame(fs, &peeraddr->baseclass);
147  UNREF2(peeraddr);
148  }
149  // Add the system name - if any...
150  if (systemnm != NULL) {
152  usf->baseclass.setvalue(&usf->baseclass, g_strdup(systemnm), strlen(systemnm)+1
154  frameset_append_frame(fs, &usf->baseclass);
155  UNREF2(usf);
156  }
157  DEBUGMSG3("%s - sending frameset of type %d", __FUNCTION__, reporttype);
158  DUMP3("nanoprobe_report_upstream", &nanofailreportaddr->baseclass, NULL);
159  nanotransport->_netio->sendareliablefs(nanotransport->_netio, nanofailreportaddr, DEFAULT_FSP_QID, fs);
160  UNREF(fs);
161 }
162 
163 
165 FSTATIC void
167 {
168  static guint64 last_martian_time = 0; // microseconds
169  static guint recent_martian_count = 0;
170  guint64 now = g_get_monotonic_time(); // microseconds
171  const guint64 uS = 1000000;
172 
174 
175  // If it's been more than MARTIAN_TIMEOUT seconds since the last
176  // martian, then reset the count of recent martians
177  if (now > (last_martian_time + (MARTIAN_TIMEOUT*uS))) {
178  recent_martian_count = 0;
179  }
180 
181  last_martian_time = now;
182  ++recent_martian_count;
183 
184  // This means if we only get one martian then none, we say nothing
185  // This can happen as a result of timing - and it's OK.
186  // But if we get more than one, we complain then and once every 10 afterwards
187  if ((recent_martian_count % 10) == 2) {
188  char * addrstring;
189 
191  addrstring = who->baseclass.toString(who);
192  g_warning("System at address %s is sending unexpected heartbeats.", addrstring);
193  g_free(addrstring);
194 
196  }
197 }
200 FSTATIC void
202 {
206  }else{
207  char * addrstring;
208 
209  addrstring = who->listenaddr->baseclass.toString(who->listenaddr);
210  g_warning("Peer at address %s is dead (has timed out).", addrstring);
211  g_free(addrstring);
212 
214  }
215 }
216 
219 FSTATIC void
221 {
225  }
226 }
227 
228 
231 FSTATIC void
232 _real_warntime_agent(HbListener* who, guint64 howlate)
233 {
236  nanoprobe_warntime_agent(who, howlate);
237  }else{
238  char * addrstring;
239  guint64 mslate = howlate / 1000;
240  addrstring = who->listenaddr->baseclass.toString(who->listenaddr);
241  g_warning("Heartbeat from peer at address %s was "FMT_64BIT"d ms late.", addrstring, mslate);
242  g_free(addrstring);
244  }
245 }
248 FSTATIC void
249 _real_comealive_agent(HbListener* who, guint64 howlate)
250 {
253  nanoprobe_comealive_agent(who, howlate);
254  }else{
255  char * addrstring;
256  double secsdead = ((double)((howlate+50000) / 100000))/10.0; // Round to nearest tenth of a second
257  addrstring = who->listenaddr->baseclass.toString(who->listenaddr);
258  g_warning("Peer at address %s came alive after being dead for %g seconds.", addrstring, secsdead);
259  g_free(addrstring);
261  }
262 }
263 
275 void
277  , FrameSet* fs
278  , NetAddr* fromaddr)
279 {
280 
281  GSList* slframe;
282  guint addrcount = 0;
284  guint16 sendinterval = 0;
285 
286  if (nano_shutting_down) {
287  return;
288  }
289 
290  g_return_if_fail(fs != NULL);
291  (void)fromaddr;
292 
293  if (config->getint(config, CONFIGNAME_HBTIME) > 0) {
294  sendinterval = config->getint(config, CONFIGNAME_HBTIME);
295  }
296 
297  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
298  Frame* frame = CASTTOCLASS(Frame, slframe->data);
299  int frametype = frame->type;
300  switch(frametype) {
301  IntFrame* iframe;
302  IpPortFrame* aframe;
303  HbSender* hb;
304 
306  iframe = CASTTOCLASS(IntFrame, frame);
307  sendinterval = (guint16) iframe->getint(iframe);
308  break;
309  case FRAMETYPE_IPPORT:
310  if (0 == sendinterval) {
311  g_warning("Send interval is zero in %s", __FUNCTION__);
312  continue;
313  }
314  aframe = CASTTOCLASS(IpPortFrame, frame);
315  addrcount++;
316  hb = hbsender_new(aframe->getnetaddr(aframe), parent->baseclass.transport
317  , sendinterval, 0);
318  (void)hb;
319  break;
320  }
321  }
322 }
334 void
336  , FrameSet* fs
337  , NetAddr* fromaddr)
338 {
339 
340  GSList* slframe;
342  guint addrcount = 0;
343 
344  guint64 deadtime = 0;
345  guint64 warntime = 0;
346 
347  (void)fromaddr;
348 
349  if (nano_shutting_down) {
350  return;
351  }
352 
353  g_return_if_fail(fs != NULL);
354  if (config->getint(config, CONFIGNAME_DEADTIME) > 0) {
355  deadtime = config->getint(config, CONFIGNAME_DEADTIME);
356  }
357  if (config->getint(config, CONFIGNAME_WARNTIME) > 0) {
358  warntime = config->getint(config, CONFIGNAME_WARNTIME);
359  }
360 
361  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
362  Frame* frame = CASTTOCLASS(Frame, slframe->data);
363  int frametype = frame->type;
364  switch(frametype) {
365  IntFrame* iframe;
366 
368  iframe = CASTTOCLASS(IntFrame, frame);
369  deadtime = iframe->getint(iframe);
370  break;
371 
373  iframe = CASTTOCLASS(IntFrame, frame);
374  warntime = iframe->getint(iframe);
375  break;
376 
377  case FRAMETYPE_IPPORT: {
378  HbListener* hblisten;
379  IpPortFrame* aframe;
381  aframe = CASTTOCLASS(IpPortFrame, frame);
382  addrcount++;
383  hblisten = hblistener_new(aframe->getnetaddr(aframe), config, 0);
384  hblisten->baseclass.associate(&hblisten->baseclass, transport);
385  if (deadtime > 0) {
386  // Otherwise we get the default deadtime
387  hblisten->set_deadtime(hblisten, deadtime);
388  }
389  if (warntime > 0) {
390  // Otherwise we get the default warntime
391  hblisten->set_warntime(hblisten, warntime);
392  }
393  hblisten->set_deadtime_callback(hblisten, _real_deadtime_agent);
394  hblisten->set_heartbeat_callback(hblisten, _real_heartbeat_agent);
395  hblisten->set_warntime_callback(hblisten, _real_warntime_agent);
396  hblisten->set_comealive_callback(hblisten, _real_comealive_agent);
397  // Intercept incoming heartbeat packets
398  transport->addListener(transport, FRAMESETTYPE_HEARTBEAT
399  , &hblisten->baseclass);
400  // Unref this heartbeat listener, and forget our reference.
401  UNREF2(hblisten);
402  /*
403  * That still leaves two references to 'hblisten':
404  * - in the transport dispatch table
405  * - in the global heartbeat listener table
406  * And one reference to the previous 'hblisten' object:
407  * - in the global heartbeat listener table
408  * Also note that we become the 'proxy' for all incoming heartbeats
409  * but we dispatch them to the right HbListener object.
410  * Since we've become the proxy for all incoming heartbeats, if
411  * we displace and free the old proxy, this all still works nicely,
412  * because the transport object gets rid of its old reference to the
413  * old 'proxy' object.
414  */
415  }
416  break;
417  }
418  }
419 }
420 
430 void
432  , FrameSet* fs
433  , NetAddr* fromaddr)
434 {
435  g_return_if_fail(fs != NULL && fs->fstype == FRAMESETTYPE_SENDEXPECTHB);
436 
437  if (nano_shutting_down) {
438  return;
439  }
440  // This will cause us to ACK the packet twice -- not a problem...
441  nanoobey_sendhb (parent, fs, fromaddr);
442  nanoobey_expecthb(parent, fs, fromaddr);
443 }
451 void
453  , FrameSet* fs
454  , NetAddr* fromaddr)
455 {
456  GSList* slframe;
457  (void)parent;
458  (void)fromaddr;
459 
460  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
461  Frame* frame = CASTTOCLASS(Frame, slframe->data);
462  switch(frame->type) {
463  case FRAMETYPE_IPPORT: {
464  // This is _so_ much simpler than the code to send them ;-)
465  IpPortFrame* aframe = CASTTOCLASS(IpPortFrame, frame);
466  hbsender_stopsend(aframe->getnetaddr(aframe));
467  break;
468  }
469  }//endswitch
470  }//endfor
471 }
472 
480 void
482  , FrameSet* fs
483  , NetAddr* fromaddr)
484 {
485  GSList* slframe;
486  (void)parent;
487  (void)fromaddr;
488 
489  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
490  Frame* frame = CASTTOCLASS(Frame, slframe->data);
491  switch(frame->type) {
492  case FRAMETYPE_IPPORT: {
493  // This is _so_ much simpler than the code to listen for heartbeats...
494  IpPortFrame* aframe = CASTTOCLASS(IpPortFrame, frame);
495  NetAddr* destaddr = aframe->getnetaddr(aframe);
497  hblistener_unlisten(destaddr);
498  transport->closeconn(transport, DEFAULT_FSP_QID, destaddr);
499  break;
500  }
501  }//endswitch
502  }//endfor
503 }
504 
510 void
512  , FrameSet* fs
513  , NetAddr* fromaddr)
514 {
515  nanoobey_stopexpecthb(parent, fs, fromaddr);
516  nanoobey_stopsendhb (parent, fs, fromaddr);
517 }
518 
519 /*
520  * Act on (obey) a <b>FRAMESETTYPE_SETCONFIG</b> @ref FrameSet.
521  * This frameset is sent during the initial configuration phase.
522  * It contains name value pairs to save into our configuration (ConfigContext).
523  * These might be {string,string} pairs or {string,ipaddr} pairs, or
524  * {string, integer} pairs. We process them all.
525  * The frame types that we receive for these are:
526  * <b>FRAMETYPE_PARAMNAME</b> - parameter name to set
527  * <b>FRAMETYPE_CSTRINGVAL</b> - string value to associate with name
528  * <b>FRAMETYPE_CINTVAL</b> - integer value to associate with naem
529  * <b>FRAMETYPE_IPPORT</b> - IP address to associate with name
530  */
531 void
533  , FrameSet* fs
534  , NetAddr* fromaddr)
535 {
536  GSList* slframe;
537  ConfigContext* cfg = parent->baseclass.config;
538  char * paramname = NULL;
539 
540  (void)fromaddr;
541 
542  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
543  Frame* frame = CASTTOCLASS(Frame, slframe->data);
544  int frametype = frame->type;
545  switch (frametype) {
546 
547  case FRAMETYPE_PARAMNAME: { // Parameter name to set
548  paramname = frame->value;
549  g_return_if_fail(paramname != NULL);
550  }
551  break;
552 
553  case FRAMETYPE_CSTRINGVAL: { // String value to set 'paramname' to
554  g_return_if_fail(paramname != NULL);
555  cfg->setstring(cfg, paramname, frame->value);
556  paramname = NULL;
557  }
558  break;
559 
560  case FRAMETYPE_CINTVAL: { // Integer value to set 'paramname' to
561  IntFrame* intf = CASTTOCLASS(IntFrame, frame);
562  g_return_if_fail(paramname != NULL);
563  cfg->setint(cfg, paramname, (gint)intf->getint(intf));
564  paramname = NULL;
565  }
566  break;
567 
568 
569  case FRAMETYPE_IPPORT: { // NetAddr value to set 'paramname' to
570  IpPortFrame* af = CASTTOCLASS(IpPortFrame, frame);
571  NetAddr* addr = af->getnetaddr(af);
572  g_return_if_fail(paramname != NULL);
573  cfg->setaddr(cfg, paramname, addr);
574  paramname = NULL;
575  }
576  break;
577 
578  }//endswitch
579  }//endfor
580  DUMP3("nanoobey_setconfig: cfg is", &cfg->baseclass, NULL);
581  if (cfg->getaddr(cfg, CONFIGNAME_CMAFAIL) != NULL) {
582  if (nanofailreportaddr == NULL) {
583  nanofailreportaddr = cfg->getaddr(cfg, CONFIGNAME_CMAFAIL);
584 
585 
586  }else if (cfg->getaddr(cfg, CONFIGNAME_CMAFAIL) != nanofailreportaddr) {
587  UNREF(nanofailreportaddr);
588  nanofailreportaddr = cfg->getaddr(cfg, CONFIGNAME_CMAFAIL);
589  }
590  DUMP3("nanoobey_setconfig: nanofailreportaddr", &nanofailreportaddr->baseclass, NULL);
591  {
592  // Alias localhost to the CMA nanofailreportaddr (at least for now...)
595 
596  NetAddr* localhost = netaddr_string_new("127.0.0.1");
597  NetIO* io = parent->baseclass.transport->_netio;
598  io->addalias(io, localhost, nanofailreportaddr);
599  UNREF(localhost);
600  }
601  REF(nanofailreportaddr);
602  }
603  g_message("Connected to CMA. Happiness :-D");
604  nano_connected = TRUE;
605 }//nanoobey_setconfig
606 
611 FSTATIC void
612 nanoobey_change_debug(gint plusminus
613  , AuthListener* parent
614  , FrameSet* fs
615  , NetAddr* fromaddr)
616 {
617 
618  GSList* slframe;
619  guint changecount = 0;
620 
621  (void)parent;
622  (void)fromaddr;
623 
624  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
625  Frame* frame = CASTTOCLASS(Frame, slframe->data);
626  int frametype = frame->type;
627  switch (frametype) {
628  case FRAMETYPE_CSTRINGVAL: { // String value to set 'paramname' to
629  ++changecount;
630  if (plusminus < 0) {
631  proj_class_decr_debug((char*)frame->value);
632  }else{
633  proj_class_incr_debug((char*)frame->value);
634  }
635  }
636  break;
637  }
638  }
639  if (changecount == 0) {
640  if (plusminus < 0) {
641  proj_class_decr_debug(NULL);
642  }else{
643  proj_class_incr_debug(NULL);
644  }
645  }
646 }
651 FSTATIC void
653  , FrameSet* fs
654  , NetAddr* fromaddr)
655 {
656  nanoobey_change_debug(+1, parent, fs, fromaddr);
657 }
662 FSTATIC void
664  , FrameSet* fs
665  , NetAddr* fromaddr)
666 {
667  nanoobey_change_debug(-1, parent, fs, fromaddr);
668 }
669 
677 FSTATIC void
679  , FrameSet* fs
680  , NetAddr* fromaddr)
681 {
682 
683  GSList* slframe;
684  guint interval = 0;
685  const char * discoveryname = NULL;
686 
687  (void)parent;
688  (void)fromaddr;
689 
690  if (nano_shutting_down) {
691  return;
692  }
693 
694  DEBUGMSG3("%s - got frameset", __FUNCTION__);
695  // Loop over the frames, looking for those we know what to do with ;-)
696  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
697  Frame* frame = CASTTOCLASS(Frame, slframe->data);
698  int frametype = frame->type;
699 
700  switch (frametype) {
701  case FRAMETYPE_DISCNAME: { // Discovery instance name
702  CstringFrame* strf = CASTTOCLASS(CstringFrame, frame);
703  g_return_if_fail(strf != NULL);
704  g_return_if_fail(discoveryname == NULL);
705  discoveryname = strf->baseclass.value;
706  DEBUGMSG3("%s - got DISCOVERYNAME %s", __FUNCTION__, discoveryname);
707  }
708  break;
709 
710  case FRAMETYPE_DISCINTERVAL: { // Discovery interval
711  IntFrame* intf = CASTTOCLASS(IntFrame, frame);
712  interval = (guint)intf->getint(intf);
713  DEBUGMSG3("%s - got DISCOVERYINTERVAL %d", __FUNCTION__, interval);
714  }
715  break;
716 
717  case FRAMETYPE_DISCJSON: { // Discovery JSON string (parameters)
718  CstringFrame* strf = CASTTOCLASS(CstringFrame, frame);
719  const char * jsonstring;
720  g_return_if_fail(strf != NULL);
721  jsonstring = strf->baseclass.value;
722  g_return_if_fail(discoveryname != NULL);
723  DEBUGMSG3("Got DISCJSON frame: %s %d %s" , discoveryname, interval, jsonstring);
724  nano_schedule_discovery(discoveryname, interval, jsonstring
725  , parent->baseclass.config
726  , parent->baseclass.transport
727  , fromaddr);
728  }
729  interval = 0;
730  discoveryname = NULL;
731  break;
732 
733  }
734  }
735 }
736 
738 FSTATIC void
739 _nano_send_rscexitstatus(ConfigContext* request, gpointer user_data
740 , enum HowDied reason, int rc, int signal, gboolean core_dumped
741 , const char * stringresult)
742 {
744  ConfigContext* response = configcontext_new(0);
747  char* rsp_json;
748 
749  struct {
750  const char* framename;
751  int framevalue;
752  } pktframes[] = {
753  {REQREASONENUMNAMEFIELD, reason},
754  {REQRCNAMEFIELD, rc},
755  {REQSIGNALNAMEFIELD, signal},
756  };
757  unsigned j;
758 
759  for (j=0; j < DIMOF(pktframes); ++j) {
760  response->setint(response, pktframes[j].framename, pktframes[j].framevalue);
761  }
762  response->setbool(response, REQCOREDUMPNAMEFIELD, core_dumped);
763  if (stringresult) {
764  response->setstring(response, REQSTRINGRETNAMEFIELD, stringresult);
765  }
766  // Copy the request ID over from the original request
767  response->setint(response, REQIDENTIFIERNAMEFIELD
768  , request->getint(request, REQIDENTIFIERNAMEFIELD));
769  rsp_json = response->baseclass.toString(&response->baseclass);
770  UNREF(response);
771  sf->baseclass.setvalue(&sf->baseclass, rsp_json, strlen(rsp_json)+1, g_free);
773  UNREF2(sf);
774  transport->_netio->sendareliablefs(transport->_netio, nanofailreportaddr, DEFAULT_FSP_QID, fs);
775  UNREF(fs);
776 }
777 FSTATIC void
779 {
780  GSList* slframe;
781 
782  (void)parent;
783  (void)fromaddr;
784  if (nano_shutting_down) {
785  return;
786  }
787  if (NULL == RscQ) {
788  RscQ = resourcequeue_new(0);
789  }
790  // Loop over the frames, looking for those we know what to do with ;-)
791  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
792  Frame* frame = CASTTOCLASS(Frame, slframe->data);
793  CstringFrame* csframe;
794  ConfigContext* cfg;
795  if (frame->type != FRAMETYPE_RSCJSON) {
796  continue;
797  }
798  csframe = CASTTOCLASS(CstringFrame, frame);
800  if (NULL == cfg) {
801  g_warning("%s.%d: Received malformed JSON string [%*s]"
802  , __FUNCTION__, __LINE__
803  , csframe->baseclass.length-1
804  , (char*)csframe->baseclass.value);
805  continue;
806  }
807  RscQ->Qcmd(RscQ, cfg, _nano_send_rscexitstatus, nanotransport);
808  UNREF(cfg);
809  }
810 }
811 
812 FSTATIC void
814 {
815  GSList* slframe;
816 
817  (void)parent;
818  (void)fromaddr;
819  if (NULL == RscQ) {
820  RscQ = resourcequeue_new(0);
821  }
822 
823  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
824  Frame* frame = CASTTOCLASS(Frame, slframe->data);
825  CstringFrame* csframe;
826  ConfigContext* cfg;
827  if (frame->type != FRAMETYPE_RSCJSON) {
828  continue;
829  }
830  csframe = CASTTOCLASS(CstringFrame, frame);
832  if (NULL == cfg) {
833  g_warning("%s.%d: Received malformed JSON string [%*s]"
834  , __FUNCTION__, __LINE__
835  , csframe->baseclass.length-1
836  , (char*)csframe->baseclass.value);
837  continue;
838  }
839  RscQ->cancel(RscQ, cfg);
840  UNREF(cfg);
841  }
842 }
843 
848 FSTATIC void
850  , FrameSet* fs
851  , NetAddr* fromaddr)
852 {
853 
854  GSList* slframe;
855 
856  (void)parent;
857  (void)fromaddr;
858 
859 
860  // Loop over the frames, looking for the one we know what to do with ;-)
861  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
862  Frame* frame = CASTTOCLASS(Frame, slframe->data);
863  int frametype = frame->type;
864 
865  switch (frametype) {
866  case FRAMETYPE_DISCNAME: { // Discovery instance name
867  CstringFrame* strf = CASTTOCLASS(CstringFrame, frame);
868  const char * discoveryname;
869  g_return_if_fail(strf != NULL);
870  discoveryname = strf->baseclass.value;
871  g_return_if_fail(discoveryname == NULL);
872  discovery_unregister(discoveryname);
873  }
874  break;
875 
876  }
877  }
878 }
879 
880 
884 FSTATIC void
885 nano_schedule_discovery(const char *instance,
886  guint32 interval,
887  const char* json,
890  NetAddr* fromaddr)
891 {
892  ConfigContext* jsonroot;
893  JsonDiscovery* discovery;
894  const char* disctype;
895 
896  (void)fromaddr;
897 
898  DEBUGMSG3("%s(%s,%d,%s)", __FUNCTION__, instance, interval, json);
899  jsonroot = configcontext_new_JSON_string(json);
900  g_return_if_fail(jsonroot != NULL);
901  disctype = jsonroot->getstring(jsonroot, "type");
902  g_return_if_fail(disctype != NULL);
903  discovery = jsondiscovery_new(disctype, instance, interval, jsonroot
904  , transport, config, 0);
905  UNREF(jsonroot);
906  UNREF2(discovery);
907 }
908 
911  const char * initdiscover;
915 };
916 
923 gboolean
924 nano_startupidle(gpointer gcruft)
925 {
926  static enum istate {INIT=3, WAIT=5, DONE=7} state = INIT;
927  struct startup_cruft* cruft = gcruft;
928  const char * cfgname = cruft->initdiscover;
929 
930  if (state == DONE || nano_shutting_down) {
931  return FALSE;
932  }
933  if (state == INIT) {
934  const char * jsontext = "{\"parameters\":{}}";
935  ConfigContext* jsondata = configcontext_new_JSON_string(jsontext);
937  ( cruft->initdiscover
938  , cruft->initdiscover
939  , cruft->discover_interval
940  , jsondata
941  , cruft->iosource, cruft->context, 0);
942  UNREF(jsondata);
943  UNREF2(jd);
944  state = WAIT;
945  return TRUE;
946  }
947  if (cruft->context->getstring(cruft->context, cfgname)) {
948  state = DONE;
949  // Call it once, and arrange for it to repeat until we hear back.
950  g_timeout_add_seconds(5, nano_reqconfig, gcruft);
951  nano_reqconfig(gcruft);
952  return FALSE;
953  }
954  return TRUE;
955 }
956 
959 gboolean
960 nano_reqconfig(gpointer gcruft)
961 {
962  struct startup_cruft* cruft = gcruft;
963  FrameSet* fs;
964  CstringFrame* csf;
965  CstringFrame* usf;
966  const char * cfgname = cruft->initdiscover;
967  ConfigContext* context = cruft->context;
968  NetAddr* cmainit = context->getaddr(context, CONFIGNAME_CMAINIT);
969  const char * jsontext;
970  char * sysname = NULL;
971 
972  if (nano_shutting_down) {
973  return FALSE;
974  }
975 
976  // We <i>have</i> to know our initial request address - or all is lost.
977  // NOTE THAT THIS ADDRESS MIGHT BE MULTICAST AND MIGHT BE USED ONLY ONCE
978  g_return_val_if_fail(cmainit != NULL, FALSE);
979 
980  // Our initial configuration message must contain these parameters.
981  if (context->getaddr(context, CONFIGNAME_CMAADDR) != NULL
982  && context->getaddr(context, CONFIGNAME_CMAFAIL) != NULL
983  && context->getaddr(context, CONFIGNAME_CMADISCOVER) != NULL
984  && context->getint(context, CONFIGNAME_CMAPORT) > 0) {
985  return FALSE;
986  }
988 
989  // Put in the system name
991  sysname = proj_get_sysname();
992  usf->baseclass.setvalue(&usf->baseclass, g_strdup(sysname), strlen(sysname)+1
994  frameset_append_frame(fs, &usf->baseclass);
995  UNREF2(usf);
996 
997  // Put in the JSON discovery text
998  jsontext = context->getstring(context, cfgname);
1000  csf->baseclass.setvalue(&csf->baseclass, g_strdup(jsontext), strlen(jsontext)+1
1002 
1003  frameset_append_frame(fs, &csf->baseclass);
1004  UNREF2(csf);
1005 
1006  // We've constructed the frameset - now send it - unreliably...
1007  // That's because the reply is typically from a different address
1008  // which would confuse the blazes out of the reliable comm code.
1009  cruft->iosource->sendaframeset(cruft->iosource, cmainit, fs);
1010  DEBUGMSG("%s.%d: Sent initial STARTUP frameset for %s."
1011  , __FUNCTION__, __LINE__, sysname);
1012  g_free(sysname); sysname = NULL;
1013  UNREF(fs);
1014  return TRUE;
1015 }
1016 
1017 static PacketDecoder* decoder = NULL;
1018 static SwitchDiscovery* swdisc = NULL;
1019 static AuthListener* obeycollective = NULL;
1020 
1021 
1026  // This is the complete set of commands that nanoprobes know how to obey - so far...
1040  {0, NULL},
1041 };
1042 
1047 {
1048  static FrameTypeToFrame decodeframes[] = FRAMETYPEMAP;
1049  // Set up our packet decoder
1050  decoder = packetdecoder_new(0, decodeframes, DIMOF(decodeframes));
1051  return decoder;
1052 }
1053 
1054 
1081 WINEXPORT void
1082 nano_start_full(const char *initdiscoverpath
1083  , guint discover_interval
1084  , NetGSource* io
1085  , ConfigContext* config)
1086 {
1087  static struct startup_cruft cruftiness;
1088  struct startup_cruft initcrufty = {
1089  initdiscoverpath,
1091  io,
1092  config
1093  };
1094  nano_shutting_down = FALSE;
1095  BINDDEBUG(nanoprobe_main);
1096 
1098  cruftiness = initcrufty;
1099  g_source_ref(CASTTOCLASS(GSource, io));
1100  nanotransport = io;
1101 
1102  // Get our local switch discovery information.
1103  // To be really right, we probably ought to wait until we know our local network
1104  // configuration - and start it up on all interfaces assigned addresses of global scope.
1106  swdisc = switchdiscovery_new("switchdiscovery_eth0", "eth0", ENABLE_LLDP|ENABLE_CDP, G_PRIORITY_LOW
1107  , g_main_context_default(), io, config, 0);
1108  obeycollective = authlistener_new(0, collective_obeylist, config, TRUE);
1109  obeycollective->baseclass.associate(&obeycollective->baseclass, io);
1110  // Initiate the startup process
1111  g_idle_add(nano_startupidle, &cruftiness);
1112 }
1114 WINEXPORT void
1115 nano_shutdown(gboolean report)
1116 {
1117  if (report) {
1118  NetIOstats* ts = &nanotransport->_netio->stats;
1119  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of heartbeats:", nano_hbstats.heartbeat_count);
1120  g_info("%-35s %8d", "Count of deadtimes:", nano_hbstats.dead_count);
1121  g_info("%-35s %8d", "Count of warntimes:", nano_hbstats.warntime_count);
1122  g_info("%-35s %8d", "Count of comealives:", nano_hbstats.comealive_count);
1123  g_info("%-35s %8d", "Count of martians:", nano_hbstats.martian_count);
1124  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of LLDP/CDP pkts sent:", swdisc->baseclass.reportcount);
1125  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of LLDP/CDP pkts received:", swdisc->baseclass.discovercount);
1126  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of LLDP/CDP pkts received:", swdisc->baseclass.discovercount);
1127  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of recvfrom calls:", ts->recvcalls);
1128  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of pkts read:", ts->pktsread);
1129  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of framesets read:", ts->fsreads);
1130  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of sendto calls:", ts->sendcalls);
1131  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of pkts written:", ts->pktswritten);
1132  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of framesets written:", ts->fswritten);
1133  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of reliable framesets sent:", ts->reliablesends);
1134  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of reliable framesets recvd:", ts->reliablereads);
1135  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of ACKs sent:", ts->ackssent);
1136  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of ACKs recvd:", ts->acksrecvd);
1137  }
1140  UNREF2(swdisc);
1141  if (RscQ) {
1142  RscQ->cancelall(RscQ);
1143  UNREF(RscQ);
1144  }
1145  if (nanofailreportaddr) {
1146  UNREF(nanofailreportaddr);
1147  }
1148  if (nanotransport) {
1149  g_source_destroy(CASTTOCLASS(GSource, nanotransport));
1150  g_source_unref(CASTTOCLASS(GSource, nanotransport));
1151  nanotransport = NULL;
1152  }
1153  // Free packet decoder
1154  if (decoder) {
1155  UNREF(decoder);
1156  }
1157  // Unregister all discovery modules.
1159  obeycollective->baseclass.dissociate(&obeycollective->baseclass);
1160  UNREF2(obeycollective);
1161 }
1162 
1164 WINEXPORT gboolean
1166 {
1167 
1168  if (nano_connected) {
1169  FsProtocol* proto = CASTTOCLASS(ReliableUDP, nanotransport->_netio)->_protocol;
1170  char * sysname;
1171  DEBUGMSG("Sending HBSHUTDOWN to CMA");
1172  sysname = proj_get_sysname();
1174  g_free(sysname); sysname = NULL;
1175  // Initiate connection shutdown.
1176  // This process will wait for all our output to be ACKed.
1177  // It also has an ACK timer, so it won't wait forever...
1178  proto->closeall(proto);
1179  idle_shutdown_gsource = g_idle_add(shutdown_when_outdone, NULL);
1180  nano_shutting_down = TRUE;
1181  }else{
1182  nano_shutting_down = TRUE;
1183  g_warning("%s: Never connected to CMA - cannot send shutdown message.", procname);
1184  ++errcount; // Trigger non-zero exit code...
1185  _nano_final_shutdown(NULL);
1186  return TRUE;
1187  }
1188  return FALSE;
1189 }
1190 FSTATIC gboolean
1191 shutdown_when_outdone(gpointer unused)
1192 {
1193  ReliableUDP* t = CASTTOCLASS(ReliableUDP, nanotransport->_netio);
1195  (void)unused;
1196  // Wait for all our connections to be shut down
1197  if (proto->activeconncount(proto) == 0){
1198  DEBUGMSG("%s.%d: Shutting down - all connections closed."
1199  , __FUNCTION__, __LINE__);
1200  g_main_quit(mainloop);
1201  return FALSE;
1202  }
1203  return TRUE;
1204 }
1205 // Final Shutdown -- a contingency timer to make sure we eventually shut down
1206 FSTATIC gboolean
1207 _nano_final_shutdown(gpointer unused)
1208 {
1209  (void)unused;
1210  DEBUGMSG("Initiating final shutdown");
1211  if (nano_connected && nanotransport->_netio->outputpending(nanotransport->_netio)){
1212  g_warning("Shutting down with unACKed output.");
1213  DUMP("Transport info", &nanotransport->_netio->baseclass, NULL);
1214  }
1215  if (idle_shutdown_gsource) {
1216  g_source_remove(idle_shutdown_gsource);
1217  idle_shutdown_gsource = 0;
1218  }
1219  g_main_quit(mainloop);
1220  return FALSE;
1221 }