The Assimilation Project  based on Assimilation version 1.1.7.1474836767
resourcequeue.c
Go to the documentation of this file.
1 
22 #include <projectcommon.h>
23 #include <resourcequeue.h>
25 
30 
31 #define uSPERSEC ((guint64)1000000)
32 
33 typedef struct _RscQElem RscQElem;
34 struct _RscQElem {
35  gint64 queuetime;
36  ResourceCmd* cmd;
40  GQueue* ourQ;
42  gpointer user_data;
44  gboolean cancelonfail;
45  gint64 requestid;
46  gboolean cancelme;
47 };
48 
49 
50 
59 FSTATIC void _resource_queue_hash_data_destructor(gpointer dataptr);
60 FSTATIC void _resource_queue_hash_key_destructor(gpointer dataptr);
67 , ResourceCmdCallback cb, gpointer user_data);
70 , ResourceCmdCallback cb, gpointer user_data, GQueue* Q);
72 FSTATIC gboolean _resource_queue_runqueue(gpointer pself);
74 , enum HowDied exittype, int rc, int signal, gboolean core_dumped
75 , const char* stringresult);
76 FSTATIC gint _queue_compare_requestid (gconstpointer a, gconstpointer b);
77 
80 resourcequeue_new(guint structsize)
81 {
82  AssimObj* aself;
83  ResourceQueue* self;
85 
86  if (structsize < sizeof(ResourceQueue)) {
87  structsize = sizeof(ResourceQueue);
88  }
89  aself = assimobj_new(structsize);
90  self = NEWSUBCLASS(ResourceQueue, aself);
92 
93  self->Qcmd = _resource_queue_Qcmd;
94  self->cancel = _resource_queue_cancel;
95  self->cancelall = _resource_queue_cancelall;
96  self->resources = g_hash_table_new_full(g_str_hash, g_str_equal
98  self->timerid = g_timeout_add_seconds(1, _resource_queue_runqueue, self);
99  self->activechildcnt = 0;
100  self->shuttingdown = FALSE;
101 
102  return self;
103 }
104 
106 FSTATIC void
108 {
109  ResourceQueue* self = CASTTOCLASS(ResourceQueue, aself);
110 
111  self->shuttingdown = TRUE;
112  if (self->activechildcnt > 0) {
113  return;
114  }
115  if (self->resources) {
116  g_hash_table_destroy(self->resources);
117  self->resources = NULL;
118  }
119  if (self->timerid >= 0) {
120  g_source_remove(self->timerid);
121  self->timerid = -1;
122  }
123  _assimobj_finalize(&self->baseclass);
124  self = NULL;
125 }
127 FSTATIC gboolean
129 , ConfigContext* request
131 , gpointer user_data)
132 {
133  ResourceCmd* cmd;
134  gboolean ret;
135 
136  // Will replace NULL with our qelem object
137  cmd = resourcecmd_new(request, NULL, _resource_queue_endnotify);
138 
139  if (cmd == NULL) {
140  if (callback) {
141  callback(request, user_data, EXITED_INVAL
142  , 0, 0, FALSE, "Invalid Arguments");
143  }
144  return FALSE;
145  }
146 
147  ret = _resource_queue_cmd_append(self, cmd, callback, user_data);
148  UNREF(cmd);
149  return ret;
150 }
151 
154 FSTATIC gint
155 _queue_compare_requestid (gconstpointer a, gconstpointer b)
156 {
157  const RscQElem* acmd = CASTTOCONSTCLASS(RscQElem, a);
158  const RscQElem* bcmd = CASTTOCONSTCLASS(RscQElem, b);
159 
160  return acmd->requestid == bcmd->requestid;
161 }
162 
164 FSTATIC gboolean
166 , ResourceCmdCallback cb, gpointer user_data)
167 {
168  GQueue* q;
169  RscQElem* qelem;
170 
171  gint64 requestid;
172 
173  requestid = cmd->request->getint(cmd->request, REQIDENTIFIERNAMEFIELD);
174  if (requestid <= 0) {
175  g_warning("%s.%d: Request rejected - no request id for resource %s."
176  , __FUNCTION__, __LINE__, cmd->resourcename);
177  return FALSE;
178  }
179  q = g_hash_table_lookup(self->resources, cmd->resourcename);
180  if (NULL == q) {
181  q = g_queue_new();
182  g_hash_table_insert(self->resources, g_strdup(cmd->resourcename), q);
183  }
184  qelem = _resource_queue_qelem_new(cmd, self, cb, user_data, q);
185  cmd->user_data = qelem;
186  qelem->requestid = requestid;
187  if (g_queue_find_custom(q, qelem, _queue_compare_requestid)) {
188  // This can happen if the CMA crashes and restarts and for other reasons.
189  // But we shouldn't obey it in any case...
190  g_info("%s:%d: Duplicate request id ["FMT_64BIT"d] for resource %s - ignored."
191  , __FUNCTION__, __LINE__, requestid, cmd->resourcename);
192  _resource_queue_qelem_finalize(qelem); qelem = NULL;
193  return FALSE;
194  }
195  g_queue_push_tail(q, qelem);
196  if (self->timerid < 0) {
197  self->timerid = g_timeout_add_seconds(1, _resource_queue_runqueue, self);
198  }
199  return TRUE;
200 }
201 
203 FSTATIC void
205 {
206  GHashTableIter iter;
207  gpointer pkey;
208  gpointer pvalue;
209 
210  g_hash_table_iter_init(&iter, self->resources);
211  while(g_hash_table_iter_next(&iter, &pkey, &pvalue)) {
212  GQueue* q = (GQueue*) pvalue;
213  GList* l;
214  for (l=q->head; NULL != l; l=l->next) {
215  RscQElem* qe = CASTTOCLASS(RscQElem, l->data);
216  _resource_queue_cancel(self, qe->cmd->request);
217  }
218  }
219 }
220 
222 FSTATIC gboolean
224 {
225  gint64 requestid;
226  const char* resourcename;
227  GHashTableIter iter;
228  RscQElem* qelem = NULL;
229  gpointer pkey;
230  gpointer pvalue;
231  requestid = request->getint(request, REQIDENTIFIERNAMEFIELD);
232  if (requestid <= 0) {
233  return FALSE;
234  }
235 
236  resourcename = request->getstring(request, CONFIGNAME_INSTANCE);
237 
238  if (NULL == resourcename) {
239  g_hash_table_iter_init(&iter, self->resources);
240  while(g_hash_table_iter_next(&iter, &pkey, &pvalue)) {
241  GQueue* q = (GQueue*) pvalue;
242  GList* l;
243  for (l=q->head; NULL != l; l=l->next) {
244  RscQElem* qe = CASTTOCLASS(RscQElem, l->data);
245  if (qe->requestid == requestid) {
246  qelem = qe;
247  goto finalize;
248  }
249  }
250  }
251  }else{
252  GQueue* q = g_hash_table_lookup(self->resources, resourcename);
253  if (NULL != q) {
254  GList* l;
255  for (l=q->head; NULL != l; l=l->next) {
256  RscQElem* qe = CASTTOCLASS(RscQElem, l->data);
257  if (qe->requestid == requestid) {
258  qelem = qe;
259  break;
260  }
261  }
262  }
263  }
264 
265  finalize:
266  if (qelem) {
267  _resource_queue_cmd_remove(self, qelem);
268  }
269  return NULL != qelem;
270 
271 
272 }
273 
275 FSTATIC void
277 {
278  GQueue* q = qelem->ourQ;
279 
280  if (g_queue_remove_boolean(q, qelem)) {
281  if (g_queue_get_length(q) == 0) {
282  g_hash_table_remove(self->resources, qelem->cmd->resourcename);
283  }
284  }else{
285  g_return_if_reached();
286  }
288 }
289 
293 , ResourceCmdCallback callback, gpointer user_data, GQueue* Q)
294 {
295  RscQElem* self = MALLOCCLASS(RscQElem, sizeof(RscQElem));
296  gint64 repeat;
297  gint64 initdelay;
298  self->queuetime = g_get_monotonic_time();
299  self->cmd = cmd;
300  REF(cmd);
301  self->parent = parent;
302  self->callback = callback;
303  self->user_data = user_data;
304  self->ourQ = Q;
305  self->cancelme = FALSE;
306  repeat = cmd->request->getint(cmd->request, REQREPEATNAMEFIELD);
307  self->repeatinterval = (repeat > 0 ? repeat : 0);
308  if (cmd->request->gettype(cmd->request, REQCANCELONFAILFIELD) != CFG_BOOL) {
309  self->cancelonfail = FALSE;
310  }else{
311  self->cancelonfail = cmd->request->getbool(cmd->request, REQCANCELONFAILFIELD);
312  }
313  initdelay = cmd->request->getint(cmd->request, CONFIGNAME_INITDELAY);
314  initdelay = (initdelay > 0 ? initdelay : 0);
315  cmd->starttime = self->queuetime + (initdelay*uSPERSEC);
316  DEBUGMSG2("%s.%d: %s:%s (initdelay: %ld, repeat: %ld)", __FUNCTION__, __LINE__
317  , self->cmd->resourcename, self->cmd->operation, (long)initdelay
318  , (long)self->repeatinterval);
319  return self;
320 }
321 
323 FSTATIC void
325 {
326  DEBUGMSG3("%s.%d: UNREF(self->cmd, refcount=%d)"
327  , __FUNCTION__, __LINE__, self->cmd->baseclass._refcount);
328  UNREF(self->cmd);
329  FREECLASSOBJ(self);
330 }
331 
333 FSTATIC void
335 {
336  GQueue* q = (GQueue*) dataptr;
337  GList* l;
338 
339  for (l=q->head; NULL != l; l=l->next) {
340  RscQElem* qelem = CASTTOCLASS(RscQElem, l->data);
341  _resource_queue_qelem_finalize(qelem); l->data = NULL; qelem = NULL;
342  }
343  g_queue_clear(q);
344  q = NULL;
345  dataptr = NULL;
346 }
347 
349 FSTATIC void
351 {
352  g_free(keyptr);
353  keyptr = NULL;
354 }
355 
358 FSTATIC gboolean
360 {
361  ResourceQueue* self = CASTTOCLASS(ResourceQueue, pself);
362  GHashTableIter iter;
363  gpointer key;
364  gpointer value;
365  gint64 now = g_get_monotonic_time();
366  gboolean anyelems = FALSE;
367 
368  DEBUGMSG3("%s.%d: Examining queues", __FUNCTION__, __LINE__);
369  g_hash_table_iter_init(&iter, self->resources);
370 
371  while(g_hash_table_iter_next(&iter, &key, &value)) {
372  GQueue* rsc_q = (GQueue*)value;
373  GList* qelem;
374  gboolean any_running = FALSE;
375  for (qelem=rsc_q->head; NULL != qelem; qelem=qelem->next) {
376  RscQElem* qe = CASTTOCLASS(RscQElem, qelem->data);
377  anyelems = TRUE;
378  if (qe->cmd->is_running) {
379  any_running = TRUE;
380  break;
381  }
382  }
383  if (any_running) {
384  continue;
385  }
386  DEBUGMSG4("%s.%d: No resource jobs are running.", __FUNCTION__, __LINE__);
387  for (qelem=rsc_q->head; NULL != qelem; qelem=qelem->next) {
388  RscQElem* qe = CASTTOCLASS(RscQElem, qelem->data);
389  if (now >= qe->cmd->starttime) {
390  REF(self); // We undo this when process exits
391  self->activechildcnt += 1;
392  qe->cmd->execute(qe->cmd);
393  break;
394  }
395  }
396  }
397  if (!anyelems && self->timerid >= 0) {
398  g_source_remove(self->timerid);
399  self->timerid = -1;
400  }
401  return anyelems;
402 }
403 
406 FSTATIC void
408 ( ConfigContext* request
409 , gpointer user_data
410 , enum HowDied exittype
411 , int rc
412 , int signal
413 , gboolean core_dumped
414 , const char* stringresult)
415 {
416  RscQElem* self = CASTTOCLASS(RscQElem, user_data);
417  ResourceQueue* parent = self->parent;
418  ResourceCmd* cmd = self->cmd;
419  gboolean shouldrepeat = TRUE;
420 
421 
422  g_queue_remove(self->ourQ, self);
423  parent->activechildcnt -= 1;
424  if (parent->shuttingdown && parent->activechildcnt <= 0) {
426  return;
427  }
428  DEBUGMSG1("%s.%d: EXIT happened exittype:%d repeat:%d, cancelme:%d", __FUNCTION__, __LINE__
429  , exittype, self->repeatinterval, self->cancelme);
430 
431  // Should this request repeat?
432  if (self->cancelme || (self->cancelonfail && exittype != EXITED_ZERO)
433  || (0 == self->repeatinterval)) {
434  DEBUGMSG3("%s.%d: shouldrepeat:FALSE"
435  " // cancelme:%d, cancelonfail:%d, exittype:%d repeatinterval:%ld"
436  , __FUNCTION__, __LINE__, self->cancelme
437  , self->cancelonfail, exittype, (long)self->repeatinterval);
438  shouldrepeat = FALSE;
439  }else{
440  shouldrepeat = TRUE;
441  }
442  // Notify the user that their repeating command flipped status, or has stopped running
443  // (i.e., if it was failing but now works, or was working, but now fails)
444  if (FALSE == shouldrepeat
445  || (exittype == EXITED_ZERO && !cmd->last_success)
446  || (exittype != EXITED_ZERO && cmd->last_success)) {
447  DEBUGMSG1("%s.%d: Calling callback for request id " FMT_64BIT "d."
448  , __FUNCTION__, __LINE__, self->requestid);
449  self->callback(request, self->user_data, exittype, rc, signal, core_dumped
450  , stringresult);
451  if (shouldrepeat && exittype == EXITED_ZERO && stringresult) {
452  g_message("%s: %s", cmd->loggingname, stringresult);
453  }
454  }
455  cmd->last_success = (exittype == EXITED_ZERO);
456 
457  if (shouldrepeat) {
458  DEBUGMSG1("%s.%d: Repeat request id " FMT_64BIT "d.", __FUNCTION__, __LINE__
459  , self->requestid);
460  self->queuetime = g_get_monotonic_time();
461  cmd->starttime = self->queuetime + (self->repeatinterval*uSPERSEC);
462  g_queue_push_tail(self->ourQ, self);
463  _resource_queue_runqueue(self->parent);
464  }else{
465  DEBUGMSG1("%s.%d: Don't repeat request id " FMT_64BIT "d.", __FUNCTION__, __LINE__
466  , self->requestid);
467  if (g_queue_get_length(self->ourQ) == 0) {
468  g_hash_table_remove(self->parent->resources, cmd->resourcename);
469  }
470  _resource_queue_runqueue(self->parent);
472  self = NULL;
473  }
474  // Undo the ref we did before starting this job
475  UNREF(parent);
476 }
477 
FSTATIC RscQElem * _resource_queue_qelem_new(ResourceCmd *cmd, ResourceQueue *parent, ResourceCmdCallback cb, gpointer user_data, GQueue *Q)
Create a new RscQElem object.
#define REQREPEATNAMEFIELD
Definition: resourcecmd.h:73
void _assimobj_finalize(AssimObj *self)
Definition: assimobj.c:61
#define MALLOCCLASS(Cclass, size)
Allocate memory for an object (which might be further subclassed) - and register it with our C-Class ...
Definition: proj_classes.h:61
void(* ResourceCmdCallback)(ConfigContext *request, gpointer user_data, enum HowDied reason, int rc, int signal, gboolean core_dumped, const char *stringresult)
Definition: resourcecmd.h:44
gboolean(* getbool)(const ConfigContext *, const char *name)
Get boolean value.
Definition: configcontext.h:77
#define DEBUGMSG1(...)
Definition: proj_classes.h:89
FSTATIC void _resource_queue_cmd_remove(ResourceQueue *self, RscQElem *qelem)
Remove the first instance of a ResourceCmd from a ResourceQueue.
#define DEBUGMSG4(...)
Definition: proj_classes.h:92
gboolean is_running
TRUE if this resource agent is running.
Definition: resourcecmd.h:63
FSTATIC void _resource_queue_cancelall(ResourceQueue *self)
Cancel all outstanding requests.
const char *(* getstring)(const ConfigContext *, const char *name)
Get String value.
Definition: configcontext.h:86
void(* execute)(ResourceCmd *self)
Execute this resource command.
Definition: resourcecmd.h:58
#define FSTATIC
Definition: projectcommon.h:31
#define DEBUGMSG3(...)
Definition: proj_classes.h:91
int signal
Definition: childprocess.c:238
gint activechildcnt
Count of active child processes.
Definition: resourcequeue.h:51
#define FREECLASSOBJ(obj)
Free a C-class object.
Definition: proj_classes.h:76
FSTATIC gboolean _resource_queue_runqueue(gpointer pself)
Examine our queues and run anything that needs running.
gboolean shuttingdown
TRUE means we&#39;ll shut down when last job finishes.
Definition: resourcequeue.h:52
ResourceCmdCallback callback
Who to call when it completes.
Definition: resourcequeue.c:41
#define BINDDEBUG(Cclass)
BINDDEBUG is for telling the class system where the debug variable for this class is - put it in the ...
Definition: proj_classes.h:82
#define __FUNCTION__
FSTATIC gboolean _resource_queue_Qcmd(ResourceQueue *self, ConfigContext *request, ResourceCmdCallback callback, gpointer user_data)
Append a ResourceQuee.
gint64(* getint)(const ConfigContext *, const char *name)
Get integer value.
Definition: configcontext.h:74
FSTATIC void _resource_queue_finalize(AssimObj *aself)
Finalize a ResourceQueue – RIP.
#define FMT_64BIT
Format designator for a 64 bit integer.
Definition: projectcommon.h:32
gint64 starttime
Time to start it next (or when it started if it&#39;s now running)
Definition: resourcecmd.h:52
#define REF(obj)
Definition: assimobj.h:39
gint64 queuetime
Time this particular request entered the queue.
Definition: resourcequeue.c:35
struct _ResourceQueue ResourceQueue
Definition: resourcequeue.h:44
WINEXPORT gint64 g_get_monotonic_time(void)
Local replacement for g_get_monotonic_time() - for old releases of glib.
#define CASTTOCONSTCLASS(Cclass, obj)
Safely cast &#39;obj&#39; to const C-class &#39;class&#39; - verifying that it was registered as being of type class ...
Definition: proj_classes.h:71
Project common header file.
Was not attempted - invalid request.
Definition: childprocess.h:42
FSTATIC void _resource_queue_qelem_finalize(RscQElem *self)
Finalize (free) a RscQElem object.
gboolean cancelme
Cancel after current request completes.
Definition: resourcequeue.c:46
AssimObj * assimobj_new(guint objsize)
Definition: assimobj.c:74
const char * resourcename
Name of this resource.
Definition: resourcecmd.h:59
gint repeatinterval
How often to repeat? 0 == single-shot.
Definition: resourcequeue.c:43
GQueue * ourQ
Which Queue are we in?
Definition: resourcequeue.c:40
#define REQCANCELONFAILFIELD
Definition: resourcecmd.h:74
#define uSPERSEC
Definition: resourcequeue.c:31
gboolean last_success
TRUE if previous operation was successful.
Definition: resourcecmd.h:64
gboolean cancelonfail
TRUE if we should cancel the repeat on failure.
Definition: resourcequeue.c:44
#define CONFIGNAME_INITDELAY
How long to wait before starting.
ResourceQueue * resourcequeue_new(guint structsize)
Construct a new ResourceQueue system (you probably only need one)
Definition: resourcequeue.c:80
HowDied
Definition: childprocess.h:35
#define REQIDENTIFIERNAMEFIELD
Definition: resourcecmd.h:75
AssimObj baseclass
Base object: implements ref, unref, toString.
Definition: resourcequeue.h:48
#define g_info(...)
Definition: projectcommon.h:66
#define CONFIGNAME_INSTANCE
Instance name for discovery.
enum ConfigValType(* gettype)(const ConfigContext *, const char *)
Return type.
Definition: configcontext.h:99
ResourceCmd * resourcecmd_new(ConfigContext *request, gpointer user_data, ResourceCmdCallback callback)
Our ResourceCmd Factory object - constructs an object of the proper subclass for the given instantiat...
Definition: resourcecmd.c:65
Implements the resource queue class.
Exited with zero return code.
Definition: childprocess.h:37
FSTATIC void _resource_queue_hash_key_destructor(gpointer dataptr)
Function for destroying keys when an element is removed from self->resources hash table...
gboolean g_queue_remove_boolean(GQueue *queue, gconstpointer element)
HAVE_G_GET_ENVIRON.
ResourceQueue * parent
Our parent ResourceQueue.
Definition: resourcequeue.c:39
#define DEBUGDECLARATIONS
Definition: proj_classes.h:79
gpointer user_data
user_data for callback
Definition: resourcequeue.c:42
#define DEBUGMSG2(...)
Definition: proj_classes.h:90
void(* _finalize)(AssimObj *)
Free object (private)
Definition: assimobj.h:55
FSTATIC gboolean _resource_queue_cancel(ResourceQueue *self, ConfigContext *request)
Cancel a specific request.
ResourceCmd * cmd
The request.
Definition: resourcequeue.c:38
FSTATIC void _resource_queue_endnotify(ConfigContext *request, gpointer user_data, enum HowDied exittype, int rc, int signal, gboolean core_dumped, const char *stringresult)
Called when an operation completes - it calls requestor&#39;s callback if no repeat, and requeues it if i...
gpointer user_data
User data for the request.
Definition: resourcecmd.h:56
#define CASTTOCLASS(Cclass, obj)
Safely cast &#39;obj&#39; to C-class &#39;class&#39; - verifying that it was registerd as being of type class ...
Definition: proj_classes.h:66
#define UNREF(obj)
Definition: assimobj.h:35
ConfigContext * request
The request.
Definition: resourcecmd.h:55
#define NEWSUBCLASS(Cclass, obj)
Definition: proj_classes.h:67
char * loggingname
Malloced.
Definition: resourcecmd.h:61
gint64 requestid
Request ID.
Definition: resourcequeue.c:45
FSTATIC gint _queue_compare_requestid(gconstpointer a, gconstpointer b)
Compare the request ids of two different ResourceCmds using GCompareFunc style arguments for use with...
FSTATIC void _resource_queue_hash_data_destructor(gpointer dataptr)
self->resources is a hash table of GQueue indexed by the resource name.
FSTATIC gboolean _resource_queue_cmd_append(ResourceQueue *self, ResourceCmd *cmd, ResourceCmdCallback cb, gpointer user_data)
Append a ResourceCmd to a ResourceQueue.