The Assimilation Project  based on Assimilation version 1.1.7.1474836767
fsprotocol.c
Go to the documentation of this file.
1 
24 #include <string.h>
25 #include <projectcommon.h>
26 #include <fsprotocol.h>
27 #include <framesettypes.h>
28 #include <frametypes.h>
29 #include <seqnoframe.h>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <assert.h>
33 
34 FSTATIC void _fsprotocol_protoelem_destroy(gpointer fsprotoelemthing);
35 FSTATIC gboolean _fsprotocol_protoelem_equal(gconstpointer lhs, gconstpointer rhs);
36 FSTATIC guint _fsprotocol_protoelem_hash(gconstpointer fsprotoelemthing);
37 FSTATIC gboolean _fsprotocol_timeoutfun(gpointer userdata);
38 FSTATIC gboolean _fsprotocol_shuttimeout(gpointer userdata);
39 FSTATIC gboolean _fsprotocol_finalizetimer(gpointer userdata);
40 
43 FSTATIC void _fsprotocol_closeconn(FsProtocol*self, guint16 qid, const NetAddr* destaddr);
45 FSTATIC FsProtoElem* _fsprotocol_find(FsProtocol* self, guint16 qid, const NetAddr* destaddr);
51 FSTATIC gboolean _fsprotocol_send1(FsProtocol*, FrameSet*, guint16 qid, NetAddr*);
52 FSTATIC gboolean _fsprotocol_send(FsProtocol*, GSList*, guint16 qid, NetAddr*);
61 FSTATIC gboolean _fsprotocol_canclose_immediately(gpointer unused_key, gpointer v_fspe, gpointer unused_user);
62 FSTATIC void _fsprotocol_log_conn(FsProtocol* self, guint16 qid, NetAddr* destaddr);
63 
65 
66 FSTATIC void _fsprotocol_auditfspe(const FsProtoElem*, const char * function, int lineno);
67 FSTATIC void _fsprotocol_auditiready(const char * fun, unsigned lineno, const FsProtocol* self);
71 FSTATIC const char* _fsprotocol_fsa_actions(unsigned int actionbits);
76 
77 #define AUDITFSPE(fspe) { if (fspe) _fsprotocol_auditfspe(fspe, __FUNCTION__, __LINE__); }
78 #define AUDITIREADY(self) {_fsprotocol_auditiready(__FUNCTION__, __LINE__, self);}
79 
80 
85 
92  FSPROTO_REQSEND = 1,
101 };
102 
103 static const FsProtoState nextstates[FSPR_INVALID][FSPROTO_INVAL] = {
104 // START REQSEND GOTC_NAK REQSHUTDOWN RCVSHUT, ACKTIMEOUT OUTALLDONE SHUT_TO
108 // SHUT1: No OUTDONE, no CONNSHUT
110 // SHUT2: got CONNSHUT, Waiting for OUTDONE
112 // SHUT3: got OUTDONE, waiting for CONNSHUT
114 };
115 #define A_CLOSE (1<<0)
116 #define A_OOPS (1<<1)
117 #define A_DEBUG (1<<2)
118 #define A_SNDNAK (1<<3)
119 #define A_SNDSHUT (1<<4)
120 #define A_ACKTO (1<<5)
121 #define A_ACKME (1<<6)
122 #define A_TIMER (1<<7)
123 #define A_NOTIME (1<<8)
125 #define A_NOSHUT (1<<9)
126 
127 #define SHUTnTIMER (A_SNDSHUT|A_TIMER)
128 #define ACKnSHUT (A_ACKME|SHUTnTIMER)
129 #define ACKnCLOSE (A_ACKME|A_CLOSE)
130 #define CLOSEnNOTIME (A_CLOSE|A_NOTIME)
131 
132 static const unsigned actions[FSPR_INVALID][FSPROTO_INVAL] = {
133 // START REQSEND GOTCONN_NAK REQSHUTDOWN RCVSHUTDOWN ACKTIMEOUT OUTDONE SHUT_TO
134 /*NONE*/ {0, 0, A_CLOSE, A_CLOSE, SHUTnTIMER, A_ACKTO|A_OOPS, A_OOPS, A_OOPS},
135 /*INIT*/ {0, 0, A_CLOSE, SHUTnTIMER, ACKnSHUT, A_CLOSE, 0, A_OOPS},
136 /*UP*/ {0, 0, A_CLOSE, SHUTnTIMER, ACKnSHUT, A_ACKTO, 0, A_OOPS},
137 //SHUT1: no OUTDONE, no CONNSHUT - only got REQSHUTDOWN
138 /*SHUT1*/{0, A_DEBUG, A_OOPS, 0, A_ACKME, A_CLOSE|A_NOTIME, 0, A_CLOSE},
139 //SHUT2: got CONNSHUT, Waiting for OUTDONE
140 /*SHUT2*/{0, A_DEBUG, 0, 0, A_ACKME, A_CLOSE|A_NOTIME, CLOSEnNOTIME, A_CLOSE},
141 //SHUT3: Got OUTDONE, waiting for CONNSHUT
142 /*SHUT3*/{0, A_DEBUG, A_OOPS, 0, ACKnCLOSE|A_NOTIME, A_CLOSE|A_NOTIME, 0, A_CLOSE},
143 };
144 
146 FSTATIC const char*
148 {
149  static char unknown[32];
150  switch (state) {
151  case FSPR_NONE: return "NONE";
152  case FSPR_INIT: return "INIT";
153  case FSPR_UP: return "UP";
154  case FSPR_SHUT1: return "SHUT1";
155  case FSPR_SHUT2: return "SHUT2";
156  case FSPR_SHUT3: return "SHUT2";
157  case FSPR_INVALID: return "INVALID";
158  }
159  g_snprintf(unknown, sizeof(unknown), "UNKNOWN%d", (int)state);
160  return unknown;
161 }
162 
164 FSTATIC const char*
166 {
167  static char unknown[32];
168  switch (input) {
169  case FSPROTO_GOTSTART: return "GOTSTART";
170  case FSPROTO_REQSEND: return "REQSEND";
171  case FSPROTO_GOTCONN_NAK: return "GOTCONN_NAK";
172  case FSPROTO_REQSHUTDOWN: return "GOTREQSHUTDOWN";
173  case FSPROTO_RCVSHUTDOWN: return "RCVSHUTDOWN";
174  case FSPROTO_ACKTIMEOUT: return "ACKTIMEOUT";
175  case FSPROTO_OUTALLDONE: return "OUTALLDONE";
176  case FSPROTO_SHUT_TO: return "SHUT_TO";
177  case FSPROTO_INVAL: return "INVAL";
178  }
179  g_snprintf(unknown, sizeof(unknown), "UNKNOWN%d", (int)input);
180  return unknown;
181 }
182 
184 FSTATIC const char*
185 _fsprotocol_fsa_actions(unsigned actionmask)
186 {
187  static char result[512];
188  char leftovers[32];
189  unsigned actidx;
190  struct {
191  unsigned bit;
192  const char * bitname;
193  }map[] = {
194  {A_CLOSE, "CLOSE"},
195  {A_OOPS, "OOPS"},
196  {A_DEBUG, "DEBUG"},
197  {A_SNDNAK, "SNDNAK"},
198  {A_SNDSHUT, "SNDSHUT(recursion)"},
199  {A_ACKTO, "ACKTO"},
200  {A_ACKME, "ACKME"},
201  {A_TIMER, "TIMER"},
202  {A_NOTIME, "NOTIME"},
203  {A_NOSHUT, "NOSHUT"},
204  };
205  if (actionmask == 0) {
206  return "None";
207  }
208  result[0] = '\0';
209  for (actidx=0; actidx < DIMOF(map); ++actidx) {
210  if (actionmask & (map[actidx].bit)) {
211  if (result[0] != '\0') {
212  g_strlcat(result, "+", sizeof(result));
213  }
214  g_strlcat(result, map[actidx].bitname, sizeof(result));
215  actionmask &= ~(map[actidx].bit);
216  }
217  }
218  if (actionmask != 0) {
219  g_snprintf(leftovers, sizeof(leftovers), "+0x%x", actionmask);
220  g_strlcat(result, leftovers, sizeof(result));
221  }
222  return result;
223 }
224 
226 FSTATIC void
228  FsProtoState state,
229  FsProtoInput input,
230  guint16 actions)
231 {
232  int index = self->hist_next;
233  self->fsa_states[index] = state;
234  self->fsa_inputs[index] = (guint8)input;
235  self->fsa_actions[index] = actions;
236  index += 1;
237  if (index >= FSPE_HISTSIZE) {
238  index = 0;
239  }
240 }
241 
243 FSTATIC void
245  FsProtoState curstate,
246  FsProtoState nextstate,
247  FsProtoInput input,
248  guint16 nextactions)
249 
250 {
251  int index;
252  int j;
253  char * deststr = self->endpoint->baseclass.toString(&self->endpoint->baseclass);
254  g_info("%s.%d: FSA history for endpoint %s", __FUNCTION__, __LINE__, deststr);
255  FREE(deststr); deststr = NULL;
256 
257  // Start at the hist_next position - it's the oldest element in the circular list
258  index = (self->hist_next+1 >= FSPE_HISTSIZE ? 0 : self->hist_next+1);
259  j = 0;
260  do {
261  g_info("FSA History[%d]: (%s, %s) => (%s, ...)", j
262  , _fsprotocol_fsa_states(self->fsa_states[index])
263  , _fsprotocol_fsa_inputs((FsProtoInput)self->fsa_inputs[index])
264  , _fsprotocol_fsa_actions(self->fsa_actions[index]));
265  j += 1;
266  index = ((index+1) >= FSPE_HISTSIZE ? 0 : (index+1));
267  } while (index != self->hist_next);
268  g_info("FSA Current[%d]: (%s, %s) => (%s , %s)", j
269  , _fsprotocol_fsa_states(curstate)
270  , _fsprotocol_fsa_inputs(input)
271  , _fsprotocol_fsa_actions(nextactions)
272  , _fsprotocol_fsa_states(nextstate));
273 }
274 
275 
277 FSTATIC void
279  FsProtoInput input,
280  FrameSet* fs)
281 {
282  FsProtocol* parent = fspe->parent;
283  FsProtoState curstate;
284  FsProtoState nextstate;
285  unsigned action;
286 
287  (void)parent;
288  g_return_if_fail(fspe->state < FSPR_INVALID);
289  g_return_if_fail(input < FSPROTO_INVAL);
290 
291  curstate = fspe->state;
292  nextstate = nextstates[fspe->state][input];
293  action = actions[fspe->state][input];
294  // DEBUG = 3;
295 
296 #if 0
297  if ((action & (A_CLOSE|A_SNDSHUT|A_NOSHUT))
298  || curstate >= FSPR_SHUT1 || nextstate >= FSPR_SHUT1
299  || FSPROTO_RCVSHUTDOWN == input || FSPROTO_REQSHUTDOWN == input) {
300  action |= A_DEBUG;
301  }
302 #endif
303 
304  DUMP2("_fsprotocol_fsa() {: endpoint ", &fspe->endpoint->baseclass, NULL);
305  if (DEBUG >= 2 || (action & A_DEBUG)) {
306  DEBUGMSG("%s.%d: (state %s, input %s) => (state %s, actions %s)"
307  , __FUNCTION__, __LINE__
309  , _fsprotocol_fsa_states(nextstate), _fsprotocol_fsa_actions(action));
310  }
311 
312  // Complain about an ACK timeout
313  if (action & A_ACKTO) {
314  char * deststr = fspe->endpoint->baseclass.toString(&fspe->endpoint->baseclass);
315  g_warning("%s.%d: Timed out waiting for an ACK while communicating with %s/%d in state %s."
316  , __FUNCTION__, __LINE__, deststr, fspe->_qid, _fsprotocol_fsa_states(curstate));
317  FREE(deststr); deststr = NULL;
318  DUMP3("_fsprotocol_fsa: Output Queue", &fspe->outq->baseclass, NULL);
319  if (DEBUG < 2) {
320  DEBUG = 2;
321  //@FIXME: need to remove this when the protocol gets better...
322  g_warning("%s.%d: RAISING DEBUG LEVEL TO 2", __FUNCTION__, __LINE__);
323  }
324  }
325 
326  // Tell other endpoint we don't like their packet (not currently used?)
327  if (action & A_SNDNAK) {
329  SeqnoFrame* seq;
330  if (fs && NULL != (seq = fs->getseqno(fs))) {
331  frameset_append_frame(fset, &seq->baseclass);
332  }else{
333  g_critical("%s.%d: A_SNDNAK action either without valid FrameSet or valid seqno"
334  " in state %s with input %s", __FUNCTION__, __LINE__
335  , _fsprotocol_fsa_states(curstate)
336  , _fsprotocol_fsa_inputs(input));
337  action |= A_OOPS;
338  }
339  // Should this be being sent reliably? Or w/o protocol?
340  parent->send1(parent, fset, fspe->_qid, fspe->endpoint);
341  UNREF(fset);
342  }
343  if (action & A_ACKME) {
344  SeqnoFrame* seq;
345  if (fs == NULL || NULL == (seq = fs->getseqno(fs))) {
346  g_critical("%s.%d: A_ACKME action either without valid FrameSet or valid seqno"
347  " in state %s with input %s", __FUNCTION__, __LINE__
348  , _fsprotocol_fsa_states(curstate)
349  , _fsprotocol_fsa_inputs(input));
350  action |= A_OOPS;
351  }else{
352  _fsprotocol_ackseqno(parent, fspe->endpoint, seq);
353  }
354  }
355 
356  // Notify other endpoint we're going away
357  if (action & A_SNDSHUT) {
359  // Note that this will generate a recursive call to the FSA...
360  parent->send1(parent, fset, fspe->_qid, fspe->endpoint);
361  if (action & A_DEBUG) {
362  DUMP("HERE IS THE CONNSHUT packet ", &fset->baseclass, "");
363  }
364  UNREF(fset);
365  }
366 
367  // Flush any pending CONNSHUT packets
368  if (action & A_NOSHUT) {
369  // This comes about if an ACK to our CONNSHUT gets lost, then the
370  // CONNSHUT hangs around and causes us heartburn when the far end restarts
371  // and we resend it. Bad idea... https://trello.com/c/mLIA2fXJ
373  }
374 
375  if (action & A_TIMER) { // Start the FSPROTO_SHUT_TO timer
376  if (fspe->shuttimer > 0) {
377  g_warning("%s.%d: Adding SHUTDOWN timer when one is already running."
378  , __FUNCTION__, __LINE__);
379  action |= A_DEBUG;
380  }else{
381  fspe->shuttimer = g_timeout_add_seconds(parent->acktimeout/1000000, _fsprotocol_shuttimeout, fspe);
382  }
383  }
384  if (action & A_NOTIME) { // Cancel the FSPROTO_SHUT_TO timer
385  if (fspe->shuttimer > 0) {
386  g_source_remove(fspe->shuttimer);
387  fspe->shuttimer = 0;
388  }
389  }
390  if (action & A_DEBUG) {
391  char * deststr = fspe->endpoint->baseclass.toString(&fspe->endpoint->baseclass);
392  DEBUGMSG("%s.%d: Got a %s input for %s/%d while in state %s", __FUNCTION__, __LINE__
393  , _fsprotocol_fsa_inputs(input), deststr, fspe->_qid
394  , _fsprotocol_fsa_states(curstate));
395  FREE(deststr); deststr = NULL;
396  }
397 
398 
399  // Should remain the second-to-the-last action in the FSA function
400  // This is because a previous action might want to OR in an A_OOPS into action
401  // to trigger this action - if something is out of whack.
402  if (action & A_OOPS) {
403  char * deststr = fspe->endpoint->baseclass.toString(&fspe->endpoint->baseclass);
404  char * fsstr = (fs ? fs->baseclass.toString(&fs->baseclass) : NULL);
405 
406  g_warning("%s.%d: Got a %s input for %s/%d while in state %s", __FUNCTION__, __LINE__
407  , _fsprotocol_fsa_inputs(input), deststr, fspe->_qid
408  , _fsprotocol_fsa_states(curstate));
409  FREE(deststr); deststr = NULL;
410  if (fsstr) {
411  g_warning("%s.%d: Frameset given was: %s", __FUNCTION__, __LINE__, fsstr);
412  FREE(fsstr);
413  fsstr = NULL;
414  }
415  _fsprotocol_fsa_log_history(fspe, curstate, nextstate, input, action);
416  }
417 
418  if (action & A_CLOSE) {
419  DUMP3("CLOSING CONNECTION (A_CLOSE)", &fspe->endpoint->baseclass, "");
421  fspe->shutdown_complete = TRUE;
422  // Clean this up after a while
423  // The time was chosen to occur after the other end will have given up on us and shut down anyway...
425  fspe->finalizetimer = g_timeout_add_seconds(1+parent->acktimeout/1000000, _fsprotocol_finalizetimer, fspe);
426  }
427  // Check for possible errors in our FSA tables...
428  if (FSPR_NONE == nextstate && curstate != nextstate && fspe->outq->_q->length != 0) {
429  char * deststr = fspe->endpoint->baseclass.toString(&fspe->endpoint->baseclass);
430  g_critical("%s.%d: Inappropriate transition for %s to state NONE"
431  ": (%s, %s) => %s. Actions=[%s], outq length=%d"
432  , __FUNCTION__, __LINE__, deststr
433  , _fsprotocol_fsa_states(curstate)
434  , _fsprotocol_fsa_inputs(input)
435  , _fsprotocol_fsa_states(nextstate)
436  , _fsprotocol_fsa_actions(action)
437  , fspe->outq->_q->length);
438  FREE(deststr); deststr = NULL;
439  _fsprotocol_fsa_log_history(fspe, curstate, nextstate, input, action);
440  }
441  // Having it here maximizes the length of the history...
442  _fsprotocol_fsa_history(fspe, curstate, input, action);
443  fspe->state = nextstate;
444  DEBUGMSG2("} /* %s:%d */", __FUNCTION__, __LINE__);
445 }
446 
448 FSTATIC void
450 {
451  FrameSet* fs;
452  g_return_if_fail(fspe != NULL);
453  fs = fspe->outq->qhead(fspe->outq);
454  if (NULL == fs) {
455  return;
456  }
457  if (FRAMESETTYPE_CONNSHUT == fs->fstype) {
458  DUMP3("_fsprotocol_flush_pending_connshut: FLUSHing this CONNSHUT packet: "
459  , &fs->baseclass, "");
460  fspe->outq->flush1(fspe->outq);
461  }else{
462  DUMP3("_fsprotocol_flush_pending_connshut: NOT FLUSHing this packet: "
463  , &fs->baseclass, "");
464  }
465 }
466 
468 #define TRYXMIT(fspe) {AUDITFSPE(fspe); _fsprotocol_xmitifwecan(fspe);}
469 
470 
471 
473 FSTATIC void
474 _fsprotocol_auditfspe(const FsProtoElem* self, const char * function, int lineno)
475 {
476  guint outqlen = self->outq->_q->length;
477  FsProtocol* parent = self->parent;
478  gboolean in_unackedlist = (g_list_find(parent->unacked, self) != NULL);
479 
480  if (outqlen != 0 && !in_unackedlist) {
481  g_critical("%s:%d: outqlen is %d but not in unacked list"
482  , function, lineno, outqlen);
483  DUMP("WARN: previous unacked warning was for this address", &self->endpoint->baseclass, NULL);
484  }
485  if (outqlen == 0 && in_unackedlist) {
486  g_critical("%s:%d: outqlen is zero but it IS in the unacked list"
487  , function, lineno);
488  DUMP("WARN: previous unacked warning was for this address", &self->endpoint->baseclass, NULL);
489  }
490 }
491 FSTATIC void
492 _fsprotocol_auditiready(const char * fun, unsigned lineno, const FsProtocol* self)
493 {
494  GHashTableIter iter;
495  gpointer key;
496  gpointer value;
497  unsigned hashcount = 0;
498 
499  g_hash_table_iter_init(&iter, self->endpoints);
500 
501  while(g_hash_table_iter_next(&iter, &key, &value)) {
502  FsProtoElem* fspe = CASTTOCLASS(FsProtoElem, key);
503  FsQueue* iq = fspe->inq;
504  FrameSet* fs = iq->qhead(iq);
505  SeqnoFrame* seq;
506  // We can read the next packet IF:
507  // it doesn't have a sequence number, OR it is the seqno we expect
508  if (NULL == fs) {
509  continue;
510  }
511  seq = fs->getseqno(fs);
512  if (seq == NULL || seq->_reqid == iq->_nextseqno) {
513  ++hashcount;
514  if (!fspe->inq->isready) {
515  g_critical("%s.%d: Queue is ready but not marked 'isready'"
516  , fun, lineno);
517  DUMP("Queue with problems", &fspe->inq->baseclass, NULL);
518  }
519  }else if (fspe->inq->isready) {
520  g_critical("%s.%d: Queue is NOT ready but IS marked 'isready'"
521  , fun, lineno);
522  DUMP("Problematic Queue", &fspe->inq->baseclass, NULL);
523  }
524  }
525  if (g_queue_get_length(self->ipend) != hashcount) {
526  g_critical("%s.%d: ipend queue length is %d, but should be %d"
527  , fun, lineno, g_queue_get_length(self->ipend), hashcount);
528  }
529 }
530 
535 , guint16 qid
536 , const NetAddr* destaddr)
537 {
538  FsProtoElem* retval = NULL;
540 
541  elem._qid = qid;
542  switch(destaddr->_addrtype) {
543 
544  case ADDR_FAMILY_IPV6:
545  elem.endpoint = destaddr;
546  retval = CASTTOCLASS(FsProtoElem, g_hash_table_lookup(self->endpoints, &elem));
547  break;
548 
549  case ADDR_FAMILY_IPV4: {
550  NetAddr* v6addr = destaddr->toIPv6(destaddr);
551 
552  elem.endpoint = v6addr;
553  retval = CASTTOCLASS(FsProtoElem, g_hash_table_lookup(self->endpoints, &elem));
554  UNREF(v6addr); elem.endpoint = NULL;
555  break;
556  }
557  }
558  return retval;
559 }
560 
565 , NetAddr* addr
566 , FrameSet* fs)
567 {
568  SeqnoFrame* seq = fs->getseqno(fs);
569  guint16 qid = DEFAULT_FSP_QID;
570  FsProtoElem* ret;
571  if (NULL != seq) {
572  qid = seq->getqid(seq);
573  }
574  // Although we normally don't want to allow unsequenced packets to reset our port number,
575  // the exception is a STARTUP packet. They have to be unsequenced but are far more
576  // important in terms of understanding the protocol than something like a heartbeat.
577  //
578  // This only comes up because we have this idea that we have two protocol endpoints
579  // on the CMA - one for the CMA itself, and one for the nanoprobe which is running on it.
580  //
582  ret = self->addconn(self, qid, addr);
583  return ret;
584 }
585 
586 
591 , guint16 qid
592 , NetAddr* destaddr)
593 {
594  FsProtoElem* ret;
595 
596 
597  if ((ret = self->find(self, qid, destaddr))) {
598  return ret;
599  }
600  ret = MALLOCCLASS(FsProtoElem, sizeof(FsProtoElem));
601  if (ret) {
602  ret->endpoint = destaddr->toIPv6(destaddr); // No need to REF() again...
603  ret->_qid = qid;
604  ret->outq = fsqueue_new(0, ret->endpoint, qid);
605  ret->inq = fsqueue_new(0, ret->endpoint, qid);
606  ret->lastacksent = NULL;
607  ret->lastseqsent = NULL;
608  ret->parent = self;
609  ret->nextrexmit = 0;
610  ret->acktimeout = 0;
611  ret->state = FSPR_NONE;
612  ret->shuttimer = 0;
613  ret->finalizetimer = 0;
614  ret->shutdown_complete = FALSE;
615  ret->is_encrypted = FALSE;
616  ret->peer_identity = NULL;
617  ret->hist_next = 0;
618  memset(ret->fsa_states, 0, sizeof(ret->fsa_states));
619  memset(ret->fsa_inputs, 0, sizeof(ret->fsa_inputs));
620  memset(ret->fsa_actions, 0, sizeof(ret->fsa_actions));
621  // This lookup assumes FsProtoElemSearchKey looks like the start of FsProtoElem
622  g_warn_if_fail(NULL == g_hash_table_lookup(self->endpoints, ret));
623  g_hash_table_insert(self->endpoints, ret, ret);
624  DEBUGMSG3("%s: Creating new FSPE connection (%p) for qid = %d. Dest address follows."
625  , __FUNCTION__, ret, qid);
626  DUMP3(__FUNCTION__, &ret->endpoint->baseclass, " is dest address for new FSPE");
627  }
628  return ret;
629 }
630 
632 FSTATIC void
634 , guint16 qid
635 , const NetAddr* destaddr)
636 {
637  FsProtoElem* fspe = _fsprotocol_find(self, qid, destaddr);
638  DUMP3("_fsprotocol_closeconn() - closing connection to", &destaddr->baseclass, NULL);
639  if (fspe) {
640  DUMP3("_fsprotocol_closeconn: shutting down connection to", &destaddr->baseclass, NULL);
642 
643  }else if (DEBUG > 0) {
644  char suffix[16];
645  g_snprintf(suffix, sizeof(suffix), "/%d", qid);
646  DUMP("_fsprotocol_closeconn: Could not locate connection", &destaddr->baseclass, suffix);
647  }
648 }
650 FSTATIC void
652 {
653  GHashTableIter iter;
654  gpointer key;
655  gpointer value;
656 
657  DEBUGMSG("In %s.%d", __FUNCTION__, __LINE__);
658  // Can't modify the table during an iteration...
659  g_hash_table_foreach_remove(self->endpoints, _fsprotocol_canclose_immediately, NULL);
660  g_hash_table_iter_init(&iter, self->endpoints);
661 
662  while(g_hash_table_iter_next(&iter, &key, &value)) {
663  FsProtoElem* fspe = CASTTOCLASS(FsProtoElem, key);
664  _fsprotocol_closeconn(self, fspe->_qid, fspe->endpoint);
665  }
666 }
667 
669 FSTATIC gboolean
670 _fsprotocol_canclose_immediately(gpointer v_fspe, gpointer unused, gpointer unused_user)
671 {
672  FsProtoElem* fspe = CASTTOCLASS(FsProtoElem, v_fspe);
673  gboolean ret;
674  (void)unused;
675  (void)unused_user;
676  ret = (fspe->outq->_nextseqno <= 1 && fspe->inq->_nextseqno <= 1);
677  if (ret) {
678  DUMP3("IMMEDIATE REMOVE OF", CASTTOCLASS(AssimObj, &fspe->endpoint->baseclass), "");
679  }
680  return ret;
681 }
682 
683 
684 FSTATIC int
686 {
687  GHashTableIter iter;
688  gpointer key;
689  gpointer value;
690  int count = 0;
691 
692  g_hash_table_iter_init(&iter, self->endpoints);
693 
694  while(g_hash_table_iter_next(&iter, &key, &value)) {
695  FsProtoElem* fspe = CASTTOCLASS(FsProtoElem, key);
696  FsProtoState state = fspe->state;
697  if (state != FSPR_NONE
698  && (fspe->inq->_nextseqno > 1 || fspe->outq->_nextseqno > 1)
699  && !fspe->shutdown_complete) {
700  DUMP5("THIS CONNECTION IS ACTIVE", CASTTOCLASS(AssimObj,&fspe->endpoint->baseclass), "");
701  ++count;
702  }
703  }
704  if (count == 0) {
705  g_hash_table_iter_init(&iter, self->endpoints);
706  while(g_hash_table_iter_next(&iter, &key, &value)) {
707  FsProtoElem* fspe = CASTTOCLASS(FsProtoElem, key);
708  fspe->shutdown_complete = FALSE;
709  }
710  }
711  return count;
712 }
713 
716 {
717  FsProtoElem* fspe = _fsprotocol_find(self, qid, destaddr);
718  if (fspe == NULL) {
719  return FSPR_NONE;
720  }
721  return fspe->state;
722 }
723 
724 // Reinitialize an FSPE into a no-connection state
725 FSTATIC void
727 {
728 
729  if (!g_queue_is_empty(self->outq->_q)) {
730  DUMP3("REINIT OF OUTQ", &self->outq->baseclass, __FUNCTION__);
731  self->outq->flush(self->outq);
732  self->parent->unacked = g_list_remove(self->parent->unacked, self);
733  self->outq->isready = FALSE;
734  }
735  // See the code in _fsqueue_enq and also in seqnoframe_new_init for how all these pieces
736  // fit together...
737  self->outq->_nextseqno = 1;
738  if (self->outq->_sessionid != 0) {
739  self->outq->_sessionid += 1;
740  }
741  if (!g_queue_is_empty(self->inq->_q)) {
742  self->inq->flush(self->inq);
743  g_queue_remove(self->parent->ipend, self);
744  self->inq->isready = FALSE;
745  }
746  self->inq->_nextseqno = 1;
747  self->inq->_sessionid = 0;
748 
749  if (self->lastacksent) {
750  UNREF2(self->lastacksent);
751  }
752  if (self->lastseqsent) {
753  UNREF2(self->lastseqsent);
754  }
755  if (self->shuttimer > 0) {
756  g_source_remove(self->shuttimer);
757  self->shuttimer = 0;
758  }
759  if (self->finalizetimer > 0) {
760  g_source_remove(self->finalizetimer);
761  self->finalizetimer = 0;
762  }
763  self->nextrexmit = 0;
764  self->acktimeout = 0;
765  self->state = FSPR_NONE;
766  self->shutdown_complete = FALSE;
767  AUDITIREADY(self->parent);
768 }
769 
772 FSTATIC void
774 {
775  DUMP5("_fsprotocol_fspe_closeconn: removing connection to", &self->endpoint->baseclass, NULL);
776  g_hash_table_remove(self->parent->endpoints, self);
777  self = NULL;
778 }
779 
780 
781 
784 fsprotocol_new(guint objsize
785 , NetIO* io
786 , guint rexmit_timer_uS)
787 {
788  FsProtocol* self;
790  if (objsize < sizeof(FsProtocol)) {
791  objsize = sizeof(FsProtocol);
792  }
793  self = NEWSUBCLASS(FsProtocol, assimobj_new(objsize));
794  if (!self) {
795  return NULL;
796  }
797  // Initialize our (virtual) member functions
798  self->baseclass._finalize = _fsprotocol_finalize;
799  self->find = _fsprotocol_find;
800  self->findbypkt = _fsprotocol_findbypkt;
801  self->addconn = _fsprotocol_addconn;
802  self->iready = _fsprotocol_iready;
803  self->outputpending = _fsprotocol_outputpending;
804  self->read = _fsprotocol_read;
805  self->receive = _fsprotocol_receive;
806  self->send1 = _fsprotocol_send1;
807  self->send = _fsprotocol_send;
808  self->ackmessage = _fsprotocol_ackmessage;
809  self->closeconn = _fsprotocol_closeconn;
810  self->closeall = _fsprotocol_closeall;
811  self->activeconncount = _fsprotocol_activeconncount;
812  self->connstate = _fsprotocol_connstate;
813  self->log_conn = _fsprotocol_log_conn;
814 
815  // Initialize our data members
816  self->io = io; // REF(io);
817  // NOTE that the REF has been commented out to prevent
818  // a circular reference chain - screwing up freeing things...
819 
822  self->endpoints = g_hash_table_new_full(_fsprotocol_protoelem_hash,_fsprotocol_protoelem_equal
824  self->unacked = NULL;
825  self->ipend = g_queue_new();
826  self->window_size = FSPROTO_WINDOWSIZE;
827  self->rexmit_interval = FSPROTO_REXMITINTERVAL;
828  self->acktimeout = FSPROTO_ACKTIMEOUTINT;
829 
830  if (rexmit_timer_uS == 0) {
831  rexmit_timer_uS = self->rexmit_interval/2;
832  }
833 
834 
835  if ((rexmit_timer_uS % 1000000) == 0) {
836  self->_timersrc = g_timeout_add_seconds(rexmit_timer_uS/1000000, _fsprotocol_timeoutfun, self);
837  }else{
838  self->_timersrc = g_timeout_add(rexmit_timer_uS/1000, _fsprotocol_timeoutfun, self);
839  }
840  DEBUGMSG3("%s: Constructed new FsProtocol object (%p)", __FUNCTION__, self);
841  return self;
842 }
843 
845 FSTATIC void
847 {
848  FsProtocol* self = CASTTOCLASS(FsProtocol, aself);
849 
850  DUMP3("_fsprotocol_finalize - this object", aself, NULL);
851  if (self->_timersrc) {
852  g_source_remove(self->_timersrc);
853  self->_timersrc = 0;
854  }
855 
856  // Free up our hash table of endpoints
857  if (self->endpoints) {
858  g_hash_table_destroy(self->endpoints); // It will free the FsProtoElems contained therein
859  self->endpoints = NULL;
860  }
861 
862  // Free up the unacked list
863  g_list_free(self->unacked); // No additional 'ref's were taken for this list
864  self->unacked = NULL;
865 
866  // Free up the input pending list
867  g_queue_free(self->ipend); // No additional 'ref's were taken for this list either
868  self->ipend = NULL;
869 
870 
871  // Lastly free our base storage
872  FREECLASSOBJ(self);
873 }
874 
876 FSTATIC void
877 _fsprotocol_protoelem_destroy(gpointer fsprotoelemthing)
878 {
879  FsProtoElem * self = CASTTOCLASS(FsProtoElem, fsprotoelemthing);
880  DUMP5("Destroying FsProtoElem", &self->endpoint->baseclass, __FUNCTION__);
881 
882  DUMP3("Destroying FsProtoElem", &self->endpoint->baseclass, __FUNCTION__);
883  // This does a lot of our cleanup - but doesn't destroy anything important...
885 
886  // So let's get on with the destruction ;-)
887  DUMP3("UNREFING FSPE: endpoint", &self->endpoint->baseclass, __FUNCTION__);
888  UNREF(self->endpoint);
889  DUMP3("UNREFING FSPE: INQ", &self->inq->baseclass, __FUNCTION__);
890  UNREF(self->inq);
891  DUMP3("UNREFING FSPE: OUTQ", &self->outq->baseclass, __FUNCTION__);
892  UNREF(self->outq);
893  self->parent = NULL;
894  if (self->peer_identity) {
895  FREE(self->peer_identity); self->peer_identity = NULL;
896  }
897  memset(self, 0, sizeof(*self));
898  FREECLASSOBJ(self);
899 }
900 
902 FSTATIC gboolean
903 _fsprotocol_protoelem_equal(gconstpointer lhs
904 , gconstpointer rhs)
905 {
906  const FsProtoElem * lhselem = (const FsProtoElem*)lhs;
907  const FsProtoElem * rhselem = (const FsProtoElem*)rhs;
908 
909  return lhselem->_qid == rhselem->_qid
910  && lhselem->endpoint->equal(lhselem->endpoint, rhselem->endpoint);
911 
912 
913 }
914 
916 FSTATIC guint
917 _fsprotocol_protoelem_hash(gconstpointer fsprotoelemthing)
918 {
919  const FsProtoElem * key = (const FsProtoElem*)fsprotoelemthing;
920  // One could imagine doing a random circular rotate on the Queue Id before xoring it...
921  // But this is probably good enough...
922  return (key->endpoint->hash(key->endpoint) ^ key->_qid);
923 }
924 
926 FSTATIC gboolean
928 {
929  AUDITIREADY(self);
930  return !g_queue_is_empty(self->ipend);
931 }
932 
934 FSTATIC gboolean
936 {
937  return self->unacked != NULL;
938 }
939 
943 , NetAddr** fromaddr)
944 {
945  GList* list; // List of all our FsQueues which have input
946 
947  AUDITIREADY(self);
948  // Loop over all the FSqueues which we think are ready to read...
949  for (list=self->ipend->head; list != NULL; list=list->next) {
950  FrameSet* fs;
951  SeqnoFrame* seq;
952  FsProtoElem* fspe = CASTTOCLASS(FsProtoElem, list->data);
953  FsQueue* iq;
954  if (NULL == fspe || (NULL == (iq = fspe->inq)) || (NULL == (fs = iq->qhead(iq)))) {
955  g_warn_if_reached();
956  continue;
957  }
958  if (!fspe->inq->isready) {
959  g_warn_if_reached();
960  // But trudge on anyway...
961  }
962  seq = fs->getseqno(fs);
963  // Look to see if there is something ready to be read on this queue
964  // There should be something ready to be read!
965  if (seq == NULL || seq->_reqid == iq->_nextseqno) {
966  FrameSet* ret;
967  gboolean del_link = FALSE;
968  REF(iq->_destaddr);
969  *fromaddr = iq->_destaddr;
970  ret = iq->deq(iq);
971  DEBUGMSG3("%s.%d: Reading Frameset of type %d:"
972  , __FUNCTION__, __LINE__, fs->fstype);
973  DUMP3("_fsprotocol_read: Dequeuing FrameSet from: ", &(*fromaddr)->baseclass, "");
974  DUMP3("_fsprotocol_read: Dequeuing FrameSet: ", &ret->baseclass, "");
975  if (seq != NULL) {
976  iq->_nextseqno += 1;
977  }else{
978  DUMP4("fsprotocol_read: returning unsequenced frame", &ret->baseclass, NULL);
979  }
980  // Now look and see if there will _still_ be something
981  // ready to be read on this input queue. If not, then
982  // we should remove this FsProtoElem from the 'ipend' queue
983  fs = iq->qhead(iq);
984  if (fs == NULL) {
985  // Our FsQueue is empty. Remove our FsProtoElem from the ipend queue
986  del_link = TRUE;
987  }else{
988  // We can read the next packet IF:
989  // it doesn't have a sequence number, OR it is the seqno we expect
990  seq = fs->getseqno(fs);
991  if (seq != NULL && seq->_reqid != iq->_nextseqno) {
992  del_link = TRUE;
993  }
994  }
995  g_queue_remove(self->ipend, fspe);
996  if (del_link) {
997  fspe->inq->isready = FALSE;
998  }else{
999  // Give someone else a chance to get their packets read
1000  // Otherwise we get stuck reading the same endpoint(s) over and over
1001  // at least while reading initial discovery data.
1002  fspe->inq->isready = TRUE;
1003  g_queue_push_tail(self->ipend, fspe);
1004  }
1005  if (ret && FRAMESETTYPE_CONNSHUT == ret->fstype) {
1007  }
1008  TRYXMIT(fspe);
1009  self->io->stats.reliablereads++;
1010  AUDITIREADY(self);
1011  return ret;
1012  }
1013  AUDITIREADY(self);
1014  g_warn_if_reached();
1015  TRYXMIT(fspe);
1016  }
1017  AUDITIREADY(self);
1018  return NULL;
1019 }
1020 
// _fsprotocol_receive: enqueue a received FrameSet for its connection, handling
// ACK and CONNNAK FrameSets immediately instead of queueing them.
//
// The connection is looked up with self->findbypkt(). FrameSets whose type is
// >= MIN_SEQFRAMESET are subject to the identity / encryption checks below.
// ACK and CONNNAK FrameSets are consumed here; everything else is handed to the
// connection's input queue through inq->inqsorted().
//
// fromaddr: address the FrameSet came from. This function UNREFs it on every
//           path (either in the main flow or at the badret label).
// fs:       the received FrameSet. No UNREF of fs appears in this function.
//
// NOTE(review): the line carrying the function name and its first parameter is
// not present in this listing; the body refers to that parameter as `self`.
1022 FSTATIC void
1024 , NetAddr* fromaddr
1025 , FrameSet* fs)
1026 {
1027  SeqnoFrame* seq = fs->getseqno(fs);
1028  FsProtoElem* fspe;
1029  const char * keyid = NULL;
1030  gpointer maybecrypt;
1031  const char* sender_id = NULL;
1032 
1033  fspe = self->findbypkt(self, fromaddr, fs);
1034  if (fspe == NULL) {
1035  goto badret;
1036  }
1037  AUDITIREADY(self);
1038  AUDITFSPE(fspe);
1039 
1040  DEBUGMSG3("%s.%d: Received type FrameSet fstype=%d", __FUNCTION__, __LINE__, fs->fstype);
1041  // Once we start talking encrypted on a channel, we make sure
1042  // that all future packets are encrypted.
1043  // If we know the identity of the far end, we make sure future packets
1044  // come from that identity.
// Only the frame at index 1 (the second frame) is examined for a CryptFrame.
1045  maybecrypt = g_slist_nth_data(fs->framelist, 1);
1046  if (maybecrypt && OBJ_IS_A(maybecrypt, "CryptFrame")) {
1047  keyid = CASTTOCLASS(CryptFrame, maybecrypt)->sender_key_id;
1048  }
1049  if (keyid) {
// Any FrameSet carrying a key id marks the channel as encrypted from now on;
// the first identity seen for the channel is remembered as peer_identity.
1050  sender_id = cryptframe_whois_key_id(keyid);
1051  fspe->is_encrypted = TRUE;
1052  if (sender_id && !fspe->peer_identity) {
1053  fspe->peer_identity = g_strdup(sender_id);
1054  }
1055  }
1056  if (fs->fstype >= MIN_SEQFRAMESET) {
1057  // In this case, we enforce encryption and identity...
1058  if (fspe->peer_identity) {
1059  if (!sender_id || strcmp(sender_id, fspe->peer_identity) != 0) {
// NOTE(review): sender_id and keyid can both be NULL on this path and are
// passed to "%s" below -- confirm that is acceptable on all target platforms.
1060  char * srcstr = fromaddr->baseclass.toString(&fromaddr->baseclass);
1061  g_warning("%s.%d: Discarded FrameSet %d from %s with wrong identity"
1062  ": %s instead of %s [key id %s]"
1063  , __FUNCTION__, __LINE__, fs->fstype, srcstr, sender_id
1064  , fspe->peer_identity, keyid);
1065  g_free(srcstr); srcstr = NULL;
1066  DUMP("_fsprotocol_receive: FrameSet w/wrong identity: ", &fs->baseclass, "")
1067  // If any are bad - throw out the whole packet
1068  goto badret;
1069  }
1070  }else if (fspe->is_encrypted && !keyid) {
1071  char * srcstr = fromaddr->baseclass.toString(&fromaddr->baseclass);
1072  g_warning("%s.%d: Discarded unencrypted FrameSet %d"
1073  " on encrypted channel from address %s."
1074  , __FUNCTION__, __LINE__, fs->fstype, srcstr);
1075  g_free(srcstr); srcstr = NULL;
1076  DUMP("_fsprotocol_receive: unencrypted FrameSet is: ", &fs->baseclass, "")
1077  goto badret;
1078  }
1079  }
// fromaddr is released here and is not referenced again below this point
// (every "goto badret" occurs before this line).
1080  UNREF(fromaddr);
1081  switch(fs->fstype) {
1082  case FRAMESETTYPE_ACK: {
1083  guint64 now = g_get_monotonic_time();
1084  int ackcount = 0;
1085  // Find the packet being ACKed, remove it from the output queue, and send
1086  // out the next packet in that output queue...
1087  self->io->stats.acksrecvd++;
// An ACK without a sequence number is rejected; note this early return skips
// the AUDITIREADY at the end of this case.
1088  g_return_if_fail(seq != NULL);
1089  ackcount = fspe->outq->ackthrough(fspe->outq, seq);
1090  if (ackcount < 0) {
1091  // This can happen when shutting down - if we've already shut down
1092  // and got a duplicate ACK
1093  DUMP3("Received bad ACK from", &fspe->endpoint->baseclass, NULL);
1094  DUMP3(__FUNCTION__, &fs->baseclass, " was ACK received.");
1095  }else if (fspe->outq->_q->length == 0) {
// Output queue is now empty: take this connection off the unacked list and
// clear both its retransmit time and its ACK timeout.
1096  fspe->parent->unacked = g_list_remove(fspe->parent->unacked, fspe);
1097  fspe->nextrexmit = 0;
1098  TRYXMIT(fspe);
1099  fspe->acktimeout = 0;
// NOTE(review): this block has no statements in this listing (a source line
// appears to be absent here) -- confirm against the original file.
1100  if (ackcount > 0) {
1102  }
1103  }else{
// Output still outstanding: restart the retransmit and ACK-timeout clocks.
1104  fspe->nextrexmit = now + self->rexmit_interval;
1105  fspe->acktimeout = now + self->acktimeout;
1106  TRYXMIT(fspe);
1107  }
1108  AUDITIREADY(self);
1109  return;
1110  }
// NOTE(review): the CONNNAK case shows no action before the audit in this
// listing (a source line appears to be absent) -- confirm against the original.
1111  case FRAMESETTYPE_CONNNAK: {
1113  AUDITIREADY(self);
1114  return;
1115  }
1116 #if 0
1117  // We now process this when the client reads the packet (i.e., in order)
1118  case FRAMESETTYPE_CONNSHUT: {
1120  AUDITIREADY(self);
1121  return;
1122  }
1123 #endif
1124  default:
1125  /* Process below... */
1126  break;
1127  }
1128  AUDITFSPE(fspe);
1129  AUDITIREADY(self);
1130  // Queue up the received frameset
1131  DUMP3(__FUNCTION__, &fs->baseclass, "given to inq->inqsorted");
1132  if (fspe->inq->inqsorted(fspe->inq, fs)) {
1133  // It inserted correctly.
1134  if (seq) {
// Start the ACK-timeout clock only if it is not already running.
1135  if (fspe->acktimeout == 0) {
1136  fspe->acktimeout = g_get_monotonic_time() + self->acktimeout;
1137  }
// Request id 1 is reported to the state machine as FSPROTO_GOTSTART.
1138  if (seq->_reqid == 1) {
1139  _fsprotocol_fsa(fspe, FSPROTO_GOTSTART, fs);
1140  }
1141  }
1142  }else{
1143  DUMP3(__FUNCTION__, &fs->baseclass, " Frameset failed to go into queue :-(.");
1144  DEBUGMSG3("%s.%d: seq=%p lastacksent=%p", __FUNCTION__, __LINE__
1145  , seq, fspe->lastacksent);
1146  // One reason for not queueing it is that we've already sent it
1147  // to our client. If they have already ACKed it, then we will ACK
1148  // it again automatically - because the application won't be shown
1149  // this packet again - so they can't ACK it and our ACK might have
1150  // gotten lost, so we need to send it again...
1151  //
1152  // On the other hand, we cannot re-send an ACK that the application hasn't given us yet...
1153  // We could wind up here if the app is slow to ACK packets we gave it
1154  if (seq && fspe->lastacksent) {
1155  if (seq->_sessionid == fspe->lastacksent->_sessionid
1156  && seq->compare(seq, fspe->lastacksent) <= 0) {
1157  // We've already ACKed this packet - send our highest seq# ACK
1158  DEBUGMSG3("%s.%d: Resending ACK", __FUNCTION__, __LINE__);
1159  _fsprotocol_ackseqno(self, fspe->endpoint, fspe->lastacksent);
1160  }
1161  }
1162  }
1163  AUDITFSPE(fspe);
1164 
1165  DEBUGMSG3("%s: isready: %d seq->_reqid:%d , fspe->inq->_nextseqno: "FMT_64BIT"d"
1166  , __FUNCTION__, fspe->inq->isready, (seq ? (gint)seq->_reqid : -1), fspe->inq->_nextseqno);
1167  // If this queue wasn't shown as ready before - see if it is ready for reading now...
1168  if (!fspe->inq->isready) {
// Ready when the FrameSet is unsequenced, or is exactly the next one expected.
1169  if (seq == NULL || seq->_reqid == fspe->inq->_nextseqno) {
1170  // Now ready to read - put our fspe on the list of fspes with input pending
1171  g_queue_push_head(self->ipend, fspe);
1172  fspe->inq->isready = TRUE;
1173  AUDITIREADY(self);
1174  }
1175  }
1176  AUDITIREADY(self);
1177  AUDITFSPE(fspe);
1178  TRYXMIT(fspe);
1179  return;
// Error exit: reached only before the main-path UNREF, so fromaddr is still held.
1180 badret:
1181  if (fromaddr) {
1182  UNREF(fromaddr);
1183  }
1184 }
1185 
1187 FSTATIC gboolean
1189 , FrameSet* fs
1190 , guint16 qid
1191 , NetAddr* toaddr)
1192 {
1193  FsProtoElem* fspe;
1194  gboolean ret;
1195 
1196  DEBUGMSG3("%s.%d: called", __FUNCTION__, __LINE__);
1197  DUMP3( __FUNCTION__, &fs->baseclass, " is frameset");
1198  DUMP3( __FUNCTION__, &toaddr->baseclass, " is dest address");
1199 
1200  fspe = self->addconn(self, qid, toaddr);
1201  if (NULL == fspe) {
1202  // This can happen if we're shutting down...
1203  DEBUGMSG3("%s.%d: NULL fspe", __FUNCTION__, __LINE__);
1204  return FALSE;
1205  }
1206  g_return_val_if_fail(NULL != fspe, FALSE); // Should not be possible...
1207  AUDITFSPE(fspe);
1208 
1209  if (FSPR_INSHUTDOWN(fspe->state)) {
1210  DEBUGMSG2("%s.%d: Attempt to send FrameSet while link shutting down - FrameSet ignored."
1211  , __FUNCTION__, __LINE__);
1212  return TRUE;
1213  }
1214  DEBUGMSG3("%s.%d: calling fsprotocol_fsa(FSPROTO_REQSEND)", __FUNCTION__, __LINE__);
1215  _fsprotocol_fsa(fspe, FSPROTO_REQSEND, NULL);
1216 
1217  if (fspe->outq->_q->length == 0) {
1218  guint64 now = g_get_monotonic_time();
1221  fspe->parent->unacked = g_list_prepend(fspe->parent->unacked, fspe);
1222  fspe->nextrexmit = now + self->rexmit_interval;
1223  fspe->acktimeout = now + self->acktimeout;
1224  }
1225  DEBUGMSG4("%s.%d: calling fspe->outq->enq()", __FUNCTION__, __LINE__);
1226  ret = fspe->outq->enq(fspe->outq, fs);
1227  self->io->stats.reliablesends++;
1228  DEBUGMSG4("%s.%d: calling TRYXMIT()", __FUNCTION__, __LINE__);
1229  TRYXMIT(fspe);
1230  AUDITFSPE(fspe);
1231  DEBUGMSG3("%s.%d: returning %s", __FUNCTION__, __LINE__, (ret ? "TRUE" : "FALSE"));
1232  return ret;
1233 }
1235 FSTATIC gboolean
1237 , GSList* framesets
1238 , guint16 qid
1239 , NetAddr* toaddr)
1240 {
1241  FsProtoElem* fspe = self->addconn(self, qid, toaddr);
1242  gboolean ret = TRUE;
1243  AUDITFSPE(fspe);
1244  if (FSPR_INSHUTDOWN(fspe->state)) {
1245  return FALSE;
1246  }
1247  // Send them all -- or none of them...
1248  ret = fspe->outq->hasqspace(fspe->outq, g_slist_length(framesets));
1249 
1250  if (ret) {
1251  GSList* this;
1252  int count = 0;
1253  // Loop over our framesets and send them ouit...
1254  for (this=framesets; this; this=this->next) {
1255  FrameSet* fs = CASTTOCLASS(FrameSet, this->data);
1256  g_return_val_if_fail(fs != NULL, FALSE);
1257  DEBUGMSG3("%s: queueing up frameset %d of type %d"
1258  , __FUNCTION__, count, fs->fstype);
1259  _fsprotocol_send1(self, fs, qid, toaddr);
1260  ++count;
1261  }
1262  }
1263  AUDITFSPE(fspe);
1264  TRYXMIT(fspe);
1265  AUDITFSPE(fspe);
1266  return ret;
1267 }
1268 
1270 FSTATIC void
1272 {
1273  SeqnoFrame* seq = fs->getseqno(fs);
1274  if (seq != 0) {
1275  _fsprotocol_ackseqno(self, destaddr, seq);
1276  }
1277 }
1278 
1280 FSTATIC void
1282 {
1283  FrameSet* fs;
1284  FsProtoElem* fspe;
1285  g_return_if_fail(seq != NULL);
1286 
1287  DUMP3(__FUNCTION__, &seq->baseclass.baseclass, " SENDING ACK.");
1289 
1290  frameset_append_frame(fs, &seq->baseclass);
1291  // Appending the seq frame will increment its reference count
1292 
1293  fspe = self->find(self, seq->_qid, destaddr);
1294  // It is possible that this packet may not be in a queue at this point in time.
1295  // This can happen if there's been a protocol reset from the other end...
1296  // See code in _fsqueue_inqsorted
1297  // But if *our* idea of the session id is zero, then we've done a reset on the way out...
1298  // They may need our ACK for them to shut down properly...
1299  if (seq->_sessionid != fspe->inq->_sessionid && fspe->inq->_sessionid != 0) {
1300  DEBUGMSG2("%s.%d: NOT ACKing packet with session id %d - current session id is %d"
1301  , __FUNCTION__, __LINE__, seq->_sessionid, fspe->inq->_sessionid);
1302  return;
1303  }
1304  // sendaframeset will hang onto frameset and frames as long as it needs them
1305  AUDITFSPE(fspe);
1306  self->io->sendaframeset(self->io, destaddr, fs);
1307  self->io->stats.ackssent++;
1308  AUDITFSPE(fspe);
1309  UNREF(fs);
1310 
1311  if (NULL == fspe) {
1312  // We may have closed this connection
1313  DEBUGMSG3("Sending an ACK on a closed channel.");
1314  DUMP3(__FUNCTION__, &destaddr->baseclass, " is the destination for the ACK.");
1315  DUMP3(__FUNCTION__, &seq->baseclass.baseclass, " is the ACK sequence number.");
1316  }else if ((fspe->lastacksent == NULL || fspe->lastacksent->compare(fspe->lastacksent, seq) < 0)) {
1317  if (fspe->lastacksent) {
1318  UNREF2(fspe->lastacksent);
1319  }
1320  REF2(seq);
1321  fspe->lastacksent = seq;
1322  }
1323 }
1324 
1342 //
1343 FSTATIC void
1345 {
1346  GList* qelem;
1347  FsQueue* outq;
1348  FsProtocol* parent;
1349  SeqnoFrame* lastseq;
1350  NetIO* io;
1351  guint orig_outstanding;
1352  gint64 now;
1353 
1354  g_return_if_fail(fspe != NULL);
1355  outq = fspe->outq;
1356  parent = fspe->parent;
1357  lastseq = fspe->lastseqsent;
1358  io = parent->io;
1359  orig_outstanding = fspe->outq->_q->length;
1360 
1361  AUDITFSPE(fspe);
1362  // Look for any new packets that might have showed up to send
1363  // Check to see if we've exceeded our window size...
1364  if (fspe->outq->_q->length < parent->window_size) {
1365  // Nope. Look for packets that we haven't yet sent.
1366  // This code is sub-optimal when congestion occurs and we have a larger
1367  // window size (i.e. when we have a number of un-ACKed packets)
1368  for (qelem=outq->_q->head; NULL != qelem; qelem=qelem->next) {
1369  FrameSet* fs = CASTTOCLASS(FrameSet, qelem->data);
1370  SeqnoFrame* seq = fs->getseqno(fs);
1371  if (NULL != lastseq && NULL != seq && seq->compare(seq, lastseq) <= 0) {
1372  // Not a new packet (we've sent it before)
1373  continue;
1374  }
1375  DUMP3(__FUNCTION__, &fs->baseclass, " is frameset");
1376  DUMP3(__FUNCTION__, &seq->baseclass.baseclass, " is frame being sent");
1377  DUMP3(__FUNCTION__, &fspe->endpoint->baseclass, " is destination endpoint");
1378  io->sendaframeset(io, fspe->endpoint, fs);
1379  if (NULL == seq) {
1380  g_warn_if_reached();
1381  continue;
1382  }
1383  if (lastseq) {
1384  // lastseq is a copy of fspe->lastseqsent
1385  UNREF2(lastseq);
1386  }
1387  lastseq = fspe->lastseqsent = seq;
1388  REF2(lastseq);
1389  if (fspe->outq->_q->length >= parent->window_size) {
1390  break;
1391  }
1392  }
1393  }
1394  AUDITFSPE(fspe);
1395  now = g_get_monotonic_time();
1396 
1397  if (fspe->nextrexmit == 0 && fspe->outq->_q->length > 0) {
1398  // Next retransmission time not yet set...
1399  fspe->nextrexmit = now + parent->rexmit_interval;
1400  AUDITFSPE(fspe);
1401  } else if (fspe->nextrexmit != 0 && now > fspe->nextrexmit) {
1402  FrameSet* fs = outq->qhead(outq);
1403  // It's time to retransmit something. Hurray!
1404  if (NULL != fs) {
1405  // Update next retransmission time...
1406  fspe->nextrexmit = now + parent->rexmit_interval;
1407  DUMP3(__FUNCTION__, &fspe->endpoint->baseclass, " Retransmission target");
1408  DUMP3(__FUNCTION__, &fs->baseclass, " is frameset being REsent");
1409  io->sendaframeset(io, fspe->endpoint, fs);
1410  AUDITFSPE(fspe);
1411 
1412  if (now > fspe->acktimeout) {
1413  _fsprotocol_fsa(fspe, FSPROTO_ACKTIMEOUT, NULL);
1414  // No point in whining incessantly...
1415  fspe->acktimeout = now + parent->acktimeout;
1416  }
1417  }else{
1418  g_warn_if_reached();
1419  fspe->nextrexmit = 0;
1420  }
1421  }
1422 
1423  // Make sure we remember to check this periodicially for retransmits...
1424  if (orig_outstanding == 0 && fspe->outq->_q->length > 0) {
1425  // Put 'fspe' on the list of fspe's with unacked packets
1426  fspe->parent->unacked = g_list_prepend(fspe->parent->unacked, fspe);
1427  // See comment in the _send function regarding eventual efficiency concerns
1428  }
1429  AUDITFSPE(fspe);
1430 }
1431 
1433 FSTATIC gboolean
1434 _fsprotocol_timeoutfun(gpointer userdata)
1435 {
1436  FsProtocol* self = CASTTOCLASS(FsProtocol, userdata);
1437  GList* pending;
1438  GList* next;
1439 
1440  g_return_val_if_fail(self != NULL, FALSE);
1441 
1442  DEBUGMSG4("%s: checking for timeouts: unacked = %p", __FUNCTION__, self->unacked);
1443  for (pending = self->unacked; NULL != pending; pending=next) {
1444  FsProtoElem* fspe = CASTTOCLASS(FsProtoElem, pending->data);
1445  next = pending->next;
1446  AUDITFSPE(fspe);
1447  TRYXMIT(fspe);
1448  AUDITFSPE(fspe);
1449  }
1450  return TRUE;
1451 }
1452 
1454 FSTATIC gboolean
1455 _fsprotocol_shuttimeout(gpointer userdata)
1456 {
1457  FsProtoElem* fspe = CASTTOCLASS(FsProtoElem, userdata);
1458  _fsprotocol_fsa(fspe, FSPROTO_SHUT_TO, NULL);
1459  return FALSE;
1460 }
1461 
// Finalize timer callback for an FsProtoElem (passed as userdata).
// If the connection is no longer in state FSPR_NONE it is only audited and
// left alone. Returns FALSE on both paths.
1463 FSTATIC gboolean
1464 _fsprotocol_finalizetimer(gpointer userdata)
1465 {
1466  FsProtoElem* fspe = CASTTOCLASS(FsProtoElem, userdata);
1467 
// State changed since the timer was set -- nothing to clean up here.
1468  if (fspe->state != FSPR_NONE) {
1469  AUDITFSPE(fspe);
1470  return FALSE;
1471  }
// NOTE(review): no statement is visible between the check above and this
// return in this listing (a source line appears to be absent) -- the action
// taken for a still-closed connection should be confirmed against the original.
1473  return FALSE;
1474 }
1476 FSTATIC void
1478 {
1479  char * deststr = destaddr->baseclass.toString(&destaddr->baseclass);
1480  char * qstr;
1481  FsProtoElem* fspe = self->find(self, qid, destaddr);
1482  if (fspe == NULL) {
1483  g_info("Cannot dump connection %s - not found.", deststr);
1484  g_free(deststr); deststr = NULL;
1485  return;
1486  }
1487  qstr = fspe->inq->baseclass.toString(&fspe->inq->baseclass);
1488  g_info("INPUT queue [%s] = %s", deststr, qstr);
1489  g_free(qstr);
1490  qstr = fspe->outq->baseclass.toString(&fspe->outq->baseclass);
1491  g_info("OUTPUT queue [%s] = %s", deststr, qstr);
1492  g_free(qstr); qstr = NULL;
1493  g_free(deststr); deststr = NULL;
1494 }
FrameSet *(* deq)(FsQueue *self)
return and remove head packet
Definition: fsqueue.h:63
guint32 _sessionid
value of this session id
Definition: seqnoframe.h:53
#define TRYXMIT(fspe)
Try and transmit a packet after auditing the FSPE data structure.
Definition: fsprotocol.c:468
#define FRAMESETTYPE_ACK
Frame referred to has been acted on. (can also come from the CMA)
Definition: framesettypes.h:36
#define MALLOCCLASS(Cclass, size)
Allocate memory for an object (which might be further subclassed) - and register it with our C-Class ...
Definition: proj_classes.h:61
It is REQUIRED that these fields are the same as the first two in the FsProtoElem structure...
Definition: fsprotocol.h:99
AssimObj baseclass
Definition: frameset.h:47
#define A_SNDNAK
0x08 Don't appear to be using this action...
Definition: fsprotocol.c:118
#define AUDITIREADY(self)
Definition: fsprotocol.c:78
SeqnoFrame *(* getseqno)(FrameSet *)
Return the sequence number for this frameset (if any)
Definition: frameset.h:55
gboolean is_encrypted
TRUE if this channel is encrypted.
Definition: fsprotocol.h:90
#define A_DEBUG
0x04 - print state info, etc
Definition: fsprotocol.c:117
FSTATIC void _fsprotocol_fsa_history(FsProtoElem *, FsProtoState, FsProtoInput, guint16)
Add a (state, input, action) to the history for this particular FSA.
Definition: fsprotocol.c:227
#define FSPR_INSHUTDOWN(state)
Definition: fsprotocol.h:70
FSTATIC FsProtoState _fsprotocol_connstate(FsProtocol *self, guint16 qid, const NetAddr *destaddr)
Definition: fsprotocol.c:715
NetAddr * destaddr
Definition: nanomain.c:75
#define REF2(obj)
Definition: assimobj.h:40
FSTATIC FsProtoElem * _fsprotocol_addconn(FsProtocol *self, guint16 qid, NetAddr *destaddr)
Add and return a FsProtoElem connection to our collection of connections...
Definition: fsprotocol.c:590
FSTATIC void _fsprotocol_log_conn(FsProtocol *self, guint16 qid, NetAddr *destaddr)
Dump information about this connection to our logs.
Definition: fsprotocol.c:1477
FSTATIC const char * _fsprotocol_fsa_inputs(FsProtoInput input)
Returns the input name (string) for an input - returns static array.
Definition: fsprotocol.c:165
Received a CONN_NAK packet.
Definition: fsprotocol.c:94
FSTATIC void _fsprotocol_auditiready(const char *fun, unsigned lineno, const FsProtocol *self)
Definition: fsprotocol.c:492
guint16 _qid
Queue id of far endpoint.
Definition: fsprotocol.h:101
#define DEBUGMSG4(...)
Definition: proj_classes.h:92
guint16 _qid
Queue id of far endpoint.
Definition: fsprotocol.h:78
AssimObj baseclass
Base object class for our Class system.
Definition: frame.h:44
Output drained, Waiting for CONNSHUT.
Definition: fsprotocol.h:66
guint shuttimer
FSPROTO_SHUT_TO timer (see FSA for details)
Definition: fsprotocol.h:87
NetAddr * _destaddr
Far endpoint address.
Definition: fsqueue.h:51
guint finalizetimer
Timer for removing these objects (if not reopened)
Definition: fsprotocol.h:88
#define DEBUGMSG(...)
Definition: proj_classes.h:87
#define SHUTnTIMER
Definition: fsprotocol.c:127
FSTATIC gboolean _fsprotocol_protoelem_equal(gconstpointer lhs, gconstpointer rhs)
Equal-compare function for FsProtoElem structures suitable for GHashTables.
Definition: fsprotocol.c:903
gboolean(* send1)(FsProtocol *, FrameSet *, guint16, NetAddr *)
Send one FrameSet class.
Definition: fsprotocol.h:126
FSTATIC gboolean _fsprotocol_send(FsProtocol *, GSList *, guint16 qid, NetAddr *)
Enqueue and send a list of reliable FrameSets (send all or none)
Definition: fsprotocol.c:1236
gboolean(* hasqspace)(FsQueue *self, guint)
TRUE if space for desired packets available.
Definition: fsqueue.h:72
No connection in progress.
Definition: fsprotocol.h:57
AssimObj baseclass
Definition: netaddr.h:44
gint64 rexmit_interval
How often to retransmit - in uS.
Definition: fsprotocol.h:114
#define A_OOPS
0x02 - this should not happen - complain about it
Definition: fsprotocol.c:116
FsQueue * inq
Queue of incoming messages - perhaps missing packets...
Definition: fsprotocol.h:80
Connection initiated, awaiting first ACK packet from far side.
Definition: fsprotocol.h:58
FSTATIC void _fsproto_sendconnak(FsProtoElem *fspe, NetAddr *dest)
FrameSet * frameset_new(guint16 frameset_type)
Construct a new frameset of the given type.
Definition: frameset.c:110
#define ACKnSHUT
Definition: fsprotocol.c:128
NetAddr *(* toIPv6)(const NetAddr *)
Convert this NetAddr to the IPv6 equivalent.
Definition: netaddr.h:56
#define WINEXPORT
Definition: projectcommon.h:45
#define FSTATIC
Definition: projectcommon.h:31
#define DEBUGMSG3(...)
Definition: proj_classes.h:91
guint16 _qid
value of this queue id
Definition: seqnoframe.h:54
gint64 nextrexmit
When to retransmit next...
Definition: fsprotocol.h:84
WINEXPORT FsProtocol * fsprotocol_new(guint objsize, NetIO *io, guint rexmit_timer_uS)
Construct an FsProtocol object.
Definition: fsprotocol.c:784
io
Definition: hello.py:115
GQueue * _q
FrameSet class queue
Definition: fsqueue.h:50
int hist_next
current index into history circular queue
Definition: fsprotocol.h:92
guint16 fsa_actions[FSPE_HISTSIZE]
history of FSA actions
Definition: fsprotocol.h:95
guint window_size
Window size of our connections.
Definition: fsprotocol.h:113
#define FREECLASSOBJ(obj)
Free a C-class object.
Definition: proj_classes.h:76
guint16 fstype
Type of frameset.
Definition: frameset.h:52
FSTATIC void _fsprotocol_closeall(FsProtocol *self)
Start the process of shutting down all our connections.
Definition: fsprotocol.c:651
enum _FsProtoState FsProtoState
Definition: fsprotocol.h:48
NetAddr * endpoint
Who is our partner in this?
Definition: fsprotocol.h:77
#define FSPROTO_REXMITINTERVAL
FsProtocol rexmit interval in uS = 2 secs.
Definition: fsprotocol.h:136
gboolean isready
TRUE when ready for I or O (depending)
Definition: fsqueue.h:53
FSTATIC void _fsprotocol_fsa(FsProtoElem *fspe, FsProtoInput input, FrameSet *fs)
FsProtocol Finite state Automaton modelling connection establishment and shutdown.
Definition: fsprotocol.c:278
#define BINDDEBUG(Cclass)
BINDDEBUG is for telling the class system where the debug variable for this class is - put it in the ...
Definition: proj_classes.h:82
FSTATIC void _fsprotocol_closeconn(FsProtocol *self, guint16 qid, const NetAddr *destaddr)
Close a specific connection - allowing it to be reopened by more communication – effectively a reset...
Definition: fsprotocol.c:633
FSTATIC void _fsprotocol_fspe_reinit(FsProtoElem *self)
Definition: fsprotocol.c:726
guint64 _reqid
value of this request id
Definition: seqnoframe.h:52
WINEXPORT const char * cryptframe_whois_key_id(const char *key_id)
Return the identity associated with the given key id.
Definition: cryptframe.c:371
gint(* ackthrough)(FsQueue *self, SeqnoFrame *)
ACK packets through given seqno.
Definition: fsqueue.h:64
void frameset_append_frame(FrameSet *fs, Frame *f)
Append frame to the front of the end of the frame list.
Definition: frameset.c:143
#define __FUNCTION__
#define DEBUG
Definition: proj_classes.h:85
FSTATIC const char * _fsprotocol_fsa_states(FsProtoState state)
Returns the state name (string) for state - returns static data.
Definition: fsprotocol.c:147
guint(* hash)(const NetAddr *)
Compute hash of the NetAddr.
Definition: netaddr.h:54
SeqnoFrame * lastseqsent
Last sequence number which has been sent at least once.
Definition: fsprotocol.h:82
gint64 acktimeout
ACK timeout interval.
Definition: fsprotocol.h:115
FsQueue * outq
Queue of outbound messages.
Definition: fsprotocol.h:79
#define DUMP3(prefix, obj, suffix)
Definition: proj_classes.h:97
FSTATIC gboolean _fsprotocol_outputpending(FsProtocol *)
Return TRUE if there are any unACKed packets in any output queue.
Definition: fsprotocol.c:935
#define A_SNDSHUT
0x10 Send CONNSHUT packet
Definition: fsprotocol.c:119
FsProtocol * parent
Our parent FsProtocol object.
Definition: fsprotocol.h:83
Header file defining all known FrameSet types THIS FILE MECHANICALLY GENERATED by "/home/alanr/assim/...
#define OBJ_IS_A(obj, Cclass)
Definition: proj_classes.h:72
#define A_ACKTO
0x20 - Announce an ACK timeout - 0x20
Definition: fsprotocol.c:120
Got request to shut down.
Definition: fsprotocol.c:95
#define A_NOSHUT
0x200 Flush out any pending CONNSHUT packets
Definition: fsprotocol.c:125
_FsProtoInput
Inputs are:
Definition: fsprotocol.c:90
void sendaframeset(NetIO *self, const NetAddr *dest, FrameSet *frameset)
Send a FrameSet list to a NetIO class
#define FMT_64BIT
Format designator for a 64 bit integer.
Definition: projectcommon.h:32
This is an FsQueue class object - designed for queueuing up FrameSet class objects for transmission...
Definition: fsqueue.h:45
#define FREE(m)
Our interface to free.
Definition: projectcommon.h:29
#define REF(obj)
Definition: assimobj.h:39
Connection fully established - received at least one ACK.
Definition: fsprotocol.h:63
AssimObj baseclass
base AssimObj class object
Definition: fsqueue.h:46
#define A_NOTIME
0x100 Cancel the FSPROTO_SHUT_TO timer
Definition: fsprotocol.c:124
WINEXPORT gint64 g_get_monotonic_time(void)
Local replacement for g_get_monotonic_time() - for old releases of glib.
FSTATIC gboolean _fsprotocol_timeoutfun(gpointer userdata)
Retransmit timer function...
Definition: fsprotocol.c:1434
gchar *(* toString)(gconstpointer)
Produce malloc-ed string representation.
Definition: assimobj.h:58
#define ADDR_FAMILY_IPV6
IPv6.
Received a CONNSHUT packet, waiting for output to drain.
Definition: fsprotocol.h:65
FSTATIC void _fsprotocol_flush_pending_connshut(FsProtoElem *fspe)
Flush the leading CONNSHUT packet in the queue – if any.
Definition: fsprotocol.c:449
FSTATIC void _fsprotocol_fsa_log_history(FsProtoElem *, FsProtoState, FsProtoState, FsProtoInput, guint16)
Log our FSA history for all to see...
Definition: fsprotocol.c:244
void(* flush1)(FsQueue *self)
flush head FrameSet in the queue
Definition: fsqueue.h:67
#define FRAMESETTYPE_CONNNAK
Ignoring your connection start request (can also come from CMA)
Definition: framesettypes.h:38
guint8 fsa_inputs[FSPE_HISTSIZE]
history of FSA inputs
Definition: fsprotocol.h:94
FSTATIC void _fsprotocol_receive(FsProtocol *, NetAddr *, FrameSet *)
Enqueue a received packet - handling ACKs when they show up.
Definition: fsprotocol.c:1023
Project common header file.
#define DIMOF(a)
Definition: lldp_min.c:30
FSTATIC const char * _fsprotocol_fsa_actions(unsigned int actionbits)
Header file defining the data layouts for our Frames.
FSTATIC FsProtoElem * _fsprotocol_find(FsProtocol *self, guint16 qid, const NetAddr *destaddr)
Locate the FsProtoElem structure that corresponds to this (destaddr, qid) pair Convert everything to ...
Definition: fsprotocol.c:534
#define CLOSEnNOTIME
Definition: fsprotocol.c:130
guint64 _nextseqno
Next sequence number.
Definition: fsqueue.h:47
WINEXPORT FsQueue * fsqueue_new(guint objsize, NetAddr *dest, guint16 qid)
Construct an FsQueue object - from a (far endpoint address, Queue Id) pair.
Definition: fsqueue.c:347
gboolean shutdown_complete
Definition: fsprotocol.h:89
FSTATIC void _fsprotocol_fspe_closeconn(FsProtoElem *self)
Close down (destroy) an FSPE-level connection Note that this depends on FsProtoElemSearchKey being th...
Definition: fsprotocol.c:773
enum _FsProtoInput FsProtoInput
Definition: fsprotocol.c:64
End marker - Invalid state.
Definition: fsprotocol.h:67
All output has been ACKed.
Definition: fsprotocol.c:98
FSTATIC guint _fsprotocol_protoelem_hash(gconstpointer fsprotoelemthing)
Hash function over FsProtoElem structures suitable for GHashTables.
Definition: fsprotocol.c:917
#define AUDITFSPE(fspe)
Definition: fsprotocol.c:77
gint64 acktimeout
When to timeout waiting for an ACK.
Definition: fsprotocol.h:85
#define FSPE_HISTSIZE
Definition: fsprotocol.h:71
AssimObj * assimobj_new(guint objsize)
Definition: assimobj.c:74
FSTATIC gboolean _fsprotocol_send1(FsProtocol *, FrameSet *, guint16 qid, NetAddr *)
Enqueue and send a single reliable frameset.
Definition: fsprotocol.c:1188
#define DUMP5(prefix, obj, suffix)
Definition: proj_classes.h:99
Got request to send a packet.
Definition: fsprotocol.c:93
gboolean(* inqsorted)(FsQueue *, FrameSet *fs)
Enqueue an incoming FrameSet - sorted by sequence # - no dups allowed Used ONLY for input queues...
Definition: fsqueue.h:58
FSTATIC void _fsprotocol_finalize(AssimObj *aself)
Finalize function for our FsProtocol class objects.
Definition: fsprotocol.c:846
char * peer_identity
Identity of the far end...
Definition: fsprotocol.h:91
End marker – invalid input.
Definition: fsprotocol.c:100
SeqnoFrame * lastacksent
The highest sequence number we've sent an ACK for.
Definition: fsprotocol.h:81
Frame baseclass
base Frame class object
Definition: seqnoframe.h:44
FSTATIC int _fsprotocol_activeconncount(FsProtocol *self)
Definition: fsprotocol.c:685
FSTATIC FrameSet * _fsprotocol_read(FsProtocol *, NetAddr **)
Read the next available FrameSet from any of our sources.
Definition: fsprotocol.c:942
GSList * framelist
List of frames in this FrameSet.
Definition: frameset.h:48
The NetAddr class class represents a general network address - whether IP, MAC, or any other type of ...
Definition: netaddr.h:43
gboolean(* equal)(const NetAddr *, const NetAddr *)
Compare NetAddrs.
Definition: netaddr.h:53
#define DUMP(prefix, obj, suffix)
Definition: proj_classes.h:94
FSTATIC gboolean _fsprotocol_finalizetimer(gpointer userdata)
Close down (free up) an FSPE object when the timer pops - provided it's still closed...
Definition: fsprotocol.c:1464
#define g_info(...)
Definition: projectcommon.h:66
guint32 _sessionid
Current session id for this Queue.
Definition: fsqueue.h:48
FSTATIC gboolean _fsprotocol_shuttimeout(gpointer userdata)
Channel shutdown timer - invokes the FSA with FSPROTO_SHUT_TO.
Definition: fsprotocol.c:1455
FsProtoState fsa_states[FSPE_HISTSIZE]
history of FSA states
Definition: fsprotocol.h:93
#define DEFAULT_FSP_QID
Default Queue ID.
Definition: fsprotocol.h:134
NetIO * io
Our parent NetIO object.
Definition: fsprotocol.h:109
FrameSet class - used for collecting Frames when not on the wire, and for marshalling/demarshalling t...
Definition: frameset.h:46
#define MIN_SEQFRAMESET
First frameset type with a sequence number.
Definition: framesettypes.h:65
FSTATIC void _fsprotocol_xmitifwecan(FsProtoElem *)
Our role in life is to send any packets that need sending.
Definition: fsprotocol.c:1344
Got a timeout waiting for a SHUTDOWN.
Definition: fsprotocol.c:99
guint16(* getqid)(SeqnoFrame *self)
get value of queue id
Definition: seqnoframe.h:47
FrameSet *(* qhead)(FsQueue *self)
return packet at head of queue
Definition: fsqueue.h:62
FSTATIC void _fsprotocol_ackseqno(FsProtocol *self, NetAddr *destaddr, SeqnoFrame *seq)
Send an ACK packet that corresponds to this sequence number frame.
Definition: fsprotocol.c:1281
#define DEBUGDECLARATIONS
Definition: proj_classes.h:79
FsProtoState state
State of this connection.
Definition: fsprotocol.h:86
FSTATIC void _fsprotocol_protoelem_destroy(gpointer fsprotoelemthing)
Finalize function suitable for GHashTables holding FsProtoElems as keys (and values) ...
Definition: fsprotocol.c:877
#define DUMP2(prefix, obj, suffix)
Definition: proj_classes.h:96
Implements the SeqnoFrame class.
#define DEBUGMSG2(...)
Definition: proj_classes.h:90
This is our CryptFrame class object - representing an encryption method.
Definition: cryptframe.h:53
#define A_TIMER
0x80 Start the FSPROTO_SHUT_TO timer - calls _fsprotocol_shuttimeout -
Definition: fsprotocol.c:122
guint16 _addrtype
private: Address type
Definition: netaddr.h:60
#define ADDR_FAMILY_IPV4
IPv4.
FSTATIC FsProtoElem * _fsprotocol_findbypkt(FsProtocol *self, NetAddr *, FrameSet *)
Find the FsProtoElem that corresponds to the given FrameSet class.
Definition: fsprotocol.c:564
FSTATIC gboolean _fsprotocol_canclose_immediately(gpointer unused_key, gpointer v_fspe, gpointer unused_user)
Returns TRUE if the given FSPE can be closed immediately.
Definition: fsprotocol.c:670
#define DUMP4(prefix, obj, suffix)
Definition: proj_classes.h:98
#define FSPROTO_ACKTIMEOUTINT
ACK timeout interval (2 minutes)
Definition: fsprotocol.h:137
#define UNREF2(obj)
Definition: assimobj.h:36
#define ACKnCLOSE
Definition: fsprotocol.c:129
#define CASTTOCLASS(Cclass, obj)
Safely cast 'obj' to C-class 'class' - verifying that it was registered as being of type class ...
Definition: proj_classes.h:66
#define UNREF(obj)
Definition: assimobj.h:35
This is a basic NetIO class abstract class for doing network I/O.
Definition: netio.h:58
Implements the FsProtocol object.
GList * unacked
List of FsProtoElems awaiting ACKs.
Definition: fsprotocol.h:111
Received a CONNSHUT packet.
Definition: fsprotocol.c:96
#define A_ACKME
0x40 - Ack this packet
Definition: fsprotocol.c:121
const NetAddr * endpoint
Who is our partner in this?
Definition: fsprotocol.h:100
struct _FsProtocol FsProtocol
Definition: fsprotocol.h:45
#define NEWSUBCLASS(Cclass, obj)
Definition: proj_classes.h:67
This is an SeqnoFrame class TLV (type, length, value) frame.
Definition: seqnoframe.h:43
FSTATIC void _fsprotocol_auditfspe(const FsProtoElem *, const char *function, int lineno)
Audit a FsProtoElem object for consistency */.
Definition: fsprotocol.c:474
int(* compare)(SeqnoFrame *self, SeqnoFrame *rhs)
Compare two SeqnoFrames: -1, 0, +1.
Definition: seqnoframe.h:51
Not a full-blown class - just a utility structure.
Definition: fsprotocol.h:76
FSTATIC gboolean _fsprotocol_iready(FsProtocol *)
Return TRUE if there are any packets available to read.
Definition: fsprotocol.c:927
#define FRAMESETTYPE_CONNSHUT
Shutting down this connection (can also come from CMA)
Definition: framesettypes.h:37
Timed out waiting for an ACK.
Definition: fsprotocol.c:97
#define FSPROTO_WINDOWSIZE
FsProtocol window size.
Definition: fsprotocol.h:135
Received a packet with sequence number 1 and a valid (new) session id.
Definition: fsprotocol.c:91
#define A_CLOSE
0x01 - set cleanup timer
Definition: fsprotocol.c:115
FSTATIC void _fsprotocol_ackmessage(FsProtocol *self, NetAddr *destaddr, FrameSet *fs)
Send an ACK packet that corresponds to this FrameSet.
Definition: fsprotocol.c:1271
This is an FsProtocol class object - implementing a reliable user-level FrameSet class delivery syste...
Definition: fsprotocol.h:107
Waiting on CONNSHUT and ACK.
Definition: fsprotocol.h:64
gboolean(* enq)(FsQueue *self, FrameSet *fs)
Enqueue an outgoing FrameSet - adding seqno ONLY for output queues.
Definition: fsqueue.h:54