db-openai: Add AsyncDatabricksSession class to support Session Protocol for Stateful Conversation Management via SQLAlchemy engine #316
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| from databricks_openai.agents.mcp_server import McpServer | ||
| from databricks_openai.agents.session import AsyncDatabricksSession | ||
|
|
||
| __all__ = ["McpServer"] | ||
| __all__ = ["AsyncDatabricksSession", "McpServer"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
| """ | ||
|
Collaborator
do we want this to be importable from databricks_openai.agents?
Contributor
Author
sure, I'll make this importable so instead of: import path will look like: |
||
| AsyncDatabricksSession - Async SQLAlchemy-based session storage for Databricks Lakebase. | ||
|
|
||
| This module provides an AsyncDatabricksSession class that subclasses OpenAI's SQLAlchemySession | ||
| to provide persistent conversation history storage in Databricks Lakebase. | ||
|
|
||
| Note: | ||
| This class is **async-only** as it follows the Session Protocol. Use it within an async context. See: | ||
| https://openai.github.io/openai-agents-python/ref/memory/session/#agents.memory.session.Session | ||
|
|
||
| Usage:: | ||
|
|
||
| import asyncio | ||
| from databricks_openai.agents import AsyncDatabricksSession | ||
| from agents import Agent, Runner | ||
|
|
||
|
|
||
| async def main(): | ||
| session = AsyncDatabricksSession( | ||
| session_id="user-123", | ||
| instance_name="my-lakebase-instance", | ||
| ) | ||
|
|
||
| agent = Agent(name="Assistant") | ||
| result = await Runner.run(agent, "Hello!", session=session) | ||
|
|
||
|
|
||
| asyncio.run(main()) | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import logging | ||
| from threading import Lock | ||
| from typing import Any, Optional | ||
|
|
||
| try: | ||
| from agents.extensions.memory import SQLAlchemySession | ||
| from databricks.sdk import WorkspaceClient | ||
| from databricks_ai_bridge.lakebase import ( | ||
| DEFAULT_POOL_RECYCLE_SECONDS, | ||
| DEFAULT_TOKEN_CACHE_DURATION_SECONDS, | ||
| AsyncLakebaseSQLAlchemy, | ||
| ) | ||
|
|
||
| _session_imports_available = True | ||
| except ImportError: | ||
| SQLAlchemySession = object # type: ignore | ||
| _session_imports_available = False | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class AsyncDatabricksSession(SQLAlchemySession): | ||
|
Collaborator
from talking to the research team working on DBRA, they actually have a very similar snippet to ours for managing a SQLAlchemy connection to lakebase: https://sourcegraph.prod.databricks-corp.com/databricks-eng/universe/-/blob/research/aroll/app/aroll_app/db/connection.py?L162-182. would it make sense for us to further abstract this by providing a similar AsyncLakebaseSQLAlchemy / LakebaseSQLAlchemy class?
Contributor
Author
discussed offline but I'll refactor such that:
this will create much cleaner separation of concerns for future frameworks to reuse any sqlalchemy engines etc! |
||
| """ | ||
| Async OpenAI Agents SDK Session implementation for Databricks Lakebase. | ||
| For more information on the Session protocol, see: | ||
| https://openai.github.io/openai-agents-python/ref/memory/session/ | ||
|
|
||
| Note: | ||
| This class is **async-only**. All session methods (get_items, add_items, | ||
| clear_session, etc.) are coroutines and must be awaited. | ||
|
|
||
| The session stores conversation history in two tables: | ||
| - agent_sessions: Tracks session metadata (session_id, created_at, updated_at) | ||
| - agent_messages: Stores conversation items (id, session_id, message_data, created_at) | ||
|
|
||
| Example: | ||
| ```python | ||
| import asyncio | ||
| from databricks_openai.agents import AsyncDatabricksSession | ||
| from agents import Agent, Runner | ||
|
|
||
|
|
||
| async def main(): | ||
| session = AsyncDatabricksSession( | ||
| session_id="user-123", | ||
| instance_name="my-lakebase-instance", | ||
| ) | ||
| agent = Agent(name="Assistant") | ||
| result = await Runner.run(agent, "Hello!", session=session) | ||
|
|
||
|
|
||
| asyncio.run(main()) | ||
| ``` | ||
| """ | ||
|
|
||
| # Class-level cache for AsyncLakebaseSQLAlchemy instances keyed by | ||
| # (instance_name, engine_kwargs). This allows multiple sessions to share | ||
| # a single engine/connection pool when the configuration is identical. | ||
| _lakebase_sql_alchemy_cache: dict[str, AsyncLakebaseSQLAlchemy] = {} | ||
| _lakebase_sql_alchemy_cache_lock = Lock() | ||
|
Contributor
Author
thoughts on the class-level cache for AsyncLakebaseSQLAlchemy engines keyed by instance_name? this is so we reuse a single SQLAlchemy engine / pool per Lakebase instance, avoiding repeated pool creation, TCP handshakes, and auth setup. sessions are still created per Runner.run(), but engines are shared
Collaborator
this approach looks good to me to minimize IO. two comments:
Collaborator
the best case would be to include engine kwargs + instance name in the cache key
Contributor
Author
sounds good - going to create a cache key that takes into account both instance name + engine kwargs, as well as the ability to not cache the engines (but defaults to caching) |
||
|
|
||
| def __init__( | ||
| self, | ||
| session_id: str, | ||
| *, | ||
| instance_name: str, | ||
| workspace_client: Optional[WorkspaceClient] = None, | ||
| token_cache_duration_seconds: int = DEFAULT_TOKEN_CACHE_DURATION_SECONDS, | ||
| create_tables: bool = True, | ||
| sessions_table: str = "agent_sessions", | ||
| messages_table: str = "agent_messages", | ||
| use_cached_engine: bool = True, | ||
| **engine_kwargs, | ||
| ) -> None: | ||
| """ | ||
| Initialize an AsyncDatabricksSession for Databricks Lakebase. | ||
|
|
||
| Args: | ||
| session_id: Unique identifier for the conversation session. | ||
| instance_name: Name of the Lakebase instance. | ||
| workspace_client: Optional WorkspaceClient for authentication. | ||
| If not provided, a default client will be created. | ||
| token_cache_duration_seconds: How long to cache OAuth tokens. | ||
| Defaults to 15 minutes. | ||
| create_tables: Whether to auto-create tables on first use. | ||
| Defaults to True. | ||
| sessions_table: Name of the sessions table. | ||
| Defaults to "agent_sessions". | ||
| messages_table: Name of the messages table. | ||
| Defaults to "agent_messages". | ||
| use_cached_engine: Whether to reuse a cached engine for the same | ||
| instance_name and engine_kwargs combination. Set to False to | ||
| always create a new engine. Defaults to True. | ||
| **engine_kwargs: Additional keyword arguments passed to | ||
| SQLAlchemy's create_async_engine(). | ||
| """ | ||
| if not _session_imports_available: | ||
| raise ImportError( | ||
| "AsyncDatabricksSession requires databricks-openai[memory]. " | ||
| "Please install with: pip install databricks-openai[memory]" | ||
| ) | ||
|
|
||
| self._lakebase = self._get_or_create_lakebase( | ||
| instance_name=instance_name, | ||
| workspace_client=workspace_client, | ||
| token_cache_duration_seconds=token_cache_duration_seconds, | ||
| pool_recycle=engine_kwargs.pop("pool_recycle", DEFAULT_POOL_RECYCLE_SECONDS), | ||
| use_cached_engine=use_cached_engine, | ||
| **engine_kwargs, | ||
| ) | ||
|
|
||
| # Initialize parent SQLAlchemySession - inherits all SQL logic | ||
| super().__init__( | ||
| session_id=session_id, | ||
| engine=self._lakebase.engine, | ||
| create_tables=create_tables, | ||
| sessions_table=sessions_table, | ||
| messages_table=messages_table, | ||
| ) | ||
|
|
||
| logger.info( | ||
| "AsyncDatabricksSession initialized: instance=%s session_id=%s", | ||
| instance_name, | ||
| session_id, | ||
| ) | ||
|
|
||
| @classmethod | ||
| def _build_cache_key(cls, instance_name: str, **engine_kwargs: Any) -> str: | ||
| """Build a cache key from instance_name and engine_kwargs.""" | ||
| # Sort kwargs for deterministic key; use JSON for serializable values | ||
| kwargs_key = json.dumps(engine_kwargs, sort_keys=True, default=str) | ||
| return f"{instance_name}::{kwargs_key}" | ||
|
|
||
| @classmethod | ||
| def _get_or_create_lakebase( | ||
| cls, | ||
| *, | ||
| instance_name: str, | ||
| workspace_client: Optional[WorkspaceClient], | ||
| token_cache_duration_seconds: int, | ||
| pool_recycle: int, | ||
| use_cached_engine: bool = True, | ||
| **engine_kwargs, | ||
| ) -> AsyncLakebaseSQLAlchemy: | ||
| """Get cached AsyncLakebaseSQLAlchemy or create a new one. | ||
| The cache key uses both instance_name and engine_kwargs | ||
| """ | ||
| cache_key = cls._build_cache_key(instance_name, pool_recycle=pool_recycle, **engine_kwargs) | ||
|
|
||
| if use_cached_engine: | ||
| with cls._lakebase_sql_alchemy_cache_lock: | ||
| if cache_key in cls._lakebase_sql_alchemy_cache: | ||
| logger.debug("Reusing cached engine for key=%s", cache_key) | ||
| return cls._lakebase_sql_alchemy_cache[cache_key] | ||
|
|
||
| lakebase = AsyncLakebaseSQLAlchemy( | ||
| instance_name=instance_name, | ||
| workspace_client=workspace_client, | ||
| token_cache_duration_seconds=token_cache_duration_seconds, | ||
| pool_recycle=pool_recycle, | ||
| **engine_kwargs, | ||
| ) | ||
|
|
||
| if use_cached_engine: | ||
| with cls._lakebase_sql_alchemy_cache_lock: | ||
| cls._lakebase_sql_alchemy_cache[cache_key] = lakebase | ||
|
|
||
| return lakebase | ||
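
The caching scheme discussed in this thread (a key built from the instance name plus engine kwargs, with an opt-out flag) can be sketched in isolation. This is a minimal illustration, not the PR's code: `FakeEngineHolder`, `build_cache_key`, and `get_or_create` are hypothetical names, and `FakeEngineHolder` only stands in for `AsyncLakebaseSQLAlchemy`. Unlike the diff above, which constructs the object outside the lock, this variant holds the lock across both the lookup and the creation, so two threads racing on the same key end up sharing one object.

```python
import json
from threading import Lock
from typing import Any, Dict


class FakeEngineHolder:
    """Hypothetical stand-in for AsyncLakebaseSQLAlchemy (not the real class)."""

    def __init__(self, instance_name: str, **engine_kwargs: Any) -> None:
        self.instance_name = instance_name
        self.engine_kwargs = engine_kwargs


_cache: Dict[str, FakeEngineHolder] = {}
_cache_lock = Lock()


def build_cache_key(instance_name: str, **engine_kwargs: Any) -> str:
    # sort_keys makes the key independent of keyword order;
    # default=str falls back to str() for values JSON cannot serialize.
    kwargs_key = json.dumps(engine_kwargs, sort_keys=True, default=str)
    return f"{instance_name}::{kwargs_key}"


def get_or_create(
    instance_name: str, *, use_cached_engine: bool = True, **engine_kwargs: Any
) -> FakeEngineHolder:
    if not use_cached_engine:
        # Opt-out path: always build a fresh object and never store it.
        return FakeEngineHolder(instance_name, **engine_kwargs)

    key = build_cache_key(instance_name, **engine_kwargs)
    # Lookup and creation happen under one lock acquisition, so two
    # threads cannot both build a holder for the same key.
    with _cache_lock:
        holder = _cache.get(key)
        if holder is None:
            holder = FakeEngineHolder(instance_name, **engine_kwargs)
            _cache[key] = holder
        return holder
```

Calling `get_or_create("inst", pool_recycle=3600, echo=False)` and `get_or_create("inst", echo=False, pool_recycle=3600)` returns the same object, because the sorted JSON key ignores keyword order. The trade-off of creating inside the lock is that a slow constructor blocks other callers for its duration.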
can we update the CI job for this memory extra too
good catch!