Skip to content

BashToolkit

https://github.com/pexpect/pexpect @ii-agent/src/ii_agent/tools/bash_tool.py

--- https://www.anthropic.com/engineering/swe-bench-sonnet --- Run commands in a bash shell

  • When invoking this tool, the contents of the "command" parameter does NOT need to be XML-escaped.

  • You don't have access to the internet via this tool.

  • You do have access to a mirror of common linux and python packages via apt and pip.

  • State is persistent across command calls and discussions with the user.

  • To inspect a particular line range of a file, e.g. lines 10-25, try 'sed -n 10,25p /path/to/the/file'.

  • Please avoid commands that may produce a very large amount of output.

  • Please run long lived commands in the background, e.g. 'sleep 10 &' or start a server in the background."

BashToolkit

Bases: AsyncBaseToolkit

Source code in utu/tools/bash_toolkit.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class BashToolkit(AsyncBaseToolkit):
    def __init__(self, config: ToolkitConfig = None) -> None:
        super().__init__(config)
        self.workspace_root = self.config.config.get("workspace_root", "/tmp/")
        # self.require_confirmation = self.config.config.get("require_confirmation", False)
        # self.command_filters = self.config.config.get("command_filters", [])
        self.timeout = self.config.config.get("timeout", 60)
        self.banned_command_strs = [
            "git init",
            "git commit",
            "git add",
        ]

        self.child, self.custom_prompt = self.start_persistent_shell(timeout=self.timeout)
        if self.workspace_root:
            self.setup_workspace(self.workspace_root)

    def setup_workspace(self, workspace_root: str):
        self.run_command(self.child, self.custom_prompt, f"cd {workspace_root}")

    @staticmethod
    def start_persistent_shell(timeout: int):
        import pexpect
        # https://github.com/pexpect/pexpect/issues/321

        # Start a new Bash shell
        child = pexpect.spawn("/bin/bash", encoding="utf-8", echo=False, timeout=timeout)
        # Set a known, unique prompt
        # We use a random string that is unlikely to appear otherwise
        # so we can detect the prompt reliably.
        custom_prompt = "PEXPECT_PROMPT>> "
        child.sendline("stty -onlcr")
        child.sendline("unset PROMPT_COMMAND")
        child.sendline(f"PS1='{custom_prompt}'")
        # Force an initial read until the newly set prompt shows up
        child.expect(custom_prompt)
        return child, custom_prompt

    @staticmethod
    def run_command(child, custom_prompt: str, cmd: str) -> str:
        # Send the command
        child.sendline(cmd)
        # Wait until we see the prompt again
        child.expect(custom_prompt)
        # Output is everything printed before the prompt minus the command itself
        # pexpect puts the matched prompt in child.after and everything before it in child.before.

        raw_output = child.before.strip()
        ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
        clean_output = ansi_escape.sub("", raw_output)

        if clean_output.startswith("\r"):
            clean_output = clean_output[1:]

        return clean_output

    async def run_bash(self, command: str) -> str:
        """Execute a bash command in your workspace and return its output.

        Args:
            command: The command to execute
        """
        # 1) filter: change command before execution. E.g. used in SSH or Docker.
        # original_command = command
        # command = self.apply_filters(original_command)
        # if command != original_command:
        #     logger.info(f"Command filtered: {original_command} -> {command}")

        # 2) banned command check
        for banned_str in self.banned_command_strs:
            if banned_str in command:
                return f"Command not executed due to banned string in command: {banned_str} found in {command}."

        # if self.require_confirmation:
        #     ...

        # confirm no bad stuff happened
        try:
            echo_result = self.run_command(self.child, self.custom_prompt, "echo hello")
            assert echo_result.strip() == "hello"
        except Exception:  # pylint: disable=broad-except
            self.child, self.custom_prompt = self.start_persistent_shell(self.timeout)

        # 3) Execute the command and capture output
        try:
            result = self.run_command(self.child, self.custom_prompt, command)
            return str(
                {
                    "command output": result,
                }
            )
        except Exception as e:  # pylint: disable=broad-except
            return str(
                {
                    "error": str(e),
                }
            )
        # TODO: add workspace tree in output

    async def get_tools_map(self) -> dict[str, Callable]:
        return {
            "run_bash": self.run_bash,
        }

run_bash async

run_bash(command: str) -> str

Execute a bash command in your workspace and return its output.

Parameters:

Name Type Description Default
command str

The command to execute

required
Source code in utu/tools/bash_toolkit.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
async def run_bash(self, command: str) -> str:
    """Execute a bash command in your workspace and return its output.

    Args:
        command: The command to execute
    """
    # 1) filter: change command before execution. E.g. used in SSH or Docker.
    # original_command = command
    # command = self.apply_filters(original_command)
    # if command != original_command:
    #     logger.info(f"Command filtered: {original_command} -> {command}")

    # 2) banned command check
    for banned_str in self.banned_command_strs:
        if banned_str in command:
            return f"Command not executed due to banned string in command: {banned_str} found in {command}."

    # if self.require_confirmation:
    #     ...

    # confirm no bad stuff happened
    try:
        echo_result = self.run_command(self.child, self.custom_prompt, "echo hello")
        assert echo_result.strip() == "hello"
    except Exception:  # pylint: disable=broad-except
        self.child, self.custom_prompt = self.start_persistent_shell(self.timeout)

    # 3) Execute the command and capture output
    try:
        result = self.run_command(self.child, self.custom_prompt, command)
        return str(
            {
                "command output": result,
            }
        )
    except Exception as e:  # pylint: disable=broad-except
        return str(
            {
                "error": str(e),
            }
        )

get_tools_map_func async

get_tools_map_func() -> dict[str, Callable]

Get tools map. It will filter tools by config.activated_tools if it is not None.

Source code in utu/tools/base.py
58
59
60
61
62
63
64
65
66
67
68
69
async def get_tools_map_func(self) -> dict[str, Callable]:
    """Get tools map. It will filter tools by config.activated_tools if it is not None."""
    if self.tools_map is None:
        self.tools_map = await self.get_tools_map()
    if self.config.activated_tools:
        assert all(tool_name in self.tools_map for tool_name in self.config.activated_tools), (
            f"Error config activated tools: {self.config.activated_tools}! available tools: {self.tools_map.keys()}"
        )
        tools_map = {tool_name: self.tools_map[tool_name] for tool_name in self.config.activated_tools}
    else:
        tools_map = self.tools_map
    return tools_map

get_tools_in_agents async

get_tools_in_agents() -> list[FunctionTool]

Get tools in openai-agents format.

Source code in utu/tools/base.py
71
72
73
74
75
76
77
78
79
80
81
82
async def get_tools_in_agents(self) -> list[FunctionTool]:
    """Get tools in openai-agents format."""
    tools_map = await self.get_tools_map_func()
    tools = []
    for _, tool in tools_map.items():
        tools.append(
            function_tool(
                tool,
                strict_mode=False,  # turn off strict mode
            )
        )
    return tools

get_tools_in_openai async

get_tools_in_openai() -> list[dict]

Get tools in OpenAI format.

Source code in utu/tools/base.py
84
85
86
87
async def get_tools_in_openai(self) -> list[dict]:
    """Get tools in OpenAI format."""
    tools = await self.get_tools_in_agents()
    return [ChatCompletionConverter.tool_to_openai(tool) for tool in tools]

get_tools_in_mcp async

get_tools_in_mcp() -> list[Tool]

Get tools in MCP format.

Source code in utu/tools/base.py
89
90
91
92
async def get_tools_in_mcp(self) -> list[types.Tool]:
    """Get tools in MCP format."""
    tools = await self.get_tools_in_agents()
    return [MCPConverter.function_tool_to_mcp(tool) for tool in tools]

call_tool async

call_tool(name: str, arguments: dict) -> str

Call a tool by its name.

Source code in utu/tools/base.py
 94
 95
 96
 97
 98
 99
100
async def call_tool(self, name: str, arguments: dict) -> str:
    """Call a tool by its name."""
    tools_map = await self.get_tools_map_func()
    if name not in tools_map:
        raise ValueError(f"Tool {name} not found")
    tool = tools_map[name]
    return await tool(**arguments)