Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| import signal | |
| import subprocess | |
| import time | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Callable, Dict, List, Literal, Optional, Tuple, Union | |
| from langchain.tools import tool | |
| from ptrace.debugger import ( | |
| NewProcessEvent, | |
| ProcessExecution, | |
| ProcessExit, | |
| ProcessSignal, | |
| PtraceDebugger, | |
| PtraceProcess, | |
| ) | |
| from ptrace.func_call import FunctionCallOptions | |
| from ptrace.syscall import PtraceSyscall | |
| from ptrace.tools import signal_to_exitcode | |
| from swarms.tools.base import BaseToolSet, SessionGetter, ToolScope, tool | |
| from swarms.utils.logger import logger | |
| from swarms.utils.main import ANSI, Color, Style # test | |
| #helpers | |
| PipeType = Union[Literal["stdout"], Literal["stderr"]] | |
| def verify(func): | |
| def wrapper(*args, **kwargs): | |
| try: | |
| filepath = args[0].filepath | |
| except AttributeError: | |
| raise Exception("This tool doesn't have filepath. Please check your code.") | |
| if not str(Path(filepath).resolve()).startswith(str(Path().resolve())): | |
| return "You can't access file outside of playground." | |
| return func(*args, **kwargs) | |
| return wrapper | |
| class SyscallTimeoutException(Exception): | |
| def __init__(self, pid: int, *args) -> None: | |
| super().__init__(f"deadline exceeded while waiting syscall for {pid}", *args) | |
| class SyscallTracer: | |
| def __init__(self, pid: int): | |
| self.debugger: PtraceDebugger = PtraceDebugger() | |
| self.pid: int = pid | |
| self.process: PtraceProcess = None | |
| def is_waiting(self, syscall: PtraceSyscall) -> bool: | |
| if syscall.name.startswith("wait"): | |
| return True | |
| return False | |
| def attach(self): | |
| self.process = self.debugger.addProcess(self.pid, False) | |
| def detach(self): | |
| self.process.detach() | |
| self.debugger.quit() | |
| def set_timer(self, timeout: int): | |
| def handler(signum, frame): | |
| raise SyscallTimeoutException(self.process.pid) | |
| signal.signal(signal.SIGALRM, handler) | |
| signal.alarm(timeout) | |
| def reset_timer(self): | |
| signal.alarm(0) | |
| def wait_syscall_with_timeout(self, timeout: int): | |
| self.set_timer(timeout) | |
| self.process.waitSyscall() | |
| self.reset_timer() | |
| def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]: | |
| self.process.syscall() | |
| exitcode = None | |
| reason = "" | |
| while True: | |
| if not self.debugger: | |
| break | |
| try: | |
| self.wait_syscall_with_timeout(30) | |
| except ProcessExit as event: | |
| if event.exitcode is not None: | |
| exitcode = event.exitcode | |
| continue | |
| except ProcessSignal as event: | |
| event.process.syscall(event.signum) | |
| exitcode = signal_to_exitcode(event.signum) | |
| reason = event.reason | |
| continue | |
| except NewProcessEvent: | |
| continue | |
| except ProcessExecution: | |
| continue | |
| except Exception as e: | |
| reason = str(e) | |
| break | |
| syscall = self.process.syscall_state.event( | |
| FunctionCallOptions( | |
| write_types=False, | |
| write_argname=False, | |
| string_max_length=300, | |
| replace_socketcall=True, | |
| write_address=False, | |
| max_array_count=20, | |
| ) | |
| ) | |
| self.process.syscall() | |
| if syscall is None: | |
| continue | |
| if syscall.result: | |
| continue | |
| self.reset_timer() | |
| return exitcode, reason | |
| class StdoutTracer: | |
| def __init__( | |
| self, | |
| process: subprocess.Popen, | |
| timeout: int = 30, | |
| interval: int = 0.1, | |
| on_output: Callable[[PipeType, str], None] = lambda: None, | |
| ): | |
| self.process: subprocess.Popen = process | |
| self.timeout: int = timeout | |
| self.interval: int = interval | |
| self.last_output: datetime = None | |
| self.on_output: Callable[[PipeType, str], None] = on_output | |
| def nonblock(self): | |
| os.set_blocking(self.process.stdout.fileno(), False) | |
| os.set_blocking(self.process.stderr.fileno(), False) | |
| def get_output(self, pipe: PipeType) -> str: | |
| output = None | |
| if pipe == "stdout": | |
| output = self.process.stdout.read() | |
| elif pipe == "stderr": | |
| output = self.process.stderr.read() | |
| if output: | |
| decoded = output.decode() | |
| self.on_output(pipe, decoded) | |
| self.last_output = datetime.now() | |
| return decoded | |
| return "" | |
| def last_output_passed(self, seconds: int) -> bool: | |
| return (datetime.now() - self.last_output).seconds > seconds | |
| def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]: | |
| self.nonblock() | |
| self.last_output = datetime.now() | |
| output = "" | |
| exitcode = None | |
| while True: | |
| new_stdout = self.get_output("stdout") | |
| if new_stdout: | |
| output += new_stdout | |
| new_stderr = self.get_output("stderr") | |
| if new_stderr: | |
| output += new_stderr | |
| if self.process.poll() is not None: | |
| exitcode = self.process.poll() | |
| break | |
| if self.last_output_passed(self.timeout): | |
| self.process.kill() | |
| break | |
| time.sleep(self.interval) | |
| return (exitcode, output) | |
| class Terminal(BaseToolSet): | |
| def __init__(self): | |
| self.sessions: Dict[str, List[SyscallTracer]] = {} | |
| def execute(self, commands: str, get_session: SessionGetter) -> str: | |
| session, _ = get_session() | |
| try: | |
| process = subprocess.Popen( | |
| commands, | |
| shell=True, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| ) | |
| logger.info(ANSI("Realtime Terminal Output").to(Color.magenta()) + ": ") | |
| output = "" | |
| tracer = StdoutTracer( | |
| process, | |
| on_output=lambda p, o: logger.info( | |
| ANSI(p).to(Style.dim()) + " " + o.strip("\n") | |
| ), | |
| ) | |
| exitcode, output = tracer.wait_until_stop_or_exit() | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed Terminal, Input Commands: {commands} " | |
| f"Output Answer: {output}" | |
| ) | |
| return output | |
| ############# | |
| def terminal_execute(self, commands: str, get_session: SessionGetter) -> str: | |
| session, _ = get_session() | |
| try: | |
| process = subprocess.Popen( | |
| commands, | |
| shell=True, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| ) | |
| logger.info(ANSI("Realtime Terminal Output").to(Color.magenta()) + ": ") | |
| output = "" | |
| tracer = StdoutTracer( | |
| process, | |
| on_output=lambda p, o: logger.info( | |
| ANSI(p).to(Style.dim()) + " " + o.strip("\n") | |
| ), | |
| ) | |
| exitcode, output = tracer.wait_until_stop_or_exit() | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed Terminal, Input Commands: {commands} " | |
| f"Output Answer: {output}" | |
| ) | |
| return output | |
| """ | |
| write protocol: | |
| <filepath> | |
| <content> | |
| """ | |
| class WriteCommand: | |
| separator = "\n" | |
| def __init__(self, filepath: str, content: int): | |
| self.filepath: str = filepath | |
| self.content: str = content | |
| self.mode: str = "w" | |
| def with_mode(self, mode: str) -> "WriteCommand": | |
| self.mode = mode | |
| return self | |
| def execute(self) -> str: | |
| dir_path = os.path.dirname(self.filepath) | |
| if dir_path: | |
| os.makedirs(dir_path, exist_ok=True) | |
| with open(self.filepath, self.mode) as f: | |
| f.write(self.content) | |
| return self.content | |
| def from_str(command: str) -> "WriteCommand": | |
| filepath = command.split(WriteCommand.separator)[0] | |
| return WriteCommand(filepath, command[len(filepath) + 1 :]) | |
| class CodeWriter: | |
| def write(command: str) -> str: | |
| return WriteCommand.from_str(command).with_mode("w").execute() | |
| def append(command: str) -> str: | |
| return WriteCommand.from_str(command).with_mode("a").execute() | |
| """ | |
| read protocol: | |
| <filepath>|<start line>-<end line> | |
| """ | |
| class Line: | |
| def __init__(self, content: str, line_number: int, depth: int): | |
| self.__content: str = content | |
| self.__line_number: int = line_number | |
| self.__depth: int = depth | |
| self.__children: List[Line] = [] | |
| def get_content(self) -> str: | |
| return self.__content | |
| def get_depth(self) -> int: | |
| return self.__depth | |
| def append_child(self, child: "Line") -> None: | |
| self.__children.append(child) | |
| def find_by_lte_depth(self, depth: int) -> List["Line"]: | |
| if self.__depth > depth: | |
| return [] | |
| lines: List[Line] = [self] | |
| for child in self.__children: | |
| lines += child.find_by_lte_depth(depth) | |
| return lines | |
| def find_by_content(self, content: str) -> List["Line"]: | |
| if content in self.__content: | |
| return [self] | |
| lines: List[Line] = [] | |
| for child in self.__children: | |
| lines += child.find_by_content(content) | |
| return lines | |
| def find_last_lines(self) -> List["Line"]: | |
| if len(self.__children) == 0: | |
| return [self] | |
| else: | |
| return [self, *self.__children[-1].find_last_lines()] | |
| def print(self, depth: int = 0) -> None: | |
| print(f"{' ' * depth}{self}", end="") | |
| for child in self.__children: | |
| child.print(depth + 1) | |
| def __repr__(self): | |
| return f"{self.__line_number}: {self.__content}" | |
| class CodeTree: | |
| def __init__(self): | |
| self.root: Line = Line("\n", -1, -1) | |
| def append(self, content: str, line_number: int) -> None: | |
| last_lines: List[Line] = self.root.find_last_lines() | |
| new_leading_spaces: int = self.__get_leading_spaces(content) | |
| previous_line: Line = self.root | |
| previous_leading_spaces: int = -1 | |
| for line in last_lines: | |
| leading_spaces = self.__get_leading_spaces(line.get_content()) | |
| if ( | |
| previous_leading_spaces < new_leading_spaces | |
| and new_leading_spaces <= leading_spaces | |
| ): | |
| break | |
| previous_line, previous_leading_spaces = line, leading_spaces | |
| new_line_depth: int = previous_line.get_depth() + 1 | |
| previous_line.append_child(Line(content, line_number, new_line_depth)) | |
| def find_from_root(self, depth: int) -> List[Line]: | |
| return self.root.find_by_lte_depth(depth) | |
| def find_from_parent(self, depth: int, parent_content: str) -> List[Line]: | |
| lines: List[Line] = self.root.find_by_content(parent_content) | |
| if len(lines) == 0: | |
| return [] | |
| parent = lines[0] | |
| return parent.find_by_lte_depth(depth + parent.get_depth()) | |
| def print(self): | |
| print("Code Tree:") | |
| print("=================================") | |
| self.root.print() | |
| print("=================================") | |
| def __get_leading_spaces(self, content: str) -> int: | |
| return len(content) - len(content.lstrip()) | |
| class ReadCommand: | |
| separator = "|" | |
| def __init__(self, filepath: str, start: int, end: int): | |
| self.filepath: str = filepath | |
| self.start: int = start | |
| self.end: int = end | |
| def execute(self) -> str: | |
| with open(self.filepath, "r") as f: | |
| code = f.readlines() | |
| if self.start == self.end: | |
| code = code[self.start - 1] | |
| else: | |
| code = "".join(code[self.start - 1 : self.end]) | |
| return code | |
| def from_str(command: str) -> "ReadCommand": | |
| filepath, line = command.split(ReadCommand.separator) | |
| start, end = line.split("-") | |
| return ReadCommand(filepath, int(start), int(end)) | |
| class SummaryCommand: | |
| separator = "|" | |
| def __init__(self, filepath: str, depth: int, parent_content: Optional[str] = None): | |
| self.filepath: str = filepath | |
| self.depth: int = depth | |
| self.parent_content: Optional[str] = parent_content | |
| def execute(self) -> str: | |
| with open(self.filepath, "r") as f: | |
| code = f.readlines() | |
| code_tree = CodeTree() | |
| for i, line in enumerate(code): | |
| if line.strip() != "": | |
| code_tree.append(line, i + 1) | |
| if self.parent_content is None: | |
| lines = code_tree.find_from_root(self.depth) | |
| else: | |
| lines = code_tree.find_from_parent(self.depth, self.parent_content) | |
| return "".join([str(line) for line in lines]) | |
| def from_str(command: str) -> "SummaryCommand": | |
| command_list: List[str] = command.split(SummaryCommand.separator) | |
| filepath: str = command_list[0] | |
| depth: int = int(command_list[1]) | |
| parent_content: str | None = command_list[2] if len(command_list) == 3 else None | |
| return SummaryCommand( | |
| filepath=filepath, depth=depth, parent_content=parent_content | |
| ) | |
| class CodeReader: | |
| def read(command: str) -> str: | |
| return ReadCommand.from_str(command).execute() | |
| def summary(command: str) -> str: | |
| return SummaryCommand.from_str(command).execute() | |
| """ | |
| patch protocol: | |
| <filepath>|<line>,<col>|<line>,<col>|<content> | |
| ---~~~+++===+++~~~--- | |
| <filepath>|<line>,<col>|<line>,<col>|<content> | |
| ---~~~+++===+++~~~--- | |
| ... | |
| ---~~~+++===+++~~~--- | |
| let say original code is: | |
| ``` | |
| import requests | |
| def crawl_news(keyword): | |
| url = f"https://www.google.com/search?q={keyword}+news" | |
| response = requests.get(url) | |
| news = [] | |
| for result in response: | |
| news.append(result.text) | |
| return news | |
| ``` | |
| and we want to change it to: | |
| ``` | |
| import requests | |
| from bs4 import BeautifulSoup | |
| def crawl_news(keyword): | |
| url = f"https://www.google.com/search?q={keyword}+news" | |
| html = requests.get(url).text | |
| soup = BeautifulSoup(html, "html.parser") | |
| news_results = soup.find_all("div", class_="BNeawe vvjwJb AP7Wnd") | |
| news_titles = [] | |
| for result in news_results: | |
| news_titles.append(result.text) | |
| return news_titles | |
| ``` | |
| then the command will be: | |
| test.py|2,1|2,1|from bs4 import BeautifulSoup | |
| ---~~~+++===+++~~~--- | |
| test.py|5,5|5,33|html = requests.get(url).text | |
| soup = BeautifulSoup(html, "html.parser") | |
| news_results = soup.find_all("div", class_="BNeawe vvjwJb AP7Wnd") | |
| ---~~~+++===+++~~~--- | |
| test.py|7,5|9,13|news_titles = [] | |
| for result in news_results: | |
| news_titles | |
| ---~~~+++===+++~~~--- | |
| test.py|11,16|11,16|_titles | |
| """ | |
| class Position: | |
| separator = "," | |
| def __init__(self, line: int, col: int): | |
| self.line: int = line | |
| self.col: int = col | |
| def __str__(self): | |
| return f"(Ln {self.line}, Col {self.col})" | |
| def from_str(pos: str) -> "Position": | |
| line, col = pos.split(Position.separator) | |
| return Position(int(line) - 1, int(col) - 1) | |
| class PatchCommand: | |
| separator = "|" | |
| def __init__(self, filepath: str, start: Position, end: Position, content: str): | |
| self.filepath: str = filepath | |
| self.start: Position = start | |
| self.end: Position = end | |
| self.content: str = content | |
| def read_lines(self) -> list[str]: | |
| with open(self.filepath, "r") as f: | |
| lines = f.readlines() | |
| return lines | |
| def write_lines(self, lines: list[str]) -> int: | |
| with open(self.filepath, "w") as f: | |
| f.writelines(lines) | |
| return sum([len(line) for line in lines]) | |
| def execute(self) -> Tuple[int, int]: | |
| lines = self.read_lines() | |
| before = sum([len(line) for line in lines]) | |
| lines[self.start.line] = ( | |
| lines[self.start.line][: self.start.col] | |
| + self.content | |
| + lines[self.end.line][self.end.col :] | |
| ) | |
| lines = lines[: self.start.line + 1] + lines[self.end.line + 1 :] | |
| after = self.write_lines(lines) | |
| written = len(self.content) | |
| deleted = before - after + written | |
| return written, deleted | |
| def from_str(command: str) -> "PatchCommand": | |
| match = re.search( | |
| r"(.*)\|([0-9]*),([0-9]*)\|([0-9]*),([0-9]*)(\||\n)(.*)", | |
| command, | |
| re.DOTALL, | |
| ) | |
| filepath = match.group(1) | |
| start_line = match.group(2) | |
| start_col = match.group(3) | |
| end_line = match.group(4) | |
| end_col = match.group(5) | |
| content = match.group(7) | |
| return PatchCommand( | |
| filepath, | |
| Position.from_str(f"{start_line},{start_col}"), | |
| Position.from_str(f"{end_line},{end_col}"), | |
| content, | |
| ) | |
| class CodePatcher: | |
| separator = "\n---~~~+++===+++~~~---\n" | |
| def sort_commands(commands: list[PatchCommand]) -> list[PatchCommand]: | |
| return sorted(commands, key=lambda c: c.start.line, reverse=True) | |
| def patch(bulk_command: str) -> Tuple[int, int]: | |
| commands = [ | |
| PatchCommand.from_str(command) | |
| for command in bulk_command.split(CodePatcher.separator) | |
| if command != "" | |
| ] | |
| commands = CodePatcher.sort_commands(commands) | |
| written, deleted = 0, 0 | |
| for command in commands: | |
| if command: | |
| w, d = command.execute() | |
| written += w | |
| deleted += d | |
| return written, deleted | |
| class CodeEditor(BaseToolSet): | |
| def read(self, inputs: str) -> str: | |
| try: | |
| output = CodeReader.read(inputs) | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed CodeEditor.READ, Input Commands: {inputs} " | |
| f"Output Answer: {output}" | |
| ) | |
| return output | |
| def summary(self, inputs: str) -> str: | |
| try: | |
| output = CodeReader.summary(inputs) | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed CodeEditor.SUMMARY, Input Commands: {inputs} " | |
| f"Output Answer: {output}" | |
| ) | |
| return output | |
| def append(self, inputs: str) -> str: | |
| try: | |
| code = CodeWriter.append(inputs) | |
| output = "Last 3 line was:\n" + "\n".join(code.split("\n")[-3:]) | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed CodeEditor.APPEND, Input: {inputs} " | |
| f"Output Answer: {output}" | |
| ) | |
| return output | |
| def write(self, inputs: str) -> str: | |
| try: | |
| code = CodeWriter.write(inputs.lstrip()) | |
| output = "Last 3 line was:\n" + "\n".join(code.split("\n")[-3:]) | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed CodeEditor.WRITE, Input: {inputs} " f"Output Answer: {output}" | |
| ) | |
| return output | |
| def patch(self, patches: str) -> str: | |
| try: | |
| w, d = CodePatcher.patch(patches) | |
| output = f"successfully wrote {w}, deleted {d}" | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed CodeEditor.PATCH, Input Patch: {patches} " | |
| f"Output Answer: {output}" | |
| ) | |
| return output | |
| def delete(self, inputs: str, filepath: str) -> str: | |
| try: | |
| with open(filepath, "w") as f: | |
| f.write("") | |
| output = "success" | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed CodeEditor.DELETE, Input filename: {inputs} " | |
| f"Output Answer: {output}" | |
| ) | |
| return output | |
| #---------------- end | |
| def code_editor_read(self, inputs: str) -> str: | |
| try: | |
| output = CodeReader.read(inputs) | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed CodeEditor.READ, Input Commands: {inputs} " | |
| f"Output Answer: {output}" | |
| ) | |
| return output | |
| def code_editor_summary(self, inputs: str) -> str: | |
| try: | |
| output = CodeReader.summary(inputs) | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed CodeEditor.SUMMARY, Input Commands: {inputs} " | |
| f"Output Answer: {output}" | |
| ) | |
| return output | |
| def code_editor_append(self, inputs: str) -> str: | |
| try: | |
| code = CodeWriter.append(inputs) | |
| output = "Last 3 line was:\n" + "\n".join(code.split("\n")[-3:]) | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed CodeEditor.APPEND, Input: {inputs} " | |
| f"Output Answer: {output}" | |
| ) | |
| return output | |
| def code_editor_write(self, inputs: str) -> str: | |
| try: | |
| code = CodeWriter.write(inputs.lstrip()) | |
| output = "Last 3 line was:\n" + "\n".join(code.split("\n")[-3:]) | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed CodeEditor.WRITE, Input: {inputs} " f"Output Answer: {output}" | |
| ) | |
| return output | |
| def code_editor_patch(self, patches: str) -> str: | |
| try: | |
| w, d = CodePatcher.patch(patches) | |
| output = f"successfully wrote {w}, deleted {d}" | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed CodeEditor.PATCH, Input Patch: {patches} " | |
| f"Output Answer: {output}" | |
| ) | |
| return output | |
| def code_editor_delete(self, inputs: str, filepath: str) -> str: | |
| try: | |
| with open(filepath, "w") as f: | |
| f.write("") | |
| output = "success" | |
| except Exception as e: | |
| output = str(e) | |
| logger.debug( | |
| f"\nProcessed CodeEditor.DELETE, Input filename: {inputs} " | |
| f"Output Answer: {output}" | |
| ) | |
| return output | |