The Assimilation Project
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
clientlib/nanoprobe.c
Go to the documentation of this file.
1 
25 #include <projectcommon.h>
26 #include <string.h>
27 #include <frameset.h>
28 #include <framesettypes.h>
29 #include <frametypes.h>
30 #include <compressframe.h>
31 #include <cryptframe.h>
32 #include <intframe.h>
33 #include <cstringframe.h>
34 #include <addrframe.h>
35 #include <ipportframe.h>
36 #include <seqnoframe.h>
37 #include <packetdecoder.h>
38 #include <netgsource.h>
39 #include <reliableudp.h>
40 #include <authlistener.h>
41 #include <nvpairframe.h>
42 #include <hblistener.h>
43 #include <hbsender.h>
44 #include <configcontext.h>
45 #include <pcap_min.h>
46 #include <jsondiscovery.h>
47 #include <switchdiscovery.h>
48 #include <fsprotocol.h>
49 #include <resourcecmd.h>
50 #include <resourcequeue.h>
51 #include <misc.h>
52 #include <nanoprobe.h>
53 
54 
/* Optional override hooks for heartbeat events.  When set, the warntime and
 * comealive handlers below call these instead of logging a warning
 * (NOTE(review): the guarding `if` lines are not visible in this listing). */
60 void (*nanoprobe_warntime_agent)(HbListener*, guint64 howlate) = NULL;
61 void (*nanoprobe_comealive_agent)(HbListener*, guint64 howlate) = NULL;
/* Heartbeat event counters; printed by nano_shutdown() when asked to report. */
62 WINEXPORT NanoHbStats nano_hbstats = {0U, 0U, 0U, 0U, 0U};
/* Set TRUE at the end of nanoobey_setconfig(), i.e. once the CMA has sent us our configuration. */
63 gboolean nano_connected = FALSE;
/* Main loop that the shutdown code stops via g_main_quit(). */
65 WINEXPORT GMainLoop* mainloop = NULL;
66 
/* Forward declarations of file-local helpers. */
74 FSTATIC void nanoobey_change_debug(gint plusminus, AuthListener*, FrameSet*, NetAddr*);
80 FSTATIC void _nano_send_rscexitstatus(ConfigContext* request, gpointer user_data
81 , enum HowDied reason, int rc, int signal, gboolean core_dumped
82 , const char * stringresult);
84 FSTATIC void nano_schedule_discovery(const char *name, guint32 interval,const char* json
85  , ConfigContext*, NetGSource* transport, NetAddr* fromaddr);
86 FSTATIC void nano_stop_discovery(const char * discoveryname, NetGSource*, NetAddr*);
87 FSTATIC gboolean nano_startupidle(gpointer gcruft);
88 FSTATIC gboolean nano_reqconfig(gpointer gcruft);
91 FSTATIC void _real_warntime_agent(HbListener* who, guint64 howlate);
92 FSTATIC void _real_comealive_agent(HbListener* who, guint64 howlate);
95 FSTATIC gboolean _nano_final_shutdown(gpointer unused);
96 FSTATIC gboolean shutdown_when_outdone(gpointer unused);
97 
/* Factory hook for heartbeat listeners; defaults to _real_hblistener_new.
 * NOTE(review): nanoobey_expecthb() calls hblistener_new() directly rather than
 * going through this pointer -- confirm whether that is intended. */
98 HbListener* (*nanoprobe_hblistener_new)(NetAddr*, ConfigContext*) = _real_hblistener_new;
99 
/* Set TRUE when shutdown begins; most request handlers return early while it is set. */
100 gboolean nano_shutting_down = FALSE;
/* Program name used in warning messages. */
101 const char * procname = "nanoprobe";
/* AuthListener that dispatches CMA commands; created in nano_start_full(). */
102 static AuthListener* obeycollective = NULL;
103 
/* CMA address for upstream reports (from CONFIGNAME_CMAFAIL); we hold a reference to it. */
104 static NetAddr* nanofailreportaddr = NULL;
/* Our transport source; ref'd in nano_start_full(), destroyed/unref'd in nano_shutdown(). */
105 static NetGSource* nanotransport = NULL;
/* GLib source id of the shutdown_when_outdone idle poller (0 when not registered). */
106 static guint idle_shutdown_gsource = 0;
/* Resource-operation queue; created lazily by the resource-op handlers. */
107 static ResourceQueue* RscQ = NULL;
108 
110 
/* Body of the default nanoprobe_hblistener_new factory (_real_hblistener_new);
 * its signature lines are not visible in this listing.  It just forwards to
 * hblistener_new() with 0 as the last argument (presumably "default object
 * size" -- confirm against hblistener.h). */
115 {
116  return hblistener_new(addr, context, 0);
117 }
118 
/*
 * Report an event to the CMA.
 * Builds a FrameSet of type `reporttype` and sends it reliably over
 * nanotransport to nanofailreportaddr.  The optional pieces are:
 *   - howlate  -- an integer frame, added only when > 0
 *   - who      -- a peer address frame, added only when non-NULL
 *   - systemnm -- a string frame (a g_strdup'd copy), added only when non-NULL
 * The request is ignored (DEBUG message only) while shutting down, or before
 * nanofailreportaddr is known.
 * NOTE(review): the declarations of lateframe/peeraddr/usf (lines 139, 146,
 * 152) and the tail of the setvalue call (line 154) are not visible here.
 */
120 void
121 nanoprobe_report_upstream(guint16 reporttype
122 , NetAddr* who
123 , const char * systemnm
124 , guint64 howlate)
125 {
127  FrameSet* fs;
128 
129  if (nano_shutting_down || NULL == nanofailreportaddr) {
130  DEBUGMSG("%s.%d: Ignoring request to send fstype=%d message upstream [%s]."
131  , __FUNCTION__, __LINE__, reporttype
132  , (nano_shutting_down ? "shutting down" : "not connected to CMA"));
133  return;
134  }
135 
136  fs = frameset_new(reporttype);
137  // Construct and send a frameset reporting this event...
138  if (howlate > 0) {
140  lateframe->setint(lateframe, howlate);
141  frameset_append_frame(fs, &lateframe->baseclass);
142  UNREF2(lateframe);
143  }
144  // Add the address - if any...
145  if (who != NULL) {
147  frameset_append_frame(fs, &peeraddr->baseclass);
148  UNREF2(peeraddr);
149  }
150  // Add the system name - if any...
151  if (systemnm != NULL) {
153  usf->baseclass.setvalue(&usf->baseclass, g_strdup(systemnm), strlen(systemnm)+1
155  frameset_append_frame(fs, &usf->baseclass);
156  UNREF2(usf);
157  }
158  DEBUGMSG3("%s - sending frameset of type %d", __FUNCTION__, reporttype);
159  DUMP3("nanoprobe_report_upstream", &nanofailreportaddr->baseclass, NULL);
/* Frames were appended and our local references dropped (UNREF2); fs owns them now. */
160  nanotransport->_netio->sendareliablefs(nanotransport->_netio, nanofailreportaddr, DEFAULT_FSP_QID, fs);
161  UNREF(fs);
162 }
163 
164 
/*
 * Handler for heartbeats from unexpected ("martian") senders.  The name and
 * parameter line (167) is not visible in this listing; `who` is stringified
 * via who->baseclass.toString(who), so it is presumably a NetAddr* -- confirm.
 * Counts martians in a burst.  A gap of more than MARTIAN_TIMEOUT seconds
 * since the previous martian resets the count.  Warns on the 2nd, 12th,
 * 22nd, ... martian of a burst.
 * State lives in function-static variables shared by all calls.
 */
166 FSTATIC void
168 {
169  static guint64 last_martian_time = 0; // microseconds
170  static guint recent_martian_count = 0;
171  guint64 now = g_get_monotonic_time(); // microseconds
/* Microseconds per second, to compare MARTIAN_TIMEOUT (seconds) with monotonic time. */
172  const guint64 uS = 1000000;
173 
175 
176  // If it's been more than MARTIAN_TIMEOUT seconds since the last
177  // martian, then reset the count of recent martians
178  if (now > (last_martian_time + (MARTIAN_TIMEOUT*uS))) {
179  recent_martian_count = 0;
180  }
181 
182  last_martian_time = now;
183  ++recent_martian_count;
184 
185  // This means if we only get one martian then none, we say nothing
186  // This can happen as a result of timing - and it's OK.
187  // But if we get more than one, we complain then and once every 10 afterwards
188  if ((recent_martian_count % 10) == 2) {
189  char * addrstring;
190 
192  addrstring = who->baseclass.toString(who);
193  g_warning("System at address %s is sending unexpected heartbeats.", addrstring);
194  g_free(addrstring);
195 
197  }
198 }
/* Deadtime handler -- presumably _real_deadtime_agent, which nanoobey_expecthb()
 * registers via set_deadtime_callback.  Lines 202 and 204-206 (signature and
 * the `if` branch) are not visible.  The visible fallback branch logs a
 * warning naming the peer's listen address. */
201 FSTATIC void
203 {
207  }else{
208  char * addrstring;
209 
210  addrstring = who->listenaddr->baseclass.toString(who->listenaddr);
211  g_warning("Peer at address %s is dead (has timed out).", addrstring);
212  g_free(addrstring);
213 
215  }
216 }
217 
/* Fragment: the signature and body (lines 221, 223-225) are not visible in this
 * listing.  Presumably _real_heartbeat_agent (registered via
 * set_heartbeat_callback in nanoobey_expecthb) -- confirm. */
220 FSTATIC void
222 {
226  }
227 }
228 
229 
/* Warntime handler: a heartbeat arrived late.  `howlate` is in microseconds
 * (divided by 1000 to print milliseconds).  The visible first branch delegates
 * to nanoprobe_warntime_agent; its condition (lines 235-236) is not visible.
 * Otherwise it logs a warning. */
232 FSTATIC void
233 _real_warntime_agent(HbListener* who, guint64 howlate)
234 {
237  nanoprobe_warntime_agent(who, howlate);
238  }else{
239  char * addrstring;
240  guint64 mslate = howlate / 1000;
241  addrstring = who->listenaddr->baseclass.toString(who->listenaddr);
/* NOTE(review): mslate is unsigned (guint64) but is formatted with FMT_64BIT"d" (signed). */
242  g_warning("Heartbeat from peer at address %s was "FMT_64BIT"d ms late.", addrstring, mslate);
243  g_free(addrstring);
245  }
246 }
/* Comealive handler: a peer previously declared dead is heartbeating again.
 * `howlate` is in microseconds.  The visible first branch delegates to
 * nanoprobe_comealive_agent; its condition (lines 252-253) is not visible.
 * Otherwise it logs how long the peer was dead, to 0.1 s. */
249 FSTATIC void
250 _real_comealive_agent(HbListener* who, guint64 howlate)
251 {
254  nanoprobe_comealive_agent(who, howlate);
255  }else{
256  char * addrstring;
/* (howlate + 0.05 s) / 0.1 s in integer math gives rounded tenths; /10.0 converts to seconds. */
257  double secsdead = ((double)((howlate+50000) / 100000))/10.0; // Round to nearest tenth of a second
258  addrstring = who->listenaddr->baseclass.toString(who->listenaddr);
259  g_warning("Peer at address %s came alive after being dead for %g seconds.", addrstring, secsdead);
260  g_free(addrstring);
262  }
263 }
264 
/*
 * Obey a "send heartbeats" FrameSet.
 * Starts an HbSender toward the address of each FRAMETYPE_IPPORT frame in fs.
 * The interval defaults to the config value CONFIGNAME_HBTIME (when > 0).
 * An IntFrame earlier in fs may override it; that case label (line 306) is
 * not visible here.  Addresses are skipped with a warning while the interval
 * is still 0.
 * NOTE(review): the name line (277) and the `config` declaration (284) are not
 * visible.  sendinterval is a guint16, so larger values are truncated.
 */
276 void
278  , FrameSet* fs
279  , NetAddr* fromaddr)
280 {
281 
282  GSList* slframe;
283  guint addrcount = 0;
285  guint16 sendinterval = 0;
286 
287  if (nano_shutting_down) {
288  return;
289  }
290 
291  g_return_if_fail(fs != NULL);
292  (void)fromaddr;
293 
294  if (config->getint(config, CONFIGNAME_HBTIME) > 0) {
295  sendinterval = config->getint(config, CONFIGNAME_HBTIME);
296  }
297 
298  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
299  Frame* frame = CASTTOCLASS(Frame, slframe->data);
300  int frametype = frame->type;
301  switch(frametype) {
302  IntFrame* iframe;
303  IpPortFrame* aframe;
304  HbSender* hb;
305 
307  iframe = CASTTOCLASS(IntFrame, frame);
308  sendinterval = (guint16) iframe->getint(iframe);
309  break;
310  case FRAMETYPE_IPPORT:
311  if (0 == sendinterval) {
312  g_warning("Send interval is zero in %s", __FUNCTION__);
/* `continue` applies to the enclosing for loop: skip this address. */
313  continue;
314  }
315  aframe = CASTTOCLASS(IpPortFrame, frame);
316  addrcount++;
317  hb = hbsender_new(aframe->getnetaddr(aframe), parent->baseclass.transport
318  , sendinterval, 0);
/* NOTE(review): the result is discarded; presumably hbsender_new keeps its own
 * reference (hbsender_stopsend later looks senders up by address) -- confirm. */
319  (void)hb;
320  break;
321  }
322  }
323 }
/*
 * Obey an "expect heartbeats" FrameSet.
 * Creates an HbListener for the address of each FRAMETYPE_IPPORT frame in fs.
 * It registers the _real_*_agent callbacks and adds the listener to the
 * transport for FRAMESETTYPE_HEARTBEAT packets.
 * Deadtime/warntime default to config values CONFIGNAME_DEADTIME and
 * CONFIGNAME_WARNTIME (when > 0).  IntFrames earlier in fs may override them;
 * their case labels (lines 368 and 373) are not visible.
 * NOTE(review):
 *  - The name line (336) and the `config`/`transport` declarations (342, 381)
 *    are not visible in this listing.
 *  - Listeners are created with hblistener_new() directly, bypassing the
 *    nanoprobe_hblistener_new hook -- confirm whether intended.
 */
335 void
337  , FrameSet* fs
338  , NetAddr* fromaddr)
339 {
340 
341  GSList* slframe;
343  guint addrcount = 0;
344 
345  guint64 deadtime = 0;
346  guint64 warntime = 0;
347 
348  (void)fromaddr;
349 
350  if (nano_shutting_down) {
351  return;
352  }
353 
354  g_return_if_fail(fs != NULL);
355  if (config->getint(config, CONFIGNAME_DEADTIME) > 0) {
356  deadtime = config->getint(config, CONFIGNAME_DEADTIME);
357  }
358  if (config->getint(config, CONFIGNAME_WARNTIME) > 0) {
359  warntime = config->getint(config, CONFIGNAME_WARNTIME);
360  }
361 
362  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
363  Frame* frame = CASTTOCLASS(Frame, slframe->data);
364  int frametype = frame->type;
365  switch(frametype) {
366  IntFrame* iframe;
367 
369  iframe = CASTTOCLASS(IntFrame, frame);
370  deadtime = iframe->getint(iframe);
371  break;
372 
374  iframe = CASTTOCLASS(IntFrame, frame);
375  warntime = iframe->getint(iframe);
376  break;
377 
378  case FRAMETYPE_IPPORT: {
379  HbListener* hblisten;
380  IpPortFrame* aframe;
382  aframe = CASTTOCLASS(IpPortFrame, frame);
383  addrcount++;
384  hblisten = hblistener_new(aframe->getnetaddr(aframe), config, 0);
385  hblisten->baseclass.associate(&hblisten->baseclass, transport);
386  if (deadtime > 0) {
387  // Otherwise we get the default deadtime
388  hblisten->set_deadtime(hblisten, deadtime);
389  }
390  if (warntime > 0) {
391  // Otherwise we get the default warntime
392  hblisten->set_warntime(hblisten, warntime);
393  }
394  hblisten->set_deadtime_callback(hblisten, _real_deadtime_agent);
395  hblisten->set_heartbeat_callback(hblisten, _real_heartbeat_agent);
396  hblisten->set_warntime_callback(hblisten, _real_warntime_agent);
397  hblisten->set_comealive_callback(hblisten, _real_comealive_agent);
398  // Intercept incoming heartbeat packets
399  transport->addListener(transport, FRAMESETTYPE_HEARTBEAT
400  , &hblisten->baseclass);
401  // Unref this heartbeat listener, and forget our reference.
402  UNREF2(hblisten);
403  /*
404  * That still leaves two references to 'hblisten':
405  * - in the transport dispatch table
406  * - in the global heartbeat listener table
407  * And one reference to the previous 'hblisten' object:
408  * - in the global heartbeat listener table
409  * Also note that we become the 'proxy' for all incoming heartbeats
410  * but we dispatch them to the right HbListener object.
411  * Since we've become the proxy for all incoming heartbeats, if
412  * we displace and free the old proxy, this all still works nicely,
413  * because the transport object gets rid of its old reference to the
414  * old 'proxy' object.
415  */
416  }
417  break;
418  }
419  }
420 }
421 
/* Obey FRAMESETTYPE_SENDEXPECTHB: both send heartbeats to, and expect
 * heartbeats from, each address in fs.  Delegates to nanoobey_sendhb() and
 * nanoobey_expecthb().  The name line (432) is not visible in this listing. */
431 void
433  , FrameSet* fs
434  , NetAddr* fromaddr)
435 {
436  g_return_if_fail(fs != NULL && fs->fstype == FRAMESETTYPE_SENDEXPECTHB);
437 
438  if (nano_shutting_down) {
439  return;
440  }
441  // This will cause us to ACK the packet twice -- not a problem...
442  nanoobey_sendhb (parent, fs, fromaddr);
443  nanoobey_expecthb(parent, fs, fromaddr);
444 }
/* Obey a "stop sending heartbeats" FrameSet: call hbsender_stopsend() for the
 * address of each FRAMETYPE_IPPORT frame.  The name line (453) is not visible.
 * NOTE(review): unlike nanoobey_sendhb, this neither checks fs for NULL nor
 * checks nano_shutting_down. */
452 void
454  , FrameSet* fs
455  , NetAddr* fromaddr)
456 {
457  GSList* slframe;
458  (void)parent;
459  (void)fromaddr;
460 
461  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
462  Frame* frame = CASTTOCLASS(Frame, slframe->data);
463  switch(frame->type) {
464  case FRAMETYPE_IPPORT: {
465  // This is _so_ much simpler than the code to send them ;-)
466  IpPortFrame* aframe = CASTTOCLASS(IpPortFrame, frame);
467  hbsender_stopsend(aframe->getnetaddr(aframe));
468  break;
469  }
470  }//endswitch
471  }//endfor
472 }
473 
/* Obey a "stop expecting heartbeats" FrameSet.  For the address of each
 * FRAMETYPE_IPPORT frame it stops listening (hblistener_unlisten) and closes
 * the transport connection on DEFAULT_FSP_QID.
 * NOTE(review): the name line (482) and the declaration of `transport`
 * (line 497) are not visible in this listing. */
481 void
483  , FrameSet* fs
484  , NetAddr* fromaddr)
485 {
486  GSList* slframe;
487  (void)parent;
488  (void)fromaddr;
489 
490  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
491  Frame* frame = CASTTOCLASS(Frame, slframe->data);
492  switch(frame->type) {
493  case FRAMETYPE_IPPORT: {
494  // This is _so_ much simpler than the code to listen for heartbeats...
495  IpPortFrame* aframe = CASTTOCLASS(IpPortFrame, frame);
496  NetAddr* destaddr = aframe->getnetaddr(aframe);
498  hblistener_unlisten(destaddr);
499  transport->closeconn(transport, DEFAULT_FSP_QID, destaddr);
500  break;
501  }
502  }//endswitch
503  }//endfor
504 }
505 
/* Obey a combined "stop send + stop expect heartbeats" FrameSet by delegating
 * to nanoobey_stopexpecthb() and nanoobey_stopsendhb().  The name line (512)
 * is not visible in this listing. */
511 void
513  , FrameSet* fs
514  , NetAddr* fromaddr)
515 {
516  nanoobey_stopexpecthb(parent, fs, fromaddr);
517  nanoobey_stopsendhb (parent, fs, fromaddr);
518 }
519 
520 /*
521  * Act on (obey) a <b>FRAMESETTYPE_SETCONFIG</b> @ref FrameSet.
522  * This frameset is sent during the initial configuration phase.
523  * It contains name value pairs to save into our configuration (ConfigContext).
524  * These might be {string,string} pairs or {string,ipaddr} pairs, or
525  * {string, integer} pairs. We process them all.
526  * The frame types that we receive for these are:
527  * <b>FRAMETYPE_PARAMNAME</b> - parameter name to set
528  * <b>FRAMETYPE_CSTRINGVAL</b> - string value to associate with name
529  * <b>FRAMETYPE_CINTVAL</b> - integer value to associate with name
530  * <b>FRAMETYPE_IPPORT</b> - IP address to associate with name
 * NOTE(review): the code below actually handles only the first
 * FRAMETYPE_CONFIGJSON frame.  It parses that JSON into a ConfigContext and
 * merges each typed key into `config`.  It then adopts CONFIGNAME_CMAFAIL as
 * nanofailreportaddr, aliases 127.0.0.1 to it, and sets nano_connected.
 * The frame list above looks stale.
531  */
532 void
534  , FrameSet* fs
535  , NetAddr* fromaddr)
536 {
537  GSList* slframe;
538  ConfigContext* newconfig = NULL;
540 
541  (void)fromaddr;
542 
543 
544  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
545  Frame* frame = CASTTOCLASS(Frame, slframe->data);
546  switch (frame->type) {
547  case FRAMETYPE_CONFIGJSON: { // Configuration JSON string (parameters)
548  CstringFrame* strf = CASTTOCLASS(CstringFrame, frame);
549  const char * jsonstring;
550  g_return_if_fail(strf != NULL);
551  jsonstring = strf->baseclass.value;
552  DEBUGMSG3("%s.%d: Got CONFIGJSON frame: %s", __FUNCTION__, __LINE__
553  , jsonstring);
554  newconfig = configcontext_new_JSON_string(jsonstring);
555  goto endloop;
556  }
557  }
558  }
559 endloop:
560  if (NULL == newconfig) {
561  g_warning("%s.%d: SETCONFIG message without valid JSON configuration"
562  , __FUNCTION__, __LINE__);
563  return;
564  }
565 
/* NOTE(review): `config` is declared on line 539, which is not visible in this listing. */
566  if (config) {
567  GSList* keylist = newconfig->keys(newconfig);
568  GSList* thiskey;
569  GSList* nextkey;
570 
571  // Merge the new configuration into the old configuration data...
572  for (thiskey = keylist; thiskey; thiskey=nextkey) {
573  const char * key = thiskey->data;
574  enum ConfigValType valtype = newconfig->gettype(newconfig, key);
575 
576  nextkey=thiskey->next;
577 
578  switch(valtype) {
579  case CFG_NETADDR:
580  config->setaddr(config, key, newconfig->getaddr(newconfig, key));
581  break;
582 
583  case CFG_CFGCTX:
584  config->setconfig(config, key, newconfig->getconfig(newconfig, key));
585  break;
586 
587  case CFG_STRING:
588  config->setstring(config, key, newconfig->getstring(newconfig, key));
589  break;
590 
591  case CFG_BOOL:
592  config->setbool(config, key, newconfig->getbool(newconfig, key));
593  break;
594 
595  case CFG_INT64:
596  config->setint(config, key, newconfig->getint(newconfig, key));
597  break;
598 
599  case CFG_FLOAT:
600  config->setdouble(config, key, newconfig->getdouble(newconfig, key));
601  break;
602  default:
603  break;
604  }
/* Free the list node as we go (only the node -- the key string is not freed here). */
605  g_slist_free1(thiskey);
606  }
607  }
608  UNREF(newconfig);
609 
/* NOTE(review): this dereferences config before the NULL check on the next statement. */
610  DUMP3("nanoobey_setconfig: cfg is", &config->baseclass, NULL);
611 
612  if (config && config->getaddr(config, CONFIGNAME_CMAFAIL) != NULL) {
613  if (nanofailreportaddr == NULL) {
614  nanofailreportaddr = config->getaddr(config, CONFIGNAME_CMAFAIL);
615  }else if (config->getaddr(config, CONFIGNAME_CMAFAIL) != nanofailreportaddr) {
616  UNREF(nanofailreportaddr);
617  nanofailreportaddr = config->getaddr(config, CONFIGNAME_CMAFAIL);
618  }
619  DUMP3("nanoobey_setconfig: nanofailreportaddr", &nanofailreportaddr->baseclass, NULL);
620  {
621  // Alias localhost to the CMA nanofailreportaddr (at least for now...)
624 
625  NetAddr* localhost = netaddr_string_new("127.0.0.1");
626  NetIO* io = parent->baseclass.transport->_netio;
627  io->addalias(io, localhost, nanofailreportaddr);
628  UNREF(localhost);
629  }
/* Take our own reference to nanofailreportaddr (released in nano_shutdown).
 * NOTE(review): when the address object is unchanged (neither branch above
 * ran), this adds a second reference that is never released -- a leak on
 * repeated SETCONFIGs. */
630  REF(nanofailreportaddr);
631  }
/* NOTE(review): reported even when config is NULL or has no CMAFAIL address. */
632  g_message("Connected to CMA. Happiness :-D");
633  nano_connected = TRUE;
634 }//nanoobey_setconfig
635 
640 FSTATIC void
641 nanoobey_change_debug(gint plusminus
642  , AuthListener* parent
643  , FrameSet* fs
644  , NetAddr* fromaddr)
645 {
646 
647  GSList* slframe;
648  guint changecount = 0;
649 
650  (void)parent;
651  (void)fromaddr;
652 
653  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
654  Frame* frame = CASTTOCLASS(Frame, slframe->data);
655  int frametype = frame->type;
656  switch (frametype) {
657  case FRAMETYPE_CSTRINGVAL: { // String value to set 'paramname' to
658  ++changecount;
659  if (plusminus < 0) {
660  proj_class_decr_debug((char*)frame->value);
661  }else{
662  proj_class_incr_debug((char*)frame->value);
663  }
664  }
665  break;
666  }
667  }
668  if (changecount == 0) {
669  if (plusminus < 0) {
670  proj_class_decr_debug(NULL);
671  }else{
672  proj_class_incr_debug(NULL);
673  }
674  }
675 }
/* Obey an "increase debug" FrameSet: nanoobey_change_debug() with +1.
 * The name line (681) is not visible in this listing. */
680 FSTATIC void
682  , FrameSet* fs
683  , NetAddr* fromaddr)
684 {
685  nanoobey_change_debug(+1, parent, fs, fromaddr);
686 }
/* Obey a "decrease debug" FrameSet: nanoobey_change_debug() with -1.
 * The name line (692) is not visible in this listing. */
691 FSTATIC void
693  , FrameSet* fs
694  , NetAddr* fromaddr)
695 {
696  nanoobey_change_debug(-1, parent, fs, fromaddr);
697 }
698 
/*
 * Obey a "start discovery" FrameSet.
 * Frames are consumed in groups: FRAMETYPE_DISCNAME (instance name), an
 * optional FRAMETYPE_DISCINTERVAL, then FRAMETYPE_DISCJSON.  Each DISCJSON
 * frame schedules one discovery via nano_schedule_discovery().  The name and
 * interval are then reset for the next group.
 * Ignored while shutting down.  The name line (707) is not visible here.
 * NOTE(review): parent and fromaddr are marked (void) but are used below.
 */
706 FSTATIC void
708  , FrameSet* fs
709  , NetAddr* fromaddr)
710 {
711 
712  GSList* slframe;
713  guint interval = 0;
714  const char * discoveryname = NULL;
715 
716  (void)parent;
717  (void)fromaddr;
718 
719  if (nano_shutting_down) {
720  return;
721  }
722 
723  DEBUGMSG3("%s - got frameset", __FUNCTION__);
724  // Loop over the frames, looking for those we know what to do with ;-)
725  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
726  Frame* frame = CASTTOCLASS(Frame, slframe->data);
727  int frametype = frame->type;
728 
729  switch (frametype) {
730  case FRAMETYPE_DISCNAME: { // Discovery instance name
731  CstringFrame* strf = CASTTOCLASS(CstringFrame, frame);
732  g_return_if_fail(strf != NULL);
/* A second DISCNAME before its DISCJSON aborts processing of the whole FrameSet. */
733  g_return_if_fail(discoveryname == NULL);
734  discoveryname = strf->baseclass.value;
735  DEBUGMSG3("%s - got DISCOVERYNAME %s", __FUNCTION__, discoveryname);
736  }
737  break;
738 
739  case FRAMETYPE_DISCINTERVAL: { // Discovery interval
740  IntFrame* intf = CASTTOCLASS(IntFrame, frame);
741  interval = (guint)intf->getint(intf);
742  DEBUGMSG3("%s - got DISCOVERYINTERVAL %d", __FUNCTION__, interval);
743  }
744  break;
745 
746  case FRAMETYPE_DISCJSON: { // Discovery JSON string (parameters)
747  CstringFrame* strf = CASTTOCLASS(CstringFrame, frame);
748  const char * jsonstring;
749  g_return_if_fail(strf != NULL);
750  jsonstring = strf->baseclass.value;
751  g_return_if_fail(discoveryname != NULL);
752  DEBUGMSG3("Got DISCJSON frame: %s %d %s" , discoveryname, interval, jsonstring);
753  nano_schedule_discovery(discoveryname, interval, jsonstring
754  , parent->baseclass.config
755  , parent->baseclass.transport
756  , fromaddr);
757  }
/* Reset per-group state so the next DISCNAME starts a fresh group. */
758  interval = 0;
759  discoveryname = NULL;
760  break;
761 
762  }
763  }
764 }
765 
/*
 * Completion callback for resource operations (passed to RscQ->Qcmd with
 * nanotransport as user_data).
 * Builds a JSON ConfigContext with the reason, rc, signal, core-dump flag,
 * optional string result, and the request's identifier and resource name.
 * It then sends that JSON reliably to nanofailreportaddr.
 * NOTE(review):
 *  - The declarations of fs, sf and transport (lines 772, 774-775) and the
 *    append of sf to fs (line 806) are not visible in this listing.
 *  - nanofailreportaddr is not checked for NULL here.
 *  - The debug text says "failure" but no failure test is visible.
 */
767 FSTATIC void
768 _nano_send_rscexitstatus(ConfigContext* request, gpointer user_data
769 , enum HowDied reason, int rc, int signal, gboolean core_dumped
770 , const char * stringresult)
771 {
773  ConfigContext* response = configcontext_new(0);
776  char* rsp_json;
777 
/* Table of the integer fields copied into the response. */
778  struct {
779  const char* framename;
780  int framevalue;
781  } pktframes[] = {
782  {REQREASONENUMNAMEFIELD, reason},
783  {REQRCNAMEFIELD, rc},
784  {REQSIGNALNAMEFIELD, signal},
785  };
786  unsigned j;
787 
788  for (j=0; j < DIMOF(pktframes); ++j) {
789  response->setint(response, pktframes[j].framename, pktframes[j].framevalue);
790  }
791  response->setbool(response, REQCOREDUMPNAMEFIELD, core_dumped);
792  if (stringresult) {
793  response->setstring(response, REQSTRINGRETNAMEFIELD, stringresult);
794  }
795  // Copy the request ID over from the original request
796  response->setint(response, REQIDENTIFIERNAMEFIELD
797  , request->getint(request, REQIDENTIFIERNAMEFIELD));
798  // Copy the resource name over from the original request
799  response->setstring(response, REQRSCNAMEFIELD
800  , request->getstring(request, REQRSCNAMEFIELD));
801  // Package it up as a JSON string to send to the CMA
802  rsp_json = response->baseclass.toString(&response->baseclass);
803  UNREF(response);
804  DEBUGMSG1("Reporting resource failure: %s", rsp_json);
/* Ownership of rsp_json passes to sf, which releases it with g_free. */
805  sf->baseclass.setvalue(&sf->baseclass, rsp_json, strlen(rsp_json)+1, g_free);
807  UNREF2(sf);
808  transport->_netio->sendareliablefs(transport->_netio, nanofailreportaddr, DEFAULT_FSP_QID, fs);
809  UNREF(fs);
810 }
/* Obey a "run resource operation" FrameSet.  Each FRAMETYPE_RSCJSON frame is
 * parsed into a ConfigContext and queued on RscQ (created on first use).
 * _nano_send_rscexitstatus reports each result to the CMA.
 * Ignored while shutting down.
 * NOTE(review): the name line (812) and the parse that assigns `cfg` (833)
 * are not visible in this listing. */
811 FSTATIC void
813 {
814  GSList* slframe;
815 
816  (void)parent;
817  (void)fromaddr;
818  if (nano_shutting_down) {
819  return;
820  }
821  if (NULL == RscQ) {
822  RscQ = resourcequeue_new(0);
823  }
824  // Loop over the frames, looking for those we know what to do with ;-)
825  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
826  Frame* frame = CASTTOCLASS(Frame, slframe->data);
827  CstringFrame* csframe;
828  ConfigContext* cfg;
829  if (frame->type != FRAMETYPE_RSCJSON) {
830  continue;
831  }
832  csframe = CASTTOCLASS(CstringFrame, frame);
834  if (NULL == cfg) {
/* NOTE(review): "%*s" uses the length as a minimum field WIDTH; "%.*s" (precision,
 * with an int argument) was probably intended to bound the printed string. */
835  g_warning("%s.%d: Received malformed JSON string [%*s]"
836  , __FUNCTION__, __LINE__
837  , csframe->baseclass.length-1
838  , (char*)csframe->baseclass.value);
839  continue;
840  }
841  RscQ->Qcmd(RscQ, cfg, _nano_send_rscexitstatus, nanotransport);
842  UNREF(cfg);
843  }
844 }
845 
/* Obey a "cancel resource operation" FrameSet: each FRAMETYPE_RSCJSON frame is
 * parsed and passed to RscQ->cancel (RscQ is created on first use).
 * NOTE(review):
 *  - The name line (847) and the `cfg` parse (865) are not visible here.
 *  - Unlike the run handler, there is no nano_shutting_down check.  After the
 *    shutdown code UNREFs RscQ, this could create a fresh queue (if UNREF
 *    NULLs the pointer).
 *  - "%*s" sets a field width; "%.*s" was probably intended. */
846 FSTATIC void
848 {
849  GSList* slframe;
850 
851  (void)parent;
852  (void)fromaddr;
853  if (NULL == RscQ) {
854  RscQ = resourcequeue_new(0);
855  }
856 
857  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
858  Frame* frame = CASTTOCLASS(Frame, slframe->data);
859  CstringFrame* csframe;
860  ConfigContext* cfg;
861  if (frame->type != FRAMETYPE_RSCJSON) {
862  continue;
863  }
864  csframe = CASTTOCLASS(CstringFrame, frame);
866  if (NULL == cfg) {
867  g_warning("%s.%d: Received malformed JSON string [%*s]"
868  , __FUNCTION__, __LINE__
869  , csframe->baseclass.length-1
870  , (char*)csframe->baseclass.value);
871  continue;
872  }
873  RscQ->cancel(RscQ, cfg);
874  UNREF(cfg);
875  }
876 }
877 
/* Obey a "stop discovery" FrameSet: unregister the discovery instance named by
 * each FRAMETYPE_DISCNAME frame.  The name line (883) is not visible here. */
882 FSTATIC void
884  , FrameSet* fs
885  , NetAddr* fromaddr)
886 {
887 
888  GSList* slframe;
889 
890  (void)parent;
891  (void)fromaddr;
892 
893 
894  // Loop over the frames, looking for the one we know what to do with ;-)
895  for (slframe = fs->framelist; slframe != NULL; slframe = g_slist_next(slframe)) {
896  Frame* frame = CASTTOCLASS(Frame, slframe->data);
897  int frametype = frame->type;
898 
899  switch (frametype) {
900  case FRAMETYPE_DISCNAME: { // Discovery instance name
901  CstringFrame* strf = CASTTOCLASS(CstringFrame, frame);
902  const char * discoveryname;
903  g_return_if_fail(strf != NULL);
904  discoveryname = strf->baseclass.value;
/* NOTE(review): BUG -- this check is inverted; it should be
 * g_return_if_fail(discoveryname != NULL).  As written, any real name makes
 * the function return (emitting a critical), and discovery_unregister() is
 * only ever reached with NULL. */
905  g_return_if_fail(discoveryname == NULL);
906  discovery_unregister(discoveryname);
907  }
908  break;
909 
910  }
911  }
912 }
913 
914 
/*
 * Start a JSON discovery instance.
 * Parses `json` and reads its "type" string.  Creates a JsonDiscovery named
 * `instance` repeating every `interval` via jsondiscovery_new(), then drops
 * our local reference to it.
 * NOTE(review):
 *  - The `config`/`transport` parameter lines (922-923) are not visible here.
 *  - If "type" is missing, the early return leaks jsonroot.
 */
918 FSTATIC void
919 nano_schedule_discovery(const char *instance,
920  guint32 interval,
921  const char* json,
924  NetAddr* fromaddr)
925 {
926  ConfigContext* jsonroot;
927  JsonDiscovery* discovery;
928  const char* disctype;
929 
930  (void)fromaddr;
931 
932  DEBUGMSG3("%s(%s,%d,%s)", __FUNCTION__, instance, interval, json);
933  jsonroot = configcontext_new_JSON_string(json);
934  g_return_if_fail(jsonroot != NULL);
935  disctype = jsonroot->getstring(jsonroot, "type");
936  g_return_if_fail(disctype != NULL);
937  discovery = jsondiscovery_new(disctype, instance, interval, jsonroot
938  , transport, config, 0);
939  UNREF(jsonroot);
940  if (discovery) {
941  UNREF2(discovery);
942  }
943 }
944 
/* Fragment of struct startup_cruft: the struct header and other members
 * (lines 945-946, 948-949) are not visible.  Usage elsewhere in this file
 * shows it also has discover_interval and iosource members. */
947  const char * initdiscover;
950 };
951 
/*
 * Idle-time startup state machine (gcruft is a struct startup_cruft*).
 *   INIT: run the initial discovery named cruft->initdiscover, then go to WAIT.
 *         The jsondiscovery_new call that creates `jd` starts on line 971,
 *         which is not visible here.
 *   WAIT: poll until the configuration has a string under that name (the
 *         discovery result).  Then go to DONE, call nano_reqconfig() now, and
 *         arrange for it to repeat every 5 seconds.
 * Returns TRUE to keep the idle source alive, FALSE once DONE or shutting down.
 * NOTE(review): `state` is function-static, so a later nano_start_full() would
 * not re-run the INIT step.
 */
958 gboolean
959 nano_startupidle(gpointer gcruft)
960 {
961  static enum istate {INIT=3, WAIT=5, DONE=7} state = INIT;
962  struct startup_cruft* cruft = gcruft;
963  const char * cfgname = cruft->initdiscover;
964 
965  if (state == DONE || nano_shutting_down) {
966  return FALSE;
967  }
968  if (state == INIT) {
969  const char * jsontext = "{\"parameters\":{}}";
970  ConfigContext* jsondata = configcontext_new_JSON_string(jsontext);
972  ( cruft->initdiscover
973  , cruft->initdiscover
974  , cruft->discover_interval
975  , jsondata
976  , cruft->iosource, obeycollective->baseclass.config, 0);
977  UNREF(jsondata);
978  UNREF2(jd);
979  state = WAIT;
980  return TRUE;
981  }
982  if (obeycollective->baseclass.config->getstring(obeycollective->baseclass.config, cfgname)) {
983  state = DONE;
984  // Call it once, and arrange for it to repeat until we hear back.
985  g_timeout_add_seconds(5, nano_reqconfig, gcruft);
986  nano_reqconfig(gcruft);
987  return FALSE;
988  }
989  return TRUE;
990 }
991 
/*
 * Timer callback (also called directly once) that asks the CMA for our
 * configuration.  It sends a FrameSet holding our system name and the
 * discovery JSON stored under cruft->initdiscover.  The send goes unreliably
 * to CONFIGNAME_CMAINIT.
 * Returns FALSE (stop repeating) when shutting down, when CMAINIT is unknown,
 * or once CMAADDR, CMAFAIL and CMADISCOVER addresses and a positive CMAPORT are
 * all present -- i.e. after the CMA has configured us.  Returns TRUE otherwise,
 * so the request is retried.
 * NOTE(review): the frameset/frame construction lines (1022, 1025, 1028, 1034,
 * 1036) are not visible in this listing.
 */
994 gboolean
995 nano_reqconfig(gpointer gcruft)
996 {
997  struct startup_cruft* cruft = gcruft;
998  FrameSet* fs;
999  CstringFrame* csf;
1000  CstringFrame* usf;
1001  const char * cfgname = cruft->initdiscover;
1002  ConfigContext* context = obeycollective->baseclass.config;
1003  NetAddr* cmainit = context->getaddr(context, CONFIGNAME_CMAINIT);
1004  const char * jsontext;
1005  char * sysname = NULL;
1006 
1007  if (nano_shutting_down) {
1008  return FALSE;
1009  }
1010 
1011  // We <i>have</i> to know our initial request address - or all is lost.
1012  // NOTE THAT THIS ADDRESS MIGHT BE MULTICAST AND MIGHT BE USED ONLY ONCE
1013  g_return_val_if_fail(cmainit != NULL, FALSE);
1014 
/* If all of these are already configured, the CMA has answered: stop retrying. */
1015  // Our initial configuration message must contain these parameters.
1016  if (context->getaddr(context, CONFIGNAME_CMAADDR) != NULL
1017  && context->getaddr(context, CONFIGNAME_CMAFAIL) != NULL
1018  && context->getaddr(context, CONFIGNAME_CMADISCOVER) != NULL
1019  && context->getint(context, CONFIGNAME_CMAPORT) > 0) {
1020  return FALSE;
1021  }
1023 
1024  // Put in the system name
1026  sysname = proj_get_sysname();
1027  usf->baseclass.setvalue(&usf->baseclass, g_strdup(sysname), strlen(sysname)+1
1029  frameset_append_frame(fs, &usf->baseclass);
1030  UNREF2(usf);
1031 
1032  // Put in the JSON configuration text
1033  jsontext = context->getstring(context, cfgname);
1035  csf->baseclass.setvalue(&csf->baseclass, g_strdup(jsontext), strlen(jsontext)+1
1037 
1038  frameset_append_frame(fs, &csf->baseclass);
1039  UNREF2(csf);
1040 
1041  // We've constructed the frameset - now send it - unreliably...
1042  // That's because the reply is typically from a different address
1043  // which would confuse the blazes out of the reliable comm code.
1044  cruft->iosource->sendaframeset(cruft->iosource, cmainit, fs);
1045  DEBUGMSG("%s.%d: Sent initial STARTUP frameset for %s."
1046  , __FUNCTION__, __LINE__, sysname);
1047  g_free(sysname); sysname = NULL;
1048  UNREF(fs);
1049  return TRUE;
1050 }
1051 
1052 static PacketDecoder* decoder = NULL;
1053 static SwitchDiscovery* swdisc = NULL;
1054 
1055 
/* Fragment of the collective_obeylist dispatch table handed to
 * authlistener_new() in nano_start_full().  Its header and entries
 * (lines 1056-1059, 1061-1073) are not visible.  {0, NULL} is presumably the
 * end-of-table sentinel -- confirm against authlistener.h. */
1060  // This is the complete set of commands that nanoprobes know how to obey - so far...
1074  {0, NULL},
1075 };
1076 
/* Body of the packet-decoder factory (its signature lines are not visible).
 * It builds a PacketDecoder from the FRAMETYPEMAP table, stores it in the
 * file-static `decoder`, and returns it.
 * NOTE(review): each call overwrites `decoder` without releasing a previous
 * one, and nano_shutdown() UNREFs it.  Callers should confirm who owns the
 * returned reference. */
1081 {
1082  static FrameTypeToFrame decodeframes[] = FRAMETYPEMAP;
1083  // Set up our packet decoder
1084  decoder = packetdecoder_new(0, decodeframes, DIMOF(decodeframes));
1085  return decoder;
1086 }
1087 
1088 
/*
 * Start the nanoprobe.
 * Records `io` as nanotransport (taking a GSource reference) and starts LLDP/CDP
 * switch discovery on "eth0".  Creates the obeycollective AuthListener from
 * collective_obeylist and associates it with `io`.  Finally it schedules
 * nano_startupidle() to run the startup sequence.
 * NOTE(review): lines 1124, 1130 and 1138 are not visible.  The second
 * initializer member (presumably discover_interval) is among them.
 */
1115 WINEXPORT void
1116 nano_start_full(const char *initdiscoverpath
1117  , guint discover_interval
1118  , NetGSource* io
1119  , ConfigContext* config)
1120 {
/* static: the idle/timeout callbacks keep a pointer to this after we return. */
1121  static struct startup_cruft cruftiness;
1122  struct startup_cruft initcrufty = {
1123  initdiscoverpath,
1125  io,
1126  };
1127  nano_shutting_down = FALSE;
1128  BINDDEBUG(nanoprobe_main);
1129 
1131  cruftiness = initcrufty;
/* Balanced by g_source_unref() in nano_shutdown(). */
1132  g_source_ref(CASTTOCLASS(GSource, io));
1133  nanotransport = io;
1134 
1135  // Get our local switch discovery information.
1136  // To be really right, we probably ought to wait until we know our local network
1137  // configuration - and start it up on all interfaces assigned addresses of global scope.
1139  swdisc = switchdiscovery_new("switchdiscovery_eth0", "eth0", ENABLE_LLDP|ENABLE_CDP, G_PRIORITY_LOW
1140  , g_main_context_default(), io, config, 0);
1141  obeycollective = authlistener_new(0, collective_obeylist, config, TRUE);
1142  obeycollective->baseclass.associate(&obeycollective->baseclass, io);
1143  // Initiate the startup process
1144  g_idle_add(nano_startupidle, &cruftiness);
1145 }
1146 
/*
 * Release nanoprobe resources.  When `report` is TRUE, first log the heartbeat,
 * switch-discovery and transport statistics with g_info().
 * Releases swdisc, nanofailreportaddr, nanotransport (destroy + unref), the
 * packet decoder and obeycollective.
 * NOTE(review):
 *  - The report path dereferences nanotransport and swdisc without NULL
 *    checks, although nanotransport is NULL-checked further down.
 *  - Lines 1171-1172 are not visible in this listing.
 */
1148 WINEXPORT void
1149 nano_shutdown(gboolean report)
1150 {
1151  if (report) {
1152  NetIOstats* ts = &nanotransport->_netio->stats;
1153  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of heartbeats:", nano_hbstats.heartbeat_count);
1154  g_info("%-35s %8d", "Count of deadtimes:", nano_hbstats.dead_count);
1155  g_info("%-35s %8d", "Count of warntimes:", nano_hbstats.warntime_count);
1156  g_info("%-35s %8d", "Count of comealives:", nano_hbstats.comealive_count);
1157  g_info("%-35s %8d", "Count of martians:", nano_hbstats.martian_count);
1158  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of LLDP/CDP pkts sent:", swdisc->baseclass.reportcount);
1159  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of LLDP/CDP pkts received:", swdisc->baseclass.discovercount);
1160  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of recvfrom calls:", ts->recvcalls);
1161  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of pkts read:", ts->pktsread);
1162  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of framesets read:", ts->fsreads);
1163  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of sendto calls:", ts->sendcalls);
1164  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of pkts written:", ts->pktswritten);
1165  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of framesets written:", ts->fswritten);
1166  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of reliable framesets sent:", ts->reliablesends);
1167  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of reliable framesets recvd:", ts->reliablereads);
1168  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of ACKs sent:", ts->ackssent);
1169  g_info("%-35s %8"G_GINT64_MODIFIER"d", "Count of ACKs recvd:", ts->acksrecvd);
1170  }
1173  UNREF2(swdisc);
1174  if (nanofailreportaddr) {
1175  UNREF(nanofailreportaddr);
1176  }
1177  if (nanotransport) {
/* Destroy removes it from its main context; unref drops the ref taken in nano_start_full(). */
1178  g_source_destroy(CASTTOCLASS(GSource, nanotransport));
1179  g_source_unref(CASTTOCLASS(GSource, nanotransport));
1180  nanotransport = NULL;
1181  }
1182  // Free packet decoder
1183  if (decoder) {
1184  UNREF(decoder);
1185  }
1186  obeycollective->baseclass.dissociate(&obeycollective->baseclass);
1187  UNREF2(obeycollective);
1188 }
1189 
/*
 * Begin nanoprobe shutdown.  The name line (1192) is not visible in this
 * listing.
 * If connected to the CMA:
 *   - notify it (the send is on line 1200, not visible here);
 *   - close all protocol connections and poll with shutdown_when_outdone();
 *   - set nano_shutting_down;
 *   - cancel and release any queued resource operations.
 *   Returns FALSE.
 * If never connected: set nano_shutting_down, warn, bump errcount (non-zero
 * exit), and run _nano_final_shutdown() immediately.  Returns TRUE.
 */
1191 WINEXPORT gboolean
1193 {
1194 
1195  if (nano_connected) {
1196  FsProtocol* proto = CASTTOCLASS(ReliableUDP, nanotransport->_netio)->_protocol;
1197  char * sysname;
1198  DEBUGMSG("Sending HBSHUTDOWN to CMA");
1199  sysname = proj_get_sysname();
1201  g_free(sysname); sysname = NULL;
1202  // Initiate connection shutdown.
1203  // This process will wait for all our output to be ACKed.
1204  // It also has an ACK timer, so it won't wait forever...
1205  proto->closeall(proto);
1206  idle_shutdown_gsource = g_idle_add(shutdown_when_outdone, NULL);
1207  nano_shutting_down = TRUE;
1208  // Unregister all discovery modules. Keep us from starting any new ones...
1210  // Let's not start any more resource operations either...
1211  if (RscQ) {
1212  RscQ->cancelall(RscQ);
1213  UNREF(RscQ);
1214  }
1215  // @TODO We need to ignore additional requests during shutdown as well...
1216  }else{
1217  nano_shutting_down = TRUE;
1218  g_warning("%s: Never connected to CMA - cannot send shutdown message.", procname);
1219  ++errcount; // Trigger non-zero exit code...
1220  _nano_final_shutdown(NULL);
1221  return TRUE;
1222  }
1223  return FALSE;
1224 }
1225 
/*
 * Idle callback installed by the shutdown initiator.  Once the protocol
 * reports no active connections it quits the main loop and returns FALSE,
 * which removes this idle source.  Otherwise it returns TRUE and is called
 * again.
 * NOTE(review):
 *  - `proto` is declared on line 1231, which is not visible here.
 *  - An idle source that keeps returning TRUE runs on every idle iteration,
 *    i.e. it busy-polls.
 *  - idle_shutdown_gsource is not cleared when this returns FALSE, so a later
 *    g_source_remove() on that id would refer to a removed source.
 *  - g_main_quit() is deprecated in favour of g_main_loop_quit().
 */
1227 FSTATIC gboolean
1228 shutdown_when_outdone(gpointer unused)
1229 {
1230  ReliableUDP* t = CASTTOCLASS(ReliableUDP, nanotransport->_netio);
1232  (void)unused;
1233  // Wait for all our connections to be shut down
1234  if (proto->activeconncount(proto) == 0){
1235  DEBUGMSG("%s.%d: Shutting down - all connections closed."
1236  , __FUNCTION__, __LINE__);
1237  g_main_quit(mainloop);
1238  return FALSE;
1239  }
1240  return TRUE;
1241 }
1242 // Final Shutdown -- a contingency timer to make sure we eventually shut down
1243 FSTATIC gboolean
1244 _nano_final_shutdown(gpointer unused)
1245 {
1246  (void)unused;
1247  DEBUGMSG("Initiating final shutdown");
1248  if (nano_connected && nanotransport->_netio->outputpending(nanotransport->_netio)){
1249  g_warning("Shutting down with unACKed output.");
1250  DUMP("Transport info", &nanotransport->_netio->baseclass, NULL);
1251  }
1252  if (idle_shutdown_gsource) {
1253  g_source_remove(idle_shutdown_gsource);
1254  idle_shutdown_gsource = 0;
1255  }
1256  g_main_quit(mainloop);
1257  return FALSE;
1258 }