14 changes: 14 additions & 0 deletions system/manager/manager.py
@@ -19,6 +19,7 @@
from openpilot.common.swaglog import cloudlog, add_file_handler
from openpilot.system.version import get_build_metadata, terms_version, training_version
from openpilot.system.hardware.hw import Paths
from openpilot.system.manager.service_monitor import service_monitor


def manager_init() -> None:
@@ -93,6 +94,13 @@ def manager_init() -> None:
commit=build_metadata.openpilot.git_commit,
dirty=build_metadata.openpilot.is_dirty,
device=HARDWARE.get_device_type())
service_monitor.log_manager_init(
serial=serial,
version=build_metadata.openpilot.version,
branch=build_metadata.channel,
commit=build_metadata.openpilot.git_commit,
dirty=build_metadata.openpilot.is_dirty,
)

# preimport all processes
for p in managed_processes.values():
@@ -109,6 +117,7 @@ def manager_cleanup() -> None:
p.stop(block=True)

cloudlog.info("everything is dead")
service_monitor.log_manager_cleanup()


def manager_thread() -> None:
@@ -124,6 +133,7 @@ def manager_thread() -> None:
if os.getenv("NOBOARD") is not None:
ignore.append("pandad")
ignore += [x for x in os.getenv("BLOCK", "").split(",") if len(x) > 0]
service_monitor.log_manager_start(ignore=ignore, environ=os.environ)

sm = messaging.SubMaster(['deviceState', 'carParams', 'pandaStates'], poll='deviceState')
pm = messaging.PubMaster(['managerState'])
@@ -161,6 +171,7 @@ def manager_thread() -> None:
for p in managed_processes.values() if p.proc)
print(running)
cloudlog.debug(running)
service_monitor.log_process_snapshot(managed_processes)

# send managerState
msg = messaging.new_message('managerState', valid=True)
@@ -182,6 +193,7 @@ def manager_thread() -> None:
shutdown = True
params.put("LastManagerExitReason", f"{param} {datetime.datetime.now()}")
cloudlog.warning(f"Shutting down manager - {param} set")
service_monitor.log_shutdown_request(param)

if shutdown:
break
@@ -198,6 +210,7 @@ def main() -> None:
try:
manager_thread()
except Exception:
service_monitor.log_exception("manager_thread")
traceback.print_exc()
sentry.capture_exception()
finally:
@@ -223,6 +236,7 @@ def main() -> None:
except KeyboardInterrupt:
print("got CTRL-C, exiting")
except Exception:
service_monitor.log_exception("manager_main")
add_file_handler(cloudlog)
cloudlog.exception("Manager failed to start")

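Taken together, the hooks above cover the full manager lifecycle: init, thread start, per-loop process snapshots, shutdown requests, cleanup, and exceptions. Each call lands in the monitor log as one flat line, `message | sorted-key JSON`, via `log_event` in `service_monitor.py` below. A sketch of what one off-road boot might produce (timestamps, pids, and field values are illustrative, not captured output):

2024-01-01 12:00:00.123 INFO manager_init | {"branch": "...", "commit": "...", "dirty": false, "log_file": "...", "serial": "...", "version": "..."}
2024-01-01 12:00:00.456 INFO manager_thread_start | {"environ": {...}, "ignore": []}
2024-01-01 12:00:01.789 INFO process_start | {"name": "pandad", "pid": 1234}
2024-01-01 12:00:05.012 INFO shutdown_requested | {"reason": "DoShutdown"}
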
28 changes: 25 additions & 3 deletions system/manager/process.py
@@ -1,9 +1,12 @@
from __future__ import annotations

import importlib
import os
import signal
import struct
import time
import subprocess
import traceback
from collections.abc import Callable, ValuesView
from abc import ABC, abstractmethod
from multiprocessing import Process
@@ -17,6 +20,7 @@
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
from openpilot.common.watchdog import WATCHDOG_FN
from openpilot.system.manager.service_monitor import service_monitor

ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None

@@ -41,6 +45,8 @@ def launcher(proc: str, name: str) -> None:
except KeyboardInterrupt:
cloudlog.warning(f"child {proc} got SIGINT")
except Exception:
stacktrace = traceback.format_exc()
service_monitor.log_process_exception(name=name, stacktrace=stacktrace)
# can't install the crash handler because sys.excepthook doesn't play nice
# with threads, so catch it here.
sentry.capture_exception()
@@ -84,8 +90,13 @@ def prepare(self) -> None:
def start(self) -> None:
pass

def restart(self) -> None:
self.stop(sig=signal.SIGKILL)
def restart(self, reason: str | None = None) -> None:
self._pending_restart = True
service_monitor.log_process_restart(name=self.name, reason=reason)
try:
self.stop(sig=signal.SIGKILL)
finally:
self._pending_restart = False
self.start()

def check_watchdog(self, started: bool) -> None:
@@ -103,8 +114,9 @@ def check_watchdog(self, started: bool) -> None:

if dt > self.watchdog_max_dt:
if self.watchdog_seen and ENABLE_WATCHDOG:
service_monitor.log_watchdog_timeout(name=self.name, elapsed=dt, exit_code=self.proc.exitcode)
cloudlog.error(f"Watchdog timeout for {self.name} (exitcode {self.proc.exitcode}) restarting ({started=})")
self.restart()
self.restart(reason="watchdog_timeout")
else:
self.watchdog_seen = True

@@ -133,6 +145,13 @@ def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals = Non

ret = self.proc.exitcode
cloudlog.info(f"{self.name} is dead with {ret}")
service_monitor.log_process_exit(
name=self.name,
pid=self.proc.pid,
exit_code=self.proc.exitcode,
shutting_down=self.shutting_down,
restart=getattr(self, "_pending_restart", False),
)

if self.proc.exitcode is not None:
self.shutting_down = False
@@ -194,6 +213,7 @@ def start(self) -> None:
self.proc.start()
self.watchdog_seen = False
self.shutting_down = False
service_monitor.log_process_start(name=self.name, pid=self.proc.pid)


class PythonProcess(ManagerProcess):
@@ -228,6 +248,7 @@ def start(self) -> None:
self.proc.start()
self.watchdog_seen = False
self.shutting_down = False
service_monitor.log_process_start(name=self.name, pid=self.proc.pid)


class DaemonProcess(ManagerProcess):
@@ -271,6 +292,7 @@ def start(self) -> None:
preexec_fn=os.setpgrp)

self.params.put(self.param_name, proc.pid)
service_monitor.log_process_start(name=self.name, pid=proc.pid)

def stop(self, retry=True, block=True, sig=None) -> None:
pass
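The `_pending_restart` flag introduced in `restart()` above lets `stop()` tell an intentional restart apart from a terminal exit when it emits `process_exit`. The event sequence for a watchdog-triggered restart, as a sketch (the process name and pid are placeholders):

# check_watchdog() sees dt > watchdog_max_dt and calls:
p.restart(reason="watchdog_timeout")
# -> log_process_restart(name="pandad", reason="watchdog_timeout")
# -> stop(sig=SIGKILL) runs while _pending_restart is True, so the
#    resulting process_exit event carries restart=True
# -> start() logs a fresh process_start with the new pid
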
139 changes: 139 additions & 0 deletions system/manager/service_monitor.py
@@ -0,0 +1,139 @@
"""Utility helpers for monitoring the openpilot manager service.

This module centralizes the additional logging required to debug
manager lifecycle events when openpilot is off-road. The logger writes
structured information to a rotating file on disk without modifying the
manager control flow.
"""
from __future__ import annotations

import json
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path
from collections.abc import Iterable

from openpilot.system.hardware.hw import Paths


class ServiceMonitor:
"""Collects diagnostic information about the manager service.

The monitor mirrors existing telemetry without influencing the
execution flow. It persists the output to a rotating log file so the
data remains available even when the standard on-road logging is
inactive.
"""

def __init__(self,
log_filename: str = "openpilot_service_monitor.log",
max_bytes: int = 5 * 1024 * 1024,
backup_count: int = 5) -> None:
log_dir = Path(Paths.swaglog_root())
log_dir.mkdir(parents=True, exist_ok=True)

self._log_path = log_dir / log_filename
self._logger = logging.getLogger("openpilot.service_monitor")
self._logger.setLevel(logging.DEBUG)
self._logger.propagate = False

handler_exists = any(
isinstance(handler, RotatingFileHandler) and getattr(handler, "baseFilename", None) == str(self._log_path)
for handler in self._logger.handlers
)

if not handler_exists:
handler = RotatingFileHandler(self._log_path, maxBytes=max_bytes, backupCount=backup_count)
handler.setFormatter(logging.Formatter(
fmt="%(asctime)s.%(msecs)03d %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
))
self._logger.addHandler(handler)

self._last_snapshot: str | None = None

def log_event(self, message: str, **payload: object) -> None:
if payload:
serialized = json.dumps(payload, sort_keys=True, default=str)
self._logger.info("%s | %s", message, serialized)
else:
self._logger.info(message)

def log_manager_start(self, *, ignore: Iterable[str], environ: dict[str, str]) -> None:
self.log_event(
"manager_thread_start",
ignore=list(ignore),
environ={k: environ[k] for k in sorted(environ)},
)

def log_manager_init(self, *, serial: str, version: str, branch: str, commit: str, dirty: bool) -> None:
self.log_event(
"manager_init",
serial=serial,
version=version,
branch=branch,
commit=commit,
dirty=dirty,
log_file=str(self._log_path),
)

def log_process_snapshot(self, processes) -> None:
snapshot = []
for name in sorted(processes.keys()):
proc = processes[name]
state = {
"name": name,
"running": bool(proc.proc and proc.proc.is_alive()),
"pid": getattr(proc.proc, "pid", None),
"exit_code": getattr(proc.proc, "exitcode", None),
"shutting_down": getattr(proc, "shutting_down", False),
"enabled": getattr(proc, "enabled", False),
}
snapshot.append(state)

serialized = json.dumps(snapshot, sort_keys=True, default=str)
if serialized != self._last_snapshot:
self._last_snapshot = serialized
self._logger.debug("process_snapshot %s", serialized)

def log_process_start(self, *, name: str, pid: int | None) -> None:
self.log_event("process_start", name=name, pid=pid)

def log_process_exit(self,
*,
name: str,
pid: int | None,
exit_code: int | None,
shutting_down: bool,
restart: bool | None = None) -> None:
self.log_event(
"process_exit",
name=name,
pid=pid,
exit_code=exit_code,
shutting_down=shutting_down,
restart=restart,
)

def log_process_exception(self, *, name: str, stacktrace: str) -> None:
self.log_event("process_exception", name=name, stacktrace=stacktrace)

def log_watchdog_timeout(self, *, name: str, elapsed: float, exit_code: int | None) -> None:
self.log_event("watchdog_timeout", name=name, elapsed=elapsed, exit_code=exit_code)

def log_process_restart(self, *, name: str, reason: str | None = None) -> None:
self.log_event("process_restart", name=name, reason=reason)

def log_shutdown_request(self, param: str) -> None:
self.log_event("shutdown_requested", reason=param)

def log_manager_cleanup(self) -> None:
self.log_event("manager_cleanup")

def log_exception(self, context: str) -> None:
self._logger.exception("%s", context)


service_monitor = ServiceMonitor()

__all__ = ["ServiceMonitor", "service_monitor"]
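
Because every event goes through `log_event`, each line in the monitor log is `timestamp level message | payload` with sorted JSON keys, so downstream tooling can parse it mechanically. A minimal sketch, assuming that format; `parse_monitor_line` is a hypothetical helper, not part of this PR:

import json

def parse_monitor_line(line: str) -> tuple[str, dict]:
  # "<date> <time> <LEVEL> <message>[ | <json payload>]"
  # note: multi-line tracebacks from log_exception() and the space-separated
  # process_snapshot debug lines would need extra handling
  head, sep, payload = line.rstrip("\n").partition(" | ")
  _date, _time, _level, message = head.split(" ", 3)
  return message, json.loads(payload) if sep else {}

# parse_monitor_line('2024-01-01 12:00:00.123 INFO process_start | {"name": "pandad", "pid": 1234}')
# -> ('process_start', {"name": "pandad", "pid": 1234})

One design note: `service_monitor = ServiceMonitor()` runs at import time, so manager.py and process.py share a single instance per interpreter, and the RotatingFileHandler lookup in `__init__` keeps a re-import from attaching a duplicate handler to the named logger.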