Skip to content

OpenAIUtils

OpenAIUtils

Source code in utu/utils/openai_utils/openai_utils.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class OpenAIUtils:
    # --------------------------------------------------------
    # chat completions
    # --------------------------------------------------------
    @staticmethod
    def print_message(message: ChatCompletionMessage) -> None:
        if hasattr(message, "reasoning_content") and message.reasoning_content:
            PrintUtils.print_info(f"{message.reasoning_content}")
        if message.content:
            PrintUtils.print_bot(f"{message.content}", add_prefix=True)
        if message.tool_calls:
            for tool_call in message.tool_calls:
                PrintUtils.print_bot(f"<{tool_call.function.name}>{tool_call.function.arguments}", add_prefix=True)

    @staticmethod
    async def print_stream(stream: AsyncStream[ChatCompletionChunk]) -> ChatCompletionMessage:
        final_tool_calls: dict[int, ChatCompletionMessageToolCall] = {}
        content = ""
        async for chunk in stream:
            delta = chunk.choices[0].delta
            if hasattr(delta, "reasoning_content") and delta.reasoning_content:
                PrintUtils.print_info(f"{delta.reasoning_content}", end="", color="green")
            if delta.content:
                content += delta.content
                PrintUtils.print_info(f"{delta.content}", end="", color="gray")
            if delta.tool_calls:
                for tool_call in delta.tool_calls:
                    index = tool_call.index
                    if index not in final_tool_calls:
                        final_tool_calls[index] = tool_call
                        PrintUtils.print_info(
                            f"<{tool_call.function.name}>{tool_call.function.arguments}", end="", color="blue"
                        )
                    else:
                        if final_tool_calls[index].function.arguments:
                            final_tool_calls[index].function.arguments += tool_call.function.arguments
                        else:
                            final_tool_calls[index].function.arguments = tool_call.function.arguments
                        PrintUtils.print_info(f"{tool_call.function.arguments}", end="", color="blue")
        PrintUtils.print_info("")  # print a newline
        tool_calls = [
            ChatCompletionMessageFunctionToolCall(
                id=tool_call.id,
                function=tool_call.function.model_dump(),
                type=tool_call.type,  # type is always "function"
            )
            for tool_call in final_tool_calls.values()
        ]
        message = ChatCompletionMessage(role="assistant", content=content, tool_calls=tool_calls)
        OpenAIUtils.print_message(message)
        return message

    # --------------------------------------------------------
    # responses
    # --------------------------------------------------------
    @staticmethod
    def print_response(response: Response) -> None:
        for item in response.output:
            # print(f"> responses item: {item}")
            match item.type:
                case "reasoning":
                    content = getattr(item, "content", item.summary)
                    PrintUtils.print_bot(f"<reasoning>{content}</reasoning>", add_prefix=True, color="gray")
                case "message":
                    PrintUtils.print_bot(f"{item.content}", add_prefix=True)
                case "function_call":
                    PrintUtils.print_info(f"<{item.name}>({item.arguments})")
                case "file_search_call":
                    PrintUtils.print_info(f"<{item.type}>({item.queries})")
                case "web_search_call":
                    PrintUtils.print_info(f"<{item.type}>({item.action})")
                case "computer_call":
                    PrintUtils.print_info(f"<{item.type}>({item.action})")
                case "image_generation_call":
                    PrintUtils.print_info(f"<{item.type}> -> {item.result[:4]}")
                case "code_interpreter_call":
                    PrintUtils.print_info(
                        f"<{item.type}>(container_id={item.container_id}, code={item.code}) -> {item.outputs}"
                    )
                case "local_shell_call":
                    PrintUtils.print_info(f"<{item.type}>(action={item.action})")
                case "mcp_list_tools":
                    PrintUtils.print_info(f"<{item.type}>(server={item.server_label}) -> {item.tools}")
                case "mcp_call":
                    PrintUtils.print_info(
                        f"<{item.type}>(server={item.server_label}) {item.name}({item.arguments}) -> {item.output}"
                    )
                case "mcp_approval_request":
                    PrintUtils.print_info(f"<{item.type}>(server={item.server_label}) {item.name}({item.arguments})")
                case _:
                    PrintUtils.print_error(f"Unknown item type: {item.type}\n{item}")

    @staticmethod
    def print_response_stream(stream: AsyncStream[ResponseStreamEvent]) -> Response:
        raise NotImplementedError

    @staticmethod
    def get_response_configs(response: Response, include_output: bool = False) -> dict:
        """Get response configs from response"""
        data = response.model_dump()
        if not include_output:
            del data["output"]
        return data

    @staticmethod
    def get_response_output(response: Response) -> list[dict]:
        """Get response output from response"""
        return response.model_dump()["output"]

    @classmethod
    def tool_chatcompletion_to_responses(cls, tool: ChatCompletionToolParam) -> FunctionToolParam:
        assert tool["type"] == "function"
        return FunctionToolParam(
            name=tool["function"]["name"],
            description=tool["function"].get("description", ""),
            parameters=tool["function"].get("parameters", None),
            type="function",
        )

    @staticmethod
    def maybe_basemodel_to_dict(obj: Any) -> dict | None:
        if isinstance(obj, BaseModel):
            return obj.model_dump()
        return obj

get_response_configs staticmethod

get_response_configs(
    response: Response, include_output: bool = False
) -> dict

Get response configs from response

Source code in utu/utils/openai_utils/openai_utils.py
120
121
122
123
124
125
126
@staticmethod
def get_response_configs(response: Response, include_output: bool = False) -> dict:
    """Return the response serialized to a dict, keeping only its config fields.

    Dumps the full response via ``model_dump()`` and deletes the ``"output"``
    field unless ``include_output`` is True.
    """
    data = response.model_dump()
    if not include_output:
        del data["output"]
    return data

get_response_output staticmethod

get_response_output(response: Response) -> list[dict]

Get response output from response

Source code in utu/utils/openai_utils/openai_utils.py
128
129
130
131
@staticmethod
def get_response_output(response: Response) -> list[dict]:
    """Return the response's "output" items as a list of plain dicts (via model_dump)."""
    return response.model_dump()["output"]

SimplifiedAsyncOpenAI

Bases: AsyncOpenAI

Simplified OpenAI client for chat.completions and responses API, with default config

Source code in utu/utils/openai_utils/simplified_client.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class SimplifiedAsyncOpenAI(AsyncOpenAI):
    """Simplified OpenAI client for chat.completions and responses API, with default config"""

    def __init__(
        self,
        *,
        # FIX: default is None, so the annotation must allow it (`| None`)
        type: Literal["chat.completions", "responses"] | None = None,
        # openai client kwargs
        api_key: str | None = None,
        base_url: str | None = None,
        # default configs
        **kwargs: dict,
    ) -> None:
        """Build the client.

        Args:
            type: which API flavour to use; falls back to env var ``UTU_LLM_TYPE``
                (default ``"chat.completions"``).
            api_key: API key; falls back to ``UTU_LLM_API_KEY``. ``"xxx"`` is a
                dummy key for providers that do not check it.
            base_url: endpoint; falls back to ``UTU_LLM_BASE_URL``.
            **kwargs: default create-params merged into every request (unknown
                keys for the selected API are silently dropped).
        """
        logger.info(f"> type: {type}, base_url: {base_url}, kwargs: {kwargs}")
        super().__init__(
            api_key=api_key or os.getenv("UTU_LLM_API_KEY") or "xxx", base_url=base_url or os.getenv("UTU_LLM_BASE_URL")
        )
        self.type = type or os.getenv("UTU_LLM_TYPE", "chat.completions")
        # the set of parameter names accepted by the selected API's `create` call
        self.type_create_params = (
            OpenAIChatCompletionParamsKeys if self.type == "chat.completions" else OpenAIResponsesParamsKeys
        )
        self.default_config = self._process_kwargs(kwargs)

    def _process_kwargs(self, kwargs: dict) -> dict:
        """Filter kwargs down to known create-params; default the model from env."""
        default_config = {}
        for k, v in kwargs.items():
            if k in self.type_create_params:
                default_config[k] = v
        default_config["model"] = default_config.get("model", os.getenv("UTU_LLM_MODEL"))
        return default_config

    async def query_one(self, **kwargs) -> str:
        """Simplified chat.complete / responses API
        WARNING: Only for basic text i/o usage! You should not use the method with querying with customized configs!
        """
        # streaming cannot yield a single string, so it is explicitly rejected
        if "stream" in kwargs:
            assert kwargs["stream"] is False, "stream is not supported in `query_one`"

        if self.type == "chat.completions":
            chat_completion: ChatCompletion = await self.chat_completions_create(**kwargs)
            return chat_completion.choices[0].message.content
        elif self.type == "responses":
            response: Response = await self.responses_create(**kwargs)
            return response.output_text  # NOTE: will not return toolcall or reasoning
        else:
            raise ValueError(f"Unknown type: {self.type}")

    async def chat_completions_create(self, **kwargs) -> ChatCompletion | AsyncStream[ChatCompletionChunk]:
        """Call chat.completions.create with default config merged in; warn on unknown params."""
        assert self.type == "chat.completions", "`chat_completions_create` is not supported for responses API"
        unknown_params = self.check_known_keys(kwargs, self.type_create_params)
        if unknown_params:
            logger.warning(f"Unknown parameters: {unknown_params} for {self.type} API!")
        kwargs = self.process_chat_completion_params(kwargs, self.default_config)
        return await self.chat.completions.create(**kwargs)

    async def responses_create(self, **kwargs) -> Response | AsyncStream[ResponseStreamEvent]:
        """Call responses.create with default config merged in; warn on unknown params."""
        # FIX(consistency): assert the API type first, matching `chat_completions_create`
        assert self.type == "responses", "`responses_create` is not supported for chat.completions API"
        unknown_params = self.check_known_keys(kwargs, self.type_create_params)
        if unknown_params - {"messages"}:  # "messages" is accepted and converted to "input" below
            logger.warning(f"Unknown parameters: {unknown_params} for {self.type} API!")
        kwargs = self.process_responses_params(kwargs, self.default_config)
        return await self.responses.create(**kwargs)

    def process_chat_completion_params(
        self, kwargs: OpenAIChatCompletionParams, default_config: OpenAIChatCompletionParams
    ) -> OpenAIChatCompletionParams:
        """Process chat completion params, convert str to list of messages, merge default config"""
        assert "messages" in kwargs
        if isinstance(kwargs["messages"], str):
            # a bare string is treated as a single user message
            kwargs["messages"] = [{"role": "user", "content": kwargs["messages"]}]
        return self._merge_default_config(kwargs, default_config)

    def process_responses_params(
        self, kwargs: OpenAIResponsesParams, default_config: OpenAIResponsesParams
    ) -> OpenAIResponsesParams:
        """Process responses params, convert str to list of messages, merge default config"""
        if "input" not in kwargs:
            # accept chat.completions-style "messages" and convert it to "input"
            assert "messages" in kwargs
            # FIX: renamed local so it no longer shadows the builtin `input`
            messages_input = kwargs.pop("messages")
            if isinstance(messages_input, str):
                kwargs["input"] = [{"role": "user", "content": messages_input}]
            else:
                kwargs["input"] = messages_input
        else:
            if isinstance(kwargs["input"], str):
                kwargs["input"] = [{"role": "user", "content": kwargs["input"]}]
        return self._merge_default_config(kwargs, default_config)

    def _merge_default_config(self, kwargs: dict, default_config: dict) -> dict:
        """Merge default config (explicit kwargs win over defaults)."""
        for k, v in default_config.items():
            if k not in kwargs:
                kwargs[k] = v
        return kwargs

    def check_known_keys(self, kwargs: dict, known_keys: set[str]) -> set:
        """Return the keys in kwargs that are NOT in known_keys (empty set if all known)."""
        unknown_keys = set(kwargs.keys()) - known_keys
        return unknown_keys

query_one async

query_one(**kwargs) -> str

Simplified chat.completions / responses API. WARNING: only for basic text I/O usage! Do not call this method with customized query configs!

Source code in utu/utils/openai_utils/simplified_client.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
async def query_one(self, **kwargs) -> str:
    """Simplified chat.complete / responses API
    WARNING: Only for basic text i/o usage! You should not use the method with querying with customized configs!
    """
    # streaming cannot yield a single string, so it is explicitly rejected
    if "stream" in kwargs:
        assert kwargs["stream"] is False, "stream is not supported in `query_one`"

    # dispatch on the client's configured API flavour
    if self.type == "chat.completions":
        chat_completion: ChatCompletion = await self.chat_completions_create(**kwargs)
        return chat_completion.choices[0].message.content
    elif self.type == "responses":
        response: Response = await self.responses_create(**kwargs)
        return response.output_text  # NOTE: will not return toolcall or reasoning
    else:
        raise ValueError(f"Unknown type: {self.type}")

process_chat_completion_params

process_chat_completion_params(
    kwargs: OpenAIChatCompletionParams,
    default_config: OpenAIChatCompletionParams,
) -> OpenAIChatCompletionParams

Process chat completion params, convert str to list of messages, merge default config

Source code in utu/utils/openai_utils/simplified_client.py
83
84
85
86
87
88
89
90
def process_chat_completion_params(
    self, kwargs: OpenAIChatCompletionParams, default_config: OpenAIChatCompletionParams
) -> OpenAIChatCompletionParams:
    """Process chat completion params, convert str to list of messages, merge default config"""
    assert "messages" in kwargs
    if isinstance(kwargs["messages"], str):
        # a bare string is treated as a single user message
        kwargs["messages"] = [{"role": "user", "content": kwargs["messages"]}]
    return self._merge_default_config(kwargs, default_config)

process_responses_params

process_responses_params(
    kwargs: OpenAIResponsesParams,
    default_config: OpenAIResponsesParams,
) -> OpenAIResponsesParams

Process responses params, convert str to list of messages, merge default config

Source code in utu/utils/openai_utils/simplified_client.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def process_responses_params(
    self, kwargs: OpenAIResponsesParams, default_config: OpenAIResponsesParams
) -> OpenAIResponsesParams:
    """Process responses params, convert str to list of messages, merge default config"""
    if "input" not in kwargs:
        # accept chat.completions-style "messages" and convert it to "input"
        assert "messages" in kwargs
        input = kwargs.pop("messages")  # NOTE: shadows the builtin `input`
        if isinstance(input, str):
            # a bare string becomes a single user message
            kwargs["input"] = [{"role": "user", "content": input}]
        else:
            kwargs["input"] = input
    else:
        if isinstance(kwargs["input"], str):
            kwargs["input"] = [{"role": "user", "content": kwargs["input"]}]
    return self._merge_default_config(kwargs, default_config)

check_known_keys

check_known_keys(kwargs: dict, known_keys: set[str]) -> set

Return the keys in kwargs that are not in known_keys

Source code in utu/utils/openai_utils/simplified_client.py
116
117
118
119
def check_known_keys(self, kwargs: dict, known_keys: set[str]) -> set:
    """Return the keys in kwargs that are NOT in known_keys (empty set if all known)."""
    unknown_keys = set(kwargs.keys()) - known_keys
    return unknown_keys