The Assimilation Project
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
hblistener.c
Go to the documentation of this file.
1 
24 #define IS_LISTENER_SUBCLASS 1
25 #include <memory.h>
26 #include <glib.h>
27 #include <frame.h>
28 #include <frameset.h>
29 #include <hblistener.h>
30 #include <stdlib.h>
33 FSTATIC void _hblistener_notify_function(gpointer ignoreddata);
37 FSTATIC void _hblistener_checktimeouts(gboolean urgent);
39 FSTATIC void _hblistener_set_deadtime(HbListener* self, guint64 deadtime);
40 FSTATIC void _hblistener_set_warntime(HbListener* self, guint64 warntime);
43 FSTATIC gboolean _hblistener_gsourcefunc(gpointer);
44 FSTATIC void _hblistener_set_deadtime_callback(HbListener*, void (*callback)(HbListener* who));
46 FSTATIC void _hblistener_set_warntime_callback(HbListener*, void (*callback)(HbListener* who, guint64 howlate));
47 FSTATIC void _hblistener_set_comealive_callback(HbListener*, void (*callback)(HbListener* who, guint64 howlate));
48 
50 
55 
56 static GSList* _hb_listeners = NULL;
57 static gint _hb_listener_count = 0;
58 static guint64 _hb_listener_lastcheck = 0;
59 static void (*_hblistener_martiancallback)(NetAddr* who) = NULL;
60 static gint hb_timeout_id = -1;
61 
62 #define ONESEC 1000000
63 
66 FSTATIC void
68 {
69  HbListener* old;
70  if (_hb_listeners == NULL) {
71  hb_timeout_id = g_timeout_add_seconds_full(G_PRIORITY_LOW, 1
74  }else if ((old=hblistener_find_by_address(self->listenaddr)) != NULL) {
76  }
77  _hb_listeners = g_slist_prepend(_hb_listeners, self);
78  _hb_listener_count += 1;
79  REF2(self);
80 }
81 
83 FSTATIC void
85 {
86  if (g_slist_find(_hb_listeners, self) != NULL) {
87  _hb_listeners = g_slist_remove(_hb_listeners, self);
88  _hb_listener_count -= 1;
89  UNREF2(self);
90  return;
91  }
92 }
93 
97 {
98  GSList* obj;
99  for (obj = _hb_listeners; obj != NULL; obj=g_slist_next(obj)) {
100  HbListener* listener = CASTTOCLASS(HbListener, obj->data);
101  if (which->equal(which, listener->listenaddr)) {
102  return listener;
103  }
104  }
105  return NULL;
106 }
107 
108 
110 FSTATIC void
111 _hblistener_checktimeouts(gboolean urgent)
112 {
113  guint64 now = g_get_real_time();
114  GSList* obj;
115  if (!urgent && (now - _hb_listener_lastcheck) < ONESEC) {
116  return;
117  }
118  _hb_listener_lastcheck = now;
119 
120  for (obj = _hb_listeners; obj != NULL; obj=g_slist_next(obj)) {
121  HbListener* listener = CASTTOCLASS(HbListener, obj->data);
122  if (now > listener->nexttime && listener->status == HbPacketsBeingReceived) {
123  if (listener->_deadtime_callback) {
124  listener->_deadtime_callback(listener);
125  }else{
126  g_warning("our node looks dead from here...");
127  }
128  listener->status = HbPacketsTimedOut;
129  }
130  }
131 }
132 
134 FSTATIC gboolean
135 _hblistener_gsourcefunc(gpointer ignored)
136 {
137  gboolean ret;
138  (void)ignored;
140  ret = (_hb_listeners != NULL);
141  if (!ret) {
142  hb_timeout_id = -1;
143  }
144  return ret;
145 }
146 
148 FSTATIC gboolean
150 {
151  guint64 now = g_get_real_time();
152  HbListener* addmatch;
153 
154  (void)self; // Odd, but true - because we're a proxy for all hblisteners...
155  addmatch = hblistener_find_by_address(srcaddr);
156  if (addmatch != NULL) {
157  if (addmatch->status == HbPacketsTimedOut) {
158  guint64 howlate = now - addmatch->nexttime;
159  addmatch->status = HbPacketsBeingReceived;
160  howlate /= 1000;
161  if (addmatch->_comealive_callback) {
162  addmatch->_comealive_callback(addmatch, howlate);
163  }else{
164  g_message("A node is now back alive! late by "FMT_64BIT "d ms", howlate);
165  }
166  } else if (now > addmatch->warntime) {
167  guint64 howlate = now - addmatch->warntime;
168  howlate /= 1000;
169  if (addmatch->_warntime_callback) {
170  addmatch->_warntime_callback(addmatch, howlate);
171  }else{
172  g_warning("A node was " FMT_64BIT "u ms late in sending heartbeat..."
173  , howlate);
174  }
175  }
176  if (addmatch->_heartbeat_callback) {
177  addmatch->_heartbeat_callback(addmatch);
178  }
179  addmatch->nexttime = now + addmatch->_expected_interval;
180  addmatch->warntime = now + addmatch->_warn_interval;
181  UNREF(fs);
182  return TRUE;
183  }
184  // The 'martian' callback is necessarily global to all heartbeat listeners
185  if (_hblistener_martiancallback) {
186  _hblistener_martiancallback(srcaddr);
187  }else{
188  gchar * saddr = srcaddr->baseclass.toString(srcaddr);
189  g_warning("Received 'martian' packet from address [%s]", saddr);
190  g_free(saddr); saddr = NULL;
191  }
192  UNREF(fs);
193  return TRUE;
194 }
195 
197 FSTATIC void
198 _hblistener_notify_function(gpointer ignored)
199 {
200  (void)ignored;
202 }
203 
205 FSTATIC void
207 {
208  GSList* this;
209  GSList* next = NULL;
210  static gboolean shuttingdown = FALSE;
211 
212  if (shuttingdown) {
213  return;
214  }
215  shuttingdown = TRUE;
216  // Unref all our listener objects...
217  for (this = _hb_listeners; this; this=next) {
218  HbListener* listener = CASTTOCLASS(HbListener, this->data);
219  next = this->next;
220  UNREF2(listener);
221  }
222  if (_hb_listeners) {
223  g_slist_free(_hb_listeners);
224  _hb_listeners = NULL;
225  }
226  if (hb_timeout_id > 0) {
227  g_source_remove(hb_timeout_id);
228  }
229  shuttingdown = FALSE;
230 }
231 
232 
234 FSTATIC void
236 {
237  HbListener *hbself = CASTTOCLASS(HbListener, self);
238  DEBUGMSG3("%s.%d - finalizing.", __FUNCTION__, __LINE__);
239  UNREF(hbself->listenaddr);
240  _listener_finalize(self);
241  self = NULL; hbself = NULL;
242 }
243 
244 
247 HbListener*
248 hblistener_new(NetAddr* listenaddr,
249  ConfigContext*cfg,
250  gsize objsize)
251 {
252  HbListener * newlistener;
253  Listener * base;
255  if (objsize < sizeof(HbListener)) {
256  objsize = sizeof(HbListener);
257  }
258  base = listener_new(cfg, objsize);
259  proj_class_register_subclassed(base, "HbListener");
260  newlistener = CASTTOCLASS(HbListener, base);
261  if (NULL == newlistener) {
262  return NULL;
263  }
266  newlistener->listenaddr = listenaddr;
267  REF(listenaddr);
268  newlistener->get_deadtime = _hblistener_get_deadtime;
269  newlistener->set_deadtime = _hblistener_set_deadtime;
270  newlistener->get_warntime = _hblistener_get_warntime;
271  newlistener->set_warntime = _hblistener_set_warntime;
276 
277  if (cfg->getint(cfg, CONFIGNAME_TIMEOUT) > 0) {
278  newlistener->set_deadtime(newlistener, cfg->getint(cfg, CONFIGNAME_TIMEOUT));
279  }else{
280  newlistener->set_deadtime(newlistener, DEFAULT_DEADTIME);
281  }
282  if (cfg->getint(cfg, CONFIGNAME_WARNTIME) > 0) {
283  newlistener->set_warntime(newlistener, cfg->getint(cfg, CONFIGNAME_WARNTIME));
284  }else{
285  newlistener->set_warntime(newlistener, (DEFAULT_DEADTIME*2)/3);
286  }
287  newlistener->status = HbPacketsBeingReceived;
288  _hblistener_addlist(newlistener);
289  return newlistener;
290 }
291 
293 FSTATIC void
295  guint64 deadtime)
296 {
297  guint64 now = g_get_real_time();
298  self->_expected_interval = deadtime*1000000L;
299  self->nexttime = now + self->_expected_interval;
300  //g_debug("Setting HbListener deadtime to " FMT_64BIT "d secs", deadtime);
301 }
302 
304 FSTATIC guint64
306 {
307  return self->_expected_interval;
308 }
309 
311 FSTATIC void
313  guint64 warntime)
314 {
315  guint64 now = g_get_real_time();
316  self->_warn_interval = warntime*1000000;
317  self->warntime = now + self->_warn_interval;
318 }
320 FSTATIC guint64
322 {
323  return self->_warn_interval;
324 }
325 
326 
332 FSTATIC void
334 {
336  if (listener != NULL) {
337  _hblistener_dellist(listener);
338  return;
339  }
340  g_warning("Attempt to unlisten an unregistered address");
341 }
342 
344 FSTATIC void
346 {
347  self->_deadtime_callback = callback;
348 }
349 
351 FSTATIC void
353 {
354  self->_heartbeat_callback = callback;
355 }
356 
358 FSTATIC void
359 _hblistener_set_warntime_callback(HbListener* self, void (*callback)(HbListener* who, guint64 howlate))
360 {
361  self->_warntime_callback = callback;
362 }
363 
365 FSTATIC void
366 _hblistener_set_comealive_callback(HbListener* self, void (*callback)(HbListener* who, guint64 howlate))
367 {
368  self->_comealive_callback = callback;
369 }
370 
372 FSTATIC void
374 {
375  _hblistener_martiancallback = callback;
376 }
FSTATIC void _hblistener_set_heartbeat_callback(HbListener *, void(*callback)(HbListener *who))
Call to set a callback to be called when a heartbeat is received.
Definition: hblistener.c:352
FSTATIC guint64 _hblistener_get_deadtime(HbListener *self)
Return deadtime.
Definition: hblistener.c:305
FSTATIC void _hblistener_set_deadtime_callback(HbListener *, void(*callback)(HbListener *who))
Call to set a callback to be called when a node apparently dies.
Definition: hblistener.c:345
#define DEFAULT_DEADTIME
Definition: hblistener.h:65
#define REF2(obj)
Definition: assimobj.h:40
DEBUGDECLARATIONS
Definition: hblistener.c:49
Listener * listener_new(ConfigContext *config, gsize objsize)
Construct a new Listener - setting up GSource and timeout data structures for it. ...
Definition: listener.c:82
FSTATIC gboolean _hblistener_got_frameset(Listener *, FrameSet *, NetAddr *)
Function called when a heartbeat FrameSet class (fs) arrived from the given NetAddr class (srcaddr) ...
Definition: hblistener.c:149
Implements minimal client-oriented Frame and Frameset capabilities.
FSTATIC void _hblistener_notify_function(gpointer ignoreddata)
We get called when our gSource gets removed. This function is probably unnecessary...
Definition: hblistener.c:198
guint64 warntime
Warn heartbeat time.
Definition: hblistener.h:61
NetAddr * listenaddr
What address are we listening for?
Definition: hblistener.h:62
AssimObj baseclass
Definition: netaddr.h:44
FSTATIC void hblistener_set_martian_callback(void(*callback)(NetAddr *who))
Call to set a callback to be called when an unrecognized node sends us a heartbeat.
Definition: hblistener.c:373
WINEXPORT gint64 g_get_real_time(void)
HAVE_G_GET_MONOTONIC_TIME.
FSTATIC gboolean _hblistener_gsourcefunc(gpointer)
A GSourceFunc to be used with g_timeout_add_seconds()
Definition: hblistener.c:135
FSTATIC void hblistener_shutdown(void)
Shuts down all our hblisteners...
Definition: hblistener.c:206
#define FSTATIC
Definition: projectcommon.h:31
#define DEBUGMSG3(...)
Definition: proj_classes.h:91
gint64(* getint)(const ConfigContext *, const char *name)
Get integer value.
Definition: configcontext.h:74
#define BINDDEBUG(Cclass)
BINDDEBUG is for telling the class system where the debug variable for this class is - put it in the ...
Definition: proj_classes.h:82
void(* set_deadtime_callback)(HbListener *, void(*)(HbListener *who))
Definition: hblistener.h:51
void(* set_deadtime)(HbListener *, guint64)
Set deadtime.
Definition: hblistener.h:47
FSTATIC void _hblistener_finalize(AssimObj *self)
Finalize an HbListener.
Definition: hblistener.c:235
#define __FUNCTION__
#define CONFIGNAME_WARNTIME
How long w/o heartbeats before whining?
FSTATIC void _hblistener_set_comealive_callback(HbListener *, void(*callback)(HbListener *who, guint64 howlate))
Call to set a callback to be called when a node passes deadtime but heartbeats again.
Definition: hblistener.c:366
#define CONFIGNAME_TIMEOUT
How before declaring a serious problem...
void(* _deadtime_callback)(HbListener *who)
Definition: hblistener.h:55
gpointer proj_class_register_subclassed(gpointer object, const char *static_subclassname)
Log the creation of a subclassed object from a superclassed object.
Definition: proj_classes.c:192
guint64 _warn_interval
When to warn about late heartbeats.
Definition: hblistener.h:59
void(* set_comealive_callback)(HbListener *, void(*)(HbListener *who, guint64 howlate))
Definition: hblistener.h:53
#define FMT_64BIT
Format designator for a 64 bit integer.
Definition: projectcommon.h:32
#define REF(obj)
Definition: assimobj.h:39
Implements basic Frame class.
This is the base HbListener class. object - which listens for heartbeats from a particular sender...
Definition: hblistener.h:44
HbNodeStatus status
What status is this node in?
Definition: hblistener.h:63
FSTATIC void _hblistener_addlist(HbListener *self)
Add an HbListener to our global list of HBListeners, and unref (and neuter) any old HbListeners liste...
Definition: hblistener.c:67
void(* _comealive_callback)(HbListener *who, guint64 howlate)
Definition: hblistener.h:57
HbListener * hblistener_new(NetAddr *listenaddr, ConfigContext *cfg, gsize objsize)
Construct a new HbListener - setting up GSource and timeout data structures for it.
Definition: hblistener.c:248
struct _HbListener HbListener
Definition: hblistener.h:32
void(* set_heartbeat_callback)(HbListener *, void(*)(HbListener *who))
Definition: hblistener.h:50
FSTATIC guint64 _hblistener_get_warntime(HbListener *self)
Return warntime.
Definition: hblistener.c:321
FSTATIC void _hblistener_set_warntime_callback(HbListener *, void(*callback)(HbListener *who, guint64 howlate))
Call to set a callback to be called when a node passes warntime before heartbeating again...
Definition: hblistener.c:359
FSTATIC void _hblistener_checktimeouts(gboolean urgent)
Function called when it's time to see if anyone timed out...
Definition: hblistener.c:111
guint64(* get_deadtime)(HbListener *)
Retrieve deadtime.
Definition: hblistener.h:46
FSTATIC void _hblistener_set_deadtime(HbListener *self, guint64 deadtime)
Set deadtime.
Definition: hblistener.c:294
void(* _finalize)(AssimObj *)
Free object (private)
Definition: assimobj.h:55
FSTATIC void _hblistener_set_warntime(HbListener *self, guint64 warntime)
Set warntime.
Definition: hblistener.c:312
gboolean(* equal)(const NetAddr *, const NetAddr *)
Compare NetAddrs.
Definition: netaddr.h:53
guint64(* get_warntime)(HbListener *)
Retrieve warntime.
Definition: hblistener.h:48
The NetAddr class class represents a general network address - whether IP, MAC, or any other type of ...
Definition: netaddr.h:43
gboolean(* got_frameset)(Listener *self, FrameSet *fs, NetAddr *na)
called when a FrameSet arrives
Definition: listener.h:45
void(* _warntime_callback)(HbListener *who, guint64 howlate)
Definition: hblistener.h:56
void(* _heartbeat_callback)(HbListener *who)
Definition: hblistener.h:54
FrameSet class - used for collecting Frames when not on the wire, and for marshalling/demarshalling t...
Definition: frameset.h:46
gchar *(* toString)(gconstpointer)
Produce malloc-ed string representation.
Definition: assimobj.h:58
FSTATIC void _hblistener_dellist(HbListener *self)
Remove an HbListener from our global list of HBListeners.
Definition: hblistener.c:84
guint64 nexttime
When next heartbeat is due.
Definition: hblistener.h:60
#define ONESEC
Definition: hblistener.c:62
HbListener * hblistener_find_by_address(const NetAddr *which)
Find the listener that's listening to a particular address.
Definition: hblistener.c:96
FSTATIC void hblistener_unlisten(NetAddr *unlistenaddr)
Stop expecting (listening for) heartbeats from a particular address.
Definition: hblistener.c:333
AssimObj baseclass
Definition: listener.h:42
void(* set_warntime)(HbListener *, guint64)
Set warntime.
Definition: hblistener.h:49
#define UNREF2(obj)
Definition: assimobj.h:36
void(* set_warntime_callback)(HbListener *, void(*)(HbListener *who, guint64 howlate))
Definition: hblistener.h:52
#define CASTTOCLASS(Cclass, obj)
Safely cast 'obj' to C-class 'class' - verifying that it was registerd as being of type class ...
Definition: proj_classes.h:66
#define UNREF(obj)
Definition: assimobj.h:35
This is the Listener class. object - which generically listens for packets.
Definition: listener.h:41
guint64 _expected_interval
How often to expect heartbeats.
Definition: hblistener.h:58
void _listener_finalize(AssimObj *self)
Finalize a Listener.
Definition: listener.c:53
Defines Heartbeat Listener interfaces.