Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 217 additions & 0 deletions hermes-plugin/session/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
"""Session management for SoloFlow.

Implements Google ADK-style session management:
- Session = conversation state + event flow
- Context budget management
- Memory integration via lifecycle
"""

from __future__ import annotations

import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Optional

logger = logging.getLogger("soloflow.session")


@dataclass
class SessionEvent:
"""An event in a session."""

event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = "" # user_input, tool_call, tool_result, agent_response
content: dict = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
token_count: int = 0

def to_dict(self) -> dict:
return {
"event_id": self.event_id,
"event_type": self.event_type,
"content": self.content,
"timestamp": self.timestamp,
"token_count": self.token_count,
}


@dataclass
class Session:
"""A session with context budget management.

Key insight from Google ADK:
- Session = conversation state + event flow
- Context budget = max tokens for context
- Memory integration via lifecycle (not as tool)
"""

session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
app_name: str = "default"
user_id: str = "default"

# State
events: list[SessionEvent] = field(default_factory=list)
metadata: dict = field(default_factory=dict)

# Context budget
max_context_tokens: int = 8000
current_token_count: int = 0

# Timestamps
created_at: float = field(default_factory=time.time)
last_active_at: float = field(default_factory=time.time)

@property
def is_active(self) -> bool:
"""Check if session is still active."""
# Session expires after 30 minutes of inactivity
return (time.time() - self.last_active_at) < 1800

@property
def context_budget_remaining(self) -> int:
"""Remaining context budget in tokens."""
return max(0, self.max_context_tokens - self.current_token_count)

def add_event(self, event: SessionEvent) -> None:
"""Add an event to the session."""
self.events.append(event)
self.current_token_count += event.token_count
self.last_active_at = time.time()

# Enforce context budget
self._enforce_budget()

def _enforce_budget(self) -> None:
"""Enforce context budget by summarizing old events."""
while self.current_token_count > self.max_context_tokens and len(self.events) > 1:
# Remove oldest event (in production, would summarize)
removed = self.events.pop(0)
self.current_token_count -= removed.token_count

def get_recent_events(self, limit: int = 10) -> list[SessionEvent]:
"""Get recent events."""
return self.events[-limit:]

def get_context_summary(self) -> dict:
"""Get context summary for memory integration."""
return {
"session_id": self.session_id,
"app_name": self.app_name,
"user_id": self.user_id,
"event_count": len(self.events),
"token_count": self.current_token_count,
"budget_remaining": self.context_budget_remaining,
"is_active": self.is_active,
}

def to_dict(self) -> dict:
return {
"session_id": self.session_id,
"app_name": self.app_name,
"user_id": self.user_id,
"event_count": len(self.events),
"current_token_count": self.current_token_count,
"max_context_tokens": self.max_context_tokens,
"created_at": self.created_at,
"last_active_at": self.last_active_at,
}


class SessionManager:
"""Manages sessions with context budget.

Key patterns from Google ADK:
1. Session = conversation state + event flow
2. Context budget = max tokens for context
3. Memory integration via lifecycle (not as tool)
4. Single responsibility: session manages state, memory manages long-term
"""

def __init__(self, max_context_tokens: int = 8000) -> None:
self._sessions: dict[str, Session] = {}
self._max_context_tokens = max_context_tokens

def create_session(
self,
app_name: str = "default",
user_id: str = "default",
) -> Session:
"""Create a new session."""
session = Session(
app_name=app_name,
user_id=user_id,
max_context_tokens=self._max_context_tokens,
)
self._sessions[session.session_id] = session
return session

def get_session(self, session_id: str) -> Optional[Session]:
"""Get a session by ID."""
return self._sessions.get(session_id)

def get_or_create_session(
self,
session_id: str,
app_name: str = "default",
user_id: str = "default",
) -> Session:
"""Get existing session or create new one."""
session = self._sessions.get(session_id)
if session and session.is_active:
return session

# Create new session
session = Session(
session_id=session_id,
app_name=app_name,
user_id=user_id,
max_context_tokens=self._max_context_tokens,
)
self._sessions[session_id] = session
return session

def list_sessions(
self,
app_name: str | None = None,
user_id: str | None = None,
active_only: bool = True,
) -> list[Session]:
"""List sessions with optional filters."""
sessions = list(self._sessions.values())

if app_name:
sessions = [s for s in sessions if s.app_name == app_name]

if user_id:
sessions = [s for s in sessions if s.user_id == user_id]

if active_only:
sessions = [s for s in sessions if s.is_active]

return sessions

def cleanup_expired(self) -> int:
"""Clean up expired sessions."""
expired = [
sid for sid, session in self._sessions.items()
if not session.is_active
]

for sid in expired:
del self._sessions[sid]

return len(expired)

def get_stats(self) -> dict:
"""Get session statistics."""
active = sum(1 for s in self._sessions.values() if s.is_active)
total_tokens = sum(s.current_token_count for s in self._sessions.values())

return {
"total_sessions": len(self._sessions),
"active_sessions": active,
"total_tokens": total_tokens,
}
94 changes: 94 additions & 0 deletions tests/session/test_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Tests for session management."""

import sys
from pathlib import Path
import pytest

sys.path.insert(0, str(Path(__file__).parent.parent.parent / "hermes-plugin"))

from session import SessionManager, Session, SessionEvent


@pytest.fixture
def manager():
return SessionManager(max_context_tokens=1000)


class TestSession:
def test_creation(self):
session = Session(app_name="test", user_id="user1")
assert session.app_name == "test"
assert session.user_id == "user1"
assert session.is_active is True

def test_add_event(self):
session = Session()
event = SessionEvent(
event_type="user_input",
content={"text": "hello"},
token_count=10,
)
session.add_event(event)
assert len(session.events) == 1
assert session.current_token_count == 10

def test_context_budget(self):
session = Session(max_context_tokens=100)
event = SessionEvent(token_count=50)
session.add_event(event)
assert session.context_budget_remaining == 50

def test_budget_enforcement(self):
session = Session(max_context_tokens=100)
for i in range(20):
session.add_event(SessionEvent(token_count=10))

# Should have enforced budget
assert session.current_token_count <= 100


class TestSessionManager:
def test_create_session(self, manager):
session = manager.create_session("app1", "user1")
assert session.app_name == "app1"
assert session.user_id == "user1"

def test_get_session(self, manager):
session = manager.create_session()
retrieved = manager.get_session(session.session_id)
assert retrieved is not None
assert retrieved.session_id == session.session_id

def test_get_nonexistent(self, manager):
result = manager.get_session("nonexistent")
assert result is None

def test_list_sessions(self, manager):
manager.create_session("app1", "user1")
manager.create_session("app2", "user2")

sessions = manager.list_sessions()
assert len(sessions) == 2

def test_list_sessions_filter(self, manager):
manager.create_session("app1", "user1")
manager.create_session("app2", "user2")

sessions = manager.list_sessions(app_name="app1")
assert len(sessions) == 1

def test_cleanup_expired(self, manager):
session = manager.create_session()
# Manually expire session
session.last_active_at = 0

cleaned = manager.cleanup_expired()
assert cleaned == 1

def test_get_stats(self, manager):
manager.create_session()
manager.create_session()

stats = manager.get_stats()
assert stats["total_sessions"] == 2
assert stats["active_sessions"] == 2
Loading