Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-langchain"
version = "0.5.26"
version = "0.5.27"
description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
36 changes: 32 additions & 4 deletions src/uipath_langchain/chat/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from uipath._utils import resource_override
from uipath.utils import EndpointManager

from .header_capture import HeaderCapture
from .retryers.bedrock import AsyncBedrockRetryer, BedrockRetryer
from .supported_models import BedrockModels
from .types import APIFlavor, LLMProvider
Expand Down Expand Up @@ -62,6 +63,7 @@ def __init__(
api_flavor: str,
agenthub_config: Optional[str] = None,
byo_connection_id: Optional[str] = None,
header_capture: HeaderCapture | None = None,
):
self.model = model
self.token = token
Expand All @@ -70,6 +72,7 @@ def __init__(
self.byo_connection_id = byo_connection_id
self._vendor = "awsbedrock"
self._url: Optional[str] = None
self.header_capture = header_capture

@property
def endpoint(self) -> str:
Expand All @@ -91,6 +94,12 @@ def _build_base_url(self) -> str:

return self._url

def _capture_response_headers(self, parsed, model, **kwargs):
if "ResponseMetadata" in parsed:
headers = parsed["ResponseMetadata"].get("HTTPHeaders", {})
if self.header_capture:
self.header_capture.set(dict(headers))

def get_client(self):
client = boto3.client(
"bedrock-runtime",
Expand All @@ -106,6 +115,9 @@ def get_client(self):
client.meta.events.register(
"before-send.bedrock-runtime.*", self._modify_request
)
client.meta.events.register(
"after-call.bedrock-runtime.*", self._capture_response_headers
)
return client

def _modify_request(self, request, **kwargs):
Expand Down Expand Up @@ -203,6 +215,7 @@ class UiPathChatBedrock(ChatBedrock):
model: str = "" # For tracing serialization
retryer: Optional[Retrying] = None
aretryer: Optional[AsyncRetrying] = None
header_capture: HeaderCapture

def __init__(
self,
Expand Down Expand Up @@ -233,17 +246,21 @@ def __init__(
"UIPATH_ACCESS_TOKEN environment variable or token parameter is required"
)

header_capture = HeaderCapture(name=f"bedrock_headers_{id(self)}")

passthrough_client = AwsBedrockCompletionsPassthroughClient(
model=model_name,
token=token,
api_flavor="invoke",
agenthub_config=agenthub_config,
byo_connection_id=byo_connection_id,
header_capture=header_capture,
)

client = passthrough_client.get_client()
kwargs["client"] = client
kwargs["model"] = model_name
kwargs["header_capture"] = header_capture
super().__init__(**kwargs)
self.model = model_name
self.retryer = retryer
Expand Down Expand Up @@ -297,7 +314,15 @@ def _generate(
**kwargs: Any,
) -> ChatResult:
messages = self._convert_file_blocks_to_anthropic_documents(messages)
return super()._generate(messages, stop=stop, run_manager=run_manager, **kwargs)
result = super()._generate(
messages,
stop=stop,
run_manager=run_manager,
**kwargs,
)
self.header_capture.attach_to_chat_result(result)
self.header_capture.clear()
return result

def _stream(
self,
Expand All @@ -307,9 +332,12 @@ def _stream(
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
messages = self._convert_file_blocks_to_anthropic_documents(messages)
yield from super()._stream(
messages, stop=stop, run_manager=run_manager, **kwargs
)
chunks = super()._stream(messages, stop=stop, run_manager=run_manager, **kwargs)

for chunk in chunks:
self.header_capture.attach_to_chat_generation(chunk)
yield chunk
self.header_capture.clear()


def _get_default_retryer() -> BedrockRetryer:
Expand Down
9 changes: 9 additions & 0 deletions src/uipath_langchain/chat/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from enum import StrEnum


class LlmGatewayHeaders(StrEnum):
    """Well-known HTTP response header names emitted by the UiPath LLM Gateway.

    ``StrEnum`` members compare equal to (and can be used anywhere in place of)
    their lowercase header-name string values, e.g. as keys when reading a
    captured response-header dict.
    """

    # NOTE(review): member semantics below are inferred from the header names
    # only — confirm against the LLM Gateway documentation.
    # Presumably flags whether the call executed over a BYO (bring-your-own) connection.
    IS_BYO_EXECUTION = "x-uipath-llmgateway-isbyoexecution"
    # Presumably identifies the deployment type that served the execution.
    EXECUTION_DEPLOYMENT_TYPE = "x-uipath-llmgateway-executiondeploymenttype"
    # Presumably indicates whether PII masking was applied to the request/response.
    IS_PII_MASKED = "x-uipath-llmgateway-ispiimasked"
41 changes: 41 additions & 0 deletions src/uipath_langchain/chat/header_capture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from contextvars import ContextVar
from typing import Optional

from langchain_core.outputs import ChatGeneration, ChatResult


class HeaderCapture:
    """Captures HTTP response headers and applies them to LangChain generations.

    Intended flow: a transport or client event hook calls :meth:`set` with the
    headers of the most recent response; the chat model then calls
    :meth:`attach_to_chat_result` (or :meth:`attach_to_chat_generation` per
    streamed chunk) to surface those headers under ``response_metadata``, and
    :meth:`clear` when the call is finished.

    Headers are stored in a :class:`~contextvars.ContextVar`, so concurrent
    requests running in different asyncio tasks each observe only the headers
    captured in their own context.
    """

    def __init__(self, name: str = "response_headers"):
        """Initialize with a new context var.

        Args:
            name: Name for the underlying :class:`~contextvars.ContextVar`
                (used only for debugging/repr purposes).
        """
        self._headers: ContextVar[Optional[dict[str, str]]] = ContextVar(
            name, default=None
        )

    def set(self, headers: dict[str, str]) -> None:
        """Store headers in this instance's context var, replacing any previous value."""
        self._headers.set(headers)

    def clear(self) -> None:
        """Clear stored headers from this instance's context var (resets to ``None``)."""
        self._headers.set(None)

    def attach_to_chat_generation(
        self, generation: ChatGeneration, metadata_key: str = "headers"
    ) -> None:
        """Attach captured headers to the generation message's response_metadata.

        Args:
            generation: Generation (or streamed chunk) whose message metadata
                is mutated in place.
            metadata_key: Key under which the header dict is stored.

        No-op when nothing has been captured (or the captured dict is empty).
        """
        headers = self._headers.get()
        if headers:
            generation.message.response_metadata[metadata_key] = headers

    def attach_to_chat_result(
        self, result: ChatResult, metadata_key: str = "headers"
    ) -> ChatResult:
        """Attach captured headers to the message response_metadata of each generation.

        Args:
            result: Chat result whose generations are mutated in place.
            metadata_key: Key under which the header dict is stored.

        Returns:
            The same ``result`` object, for call-chaining convenience.
        """
        for generation in result.generations:
            self.attach_to_chat_generation(generation, metadata_key)
        return result
1 change: 1 addition & 0 deletions src/uipath_langchain/chat/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def __init__(
api_version=api_version,
validate_base_url=False,
use_responses_api=use_responses_api,
include_response_headers=True,
**kwargs,
)

Expand Down
84 changes: 75 additions & 9 deletions src/uipath_langchain/chat/vertex.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
from collections.abc import AsyncIterator, Iterator
from typing import Any, Optional

import httpx
Expand All @@ -8,12 +9,13 @@
CallbackManagerForLLMRun,
)
from langchain_core.messages import BaseMessage
from langchain_core.outputs import ChatResult
from langchain_core.outputs import ChatGenerationChunk, ChatResult
from tenacity import AsyncRetrying, Retrying
from uipath._utils import resource_override
from uipath._utils._ssl_context import get_httpx_client_kwargs
from uipath.utils import EndpointManager

from .header_capture import HeaderCapture
from .retryers.vertex import AsyncVertexRetryer, VertexRetryer
from .supported_models import GeminiModels
from .types import APIFlavor, LLMProvider
Expand Down Expand Up @@ -70,9 +72,15 @@ def _rewrite_vertex_url(original_url: str, gateway_url: str) -> httpx.URL | None
class _UrlRewriteTransport(httpx.HTTPTransport):
"""Transport that rewrites URLs to redirect to UiPath gateway."""

def __init__(self, gateway_url: str, verify: bool = True):
def __init__(
    self,
    gateway_url: str,
    verify: bool = True,
    header_capture: HeaderCapture | None = None,
):
    """Create a sync transport that redirects Vertex requests to the UiPath gateway.

    Args:
        gateway_url: Gateway base URL that outgoing request URLs are rewritten to.
        verify: TLS verification setting forwarded to ``httpx.HTTPTransport``.
        header_capture: Optional capture object that records each response's
            headers; ``None`` disables header capture.
    """
    super().__init__(verify=verify)
    self.gateway_url = gateway_url
    self.header_capture = header_capture

def handle_request(self, request: httpx.Request) -> httpx.Response:
original_url = str(request.url)
Expand All @@ -86,15 +94,26 @@ def handle_request(self, request: httpx.Request) -> httpx.Response:
# Update host header to match the new URL
request.headers["host"] = new_url.host
request.url = new_url
return super().handle_request(request)

response = super().handle_request(request)
if self.header_capture:
self.header_capture.set(dict(response.headers))

return response


class _AsyncUrlRewriteTransport(httpx.AsyncHTTPTransport):
"""Async transport that rewrites URLs to redirect to UiPath gateway."""

def __init__(self, gateway_url: str, verify: bool = True):
def __init__(
    self,
    gateway_url: str,
    verify: bool = True,
    header_capture: HeaderCapture | None = None,
):
    """Create an async transport that redirects Vertex requests to the UiPath gateway.

    Args:
        gateway_url: Gateway base URL that outgoing request URLs are rewritten to.
        verify: TLS verification setting forwarded to ``httpx.AsyncHTTPTransport``.
        header_capture: Optional capture object that records each response's
            headers; ``None`` disables header capture.
    """
    super().__init__(verify=verify)
    self.gateway_url = gateway_url
    self.header_capture = header_capture

async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
original_url = str(request.url)
Expand All @@ -108,7 +127,12 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
# Update host header to match the new URL
request.headers["host"] = new_url.host
request.url = new_url
return await super().handle_async_request(request)

response = await super().handle_async_request(request)
if self.header_capture:
self.header_capture.set(dict(response.headers))

return response


class UiPathChatVertex(ChatGoogleGenerativeAI):
Expand Down Expand Up @@ -162,17 +186,22 @@ def __init__(
uipath_url = self._build_base_url(model_name)
headers = self._build_headers(token, agenthub_config, byo_connection_id)

header_capture = HeaderCapture(name=f"vertex_headers_{id(self)}")
client_kwargs = get_httpx_client_kwargs()
verify = client_kwargs.get("verify", True)

http_options = genai_types.HttpOptions(
httpx_client=httpx.Client(
transport=_UrlRewriteTransport(uipath_url, verify=verify),
transport=_UrlRewriteTransport(
uipath_url, verify=verify, header_capture=header_capture
),
headers=headers,
**client_kwargs,
),
httpx_async_client=httpx.AsyncClient(
transport=_AsyncUrlRewriteTransport(uipath_url, verify=verify),
transport=_AsyncUrlRewriteTransport(
uipath_url, verify=verify, header_capture=header_capture
),
headers=headers,
**client_kwargs,
),
Expand Down Expand Up @@ -205,6 +234,7 @@ def __init__(
self._byo_connection_id = byo_connection_id
self._retryer = retryer
self._aretryer = aretryer
self._header_capture = header_capture

if self.temperature is not None and not 0 <= self.temperature <= 2.0:
raise ValueError("temperature must be in the range [0.0, 2.0]")
Expand Down Expand Up @@ -295,7 +325,10 @@ def _generate(
result = super()._generate(
messages, stop=stop, run_manager=run_manager, **kwargs
)
return self._merge_finish_reason_to_response_metadata(result)
result = self._merge_finish_reason_to_response_metadata(result)
self._header_capture.attach_to_chat_result(result)
self._header_capture.clear()
return result

async def _agenerate(
self,
Expand All @@ -308,7 +341,40 @@ async def _agenerate(
result = await super()._agenerate(
messages, stop=stop, run_manager=run_manager, **kwargs
)
return self._merge_finish_reason_to_response_metadata(result)
result = self._merge_finish_reason_to_response_metadata(result)
self._header_capture.attach_to_chat_result(result)
self._header_capture.clear()
return result

def _stream(
    self,
    messages: list[BaseMessage],
    stop: list[str] | None = None,
    run_manager: CallbackManagerForLLMRun | None = None,
    **kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
    """Stream chunks, attaching captured gateway response headers to each one.

    Args:
        messages: Conversation messages forwarded to the parent implementation.
        stop: Optional stop sequences.
        run_manager: Optional callback manager for the run.
        **kwargs: Extra arguments forwarded to the parent ``_stream``.

    Yields:
        Chunks from the parent stream, each with the most recently captured
        HTTP response headers attached to its message ``response_metadata``.
    """
    try:
        for chunk in super()._stream(
            messages, stop=stop, run_manager=run_manager, **kwargs
        ):
            self._header_capture.attach_to_chat_generation(chunk)
            yield chunk
    finally:
        # Clear in a finally block so the capture is reset even when the
        # consumer abandons the generator early or the upstream call raises;
        # otherwise stale headers could attach to a later call's output.
        self._header_capture.clear()

async def _astream(
    self,
    messages: list[BaseMessage],
    stop: list[str] | None = None,
    run_manager: AsyncCallbackManagerForLLMRun | None = None,
    **kwargs: Any,
) -> AsyncIterator[ChatGenerationChunk]:
    """Async-stream chunks, attaching captured gateway response headers to each.

    Args:
        messages: Conversation messages forwarded to the parent implementation.
        stop: Optional stop sequences.
        run_manager: Optional async callback manager for the run.
        **kwargs: Extra arguments forwarded to the parent ``_astream``.

    Yields:
        Chunks from the parent stream, each with the most recently captured
        HTTP response headers attached to its message ``response_metadata``.
    """
    try:
        async for chunk in super()._astream(
            messages, stop=stop, run_manager=run_manager, **kwargs
        ):
            self._header_capture.attach_to_chat_generation(chunk)
            yield chunk
    finally:
        # Clear in a finally block so the capture is reset even when the
        # consumer closes the generator early or the upstream call raises;
        # otherwise stale headers could attach to a later call's output.
        self._header_capture.clear()


def _get_default_retryer() -> VertexRetryer:
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.