Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions nerve/cron/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

import asyncio
import logging
from datetime import datetime, timezone
from datetime import datetime, timezone, tzinfo
from typing import TYPE_CHECKING
from zoneinfo import ZoneInfo

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
Expand Down Expand Up @@ -75,7 +76,8 @@ def __init__(self, config: NerveConfig, engine: AgentEngine, db: Database):
self.config = config
self.engine = engine
self.db = db
self.scheduler = AsyncIOScheduler()
self.timezone = ZoneInfo(config.timezone)
self.scheduler = AsyncIOScheduler(timezone=self.timezone)
self._jobs: list[CronJob] = []
self._source_runners: list[SourceRunner] = []
self._job_locks: dict[str, asyncio.Lock] = {}
Expand Down Expand Up @@ -119,10 +121,14 @@ async def start(self) -> None:
schedule_str = getattr(source_config, "schedule", "*/15 * * * *")

try:
trigger = CronTrigger.from_crontab(schedule_str)
trigger = CronTrigger.from_crontab(
schedule_str, timezone=self.timezone,
)
except ValueError:
seconds = _parse_interval(schedule_str)
trigger = IntervalTrigger(seconds=seconds)
trigger = IntervalTrigger(
seconds=seconds, timezone=self.timezone,
)

self.scheduler.add_job(
self._run_source_wrapper,
Expand All @@ -139,7 +145,7 @@ async def start(self) -> None:
# Daily cleanup of expired messages and consumer cursors
self.scheduler.add_job(
self._cleanup_expired,
CronTrigger(hour=3, minute=0),
CronTrigger(hour=3, minute=0, timezone=self.timezone),
id="cleanup",
name="Cleanup expired data",
replace_existing=True,
Expand All @@ -149,7 +155,9 @@ async def start(self) -> None:
# scheduler is disabled; Nerve owns wakeup timing here.
self.scheduler.add_job(
self._sweep_wakeups,
IntervalTrigger(seconds=_WAKEUP_SWEEP_SECONDS),
IntervalTrigger(
seconds=_WAKEUP_SWEEP_SECONDS, timezone=self.timezone,
),
id="wakeup_sweep",
name="Fire due session wakeups",
replace_existing=True,
Expand Down Expand Up @@ -178,7 +186,9 @@ async def _make_trigger(self, job: CronJob) -> CronTrigger | IntervalTrigger:
the cadence survives restarts (persistent timer).
"""
try:
return CronTrigger.from_crontab(job.schedule)
return CronTrigger.from_crontab(
job.schedule, timezone=self.timezone,
)
except ValueError:
pass

Expand All @@ -189,8 +199,12 @@ async def _make_trigger(self, job: CronJob) -> CronTrigger | IntervalTrigger:
logger.debug(
"Aligning interval for %s: start_date=%s", job.id, start_date,
)
return IntervalTrigger(seconds=seconds, start_date=start_date)
return IntervalTrigger(seconds=seconds)
return IntervalTrigger(
seconds=seconds,
start_date=start_date,
timezone=self.timezone,
)
return IntervalTrigger(seconds=seconds, timezone=self.timezone)

async def _catchup_missed_jobs(self) -> None:
"""Fire jobs that should have run while the server was down.
Expand All @@ -210,7 +224,7 @@ async def _catchup_missed_jobs(self) -> None:
continue # first-ever run — no catch-up

last_time = _parse_timestamp(last_run["finished_at"])
if self._is_overdue(job, last_time, now):
if self._is_overdue(job, last_time, now, self.timezone):
overdue.append(job)

if not overdue:
Expand All @@ -225,10 +239,17 @@ async def _catchup_missed_jobs(self) -> None:
)

@staticmethod
def _is_overdue(job: CronJob, last_run: datetime, now: datetime) -> bool:
def _is_overdue(
job: CronJob,
last_run: datetime,
now: datetime,
trigger_timezone: tzinfo | None = None,
) -> bool:
"""Check if a job should have fired between *last_run* and *now*."""
try:
trigger = CronTrigger.from_crontab(job.schedule)
trigger = CronTrigger.from_crontab(
job.schedule, timezone=trigger_timezone or timezone.utc,
)
next_fire = trigger.get_next_fire_time(last_run, last_run)
return next_fire is not None and next_fire < now
except ValueError:
Expand Down Expand Up @@ -282,8 +303,7 @@ async def _maybe_rotate_context(
logger.warning("Invalid context_rotate_at: %s", rotate_at)
return False

local_tz = datetime.now().astimezone().tzinfo
today_rotate = datetime.now(local_tz).replace(
today_rotate = now.astimezone(self.timezone).replace(
hour=hour, minute=minute, second=0, microsecond=0,
)
today_rotate_utc = today_rotate.astimezone(timezone.utc)
Expand Down
87 changes: 83 additions & 4 deletions tests/test_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import asyncio
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, MagicMock, patch
from zoneinfo import ZoneInfo

import pytest
import pytest_asyncio
Expand Down Expand Up @@ -47,10 +48,10 @@ def _make_cron_log(finished_at: str) -> dict:
return {"job_id": "test-job", "finished_at": finished_at, "status": "success"}


@pytest_asyncio.fixture
async def cron_service():
def _make_cron_service(timezone_name: str = "UTC") -> CronService:
"""Minimal CronService with mocked dependencies."""
config = MagicMock()
config.timezone = timezone_name
config.cron.system_file = MagicMock()
config.cron.jobs_file = MagicMock()
config.agent.cron_model = "test-model"
Expand All @@ -65,8 +66,13 @@ async def cron_service():
db.log_cron_finish = AsyncMock()
db.get_last_successful_cron_run = AsyncMock(return_value=None)

svc = CronService(config, engine, db)
return svc
return CronService(config, engine, db)


@pytest_asyncio.fixture
async def cron_service():
"""Minimal CronService with mocked dependencies."""
return _make_cron_service()


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -118,6 +124,18 @@ def test_default_on_garbage(self):
assert _parse_interval("???") == 7200


# ---------------------------------------------------------------------------
# Configured timezone
# ---------------------------------------------------------------------------

class TestConfiguredTimezone:
def test_scheduler_uses_configured_timezone(self):
svc = _make_cron_service("America/New_York")

assert str(svc.timezone) == "America/New_York"
assert str(svc.scheduler.timezone) == "America/New_York"


# ---------------------------------------------------------------------------
# _is_overdue
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -157,6 +175,20 @@ def test_interval_multiple_missed(self):
last_run = _utc_now() - timedelta(hours=10)
assert CronService._is_overdue(job, last_run, _utc_now()) is True

def test_crontab_uses_configured_timezone(self):
"""Catch-up checks crontab fires in the configured timezone."""
job = _make_job(schedule="0 9 * * *")
last_run = datetime(2026, 1, 1, 13, 30, tzinfo=timezone.utc)
now = datetime(2026, 1, 1, 14, 30, tzinfo=timezone.utc)

assert (
CronService._is_overdue(
job, last_run, now, ZoneInfo("America/New_York"),
)
is True
)
assert CronService._is_overdue(job, last_run, now, timezone.utc) is False


# ---------------------------------------------------------------------------
# _make_trigger (interval alignment)
Expand Down Expand Up @@ -208,6 +240,26 @@ async def test_crontab_unchanged(self, cron_service):
from apscheduler.triggers.cron import CronTrigger
assert isinstance(trigger, CronTrigger)

@pytest.mark.asyncio
async def test_crontab_uses_configured_timezone(self):
svc = _make_cron_service("America/Los_Angeles")

trigger = await svc._make_trigger(_make_job(schedule="30 11 * * *"))

from apscheduler.triggers.cron import CronTrigger
assert isinstance(trigger, CronTrigger)
assert str(trigger.timezone) == "America/Los_Angeles"

@pytest.mark.asyncio
async def test_interval_uses_configured_timezone(self):
svc = _make_cron_service("America/Los_Angeles")

trigger = await svc._make_trigger(_make_job(schedule="4h"))

from apscheduler.triggers.interval import IntervalTrigger
assert isinstance(trigger, IntervalTrigger)
assert str(trigger.timezone) == "America/Los_Angeles"


# ---------------------------------------------------------------------------
# _catchup_missed_jobs
Expand Down Expand Up @@ -479,6 +531,33 @@ async def test_no_rotation_no_memorize(self, cron_service):
assert rotated is False
cron_service.engine.schedule_memorize.assert_not_awaited()

@pytest.mark.asyncio
async def test_rotate_at_uses_configured_timezone(self):
"""Daily rotate_at uses config timezone, not the server timezone."""
svc = _make_cron_service("America/New_York")
svc.db.get_session = AsyncMock(return_value={
"connected_at": "2026-01-01T13:59:00+00:00",
})

class FixedDateTime(datetime):
@classmethod
def now(cls, tz=None):
fixed = datetime(2026, 1, 1, 15, 0, tzinfo=timezone.utc)
if tz is None:
return fixed.replace(tzinfo=None)
return fixed.astimezone(tz)

with patch("nerve.cron.service.datetime", FixedDateTime):
rotated = await svc._maybe_rotate_context(
"cron:pers", rotate_hours=0, rotate_at="09:00",
)

assert rotated is True
svc.engine.schedule_memorize.assert_awaited_once_with("cron:pers")
svc.engine.sessions.mark_idle.assert_awaited_once_with(
"cron:pers", preserve_sdk_id=False,
)


# ---------------------------------------------------------------------------
# Run gates — service-level skip/run behaviour
Expand Down