diff --git a/scripts/build-preloaded-store.escript b/scripts/build-preloaded-store.escript index 83a0ccca2..62ba90a08 100755 --- a/scripts/build-preloaded-store.escript +++ b/scripts/build-preloaded-store.escript @@ -90,6 +90,8 @@ hb_opts_compile_opts(Ebin) -> end. drop_outdir([{outdir, _} | Rest]) -> drop_outdir(Rest); +drop_outdir([{d, Name, Value} | Rest]) when is_list(Name) -> + [{d, list_to_atom(Name), Value} | drop_outdir(Rest)]; drop_outdir([Opt | Rest]) -> [Opt | drop_outdir(Rest)]; drop_outdir([]) -> []. diff --git a/scripts/hyper-token.lua b/scripts/hyper-token.lua index 237542a0a..233421724 100644 --- a/scripts/hyper-token.lua +++ b/scripts/hyper-token.lua @@ -119,11 +119,25 @@ function count_common(a, b) if type(a) ~= "table" then a = { a } end if type(b) ~= "table" then b = { b } end + -- local count = 0 + -- for _, v in ipairs(a) do + -- for _, w in ipairs(b) do + -- if v == w then + -- count = count + 1 + -- end + -- end + -- end + + local seen = {} local count = 0 for _, v in ipairs(a) do - for _, w in ipairs(b) do - if v == w then - count = count + 1 + if not seen[v] then + seen[v] = true + for _, w in ipairs(b) do + if v == w then + count = count + 1 + break + end end end end @@ -895,4 +909,4 @@ function compute(base, assignment) ao.event({ "Process initialized.", { slot = assignment.slot } }) return "ok", base end -end \ No newline at end of file +end diff --git a/src/core/http/hb_client_remote.erl b/src/core/http/hb_client_remote.erl index 05c7e5df5..7489d32e1 100644 --- a/src/core/http/hb_client_remote.erl +++ b/src/core/http/hb_client_remote.erl @@ -98,7 +98,13 @@ upload(Msg, Opts) -> end, hb_message:commitment_devices(Msg, Opts) ), - {ok, UploadResults}. + case lists:filter(fun upload_failed/1, UploadResults) of + [] -> {ok, UploadResults}; + Errors -> {error, Errors} + end. +upload_failed({error, _}) -> true; +upload_failed({failure, _}) -> true; +upload_failed(_) -> false. upload(Msg, Opts, <<"httpsig@1.0">>) -> case hb_opts:get(bundler_httpsig, not_found, Opts) of not_found -> @@ -107,13 +113,17 @@ upload(Msg, Opts, <<"httpsig@1.0">>) -> ?event({uploading_item, Msg}), hb_http:post(Bundler, <<"/tx">>, Msg, Opts) end; -upload(Msg, Opts, _CommitmentDevice) -> +upload(Msg, Opts, CommitmentDevice) -> ?event({uploading_item, Msg}), hb_ao:raw( <<"arweave@2.9">>, <<"tx">>, - #{}, - Msg#{ <<"method">> => <<"POST">> }, + Msg, + #{ + <<"method">> => <<"POST">>, + <<"target">> => <<"base">>, + <<"commitment-device">> => CommitmentDevice + }, Opts ). diff --git a/src/core/http/hb_http.erl b/src/core/http/hb_http.erl index 241d5e63a..82cb31dca 100644 --- a/src/core/http/hb_http.erl +++ b/src/core/http/hb_http.erl @@ -638,9 +638,15 @@ encode_reply(Status, TABMReq, Message, Opts) -> end, Opts ), + DefaultAcceptBundle = + case {Codec, hb_maps:get(<<"require-codec">>, TABMReq, not_found, Opts)} of + {<<"json@1.0">>, not_found} -> false; + {<<"json@1.0">>, _} -> true; + _ -> false + end, AcceptBundle = hb_util:atom( - hb_maps:get(<<"accept-bundle">>, TABMReq, false, Opts) + hb_maps:get(<<"accept-bundle">>, TABMReq, DefaultAcceptBundle, Opts) ), ?event(debug_http, {encoding_reply, @@ -1054,6 +1060,16 @@ normalize_unsigned(PrimMsg, Req = #{ headers := RawHeaders }, Msg, Opts) -> ), FilterKeys = hb_opts:get(http_inbound_filter_keys, ?DEFAULT_FILTER_KEYS, Opts), FilteredMsg = hb_message:without_unless_signed(FilterKeys, Msg, Opts), + DefaultAcceptBundle = + case maps:get( + <<"require-codec">>, + Msg, + maps:get(<<"require-codec">>, PrimMsg, not_found) + ) of + <<"application/json">> -> true; + <<"json@1.0">> -> true; + _ -> maps:get(<<"accept-bundle">>, RawHeaders, false) + end, BaseMsg = FilteredMsg#{ <<"method">> => Method, @@ -1065,7 +1081,7 @@ normalize_unsigned(PrimMsg, Req = #{ headers := RawHeaders }, Msg, Opts) -> maps:get( <<"accept-bundle">>, PrimMsg, - maps:get(<<"accept-bundle">>, RawHeaders, false) + DefaultAcceptBundle ) ), <<"accept">> => diff --git a/src/core/resolver/hb_cache_control.erl b/src/core/resolver/hb_cache_control.erl index 85746bf0d..665bcade1 100644 --- a/src/core/resolver/hb_cache_control.erl +++ b/src/core/resolver/hb_cache_control.erl @@ -56,7 +56,14 @@ lookup(Base, Req, Opts) -> Opts, hb_opts:get(store_scope_resolved, local, Opts) ), - case hb_cache:read_resolved(Base, Req, OutputScopedOpts) of + CacheRead = + try hb_cache:read_resolved(Base, Req, OutputScopedOpts) of + ReadRes -> ReadRes + catch + throw:{necessary_message_not_found, _, _} -> + miss + end, + case CacheRead of {hit, not_found} -> {error, not_found}; {hit, {ok, Res}} -> diff --git a/src/core/resolver/hb_opts.erl b/src/core/resolver/hb_opts.erl index 4d89c1dda..5c9bf763f 100644 --- a/src/core/resolver/hb_opts.erl +++ b/src/core/resolver/hb_opts.erl @@ -531,7 +531,7 @@ raw_default_message() -> } ] }, - <<"scheduler-default-commitment-spec">> => <<"httpsig@1.0">>, + <<"scheduler-default-commitment-spec">> => <<"ans104@1.0">>, <<"genesis-wasm-import-authorities">> => [ <<"WjnS-s03HWsDSdMnyTdzB1eHZB2QheUWP_FVRVYxkXk">> diff --git a/src/preloaded/arweave/dev_arweave.erl b/src/preloaded/arweave/dev_arweave.erl index 88ca6523d..de3be7f5d 100644 --- a/src/preloaded/arweave/dev_arweave.erl +++ b/src/preloaded/arweave/dev_arweave.erl @@ -47,11 +47,16 @@ tx(Base, Request, Opts) -> %% you should use the ~bundler@1.0 device. post_tx(Base, RawRequest, Opts) -> {ok, Request} = extract_target(Base, RawRequest, Opts), - case hb_maps:find(<<"commitment-device">>, Request, Opts) of + case hb_maps:find(<<"commitment-device">>, RawRequest, Opts) of {ok, Device} -> post_tx(Base, Request, Opts, Device); error -> - post_tx_detect_device(Base, Request, Opts) + case hb_maps:find(<<"commitment-device">>, Request, Opts) of + {ok, Device} -> + post_tx(Base, Request, Opts, Device); + error -> + post_tx_detect_device(Base, Request, Opts) + end end. %% @doc Detect the commitment device to use when posting a transaction. diff --git a/src/preloaded/codec/dev_json.erl b/src/preloaded/codec/dev_json.erl index dffff0820..807dade03 100644 --- a/src/preloaded/codec/dev_json.erl +++ b/src/preloaded/codec/dev_json.erl @@ -27,9 +27,10 @@ to(Msg, Req, Opts) -> tabm, ConvOpts ), + Bundle = hb_maps:get(<<"bundle">>, Req, false, Opts), Loaded = - case hb_maps:get(<<"bundle">>, Req, false, Opts) of - true -> hb_cache:ensure_all_loaded(Restructured, Opts); + case Bundle of + true -> load_available_links(hb_link:decode_all_links(Restructured), Opts); false -> Restructured end, JSONStructured = @@ -38,12 +39,37 @@ to(Msg, Req, Opts) -> tabm, #{ <<"device">> => <<"structured@1.0">>, + <<"bundle">> => Bundle, <<"encode-types">> => [<<"atom">>] }, ConvOpts ), {ok, hb_json:encode(JSONStructured)}. +%% @doc Eager-load resolvable links for bundled JSON responses, while leaving +%% missing lazy links in place so optional response fields do not fail encoding. +load_available_links(Msg, Opts) -> + load_available_links([], Msg, Opts). + +load_available_links(_Ref, Link, Opts) when ?IS_LINK(Link) -> + try hb_cache:ensure_loaded(Link, Opts) of + Loaded -> load_available_links([], Loaded, Opts) + catch + throw:{necessary_message_not_found, _, _} -> Link + end; +load_available_links(Ref, Msg, Opts) when is_map(Msg) -> + maps:map( + fun(K, V) -> load_available_links([K|Ref], V, Opts) end, + Msg + ); +load_available_links(Ref, Msg, Opts) when is_list(Msg) -> + lists:map( + fun({N, V}) -> load_available_links([N|Ref], V, Opts) end, + hb_util:number(Msg) + ); +load_available_links(_Ref, Msg, _Opts) -> + Msg. + %% @doc Decode a JSON string to a message. from(Map, _Req, _Opts) when is_map(Map) -> {ok, Map}; from(JSON, Req, Opts) -> diff --git a/src/preloaded/codec/dev_json_iface.erl b/src/preloaded/codec/dev_json_iface.erl index b37621d6c..b4b35b6e1 100644 --- a/src/preloaded/codec/dev_json_iface.erl +++ b/src/preloaded/codec/dev_json_iface.erl @@ -208,7 +208,10 @@ prepare_tags(Msg, Opts) -> {ok, OriginalTags} -> Res = hb_util:message_to_ordered_list(OriginalTags), ?event({using_original_tags, Res}), - Res; + case complete_tags(Res) of + true -> Res; + false -> prepare_header_case_tags(Msg, Opts) + end; error -> prepare_header_case_tags(Msg, Opts) end; @@ -216,6 +219,14 @@ prepare_tags(Msg, Opts) -> prepare_header_case_tags(Msg, Opts) end. +complete_tags(Tags) -> + lists:all( + fun(#{ <<"name">> := _, <<"value">> := _ }) -> true; + (_) -> false + end, + Tags + ). + %% @doc Convert a message without an `original-tags' field into a list of %% key-value pairs, with the keys in HTTP header-case. prepare_header_case_tags(TABM, Opts) -> diff --git a/src/preloaded/codec/lib_arweave_common.erl b/src/preloaded/codec/lib_arweave_common.erl index aa0951aea..8a4fd8a90 100644 --- a/src/preloaded/codec/lib_arweave_common.erl +++ b/src/preloaded/codec/lib_arweave_common.erl @@ -535,7 +535,9 @@ original_tags_to_tags(TagMap) -> ?event({ordered_tagmap, {explicit, OrderedList}, {input, {explicit, TagMap}}}), lists:map( fun(#{ <<"name">> := Key, <<"value">> := Value }) -> - {Key, Value} + {Key, Value}; + (#{ <<"name">> := Key }) -> + {Key, <<>>} end, OrderedList ). diff --git a/src/preloaded/process/dev_process.erl b/src/preloaded/process/dev_process.erl index 55eea8feb..bec4a2426 100644 --- a/src/preloaded/process/dev_process.erl +++ b/src/preloaded/process/dev_process.erl @@ -217,7 +217,7 @@ compute(Base, Req, Opts) -> {result, Result} } ), - {ok, without_snapshot(Result, Opts)}; + {ok, compute_response(Result, Req, Opts)}; {error, not_found} -> {ok, Loaded} = ensure_loaded(ProcBase, Req, Opts), ?event(compute, @@ -259,7 +259,7 @@ compute_to_slot(ProcID, Base, Req, TargetSlot, Opts) -> Opts ), store_result(true, ProcID, TargetSlot, Base, Req, Opts), - {ok, without_snapshot(lib_process:as_process(Base, Opts), Opts)}; + {ok, compute_response(lib_process:as_process(Base, Opts), Req, Opts)}; CurrentSlot when CurrentSlot < TargetSlot -> % Compute the next state transition. NextSlot = CurrentSlot + 1, @@ -646,7 +646,7 @@ now(RawBase, Req, Opts) -> LatestKnown = dev_process_cache:latest(ProcessID, [], Opts), case LatestKnown of {ok, LatestSlot, RawLatestMsg} -> - LatestMsg = without_snapshot(RawLatestMsg, Opts), + LatestMsg = compute_response(RawLatestMsg, Req, Opts), ?event(compute_cache, {serving_latest_cached_state, {proc_id, ProcessID}, @@ -791,3 +791,17 @@ ensure_loaded(Base, Req, Opts) -> %% @doc Remove the `snapshot' key from a message and return it. without_snapshot(Msg, Opts) -> hb_ao:set(Msg, <<"snapshot">>, unset, Opts). + +%% @doc Format a compute response for the caller with its result payload inline. +compute_response(Msg, _Req, Opts) -> + with_loaded_results(without_snapshot(Msg, Opts), Opts). + +with_loaded_results(Msg, Opts) -> + Decoded = hb_link:decode_all_links(Msg), + case hb_maps:get(<<"results">>, Decoded, not_found, Opts) of + not_found -> Msg; + Results -> + (maps:remove(<<"results+link">>, Msg))#{ + <<"results">> => hb_cache:ensure_all_loaded(Results, Opts) + } + end. diff --git a/src/preloaded/process/dev_push.erl b/src/preloaded/process/dev_push.erl index 8a549e16d..244d347e0 100644 --- a/src/preloaded/process/dev_push.erl +++ b/src/preloaded/process/dev_push.erl @@ -44,7 +44,7 @@ push(Base, Req, Opts) -> no_slot -> case schedule_initial_message(Process, Req, Opts) of {ok, Assignment} -> - case find_type(hb_ao:get(<<"body">>, Assignment, Opts), Opts) of + case find_type(Req, Opts) of <<"Process">> -> ?event(push, {initializing_process, @@ -459,12 +459,15 @@ push_downstream_local(TargetID, NextSlotOnProc, Origin, Opts) -> {origin, Origin} } ), + ResultDepth = + decrement_result_depth( + hb_maps:get(<<"result-depth">>, Origin, 1, Opts) + ), BaseReq = #{ <<"path">> => <<"push">>, <<"slot">> => NextSlotOnProc, - <<"result-depth">> => - hb_maps:get(<<"result-depth">>, Origin, 1, Opts) - 1 + <<"result-depth">> => ResultDepth }, Req = case parse_max_depth(hb_maps:get(<<"max-depth">>, Origin, undefined, Opts)) of @@ -492,6 +495,15 @@ parse_max_depth(Bin) when is_binary(Bin) -> end; parse_max_depth(_) -> undefined. +decrement_result_depth(Depth) when is_integer(Depth), Depth > 0 -> Depth - 1; +decrement_result_depth(Depth) when is_binary(Depth) -> + try hb_util:int(Depth) of + N -> decrement_result_depth(N) + catch + _:_ -> 0 + end; +decrement_result_depth(_) -> 0. + %% @doc Augment the message with from-* keys, if it doesn't already have them. normalize_message(MsgToPush, Opts) -> hb_ao:set( @@ -990,7 +1002,7 @@ test_push_as_identity() -> test_multi_process_push() -> {Sender, _Receiver, MsgSlot, Opts} = setup_two_process_message(), - %% Install a catch-all `Pong' handler on the Sender so the Receiver's + %% Install a `Pong' handler on the Sender so the Receiver's %% reply (the helper's `reply_script' fires on `Action = "Ping"' and %% sends back `Action = "Reply"') is observable as `GOT PONG' in the %% Sender's `now/results/data'. @@ -999,7 +1011,9 @@ test_multi_process_push() -> Sender, << "Handlers.add(\"Pong\",\n" - " function (test) return true end,\n" + " function (test)\n" + " return (test.Action or test.action) == \"Reply\"\n" + " end,\n" " function(m)\n" " print(\"GOT PONG\")\n" " end\n" @@ -1585,9 +1599,11 @@ test_nested_push_prompts_encoding_change() -> ping_pong_script(Limit) -> << "Handlers.add(\"Ping\",\n" - " function (test) return true end,\n" + " function (test)\n" + " return (test.Action or test.action) == \"Ping\"\n" + " end,\n" " function(m)\n" - " C = tonumber(m.Count)\n" + " C = tonumber(m.Count or m.count)\n" " if C <= ", (integer_to_binary(Limit))/binary, " then\n" " Send({ Target = ao.id, Action = \"Ping\", Count = C + 1 })\n" " print(\"Ping\", C + 1)\n" @@ -1603,11 +1619,14 @@ reply_script() -> << """ Handlers.add("Reply", - { Action = "Ping" }, function(m) + return (m.Action or m.action) == "Ping" + end, + function(m) + local from = m.From or m.from print("Replying to...") - print(m.From) - Send({ Target = m.From, Action = "Reply", Message = "Pong!" }) + print(from) + Send({ Target = from, Action = "Reply", Message = "Pong!" }) print("Done.") end ) diff --git a/src/preloaded/process/dev_scheduler.erl b/src/preloaded/process/dev_scheduler.erl index c06cfee69..886e8c69f 100644 --- a/src/preloaded/process/dev_scheduler.erl +++ b/src/preloaded/process/dev_scheduler.erl @@ -15,7 +15,7 @@ %%% -module(dev_scheduler). --device_libraries([lib_process]). +-device_libraries([lib_process, lib_scheduler_formats]). %%% AO-Core API functions: -export([info/0]). %%% Local scheduling functions: @@ -377,17 +377,29 @@ post_schedule(Base, Req, Opts) -> ?event(scheduling_message), % Find the target message to schedule: RawToSched = find_message_to_schedule(Base, Req, Opts), - % If the message can not be properly loaded, this will throw an error - % before scheduling the message. - try hb_cache:ensure_all_loaded(RawToSched, Opts) of - ToSched -> - do_post_schedule(Base, Req, ToSched, Opts) - catch - error:{necessary_message_not_found, _, _} -> + % Filter before loading so uncommitted HTTP wrapper links do not block a + % valid signed message from being scheduled. + case hb_message:with_only_committed(RawToSched, Opts) of + {ok, OnlyCommitted} -> + try hb_cache:ensure_all_loaded(OnlyCommitted, Opts) of + ToSched -> + do_post_schedule(Base, Req, ToSched, Opts) + catch + _: {necessary_message_not_found, _, _} -> + {error, + #{ + <<"status">> => 404, + <<"body">> => <<"Cannot fully load message to schedule.">> + } + } + end; + {error, Err} -> {error, #{ - <<"status">> => 404, - <<"body">> => <<"Cannot fully load message to schedule.">> + <<"status">> => 400, + <<"body">> => <<"Message invalid: ", + "Committed components cannot be validated.">>, + <<"reason">> => Err } } end. @@ -482,14 +494,6 @@ post_local_schedule(ProcID, PID, Req, Opts) -> }; {true, <<"Process">>} -> {ok, _} = hb_cache:write(Req, Opts), - spawn( - fun() -> - {ok, Results} = hb_client_remote:upload(Req, Opts), - ?event( - {uploaded_process, {proc_id, ProcID}, {results, Results}} - ) - end - ), ?event( {registering_new_process, {proc_id, ProcID}, @@ -776,7 +780,7 @@ remote_slot(<<"ao.TN.1">>, ProcID, Node, Opts) -> % Convert the JSON object for the latest assignment into the % standardized `~scheduler@1.0' format. A = - dev_scheduler_formats:aos2_to_assignment( + lib_scheduler_formats:aos2_to_assignment( JSON, Opts ), @@ -845,7 +849,7 @@ get_schedule(Base, Req, Opts) -> {ok, Res} -> case uri_string:percent_decode(Format) of <<"application/aos-2">> -> - dev_scheduler_formats:assignments_to_aos2( + lib_scheduler_formats:assignments_to_aos2( ProcID, hb_ao:get( <<"assignments">>, Res, [], Opts), @@ -896,7 +900,7 @@ do_get_remote_schedule(ProcID, LocalAssignments, From, To, _, Opts) % as a bundle. We set the 'more' to `undefined' to indicate that there may % be more assignments to fetch, but we don't know for sure. Res = - dev_scheduler_formats:assignments_to_bundle( + lib_scheduler_formats:assignments_to_bundle( ProcID, LocalAssignments, undefined, @@ -995,7 +999,7 @@ do_get_remote_schedule(ProcID, LocalAssignments, From, To, Redirect, Opts) -> cache_remote_schedule(Variant, ProcID, JSONRes, Opts), ?event(debug_aos2, {json_res, {json, JSONRes}}), Filtered = filter_json_assignments(JSONRes, To, From, Opts), - dev_scheduler_formats:aos2_to_assignments( + lib_scheduler_formats:aos2_to_assignments( ProcID, Filtered, Opts @@ -1018,7 +1022,7 @@ do_get_remote_schedule(ProcID, LocalAssignments, From, To, Redirect, Opts) -> % Merge the local assignments with the remote assignments, % and normalize the keys. Merged = - dev_scheduler_formats:assignments_to_bundle( + lib_scheduler_formats:assignments_to_bundle( ProcID, MergedAssignments = LocalAssignments ++ RemoteAssignments, hb_ao:get(<<"continues">>, NormSched, false, Opts), @@ -1272,7 +1276,7 @@ post_legacy_schedule(ProcID, OnlyCommitted, Node, Opts) -> ), ?event({assignment_json, AssignmentJSON}), Assignment = - dev_scheduler_formats:aos2_to_assignment( + lib_scheduler_formats:aos2_to_assignment( AssignmentJSON, Opts ), @@ -1386,9 +1390,9 @@ generate_local_schedule(Format, ProcID, From, To, Opts) -> FormatterFun = case uri_string:percent_decode(Format) of <<"application/aos-2">> -> - fun dev_scheduler_formats:assignments_to_aos2/4; + fun lib_scheduler_formats:assignments_to_aos2/4; _ -> - fun dev_scheduler_formats:assignments_to_bundle/4 + fun lib_scheduler_formats:assignments_to_bundle/4 end, Res = FormatterFun(ProcID, Assignments, More, Opts), ?event({assignments_bundle_outbound, {format, Format}, {res, Res}}), diff --git a/src/preloaded/process/dev_scheduler_cache.erl b/src/preloaded/process/dev_scheduler_cache.erl index 66f8acb7f..7c2ce6b86 100644 --- a/src/preloaded/process/dev_scheduler_cache.erl +++ b/src/preloaded/process/dev_scheduler_cache.erl @@ -91,7 +91,7 @@ read(ProcID, Slot, RawOpts) -> case hb_ao:get(<<"variant">>, Assignment, Opts) of <<"ao.TN.1">> -> Loaded = hb_cache:ensure_all_loaded(Assignment, Opts), - Norm = dev_scheduler_formats:aos2_to_assignment(Loaded, Opts), + Norm = lib_scheduler_formats:aos2_to_assignment(Loaded, Opts), ?event({normalized_aos2_assignment, Norm}), {ok, Norm}; <<"ao.N.1">> -> diff --git a/src/preloaded/process/dev_scheduler_server.erl b/src/preloaded/process/dev_scheduler_server.erl index bba82eba7..95a9bca99 100644 --- a/src/preloaded/process/dev_scheduler_server.erl +++ b/src/preloaded/process/dev_scheduler_server.erl @@ -145,7 +145,9 @@ schedule(ErlangProcID, Message) -> ErlangProcID ! {schedule, Message, self(), AbortTime}, receive {scheduled, Message, Assignment} -> - Assignment + Assignment; + {schedule_failed, Message, Reason} -> + throw({scheduler_error, {proc_id, ErlangProcID}, {reason, Reason}}) after ?DEFAULT_TIMEOUT -> throw({scheduler_timeout, {proc_id, ErlangProcID}, {message, Message}}) end. @@ -194,8 +196,9 @@ assign(State, Message, ReplyPID) -> try do_assign(State, Message, ReplyPID) catch - _Class:Reason:Stack -> + Class:Reason:Stack -> ?event({error_scheduling, {reason, Reason}, {trace, Stack}}), + ReplyPID ! {schedule_failed, Message, {Class, Reason, Stack}}, State end. @@ -253,6 +256,18 @@ do_assign(State, Message, ReplyPID) -> Assignment, State ), + CommitmentSpec = maps:get(committment_spec, State), + CommitmentDevice = commitment_device(CommitmentSpec), + UploadOpts = upload_opts(State), + ?event( + {uploading_message, + {commitment_spec, CommitmentSpec}, + {commitment_device, CommitmentDevice} + } + ), + ok = upload_with_commitment(Message, UploadOpts, CommitmentSpec), + ok = upload_assignment(Assignment, State, UploadOpts, CommitmentSpec), + ?event(uploads_complete), ?event(starting_message_write), ok = dev_scheduler_cache:write(Assignment, Opts), maybe_inform_recipient( @@ -263,10 +278,6 @@ do_assign(State, Message, ReplyPID) -> State ), ?event(writes_complete), - ?event(uploading_message), - hb_client_remote:upload(Message, Opts), - hb_client_remote:upload(Assignment, Opts), - ?event(uploads_complete), maybe_inform_recipient( remote_confirmation, ReplyPID, @@ -293,18 +304,115 @@ commit_assignment(BaseAssignment, State) -> Wallets = maps:get(wallets, State), Opts = maps:get(opts, State), CommittmentSpec = maps:get(committment_spec, State), - lists:foldr( - fun(Wallet, Assignment) -> - hb_message:commit( - Assignment, - Opts#{ <<"priv-wallet">> => Wallet }, - CommittmentSpec - ) + lists:foldl( + fun(Wallet, Acc) -> + Signed = + hb_message:commit( + BaseAssignment, + Opts#{ <<"priv-wallet">> => Wallet }, + CommittmentSpec + ), + merge_commitments(Acc, Signed) end, BaseAssignment, Wallets ). +merge_commitments(Base, Signed) -> + Signed#{ + <<"commitments">> => + maps:merge( + maps:get(<<"commitments">>, Base, #{}), + maps:get(<<"commitments">>, Signed, #{}) + ) + }. + +%% @doc Ensure an upload target is committed with the scheduler's commitment +%% spec before asking the remote uploader to publish it using that spec. +ensure_committed(Msg, Opts, CommitmentSpec) -> + Device = commitment_device(CommitmentSpec), + case lists:member(Device, hb_message:commitment_devices(Msg, Opts)) of + true -> Msg; + false -> hb_message:commit(Msg, Opts, CommitmentSpec) + end. + +commitment_device(CommitmentSpec) when is_binary(CommitmentSpec) -> + CommitmentSpec; +commitment_device(CommitmentSpec) -> + maps:get(<<"commitment-device">>, CommitmentSpec). + +upload_opts(#{ opts := Opts, wallets := [Wallet | _] }) -> + case maps:is_key(<<"priv-wallet">>, Opts) of + true -> Opts; + false -> Opts#{ <<"priv-wallet">> => Wallet } + end; +upload_opts(#{ opts := Opts }) -> + Opts. + +upload_with_commitment(Msg, Opts, CommitmentSpec) -> + Device = commitment_device(CommitmentSpec), + Committed = ensure_committed(Msg, Opts, CommitmentSpec), + Results = + lists:map( + fun(UploadMsg) -> + hb_client_remote:upload(UploadMsg, Opts, Device) + end, + upload_variants(Committed, Device, Opts) + ), + case lists:filter(fun upload_failed/1, Results) of + [] -> ok; + Errors -> {error, Errors} + end. + +upload_variants(Msg = #{ <<"commitments">> := Commitments }, Device, Opts) -> + DeviceCommitments = + maps:filter( + fun(_ID, Commitment) -> + hb_ao:get(<<"commitment-device">>, Commitment, Opts) =:= Device + end, + Commitments + ), + case maps:to_list(DeviceCommitments) of + [] -> [Msg]; + [_] -> [Msg#{ <<"commitments">> => DeviceCommitments }]; + DeviceCommitmentsList -> + [ + Msg#{ <<"commitments">> => #{ ID => Commitment } } + || + {ID, Commitment} <- DeviceCommitmentsList + ] + end; +upload_variants(Msg, _Device, _Opts) -> + [Msg]. + +upload_failed({ok, _}) -> false; +upload_failed(_) -> true. + +upload_assignment(Assignment, #{ wallets := [] }, Opts, CommitmentSpec) -> + upload_with_commitment(Assignment, Opts, CommitmentSpec); +upload_assignment(Assignment, #{ wallets := Wallets }, Opts, CommitmentSpec) -> + Device = commitment_device(CommitmentSpec), + BaseAssignment = hb_message:uncommitted(Assignment, Opts), + Results = + lists:map( + fun(Wallet) -> + hb_client_remote:upload( + hb_message:commit( + BaseAssignment, + Opts#{ <<"priv-wallet">> => Wallet }, + CommitmentSpec + ), + Opts, + Device + ) + end, + Wallets + ), + case lists:filter(fun upload_failed/1, Results) of + [] -> ok; + Errors -> {error, Errors} + end. + %% @doc Potentially inform the caller that the assignment has been scheduled. %% The main assignment loop calls this function repeatedly at different stages %% of the assignment process. The scheduling mode determines which stages diff --git a/src/preloaded/process/dev_scheduler_formats.erl b/src/preloaded/process/lib_scheduler_formats.erl similarity index 97% rename from src/preloaded/process/dev_scheduler_formats.erl rename to src/preloaded/process/lib_scheduler_formats.erl index 8d178f209..ad176b68d 100644 --- a/src/preloaded/process/dev_scheduler_formats.erl +++ b/src/preloaded/process/lib_scheduler_formats.erl @@ -1,12 +1,12 @@ -%%% @doc This module is used by dev_scheduler in order to produce outputs that -%%% are compatible with various forms of AO clients. It features two main formats: +%%% @doc Shared scheduler response format helpers for devices that need outputs +%%% compatible with various forms of AO clients. It features two main formats: %%% %%% - `application/json' %%% - `application/http' %%% %%% The `application/json' format is a legacy format that is not recommended for %%% new integrations of the AO protocol. --module(dev_scheduler_formats). +-module(lib_scheduler_formats). -export([assignments_to_bundle/4, assignments_to_aos2/4]). -export([aos2_to_assignments/3, aos2_to_assignment/2]). -export([aos2_normalize_types/1]). diff --git a/src/preloaded/vm/dev_delegated_compute.erl b/src/preloaded/vm/dev_delegated_compute.erl index 64daea028..f7c058f6d 100644 --- a/src/preloaded/vm/dev_delegated_compute.erl +++ b/src/preloaded/vm/dev_delegated_compute.erl @@ -3,7 +3,7 @@ %%% bring trusted results into the local node, or as the `Execution-Device' of %%% an AO process. -module(dev_delegated_compute). --device_libraries([lib_process]). +-device_libraries([lib_process, lib_scheduler_formats]). -export([init/3, compute/3, normalize/3, snapshot/3]). -include("include/hb.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -83,7 +83,7 @@ do_compute(ProcID, Req, Opts) -> ?event({do_compute_msg, {req, Req}}), Slot = hb_ao:get(<<"slot">>, Req, Opts), {ok, AOS2 = #{ <<"body">> := Body }} = - dev_scheduler_formats:assignments_to_aos2( + lib_scheduler_formats:assignments_to_aos2( ProcID, #{ Slot => Req @@ -151,6 +151,7 @@ do_relay(Method, Path, Body, Headers, Opts) -> Headers#{ <<"path">> => <<"call">>, <<"target">> => <<"payload">>, + <<"peer">> => genesis_wasm_peer(Opts), <<"payload">> => Headers#{ <<"path">> => Path, @@ -233,6 +234,7 @@ snapshot(Msg, Req, Opts) -> }, #{ <<"path">> => <<"call">>, + <<"peer">> => genesis_wasm_peer(Opts), <<"relay-method">> => <<"POST">>, <<"relay-path">> => <<"/snapshot/", ProcID/binary>>, <<"content-type">> => <<"application/json">>, @@ -254,3 +256,10 @@ snapshot(Msg, Req, Opts) -> <<"error-details">> => Error }} end. + +genesis_wasm_peer(Opts) -> + Port = + integer_to_binary( + hb_opts:get(genesis_wasm_port, 6363, Opts) + ), + <<"http://localhost:", Port/binary>>.