From f079ab10ed315970270ad13c5c7cc191e3f420db Mon Sep 17 00:00:00 2001 From: parithosh Date: Thu, 4 Jun 2026 22:45:05 +0200 Subject: [PATCH] add token-efficient clickhouse log helpers --- modules/clickhouse/examples.yaml | 230 +++++- modules/clickhouse/module.go | 72 ++ modules/clickhouse/python/clickhouse.py | 918 +++++++++++++++++++++- modules/clickhouse/python_helpers_test.go | 160 ++++ runbooks/debug_devnet.md | 121 +-- runbooks/debug_local_devnet.md | 156 ++-- 6 files changed, 1517 insertions(+), 140 deletions(-) create mode 100644 modules/clickhouse/python_helpers_test.go diff --git a/modules/clickhouse/examples.yaml b/modules/clickhouse/examples.yaml index 6217c485..3e7c8ba7 100644 --- a/modules/clickhouse/examples.yaml +++ b/modules/clickhouse/examples.yaml @@ -17,9 +17,176 @@ # # IMPORTANT: For Xatu examples, always filter on partition key (slot_start_date_time) and meta_network_name. +log_helpers: + name: Token-Efficient Log Helpers + description: Python helper calls for scoped ClickHouse OTel log exploration. Prefer these before hand-writing SQL for devnet logs; they enforce source presets, time bounds, severity fallback, and compact row output. + examples: + - name: List log source presets + description: Show the built-in log source presets, field aliases, and required scope fields. + target: clickhouse-raw + query: | + from ethpandaops import clickhouse + + for source in clickhouse.log_sources(): + print(source["name"], source["datasource"], source["table"]) + print("fields:", ", ".join(source["fields"])) + print("required scope:", source["required_scope_fields"]) + + - name: Hosted devnet nodes shipping logs + description: List node host names currently shipping logs for a hosted devnet without writing raw SQL. + target: clickhouse-raw + query: | + from ethpandaops import clickhouse + + network = "" + hosts = clickhouse.log_values( + "hosted_devnet", + "host", + filters={"network": network}, + since="1h", + limit=100, + ) + print(hosts) + + - name: Hosted devnet severity coverage + description: Check whether structured OTel severity fields are populated for a hosted devnet node. + target: clickhouse-raw + query: | + from ethpandaops import clickhouse + + network = "" + host = "" + + coverage = clickhouse.log_coverage( + "hosted_devnet", + filters={"network": network, "host": host}, + since="1h", + ) + print(coverage) + + - name: Hosted devnet compact node errors + description: Fetch compact error-class logs for one hosted devnet node. The helper prefers structured severity and uses the bounded Body fallback automatically. + target: clickhouse-raw + query: | + from ethpandaops import clickhouse + + network = "" + host = "" + + errors = clickhouse.log_errors( + "hosted_devnet", + filters={"network": network, "host": host}, + since="1h", + limit=50, + body_chars=240, + ) + for row in errors["rows"]: + print(row) + + - name: Hosted devnet node containers with samples + description: List container log files on one hosted devnet node with a representative sample line from each. + target: clickhouse-raw + query: | + from ethpandaops import clickhouse + + network = "" + host = "" + + containers = clickhouse.log_samples( + "hosted_devnet", + "container", + filters={"network": network, "host": host}, + since="1h", + limit=20, + body_chars=160, + ) + for row in containers["rows"]: + print(row) + + - name: Hosted consensus-client error sweep + description: Sweep compact error-class logs across a consensus client type while excluding bootnode noise. + target: clickhouse-raw + query: | + from ethpandaops import clickhouse + + network = "" + + errors = clickhouse.log_errors( + "hosted_devnet", + filters={"network": network}, + like_filters={"host": "lighthouse-%"}, + exclude_filters={"host": "bootnode-1"}, + since="1h", + limit=100, + body_chars=240, + ) + for row in errors["rows"]: + print(row) + + - name: Local Kurtosis service errors + description: Fetch compact error-class logs for one local Kurtosis service without writing raw SQL. + target: local-kurtosis + query: | + from ethpandaops import clickhouse + + enclave = "" + service = "" + + errors = clickhouse.log_errors( + "local_kurtosis", + filters={"enclave": enclave, "service": service}, + since="1h", + limit=50, + body_chars=240, + ) + for row in errors["rows"]: + print(row) + + - name: Local Kurtosis EL warnings and errors + description: Fetch compact warning/error logs for local Kurtosis execution-layer services. + target: local-kurtosis + query: | + from ethpandaops import clickhouse + + enclave = "" + + logs = clickhouse.log_errors( + "local_kurtosis", + filters={"enclave": enclave}, + like_filters={"service": "el-%"}, + min_severity="warn", + since="1h", + limit=100, + body_chars=240, + ) + for row in logs["rows"]: + print(row) + + - name: Log context around an error + description: Fetch a small before/after context window around a timestamp returned by log_errors. + target: clickhouse-raw + query: | + from ethpandaops import clickhouse + + network = "" + host = "" + timestamp = "" + + context = clickhouse.log_context( + "hosted_devnet", + filters={"network": network, "host": host}, + timestamp=timestamp, + before=20, + after=20, + window="1h", + body_chars=240, + ) + for row in context["rows"]: + print(row) + devnet_logs: name: Local Devnet OTel Logs - description: Queries for local Kurtosis devnet OTel logs exposed through the autodiscovered local-kurtosis ClickHouse datasource + description: Raw SQL fallback queries for local Kurtosis devnet OTel logs exposed through the autodiscovered local-kurtosis ClickHouse datasource. Prefer the log_helpers examples for normal investigation. examples: - name: List local devnet enclaves description: List Kurtosis enclave names currently present in the shared local OTel logs table. Run this before filtering logs because multiple devnets can share one table. @@ -31,40 +198,58 @@ devnet_logs: ORDER BY EnclaveName - name: Recent service errors - description: Fetch recent error-class logs for a specific service in a specific local Kurtosis enclave by matching the raw log body. + description: Fetch recent error-class logs for a specific service in a local Kurtosis enclave. Prefer OTel severity fields; parse Body only when structured severity is empty. target: local-kurtosis query: | + WITH replaceRegexpAll(Body, '\x1b\[[0-9;?]*[A-Za-z]', '') AS clean SELECT Timestamp, ServiceName, - Body + clean AS Body FROM otel.otel_logs WHERE EnclaveName = {enclave:String} AND ServiceName = {service:String} - AND match(Body, '(?i)(crit|err|error|fatal)') + AND ( + SeverityNumber >= 17 + OR upper(SeverityText) IN ('CRIT', 'CRITICAL', 'ERRO', 'ERROR', 'FATAL', 'PANIC') + OR lower(LogAttributes['level']) IN ('crit', 'critical', 'erro', 'error', 'fatal', 'panic') + OR ( + match(clean, '(^|[][ |])(CRIT|ERRO|ERROR|FATAL|PANIC)($|[][ |:])|^(ERR|FAT)\b|\blevel=(crit|error|fatal|panic)\b') + AND NOT match(clean, '(^|[][ |])(DEBUG|DBG|TRACE|TRC)($|[][ |:])|\blevel=(debug|trace)\b') + ) + ) AND Timestamp >= now() - INTERVAL 1 HOUR ORDER BY Timestamp DESC LIMIT 200 - name: EL warnings and errors - description: Fetch recent EL warning/error logs for a local Kurtosis enclave by matching the raw log body. + description: Fetch recent EL warning/error logs for a local Kurtosis enclave. Prefer OTel severity fields; parse Body only when structured severity is empty. target: local-kurtosis query: | + WITH replaceRegexpAll(Body, '\x1b\[[0-9;?]*[A-Za-z]', '') AS clean SELECT Timestamp, ServiceName, - Body + clean AS Body FROM otel.otel_logs WHERE EnclaveName = {enclave:String} AND ServiceName LIKE 'el-%' - AND match(Body, '(?i)(crit|err|error|fatal|warn)') + AND ( + SeverityNumber >= 13 + OR upper(SeverityText) IN ('WARN', 'WARNING', 'WRN', 'CRIT', 'CRITICAL', 'ERRO', 'ERROR', 'FATAL', 'PANIC') + OR lower(LogAttributes['level']) IN ('warn', 'warning', 'wrn', 'crit', 'critical', 'erro', 'error', 'fatal', 'panic') + OR ( + match(clean, '(^|[][ |])(CRIT|ERRO|ERROR|FATAL|PANIC|WARN|WRN)($|[][ |:])|^(ERR|FAT|WRN)\b|\blevel=(crit|error|fatal|panic|warn|warning)\b') + AND NOT match(clean, '(^|[][ |])(DEBUG|DBG|TRACE|TRC)($|[][ |:])|\blevel=(debug|trace)\b') + ) + ) AND Timestamp >= now() - INTERVAL 1 HOUR ORDER BY Timestamp DESC LIMIT 200 production_devnet_logs: name: Production Devnet OTel Logs - description: Queries for hosted (multi-VM) devnet and testnet container logs in the clickhouse-raw datasource, table external.otel_logs. Logs are keyed by ResourceAttributes['network'] (devnet name) and ResourceAttributes['host.name'] (node, e.g. lighthouse-geth-super-1). SeverityText is usually empty for raw Docker logs, so match severity on Body. This is NOT for local Kurtosis devnets — use the local-kurtosis datasource for those. + description: Raw SQL fallback queries for hosted (multi-VM) devnet and testnet container logs in the clickhouse-raw datasource, table external.otel_logs. Prefer the log_helpers examples for normal investigation. Logs are keyed by ResourceAttributes['network'] (devnet name) and ResourceAttributes['host.name'] (node, e.g. lighthouse-geth-super-1). Prefer OTel severity fields when populated; parse Body only for rows where structured severity is empty. This is NOT for local Kurtosis devnets — use the local-kurtosis datasource for those. examples: - name: List devnet nodes shipping logs description: List the nodes (host.name) currently shipping container logs for a hosted devnet. Run this to discover the node topology before drilling into a specific node. @@ -77,18 +262,27 @@ production_devnet_logs: ORDER BY host - name: Recent node errors - description: Fetch recent error-class logs for one devnet node by matching the raw log body. A node VM mixes its CL, EL, validator and sidecar containers — use LogAttributes['log.file.name'] to tell them apart. + description: Fetch recent error-class logs for one devnet node. Prefer OTel severity fields; parse Body only when structured severity is empty. A node VM mixes its CL, EL, validator and sidecar containers — use LogAttributes['log.file.name'] to tell them apart. target: clickhouse-raw query: | + WITH replaceRegexpAll(Body, '\x1b\[[0-9;?]*[A-Za-z]', '') AS clean SELECT Timestamp, ResourceAttributes['host.name'] AS host, LogAttributes['log.file.name'] AS container_log, - Body + clean AS Body FROM external.otel_logs WHERE ResourceAttributes['network'] = {network:String} AND ResourceAttributes['host.name'] = {host:String} - AND match(Body, '(?i)(crit|err|error|fatal)') + AND ( + SeverityNumber >= 17 + OR upper(SeverityText) IN ('CRIT', 'CRITICAL', 'ERRO', 'ERROR', 'FATAL', 'PANIC') + OR lower(LogAttributes['level']) IN ('crit', 'critical', 'erro', 'error', 'fatal', 'panic') + OR ( + match(clean, '(^|[][ |])(CRIT|ERRO|ERROR|FATAL|PANIC)($|[][ |:])|^(ERR|FAT)\b|\blevel=(crit|error|fatal|panic)\b') + AND NOT match(clean, '(^|[][ |])(DEBUG|DBG|TRACE|TRC)($|[][ |:])|\blevel=(debug|trace)\b') + ) + ) AND Timestamp >= now() - INTERVAL 1 HOUR ORDER BY Timestamp DESC LIMIT 200 @@ -97,14 +291,24 @@ production_devnet_logs: description: Sweep error-class logs across all nodes running a given CL client. host.name is ---, so a 'lighthouse-%' prefix matches lighthouse-CL nodes. Results still mix each node's EL/sidecar lines. target: clickhouse-raw query: | + WITH replaceRegexpAll(Body, '\x1b\[[0-9;?]*[A-Za-z]', '') AS clean SELECT Timestamp, ResourceAttributes['host.name'] AS host, - Body + clean AS Body FROM external.otel_logs WHERE ResourceAttributes['network'] = {network:String} AND ResourceAttributes['host.name'] LIKE {cl_prefix:String} - AND match(Body, '(?i)(crit|err|error|fatal)') + AND ResourceAttributes['host.name'] != 'bootnode-1' + AND ( + SeverityNumber >= 17 + OR upper(SeverityText) IN ('CRIT', 'CRITICAL', 'ERRO', 'ERROR', 'FATAL', 'PANIC') + OR lower(LogAttributes['level']) IN ('crit', 'critical', 'erro', 'error', 'fatal', 'panic') + OR ( + match(clean, '(^|[][ |])(CRIT|ERRO|ERROR|FATAL|PANIC)($|[][ |:])|^(ERR|FAT)\b|\blevel=(crit|error|fatal|panic)\b') + AND NOT match(clean, '(^|[][ |])(DEBUG|DBG|TRACE|TRC)($|[][ |:])|\blevel=(debug|trace)\b') + ) + ) AND Timestamp >= now() - INTERVAL 1 HOUR ORDER BY Timestamp DESC LIMIT 500 diff --git a/modules/clickhouse/module.go b/modules/clickhouse/module.go index e3cd8c0f..e32e6c61 100644 --- a/modules/clickhouse/module.go +++ b/modules/clickhouse/module.go @@ -244,6 +244,67 @@ func (m *Module) PythonAPIDocs() map[string]types.ModuleDoc { }, Returns: "(rows, column_names)", }, + "log_sources": { + Signature: "clickhouse.log_sources() -> list[dict]", + Description: "List built-in log source presets for hosted devnet logs (clickhouse-raw.external.otel_logs) and local Kurtosis logs (local-kurtosis.otel.otel_logs).", + Returns: "List of dicts with source name, datasource, table, field aliases, compact fields, and required scope fields.", + }, + "log_coverage": { + Signature: "clickhouse.log_coverage(source: str, filters: dict, *, since='1h', until=None, include_sql=False) -> dict", + Description: "Measure severity field coverage for a scoped log slice. Use before error triage to prove whether structured severity fields are populated.", + Parameters: map[string]string{ + "source": "'hosted_devnet' or 'local_kurtosis'", + "filters": "Exact field filters using source field aliases; hosted_devnet requires 'network', local_kurtosis requires 'enclave'", + "since/until": "Relative duration like '1h' or absolute timestamp string. Queries are always time-bounded.", + "include_sql": "When true, include reproducible SQL and parameters in result['query']; default keeps output compact.", + }, + Returns: "dict with counts, coverage ratios, first_seen, last_seen, and optional query metadata.", + }, + "log_values": { + Signature: "clickhouse.log_values(source: str, field: str, filters: dict | None = None, *, since='1h', limit=20) -> pandas.DataFrame", + Description: "Return top values for a log field using validated source field aliases and bounded time filters.", + Parameters: map[string]string{ + "source": "'hosted_devnet' or 'local_kurtosis'", + "field": "Field alias such as 'network', 'host', 'enclave', 'service', 'container', 'severity_text'", + "filters": "Exact field filters. Drilling into non-scope fields requires the source's scope filter.", + "limit": "Maximum value rows, capped at 500.", + }, + Returns: "pandas.DataFrame with value, lines, first_seen, and last_seen.", + }, + "log_samples": { + Signature: "clickhouse.log_samples(source: str, field: str, filters: dict, *, since='1h', limit=20, body_chars=160, include_sql=False) -> dict", + Description: "Return top field values with counts and one compact sample log line. Use this to identify containers or services before drilling into errors.", + Parameters: map[string]string{ + "source": "'hosted_devnet' or 'local_kurtosis'", + "field": "Field alias such as 'container', 'service', or 'host'", + "filters": "Exact source-scoped filters. hosted_devnet requires 'network'; local_kurtosis requires 'enclave'.", + "body_chars": "Per-sample body truncation length. Defaults to 160.", + }, + Returns: "dict with compact value/count/sample rows, row limit metadata, and optional query metadata.", + }, + "log_errors": { + Signature: "clickhouse.log_errors(source: str, filters: dict, *, since='1h', min_severity='error', limit=50, body_chars=240, include_sql=False) -> dict", + Description: "Fetch compact warning/error-class logs. Generated SQL prefers OTel severity fields and uses a bounded ANSI-stripped Body fallback for raw Docker logs.", + Parameters: map[string]string{ + "source": "'hosted_devnet' or 'local_kurtosis'", + "filters": "Exact field filters using aliases; source scope filter is required.", + "like_filters": "Optional LIKE filters, e.g. {'host': 'lighthouse-%'}", + "exclude_filters": "Optional exclusion filters, e.g. {'host': 'bootnode-1'}", + "min_severity": "'error' by default; use 'warn' to include WARN/WRN rows as well.", + "body_chars": "Per-row body truncation length. Defaults to 240 for token-efficient output.", + "include_sql": "When true, include reproducible SQL and parameters in result['query']; default keeps output compact.", + }, + Returns: "dict with compact rows, row limit metadata, filters, and optional query metadata.", + }, + "log_context": { + Signature: "clickhouse.log_context(source: str, filters: dict, timestamp: str, *, before=20, after=20, window='1h', body_chars=240, include_sql=False) -> dict", + Description: "Fetch compact before/after log context around a timestamp while keeping the query scoped and time-windowed.", + Parameters: map[string]string{ + "timestamp": "Center timestamp from a log row.", + "window": "Relative duration bounding the context search around the center timestamp; default '1h'.", + }, + Returns: "dict with compact context rows and optional query metadata.", + }, }, }, } @@ -262,6 +323,17 @@ Xatu data is split across **TWO datasources** with **DIFFERENT syntax**: **Always filter by partition column** (usually ` + "`slot_start_date_time`" + `) to avoid timeouts. +## OTel Log Helper Sources + +For devnet logs, prefer the Python log helpers over hand-written SQL: + +| Helper source | Datasource/table | Required scope | +|---------------|------------------|----------------| +| ` + "`hosted_devnet`" + ` | ` + "`clickhouse-raw.external.otel_logs`" + ` | ` + "`filters={'network': ''}`" + ` | +| ` + "`local_kurtosis`" + ` | ` + "`local-kurtosis.otel.otel_logs`" + ` | ` + "`filters={'enclave': ''}`" + ` | + +Use ` + "`clickhouse.log_values()`" + ` for field counts, ` + "`clickhouse.log_samples()`" + ` for counts plus sample lines, ` + "`clickhouse.log_coverage()`" + ` for severity coverage, ` + "`clickhouse.log_errors()`" + ` for compact warning/error rows, and ` + "`clickhouse.log_context()`" + ` for bounded before/after context. + ## Canonical vs Head Data - **Canonical** = finalized (no reorgs) - use for historical analysis diff --git a/modules/clickhouse/python/clickhouse.py b/modules/clickhouse/python/clickhouse.py index 1a817fe2..fc8cf798 100644 --- a/modules/clickhouse/python/clickhouse.py +++ b/modules/clickhouse/python/clickhouse.py @@ -1,12 +1,117 @@ """Thin ClickHouse wrappers over the server operation API.""" -from typing import Any +from __future__ import annotations + +from dataclasses import dataclass +import re +from typing import Any, Mapping, Sequence import pandas as pd from ethpandaops import _runtime +_ANSI_ESCAPE_RE_SQL = r"\x1b\[[0-9;?]*[A-Za-z]" +_ERROR_LEVEL_RE = ( + r"(^|[][ |])(CRIT|ERRO|ERROR|FATAL|PANIC)($|[][ |:])" + r"|^(ERR|FAT)\b|\blevel=(crit|error|fatal|panic)\b" +) +_WARN_OR_ERROR_LEVEL_RE = ( + r"(^|[][ |])(CRIT|ERRO|ERROR|FATAL|PANIC|WARN|WRN)($|[][ |:])" + r"|^(ERR|FAT|WRN)\b|\blevel=(crit|error|fatal|panic|warn|warning)\b" +) +_DEBUG_TRACE_LEVEL_RE = ( + r"(^|[][ |])(DEBUG|DBG|TRACE|TRC)($|[][ |:])" + r"|\blevel=(debug|trace)\b" +) +_DURATION_RE = re.compile(r"^\s*(\d+)\s*([smhdw])\s*$", re.IGNORECASE) +_DURATION_UNITS = { + "s": "SECOND", + "m": "MINUTE", + "h": "HOUR", + "d": "DAY", + "w": "WEEK", +} + + +@dataclass(frozen=True) +class _LogSource: + name: str + description: str + datasource: str + table: str + timestamp: str + body: str + severity_number: str + severity_text: str + level: str + fields: Mapping[str, str] + compact_fields: tuple[str, ...] + required_scope_fields: tuple[str, ...] + + +_LOG_SOURCES: dict[str, _LogSource] = { + "hosted_devnet": _LogSource( + name="hosted_devnet", + description=( + "Hosted multi-VM devnet and testnet container logs in " + "clickhouse-raw.external.otel_logs." + ), + datasource="clickhouse-raw", + table="external.otel_logs", + timestamp="Timestamp", + body="Body", + severity_number="SeverityNumber", + severity_text="SeverityText", + level="LogAttributes['level']", + fields={ + "timestamp": "Timestamp", + "body": "Body", + "severity_number": "SeverityNumber", + "severity_text": "SeverityText", + "level": "LogAttributes['level']", + "network": "ResourceAttributes['network']", + "host": "ResourceAttributes['host.name']", + "service": "ServiceName", + "container": "LogAttributes['log.file.name']", + "container_id": "LogAttributes['container_id']", + "ingress_user": "ResourceAttributes['ingress_user']", + "environment": "ResourceAttributes['deployment.environment']", + }, + compact_fields=("network", "host", "service", "container"), + required_scope_fields=("network",), + ), + "local_kurtosis": _LogSource( + name="local_kurtosis", + description=( + "Local Kurtosis devnet OTel logs in " + "local-kurtosis.otel.otel_logs." + ), + datasource="local-kurtosis", + table="otel.otel_logs", + timestamp="Timestamp", + body="Body", + severity_number="SeverityNumber", + severity_text="SeverityText", + level="LogAttributes['level']", + fields={ + "timestamp": "Timestamp", + "body": "Body", + "severity_number": "SeverityNumber", + "severity_text": "SeverityText", + "level": "LogAttributes['level']", + "enclave": "EnclaveName", + "service": "ServiceName", + "host": "ResourceAttributes['host.name']", + "container": "LogAttributes['log.file.name']", + "container_id": "LogAttributes['container_id']", + }, + compact_fields=("enclave", "service", "host", "container"), + required_scope_fields=("enclave",), + ), +} + + def list_datasources() -> list[dict[str, Any]]: """List available ClickHouse datasources.""" response = _runtime.invoke("clickhouse.list_datasources") @@ -47,3 +152,814 @@ def query_raw( "parameters": parameters, }, ) + + +def log_sources() -> list[dict[str, Any]]: + """List built-in ClickHouse log source presets. + + These presets encode the datasource, table, timestamp, identity fields, and + scope rules needed for token-efficient devnet log investigation. + """ + return [ + { + "name": source.name, + "description": source.description, + "datasource": source.datasource, + "table": source.table, + "timestamp": source.timestamp, + "body": source.body, + "fields": sorted(source.fields.keys()), + "compact_fields": list(source.compact_fields), + "required_scope_fields": list(source.required_scope_fields), + } + for source in _LOG_SOURCES.values() + ] + + +def log_coverage( + source: str, + filters: Mapping[str, Any], + *, + like_filters: Mapping[str, str] | None = None, + exclude_filters: Mapping[str, Any] | None = None, + since: str = "1h", + until: str | None = None, + include_sql: bool = False, +) -> dict[str, Any]: + """Measure severity field coverage for a scoped log slice. + + Use this before broad log triage to see whether structured severity fields + are populated or whether the bounded body fallback is needed. + """ + log_source = _get_log_source(source) + exact_filters = _normalize_filters("filters", filters, log_source) + _require_scope(log_source, exact_filters, "log_coverage") + + params: dict[str, Any] = {} + clauses = _build_where_clauses( + log_source, + params, + filters=exact_filters, + like_filters=_normalize_filters("like_filters", like_filters, log_source), + exclude_filters=_normalize_filters( + "exclude_filters", exclude_filters, log_source + ), + since=since, + until=until, + ) + + sql = f""" +SELECT + count() AS lines, + countIf({log_source.severity_number} != 0) AS severity_number_lines, + countIf({log_source.severity_text} != '') AS severity_text_lines, + countIf({log_source.level} != '') AS log_attr_level_lines, + min({log_source.timestamp}) AS first_seen, + max({log_source.timestamp}) AS last_seen +FROM {log_source.table} +{_where_sql(clauses)} +""".strip() + + rows, columns = query_raw(log_source.datasource, sql, params) + data = _first_row_dict(rows, columns) + lines = _to_int(data.get("lines")) + + result: dict[str, Any] = { + "source": log_source.name, + "datasource": log_source.datasource, + "table": log_source.table, + "filters": _public_filter_summary(exact_filters, like_filters, exclude_filters), + "lines": lines, + "severity_number_lines": _to_int(data.get("severity_number_lines")), + "severity_text_lines": _to_int(data.get("severity_text_lines")), + "log_attr_level_lines": _to_int(data.get("log_attr_level_lines")), + "first_seen": data.get("first_seen", ""), + "last_seen": data.get("last_seen", ""), + } + if lines: + result["severity_number_coverage"] = result["severity_number_lines"] / lines + result["severity_text_coverage"] = result["severity_text_lines"] / lines + result["log_attr_level_coverage"] = result["log_attr_level_lines"] / lines + + _maybe_attach_query(result, include_sql, log_source.datasource, sql, params) + return result + + +def log_values( + source: str, + field: str, + filters: Mapping[str, Any] | None = None, + *, + like_filters: Mapping[str, str] | None = None, + exclude_filters: Mapping[str, Any] | None = None, + since: str = "1h", + until: str | None = None, + limit: int = 20, +) -> pd.DataFrame: + """Return top values for a log field within a bounded slice.""" + log_source = _get_log_source(source) + field_expr = _field_expr(log_source, field) + exact_filters = _normalize_filters("filters", filters, log_source) + + # Discovering the required scope field itself is allowed without a scope + # filter. Drilling into other fields must be scoped to avoid wide scans. + if field not in log_source.required_scope_fields: + _require_scope(log_source, exact_filters, "log_values") + + params: dict[str, Any] = {} + clauses = _build_where_clauses( + log_source, + params, + filters=exact_filters, + like_filters=_normalize_filters("like_filters", like_filters, log_source), + exclude_filters=_normalize_filters( + "exclude_filters", exclude_filters, log_source + ), + since=since, + until=until, + ) + limit_ref = _add_param(params, "limit", _validate_int("limit", limit, 1, 500), "UInt64") + + sql = f""" +SELECT + {field_expr} AS value, + count() AS lines, + min({log_source.timestamp}) AS first_seen, + max({log_source.timestamp}) AS last_seen +FROM {log_source.table} +{_where_sql(clauses)} +GROUP BY value +ORDER BY lines DESC, value ASC +LIMIT {limit_ref} +""".strip() + + return query(log_source.datasource, sql, params) + + +def log_samples( + source: str, + field: str, + filters: Mapping[str, Any], + *, + like_filters: Mapping[str, str] | None = None, + exclude_filters: Mapping[str, Any] | None = None, + since: str = "1h", + until: str | None = None, + limit: int = 20, + body_chars: int = 160, + include_sql: bool = False, +) -> dict[str, Any]: + """Return top field values with counts and one compact sample log body.""" + log_source = _get_log_source(source) + field_expr = _field_expr(log_source, field) + exact_filters = _normalize_filters("filters", filters, log_source) + _require_scope(log_source, exact_filters, "log_samples") + + params: dict[str, Any] = {} + clauses = _build_where_clauses( + log_source, + params, + filters=exact_filters, + like_filters=_normalize_filters("like_filters", like_filters, log_source), + exclude_filters=_normalize_filters( + "exclude_filters", exclude_filters, log_source + ), + since=since, + until=until, + ) + limit_value = _validate_int("limit", limit, 1, 200) + body_chars_value = _validate_int("body_chars", body_chars, 40, 1000) + limit_ref = _add_param(params, "limit", limit_value, "UInt64") + body_chars_ref = _add_param(params, "body_chars", body_chars_value, "UInt64") + + sql = f""" +WITH {_clean_body_expr(log_source)} AS clean +SELECT + {field_expr} AS value, + count() AS lines, + min({log_source.timestamp}) AS first_seen, + max({log_source.timestamp}) AS last_seen, + any(substring(clean, 1, {body_chars_ref})) AS sample +FROM {log_source.table} +{_where_sql(clauses)} +GROUP BY value +ORDER BY lines DESC, value ASC +LIMIT {limit_ref} +""".strip() + + rows, columns = query_raw(log_source.datasource, sql, params) + row_dicts = [dict(zip(columns, row)) for row in rows] + for row in row_dicts: + row["lines"] = _to_int(row.get("lines")) + + result: dict[str, Any] = { + "source": log_source.name, + "datasource": log_source.datasource, + "table": log_source.table, + "field": field, + "filters": _public_filter_summary( + exact_filters, like_filters, exclude_filters + ), + "rows_returned": len(row_dicts), + "limit": limit_value, + "rows_limited": len(row_dicts) >= limit_value, + "body_chars": body_chars_value, + "rows": row_dicts, + } + _maybe_attach_query(result, include_sql, log_source.datasource, sql, params) + return result + + +def log_errors( + source: str, + filters: Mapping[str, Any], + *, + like_filters: Mapping[str, str] | None = None, + exclude_filters: Mapping[str, Any] | None = None, + since: str = "1h", + until: str | None = None, + min_severity: str = "error", + limit: int = 50, + body_chars: int = 240, + include_sql: bool = False, +) -> dict[str, Any]: + """Fetch compact warning/error-class log rows for a scoped log slice. + + The generated SQL prefers structured OTel severity and falls back to a + bounded, ANSI-stripped body regex for raw Docker log lines that have empty + severity fields. Set min_severity="warn" to include WARN/WRN rows. + """ + log_source = _get_log_source(source) + exact_filters = _normalize_filters("filters", filters, log_source) + _require_scope(log_source, exact_filters, "log_errors") + + params: dict[str, Any] = {} + clauses = _build_where_clauses( + log_source, + params, + filters=exact_filters, + like_filters=_normalize_filters("like_filters", like_filters, log_source), + exclude_filters=_normalize_filters( + "exclude_filters", exclude_filters, log_source + ), + since=since, + until=until, + ) + clauses.append(_severity_clause(log_source, params, min_severity)) + + limit_value = _validate_int("limit", limit, 1, 500) + body_chars_value = _validate_int("body_chars", body_chars, 40, 4000) + limit_ref = _add_param(params, "limit", limit_value, "UInt64") + body_chars_ref = _add_param(params, "body_chars", body_chars_value, "UInt64") + + sql = f""" +WITH {_clean_body_expr(log_source)} AS clean +SELECT +{_compact_select_list(log_source, body_chars_ref)} +FROM {log_source.table} +{_where_sql(clauses)} +ORDER BY {log_source.timestamp} DESC +LIMIT {limit_ref} +""".strip() + + rows, columns = query_raw(log_source.datasource, sql, params) + result = _compact_result( + log_source=log_source, + sql=sql, + params=params, + rows=rows, + columns=columns, + limit=limit_value, + body_chars=body_chars_value, + include_sql=include_sql, + ) + result["kind"] = f"{_normalize_min_severity(min_severity)}_logs" + result["filters"] = _public_filter_summary( + exact_filters, like_filters, exclude_filters + ) + return result + + +def log_context( + source: str, + filters: Mapping[str, Any], + timestamp: str, + *, + like_filters: Mapping[str, str] | None = None, + exclude_filters: Mapping[str, Any] | None = None, + before: int = 20, + after: int = 20, + window: str | None = "1h", + body_chars: int = 240, + include_sql: bool = False, +) -> dict[str, Any]: + """Fetch compact log context before and after a timestamp.""" + if not timestamp: + raise ValueError("timestamp is required") + + log_source = _get_log_source(source) + exact_filters = _normalize_filters("filters", filters, log_source) + _require_scope(log_source, exact_filters, "log_context") + + params: dict[str, Any] = {} + filter_clauses = _build_filter_clauses( + log_source, + params, + filters=exact_filters, + like_filters=_normalize_filters("like_filters", like_filters, log_source), + exclude_filters=_normalize_filters( + "exclude_filters", exclude_filters, log_source + ), + ) + where_scope = _indent(_where_sql(filter_clauses), 4) + center_ref = _add_param(params, "center", timestamp, "String") + before_ref = _add_param( + params, "before", _validate_int("before", before, 0, 200), "UInt64" + ) + after_ref = _add_param( + params, "after", _validate_int("after", after, 0, 200), "UInt64" + ) + body_chars_value = _validate_int("body_chars", body_chars, 40, 4000) + body_chars_ref = _add_param(params, "body_chars", body_chars_value, "UInt64") + center_expr = f"parseDateTime64BestEffort({center_ref}, 9)" + before_window_clause = "" + after_window_clause = "" + if window is not None: + interval = _duration_interval("window", window) + before_window_clause = f"\n AND {log_source.timestamp} >= center - {interval}" + after_window_clause = f"\n AND {log_source.timestamp} <= center + {interval}" + select_list = _compact_select_list( + log_source, + body_chars_ref, + clean_expr=_clean_body_expr(log_source), + ) + + sql = f""" +WITH {center_expr} AS center +SELECT * +FROM +( + SELECT * + FROM + ( + SELECT + 'before' AS context, +{_indent(select_list, 4)} + FROM {log_source.table} +{where_scope} + AND {log_source.timestamp} < center +{before_window_clause} + ORDER BY {log_source.timestamp} DESC + LIMIT {before_ref} + ) + + UNION ALL + + SELECT * + FROM + ( + SELECT + 'at' AS context, +{_indent(select_list, 4)} + FROM {log_source.table} +{where_scope} + AND {log_source.timestamp} = center + ORDER BY {log_source.timestamp} ASC + LIMIT 1 + ) + + UNION ALL + + SELECT * + FROM + ( + SELECT + 'after' AS context, +{_indent(select_list, 4)} + FROM {log_source.table} +{where_scope} + AND {log_source.timestamp} > center +{after_window_clause} + ORDER BY {log_source.timestamp} ASC + LIMIT {after_ref} + ) +) +ORDER BY ts ASC +""".strip() + + rows, columns = query_raw(log_source.datasource, sql, params) + result = _compact_result( + log_source=log_source, + sql=sql, + params=params, + rows=rows, + columns=columns, + limit=_validate_int("before", before, 0, 200) + + _validate_int("after", after, 0, 200) + + 1, + body_chars=body_chars_value, + include_sql=include_sql, + ) + result["kind"] = "log_context" + result["center_timestamp"] = timestamp + result["window"] = window + result["filters"] = _public_filter_summary( + exact_filters, like_filters, exclude_filters + ) + return result + + +def _get_log_source(source: str) -> _LogSource: + if source not in _LOG_SOURCES: + names = ", ".join(sorted(_LOG_SOURCES)) + raise ValueError(f"unknown log source {source!r}; expected one of: {names}") + return _LOG_SOURCES[source] + + +def _field_expr(source: _LogSource, field: str) -> str: + if field not in source.fields: + names = ", ".join(sorted(source.fields)) + raise ValueError( + f"unknown field {field!r} for log source {source.name!r}; " + f"expected one of: {names}" + ) + return source.fields[field] + + +def _field_param_type(field: str) -> str: + if field == "severity_number": + return "UInt8" + return "String" + + +def _normalize_filters( + name: str, + filters: Mapping[str, Any] | None, + source: _LogSource, +) -> dict[str, Any]: + if filters is None: + return {} + if not isinstance(filters, Mapping): + raise TypeError(f"{name} must be a mapping of field name to value") + + normalized: dict[str, Any] = {} + for field, value in filters.items(): + if not isinstance(field, str): + raise TypeError(f"{name} keys must be field names") + _field_expr(source, field) + if value is None: + raise ValueError(f"{name}[{field!r}] cannot be None") + normalized[field] = value + return normalized + + +def _require_scope( + source: _LogSource, + filters: Mapping[str, Any], + operation: str, +) -> None: + for field in source.required_scope_fields: + if field in filters: + return + + required = " or ".join(repr(field) for field in source.required_scope_fields) + raise ValueError( + f"{operation} requires an exact scope filter for {required} on " + f"log source {source.name!r}" + ) + + +def _build_where_clauses( + source: _LogSource, + params: dict[str, Any], + *, + filters: Mapping[str, Any], + like_filters: Mapping[str, str], + exclude_filters: Mapping[str, Any], + since: str | None, + until: str | None, +) -> list[str]: + clauses = _build_filter_clauses( + source, + params, + filters=filters, + like_filters=like_filters, + exclude_filters=exclude_filters, + ) + clauses.extend(_time_clauses(source, params, since=since, until=until)) + return clauses + + +def _build_filter_clauses( + source: _LogSource, + params: dict[str, Any], + *, + filters: Mapping[str, Any], + like_filters: Mapping[str, str], + exclude_filters: Mapping[str, Any], +) -> list[str]: + clauses: list[str] = [] + + for field, value in filters.items(): + expr = _field_expr(source, field) + clauses.append( + _comparison_clause(params, field, expr, value, _field_param_type(field), "=") + ) + + for field, value in like_filters.items(): + if _is_sequence(value): + raise ValueError(f"like_filters[{field!r}] must be a single LIKE pattern") + expr = _field_expr(source, field) + value_ref = _add_param(params, f"{field}_like", value, "String") + clauses.append(f"{expr} LIKE {value_ref}") + + for field, value in exclude_filters.items(): + expr = _field_expr(source, field) + clauses.append( + _comparison_clause(params, field, expr, value, _field_param_type(field), "!=") + ) + + return clauses + + +def _comparison_clause( + params: dict[str, Any], + field: str, + expr: str, + value: Any, + param_type: str, + operator: str, +) -> str: + if _is_sequence(value): + values = list(value) + if not values: + raise ValueError(f"filter {field!r} cannot use an empty sequence") + refs = [ + _add_param(params, field, item, param_type) + for item in values + ] + sql_operator = "IN" if operator == "=" else "NOT IN" + return f"{expr} {sql_operator} ({', '.join(refs)})" + + value_ref = _add_param(params, field, value, param_type) + return f"{expr} {operator} {value_ref}" + + +def _time_clauses( + source: _LogSource, + params: dict[str, Any], + *, + since: str | None, + until: str | None, +) -> list[str]: + clauses: list[str] = [] + if since: + clauses.append(f"{source.timestamp} >= {_time_expr(params, 'since', since)}") + if until: + clauses.append(f"{source.timestamp} < {_time_expr(params, 'until', until)}") + return clauses + + +def _time_expr(params: dict[str, Any], name: str, value: str) -> str: + if not isinstance(value, str): + raise TypeError(f"{name} must be a relative duration string or timestamp string") + + normalized = value.strip() + if not normalized: + raise ValueError(f"{name} cannot be empty") + if normalized.lower() == "now": + return "now()" + + match = _DURATION_RE.match(normalized) + if match: + amount = int(match.group(1)) + if amount <= 0: + raise ValueError(f"{name} duration must be positive") + unit = _DURATION_UNITS[match.group(2).lower()] + return f"now() - INTERVAL {amount} {unit}" + + value_ref = _add_param(params, name, normalized, "String") + return f"parseDateTime64BestEffort({value_ref}, 9)" + + +def _duration_interval(name: str, value: str) -> str: + if not isinstance(value, str): + raise TypeError(f"{name} must be a relative duration string") + + match = _DURATION_RE.match(value.strip()) + if not match: + raise ValueError(f"{name} must be a relative duration like '15m' or '1h'") + + amount = int(match.group(1)) + if amount <= 0: + raise ValueError(f"{name} duration must be positive") + + unit = _DURATION_UNITS[match.group(2).lower()] + return f"INTERVAL {amount} {unit}" + + +def _severity_clause( + source: _LogSource, + params: dict[str, Any], + min_severity: str, +) -> str: + normalized = _normalize_min_severity(min_severity) + if normalized == "warn": + severity_number = 13 + severity_texts = ( + "'WARN', 'WARNING', 'WRN', 'CRIT', 'CRITICAL', " + "'ERRO', 'ERROR', 'FATAL', 'PANIC'" + ) + attr_levels = ( + "'warn', 'warning', 'wrn', 'crit', 'critical', " + "'erro', 'error', 'fatal', 'panic'" + ) + level_re = _WARN_OR_ERROR_LEVEL_RE + else: + severity_number = 17 + severity_texts = "'CRIT', 'CRITICAL', 'ERRO', 'ERROR', 'FATAL', 'PANIC'" + attr_levels = "'crit', 'critical', 'erro', 'error', 'fatal', 'panic'" + level_re = _ERROR_LEVEL_RE + + error_re = _add_param(params, "level_re", level_re, "String") + debug_trace_re = _add_param( + params, "debug_trace_level_re", _DEBUG_TRACE_LEVEL_RE, "String" + ) + + return f"""( + {source.severity_number} >= {severity_number} + OR upper({source.severity_text}) IN ({severity_texts}) + OR lower({source.level}) IN ({attr_levels}) + OR ( + match(clean, {error_re}) + AND NOT match(clean, {debug_trace_re}) + ) +)""" + + +def _normalize_min_severity(value: str) -> str: + if value not in ("error", "warn"): + raise ValueError("min_severity must be 'error' or 'warn'") + return value + + +def _clean_body_expr(source: _LogSource) -> str: + return f"replaceRegexpAll({source.body}, '{_ANSI_ESCAPE_RE_SQL}', '')" + + +def _compact_select_list( + source: _LogSource, + body_chars_ref: str, + *, + clean_expr: str = "clean", +) -> str: + fields = [ + f" {source.timestamp} AS ts", + ] + for field in source.compact_fields: + fields.append(f" {_field_expr(source, field)} AS {field}") + + fields.extend( + [ + ( + " multiIf(" + f"{source.severity_text} != '', {source.severity_text}, " + f"{source.level} != '', {source.level}, " + f"{source.severity_number} != 0, toString({source.severity_number}), " + "''" + ") AS severity" + ), + f" {source.severity_number} AS severity_number", + f" substring({clean_expr}, 1, {body_chars_ref}) AS body", + f" length({clean_expr}) > {body_chars_ref} AS body_truncated", + ] + ) + return ",\n".join(fields) + + +def _compact_result( + *, + log_source: _LogSource, + sql: str, + params: Mapping[str, Any], + rows: Sequence[Sequence[Any]], + columns: Sequence[str], + limit: int, + body_chars: int, + include_sql: bool, +) -> dict[str, Any]: + row_dicts = [_compact_row(dict(zip(columns, row))) for row in rows] + result: dict[str, Any] = { + "source": log_source.name, + "datasource": log_source.datasource, + "table": log_source.table, + "rows_returned": len(row_dicts), + "limit": limit, + "rows_limited": len(row_dicts) >= limit, + "body_chars": body_chars, + "rows": row_dicts, + } + _maybe_attach_query(result, include_sql, log_source.datasource, sql, params) + return result + + +def _compact_row(row: dict[str, Any]) -> dict[str, Any]: + compact = dict(row) + if "body_truncated" in compact: + compact["body_truncated"] = _to_bool(compact["body_truncated"]) + if "severity_number" in compact: + compact["severity_number"] = _to_int(compact["severity_number"]) + return compact + + +def _maybe_attach_query( + result: dict[str, Any], + include_sql: bool, + datasource: str, + sql: str, + params: Mapping[str, Any], +) -> None: + if include_sql: + result["query"] = { + "datasource": datasource, + "sql": sql, + "parameters": dict(params), + } + else: + result["query_omitted"] = "pass include_sql=True to include reproducible SQL" + + +def _public_filter_summary( + filters: Mapping[str, Any], + like_filters: Mapping[str, Any] | None, + exclude_filters: Mapping[str, Any] | None, +) -> dict[str, Any]: + result: dict[str, Any] = {"filters": dict(filters)} + if like_filters: + result["like_filters"] = dict(like_filters) + if exclude_filters: + result["exclude_filters"] = dict(exclude_filters) + return result + + +def _where_sql(clauses: Sequence[str]) -> str: + if not clauses: + return "" + return "WHERE " + "\n AND ".join(clauses) + + +def _add_param( + params: dict[str, Any], + name: str, + value: Any, + param_type: str, +) -> str: + safe_name = re.sub(r"[^a-zA-Z0-9_]", "_", name).strip("_") or "param" + candidate = safe_name + suffix = 2 + while candidate in params: + candidate = f"{safe_name}_{suffix}" + suffix += 1 + + params[candidate] = value + return f"{{{candidate}:{param_type}}}" + + +def _validate_int(name: str, value: int, minimum: int, maximum: int) -> int: + if not isinstance(value, int): + raise TypeError(f"{name} must be an integer") + if value < minimum or value > maximum: + raise ValueError(f"{name} must be between {minimum} and {maximum}") + return value + + +def _first_row_dict( + rows: Sequence[Sequence[Any]], + columns: Sequence[str], +) -> dict[str, Any]: + if not rows: + return {} + return dict(zip(columns, rows[0])) + + +def _to_int(value: Any) -> int: + if value in (None, ""): + return 0 + return int(value) + + +def _to_bool(value: Any) -> bool: + if isinstance(value, bool): + return value + if value in (1, "1", "true", "True", "TRUE"): + return True + return False + + +def _is_sequence(value: Any) -> bool: + return isinstance(value, Sequence) and not isinstance(value, (str, bytes, bytearray)) + + +def _indent(value: str, spaces: int) -> str: + prefix = " " * spaces + return "\n".join(prefix + line for line in value.splitlines()) diff --git a/modules/clickhouse/python_helpers_test.go b/modules/clickhouse/python_helpers_test.go new file mode 100644 index 00000000..6b6ee623 --- /dev/null +++ b/modules/clickhouse/python_helpers_test.go @@ -0,0 +1,160 @@ +package clickhouse + +import ( + "os" + "os/exec" + "testing" +) + +func TestPythonLogHelpersBuildSafeCompactQueries(t *testing.T) { + if _, err := exec.LookPath("python3"); err != nil { + t.Skip("python3 not available") + } + + pythonModule, err := os.ReadFile("python/clickhouse.py") + if err != nil { + t.Fatalf("read python/clickhouse.py: %v", err) + } + if len(pythonModule) == 0 { + t.Fatal("python/clickhouse.py is empty") + } + + script := ` +import importlib.util +import sys +import types + +pd = types.ModuleType("pandas") +pd.DataFrame = object +sys.modules["pandas"] = pd + +ethpandaops = types.ModuleType("ethpandaops") +runtime = types.SimpleNamespace() +ethpandaops._runtime = runtime +sys.modules["ethpandaops"] = ethpandaops + +spec = importlib.util.spec_from_file_location("clickhouse", "python/clickhouse.py") +mod = importlib.util.module_from_spec(spec) +sys.modules["clickhouse"] = mod +spec.loader.exec_module(mod) + +captured = [] + +def fake_rows(operation, args): + captured.append((operation, args)) + sql = args["sql"] + if "countIf" in sql: + return [("10", "5", "2", "1", "2026-06-04 11:00:00", "2026-06-04 12:00:00")], [ + "lines", + "severity_number_lines", + "severity_text_lines", + "log_attr_level_lines", + "first_seen", + "last_seen", + ] + if "UNION ALL" in sql: + return [("before", "2026-06-04 11:59:59", "n", "h", "", "c.log", "ERROR", "17", "before body", "0")], [ + "context", + "ts", + "network", + "host", + "service", + "container", + "severity", + "severity_number", + "body", + "body_truncated", + ] + if "any(substring(clean" in sql: + return [("c.log", "12", "2026-06-04 11:00:00", "2026-06-04 12:00:00", "sample body")], [ + "value", + "lines", + "first_seen", + "last_seen", + "sample", + ] + return [("2026-06-04 12:00:00", "n", "h", "", "c.log", "ERROR", "17", "body", "1")], [ + "ts", + "network", + "host", + "service", + "container", + "severity", + "severity_number", + "body", + "body_truncated", + ] + +def fake_dataframe(operation, args): + captured.append((operation, args)) + return {"operation": operation, "args": args} + +runtime.invoke_tsv_rows = fake_rows +runtime.invoke_tsv_dataframe = fake_dataframe + +sources = mod.log_sources() +assert {source["name"] for source in sources} == {"hosted_devnet", "local_kurtosis"} +assert "network" in next(source for source in sources if source["name"] == "hosted_devnet")["fields"] + +coverage = mod.log_coverage("hosted_devnet", {"network": "n", "host": "h"}) +assert coverage["lines"] == 10 +assert coverage["severity_number_coverage"] == 0.5 +assert "query" not in coverage +assert coverage["query_omitted"] + +errors = mod.log_errors("hosted_devnet", {"network": "n", "host": "h"}, include_sql=True) +assert errors["datasource"] == "clickhouse-raw" +assert errors["rows"][0]["severity_number"] == 17 +assert errors["rows"][0]["body_truncated"] is True +assert "query" in errors +assert "ResourceAttributes['network'] = {network:String}" in errors["query"]["sql"] +assert "match(clean, {level_re:String})" in errors["query"]["sql"] +assert errors["query"]["parameters"]["body_chars"] == 240 + +warns = mod.log_errors("local_kurtosis", {"enclave": "e"}, like_filters={"service": "el-%"}, min_severity="warn", include_sql=True) +assert warns["kind"] == "warn_logs" +assert "SeverityNumber >= 13" in warns["query"]["sql"] +assert "WARN" in warns["query"]["parameters"]["level_re"] + +context = mod.log_context("hosted_devnet", {"network": "n", "host": "h"}, "2026-06-04T12:00:00Z", before=1, after=1, include_sql=True) +assert context["rows"][0]["context"] == "before" +assert "UNION ALL" in context["query"]["sql"] +assert "center - INTERVAL 1 HOUR" in context["query"]["sql"] + +samples = mod.log_samples("hosted_devnet", "container", {"network": "n", "host": "h"}, include_sql=True) +assert samples["rows"][0]["value"] == "c.log" +assert samples["rows"][0]["lines"] == 12 +assert "any(substring(clean" in samples["query"]["sql"] + +values = mod.log_values( + "hosted_devnet", + "host", + {"network": "n"}, + like_filters={"host": "lighthouse-%"}, + exclude_filters={"host": "bootnode-1"}, +) +values_sql = values["args"]["sql"] +assert "ResourceAttributes['host.name'] AS value" in values_sql +assert "LIKE {host_like:String}" in values_sql +assert "!= {host:String}" in values_sql + +try: + mod.log_errors("hosted_devnet", {"host": "h"}) + raise AssertionError("unscoped hosted log_errors should fail") +except ValueError as exc: + assert "network" in str(exc) + +try: + mod.log_values("hosted_devnet", "unknown", {"network": "n"}) + raise AssertionError("unknown field should fail") +except ValueError as exc: + assert "unknown field" in str(exc) +` + + cmd := exec.Command("python3", "-c", script) + cmd.Dir = "." + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("python log helper test failed: %v\n%s", err, output) + } +} diff --git a/runbooks/debug_devnet.md b/runbooks/debug_devnet.md index 80c1ec02..af7a9d18 100644 --- a/runbooks/debug_devnet.md +++ b/runbooks/debug_devnet.md @@ -19,15 +19,15 @@ Start by capturing `panda datasources` in the debug report. Hosted devnet logs a ## How Devnet Logs Flow -Hosted devnets run as Docker containers on bare-metal VMs (managed by Ansible). Each container's logs are scraped and shipped via OpenTelemetry into the `clickhouse-raw` ClickHouse cluster, database `external`, table `external.otel_logs`. Query them with SQL via `clickhouse.query("clickhouse-raw", ...)`, always filtering by `ResourceAttributes['network']` (the devnet) and `Timestamp`. Do not query a separate log datasource for hosted devnets; devnet container logs live in ClickHouse. +Hosted devnets run as Docker containers on bare-metal VMs (managed by Ansible). Each container's logs are scraped and shipped via OpenTelemetry into the `clickhouse-raw` ClickHouse cluster, database `external`, table `external.otel_logs`. Use the `clickhouse.log_*` helpers with source `hosted_devnet` for normal investigation; they generate SQL that filters by `ResourceAttributes['network']` (the devnet) and `Timestamp`. Use raw `clickhouse.query("clickhouse-raw", ...)` only as a fallback. Do not query a separate log datasource for hosted devnets; devnet container logs live in ClickHouse. LogQL only applies to Loki datasources, and these hosted devnet container logs are not exposed through Loki. Key fields on `external.otel_logs`: - `Timestamp DateTime64(9)` — always filter on this (it is the partition key). -- `Body String` — the raw log line. The level is usually embedded here, not in `SeverityText`. **Lines are terminal-coloured — the level token is wrapped in ANSI escape codes** (`\x1b[31mERROR\x1b[0m`); strip them with a `clean` CTE on bounded queries before matching (see step 4). -- `SeverityText LowCardinality(String)` — often EMPTY for raw Docker logs; do not rely on it. Triage severity by stripping ANSI then matching the **LEVEL token** in the cleaned line, not the bare word "error" (see step 4 below — a substring match returns tens of thousands of benign DEBUG lines on a healthy network). +- `SeverityNumber UInt8` / `SeverityText LowCardinality(String)` — use these first when populated (`SeverityNumber >= 17` is OTel error/fatal). +- `Body String` — the raw log line. For raw Docker logs, structured severity is often empty and the level is embedded here instead. **Lines are terminal-coloured — the level token can be wrapped in ANSI escape codes** (`\x1b[31mERROR\x1b[0m`); strip them with a `clean` CTE only in bounded fallback queries (see step 4). - `ServiceName` — empty for these VM/Docker logs (the `k8s.*` materialized columns are also empty — those only apply to Kubernetes platform logs). - `ResourceAttributes Map(String, String)` — node identity. Keys: `network` (devnet name), `host.name` (the node, e.g. `lighthouse-geth-super-1`), `ingress_user`, `deployment.environment`. -- `LogAttributes Map(String, String)` — per-line attributes. Keys include `log.file.name` / `log.file.path` (the Docker container json-log file — one per container on the node), `container_id`, plus any structured fields the client emits (`level`, `msg`, `component`, ...). +- `LogAttributes Map(String, String)` — per-line attributes. Keys include `log.file.name` / `log.file.path` (the Docker container json-log file — one per container on the node), `container_id`, plus any structured fields the client emits (`level`, `msg`, `component`, ...). Use `LogAttributes['level']` before falling back to parsing `Body`. **Node naming:** `host.name` encodes the client pair as `---` (e.g. `lighthouse-geth-super-1` → CL lighthouse, EL geth). Non-paired nodes exist too (`bootnode-1`, `mev-relay-1`). The current OTel records do not provide `ethereum_cl` / `ethereum_el` labels. A node VM runs the CL, EL, validator, and sidecar containers together, distinguished only by `LogAttributes['log.file.name']` (a container hash). To isolate one client's logs on a node, discover its containers first (see Phase 2) or identify the client by its log-line format in `Body`. @@ -76,7 +76,7 @@ If two sources disagree (e.g. Dora says 16 nodes, the logs show 30 hosts), surfa A *citation* is a `panda` command that re-derives the cited evidence. Every finding you record — both in the debug report and in chat output — MUST be followed by the citation(s) that produce it, so the user can run them and verify independently. Citations are claim-anchored, not exhaustive: cite the calls that support a finding, not every probe along the way. -Place each citation directly under the finding, in a fenced shell block, with a one-line `#` comment saying what it fetches. Discover the current command surface with `panda --help` (and subcommand `--help`) — do not hardcode flags or subcommands from memory. For datasource availability, cite the `panda datasources` output captured at the start. For log-derived claims, cite a `panda execute --code ...` command that re-runs the relevant `clickhouse.query("clickhouse-raw", ...)` SQL. +Place each citation directly under the finding, in a fenced shell block, with a one-line `#` comment saying what it fetches. Discover the current command surface with `panda --help` (and subcommand `--help`) — do not hardcode flags or subcommands from memory. For datasource availability, cite the `panda datasources` output captured at the start. For log-derived claims, cite a `panda execute --code ...` command that re-runs the relevant `clickhouse.log_*` helper call (or raw `clickhouse.query("clickhouse-raw", ...)` SQL if you used the raw fallback). Use `include_sql=True` when writing the debug report if the user needs the generated SQL. ## Timeframe Rules @@ -214,17 +214,35 @@ Append the table and your read of it, then proceed to Phase 2. ## Phase 2: Log Investigation with ClickHouse (`external.otel_logs`) -Use Dora findings (if available) to target specific nodes. With logs only (no Dora), start from the `hosts` list discovered in Phase 0 to identify which nodes have issues. **Always filter by `ResourceAttributes['network']` and `Timestamp`** — unfiltered queries scan everything and may time out. All queries go through `clickhouse.query("clickhouse-raw", ...)` against `external.otel_logs`; see **How Devnet Logs Flow** above for the full schema and severity-matching details. +Use Dora findings (if available) to target specific nodes. With logs only (no Dora), start from the `hosts` list discovered in Phase 0 to identify which nodes have issues. **Prefer the ClickHouse log helpers** — they expand validated field aliases into the correct ClickHouse expressions, keep queries time-bounded, prefer structured severity, and return compact rows. Use raw SQL only as an escape hatch. **Use the same active timeframe** established in the Timeframe Rules section above. -**⚠️ ANSI stripping is for bounded queries only.** Log lines are terminal-coloured, so severity matching strips ANSI in a `clean` CTE first (`replaceRegexpAll(Body, '\x1b\[[0-9;?]*[A-Za-z]', '')`). But wrapping `Body` disables the `idx_body` skip-index and rewrites every scanned row, and (primary key led by `IngressUser`, data >7d on S3) a stripped-`Body` regex over a wide/multi-day window becomes a full scan. **Always pair the strip with a `host.name` filter + tight `Timestamp` window + `LIMIT`** — narrow to the suspect host/time first, then strip within that slice. Pass SQL as a raw string (`r"""`) so `\b`/`\x1b` survive. +**Severity filters:** These logs are in ClickHouse, not Loki, so LogQL is not available here. `clickhouse.log_errors()` uses `SeverityNumber`, `SeverityText`, and `LogAttributes['level']` first, then falls back to a bounded ANSI-stripped `Body` regex for raw Docker logs. Keep the source scope exact (`network`, and usually `host`) and keep `since`/`until` aligned with the active timeframe. + +Run this once for the target host and append it to the debug report: + +```python +from ethpandaops import clickhouse + +network = "" +host = "" +timeframe = {"since": "1h"} # replace with the active timeframe if different + +coverage = clickhouse.log_coverage( + "hosted_devnet", + filters={"network": network, "host": host}, + include_sql=True, + **timeframe, +) +print(coverage) +``` **Node naming:** Most nodes follow `---` (e.g. `lighthouse-geth-super-1` → CL lighthouse, EL geth), but devnets also include bootnodes, MEV relays, and other non-paired nodes (`bootnode-1`, `mev-relay-1`) that do NOT match this pattern. Never derive node names from the convention — always use the `hosts` list discovered in Phase 0 (or Dora's `/v1/clients/consensus`). **Exclude `bootnode-1` from cross-host triage by default.** Bootnodes flood multi-host sweeps with p2p noise (`Ping` deserialization, `ENR missing IP`, connection churn) that's never the root cause — add `AND ResourceAttributes['host.name'] != 'bootnode-1'` to any multi-host query (per-host queries below pin one `host.name`, so they're unaffected). Investigate the bootnode directly only if discovery/peering is the suspected problem. -**CL vs EL in ClickHouse OTel logs:** there is no `ethereum_cl` / `ethereum_el` label. A node VM runs the CL, EL, validator, and sidecar containers together; their logs are separated only by `LogAttributes['log.file.name']` (a per-container json-log file, named by hash). To investigate one client on a node, first discover its containers (step 3) and identify the CL/EL container by its log-line format, then filter on that log file. To sweep a client type across the network, filter on `host.name` (e.g. `host.name LIKE 'lighthouse-%'` for lighthouse-CL nodes, or `host.name LIKE '%-geth-%'` for geth-EL nodes) — but remember the result still mixes that node's CL/EL/sidecar lines. +**CL vs EL in ClickHouse OTel logs:** there is no `ethereum_cl` / `ethereum_el` label. A node VM runs the CL, EL, validator, and sidecar containers together; their logs are separated only by `LogAttributes['log.file.name']` (field alias: `container`). To investigate one client on a node, first discover containers with `log_samples()`, identify the CL/EL container by sample log format, then filter error queries on `container`. **You SHOULD start with the consensus layer (CL).** Most devnet issues originate at the CL level. Only investigate EL logs if CL logs point to execution-side problems (e.g. payload validation errors, engine API failures). @@ -235,56 +253,71 @@ Use Dora findings (if available) to target specific nodes. With logs only (no Do network = "" host = "" - - df = clickhouse.query("clickhouse-raw", r""" - WITH replaceRegexpAll(Body, '\x1b\[[0-9;?]*[A-Za-z]', '') AS clean - SELECT - LogAttributes['log.file.name'] AS container_log, - count() AS lines, - any(substring(clean, 1, 120)) AS sample - FROM external.otel_logs - WHERE ResourceAttributes['network'] = {network:String} - AND ResourceAttributes['host.name'] = {host:String} - AND Timestamp >= now() - INTERVAL 1 HOUR - GROUP BY container_log - ORDER BY lines DESC - """, parameters={"network": network, "host": host}) - print(df) + timeframe = {"since": "1h"} + + containers = clickhouse.log_samples( + "hosted_devnet", + "container", + filters={"network": network, "host": host}, + limit=20, + body_chars=160, + include_sql=True, + **timeframe, + ) + for row in containers["rows"]: + print(row) ``` Identify the client from each `sample` log format (e.g. lighthouse `MMM DD HH:MM:SS.mmm LEVEL ...`, geth `LEVEL [MM-DD|HH:MM:SS.mmm] ...`, prysm `level=... msg=...`). Append the node→container map to the debug report. -4. **Fetch CL errors first (CRIT/ERR)** - For each problematic node (or all CL nodes when there is no Dora target), fetch the most severe lines. `SeverityText` is usually empty for these Docker logs, so match severity on the raw `Body` — but **anchor on the LEVEL token, do not substring-match "error"** (a bare `(?i)error` returns tens of thousands of benign DEBUG lines on a healthy network). Match the uppercase level token (case-sensitively) or logfmt `level=error`, and exclude DEBUG/TRACE. Per-client LEVEL tokens: lighthouse `ERROR`; geth/nethermind/erigon/reth/besu `ERROR [..]` or `|ERROR|`; prysm `[ts] ERROR` / `level=error`; nimbus `ERR`/`FAT` at line start; lodestar `level=error`: +4. **Fetch CL errors first (CRIT/ERR)** - For each problematic node, fetch the most severe compact rows. Keep the query bounded by `network`, `host`, active timeframe, and `LIMIT`; do not use a bare `error` substring. ```python from ethpandaops import clickhouse network = "" host = "" + timeframe = {"since": "1h"} + + errors = clickhouse.log_errors( + "hosted_devnet", + filters={"network": network, "host": host}, + limit=100, + body_chars=240, + include_sql=True, + **timeframe, + ) + for row in errors["rows"]: + print(row) + ``` + + Once you have identified the CL container's log file (step 3), add `container` to the filters to isolate the CL client's lines from the EL and sidecars on the same node: - df = clickhouse.query("clickhouse-raw", r""" - WITH replaceRegexpAll(Body, '\x1b\[[0-9;?]*[A-Za-z]', '') AS clean - SELECT - Timestamp, - ResourceAttributes['host.name'] AS host, - LogAttributes['log.file.name'] AS container_log, - clean AS Body - FROM external.otel_logs - WHERE ResourceAttributes['network'] = {network:String} - AND ResourceAttributes['host.name'] = {host:String} - -- error-class LEVEL token only, matched on the ANSI-stripped line (uppercase token, nimbus 3-letter, or logfmt level=) - AND match(clean, '(^|[][ |])(CRIT|ERRO|ERROR|FATAL|PANIC)($|[][ |:])|^(ERR|FAT)\b|\blevel=(crit|error|fatal|panic)\b') - AND NOT match(clean, '(^|[][ |])(DEBUG|DBG|TRACE|TRC)($|[][ |:])|\blevel=(debug|trace)\b') - AND Timestamp >= now() - INTERVAL 1 HOUR - ORDER BY Timestamp DESC - LIMIT 200 - """, parameters={"network": network, "host": host}) - print(df) + ```python + errors = clickhouse.log_errors( + "hosted_devnet", + filters={"network": network, "host": host, "container": ""}, + limit=100, + body_chars=240, + include_sql=True, + **timeframe, + ) ``` - Once you have identified the CL container's log file (step 3), add `AND LogAttributes['log.file.name'] = {container:String}` to isolate the CL client's lines from the EL and sidecars on the same node. + To sweep a CL client type across the whole network instead of one node, use `like_filters={"host": "lighthouse-%"}` and `exclude_filters={"host": "bootnode-1"}`. This is a wider scan with the ANSI-stripped Body fallback in play — keep the timeframe short (≤1h) and the `limit` tight, then drill into individual hosts once you have a target: - To sweep a CL client type across the whole network instead of one node, replace the host filter with `AND ResourceAttributes['host.name'] LIKE {cl_prefix:String}` (and add `AND ResourceAttributes['host.name'] != 'bootnode-1'` per the bootnode-exclusion note above) and pass e.g. `{"cl_prefix": "lighthouse-%"}`. This is a wider scan with the ANSI strip in play — keep the `Timestamp` window short (≤1h) and the `LIMIT` tight, and prefer drilling into individual hosts once you have a target. Do not widen this to all hosts over a multi-day range. + ```python + errors = clickhouse.log_errors( + "hosted_devnet", + filters={"network": network}, + like_filters={"host": "lighthouse-%"}, + exclude_filters={"host": "bootnode-1"}, + limit=100, + body_chars=240, + include_sql=True, + **timeframe, + ) + ``` If multiple nodes are erroring, query each one. Look for common error patterns across nodes — the same error across nodes of one client type points to a client bug. diff --git a/runbooks/debug_local_devnet.md b/runbooks/debug_local_devnet.md index c69e02d2..0fbe12e8 100644 --- a/runbooks/debug_local_devnet.md +++ b/runbooks/debug_local_devnet.md @@ -9,13 +9,13 @@ The first step in debugging a local devnet is discovering what tooling is availa **The user MUST specify which enclave to debug.** Do NOT assume an enclave — if the user hasn't specified one, ask them before proceeding. You can discover running enclaves with `kurtosis enclave ls`. -**Local devnets do NOT use the hosted ClickHouse datasources.** Start by capturing `panda datasources`, then only use `clickhouse.query("local-kurtosis", ...)` for logs when the `local-kurtosis` ClickHouse datasource is discovered. Do not use the hosted `clickhouse-raw`/`clickhouse-refined` datasources for local Kurtosis logs. +**Local devnets do NOT use the hosted ClickHouse datasources.** Start by capturing `panda datasources`, then only use the `local_kurtosis` ClickHouse log helper source (backed by the `local-kurtosis` datasource) when it is discovered. Do not use the hosted `clickhouse-raw`/`clickhouse-refined` datasources for local Kurtosis logs. Refer to the query skill for general API usage patterns (Dora overview, ClickHouse queries, direct HTTP calls, Dora link generation, etc.). This runbook only covers the debugging-specific procedure and API calls not in the skill. ## How OTel Logs Flow -Kurtosis devnet services emit logs to the devnet's `otel-collector`. The collector writes them into the Kurtosis ClickHouse service on HTTP port `18123`, database `otel`, table `otel_logs`. Panda starts an in-process local proxy that autodiscovers this ClickHouse when `/ping` returns `Ok.` and the `otel` database exists, then exposes it as the `local-kurtosis` ClickHouse datasource. Query it with SQL, always filtering by `EnclaveName` because one local ClickHouse can hold logs from multiple devnets. If `local-kurtosis` is absent from the advertised datasources, treat ClickHouse logs as unavailable and use the `kurtosis service logs` fallback. +Kurtosis devnet services emit logs to the devnet's `otel-collector`. The collector writes them into the Kurtosis ClickHouse service on HTTP port `18123`, database `otel`, table `otel_logs`. Panda starts an in-process local proxy that autodiscovers this ClickHouse when `/ping` returns `Ok.` and the `otel` database exists, then exposes it as the `local-kurtosis` ClickHouse datasource. Use the `clickhouse.log_*` helpers with source `local_kurtosis` for normal investigation; they generate SQL that filters by `EnclaveName` because one local ClickHouse can hold logs from multiple devnets. If `local-kurtosis` is absent from the advertised datasources, treat ClickHouse logs as unavailable and use the `kurtosis service logs` fallback. ## Sandbox Session @@ -166,29 +166,38 @@ Use the autodiscovered `local-kurtosis` ClickHouse datasource. The local OTel ta Useful schema fields: - `otel.otel_logs`: `Timestamp DateTime64(9)`, `ServiceName LowCardinality(String)`, `Body String`, `SeverityText LowCardinality(String)`, `SeverityNumber UInt8`, `EnclaveName LowCardinality(String)`, `EnclaveUuid`, `ResourceAttributes Map(LowCardinality(String), String)`, `LogAttributes Map(LowCardinality(String), String)` -**Always filter by `EnclaveName`** (and `ServiceName` for service-level queries). The Kurtosis collector may leave `SeverityText`/`SeverityNumber` empty, so severity comes from `Body` — which is terminal-coloured. **Strip ANSI in a `clean` CTE, then anchor on the LEVEL token — don't substring-match "error"** (a bare `(?i)error` matches tens of thousands of benign DEBUG lines; an un-stripped colour-wrapped `ERROR` defeats the anchors). Match the uppercase token (case-sensitively) or `level=error` on `clean`, excluding DEBUG/TRACE (per-client tokens: lighthouse `ERROR`, geth-style `ERROR [..]`, prysm `level=error`, nimbus `ERR`/`FAT`): +**Severity filters:** Always filter by `enclave` and usually `service`. This is ClickHouse SQL under the hood, not LogQL. Prefer the log helpers; `clickhouse.log_errors()` uses `SeverityNumber`, `SeverityText`, and `LogAttributes['level']` first, then falls back to a bounded ANSI-stripped `Body` regex for raw logs. Use `min_severity="warn"` only when you intentionally need WARN/WRN rows. -```sql --- strip ANSI first, then match the error-class LEVEL token on the cleaned line -WITH replaceRegexpAll(Body, '\x1b\[[0-9;?]*[A-Za-z]', '') AS clean -... WHERE AND AND - AND match(clean, '(^|[][ |])(CRIT|ERRO|ERROR|FATAL|PANIC)($|[][ |:])|^(ERR|FAT)\b|\blevel=(crit|error|fatal|panic)\b') - AND NOT match(clean, '(^|[][ |])(DEBUG|DBG|TRACE|TRC)($|[][ |:])|\blevel=(debug|trace)\b') -... LIMIT 200 +Run this once for the target service and append it to the debug report: + +```python +from ethpandaops import clickhouse + +enclave = "" +service = "" +timeframe = {"since": "1h"} # replace with the active timeframe if different + +coverage = clickhouse.log_coverage( + "local_kurtosis", + filters={"enclave": enclave, "service": service}, + include_sql=True, + **timeframe, +) +print(coverage) ``` -Pass SQL as a raw string (`r"""`) so `\b`/`\x1b` survive (a normal string turns `\b` into a backspace byte). The strip wraps `Body`, so keep queries bounded — `EnclaveName` + `ServiceName` + tight `Timestamp` window + `LIMIT`; don't run a stripped-`Body` regex over a wide/multi-day range. Use the active timeframe from Timeframe Rules. +Use the active timeframe from Timeframe Rules. If you need raw SQL, search examples for the raw ClickHouse fallback and keep it bounded by `EnclaveName` + `ServiceName` + tight `Timestamp` window + `LIMIT`. **FIRST: discover enclaves present in the OTel logs table** ```python from ethpandaops import clickhouse -enclaves = clickhouse.query("local-kurtosis", """ - SELECT DISTINCT EnclaveName - FROM otel.otel_logs - WHERE EnclaveName != '' - ORDER BY EnclaveName -""") +enclaves = clickhouse.log_values( + "local_kurtosis", + "enclave", + since="1h", + limit=50, +) print(enclaves) ``` @@ -199,19 +208,15 @@ If the requested enclave is not listed, the OTel datasource is not currently rec from ethpandaops import clickhouse enclave = "" - -services = clickhouse.query("local-kurtosis", """ - SELECT - ServiceName, - count() AS log_count, - min(Timestamp) AS first_seen, - max(Timestamp) AS last_seen - FROM otel.otel_logs - WHERE EnclaveName = {enclave:String} - AND Timestamp >= now() - INTERVAL 1 HOUR - GROUP BY ServiceName - ORDER BY ServiceName -""", parameters={"enclave": enclave}) +timeframe = {"since": "1h"} + +services = clickhouse.log_values( + "local_kurtosis", + "service", + filters={"enclave": enclave}, + limit=100, + **timeframe, +) print(services) ``` @@ -220,23 +225,19 @@ print(services) from ethpandaops import clickhouse enclave = "" - -cl_errors = clickhouse.query("local-kurtosis", r""" - WITH replaceRegexpAll(Body, '\x1b\[[0-9;?]*[A-Za-z]', '') AS clean - SELECT - Timestamp, - ServiceName, - clean AS Body - FROM otel.otel_logs - WHERE EnclaveName = {enclave:String} - AND ServiceName LIKE 'cl-%' - AND match(clean, '(^|[][ |])(CRIT|ERRO|ERROR|FATAL|PANIC)($|[][ |:])|^(ERR|FAT)\b|\blevel=(crit|error|fatal|panic)\b') - AND NOT match(clean, '(^|[][ |])(DEBUG|DBG|TRACE|TRC)($|[][ |:])|\blevel=(debug|trace)\b') - AND Timestamp >= now() - INTERVAL 1 HOUR - ORDER BY Timestamp DESC - LIMIT 200 -""", parameters={"enclave": enclave}) -print(cl_errors) +timeframe = {"since": "1h"} + +cl_errors = clickhouse.log_errors( + "local_kurtosis", + filters={"enclave": enclave}, + like_filters={"service": "cl-%"}, + limit=100, + body_chars=240, + include_sql=True, + **timeframe, +) +for row in cl_errors["rows"]: + print(row) ``` **Fetch recent error-class logs for a specific service** @@ -245,23 +246,18 @@ from ethpandaops import clickhouse enclave = "" service = "" - -service_logs = clickhouse.query("local-kurtosis", r""" - WITH replaceRegexpAll(Body, '\x1b\[[0-9;?]*[A-Za-z]', '') AS clean - SELECT - Timestamp, - ServiceName, - clean AS Body - FROM otel.otel_logs - WHERE EnclaveName = {enclave:String} - AND ServiceName = {service:String} - AND match(clean, '(^|[][ |])(CRIT|ERRO|ERROR|FATAL|PANIC)($|[][ |:])|^(ERR|FAT)\b|\blevel=(crit|error|fatal|panic)\b') - AND NOT match(clean, '(^|[][ |])(DEBUG|DBG|TRACE|TRC)($|[][ |:])|\blevel=(debug|trace)\b') - AND Timestamp >= now() - INTERVAL 1 HOUR - ORDER BY Timestamp DESC - LIMIT 200 -""", parameters={"enclave": enclave, "service": service}) -print(service_logs) +timeframe = {"since": "1h"} + +service_logs = clickhouse.log_errors( + "local_kurtosis", + filters={"enclave": enclave, "service": service}, + limit=100, + body_chars=240, + include_sql=True, + **timeframe, +) +for row in service_logs["rows"]: + print(row) ``` **Fetch EL warnings/errors when CL logs point to execution issues** @@ -269,24 +265,20 @@ print(service_logs) from ethpandaops import clickhouse enclave = "" - -el_logs = clickhouse.query("local-kurtosis", r""" - WITH replaceRegexpAll(Body, '\x1b\[[0-9;?]*[A-Za-z]', '') AS clean - SELECT - Timestamp, - ServiceName, - clean AS Body - FROM otel.otel_logs - WHERE EnclaveName = {enclave:String} - AND ServiceName LIKE 'el-%' - -- error-class + WARN level token (EL triage includes warnings); excludes debug/trace - AND match(clean, '(^|[][ |])(CRIT|ERRO|ERROR|FATAL|PANIC|WARN|WRN)($|[][ |:])|^(ERR|FAT|WRN)\b|\blevel=(crit|error|fatal|panic|warn|warning)\b') - AND NOT match(clean, '(^|[][ |])(DEBUG|DBG|TRACE|TRC)($|[][ |:])|\blevel=(debug|trace)\b') - AND Timestamp >= now() - INTERVAL 1 HOUR - ORDER BY Timestamp DESC - LIMIT 200 -""", parameters={"enclave": enclave}) -print(el_logs) +timeframe = {"since": "1h"} + +el_logs = clickhouse.log_errors( + "local_kurtosis", + filters={"enclave": enclave}, + like_filters={"service": "el-%"}, + min_severity="warn", + limit=100, + body_chars=240, + include_sql=True, + **timeframe, +) +for row in el_logs["rows"]: + print(row) ``` ### If OTel ClickHouse is not available — fallback to kurtosis service logs