Skip to content

LLM Clients

Maticlib provides a unified interface for multiple LLM providers. All clients inherit from BaseLLMClient and support both synchronous and asynchronous completion.

OpenAI Client

Using the modern OpenAI Responses API.

maticlib.llm.openai.client.OpenAIClient

Bases: BaseLLMClient

Client for interacting with OpenAI models via the Responses API.

Inherits from BaseLLMClient and implements OpenAI-specific message formatting and response parsing. Supports all current GPT and o-series models.

Parameters:

Name Type Description Default
model str

The OpenAI model to use. Defaults to "gpt-4o-mini". Examples: "gpt-4o", "gpt-4.1", "o4-mini", "gpt-5.4".

'gpt-4o-mini'
system_instruct str | SystemMessage

An optional system / developer prompt prepended to every request.

None
api_key str

Your OpenAI API key. Falls back to the OPENAI_API_KEY environment variable.

None
verbose bool

If True, prints HTTP status codes to stdout.

True
return_raw bool

If True, the complete / async_complete methods return the raw dict instead of an OpenAIResponse model.

False
Source code in maticlib/llm/openai/client.py
class OpenAIClient(BaseLLMClient):
    """
    Client for interacting with OpenAI models via the Responses API.

    Inherits from ``BaseLLMClient`` and implements OpenAI-specific message
    formatting and response parsing. Supports all current GPT and o-series
    models.

    Args:
        model (str): The OpenAI model to use. Defaults to ``"gpt-4o-mini"``.
            Examples: ``"gpt-4o"``, ``"gpt-4.1"``, ``"o4-mini"``, ``"gpt-5.4"``.
        system_instruct (str | SystemMessage, optional): An optional system /
            developer prompt prepended to every request.
        api_key (str): Your OpenAI API key. Falls back to the
            ``OPENAI_API_KEY`` environment variable.
        verbose (bool): If ``True``, prints HTTP status codes to stdout.
        return_raw (bool): If ``True``, the ``complete`` / ``async_complete``
            methods return the raw ``dict`` instead of an ``OpenAIResponse``
            model.
    """

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        system_instruct: Union[str, SystemMessage, None] = None,
        api_key: Optional[str] = None,
        verbose: bool = True,
        return_raw: bool = False,
    ) -> None:
        """
        Resolve the API key and store the client configuration.

        Raises:
            ValueError: If neither the ``api_key`` argument nor the
                ``OPENAI_API_KEY`` environment variable yields a non-blank
                key.
        """
        # An explicit argument wins; the environment is only the fallback.
        resolved_key = api_key if api_key else os.getenv("OPENAI_API_KEY", "")
        resolved_key = resolved_key.strip() if resolved_key else ""
        if resolved_key == "":
            raise ValueError(
                "OpenAI API key is missing. Please provide it via the 'api_key' "
                "argument or set the OPENAI_API_KEY environment variable."
            )

        # Connection settings.
        self.api_key = resolved_key
        self.base_url = "https://api.openai.com/v1"
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        # Request / output behaviour.
        self.model = model
        self.system_instruct = system_instruct
        self.verbose = verbose
        self.return_raw = return_raw

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _format_input(
        self,
        input: Union[str, List[Union[Dict, HumanMessage, SystemMessage, AIMessage]]],
    ) -> Union[str, List[Dict]]:
        """
        Normalises caller input into the ``input`` value sent to /v1/responses.

        A plain string is returned unchanged. A list is converted element by
        element into ``{"role": ..., "content": ...}`` dicts.

        Args:
            input (str | list): A plain string prompt or a list of message
                objects / dicts.

        Returns:
            str | list: The string itself, or the list of role/content dicts.

        Raises:
            ValueError: If a dict message is missing a ``role`` key.
            TypeError: If ``input`` is neither ``str`` nor ``list``, if a dict
                message has non-string ``content``, or if an element is of an
                unsupported type.
        """
        if isinstance(input, str):
            return input
        if not isinstance(input, list):
            raise TypeError(f"input must be str or list, got {type(input)}")

        # maticlib role aliases -> OpenAI roles; unknown roles pass through.
        alias_to_role = {
            "human": "user",
            "ai": "assistant",
            "model": "assistant",
        }
        # Message classes are checked in this order.
        class_to_role = (
            (HumanMessage, "user"),
            (SystemMessage, "developer"),
            (AIMessage, "assistant"),
        )

        converted: List[Dict] = []
        for entry in input:
            if isinstance(entry, dict):
                raw_role = entry.get("role")
                text = entry.get("content")
                if raw_role is None:
                    raise ValueError(
                        f"Message dict must have a 'role' key: {entry}"
                    )
                if not isinstance(text, str):
                    raise TypeError(
                        f"Message content must be str, got {type(text)}"
                    )
                converted.append(
                    {"role": alias_to_role.get(raw_role, raw_role), "content": text}
                )
                continue

            for message_class, api_role in class_to_role:
                if isinstance(entry, message_class):
                    converted.append({"role": api_role, "content": entry.content})
                    break
            else:
                raise TypeError(f"Unsupported message type: {type(entry)}")

        return converted

    def _build_payload(
        self,
        input: Union[str, List[Union[Dict, HumanMessage, SystemMessage, AIMessage]]],
        tools: Optional[List[Callable]] = None,
    ) -> Dict[str, Any]:
        """
        Assembles the JSON body for the /v1/responses endpoint.

        Args:
            input (str | list): Raw caller input (string or message list).
            tools (list, optional): A list of tool functions.

        Returns:
            dict: Body with ``model`` and ``input``, plus ``tools`` and
                ``instructions`` when they apply.
        """
        api_input = self._format_input(input)
        body: Dict[str, Any] = {"model": self.model, "input": api_input}

        if tools:
            body["tools"] = self._format_tools(tools)

        # The configured system prompt travels in the dedicated
        # 'instructions' key rather than inside the input list.
        instruction = self.system_instruct
        if instruction:
            if not isinstance(instruction, str):
                instruction = instruction.content
            body["instructions"] = instruction

        return body

    def _parse_response(
        self, response: httpx.Response
    ) -> Union[OpenAIResponse, Dict[str, Any]]:
        """
        Decodes the HTTP response body and wraps it in ``OpenAIResponse``.

        Args:
            response (httpx.Response): The raw HTTP response from the API.

        Returns:
            OpenAIResponse | dict: The parsed model. The decoded dict is
                returned instead when ``return_raw`` is ``True`` or when
                building the model fails.
        """
        body = response.json()

        if not self.return_raw:
            try:
                return OpenAIResponse(**body)
            except Exception as exc:
                # Best effort: fall through to the raw dict below.
                if self.verbose:
                    print(f"Warning: Failed to parse into OpenAIResponse: {exc}")
                    print("Returning raw dict instead.")

        return body

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def complete(
        self,
        input: Union[str, List],
        response_model: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Callable]] = None,
    ) -> Union[OpenAIResponse, Dict[str, Any]]:
        """
        Runs one blocking request against the OpenAI Responses API.

        Args:
            input (str | list): The user prompt as a plain string, or a
                conversation history as a list of message objects / dicts.
            response_model (Type[BaseModel], optional): A Pydantic model to
                parse the output into.
            tools (list, optional): A list of tool functions decorated with @tool.

        Returns:
            OpenAIResponse | dict: The value produced by ``_parse_response``.

        Raises:
            Exception: Any failure (including ``httpx.HTTPStatusError``) is
                re-raised unchanged after optional diagnostics are printed.
        """
        endpoint = f"{self.base_url}/responses"

        try:
            prepared = self._inject_runtime_instructions(input, response_model)
            request_body = self._build_payload(prepared, tools=tools)
            http_response = httpx.post(
                endpoint,
                headers=self.headers,
                json=request_body,
                timeout=60.0,
            )
            http_response.raise_for_status()

            if self.verbose:
                print(f"Status: {http_response.status_code}")

            parsed = self._parse_response(http_response)
            self._apply_response_model(parsed, response_model)
            return parsed

        except Exception as exc:
            if self.verbose:
                if isinstance(exc, httpx.HTTPStatusError):
                    # Error statuses: show the code and the body.
                    print(f"HTTP Error: {exc.response.status_code}")
                    print(f"Response: {exc.response.text}")
                else:
                    import traceback
                    traceback.print_exc()
            raise

    async def async_complete(
        self,
        input: Union[str, List],
        response_model: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Callable]] = None,
    ) -> Union[OpenAIResponse, Dict[str, Any]]:
        """
        Runs one non-blocking request against the OpenAI Responses API.

        Args:
            input (str | list): The user prompt as a plain string, or a
                conversation history as a list of message objects / dicts.
            response_model (Type[BaseModel], optional): A Pydantic model to
                parse the output into.
            tools (list, optional): A list of tool functions decorated with @tool.

        Returns:
            OpenAIResponse | dict: The value produced by ``_parse_response``.

        Raises:
            Exception: Any failure (including ``httpx.HTTPStatusError``) is
                re-raised unchanged after optional diagnostics are printed.
        """
        endpoint = f"{self.base_url}/responses"

        try:
            prepared = self._inject_runtime_instructions(input, response_model)
            request_body = self._build_payload(prepared, tools=tools)
            async with httpx.AsyncClient() as session:
                http_response = await session.post(
                    endpoint,
                    headers=self.headers,
                    json=request_body,
                    timeout=60.0,
                )
                http_response.raise_for_status()

                if self.verbose:
                    print(f"Status: {http_response.status_code}")

                parsed = self._parse_response(http_response)
                self._apply_response_model(parsed, response_model)
                return parsed

        except Exception as exc:
            if self.verbose:
                if isinstance(exc, httpx.HTTPStatusError):
                    # Error statuses: show the code and the body.
                    print(f"HTTP Error: {exc.response.status_code}")
                    print(f"Response: {exc.response.text}")
                else:
                    import traceback
                    traceback.print_exc()
            raise

    def _format_tools(self, tools: List[Callable]) -> List[Dict[str, Any]]:
        """Formats the list of tool functions for OpenAI."""
        formatted = []
        for tool_func in tools:
            if hasattr(tool_func, "matic_tool_metadata"):
                metadata = tool_func.matic_tool_metadata
                formatted.append({
                    "type": "function",
                    "function": {
                        "name": metadata["name"],
                        "description": metadata["description"],
                        "parameters": metadata["parameters"]
                    }
                })
        return formatted

    def get_text_response(
        self, response: Union[OpenAIResponse, Dict[str, Any]]
    ) -> str:
        """
        Extracts the primary text content from an OpenAI response.

        This is a convenience helper so callers do not need to traverse
        the ``output`` list manually.

        Args:
            response (OpenAIResponse | dict): The response returned by
                ``complete`` or ``async_complete``.

        Returns:
            str: The extracted text string, or an empty string if no text
                was found.
        """
        if isinstance(response, OpenAIResponse):
            return response.content or ""

        # Raw dict fallback: return the first non-empty ``output_text`` part.
        # A missing or null ``output`` / ``content`` is treated as empty so
        # the documented empty-string fallback holds instead of a TypeError.
        for item in response.get("output") or []:
            if not isinstance(item, dict):
                continue
            for part in item.get("content") or []:
                if (
                    isinstance(part, dict)
                    and part.get("type") == "output_text"
                    and part.get("text")
                ):
                    return part["text"]

        return ""

async_complete async

async_complete(input, response_model=None, tools=None)

Sends an asynchronous generation request to the OpenAI Responses API.

Parameters:

Name Type Description Default
input str | list

The user prompt as a plain string, or a conversation history as a list of message objects / dicts.

required
response_model Type[BaseModel]

A Pydantic model to parse the output into.

None
tools list

A list of tool functions decorated with @tool.

None
Source code in maticlib/llm/openai/client.py
async def async_complete(
    self,
    input: Union[str, List],
    response_model: Optional[Type[BaseModel]] = None,
    tools: Optional[List[Callable]] = None,
) -> Union[OpenAIResponse, Dict[str, Any]]:
    """
    Runs one non-blocking request against the OpenAI Responses API.

    Args:
        input (str | list): The user prompt as a plain string, or a
            conversation history as a list of message objects / dicts.
        response_model (Type[BaseModel], optional): A Pydantic model to
            parse the output into.
        tools (list, optional): A list of tool functions decorated with @tool.

    Returns:
        OpenAIResponse | dict: The value produced by ``_parse_response``.

    Raises:
        Exception: Any failure (including ``httpx.HTTPStatusError``) is
            re-raised unchanged after optional diagnostics are printed.
    """
    endpoint = f"{self.base_url}/responses"

    try:
        prepared = self._inject_runtime_instructions(input, response_model)
        request_body = self._build_payload(prepared, tools=tools)
        async with httpx.AsyncClient() as session:
            http_response = await session.post(
                endpoint,
                headers=self.headers,
                json=request_body,
                timeout=60.0,
            )
            http_response.raise_for_status()

            if self.verbose:
                print(f"Status: {http_response.status_code}")

            parsed = self._parse_response(http_response)
            self._apply_response_model(parsed, response_model)
            return parsed

    except Exception as exc:
        if self.verbose:
            if isinstance(exc, httpx.HTTPStatusError):
                # Error statuses: show the code and the body.
                print(f"HTTP Error: {exc.response.status_code}")
                print(f"Response: {exc.response.text}")
            else:
                import traceback
                traceback.print_exc()
        raise

complete

complete(input, response_model=None, tools=None)

Sends a synchronous generation request to the OpenAI Responses API.

Parameters:

Name Type Description Default
input str | list

The user prompt as a plain string, or a conversation history as a list of message objects / dicts.

required
response_model Type[BaseModel]

A Pydantic model to parse the output into.

None
tools list

A list of tool functions decorated with @tool.

None
Source code in maticlib/llm/openai/client.py
def complete(
    self,
    input: Union[str, List],
    response_model: Optional[Type[BaseModel]] = None,
    tools: Optional[List[Callable]] = None,
) -> Union[OpenAIResponse, Dict[str, Any]]:
    """
    Runs one blocking request against the OpenAI Responses API.

    Args:
        input (str | list): The user prompt as a plain string, or a
            conversation history as a list of message objects / dicts.
        response_model (Type[BaseModel], optional): A Pydantic model to
            parse the output into.
        tools (list, optional): A list of tool functions decorated with @tool.

    Returns:
        OpenAIResponse | dict: The value produced by ``_parse_response``.

    Raises:
        Exception: Any failure (including ``httpx.HTTPStatusError``) is
            re-raised unchanged after optional diagnostics are printed.
    """
    endpoint = f"{self.base_url}/responses"

    try:
        prepared = self._inject_runtime_instructions(input, response_model)
        request_body = self._build_payload(prepared, tools=tools)
        http_response = httpx.post(
            endpoint,
            headers=self.headers,
            json=request_body,
            timeout=60.0,
        )
        http_response.raise_for_status()

        if self.verbose:
            print(f"Status: {http_response.status_code}")

        parsed = self._parse_response(http_response)
        self._apply_response_model(parsed, response_model)
        return parsed

    except Exception as exc:
        if self.verbose:
            if isinstance(exc, httpx.HTTPStatusError):
                # Error statuses: show the code and the body.
                print(f"HTTP Error: {exc.response.status_code}")
                print(f"Response: {exc.response.text}")
            else:
                import traceback
                traceback.print_exc()
        raise

get_text_response

get_text_response(response)

Extracts the primary text content from an OpenAI response.

This is a convenience helper so callers do not need to traverse the output list manually.

Parameters:

Name Type Description Default
response OpenAIResponse | dict

The response returned by complete or async_complete.

required

Returns:

Type Description
str

The extracted text string, or an empty string if no text was found.

Source code in maticlib/llm/openai/client.py
def get_text_response(
    self, response: Union[OpenAIResponse, Dict[str, Any]]
) -> str:
    """
    Extracts the primary text content from an OpenAI response.

    This is a convenience helper so callers do not need to traverse
    the ``output`` list manually.

    Args:
        response (OpenAIResponse | dict): The response returned by
            ``complete`` or ``async_complete``.

    Returns:
        str: The extracted text string, or an empty string if no text
            was found.
    """
    if isinstance(response, OpenAIResponse):
        return response.content or ""

    # Raw dict fallback: return the first non-empty ``output_text`` part.
    # A missing or null ``output`` / ``content`` is treated as empty so
    # the documented empty-string fallback holds instead of a TypeError.
    for item in response.get("output") or []:
        if not isinstance(item, dict):
            continue
        for part in item.get("content") or []:
            if (
                isinstance(part, dict)
                and part.get("type") == "output_text"
                and part.get("text")
            ):
                return part["text"]

    return ""

Mistral Client

maticlib.llm.mistral.client.MistralClient

Bases: BaseLLMClient

Client for interacting with Mistral AI models.

Inherits from BaseLLMClient and implements Mistral-specific message formatting and response parsing.

Parameters:

Name Type Description Default
model str

The name of the Mistral model to use. Defaults to "mistral-medium-latest".

'mistral-medium-latest'
system_instruct str | SystemMessage

Default instructions to prepend to all conversations.

None
api_key str

Your Mistral AI API key. Defaults to MISTRAL_API_KEY environment variable.

None
verbose bool

If True, prints status messages to console.

True
return_raw bool

If True, returns the raw dict response instead of a MistralResponse model.

False
Source code in maticlib/llm/mistral/client.py
class MistralClient(BaseLLMClient):
    """
    Client for interacting with Mistral AI models.

    Inherits from BaseLLMClient and implements Mistral-specific message 
    formatting and response parsing.

    Args:
        model (str): The name of the Mistral model to use. 
            Defaults to "mistral-medium-latest".
        system_instruct (str | SystemMessage, optional): Default instructions 
            to prepend to all conversations.
        api_key (str): Your Mistral AI API key. Defaults to MISTRAL_API_KEY environment variable.
        verbose (bool): If True, prints status messages to console.
        return_raw (bool): If True, returns the raw dict response instead of a MistralResponse model.
    """
    def __init__(
        self,
        model: str = "mistral-medium-latest",
        system_instruct: str|SystemMessage|None = None,
        api_key: Optional[str] = None,
        verbose: bool = True,
        return_raw: bool = False
    ):
        """
        Resolve the API key and store the client configuration.

        Raises:
            ValueError: If neither the ``api_key`` argument nor the
                MISTRAL_API_KEY environment variable yields a non-blank key.
        """
        # An explicit argument wins; the environment is only the fallback.
        resolved_key = api_key if api_key else os.getenv("MISTRAL_API_KEY", "")
        resolved_key = resolved_key.strip() if resolved_key else ""
        if resolved_key == "":
            raise ValueError(
                "Mistral API key is missing. Please provide it via the 'api_key' "
                "argument or set the MISTRAL_API_KEY environment variable."
            )

        # Connection settings.
        self.api_key = resolved_key
        self.base_url = "https://api.mistral.ai/v1"
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

        # Request / output behaviour.
        self.model = model
        self.system_instruct = system_instruct
        self.verbose = verbose
        # When True, callers get the decoded JSON dict instead of a model.
        self.return_raw = return_raw
    def _format_messages(self, input: Union[str, List[Union[Dict, HumanMessage, SystemMessage, AIMessage]]]):
        """
        Formats various input types into the standard Mistral API message format.

        When the client was configured with ``system_instruct``, that text is
        prepended as a leading ``system`` message, as the class documentation
        promises. It is not added again if an identical system message is
        already present in the conversation.

        Args:
            input (str | list): A simple string, a list of message objects,
                or a list of dictionaries with 'role' and 'content'.

        Returns:
            list: A list of dictionaries ready for the Mistral API.

        Raises:
            ValueError: If a dictionary message is missing a 'role'.
            TypeError: If input types are unsupported.
        """
        if isinstance(input, str):
            formatted_messages = [{"role": "user", "content": input}]

        elif isinstance(input, list):
            formatted_messages = []

            for message in input:
                # Handle dictionary format
                if isinstance(message, dict):
                    role = message.get("role")
                    content = message.get("content")

                    if role is None:
                        raise ValueError(f"Message dictionary must have 'role' key: {message}")

                    if not isinstance(content, str):
                        raise TypeError(f"Message content must be a string, got {type(content)}")

                    # Map roles to Mistral format (system, user, assistant)
                    if role in ("assistant", "ai", "model"):
                        mistral_role = "assistant"
                    elif role == "system":
                        mistral_role = "system"
                    else:
                        # "user", "human" and any unrecognised role
                        mistral_role = "user"

                    formatted_messages.append({"role": mistral_role, "content": content})

                # Handle message objects
                elif isinstance(message, (HumanMessage, SystemMessage)):
                    role = "system" if isinstance(message, SystemMessage) else "user"
                    formatted_messages.append({"role": role, "content": message.content})

                elif isinstance(message, AIMessage):
                    formatted_messages.append({"role": "assistant", "content": message.content})

                else:
                    raise TypeError(f"Unsupported message type: {type(message)}")

        else:
            raise TypeError(f"Input must be str or list, got {type(input)}")

        # Apply the client-level system prompt; without this step the
        # configured ``system_instruct`` never reaches the request.
        system_text = self.__format__system_instruction()
        if system_text:
            system_message = {"role": "system", "content": system_text}
            # Guard against sending the same instruction twice.
            if system_message not in formatted_messages:
                formatted_messages.insert(0, system_message)

        return formatted_messages

    def __format__system_instruction(self):
        system_instruct = self.system_instruct
        if isinstance(system_instruct, str):
            return system_instruct
        elif isinstance(system_instruct, SystemMessage):
            return system_instruct.content

    def _parse_response(self, response: httpx.Response) -> Union[MistralResponse, Dict[str, Any]]:
        """
        Decodes the HTTP response body and wraps it in ``MistralResponse``.

        The configured model name is stored under ``modelVersion`` in the
        decoded data before anything is returned.

        Args:
            response (httpx.Response): The raw HTTP response.

        Returns:
            MistralResponse | dict: The parsed response model. The decoded
                dictionary is returned instead when `return_raw` is True or
                when building the model fails.
        """
        body = response.json()
        body['modelVersion'] = self.model

        if not self.return_raw:
            try:
                return MistralResponse(**body)
            except Exception as exc:
                # Best effort: fall through to the raw dict below.
                if self.verbose:
                    print(f"Warning: Failed to parse response into Pydantic model: {exc}")
                    print("Returning raw response instead")

        return body

    def complete(
        self,
        input: Union[str, List],
        response_model: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Callable]] = None
    ) -> Union[MistralResponse, Dict[str, Any]]:
        """
        Runs one blocking chat completion request against Mistral.

        Args:
            input (str | list): The user prompt or conversation history.
            response_model (Type[BaseModel], optional): A Pydantic model to
                parse the output into.
            tools (list, optional): A list of tool functions decorated with @tool.

        Returns:
            MistralResponse | dict: The value produced by ``_parse_response``.

        Raises:
            Exception: Any failure (including ``httpx.HTTPStatusError``) is
                re-raised unchanged after optional diagnostics are printed.
        """
        endpoint = f"{self.base_url}/chat/completions"

        try:
            prepared = self._inject_runtime_instructions(input, response_model)
            request_body = {
                "model": self.model,
                "messages": self._format_messages(prepared)
            }
            if tools:
                request_body["tools"] = self._format_tools(tools)

            http_response = httpx.post(
                endpoint,
                headers=self.headers,
                json=request_body,
                timeout=30.0,
            )
            http_response.raise_for_status()

            if self.verbose:
                print(f"Status: {http_response.status_code}")

            parsed = self._parse_response(http_response)
            self._apply_response_model(parsed, response_model)
            return parsed

        except Exception as exc:
            if self.verbose:
                if isinstance(exc, httpx.HTTPStatusError):
                    # Error statuses: show the code and the body.
                    print(f"HTTP Error: {exc.response.status_code}")
                    print(f"Response: {exc.response.text}")
                else:
                    import traceback
                    traceback.print_exc()
            raise

    async def async_complete(
        self,
        input: Union[str, List],
        response_model: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Callable]] = None
    ) -> Union[MistralResponse, Dict[str, Any]]:
        """
        Runs one non-blocking chat completion request against Mistral.

        Args:
            input (str | list): The user prompt or conversation history.
            response_model (Type[BaseModel], optional): A Pydantic model to
                parse the output into.
            tools (list, optional): A list of tool functions decorated with @tool.

        Returns:
            MistralResponse | dict: The value produced by ``_parse_response``.

        Raises:
            Exception: Any failure (including ``httpx.HTTPStatusError``) is
                re-raised unchanged after optional diagnostics are printed.
        """
        endpoint = f"{self.base_url}/chat/completions"

        try:
            prepared = self._inject_runtime_instructions(input, response_model)
            request_body = {
                "model": self.model,
                "messages": self._format_messages(prepared)
            }
            if tools:
                request_body["tools"] = self._format_tools(tools)

            async with httpx.AsyncClient() as session:
                http_response = await session.post(
                    endpoint,
                    headers=self.headers,
                    json=request_body,
                    timeout=30.0,
                )
                http_response.raise_for_status()

                if self.verbose:
                    print(f"Status: {http_response.status_code}")

                parsed = self._parse_response(http_response)
                self._apply_response_model(parsed, response_model)
                return parsed

        except Exception as exc:
            if self.verbose:
                if isinstance(exc, httpx.HTTPStatusError):
                    # Error statuses: show the code and the body.
                    print(f"HTTP Error: {exc.response.status_code}")
                    print(f"Response: {exc.response.text}")
                else:
                    import traceback
                    traceback.print_exc()
            raise

    def _format_tools(self, tools: List[Callable]) -> List[Dict[str, Any]]:
        """Formats the list of tool functions for Mistral."""
        formatted = []
        for tool_func in tools:
            if hasattr(tool_func, "matic_tool_metadata"):
                metadata = tool_func.matic_tool_metadata
                formatted.append({
                    "type": "function",
                    "function": {
                        "name": metadata["name"],
                        "description": metadata["description"],
                        "parameters": metadata["parameters"]
                    }
                })
        return formatted

    def get_text_response(self, response: Union[MistralResponse, Dict[str, Any]]) -> str:
        """
        Extracts the primary text content from a Mistral response.

        Args:
            response (MistralResponse | dict): The response to extract from.

        Returns:
            str: The content of the first choice's message, or an empty
                string when there are no choices or the content is
                missing / null.
        """
        if isinstance(response, MistralResponse):
            return response.content or ""

        # Handle raw dict response. Every path returns a string: an empty
        # or missing ``choices`` list and a null ``content`` both yield ""
        # rather than ``None``.
        choices = response.get('choices') or []
        if not choices:
            return ""
        message = choices[0].get('message') or {}
        return message.get('content') or ""

async_complete async

async_complete(input, response_model=None, tools=None)

Sends an asynchronous chat completion request to Mistral.

Parameters:

Name Type Description Default
input str | list

The user prompt or conversation history.

required
response_model Type[BaseModel]

A Pydantic model to parse the output into.

None
tools list

A list of tool functions decorated with @tool.

None
Source code in maticlib/llm/mistral/client.py
async def async_complete(
    self,
    input: Union[str, List],
    response_model: Optional[Type[BaseModel]] = None,
    tools: Optional[List[Callable]] = None
) -> Union[MistralResponse, Dict[str, Any]]:
    """
    Asynchronously requests a chat completion from Mistral.

    Args:
        input (str | list): The user prompt or conversation history.
        response_model (Type[BaseModel], optional): A Pydantic model to
            parse the output into.
        tools (list, optional): A list of tool functions decorated with @tool.

    Returns:
        MistralResponse | dict: Whatever ``_parse_response`` produces for
        the HTTP reply.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
        Exception: Any other failure is re-raised unchanged.
    """
    endpoint = f"{self.base_url}/chat/completions"

    try:
        # Let the base class add structured-output instructions if a
        # response model was supplied, then convert to wire format.
        input = self._inject_runtime_instructions(input, response_model)
        messages = self._format_messages(input)

        request_body = {"model": self.model, "messages": messages}
        if tools:
            request_body["tools"] = self._format_tools(tools)

        async with httpx.AsyncClient() as http:
            reply = await http.post(
                endpoint,
                headers=self.headers,
                json=request_body,
                timeout=30.0,
            )
            reply.raise_for_status()

            if self.verbose:
                print(f"Status: {reply.status_code}")

            parsed = self._parse_response(reply)
            self._apply_response_model(parsed, response_model)
            return parsed

    except httpx.HTTPStatusError as http_error:
        if self.verbose:
            print(f"HTTP Error: {http_error.response.status_code}")
            print(f"Response: {http_error.response.text}")
        raise
    except Exception:
        # Anything that is not an HTTP status failure: dump the traceback
        # for verbose users, then propagate.
        if self.verbose:
            import traceback
            traceback.print_exc()
        raise

complete

complete(input, response_model=None, tools=None)

Sends a synchronous chat completion request to Mistral.

Parameters:

Name Type Description Default
input str | list

The user prompt or conversation history.

required
response_model Type[BaseModel]

A Pydantic model to parse the output into.

None
tools list

A list of tool functions decorated with @tool.

None
Source code in maticlib/llm/mistral/client.py
def complete(
    self,
    input: Union[str, List],
    response_model: Optional[Type[BaseModel]] = None,
    tools: Optional[List[Callable]] = None
) -> Union[MistralResponse, Dict[str, Any]]:
    """
    Synchronously requests a chat completion from Mistral.

    Args:
        input (str | list): The user prompt or conversation history.
        response_model (Type[BaseModel], optional): A Pydantic model to
            parse the output into.
        tools (list, optional): A list of tool functions decorated with @tool.

    Returns:
        MistralResponse | dict: Whatever ``_parse_response`` produces for
        the HTTP reply.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
        Exception: Any other failure is re-raised unchanged.
    """
    endpoint = f"{self.base_url}/chat/completions"

    try:
        # Let the base class add structured-output instructions if a
        # response model was supplied, then convert to wire format.
        input = self._inject_runtime_instructions(input, response_model)
        messages = self._format_messages(input)

        request_body = {"model": self.model, "messages": messages}
        if tools:
            request_body["tools"] = self._format_tools(tools)

        reply = httpx.post(
            endpoint,
            headers=self.headers,
            json=request_body,
            timeout=30.0,
        )
        reply.raise_for_status()

        if self.verbose:
            print(f"Status: {reply.status_code}")

        parsed = self._parse_response(reply)
        self._apply_response_model(parsed, response_model)
        return parsed

    except httpx.HTTPStatusError as http_error:
        if self.verbose:
            print(f"HTTP Error: {http_error.response.status_code}")
            print(f"Response: {http_error.response.text}")
        raise
    except Exception:
        # Anything that is not an HTTP status failure: dump the traceback
        # for verbose users, then propagate.
        if self.verbose:
            import traceback
            traceback.print_exc()
        raise

get_text_response

get_text_response(response)

Extracts the primary text content from a Mistral response.

Parameters:

Name Type Description Default
response MistralResponse | dict

The response to extract from.

required

Returns:

Name Type Description
str str

The extracted text string.

Source code in maticlib/llm/mistral/client.py
def get_text_response(self, response: Union[MistralResponse, Dict[str, Any]]) -> str:
    """
    Extracts the primary text content from a Mistral response.

    Args:
        response (MistralResponse | dict): The response to extract from.
            A raw dict is expected to follow the chat-completions layout
            (``choices[0].message.content``).

    Returns:
        str: The extracted text string, or an empty string when the
        response has no choices or the first message has no content.
    """
    if isinstance(response, MistralResponse):
        return response.content or ""

    # Raw dict response: read the first choice's message content.
    choices = response.get('choices') or []
    if not choices:
        # Previously fell off the end and returned None despite the
        # ``-> str`` contract.
        return ""

    message = choices[0].get('message') or {}
    return message.get('content') or ""

Google GenAI Client

maticlib.llm.google_genai.client.GoogleGenAIClient

Bases: BaseLLMClient

Client for interacting with Google's Generative AI (Gemini) models.

Inherits from BaseLLMClient and implements Gemini-specific message formatting and response parsing.

Parameters:

Name Type Description Default
model str

The name of the Gemini model to use. Defaults to "gemini-2.5-flash-lite".

'gemini-2.5-flash-lite'
system_instruct str | SystemMessage

Default instructions to prepend to all conversations.

None
api_key str

Your Google AI API key.

None
thinking_budget int

Optional token budget for model reasoning/thinking.

0
verbose bool

If True, prints status messages to console.

True
return_raw bool

If True, returns the raw dict response instead of a GeminiResponse model.

False
Source code in maticlib/llm/google_genai/client.py
class GoogleGenAIClient(BaseLLMClient):
    """
    Client for interacting with Google's Generative AI (Gemini) models.

    Inherits from BaseLLMClient and implements Gemini-specific message 
    formatting and response parsing.

    Args:
        model (str): The name of the Gemini model to use. 
            Defaults to "gemini-2.5-flash-lite".
        system_instruct (str | SystemMessage, optional): Default instructions 
            sent as the request's ``system_instruction``.
        api_key (str): Your Google AI API key. Falls back to the
            ``GOOGLE_API_KEY`` and then ``GEMINI_API_KEY`` environment
            variables.
        thinking_budget (int): Optional token budget for model
            reasoning/thinking. Only sent to the API when greater than zero.
        verbose (bool): If True, prints status messages to console.
        return_raw (bool): If True, returns the raw dict response instead of a GeminiResponse model.

    Raises:
        ValueError: If no non-blank API key can be resolved.
    """
    def __init__(
        self,
        model: str = "gemini-2.5-flash-lite",
        system_instruct: str|SystemMessage|None = None,
        api_key: Optional[str] = None,
        thinking_budget: int = 0,
        verbose: bool = True,
        return_raw: bool = False
    ):
        api_key = api_key or os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_API_KEY") or ""
        api_key = (api_key or "").strip()
        if not api_key:
            raise ValueError(
                "Google Gemini API key is missing. Please provide it via the 'api_key' "
                "argument or set the GOOGLE_API_KEY environment variable."
            )
        self.api_key = api_key
        self.model = model
        self.system_instruct = system_instruct
        self.base_url = "https://generativelanguage.googleapis.com/v1beta"
        self.verbose = verbose
        self.headers = {
            "x-goog-api-key": self.api_key,
            "Content-Type": "application/json"
        }
        self.thinking_budget = thinking_budget
        self.return_raw = return_raw  # Option to return raw JSON response or Pydantic model

    def _format_messages(self, input: Union[str, List[Union[Dict, HumanMessage, SystemMessage, AIMessage]]]):
        """
        Formats various input types into the standard Gemini API content format.

        Args:
            input (str | list): A simple string, a list of message objects, 
                or a list of dictionaries with 'role' and 'content'.

        Returns:
            list: A list of dictionaries ready for the Gemini 'contents' payload.

        Raises:
            ValueError: If a dictionary message is missing a 'role'.
            TypeError: If a dictionary's 'content' is not a string, a list
                item has an unsupported type, or ``input`` is neither a
                string nor a list.
        """
        if isinstance(input, str):
            # Return a list of messages, NOT wrapped in "contents"
            return [{"parts": [{"text": input}]}]

        if not isinstance(input, list):
            raise TypeError(f"Input must be str or list, got {type(input)}")

        formatted_messages = []
        for message in input:
            # Handle dictionary format
            if isinstance(message, dict):
                role = message.get("role")
                content = message.get("content")

                if role is None:
                    raise ValueError(f"Message dictionary must have 'role' key: {message}")

                if not isinstance(content, str):
                    raise TypeError(f"Message content must be a string, got {type(content)}")

                # Gemini only knows "user" and "model"; anything that is not
                # an assistant/model turn (including "system") goes out as
                # "user".
                gemini_role = "model" if role in ("assistant", "model") else "user"
                formatted_messages.append({
                    "role": gemini_role,
                    "parts": [{"text": content}]
                })

            # Handle message objects
            elif isinstance(message, (HumanMessage, SystemMessage)):
                formatted_messages.append({
                    "role": "user",
                    "parts": [{"text": message.content}]
                })

            elif isinstance(message, AIMessage):
                formatted_messages.append({
                    "role": "model",
                    "parts": [{"text": message.content}]
                })

            else:
                raise TypeError(f"Unsupported message type: {type(message)}")

        return formatted_messages

    def _format__system_instruction(self):
        """
        Returns the configured system instruction as plain text.

        Returns:
            str | None: The string itself, the ``content`` of a
            ``SystemMessage``, or ``None`` for anything else (including
            when no instruction was configured).
        """
        system_instruct = self.system_instruct
        if isinstance(system_instruct, str):
            return system_instruct
        elif isinstance(system_instruct, SystemMessage):
            return system_instruct.content
        return None

    def _build_payload(
        self,
        input: Union[str, List],
        response_model: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Callable]] = None
    ) -> Dict[str, Any]:
        """
        Assembles the JSON body shared by ``complete`` and ``async_complete``.

        Reads ``self.system_instruct`` without modifying it, so the value
        the caller configured survives across requests.
        """
        # Inject structure instructions if requested
        input = self._inject_runtime_instructions(input, response_model)
        formatted_messages = self._format_messages(input)

        payload: Dict[str, Any] = {}

        if tools:
            payload["tools"] = self._format_tools(tools)

        system_text = self._format__system_instruction()
        if system_text:
            payload["system_instruction"] = {"parts": [{"text": system_text}]}

        payload["contents"] = formatted_messages

        if self.thinking_budget > 0:
            # The REST API expects the budget nested under
            # generationConfig.thinkingConfig, not directly on
            # generationConfig.
            payload["generationConfig"] = {
                "thinkingConfig": {"thinkingBudget": self.thinking_budget}
            }

        return payload

    def _report_failure(self, error: Exception) -> None:
        """Prints diagnostics for a failed request when ``verbose`` is on."""
        if not self.verbose:
            return
        if isinstance(error, httpx.HTTPStatusError):
            print(f"HTTP Error: {error.response.status_code}")
            print(f"Response: {error.response.text}")
        else:
            import traceback
            traceback.print_exc()

    def _parse_response(self, response: httpx.Response) -> Union[GeminiResponse, Dict[str, Any]]:
        """
        Parses the JSON response from Gemini into a structured model.

        Args:
            response (httpx.Response): The raw HTTP response.

        Returns:
            GeminiResponse | dict: The parsed response model, or a raw dictionary 
            if `return_raw` is set to True or the payload cannot be loaded
            into ``GeminiResponse``.
        """
        response_data = response.json()

        # Fill in metadata only when the body does not already carry it, so
        # identifiers supplied by the server are not overwritten.
        response_data.setdefault('responseId', response.headers.get('X-Response-Id', 'unknown'))
        response_data.setdefault('modelVersion', self.model)

        if self.return_raw:
            return response_data

        try:
            return GeminiResponse(**response_data)
        except Exception as e:
            # Deliberate best-effort: hand back the raw dict rather than fail.
            if self.verbose:
                print(f"Warning: Failed to parse response into Pydantic model: {e}")
                print("Returning raw response instead")
            return response_data

    def complete(
        self, 
        input: Union[str, List],
        response_model: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Callable]] = None
    ) -> Union[GeminiResponse, Dict[str, Any]]:
        """
        Sends a synchronous generation request to Gemini.

        Args:
            input (str | list): The user prompt or conversation history.
            response_model (Type[BaseModel], optional): A Pydantic model to 
                parse the output into.
            tools (list, optional): A list of tool functions decorated with @tool.

        Returns:
            GeminiResponse | dict: The parsed response (see ``_parse_response``).

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
            Exception: Any other failure is re-raised unchanged.
        """
        url = f"{self.base_url}/models/{self.model}:generateContent"

        try:
            payload = self._build_payload(input, response_model, tools)

            response = httpx.post(url, headers=self.headers, json=payload, timeout=60.0)
            response.raise_for_status()

            if self.verbose:
                print(f"Status: {response.status_code}")

            result = self._parse_response(response)
            self._apply_response_model(result, response_model)
            return result

        except Exception as e:
            self._report_failure(e)
            raise

    async def async_complete(
        self, 
        input: Union[str, List],
        response_model: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Callable]] = None
    ) -> Union[GeminiResponse, Dict[str, Any]]:
        """
        Sends an asynchronous generation request to Gemini.

        Args:
            input (str | list): The user prompt or conversation history.
            response_model (Type[BaseModel], optional): A Pydantic model to 
                parse the output into.
            tools (list, optional): A list of tool functions decorated with @tool.

        Returns:
            GeminiResponse | dict: The parsed response (see ``_parse_response``).

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
            Exception: Any other failure is re-raised unchanged.
        """
        url = f"{self.base_url}/models/{self.model}:generateContent"

        try:
            payload = self._build_payload(input, response_model, tools)

            # ``async with`` closes the client on exit; no extra aclose()
            # is needed (and a ``finally`` referencing ``client`` would fail
            # if the client was never created).
            async with httpx.AsyncClient() as client:
                response = await client.post(url, headers=self.headers, json=payload, timeout=60.0)
                response.raise_for_status()

            if self.verbose:
                print(f"Status: {response.status_code}")

            result = self._parse_response(response=response)
            self._apply_response_model(result, response_model)
            return result

        except Exception as e:
            self._report_failure(e)
            raise

    def _format_tools(self, tools: List[Callable]) -> List[Dict[str, Any]]:
        """
        Formats the list of tool functions for Gemini.

        Callables without a ``matic_tool_metadata`` attribute are skipped.
        All declarations are wrapped in a single ``function_declarations``
        entry.
        """
        declarations = [
            {
                "name": tool_func.matic_tool_metadata["name"],
                "description": tool_func.matic_tool_metadata["description"],
                "parameters": tool_func.matic_tool_metadata["parameters"],
            }
            for tool_func in tools
            if hasattr(tool_func, "matic_tool_metadata")
        ]
        return [{"function_declarations": declarations}]

    def get_text_response(self, response: Union[GeminiResponse, Dict[str, Any]]) -> str:
        """
        Extracts the primary text content from a Gemini response.

        Args:
            response (GeminiResponse | dict): The response to extract from.

        Returns:
            str: The text parts of the first candidate joined with single
            spaces, or an empty string when there is no candidate or no
            text part.
        """
        if isinstance(response, GeminiResponse):
            return response.content or ""

        # Handle raw dict response
        candidates = response.get('candidates') or []
        if not candidates:
            # Previously fell off the end and returned None despite the
            # ``-> str`` contract.
            return ""

        parts = (candidates[0].get('content') or {}).get('parts') or []
        return ' '.join(part.get('text', '') for part in parts if 'text' in part)

async_complete async

async_complete(input, response_model=None, tools=None)

Sends an asynchronous generation request to Gemini.

Parameters:

Name Type Description Default
input str

The text input to send to the model.

required
response_model Type[BaseModel]

A Pydantic model to parse the output into.

None
tools list

A list of tool functions decorated with @tool.

None
Source code in maticlib/llm/google_genai/client.py
async def async_complete(
    self, 
    input: Union[str, List],
    response_model: Optional[Type[BaseModel]] = None,
    tools: Optional[List[Callable]] = None
) -> Union[GeminiResponse, Dict[str, Any]]:
    """
    Sends an asynchronous generation request to Gemini.

    Args:
        input (str | list): The user prompt or conversation history.
        response_model (Type[BaseModel], optional): A Pydantic model to 
            parse the output into.
        tools (list, optional): A list of tool functions decorated with @tool.

    Returns:
        GeminiResponse | dict: Whatever ``_parse_response`` produces for
        the HTTP reply.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
        Exception: Any other failure is re-raised unchanged.
    """
    url = f"{self.base_url}/models/{self.model}:generateContent"

    try:
        # Inject structure instructions if requested
        input = self._inject_runtime_instructions(input, response_model)

        formatted_messages = self._format_messages(input=input)

        payload = {}

        # Handle tools
        if tools:
            payload["tools"] = self._format_tools(tools)

        # Resolve the instruction into a local so the configured
        # ``self.system_instruct`` is not overwritten on every call.
        system_text = self._format__system_instruction()
        if system_text:
            payload["system_instruction"] = {
                "parts": [
                    {
                        "text": system_text
                    }
                ]
            }

        payload["contents"] = formatted_messages

        if self.thinking_budget > 0:
            # The REST API expects the budget nested under
            # generationConfig.thinkingConfig.
            payload["generationConfig"] = {
                "thinkingConfig": {
                    "thinkingBudget": self.thinking_budget
                }
            }

        # ``async with`` closes the client on exit; the former
        # ``finally: await client.aclose()`` was redundant and raised
        # UnboundLocalError whenever the client could not be created.
        async with httpx.AsyncClient() as client:
            response = await client.post(url, headers=self.headers, json=payload, timeout=60.0)
            response.raise_for_status()

        if self.verbose:
            print(f"Status: {response.status_code}")

        result = self._parse_response(response=response)
        self._apply_response_model(result, response_model)
        return result

    except httpx.HTTPStatusError as e:
        if self.verbose:
            print(f"HTTP Error: {e.response.status_code}")
            print(f"Response: {e.response.text}")
        raise
    except Exception:
        # Honour ``verbose`` here as the synchronous ``complete`` does.
        if self.verbose:
            import traceback
            traceback.print_exc()
        raise

complete

complete(input, response_model=None, tools=None)

Sends a synchronous generation request to Gemini.

Parameters:

Name Type Description Default
input str | list

The user prompt or conversation history.

required
response_model Type[BaseModel]

A Pydantic model to parse the output into.

None
tools list

A list of tool functions decorated with @tool.

None
Source code in maticlib/llm/google_genai/client.py
def complete(
    self, 
    input: Union[str, List],
    response_model: Optional[Type[BaseModel]] = None,
    tools: Optional[List[Callable]] = None
) -> Union[GeminiResponse, Dict[str, Any]]:
    """
    Sends a synchronous generation request to Gemini.

    Args:
        input (str | list): The user prompt or conversation history.
        response_model (Type[BaseModel], optional): A Pydantic model to 
            parse the output into.
        tools (list, optional): A list of tool functions decorated with @tool.

    Returns:
        GeminiResponse | dict: Whatever ``_parse_response`` produces for
        the HTTP reply.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
        Exception: Any other failure is re-raised unchanged.
    """
    url = f"{self.base_url}/models/{self.model}:generateContent"

    try:
        # Inject structure instructions if requested
        input = self._inject_runtime_instructions(input, response_model)

        # Format messages
        formatted_messages = self._format_messages(input)

        payload = {}

        # Handle tools
        if tools:
            payload["tools"] = self._format_tools(tools)

        # Resolve the instruction into a local so the configured
        # ``self.system_instruct`` is not overwritten on every call.
        system_text = self._format__system_instruction()
        if system_text:
            payload["system_instruction"] = {
                "parts": [
                    {
                        "text": system_text
                    }
                ]
            }

        payload["contents"] = formatted_messages

        # Add thinking budget if configured. The REST API expects it nested
        # under generationConfig.thinkingConfig.
        if self.thinking_budget > 0:
            payload["generationConfig"] = {
                "thinkingConfig": {
                    "thinkingBudget": self.thinking_budget
                }
            }

        # Make request
        response = httpx.post(url, headers=self.headers, json=payload, timeout=60.0)
        response.raise_for_status()

        if self.verbose:
            print(f"Status: {response.status_code}")

        result = self._parse_response(response)
        self._apply_response_model(result, response_model)
        return result

    except httpx.HTTPStatusError as e:
        if self.verbose:
            print(f"HTTP Error: {e.response.status_code}")
            print(f"Response: {e.response.text}")
        raise
    except Exception:
        if self.verbose:
            import traceback
            traceback.print_exc()
        raise

get_text_response

get_text_response(response)

Extracts the primary text content from a Gemini response.

Parameters:

Name Type Description Default
response GeminiResponse | dict

The response to extract from.

required

Returns:

Name Type Description
str str

The extracted text string.

Source code in maticlib/llm/google_genai/client.py
def get_text_response(self, response: Union[GeminiResponse, Dict[str, Any]]) -> str:
    """
    Extracts the primary text content from a Gemini response.

    Args:
        response (GeminiResponse | dict): The response to extract from.

    Returns:
        str: The text parts of the first candidate joined with single
        spaces, or an empty string when there is no candidate or no text
        part.
    """
    if isinstance(response, GeminiResponse):
        return response.content or ""

    # Handle raw dict response
    candidates = response.get('candidates') or []
    if not candidates:
        # Previously fell off the end and returned None despite the
        # ``-> str`` contract.
        return ""

    parts = (candidates[0].get('content') or {}).get('parts') or []
    return ' '.join(part.get('text', '') for part in parts if 'text' in part)

Response Models

Standardized models used to ensure consistency across providers.

OpenAI Response

maticlib.llm.openai.openai_classes.OpenAIResponse

Bases: LLMResponseBase

OpenAI Responses API response (/v1/responses).

Maps the raw JSON payload onto the shared LLMResponseBase interface so callers can use response.content and response.content_parts the same way they would with MistralResponse or GeminiResponse.

Common (inherited) fields populated automatically:

- content -- concatenated text from all output_text parts
- content_parts -- one ContentPart per output_text chunk
- prompt_tokens -- mapped from usage.input_tokens
- completion_tokens -- mapped from usage.output_tokens
- total_tokens -- mapped from usage.total_tokens
- finish_reason -- mapped from first output item's status
- response_id -- mapped from top-level id
- raw_response -- original JSON dict

OpenAI-specific fields

- id (str): Response ID (resp_...).
- object (str): Always "response".
- created_at (int): Unix timestamp of creation.
- status (str): Response-level status (completed, failed, ...).
- output (List[OpenAIOutputMessage]): Ordered list of output items.
- usage (OpenAIUsage): Detailed token-usage breakdown.
- model_version (str): Model string echoed back by OpenAI.

Source code in maticlib/llm/openai/openai_classes.py
class OpenAIResponse(LLMResponseBase):
    """
    OpenAI Responses API response (/v1/responses).

    Maps the raw JSON payload onto the shared ``LLMResponseBase`` interface
    so callers can use ``response.content`` and ``response.content_parts``
    the same way they would with ``MistralResponse`` or ``GeminiResponse``.

    Common (inherited) fields populated automatically:
        content          -- text of all output_text parts, joined by spaces
        content_parts    -- one ContentPart per output_text chunk
        prompt_tokens    -- mapped from usage.input_tokens
        completion_tokens-- mapped from usage.output_tokens
        total_tokens     -- mapped from usage.total_tokens
        finish_reason    -- mapped from first output item's status
        tool_calls       -- function calls found among the output items
        response_id      -- mapped from top-level id
        raw_response     -- shallow copy of the payload as received

    OpenAI-specific fields:
        id (str): Response ID (resp_...).
        object (str): Always "response".
        created_at (int): Unix timestamp of creation.
        status (str): Response-level status (completed, failed, ...).
        output (List[OpenAIOutputMessage]): Ordered list of output items.
        usage (OpenAIUsage): Detailed token-usage breakdown.
        model_version (str): Model string echoed back by OpenAI.
    """

    # ---- OpenAI-specific top-level fields ----
    id: str = Field(..., description="Response ID (resp_...)")
    object: str = Field(..., description="Object type, always 'response'")
    created_at: int = Field(..., description="Unix timestamp of creation")
    status: str = Field(..., description="Top-level response status")
    output: List[OpenAIOutputMessage] = Field(
        default_factory=list, description="Ordered output items"
    )
    usage: Optional[OpenAIUsage] = Field(None, description="Detailed token usage")
    model_version: Optional[str] = Field(
        None, description="Model string as returned by OpenAI"
    )

    def __init__(self, **data: Any) -> None:
        # Snapshot the payload before any derived keys are added below, so
        # ``raw_response`` reflects what was received rather than the
        # enriched dict.
        raw_snapshot = dict(data)

        # ``output`` may be missing or null; treat both as "no items".
        output_list = data.get("output") or []

        # ------------------------------------------------------------------
        # 1. Walk every output item and extract text into content / content_parts
        # ------------------------------------------------------------------
        text_parts: List[str] = []
        content_parts: List[ContentPart] = []

        for item in output_list:
            if not isinstance(item, dict):
                continue
            # Items without text (or with a null ``content``) simply
            # contribute nothing here.
            for part in item.get("content") or []:
                if not isinstance(part, dict):
                    continue
                if part.get("type") == "output_text" and part.get("text"):
                    text_parts.append(part["text"])
                    content_parts.append(
                        ContentPart(type=ModalityType.TEXT, text=part["text"])
                    )

        if text_parts:
            data["content"] = " ".join(text_parts)
        if content_parts:
            data["content_parts"] = content_parts

        # ------------------------------------------------------------------
        # 2. Map usage onto the shared LLMResponseBase token fields
        # ------------------------------------------------------------------
        usage_raw = data.get("usage") or {}
        if isinstance(usage_raw, dict):
            data["prompt_tokens"] = usage_raw.get("input_tokens")
            data["completion_tokens"] = usage_raw.get("output_tokens")
            data["total_tokens"] = usage_raw.get("total_tokens")

        # ------------------------------------------------------------------
        # 3. finish_reason -- use the status of the first output item
        # ------------------------------------------------------------------
        if output_list and isinstance(output_list[0], dict):
            data["finish_reason"] = output_list[0].get("status")

        # ------------------------------------------------------------------
        # 4. Extract tool calls from output items
        # ------------------------------------------------------------------
        tool_calls: List[Dict[str, Any]] = []
        for item in output_list:
            if not isinstance(item, dict):
                continue
            item_type = item.get("type")
            if item_type == "function_call":
                # Responses API shape: name / arguments / call_id sit at the
                # top level of the item. ``call_id`` is preferred because it
                # is the identifier a function_call_output must echo back.
                tool_calls.append({
                    "id": item.get("call_id") or item.get("id"),
                    "type": "function",
                    "function": {
                        "name": item.get("name"),
                        "arguments": item.get("arguments")
                    }
                })
            elif item_type == "call_tool":
                # Previously supported nested shape, kept for compatibility.
                nested = item.get("call_tool") or {}
                tool_calls.append({
                    "id": item.get("id"),
                    "type": "function",
                    "function": {
                        "name": nested.get("name"),
                        "arguments": nested.get("arguments")
                    }
                })
        if tool_calls:
            data["tool_calls"] = tool_calls

        # ------------------------------------------------------------------
        # 5. Standardise response identifiers and attach the raw payload
        # ------------------------------------------------------------------
        data["response_id"] = data.get("id")
        data["model_version"] = data.get("model")
        data["raw_response"] = raw_snapshot

        super().__init__(**data)

    # ------------------------------------------------------------------
    # Convenience properties for OpenAI-specific metadata
    # ------------------------------------------------------------------

    @computed_field
    @property
    def timestamp(self) -> datetime:
        """
        Converts the ``created_at`` Unix timestamp into a ``datetime`` object.

        The result is a naive ``datetime`` in the local timezone, because
        ``datetime.fromtimestamp`` is called without a ``tz`` argument.
        """
        return datetime.fromtimestamp(self.created_at)

    @property
    def reasoning_tokens(self) -> Optional[int]:
        """
        Tokens used for internal model reasoning (o-series models only).

        Returns ``None`` for standard GPT models that do not expose
        reasoning-token counts.
        """
        if self.usage and self.usage.output_tokens_details:
            return self.usage.output_tokens_details.reasoning_tokens
        return None

    @property
    def cached_tokens(self) -> Optional[int]:
        """
        Input tokens served from the prompt cache.

        A non-zero value means the model reused previously computed KV-cache
        entries, which are billed at a reduced rate.
        """
        if self.usage and self.usage.input_tokens_details:
            return self.usage.input_tokens_details.cached_tokens
        return None

cached_tokens property

cached_tokens

Input tokens served from the prompt cache.

A non-zero value means the model reused previously computed KV-cache entries, which are billed at a reduced rate.

reasoning_tokens property

reasoning_tokens

Tokens used for internal model reasoning (o-series models only).

Returns None for standard GPT models that do not expose reasoning-token counts.

timestamp property

timestamp

Converts the created_at Unix timestamp into a datetime object.

Mistral Response

maticlib.llm.mistral.mistral_classes.MistralResponse

Bases: LLMResponseBase

Mistral-specific response structure.

Supports both text-only and multimodal (Pixtral) models. Inherits from LLMResponseBase and adds Mistral-specific fields.

Source code in maticlib/llm/mistral/mistral_classes.py
class MistralResponse(LLMResponseBase):
    """
    Mistral-specific response structure.

    Supports both text-only and multimodal (Pixtral) models.
    Inherits from LLMResponseBase and adds Mistral-specific fields.

    The constructor receives the decoded Mistral payload as keyword
    arguments. Before handing them to the base-class initializer it derives
    the keys ``content``, ``content_parts``, ``finish_reason``,
    ``tool_calls``, the token counters, ``response_id`` and
    ``raw_response`` from the first choice and the ``usage`` section.
    """

    id: str = Field(..., description="Unique identifier for the Mistral response")
    created: int = Field(..., description="Unix timestamp of creation")
    object: str = Field(..., description="Object type (e.g., 'chat.completion')")
    choices: List[MistralChoice] = Field(..., description="List of completion choices")
    usage: MistralUsage = Field(..., description="Token usage information")

    def __init__(self, **data):
        """
        Build the response from a decoded Mistral payload.

        Args:
            **data: Top-level keys of the Mistral response (``id``,
                ``created``, ``object``, ``choices``, ``usage``, ...).
                ``choices`` entries and ``usage`` are read with ``.get`` and
                are therefore expected to be plain dicts.
        """
        # Snapshot (shallow copy) the payload *before* any derived keys are
        # added, so ``raw_response`` holds only what the caller passed in.
        raw_payload = data.copy()

        # Extract common fields from the first choice. A missing, null or
        # empty ``choices`` is skipped here and left for field validation.
        choices = data.get('choices')
        if choices:
            first_choice = choices[0]
            message = first_choice.get('message') or {}
            content = message.get('content')

            if isinstance(content, list):
                # Multimodal response: content is a list of parts.
                content_parts = []
                text_parts = []
                for part in content:
                    if not isinstance(part, dict):
                        continue
                    # NOTE(review): ModalityType(...) is called with the
                    # part's own ``type`` string; confirm that every type
                    # Mistral can return is a valid ModalityType value.
                    content_parts.append(
                        ContentPart(
                            type=ModalityType(part.get('type', 'text')),
                            text=part.get('text'),
                            image_url=part.get('image_url'),
                        )
                    )
                    if part.get('text'):
                        text_parts.append(part['text'])

                data['content_parts'] = content_parts
                data['content'] = ' '.join(text_parts) if text_parts else None
            else:
                # Text-only response: content is a string (or None).
                data['content'] = content
                if content:
                    data['content_parts'] = [ContentPart(type=ModalityType.TEXT, text=content)]

            data['finish_reason'] = first_choice.get('finish_reason')

            # Forward tool calls only when the message actually has some.
            tool_calls = message.get('tool_calls')
            if tool_calls:
                data['tool_calls'] = tool_calls

        # Copy token usage counters to the top level; absent counters
        # become None.
        usage = data.get('usage')
        if usage is not None:
            for key in (
                'prompt_tokens',
                'completion_tokens',
                'total_tokens',
                'image_tokens',
                'audio_tokens',
                'video_tokens',
            ):
                data[key] = usage.get(key)

        # Mirror the provider id as response_id. No ``model`` key is derived
        # here; whatever the payload carries is forwarded unchanged.
        data['response_id'] = data.get('id')

        # Store the untouched snapshot taken at the top.
        data['raw_response'] = raw_payload

        super().__init__(**data)

    @computed_field
    @property
    def timestamp(self) -> datetime:
        """
        Convert the ``created`` Unix timestamp to a ``datetime``.

        No ``tz`` is passed to ``datetime.fromtimestamp``, so the result is
        a naive ``datetime`` in the local timezone.
        """
        return datetime.fromtimestamp(self.created)

timestamp property

timestamp

Convert Unix timestamp to datetime

Gemini Response

maticlib.llm.google_genai.gemini_classes.GeminiResponse

Bases: LLMResponseBase

Gemini-specific response structure.

Supports multimodal inputs (text, image, audio, video) and outputs. Inherits from LLMResponseBase and adds Gemini-specific fields.

Source code in maticlib/llm/google_genai/gemini_classes.py
class GeminiResponse(LLMResponseBase):
    """
    Gemini-specific response structure.

    Supports multimodal inputs (text, image, audio, video) and outputs.
    Inherits from LLMResponseBase and adds Gemini-specific fields.

    The constructor receives the decoded Gemini payload as keyword
    arguments. Before handing them to the base-class initializer it derives
    the keys ``content``, ``content_parts``, ``finish_reason``,
    ``tool_calls``, the token counters, ``response_id``, ``model`` and
    ``raw_response`` from the first candidate and ``usageMetadata``.
    """

    responseId: str = Field(..., description="Unique identifier for the Gemini response")
    modelVersion: str = Field(..., description="Gemini model version")
    candidates: List[GeminiCandidate] = Field(..., description="List of candidate responses")
    usageMetadata: GeminiUsageMetadata = Field(..., description="Token usage metadata")

    def __init__(self, **data):
        """
        Build the response from a decoded Gemini payload.

        Args:
            **data: Top-level keys of the Gemini response (``responseId``,
                ``modelVersion``, ``candidates``, ``usageMetadata``, ...).
                Candidates, parts and ``usageMetadata`` are read with
                ``.get`` and are therefore expected to be plain dicts.
        """
        # Snapshot (shallow copy) the payload *before* any derived keys are
        # added, so ``raw_response`` holds only what the caller passed in.
        raw_payload = data.copy()

        # (substring looked for in the MIME type, modality, ContentPart URL
        # attribute). Checked in this order; the first match wins.
        mime_modalities = (
            ('image', ModalityType.IMAGE, 'image_url'),
            ('audio', ModalityType.AUDIO, 'audio_url'),
            ('video', ModalityType.VIDEO, 'video_url'),
        )

        # Extract common fields from the first candidate. A missing, null or
        # empty ``candidates`` is skipped here and left for field validation.
        candidates = data.get('candidates')
        if candidates:
            first_candidate = candidates[0]
            parts = (first_candidate.get('content') or {}).get('parts') or []

            if parts:
                content_parts = []
                text_parts = []

                for part in parts:
                    if not isinstance(part, dict):
                        continue

                    # Every part starts as TEXT; media found below upgrades it.
                    modality = ModalityType.TEXT
                    content_part = ContentPart(type=modality)

                    # Extract text
                    text = part.get('text')
                    if text:
                        text_parts.append(text)
                        content_part.text = text

                    # Extract inline data (images, audio, etc.). The other
                    # keys read from this payload are camelCase
                    # (functionCall, finishReason, usageMetadata), so accept
                    # the camelCase spelling alongside the snake_case one.
                    inline = part.get('inline_data') or part.get('inlineData')
                    if inline:
                        mime_type = inline.get('mime_type') or inline.get('mimeType') or ''
                        content_part.inline_data = inline
                        for marker, kind, _ in mime_modalities:
                            if marker in mime_type:
                                modality = kind
                                break

                    # Extract file data (same dual spelling as above). A
                    # match here overrides a modality set by inline data.
                    file_data = part.get('file_data') or part.get('fileData')
                    if file_data:
                        mime_type = file_data.get('mime_type') or file_data.get('mimeType') or ''
                        file_uri = file_data.get('file_uri') or file_data.get('fileUri')
                        for marker, kind, url_attr in mime_modalities:
                            if marker in mime_type:
                                modality = kind
                                setattr(content_part, url_attr, file_uri)
                                break

                    # Extract function calls (tools)
                    call = part.get('functionCall')
                    if call:
                        data.setdefault('tool_calls', []).append({
                            "id": None,  # Gemini doesn't always provide a call ID in the same way
                            "type": "function",
                            "function": {
                                "name": call.get("name"),
                                "arguments": call.get("args"),  # Gemini returns args as dict, not JSON string
                            },
                        })

                    content_part.type = modality
                    content_parts.append(content_part)

                data['content_parts'] = content_parts
                data['content'] = ' '.join(text_parts) if text_parts else None

            data['finish_reason'] = first_candidate.get('finishReason')

        # Extract token usage with multimodal support
        usage = data.get('usageMetadata')
        if usage is not None:
            data['prompt_tokens'] = usage.get('promptTokenCount')
            data['completion_tokens'] = usage.get('candidatesTokenCount')
            data['total_tokens'] = usage.get('totalTokenCount')

            # Parse modality-specific tokens from promptTokensDetails
            for detail in usage.get('promptTokensDetails') or ():
                modality_name = (detail.get('modality') or '').lower()
                token_count = detail.get('tokenCount', 0)

                if 'image' in modality_name:
                    data['image_tokens'] = token_count
                elif 'audio' in modality_name:
                    data['audio_tokens'] = token_count
                elif 'video' in modality_name:
                    data['video_tokens'] = token_count

        # Set response_id and model ('gemini' when no modelVersion is given)
        data['response_id'] = data.get('responseId')
        data['model'] = data.get('modelVersion', 'gemini')

        # Store the untouched snapshot taken at the top.
        data['raw_response'] = raw_payload

        super().__init__(**data)

    @property
    def thoughts_token_count(self) -> Optional[int]:
        """Get the thoughts token count if available (Gemini-specific)"""
        return self.usageMetadata.thoughtsTokenCount

    @property
    def cached_token_count(self) -> Optional[int]:
        """Get cached content token count (Gemini context caching)"""
        return self.usageMetadata.cachedContentTokenCount

cached_token_count property

cached_token_count

Get cached content token count (Gemini context caching)

thoughts_token_count property

thoughts_token_count

Get the thoughts token count if available (Gemini-specific)