Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ data class UpstreamsConfig(
val labels: Labels = Labels(),
var methods: Methods? = null,
var methodGroups: MethodGroups? = null,
var subscriptions: Subscriptions? = null,
var role: UpstreamRole = UpstreamRole.PRIMARY,
var customHeaders: Map<String, String> = emptyMap(),
var additionalSettings: AdditionalSettings? = null,
Expand Down Expand Up @@ -215,4 +216,8 @@ data class UpstreamsConfig(
val quorum: String? = null,
val static: String? = null,
)

data class Subscriptions(
val disabled: Set<String>,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ class UpstreamsConfigReader(
upstream.options = optionsReader.read(upNode)
upstream.methods = tryReadMethods(upNode)
upstream.methodGroups = tryReadMethodGroups(upNode)
upstream.subscriptions = tryReadSubscriptions(upNode)
upstream.additionalSettings = readAdditionalSettings(upNode)
getValueAsBool(upNode, "enabled")?.let {
upstream.isEnabled = it
Expand Down Expand Up @@ -406,4 +407,11 @@ class UpstreamsConfigReader(
)
}
}

private fun tryReadSubscriptions(upNode: MappingNode): UpstreamsConfig.Subscriptions? {
return getMapping(upNode, "subscriptions")?.let { snode ->
val disabled = getListOfString(snode, "disabled")?.toSet() ?: emptySet()
UpstreamsConfig.Subscriptions(disabled)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,15 @@ object EthereumChainSpecific : AbstractPollChainSpecific() {

override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription {
return { ms ->
val pendingTxes: PendingTxesSource = (ms.getAll())
val allUpstreams = ms.getAll()
.filter { it is GenericUpstream }
.map { it as GenericUpstream }

val disabledTopics = allUpstreams
.flatMap { it.getDisabledSubscriptions() }
.toSet()

val pendingTxes: PendingTxesSource = allUpstreams
.filter { it.getIngressSubscription() is EthereumIngressSubscription }
.mapNotNull {
(it.getIngressSubscription() as EthereumIngressSubscription).getPendingTxes()
Expand All @@ -84,7 +90,7 @@ object EthereumChainSpecific : AbstractPollChainSpecific() {
AggregatedPendingTxes(it)
}
}
EthereumEgressSubscription(ms, headScheduler, pendingTxes)
EthereumEgressSubscription(ms, headScheduler, pendingTxes, disabledTopics)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ data class Transaction(
val s: String?,
)

open class EthereumEgressSubscription(
open class EthereumEgressSubscription @JvmOverloads constructor(
val upstream: Multistream,
val scheduler: Scheduler,
val pendingTxesSource: PendingTxesSource?,
private val disabledTopics: Set<String> = emptySet(),
) : EgressSubscription {

companion object {
Expand All @@ -72,15 +73,19 @@ open class EthereumEgressSubscription(
} else {
listOf()
}
return if (pendingTxesSource != null) {
val withPending = if (pendingTxesSource != null) {
subs.plus(listOf(METHOD_PENDING_TXES, METHOD_DRPC_PENDING_TXES))
} else {
subs
}
return withPending.filter { it !in disabledTopics }
}

@Suppress("UNCHECKED_CAST")
override fun subscribe(topic: String, params: Any?, matcher: Selector.Matcher, unsubscribeMethod: String): Flux<out Any> {
if (topic in disabledTopics) {
return Flux.error(UnsupportedOperationException("subscription $topic is disabled"))
}
if (topic == METHOD_NEW_HEADS) {
return newHeads.connect(matcher)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ open class GenericUpstream(
finalizationDetectorBuilder: FinalizationDetectorBuilder,
versionRules: Supplier<CompatibleVersionsRules?>,
private val additionalSettings: UpstreamsConfig.AdditionalSettings?,
private val disabledSubscriptions: Set<String> = emptySet(),
) : DefaultUpstream(id, hash, null, UpstreamAvailability.OK, options, role, targets, node, chainConfig, chain),
Lifecycle {
constructor(
Expand Down Expand Up @@ -98,6 +99,7 @@ open class GenericUpstream(
finalizationDetectorBuilder,
versionRules,
config.additionalSettings,
config.subscriptions?.disabled ?: emptySet(),
) {
rpcMethodsDetector = upstreamRpcMethodsDetectorBuilder(this, config)
detectRpcMethods(config, buildMethods)
Expand Down Expand Up @@ -449,6 +451,8 @@ open class GenericUpstream(
return additionalSettings
}

fun getDisabledSubscriptions(): Set<String> = disabledSubscriptions

fun isValid(): Boolean = isUpstreamValid.get()

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class GenericUpstreamMock extends GenericUpstream {
io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific.INSTANCE.&finalizationDetectorBuilder,
[get: { null }] as java.util.function.Supplier,
null,
Collections.emptySet(),
)
this.ethereumHeadMock = this.getHead() as EthereumHeadMock
setLag(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class FilteredApisSpec extends Specification {
cs.&finalizationDetectorBuilder,
[get: { null }] as java.util.function.Supplier,
null,
Collections.emptySet(),
)
}
def matcher = new Selector.LabelMatcher("test", ["foo"])
Expand Down
Loading