From e8e2c6cc43b8d8c9ebdd1f8d4f1c232d8b8c5cbc Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Mon, 18 May 2026 04:47:45 +0300 Subject: [PATCH] add telemetry events --- README.md | 1 + lib/ch.ex | 258 +++++++++++++++++++++++++++++++++---- mix.exs | 1 + pages/query.md | 32 +++++ test/ch/telemetry_test.exs | 93 +++++++++++++ 5 files changed, 362 insertions(+), 23 deletions(-) create mode 100644 test/ch/telemetry_test.exs diff --git a/README.md b/README.md index f93b0fd..83753ef 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ Used in [Ecto ClickHouse adapter](https://github.com/plausible/ecto_ch). - RowBinary - Native query parameters - Per query settings +- Telemetry events ## Installation diff --git a/lib/ch.ex b/lib/ch.ex index 92f7e41..ca9ce6d 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -99,11 +99,13 @@ defmodule Ch do * `:timeout` - Request timeout, defaults to 30 seconds. * `:settings` - An enumerable (usually a map or a keyword list) added to the URL query string. * `:headers` - Headers passed directly to Mint. + * `:telemetry_metadata` - Extra metadata to include in telemetry events. """ @type query_option :: {:timeout, timeout} | {:settings, Enumerable.t()} | {:headers, Mint.Types.headers()} + | {:telemetry_metadata, map} @typedoc """ The parsed query response. @@ -203,6 +205,7 @@ defmodule Ch do * `:timeout` - request timeout, defaults to 30 seconds. * `:settings` - ClickHouse settings added to the URL query string. * `:headers` - HTTP headers sent with the request. + * `:telemetry_metadata` - extra metadata included in query telemetry events. By default, `Ch` adds `x-clickhouse-format: RowBinaryWithNamesAndTypes`, decodes that response format, and returns `%Ch.Result{names: names, rows: rows}`. @@ -217,6 +220,7 @@ defmodule Ch do def query(pool, statement, params \\ %{}, options \\ []) do timeout = Keyword.get(options, :timeout, @query_timeout) settings = Keyword.get(options, :settings, []) + telemetry_metadata = Keyword.get(options, :telemetry_metadata, %{}) headers = options @@ -226,25 +230,99 @@ defmodule Ch do deadline = Ch.HTTP.to_deadline(timeout) path = Ch.HTTP.path(params, settings) + query_started = System.monotonic_time() + checkout_started = System.monotonic_time() + + metadata = %{ + pool: pool, + telemetry_metadata: telemetry_metadata, + settings: settings, + format: get_header(headers, "x-clickhouse-format") + } - result = - NimblePool.checkout!( - pool, - :request, - fn {pid, _ref}, conn_or_template -> - with {:ok, conn} <- connect(conn_or_template, pid, deadline), - {:ok, conn, status, headers, data} <- - request(conn, "POST", path, headers, statement, deadline) do - {{:ok, status, headers, data}, checkin(conn)} - else - {:error, reason} = error -> {error, {:remove, reason}} + :telemetry.execute([:ch, :query, :start], %{system_time: System.system_time()}, metadata) + + try do + result = + NimblePool.checkout!( + pool, + :request, + fn {pid, _ref}, conn_or_template -> + meter = [] |> past_event(:checkout, checkout_started) |> event(:query) + + with {:ok, conn} <- connect(conn_or_template, pid, deadline, metadata), + {:ok, conn, status, headers, data} <- + request(conn, "POST", path, headers, statement, deadline) do + {{:ok, status, headers, data, meter}, checkin(conn)} + else + {:error, reason} -> + {{:error, reason, meter}, {:remove, reason}} + end + end, + timeout + ) + + case result do + {:ok, status, headers, data, meter} -> + meter = event(meter, :decode) + + try do + case decode_query_response(status, headers, data) do + {:ok, result} -> + :telemetry.execute( + [:ch, :query, :stop], + telemetry_measurements(meter, query_started, result), + telemetry_metadata(metadata, status, headers, result) + ) + + {:ok, result} + + {:error, reason} -> + :telemetry.execute( + [:ch, :query, :error], + telemetry_measurements(meter, query_started, nil), + telemetry_error_metadata(metadata, status, headers, reason, nil) + ) + + {:error, reason} + end + catch + kind, reason -> + stacktrace = __STACKTRACE__ + + :telemetry.execute( + [:ch, :query, :error], + telemetry_measurements(meter, query_started, nil), + telemetry_error_metadata(metadata, status, headers, reason, {kind, stacktrace}) + ) + + :erlang.raise(kind, reason, stacktrace) end - end, - timeout - ) - with {:ok, status, headers, data} <- result do - decode_query_response(status, headers, data) + {:error, reason, meter} -> + :telemetry.execute( + [:ch, :query, :error], + telemetry_measurements(meter, query_started, nil), + telemetry_error_metadata(metadata, nil, [], reason, nil) + ) + + {:error, reason} + end + catch + kind, reason -> + stacktrace = __STACKTRACE__ + + :telemetry.execute( + [:ch, :query, :error], + telemetry_measurements([], query_started, nil), + Map.merge(metadata, %{ + kind: kind, + reason: reason, + stacktrace: stacktrace + }) + ) + + :erlang.raise(kind, reason, stacktrace) end end @@ -280,9 +358,13 @@ defmodule Ch do {:ok, {:ok, conn}, conn, config} end + def handle_checkout(:request, _from, {:conn, conn, checkin_time, conn_metadata}, config) do + {:ok, {:ok, conn, checkin_time, conn_metadata}, conn, config} + end + @impl NimblePool def handle_checkin({:ok, conn}, _from, _prev, config) do - {:ok, conn, config} + {:ok, {:conn, conn, System.monotonic_time(), conn_metadata(config, %{})}, config} end def handle_checkin({:remove, reason}, _from, _prev, config) do @@ -295,35 +377,64 @@ defmodule Ch do end @impl NimblePool - def terminate_worker(_reason, conn_or_template, config) do + def terminate_worker(reason, conn_or_template, config) do case conn_or_template do :template -> :ok + {:conn, conn, _checkin_time, _conn_metadata} -> Mint.HTTP1.close(conn) conn -> Mint.HTTP1.close(conn) end + unless conn_or_template == :template do + emit_conn_event([:ch, :conn, :drop], %{}, conn_metadata(config, %{reason: reason})) + end + {:ok, config} end - defp connect({:template, scheme, host, port}, owner, deadline) do + defp connect({:template, scheme, host, port}, owner, deadline, query_metadata) do + started = System.monotonic_time() timeout = Ch.HTTP.to_timeout(deadline) + metadata = conn_metadata(scheme, host, port, query_metadata) + + emit_conn_event([:ch, :conn, :start], %{system_time: System.system_time()}, metadata) case Mint.HTTP1.connect(scheme, host, port, mode: :passive, timeout: timeout) do {:ok, conn} -> case Mint.HTTP1.controlling_process(conn, owner) do {:ok, _conn} = ok -> + emit_conn_event([:ch, :conn, :stop], %{duration: duration(started)}, metadata) ok {:error, _reason} = error -> Mint.HTTP1.close(conn) + + emit_conn_event( + [:ch, :conn, :error], + %{duration: duration(started)}, + conn_error_metadata(metadata, error) + ) + error end {:error, _reason} = error -> + emit_conn_event( + [:ch, :conn, :error], + %{duration: duration(started)}, + conn_error_metadata(metadata, error) + ) + error end end - defp connect({:ok, _conn} = ok, _owner, _deadline), do: ok + defp connect({:ok, _conn, checkin_time, conn_metadata} = ok, _owner, _deadline, query_metadata) do + measurements = %{idle_time: max(System.monotonic_time() - checkin_time, 0)} + emit_conn_event([:ch, :conn, :reuse], measurements, Map.merge(conn_metadata, query_metadata)) + {:ok, elem(ok, 1)} + end + + defp connect({:ok, _conn} = ok, _owner, _deadline, _query_metadata), do: ok defp request(conn, method, path, headers, body, deadline) do result = @@ -432,9 +543,9 @@ defmodule Ch do |> response_body_to_binary() code = - if code = get_header(headers, "x-clickhouse-error-code") do - String.to_integer(code) - end + headers + |> get_clickhouse_error_code() + |> maybe_string_to_integer() {:error, %Ch.Error{code: code, message: message}} end @@ -442,11 +553,112 @@ defmodule Ch do defp response_body_to_binary(nil), do: "" defp response_body_to_binary(body), do: IO.iodata_to_binary(body) + defp event(events, name), do: [{name, System.monotonic_time()} | events] + + defp past_event(events, name, time), do: [{name, time} | events] + + defp telemetry_measurements(events, query_started, result) do + stop = System.monotonic_time() + + measurements = + events + |> Enum.reduce({stop, %{duration: stop - query_started}}, fn + {:decode, start}, {stop, measurements} -> + {start, Map.put(measurements, :decode_time, stop - start)} + + {:query, start}, {stop, measurements} -> + {start, Map.put(measurements, :query_time, stop - start)} + + {:checkout, start}, {stop, measurements} -> + {start, Map.put(measurements, :queue_time, stop - start)} + + {:checkin, start}, {stop, measurements} -> + {stop, Map.put(measurements, :idle_time, max(stop - start, 0))} + end) + |> elem(1) + + case result do + %Ch.Result{rows: rows, names: names, data: data} -> + measurements + |> maybe_put_measurement(:num_rows, rows && length(rows)) + |> maybe_put_measurement(:num_columns, names && length(names)) + |> maybe_put_measurement(:response_body_bytes, data && IO.iodata_length(data)) + + _ -> + measurements + end + end + + defp maybe_put_measurement(measurements, _name, nil), do: measurements + defp maybe_put_measurement(measurements, name, value), do: Map.put(measurements, name, value) + + defp telemetry_metadata(metadata, status, headers, result) do + Map.merge(metadata, %{ + status: status, + response_headers: headers, + result: result + }) + end + + defp telemetry_error_metadata(metadata, status, headers, reason, stacktrace) do + metadata + |> Map.merge(%{ + status: status, + response_headers: headers, + kind: error_kind(stacktrace), + reason: reason + }) + |> maybe_put_metadata(:stacktrace, error_stacktrace(stacktrace)) + |> maybe_put_metadata(:clickhouse_error_code, clickhouse_error_code(reason)) + end + + defp error_kind({kind, _stacktrace}), do: kind + defp error_kind(_stacktrace), do: :error + + defp error_stacktrace({_kind, stacktrace}), do: stacktrace + defp error_stacktrace(stacktrace), do: stacktrace + + defp maybe_put_metadata(metadata, _key, nil), do: metadata + defp maybe_put_metadata(metadata, key, value), do: Map.put(metadata, key, value) + + defp clickhouse_error_code(%Ch.Error{code: code}), do: code + defp clickhouse_error_code(_reason), do: nil + + defp emit_conn_event(event, measurements, metadata) do + :telemetry.execute(event, measurements, metadata) + end + + defp conn_metadata(%{template: {:template, scheme, host, port}}, metadata) do + conn_metadata(scheme, host, port, metadata) + end + + defp conn_metadata(scheme, host, port, metadata) do + Map.merge(metadata, %{ + scheme: scheme, + host: host, + port: port + }) + end + + defp conn_error_metadata(metadata, {:error, reason}) do + Map.merge(metadata, %{kind: :error, reason: reason}) + end + + defp duration(started), do: System.monotonic_time() - started + @compile inline: [get_header: 2] defp get_header(headers, name) do with {_, value} <- List.keyfind(headers, name, 0, nil), do: value end + defp get_clickhouse_error_code(headers) do + get_header(headers, "x-clickhouse-error-code") || + get_header(headers, "x-clickhouse-exception-code") + end + + defp maybe_string_to_integer(nil), do: nil + defp maybe_string_to_integer(value), do: String.to_integer(value) + @compile inline: [put_new_header: 3] defp put_new_header(headers, name, value) do if List.keymember?(headers, name, 0) do diff --git a/mix.exs b/mix.exs index c730fe1..7c0ce15 100644 --- a/mix.exs +++ b/mix.exs @@ -57,6 +57,7 @@ defmodule Ch.MixProject do {:mint, "~> 1.8"}, {:nimble_pool, "~> 1.1"}, {:nimble_options, "~> 1.1"}, + {:telemetry, "~> 1.0"}, {:decimal, "~> 2.0 or ~> 3.0"}, {:ecto, "~> 3.13.0", optional: true}, {:benchee, "~> 1.0", only: :dev}, diff --git a/pages/query.md b/pages/query.md index d713071..5ad2488 100644 --- a/pages/query.md +++ b/pages/query.md @@ -129,3 +129,35 @@ Ch.query!( headers: [{"content-encoding", "zstd"}] ) ``` + +## Telemetry + +Ch emits query telemetry events: + +- `[:ch, :query, :start]` +- `[:ch, :query, :stop]` +- `[:ch, :query, :error]` + +Successful query stop events include native-time measurements such as +`:duration`, `:queue_time`, `:query_time`, and `:decode_time`. Decoded responses +also include `:num_rows`, `:num_columns`, and `:response_body_bytes` when +available. + +Connection lifecycle events use short names: + +- `[:ch, :conn, :start]` +- `[:ch, :conn, :stop]` +- `[:ch, :conn, :reuse]` +- `[:ch, :conn, :drop]` +- `[:ch, :conn, :error]` + +Use `:telemetry_metadata` to attach application context to query events: + +```elixir +Ch.query!( + pool, + "SELECT count() FROM events", + %{}, + telemetry_metadata: %{source: :dashboard} +) +``` diff --git a/test/ch/telemetry_test.exs b/test/ch/telemetry_test.exs new file mode 100644 index 0000000..47abde2 --- /dev/null +++ b/test/ch/telemetry_test.exs @@ -0,0 +1,93 @@ +defmodule Ch.TelemetryTest do + use ExUnit.Case, async: true + + setup do + pool = start_supervised!(Ch) + handler_id = {__MODULE__, self(), System.unique_integer()} + + events = [ + [:ch, :query, :start], + [:ch, :query, :stop], + [:ch, :query, :error], + [:ch, :conn, :start], + [:ch, :conn, :stop], + [:ch, :conn, :reuse], + [:ch, :conn, :drop], + [:ch, :conn, :error] + ] + + :ok = + :telemetry.attach_many( + handler_id, + events, + &__MODULE__.handle_event/4, + self() + ) + + on_exit(fn -> :telemetry.detach(handler_id) end) + + {:ok, pool: pool} + end + + test "emits query and connection telemetry for successful queries", %{pool: pool} do + assert {:ok, %Ch.Result{rows: [[1]]}} = + Ch.query(pool, "SELECT 1", %{}, telemetry_metadata: %{source: :test}) + + assert_receive {:telemetry_event, [:ch, :query, :start], %{system_time: system_time}, + %{telemetry_metadata: %{source: :test}, format: "RowBinaryWithNamesAndTypes"}} + + assert is_integer(system_time) + + assert_receive {:telemetry_event, [:ch, :conn, :start], %{system_time: conn_system_time}, + %{scheme: :http, host: "localhost", port: 8123}} + + assert is_integer(conn_system_time) + + assert_receive {:telemetry_event, [:ch, :conn, :stop], %{duration: conn_duration}, + %{scheme: :http, host: "localhost", port: 8123}} + + assert is_integer(conn_duration) + assert conn_duration >= 0 + + assert_receive {:telemetry_event, [:ch, :query, :stop], measurements, + %{status: 200, telemetry_metadata: %{source: :test}, result: %Ch.Result{}}} + + assert measurements.duration >= 0 + assert measurements.queue_time >= 0 + assert measurements.query_time >= 0 + assert measurements.decode_time >= 0 + assert measurements.num_rows == 1 + assert measurements.num_columns == 1 + assert measurements.response_body_bytes > 0 + + assert {:ok, %Ch.Result{rows: [[2]]}} = Ch.query(pool, "SELECT 2") + + assert_receive {:telemetry_event, [:ch, :conn, :reuse], %{idle_time: idle_time}, + %{scheme: :http, host: "localhost", port: 8123}} + + assert idle_time >= 0 + end + + test "emits query error telemetry for ClickHouse errors", %{pool: pool} do + assert {:error, %Ch.Error{} = error} = Ch.query(pool, "SELECT missing_column") + + assert_receive {:telemetry_event, [:ch, :query, :error], measurements, + %{ + kind: :error, + reason: ^error, + clickhouse_error_code: code, + status: status + }} + + assert measurements.duration >= 0 + assert measurements.queue_time >= 0 + assert measurements.query_time >= 0 + assert measurements.decode_time >= 0 + assert is_integer(code) + assert status != 200 + end + + def handle_event(event, measurements, metadata, test_pid) do + send(test_pid, {:telemetry_event, event, measurements, metadata}) + end +end