diff --git a/Makefile b/Makefile index 7345cae..cf97318 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ DEV_IMAGE_ID = $(file < .image.dev) DOCKER ?= docker DOCKERCOMPOSE ?= docker compose -DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f docker-compose.yml +DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f docker-compose.yml -f compose.tracing.yaml REBAR ?= rebar3 TEST_CONTAINER_NAME ?= testrunner diff --git a/compose.tracing.yaml b/compose.tracing.yaml new file mode 100644 index 0000000..28e99aa --- /dev/null +++ b/compose.tracing.yaml @@ -0,0 +1,33 @@ +services: + + postgres: + environment: &otlp_enabled + OTEL_TRACES_EXPORTER: otlp + OTEL_TRACES_SAMPLER: parentbased_always_off + OTEL_EXPORTER_OTLP_PROTOCOL: http_protobuf + OTEL_EXPORTER_OTLP_ENDPOINT: http://jaeger:4318 + + testrunner: + environment: + <<: *otlp_enabled + OTEL_SERVICE_NAME: progressor_testrunner + OTEL_TRACES_SAMPLER: parentbased_always_on + depends_on: + jaeger: + condition: service_healthy + + jaeger: + image: jaegertracing/all-in-one:1.47 + environment: + - COLLECTOR_OTLP_ENABLED=true + healthcheck: + test: "/go/bin/all-in-one-linux status" + interval: 2s + timeout: 1s + retries: 20 + ports: + - 4317:4317 # OTLP gRPC receiver + - 4318:4318 # OTLP http receiver + - 5778:5778 + - 14250:14250 + - 16686:16686 diff --git a/rebar.config b/rebar.config index 1261a06..ed4bee4 100644 --- a/rebar.config +++ b/rebar.config @@ -5,7 +5,8 @@ {recon, "2.5.6"}, {thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {tag, "v1.0.0"}}}, {mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}}, - {epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.5"}}} + {epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.5"}}}, + {opentelemetry_api, "1.4.0"} ]}. 
{xref_checks, [ @@ -38,7 +39,9 @@ {profiles, [ {test, [ {deps, [ - {meck, "0.9.2"} + {meck, "0.9.2"}, + {opentelemetry, "1.5.0"}, + {opentelemetry_exporter, "1.8.0"} ]}, {dialyzer, [{plt_extra_apps, [eunit, common_test, runtime_tools, meck]}]} ]} diff --git a/rebar.lock b/rebar.lock index 28c39ff..22f9a4d 100644 --- a/rebar.lock +++ b/rebar.lock @@ -18,8 +18,9 @@ {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.10">>},1}, {<<"mg_proto">>, {git,"https://github.com/valitydev/machinegun-proto.git", - {ref,"3decc8f8b13c9cd1701deab47781aacddd7dbc92"}}, + {ref,"cc2c27c30d30dc34c0c56fc7c7e96326d6bd6a14"}}, 0}, + {<<"opentelemetry_api">>,{pkg,<<"opentelemetry_api">>,<<"1.4.0">>},0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"quantile_estimator">>,{pkg,<<"quantile_estimator">>,<<"0.2.1">>},1}, {<<"recon">>,{pkg,<<"recon">>,<<"2.5.6">>},0}, @@ -34,6 +35,7 @@ {<<"jsone">>, <<"347FF1FA700E182E1F9C5012FA6D737B12C854313B9AE6954CA75D3987D6C06D">>}, {<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}, {<<"kafka_protocol">>, <<"F917B6C90C8DF0DE2B40A87D6B9AE1CFCE7788E91A65818E90E40CF76111097A">>}, + {<<"opentelemetry_api">>, <<"63CA1742F92F00059298F478048DFB826F4B20D49534493D6919A0DB39B6DB04">>}, {<<"prometheus">>, <<"B95F8DE8530F541BD95951E18E355A840003672E5EDA4788C5FA6183406BA29A">>}, {<<"quantile_estimator">>, <<"EF50A361F11B5F26B5F16D0696E46A9E4661756492C981F7B2229EF42FF1CD15">>}, {<<"recon">>, <<"9052588E83BFEDFD9B72E1034532AEE2A5369D9D9343B61AEB7FBCE761010741">>}]}, @@ -43,6 +45,7 @@ {<<"jsone">>, <<"08560B78624A12E0B5E7EC0271EC8CA38EF51F63D84D84843473E14D9B12618C">>}, {<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}, {<<"kafka_protocol">>, <<"DF680A3706EAD8695F8B306897C0A33E8063C690DA9308DB87B462CFD7029D04">>}, + {<<"opentelemetry_api">>, <<"3DFBBFAA2C2ED3121C5C483162836C4F9027DEF469C41578AF5EF32589FCFC58">>}, {<<"prometheus">>, 
<<"719862351AABF4DF7079B05DC085D2BBCBE3AC0AC3009E956671B1D5AB88247D">>}, {<<"quantile_estimator">>, <<"282A8A323CA2A845C9E6F787D166348F776C1D4A41EDE63046D72D422E3DA946">>}, {<<"recon">>, <<"96C6799792D735CC0F0FD0F86267E9D351E63339CBE03DF9D162010CEFC26BB0">>}]} diff --git a/src/prg_scanner.erl b/src/prg_scanner.erl index 08c1854..1e19212 100644 --- a/src/prg_scanner.erl +++ b/src/prg_scanner.erl @@ -161,7 +161,7 @@ search_timers( [] end end, - prg_utils:with_observe(Fun, ?TIMERS_SCAN_KEY, [erlang:atom_to_binary(NsId, utf8)]). + prg_utils:with_observe(Fun, ?TIMERS_SCAN_KEY, [erlang:atom_to_binary(NsId, utf8)], undefined). search_calls( FreeWorkersCount, @@ -181,7 +181,7 @@ search_calls( [] end end, - prg_utils:with_observe(Fun, ?CALLS_SCAN_KEY, [erlang:atom_to_binary(NsId, utf8)]). + prg_utils:with_observe(Fun, ?CALLS_SCAN_KEY, [erlang:atom_to_binary(NsId, utf8)], undefined). collect_zombie( NsId, @@ -200,7 +200,7 @@ collect_zombie( end, ok end, - prg_utils:with_observe(Fun, ?ZOMBIE_COLLECTION_KEY, [erlang:atom_to_binary(NsId, utf8)]). + prg_utils:with_observe(Fun, ?ZOMBIE_COLLECTION_KEY, [erlang:atom_to_binary(NsId, utf8)], undefined). header(Type) -> {erlang:binary_to_atom(Type), undefined}. diff --git a/src/prg_utils.erl b/src/prg_utils.erl index bc24ca1..1d0867f 100644 --- a/src/prg_utils.erl +++ b/src/prg_utils.erl @@ -8,8 +8,9 @@ -export([format/1]). -export([make_ns_opts/2]). -export([unixtime_to_datetime/1]). --export([with_observe/3]). -export([with_observe/4]). +-export([with_observe/5]). +-export([with_span/2]). -spec registered_name(atom(), string()) -> atom(). registered_name(BaseAtom, PostfixStr) -> @@ -46,18 +47,45 @@ make_ns_opts(NsId, NsOpts) -> unixtime_to_datetime(TimestampSec) -> calendar:gregorian_seconds_to_datetime(TimestampSec + ?EPOCH_DIFF). --spec with_observe(_Fun, atom(), [list() | binary()]) -> any(). -with_observe(Fun, MetricKey, Labels) -> - with_observe(Fun, histogram, MetricKey, Labels). 
+-type span_params() :: #{ + name := binary(), + attributes => opentelemetry:attributes_map(), + kind => internal | server | client | producer | consumer +}. --spec with_observe(_Fun, atom(), atom(), [list() | binary()]) -> any(). -with_observe(Fun, MetricType, MetricKey, Labels) -> - {DurationMicro, Result} = timer:tc(Fun), +-spec with_observe(fun(() -> T), atom(), [list() | binary()], span_params() | undefined) -> T when T :: any(). +with_observe(Fun, MetricKey, Labels, SpanParams) -> + with_observe(Fun, histogram, MetricKey, Labels, SpanParams). + +-spec with_observe(fun(() -> T), atom(), atom(), [list() | binary()], span_params() | undefined) -> T when T :: any(). +with_observe(Fun, MetricType, MetricKey, Labels, SpanParams) -> + {DurationMicro, Result} = with_span(SpanParams, fun(_SpanCtx) -> timer:tc(Fun) end), DurationMs = DurationMicro div 1000, logger:debug("metric: ~p, labels: ~p, value: ~p", [MetricKey, Labels, DurationMs]), ok = collect(MetricType, MetricKey, Labels, DurationMs), Result. +-spec with_span(span_params() | undefined, fun((opentelemetry:span_ctx() | undefined) -> T)) -> T when T :: any(). +with_span(undefined, Fun) -> + Fun(undefined); +with_span(#{name := SpanName} = SpanParams, Fun) -> + OtelCtx = otel_ctx:get_current(), + Tracer = opentelemetry:get_application_tracer(progressor), + SpanOpts = #{ + kind => maps:get(kind, SpanParams, internal), + attributes => maps:get(attributes, SpanParams, #{}) + }, + SpannedFun = fun(SpanCtx) -> + try + Fun(SpanCtx) + catch + Class:Reason:Stacktrace -> + _ = otel_span:record_exception(SpanCtx, Class, Reason, Stacktrace, #{}), + erlang:raise(Class, Reason, Stacktrace) + end + end, + otel_tracer:with_span(OtelCtx, Tracer, SpanName, SpanOpts, SpannedFun). + collect(histogram, MetricKey, Labels, Value) -> prometheus_histogram:observe(MetricKey, Labels, Value).
%%collect(_, _MetricKey, _Labels, _Value) -> diff --git a/src/prg_worker.erl b/src/prg_worker.erl index d3bef1a..08d568b 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -31,11 +31,11 @@ -spec process_task(pid(), task_header(), task()) -> ok. process_task(Worker, TaskHeader, #{process_id := _ProcessId, task_id := _TaskId} = Task) -> - gen_server:cast(Worker, {process_task, TaskHeader, Task}). + gen_server:cast(Worker, {process_task, TaskHeader, Task, otel_ctx:get_current()}). -spec continuation_task(pid(), task_header(), task()) -> ok. continuation_task(Worker, TaskHeader, Task) -> - gen_server:cast(Worker, {continuation_task, TaskHeader, Task}). + gen_server:cast(Worker, {continuation_task, TaskHeader, Task, otel_ctx:get_current()}). -spec next_task(pid()) -> ok. next_task(Worker) -> @@ -43,7 +43,7 @@ next_task(Worker) -> -spec process_scheduled_task(pid(), id(), task_id()) -> ok. process_scheduled_task(Worker, ProcessId, TaskId) -> - gen_server:cast(Worker, {process_scheduled_task, ProcessId, TaskId}). + gen_server:cast(Worker, {process_scheduled_task, ProcessId, TaskId, otel_ctx:get_current()}). %%%=================================================================== %%% Spawning and gen_server implementation @@ -61,6 +61,7 @@ init([NsId, NsOpts]) -> {continue, do_start}}. handle_continue(do_start, #prg_worker_state{ns_id = NsId} = State) -> + %% FIXME Worker w/o OTEL context, since it is not passed to init w/ `start_child` {ok, Pid} = prg_worker_sidecar:start_link(), case prg_scheduler:pop_task(NsId, self()) of {TaskHeader, Task} -> @@ -74,13 +75,14 @@ handle_call(_Request, _From, #prg_worker_state{} = State) -> {reply, ok, State}. 
handle_cast( - {process_task, TaskHeader, Task}, + {process_task, TaskHeader, Task, OtelCtx}, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts, sidecar_pid = Pid } = State ) -> + _ = otel_ctx:attach(OtelCtx), Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, ProcessId = maps:get(process_id, Task), HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), @@ -88,22 +90,22 @@ handle_cast( NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), {noreply, NewState}; handle_cast( - {continuation_task, TaskHeader, Task}, - #prg_worker_state{ - ns_opts = #{process_step_timeout := TimeoutSec} - } = State + {continuation_task, TaskHeader, Task, OtelCtx}, + #prg_worker_state{ns_opts = #{process_step_timeout := TimeoutSec}} = State ) -> + _ = otel_ctx:attach(OtelCtx), Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, NewState = do_process_task(TaskHeader, Task, Deadline, State), {noreply, NewState}; handle_cast( - {process_scheduled_task, ProcessId, TaskId}, + {process_scheduled_task, ProcessId, TaskId, OtelCtx}, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts, sidecar_pid = Pid } = State ) -> + _ = otel_ctx:attach(OtelCtx), try prg_storage:capture_task(StorageOpts, NsId, TaskId) of [] -> %% task cancelled, blocked, already running or finished @@ -378,6 +380,7 @@ success_and_unlock( last_retry_interval => 0, attempts_count => 0 }, + %% FIXME Otel must drop trace here - right before moving to other tasks {ok, [ContinuationTask | _]} = prg_worker_sidecar:complete_and_continue( Pid, Deadline, diff --git a/src/prg_worker_sidecar.erl b/src/prg_worker_sidecar.erl index a67ba4d..ba5c740 100644 --- a/src/prg_worker_sidecar.erl +++ b/src/prg_worker_sidecar.erl @@ -51,9 +51,12 @@ process(Pid, Deadline, #{namespace := NS} = NsOpts, {TaskType, _, _} = Request, Context) -> 
Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {process, NsOpts, Request, Context}, Timeout) + gen_server:call(Pid, {process, NsOpts, Request, Context, otel_ctx:get_current()}, Timeout) end, - prg_utils:with_observe(Fun, ?PROCESSING_KEY, [NS, erlang:atom_to_list(TaskType)]). + prg_utils:with_observe(Fun, ?PROCESSING_KEY, [NS, erlang:atom_to_list(TaskType)], #{ + name => atom_to_binary(?FUNCTION_NAME), + kind => server + }). %% storage wrappers -spec complete_and_continue( @@ -71,13 +74,14 @@ complete_and_continue(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpda Fun = fun() -> gen_server:call( Pid, - {complete_and_continue, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, Task}, + {complete_and_continue, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, Task, + otel_ctx:get_current()}, infinity ) end, - prg_utils:with_observe(Fun, ?COMPLETION_KEY, [ - erlang:atom_to_list(NsId), "complete_and_continue" - ]). + prg_utils:with_observe(Fun, ?COMPLETION_KEY, [erlang:atom_to_list(NsId), "complete_and_continue"], #{ + name => atom_to_binary(?FUNCTION_NAME) + }). -spec complete_and_suspend( pid(), @@ -93,11 +97,13 @@ complete_and_suspend(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdat Fun = fun() -> gen_server:call( Pid, - {complete_and_suspend, StorageOpts, NsId, TaskResult, ProcessUpdates, Events}, + {complete_and_suspend, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, otel_ctx:get_current()}, infinity ) end, - prg_utils:with_observe(Fun, ?COMPLETION_KEY, [erlang:atom_to_list(NsId), "complete_and_suspend"]). + prg_utils:with_observe(Fun, ?COMPLETION_KEY, [erlang:atom_to_list(NsId), "complete_and_suspend"], #{ + name => atom_to_binary(?FUNCTION_NAME) + }). 
-spec complete_and_unlock( pid(), @@ -113,11 +119,13 @@ complete_and_unlock(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdate Fun = fun() -> gen_server:call( Pid, - {complete_and_unlock, StorageOpts, NsId, TaskResult, ProcessUpdates, Events}, + {complete_and_unlock, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, otel_ctx:get_current()}, infinity ) end, - prg_utils:with_observe(Fun, ?COMPLETION_KEY, [erlang:atom_to_list(NsId), "complete_and_unlock"]). + prg_utils:with_observe(Fun, ?COMPLETION_KEY, [erlang:atom_to_list(NsId), "complete_and_unlock"], #{ + name => atom_to_binary(?FUNCTION_NAME) + }). -spec complete_and_error( pid(), timestamp_ms(), storage_opts(), namespace_id(), task_result(), process_updates() @@ -128,20 +136,22 @@ complete_and_error(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdates Fun = fun() -> gen_server:call( Pid, - {complete_and_error, StorageOpts, NsId, TaskResult, ProcessUpdates}, + {complete_and_error, StorageOpts, NsId, TaskResult, ProcessUpdates, otel_ctx:get_current()}, infinity ) end, - prg_utils:with_observe(Fun, ?COMPLETION_KEY, [erlang:atom_to_list(NsId), "complete_and_error"]). + prg_utils:with_observe(Fun, ?COMPLETION_KEY, [erlang:atom_to_list(NsId), "complete_and_error"], #{ + name => atom_to_binary(?FUNCTION_NAME) + }). -spec remove_process(pid(), timestamp_ms(), storage_opts(), namespace_id(), id()) -> ok | no_return(). remove_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) -> %% Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {remove_process, StorageOpts, NsId, ProcessId}, infinity) + gen_server:call(Pid, {remove_process, StorageOpts, NsId, ProcessId, otel_ctx:get_current()}, infinity) end, - prg_utils:with_observe(Fun, ?REMOVING_KEY, [erlang:atom_to_list(NsId)]). + prg_utils:with_observe(Fun, ?REMOVING_KEY, [erlang:atom_to_list(NsId)], #{name => atom_to_binary(?FUNCTION_NAME)}). 
%% notifier wrappers @@ -151,7 +161,7 @@ event_sink(Pid, Deadline, #{namespace := NS} = NsOpts, ProcessId, Events) -> Fun = fun() -> gen_server:call(Pid, {event_sink, NsOpts, ProcessId, Events}, Timeout) end, - prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [NS, "event_sink"]). + prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [NS, "event_sink"], #{name => atom_to_binary(?FUNCTION_NAME)}). -spec lifecycle_sink(pid(), timestamp_ms(), namespace_opts(), task_t() | {error, _Reason}, id()) -> ok | no_return(). @@ -160,7 +170,7 @@ lifecycle_sink(Pid, Deadline, #{namespace := NS} = NsOpts, TaskType, ProcessId) Fun = fun() -> gen_server:call(Pid, {lifecycle_sink, NsOpts, TaskType, ProcessId}, Timeout) end, - prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [NS, "lifecycle_sink"]). + prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [NS, "lifecycle_sink"], #{name => atom_to_binary(?FUNCTION_NAME)}). %% -spec get_process(pid(), timestamp_ms(), storage_opts(), namespace_id(), id()) -> @@ -196,11 +206,13 @@ handle_call( process, #{processor := #{client := Handler, options := Options}, namespace := _NsName} = _NsOpts, Request, - Ctx + Ctx, + OtelCtx }, _From, #prg_sidecar_state{} = State ) -> + _ = otel_ctx:attach(OtelCtx), Response = try Handler:process(Request, Options, Ctx) of {ok, _Result} = OK -> @@ -218,20 +230,22 @@ handle_call( end, {reply, Response, State}; handle_call( - {complete_and_continue, StorageOpts, NsId, TaskResult, Process, Events, Task}, + {complete_and_continue, StorageOpts, NsId, TaskResult, Process, Events, Task, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> + _ = otel_ctx:attach(OtelCtx), Fun = fun() -> prg_storage:complete_and_continue(StorageOpts, NsId, TaskResult, Process, Events, Task) end, Response = do_with_retry(Fun, ?DEFAULT_DELAY), {reply, Response, State}; handle_call( - {remove_process, StorageOpts, NsId, ProcessId}, + {remove_process, StorageOpts, NsId, ProcessId, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> + _ = 
otel_ctx:attach(OtelCtx), Fun = fun() -> prg_storage:remove_process(StorageOpts, NsId, ProcessId) end, @@ -258,30 +272,33 @@ handle_call( Response = do_with_retry(Fun, ?DEFAULT_DELAY), {reply, Response, State}; handle_call( - {complete_and_suspend, StorageOpts, NsId, TaskResult, Process, Events}, + {complete_and_suspend, StorageOpts, NsId, TaskResult, Process, Events, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> + _ = otel_ctx:attach(OtelCtx), Fun = fun() -> prg_storage:complete_and_suspend(StorageOpts, NsId, TaskResult, Process, Events) end, Response = do_with_retry(Fun, ?DEFAULT_DELAY), {reply, Response, State}; handle_call( - {complete_and_unlock, StorageOpts, NsId, TaskResult, Process, Events}, + {complete_and_unlock, StorageOpts, NsId, TaskResult, Process, Events, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> + _ = otel_ctx:attach(OtelCtx), Fun = fun() -> prg_storage:complete_and_unlock(StorageOpts, NsId, TaskResult, Process, Events) end, Response = do_with_retry(Fun, ?DEFAULT_DELAY), {reply, Response, State}; handle_call( - {complete_and_error, StorageOpts, NsId, TaskResult, Process}, + {complete_and_error, StorageOpts, NsId, TaskResult, Process, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> + _ = otel_ctx:attach(OtelCtx), Fun = fun() -> prg_storage:complete_and_error(StorageOpts, NsId, TaskResult, Process) end, diff --git a/src/progressor.app.src b/src/progressor.app.src index d713868..be762b3 100644 --- a/src/progressor.app.src +++ b/src/progressor.app.src @@ -11,7 +11,8 @@ epg_connector, thrift, mg_proto, - brod + brod, + opentelemetry_api ]}, {env, []}, {modules, []}, diff --git a/src/progressor.erl b/src/progressor.erl index 1428582..91c8187 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -48,7 +48,7 @@ init(Req) -> fun add_ns_opts/1, fun check_idempotency/1, fun add_task/1, - fun(Data) -> prepare(fun prg_storage:prepare_init/4, Data) end, + fun(Data) -> prepare(?FUNCTION_NAME, fun prg_storage:prepare_init/4, Data) end, fun 
process_call/1 ], Req#{type => init} @@ -62,7 +62,7 @@ call(Req) -> fun check_idempotency/1, fun(Data) -> check_process_status(Data, <<"running">>) end, fun add_task/1, - fun(Data) -> prepare(fun prg_storage:prepare_call/4, Data) end, + fun(Data) -> prepare(?FUNCTION_NAME, fun prg_storage:prepare_call/4, Data) end, fun process_call/1 ], Req#{type => call} @@ -76,7 +76,7 @@ repair(Req) -> fun check_idempotency/1, fun(Data) -> check_process_status(Data, <<"error">>) end, fun add_task/1, - fun(Data) -> prepare(fun prg_storage:prepare_repair/4, Data) end, + fun(Data) -> prepare(?FUNCTION_NAME, fun prg_storage:prepare_repair/4, Data) end, fun process_call/1 ], Req#{type => repair} @@ -89,7 +89,7 @@ simple_repair(Req) -> fun add_ns_opts/1, fun check_idempotency/1, fun check_process_continuation/1, - fun(Data) -> prepare_postponed(fun prg_storage:prepare_call/4, Data) end, + fun(Data) -> prepare_postponed(?FUNCTION_NAME, fun prg_storage:prepare_call/4, Data) end, fun do_simple_repair/1 ], Req @@ -115,7 +115,7 @@ put(Req) -> Req ). --spec trace(request()) -> {ok, _Result} | {error, _Reason}. +-spec trace(request()) -> {ok, _Result :: term()} | {error, _Reason :: term()}. trace(Req) -> prg_utils:pipe( [ @@ -227,11 +227,14 @@ do_simple_repair(#{task := _}) -> %% process will repaired via timeout task {ok, ok}; do_simple_repair(#{ns_opts := #{storage := StorageOpts} = NsOpts, id := Id, ns := NsId}) -> - ok = prg_storage:repair_process(StorageOpts, NsId, Id), - ok = prg_notifier:lifecycle_sink(NsOpts, repair, Id), + prg_utils:with_span(#{name => <<"simple repair">>, kind => client}, fun(_SpanCtx) -> + ok = prg_storage:repair_process(StorageOpts, NsId, Id), + ok = prg_notifier:lifecycle_sink(NsOpts, repair, Id) + end), {ok, ok}. 
prepare( + Name, Fun, #{ns_opts := #{storage := StorageOpts} = NsOpts, ns := NsId, id := ProcessId, task := Task} = Req @@ -242,7 +245,8 @@ prepare( PrepareResult = prg_utils:with_observe( fun() -> Fun(StorageOpts, NsId, ProcessId, Task#{status => TaskStatus}) end, ?PREPARING_KEY, - [erlang:atom_to_binary(NsId, utf8), TaskType] + [erlang:atom_to_binary(NsId, utf8), TaskType], + #{name => iolist_to_binary(["prepare ", atom_to_binary(Name)]), kind => client} ), case PrepareResult of {ok, {continue, TaskId}} -> @@ -263,6 +267,7 @@ prepare( end. prepare_postponed( + Name, Fun, #{ns_opts := #{storage := StorageOpts}, ns := NsId, id := ProcessId, task := Task} = Req ) -> @@ -270,7 +275,8 @@ prepare_postponed( PrepareResult = prg_utils:with_observe( fun() -> Fun(StorageOpts, NsId, ProcessId, Task#{status => <<"waiting">>}) end, ?PREPARING_KEY, - [erlang:atom_to_binary(NsId, utf8), TaskType] + [erlang:atom_to_binary(NsId, utf8), TaskType], + #{name => iolist_to_binary(["prepare ", atom_to_binary(Name)]), kind => client} ), case PrepareResult of {ok, {postpone, TaskId}} -> @@ -278,7 +284,7 @@ prepare_postponed( {error, _} = Error -> Error end; -prepare_postponed(_Fun, Req) -> +prepare_postponed(_Name, _Fun, Req) -> %% Req without task, skip this step Req. @@ -325,15 +331,7 @@ do_get(Req) -> do_trace(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId}) -> prg_storage:process_trace(StorageOpts, NsId, Id).
- -do_put( - #{ - ns_opts := #{storage := StorageOpts}, - id := Id, - ns := NsId, - args := #{process := Process} = Args - } = Opts -) -> +do_put(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, args := #{process := Process} = Args} = Opts) -> #{ process_id := ProcessId } = Process, @@ -376,15 +374,24 @@ process_call(#{ns_opts := NsOpts, ns := NsId, type := Type, task := Task, worker Timeout = TimeoutSec * 1000, Ref = make_ref(), TaskHeader = make_task_header(Type, Ref), - ok = prg_worker:process_task(Worker, TaskHeader, Task), - ok = prg_scheduler:release_worker(NsId, self(), Worker), - %% see fun reply/2 - receive - {Ref, Result} -> - Result - after Timeout -> - {error, <<"timeout">>} - end. + Attributes = #{ + <<"process.type">> => atom_to_binary(Type), + <<"process.id">> => maps:get(process_id, Task), + <<"process.namespace">> => NsId, + <<"process.task_id">> => maps:get(task_id, Task, undefined) + }, + prg_utils:with_span(#{name => atom_to_binary(Type), kind => client, attributes => Attributes}, fun(SpanCtx) -> + ok = prg_worker:process_task(Worker, TaskHeader, Task), + ok = prg_scheduler:release_worker(NsId, self(), Worker), + %% see fun reply/2 + receive + {Ref, Result} -> + Result + after Timeout -> + _ = otel_span:add_event(SpanCtx, <<"timeout">>, #{}), + {error, <<"timeout">>} + end + end). make_task_header(init, Ref) -> {init, {self(), Ref}}; diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index 6a2fb46..459a33b 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -8,6 +8,8 @@ end_per_suite/1, init_per_group/2, end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, all/0, groups/0 ]). 
@@ -61,7 +63,9 @@ init_per_group(tasks_injection, C) -> Applications = [ {epg_connector, prg_ct_hook:app_env(epg_connector)}, {brod, prg_ct_hook:app_env(brod)}, - {progressor, UpdPrgConfig} + {progressor, UpdPrgConfig}, + {opentelemetry_exporter, []}, + {opentelemetry, [{span_processor, simple}]} ], _ = prg_ct_hook:start_applications(Applications), _ = prg_ct_hook:create_kafka_topics(), @@ -75,6 +79,23 @@ end_per_group(_, _) -> _ = prg_ct_hook:stop_applications(), ok. +init_per_testcase(Name, C) -> + Mod = ?MODULE, + SpanName = iolist_to_binary([atom_to_binary(Mod), ":", atom_to_binary(Name), "/1"]), + SpanCtx = otel_tracer:start_span(opentelemetry:get_application_tracer(Mod), SpanName, #{kind => internal}), + %% NOTE This also puts otel context to process dictionary + _ = otel_tracer:set_current_span(SpanCtx), + [{span_ctx, SpanCtx} | C]. + +end_per_testcase(_Name, C) -> + case lists:keyfind(span_ctx, 1, C) of + {span_ctx, SpanCtx} -> + _ = otel_span:end_span(SpanCtx), + ok; + _ -> + ok + end. + all() -> [ {group, base}, diff --git a/test/prg_ct_hook.erl b/test/prg_ct_hook.erl index eb3fd38..b458c67 100644 --- a/test/prg_ct_hook.erl +++ b/test/prg_ct_hook.erl @@ -53,7 +53,9 @@ app_list() -> [ {epg_connector, app_env(epg_connector)}, {brod, app_env(brod)}, - {progressor, app_env(progressor)} + {progressor, app_env(progressor)}, + {opentelemetry_exporter, []}, + {opentelemetry, [{span_processor, simple}]} ]. app_env(progressor) ->