The Assimilation Project
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
hblistener.c
Go to the documentation of this file.
1 
24 #define IS_LISTENER_SUBCLASS 1
25 #include <memory.h>
26 #include <glib.h>
27 #include <frame.h>
28 #include <frameset.h>
29 #include <hblistener.h>
30 #include <stdlib.h>
33 FSTATIC void _hblistener_notify_function(gpointer ignoreddata);
37 FSTATIC void _hblistener_checktimeouts(gboolean urgent);
39 FSTATIC void _hblistener_set_deadtime(HbListener* self, guint64 deadtime);
40 FSTATIC void _hblistener_set_warntime(HbListener* self, guint64 warntime);
43 FSTATIC gboolean _hblistener_gsourcefunc(gpointer);
44 FSTATIC void _hblistener_set_deadtime_callback(HbListener*, void (*callback)(HbListener* who));
46 FSTATIC void _hblistener_set_warntime_callback(HbListener*, void (*callback)(HbListener* who, guint64 howlate));
47 FSTATIC void _hblistener_set_comealive_callback(HbListener*, void (*callback)(HbListener* who, guint64 howlate));
48 
50 
55 
// Module-wide state shared by every HbListener handled in this file.
// List of all registered HbListener objects (walked by the address lookup and the timeout scan).
56 static GSList* _hb_listeners = NULL;
// Number of entries in _hb_listeners: incremented when a listener is added, decremented when one is removed.
57 static gint _hb_listener_count = 0;
// g_get_real_time() value recorded by the most recent timeout scan; 0 until the first scan runs.
58 static guint64 _hb_listener_lastcheck = 0;
// Callback invoked for a heartbeat whose source address matches no listener; when NULL a warning is logged instead.
59 static void (*_hblistener_martiancallback)(NetAddr* who) = NULL;
// Id of the periodic timeout source; -1 before one is added and after shutdown removes it.
60 static gint hb_timeout_id = -1;
61 
// One second in g_get_real_time() units (microseconds); minimum gap between two non-urgent timeout scans.
62 #define ONESEC 1000000
63 
66 FSTATIC void
68 {
69  HbListener* old;
70  if (_hb_listeners == NULL) {
71  hb_timeout_id = g_timeout_add_seconds_full(G_PRIORITY_LOW, 1
74  }else if ((old=hblistener_find_by_address(self->listenaddr)) != NULL) {
76  }
77  _hb_listeners = g_slist_prepend(_hb_listeners, self);
78  _hb_listener_count += 1;
79  REF2(self);
80 }
81 
83 FSTATIC void
85 {
86  if (g_slist_find(_hb_listeners, self) != NULL) {
87  _hb_listeners = g_slist_remove(_hb_listeners, self);
88  _hb_listener_count -= 1;
89  UNREF2(self);
90  return;
91  }
92 }
93 
97 {
98  GSList* obj;
99  for (obj = _hb_listeners; obj != NULL; obj=g_slist_next(obj)) {
100  HbListener* listener = CASTTOCLASS(HbListener, obj->data);
101  if (which->equal(which, listener->listenaddr)) {
102  return listener;
103  }
104  }
105  return NULL;
106 }
107 
108 
/// Scan every registered HbListener and report the ones whose heartbeat deadline has passed.
/// A listener is reported when the current time is later than its nexttime while its status is
/// still HbPacketsBeingReceived: its _deadtime_callback is invoked if one is set, otherwise a
/// warning is logged.  The listener is then marked HbPacketsTimedOut, so this scan does not
/// report it again for as long as it stays in that state.
/// @param urgent  if FALSE, the scan is skipped when less than ONESEC has elapsed since the
///                previous scan; if TRUE, the scan always runs.
110 FSTATIC void
111 _hblistener_checktimeouts(gboolean urgent)
112 {
113  guint64 now = g_get_real_time();
114  GSList* obj;
     // Rate limit: non-urgent callers trigger at most one scan per ONESEC.
115  if (!urgent && (now - _hb_listener_lastcheck) < ONESEC) {
116  return;
117  }
118  _hb_listener_lastcheck = now;
119 
120  for (obj = _hb_listeners; obj != NULL; obj=g_slist_next(obj)) {
121  HbListener* listener = CASTTOCLASS(HbListener, obj->data);
     // Only listeners still counted as receiving packets can time out; already timed-out ones are skipped.
122  if (now > listener->nexttime && listener->status == HbPacketsBeingReceived) {
123  if (listener->_deadtime_callback) {
124  listener->_deadtime_callback(listener);
125  }else{
     // No callback registered: fall back to a generic log message.
126  g_warning("our node looks dead from here...");
127  }
128  listener->status = HbPacketsTimedOut;
129  }
130  }
131 }
132 
/// A GSourceFunc to be used with g_timeout_add_seconds().
/// @param ignored  unused
/// @return TRUE while _hb_listeners is non-NULL (at least one listener is registered), FALSE otherwise.
///         Per the GLib GSourceFunc contract, returning FALSE removes the timeout source.
/// NOTE(review): listing line 138 is not shown in this rendering; presumably it runs the timeout
/// scan (_hblistener_checktimeouts) - confirm against the real source.
/// NOTE(review): when this returns FALSE, hb_timeout_id is not reset here, so the shutdown code's
/// g_source_remove(hb_timeout_id) may later be handed a stale id - verify.
134 FSTATIC gboolean
135 _hblistener_gsourcefunc(gpointer ignored)
136 {
137  (void)ignored;
139  return _hb_listeners != NULL;
140 }
141 
143 FSTATIC gboolean
145 {
146  guint64 now = g_get_real_time();
147  HbListener* addmatch;
148 
149  (void)self; // Odd, but true - because we're a proxy for all hblisteners...
150  addmatch = hblistener_find_by_address(srcaddr);
151  if (addmatch != NULL) {
152  if (addmatch->status == HbPacketsTimedOut) {
153  guint64 howlate = now - addmatch->nexttime;
154  addmatch->status = HbPacketsBeingReceived;
155  howlate /= 1000;
156  if (addmatch->_comealive_callback) {
157  addmatch->_comealive_callback(addmatch, howlate);
158  }else{
159  g_message("A node is now back alive! late by "FMT_64BIT "d ms", howlate);
160  }
161  } else if (now > addmatch->warntime) {
162  guint64 howlate = now - addmatch->warntime;
163  howlate /= 1000;
164  if (addmatch->_warntime_callback) {
165  addmatch->_warntime_callback(addmatch, howlate);
166  }else{
167  g_warning("A node was " FMT_64BIT "u ms late in sending heartbeat..."
168  , howlate);
169  }
170  }
171  if (addmatch->_heartbeat_callback) {
172  addmatch->_heartbeat_callback(addmatch);
173  }
174  addmatch->nexttime = now + addmatch->_expected_interval;
175  addmatch->warntime = now + addmatch->_warn_interval;
176  UNREF(fs);
177  return TRUE;
178  }
179  // The 'martian' callback is necessarily global to all heartbeat listeners
180  if (_hblistener_martiancallback) {
181  _hblistener_martiancallback(srcaddr);
182  }else{
183  gchar * saddr = srcaddr->baseclass.toString(srcaddr);
184  g_warning("Received 'martian' packet from address [%s]", saddr);
185  g_free(saddr); saddr = NULL;
186  }
187  UNREF(fs);
188  return TRUE;
189 }
190 
/// Callback that takes an opaque pointer and does not use it.
/// @param ignored  unused
/// NOTE(review): listing line 195 is not shown in this rendering, so the rest of the body cannot be
/// described from here.  Presumably this is the destroy-notify passed to
/// g_timeout_add_seconds_full() - confirm against the real source.
191 FSTATIC void
192 _hblistener_notify_function(gpointer ignored)
193 {
194  (void)ignored;
196 }
197 
199 FSTATIC void
201 {
202  GSList* this;
203  GSList* next = NULL;
204 
205  // Unref all our listener objects...
206  for (this = _hb_listeners; this; this=next) {
207  HbListener* listener = CASTTOCLASS(HbListener, this->data);
208  next = this->next;
209  UNREF2(listener);
210  }
211  if (_hb_listeners) {
212  g_slist_free(_hb_listeners);
213  _hb_listeners = NULL;
214  }
215  if (hb_timeout_id > 0) {
216  g_source_remove(hb_timeout_id);
217  hb_timeout_id = -1;
218  }
219 }
220 
221 
223 FSTATIC void
225 {
226  HbListener *hbself = CASTTOCLASS(HbListener, self);
227  DEBUGMSG3("%s.%d - finalizing.", __FUNCTION__, __LINE__);
228  UNREF(hbself->listenaddr);
229  _listener_finalize(self);
230  self = NULL; hbself = NULL;
231 }
232 
233 
/// Construct a new HbListener that expects heartbeats from a given address.
/// The object is allocated through listener_new(), registered as the "HbListener" subclass,
/// given its deadtime and warntime from the configuration (or defaults), marked
/// HbPacketsBeingReceived, and added to the module's global listener list.
/// @param listenaddr  address heartbeats are expected from; stored in the object and REF()ed here
/// @param cfg         configuration passed to listener_new() and queried with getint() for
///                    CONFIGNAME_TIMEOUT and CONFIGNAME_WARNTIME
/// @param objsize     allocation size requested by the caller; raised to sizeof(HbListener) if smaller
///                    (presumably so subclasses can ask for a larger object - confirm)
/// @return the new HbListener, or NULL if the cast of the base object yields NULL
/// NOTE(review): listing lines 243, 253-254 and 261-264 are not shown in this rendering; whatever
/// they do is not documented here.
236 HbListener*
237 hblistener_new(NetAddr* listenaddr,
238  ConfigContext*cfg,
239  gsize objsize)
240 {
241  HbListener * newlistener;
242  Listener * base;
     // Never allocate less than a full HbListener, whatever the caller asked for.
244  if (objsize < sizeof(HbListener)) {
245  objsize = sizeof(HbListener);
246  }
247  base = listener_new(cfg, objsize);
     // NOTE(review): base is registered before any NULL check; the only NULL check is on the
     // cast result below.  Confirm listener_new() cannot return NULL, or check it first.
248  proj_class_register_subclassed(base, "HbListener");
249  newlistener = CASTTOCLASS(HbListener, base);
250  if (NULL == newlistener) {
251  return NULL;
252  }
     // Keep our own reference to the address we listen for.
255  newlistener->listenaddr = listenaddr;
256  REF(listenaddr);
257  newlistener->get_deadtime = _hblistener_get_deadtime;
258  newlistener->set_deadtime = _hblistener_set_deadtime;
259  newlistener->get_warntime = _hblistener_get_warntime;
260  newlistener->set_warntime = _hblistener_set_warntime;
265 
     // Deadtime: a positive configured value wins, otherwise DEFAULT_DEADTIME.
     // NOTE(review): getint() is called twice for the same key; a local would avoid the repeat lookup.
266  if (cfg->getint(cfg, CONFIGNAME_TIMEOUT) > 0) {
267  newlistener->set_deadtime(newlistener, cfg->getint(cfg, CONFIGNAME_TIMEOUT));
268  }else{
269  newlistener->set_deadtime(newlistener, DEFAULT_DEADTIME);
270  }
     // Warntime: a positive configured value wins, otherwise two thirds of DEFAULT_DEADTIME.
271  if (cfg->getint(cfg, CONFIGNAME_WARNTIME) > 0) {
272  newlistener->set_warntime(newlistener, cfg->getint(cfg, CONFIGNAME_WARNTIME));
273  }else{
274  newlistener->set_warntime(newlistener, (DEFAULT_DEADTIME*2)/3);
275  }
     // A new listener starts out as "receiving", so the timeout scan can later declare it dead.
276  newlistener->status = HbPacketsBeingReceived;
277  _hblistener_addlist(newlistener);
278  return newlistener;
279 }
280 
282 FSTATIC void
284  guint64 deadtime)
285 {
286  guint64 now = g_get_real_time();
287  self->_expected_interval = deadtime*1000000L;
288  self->nexttime = now + self->_expected_interval;
289  //g_debug("Setting HbListener deadtime to " FMT_64BIT "d secs", deadtime);
290 }
291 
293 FSTATIC guint64
295 {
296  return self->_expected_interval;
297 }
298 
300 FSTATIC void
302  guint64 warntime)
303 {
304  guint64 now = g_get_real_time();
305  self->_warn_interval = warntime*1000000;
306  self->warntime = now + self->_warn_interval;
307 }
309 FSTATIC guint64
311 {
312  return self->_warn_interval;
313 }
314 
315 
321 FSTATIC void
323 {
325  if (listener != NULL) {
326  _hblistener_dellist(listener);
327  return;
328  }
329  g_warning("Attempt to unlisten an unregistered address");
330 }
331 
333 FSTATIC void
335 {
336  self->_deadtime_callback = callback;
337 }
338 
340 FSTATIC void
342 {
343  self->_heartbeat_callback = callback;
344 }
345 
/// Set the callback to be called when a node passes warntime before heartbeating again.
/// @param self      listener whose _warntime_callback field is overwritten
/// @param callback  function receiving the listener and how late the heartbeat was; stored as-is.
///                  The heartbeat-arrival code checks this field for NULL before calling it and
///                  logs a warning instead when it is unset.
347 FSTATIC void
348 _hblistener_set_warntime_callback(HbListener* self, void (*callback)(HbListener* who, guint64 howlate))
349 {
350  self->_warntime_callback = callback;
351 }
352 
/// Set the callback to be called when a node passes deadtime but then heartbeats again.
/// @param self      listener whose _comealive_callback field is overwritten
/// @param callback  function receiving the listener and how late the heartbeat was; stored as-is.
///                  The heartbeat-arrival code checks this field for NULL before calling it and
///                  logs a message instead when it is unset.
354 FSTATIC void
355 _hblistener_set_comealive_callback(HbListener* self, void (*callback)(HbListener* who, guint64 howlate))
356 {
357  self->_comealive_callback = callback;
358 }
359 
361 FSTATIC void
363 {
364  _hblistener_martiancallback = callback;
365 }
FSTATIC void _hblistener_set_heartbeat_callback(HbListener *, void(*callback)(HbListener *who))
Call to set a callback to be called when a heartbeat is received.
Definition: hblistener.c:341
FSTATIC guint64 _hblistener_get_deadtime(HbListener *self)
Return deadtime.
Definition: hblistener.c:294
FSTATIC void _hblistener_set_deadtime_callback(HbListener *, void(*callback)(HbListener *who))
Call to set a callback to be called when a node apparently dies.
Definition: hblistener.c:334
#define DEFAULT_DEADTIME
Definition: hblistener.h:65
#define REF2(obj)
Definition: assimobj.h:40
DEBUGDECLARATIONS
Definition: hblistener.c:49
Listener * listener_new(ConfigContext *config, gsize objsize)
Construct a new Listener - setting up GSource and timeout data structures for it. ...
Definition: listener.c:82
FSTATIC gboolean _hblistener_got_frameset(Listener *, FrameSet *, NetAddr *)
Function called when a heartbeat FrameSet (fs) arrives from the given NetAddr (srcaddr) ...
Definition: hblistener.c:144
Implements minimal client-oriented Frame and Frameset capabilities.
guint64 warntime
Warn heartbeat time.
Definition: hblistener.h:61
NetAddr * listenaddr
What address are we listening for?
Definition: hblistener.h:62
AssimObj baseclass
Definition: netaddr.h:44
FSTATIC void hblistener_set_martian_callback(void(*callback)(NetAddr *who))
Call to set a callback to be called when an unrecognized node sends us a heartbeat.
Definition: hblistener.c:362
WINEXPORT gint64 g_get_real_time(void)
HAVE_G_GET_MONOTONIC_TIME.
FSTATIC gboolean _hblistener_gsourcefunc(gpointer)
A GSourceFunc to be used with g_timeout_add_seconds()
Definition: hblistener.c:135
FSTATIC void hblistener_shutdown(void)
Shuts down all our hblisteners...
Definition: hblistener.c:200
#define FSTATIC
Definition: projectcommon.h:31
#define DEBUGMSG3(...)
Definition: proj_classes.h:91
gint64(* getint)(const ConfigContext *, const char *name)
Get integer value.
Definition: configcontext.h:74
#define BINDDEBUG(Cclass)
BINDDEBUG is for telling the class system where the debug variable for this class is - put it in the ...
Definition: proj_classes.h:82
void(* set_deadtime_callback)(HbListener *, void(*)(HbListener *who))
Definition: hblistener.h:51
void(* set_deadtime)(HbListener *, guint64)
Set deadtime.
Definition: hblistener.h:47
FSTATIC void _hblistener_finalize(AssimObj *self)
Finalize an HbListener.
Definition: hblistener.c:224
#define __FUNCTION__
#define CONFIGNAME_WARNTIME
How long w/o heartbeats before whining?
FSTATIC void _hblistener_set_comealive_callback(HbListener *, void(*callback)(HbListener *who, guint64 howlate))
Call to set a callback to be called when a node passes deadtime but heartbeats again.
Definition: hblistener.c:355
#define CONFIGNAME_TIMEOUT
How long before declaring a serious problem...
void(* _deadtime_callback)(HbListener *who)
Definition: hblistener.h:55
gpointer proj_class_register_subclassed(gpointer object, const char *static_subclassname)
Log the creation of a subclassed object from a superclassed object.
Definition: proj_classes.c:192
guint64 _warn_interval
When to warn about late heartbeats.
Definition: hblistener.h:59
void(* set_comealive_callback)(HbListener *, void(*)(HbListener *who, guint64 howlate))
Definition: hblistener.h:53
#define FMT_64BIT
Format designator for a 64 bit integer.
Definition: projectcommon.h:32
#define REF(obj)
Definition: assimobj.h:39
Implements basic Frame class.
This is the base HbListener class object, which listens for heartbeats from a particular sender...
Definition: hblistener.h:44
HbNodeStatus status
What status is this node in?
Definition: hblistener.h:63
FSTATIC void _hblistener_addlist(HbListener *self)
Add an HbListener to our global list of HBListeners, and unref (and neuter) any old HbListeners liste...
Definition: hblistener.c:67
void(* _comealive_callback)(HbListener *who, guint64 howlate)
Definition: hblistener.h:57
HbListener * hblistener_new(NetAddr *listenaddr, ConfigContext *cfg, gsize objsize)
Construct a new HbListener - setting up GSource and timeout data structures for it.
Definition: hblistener.c:237
struct _HbListener HbListener
Definition: hblistener.h:32
void(* set_heartbeat_callback)(HbListener *, void(*)(HbListener *who))
Definition: hblistener.h:50
FSTATIC guint64 _hblistener_get_warntime(HbListener *self)
Return warntime.
Definition: hblistener.c:310
FSTATIC void _hblistener_set_warntime_callback(HbListener *, void(*callback)(HbListener *who, guint64 howlate))
Call to set a callback to be called when a node passes warntime before heartbeating again...
Definition: hblistener.c:348
FSTATIC void _hblistener_checktimeouts(gboolean urgent)
Function called when it's time to see if anyone timed out...
Definition: hblistener.c:111
guint64(* get_deadtime)(HbListener *)
Retrieve deadtime.
Definition: hblistener.h:46
FSTATIC void _hblistener_set_deadtime(HbListener *self, guint64 deadtime)
Set deadtime.
Definition: hblistener.c:283
void(* _finalize)(AssimObj *)
Free object (private)
Definition: assimobj.h:55
FSTATIC void _hblistener_set_warntime(HbListener *self, guint64 warntime)
Set warntime.
Definition: hblistener.c:301
gboolean(* equal)(const NetAddr *, const NetAddr *)
Compare NetAddrs.
Definition: netaddr.h:53
guint64(* get_warntime)(HbListener *)
Retrieve warntime.
Definition: hblistener.h:48
The NetAddr class represents a general network address - whether IP, MAC, or any other type of ...
Definition: netaddr.h:43
gboolean(* got_frameset)(Listener *self, FrameSet *fs, NetAddr *na)
called when a FrameSet arrives
Definition: listener.h:45
void(* _warntime_callback)(HbListener *who, guint64 howlate)
Definition: hblistener.h:56
void(* _heartbeat_callback)(HbListener *who)
Definition: hblistener.h:54
FrameSet class - used for collecting Frames when not on the wire, and for marshalling/demarshalling t...
Definition: frameset.h:45
gchar *(* toString)(gconstpointer)
Produce malloc-ed string representation.
Definition: assimobj.h:58
FSTATIC void _hblistener_dellist(HbListener *self)
Remove an HbListener from our global list of HBListeners.
Definition: hblistener.c:84
guint64 nexttime
When next heartbeat is due.
Definition: hblistener.h:60
#define ONESEC
Definition: hblistener.c:62
HbListener * hblistener_find_by_address(const NetAddr *which)
Find the listener that's listening to a particular address.
Definition: hblistener.c:96
FSTATIC void hblistener_unlisten(NetAddr *unlistenaddr)
Stop expecting (listening for) heartbeats from a particular address.
Definition: hblistener.c:322
AssimObj baseclass
Definition: listener.h:42
void(* set_warntime)(HbListener *, guint64)
Set warntime.
Definition: hblistener.h:49
FSTATIC void _hblistener_notify_function(gpointer ignoreddata)
Definition: hblistener.c:192
#define UNREF2(obj)
Definition: assimobj.h:36
void(* set_warntime_callback)(HbListener *, void(*)(HbListener *who, guint64 howlate))
Definition: hblistener.h:52
#define CASTTOCLASS(Cclass, obj)
Safely cast 'obj' to C-class 'Cclass' - verifying that it was registered as being of type Cclass ...
Definition: proj_classes.h:66
#define UNREF(obj)
Definition: assimobj.h:35
This is the Listener class object, which generically listens for packets.
Definition: listener.h:41
guint64 _expected_interval
How often to expect heartbeats.
Definition: hblistener.h:58
void _listener_finalize(AssimObj *self)
Finalize a Listener.
Definition: listener.c:53
Defines Heartbeat Listener interfaces.