| 1 | """
 | 
| 2 | builtin/pure_ysh.py - YSH builtins that don't do I/O.
 | 
| 3 | """
 | 
| 4 | from __future__ import print_function
 | 
| 5 | 
 | 
| 6 | from _devbuild.gen.runtime_asdl import (cmd_value, scope_e)
 | 
| 7 | from _devbuild.gen.syntax_asdl import command_t, loc, loc_t
 | 
| 8 | from _devbuild.gen.value_asdl import (value, value_e, value_t, LeftName)
 | 
| 9 | from core import error
 | 
| 10 | from core import state
 | 
| 11 | from core import vm
 | 
| 12 | from frontend import flag_util
 | 
| 13 | from frontend import location
 | 
| 14 | from frontend import typed_args
 | 
| 15 | from mycpp import mylib
 | 
| 16 | from mycpp.mylib import tagswitch
 | 
| 17 | 
 | 
| 18 | from typing import TYPE_CHECKING, cast, Any, Dict, List, Tuple
 | 
| 19 | 
 | 
| 20 | if TYPE_CHECKING:
 | 
| 21 |     from core import ui
 | 
| 22 |     from osh.cmd_eval import CommandEvaluator
 | 
| 23 | 
 | 
| 24 | 
 | 
| 25 | class ctx_Shvar(object):
 | 
| 26 |     """For shvar LANG=C _ESCAPER=posix-sh-word _DIALECT=ninja."""
 | 
| 27 | 
 | 
| 28 |     def __init__(self, mem, pairs):
 | 
| 29 |         # type: (state.Mem, List[Tuple[str, value_t]]) -> None
 | 
| 30 |         #log('pairs %s', pairs)
 | 
| 31 |         self.mem = mem
 | 
| 32 |         self.restore = []  # type: List[Tuple[LeftName, value_t]]
 | 
| 33 |         self._Push(pairs)
 | 
| 34 | 
 | 
| 35 |     def __enter__(self):
 | 
| 36 |         # type: () -> None
 | 
| 37 |         pass
 | 
| 38 | 
 | 
| 39 |     def __exit__(self, type, value, traceback):
 | 
| 40 |         # type: (Any, Any, Any) -> None
 | 
| 41 |         self._Pop()
 | 
| 42 | 
 | 
| 43 |     # Note: _Push and _Pop are separate methods because the C++ translation
 | 
| 44 |     # doesn't like when they are inline in __init__ and __exit__.
 | 
| 45 |     def _Push(self, pairs):
 | 
| 46 |         # type: (List[Tuple[str, value_t]]) -> None
 | 
| 47 |         for name, v in pairs:
 | 
| 48 |             lval = location.LName(name)
 | 
| 49 |             # LocalOnly because we are only overwriting the current scope
 | 
| 50 |             old_val = self.mem.GetValue(name, scope_e.LocalOnly)
 | 
| 51 |             self.restore.append((lval, old_val))
 | 
| 52 |             self.mem.SetNamed(lval, v, scope_e.LocalOnly)
 | 
| 53 | 
 | 
| 54 |     def _Pop(self):
 | 
| 55 |         # type: () -> None
 | 
| 56 |         for lval, old_val in self.restore:
 | 
| 57 |             if old_val.tag() == value_e.Undef:
 | 
| 58 |                 self.mem.Unset(lval, scope_e.LocalOnly)
 | 
| 59 |             else:
 | 
| 60 |                 self.mem.SetNamed(lval, old_val, scope_e.LocalOnly)
 | 
| 61 | 
 | 
| 62 | 
 | 
| 63 | class Shvar(vm._Builtin):
 | 
| 64 | 
 | 
| 65 |     def __init__(self, mem, search_path, cmd_ev):
 | 
| 66 |         # type: (state.Mem, state.SearchPath, CommandEvaluator) -> None
 | 
| 67 |         self.mem = mem
 | 
| 68 |         self.search_path = search_path  # to clear PATH
 | 
| 69 |         self.cmd_ev = cmd_ev  # To run blocks
 | 
| 70 | 
 | 
| 71 |     def Run(self, cmd_val):
 | 
| 72 |         # type: (cmd_value.Argv) -> int
 | 
| 73 |         _, arg_r = flag_util.ParseCmdVal('shvar',
 | 
| 74 |                                          cmd_val,
 | 
| 75 |                                          accept_typed_args=True)
 | 
| 76 | 
 | 
| 77 |         cmd = typed_args.OptionalBlock(cmd_val)
 | 
| 78 |         if not cmd:
 | 
| 79 |             # TODO: I think shvar LANG=C should just mutate
 | 
| 80 |             # But should there be a whitelist?
 | 
| 81 |             raise error.Usage('expected a block', loc.Missing)
 | 
| 82 | 
 | 
| 83 |         pairs = []  # type: List[Tuple[str, value_t]]
 | 
| 84 |         args, arg_locs = arg_r.Rest2()
 | 
| 85 |         if len(args) == 0:
 | 
| 86 |             raise error.Usage('Expected name=value', loc.Missing)
 | 
| 87 | 
 | 
| 88 |         for i, arg in enumerate(args):
 | 
| 89 |             name, s = mylib.split_once(arg, '=')
 | 
| 90 |             if s is None:
 | 
| 91 |                 raise error.Usage('Expected name=value', arg_locs[i])
 | 
| 92 |             v = value.Str(s)  # type: value_t
 | 
| 93 |             pairs.append((name, v))
 | 
| 94 | 
 | 
| 95 |             # Important fix: shvar PATH='' { } must make all binaries invisible
 | 
| 96 |             if name == 'PATH':
 | 
| 97 |                 self.search_path.ClearCache()
 | 
| 98 | 
 | 
| 99 |         with ctx_Shvar(self.mem, pairs):
 | 
| 100 |             unused = self.cmd_ev.EvalCommand(cmd)
 | 
| 101 | 
 | 
| 102 |         return 0
 | 
| 103 | 
 | 
| 104 | 
 | 
| 105 | class ctx_Context(object):
 | 
| 106 |     """For ctx push (context) { ... }"""
 | 
| 107 | 
 | 
| 108 |     def __init__(self, mem, context):
 | 
| 109 |         # type: (state.Mem, Dict[str, value_t]) -> None
 | 
| 110 |         self.mem = mem
 | 
| 111 |         self.mem.PushContextStack(context)
 | 
| 112 | 
 | 
| 113 |     def __enter__(self):
 | 
| 114 |         # type: () -> None
 | 
| 115 |         pass
 | 
| 116 | 
 | 
| 117 |     def __exit__(self, type, value, traceback):
 | 
| 118 |         # type: (Any, Any, Any) -> None
 | 
| 119 |         self.mem.PopContextStack()
 | 
| 120 | 
 | 
| 121 | 
 | 
| 122 | class Ctx(vm._Builtin):
 | 
| 123 | 
 | 
| 124 |     def __init__(self, mem, cmd_ev):
 | 
| 125 |         # type: (state.Mem, CommandEvaluator) -> None
 | 
| 126 |         self.mem = mem
 | 
| 127 |         self.cmd_ev = cmd_ev  # To run blocks
 | 
| 128 | 
 | 
| 129 |     def _GetContext(self):
 | 
| 130 |         # type: () -> Dict[str, value_t]
 | 
| 131 |         ctx = self.mem.GetContext()
 | 
| 132 |         if ctx is None:
 | 
| 133 |             raise error.Expr(
 | 
| 134 |                 "Could not find a context. Did you forget to 'ctx push'?",
 | 
| 135 |                 loc.Missing)
 | 
| 136 |         return ctx
 | 
| 137 | 
 | 
| 138 |     def _Push(self, context, block):
 | 
| 139 |         # type: (Dict[str, value_t], command_t) -> int
 | 
| 140 |         with ctx_Context(self.mem, context):
 | 
| 141 |             return self.cmd_ev.EvalCommand(block)
 | 
| 142 | 
 | 
| 143 |     def _Set(self, updates):
 | 
| 144 |         # type: (Dict[str, value_t]) -> int
 | 
| 145 |         ctx = self._GetContext()
 | 
| 146 |         ctx.update(updates)
 | 
| 147 |         return 0
 | 
| 148 | 
 | 
| 149 |     def _Emit(self, field, item, blame):
 | 
| 150 |         # type: (str, value_t, loc_t) -> int
 | 
| 151 |         ctx = self._GetContext()
 | 
| 152 | 
 | 
| 153 |         if field not in ctx:
 | 
| 154 |             ctx[field] = value.List([])
 | 
| 155 | 
 | 
| 156 |         UP_arr = ctx[field]
 | 
| 157 |         if UP_arr.tag() != value_e.List:
 | 
| 158 |             raise error.TypeErr(
 | 
| 159 |                 UP_arr,
 | 
| 160 |                 "Expected the context item '%s' to be a List" % (field), blame)
 | 
| 161 | 
 | 
| 162 |         arr = cast(value.List, UP_arr)
 | 
| 163 |         arr.items.append(item)
 | 
| 164 | 
 | 
| 165 |         return 0
 | 
| 166 | 
 | 
| 167 |     def Run(self, cmd_val):
 | 
| 168 |         # type: (cmd_value.Argv) -> int
 | 
| 169 |         rd = typed_args.ReaderForProc(cmd_val)
 | 
| 170 |         _, arg_r = flag_util.ParseCmdVal('ctx',
 | 
| 171 |                                          cmd_val,
 | 
| 172 |                                          accept_typed_args=True)
 | 
| 173 | 
 | 
| 174 |         verb, verb_loc = arg_r.ReadRequired2(
 | 
| 175 |             'Expected a verb (push, set, emit)')
 | 
| 176 | 
 | 
| 177 |         if verb == "push":
 | 
| 178 |             context = rd.PosDict()
 | 
| 179 |             block = rd.RequiredBlock()
 | 
| 180 |             rd.Done()
 | 
| 181 |             arg_r.AtEnd()
 | 
| 182 | 
 | 
| 183 |             return self._Push(context, block)
 | 
| 184 | 
 | 
| 185 |         elif verb == "set":
 | 
| 186 |             updates = rd.RestNamed()
 | 
| 187 |             rd.Done()
 | 
| 188 |             arg_r.AtEnd()
 | 
| 189 | 
 | 
| 190 |             return self._Set(updates)
 | 
| 191 | 
 | 
| 192 |         elif verb == "emit":
 | 
| 193 |             field, field_loc = arg_r.ReadRequired2(
 | 
| 194 |                 "A target field is required")
 | 
| 195 |             item = rd.PosValue()
 | 
| 196 |             rd.Done()
 | 
| 197 |             arg_r.AtEnd()
 | 
| 198 | 
 | 
| 199 |             return self._Emit(field, item, field_loc)
 | 
| 200 | 
 | 
| 201 |         else:
 | 
| 202 |             raise error.Usage("Unknown verb '%s'" % verb, verb_loc)
 | 
| 203 | 
 | 
| 204 | 
 | 
| 205 | class PushRegisters(vm._Builtin):
 | 
| 206 | 
 | 
| 207 |     def __init__(self, mem, cmd_ev):
 | 
| 208 |         # type: (state.Mem, CommandEvaluator) -> None
 | 
| 209 |         self.mem = mem
 | 
| 210 |         self.cmd_ev = cmd_ev  # To run blocks
 | 
| 211 | 
 | 
| 212 |     def Run(self, cmd_val):
 | 
| 213 |         # type: (cmd_value.Argv) -> int
 | 
| 214 |         _, arg_r = flag_util.ParseCmdVal('push-registers',
 | 
| 215 |                                          cmd_val,
 | 
| 216 |                                          accept_typed_args=True)
 | 
| 217 | 
 | 
| 218 |         cmd = typed_args.OptionalBlock(cmd_val)
 | 
| 219 |         if not cmd:
 | 
| 220 |             raise error.Usage('expected a block', loc.Missing)
 | 
| 221 | 
 | 
| 222 |         with state.ctx_Registers(self.mem):
 | 
| 223 |             unused = self.cmd_ev.EvalCommand(cmd)
 | 
| 224 | 
 | 
| 225 |         # make it "SILENT" in terms of not mutating $?
 | 
| 226 |         # TODO: Revisit this.  It might be better to provide the headless shell
 | 
| 227 |         # with a way to SET $? instead.  Needs to be tested/prototyped.
 | 
| 228 |         return self.mem.last_status[-1]
 | 
| 229 | 
 | 
| 230 | 
 | 
| 231 | class Append(vm._Builtin):
 | 
| 232 |     """Push word args onto an List.
 | 
| 233 | 
 | 
| 234 |     Not doing typed args since you can do
 | 
| 235 | 
 | 
| 236 |     :: mylist->append(42)
 | 
| 237 |     """
 | 
| 238 | 
 | 
| 239 |     def __init__(self, mem, errfmt):
 | 
| 240 |         # type: (state.Mem, ui.ErrorFormatter) -> None
 | 
| 241 |         self.mem = mem
 | 
| 242 |         self.errfmt = errfmt
 | 
| 243 | 
 | 
| 244 |     def Run(self, cmd_val):
 | 
| 245 |         # type: (cmd_value.Argv) -> int
 | 
| 246 | 
 | 
| 247 |         # This means we ignore -- , which is consistent
 | 
| 248 |         arg, arg_r = flag_util.ParseCmdVal('append',
 | 
| 249 |                                            cmd_val,
 | 
| 250 |                                            accept_typed_args=True)
 | 
| 251 | 
 | 
| 252 |         rd = typed_args.ReaderForProc(cmd_val)
 | 
| 253 |         val = rd.PosValue()
 | 
| 254 |         rd.Done()
 | 
| 255 | 
 | 
| 256 |         UP_val = val
 | 
| 257 |         with tagswitch(val) as case:
 | 
| 258 |             if case(value_e.BashArray):
 | 
| 259 |                 val = cast(value.BashArray, UP_val)
 | 
| 260 |                 val.strs.extend(arg_r.Rest())
 | 
| 261 |             elif case(value_e.List):
 | 
| 262 |                 val = cast(value.List, UP_val)
 | 
| 263 |                 typed = [value.Str(s)
 | 
| 264 |                          for s in arg_r.Rest()]  # type: List[value_t]
 | 
| 265 |                 val.items.extend(typed)
 | 
| 266 |             else:
 | 
| 267 |                 raise error.TypeErr(val, 'expected List or BashArray',
 | 
| 268 |                                     loc.Missing)
 | 
| 269 | 
 | 
| 270 |         return 0
 |