From 352b2595ef80b65f92e267eeb789ff71ef0c2423 Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Wed, 27 May 2026 21:14:44 -0700 Subject: [PATCH] feat: auto-refresh Signal Feed every 120s MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backend: new FeedPoller asyncio task wakes every 120s, walks every (user, service) credential row, and overwrites the cache with the result of the matching fetcher. Mirrors pr_watcher's shape — env-gated via FEED_POLLER_ENABLED, per-pair exception isolation, write-only against the cache (reads would defeat its purpose). Adds CredentialModel.list_active_services() as the enumeration source. Frontend: a second useEffect in Agents.tsx fires setInterval(120_000) to re-pull all three feeds from the (now warm) cache. No feedLoading flicker; filter selections persist across the swap so a refresh doesn't strand the user on an empty 'all'. The standalone filter-reset effect moved to specific call sites where the dataset shape genuinely changes (mount fetch, OAuth callback, disconnect). Closes #37. Co-Authored-By: Claude Opus 4.7 --- app/src/pages/Agents.tsx | 51 +++++++- backend/.env.example | 5 + backend/app/main.py | 28 +++-- backend/app/models/credential.py | 15 +++ backend/app/services/feed_poller.py | 93 ++++++++++++++ backend/tests/conftest.py | 3 + backend/tests/test_feed_poller.py | 184 ++++++++++++++++++++++++++++ 7 files changed, 364 insertions(+), 15 deletions(-) create mode 100644 backend/app/services/feed_poller.py create mode 100644 backend/tests/test_feed_poller.py diff --git a/app/src/pages/Agents.tsx b/app/src/pages/Agents.tsx index 13a305a..c7d5099 100644 --- a/app/src/pages/Agents.tsx +++ b/app/src/pages/Agents.tsx @@ -178,11 +178,40 @@ export default function Agents() { if (d.connected && d.emails?.length) { setGmailConn(true); setGmailData(d.emails) } }), fetchGithubActivity().then(d => { - if (d.connected && d.items?.length) { setGithubConn(true); setGithubData(d.items) } + if (d.connected && d.items?.length) { + setGithubConn(true); setGithubData(d.items) + // Reset filters on the mock → real transition so a leftover filter + // doesn't strand the user on an empty view of the new dataset. + setGhCategory('all'); setGhRepo('all') + } }), ]).finally(() => setFeedLoading(false)) }, []) + // Background auto-refresh — pulls fresh data from the backend cache every + // 120s so the feed reflects upstream changes without a manual reload. + // Pairs with the server-side feed_poller that keeps the cache warm; the + // two intervals are intentionally desynced (each browser starts its phase + // from page-mount, the backend from process boot — the 180s cache TTL + // absorbs the drift). We deliberately do NOT set feedLoading on tick to + // avoid the spinner flicker, and we do NOT reset filters so the user's + // active selection survives the swap. + useEffect(() => { + if (!isLoggedIn()) return + const id = setInterval(() => { + fetchSlackMessages().then(d => { + if (d.connected) { setSlackConn(true); setSlackData(d.messages) } + }) + fetchGmailMessages().then(d => { + if (d.connected) { setGmailConn(true); setGmailData(d.emails) } + }) + fetchGithubActivity().then(d => { + if (d.connected) { setGithubConn(true); setGithubData(d.items) } + }) + }, 120_000) + return () => clearInterval(id) + }, []) + // OAuth redirect params — also flip the active tab to whichever tool the // user just connected, so they land on its feed instead of the Slack default. useEffect(() => { @@ -197,7 +226,12 @@ export default function Agents() { } if (p.get('github_connected')) { setTab('github') - fetchGithubActivity().then(d => { if (d.connected) { setGithubConn(true); setGithubData(d.items) } }) + fetchGithubActivity().then(d => { + if (d.connected) { + setGithubConn(true); setGithubData(d.items) + setGhCategory('all'); setGhRepo('all') + } + }) } if (p.toString()) window.history.replaceState({}, '', '/agents') }, []) @@ -215,9 +249,9 @@ export default function Agents() { // ── GitHub filter state ────────────────── const [ghCategory, setGhCategory] = useState('all') const [ghRepo, setGhRepo] = useState<'all' | string>('all') - // Reset filters when the underlying data refreshes — avoids stranding the - // user on a filter that matches nothing. - useEffect(() => { setGhCategory('all'); setGhRepo('all') }, [githubData]) + // Filters reset at call sites where the dataset shape genuinely changes + // (mount fetch, OAuth callback, disconnect-to-mock). Auto-refresh ticks + // do NOT reset — the user keeps their active filter across the swap. const filteredGithub = useMemo(() => githubData.filter(it => { if (ghCategory !== 'all' && (it.category ?? 'other') !== ghCategory) return false @@ -260,7 +294,12 @@ export default function Agents() { await disconnectService(tab as 'slack' | 'gmail' | 'github') if (tab === 'slack') { setSlackConn(false); setSlackData(MOCK_SLACK) } if (tab === 'gmail') { setGmailConn(false); setGmailData(MOCK_GMAIL) } - if (tab === 'github') { setGithubConn(false); setGithubData(MOCK_GITHUB) } + if (tab === 'github') { + setGithubConn(false); setGithubData(MOCK_GITHUB) + // Disconnect swaps real data for mock — filter selections from the + // real dataset (real repo names) will not match anything in mock. + setGhCategory('all'); setGhRepo('all') + } } async function handlePostDigest() { diff --git a/backend/.env.example b/backend/.env.example index 2f4c815..60fcda3 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -92,3 +92,8 @@ RATE_LIMIT_ENABLED=true # Set to "false" to skip starting the Phase C PR watcher on app startup. # Useful if you're iterating on routers and don't want background dispatches. PR_WATCHER_ENABLED=true + +# Set to "false" to skip starting the Signal Feed poller (keeps the Slack/ +# Gmail/GitHub cache warm). Disable when you want feed fetches to always be +# live against the upstream APIs. +FEED_POLLER_ENABLED=true diff --git a/backend/app/main.py b/backend/app/main.py index e9ce113..6d10030 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -15,6 +15,7 @@ users, agents, gateway, credentials, tasks, roles, auth, chat, watched_repos, ) +from app.services.feed_poller import FeedPoller from app.services.pr_watcher import PRWatcher logging.basicConfig( @@ -26,23 +27,32 @@ @asynccontextmanager async def lifespan(app: FastAPI): - """Start the PR watcher background task; cancel it cleanly on shutdown. + """Start the platform's background workers and cancel them cleanly on + shutdown. Each worker is individually env-gated so the test suite (and + ad-hoc debug runs) can disable them; defaults are all enabled. - Disabled when PR_WATCHER_ENABLED=false (test suites, ad-hoc debug runs); - otherwise on by default. + Workers: + - PRWatcher — autonomous PR review trigger (PR_WATCHER_ENABLED) + - FeedPoller — keeps Signal Feed cache warm (FEED_POLLER_ENABLED) """ - watcher_task: asyncio.Task | None = None + background_tasks: list[asyncio.Task] = [] + if os.getenv("PR_WATCHER_ENABLED", "true").lower() != "false": - watcher = PRWatcher() - watcher_task = asyncio.create_task(watcher.run_forever()) + background_tasks.append(asyncio.create_task(PRWatcher().run_forever())) + if os.getenv("FEED_POLLER_ENABLED", "true").lower() != "false": + background_tasks.append(asyncio.create_task(FeedPoller().run_forever())) try: yield finally: - if watcher_task is not None: - watcher_task.cancel() + # Cancel all tasks first, then await each — order matters: if we + # awaited inside the cancel loop the second worker would run for a + # tick or two after the first was already gone. + for task in background_tasks: + task.cancel() + for task in background_tasks: try: - await watcher_task + await task except asyncio.CancelledError: pass diff --git a/backend/app/models/credential.py b/backend/app/models/credential.py index 24b304f..fa8bc56 100644 --- a/backend/app/models/credential.py +++ b/backend/app/models/credential.py @@ -47,6 +47,21 @@ def list_by_user(user_id: str) -> list[dict]: ) return result.data + @staticmethod + def list_active_services() -> list[dict]: + """Return every (user_id, service) pair that currently has a credential. + + Used by the Signal Feed poller to enumerate the work it needs to do + each tick. Returns minimal columns — the poller doesn't need the + token (the fetcher will re-fetch the credential anyway).""" + result = ( + get_supabase() + .table(TABLE) + .select("user_id, service") + .execute() + ) + return result.data + @staticmethod def delete(user_id: str, service: str) -> bool: result = ( diff --git a/backend/app/services/feed_poller.py b/backend/app/services/feed_poller.py new file mode 100644 index 0000000..17d4513 --- /dev/null +++ b/backend/app/services/feed_poller.py @@ -0,0 +1,93 @@ +"""Background poll loop that keeps the Signal Feed cache warm. + +The poller is a single asyncio.Task owned by the FastAPI lifespan, mirroring +`pr_watcher.py`. Every POLL_INTERVAL seconds it walks every credential row, +calls the matching fetcher, and overwrites the cache entry. Users opening +the feed in the meantime read the warm cache in <100ms instead of paying +the 2-5s live-fetch cost. + +The poller is write-only against `signal_feed_cache` — it never reads. +Reading would defeat its purpose (a hit would short-circuit the refresh +the poller exists to perform). + +No startup-staleness gate (unlike pr_watcher), because warming the cache +on tick 1 produces no user-visible side effect — there's no review being +dispatched, no notification being sent. Worst case after a long downtime: +the first tick repopulates from-scratch, exactly as if every user had just +opened the page. +""" + +import asyncio +import logging +from typing import Awaitable, Callable + +from app.models.credential import CredentialModel +from app.services import feed_fetchers, signal_feed_cache + +logger = logging.getLogger(__name__) + +POLL_INTERVAL_SECONDS = 120 + +# Only these services produce a Signal Feed payload. A user can have other +# credentials (e.g. discord) — those rows are skipped, not errored. +FEED_FETCHERS: dict[str, Callable[[str], Awaitable[dict]]] = { + "slack": feed_fetchers.slack_messages, + "gmail": feed_fetchers.gmail_messages, + "github": feed_fetchers.github_activity, +} + + +class FeedPoller: + def __init__(self, poll_interval: float = POLL_INTERVAL_SECONDS): + self._poll_interval = poll_interval + + async def run_forever(self) -> None: + """Sleep-tick-sleep until cancelled by the lifespan shutdown.""" + logger.info("feed_poller: started, polling every %ss", self._poll_interval) + try: + while True: + try: + await self.tick() + except Exception: # noqa: BLE001 — never let the loop die + logger.exception("feed_poller: tick crashed; continuing") + await asyncio.sleep(self._poll_interval) + except asyncio.CancelledError: + logger.info("feed_poller: shutting down") + raise + + async def tick(self) -> None: + """One pass over every (user, feed-service) credential row. + + Error isolation is at the (user, service) granularity: a Slack + outage for one user must not skip Gmail for the same user, nor + any other user's feeds. + """ + rows = CredentialModel.list_active_services() + if not rows: + return + + refreshed = 0 + for row in rows: + user_id = row["user_id"] + service = row["service"] + fetcher = FEED_FETCHERS.get(service) + if fetcher is None: + continue # discord et al. — no feed surface + try: + payload = await fetcher(user_id) + except Exception: # noqa: BLE001 — isolate per (user, service) + logger.exception( + "feed_poller: fetch failed user=%s service=%s", + user_id, service, + ) + continue + + # Only cache successful fetches. A connected=False response + # means the credential vanished mid-tick (raced with disconnect) + # or the upstream rejected the token — don't paper over it. + if payload.get("connected"): + signal_feed_cache.set(user_id, service, payload) + refreshed += 1 + + if refreshed: + logger.info("feed_poller: refreshed %s cache entries", refreshed) diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 1b26d45..06caf29 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -27,6 +27,9 @@ # The PR watcher would spin up a background poll loop the moment # TestClient(app) enters its lifespan; disable it for tests. "PR_WATCHER_ENABLED": "false", + # Same reasoning — the Signal Feed poller is a second background + # task started in the lifespan; tests should never hit live APIs. + "FEED_POLLER_ENABLED": "false", } ) diff --git a/backend/tests/test_feed_poller.py b/backend/tests/test_feed_poller.py new file mode 100644 index 0000000..0617e95 --- /dev/null +++ b/backend/tests/test_feed_poller.py @@ -0,0 +1,184 @@ +"""Tests for the Signal Feed background poller. + +The poller walks every credential row each tick, calls the matching +feed fetcher, and overwrites the cache. These tests run a single tick +at a time — the asyncio.sleep loop is exercised only briefly in the +lifecycle test. +""" + +import asyncio +from unittest.mock import AsyncMock, patch + +import pytest + +from app.services import signal_feed_cache + + +@pytest.fixture(autouse=True) +def _clean_cache(): + """Module-level cache state must not bleed between tests.""" + signal_feed_cache.clear_all() + yield + signal_feed_cache.clear_all() + + +class TestTick: + @pytest.mark.asyncio + async def test_empty_credentials_no_op(self): + from app.services.feed_poller import FeedPoller + + with patch("app.services.feed_poller.CredentialModel") as mock_cm: + mock_cm.list_active_services.return_value = [] + await FeedPoller().tick() + # No writes happened. + assert signal_feed_cache.get("u1", "slack") is None + + @pytest.mark.asyncio + async def test_tick_writes_each_service_to_cache(self): + from app.services.feed_poller import FeedPoller + + rows = [ + {"user_id": "u1", "service": "slack"}, + {"user_id": "u1", "service": "gmail"}, + {"user_id": "u1", "service": "github"}, + ] + slack_payload = {"connected": True, "messages": [{"id": "1"}]} + gmail_payload = {"connected": True, "emails": [{"id": "a"}]} + github_payload = {"connected": True, "items": [{"id": "x"}]} + + with patch("app.services.feed_poller.CredentialModel") as mock_cm, \ + patch("app.services.feed_poller.feed_fetchers") as mock_ff: + mock_cm.list_active_services.return_value = rows + mock_ff.slack_messages = AsyncMock(return_value=slack_payload) + mock_ff.gmail_messages = AsyncMock(return_value=gmail_payload) + mock_ff.github_activity = AsyncMock(return_value=github_payload) + + # The FEED_FETCHERS map is bound at import time to the real + # fetcher callables, so patching feed_fetchers.* doesn't + # rewire the map. Patch the map itself. + with patch.dict( + "app.services.feed_poller.FEED_FETCHERS", + { + "slack": mock_ff.slack_messages, + "gmail": mock_ff.gmail_messages, + "github": mock_ff.github_activity, + }, + clear=True, + ): + await FeedPoller().tick() + + assert signal_feed_cache.get("u1", "slack") == slack_payload + assert signal_feed_cache.get("u1", "gmail") == gmail_payload + assert signal_feed_cache.get("u1", "github") == github_payload + + @pytest.mark.asyncio + async def test_unknown_service_skipped_not_errored(self): + """A discord credential (or any non-feed service) must be silently + skipped — not raise, not abort the rest of the tick.""" + from app.services.feed_poller import FeedPoller + + rows = [ + {"user_id": "u1", "service": "discord"}, + {"user_id": "u1", "service": "slack"}, + ] + slack_fetcher = AsyncMock(return_value={"connected": True, "messages": []}) + + with patch("app.services.feed_poller.CredentialModel") as mock_cm, \ + patch.dict( + "app.services.feed_poller.FEED_FETCHERS", + {"slack": slack_fetcher, "gmail": AsyncMock(), "github": AsyncMock()}, + clear=True, + ): + mock_cm.list_active_services.return_value = rows + await FeedPoller().tick() + + slack_fetcher.assert_awaited_once_with("u1") + + @pytest.mark.asyncio + async def test_connected_false_not_cached(self): + """If the fetcher returns connected=False (credential vanished mid-tick + or upstream rejected the token), do not poison the cache.""" + from app.services.feed_poller import FeedPoller + + rows = [{"user_id": "u1", "service": "slack"}] + fetcher = AsyncMock(return_value={"connected": False, "messages": []}) + + with patch("app.services.feed_poller.CredentialModel") as mock_cm, \ + patch.dict( + "app.services.feed_poller.FEED_FETCHERS", + {"slack": fetcher, "gmail": AsyncMock(), "github": AsyncMock()}, + clear=True, + ): + mock_cm.list_active_services.return_value = rows + await FeedPoller().tick() + + assert signal_feed_cache.get("u1", "slack") is None + + @pytest.mark.asyncio + async def test_tick_isolates_failures_per_pair(self): + """One (user, service) raising must not stop the tick from refreshing + the rest. Error isolation is the watcher's main correctness property.""" + from app.services.feed_poller import FeedPoller + + rows = [ + {"user_id": "u1", "service": "slack"}, # raises + {"user_id": "u1", "service": "gmail"}, # succeeds + {"user_id": "u2", "service": "github"}, # succeeds + ] + bad = AsyncMock(side_effect=RuntimeError("slack down")) + gmail = AsyncMock(return_value={"connected": True, "emails": [{"id": "a"}]}) + github = AsyncMock(return_value={"connected": True, "items": [{"id": "x"}]}) + + with patch("app.services.feed_poller.CredentialModel") as mock_cm, \ + patch.dict( + "app.services.feed_poller.FEED_FETCHERS", + {"slack": bad, "gmail": gmail, "github": github}, + clear=True, + ): + mock_cm.list_active_services.return_value = rows + # Must not raise. + await FeedPoller().tick() + + assert signal_feed_cache.get("u1", "slack") is None # never written + assert signal_feed_cache.get("u1", "gmail") is not None + assert signal_feed_cache.get("u2", "github") is not None + + +class TestRunForever: + @pytest.mark.asyncio + async def test_run_forever_ticks_then_cancels_cleanly(self): + """A cancelled task should raise CancelledError out of run_forever + after firing at least one tick.""" + from app.services.feed_poller import FeedPoller + + poller = FeedPoller(poll_interval=0.01) + poller.tick = AsyncMock(return_value=None) + + task = asyncio.create_task(poller.run_forever()) + await asyncio.sleep(0.05) + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + assert poller.tick.await_count >= 1 + + @pytest.mark.asyncio + async def test_tick_crash_does_not_kill_loop(self): + """A tick raising an exception must be logged and the loop must + continue — never let a transient bug take the poller offline + until the next deploy.""" + from app.services.feed_poller import FeedPoller + + poller = FeedPoller(poll_interval=0.01) + # First tick raises, subsequent ticks succeed. + poller.tick = AsyncMock( + side_effect=[RuntimeError("boom"), None, None, None] + ) + + task = asyncio.create_task(poller.run_forever()) + await asyncio.sleep(0.05) + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + assert poller.tick.await_count >= 2 # crashed once, kept going