Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Used in [Ecto ClickHouse adapter](https://github.com/plausible/ecto_ch).
- RowBinary
- Native query parameters
- Per query settings
- Telemetry events

## Installation

Expand Down
258 changes: 235 additions & 23 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,13 @@ defmodule Ch do
* `:timeout` - Request timeout, defaults to 30 seconds.
* `:settings` - An enumerable (usually a map or a keyword list) added to the URL query string.
* `:headers` - Headers passed directly to Mint.
* `:telemetry_metadata` - Extra metadata to include in telemetry events.
"""
@type query_option ::
{:timeout, timeout}
| {:settings, Enumerable.t()}
| {:headers, Mint.Types.headers()}
| {:telemetry_metadata, map}

@typedoc """
The parsed query response.
Expand Down Expand Up @@ -203,6 +205,7 @@ defmodule Ch do
* `:timeout` - request timeout, defaults to 30 seconds.
* `:settings` - ClickHouse settings added to the URL query string.
* `:headers` - HTTP headers sent with the request.
* `:telemetry_metadata` - extra metadata included in query telemetry events.

By default, `Ch` adds `x-clickhouse-format: RowBinaryWithNamesAndTypes`,
decodes that response format, and returns `%Ch.Result{names: names, rows: rows}`.
Expand All @@ -217,6 +220,7 @@ defmodule Ch do
def query(pool, statement, params \\ %{}, options \\ []) do
timeout = Keyword.get(options, :timeout, @query_timeout)
settings = Keyword.get(options, :settings, [])
telemetry_metadata = Keyword.get(options, :telemetry_metadata, %{})

headers =
options
Expand All @@ -226,25 +230,99 @@ defmodule Ch do

deadline = Ch.HTTP.to_deadline(timeout)
path = Ch.HTTP.path(params, settings)
query_started = System.monotonic_time()
checkout_started = System.monotonic_time()

metadata = %{
pool: pool,
telemetry_metadata: telemetry_metadata,
settings: settings,
format: get_header(headers, "x-clickhouse-format")
}

result =
NimblePool.checkout!(
pool,
:request,
fn {pid, _ref}, conn_or_template ->
with {:ok, conn} <- connect(conn_or_template, pid, deadline),
{:ok, conn, status, headers, data} <-
request(conn, "POST", path, headers, statement, deadline) do
{{:ok, status, headers, data}, checkin(conn)}
else
{:error, reason} = error -> {error, {:remove, reason}}
:telemetry.execute([:ch, :query, :start], %{system_time: System.system_time()}, metadata)

try do
result =
NimblePool.checkout!(
pool,
:request,
fn {pid, _ref}, conn_or_template ->
meter = [] |> past_event(:checkout, checkout_started) |> event(:query)

with {:ok, conn} <- connect(conn_or_template, pid, deadline, metadata),
{:ok, conn, status, headers, data} <-
request(conn, "POST", path, headers, statement, deadline) do
{{:ok, status, headers, data, meter}, checkin(conn)}
else
{:error, reason} ->
{{:error, reason, meter}, {:remove, reason}}
end
end,
timeout
)

case result do
{:ok, status, headers, data, meter} ->
meter = event(meter, :decode)

try do
case decode_query_response(status, headers, data) do
{:ok, result} ->
:telemetry.execute(
[:ch, :query, :stop],
telemetry_measurements(meter, query_started, result),
telemetry_metadata(metadata, status, headers, result)
)

{:ok, result}

{:error, reason} ->
:telemetry.execute(
[:ch, :query, :error],
telemetry_measurements(meter, query_started, nil),
telemetry_error_metadata(metadata, status, headers, reason, nil)
)

{:error, reason}
end
catch
kind, reason ->
stacktrace = __STACKTRACE__

:telemetry.execute(
[:ch, :query, :error],
telemetry_measurements(meter, query_started, nil),
telemetry_error_metadata(metadata, status, headers, reason, {kind, stacktrace})
)

:erlang.raise(kind, reason, stacktrace)
end
end,
timeout
)

with {:ok, status, headers, data} <- result do
decode_query_response(status, headers, data)
{:error, reason, meter} ->
:telemetry.execute(
[:ch, :query, :error],
telemetry_measurements(meter, query_started, nil),
telemetry_error_metadata(metadata, nil, [], reason, nil)
)

{:error, reason}
end
catch
kind, reason ->
stacktrace = __STACKTRACE__

:telemetry.execute(
[:ch, :query, :error],
telemetry_measurements([], query_started, nil),
Map.merge(metadata, %{
kind: kind,
reason: reason,
stacktrace: stacktrace
})
)

:erlang.raise(kind, reason, stacktrace)
end
end

Expand Down Expand Up @@ -280,9 +358,13 @@ defmodule Ch do
{:ok, {:ok, conn}, conn, config}
end

def handle_checkout(:request, _from, {:conn, conn, checkin_time, conn_metadata}, config) do
{:ok, {:ok, conn, checkin_time, conn_metadata}, conn, config}
end

@impl NimblePool
def handle_checkin({:ok, conn}, _from, _prev, config) do
{:ok, conn, config}
{:ok, {:conn, conn, System.monotonic_time(), conn_metadata(config, %{})}, config}
end

def handle_checkin({:remove, reason}, _from, _prev, config) do
Expand All @@ -295,35 +377,64 @@ defmodule Ch do
end

@impl NimblePool
def terminate_worker(_reason, conn_or_template, config) do
def terminate_worker(reason, conn_or_template, config) do
case conn_or_template do
:template -> :ok
{:conn, conn, _checkin_time, _conn_metadata} -> Mint.HTTP1.close(conn)
conn -> Mint.HTTP1.close(conn)
end

unless conn_or_template == :template do
emit_conn_event([:ch, :conn, :drop], %{}, conn_metadata(config, %{reason: reason}))
end

{:ok, config}
end

defp connect({:template, scheme, host, port}, owner, deadline) do
defp connect({:template, scheme, host, port}, owner, deadline, query_metadata) do
started = System.monotonic_time()
timeout = Ch.HTTP.to_timeout(deadline)
metadata = conn_metadata(scheme, host, port, query_metadata)

emit_conn_event([:ch, :conn, :start], %{system_time: System.system_time()}, metadata)

case Mint.HTTP1.connect(scheme, host, port, mode: :passive, timeout: timeout) do
{:ok, conn} ->
case Mint.HTTP1.controlling_process(conn, owner) do
{:ok, _conn} = ok ->
emit_conn_event([:ch, :conn, :stop], %{duration: duration(started)}, metadata)
ok

{:error, _reason} = error ->
Mint.HTTP1.close(conn)

emit_conn_event(
[:ch, :conn, :error],
%{duration: duration(started)},
conn_error_metadata(metadata, error)
)

error
end

{:error, _reason} = error ->
emit_conn_event(
[:ch, :conn, :error],
%{duration: duration(started)},
conn_error_metadata(metadata, error)
)

error
end
end

defp connect({:ok, _conn} = ok, _owner, _deadline), do: ok
defp connect({:ok, _conn, checkin_time, conn_metadata} = ok, _owner, _deadline, query_metadata) do
measurements = %{idle_time: max(System.monotonic_time() - checkin_time, 0)}
emit_conn_event([:ch, :conn, :reuse], measurements, Map.merge(conn_metadata, query_metadata))
{:ok, elem(ok, 1)}
end

defp connect({:ok, _conn} = ok, _owner, _deadline, _query_metadata), do: ok

defp request(conn, method, path, headers, body, deadline) do
result =
Expand Down Expand Up @@ -432,21 +543,122 @@ defmodule Ch do
|> response_body_to_binary()

code =
if code = get_header(headers, "x-clickhouse-error-code") do
String.to_integer(code)
end
headers
|> get_clickhouse_error_code()
|> maybe_string_to_integer()

{:error, %Ch.Error{code: code, message: message}}
end

defp response_body_to_binary(nil), do: ""
defp response_body_to_binary(body), do: IO.iodata_to_binary(body)

defp event(events, name), do: [{name, System.monotonic_time()} | events]

defp past_event(events, name, time), do: [{name, time} | events]

defp telemetry_measurements(events, query_started, result) do
stop = System.monotonic_time()

measurements =
events
|> Enum.reduce({stop, %{duration: stop - query_started}}, fn
{:decode, start}, {stop, measurements} ->
{start, Map.put(measurements, :decode_time, stop - start)}

{:query, start}, {stop, measurements} ->
{start, Map.put(measurements, :query_time, stop - start)}

{:checkout, start}, {stop, measurements} ->
{start, Map.put(measurements, :queue_time, stop - start)}

{:checkin, start}, {stop, measurements} ->
{stop, Map.put(measurements, :idle_time, max(stop - start, 0))}
end)
|> elem(1)

case result do
%Ch.Result{rows: rows, names: names, data: data} ->
measurements
|> maybe_put_measurement(:num_rows, rows && length(rows))
|> maybe_put_measurement(:num_columns, names && length(names))
|> maybe_put_measurement(:response_body_bytes, data && IO.iodata_length(data))

_ ->
measurements
end
end

defp maybe_put_measurement(measurements, _name, nil), do: measurements
defp maybe_put_measurement(measurements, name, value), do: Map.put(measurements, name, value)

defp telemetry_metadata(metadata, status, headers, result) do
Map.merge(metadata, %{
status: status,
response_headers: headers,
result: result
})
end

defp telemetry_error_metadata(metadata, status, headers, reason, stacktrace) do
metadata
|> Map.merge(%{
status: status,
response_headers: headers,
kind: error_kind(stacktrace),
reason: reason
})
|> maybe_put_metadata(:stacktrace, error_stacktrace(stacktrace))
|> maybe_put_metadata(:clickhouse_error_code, clickhouse_error_code(reason))
end

defp error_kind({kind, _stacktrace}), do: kind
defp error_kind(_stacktrace), do: :error

defp error_stacktrace({_kind, stacktrace}), do: stacktrace
defp error_stacktrace(stacktrace), do: stacktrace

defp maybe_put_metadata(metadata, _key, nil), do: metadata
defp maybe_put_metadata(metadata, key, value), do: Map.put(metadata, key, value)

defp clickhouse_error_code(%Ch.Error{code: code}), do: code
defp clickhouse_error_code(_reason), do: nil

defp emit_conn_event(event, measurements, metadata) do
:telemetry.execute(event, measurements, metadata)
end

defp conn_metadata(%{template: {:template, scheme, host, port}}, metadata) do
conn_metadata(scheme, host, port, metadata)
end

defp conn_metadata(scheme, host, port, metadata) do
Map.merge(metadata, %{
scheme: scheme,
host: host,
port: port
})
end

defp conn_error_metadata(metadata, {:error, reason}) do
Map.merge(metadata, %{kind: :error, reason: reason})
end

defp duration(started), do: System.monotonic_time() - started

@compile inline: [get_header: 2]
defp get_header(headers, name) do
with {_, value} <- List.keyfind(headers, name, 0, nil), do: value
end

defp get_clickhouse_error_code(headers) do
get_header(headers, "x-clickhouse-error-code") ||
get_header(headers, "x-clickhouse-exception-code")
end

defp maybe_string_to_integer(nil), do: nil
defp maybe_string_to_integer(value), do: String.to_integer(value)

@compile inline: [put_new_header: 3]
defp put_new_header(headers, name, value) do
if List.keymember?(headers, name, 0) do
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ defmodule Ch.MixProject do
{:mint, "~> 1.8"},
{:nimble_pool, "~> 1.1"},
{:nimble_options, "~> 1.1"},
{:telemetry, "~> 1.0"},
{:decimal, "~> 2.0 or ~> 3.0"},
{:ecto, "~> 3.13.0", optional: true},
{:benchee, "~> 1.0", only: :dev},
Expand Down
Loading