The Assimilation Project  based on Assimilation version 1.1.7.1474836767
fsqueue.c
Go to the documentation of this file.
1 
24 #include <projectcommon.h>
25 #include <stdlib.h>
26 #include <fsqueue.h>
27 #include <frameset.h>
28 #include <frametypes.h>
29 
30 
31 
32 
34 
// Forward declarations of FsQueue member functions defined later in this file.
// fsqueue_new() installs each of these into the object's function-pointer slots.
36 FSTATIC gboolean _fsqueue_enq(FsQueue* self, FrameSet* fs);
37 FSTATIC gboolean _fsqueue_inqsorted(FsQueue* self, FrameSet* fs);
41 FSTATIC void _fsqueue_flush(FsQueue* self);
42 FSTATIC void _fsqueue_flush1(FsQueue* self);
43 FSTATIC guint _fsqueue_qlen(FsQueue* self);
44 FSTATIC void _fsqueue_setmaxqlen(FsQueue* self, guint max);
46 FSTATIC gboolean _fsqueue_hasqspace1(FsQueue* self);
47 FSTATIC gboolean _fsqueue_hasqspace(FsQueue* self, guint);
48 FSTATIC char* _fsqueue_toString(gconstpointer);
49 
53 
55 FSTATIC gboolean
57 , FrameSet* fs)
58 {
59  SeqnoFrame* seqno;
60  DEBUGMSG3("%s.%d: inserting fs %p: ref count = %d", __FUNCTION__, __LINE__, fs, fs->baseclass._refcount);
61  // This FrameSet shouldn't have a sequence number frame yet...
62  g_return_val_if_fail(fs->_seqframe == NULL, FALSE);
63  if (self->_maxqlen != 0 && self->_q->length >= self->_maxqlen) {
64  g_critical("%s.%d: Failing due to excess queue length (%d)"
65  , __FUNCTION__, __LINE__, self->_maxqlen);
66  DUMP("Queue contents follows", &self->baseclass, NULL);
67  return FALSE;
68  }
69  seqno = seqnoframe_new_init(FRAMETYPE_REQID, self->_nextseqno, self->_qid);
70  g_return_val_if_fail(seqno != NULL, FALSE);
71  ++self->_nextseqno;
72  DEBUGMSG3("%s.%d: next sequence number for %p is "FMT_64BIT"d", __FUNCTION__, __LINE__
73  , self, self->_nextseqno);
74 
75  // FYI: This coordinates with code in _fsprotocol_fspe_reinit() and seqnoframe_new_init()
76  if (0 == self->_sessionid) {
77  self->_sessionid = seqno->_sessionid;
78  }else{
79  seqno->_sessionid = self->_sessionid;
80  }
81 
82  // Put the frame at the beginning of the frameset
83  frameset_prepend_frame(fs, &seqno->baseclass);
84  // And put this FrameSet at the end of the queue
85  g_queue_push_tail(self->_q, fs);
86 
87  // Now do all the paperwork :-D
88  // We need for the FrameSet to be kept around for potentially a long time...
89  REF(fs);
90  // But we're done with the seqno frame (prepending it to the frameset bumped the ref count)
91  UNREF2(seqno);
92  DUMP3(__FUNCTION__, &self->baseclass, NULL);
93  return TRUE;
94 }
95 
99 {
100  gpointer ret = g_queue_peek_head(self->_q);
101  return ret ? CASTTOCLASS(FrameSet, ret) : NULL;
102 }
103 
107 {
108  gpointer ret = g_queue_pop_head(self->_q);
109  return ret ? CASTTOCLASS(FrameSet, ret) : NULL;
110 }
111 
// Insert a FrameSet into the queue in ascending sequence-number order, without duplicates.
//
// Session handling, for FrameSets that carry a sequence number:
//  - if this queue has no session id yet, it adopts the incoming one, and for a
//    request id >= 2 it also moves _nextseqno up to that request id;
//  - an incoming session id lower than ours is rejected (returns FALSE);
//  - an incoming session id higher than ours flushes the queue, adopts the new id
//    and resets _nextseqno to 1.
// A request id below _nextseqno is rejected (returns FALSE).
//
// FrameSets without a sequence number are pushed onto the head of the queue.
// The queue takes a reference on every FrameSet it actually stores.
//
// Returns FALSE for the rejections above and when the queue is at its maximum length.
// Returns TRUE otherwise - including for a duplicate, which is not stored.
115 FSTATIC gboolean
117 , FrameSet* fs)
118 {
119  GQueue* Q = self->_q;
120  GList* this;
121  SeqnoFrame* seqno;
122 
// Prefer the cached sequence frame; otherwise ask the FrameSet for it (may be NULL)
123  seqno = fs->_seqframe ? fs->_seqframe : fs->getseqno(fs);
124 
125  DEBUGMSG3("%s.%d: inserting fs %p: ref count = %d", __FUNCTION__, __LINE__, fs, fs->baseclass._refcount);
126 
127  if (seqno) {
128  // Validate sequence number...
129  if (self->_sessionid == 0) {
130  // This indicates the start of a session
131  self->_sessionid = seqno->_sessionid;
132  // If we've restarted since the far end did, they might have sent us
133  // a sequence number greater than 1
134  if (seqno->_reqid >= 2) {
135  char * destaddr = self->_destaddr->baseclass.toString
136  (&self->_destaddr->baseclass);
137  // The possibility exists that this isn't the perfect action.
138  // We could lose a packet from the far endpoint if they have
139  // queued/sent several since the last ACK.
140  g_info("Resuming previous session for %s at sequence number "FMT_64BIT"d"
141  , destaddr, seqno->_reqid);
142  g_free(destaddr); destaddr = NULL;
143  self->_nextseqno = seqno->_reqid;
144  }
145  }else if (seqno->_sessionid < self->_sessionid) {
146  // Replay attack?
147  g_warning("%s.%d: Possible replay attack? Current session id: %d, incoming session id: %d"
148  , __FUNCTION__, __LINE__, self->_sessionid, seqno->_sessionid);
149  return FALSE;
150  }else if (seqno->_sessionid > self->_sessionid) {
151  char * clientaddr = self->_destaddr->baseclass.toString
152  (&self->_destaddr->baseclass);
153  g_info("%s.%d: Protocol reset from client %s - session id updated to %d from %d"
154  , __FUNCTION__, __LINE__, clientaddr, seqno->_sessionid, self->_sessionid);
155  g_free(clientaddr); clientaddr = NULL;
156  // Our peer died before we ACKed them.
157  // We may have already given them to applications...
158  // (see code in _fsprotocol_ackseqno)
159  _fsqueue_flush(self);
160  self->_sessionid = seqno->_sessionid;
161  self->_nextseqno = 1;
162  }
163  if (seqno->_reqid < self->_nextseqno) {
164  // We've already delivered this packet to our customers...
165  // We need to see if we've already sent the ACK for this packet.
166  // If so, we need to ACK it again...
167  DUMP3(__FUNCTION__, &fs->baseclass, " Previously delivered to client");
168  DEBUGMSG3("%s.%d: reason: sequence number is "FMT_64BIT"d but next should be "FMT_64BIT"d"
169  , __FUNCTION__, __LINE__, seqno->_reqid, self->_nextseqno);
170  // Returning FALSE will trigger resending the ACK in _fsprotocol_receive()
171  return FALSE;
172  }
173  }
174 
175  // Probably this shouldn't really log an error - but I'd like to see it happen
176  // if it does -- unless of course, it turns out to happen a lot (unlikely...)
177 
// A maximum length of zero disables this check
178  if (self->_maxqlen != 0 && self->_q->length >= self->_maxqlen) {
179  g_critical("%s.%d: input queue overflow (maxlength=%d)"
180  , __FUNCTION__, __LINE__, self->_maxqlen);
181  return FALSE;
182  }
183 
184  // Frames without sequence numbers go to the head of the queue
185  if (seqno == NULL) {
186  // This is typically a heartbeat or similar
187  DEBUGMSG3("%s.%d: Pushing unsequenced frame into head of queue"
188  , __FUNCTION__, __LINE__);
189  DUMP3("UnSeqFrame", &fs->baseclass, NULL);
190  g_queue_push_head(Q, fs);
191  REF(fs);
192  return TRUE;
193  }
194 
195  // We have enough room for this FrameSet, and it's a sequenced FrameSet
196  // Insert it in its proper place
197  for (this = Q->head; this; this=this->next) {
198  FrameSet* tfs = CASTTOCLASS(FrameSet, this->data);
// NOTE(review): unsequenced FrameSets are pushed onto the head of this same queue
// above; presumably getseqno() returns NULL for those, in which case compare()
// receives a NULL rhs here - confirm that compare() tolerates that.
199  SeqnoFrame* thisseq = tfs->getseqno(tfs);
200  int diff = seqno->compare(seqno, thisseq);
201  if (diff < 0) {
// First queued element that sorts after us: insert in front of it
202  g_queue_insert_before(Q, this, fs);
203  REF(fs);
204  return TRUE;
205  }else if (diff == 0) {
206  // Dup - don't keep it...
207  return TRUE;
208  }
209  }
210  // Either the list is empty, or this belongs after the last list element
211  // Regardless of which is true, we can call g_queue_push_tail...
212  REF(fs);
213  g_queue_push_tail(Q, fs);
214  DUMP3(__FUNCTION__, &self->baseclass, " putting at end of queue");
215  return TRUE;
216 }
217 
222 FSTATIC gint
224 , SeqnoFrame*seq)
225 {
226  FrameSet* fs;
227  guint64 reqid;
228  guint count = 0;
229 
230  g_return_val_if_fail(seq != NULL, 0);
231  DEBUGMSG3("%s.%d: ACKing through (%d:%d:"FMT_64BIT"d)", __FUNCTION__, __LINE__
232  , seq->getsessionid(seq), seq->getqid(seq), seq->getreqid(seq));
233  if (seq->getsessionid(seq) != self->_sessionid) {
234  return -1;
235  }
236 
237  if (seq->getreqid(seq) >= self->_nextseqno) {
238  char * deststr = self->_destaddr->baseclass.toString(&self->_destaddr->baseclass);
239  // I don't think this should happn...
240  g_warning("%s: Incoming ACK packet sequence number "FMT_64BIT"d from %s is >= "
241  FMT_64BIT"d (ACK ignored)."
242  , __FUNCTION__, seq->getreqid(seq), deststr, self->_nextseqno);
243  FREE(deststr); deststr = NULL;
244  DUMP("FsQueue", &self->baseclass, " is the queue in question.");
245  return -1;
246  }
247  reqid = seq->getreqid(seq);
248 
249  // The packets are in the queue in order of ascending sequence number
250  while((fs = self->qhead(self)) != NULL) {
251  SeqnoFrame* fseq = fs->getseqno(fs);
252  if (fseq != NULL && fseq->getreqid(fseq) > reqid) {
253  break;
254  }
255  self->flush1(self);
256  count += 1;
257  }
258  DEBUGMSG3("%s: returning %d - remaining (output) queue length is %d"
259  , __FUNCTION__, count, g_queue_get_length(self->_q));
260  return count;
261 }
262 
// Remove every FrameSet from the queue, dropping one reference on each.
// The queue is empty when this returns.
267 FSTATIC void
269 {
270  gpointer qelem;
// Pop from the head until g_queue_pop_head() reports nothing left
271  while (NULL != (qelem = g_queue_pop_head(self->_q))) {
272  FrameSet * fs = CASTTOCLASS(FrameSet, qelem);
273  SeqnoFrame* seq;
274  DEBUGMSG3("%s: Flushing FrameSet at %p - ref count = %d"
275  , __FUNCTION__, fs, fs->baseclass._refcount);
276  DUMP4("Flushing", &fs->baseclass, " whoosh!");
277  // If this packet is in the input queue and hasn't yet been ACKed by our client application
278  // then there are two ref counts being held for it at the moment..
279  // >>>>>>>>>>>>>>>>>>>>>>>>>This seems totally bogus to me... <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
280  // But I put it in because it fixed/kludged around some problem or another...
// As compiled (the #if 0 arm is disabled), this branch only logs a debug message;
// it does not drop an additional reference.
281  if (self->_nextseqno > 0 && NULL != (seq = fs->getseqno(fs)) && seq->getreqid(seq) >= self->_nextseqno
282  && fs->baseclass._refcount > 1) {
283  FrameSet* tmpfs = fs;
284  g_debug("%s.%d: seqno "FMT_64BIT"d has refcount %d -> *NOT* dropping by one."
285  , __FUNCTION__, __LINE__, seq->getreqid(seq), fs->baseclass._refcount);
286 #if 0
287  UNREF(tmpfs); // Somewhat kludgy...
288 #else
// Keeps tmpfs referenced so the compiler does not warn about an unused variable
289  (void)tmpfs;
290 #endif
291  }
// Drop a single reference on the FrameSet just removed from the queue
292  UNREF(fs);
293  }
294 }
295 
298 FSTATIC void
300 {
301  gpointer qelem = g_queue_pop_head(self->_q);
302  if (qelem) {
303  FrameSet * fs = CASTTOCLASS(FrameSet, qelem);
304  UNREF(fs);
305  }
306 }
307 
309 FSTATIC guint
311 {
312  return self->_q->length;
313 }
314 
316 FSTATIC void
318 , guint max)
319 {
320  self->_maxqlen = max;
321 }
322 
324 FSTATIC guint
326 {
327  return self->_maxqlen;
328 }
329 
331 FSTATIC gboolean
333 {
334  return self->_maxqlen ==0 || g_queue_get_length(self->_q) < self->_maxqlen;
335 }
336 
338 FSTATIC gboolean
340 , guint desired)
341 {
342  return self->_maxqlen == 0 || (self->_maxqlen + desired) >= g_queue_get_length(self->_q);
343 }
344 
// Construct an FsQueue object for a (destination address, queue id) pair.
//
// objsize: size of the object to allocate; raised to sizeof(FsQueue) if smaller
// dest:    destination address for this queue; the new object takes a reference on it
// qid:     queue id stored in the new object
//
// Returns the new FsQueue, or NULL if the underlying object could not be created.
// The queue starts out empty, with _nextseqno 1, _sessionid 0 and a maximum
// length of DEFAULT_FSQMAX.
347 fsqueue_new(guint objsize
348 , NetAddr* dest
349 , guint16 qid)
350 {
351  FsQueue* self;
// Never allocate less than a full FsQueue
353  if (objsize < sizeof(FsQueue)) {
354  objsize = sizeof(FsQueue);
355  }
356  self = NEWSUBCLASS(FsQueue, assimobj_new(objsize));
357  if (!self) {
358  return NULL;
359  }
360  // Initialize member functions
361  self->baseclass._finalize=_fsqueue_finalize;
362  self->baseclass.toString=_fsqueue_toString;
363  self->enq = _fsqueue_enq;
364  self->inqsorted = _fsqueue_inqsorted;
365  self->qhead = _fsqueue_qhead;
366  self->deq = _fsqueue_deq;
367  self->ackthrough = _fsqueue_ackthrough;
368  self->flush = _fsqueue_flush;
369  self->flush1 = _fsqueue_flush1;
370  self->qlen = _fsqueue_qlen;
371  self->setmaxqlen = _fsqueue_setmaxqlen;
372  self->getmaxqlen = _fsqueue_getmaxqlen;
373  self->hasqspace1 = _fsqueue_hasqspace1;
374  self->hasqspace = _fsqueue_hasqspace;
375 
376  // Initialize data members
377  self->_q = g_queue_new();
378  self->_qid = qid;
379  self->_maxqlen = DEFAULT_FSQMAX;
380  self->_nextseqno = 1;
// Zero means "no session yet": _fsqueue_enq() and _fsqueue_inqsorted() adopt
// a session id from the first sequenced frame they see while this is zero
381  self->_sessionid = 0;
// We keep our own reference to the destination address; the finalizer drops it
382  self->_destaddr = dest; REF(dest);
383  return self;
384 }
386 FSTATIC void
388 {
389  FsQueue* self = CASTTOCLASS(FsQueue, aself);
390 
391  DUMP3("FsQueue finalize", &self->baseclass, __FUNCTION__);
392  // Let our 'destaddr' object go...
393  UNREF(self->_destaddr);
394 
395  // Now flush (free up) any framesets that are still hanging around...
396  self->flush(self);
397 
398  // And free up the queue itself
399  g_queue_free(self->_q); self->_q = NULL;
400  // And finally, free up our direct storage
401  FREECLASSOBJ(self);
402 }
403 
404 FSTATIC char*
405 _fsqueue_toString(gconstpointer vself)
406 {
407  GString* fsqret = NULL;
408  char* ret = NULL;
409  char* tmp = NULL;
410  const FsQueue* self = CASTTOCONSTCLASS(FsQueue, vself);
411  GList* curfs;
412  const char * comma = "";
413 
414  g_return_val_if_fail(self != NULL, NULL);
415  fsqret = g_string_new("");
416 
417  tmp = self->_destaddr->baseclass.toString(&self->_destaddr->baseclass);
418  g_string_append_printf(fsqret
419  , "FsQueue(dest=%s//q=%d, _nextseqno="FMT_64BIT"d, _sessionid=%d, _maxqlen=%d isready=%s, ["
420  , tmp, self->_qid
421  , self->_nextseqno, self->_sessionid, self->_maxqlen, (self->isready? "T" : "F"));
422  g_free(tmp); tmp=NULL;
423 
424  for (curfs=self->_q->head; curfs != NULL; curfs = g_list_next(curfs)) {
425  FrameSet* fs = CASTTOCLASS(FrameSet, curfs->data);
426 
427  tmp = fs->baseclass.toString(&fs->baseclass);
428  g_string_append_printf(fsqret, "%s%s", comma, tmp);
429  g_free(tmp); tmp = NULL;
430  comma=", ";
431  }
432  g_string_append_printf(fsqret, "])");
433  ret = fsqret->str;
434  g_string_free(fsqret, FALSE);
435  return ret;
436 
437 
438 }
guint32 _sessionid
value of this session id
Definition: seqnoframe.h:53
guint32(* getsessionid)(SeqnoFrame *self)
get value of session id
Definition: seqnoframe.h:49
AssimObj baseclass
Definition: frameset.h:47
FSTATIC guint _fsqueue_qlen(FsQueue *self)
Return the current length of this queue.
Definition: fsqueue.c:310
SeqnoFrame * _seqframe
sequence number for this frameset
Definition: frameset.h:54
SeqnoFrame *(* getseqno)(FrameSet *)
Return the sequence number for this frameset (if any)
Definition: frameset.h:55
guint64(* getreqid)(SeqnoFrame *self)
get value of request id
Definition: seqnoframe.h:45
DEBUGDECLARATIONS FSTATIC void _fsqueue_finalize(AssimObj *aself)
Finalize routine for our FsQueue class objects.
Definition: fsqueue.c:387
NetAddr * destaddr
Definition: nanomain.c:75
FSTATIC gboolean _fsqueue_hasqspace(FsQueue *self, guint)
Does this queue have room for 'desired' more elements?
Definition: fsqueue.c:339
SeqnoFrame * seqnoframe_new_init(guint16 frametype, guint64 reqid, guint16 qid)
Construct a fully-initialized SeqnoFrame object.
Definition: seqnoframe.c:251
FSTATIC FrameSet * _fsqueue_deq(FsQueue *self)
Return the FrameSet class from the head of the queue - and remove it from the queue.
Definition: fsqueue.c:106
Implements minimal client-oriented Frame and Frameset capabilities.
#define DEFAULT_FSQMAX
Default maximum length for these queues - zero means unlimited.
Definition: fsqueue.h:75
AssimObj baseclass
Definition: netaddr.h:44
FSTATIC void _fsqueue_flush1(FsQueue *self)
Flush only the first frameset from the queue (if any).
Definition: fsqueue.c:299
#define WINEXPORT
Definition: projectcommon.h:45
#define FSTATIC
Definition: projectcommon.h:31
#define DEBUGMSG3(...)
Definition: proj_classes.h:91
int _refcount
Reference count (private)
Definition: assimobj.h:53
#define FREECLASSOBJ(obj)
Free a C-class object.
Definition: proj_classes.h:76
#define BINDDEBUG(Cclass)
BINDDEBUG is for telling the class system where the debug variable for this class is - put it in the ...
Definition: proj_classes.h:82
guint64 _reqid
value of this request id
Definition: seqnoframe.h:52
#define __FUNCTION__
#define DUMP3(prefix, obj, suffix)
Definition: proj_classes.h:97
#define FMT_64BIT
Format designator for a 64 bit integer.
Definition: projectcommon.h:32
This is an FsQueue class object - designed for queuing up FrameSet class objects for transmission...
Definition: fsqueue.h:45
#define FREE(m)
Our interface to free.
Definition: projectcommon.h:29
#define REF(obj)
Definition: assimobj.h:39
gchar *(* toString)(gconstpointer)
Produce malloc-ed string representation.
Definition: assimobj.h:58
#define CASTTOCONSTCLASS(Cclass, obj)
Safely cast 'obj' to const C-class 'class' - verifying that it was registered as being of type class ...
Definition: proj_classes.h:71
Project common header file.
Header file defining the data layouts for our Frames.
WINEXPORT FsQueue * fsqueue_new(guint objsize, NetAddr *dest, guint16 qid)
Construct an FsQueue object - from a (far endpoint address, Queue Id) pair.
Definition: fsqueue.c:347
AssimObj * assimobj_new(guint objsize)
Definition: assimobj.c:74
Implements the FsQueue object.
Frame baseclass
base Frame class object
Definition: seqnoframe.h:44
The NetAddr class represents a general network address - whether IP, MAC, or any other type of ...
Definition: netaddr.h:43
FSTATIC guint _fsqueue_getmaxqlen(FsQueue *self)
Return the maximum number of queue elements.
Definition: fsqueue.c:325
void frameset_prepend_frame(FrameSet *fs, Frame *f)
Prepend frame to the front of the frame list.
Definition: frameset.c:132
FSTATIC char * _fsqueue_toString(gconstpointer)
Definition: fsqueue.c:405
#define DUMP(prefix, obj, suffix)
Definition: proj_classes.h:94
#define g_info(...)
Definition: projectcommon.h:66
FSTATIC FrameSet * _fsqueue_qhead(FsQueue *self)
Return the FrameSet class from the head of the FsQueue class.
Definition: fsqueue.c:98
FrameSet class - used for collecting Frames when not on the wire, and for marshalling/demarshalling t...
Definition: frameset.h:46
guint16(* getqid)(SeqnoFrame *self)
get value of queue id
Definition: seqnoframe.h:47
FSTATIC void _fsqueue_setmaxqlen(FsQueue *self, guint max)
Set the maximum number of queue elements.
Definition: fsqueue.c:317
#define DEBUGDECLARATIONS
Definition: proj_classes.h:79
FSTATIC gboolean _fsqueue_enq(FsQueue *self, FrameSet *fs)
Enqueue a FrameSet class onto an FsQueue class - exclusively for output queues - adds sequence number...
Definition: fsqueue.c:56
FSTATIC gboolean _fsqueue_inqsorted(FsQueue *self, FrameSet *fs)
Enqueue a FrameSet class onto an FsQueue class - sorted by sequence number - NO dups allowed This fun...
Definition: fsqueue.c:116
FSTATIC void _fsqueue_flush(FsQueue *self)
Flush all framesets from the queue (if any).
Definition: fsqueue.c:268
struct _FsQueue FsQueue
Definition: fsqueue.h:37
#define DUMP4(prefix, obj, suffix)
Definition: proj_classes.h:98
FSTATIC gboolean _fsqueue_hasqspace1(FsQueue *self)
Does this queue have room for one more element?
Definition: fsqueue.c:332
#define UNREF2(obj)
Definition: assimobj.h:36
#define CASTTOCLASS(Cclass, obj)
Safely cast 'obj' to C-class 'class' - verifying that it was registered as being of type class ...
Definition: proj_classes.h:66
#define UNREF(obj)
Definition: assimobj.h:35
#define NEWSUBCLASS(Cclass, obj)
Definition: proj_classes.h:67
This is a SeqnoFrame class TLV (type, length, value) frame.
Definition: seqnoframe.h:43
int(* compare)(SeqnoFrame *self, SeqnoFrame *rhs)
Compare two SeqnoFrames: -1, 0, +1.
Definition: seqnoframe.h:51
#define FRAMETYPE_REQID
FRAMETYPE_REQID Frame (frametype 4) Frame subclass - SeqnoFrame class.
Definition: frametypes.h:138
FSTATIC gint _fsqueue_ackthrough(FsQueue *self, SeqnoFrame *)
Acknowledge and flush all framesets up through and including the given sequence number This is used e...
Definition: fsqueue.c:223