1 | #!/usr/bin/env python2
|
2 | # coding=utf8
|
3 | # Copyright 2021 Andy Chu. All rights reserved.
|
4 | # Licensed under the Apache License, Version 2.0 (the "License");
|
5 | # you may not use this file except in compliance with the License.
|
6 | # You may obtain a copy of the License at
|
7 | #
|
8 | # http://www.apache.org/licenses/LICENSE-2.0
|
9 | from __future__ import print_function
|
10 | """
|
11 | fanos_test.py: Tests for fanos.c
|
12 | """
|
13 | import errno
|
14 | import socket
|
15 | import sys
|
16 | import unittest
|
17 |
|
18 | from mycpp.mylib import log
|
19 |
|
20 | import fanos # module under test
|
21 |
|
22 |
|
def netstring_encode(s):
    """Frame the byte string `s` as a netstring: <decimal length>:<payload>,"""
    payload_len = len(s)
    framed = b'%d:%s,' % (payload_len, s)
    return framed
|
25 |
|
26 |
|
def netstring_recv(sock):
    """Plain decoder that IGNORES file descriptors.

    Using pure Python libs is a useful sanity check on the protocol.
    (Python 2 doesn't have recvmsg()).

    Reads exactly one netstring ("<digits>:<payload>,") from `sock` using
    only sock.recv(), and returns the payload bytes.

    Raises:
      RuntimeError: on EOF while reading the length or the payload, on a
        non-digit byte in the length, or when the trailing comma is missing.
      ValueError: from int() when ':' arrives with no digits before it.
    """
    # Read the decimal length one byte at a time, up to the ':' separator.
    len_buf = []
    while True:
        byte = sock.recv(1)
        #log('byte = %r', byte)

        if len(byte) == 0:
            raise RuntimeError('Expected a netstring length byte')

        if byte == b':':
            break

        if b'0' <= byte <= b'9':
            len_buf.append(byte)
        else:
            raise RuntimeError('Invalid netstring length byte %r' % byte)

    num_bytes = int(b''.join(len_buf))
    log('num_bytes = %d', num_bytes)

    # Read the payload.  recv() may return fewer bytes than requested, so
    # keep going until the whole payload has arrived.
    remaining = num_bytes
    chunks = []
    while remaining > 0:
        chunk = sock.recv(remaining)
        log("chunk %r", chunk)

        # An empty read means the peer closed the connection; without this
        # check the loop would never terminate.
        if len(chunk) == 0:
            raise RuntimeError(
                'Expected %d more netstring payload bytes' % remaining)

        chunks.append(chunk)
        remaining -= len(chunk)

    msg = b''.join(chunks)

    # The payload must be terminated by a comma (EOF here also fails).
    byte = sock.recv(1)
    if byte != b',':
        raise RuntimeError('Expected ,')

    return msg
|
72 |
|
73 |
|
class FanosTest(unittest.TestCase):
    """Round-trip tests for fanos.send() / fanos.recv() over a socket pair."""

    def _socketpair(self):
        """Return a connected (left, right) pair that is closed after the test.

        Cleanups run even when an assertion fails, so no test leaks sockets.
        Closing an already-closed socket is harmless, which lets a test close
        one end early on purpose.
        """
        left, right = socket.socketpair()
        self.addCleanup(right.close)
        self.addCleanup(left.close)
        return left, right

    def testSend(self):
        """Send with our fanos library; receive with Python stdlib."""

        print('\n___ fanos.send ___')
        left, right = self._socketpair()
        print(left)
        print(right)

        print(fanos.send(left.fileno(), b'foo'))
        # The second message also passes three descriptors; the pure-Python
        # decoder ignores them and only checks the payload.
        print(fanos.send(left.fileno(), b'https://www.oilshell.org/',
                         sys.stdin.fileno(), sys.stdout.fileno(),
                         sys.stderr.fileno()))

        msg = netstring_recv(right)
        self.assertEqual('foo', msg)
        msg = netstring_recv(right)
        self.assertEqual('https://www.oilshell.org/', msg)

    def testRecv(self):
        """Send with Python; receive with our fanos library."""
        print('\n___ fanos.recv ___')
        left, right = self._socketpair()

        left.send(netstring_encode('spam'))

        fd_out = []
        msg = fanos.recv(right.fileno(), fd_out)
        self.assertEqual('spam', msg)
        print("msg = %r" % msg)
        print('fd_out = %s' % fd_out)

        left.send(netstring_encode('eggs-eggs-eggs'))

        msg = fanos.recv(right.fileno(), fd_out)
        self.assertEqual('eggs-eggs-eggs', msg)
        print("py msg = %r" % msg)
        print('fd_out = %s' % fd_out)

        # Empty string
        left.send(netstring_encode(''))

        msg = fanos.recv(right.fileno(), fd_out)
        self.assertEqual('', msg)
        print("py msg = %r" % msg)
        print('fd_out = %s' % fd_out)

    def testIOErrors(self):
        """send() and recv() on descriptor 99 should raise IOError(EBADF)."""
        # NOTE(review): assumes fd 99 is not open in the test process.
        try:
            fanos.send(99, b'foo')
        except IOError as e:
            print(e)
            print(type(e))
            self.assertEqual(errno.EBADF, e.errno)
        else:
            self.fail('Expected IOError')

        try:
            fanos.recv(99, [])
        except IOError as e:
            print(e)
            print(type(e))
            self.assertEqual(errno.EBADF, e.errno)
        else:
            self.fail('Expected IOError')

    def testRecvErrors(self):
        """A nine-digit length prefix is accepted; a ten-digit one is not."""
        left, right = self._socketpair()

        # TODO: test invalid netstring cases
        # Instead of RuntimeError they should be fanos.error?
        # Instead of 'OK' you return
        # 'fanos ERROR: Invalid netstring'

        # This is OK
        left.send(b'000000003:foo,')

        fd_out = []
        msg = fanos.recv(right.fileno(), fd_out)
        self.assertEqual('foo', msg)
        print("msg = %r" % msg)
        print('fd_out = %s' % fd_out)

        # This is too long
        left.send(b'0000000003:foo,')

        try:
            fanos.recv(right.fileno(), fd_out)
        except ValueError:
            pass
        else:
            self.fail('Expected ValueError')

        print('fd_out = %s' % fd_out)

    def testSendRecv(self):
        """Send and receive with our fanos library"""
        print('\n___ testSendReceive ___')

        left, right = self._socketpair()

        print(fanos.send(left.fileno(), b'foo'))
        print(fanos.send(left.fileno(), b'https://www.oilshell.org/',
                         sys.stdin.fileno(), sys.stdout.fileno(),
                         sys.stderr.fileno()))

        fd_out = []
        msg = fanos.recv(right.fileno(), fd_out)
        self.assertEqual('foo', msg)
        self.assertEqual([], fd_out)
        print("py msg = %r" % msg)
        print('fd_out = %s' % fd_out)

        # Reuse the same list object for the next message.
        del fd_out[:]
        msg = fanos.recv(right.fileno(), fd_out)
        self.assertEqual('https://www.oilshell.org/', msg)
        self.assertEqual(3, len(fd_out))
        # NOTE(review): the three entries in fd_out are presumably new
        # descriptors owned by this process and are never closed -- confirm.

        print("py msg = %r" % msg)
        print('fd_out = %s' % fd_out)

        # Closing the sender makes the next recv() see EOF.
        left.close()
        msg = fanos.recv(right.fileno(), fd_out)
        self.assertEqual(None, msg)  # Valid EOF
|
199 |
|
200 |
|
class InvalidMessageTests(unittest.TestCase):
    """COPIED from py_fanos_test.py.

    Each test writes a malformed netstring to one end of a socket pair and
    expects fanos.recv() on the other end to raise ValueError.
    """

    def _assertRecvRejects(self, data, close_sender_first=True):
        """Send `data` and require fanos.recv() to raise ValueError.

        Args:
          data: raw bytes written to the sending socket.
          close_sender_first: close the sender before calling recv(), so the
            receiver sees EOF right after `data`.

        Both sockets are registered for cleanup, so they are closed even
        when the expectation fails.
        """
        left, right = socket.socketpair()
        self.addCleanup(right.close)
        self.addCleanup(left.close)

        left.send(data)
        if close_sender_first:
            left.close()

        fd_out = []
        try:
            fanos.recv(right.fileno(), fd_out)
        except ValueError as e:
            print(type(e))
            print(e)
        else:
            self.fail('Expected failure')

    def testInvalidColon(self):
        # Should be 3:foo,
        # The sender stays open while recv() runs in this case.
        self._assertRecvRejects(b':', close_sender_first=False)

    def testInvalidDigits(self):
        # EOF in the middle of length
        self._assertRecvRejects(b'34')

    def testInvalidMissingColon(self):
        self._assertRecvRejects(b'34foo')

    def testInvalidMissingComma(self):
        # Short payload BLOCKS indefinitely?
        #   left.send(b'3:fo')

        # missing comma
        self._assertRecvRejects(b'3:foo')
|
271 |
|
272 |
|
# Run every TestCase in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
|