The Assimilation Project
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
hblistener.c
Go to the documentation of this file.
1 
24 #define IS_LISTENER_SUBCLASS 1
25 #include <memory.h>
26 #include <glib.h>
27 #include <frame.h>
28 #include <frameset.h>
29 #include <hblistener.h>
30 #include <stdlib.h>
33 FSTATIC void _hblistener_notify_function(gpointer ignoreddata);
37 FSTATIC void _hblistener_checktimeouts(gboolean urgent);
39 FSTATIC void _hblistener_set_deadtime(HbListener* self, guint64 deadtime);
40 FSTATIC void _hblistener_set_warntime(HbListener* self, guint64 warntime);
43 FSTATIC gboolean _hblistener_gsourcefunc(gpointer);
44 FSTATIC void _hblistener_set_deadtime_callback(HbListener*, void (*callback)(HbListener* who));
46 FSTATIC void _hblistener_set_warntime_callback(HbListener*, void (*callback)(HbListener* who, guint64 howlate));
47 FSTATIC void _hblistener_set_comealive_callback(HbListener*, void (*callback)(HbListener* who, guint64 howlate));
48 
50 
55 
// Singly-linked list of every HbListener currently registered in this file.
56 static GSList* _hb_listeners = NULL;
// Count of entries in _hb_listeners; incremented on add, decremented on removal.
57 static gint _hb_listener_count = 0;
// g_get_real_time() value recorded by the most recent timeout scan.
58 static guint64 _hb_listener_lastcheck = 0;
// Process-wide hook called with the source address of a packet that matches no listener.
59 static void (*_hblistener_martiancallback)(NetAddr* who) = NULL;
// Id returned by g_timeout_add_seconds_full(); -1 when no timeout source is recorded.
60 static gint hb_timeout_id = -1;
61 
// Minimum spacing between non-urgent timeout scans, in g_get_real_time() units (microseconds).
62 #define ONESEC 1000000
63 
66 FSTATIC void
68 {
69  HbListener* old;
70  if (_hb_listeners == NULL) {
71  hb_timeout_id = g_timeout_add_seconds_full(G_PRIORITY_LOW, 1
74  }else if ((old=hblistener_find_by_address(self->listenaddr)) != NULL) {
76  }
77  _hb_listeners = g_slist_prepend(_hb_listeners, self);
78  _hb_listener_count += 1;
79  REF2(self);
80 }
81 
83 FSTATIC void
85 {
86  if (g_slist_find(_hb_listeners, self) != NULL) {
87  _hb_listeners = g_slist_remove(_hb_listeners, self);
88  _hb_listener_count -= 1;
89  UNREF2(self);
90  return;
91  }
92 }
93 
97 {
98  GSList* obj;
99  for (obj = _hb_listeners; obj != NULL; obj=g_slist_next(obj)) {
100  HbListener* listener = CASTTOCLASS(HbListener, obj->data);
101  if (which->equal(which, listener->listenaddr)) {
102  return listener;
103  }
104  }
105  return NULL;
106 }
107 
108 
110 FSTATIC void
111 _hblistener_checktimeouts(gboolean urgent)
112 {
113  guint64 now = g_get_real_time();
114  GSList* obj;
115  if (!urgent && (now - _hb_listener_lastcheck) < ONESEC) {
116  return;
117  }
118  _hb_listener_lastcheck = now;
119 
120  for (obj = _hb_listeners; obj != NULL; obj=g_slist_next(obj)) {
121  HbListener* listener = CASTTOCLASS(HbListener, obj->data);
122  if (now > listener->nexttime && listener->status == HbPacketsBeingReceived) {
123  if (listener->_deadtime_callback) {
124  listener->_deadtime_callback(listener);
125  }else{
126  g_warning("our node looks dead from here...");
127  }
128  listener->status = HbPacketsTimedOut;
129  }
130  }
131 }
132 
// Callback with the GSourceFunc shape (gpointer in, gboolean out).
// The visible body ignores its argument and returns TRUE while _hb_listeners is
// non-NULL, FALSE once the list is empty.
// NOTE(review): listing line 138 is absent from this extract, so the function may
// do additional work before returning - confirm against the full source.
// NOTE(review): presumably this is the function handed to g_timeout_add_seconds_full();
// GLib removes a timeout source when its function returns FALSE - verify registration.
134 FSTATIC gboolean
135 _hblistener_gsourcefunc(gpointer ignored)
136 {
137  (void)ignored;
139  return _hb_listeners != NULL;
140 }
141 
143 FSTATIC gboolean
145 {
146  guint64 now = g_get_real_time();
147  HbListener* addmatch;
148 
149  (void)self; // Odd, but true - because we're a proxy for all hblisteners...
150  addmatch = hblistener_find_by_address(srcaddr);
151  if (addmatch != NULL) {
152  if (addmatch->status == HbPacketsTimedOut) {
153  guint64 howlate = now - addmatch->nexttime;
154  addmatch->status = HbPacketsBeingReceived;
155  howlate /= 1000;
156  if (addmatch->_comealive_callback) {
157  addmatch->_comealive_callback(addmatch, howlate);
158  }else{
159  g_message("A node is now back alive! late by "FMT_64BIT "d ms", howlate);
160  }
161  } else if (now > addmatch->warntime) {
162  guint64 howlate = now - addmatch->warntime;
163  howlate /= 1000;
164  if (addmatch->_warntime_callback) {
165  addmatch->_warntime_callback(addmatch, howlate);
166  }else{
167  g_warning("A node was " FMT_64BIT "u ms late in sending heartbeat..."
168  , howlate);
169  }
170  }
171  if (addmatch->_heartbeat_callback) {
172  addmatch->_heartbeat_callback(addmatch);
173  }
174  addmatch->nexttime = now + addmatch->_expected_interval;
175  addmatch->warntime = now + addmatch->_warn_interval;
176  UNREF(fs);
177  return TRUE;
178  }
179  // The 'martian' callback is necessarily global to all heartbeat listeners
180  if (_hblistener_martiancallback) {
181  _hblistener_martiancallback(srcaddr);
182  }else{
183  gchar * saddr = srcaddr->baseclass.toString(srcaddr);
184  g_warning("Received 'martian' packet from address [%s]", saddr);
185  g_free(saddr); saddr = NULL;
186  }
187  UNREF(fs);
188  return TRUE;
189 }
190 
// Hook taking a single gpointer; the visible body only discards that argument.
// NOTE(review): listing line 195 is absent from this extract - confirm whether the
// function does anything further (e.g. resetting timer state) in the full source.
191 FSTATIC void
192 _hblistener_notify_function(gpointer ignored)
193 {
194  (void)ignored;
196 }
197 
199 FSTATIC void
201 {
202  GSList* this;
203  GSList* next = NULL;
204 
205  // Unref all our listener objects...
206  for (this = _hb_listeners; this; this=next) {
207  HbListener* listener = CASTTOCLASS(HbListener, this->data);
208  next = this->next;
209  UNREF2(listener);
210  }
211  if (_hb_listeners) {
212  g_slist_free(_hb_listeners);
213  _hb_listeners = NULL;
214  }
215  if (hb_timeout_id > 0) {
216  g_source_remove(hb_timeout_id);
217  hb_timeout_id = -1;
218  }
219 }
220 
221 
223 FSTATIC void
225 {
226  HbListener *hbself = CASTTOCLASS(HbListener, self);
227  DEBUGMSG3("%s.%d - finalizing.", __FUNCTION__, __LINE__);
228  UNREF(hbself->listenaddr);
229  _listener_finalize(self);
230  self = NULL; hbself = NULL;
231 }
232 
233 
/**
 * Construct an HbListener for the given address and add it to this file's listener list.
 *
 * @param listenaddr  Address to listen to; stored in the new object and REF()ed.
 * @param cfg         Configuration context, passed through to listener_new().
 * @param objsize     Requested object size; raised to sizeof(HbListener) when smaller.
 * @return            The new HbListener, or NULL when the class cast yields NULL.
 *
 * NOTE(review): listing lines 243, 253-254 and 261-264 are absent from this extract,
 * so further member initialisation may exist that is not visible here.
 */
236 HbListener*
237 hblistener_new(NetAddr* listenaddr,
238  ConfigContext*cfg,
239  gsize objsize)
240 {
241  HbListener * newlistener;
242  Listener * base;
// Never allocate less than a full HbListener, whatever the caller asked for.
244  if (objsize < sizeof(HbListener)) {
245  objsize = sizeof(HbListener);
246  }
247  base = listener_new(cfg, objsize);
// NOTE(review): base is registered before any NULL check - confirm listener_new()
// cannot return NULL, or that proj_class_register_subclassed() tolerates it.
248  proj_class_register_subclassed(base, "HbListener");
249  newlistener = CASTTOCLASS(HbListener, base);
250  if (NULL == newlistener) {
251  return NULL;
252  }
// Keep our own reference on the address for as long as this object holds it.
255  newlistener->listenaddr = listenaddr;
256  REF(listenaddr);
257  newlistener->get_deadtime = _hblistener_get_deadtime;
258  newlistener->set_deadtime = _hblistener_set_deadtime;
259  newlistener->get_warntime = _hblistener_get_warntime;
260  newlistener->set_warntime = _hblistener_set_warntime;
265 
// Defaults: dead time is DEFAULT_DEADTIME scaled by 1000000; warn time is a quarter of that.
266  newlistener->set_deadtime(newlistener, DEFAULT_DEADTIME*1000000);
267  newlistener->set_warntime(newlistener, DEFAULT_DEADTIME*1000000/4);
268  newlistener->status = HbPacketsBeingReceived;
// Register in the file-level list so timeout scans and address lookups can find it.
269  _hblistener_addlist(newlistener);
270  return newlistener;
271 }
272 
274 FSTATIC void
276  guint64 deadtime)
277 {
278  guint64 now = g_get_real_time();
279  self->_expected_interval = deadtime;
280  self->nexttime = now + self->_expected_interval;
281  //g_debug("Setting HbListener deadtime to " FMT_64BIT "d ms", deadtime/1000);
282 }
283 
285 FSTATIC guint64
287 {
288  return self->_expected_interval;
289 }
290 
292 FSTATIC void
294  guint64 warntime)
295 {
296  guint64 now = g_get_real_time();
297  self->_warn_interval = warntime;
298  self->warntime = now + self->_warn_interval;
299 }
301 FSTATIC guint64
303 {
304  return self->_warn_interval;
305 }
306 
307 
313 FSTATIC void
315 {
317  if (listener != NULL) {
318  _hblistener_dellist(listener);
319  return;
320  }
321  g_warning("Attempt to unlisten an unregistered address");
322 }
323 
325 FSTATIC void
327 {
328  self->_deadtime_callback = callback;
329 }
330 
332 FSTATIC void
334 {
335  self->_heartbeat_callback = callback;
336 }
337 
339 FSTATIC void
340 _hblistener_set_warntime_callback(HbListener* self, void (*callback)(HbListener* who, guint64 howlate))
341 {
342  self->_warntime_callback = callback;
343 }
344 
346 FSTATIC void
347 _hblistener_set_comealive_callback(HbListener* self, void (*callback)(HbListener* who, guint64 howlate))
348 {
349  self->_comealive_callback = callback;
350 }
351 
353 FSTATIC void
355 {
356  _hblistener_martiancallback = callback;
357 }