diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 9b249b56c..a99490820 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -5,7 +5,7 @@ %%% `/arweave` route in the node's configuration message. -module(dev_arweave). -export([info/0]). --export([tx/3, raw/3, chunk/3, block/3, current/3, status/3, price/3, tx_anchor/3]). +-export([tx/3, raw/3, chunk/3, block/3, parent/3, current/3, status/3, price/3, tx_anchor/3]). -export([pending/3]). -export([post_tx_header/2, post_tx/3, post_tx/4, post_chunk/2]). %%% Helper functions @@ -173,7 +173,16 @@ head_raw(Base, Request, Opts) -> <<"tx@1.0">> -> fun head_raw_tx/4; _ -> throw({invalid_codec_device, CodecDevice}) end, - CodecFun(TXID, StartOffset, Length, Opts); + try CodecFun(TXID, StartOffset, Length, Opts) + catch _:Reason:Stacktrace -> + %% This can be prone to serialization error. + %% Catch and output as an error. + ?event(store_error, {head_raw, + {txid, TXID}, + {reason, Reason}, + {stacktrace, Stacktrace}}), + {error, Reason} + end; not_found -> ?event( arweave, @@ -793,6 +802,57 @@ block({height, Height}, Opts) -> ) end. +%% @doc Look up the parent (block or bundle) that contains an item. +parent(Base, Request, Opts) -> + case find_key(<<"parent">>, Base, Request, Opts) of + not_found -> + {error, not_found}; + ID -> + StoreOpts = hb_store_arweave:store_from_opts(Opts), + try hb_store_arweave:read_parent(StoreOpts, ID, Opts) of + {ok, [{Height, block} | _]} -> + Entry = #{ + <<"type">> => <<"block">>, + <<"height">> => Height + }, + {ok, #{ + <<"content-type">> => <<"application/json">>, + <<"body">> => + hb_json:encode(#{<<"parents">> => [Entry]}) + }}; + {ok, [{ParentID, bundle} | _]} -> + Entry = #{ + <<"type">> => <<"bundle">>, + <<"id">> => hb_util:encode(ParentID) + }, + {ok, #{ + <<"content-type">> => <<"application/json">>, + <<"body">> => + hb_json:encode(#{<<"parents">> => [Entry]}) + }}; + {error, Reason} -> + ?event(warning, + {parent_read_error, {id, ID}, {reason, Reason}}), + {error, not_found}; + not_found -> + {error, not_found} + catch + error:Reason:Stacktrace -> + ?event(error, + {parent_read_error, + {id, ID}, + {reason, Reason}, + {stacktrace, Stacktrace} + }), + {failure, + #{ + <<"status">> => 500, + <<"type">> => <<"parent_read_error">> + } + } + end + end. + %% @doc Retrieve the current block information from Arweave. current(_Base, _Request, Opts) -> request(<<"GET">>, <<"/block/current">>, Opts). @@ -953,7 +1013,7 @@ to_message(Path = <<"/block/", _/binary>>, <<"GET">>, {ok, #{ <<"body">> := Body Opts ), CacheRes = - case hb_opts:get(arweave_index_blocks, true, Opts) of + case hb_opts:get(<<"arweave-index-blocks">>, true, Opts) of true -> dev_arweave_block_cache:write(Block, Opts); false -> skipped end, @@ -1357,7 +1417,8 @@ index_test_tx(TXID, IndexStore, Opts) -> TXID, <<"tx@1.0">>, StartOffset, - Size + Size, + Opts ), ?assertMatch({ok, _}, hb_store_arweave:read_offset(IndexStore, TXID)), ok. @@ -1551,6 +1612,40 @@ head_raw_ans104_test_parallel() -> hb_maps:find(<<"content-length">>, Result, Opts) ). +%% @doc Interior Arweave offset returns bytes that are not a valid ANS-104 +%% header, so head_raw_ans104/4 throws inside do_head_raw_ans104/5. The +%% try-catch added to head_raw/3 must convert that throw into {error, _}. +head_raw_ans104_deserialize_throws_test_parallel() -> + TestStore = hb_test_utils:test_store(hb_store_volatile, <<"head-raw-throws">>), + IndexStore = #{ + <<"module">> => hb_store_arweave, + <<"index-store">> => [TestStore] + }, + Opts = #{ + <<"store">> => [TestStore], + <<"arweave-index-ids">> => true, + <<"arweave-index-store">> => IndexStore + }, + FakeID = <<"CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC">>, + %% Same interior offset as bundle_header_garbage_guard_test_parallel. + ProbeOffset = 376836336327208, + Size = 4096, + ok = hb_store_arweave:write_offset( + IndexStore, FakeID, <<"ans104@1.0">>, ProbeOffset - 1, Size, Opts + ), + ?assertMatch( + {error, _}, + hb_ao:resolve( + #{ <<"device">> => <<"arweave@2.9">> }, + #{ + <<"path">> => <<"raw">>, + <<"raw">> => FakeID, + <<"method">> => <<"HEAD">> + }, + Opts + ) + ). + get_raw_range_tx_test_parallel() -> DataItemID = <<"ptBC0UwDmrUTBQX3MqZ1lB57ex20ygwzkjjCrQjIx3o">>, Opts = setup_arweave_index_opts([DataItemID]), diff --git a/src/dev_copycat_arweave.erl b/src/dev_copycat_arweave.erl index d8db2f845..6b5a86537 100644 --- a/src/dev_copycat_arweave.erl +++ b/src/dev_copycat_arweave.erl @@ -1,14 +1,22 @@ %%% @doc A `~copycat@1.0' engine that fetches block data from an Arweave node for %%% replication. This engine works in _reverse_ chronological order by default. %%% If `to' is omitted, it keeps moving downward from `from' until it reaches a -%%% block where at least one TX is already indexed, then stops. If `to' is -%%% provided, every block in the range is processed. +%%% block that is already indexed at the requested depth (checked via block +%%% markers first, then legacy per-TX fallback for pre-marker indexes). If `to' +%%% is provided, every block in the range is processed. -module(dev_copycat_arweave). -export([arweave/3]). +-export([set_depth_recursion_cap/2, get_depth_recursion_cap/1]). -include_lib("include/hb.hrl"). -include_lib("eunit/include/eunit.hrl"). -define(ARWEAVE_DEVICE, <<"~arweave@2.9">>). +-define(DEPTH_SENTINEL, 99999). +%% `full` uses the copycat-depth-recursion-cap option +%% as a safe depth to go to. This can be changed to an +%% integer value. +-define(DEFAULT_BLOCK_DEPTH, <<"full">>). +-define(DEFAULT_COPYCAT_MEMORY_BUDGET, 6 * 1024 * 1024 * 1024). % GET /~cron@1.0/once&cron-path=~copycat@1.0/arweave @@ -16,18 +24,339 @@ %% latest known block towards the Genesis block. If no range is provided, we %% fetch blocks from the latest known block towards the Genesis block. arweave(_Base, Request, Opts) -> - case parse_range(Request, Opts) of - {error, unavailable} -> - {error, unavailable}; - {ok, {From, To}} -> - case hb_maps:get(<<"mode">>, Request, <<"write">>, Opts) of - <<"write">> -> fetch_blocks(Request, From, To, Opts); - <<"list">> -> list_index(From, To, Opts); - Mode -> - {error, <<"Unsupported mode `", (hb_util:bin(Mode))/binary, "`. Supported modes are: write, list">>} + case hb_maps:get(<<"mode">>, Request, <<"write">>, Opts) of + <<"write">> -> + case hb_maps:find(<<"id">>, Request, Opts) of + {ok, TXID} -> + case process_l1_request(TXID, Request, Opts) of + {ok, Stats} when is_map(Stats) -> + ?event( + copycat_short, + {arweave_tx_indexed, + {id, {explicit, TXID}}, + {items_indexed, maps:get(items_count, Stats, 0)}, + {bundle_txs, maps:get(bundle_count, Stats, 0)}, + {skipped_txs, maps:get(skipped_count, Stats, 0)} + } + ), + {ok, Stats#{ + <<"body">> => maps:get(items_count, Stats, 0) + }}; + _ -> + {ok, #{ + items_count => 0, + bundle_count => 0, + skipped_count => 0, + <<"body">> => 0 + }} + end; + error -> + case parse_range(Request, Opts) of + {error, unavailable} -> + {error, unavailable}; + {ok, {From, To}} -> + TargetDepth = request_depth( + Request, ?DEFAULT_BLOCK_DEPTH, Opts), + ?event(copycat_short, + {indexing_blocks, + {from, From}, {to, To}, + {depth, TargetDepth}} + ), + fetch_blocks(From, To, TargetDepth, Opts) + end + end; + <<"list">> -> + case parse_range(Request, Opts) of + {error, unavailable} -> {error, unavailable}; + {ok, {From, To}} -> list_index(From, To, Opts) + end; + <<"inventory">> -> + case parse_range(Request, Opts) of + {error, unavailable} -> {error, unavailable}; + {ok, {From, To}} -> inventory_index(From, To, Opts) + end; + Mode -> + {error, <<"Unsupported mode `", (hb_util:bin(Mode))/binary, + "`. Supported modes are: write, list, inventory">>} + end. +%% @doc Set bundles descendant recursion cap, avoids recursion +%% in very nested bundles (very rare). +set_depth_recursion_cap(Cap, Opts) when is_integer(Cap), Cap > 0 -> + Opts#{<<"copycat-depth-recursion-cap">> => Cap}. +%% @doc Get the set depth recursion cap from hb_opts. +get_depth_recursion_cap(Opts) -> + hb_opts:get(<<"copycat-depth-recursion-cap">>, undefined, Opts). + +%% @doc Return the effective per-TX memory cap, clamped to the global budget. +%% Lazily initializes the budget pool on first call. +effective_memory_cap(Opts) -> + Budget = + hb_opts:get( + <<"copycat-memory-budget">>, + ?DEFAULT_COPYCAT_MEMORY_BUDGET, + Opts + ), + hb_copycat_budget:ensure_started(Budget), + hb_copycat_budget:get_budget(). + +%% @doc Shift all depth keys in an item ID map by Offset. +shift_item_ids(Map, Offset) -> + maps:fold( + fun(Depth, IDs, Acc) -> Acc#{Depth + Offset => IDs} end, + #{}, + Map + ). + +%% @doc Merge a list of depth→ID-list maps in one pass per depth key. +merge_all_item_ids(Maps) -> + AllKeys = lists:usort(lists:flatmap(fun maps:keys/1, Maps)), + maps:from_list([ + {K, lists:append([maps:get(K, M, []) || M <- Maps])} + || K <- AllKeys]). + +%% @doc Merge two depth→ID-list maps by concatenating lists at each depth. +merge_item_ids(A, B) -> + maps:fold( + fun(Depth, IDs, Acc) -> + Existing = maps:get(Depth, Acc, []), + Acc#{Depth => Existing ++ IDs} + end, + A, + B + ). + +%% @doc Normalize an owner address into the native ID form used for comparisons. +normalize_owner_id(Addr) -> + hb_util:native_id(hb_util:bin(Addr)). + +%% @doc Adds an address to the owners aliases cache in Opts, mapping +%% Alias -> native address for fast lookup and once per address computation. +add_owner_alias(Addr, Alias, Opts) when is_binary(Alias) -> + ExistingAliases = hb_opts:get(<<"owner_aliases">>, #{}, Opts), + Opts#{ <<"owner_aliases">> => ExistingAliases#{ Alias => normalize_owner_id(Addr) }}; +add_owner_alias(_Addr, Alias, _Opts) -> + throw({invalid_owner_alias, Alias}). + +%% @doc Retrieve the address of a given alias. +resolve_owner_alias(Alias, Opts) when is_binary(Alias) -> + Aliases = hb_opts:get(<<"owner_aliases">>, #{}, Opts), + case hb_maps:find(Alias, Aliases) of + {ok, Addr} -> {ok, Addr}; + error -> {error, {owner_alias_not_found, Alias}} + end; +resolve_owner_alias(Alias, _Opts) -> + {error, {invalid_owner_alias, Alias}}. +%% @doc Parse include/exclude owner filters from the request. +%% Supports direct owner values and owner aliases. +parse_owner_filter(Request, Opts) -> + maybe + {ok, IncludeOwner} ?= + resolve_owner_filter_value( + <<"include-owner">>, + <<"include-owner-alias">>, + Request, + Opts + ), + {ok, ExcludeOwner} ?= + resolve_owner_filter_value( + <<"exclude-owner">>, + <<"exclude-owner-alias">>, + Request, + Opts + ), + {ok, #{ + include_owner => IncludeOwner, + exclude_owner => ExcludeOwner + }} + else + {error, _} = Error -> + Error + end. +%% @doc Resolve one owner filter value from either a direct owner param or +%% a comma-separated owner alias param. Alias takes precedence. +resolve_owner_filter_value(OwnerKey, AliasKey, Request, Opts) -> + case hb_maps:find(AliasKey, Request, Opts) of + {ok, Alias} -> + resolve_owner_aliases(Alias, Opts); + error -> + case hb_maps:find(OwnerKey, Request, Opts) of + {ok, Owner} -> + {ok, normalize_owner_id(Owner)}; + error -> + {ok, undefined} + end + end. +%% @doc Resolve one or more comma-separated owner aliases into normalized owner IDs. +resolve_owner_aliases(Alias, Opts) -> + case + lists:filter( + fun(Part) -> byte_size(Part) > 0 end, + binary:split(hb_util:bin(Alias), <<",">>, [global]) + ) + of + [SingleAlias] -> + case resolve_owner_alias(SingleAlias, Opts) of + {ok, Addr} -> {ok, normalize_owner_id(Addr)}; + {error, _} = Error -> Error + end; + Aliases -> + resolve_owner_aliases(Aliases, Opts, []) + end. +%% @doc Resolve a list of owner aliases into normalized owner IDs. +resolve_owner_aliases([], _Opts, Acc) -> + {ok, lists:reverse(Acc)}; +resolve_owner_aliases([Alias | Rest], Opts, Acc) -> + case resolve_owner_alias(Alias, Opts) of + {ok, Addr} -> + resolve_owner_aliases(Rest, Opts, [normalize_owner_id(Addr) | Acc]); + {error, _} = Error -> + Error + end. +%% @doc Parse an L1 tag filter from `Name:Value` form. +parse_tag_filter(Key, Request, Opts) -> + case hb_maps:find(Key, Request, Opts) of + {ok, Tag} -> + case binary:split(hb_util:bin(Tag), <<":">>, [global]) of + [Name, Value] + when byte_size(Name) > 0 andalso byte_size(Value) > 0 -> + {ok, #{name => Name, value => Value}}; + _ -> + {error, invalid_tag_filter} + end; + error -> + {ok, undefined} + end. +%% @doc Process the `id=...` copycat path for an already indexed L1 TX. +%% applies L1-level owner/tag filters on the lightweight TX header first, then, +%% if the TX passes and is a bundle, loads the full L1 payload once and indexes +%% descendants in-memory up to the requested safe depth (defaults to full recursion +%% till the set copycat-depth-recursion-cap). +process_l1_request(TXID, Request, Opts) -> + Depth = request_depth(Request, <<"full">>, Opts), + QueryL1Offset = + hb_util:bool( + hb_maps:get(<<"query-l1-offset">>, Request, false, Opts) + ), + observe_copycat_l1_stage( + <<"l1_request_total">>, + fun() -> + try + maybe + {ok, OwnerFilters} ?= parse_owner_filter(Request, Opts), + {ok, IncludeTag} ?= parse_tag_filter(<<"include-tag">>, Request, Opts), + {ok, ExcludeTag} ?= parse_tag_filter(<<"exclude-tag">>, Request, Opts), + {ok, + maybe_process_l1_tx( + TXID, + OwnerFilters#{ + include_tag => IncludeTag, + exclude_tag => ExcludeTag + }, + Depth, + QueryL1Offset, + Opts + )} + else + {error, _} = Error -> + Error + end + catch + _:Reason:Stacktrace -> + ?event(copycat_short, + {error, + {reason, Reason}, + {stacktrace, Stacktrace}}), + {error, Reason} end + end + ). +%% @doc Parse the requested recursion depth and clamp it to the configured +%% safe cap. Depth is relative so depth 1 is always one level below the +%% root specified in the request (either a block or an L1 TX ID). +%% +%% `full` resolves to the current copycat depth recursion cap. +request_depth(Request, Default, Opts) -> + MaxRecursionCap = get_depth_recursion_cap(Opts), + RequestedDepth = + case hb_maps:get(<<"depth">>, Request, Default, Opts) of + <<"full">> -> MaxRecursionCap; + Value -> hb_util:int(Value) + end, + erlang:min( + MaxRecursionCap, + erlang:max(1, RequestedDepth) + ). +%% @doc Return the first matching L1 filter reason for a TX header, or `pass`. +l1_filter_reason(TX, Filters) -> + IncludeOwner = maps:get(include_owner, Filters, undefined), + ExcludeOwner = maps:get(exclude_owner, Filters, undefined), + IncludeTag = maps:get(include_tag, Filters, undefined), + ExcludeTag = maps:get(exclude_tag, Filters, undefined), + Owner = ar_tx:get_owner_address(TX), + maybe + pass ?= maybe_include_owner(Owner, IncludeOwner), + pass ?= maybe_exclude_owner(Owner, ExcludeOwner), + pass ?= maybe_include_tag(TX, IncludeTag), + pass ?= maybe_exclude_tag(TX, ExcludeTag), + pass + else + Reason -> Reason + end. +%% @doc Match an owner against an undefined, single-owner, or multi-owner filter. +owner_matches_filter(_Owner, undefined) -> + false; +owner_matches_filter(Owner, Owners) when is_list(Owners) -> + lists:member(Owner, Owners); +owner_matches_filter(Owner, FilterOwner) -> + Owner =:= FilterOwner. + +maybe_include_owner(_Owner, undefined) -> + pass; +maybe_include_owner(Owner, IncludeOwner) -> + case owner_matches_filter(Owner, IncludeOwner) of + true -> pass; + false -> include_owner_mismatch + end. + +maybe_exclude_owner(_Owner, undefined) -> + pass; +maybe_exclude_owner(Owner, ExcludeOwner) -> + case owner_matches_filter(Owner, ExcludeOwner) of + true -> exclude_owner_match; + false -> pass + end. + +maybe_include_tag(_TX, undefined) -> + pass; +maybe_include_tag(TX, IncludeTag) -> + case has_tag_pair(TX, IncludeTag) of + true -> pass; + false -> include_tag_mismatch + end. + +maybe_exclude_tag(_TX, undefined) -> + pass; +maybe_exclude_tag(TX, ExcludeTag) -> + case has_tag_pair(TX, ExcludeTag) of + true -> exclude_tag_match; + false -> pass end. +has_tag_pair(#tx{tags = Tags}, #{name := Name, value := Value}) -> + TagValue = dev_arweave_common:tagfind(Name, Tags, not_found), + case TagValue of + not_found -> + false; + _ -> + LowerTagValue = hb_util:to_lower(TagValue), + LowerValue = hb_util:to_lower(Value), + case LowerTagValue of + LowerValue -> true; + _ -> false + end + end; +has_tag_pair(_, _) -> + false. %% @doc Parse the range from the request. parse_range(Request, Opts) -> maybe @@ -78,17 +407,6 @@ latest_height(Opts) -> {error, Reason} -> {error, Reason} end. -%% @doc Check if a transaction ID is indexed in the arweave index store. -is_tx_indexed(TXID, Opts) -> - case hb_store_arweave:store_from_opts(Opts) of - no_store -> false; - #{ <<"index-store">> := Store } -> - case hb_store:read(Store, hb_store_arweave_offset:path(TXID), Opts) of - {ok, _} -> true; - {error, not_found} -> false - end - end. - %% @doc List indexed blocks and transactions in the given range. %% Returns JSON with block heights as keys, each containing indexed and not-indexed lists. list_index(From, undefined, Opts) -> @@ -117,19 +435,26 @@ list_index_blocks(Current, To, Opts, Acc) -> [] -> list_index_blocks(Current - 1, To, Opts, Acc); _ -> - {IndexedTXs, NotIndexedTXs} = classify_txs(TXIDs, Opts), + {IndexedTXs, _NotIndexedTXs} = classify_txs(TXIDs, Opts), case IndexedTXs of [] -> % Do not include blocks with no locally indexed TXs. list_index_blocks(Current - 1, To, Opts, Acc); _ -> BlockKey = hb_util:bin(Current), - NewAcc = Acc#{ - BlockKey => #{ - <<"indexed">> => IndexedTXs, - <<"not-indexed">> => NotIndexedTXs - } - }, + BlockInfo = assemble_block_info( + Current, Block, Opts), + WithItems = case maps:get( + <<"depth">>, BlockInfo, undefined) + of + undefined -> BlockInfo; + _ -> + BlockInfo#{ + <<"items">> => + hb_store_arweave:read_block_item_counts( + Current, Opts)} + end, + NewAcc = Acc#{BlockKey => WithItems}, list_index_blocks(Current - 1, To, Opts, NewAcc) end end; @@ -137,6 +462,50 @@ list_index_blocks(Current, To, Opts, Acc) -> list_index_blocks(Current - 1, To, Opts, Acc) end. +%% @doc Build base block info with indexed/not-indexed TXs and optional depth. +assemble_block_info(Height, Block, Opts) -> + TXIDs = hb_maps:get(<<"txs">>, Block, [], Opts), + {IndexedTXs, NotIndexedTXs} = classify_txs(TXIDs, Opts), + Base = #{ + <<"indexed">> => IndexedTXs, + <<"not-indexed">> => NotIndexedTXs + }, + case hb_store_arweave:read_block_marker_depth(Height, Opts) of + undefined -> Base; + Depth -> Base#{<<"depth">> => Depth} + end. + +%% @doc mode=inventory: return per-depth item ID lists from the local index store. +%% Supports range queries. The inventory read itself is local-only (no network). +%% Note: range parsing may call latest_height/1 if from/to are omitted or negative. +inventory_index(From, undefined, Opts) -> + inventory_index(From, 0, Opts); +inventory_index(From, To, _Opts) when From < To -> + {ok, #{ + <<"content-type">> => <<"application/json">>, + <<"body">> => hb_json:encode(#{}) + }}; +inventory_index(From, To, Opts) -> + Result = inventory_local(From, To, Opts, #{}), + JSON = hb_json:encode(Result), + {ok, #{ + <<"content-type">> => <<"application/json">>, + <<"body">> => JSON + }}. + +inventory_local(Current, To, _Opts, Acc) when Current < To -> Acc; +inventory_local(Current, To, Opts, Acc) -> + case hb_store_arweave:read_block_marker_depth(Current, Opts) of + undefined -> + inventory_local(Current - 1, To, Opts, Acc); + Depth -> + ItemIDs = hb_store_arweave:read_block_item_ids(Current, Opts), + BlockKey = hb_util:bin(Current), + BlockInfo = #{<<"depth">> => Depth, <<"items">> => ItemIDs}, + inventory_local(Current - 1, To, Opts, + Acc#{BlockKey => BlockInfo}) + end. + fetch_block_header(Height, Opts) -> ?event(debug_copycat, {fetching_block, Height}), observe_event(<<"block_header">>, fun() -> @@ -154,7 +523,7 @@ fetch_block_header(Height, Opts) -> classify_txs(TXIDs, Opts) -> lists:foldl( fun(TXID, {IndexedAcc, NotIndexedAcc}) -> - case is_tx_indexed(TXID, Opts) of + case hb_store_arweave:is_tx_indexed(TXID, Opts) of true -> {[TXID | IndexedAcc], NotIndexedAcc}; false -> {IndexedAcc, [TXID | NotIndexedAcc]} end @@ -165,59 +534,151 @@ classify_txs(TXIDs, Opts) -> %% @doc Fetch blocks from an Arweave node while moving downward from `Current'. %% If `To' is provided, every block in [`To', `Current'] is processed. If `To' -%% is omitted, stop at the first block where any TX is already indexed. -fetch_blocks(Req, Current, To, _Opts) when is_integer(To), Current < To -> +%% is omitted, stop at the first block already indexed at the requested depth +%% (via block markers above cutover, or legacy per-TX check below cutover). +fetch_blocks(Current, To, TargetDepth, _Opts) + when is_integer(To), Current < To -> ?event(copycat_short, {arweave_block_indexing_completed, {reached_target, To}, - {initial_request, Req} + {target_depth, TargetDepth} } ), {ok, To}; -fetch_blocks(_Req, Current, undefined, _Opts) when Current < 0 -> +fetch_blocks(Current, undefined, _TargetDepth, _Opts) when Current < 0 -> {ok, 0}; -fetch_blocks(Req, Current, undefined, Opts) -> - BlockRes = fetch_block_header(Current, Opts), - case is_already_indexed(BlockRes, Opts) of - true -> +fetch_blocks(Current, undefined, TargetDepth, Opts) -> + BlockWorkers = block_workers(Opts), + fetch_blocks_open_ended(Current, TargetDepth, BlockWorkers, Opts); +fetch_blocks(Current, To, TargetDepth, Opts) -> + BlockWorkers = block_workers(Opts), + fetch_blocks_ranged(Current, To, TargetDepth, BlockWorkers, Opts). + +block_workers(Opts) -> + max(1, hb_opts:get(<<"arweave-block-workers">>, 3, Opts)). + +%% @doc Process a known range of blocks in parallel batches. +fetch_blocks_ranged(Current, To, TargetDepth, _Workers, _Opts) + when Current < To -> + ?event(copycat_short, + {arweave_block_indexing_completed, + {reached_target, To}, + {target_depth, TargetDepth} + } + ), + {ok, To}; +fetch_blocks_ranged(Current, To, TargetDepth, Workers, Opts) -> + BatchEnd = max(To, Current - Workers + 1), + Heights = lists:seq(Current, BatchEnd, -1), + hb_pmap:parallel_map( + Heights, + fun(H) -> + case hb_store_arweave:is_block_indexed(H, TargetDepth, Opts) of + true -> ok; + false -> + observe_event(<<"block_indexed">>, fun() -> + fetch_and_process_block(H, To, TargetDepth, Opts) + end) + end + end, + Workers + ), + fetch_blocks_ranged(BatchEnd - 1, To, TargetDepth, Workers, Opts). + +%% @doc Process blocks until an already-indexed block is found. +%% Fetches headers in parallel, stops at the first indexed block, +%% then processes the unindexed prefix in parallel. +fetch_blocks_open_ended(Current, _TargetDepth, _Workers, _Opts) + when Current < 0 -> + {ok, 0}; +fetch_blocks_open_ended(Current, TargetDepth, Workers, Opts) -> + BatchEnd = max(0, Current - Workers + 1), + Heights = lists:seq(Current, BatchEnd, -1), + HeaderResults = hb_pmap:parallel_map( + Heights, + fun(H) -> {H, fetch_block_header(H, Opts)} end, + Workers + ), + case find_indexed_prefix(HeaderResults, TargetDepth, Opts) of + {all_unindexed, ToProcess} -> + process_prefetched_blocks( + ToProcess, TargetDepth, Workers, Opts), + fetch_blocks_open_ended( + BatchEnd - 1, TargetDepth, Workers, Opts); + {stop_at, StopHeight, ToProcess} -> + process_prefetched_blocks( + ToProcess, TargetDepth, Workers, Opts), ?event(copycat_short, {arweave_block_indexing_completed, - {stop_at_indexed_block, Current}, - {initial_request, Req} + {stop_at_indexed_block, StopHeight} } ), - {ok, Current}; + {ok, StopHeight} + end. + +%% @doc Walk header results in order, return the unindexed prefix and +%% either the stop height or all_unindexed. +find_indexed_prefix(HeaderResults, TargetDepth, Opts) -> + find_indexed_prefix(HeaderResults, TargetDepth, Opts, []). + +find_indexed_prefix([], _TargetDepth, _Opts, Acc) -> + {all_unindexed, lists:reverse(Acc)}; +find_indexed_prefix([{H, BlockRes} | Rest], TargetDepth, Opts, Acc) -> + case is_already_indexed(BlockRes, TargetDepth, Opts) of + true -> + {stop_at, H, lists:reverse(Acc)}; false -> + find_indexed_prefix( + Rest, TargetDepth, Opts, [{H, BlockRes} | Acc]) + end. + +%% @doc Process a list of {Height, BlockRes} tuples in parallel. +process_prefetched_blocks(Blocks, TargetDepth, Workers, Opts) -> + hb_pmap:parallel_map( + Blocks, + fun({H, BlockRes}) -> observe_event(<<"block_indexed">>, fun() -> - process_block(BlockRes, Current, undefined, Opts) - end), - fetch_blocks(Req, Current - 1, undefined, Opts) - end; -fetch_blocks(Req, Current, To, Opts) -> - observe_event(<<"block_indexed">>, fun() -> - fetch_and_process_block(Current, To, Opts) - end), - fetch_blocks(Req, Current - 1, To, Opts). + process_block(BlockRes, H, undefined, TargetDepth, Opts) + end) + end, + Workers + ). -%% @doc Determine whether a fetched block is considered indexed. -%% A block is indexed when any TX from its `txs' list is in the index. -is_already_indexed({ok, Block}, Opts) -> - TXIDs = hb_maps:get(<<"txs">>, Block, [], Opts), - lists:any(fun(TXID) -> is_tx_indexed(TXID, Opts) end, TXIDs); -is_already_indexed({error, _}, _Opts) -> +%% @doc Determine whether a fetched block is considered indexed at the +%% requested depth. Checks block markers first. For blocks at or above +%% the cutover height, the marker is authoritative. For blocks below +%% the cutover, falls back to legacy per-TX check. +is_already_indexed({ok, Block}, TargetDepth, Opts) -> + Height = hb_maps:get(<<"height">>, Block, undefined, Opts), + case hb_store_arweave:is_block_indexed(Height, TargetDepth, Opts) of + true -> + true; + false -> + case hb_store_arweave:is_post_cutover(Height, Opts) of + true -> + false; + false -> + TXIDs = hb_maps:get(<<"txs">>, Block, [], Opts), + lists:any( + fun(TXID) -> hb_store_arweave:is_tx_indexed(TXID, Opts) end, + TXIDs + ) + end + end; +is_already_indexed({error, _}, _TargetDepth, _Opts) -> false. -fetch_and_process_block(Current, To, Opts) -> +fetch_and_process_block(Current, To, TargetDepth, Opts) -> BlockRes = fetch_block_header(Current, Opts), - process_block(BlockRes, Current, To, Opts). + process_block(BlockRes, Current, To, TargetDepth, Opts). %% @doc Process a block. -process_block(BlockRes, Current, To, Opts) -> +process_block(BlockRes, Current, To, TargetDepth, Opts) -> case BlockRes of {ok, Block} -> ?event(debug_copycat, {{processing_block, Current}, {indep_hash, hb_maps:get(<<"indep_hash">>, Block, <<>>)}}), - case maybe_index_ids(Block, Opts) of + case maybe_index_block(Block, TargetDepth, Opts) of {block_skipped, Results} -> TotalTXs = maps:get(total_txs, Results, 0), ?event( @@ -233,17 +694,49 @@ process_block(BlockRes, Current, To, Opts) -> TotalTXs = maps:get(total_txs, Results, 0), BundleTXs = maps:get(bundle_count, Results, 0), SkippedTXs = maps:get(skipped_count, Results, 0), - ?event( - copycat_short, - {arweave_block_indexed, - {height, Current}, - {items_indexed, ItemsIndexed}, - {total_txs, TotalTXs}, - {bundle_txs, BundleTXs}, - {skipped_txs, SkippedTXs}, - {target, To} - } - ) + AchievedDepth = maps:get( + achieved_depth, Results, + max(2, TargetDepth)), + ItemIDs = maps:get(item_ids, Results, #{}), + maybe + ok ?= hb_store_arweave:write_block_item_ids( + Current, AchievedDepth, ItemIDs, Opts), + ok ?= hb_store_arweave:mark_block_indexed( + Current, AchievedDepth, Opts), + hb_store_arweave:ensure_cutover_height(Current, Opts), + ?event( + copycat_short, + {arweave_block_indexed, + {height, Current}, + {items_indexed, ItemsIndexed}, + {total_txs, TotalTXs}, + {bundle_txs, BundleTXs}, + {skipped_txs, SkippedTXs}, + {achieved_depth, AchievedDepth}, + {target, To} + } + ) + else + {error, item_ids_write_failed} -> + ?event( + copycat_short, + {arweave_block_metadata_failed, + {height, Current}, + {target, To} + } + ), + throw(item_ids_write_failed); + Error -> + ?event( + copycat_short, + {arweave_block_marker_failed, + {height, Current}, + {target, To}, + {error, Error} + } + ), + throw({writing_to_index_store, Error}) + end end; {error, _} = Error -> ?event( @@ -256,9 +749,9 @@ process_block(BlockRes, Current, To, Opts) -> end. %% @doc Index the IDs of all transactions in the block if configured to do so. -maybe_index_ids(Block, Opts) -> +maybe_index_block(Block, TargetDepth, Opts) -> TotalTXs = length(hb_maps:get(<<"txs">>, Block, [], Opts)), - case hb_opts:get(arweave_index_ids, true, Opts) of + case hb_opts:get(<<"arweave-index-ids">>, true, Opts) of false -> {block_skipped, #{ items_count => 0, @@ -281,14 +774,19 @@ maybe_index_ids(Block, Opts) -> }}; {ok, TXs} -> Height = hb_maps:get(<<"height">>, Block, 0, Opts), + L1IDs = [TX#tx.id || TX <- TXs], TXsWithData = ar_block:generate_size_tagged_list_from_txs(TXs, Height), - % Filter out padding entries before processing ValidTXs = lists:filter( fun({{padding, _}, _}) -> false; (_) -> true end, TXsWithData ), - TXResults = process_txs(ValidTXs, BlockStartOffset, Opts), - {block_cached, TXResults#{total_txs => TotalTXs}} + TXResults = process_block_txs( + ValidTXs, BlockStartOffset, TargetDepth, Height, Opts), + ExistingIDs = maps:get(item_ids, TXResults, #{}), + {block_cached, TXResults#{ + total_txs => TotalTXs, + item_ids => ExistingIDs#{1 => L1IDs} + }} end end. @@ -302,10 +800,11 @@ parallel_map(Items, Fun, Opts) -> %% @doc Process a single transaction and return its contribution to the counters. %% Returns a map with keys: items_count, bundle_count, skipped_count -process_tx({{padding, _PaddingRoot}, _EndOffset}, _BlockStartOffset, _Opts) -> - #{items_count => 0, bundle_count => 0, skipped_count => 0}; -process_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, Opts) -> - IndexStore = hb_store_arweave:store_from_opts(Opts), +process_block_tx({{padding, _PaddingRoot}, _EndOffset}, _BlockStartOffset, TargetDepth, _BlockHeight, _Opts) -> + #{items_count => 0, bundle_count => 0, skipped_count => 0, + achieved_depth => max(2, TargetDepth)}; +process_block_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, TargetDepth, BlockHeight, Opts) -> + ArweaveStore = hb_store_arweave:store_from_opts(Opts), TXID = hb_util:encode(TX#tx.id), TXEndOffset = BlockStartOffset + EndOffset, TXStartOffset = TXEndOffset - TX#tx.data_size, @@ -314,17 +813,42 @@ process_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, Opts) -> {offset, TXStartOffset}, {size, TX#tx.data_size} }), - observe_event(<<"item_indexed">>, fun() -> + ok = observe_event(<<"item_indexed">>, fun() -> hb_store_arweave:write_offset( - IndexStore, + ArweaveStore, TXID, <<"tx@1.0">>, TXStartOffset, - TX#tx.data_size + TX#tx.data_size, + Opts ) end), - case is_bundle_tx(TX, Opts) of - false -> #{items_count => 0, bundle_count => 0, skipped_count => 0}; + #{ <<"index-store">> := IndexStore } = ArweaveStore, + ok = hb_store_arweave:write_parent(TX#tx.id, BlockHeight, block, IndexStore, Opts), + try is_bundle_tx(TX, Opts) of + false -> + #{items_count => 0, bundle_count => 0, skipped_count => 0, + achieved_depth => max(2, TargetDepth)}; + true when TargetDepth > 2 -> + %% Retry to preserve bundle count + try + L1Result = process_l1_tx_direct( + TXStartOffset, TX#tx.data_size, + TargetDepth - 1, ArweaveStore, TXID, TX#tx.id, Opts), + L1Result#{ + achieved_depth => + max(2, maps:get(achieved_depth, L1Result, 0)) + } + catch + _:Reason:Stacktrace -> + ?event(copycat_short, + {arweave_bundle_skipped, + {tx, {explicit, TX#tx.id}}, + {reason, Reason}, + {stacktrace, Stacktrace}}), + #{items_count => 0, bundle_count => 1, + skipped_count => 1, achieved_depth => 0} + end; true -> % Lightweight processing of block transactions to depth 2. We % can avoid loading the full L1 TX data into memory, and instead @@ -340,31 +864,34 @@ process_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, Opts) -> ), case BundleRes of {ok, HeaderSize, BundleIndex} -> - % Batch event tracking: measure total time and count for all write_offset calls {TotalTime, {_, ItemsCount}} = timer:tc(fun() -> lists:foldl( fun({ItemID, Size}, {ItemStartOffset, ItemsCountAcc}) -> - hb_store_arweave:write_offset( - IndexStore, + ok = hb_store_arweave:write_offset( + ArweaveStore, hb_util:encode(ItemID), <<"ans104@1.0">>, ItemStartOffset, - Size + Size, + Opts ), + ok = hb_store_arweave:write_parent(ItemID, TX#tx.id, bundle, IndexStore, Opts), {ItemStartOffset + Size, ItemsCountAcc + 1} end, {TXStartOffset + HeaderSize, 0}, BundleIndex ) end), + L2IDs = [ItemID || {ItemID, _Size} <- BundleIndex], ?event(debug_copycat, {bundle_items_indexed, {tx_id, {string, TXID}}, {items_count, ItemsCount} }), - % Single event increment for the batch record_event_metrics(<<"item_indexed">>, ItemsCount, TotalTime), - #{items_count => ItemsCount, bundle_count => 1, skipped_count => 0}; + #{items_count => ItemsCount, bundle_count => 1, + skipped_count => 0, achieved_depth => 2, + item_ids => #{2 => L2IDs}}; {error, Reason} -> ?event( copycat_short, @@ -373,129 +900,602 @@ process_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, Opts) -> {reason, Reason} } ), - #{items_count => 0, bundle_count => 1, skipped_count => 1} + #{items_count => 0, bundle_count => 1, + skipped_count => 1, achieved_depth => 0} end + catch + _:Reason:Stacktrace -> + ?event(copycat_short, + {arweave_bundle_skipped, + {tx, {explicit, TX#tx.id}}, + {reason, Reason}, + {stacktrace, Stacktrace}}), + #{items_count => 0, bundle_count => 0, skipped_count => 1, achieved_depth => 0} end. +download_bundle_header(EndOffset, Size, Opts) -> + observe_event(<<"bundle_header">>, fun() -> + dev_arweave:bundle_header(EndOffset - Size, Opts) + end). + %% @doc Process transactions: spawn workers and manage the worker pool. %% This function processes transactions in parallel using parallel_map. %% When arweave_index_workers <= 1, processes sequentially (one worker at a time). %% When arweave_index_workers > 1, processes in parallel with the specified concurrency limit. %% Returns a map with keys: items_count, bundle_count, skipped_count. -process_txs(ValidTXs, BlockStartOffset, Opts) -> +process_block_txs(ValidTXs, BlockStartOffset, TargetDepth, BlockHeight, Opts) -> Results = parallel_map( ValidTXs, - fun(TXWithData) -> process_tx(TXWithData, BlockStartOffset, Opts) end, + fun(TXWithData) -> process_block_tx( + TXWithData, BlockStartOffset, TargetDepth, BlockHeight, Opts) end, Opts ), - lists:foldl( + Folded = lists:foldl( fun(Result, Acc) -> #{ - items_count => maps:get(items_count, Result, 0) + maps:get(items_count, Acc, 0), - bundle_count => maps:get(bundle_count, Result, 0) + maps:get(bundle_count, Acc, 0), - skipped_count => maps:get(skipped_count, Result, 0) + maps:get(skipped_count, Acc, 0) + items_count => + maps:get(items_count, Result, 0) + + maps:get(items_count, Acc, 0), + bundle_count => + maps:get(bundle_count, Result, 0) + + maps:get(bundle_count, Acc, 0), + skipped_count => + maps:get(skipped_count, Result, 0) + + maps:get(skipped_count, Acc, 0), + achieved_depth => + min( + maps:get(achieved_depth, Result, ?DEPTH_SENTINEL), + maps:get(achieved_depth, Acc, ?DEPTH_SENTINEL) + ) } end, - #{items_count => 0, bundle_count => 0, skipped_count => 0}, + #{items_count => 0, bundle_count => 0, skipped_count => 0, + achieved_depth => ?DEPTH_SENTINEL}, Results - ). - -%% @doc Check whether a TX header indicates bundle content. -is_bundle_tx(TX, _Opts) -> - dev_arweave_common:type(TX) =/= binary. - -%% @doc Download and decode a bundle header from chunk data. -download_bundle_header(EndOffset, Size, Opts) -> - observe_event(<<"bundle_header">>, fun() -> - dev_arweave:bundle_header(EndOffset - Size, Size, Opts) - end). - -resolve_tx_headers(TXIDs, Opts) -> - Results = parallel_map( - TXIDs, - fun(TXID) -> resolve_tx_header(TXID, Opts) end, - Opts ), - case lists:any(fun(Res) -> Res =:= error end, Results) of - true -> error; - false -> - TXs = lists:foldr( - fun({ok, TX}, Acc) -> [TX | Acc] end, - [], - Results - ), - {ok, TXs} + MergedIDs = merge_all_item_ids( + [maps:get(item_ids, R, #{}) || R <- Results]), + Folded2 = Folded#{item_ids => MergedIDs}, + case maps:get(achieved_depth, Folded2) of + ?DEPTH_SENTINEL -> + Folded2#{achieved_depth => max(2, TargetDepth)}; + _ -> + Folded2 end. -resolve_tx_header(TXID, Opts) -> - try - ?event(debug_copycat, {fetching_tx, {explicit, TXID}}), - ResolveRes = observe_event(<<"tx_header">>, fun() -> - hb_ao:resolve( - << - ?ARWEAVE_DEVICE/binary, - "/tx&tx=", - TXID/binary, - "&exclude-data=true" - >>, - Opts - ) - end), - case ResolveRes of - {ok, StructuredTXHeader} -> - {ok, - hb_message:convert( - StructuredTXHeader, - <<"tx@1.0">>, - <<"structured@1.0">>, - Opts)}; - {error, ResolveError} -> - ?event( - copycat_short, - {arweave_tx_skipped, - {tx_id, {explicit, TXID}}, - {reason, ResolveError} - } - ), - error +%% @doc Process a single indexed L1 TX candidate after lightweight filter checks. +maybe_process_l1_tx(TXID, Filters, Depth, QueryL1Offset, Opts) -> + Skipped = #{items_count => 0, bundle_count => 0, skipped_count => 1, + achieved_depth => 0}, + NormalizedTXID = hb_util:native_id(TXID), + EncodedTXID = hb_util:encode(NormalizedTXID), + IndexStore = hb_store_arweave:store_from_opts(Opts), + ?event(copycat_short, + {indexing_l1_tx, {tx_id, {explicit, EncodedTXID}}, + {depth, Depth}, + {query_l1_offset, QueryL1Offset} + }), + maybe + {ok, + #{ + <<"codec-device">> := <<"tx@1.0">>, + <<"start-offset">> := StartOffset, + <<"length">> := Length + }} ?= + observe_copycat_l1_stage( + <<"l1_offset_lookup">>, + fun() -> + ensure_l1_tx_offset( + NormalizedTXID, + EncodedTXID, + IndexStore, + QueryL1Offset, + Opts + ) + end + ), + {ok, TX} ?= resolve_tx_header(EncodedTXID, Opts), + pass ?= l1_filter_reason(TX, Filters), + bundle ?= + case is_bundle_tx(TX, Opts) of + true -> bundle; + false -> not_bundle + end, + within_effective_cap ?= + case Length =< effective_memory_cap(Opts) of + true -> within_effective_cap; + false -> effective_cap_exceeded + end, + ok ?= hb_copycat_budget:lease(Length), + try process_l1_tx( + StartOffset, + Length, + Depth, + IndexStore, + EncodedTXID, + hb_util:decode(EncodedTXID), + Opts + ) + after + hb_copycat_budget:release(Length) end - catch - Class:Reason:_ -> + else + {error, Reason} -> ?event( copycat_short, {arweave_tx_skipped, - {tx_id, {explicit, TXID}}, - {class, Class}, + {tx_id, {explicit, EncodedTXID}}, {reason, Reason} } ), - error + Skipped; + error -> + % event already logged in resolve_tx_header + Skipped; + not_bundle -> + ?event( + copycat_short, + {arweave_tx_skipped, + {tx_id, {explicit, EncodedTXID}}, + {reason, not_bundle} + } + ), + Skipped; + effective_cap_exceeded -> + ?event( + copycat_short, + {arweave_bundle_skipped, + {tx_id, {explicit, EncodedTXID}}, + {reason, effective_cap_exceeded} + } + ), + #{ + items_count => 0, + bundle_count => 1, + skipped_count => 1, + achieved_depth => 0 + }; + FilterReason -> + ?event( + copycat_short, + {arweave_tx_skipped, + {tx_id, {explicit, EncodedTXID}}, + {reason, FilterReason} + } + ), + Skipped end. -%% @doc Record event metrics (count and duration) using hb_event:increment. -record_event_metrics(MetricName, Count, Duration) -> - hb_event:increment(<<"arweave_block_count">>, MetricName, #{}, Count), - hb_event:increment(<<"arweave_block_duration">>, MetricName, #{}, Duration). - -%% @doc Track an operation's execution time and count using hb_event:increment. -%% Always tracks both count and duration, regardless of success/failure. -observe_event(MetricName, Fun) -> - {Time, Result} = timer:tc(Fun), - record_event_metrics(MetricName, 1, Time), - Result. - -%%% Tests +%% @doc Fast path for depth>2 block indexing. Skips offset lookup and +%% header re-fetch since the caller already has both. +process_l1_tx_direct(StartOffset, Length, Depth, IndexStore, EncodedTXID, ParentID, Opts) -> + EffectiveCap = effective_memory_cap(Opts), + case Length > EffectiveCap of + true -> + ?event(copycat_short, + {arweave_bundle_skipped, + {tx_id, {explicit, EncodedTXID}}, + {reason, effective_cap_exceeded} + } + ), + #{items_count => 0, bundle_count => 1, + skipped_count => 1, achieved_depth => 0}; + false -> + ok = hb_copycat_budget:lease(Length), + try + process_l1_tx( + StartOffset, Length, Depth, + IndexStore, EncodedTXID, ParentID, Opts) + after + hb_copycat_budget:release(Length) + end + end. -index_ids_test_parallel() -> - %% Test block: https://viewblock.io/arweave/block/1827942 - %% Note: this block includes a data item with an Ethereum signature. This - %% signature type is not yet (as of Jan 2026) supported by ar_bundles.erl, - %% however we should still be able to index it (we just can't deserialize - %% it). - {_TestStore, StoreOpts, Opts} = setup_index_opts(), - {ok, 1827942} = - hb_ao:resolve( - <<"~copycat@1.0/arweave&from=1827942&to=1827942">>, +%% @doc Load the L1 TX data into memory and index it. +process_l1_tx( + StartOffset, Length, Depth, IndexStore, EncodedTXID, ParentID, Opts) -> + case observe_copycat_l1_stage( + <<"l1_read_chunks">>, + fun() -> hb_store_arweave:read_chunks(StartOffset, Length, Opts) end + ) of + {ok, BundleData} -> + {TotalTime, IndexRes} = timer:tc( + fun() -> + observe_copycat_l1_stage( + <<"l1_full_bundle_index">>, + fun() -> + index_full_bundle_bytes( + BundleData, + StartOffset, + Depth, + IndexStore, + ParentID, + Opts + ) + end + ) + end + ), + case IndexRes of + {ok, ItemsCount, AchievedDepth, BundleIDs} -> + record_event_metrics( + <<"item_indexed">>, + ItemsCount, + TotalTime + ), + #{ + items_count => ItemsCount, + bundle_count => 1, + skipped_count => 0, + achieved_depth => 1 + AchievedDepth, + item_ids => shift_item_ids(BundleIDs, 1) + }; + {error, Reason} -> + ?event( + copycat_short, + {arweave_bundle_skipped, + {tx_id, {explicit, EncodedTXID}}, + {reason, Reason} + } + ), + #{ + items_count => 0, + bundle_count => 1, + skipped_count => 1, + achieved_depth => 0 + } + end; + {error, Reason} -> + ?event( + copycat_short, + {arweave_bundle_skipped, + {tx_id, {explicit, EncodedTXID}}, + {reason, Reason} + } + ), + #{ + items_count => 0, + bundle_count => 1, + skipped_count => 1, + achieved_depth => 0 + }; + not_found -> + ?event( + copycat_short, + {arweave_bundle_skipped, + {tx_id, {explicit, EncodedTXID}}, + {reason, not_found} + } + ), + #{ + items_count => 0, + bundle_count => 1, + skipped_count => 1, + achieved_depth => 0 + } + end. +%% @doc Ensure the root L1 TX offset exists locally before `id=...` indexing. +%% if the offset is missing and `query_l1_offset` is enabled, fetches the TX +%% offset metadata from Arweave, writes it to the local offset store, and +%% retries the local lookup. +ensure_l1_tx_offset(_TXID, _EncodedTXID, IndexStore, _LoadL1Offset, _Opts) + when is_map(IndexStore) =:= false -> + {error, missing_offset}; +ensure_l1_tx_offset(TXID, EncodedTXID, IndexStore, QueryL1Offset, Opts) -> + case hb_store_arweave:read_offset(IndexStore, TXID) of + {ok, _} = OffsetRes -> + OffsetRes; + not_found when QueryL1Offset -> + ?event( + copycat_short, + {arweave_tx_querying_offset, + {tx_id, {explicit, EncodedTXID}}, + {source, network} + } + ), + case query_l1_tx_offset(EncodedTXID, IndexStore, Opts) of + ok -> + case hb_store_arweave:read_offset(IndexStore, TXID) of + {ok, _} = OffsetRes -> + OffsetRes; + not_found -> + {error, missing_offset} + end; + {error, Reason} -> + {error, Reason} + end; + not_found -> + {error, missing_offset} + end. + +query_l1_tx_offset(TXID, IndexStore, Opts) -> + case observe_copycat_l1_stage( + <<"l1_offset_query_http">>, + fun() -> + hb_http:request( + #{ + <<"path">> => <<"/arweave/tx/", TXID/binary, "/offset">>, + <<"method">> => <<"GET">> + }, + Opts + ) + end + ) of + {ok, #{ <<"body">> := OffsetBody }} -> + OffsetMsg = hb_json:decode(OffsetBody), + EndOffset = hb_util:int(maps:get(<<"offset">>, OffsetMsg)), + Size = hb_util:int(maps:get(<<"size">>, OffsetMsg)), + StartOffset = EndOffset - Size, + ok = observe_copycat_l1_stage( + <<"l1_offset_query_store_write">>, + fun() -> + hb_store_arweave:write_offset( + IndexStore, + TXID, + <<"tx@1.0">>, + StartOffset, + Size, + Opts + ) + end + ), + ok; + {error, Reason} -> + {error, Reason}; + not_found -> + {error, not_found} + end. + +index_full_bundle_bytes(_BundleData, _BundleStartOffset, Depth, _Store, _ParentID, _Opts) + when Depth =< 0 -> + {ok, 0, 0, #{}}; +index_full_bundle_bytes(BundleData, BundleStartOffset, Depth, Store, ParentID, Opts) -> + case ar_bundles:decode_bundle_header(BundleData) of + invalid_bundle_header -> + {error, invalid_bundle_header}; + {ItemsBin, BundleIndex} -> + HeaderSize = byte_size(BundleData) - byte_size(ItemsBin), + index_full_bundle_items( + BundleIndex, + ItemsBin, + BundleStartOffset + HeaderSize, + Depth, + Store, + ParentID, + Opts, + 0, + ?DEPTH_SENTINEL, + [], + #{} + ) + end. + +%% @doc Index bundle children from decoded bundle bytes and recurse descendants in-memory. +%% Returns {ok, Count, MinAchievedDepth, ItemIDs} or {error, Reason}. +%% ItemIDs is a map of relative-depth => list of raw 32-byte IDs. +index_full_bundle_items( + [], _ItemsBin, _ItemStartOffset, Depth, _Store, _ParentID, _Opts, + Count, MinDepth, ThisLevelIDs, DescIDs) -> + FinalDepth = case MinDepth of + ?DEPTH_SENTINEL -> Depth; + _ -> 1 + MinDepth + end, + AllIDs = DescIDs#{1 => lists:reverse(ThisLevelIDs)}, + {ok, Count, FinalDepth, AllIDs}; +index_full_bundle_items( + [{ItemID, Size} | Rest], + ItemsBin, + ItemStartOffset, + Depth, + #{ <<"index-store">> := IndexStore } = Store, + ParentID, + Opts, + Count, + MinDepth, + ThisLevelIDs, + DescIDs +) when byte_size(ItemsBin) >= Size -> + ItemBinary = binary:part(ItemsBin, 0, Size), + EncodedItemID = hb_util:encode(ItemID), + ParseResult = validate_and_flag_item_id( + ItemBinary, ItemID, EncodedItemID, IndexStore), + ok = hb_store_arweave:write_offset( + Store, + EncodedItemID, + <<"ans104@1.0">>, + ItemStartOffset, + Size, + Opts + ), + ok = hb_store_arweave:write_parent(ItemID, ParentID, bundle, IndexStore, Opts), + {DescendantCount, ItemAchievedDepth, ChildIDs} = + case {Depth > 1, ParseResult} of + {true, {ok, HeaderSize, ParsedItem}} -> + index_full_bundle_descendants_parsed( + ParsedItem, HeaderSize, + ItemStartOffset, Depth - 1, Store, ItemID, Opts); + _ -> + {0, Depth - 1, #{}} + end, + ShiftedChildIDs = shift_item_ids(ChildIDs, 1), + index_full_bundle_items( + Rest, + binary:part(ItemsBin, Size, byte_size(ItemsBin) - Size), + ItemStartOffset + Size, + Depth, + Store, + ParentID, + Opts, + Count + 1 + DescendantCount, + min(MinDepth, ItemAchievedDepth), + [ItemID | ThisLevelIDs], + merge_item_ids(DescIDs, ShiftedChildIDs) + ); +index_full_bundle_items( + _BundleIndex, _ItemsBin, _ItemStartOffset, _Depth, + _Store, _ParentID, _Opts, _Count, _MinDepth, _ThisLevelIDs, _DescIDs) -> + {error, invalid_bundle_header}. + +%% @doc Recurse into a nested data item using an already-parsed header. +%% Returns {Count, AchievedDepth, ItemIDs}. +index_full_bundle_descendants_parsed( + _ParsedItem, _HeaderSize, _ItemStartOffset, Depth, _Store, _ParentID, _Opts) + when Depth =< 0 -> + {0, 0, #{}}; +index_full_bundle_descendants_parsed( + ParsedItem, HeaderSize, ItemStartOffset, Depth, Store, ParentID, Opts) -> + case is_bundle_tx(ParsedItem, Opts) of + true -> + case index_full_bundle_bytes( + ParsedItem#tx.data, + ItemStartOffset + HeaderSize, + Depth, + Store, + ParentID, + Opts + ) of + {ok, Count, ChildDepth, ChildIDs} -> + {Count, ChildDepth, ChildIDs}; + _ -> + {0, 0, #{}} + end; + false -> + {0, Depth, #{}} + end. + +%% @doc Validate an item ID by hashing the signature from the deserialized +%% header. Returns {ok, HeaderSize, ParsedItem} on successful parse, or +%% error if deserialization fails. Mismatch flags are written but don't +%% prevent the item from being indexed. +validate_and_flag_item_id(ItemBinary, DeclaredID, EncodedDeclaredID, IndexStore) -> + try ar_bundles:deserialize_header(ItemBinary) of + {ok, HeaderSize, ParsedItem} -> + ComputedID = crypto:hash(sha256, ParsedItem#tx.signature), + case ComputedID =:= DeclaredID of + true -> + ok; + false -> + ok = hb_store:write( + IndexStore, + #{hb_store_arweave_offset:mismatch_path(DeclaredID) => ComputedID}, + #{} + ), + ?event(copycat_short, + {item_id_mismatch, + {declared_id, {explicit, EncodedDeclaredID}}, + {computed_id, + {explicit, hb_util:encode(ComputedID)}} + } + ) + end, + {ok, HeaderSize, ParsedItem}; + _ -> + error + catch + _:_ -> + error + end. + +%% @doc Check whether a TX header indicates bundle content. +%% NOTE: This function can throw if transaction tags aren't properly formated +is_bundle_tx(TX, _Opts) -> + dev_arweave_common:type(TX) =/= binary. + +resolve_tx_headers(TXIDs, Opts) -> + Results = parallel_map( + TXIDs, + fun(TXID) -> resolve_tx_header(TXID, Opts) end, + Opts + ), + case lists:any(fun(Res) -> Res =:= error end, Results) of + true -> error; + false -> + TXs = lists:foldr( + fun({ok, TX}, Acc) -> [TX | Acc] end, + [], + Results + ), + {ok, TXs} + end. + +resolve_tx_header(TXID, Opts) -> + try + ?event(debug_copycat, {fetching_tx, {explicit, TXID}}), + ResolveRes = observe_event(<<"tx_header">>, fun() -> + hb_ao:resolve( + << + ?ARWEAVE_DEVICE/binary, + "/tx&tx=", + TXID/binary, + "&exclude-data=true" + >>, + Opts + ) + end), + case ResolveRes of + {ok, StructuredTXHeader} -> + {ok, + hb_message:convert( + StructuredTXHeader, + <<"tx@1.0">>, + <<"structured@1.0">>, + Opts)}; + {error, ResolveError} -> + ?event( + copycat_short, + {arweave_tx_skipped, + {tx_id, {explicit, TXID}}, + {reason, ResolveError} + } + ), + error + end + catch + Class:Reason:_ -> + ?event( + copycat_short, + {arweave_tx_skipped, + {tx_id, {explicit, TXID}}, + {class, Class}, + {reason, Reason} + } + ), + error + end. + +%% @doc Record event metrics (count and duration) using hb_event:increment. +record_event_metrics(MetricName, Count, Duration) -> + hb_event:increment(arweave_block_count, MetricName, #{}, Count), + hb_event:increment(arweave_block_duration, MetricName, #{}, Duration). + +record_copycat_l1_metrics(MetricName, Count, Duration) -> + hb_event:increment(copycat_l1_count, MetricName, #{}, Count), + hb_event:increment(copycat_l1_duration, MetricName, #{}, Duration). + +%% @doc Track an operation's execution time and count using hb_event:increment. +%% Always tracks both count and duration, regardless of success/failure. +observe_event(MetricName, Fun) -> + {Time, Result} = timer:tc(Fun), + record_event_metrics(MetricName, 1, Time), + Result. + +observe_copycat_l1_stage(MetricName, Fun) -> + {Time, Result} = timer:tc(Fun), + record_copycat_l1_metrics(MetricName, 1, Time), + Result. + +%%% Tests + +index_ids_test_parallel() -> + %% Test block: https://viewblock.io/arweave/block/1827942 + %% Note: this block includes a data item with an Ethereum signature. This + %% signature type is not yet (as of Jan 2026) supported by ar_bundles.erl, + %% however we should still be able to index it (we just can't deserialize + %% it). + {_TestStore, StoreOpts, Opts} = setup_index_opts(), + {ok, 1827942} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=1827942&to=1827942&depth=2">>, Opts ), ?assertMatch( @@ -554,7 +1554,23 @@ index_ids_test_parallel() -> ], Opts ), - ok. + % L3 item not read when doing L1 depth=1 + assert_item_not_read(<<"8aJrRWtHcJvJ61qsH6agGkemzrtLw3W22xFrpCGAnTM">>, Opts), + ok. + +block_depth_3_test() -> + %% Test block: https://viewblock.io/arweave/block/1827942 + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + {ok, 1827942} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=1827942&to=1827942&depth=3">>, + Opts + ), + % L3 item read when doing depth=3 + assert_item_read( + <<"8aJrRWtHcJvJ61qsH6agGkemzrtLw3W22xFrpCGAnTM">>, + Opts), + ok. %% @doc Test a bundle header that fits in a single chunk. small_bundle_header_test_parallel() -> @@ -616,25 +1632,27 @@ invalid_bundle_header_test_parallel() -> download_bundle_header(EndOffset, Size, Opts)), ok. -invalid_bundle_test_parallel() -> - {_TestStore, _StoreOpts, Opts} = setup_index_opts(), - Block = 1307606, - {ok, Block} = - hb_ao:resolve( - <<"~copycat@1.0/arweave&from=", (hb_util:bin(Block))/binary, "&to=", (hb_util:bin(Block))/binary>>, +invalid_bundle_test_parallel_() -> + {timeout, 60, fun() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + Block = 1307606, + {ok, Block} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", (hb_util:bin(Block))/binary, "&to=", (hb_util:bin(Block))/binary>>, + Opts + ), + assert_bundle_read( + <<"8S12ZqO6-_icGkeuH8mFq6x9q7OIoXOqFRGH5k-wshg">>, + [ + {<<"gintz-t6q_kdeP_IBQVGnp9fgFzs-pPGGehXW-V7ZRk">>, <<"1">>} + ], Opts ), - assert_bundle_read( - <<"8S12ZqO6-_icGkeuH8mFq6x9q7OIoXOqFRGH5k-wshg">>, - [ - {<<"gintz-t6q_kdeP_IBQVGnp9fgFzs-pPGGehXW-V7ZRk">>, <<"1">>} - ], - Opts - ), - % L1 TX with bundle tags, but data is not a valid bundle. The L1 TX - % should still be indexed. - assert_item_read(<<"cGNURX2IUt98VKVIeXSfYe6eulNwPEqijaQfvatzd_o">>, Opts), - ok. + % L1 TX with bundle tags, but data is not a valid bundle. The L1 TX + % should still be indexed. + assert_item_read(<<"cGNURX2IUt98VKVIeXSfYe6eulNwPEqijaQfvatzd_o">>, Opts), + ok + end}. block_with_large_integer_test_parallel() -> {_TestStore, _StoreOpts, Opts} = setup_index_opts(), @@ -868,6 +1886,9 @@ auto_stop_on_indexed_block_test_parallel() -> ?assert(has_any_indexed_tx(Higher1, Opts)), ?assert(has_any_indexed_tx(IndexedBlock, Opts)), ?assertNot(has_any_indexed_tx(IndexedBlock-1, Opts)), + ?assert(hb_store_arweave:is_block_indexed(IndexedBlock, 2, Opts)), + ?assert(hb_store_arweave:is_block_indexed(Higher1, 2, Opts)), + ?assert(hb_store_arweave:is_block_indexed(Higher2, 2, Opts)), ok. explicit_to_reindexes_all_test_parallel() -> @@ -922,7 +1943,7 @@ auto_stop_partial_index_test_parallel() -> TXIDs = hb_maps:get(<<"txs">>, BlockData, [], Opts), ?assert(length(TXIDs) > 0), [OneTXID | _] = TXIDs, - hb_store_arweave:write_offset(StoreOpts, OneTXID, <<"tx@1.0">>, 0, 0), + ok = hb_store_arweave:write_offset(StoreOpts, OneTXID, <<"tx@1.0">>, 0, 0, Opts), {ok, Block} = hb_ao:resolve( << @@ -935,6 +1956,8 @@ auto_stop_partial_index_test_parallel() -> ?assert(has_any_indexed_tx(HigherBlock, Opts)), ?assert(has_any_indexed_tx(Block, Opts)), ?assertNot(has_any_indexed_tx(Block-1, Opts)), + ?assert(hb_store_arweave:is_block_indexed(HigherBlock, 2, Opts)), + ?assertNot(hb_store_arweave:is_block_indexed(Block, 2, Opts)), ok. negative_parse_range_test_parallel() -> @@ -1060,76 +2083,420 @@ negative_from_index_test_parallel() -> ?assertNot(has_any_indexed_tx(NextBlock + 1, Opts)), ok. -setup_index_opts() -> - TestStore = hb_test_utils:test_store(), - StoreOpts = #{ <<"index-store">> => [TestStore] }, - Store = [ - TestStore, - #{ - <<"store-module">> => hb_store_fs, - <<"name">> => <<"cache-mainnet">> - }, - #{ - <<"store-module">> => hb_store_arweave, - <<"name">> => <<"cache-arweave">>, - <<"index-store">> => [TestStore], - <<"arweave-node">> => <<"https://arweave.net">> - }, - #{ - <<"store-module">> => hb_store_gateway, - <<"subindex">> => [ - #{ - <<"name">> => <<"Data-Protocol">>, - <<"value">> => <<"ao">> - } - ], - <<"local-store">> => [TestStore] - }, - #{ - <<"store-module">> => hb_store_gateway, - <<"local-store">> => [TestStore] - } - ], - Opts = #{ - <<"store">> => Store, - <<"arweave-index-ids">> => true, - <<"arweave-index-store">> => StoreOpts - }, - {TestStore, StoreOpts, Opts}. - -assert_bundle_read(BundleID, ExpectedItems, Opts) -> - ReadItems = - lists:map( - fun({ItemID, _Index}) -> - assert_item_read(ItemID, Opts) - end, - ExpectedItems +owner_alias_roundtrip_test_parallel() -> + Opts1 = + add_owner_alias( + <<"FPjbN7EVwP3XwQJx8qnKqJDYa4TLJ0Y8gu4AaiUuW1c">>, + <<"turbo">>, + #{} ), - Bundle = assert_item_read(BundleID, Opts), - lists:foreach( - fun({{_ItemID, Index}, Item}) -> - QueriedItem = hb_ao:get(Index, Bundle, Opts), - ?assertEqual(hb_maps:without(?AO_CORE_KEYS, Item), hb_maps:without(?AO_CORE_KEYS, QueriedItem)) - end, - lists:zip(ExpectedItems, ReadItems) + Opts2 = + add_owner_alias( + <<"JNC6vBhU4sAK5T49VL4k79vNer0tZjM8fI1gpqUQK5g">>, + <<"redstone">>, + Opts1 + ), + ?assertEqual( + {ok, normalize_owner_id(<<"FPjbN7EVwP3XwQJx8qnKqJDYa4TLJ0Y8gu4AaiUuW1c">>)}, + resolve_owner_alias(<<"turbo">>, Opts2) + ), + ?assertEqual( + {ok, normalize_owner_id(<<"JNC6vBhU4sAK5T49VL4k79vNer0tZjM8fI1gpqUQK5g">>)}, + resolve_owner_alias(<<"redstone">>, Opts2) + ), + ?assertEqual( + {error, {owner_alias_not_found, <<"unknown">>}}, + resolve_owner_alias(<<"unknown">>, Opts2) ), ok. -assert_item_read(ItemID, Opts) -> - ?event(debug_test, {resolving, {explicit, ItemID}}), - Resolved = hb_ao:resolve(ItemID, Opts), - ?assertMatch({ok, _}, Resolved, ItemID), - {ok, Item} = Resolved, - ?event(debug_test, {item, Item}), - ?assert(hb_message:verify(Item, all, Opts)), - ?assertEqual(ItemID, hb_message:id(Item, signed)), - Item. - -has_any_indexed_tx(Height, Opts) -> - case fetch_block_header(Height, Opts) of - {ok, Block} -> +parse_tag_filter_test() -> + ?assertEqual( + {ok, #{name => <<"App-Name">>, value => <<"ao">>}}, + parse_tag_filter(<<"include-tag">>, #{<<"include-tag">> => <<"App-Name:ao">>}, #{}) + ), + ?assertEqual( + {ok, undefined}, + parse_tag_filter(<<"include-tag">>, #{}, #{}) + ), + ?assertEqual( + {error, invalid_tag_filter}, + parse_tag_filter(<<"include-tag">>, #{<<"include-tag">> => <<"App-Name">>}, #{}) + ), + ?assertEqual( + {error, invalid_tag_filter}, + parse_tag_filter(<<"include-tag">>, #{<<"include-tag">> => <<":ao">>}, #{}) + ), + ?assertEqual( + {error, invalid_tag_filter}, + parse_tag_filter(<<"include-tag">>, #{<<"include-tag">> => <<"App-Name:">>}, #{}) + ), + ok. + +l1_filter_reason_test() -> + Owner = <<"owner-1">>, + OtherOwner = <<"owner-2">>, + TX = #tx{ + owner = <<"non-default-owner">>, + owner_address = Owner, + tags = [ + {<<"App-Name">>, <<"ao">>}, + {<<"Bundler-App-Name">>, <<"Redstone">>} + ] + }, + IncludeTag = #{name => <<"App-Name">>, value => <<"ao">>}, + ExcludeTag = #{name => <<"Bundler-App-Name">>, value => <<"Redstone">>}, + ?assertEqual(pass, l1_filter_reason(TX, #{})), + ?assertEqual(pass, l1_filter_reason(TX, #{include_owner => Owner})), + ?assertEqual( + include_owner_mismatch, + l1_filter_reason(TX, #{include_owner => OtherOwner}) + ), + ?assertEqual( + exclude_owner_match, + l1_filter_reason(TX, #{exclude_owner => Owner}) + ), + ?assertEqual( + pass, + l1_filter_reason(TX, #{exclude_owner => OtherOwner}) + ), + ?assertEqual(pass, l1_filter_reason(TX, #{include_tag => IncludeTag})), + ?assertEqual( + include_tag_mismatch, + l1_filter_reason( + TX, + #{include_tag => #{name => <<"Content-Type">>, value => <<"text/plain">>}} + ) + ), + ?assertEqual( + exclude_tag_match, + l1_filter_reason(TX, #{exclude_tag => ExcludeTag}) + ), + ?assertEqual( + pass, + l1_filter_reason( + TX, + #{exclude_tag => #{name => <<"Content-Type">>, value => <<"text/plain">>}} + ) + ), + ?assertEqual( + exclude_tag_match, + l1_filter_reason( + TX, + #{include_tag => IncludeTag, exclude_tag => ExcludeTag} + ) + ), + ?assertEqual( + pass, + l1_filter_reason(TX, #{include_owner => [OtherOwner, Owner]}) + ), + ok. + +request_depth_clamping_test() -> + {_TestStore, _StoreOpts, Opts0} = setup_index_opts(), + ?assertEqual(6, request_depth(#{}, <<"full">>, Opts0)), + ?assertEqual( + 2, + request_depth(#{<<"depth">> => <<"2">>}, <<"full">>, Opts0) + ), + ?assertEqual( + 1, + request_depth(#{<<"depth">> => <<"0">>}, <<"full">>, Opts0) + ), + ?assertEqual( + 6, + request_depth(#{<<"depth">> => <<"999">>}, <<"full">>, Opts0) + ), + Opts1 = set_depth_recursion_cap(2, Opts0), + ?assertEqual(2, request_depth(#{}, <<"full">>, Opts1)), + % no recursion cap set, use default from hb_opts + ?assertEqual(6, request_depth(#{}, <<"full">>, #{})), + ok. + +id_depth_1_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + {Block, TXID} = {1827942, <<"T2pluNnaavL7-S2GkO_m3pASLUqMH_XQ9IiIhZKfySs">>}, + ok = index_l1_offsets(Block, Opts), + {ok, Result} = + hb_ao:resolve( + << + "~copycat@1.0/arweave&" + "id=", TXID/binary, "&" + "mode=write&" + "depth=1" + >>, + Opts + ), + ?assertEqual(26, maps:get(items_count, Result)), + ?assertEqual(1, maps:get(bundle_count, Result)), + ?assertEqual(0, maps:get(skipped_count, Result)), + assert_bundle_read( + <<"T2pluNnaavL7-S2GkO_m3pASLUqMH_XQ9IiIhZKfySs">>, + [ + {<<"54K1ehEIKZxGSusgZzgbGYaHfllwWQ09-S9-eRUJg5Y">>, <<"1">>}, + {<<"MgatoEjlO_YtdbxFi9Q7Hxbs0YQVcChddhSS7FsdeIg">>, <<"19">>}, + {<<"z-oKJfhMq5qoVFrljEfiBKgumaJmCWVxNJaavR5aPE8">>, <<"26">>} + ], + Opts + ), + % L3 item not read when doing L1 depth=1 + assert_item_not_read(<<"8aJrRWtHcJvJ61qsH6agGkemzrtLw3W22xFrpCGAnTM">>, Opts), + ok. + +id_depth_2_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + {Block, TXID} = {1827942, <<"T2pluNnaavL7-S2GkO_m3pASLUqMH_XQ9IiIhZKfySs">>}, + ok = index_l1_offsets(Block, Opts), + {ok, Result} = + hb_ao:resolve( + << + "~copycat@1.0/arweave&" + "id=", TXID/binary, "&" + "mode=write&" + "depth=2" + >>, + Opts + ), + ?assertEqual(52, maps:get(items_count, Result)), + ?assertEqual(1, maps:get(bundle_count, Result)), + ?assertEqual(0, maps:get(skipped_count, Result)), + assert_bundle_read( + <<"T2pluNnaavL7-S2GkO_m3pASLUqMH_XQ9IiIhZKfySs">>, + [ + {<<"54K1ehEIKZxGSusgZzgbGYaHfllwWQ09-S9-eRUJg5Y">>, <<"1">>}, + {<<"MgatoEjlO_YtdbxFi9Q7Hxbs0YQVcChddhSS7FsdeIg">>, <<"19">>}, + {<<"z-oKJfhMq5qoVFrljEfiBKgumaJmCWVxNJaavR5aPE8">>, <<"26">>} + ], + Opts + ), + % L2 bundle and L3 children should be read when doing L1 with depth=2 + assert_bundle_read( + <<"54K1ehEIKZxGSusgZzgbGYaHfllwWQ09-S9-eRUJg5Y">>, + [ + {<<"iS5R3iSKaCdcXG2nlKWsbdT1_uhQe54nMsgYK-ivEcE">>, <<"1">>}, + {<<"8aJrRWtHcJvJ61qsH6agGkemzrtLw3W22xFrpCGAnTM">>, <<"2">>} + ], + Opts + ), + ok. + +id_exclude_tag_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + {Block, TXID} = {1827942, <<"T2pluNnaavL7-S2GkO_m3pASLUqMH_XQ9IiIhZKfySs">>}, + ok = index_l1_offsets(Block, Opts), + {ok, Result} = + hb_ao:resolve( + << + "~copycat@1.0/arweave&" + "id=", TXID/binary, "&" + "mode=write&" + "exclude-tag=App-Name:ArDrive%20Turbo&" + "depth=2" + >>, + Opts + ), + ?assertEqual(0, maps:get(items_count, Result)), + ?assertEqual(0, maps:get(bundle_count, Result)), + ?assertEqual(1, maps:get(skipped_count, Result)), + assert_item_not_read(<<"iS5R3iSKaCdcXG2nlKWsbdT1_uhQe54nMsgYK-ivEcE">>, Opts), + ok. + +id_include_owner_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + {Block, TXID} = {1827942, <<"T2pluNnaavL7-S2GkO_m3pASLUqMH_XQ9IiIhZKfySs">>}, + ok = index_l1_offsets(Block, Opts), + {ok, Included} = + hb_ao:resolve( + << + "~copycat@1.0/arweave&" + "id=", TXID/binary, "&" + "mode=write&" + "include-owner=JNC6vBhjHY1EPwV3pEeNmrsgFMxH5d38_LHsZ7jful8" + >>, + Opts + ), + ?assertEqual(52, maps:get(items_count, Included)), + ?assertEqual(1, maps:get(bundle_count, Included)), + ?assertEqual(0, maps:get(skipped_count, Included)), + {ok, Skipped} = + hb_ao:resolve( + << + "~copycat@1.0/arweave&" + "id=", TXID/binary, "&" + "mode=write&" + "include-owner=FPjbN7EVwP3XwQJx8qnKqJDYa4TLJ0Y8gu4AaiUuW1c" + >>, + Opts + ), + ?assertEqual(0, maps:get(items_count, Skipped)), + ?assertEqual(0, maps:get(bundle_count, Skipped)), + ?assertEqual(1, maps:get(skipped_count, Skipped)). + +id_missing_offset_without_load_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + {_Block, TXID} = {1827942, <<"T2pluNnaavL7-S2GkO_m3pASLUqMH_XQ9IiIhZKfySs">>}, + {ok, Result} = + hb_ao:resolve( + << + "~copycat@1.0/arweave&" + "id=", TXID/binary, "&" + "mode=write" + >>, + Opts + ), + ?assertEqual(0, maps:get(items_count, Result)), + ?assertEqual(0, maps:get(bundle_count, Result)), + ?assertEqual(1, maps:get(skipped_count, Result)), + assert_item_not_read(<<"T2pluNnaavL7-S2GkO_m3pASLUqMH_XQ9IiIhZKfySs">>, Opts), + ok. + +id_missing_offset_with_load_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + {_Block, TXID} = {1827942, <<"T2pluNnaavL7-S2GkO_m3pASLUqMH_XQ9IiIhZKfySs">>}, + {ok, Result} = + hb_ao:resolve( + << + "~copycat@1.0/arweave&" + "id=", TXID/binary, "&" + "mode=write&" + "query-l1-offset=true&" + "depth=2" + >>, + Opts + ), + ?assertEqual(52, maps:get(items_count, Result)), + ?assertEqual(1, maps:get(bundle_count, Result)), + ?assertEqual(0, maps:get(skipped_count, Result)), + assert_bundle_read( + <<"T2pluNnaavL7-S2GkO_m3pASLUqMH_XQ9IiIhZKfySs">>, + [ + {<<"54K1ehEIKZxGSusgZzgbGYaHfllwWQ09-S9-eRUJg5Y">>, <<"1">>}, + {<<"MgatoEjlO_YtdbxFi9Q7Hxbs0YQVcChddhSS7FsdeIg">>, <<"19">>}, + {<<"z-oKJfhMq5qoVFrljEfiBKgumaJmCWVxNJaavR5aPE8">>, <<"26">>} + ], + Opts + ), + % L2 bundle and L3 children should be read when doing L1 with depth=2 + assert_bundle_read( + <<"54K1ehEIKZxGSusgZzgbGYaHfllwWQ09-S9-eRUJg5Y">>, + [ + {<<"iS5R3iSKaCdcXG2nlKWsbdT1_uhQe54nMsgYK-ivEcE">>, <<"1">>}, + {<<"8aJrRWtHcJvJ61qsH6agGkemzrtLw3W22xFrpCGAnTM">>, <<"2">>} + ], + Opts + ), + ok. + +parse_owner_filter_unknown_alias_test() -> + ?assertEqual( + {error, {owner_alias_not_found, <<"nonexistent">>}}, + parse_owner_filter( + #{<<"include-owner-alias">> => <<"nonexistent">>}, + #{} + ) + ), + ok. + +index_l1_offsets(Block, Opts) -> + BlockBin = hb_util:bin(Block), + {ok, Block} = + hb_ao:resolve( + << + "~copycat@1.0/arweave&" + "from=", BlockBin/binary, "&" + "to=", BlockBin/binary, "&" + "mode=write&" + "depth=1" + >>, + Opts + ), + ok. + +setup_index_opts() -> + TestStore = hb_test_utils:test_store(), + StoreOpts = #{ <<"index-store">> => [TestStore] }, + Store = [ + TestStore, + #{ + <<"store-module">> => hb_store_fs, + <<"name">> => <<"cache-mainnet">> + }, + #{ + <<"store-module">> => hb_store_arweave, + <<"name">> => <<"cache-arweave">>, + <<"index-store">> => [TestStore], + <<"arweave-node">> => <<"https://arweave.net">> + }, + #{ + <<"store-module">> => hb_store_gateway, + <<"subindex">> => [ + #{ + <<"name">> => <<"Data-Protocol">>, + <<"value">> => <<"ao">> + } + ], + <<"local-store">> => [TestStore] + }, + #{ + <<"store-module">> => hb_store_gateway, + <<"local-store">> => [TestStore] + } + ], + Opts = #{ + <<"store">> => Store, + <<"arweave-index-ids">> => true, + <<"arweave-index-store">> => StoreOpts + }, + {TestStore, StoreOpts, Opts}. + +assert_bundle_read(BundleID, ExpectedItems, Opts) -> + ReadItems = + lists:map( + fun({ItemID, _Index}) -> + assert_item_read(ItemID, Opts) + end, + ExpectedItems + ), + Bundle = assert_item_read(BundleID, Opts), + lists:foreach( + fun({{_ItemID, Index}, Item}) -> + QueriedItem = hb_ao:get(Index, Bundle, Opts), + ?assertEqual( + hb_maps:without(?AO_CORE_KEYS, Item), + hb_maps:without(?AO_CORE_KEYS, QueriedItem)) + end, + lists:zip(ExpectedItems, ReadItems) + ), + ok. + +assert_item_read(ItemID, Opts) -> + ?event(debug_test, {resolving, {explicit, ItemID}}), + ReadResult = hb_store_arweave:read( + hb_store_arweave:store_from_opts(Opts), + #{<<"read">> => ItemID}, + Opts + ), + ?assertMatch({ok, _}, ReadResult, ItemID), + {ok, Item} = ReadResult, + ?event(debug_test, {item, Item}), + ?assert(hb_message:verify(Item, all, Opts)), + ?assertEqual(ItemID, hb_message:id(Item, signed)), + Item. + +assert_item_not_read(ItemID, Opts) -> + ReadResult = hb_store_arweave:read( + hb_store_arweave:store_from_opts(Opts), + #{<<"read">> => ItemID}, + Opts + ), + ?assertEqual({error, not_found}, ReadResult), + ok. + +has_any_indexed_tx(Height, Opts) -> + case fetch_block_header(Height, Opts) of + {ok, Block} -> TXIDs = hb_maps:get(<<"txs">>, Block, [], Opts), - lists:any(fun(TXID) -> is_tx_indexed(TXID, Opts) end, TXIDs); + lists:any(fun(TXID) -> hb_store_arweave:is_tx_indexed(TXID, Opts) end, TXIDs); {error, _} -> false end. @@ -1158,3 +2525,483 @@ assert_indexed_range(From, To, _Opts) when From < To -> assert_indexed_range(From, To, Opts) -> ?assert(has_any_indexed_tx(From, Opts)), assert_indexed_range(From - 1, To, Opts). + +block_marker_default_depth_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + Block = 1827942, + {ok, Block} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&depth=2">>, + Opts + ), + ?assert(hb_store_arweave:is_block_indexed(Block, 2, Opts)), + ?assertNot(hb_store_arweave:is_block_indexed(Block, 3, Opts)), + ok. + +depth_1_normalizes_to_2_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + TX1 = #tx{ + format = 2, + id = crypto:strong_rand_bytes(32), + data_size = 100, + tags = [] + }, + TX2 = #tx{ + format = 2, + id = crypto:strong_rand_bytes(32), + data_size = 200, + tags = [] + }, + Tuples = [ + {{TX1, <<>>}, 100}, + {{TX2, <<>>}, 300} + ], + Result = process_block_txs(Tuples, 0, 1, 88888888, Opts), + ?assertEqual(2, maps:get(achieved_depth, Result)), + Height = 88888888, + hb_store_arweave:mark_block_indexed(Height, maps:get(achieved_depth, Result), Opts), + ?assert(hb_store_arweave:is_block_indexed(Height, 1, Opts)), + ?assert(hb_store_arweave:is_block_indexed(Height, 2, Opts)), + ?assertNot(hb_store_arweave:is_block_indexed(Height, 3, Opts)), + ok. + +block_marker_cutover_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + LowerBlock = 1827941, + UpperBlock = 1827942, + {ok, UpperBlock} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(UpperBlock))/binary, "&to=", + (hb_util:bin(UpperBlock))/binary>>, + Opts + ), + Cutover = hb_store_arweave:read_cutover_height(Opts), + ?assertNotEqual(undefined, Cutover), + ?assert(hb_store_arweave:is_block_indexed(UpperBlock, 2, Opts)), + ?assertNot(hb_store_arweave:is_block_indexed(LowerBlock, 2, Opts)), + ok. + +achieved_depth_block_depth_2_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + Block = 1827942, + {ok, Block} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&depth=2">>, + Opts + ), + ?assert(hb_store_arweave:is_block_indexed(Block, 2, Opts)), + ?assertNot(hb_store_arweave:is_block_indexed(Block, 3, Opts)), + ok. + + +achieved_depth_block_depth_3_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + Block = 1827942, + {ok, Block} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&depth=3">>, + Opts + ), + ?assert(hb_store_arweave:is_block_indexed(Block, 3, Opts)), + ok. + +invalid_bundle_bytes_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + StoreOpts = hb_store_arweave:store_from_opts(Opts), + ?assertEqual( + {error, invalid_bundle_header}, + index_full_bundle_bytes(<<"not a bundle">>, 0, 2, StoreOpts, <<0:256>>, Opts) + ), + ok. + +small_block_depth_3_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + Block = 1889322, + {ok, Block} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&depth=3">>, + Opts + ), + ?assert(hb_store_arweave:is_block_indexed(Block, 3, Opts)), + #{ <<"index-store">> := Store } = hb_store_arweave:store_from_opts(Opts), + {ok, L1Bin} = hb_store:read(Store, hb_store_arweave:block_items_path(Block, 1), Opts), + ?assert(length(hb_store_arweave:decode_item_ids(L1Bin)) > 0), + {ok, L2Bin} = hb_store:read(Store, hb_store_arweave:block_items_path(Block, 2), Opts), + ?assert(length(hb_store_arweave:decode_item_ids(L2Bin)) > 0), + {ok, L3Bin} = hb_store:read(Store, hb_store_arweave:block_items_path(Block, 3), Opts), + L3IDs = hb_store_arweave:decode_item_ids(L3Bin), + ?assertEqual(3, length(L3IDs)), + assert_item_read( + <<"npAzk_BomjWBQQr_xnmlhdxjyl97EJnNv_MAaXffs1s">>, + Opts), + ok. + +no_mismatch_flags_on_valid_bundles_test() -> + {_TestStore, StoreOpts, Opts} = setup_index_opts(), + Block = 1827942, + {ok, Block} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&depth=3">>, + Opts + ), + #{ <<"index-store">> := IndexStore } = StoreOpts, + ItemID = hb_util:native_id( + <<"54K1ehEIKZxGSusgZzgbGYaHfllwWQ09-S9-eRUJg5Y">>), + ?assertEqual( + {error, not_found}, + hb_store:read( + IndexStore, + hb_store_arweave_offset:mismatch_path(ItemID), + Opts + ) + ), + ok. + +mismatch_path_encoding_test() -> + ID = crypto:strong_rand_bytes(32), + Path = hb_store_arweave_offset:mismatch_path(ID), + ?assert(binary:match(Path, <<"mismatch/">>) =/= nomatch), + ok. + +exact_marker_depth_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + Block = 1827942, + {ok, Block} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&depth=3">>, + Opts + ), + #{ <<"index-store">> := Store } = + hb_store_arweave:store_from_opts(Opts), + {ok, StoredBin} = + hb_store:read(Store, hb_store_arweave:block_indexed_path(Block), Opts), + StoredDepth = binary_to_integer(StoredBin), + ?assertEqual(3, StoredDepth), + ok. + +fabricated_mismatch_test() -> + {_TestStore, StoreOpts, Opts} = setup_index_opts(), + {Priv, Pub} = ar_wallet:new(), + Target = crypto:strong_rand_bytes(32), + Anchor = crypto:strong_rand_bytes(32), + Item = ar_bundles:sign_item( + ar_bundles:new_item(Target, Anchor, [], <<"test data">>), + {Priv, Pub} + ), + ItemBinary = ar_bundles:serialize(Item), + RealID = crypto:hash(sha256, Item#tx.signature), + FakeID = crypto:strong_rand_bytes(32), + EncodedFakeID = hb_util:encode(FakeID), + #{ <<"index-store">> := IndexStore } = StoreOpts, + validate_and_flag_item_id(ItemBinary, FakeID, EncodedFakeID, IndexStore), + {ok, StoredActualID} = + hb_store:read( + IndexStore, + hb_store_arweave_offset:mismatch_path(FakeID), + Opts + ), + ?assertEqual(RealID, StoredActualID), + ?assertEqual( + {error, not_found}, + hb_store:read( + IndexStore, + hb_store_arweave_offset:mismatch_path(RealID), + Opts + ) + ), + ok. + +block_item_ids_depth_2_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + {ok, 1827942} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=1827942&to=1827942&depth=2">>, + Opts + ), + #{ <<"index-store">> := Store } = hb_store_arweave:store_from_opts(Opts), + {ok, L1Bin} = hb_store:read(Store, hb_store_arweave:block_items_path(1827942, 1), Opts), + L1IDs = hb_store_arweave:decode_item_ids(L1Bin), + ?assert(length(L1IDs) > 0), + {ok, L2Bin} = hb_store:read(Store, hb_store_arweave:block_items_path(1827942, 2), Opts), + L2IDs = hb_store_arweave:decode_item_ids(L2Bin), + ?assert(length(L2IDs) > 0), + L2Encoded = [hb_util:encode(ID) || ID <- L2IDs], + Pos54K = index_of(<<"54K1ehEIKZxGSusgZzgbGYaHfllwWQ09-S9-eRUJg5Y">>, L2Encoded), + PosOBK = index_of(<<"OBKr-7UrmjxFD-h-qP-XLuvCgtyuO_IDpBMgIytvusA">>, L2Encoded), + ?assert(is_integer(Pos54K)), + ?assert(is_integer(PosOBK)), + ?assert(Pos54K < PosOBK), + ?assertEqual({error, not_found}, hb_store:read(Store, hb_store_arweave:block_items_path(1827942, 3), Opts)), + ok. + +block_item_ids_depth_3_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + {ok, 1827942} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=1827942&to=1827942&depth=3">>, + Opts + ), + #{ <<"index-store">> := Store } = hb_store_arweave:store_from_opts(Opts), + {ok, L1Bin} = hb_store:read(Store, hb_store_arweave:block_items_path(1827942, 1), Opts), + L1Count = length(hb_store_arweave:decode_item_ids(L1Bin)), + ?assertEqual(5, L1Count), + {ok, L2Bin} = hb_store:read(Store, hb_store_arweave:block_items_path(1827942, 2), Opts), + L2Count = length(hb_store_arweave:decode_item_ids(L2Bin)), + ?assert(L2Count > 0), + {ok, L3Bin} = hb_store:read(Store, hb_store_arweave:block_items_path(1827942, 3), Opts), + L3Count = length(hb_store_arweave:decode_item_ids(L3Bin)), + ?assert(L3Count >= 1), + L3IDs = hb_store_arweave:decode_item_ids(L3Bin), + L3Encoded = [hb_util:encode(ID) || ID <- L3IDs], + ?assert(lists:member( + <<"8aJrRWtHcJvJ61qsH6agGkemzrtLw3W22xFrpCGAnTM">>, L3Encoded)), + ok. + +list_index_with_items_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + {ok, 1827942} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=1827942&to=1827942">>, + Opts + ), + {ok, ListResult} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=1827942&to=1827942&mode=list">>, + Opts + ), + Body = hb_json:decode(hb_maps:get(<<"body">>, ListResult)), + BlockInfo = maps:get(<<"1827942">>, Body), + ?assert(is_integer(maps:get(<<"depth">>, BlockInfo))), + Items = maps:get(<<"items">>, BlockInfo), + ?assert(maps:get(<<"1">>, Items) > 0), + ?assert(maps:get(<<"2">>, Items) > 0), + ok. + +inventory_single_block_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + {ok, 1827942} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=1827942&to=1827942">>, + Opts + ), + {ok, InvResult} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=1827942&to=1827942&mode=inventory">>, + Opts + ), + Body = hb_json:decode(hb_maps:get(<<"body">>, InvResult)), + BlockInfo = maps:get(<<"1827942">>, Body), + ?assert(is_integer(maps:get(<<"depth">>, BlockInfo))), + Items = maps:get(<<"items">>, BlockInfo), + L1Items = maps:get(<<"1">>, Items), + ?assert(is_list(L1Items)), + ?assert(length(L1Items) > 0), + L2Items = maps:get(<<"2">>, Items), + ?assert(is_list(L2Items)), + ?assert(length(L2Items) > 0), + ?assertEqual(5, length(L1Items)), + ?assert(lists:member( + <<"54K1ehEIKZxGSusgZzgbGYaHfllwWQ09-S9-eRUJg5Y">>, L2Items)), + ok. + +inventory_range_test() -> + {_TestStore, StoreOpts, Opts} = setup_index_opts(), + #{ <<"index-store">> := Store } = StoreOpts, + ok = hb_store:write(Store, #{hb_store_arweave:block_indexed_path(77777777) => <<"2">>}, Opts), + ok = hb_store:write(Store, #{hb_store_arweave:block_items_path(77777777, 1) => <<0:256>>}, Opts), + ok = hb_store:write(Store, #{hb_store_arweave:block_items_path(77777777, 2) => <<>>}, Opts), + ok = hb_store:write(Store, #{hb_store_arweave:block_indexed_path(77777778) => <<"2">>}, Opts), + ok = hb_store:write(Store, #{hb_store_arweave:block_items_path(77777778, 1) => <<1:256>>}, Opts), + ok = hb_store:write(Store, #{hb_store_arweave:block_items_path(77777778, 2) => <<>>}, Opts), + {ok, InvResult} = inventory_index(77777778, 77777777, Opts), + Body = hb_json:decode(hb_maps:get(<<"body">>, InvResult)), + ?assert(maps:is_key(<<"77777777">>, Body)), + ?assert(maps:is_key(<<"77777778">>, Body)), + ?assertEqual(2, maps:get(<<"depth">>, maps:get(<<"77777777">>, Body))), + ?assertEqual(2, maps:get(<<"depth">>, maps:get(<<"77777778">>, Body))), + ok. + +parent_depth_2_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + Block = 1827942, + {ok, Block} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&depth=2">>, + Opts + ), + StoreOpts2 = hb_store_arweave:store_from_opts(Opts), + {ok, InvResult} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&mode=inventory">>, + Opts + ), + Body = hb_json:decode(hb_maps:get(<<"body">>, InvResult)), + BlockInfo = maps:get(hb_util:bin(Block), Body), + L1Items = maps:get(<<"1">>, maps:get(<<"items">>, BlockInfo)), + L1ID = hb_util:decode(hd(L1Items)), + {ok, [{Block, block}]} = hb_store_arweave:read_parent(StoreOpts2, L1ID, Opts), + L2Items = maps:get(<<"2">>, maps:get(<<"items">>, BlockInfo)), + case L2Items of + [] -> ok; + [FirstL2 | _] -> + L2ID = hb_util:decode(FirstL2), + {ok, [{L2Parent, bundle}]} = + hb_store_arweave:read_parent(StoreOpts2, L2ID, Opts), + ?assert(lists:member( + hb_util:encode(L2Parent), L1Items)) + end, + ok. + +parent_depth_3_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + Block = 1889322, + {ok, Block} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&depth=3">>, + Opts + ), + StoreOpts2 = hb_store_arweave:store_from_opts(Opts), + {ok, InvResult} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&mode=inventory">>, + Opts + ), + Body = hb_json:decode(hb_maps:get(<<"body">>, InvResult)), + BlockInfo = maps:get(hb_util:bin(Block), Body), + L3Items = maps:get(<<"3">>, maps:get(<<"items">>, BlockInfo)), + ?assert(length(L3Items) > 0), + L2Items = maps:get(<<"2">>, maps:get(<<"items">>, BlockInfo)), + L3ID = hb_util:decode(hd(L3Items)), + {ok, [{L3Parent, bundle}]} = + hb_store_arweave:read_parent(StoreOpts2, L3ID, Opts), + ?assert(lists:member(hb_util:encode(L3Parent), L2Items)), + ok. + +parent_corrupt_data_test() -> + ?assertEqual([], hb_store_arweave:decode_parent_entries(<<>>)), + ?assertEqual( + {error, corrupt_parent_data}, + hb_store_arweave:decode_parent_entries(<<5, 1, 2, 3>>)), + Truncated = <<0, 1, 2, 3>>, + ?assertEqual( + {error, corrupt_parent_data}, + hb_store_arweave:decode_parent_entries(Truncated)), + ValidThenCorrupt = <<0, 100:64/big-unsigned, 99>>, + ?assertEqual( + {error, corrupt_parent_data}, + hb_store_arweave:decode_parent_entries(ValidThenCorrupt)), + ok. + +parent_endpoint_block_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + Block = 1827942, + {ok, Block} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&depth=2">>, + Opts + ), + {ok, InvResult} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&mode=inventory">>, + Opts + ), + InvBody = hb_json:decode(hb_maps:get(<<"body">>, InvResult)), + BlockInfo = maps:get(hb_util:bin(Block), InvBody), + L1Items = maps:get(<<"1">>, maps:get(<<"items">>, BlockInfo)), + L1EncodedID = hd(L1Items), + {ok, ParentResult} = + hb_ao:resolve( + <<"~arweave@2.9/parent=", L1EncodedID/binary>>, + Opts + ), + ?assertEqual( + <<"application/json">>, + hb_maps:get(<<"content-type">>, ParentResult)), + Body = hb_json:decode(hb_maps:get(<<"body">>, ParentResult)), + Parents = maps:get(<<"parents">>, Body), + ?assertEqual(1, length(Parents)), + [Entry] = Parents, + ?assertEqual(<<"block">>, maps:get(<<"type">>, Entry)), + ?assertEqual(Block, maps:get(<<"height">>, Entry)), + ok. + +parent_endpoint_bundle_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + Block = 1827942, + {ok, Block} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&depth=2">>, + Opts + ), + {ok, InvResult} = + hb_ao:resolve( + <<"~copycat@1.0/arweave&from=", + (hb_util:bin(Block))/binary, "&to=", + (hb_util:bin(Block))/binary, "&mode=inventory">>, + Opts + ), + InvBody = hb_json:decode(hb_maps:get(<<"body">>, InvResult)), + BlockInfo = maps:get(hb_util:bin(Block), InvBody), + L1Items = maps:get(<<"1">>, maps:get(<<"items">>, BlockInfo)), + L2Items = maps:get(<<"2">>, maps:get(<<"items">>, BlockInfo)), + ?assert(length(L2Items) > 0), + L2EncodedID = hd(L2Items), + {ok, ParentResult} = + hb_ao:resolve( + <<"~arweave@2.9/parent=", L2EncodedID/binary>>, + Opts + ), + ?assertEqual( + <<"application/json">>, + hb_maps:get(<<"content-type">>, ParentResult)), + Body = hb_json:decode(hb_maps:get(<<"body">>, ParentResult)), + [Entry] = maps:get(<<"parents">>, Body), + ?assertEqual(<<"bundle">>, maps:get(<<"type">>, Entry)), + ParentID = maps:get(<<"id">>, Entry), + ?assert(lists:member(ParentID, L1Items)), + ok. + +parent_endpoint_not_found_test() -> + {_TestStore, _StoreOpts, Opts} = setup_index_opts(), + FakeID = <<"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA">>, + ?assertEqual( + {error, not_found}, + hb_ao:resolve( + <<"~arweave@2.9/parent=", FakeID/binary>>, + Opts + ) + ), + ok. + +index_of(Elem, List) -> index_of(Elem, List, 1). + +index_of(_Elem, [], _N) -> not_found; +index_of(Elem, [Elem | _], N) -> N; +index_of(Elem, [_ | Rest], N) -> index_of(Elem, Rest, N + 1). \ No newline at end of file diff --git a/src/dev_match.erl b/src/dev_match.erl index 0d9bb378e..b44c29a8f 100644 --- a/src/dev_match.erl +++ b/src/dev_match.erl @@ -93,7 +93,7 @@ write(IDs, Base, Opts) -> fun(RawKey, Value) -> Key = hb_ao:normalize_key(RawKey), ValuePath = value_path(Value, Opts), - ok = hb_store:group(Store, address(Key, ValuePath), Opts), + hb_store:group(Store, address(Key, ValuePath), Opts), lists:foreach( fun(ID) -> Address = address(Key, ValuePath, ID), diff --git a/src/hb_cache.erl b/src/hb_cache.erl index 629a27e08..ed55df045 100644 --- a/src/hb_cache.erl +++ b/src/hb_cache.erl @@ -278,7 +278,7 @@ do_write_message(Bin, Store, Opts) when is_binary(Bin) -> % Write the binary in the store at its calculated content-hash. % Return the path. Path = generate_binary_path(Bin, Opts), - ok = hb_store:write(Store, #{ Path => Bin }, Opts), + hb_store:write(Store, #{ Path => Bin }, Opts), %lists:map(fun(ID) -> hb_store:make_link(Store, Path, ID) end, AllIDs), {ok, Path}; do_write_message(List, Store, Opts) when is_list(List) -> @@ -296,7 +296,7 @@ do_write_message(Msg, Store, Opts) when is_map(Msg) -> MsgHashpathAlg = hb_path:hashpath_alg(Msg, Opts), ?event(debug_cache, {writing_message, {id, UncommittedID}, {alt_ids, AltIDs}, {original, Msg}}), % Write all of the keys of the message into the store. - ok = hb_store:group(Store, UncommittedID, Opts), + hb_store:group(Store, UncommittedID, Opts), maps:map( fun(Key, Value) -> write_key(UncommittedID, Key, MsgHashpathAlg, Value, Store, Opts) @@ -363,7 +363,7 @@ write_key(Base, Key, HPAlg, Value, Store, Opts) -> Opts ), {ok, Path} = do_write_message(Value, Store, Opts), - ok = hb_store:link(Store, #{ KeyHashPath => Path }, Opts), + hb_store:link(Store, #{ KeyHashPath => Path }, Opts), {ok, Path}. %% @doc The `structured@1.0` encoder does not typically encode `commitments`, diff --git a/src/hb_copycat_budget.erl b/src/hb_copycat_budget.erl new file mode 100644 index 000000000..3aabf0184 --- /dev/null +++ b/src/hb_copycat_budget.erl @@ -0,0 +1,201 @@ +%%% @doc Atomics-based byte budget pool for copycat memory throttling. +%%% Controls how many bytes of TX data can be held in memory simultaneously +%%% across all copycat workers. Uses persistent_term for constant-time access. +-module(hb_copycat_budget). +-export([ensure_started/1, reset/1, lease/1, release/1, get_budget/0, stats/0]). +-include_lib("eunit/include/eunit.hrl"). +-include("include/hb.hrl"). + +-define(PERSISTENT_KEY, hb_copycat_budget). +-define(IDX_LEASED, 1). +-define(IDX_PEAK, 2). +-define(IDX_BUDGET, 3). +-define(IDX_RETRIES, 4). +-define(RETRY_SLEEP_MS, 50). +-define(LEASE_LOOP_MAX_RETRIES, 100). + +-define(INIT_LOCK, hb_copycat_budget_init). + +ensure_started(Budget) when is_integer(Budget), Budget > 0 -> + case persistent_term:get(?PERSISTENT_KEY, undefined) of + undefined -> + init_with_lock(Budget); + _Ref -> + ok + end. + +init_with_lock(Budget) -> + try register(?INIT_LOCK, self()) of + true -> + try + case persistent_term:get(?PERSISTENT_KEY, undefined) of + undefined -> + Ref = atomics:new(4, [{signed, false}]), + atomics:put(Ref, ?IDX_BUDGET, Budget), + persistent_term:put(?PERSISTENT_KEY, Ref); + _AlreadySet -> + ok + end + after + unregister(?INIT_LOCK) + end, + ok + catch + error:badarg -> + await_init(Budget) + end. + +await_init(Budget) -> + case persistent_term:get(?PERSISTENT_KEY, undefined) of + undefined -> + case whereis(?INIT_LOCK) of + undefined -> + init_with_lock(Budget); + _Pid -> + timer:sleep(1), + await_init(Budget) + end; + _Ref -> + ok + end. + +reset(Budget) when is_integer(Budget), Budget > 0 -> + Ref = atomics:new(4, [{signed, false}]), + atomics:put(Ref, ?IDX_BUDGET, Budget), + persistent_term:put(?PERSISTENT_KEY, Ref), + ok. + +lease(Size) when is_integer(Size), Size > 0 -> + Ref = persistent_term:get(?PERSISTENT_KEY), + lease_loop(Ref, Size, 0). + +lease_loop(Ref, Size, ?LEASE_LOOP_MAX_RETRIES) -> + ?event(error, + {lease_loop_max_retries_exhausted, + {ref, Ref}, + {size, Size}, + {max_retries, ?LEASE_LOOP_MAX_RETRIES}}), + throw(exhausted_lease_loop_max_retires); +lease_loop(Ref, Size, Retries) -> + Current = atomics:get(Ref, ?IDX_LEASED), + Budget = atomics:get(Ref, ?IDX_BUDGET), + case Current + Size > Budget of + true -> + atomics:add(Ref, ?IDX_RETRIES, 1), + timer:sleep(?RETRY_SLEEP_MS), + lease_loop(Ref, Size, Retries + 1); + false -> + case atomics:compare_exchange(Ref, ?IDX_LEASED, Current, Current + Size) of + ok -> + update_peak(Ref, Current + Size), + ok; + _Changed -> + lease_loop(Ref, Size, Retries + 1) + end + end. + +release(Size) when is_integer(Size), Size > 0 -> + Ref = persistent_term:get(?PERSISTENT_KEY), + atomics:sub(Ref, ?IDX_LEASED, Size), + ok. + +get_budget() -> + case persistent_term:get(?PERSISTENT_KEY, undefined) of + undefined -> undefined; + Ref -> atomics:get(Ref, ?IDX_BUDGET) + end. + +stats() -> + case persistent_term:get(?PERSISTENT_KEY, undefined) of + undefined -> + not_started; + Ref -> + #{ + leased => atomics:get(Ref, ?IDX_LEASED), + peak => atomics:get(Ref, ?IDX_PEAK), + budget => atomics:get(Ref, ?IDX_BUDGET), + retries => atomics:get(Ref, ?IDX_RETRIES) + } + end. + +update_peak(Ref, NewLeased) -> + Peak = atomics:get(Ref, ?IDX_PEAK), + case NewLeased =< Peak of + true -> ok; + false -> + case atomics:compare_exchange(Ref, ?IDX_PEAK, Peak, NewLeased) of + ok -> ok; + _Changed -> update_peak(Ref, NewLeased) + end + end. + +%%% Tests + +lease_release_cycle_test() -> + reset(1000), + ?assertEqual(1000, get_budget()), + ok = lease(400), + #{leased := 400, peak := 400, budget := 1000} = stats(), + ok = lease(300), + #{leased := 700, peak := 700} = stats(), + ok = release(400), + #{leased := 300, peak := 700} = stats(), + ok = release(300), + #{leased := 0, peak := 700} = stats(), + reset_to_default(), + ok. + +blocks_when_over_budget_test() -> + reset(100), + ok = lease(100), + Parent = self(), + Ref = make_ref(), + Pid = spawn(fun() -> + Parent ! {Ref, trying}, + ok = lease(50), + Parent ! {Ref, got_lease} + end), + receive {Ref, trying} -> ok end, + timer:sleep(120), + receive + {Ref, got_lease} -> error(should_have_blocked) + after 0 -> ok + end, + release(60), + receive + {Ref, got_lease} -> ok + after 500 -> + exit(Pid, kill), + error(lease_never_granted) + end, + release(50), + #{leased := 40} = stats(), + release(40), + reset_to_default(), + ok. + +concurrent_leases_test() -> + Budget = 1000, + reset(Budget), + Parent = self(), + NumWorkers = 20, + LeaseSize = 200, + Pids = [spawn(fun() -> + ok = lease(LeaseSize), + timer:sleep(10), + release(LeaseSize), + Parent ! {done, self()} + end) || _ <- lists:seq(1, NumWorkers)], + lists:foreach(fun(Pid) -> + receive {done, Pid} -> ok + after 5000 -> error({timeout, Pid}) + end + end, Pids), + #{leased := 0, peak := Peak, budget := Budget} = stats(), + ?assert(Peak =< Budget), + ?assert(Peak > 0), + reset_to_default(), + ok. + +reset_to_default() -> + reset(hb_opts:get(<<"copycat-memory-budget">>, 6 * 1024 * 1024 * 1024, #{})). diff --git a/src/hb_event.erl b/src/hb_event.erl index d4fcaf5c4..147084210 100644 --- a/src/hb_event.erl +++ b/src/hb_event.erl @@ -363,18 +363,8 @@ check_overload(Last, N) -> case erlang:process_info(self(), message_queue_len) of {message_queue_len, Len} when Len > ?OVERLOAD_QUEUE_LENGTH -> {memory, MemorySize} = erlang:process_info(self(), memory), - case rand:uniform(max(1000, Len - ?OVERLOAD_QUEUE_LENGTH)) of - 1 -> - ?debug_print( - {warning, - prometheus_event_queue_overloading, - {queue, Len}, - {last_event, Last}, - {memory_bytes, MemorySize} - } - ); - _ -> ignored - end, + % If the size of this process is too large, exit such that + % we can be restarted by the next caller. case MemorySize of MemorySize when MemorySize > ?MAX_MEMORY -> ?debug_print( diff --git a/src/hb_opts.erl b/src/hb_opts.erl index d04ee29a7..db75f5829 100644 --- a/src/hb_opts.erl +++ b/src/hb_opts.erl @@ -20,6 +20,7 @@ -include("include/hb.hrl"). -include("include/hb_opts.hrl"). -include("include/hb_arweave_nodes.hrl"). +-include("include/hb_store_arweave.hrl"). %%% Environment variables that can be used to override the default message. -ifdef(TEST). @@ -299,6 +300,11 @@ raw_default_message() -> <<"relay-http-client">> => httpc, %% The default codec to use for commitment signatures. <<"commitment-device">> => <<"httpsig@1.0">>, + %% Copycat-specific options. + <<"copycat-memory-budget">> => 6 * 1024 * 1024 * 1024, + <<"copycat-depth-recursion-cap">> => 6, % 2x the deepest we've seen to date + <<"arweave-block-workers">> => 3, + <<"copycat-scope">> => [?SCOPE_OFFSET, ?SCOPE_PARENT], %% Dev options <<"mode">> => debug, <<"profiling">> => true, @@ -1173,4 +1179,4 @@ ensure_node_history_test() -> ] }, ?assertEqual({error, invalid_values}, ensure_node_history(InvalidItems, RequiredOpts)). --endif. +-endif. \ No newline at end of file diff --git a/src/hb_prometheus.erl b/src/hb_prometheus.erl index f4917da77..9f8bb4383 100644 --- a/src/hb_prometheus.erl +++ b/src/hb_prometheus.erl @@ -1,7 +1,7 @@ %%% @doc HyperBEAM wrapper for Prometheus metrics. -module(hb_prometheus). -export([ensure_started/0, declare/2, measure_and_report/2, measure_and_report/3]). --export([observe/2, observe/3, inc/2, inc/3, inc/4, dec/2, dec/3, dec/4]). +-export([observe/2, observe/3, inc/2, inc/3, inc/4, dec/2, dec/3, dec/4, set/4]). -define(STARTED_CACHE_KEY, {?MODULE, started}). %% @doc Ensure the Prometheus application has been started. Caches startup @@ -118,4 +118,13 @@ dec(Type, Metrics, Labels, Value) -> end. do_dec(gauge, Name, Labels, Value) -> - prometheus_gauge:dec(Name, Labels, Value). \ No newline at end of file + prometheus_gauge:dec(Name, Labels, Value). + +set(gauge, Name, Labels, Value) -> + case ensure_started() of + ok -> + try prometheus_gauge:set(Name, Labels, Value) + catch error:mfa_already_exists -> ok + end; + _ -> ok + end. \ No newline at end of file diff --git a/src/hb_store.erl b/src/hb_store.erl index e4b09779a..86975e1f3 100644 --- a/src/hb_store.erl +++ b/src/hb_store.erl @@ -91,8 +91,8 @@ behavior_info(callbacks) -> %% @doc Store access policies to function names. -define(STORE_ACCESS_POLICIES, #{ - <<"read">> => [read, resolve, list, type, match, scope], - <<"write">> => [write, link, group, reset, scope], + <<"read">> => [read, resolve, list, type, match, scope, start, stop], + <<"write">> => [write, link, group, reset, scope, start, stop], <<"admin">> => [start, stop, reset, scope] }). @@ -560,6 +560,7 @@ start_one(Store = #{ <<"store-module">> := Mod }, Req, Opts) -> end. call_store_start(Mod, Store, Req, Opts) -> + code:ensure_loaded(Mod), case erlang:function_exported(Mod, start, 3) of true -> Mod:start(Store, Req, Opts); false -> Mod:start(Store) diff --git a/src/hb_store_arweave.erl b/src/hb_store_arweave.erl index b62d40317..635b2fed9 100644 --- a/src/hb_store_arweave.erl +++ b/src/hb_store_arweave.erl @@ -6,17 +6,24 @@ %%% Unused Store API: -export([resolve/3, write/3, link/3, group/3]). %%% Indexing API: --export([store_from_opts/1, write_offset/5, read_offset/2, read_chunks/3]). +-export([store_from_opts/1, write_offset/6, write_parent/5, read_offset/2, read_parent/3, decode_parent_entries/1, read_chunks/3]). +-export([block_indexed_path/1, block_items_path/2]). +-export([read_block_item_counts/2, read_block_item_ids/2]). +-export([ensure_cutover_height/2, read_cutover_height/1, is_tx_indexed/2 ]). +-export([write_block_item_ids/4, read_block_marker_depth/2]). +-export([decode_item_ids/1, is_block_indexed/3, is_post_cutover/2, mark_block_indexed/3 ]). -include("include/hb.hrl"). +-include("include/hb_store_arweave.hrl"). -include_lib("eunit/include/eunit.hrl"). -define(PARTITION_SIZE, 3_600_000_000_000). +-define(CUTOVER_KEY, <<"block/marker-cutover-height">>). %% @doc Find the first Arweave store from the given node message. Searches first %% for the `arweave_index_store' option, and if not found, searches the main %% `store' list for the first Arweave store with an index. store_from_opts(Opts) -> - case hb_opts:get(arweave_index_store, no_store, Opts) of + case hb_opts:get(<<"arweave-index-store">>, no_store, Opts) of no_store -> first_arweave_store(hb_opts:get(store, [], Opts)); IndexStoreOpts -> IndexStoreOpts end. @@ -31,9 +38,9 @@ first_arweave_store( first_arweave_store([_ | Rest]) -> first_arweave_store(Rest). %% @doc Start the Arweave store, and the downstream associated index store. -start(#{<<"index-store">> := IndexStore}, _Req, _Opts) -> +start(#{<<"index-store">> := IndexStore}, Req, Opts) -> init_prometheus(), - hb_store:start(IndexStore). + hb_store:start(IndexStore, Req, Opts). %% @doc Although the index is local, loading an item via the index will make %% requests to a remote node, so we define the scope as remote. @@ -94,6 +101,57 @@ read_offset(StoreOpts = #{ <<"index-store">> := IndexStore }, ID) -> end; read_offset(_, _) -> not_found. +%% @doc Read the parent entries for an item from the index store. +read_parent(#{ <<"index-store">> := IndexStore }, ID, Opts) -> + NormalizedID = hb_util:native_id(ID), + ParentPath = <<"parent/", NormalizedID/binary>>, + case hb_store:read(IndexStore, ParentPath, Opts) of + {ok, Bin} -> + case decode_parent_entries(Bin) of + {error, _} = Err -> Err; + Entries -> {ok, Entries} + end; + _ -> + not_found + end; +read_parent(_, _, _) -> not_found. + +decode_parent_entries(<<>>) -> []; +decode_parent_entries(<<0, Height:64/big-unsigned, Rest/binary>>) -> + case decode_parent_entries(Rest) of + {error, _} = Err -> Err; + Tail -> [{Height, block} | Tail] + end; +decode_parent_entries(<<1, ParentID:32/binary, Rest/binary>>) -> + case decode_parent_entries(Rest) of + {error, _} = Err -> Err; + Tail -> [{ParentID, bundle} | Tail] + end; +decode_parent_entries(_Corrupt) -> + {error, corrupt_parent_data}. + + +%% @doc Return the store path for a parent index entry. +parent_path(ItemID) when byte_size(ItemID) =:= 32 -> + <<"parent/", ItemID/binary>>. + +%% @doc Encode a parent entry for storage. +encode_parent_entry(Height, block) when is_integer(Height) -> + <<0, Height:64/big-unsigned>>; +encode_parent_entry(ParentID, bundle) when byte_size(ParentID) =:= 32 -> + <<1, ParentID:32/binary>>. + +%% Block Information Index + +%% @doc Return the store path for a block completion marker. +block_indexed_path(Height) -> + <<"block/", (hb_util:bin(Height))/binary, "/depth">>. + +%% @doc Return the store path for a per-block item index at a given depth. +block_items_path(Height, Depth) -> + <<"block/", (hb_util:bin(Height))/binary, + "/items/", (hb_util:bin(Depth))/binary>>. + %% @doc Read the data at the given key, reading the `local-store' first if %% available. read(StoreOpts, #{ <<"read">> := ID }, _NodeOpts) when ?IS_ID(ID) -> @@ -183,22 +241,36 @@ load_item(ExpectedID, StartOffset, Length, Opts) -> fun() -> case read_chunks(StartOffset, Length, Opts) of {ok, SerializedItem} -> - Item = - ar_bundles:deserialize(SerializedItem), - case hb_util:encode(Item#tx.id) of - ExpectedID -> - {ok, hb_message:convert( - Item, - <<"structured@1.0">>, - <<"ans104@1.0">>, - Opts - )}; - ActualID -> - {error, - {id_mismatch, - ExpectedID, ActualID}} + try + Item = + ar_bundles:deserialize(SerializedItem), + case hb_util:encode(Item#tx.id) of + ExpectedID -> + {ok, hb_message:convert( + Item, + <<"structured@1.0">>, + <<"ans104@1.0">>, + Opts + )}; + ActualID -> + ?event(error, {load_item, {id_mismatch}}), + {error, + {id_mismatch, + ExpectedID, ActualID}} + end + catch _:Reason:Stacktrace -> + %% Due to malformed encoding, attempt to deserialize + %% can throw. + ?event(error, + {load_item, + {expected_id, ExpectedID}, + {reason, Reason}, + {stacktrace, Stacktrace} + }), + {error, Reason} end; {error, Reason} -> + ?event(error, {load_item, Reason}), {error, Reason} end end, @@ -266,31 +338,210 @@ read_chunks(StartOffset, Length, Opts) -> Opts ). +%% @doc Write a parent entry for an item to the index store. +write_parent(ItemID, ParentData, Type, Store, Opts) -> + case + lists:member( + ?SCOPE_PARENT, + hb_opts:get(<<"copycat-scope">>, [], Opts) + ) of + true -> + Entry = encode_parent_entry(ParentData, Type), + hb_store:write(Store, #{parent_path(ItemID) => Entry}, Opts); + false -> + ok + end. + %% @doc Write offset information to the index store. write_offset( - StoreOpts = #{ <<"index-store">> := IndexStore }, + #{ <<"index-store">> := IndexStore }, ID, CodecName, StartOffset, - Length + Length, + Opts ) -> - Value = hb_store_arweave_offset:encode(CodecName, StartOffset, Length), - ?event( - debug_store_arweave, - {writing_offset, - {id, {explicit, ID}}, - {type, CodecName}, - {start_offset, StartOffset}, - {length, Length}, - {value, {explicit, Value}} - } + case + lists:member( + ?SCOPE_OFFSET, + hb_opts:get(<<"copycat-scope">>, [], Opts) + ) of + true -> + Value = hb_store_arweave_offset:encode(CodecName, StartOffset, Length), + ?event( + debug_store_arweave, + {writing_offset, + {id, {explicit, ID}}, + {type, CodecName}, + {start_offset, StartOffset}, + {length, Length}, + {value, {explicit, Value}} + } + ), + hb_store:write( + IndexStore, + #{ hb_store_arweave_offset:path(ID) => Value }, + Opts + ); + false -> + ok + end. + +%% @doc Probe item entries upward from depth 1, applying TransformFun to each. +probe_block_items(Height, Opts, TransformFun) -> + case store_from_opts(Opts) of + no_store -> + erlang:display({no_store, Opts}), + #{}; + #{ <<"index-store">> := Store } -> + probe_block_items(Height, Store, 1, #{}, TransformFun, Opts) + end. + +probe_block_items(Height, Store, Depth, Acc, TransformFun, Opts) -> + case hb_store:read(Store, block_items_path(Height, Depth), Opts) of + {ok, Bin} -> + Key = hb_util:bin(Depth), + probe_block_items( + Height, Store, Depth + 1, + Acc#{Key => TransformFun(Bin)}, TransformFun, Opts); + {error, not_found} -> + Acc + end. + +count_ids(Bin) when byte_size(Bin) rem 32 =:= 0 -> + byte_size(Bin) div 32; +count_ids(_) -> <<"corrupt">>. + +decode_and_encode_ids(Bin) -> + case decode_item_ids(Bin) of + {error, _} -> <<"corrupt">>; + List -> [hb_util:encode(ID) || ID <- List] + end. + +read_block_item_counts(Height, Opts) -> + probe_block_items(Height, Opts, fun count_ids/1). + +read_block_item_ids(Height, Opts) -> + probe_block_items(Height, Opts, fun decode_and_encode_ids/1). + +%% @doc Write per-depth item ID lists for a block. +%% Writes an entry for every depth from 1 through AchievedDepth (empty if +%% no items at that level), plus any partial depths beyond AchievedDepth +%% that were collected during indexing. +write_block_item_ids(Height, AchievedDepth, ItemIDs, Opts) -> + Store = get_index_store(Opts), + MaxStoredDepth = case maps:keys(ItemIDs) of + [] -> AchievedDepth; + Keys -> max(AchievedDepth, lists:max(Keys)) + end, + Results = lists:map( + fun(D) -> + IDs = maps:get(D, ItemIDs, []), + Bin = encode_item_ids(IDs), + hb_store:write( + Store, + #{block_items_path(Height, D) => Bin}, + Opts + ) + end, + lists:seq(1, MaxStoredDepth) ), + case lists:all(fun(R) -> R =:= ok end, Results) of + true -> ok; + false -> + ?event(copycat_short, + {block_item_ids_write_failed, + {height, Height}}), + {error, item_ids_write_failed} + end. + +%% @doc Encode a list of 32-byte raw IDs into a single binary. +encode_item_ids(IDs) -> + << <> || ID <- IDs >>. + +%% @doc Decode a binary of concatenated 32-byte IDs into a list. +%% Rejects binaries whose size is not a multiple of 32. +decode_item_ids(<<>>) -> []; +decode_item_ids(Bin) when byte_size(Bin) rem 32 =/= 0 -> + {error, invalid_item_ids_binary}; +decode_item_ids(Bin) -> + decode_item_ids_acc(Bin, []). + +decode_item_ids_acc(<<>>, Acc) -> lists:reverse(Acc); +decode_item_ids_acc(<>, Acc) -> + decode_item_ids_acc(Rest, [ID | Acc]). + +%% @doc Read the stored marker depth for a block, or undefined if none. +read_block_marker_depth(Height, Opts) -> + case store_from_opts(Opts) of + no_store -> undefined; + #{ <<"index-store">> := Store } -> + case hb_store:read(Store, block_indexed_path(Height), Opts) of + {ok, Bin} -> + try binary_to_integer(Bin) + catch _:_ -> undefined + end; + {error, not_found} -> undefined + end + end. + +%% @doc Check if a block has been indexed at the given depth or deeper. +is_block_indexed(undefined, _TargetDepth, _Opts) -> + false; +is_block_indexed(Height, TargetDepth, Opts) -> + case read_block_marker_depth(Height, Opts) of + undefined -> false; + StoredDepth -> StoredDepth >= TargetDepth + end. + +%% @doc Write a block completion marker with the achieved depth. +mark_block_indexed(Height, Depth, Opts) -> + Store = get_index_store(Opts), hb_store:write( - IndexStore, - #{ hb_store_arweave_offset:path(ID) => Value }, - StoreOpts + Store, + #{block_indexed_path(Height) => integer_to_binary(Depth)}, + Opts ). +%% @doc Read the persisted cutover height from the index store. +read_cutover_height(Opts) -> + Store = get_index_store(Opts), + case hb_store:read(Store, ?CUTOVER_KEY, Opts) of + {ok, Bin} -> hb_util:int(Bin); + {error, not_found} -> undefined + end. + +%% @doc Write the cutover height if not already set. +ensure_cutover_height(Height, Opts) -> + case read_cutover_height(Opts) of + undefined -> + Store = get_index_store(Opts), + hb_store:write(Store, #{?CUTOVER_KEY => hb_util:bin(Height)}, Opts), + ?event(copycat_short, {marker_cutover_initialized, {height, Height}}); + _ -> ok + end. + +%% @doc Check if a transaction ID is indexed in the arweave index store. +is_tx_indexed(TXID, Opts) -> + Store = get_index_store(Opts), + case hb_store:read(Store, hb_store_arweave_offset:path(TXID), Opts) of + {ok, _} -> true; + {error, not_found} -> false + end. + +is_post_cutover(undefined, _Opts) -> false; +is_post_cutover(Height, Opts) -> + case read_cutover_height(Opts) of + undefined -> false; + Cutover -> Height >= Cutover + end. + +get_index_store(Opts) -> + case store_from_opts(Opts) of + #{ <<"index-store">> := Store } -> Store; + _ -> throw(no_index_store_available) + end. + %% @doc Record the partition that data is found in when it is requested. record_partition_metric(Offset, Result, StoreOpts) when is_integer(Offset) -> case hb_opts:get(prometheus, not hb_features:test(), StoreOpts) of @@ -341,17 +592,24 @@ init_prometheus() -> %%% Tests +setup_test_store() -> + IndexStore = [hb_test_utils:test_store()], + ArweaveStore = + #{ + <<"store-module">> => hb_store_arweave, + <<"index-store">> => IndexStore + }, + Opts = #{<<"store">> => [ArweaveStore]}, + {IndexStore, ArweaveStore, Opts}. + write_read_tx_test() -> - Store = [hb_test_utils:test_store()], - Opts = #{ - <<"index-store">> => Store - }, + {_, ArweaveStoreOpts, Opts} = setup_test_store(), ID = <<"bndIwac23-s0K11TLC1N7z472sLGAkiOdhds87ZywoE">>, EndOffset = 363524457284025, Size = 8387, StartOffset = EndOffset - Size, - ok = write_offset(Opts, ID, <<"tx@1.0">>, StartOffset, Size), - {ok, Bundle} = read(Opts, #{ <<"read">> => ID }, Opts), + ok = write_offset(ArweaveStoreOpts, ID, <<"tx@1.0">>, StartOffset, Size, Opts), + {ok, Bundle} = read(ArweaveStoreOpts, #{ <<"read">> => ID }, Opts), ?assert(hb_message:verify(Bundle, all, #{})), {ok, Child} = hb_ao:resolve( @@ -385,26 +643,80 @@ write_read_tx_test() -> %% @doc Stale ANS-104 offset: fake ID pointing to a known bundle TX's %% data range. The deserialized item's ID won't match the fake ID. stale_ans104_offset_returns_error_test() -> - Store = [hb_test_utils:test_store()], - Opts = #{<<"index-store">> => Store}, + {_, ArweaveStoreOpts, Opts} = setup_test_store(), FakeID = <<"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA">>, RealEndOffset = 363524457284025, RealSize = 8387, RealStartOffset = RealEndOffset - RealSize, - ok = write_offset(Opts, FakeID, <<"ans104@1.0">>, RealStartOffset, RealSize), - Result = read(Opts, #{ <<"read">> => FakeID }, Opts), + ok = write_offset(ArweaveStoreOpts, FakeID, <<"ans104@1.0">>, RealStartOffset, RealSize, Opts), + Result = read(ArweaveStoreOpts, #{ <<"read">> => FakeID }, Opts), ?assertMatch({error, {id_mismatch, _, _}}, Result). %% @doc The L1 TX has bundle tags, but data is not a valid bundle. write_read_fake_bundle_tx_test() -> - Store = [hb_test_utils:test_store()], - Opts = #{ - <<"index-store">> => Store - }, + {_, ArweaveStoreOpts, Opts} = setup_test_store(), ID = <<"cGNURX2IUt98VKVIeXSfYe6eulNwPEqijaQfvatzd_o">>, Size = 2, StartOffset = 155309918167286, - ok = write_offset(Opts, ID, <<"tx@1.0">>, StartOffset, Size), - {ok, TX} = read(Opts, #{ <<"read">> => ID }, Opts), + ok = write_offset(ArweaveStoreOpts, ID, <<"tx@1.0">>, StartOffset, Size, Opts), + {ok, TX} = read(ArweaveStoreOpts, #{ <<"read">> => ID }, Opts), ?assert(hb_message:verify(TX, all, #{})), ok. + +%% @doc Interior Arweave offset returns bytes that are not a valid ANS-104 item, +%% so ar_bundles:deserialize/1 throws. The catch in load_item/4 must convert +%% that throw into {error, _} rather than crashing. +load_item_deserialize_throws_test() -> + {_, ArweaveStoreOpts, Opts} = setup_test_store(), + FakeID = <<"BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB">>, + %% Same interior offset used in dev_arweave bundle_header_garbage_guard test: + %% the bytes at ProbeOffset are mid-TX application data, not an ANS-104 header. + ProbeOffset = 376836336327208, + Size = 4096, + ok = write_offset(ArweaveStoreOpts, FakeID, <<"ans104@1.0">>, ProbeOffset - 1, Size, Opts), + ?assertMatch({error, _}, read(ArweaveStoreOpts, #{<<"read">> => FakeID}, #{})). + +corrupt_item_ids_read_test() -> + {IndexStore, _StoreOpts, Opts} = setup_test_store(), + Height = 99999999, + ok = hb_store:write(IndexStore, #{block_indexed_path(Height) => <<"2">>}, Opts), + ok = hb_store:write(IndexStore, #{block_items_path(Height, 1) => <<0:256>>}, Opts), + ok = hb_store:write(IndexStore, #{block_items_path(Height, 2) => <<0:240>>}, Opts), + Counts = read_block_item_counts(Height, Opts), + erlang:display({counts, Counts}), + ?assertEqual(1, maps:get(<<"1">>, Counts)), + ?assertEqual(<<"corrupt">>, maps:get(<<"2">>, Counts)), + IDs = read_block_item_ids(Height, Opts), + ?assertEqual(1, length(maps:get(<<"1">>, IDs))), + ?assertEqual(<<"corrupt">>, maps:get(<<"2">>, IDs)), + ok. + +parent_encode_decode_test() -> + BlockEntry = encode_parent_entry(12345, block), + ?assertEqual(<<0, 12345:64/big-unsigned>>, BlockEntry), + BundleID = crypto:strong_rand_bytes(32), + BundleEntry = encode_parent_entry(BundleID, bundle), + ?assertEqual(<<1, BundleID:32/binary>>, BundleEntry), + Combined = <>, + Decoded = decode_parent_entries(Combined), + ?assertEqual([{12345, block}, {BundleID, bundle}], Decoded), + ok. + +parent_not_found_test() -> + {_IndexStore, ArweaveStoreOpts, Opts} = setup_test_store(), + UnknownID = crypto:strong_rand_bytes(32), + ?assertEqual( + not_found, + hb_store_arweave:read_parent(ArweaveStoreOpts, UnknownID, Opts), + Opts + ), + ok. + +decode_item_ids_validation_test() -> + ?assertEqual([], decode_item_ids(<<>>)), + GoodBin = <<0:256, 1:256>>, + ?assertEqual(2, length(decode_item_ids(GoodBin))), + BadBin = <<0:240>>, + ?assertEqual({error, invalid_item_ids_binary}, decode_item_ids(BadBin)), + ok. + diff --git a/src/hb_store_arweave_offset.erl b/src/hb_store_arweave_offset.erl index 7b4d5a914..1913645d5 100644 --- a/src/hb_store_arweave_offset.erl +++ b/src/hb_store_arweave_offset.erl @@ -26,7 +26,7 @@ %%% to contract to only the number of bytes actually necessary to represent it. %%% -module(hb_store_arweave_offset). --export([encode/3, decode/1, path/1]). +-export([encode/3, decode/1, path/1, mismatch_path/1]). -include("include/hb.hrl"). %% @doc Determine if a value is within a given unsigned bit range. @@ -42,6 +42,10 @@ path(ID) when ?IS_ID(ID) -> hb_util:native_id(ID); path(ID) -> throw({cannot_encode_path, ID}). +mismatch_path(ID) when ?IS_ID(ID) -> + <<"mismatch/", (hb_util:native_id(ID))/binary>>; +mismatch_path(ID) -> throw({cannot_encode_mismatch_path, ID}). + %% @doc Encode the offset of the data if it is valid. Throws `cannot_encode_offset' %% if invalid. encode(Type, StartOffset, Length) diff --git a/src/hb_store_lmdb.erl b/src/hb_store_lmdb.erl index a5b649ffc..8f3ad4cbd 100644 --- a/src/hb_store_lmdb.erl +++ b/src/hb_store_lmdb.erl @@ -23,6 +23,7 @@ -export([start/3, stop/3, scope/0, scope/1, reset/3]). -export([read/3, write/3, list/3, match/3]). -export([group/3, link/3, type/3, resolve/3]). +-export([overlay_count/1]). %% Test framework and project includes -include_lib("eunit/include/eunit.hrl"). @@ -62,9 +63,12 @@ start(Opts = #{ <<"name">> := DataDir }, _Req, _NodeOpts) -> batch_size, hb_util:int(maps:get(<<"batch-size">>, Opts, ?DEFAULT_BATCH_SIZE)) }, - no_mem_init, - no_sync + no_mem_init ] ++ + case maps:get(<<"sync">>, Opts, false) of + true -> []; + false -> [no_sync] + end ++ case maps:get(<<"read-ahead">>, Opts, true) of true -> []; false -> [no_readahead] @@ -84,7 +88,11 @@ start(Opts = #{ <<"name">> := DataDir }, _Req, _NodeOpts) -> % Create the LMDB environment with specified size limit {ok, Env} = elmdb:env_open(DataDirPath, EnvOpts), {ok, DBInstance} = elmdb:db_open(Env, [create]), - {ok, #{ <<"env">> => Env, <<"db">> => DBInstance }}; + SyncInterval = hb_util:int(maps:get(<<"sync-interval">>, Opts, 0)), + MonitorPid = spawn(fun() -> + overlay_monitor_loop(Env, DBInstance, DataDir, SyncInterval, 0) + end), + {ok, #{ <<"env">> => Env, <<"db">> => DBInstance, <<"monitor">> => MonitorPid }}; start(_Store, _Req, _NodeOpts) -> {error, {badarg, <<"StoreOpts must be a map">>}}. @@ -547,8 +555,27 @@ resolve(Opts, #{ <<"resolve">> := Path }, _NodeOpts) -> %% @doc Retrieve or create the LMDB environment handle for a database. find_env(Opts) -> hb_store:find(Opts). +%% @doc Return the number of writes currently pending in the elmdb overlay. +%% Safe to call on any live database — does not trigger any I/O. +-spec overlay_count(map()) -> non_neg_integer(). +overlay_count(Opts) -> + #{ <<"db">> := DB } = find_env(Opts), + elmdb:overlay_count(DB). + %% Shutdown LMDB environment and cleanup resources -stop(#{ <<"store-module">> := ?MODULE, <<"name">> := DataDir }, _Req, _Opts) -> +stop(#{ <<"store-module">> := ?MODULE, <<"name">> := DataDir } = StoreOpts, _Req, _Opts) -> + case maps:get(<<"monitor">>, StoreOpts, undefined) of + undefined -> ok; + Pid -> + Ref = erlang:monitor(process, Pid), + exit(Pid, shutdown), + receive + {'DOWN', Ref, process, Pid, _Reason} -> ok + after 5000 -> + erlang:demonitor(Ref, [flush]), + ok + end + end, % Soft-close by name; refs stay valid and reopen lazily on next access. catch elmdb:env_close_by_name(hb_util:list(DataDir)), ok; @@ -593,6 +620,26 @@ sample_metrics(Name, StartTime, Type) -> miss -> ok end. +%% @doc Periodically samples overlay_count and reports it to Prometheus. +%% When sync-interval > 0, also calls env_sync every that many seconds, +%% decoupling durability from the per-commit flush worker path. +overlay_monitor_loop(Env, DBInstance, StoreName, SyncInterval, SecondsSinceSync) -> + receive + stop -> ok + after 1000 -> + Count = elmdb:overlay_count(DBInstance), + hb_prometheus:set(gauge, hb_store_lmdb_overlay_count, [StoreName], Count), + NextSecondsSinceSync = + case SyncInterval > 0 andalso SecondsSinceSync + 1 >= SyncInterval of + true -> + elmdb:env_sync(Env), + 0; + false -> + SecondsSinceSync + 1 + end, + overlay_monitor_loop(Env, DBInstance, StoreName, SyncInterval, NextSecondsSinceSync) + end. + init_prometheus() -> hb_prometheus:declare(histogram, [ {name, hb_store_lmdb_duration_seconds}, @@ -605,6 +652,11 @@ init_prometheus() -> {labels, [name]}, {help, "LMDB name requested"} ]), + hb_prometheus:declare(gauge, [ + {name, hb_store_lmdb_overlay_count}, + {labels, [store_name]}, + {help, "Number of writes pending in the elmdb overlay for each store"} + ]), ok. %% @doc Test suite demonstrating basic store operations. diff --git a/src/include/hb_store_arweave.hrl b/src/include/hb_store_arweave.hrl new file mode 100644 index 000000000..6bd62ae9b --- /dev/null +++ b/src/include/hb_store_arweave.hrl @@ -0,0 +1,3 @@ +-define(SCOPE_PARENT, <<"parent">>). +-define(SCOPE_OFFSET, <<"offset">>). + diff --git a/test/arbundles.js/upload-items.js b/test/arbundles.js/upload-items.js index 9bc63d475..88b07ff19 100644 --- a/test/arbundles.js/upload-items.js +++ b/test/arbundles.js/upload-items.js @@ -7,15 +7,15 @@ const BUNDLER_URL = "http://localhost:8734"; const DEFAULT_WALLET = "../../hyperbeam-key.json"; const CONCURRENT_UPLOADS = 100; // Number of parallel uploads -async function performanceTest(walletPath, itemCount, bytesPerItem = 0) { +async function performanceTest(walletPath, itemCount, bytesPerItem = 0, bundlerUrl = BUNDLER_URL) { const wallet = require(path.resolve(walletPath)); const signer = new ArweaveSigner(wallet); - const endpoint = `${BUNDLER_URL}/~bundler@1.0/item?codec-device=ans104@1.0`; + const endpoint = `${bundlerUrl}/~bundler@1.0/item?codec-device=ans104@1.0`; console.log("\n" + "=".repeat(70)); console.log("ANS-104 Bundle Upload Performance Test"); console.log("=".repeat(70)); - console.log(`Target: ${BUNDLER_URL}`); + console.log(`Target: ${bundlerUrl}`); console.log(`Items: ${itemCount}`); console.log(`Item Size: ${bytesPerItem > 0 ? `~${bytesPerItem} bytes` : 'default'}`); console.log(`Concurrent: ${CONCURRENT_UPLOADS}`); @@ -138,23 +138,26 @@ if (require.main === module) { const walletPath = firstIsNumber ? DEFAULT_WALLET : (process.argv[2] || DEFAULT_WALLET); const itemCount = parseInt(firstIsNumber ? process.argv[2] : process.argv[3], 10); const bytesPerItem = parseInt(firstIsNumber ? process.argv[3] : process.argv[4], 10) || 0; + const bundlerUrl = (firstIsNumber ? process.argv[4] : process.argv[5]) || BUNDLER_URL; if (!itemCount || itemCount < 1 || isNaN(itemCount)) { - console.error("Usage: node upload-items.js [wallet_path] [bytes_per_item]"); + console.error("Usage: node upload-items.js [wallet_path] [bytes_per_item] [bundler_url]"); console.error(""); console.error("Arguments:"); console.error(" wallet_path - Path to Arweave wallet JSON (default: ../../hyperbeam-key.json)"); console.error(" number_of_items - Number of data items to create and upload"); console.error(" bytes_per_item - Minimum size of each item in bytes (optional)"); + console.error(" bundler_url - Bundler base URL (default: " + BUNDLER_URL + ")"); console.error(""); console.error("Examples:"); console.error(" node upload-items.js 100"); console.error(" node upload-items.js 100 1024"); console.error(" node upload-items.js /path/to/wallet.json 100 1024"); + console.error(" node upload-items.js 100 0 http://other-bundler:8734"); process.exit(1); } - performanceTest(walletPath, itemCount, bytesPerItem) + performanceTest(walletPath, itemCount, bytesPerItem, bundlerUrl) .then(() => { process.exit(0); })