OILS / builtin / func_misc.py View on Github | oilshell.org

483 lines, 286 significant
1#!/usr/bin/env python2
2"""
3func_misc.py
4"""
5from __future__ import print_function
6
7from _devbuild.gen.runtime_asdl import (scope_e)
8from _devbuild.gen.value_asdl import (value, value_e, value_t, value_str)
9
10from core import error
11from core import num
12from core import state
13from core import ui
14from core import vm
15from data_lang import j8
16from frontend import match
17from frontend import typed_args
18from mycpp import mops
19from mycpp import mylib
20from mycpp.mylib import NewDict, iteritems, log, tagswitch
21from ysh import expr_eval
22from ysh import val_ops
23
24from typing import TYPE_CHECKING, Dict, List, cast
25if TYPE_CHECKING:
26 from osh import glob_
27 from osh import split
28
29_ = log
30
31
32class Len(vm._Callable):
33
34 def __init__(self):
35 # type: () -> None
36 pass
37
38 def Call(self, rd):
39 # type: (typed_args.Reader) -> value_t
40
41 x = rd.PosValue()
42 rd.Done()
43
44 UP_x = x
45 with tagswitch(x) as case:
46 if case(value_e.List):
47 x = cast(value.List, UP_x)
48 return num.ToBig(len(x.items))
49
50 elif case(value_e.Dict):
51 x = cast(value.Dict, UP_x)
52 return num.ToBig(len(x.d))
53
54 elif case(value_e.Str):
55 x = cast(value.Str, UP_x)
56 return num.ToBig(len(x.s))
57
58 raise error.TypeErr(x, 'len() expected Str, List, or Dict',
59 rd.BlamePos())
60
61
62class Join(vm._Callable):
63 """Both free function join() and List->join() method."""
64
65 def __init__(self):
66 # type: () -> None
67 pass
68
69 def Call(self, rd):
70 # type: (typed_args.Reader) -> value_t
71
72 li = rd.PosList()
73 delim = rd.OptionalStr(default_='')
74 rd.Done()
75
76 strs = [] # type: List[str]
77 for i, el in enumerate(li):
78 strs.append(val_ops.Stringify(el, rd.LeftParenToken()))
79
80 return value.Str(delim.join(strs))
81
82
83class Maybe(vm._Callable):
84
85 def __init__(self):
86 # type: () -> None
87 pass
88
89 def Call(self, rd):
90 # type: (typed_args.Reader) -> value_t
91
92 val = rd.PosValue()
93 rd.Done()
94
95 if val == value.Null:
96 return value.List([])
97
98 s = val_ops.ToStr(
99 val, 'maybe() expected Str, but got %s' % value_str(val.tag()),
100 rd.LeftParenToken())
101 if len(s):
102 return value.List([val]) # use val to avoid needlessly copy
103
104 return value.List([])
105
106
107class Type(vm._Callable):
108
109 def __init__(self):
110 # type: () -> None
111 pass
112
113 def Call(self, rd):
114 # type: (typed_args.Reader) -> value_t
115
116 val = rd.PosValue()
117 rd.Done()
118
119 return value.Str(ui.ValType(val))
120
121
122class Bool(vm._Callable):
123
124 def __init__(self):
125 # type: () -> None
126 pass
127
128 def Call(self, rd):
129 # type: (typed_args.Reader) -> value_t
130
131 val = rd.PosValue()
132 rd.Done()
133
134 return value.Bool(val_ops.ToBool(val))
135
136
137class Int(vm._Callable):
138
139 def __init__(self):
140 # type: () -> None
141 pass
142
143 def Call(self, rd):
144 # type: (typed_args.Reader) -> value_t
145
146 val = rd.PosValue()
147 rd.Done()
148
149 UP_val = val
150 with tagswitch(val) as case:
151 if case(value_e.Int):
152 return val
153
154 elif case(value_e.Bool):
155 val = cast(value.Bool, UP_val)
156 return value.Int(mops.FromBool(val.b))
157
158 elif case(value_e.Float):
159 val = cast(value.Float, UP_val)
160 return value.Int(mops.FromFloat(val.f))
161
162 elif case(value_e.Str):
163 val = cast(value.Str, UP_val)
164 if not match.LooksLikeInteger(val.s):
165 raise error.Expr('Cannot convert %s to Int' % val.s,
166 rd.BlamePos())
167
168 return value.Int(mops.FromStr(val.s))
169
170 raise error.TypeErr(val, 'int() expected Bool, Int, Float, or Str',
171 rd.BlamePos())
172
173
174class Float(vm._Callable):
175
176 def __init__(self):
177 # type: () -> None
178 pass
179
180 def Call(self, rd):
181 # type: (typed_args.Reader) -> value_t
182
183 val = rd.PosValue()
184 rd.Done()
185
186 UP_val = val
187 with tagswitch(val) as case:
188 if case(value_e.Int):
189 val = cast(value.Int, UP_val)
190 return value.Float(mops.ToFloat(val.i))
191
192 elif case(value_e.Float):
193 return val
194
195 elif case(value_e.Str):
196 val = cast(value.Str, UP_val)
197 if not match.LooksLikeFloat(val.s):
198 raise error.Expr('Cannot convert %s to Float' % val.s,
199 rd.BlamePos())
200
201 return value.Float(float(val.s))
202
203 raise error.TypeErr(val, 'float() expected Int, Float, or Str',
204 rd.BlamePos())
205
206
207class Str_(vm._Callable):
208
209 def __init__(self):
210 # type: () -> None
211 pass
212
213 def Call(self, rd):
214 # type: (typed_args.Reader) -> value_t
215
216 val = rd.PosValue()
217 rd.Done()
218
219 # TODO: Should we call Stringify here? That would handle Eggex.
220
221 UP_val = val
222 with tagswitch(val) as case:
223 if case(value_e.Int):
224 val = cast(value.Int, UP_val)
225 return value.Str(mops.ToStr(val.i))
226
227 elif case(value_e.Float):
228 val = cast(value.Float, UP_val)
229 return value.Str(str(val.f))
230
231 elif case(value_e.Str):
232 return val
233
234 raise error.TypeErr(val, 'str() expected Str, Int, or Float',
235 rd.BlamePos())
236
237
238class List_(vm._Callable):
239
240 def __init__(self):
241 # type: () -> None
242 pass
243
244 def Call(self, rd):
245 # type: (typed_args.Reader) -> value_t
246
247 val = rd.PosValue()
248 rd.Done()
249
250 l = [] # type: List[value_t]
251 it = None # type: val_ops._ContainerIter
252 UP_val = val
253 with tagswitch(val) as case:
254 if case(value_e.List):
255 val = cast(value.List, UP_val)
256 it = val_ops.ListIterator(val)
257
258 elif case(value_e.Dict):
259 val = cast(value.Dict, UP_val)
260 it = val_ops.DictIterator(val)
261
262 elif case(value_e.Range):
263 val = cast(value.Range, UP_val)
264 it = val_ops.RangeIterator(val)
265
266 else:
267 raise error.TypeErr(val,
268 'list() expected Dict, List, or Range',
269 rd.BlamePos())
270
271 assert it is not None
272 while not it.Done():
273 l.append(it.FirstValue())
274 it.Next()
275
276 return value.List(l)
277
278
279class Dict_(vm._Callable):
280
281 def __init__(self):
282 # type: () -> None
283 pass
284
285 def Call(self, rd):
286 # type: (typed_args.Reader) -> value_t
287
288 val = rd.PosValue()
289 rd.Done()
290
291 UP_val = val
292 with tagswitch(val) as case:
293 if case(value_e.Dict):
294 d = NewDict() # type: Dict[str, value_t]
295 val = cast(value.Dict, UP_val)
296 for k, v in iteritems(val.d):
297 d[k] = v
298
299 return value.Dict(d)
300
301 elif case(value_e.BashAssoc):
302 d = NewDict()
303 val = cast(value.BashAssoc, UP_val)
304 for k, s in iteritems(val.d):
305 d[k] = value.Str(s)
306
307 return value.Dict(d)
308
309 raise error.TypeErr(val, 'dict() expected Dict or BashAssoc',
310 rd.BlamePos())
311
312
313class Split(vm._Callable):
314
315 def __init__(self, splitter):
316 # type: (split.SplitContext) -> None
317 vm._Callable.__init__(self)
318 self.splitter = splitter
319
320 def Call(self, rd):
321 # type: (typed_args.Reader) -> value_t
322 s = rd.PosStr()
323
324 ifs = rd.OptionalStr()
325
326 rd.Done()
327
328 l = [
329 value.Str(elem)
330 for elem in self.splitter.SplitForWordEval(s, ifs=ifs)
331 ] # type: List[value_t]
332 return value.List(l)
333
334
335class Glob(vm._Callable):
336
337 def __init__(self, globber):
338 # type: (glob_.Globber) -> None
339 vm._Callable.__init__(self)
340 self.globber = globber
341
342 def Call(self, rd):
343 # type: (typed_args.Reader) -> value_t
344 s = rd.PosStr()
345 rd.Done()
346
347 out = [] # type: List[str]
348 self.globber._Glob(s, out)
349
350 l = [value.Str(elem) for elem in out] # type: List[value_t]
351 return value.List(l)
352
353
354class Shvar_get(vm._Callable):
355 """Look up with dynamic scope."""
356
357 def __init__(self, mem):
358 # type: (state.Mem) -> None
359 vm._Callable.__init__(self)
360 self.mem = mem
361
362 def Call(self, rd):
363 # type: (typed_args.Reader) -> value_t
364 name = rd.PosStr()
365 rd.Done()
366 return state.DynamicGetVar(self.mem, name, scope_e.Dynamic)
367
368
369class GetVar(vm._Callable):
370 """Look up normal scoping rules."""
371
372 def __init__(self, mem):
373 # type: (state.Mem) -> None
374 vm._Callable.__init__(self)
375 self.mem = mem
376
377 def Call(self, rd):
378 # type: (typed_args.Reader) -> value_t
379 name = rd.PosStr()
380 rd.Done()
381 return state.DynamicGetVar(self.mem, name, scope_e.LocalOrGlobal)
382
383
384class Assert(vm._Callable):
385
386 def __init__(self):
387 # type: () -> None
388 pass
389
390 def Call(self, rd):
391 # type: (typed_args.Reader) -> value_t
392
393 val = rd.PosValue()
394
395 msg = rd.OptionalStr(default_='')
396
397 rd.Done()
398
399 if not val_ops.ToBool(val):
400 raise error.AssertionErr(msg, rd.LeftParenToken())
401
402 return value.Null
403
404
405class EvalExpr(vm._Callable):
406
407 def __init__(self, expr_ev):
408 # type: (expr_eval.ExprEvaluator) -> None
409 self.expr_ev = expr_ev
410
411 def Call(self, rd):
412 # type: (typed_args.Reader) -> value_t
413 lazy = rd.PosExpr()
414 rd.Done()
415
416 result = self.expr_ev.EvalExpr(lazy, rd.LeftParenToken())
417
418 return result
419
420
421class ToJson8(vm._Callable):
422
423 def __init__(self, is_j8):
424 # type: (bool) -> None
425 self.is_j8 = is_j8
426
427 def Call(self, rd):
428 # type: (typed_args.Reader) -> value_t
429
430 val = rd.PosValue()
431 space = mops.BigTruncate(rd.NamedInt('space', 0))
432 rd.Done()
433
434 # Convert from external JS-like API to internal API.
435 if space <= 0:
436 indent = -1
437 else:
438 indent = space
439
440 buf = mylib.BufWriter()
441 try:
442 if self.is_j8:
443 j8.PrintMessage(val, buf, indent)
444 else:
445 j8.PrintJsonMessage(val, buf, indent)
446 except error.Encode as e:
447 # status code 4 is special, for encode/decode errors.
448 raise error.Structured(4, e.Message(), rd.LeftParenToken())
449
450 return value.Str(buf.getvalue())
451
452
453class FromJson8(vm._Callable):
454
455 def __init__(self, is_j8):
456 # type: (bool) -> None
457 self.is_j8 = is_j8
458
459 def Call(self, rd):
460 # type: (typed_args.Reader) -> value_t
461
462 s = rd.PosStr()
463 rd.Done()
464
465 p = j8.Parser(s, self.is_j8)
466 try:
467 val = p.ParseValue()
468 except error.Decode as e:
469 # Right now I'm not exposing the original string, because that
470 # could lead to a memory leak in the _error Dict.
471 # The message quotes part of the string, and we could improve
472 # that. We could have a substring with context.
473 props = {
474 'start_pos': num.ToBig(e.start_pos),
475 'end_pos': num.ToBig(e.end_pos),
476 } # type: Dict[str, value_t]
477 # status code 4 is special, for encode/decode errors.
478 raise error.Structured(4, e.Message(), rd.LeftParenToken(), props)
479
480 return val
481
482
483# vim: sw=2