|
The Assimilation Monitoring Project
|
00001 # vim: smartindent tabstop=4 shiftwidth=4 expandtab 00002 _suites = ['all', 'cclass'] 00003 import sys 00004 sys.path.append("../cma") 00005 from testify import * 00006 00007 from frameinfo import * 00008 from AssimCclasses import * 00009 import gc 00010 import re 00011 00012 00013 CheckForDanglingClasses = True 00014 WorstDanglingCount = 0 00015 DEBUG=False 00016 00017 def assert_no_dangling_Cclasses(): 00018 global CheckForDanglingClasses 00019 global WorstDanglingCount 00020 gc.collect() 00021 count = proj_class_live_object_count() 00022 # Avoid cluttering the output up with redundant messages... 00023 if count > WorstDanglingCount and CheckForDanglingClasses: 00024 WorstDanglingCount = count 00025 proj_class_dump_live_objects() 00026 raise AssertionError, "Dangling C-class objects - %d still around" % count 00027 00028 class pyNetAddrTest(TestCase): 00029 "A pyNetAddr is a network address of some kind... - let's test it" 00030 def test_constructor(self): 00031 if DEBUG: print >>sys.stderr, "===============test_constructor(pyNetAddrTest)" 00032 if DEBUG: print >>sys.stderr, "===============test_constructor(pyNetAddrTest)" 00033 if DEBUG: print >>sys.stderr, "===============test_constructor(pyNetAddrTest)" 00034 if DEBUG: print >>sys.stderr, "===============test_constructor(pyNetAddrTest)" 00035 if DEBUG: print >>sys.stderr, "===============test_constructor(pyNetAddrTest)" 00036 ipv4 = pyNetAddr((1,2,3,4),) 00037 ipv4b = pyNetAddr((1,2,3,5),) 00038 mac48 = pyNetAddr((1,2,3,4,5,6),) 00039 mac64 = pyNetAddr( (1,2,3,4,5,6,7,8),) 00040 ipv6 = pyNetAddr((1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16),) 00041 self.assertEqual(str(ipv4), "1.2.3.4") 00042 self.assertEqual(str(ipv4b), "1.2.3.5") 00043 self.assertEqual(str(mac48), "01:02:03:04:05:06") 00044 self.assertEqual(str(mac64), "01:02:03:04:05:06:07:08") 00045 self.assertFalse(ipv4 != ipv4) 00046 self.assertTrue(ipv4 == ipv4) 00047 self.assertTrue(mac48 == mac48) 00048 self.assertTrue(mac64 == mac64) 00049 self.assertFalse(ipv4 == ipv4b) 00050 self.assertFalse(ipv4 == mac48) 00051 self.assertFalse(mac48 == ipv4) 00052 self.assertFalse(ipv4 == mac64) 00053 self.assertFalse(mac64 == ipv4) 00054 self.assertFalse(mac48 == mac64) 00055 self.assertFalse(mac64 == mac48) 00056 self.assertRaises(ValueError, pyNetAddr, (1,)) 00057 self.assertRaises(ValueError, pyNetAddr, (1,2,)) 00058 self.assertRaises(ValueError, pyNetAddr, (1,2,3)) 00059 self.assertRaises(ValueError, pyNetAddr, (1,2,3,4,5)) 00060 self.assertRaises(ValueError, pyNetAddr, (1,2,3,4,5,6,7)) 00061 self.assertRaises(ValueError, pyNetAddr, (1,2,3,4,5,6,7,8,9)) 00062 self.assertRaises(ValueError, pyNetAddr, (1,2,3,4,5,6,7,8,9,10)) 00063 self.assertRaises(ValueError, pyNetAddr, (1,2,3,4,5,6,7,8,9,10,11)) 00064 self.assertRaises(ValueError, pyNetAddr, (1,2,3,4,5,6,7,8,9,10,11,12)) 00065 self.assertRaises(ValueError, pyNetAddr, (1,2,3,4,5,6,7,8,9,10,11,12,13)) 00066 self.assertRaises(ValueError, pyNetAddr, (1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 00067 self.assertRaises(ValueError, pyNetAddr, (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15)) 00068 self.assertRaises(ValueError, pyNetAddr, (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17)) 00069 00070 def test_ipv6_str(self): 00071 'Test the str() function for ipv6 - worth a separate test.' 00072 if DEBUG: print >>sys.stderr, "===============test_ipv6_str(pyNetAddrTest)" 00073 ipv6 = pyNetAddr((0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0),) 00074 self.assertEqual(str(ipv6),"::") 00075 ipv6 = pyNetAddr((0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,01),) 00076 self.assertEqual(str(ipv6),"::1") 00077 ipv6 = pyNetAddr((0,0,0,0,0,0,0,0,0,0,0,0,0,0,01,02),) 00078 self.assertEqual(str(ipv6),"::102") 00079 ipv6 = pyNetAddr((0,0,0,1,0,2,0,3,0,4,0,5,0,6,0,7),) 00080 self.assertEqual(str(ipv6),"0:1:2:3:4:5:6:7") 00081 ipv6 = pyNetAddr((0,0,0,0,0,2,0,3,0,4,0,5,0,6,0,7),) 00082 self.assertEqual(str(ipv6),"::2:3:4:5:6:7") 00083 # Example below is from http://en.wikipedia.org/wiki/IPv6_address 00084 # Note that we now convert it into the equivalent IPv4 address as 00085 # suggested: ::ffff:192.0.2.128 00086 ipv6 = pyNetAddr((0,0,0,0,0,0,0,0,0,0,255,255,192,0,2,128),) 00087 self.assertEqual(str(ipv6),"::ffff:192.0.2.128") 00088 00089 @class_teardown 00090 def tearDown(self): 00091 assert_no_dangling_Cclasses() 00092 00093 class pyFrameTest(TestCase): 00094 '''Frames are our basic superclass for things we put on the wire. 00095 This base class just has a generic binary blob with no special 00096 properties. They are all valid (if they have a value)''' 00097 def test_constructor(self): 00098 if DEBUG: print >>sys.stderr, "===============test_constructor(pyFrameTest)" 00099 pyf = pyFrame(100) 00100 self.assertEqual(pyf.frametype(), 100) 00101 self.assertTrue(pyf.isvalid()) 00102 00103 def test_setvalue(self): 00104 if DEBUG: print >>sys.stderr, "===============test_setvalue(pyFrameTest)" 00105 pyf = pyFrame(101) 00106 pyf.setvalue('fred') 00107 self.assertTrue(pyf.isvalid(), "PyFrame('fred') failed isvalid())") 00108 self.assertEqual(pyf.framelen(), 5) 00109 self.assertEqual(pyf.dataspace(), 9) # Total space for this Frame on the wire 00110 self.assertEqual(string_at(pyf.framevalue()), 'fred') # Raw from 'C' 00111 00112 @class_teardown 00113 def tearDown(self): 00114 assert_no_dangling_Cclasses() 00115 00116 class pyAddrFrameTest(TestCase): 00117 'An AddrFrame wraps a NetAddr for sending on the wire' 00118 def test_constructor(self): 00119 if DEBUG: print >>sys.stderr, "===============test_constructor(pyAddrFrameTest)" 00120 pyf = pyAddrFrame(200, addrstring=(1,2,3,4)) 00121 self.assertEqual(pyf.frametype(), 200) 00122 self.assertEqual(pyf.framelen(), 6) 00123 self.assertEqual(str(pyf), 'pyAddrFrame(200, (1.2.3.4))') 00124 self.assertEqual(pyf.addrtype(), 1) 00125 self.assertTrue(pyf.isvalid(), "AddrFrame(200, (1,2,3,4)) failed isvalid()") 00126 self.assertRaises(ValueError, pyAddrFrame, 201, addrstring=(1,2,3)) 00127 00128 @class_teardown 00129 def tearDown(self): 00130 assert_no_dangling_Cclasses() 00131 00132 00133 class pyIpPortFrameTest(TestCase): 00134 'An AddrFrame wraps a NetAddr *with a port* for sending on the wire' 00135 def test_constructor(self): 00136 if DEBUG: print >>sys.stderr, "===============test_constructor(pyIpAddrFrameTest)" 00137 pyf = pyIpPortFrame(200, (1,2,3,4), 1984) 00138 self.assertEqual(pyf.frametype(), 200) 00139 self.assertEqual(pyf.framelen(), 8) 00140 self.assertEqual(str(pyf), 'pyIpPortFrame(200, (1.2.3.4:1984))') 00141 self.assertEqual(pyf.getnetaddr(), pyNetAddr('1.2.3.4:1984')) 00142 self.assertEqual(pyf.addrtype(), 1) 00143 self.assertTrue(pyf.isvalid(), "pyIpPortFrame(200, (1,2,3,4:1984)) failed isvalid()") 00144 self.assertRaises(ValueError, pyIpPortFrame, 201, (1,2,3),80) 00145 pyf = pyIpPortFrame(202, (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16), 1984) 00146 self.assertEqual(pyf.frametype(), 202) 00147 self.assertEqual(pyf.framelen(), 20) 00148 self.assertEqual(pyf.addrtype(), 2) 00149 self.assertTrue(pyf.isvalid(), 'pyIpPortFrame(202, ([102:304:506:708:90a:b0c:d0e:f10]:1984))') 00150 self.assertEqual(str(pyf), 'pyIpPortFrame(202, ([102:304:506:708:90a:b0c:d0e:f10]:1984))') 00151 sameaddr = pyNetAddr([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10], port=1984) 00152 self.assertEqual(pyf.getnetaddr(), sameaddr) 00153 pyf = pyIpPortFrame(202, sameaddr, None) 00154 self.assertEqual(str(pyf), 'pyIpPortFrame(202, ([102:304:506:708:90a:b0c:d0e:f10]:1984))') 00155 self.assertEqual(pyf.getnetaddr(), sameaddr) 00156 00157 00158 @class_teardown 00159 def tearDown(self): 00160 assert_no_dangling_Cclasses() 00161 00162 class pyIntFrameTest(TestCase): 00163 'An IntFrame wraps various sizes of unsigned integers for sending on the wire' 00164 def test_constructor(self): 00165 if DEBUG: print >>sys.stderr, "========================test_constructor(pyIntFrameTest)" 00166 # Test a variety of illegal/unsupported integer sizes 00167 for size in (5, 6, 7, 9, 10): 00168 self.assertRaises(ValueError, pyIntFrame, 300+size-1, intbytes=size) 00169 # Some network protocols have 3-byte integers. So I implemented them. 00170 for size in (1, 2, 3, 4, 8): 00171 pyf = pyIntFrame(310+size, initval=42, intbytes=size) 00172 self.assertTrue(pyf.isvalid()) 00173 self.assertEqual(pyf.intlength(), size) 00174 self.assertEqual(int(pyf), 42) 00175 self.assertEqual(str(pyf), 'pyIntFrame(%d, (42))' % (310+size)) 00176 00177 def test_set(self): 00178 'Test setting integer values for all the size integers' 00179 if DEBUG: print >>sys.stderr, "========================test_set(pyIntFrameTest)" 00180 for size in (1, 2, 3, 4, 8): 00181 pyf = pyIntFrame(320, initval=0, intbytes=size) 00182 val = 42 + size 00183 pyf.setint(val) 00184 self.assertEqual(int(pyf), val) 00185 self.assertEqual(str(pyf), ('pyIntFrame(320, (%d))' % val)) 00186 00187 00188 @class_teardown 00189 def tearDown(self): 00190 assert_no_dangling_Cclasses() 00191 00192 class pyUnknownFrameTest(TestCase): 00193 "An unknown frame is one we don't recognize the type of." 00194 def test_constructor(self): 00195 if DEBUG: print >>sys.stderr, "========================test_constructor(pyUnknownFrameTest)" 00196 pyf = pyUnknownFrame(400) 00197 self.assertEqual(pyf.frametype(), 400) 00198 # All Unknown frames are invalid... 00199 self.assertFalse(pyf.isvalid(), "pyUnkownFrame(400) should not have passed isvalid()") 00200 00201 @class_teardown 00202 def tearDown(self): 00203 assert_no_dangling_Cclasses() 00204 00205 00206 class pySeqnoFrameTest(TestCase): 00207 'A SeqnoFrame is a frame wrapping an ordered pair for a sequence number' 00208 def test_constructor(self): 00209 if DEBUG: print >>sys.stderr, "========================test_constructor(pySeqnoFrameTest)" 00210 pyf = pySeqnoFrame(500) 00211 self.assertEqual(pyf.frametype(), 500) 00212 self.assertTrue(pyf.isvalid(), 'pySeqnoFrame(500) did not pass isvalid()') 00213 pyf = pySeqnoFrame(501,(1,2)) 00214 self.assertEqual(pyf.frametype(), 501) 00215 self.assertTrue(pyf.isvalid(), 'pySeqnoFrame(501) did not pass isvalid()') 00216 00217 def test_reqid(self): 00218 'reqid is the request id of a sequence number' 00219 if DEBUG: print >>sys.stderr, "========================test_reqid(pySeqnoFrameTest)" 00220 pyf = pySeqnoFrame(502) 00221 pyf.setreqid(42) 00222 self.assertTrue(pyf.getreqid, 42) 00223 pyf.setreqid(43) 00224 self.assertTrue(pyf.getreqid, 43) 00225 00226 def test_qid(self): 00227 'qid is analogous to a port - it is the id of a queue on the other side' 00228 if DEBUG: print >>sys.stderr, "========================test_qid(pySeqnoFrameTest)" 00229 pyf = pySeqnoFrame(503) 00230 pyf.setqid(6) 00231 self.assertTrue(pyf.getqid, 6) 00232 pyf.setqid(7) 00233 self.assertTrue(pyf.getqid, 7) 00234 00235 def test_equal(self): 00236 'A bit of overkill, but nothing really wrong with it' 00237 if DEBUG: print >>sys.stderr, "========================test_equal(pySeqnoFrameTest)" 00238 seqFrame1 = pySeqnoFrame( 504, (1,1)) 00239 seqFrame1b = pySeqnoFrame(505, (1,1)) 00240 seqFrame2 = pySeqnoFrame( 506, (1,2)) 00241 seqFrame3 = pySeqnoFrame( 507, (2,1)) 00242 seqFrame4 = pySeqnoFrame( 508, (2,2)) 00243 seqFrame4b = pySeqnoFrame(509, (2,2)) 00244 self.assertTrue(seqFrame1 == seqFrame1) 00245 self.assertTrue(seqFrame1 == seqFrame1b) 00246 self.assertTrue(seqFrame1b == seqFrame1) 00247 self.assertFalse(seqFrame1 == seqFrame2) 00248 self.assertFalse(seqFrame1 == seqFrame3) 00249 self.assertFalse(seqFrame1 == seqFrame4) 00250 self.assertFalse(seqFrame1 == seqFrame4b) 00251 self.assertFalse(seqFrame2 == seqFrame1) 00252 self.assertFalse(seqFrame2 == seqFrame1b) 00253 self.assertFalse(seqFrame2 == seqFrame1) 00254 self.assertTrue (seqFrame2 == seqFrame2) 00255 self.assertFalse(seqFrame2 == seqFrame3) 00256 self.assertFalse(seqFrame2 == seqFrame4) 00257 self.assertFalse(seqFrame1 == seqFrame4b) 00258 self.assertFalse(seqFrame3 == seqFrame1) 00259 self.assertFalse(seqFrame3 == seqFrame1b) 00260 self.assertFalse(seqFrame3 == seqFrame1) 00261 self.assertFalse(seqFrame3 == seqFrame2) 00262 self.assertTrue (seqFrame3 == seqFrame3) 00263 self.assertFalse(seqFrame3 == seqFrame4) 00264 self.assertFalse(seqFrame3 == seqFrame4b) 00265 self.assertFalse(seqFrame4 == seqFrame1) 00266 self.assertFalse(seqFrame4 == seqFrame1b) 00267 self.assertFalse(seqFrame4 == seqFrame1) 00268 self.assertFalse(seqFrame4 == seqFrame2) 00269 self.assertFalse(seqFrame4 == seqFrame3) 00270 self.assertTrue (seqFrame4 == seqFrame4) 00271 self.assertTrue (seqFrame4 == seqFrame4b) 00272 self.assertTrue(seqFrame4b == seqFrame4) 00273 00274 @class_teardown 00275 def tearDown(self): 00276 assert_no_dangling_Cclasses() 00277 00278 00279 class pyCstringFrameTest(TestCase): 00280 '''A CstringFrame is a frame which can only hold NUL-terminated C strings. 00281 The last byte must be the one and only NUL character in a CstringFrame value.''' 00282 def test_constructor(self): 00283 if DEBUG: print >>sys.stderr, "========================test_constructor(pyCstringFrameTest)" 00284 pyf = pyCstringFrame(600, "Hello, World.") 00285 self.assertTrue(pyf.isvalid()) 00286 self.assertEqual(str(pyf), '600: CstringFrame(600, "Hello, World.")') 00287 pyf2 = pyCstringFrame(601) 00288 self.assertFalse(pyf2.isvalid()) 00289 pyf2.setvalue("42") 00290 self.assertTrue(pyf2.isvalid()) 00291 self.assertEqual(str(pyf2), '601: CstringFrame(601, "42")') 00292 00293 @class_teardown 00294 def tearDown(self): 00295 assert_no_dangling_Cclasses() 00296 00297 class pySignFrameTest(TestCase): 00298 'A SignFrame is a digital signature frame.' 00299 def test_constructor(self): 00300 if DEBUG: print >>sys.stderr, "========================test_constructor(pySignFrameTest)" 00301 pyf = pySignFrame(1) # the 1 determines the type of digital signature 00302 self.assertTrue(pyf.isvalid()) 00303 self.assertRaises(ValueError, pySignFrame, 935) # Just a random invalid signature type 00304 00305 @class_teardown 00306 def tearDown(self): 00307 assert_no_dangling_Cclasses() 00308 00309 00310 class pyFrameSetTest(TestCase): 00311 'A FrameSet is a collection of frames - typically to be sent over the wire' 00312 00313 @staticmethod 00314 def cmpstring(frame): 00315 s=str(frame) 00316 s = re.sub(' at 0x[^{}]*', ' at 0xsomewhere', s) 00317 return s 00318 00319 def test_constructor(self): 00320 if DEBUG: print >>sys.stderr, "========================test_constructor(pyFrameSetTest)" 00321 pyf = pyFrameSet(700) # The 700 is the frameset (message) type 00322 self.assertEqual(pyf.get_framesettype(), 700) 00323 00324 def test_flags(self): 00325 if DEBUG: print >>sys.stderr, "========================test_flags(pyFrameSetTest)" 00326 'Flags are bit masks, to be turned on or off. They are 16-bits only.' 00327 pyf = pyFrameSet(701) 00328 self.assertEqual(pyf.get_flags(), 0x00) 00329 pyf.set_flags(0x01) 00330 self.assertEqual(pyf.get_flags(), 0x01) 00331 pyf.set_flags(0x01) 00332 self.assertEqual(pyf.get_flags(), 0x01) 00333 pyf.set_flags(0x02) 00334 self.assertEqual(pyf.get_flags(), 0x03) 00335 pyf.clear_flags(0x01) 00336 self.assertEqual(pyf.get_flags(), 0x02) 00337 pyf.set_flags(0x0fffffffffffffffff) 00338 self.assertEqual(pyf.get_flags(), 0x0ffff) 00339 pyf.clear_flags(0x5555) 00340 self.assertEqual(pyf.get_flags(), 0x0AAAA) 00341 00342 def test_buildlistforward(self): 00343 'Build a FrameSet using append and verify that it gets built right' 00344 if DEBUG: print >>sys.stderr, "========================test_buildlistforward(pyFrameSetTest)" 00345 pyfs = pyFrameSet(702) 00346 sign = pySignFrame(1) # digital signature frame 00347 flist = (pyFrame(703), pyAddrFrame(704, (42,42,42,42)), pyIntFrame(705,42), 00348 pyCstringFrame(706, "HhGttG"), 00349 pySeqnoFrame(707, (42, 424242424242)), 00350 pyIpPortFrame(200, (1,2,3,4), 1984), 00351 pyIpPortFrame(202, (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16), 1984) 00352 ) 00353 for frame in flist: 00354 pyfs.append(frame) 00355 self.assertEqual(len(pyfs), 7) 00356 #pyfs.dump() 00357 ylist = [] 00358 # The iter member function is a generator. I love it. 00359 # It goes to a lot of trouble to wrap the underlying C Classes with Python classes. 00360 for frame in pyfs.iter(): 00361 ylist.append(frame) 00362 for i in range(0,len(ylist)): 00363 f=flist[i] 00364 y=ylist[i] 00365 # This isn't exhaustive, but it isn't bad. 00366 self.assertEqual(f.frametype(), y.frametype()) 00367 self.assertEqual(type(f), type(y)) 00368 # Constructing the packet will add a signature frame at the beginning 00369 # and an END (type 0) frame at the end 00370 pyfs.construct_packet(sign) 00371 # So we do it over again to make sure everything still looks OK 00372 ylist = [] 00373 for frame in pyfs.iter(): 00374 ylist.append(frame) 00375 self.assertEqual(len(pyfs), 9) 00376 self.assertEqual(len(ylist), len(pyfs)) # len(pyfs) traverses the linked list 00377 for i in range(0,len(flist)): 00378 f=flist[i] 00379 y=ylist[i+1] 00380 # This isn't exhaustive, but it isn't bad. 00381 self.assertEqual(f.frametype(), y.frametype()) 00382 self.assertEqual(type(f), type(y)) 00383 # Check on our automatically added frames. 00384 self.assertEqual(ylist[0].frametype(), 1) 00385 self.assertEqual(ylist[8].frametype(), 0) 00386 00387 def test_buildlistbackwards(self): 00388 '''Build a FrameSet using prepend and verify that it gets built right. 00389 Similar to the append testing above, but backwards ;-)''' 00390 if DEBUG: print >>sys.stderr, "========================test_buildlistbackwards(pyFrameSetTest)" 00391 pyfs = pyFrameSet(707) 00392 sign = pySignFrame(1) 00393 flist = (pyFrame(708), pyAddrFrame(709, (42,42,42,42)), pyIntFrame(710,42), pyCstringFrame(711, "HhGttG"), 00394 pySeqnoFrame(712, (42, 424242424242))) 00395 for frame in flist: 00396 pyfs.prepend(frame) 00397 self.assertEqual(len(pyfs), 5) 00398 #pyfs.dump() 00399 ylist = [] 00400 for frame in pyfs.iter(): 00401 ylist.append(frame) 00402 for i in range(0,len(flist)): 00403 f=flist[i] 00404 y=ylist[4-i] 00405 # This isn't exhaustive, but it isn't bad. 00406 self.assertEqual(f.frametype(), y.frametype()) 00407 self.assertEqual(type(f), type(y)) 00408 self.assertEqual(f.__class__, y.__class__) 00409 if DEBUG: print >>sys.stderr, "Classes are", f.__class__, "lens are", f.framelen(), y.framelen() 00410 self.assertEqual(f.framelen(), y.framelen()) 00411 00412 def test_buildpacket(self): 00413 'Build a FrameSet, then make it into a packet, and make a frameset list out of the packet' 00414 if DEBUG: print >>sys.stderr, "========================test_buildpacket(pyFrameSetTest)" 00415 pyfs = pyFrameSet(801) 00416 sign = pySignFrame(1) # digital signature frame 00417 flist = (pyAddrFrame(FrameTypes.IPADDR, (42,42,42,42)), pyIntFrame(FrameTypes.WALLCLOCK,42), pyCstringFrame(FrameTypes.INTERFACE, "HhGttG"), 00418 pyIntFrame(FrameTypes.CINTVAL,3000000, intbytes=4), 00419 pyIntFrame(FrameTypes.CINTVAL,3000000000000, intbytes=8), 00420 pySeqnoFrame(FrameTypes.REQID, (42, 424242424242)), 00421 pyIntFrame(FrameTypes.CINTVAL,4242, intbytes=3)) 00422 if DEBUG: print >>sys.stderr, "flist:", flist 00423 decoder = pyPacketDecoder(0) 00424 for frame in flist: 00425 pyfs.append(frame) 00426 pyfs.construct_packet(sign) 00427 if DEBUG: print >>sys.stderr, "packet constructed" 00428 xlist=[] 00429 for frame in pyfs.iter(): 00430 xlist.append(frame) 00431 if DEBUG: print >>sys.stderr, "xlist constructed" 00432 pktdata = pyfs.getpacket() 00433 if DEBUG: print >>sys.stderr, "getpacket done", pktdata 00434 cp_pyfs = decoder.fslist_from_pktdata(pktdata) 00435 if DEBUG: print >>sys.stderr, "decoder done", cp_pyfs 00436 fs0 = cp_pyfs[0] 00437 ylist=[] 00438 for frame in fs0.iter(): 00439 ylist.append(frame) 00440 for i in range(0,len(xlist)): 00441 x=xlist[i] 00442 y=ylist[i] 00443 self.assertEqual(x.frametype(), y.frametype()) 00444 self.assertEqual(x.framelen(), y.framelen()) 00445 self.assertEqual(x.dataspace(), y.dataspace()) 00446 self.assertEqual(type(x), type(y)) 00447 self.assertEqual(x.__class__, y.__class__) 00448 # Not all our classes have a __str__ method defined. 00449 strx = re.sub(str(x), ' instance at .*>', ' instance at -somewhere- >') 00450 stry = re.sub(str(y), ' instance at .*>', ' instance at -somewhere- >') 00451 self.assertEqual(strx, stry) 00452 self.assertEqual(pyFrameSetTest.cmpstring(x), pyFrameSetTest.cmpstring(y)) 00453 00454 00455 @class_teardown 00456 def tearDown(self): 00457 assert_no_dangling_Cclasses() 00458 00459 class pyConfigContextTest(TestCase): 00460 00461 def test_constructor(self): 00462 pyConfigContext() 00463 if DEBUG: print >>sys.stderr, "===============test_constructor(pyConfigContextTest)" 00464 foo = pyConfigContext(init={'int1': 42, 'str1': 'forty-two', 'bar': pyNetAddr((1,2,3,4),) }) 00465 foo = pyConfigContext(init={'int1': 42, 'str1': 'forty-two', 'bar': pyNetAddr((1,2,3,4),), 'csf': pyCstringFrame(42, '41+1')}) 00466 self.assertEqual(foo.getint('int1'), 42) 00467 self.assertEqual(foo.getstring('str1'), 'forty-two') 00468 self.assertRaises(IndexError, foo.getaddr, ('int1')) 00469 self.assertRaises(IndexError, foo.getstring, ('int1')) 00470 self.assertRaises(IndexError, foo.getaddr, ('str1')) 00471 self.assertRaises(IndexError, foo.getframe, ('str1')) 00472 self.assertRaises(IndexError, foo.getframe, ('int1')) 00473 self.assertEqual(foo['int1'], 42) 00474 self.assertEqual(foo['str1'], 'forty-two') 00475 self.assertEqual(foo.getint('fred'), -1) 00476 foo['bar'] 00477 self.assertEqual(foo['bar'], pyNetAddr((1,2,3,4),)) 00478 self.assertEqual(str(foo['bar']), '1.2.3.4') 00479 self.assertEqual(str(foo['csf']), '42: CstringFrame(42, "41+1")') 00480 self.assertEqual(str(foo), '{"str1":"forty-two","csf":"CstringFrame(42, \\"41+1\\")","int1":42,"bar":"1.2.3.4"}') 00481 00482 foo['isf'] = pyIntFrame(310, initval=42, intbytes=3) 00483 if DEBUG: print >>sys.stderr, "test_constructor.18(pyConfigContextTest)" 00484 self.assertEqual(str(foo), 00485 '{"isf":"IntFrame(310, 3, 42)","str1":"forty-two","csf":"CstringFrame(42, \\"41+1\\")","int1":42,"bar":"1.2.3.4"}') 00486 if DEBUG: print >>sys.stderr, "test_constructor.19(pyConfigContextTest)" 00487 00488 def test_string(self): 00489 if DEBUG: print >>sys.stderr, "test_string(pyConfigContextTest)" 00490 foo = pyConfigContext() 00491 foo['arthur'] = 'dent' 00492 foo['seven'] = 'ofnine' 00493 foo['JeanLuc'] = 'Picard' 00494 foo['important'] = 'towel' 00495 foo['integer'] = 42 00496 self.assertEqual(foo['arthur'], 'dent') 00497 self.assertEqual(foo['seven'], 'ofnine') 00498 self.assertEqual(foo['JeanLuc'], 'Picard') 00499 self.assertEqual(foo['important'], 'towel') 00500 self.assertRaises(IndexError, foo.getstring, ('towel')) 00501 self.assertEqual(str(foo), '{"arthur":"dent","JeanLuc":"Picard","important":"towel","integer":42,"seven":"ofnine"}') 00502 foo['seven'] = '7' 00503 self.assertEqual(foo['seven'], '7') 00504 self.assertEqual(type(foo['seven']), str) 00505 foo['JeanLuc'] = 'Locutus' 00506 self.assertEqual(foo['JeanLuc'], 'Locutus') 00507 self.assertEqual(str(foo), '{"arthur":"dent","JeanLuc":"Locutus","important":"towel","integer":42,"seven":"7"}') 00508 00509 def test_int(self): 00510 if DEBUG: print >>sys.stderr, "test_int(pyConfigContextTest)" 00511 foo = pyConfigContext() 00512 foo['arthur'] = 42 00513 foo['seven'] = 9 00514 self.assertEqual(foo['arthur'], 42) 00515 self.assertEqual(foo['seven'], 9) 00516 self.assertEqual(str(foo), '{"arthur":42,"seven":9}') 00517 foo['seven'] = 7 00518 self.assertEqual(type(foo['seven']), int) 00519 self.assertEqual(foo["seven"], 7) 00520 self.assertEqual(str(foo), '{"arthur":42,"seven":7}') 00521 00522 def test_child_ConfigContext(self): 00523 if DEBUG: print >>sys.stderr, "========================test_child_ConfigContext(pyConfigContextTest)" 00524 foo = pyConfigContext() 00525 foo['ford'] = 'prefect' 00526 baz = pyConfigContext() 00527 baz['Kathryn'] = 'Janeway' 00528 baz['there\'s no place like'] = pyNetAddr((127,0,0,1),) 00529 bar = pyConfigContext() 00530 bar['hhgttg'] = foo 00531 bar['voyager'] = baz 00532 if DEBUG: print >>sys.stderr, "EQUAL TEST" 00533 self.assertEqual(str(bar), '{"hhgttg":{"ford":"prefect"},"voyager":{"there\'s no place like":"127.0.0.1","Kathryn":"Janeway"}}') 00534 # We make a new pyConfigContext object from the str() of another one. Cool! 00535 if DEBUG: print >>sys.stderr, "JSON TEST" 00536 bar2 = pyConfigContext(str(bar)) 00537 if DEBUG: print >>sys.stderr, "JSON COMPARE" 00538 self.assertEqual(str(bar), str(bar2)) 00539 self.assertEqual(bar["voyager"]["Kathryn"], "Janeway") 00540 self.assertEqual(bar["hhgttg"]["ford"], "prefect") 00541 self.assertEqual(bar2["voyager"]["Kathryn"], "Janeway") 00542 self.assertEqual(bar2["hhgttg"]["ford"], "prefect") 00543 if DEBUG: print >>sys.stderr, "COMPARE DONE" 00544 # However... The pyNetAddr() was turned into a mere string :-( - at least for the moment... Sigh... 00545 if DEBUG: print >>sys.stderr, "END OF ========================test_child_ConfigContext(pyConfigContextTest)" 00546 00547 00548 def test_keys(self): 00549 if DEBUG: print >>sys.stderr, "===============test_keys(pyConfigContextTest)" 00550 foo = pyConfigContext() 00551 foo['arthur'] = 'dent' 00552 foo['seven'] = 'ofnine' 00553 foo['JeanLuc'] = 'Picard' 00554 foo['important'] = 'towel' 00555 foo['integer'] = 42 00556 self.assertEqual(str(foo.keys()), "['JeanLuc', 'arthur', 'important', 'integer', 'seven']") 00557 00558 def test_ConfigContext_array(self): 00559 array1str = '{"a":[1,2,3,4,"a",{"b":true},[5,6,7,8,3.14]]}' 00560 array1config = pyConfigContext(array1str) 00561 self.assertEqual(array1str, str(array1config)) 00562 00563 00564 @class_teardown 00565 def tearDown(self): 00566 assert_no_dangling_Cclasses() 00567 00568 class pyNetIOudpTest(TestCase): 00569 00570 def test_constructor(self): 00571 if DEBUG: print >>sys.stderr, "========================test_constructor(pyNetIOudpTest)" 00572 config = pyConfigContext(init={'outsig': pySignFrame(1)}) 00573 io = pyNetIOudp(config, pyPacketDecoder(0)) 00574 self.assertTrue(io.getfd() > 2) 00575 00576 def test_members(self): 00577 if DEBUG: print >>sys.stderr, "========================test_members(pyNetIOudpTest)" 00578 io = pyNetIOudp(pyConfigContext(init={'outsig': pySignFrame(1)}), pyPacketDecoder(0)) 00579 self.assertTrue(io.getmaxpktsize() > 65000) 00580 self.assertTrue(io.getmaxpktsize() < 65535) 00581 io.setmaxpktsize(1500) 00582 self.assertEqual(io.getmaxpktsize(), 1500) 00583 # Does signframe really work? Next statement seems to crash things 00584 #self.assertEqual(type(io.signframe()), type(pySignFrame(1))) 00585 00586 def test_send(self): 00587 if DEBUG: print >>sys.stderr, "========================test_send(pyNetIOudpTest)" 00588 home = pyNetAddr((127,0,0,1),1984) 00589 fs = pyFrameSet(801) 00590 flist = (pyAddrFrame(FrameTypes.IPADDR, (42,42,42,42)), pyIntFrame(FrameTypes.HBWARNTIME,42), pyCstringFrame(FrameTypes.HOSTNAME, "HhGttG"), 00591 pyIntFrame(FrameTypes.PORTNUM,3000000, intbytes=4), 00592 pyIntFrame(FrameTypes.HBINTERVAL,3000000000000, intbytes=8), 00593 pySeqnoFrame(FrameTypes.REPLYID, (42, 424242424242)), 00594 pyIntFrame(FrameTypes.CINTVAL,4242, intbytes=3)) 00595 for frame in flist: 00596 fs.append(frame) 00597 io = pyNetIOudp(pyConfigContext(init={'outsig': pySignFrame(1)}), pyPacketDecoder(0)) 00598 io.sendframesets(home, fs) 00599 io.sendframesets(home, (fs,fs,fs)) 00600 00601 def test_receiveandsend(self): 00602 if DEBUG: print >>sys.stderr, "========================test_receiveandsend(pyNetIOudpTest)" 00603 home = pyNetAddr((127,0,0,1),1984) 00604 anyaddr = pyNetAddr((0,0,0,0),1984) 00605 fs = pyFrameSet(801) 00606 #flist = (pyIntFrame(7,42), pyCstringFrame(8, "HhGttG"), 00607 flist = (pyAddrFrame(FrameTypes.IPADDR, (42,42,42,42)), pyIntFrame(7,42), pyCstringFrame(8, "HhGttG"), 00608 pyAddrFrame(FrameTypes.IPADDR,(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16)), 00609 pyIntFrame(FrameTypes.HBINTERVAL,3000000, intbytes=4), 00610 pyIntFrame(FrameTypes.HBDEADTIME,3000000000000, intbytes=8), 00611 pySeqnoFrame(FrameTypes.REPLYID, (42, 424242424242)), 00612 pyIntFrame(FrameTypes.PORTNUM,4242, intbytes=3)) 00613 for frame in flist: 00614 fs.append(frame) 00615 io = pyNetIOudp(pyConfigContext(init={'outsig': pySignFrame(1)}), pyPacketDecoder(0)) 00616 io.bindaddr(anyaddr) 00617 io.sendframesets(home, fs) # Send a packet with a single frameset containing a bunch of frames 00618 (addr, framesetlist) = io.recvframesets() # Receive a packet - with some framesets in it 00619 #print >>sys.stderr, 'ADDR: [%s] HOME: [%s]' % (addr, home) 00620 self.assertEqual(addr, home) 00621 self.assertEqual(len(framesetlist), 1) 00622 ylist = [] 00623 for frame in framesetlist[0].iter(): 00624 ylist.append(frame) 00625 self.assertEqual(len(flist), len(ylist)-2) 00626 for i in range(0,len(flist)): 00627 x=flist[i] 00628 y=ylist[i+1] 00629 self.assertEqual(x.frametype(), y.frametype()) 00630 self.assertEqual(x.framelen(), y.framelen()) 00631 self.assertEqual(x.dataspace(), y.dataspace()) 00632 self.assertEqual(type(x), type(y)) 00633 self.assertEqual(x.__class__, y.__class__) 00634 self.assertEqual(pyFrameSetTest.cmpstring(x), pyFrameSetTest.cmpstring(y)) 00635 00636 @class_teardown 00637 def tearDown(self): 00638 assert_no_dangling_Cclasses() 00639 00640 if __name__ == "__main__": 00641 run()