Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
30125cb
perf: parallelize setup_subscriptions
thlorenz Feb 26, 2026
c3d5dcd
perf: parallelize ATA/eATA subscriptions
thlorenz Feb 26, 2026
79bda11
perf: parallelize undelegation refresh checks via JoinSet
thlorenz Feb 26, 2026
06ec7f9
perf: parallelize ATA delegation record fetches
thlorenz Feb 26, 2026
860bfd7
chore: managed JoinSet loop for subscription updates
thlorenz Feb 26, 2026
26877c4
fix: rollback fetching on sub setup fail
thlorenz Feb 27, 2026
a992eb8
perf: deduplicate pubkeys before ATA subscription
thlorenz Feb 27, 2026
1f7eae6
fix: queue ATAs for cloning when eATA derivation fails
thlorenz Feb 27, 2026
921e9ae
chore: fmt
thlorenz Feb 27, 2026
1484c1f
chore: expect mutex lock (instead of unwrap)
thlorenz Feb 27, 2026
c9327d5
chore: less flaky escrow test
thlorenz Feb 27, 2026
bd27196
perf: optimize in_bank membership check from O(n²) to O(n)
thlorenz Feb 27, 2026
cdecee6
chore: fmt + lint
thlorenz Feb 27, 2026
743c13b
chore: more retries for escrow test
thlorenz Feb 27, 2026
8a17464
Merge branch 'master' into thlorenz/parallel-subs
thlorenz Feb 27, 2026
226adfa
Revert "chore: managed JoinSet loop for subscription updates"
thlorenz Feb 27, 2026
0c1f8e4
Revert "chore: more retries for escrow test"
thlorenz Feb 27, 2026
e85b7f4
Revert "chore: less flaky escrow test"
thlorenz Feb 27, 2026
9ddba87
Merge branch 'master' into thlorenz/parallel-subs
thlorenz Mar 2, 2026
fd9af5e
Reapply "chore: managed JoinSet loop for subscription updates"
thlorenz Mar 2, 2026
439d251
Merge branch 'master' into thlorenz/parallel-subs
thlorenz Mar 2, 2026
23dc903
Merge branch 'master' into thlorenz/parallel-subs
thlorenz Mar 2, 2026
a0526fa
Merge branch 'master' into thlorenz/parallel-subs
thlorenz Mar 3, 2026
2a4b4fb
Merge branch 'master' into thlorenz/parallel-subs
thlorenz Mar 3, 2026
5097834
fix: only ignore updates if they have a newer (not equal) slot
thlorenz Mar 3, 2026
04c685c
chore: update test to allow same slot updates
thlorenz Mar 3, 2026
1cda159
chore: fmt
thlorenz Mar 3, 2026
5e70cf8
Merge branch 'master' into thlorenz/parallel-subs
thlorenz Mar 3, 2026
ef85356
Merge branch 'master' into thlorenz/parallel-subs
thlorenz Mar 3, 2026
cda3654
Merge branch 'master' into thlorenz/parallel-subs
thlorenz Mar 4, 2026
0622081
chore: removing flaky escrow test
thlorenz Mar 4, 2026
3f08468
Merge branch 'master' into thlorenz/parallel-subs
thlorenz Mar 4, 2026
d0c2537
Merge branch 'master' into thlorenz/parallel-subs
thlorenz Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 102 additions & 50 deletions magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::sync::atomic::AtomicU16;
use std::collections::HashSet;

use dlp::state::DelegationRecord;
use futures_util::future::join_all;
use magicblock_accounts_db::traits::AccountsBank;
use magicblock_core::{
logger::log_trace_warn, token_programs::try_derive_eata_address_and_bump,
};
use magicblock_core::token_programs::try_derive_eata_address_and_bump;
use magicblock_metrics::metrics;
use solana_account::AccountSharedData;
use solana_pubkey::Pubkey;
Expand All @@ -13,7 +13,9 @@ use tracing::*;
use super::{delegation, types::AccountWithCompanion, FetchCloner};
use crate::{
cloner::{AccountCloneRequest, Cloner},
remote_account_provider::{ChainPubsubClient, ChainRpcClient},
remote_account_provider::{
ChainPubsubClient, ChainRpcClient, ResolvedAccountSharedData,
},
};

/// Resolves ATAs with eATA projection.
Expand Down Expand Up @@ -45,53 +47,86 @@ where
let mut accounts_to_clone = vec![];
let mut ata_join_set = JoinSet::new();

// Subscribe first so subsequent fetches are kept up-to-date
// Collect all pubkeys to subscribe to and spawn fetch tasks
let mut pubkeys_to_subscribe = vec![];

for (ata_pubkey, _, ata_info, ata_account_slot) in &atas {
if let Err(err) = this.subscribe_to_account(ata_pubkey).await {
static ATA_SUBSCRIPTION_FAILURE_COUNT: AtomicU16 =
AtomicU16::new(0);
log_trace_warn(
"Failed to subscribe to ATA",
"Failed to subscribe to ATAs",
&ata_pubkey,
&err,
1000,
&ATA_SUBSCRIPTION_FAILURE_COUNT,
);
}
// Collect ATA pubkey for subscription
pubkeys_to_subscribe.push(*ata_pubkey);

let effective_slot = if let Some(min_slot) = min_context_slot {
min_slot.max(*ata_account_slot)
} else {
*ata_account_slot
};

if let Some((eata, _)) =
try_derive_eata_address_and_bump(&ata_info.owner, &ata_info.mint)
{
if let Err(err) = this.subscribe_to_account(&eata).await {
static EATA_SUBSCRIPTION_FAILURE_COUNT: AtomicU16 =
AtomicU16::new(0);
log_trace_warn(
"Failed to subscribe to derived eATA",
"Failed to subscribe to derived eATAs",
&eata,
&err,
1000,
&EATA_SUBSCRIPTION_FAILURE_COUNT,
);
}
// Collect eATA pubkey for subscription
pubkeys_to_subscribe.push(eata);

let effective_slot = if let Some(min_slot) = min_context_slot {
min_slot.max(*ata_account_slot)
} else {
*ata_account_slot
};
ata_join_set.spawn(FetchCloner::task_to_fetch_with_companion(
this,
*ata_pubkey,
eata,
effective_slot,
fetch_origin,
));
} else {
// eATA derivation failed, but still queue the ATA for cloning
// without a companion by using a dummy companion pubkey
// The resolve_account_with_companion logic handles the case
// where the companion is not found
ata_join_set.spawn(FetchCloner::task_to_fetch_with_companion(
this,
*ata_pubkey,
Pubkey::default(), // Dummy companion - will be marked as NotFound
effective_slot,
fetch_origin,
));
}
}

// Deduplicate pubkeys to avoid redundant subscribe calls
pubkeys_to_subscribe = pubkeys_to_subscribe
.into_iter()
.collect::<HashSet<_>>()
.into_iter()
.collect();

// Subscribe to all ATA and eATA accounts in parallel
let subscription_results = join_all(
pubkeys_to_subscribe
.iter()
.map(|pk| this.subscribe_to_account(pk)),
)
.await;

for (pubkey, result) in
pubkeys_to_subscribe.iter().zip(subscription_results)
{
if let Err(err) = result {
warn!(
pubkey = %pubkey,
err = ?err,
"Failed to subscribe to ATA/eATA account"
);
}
}

let ata_results = ata_join_set.join_all().await;

// Phase 1: Collect successfully resolved ATAs
struct AtaResolutionInput {
ata_pubkey: Pubkey,
ata_account: ResolvedAccountSharedData,
eata_pubkey: Pubkey,
eata_shared: Option<AccountSharedData>,
}

let mut ata_inputs: Vec<AtaResolutionInput> = Vec::new();

for result in ata_results {
let AccountWithCompanion {
pubkey: ata_pubkey,
Expand All @@ -110,31 +145,48 @@ where
}
};

// Defaults: clone the ATA as-is
let mut account_to_clone = ata_account.account_shared_data_cloned();
let mut commit_frequency_ms = None;
let mut delegated_to_other = None;

// If there's an eATA, try to use it + delegation record to project the ATA
if let Some(eata_acc) = maybe_eata_account {
let eata_shared = eata_acc.account_shared_data_cloned();
let eata_shared =
maybe_eata_account.map(|e| e.account_shared_data_cloned());
ata_inputs.push(AtaResolutionInput {
ata_pubkey,
ata_account,
eata_pubkey,
eata_shared,
});
}

if let Some(deleg) = delegation::fetch_and_parse_delegation_record(
// Phase 2: Fetch delegation records in parallel for all eATAs
let deleg_futures = ata_inputs.iter().filter_map(|input| {
input.eata_shared.as_ref().map(|_| {
delegation::fetch_and_parse_delegation_record(
this,
eata_pubkey,
input.eata_pubkey,
this.remote_account_provider.chain_slot(),
fetch_origin,
)
.await
{
})
});
let deleg_results: Vec<Option<DelegationRecord>> =
join_all(deleg_futures).await;

// Phase 3: Combine results
let mut deleg_iter = deleg_results.into_iter();
for input in ata_inputs {
let mut account_to_clone =
input.ata_account.account_shared_data_cloned();
let mut commit_frequency_ms = None;
let mut delegated_to_other = None;

if let Some(eata_shared) = &input.eata_shared {
if let Some(Some(deleg)) = deleg_iter.next() {
delegated_to_other =
delegation::get_delegated_to_other(this, &deleg);
commit_frequency_ms = Some(deleg.commit_frequency_ms);

if let Some(projected_ata) = this
.maybe_project_delegated_ata_from_eata(
ata_account.account_shared_data(),
&eata_shared,
input.ata_account.account_shared_data(),
eata_shared,
&deleg,
)
{
Expand All @@ -144,7 +196,7 @@ where
}

accounts_to_clone.push(AccountCloneRequest {
pubkey: ata_pubkey,
pubkey: input.ata_pubkey,
account: account_to_clone,
commit_frequency_ms,
delegated_to_other,
Expand Down
Loading