defsafe_exec(code: str, timeout=2): code = code.encode().decode('unicode_escape') try: tree = ast.parse(code) SandboxVisitor().visit(tree) except Exception as e: returnf"Error: AST check failed ({e.__class__.__name__})"
result_queue = multiprocessing.Queue() p = multiprocessing.Process(target=sandbox_executor, args=(code, result_queue)) p.start() p.join(timeout=timeout)
if p.is_alive(): p.terminate() return"Timeout: code took too long to run."
try: status, output = result_queue.get_nowait() print(output) return output if status == "ok"elsef"Error exec: {output}" except: return"Error: no output from sandbox."
CODE = ''' def my_audit_checker(event,args): allowed_events = ["import", "time.sleep", "builtins.input", "builtins.input/result"] if not list(filter(lambda x: event == x, allowed_events)): raise Exception if len(args) > 0: raise Exception addaudithook(my_audit_checker) print("{}") '''
badchars = "\"'|&`+-*/()[]{}_ .".replace(" ", "")
defevaluate_wish_text(text: str) -> str: for ch in badchars: if ch in text: print(f"ch={ch}") returnf"Error:waf {ch}" out = safe_exec(CODE.format(text)) return out
知识补充:python中的生成器
使用了yield的函数被称为生成器
defrunning_averager(): """一个计算动态平均值的协程 (Coroutine)""" print("--- 平均值计算器已启动 ---") # 初始化状态变量 total = 0.0 count = 0 average = None whileTrue: # 关键部分! # 1. 向调用方产出当前的 average 值 # 2. 暂停,等待调用方下一次 send() 一个值进来 # 3. term 会接收到 send() 进来的值 term = yield average # 如果接收到 None,则跳过此次计算 (通常用于启动) if term isNone: continue # 用接收到的值更新状态 total += term count += 1 average = total / count