OILS / pyext / fanos_test.py View on Github | oilshell.org

274 lines, 178 significant
1#!/usr/bin/env python2
2# coding=utf8
3# Copyright 2021 Andy Chu. All rights reserved.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9from __future__ import print_function
10"""
11fanos_test.py: Tests for fanos.c
12"""
13import errno
14import socket
15import sys
16import unittest
17
18from mycpp.mylib import log
19
20import fanos # module under test
21
22
def netstring_encode(s):
    """Encode a payload as a netstring: b'<length>:<payload>,'.

    Args:
      s: The payload.  Bytes are used as-is; text (unicode) is encoded as
        UTF-8 first, so the length prefix counts bytes, not characters.

    Returns:
      The encoded netstring as bytes.
    """
    # type(u'') is `unicode` on Python 2 and `str` on Python 3.
    if isinstance(s, type(u'')):
        s = s.encode('utf-8')
    return b'%d:%s,' % (len(s), s)
25
26
def netstring_recv(sock):
    """Plain decoder that IGNORES file descriptors.

    Using pure Python libs is a useful sanity check on the protocol.
    (Python 2 doesn't have recvmsg()).

    Args:
      sock: A connected socket object; only its recv() method is used.

    Returns:
      The payload bytes, without the length prefix or the trailing comma.

    Raises:
      RuntimeError: On EOF before the netstring is complete, on a non-digit
        byte in the length prefix, or when the payload is not followed
        by ','.
      ValueError: From int() when ':' arrives before any length digit.
    """
    len_buf = []
    while True:
        byte = sock.recv(1)
        #log('byte = %r', byte)

        if len(byte) == 0:
            raise RuntimeError('Expected a netstring length byte')

        if byte == b':':
            break

        if b'0' <= byte <= b'9':
            len_buf.append(byte)
        else:
            raise RuntimeError('Invalid netstring length byte %r' % byte)

    num_bytes = int(b''.join(len_buf))
    log('num_bytes = %d', num_bytes)

    # recv() may return fewer bytes than requested, so keep reading until
    # the whole payload has arrived.
    remaining = num_bytes
    chunks = []
    while remaining > 0:
        chunk = sock.recv(remaining)
        log("chunk %r", chunk)

        # An empty read means the peer closed the connection; without this
        # check the loop would never terminate.
        if len(chunk) == 0:
            raise RuntimeError(
                'Expected %d more netstring payload bytes' % remaining)

        chunks.append(chunk)
        remaining -= len(chunk)

    msg = b''.join(chunks)

    # The payload must be terminated by a single comma.
    byte = sock.recv(1)
    if byte != b',':
        raise RuntimeError('Expected ,')

    return msg
72
73
74class FanosTest(unittest.TestCase):
75
76 def testSend(self):
77 """Send with our fanos library; receive with Python stdlib."""
78
79 print('\n___ fanos.send ___')
80 left, right = socket.socketpair()
81 print(left)
82 print(right)
83
84 print(fanos.send(left.fileno(), b'foo'))
85 print(fanos.send(left.fileno(), b'https://www.oilshell.org/',
86 sys.stdin.fileno(), sys.stdout.fileno(), sys.stderr.fileno()))
87
88 msg = netstring_recv(right)
89 self.assertEqual('foo', msg)
90 msg = netstring_recv(right)
91 self.assertEqual('https://www.oilshell.org/', msg)
92
93 def testRecv(self):
94 """Send with Python; received our fanos library"""
95 print('\n___ fanos.recv ___')
96 left, right = socket.socketpair()
97
98 left.send(netstring_encode('spam'))
99
100 fd_out = []
101 msg = fanos.recv(right.fileno(), fd_out)
102 self.assertEqual('spam', msg)
103 print("msg = %r" % msg)
104 print('fd_out = %s' % fd_out)
105
106 left.send(netstring_encode('eggs-eggs-eggs'))
107
108 msg = fanos.recv(right.fileno(), fd_out)
109 self.assertEqual('eggs-eggs-eggs', msg)
110 print("py msg = %r" % msg)
111 print('fd_out = %s' % fd_out)
112
113 # Empty string
114 left.send(netstring_encode(''))
115
116 msg = fanos.recv(right.fileno(), fd_out)
117 self.assertEqual('', msg)
118 print("py msg = %r" % msg)
119 print('fd_out = %s' % fd_out)
120
121 def testIOErrors(self):
122 try:
123 fanos.send(99, b'foo')
124 except IOError as e:
125 print(e)
126 print(type(e))
127 self.assertEqual(errno.EBADF, e.errno)
128 else:
129 self.fail('Expected IOError')
130
131 try:
132 result = fanos.recv(99, [])
133 except IOError as e:
134 print(e)
135 print(type(e))
136 self.assertEqual(errno.EBADF, e.errno)
137 else:
138 self.fail('Expected IOError')
139
140 def testRecvErrors(self):
141 left, right = socket.socketpair()
142
143 # TODO: test invalid netstring cases
144 # Instead of RuntimeError they should be fanos.error?
145 # Instead of 'OK' you return
146 # 'fanos ERROR: Invalid netstring'
147
148 # This is OK
149 left.send(b'000000003:foo,')
150
151 fd_out = []
152 msg = fanos.recv(right.fileno(), fd_out)
153 print("msg = %r" % msg)
154 print('fd_out = %s' % fd_out)
155
156 # This is too long
157 left.send(b'0000000003:foo,')
158
159 try:
160 msg = fanos.recv(right.fileno(), fd_out)
161 except ValueError:
162 pass
163 else:
164 self.fail('Expected ValueError')
165
166 print("msg = %r" % msg)
167 print('fd_out = %s' % fd_out)
168
169 def testSendRecv(self):
170 """Send and receive with our fanos library"""
171 print('\n___ testSendReceive ___')
172
173 left, right = socket.socketpair()
174
175 print(fanos.send(left.fileno(), b'foo'))
176 print(fanos.send(left.fileno(), b'https://www.oilshell.org/',
177 sys.stdin.fileno(), sys.stdout.fileno(), sys.stderr.fileno()))
178
179 fd_out = []
180 msg = fanos.recv(right.fileno(), fd_out)
181 self.assertEqual('foo', msg)
182 self.assertEqual([], fd_out)
183 print("py msg = %r" % msg)
184 print('fd_out = %s' % fd_out)
185
186 del fd_out[:]
187 msg = fanos.recv(right.fileno(), fd_out)
188 self.assertEqual('https://www.oilshell.org/', msg)
189 self.assertEqual(3, len(fd_out))
190
191 print("py msg = %r" % msg)
192 print('fd_out = %s' % fd_out)
193
194 left.close()
195 msg = fanos.recv(right.fileno(), fd_out)
196 self.assertEqual(None, msg) # Valid EOF
197
198 right.close()
199
200
201class InvalidMessageTests(unittest.TestCase):
202 """COPIED from py_fanos_test.py."""
203
204 def testInvalidColon(self):
205 fd_out = []
206 left, right = socket.socketpair()
207
208 left.send(b':') # Should be 3:foo,
209 try:
210 msg = fanos.recv(right.fileno(), fd_out)
211 except ValueError as e:
212 print(type(e))
213 print(e)
214 else:
215 self.fail('Expected failure')
216
217 left.close()
218 right.close()
219
220 def testInvalidDigits(self):
221 fd_out = []
222 left, right = socket.socketpair()
223
224 left.send(b'34') # EOF in the middle of length
225 left.close()
226 try:
227 msg = fanos.recv(right.fileno(), fd_out)
228 except ValueError as e:
229 print(type(e))
230 print(e)
231 else:
232 self.fail('Expected failure')
233
234 right.close()
235
236 def testInvalidMissingColon(self):
237 fd_out = []
238 left, right = socket.socketpair()
239
240 left.send(b'34foo')
241 left.close()
242 try:
243 msg = fanos.recv(right.fileno(), fd_out)
244 except ValueError as e:
245 print(type(e))
246 print(e)
247 else:
248 self.fail('Expected failure')
249
250 right.close()
251
252 def testInvalidMissingComma(self):
253 fd_out = []
254 left, right = socket.socketpair()
255
256 # Short payload BLOCKS indefinitely?
257 #left.send(b'3:fo')
258
259 left.send(b'3:foo') # missing comma
260
261 left.close()
262 try:
263 msg = fanos.recv(right.fileno(), fd_out)
264 except ValueError as e:
265 print(type(e))
266 print(e)
267 else:
268 self.fail('Expected failure')
269
270 right.close()
271
272
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()