diff --git a/pom.xml b/pom.xml
index 32cb04ae..a2e69dc4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
dev.vality
service-parent-pom
- 3.1.9
+ 3.1.10
analytics
@@ -26,8 +26,11 @@
analytics
analytics
jdbc:postgresql://localhost:${db.port}/${db.name}
+ 10.22.0
+
+
@@ -64,8 +67,8 @@
dev.vality
- xrates-proto
- 1.19-e7ad3f5
+ exrates-proto
+ 1.9-5d53aec
dev.vality
@@ -146,11 +149,17 @@
org.flywaydb
flyway-core
+ ${flyway.version}
org.flywaydb
flyway-database-postgresql
+
+ org.flywaydb
+ flyway-database-clickhouse
+ 10.24.0
+
org.apache.kafka
kafka-clients
@@ -179,7 +188,7 @@
dev.vality
testcontainers-annotations
- 3.3.2
+ 4.1.3
test
@@ -254,6 +263,7 @@
org.flywaydb
flyway-maven-plugin
+ ${flyway.version}
${db.url}
${db.user}
@@ -343,6 +353,19 @@
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.42
+
+
+
+
diff --git a/src/main/java/dev/vality/analytics/config/ClickHouseConfig.java b/src/main/java/dev/vality/analytics/config/ClickHouseConfig.java
index 0735889b..bd17a232 100644
--- a/src/main/java/dev/vality/analytics/config/ClickHouseConfig.java
+++ b/src/main/java/dev/vality/analytics/config/ClickHouseConfig.java
@@ -1,17 +1,41 @@
package dev.vality.analytics.config;
import dev.vality.analytics.config.properties.ClickHouseDbProperties;
+import org.flywaydb.core.Flyway;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.DependsOn;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
@Configuration
public class ClickHouseConfig {
+ private static final String NON_SHARDED_SCHEMA_MODE = "non-sharded";
+ private static final String SHARDED_SCHEMA_MODE = "sharded";
+ private static final String NON_SHARDED_MIGRATION_LOCATION = "classpath:db/migration-clickhouse/non-sharded";
+ private static final String SHARDED_MIGRATION_LOCATION = "classpath:db/migration-clickhouse/sharded";
+ private static final String CLUSTER_PLACEHOLDER = "cluster";
+ private static final String SHARD_PLACEHOLDER = "shard";
+ private static final String REPLICA_PLACEHOLDER = "replica";
+ private static final String CLICKHOUSE_FLYWAY_SHARDED_PROPERTY_PREFIX = "clickhouse.flyway.sharded.";
+ private static final String SHARDED_CLUSTER_PROPERTY =
+ CLICKHOUSE_FLYWAY_SHARDED_PROPERTY_PREFIX + CLUSTER_PLACEHOLDER;
+ private static final String SHARDED_SHARD_PROPERTY =
+ CLICKHOUSE_FLYWAY_SHARDED_PROPERTY_PREFIX + SHARD_PLACEHOLDER;
+ private static final String SHARDED_REPLICA_PROPERTY =
+ CLICKHOUSE_FLYWAY_SHARDED_PROPERTY_PREFIX + REPLICA_PLACEHOLDER;
+
@Bean(name = "clickHouseDataSourceProperties")
@ConfigurationProperties(prefix = "clickhouse.db")
public ClickHouseDbProperties clickHouseDataSourceProperties() {
@@ -34,6 +58,34 @@ public JdbcTemplate clickHouseJdbcTemplate(@Qualifier("clickHouseDataSource") Da
return new JdbcTemplate(clickHouseDataSource);
}
+    /**
+     * Runs the ClickHouse Flyway migrations after the primary (Postgres) Flyway
+     * bean, injecting Postgres connection placeholders (for dictionary sources)
+     * and, in sharded mode, the cluster topology placeholders.
+     */
+    @Bean
+    @DependsOn("flyway")
+    @ConditionalOnProperty(prefix = "clickhouse.flyway", name = "enabled", havingValue = "true", matchIfMissing = true)
+    public Flyway clickHouseFlyway(
+            @Qualifier("clickHouseDataSource") DataSource clickHouseDataSource,
+            @Qualifier("dataSourceProperties") DataSourceProperties postgresDataSourceProperties,
+            @Value("${clickhouse.flyway.schema-mode:non-sharded}") String schemaMode,
+            @Value("${" + SHARDED_CLUSTER_PROPERTY + ":}") String shardedCluster,
+            @Value("${" + SHARDED_SHARD_PROPERTY + ":}") String shardedShard,
+            @Value("${" + SHARDED_REPLICA_PROPERTY + ":}") String shardedReplica,
+            @Value("${postgres.db.schema}") String postgresSchema) {
+        String normalizedSchemaMode = schemaMode.toLowerCase(Locale.ROOT);
+        // Sharded placeholders first, then the Postgres connection placeholders.
+        Map<String, String> placeholders = new LinkedHashMap<>(resolveFlywayShardedPlaceholders(
+                normalizedSchemaMode, shardedCluster, shardedShard, shardedReplica));
+        placeholders.putAll(ClickHouseFlywaySupport.resolvePostgresPlaceholders(
+                postgresDataSourceProperties.getUrl(),
+                postgresDataSourceProperties.getUsername(),
+                postgresDataSourceProperties.getPassword(),
+                postgresSchema));
+        final var flyway = ClickHouseFlywaySupport.createFlyway(
+                clickHouseDataSource,
+                List.of(resolveClickHouseMigrationLocation(normalizedSchemaMode)),
+                placeholders,
+                "clickhouse_flyway_schema_history");
+        flyway.migrate();
+        return flyway;
+    }
+
private String buildClickHouseJdbcUrl(ClickHouseDbProperties properties) {
var urlBuilder = new StringBuilder();
urlBuilder.append(properties.getUrl());
@@ -49,4 +101,44 @@ private String buildClickHouseJdbcUrl(ClickHouseDbProperties properties) {
}
return urlBuilder.toString();
}
+
+    // Maps the normalized schema mode to its classpath migration location;
+    // rejects any mode outside the two supported values.
+    private String resolveClickHouseMigrationLocation(String schemaMode) {
+        return switch (schemaMode) {
+            case NON_SHARDED_SCHEMA_MODE -> NON_SHARDED_MIGRATION_LOCATION;
+            case SHARDED_SCHEMA_MODE -> SHARDED_MIGRATION_LOCATION;
+            default -> throw new IllegalArgumentException(
+                    String.format("Unsupported clickhouse.flyway.schema-mode: %s. Allowed values: %s, %s",
+                            schemaMode, NON_SHARDED_SCHEMA_MODE, SHARDED_SCHEMA_MODE));
+        };
+    }
+
+    // Returns the cluster/shard/replica placeholders required by sharded
+    // migrations; empty for non-sharded mode. Unknown modes fall through to an
+    // empty map here — resolveClickHouseMigrationLocation rejects them later.
+    private Map<String, String> resolveFlywayShardedPlaceholders(
+            String schemaMode,
+            String shardedCluster,
+            String shardedShard,
+            String shardedReplica) {
+        if (NON_SHARDED_SCHEMA_MODE.equals(schemaMode)) {
+            return Map.of();
+        }
+
+        if (SHARDED_SCHEMA_MODE.equals(schemaMode)) {
+            validateRequiredShardedProperty(SHARDED_CLUSTER_PROPERTY, shardedCluster);
+            validateRequiredShardedProperty(SHARDED_SHARD_PROPERTY, shardedShard);
+            validateRequiredShardedProperty(SHARDED_REPLICA_PROPERTY, shardedReplica);
+            return Map.of(
+                    CLUSTER_PLACEHOLDER, shardedCluster,
+                    SHARD_PLACEHOLDER, shardedShard,
+                    REPLICA_PLACEHOLDER, shardedReplica);
+        }
+
+        return Map.of();
+    }
+
+    // Fails fast when a property that is mandatory in sharded mode is missing.
+    private void validateRequiredShardedProperty(String propertyName, String value) {
+        if (value == null || value.isBlank()) {
+            throw new IllegalArgumentException(
+                    String.format("Property '%s' must be configured when clickhouse.flyway.schema-mode=sharded",
+                            propertyName));
+        }
+    }
}
diff --git a/src/main/java/dev/vality/analytics/config/ClickHouseFlywaySupport.java b/src/main/java/dev/vality/analytics/config/ClickHouseFlywaySupport.java
new file mode 100644
index 00000000..2c23c8db
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/config/ClickHouseFlywaySupport.java
@@ -0,0 +1,77 @@
+package dev.vality.analytics.config;
+
+import lombok.experimental.UtilityClass;
+import org.flywaydb.core.Flyway;
+
+import javax.sql.DataSource;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@UtilityClass
+final class ClickHouseFlywaySupport {
+
+ private static final String POSTGRES_HOST = "postgresHost";
+ private static final String POSTGRES_PORT = "postgresPort";
+ private static final String POSTGRES_USER = "postgresUser";
+ private static final String POSTGRES_PASSWORD = "postgresPassword";
+ private static final String POSTGRES_DATABASE = "postgresDatabase";
+ private static final String POSTGRES_SCHEMA = "postgresSchema";
+ private static final Pattern POSTGRES_JDBC_URL_PATTERN = Pattern.compile(
+ "^jdbc:postgresql://(?[^:/?]+)(?::(?\\d+))?/(?[^?]+).*$");
+
+ static Flyway createFlyway(
+ DataSource dataSource,
+ List locations,
+ Map placeholders,
+ String schemaHistoryTable) {
+ return Flyway.configure()
+ .dataSource(dataSource)
+ .locations(locations.toArray(String[]::new))
+ .placeholderPrefix("<<")
+ .placeholderSuffix(">>")
+ .placeholders(placeholders)
+ .table(schemaHistoryTable)
+ .baselineOnMigrate(true)
+ .load();
+ }
+
+ static Map resolvePostgresPlaceholders(
+ String jdbcUrl,
+ String username,
+ String password,
+ String schema) {
+ Matcher matcher = POSTGRES_JDBC_URL_PATTERN.matcher(jdbcUrl);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Unsupported postgres.db.url format: " + jdbcUrl);
+ }
+
+ String port = matcher.group("port");
+ return resolvePostgresPlaceholders(
+ matcher.group("host"),
+ port == null ? 5432 : Integer.parseInt(port),
+ matcher.group("database"),
+ username,
+ password,
+ schema);
+ }
+
+ static Map resolvePostgresPlaceholders(
+ String host,
+ int port,
+ String database,
+ String username,
+ String password,
+ String schema) {
+ Map placeholders = new LinkedHashMap<>();
+ placeholders.put(POSTGRES_HOST, host);
+ placeholders.put(POSTGRES_PORT, Integer.toString(port));
+ placeholders.put(POSTGRES_USER, username);
+ placeholders.put(POSTGRES_PASSWORD, password);
+ placeholders.put(POSTGRES_DATABASE, database);
+ placeholders.put(POSTGRES_SCHEMA, schema);
+ return placeholders;
+ }
+}
diff --git a/src/main/java/dev/vality/analytics/config/KafkaConfig.java b/src/main/java/dev/vality/analytics/config/KafkaConfig.java
index 45940403..600b39e8 100644
--- a/src/main/java/dev/vality/analytics/config/KafkaConfig.java
+++ b/src/main/java/dev/vality/analytics/config/KafkaConfig.java
@@ -2,6 +2,8 @@
import dev.vality.analytics.serde.HistoricalCommitDeserializer;
import dev.vality.analytics.serde.MachineEventDeserializer;
+import dev.vality.analytics.serde.CurrencyEventKafkaDeserializer;
+import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.analytics.service.ConsumerGroupIdService;
import dev.vality.damsel.domain_config_v2.HistoricalCommit;
import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
@@ -87,10 +89,10 @@ public ConcurrentKafkaListenerContainerFactory dominan
}
@Bean
- public ConcurrentKafkaListenerContainerFactory rateContainerFactory() {
- ConcurrentKafkaListenerContainerFactory factory =
+ public ConcurrentKafkaListenerContainerFactory rateContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory<>();
- initDefaultListenerProperties(factory, rateGroupId, new MachineEventDeserializer(),
+ initDefaultListenerProperties(factory, rateGroupId, new CurrencyEventKafkaDeserializer(),
maxPollRecordsRatesListener);
return factory;
}
diff --git a/src/main/java/dev/vality/analytics/config/SerializeConfig.java b/src/main/java/dev/vality/analytics/config/SerializeConfig.java
index 41d903fb..e601706d 100644
--- a/src/main/java/dev/vality/analytics/config/SerializeConfig.java
+++ b/src/main/java/dev/vality/analytics/config/SerializeConfig.java
@@ -1,11 +1,12 @@
package dev.vality.analytics.config;
import dev.vality.damsel.domain_config_v2.HistoricalCommit;
+import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.geck.serializer.Geck;
import dev.vality.sink.common.parser.impl.MachineEventParser;
import dev.vality.sink.common.serialization.BinaryDeserializer;
import dev.vality.sink.common.serialization.impl.AbstractThriftBinaryDeserializer;
-import dev.vality.xrates.rate.Change;
+
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -13,11 +14,11 @@
public class SerializeConfig {
@Bean
- public BinaryDeserializer rateEventDataBinaryDeserializer() {
+ public BinaryDeserializer currencyEventBinaryDeserializer() {
return new AbstractThriftBinaryDeserializer<>() {
@Override
- public Change deserialize(byte[] bytes) {
- return Geck.msgPackToTBase(bytes, Change.class);
+ public CurrencyEvent deserialize(byte[] bytes) {
+ return Geck.msgPackToTBase(bytes, CurrencyEvent.class);
}
};
}
diff --git a/src/main/java/dev/vality/analytics/dao/repository/postgres/RateDao.java b/src/main/java/dev/vality/analytics/dao/repository/postgres/RateDao.java
index 06f749ae..4aa976cb 100644
--- a/src/main/java/dev/vality/analytics/dao/repository/postgres/RateDao.java
+++ b/src/main/java/dev/vality/analytics/dao/repository/postgres/RateDao.java
@@ -28,19 +28,9 @@ public void saveRateBatch(List rates) {
.map(rate -> getDslContext().newRecord(RATE, rate))
.map(rateRecord -> getDslContext()
.insertInto(RATE).set(rateRecord)
- .onConflict(RATE.SOURCE_ID, RATE.SOURCE_SYMBOLIC_CODE, RATE.DESTINATION_SYMBOLIC_CODE,
- RATE.LOWER_BOUND_INCLUSIVE, RATE.UPPER_BOUND_EXCLUSIVE)
+ .onConflict(RATE.SOURCE_SYMBOLIC_CODE, RATE.DESTINATION_SYMBOLIC_CODE, RATE.EVENT_TIME)
.doNothing())
.collect(Collectors.toList());
batchExecute(queries);
}
-
- public Rate getRate(String sourceId, String sourceCode, String destinationCode) {
- Query query = getDslContext().selectFrom(RATE)
- .where(RATE.SOURCE_ID.eq(sourceId))
- .and(RATE.SOURCE_SYMBOLIC_CODE.eq(sourceCode)
- .and(RATE.DESTINATION_SYMBOLIC_CODE.eq(destinationCode)));
- return fetchOne(query, rateRowMapper);
- }
-
}
diff --git a/src/main/java/dev/vality/analytics/listener/RateListener.java b/src/main/java/dev/vality/analytics/listener/RateListener.java
index 13d24618..93d4d8c1 100644
--- a/src/main/java/dev/vality/analytics/listener/RateListener.java
+++ b/src/main/java/dev/vality/analytics/listener/RateListener.java
@@ -1,7 +1,7 @@
package dev.vality.analytics.listener;
-import dev.vality.analytics.listener.handler.rate.RateMachineEventHandler;
-import dev.vality.machinegun.eventsink.MachineEvent;
+import dev.vality.analytics.listener.handler.rate.CurrencyEventHandler;
+import dev.vality.exrates.events.CurrencyEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
@@ -17,18 +17,18 @@
@RequiredArgsConstructor
public class RateListener {
-    private final RateMachineEventHandler rateMachineEventHandler;
+    private final CurrencyEventHandler currencyEventHandler;
 
     @KafkaListener(autoStartup = "${kafka.listener.rate.enabled}",
             topics = "${kafka.topic.rate.initial}",
             containerFactory = "rateContainerFactory")
-    public void handle(List<MachineEvent> batch,
+    public void handle(List<CurrencyEvent> batch,
                        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                        @Header(KafkaHeaders.OFFSET) int offsets,
                        Acknowledgment ack) throws InterruptedException {
         log.info("Got RateListener listen offsets: {}, partition: {}, batch.size: {}",
                 offsets, partition, batch.size());
-        rateMachineEventHandler.handle(batch, ack);
+        currencyEventHandler.handle(batch, ack);
         log.info("Batch RateListener has been committed, size={}", batch.size());
     }
diff --git a/src/main/java/dev/vality/analytics/listener/handler/rate/RateMachineEventHandler.java b/src/main/java/dev/vality/analytics/listener/handler/rate/CurrencyEventHandler.java
similarity index 54%
rename from src/main/java/dev/vality/analytics/listener/handler/rate/RateMachineEventHandler.java
rename to src/main/java/dev/vality/analytics/listener/handler/rate/CurrencyEventHandler.java
index c9b37cad..e3f529a4 100644
--- a/src/main/java/dev/vality/analytics/listener/handler/rate/RateMachineEventHandler.java
+++ b/src/main/java/dev/vality/analytics/listener/handler/rate/CurrencyEventHandler.java
@@ -2,10 +2,8 @@
import dev.vality.analytics.dao.repository.postgres.RateDao;
import dev.vality.analytics.domain.db.tables.pojos.Rate;
-import dev.vality.analytics.listener.mapper.Mapper;
-import dev.vality.machinegun.eventsink.MachineEvent;
-import dev.vality.sink.common.parser.impl.MachineEventParser;
-import dev.vality.xrates.rate.Change;
+import dev.vality.analytics.listener.mapper.rate.CurrencyEventMapper;
+import dev.vality.exrates.events.CurrencyEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@@ -16,43 +14,43 @@
import org.springframework.util.CollectionUtils;
import java.time.Duration;
-import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
@Service
@Slf4j
@RequiredArgsConstructor
-public class RateMachineEventHandler {
+public class CurrencyEventHandler {
- private final MachineEventParser eventParser;
- private final List>> mappers;
+ private final CurrencyEventMapper currencyEventMapper;
private final RateDao rateDao;
@Value("${kafka.consumer.throttling-timeout-ms}")
private int throttlingTimeout;
@Transactional(propagation = Propagation.REQUIRED)
- public void handle(List batch, Acknowledgment ack) throws InterruptedException {
+ public void handle(
+ final List batch,
+ final Acknowledgment ack) throws InterruptedException {
try {
if (CollectionUtils.isEmpty(batch)) {
+ ack.acknowledge();
return;
}
- for (MachineEvent machineEvent : batch) {
- final Change change = eventParser.parse(machineEvent);
- final List rates = mappers.stream()
- .filter(mapper -> mapper.accept(change))
- .map(mapper -> mapper.map(change, machineEvent))
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ List rates = batch.stream()
+ .map(currencyEventMapper::map)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+
+ if (!rates.isEmpty()) {
rateDao.saveRateBatch(rates);
}
ack.acknowledge();
} catch (Exception e) {
- log.error("Exception during PartyListener process", e);
+ log.error("Exception during CurrencyEventHandler process", e);
ack.nack(Duration.ofMillis(throttlingTimeout));
throw e;
}
}
-
}
diff --git a/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java
new file mode 100644
index 00000000..f351a540
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java
@@ -0,0 +1,105 @@
+package dev.vality.analytics.listener.mapper.rate;
+
+import dev.vality.analytics.domain.db.tables.pojos.Rate;
+import dev.vality.exrates.base.Currency;
+import dev.vality.exrates.base.Rational;
+import dev.vality.exrates.events.CurrencyEvent;
+import dev.vality.exrates.events.CurrencyEventPayload;
+import dev.vality.exrates.events.CurrencyExchangeRate;
+import dev.vality.geck.common.util.TypeUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+
+/**
+ * Maps an exrates {@link CurrencyEvent} onto the {@link Rate} table pojo.
+ * Structurally incomplete events are logged at WARN and mapped to {@code null};
+ * the caller is expected to filter nulls out of the batch.
+ */
+@Component
+@Slf4j
+public class CurrencyEventMapper {
+
+    /**
+     * Converts a single event to a {@link Rate}.
+     *
+     * @return the mapped rate, or {@code null} when the payload is missing, is
+     *         not an exchange rate, is incomplete, or has an unparsable timestamp
+     */
+    public Rate map(CurrencyEvent event) {
+        String eventId = event.getEventId();
+        CurrencyExchangeRate exchangeRate = validateAndGetExchangeRate(event, eventId);
+        if (exchangeRate == null) {
+            return null;
+        }
+
+        Currency sourceCurrency = exchangeRate.getSourceCurrency();
+        Currency destinationCurrency = exchangeRate.getDestinationCurrency();
+        Rational rational = exchangeRate.getExchangeRate();
+        var eventTime = parseEventTime(exchangeRate.getTimestamp(), eventId);
+        if (eventTime == null) {
+            return null;
+        }
+
+        return buildRate(eventId, eventTime, sourceCurrency, destinationCurrency, rational);
+    }
+
+    // Returns the exchange rate only when payload, both currencies and the
+    // rational are all fully set; otherwise logs a warning and returns null.
+    private CurrencyExchangeRate validateAndGetExchangeRate(CurrencyEvent event, String eventId) {
+        if (!event.isSetPayload()) {
+            log.warn("CurrencyEvent payload not set, eventId={}", eventId);
+            return null;
+        }
+
+        CurrencyEventPayload payload = event.getPayload();
+        if (!payload.isSetExchangeRate()) {
+            log.warn("CurrencyEvent payload is not exchange_rate, eventId={}", eventId);
+            return null;
+        }
+
+        CurrencyExchangeRate exchangeRate = payload.getExchangeRate();
+        Currency sourceCurrency = exchangeRate.getSourceCurrency();
+        if (!isCurrencySet(sourceCurrency)) {
+            log.warn("CurrencyEvent source currency incomplete, eventId={}", eventId);
+            return null;
+        }
+
+        Currency destinationCurrency = exchangeRate.getDestinationCurrency();
+        if (!isCurrencySet(destinationCurrency)) {
+            log.warn("CurrencyEvent destination currency incomplete, eventId={}", eventId);
+            return null;
+        }
+
+        Rational rational = exchangeRate.getExchangeRate();
+        if (!isRationalSet(rational)) {
+            log.warn("CurrencyEvent exchange rate rational incomplete, eventId={}", eventId);
+            return null;
+        }
+
+        return exchangeRate;
+    }
+
+    // Lenient parse: a bad timestamp drops the event rather than failing the batch.
+    private LocalDateTime parseEventTime(String timestamp, String eventId) {
+        try {
+            return TypeUtil.stringToLocalDateTime(timestamp);
+        } catch (Exception e) {
+            log.warn("Failed to parse timestamp '{}' for CurrencyEvent, eventId={}", timestamp, eventId, e);
+            return null;
+        }
+    }
+
+    // Pure assembly of the Rate pojo from already-validated parts.
+    private Rate buildRate(
+            String eventId,
+            LocalDateTime eventTime,
+            Currency sourceCurrency,
+            Currency destinationCurrency,
+            Rational rational) {
+        Rate rate = new Rate();
+        rate.setEventId(eventId);
+        rate.setEventTime(eventTime);
+        rate.setSourceSymbolicCode(sourceCurrency.getSymbolicCode());
+        rate.setSourceExponent(sourceCurrency.getExponent());
+        rate.setDestinationSymbolicCode(destinationCurrency.getSymbolicCode());
+        rate.setDestinationExponent(destinationCurrency.getExponent());
+        rate.setExchangeRateRationalP(rational.getP());
+        rate.setExchangeRateRationalQ(rational.getQ());
+        return rate;
+    }
+
+    // A currency is usable only when both symbolic code and exponent are set.
+    private boolean isCurrencySet(Currency currency) {
+        return currency != null && currency.isSetSymbolicCode() && currency.isSetExponent();
+    }
+
+    // A rational is usable only when both numerator (p) and denominator (q) are set.
+    private boolean isRationalSet(Rational rational) {
+        return rational != null && rational.isSetP() && rational.isSetQ();
+    }
+}
diff --git a/src/main/java/dev/vality/analytics/listener/mapper/rate/RateCreatedMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/rate/RateCreatedMapper.java
deleted file mode 100644
index 81e8d17f..00000000
--- a/src/main/java/dev/vality/analytics/listener/mapper/rate/RateCreatedMapper.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package dev.vality.analytics.listener.mapper.rate;
-
-import dev.vality.analytics.constant.EventType;
-import dev.vality.analytics.domain.db.tables.pojos.Rate;
-import dev.vality.analytics.listener.mapper.Mapper;
-import dev.vality.geck.common.util.TypeUtil;
-import dev.vality.machinegun.eventsink.MachineEvent;
-import dev.vality.xrates.base.Rational;
-import dev.vality.xrates.base.TimestampInterval;
-import dev.vality.xrates.rate.*;
-import jakarta.validation.constraints.NotNull;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Slf4j
-@Component
-@RequiredArgsConstructor
-public class RateCreatedMapper implements Mapper> {
-
- @Override
- public List map(Change change, MachineEvent event) {
- if (change.getCreated().getExchangeRateData().getQuotes().isEmpty()) {
- log.warn("Quotes is empty, SinkEvent will not be saved, eventId={}, sourceId={}",
- event.getEventId(), event.getSourceId());
- return List.of();
- }
- log.info("Start rate created handling, eventId={}, sourceId={}", event.getEventId(), event.getSourceId());
-
- ExchangeRateCreated exchangeRateCreated = change.getCreated();
- ExchangeRateData exchangeRateData = exchangeRateCreated.getExchangeRateData();
- TimestampInterval interval = exchangeRateData.getInterval();
-
- return exchangeRateData.getQuotes().stream()
- .map(quote -> initRate(event, interval, quote))
- .collect(Collectors.toList());
- }
-
- @NotNull
- private Rate initRate(MachineEvent event, TimestampInterval interval, Quote quote) {
- Rate rate = new Rate();
- rate.setSourceId(event.getSourceId());
- rate.setEventId(event.getEventId());
- rate.setEventTime(TypeUtil.stringToLocalDateTime(event.getCreatedAt()));
-
- // Quote
- Currency source = quote.getSource();
- Currency destination = quote.getDestination();
- Rational exchangeRate = quote.getExchangeRate();
-
- // Currency
- rate.setSourceSymbolicCode(source.getSymbolicCode());
- rate.setSourceExponent(source.getExponent());
- rate.setDestinationSymbolicCode(destination.getSymbolicCode());
- rate.setDestinationExponent(destination.getExponent());
-
- // ExchangeRate
- rate.setExchangeRateRationalP(exchangeRate.getP());
- rate.setExchangeRateRationalQ(exchangeRate.getQ());
-
- rate.setLowerBoundInclusive(TypeUtil.stringToLocalDateTime(interval.getLowerBoundInclusive()));
- rate.setUpperBoundExclusive(TypeUtil.stringToLocalDateTime(interval.getUpperBoundExclusive()));
-
- return rate;
- }
-
- @Override
- public EventType getChangeType() {
- return EventType.RATE_CREATED;
- }
-}
diff --git a/src/main/java/dev/vality/analytics/parser/RateMachineEventParser.java b/src/main/java/dev/vality/analytics/parser/RateMachineEventParser.java
deleted file mode 100644
index 96566281..00000000
--- a/src/main/java/dev/vality/analytics/parser/RateMachineEventParser.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package dev.vality.analytics.parser;
-
-import dev.vality.sink.common.parser.impl.MachineEventParser;
-import dev.vality.sink.common.serialization.BinaryDeserializer;
-import dev.vality.xrates.rate.Change;
-import org.springframework.stereotype.Component;
-
-@Component
-public class RateMachineEventParser extends MachineEventParser {
-
- public RateMachineEventParser(BinaryDeserializer deserializer) {
- super(deserializer);
- }
-}
diff --git a/src/main/java/dev/vality/analytics/serde/CurrencyEventDeserializer.java b/src/main/java/dev/vality/analytics/serde/CurrencyEventDeserializer.java
new file mode 100644
index 00000000..86f8f6fa
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/serde/CurrencyEventDeserializer.java
@@ -0,0 +1,16 @@
+package dev.vality.analytics.serde;
+
+import dev.vality.exrates.events.CurrencyEvent;
+import dev.vality.geck.serializer.Geck;
+import dev.vality.sink.common.serialization.BinaryDeserializer;
+import dev.vality.sink.common.serialization.impl.AbstractThriftBinaryDeserializer;
+import org.springframework.stereotype.Component;
+
+/**
+ * msgpack-to-thrift binary deserializer for exrates {@link CurrencyEvent}s.
+ */
+@Component
+public class CurrencyEventDeserializer extends AbstractThriftBinaryDeserializer<CurrencyEvent> {
+
+    @Override
+    public CurrencyEvent deserialize(byte[] bytes) {
+        return Geck.msgPackToTBase(bytes, CurrencyEvent.class);
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/dev/vality/analytics/serde/CurrencyEventKafkaDeserializer.java b/src/main/java/dev/vality/analytics/serde/CurrencyEventKafkaDeserializer.java
new file mode 100644
index 00000000..6d1adde1
--- /dev/null
+++ b/src/main/java/dev/vality/analytics/serde/CurrencyEventKafkaDeserializer.java
@@ -0,0 +1,45 @@
+package dev.vality.analytics.serde;
+
+import dev.vality.exrates.events.CurrencyEvent;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransportException;
+
+import java.util.Map;
+
+/**
+ * Kafka value deserializer for thrift-binary encoded {@link CurrencyEvent}s.
+ * TDeserializer is not thread-safe, so one instance is kept per consumer thread.
+ */
+@Slf4j
+public class CurrencyEventKafkaDeserializer implements Deserializer<CurrencyEvent> {
+
+    private final ThreadLocal<TDeserializer> thriftDeserializerThreadLocal =
+            ThreadLocal.withInitial(() -> {
+                try {
+                    return new TDeserializer(new TBinaryProtocol.Factory());
+                } catch (TTransportException e) {
+                    // Keep the cause; a bare RuntimeException() would hide it.
+                    throw new RuntimeException("Failed to create thrift TDeserializer", e);
+                }
+            });
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public CurrencyEvent deserialize(String topic, byte[] data) {
+        if (data == null) {
+            // Kafka tombstone / absent value — nothing to deserialize.
+            return null;
+        }
+        log.debug("Message, topic: {}, byteLength: {}", topic, data.length);
+        CurrencyEvent currencyEvent = new CurrencyEvent();
+
+        try {
+            thriftDeserializerThreadLocal.get().deserialize(currencyEvent, data);
+        } catch (Exception e) {
+            // Best-effort: log and fall through, matching the existing
+            // error-tolerant consumer behaviour. NOTE(review): the returned event
+            // may be partially initialised — consider a DLQ instead.
+            log.error("Error when deserialize CurrencyEvent data: {} ", data, e);
+        }
+
+        return currencyEvent;
+    }
+
+    @Override
+    public void close() {
+    }
+}
\ No newline at end of file
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index cc837ca9..4f88a505 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -53,6 +53,14 @@ clickhouse.db:
connection-timeout: 10000
leak-detection-threshold: 5000
+clickhouse.flyway:
+ schema-mode: non-sharded
+ sharded:
+ # Required only when clickhouse.flyway.schema-mode=sharded
+ cluster:
+ shard:
+ replica:
+
postgres.db:
schema: analytics
type: com.zaxxer.hikari.HikariDataSource
@@ -84,7 +92,7 @@ kafka:
initial: mg-events-dominant
max.poll.records: 50
rate:
- initial: mg-events-rates
+ initial: etl-exchange-rate
groupId: analytics-rate-group
max.poll.records: 50
consumer:
diff --git a/src/main/resources/db/migration-clickhouse/non-sharded/V1__create_tables.sql b/src/main/resources/db/migration-clickhouse/non-sharded/V1__create_tables.sql
new file mode 100644
index 00000000..8d45f9f5
--- /dev/null
+++ b/src/main/resources/db/migration-clickhouse/non-sharded/V1__create_tables.sql
@@ -0,0 +1,168 @@
+CREATE DATABASE IF NOT EXISTS analytic;
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink (
+ timestamp Date,
+ eventTime UInt64,
+ eventTimeHour UInt64,
+ partyId String,
+ shopId String,
+ email String,
+ providerName String,
+ amount UInt64,
+ guaranteeDeposit UInt64,
+ systemFee UInt64,
+ providerFee UInt64,
+ externalFee UInt64,
+ currency String,
+ status Enum8('pending' = 1, 'processed' = 2, 'captured' = 3, 'cancelled' = 4, 'failed' = 5),
+ errorReason String,
+ errorCode String,
+ invoiceId String,
+ paymentId String,
+ sequenceId UInt64,
+ ip String,
+ bin String,
+ maskedPan String,
+ paymentTool String,
+ fingerprint String,
+ cardToken String,
+ paymentSystem String,
+ digitalWalletProvider String,
+ digitalWalletToken String,
+ cryptoCurrency String,
+ mobileOperator String,
+ paymentCountry String,
+ bankCountry String,
+ paymentTime UInt64,
+ providerId String,
+ terminal String,
+ cardHolderName String DEFAULT 'UNKNOWN',
+ bankCardTokenProvider String,
+ riskScore String,
+ rrn String,
+ paymentTerminal String
+) ENGINE = ReplacingMergeTree()
+PARTITION BY toYYYYMM(timestamp)
+ORDER BY (eventTimeHour, partyId, shopId, paymentTool, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, sequenceId);
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink_refund (
+ timestamp Date,
+ eventTime UInt64,
+ eventTimeHour UInt64,
+ partyId String,
+ shopId String,
+ email String,
+ providerName String,
+ amount UInt64,
+ guaranteeDeposit UInt64,
+ systemFee UInt64,
+ providerFee UInt64,
+ externalFee UInt64,
+ currency String,
+ reason String,
+ status Enum8('pending' = 1, 'succeeded' = 2, 'failed' = 3),
+ errorReason String,
+ errorCode String,
+ invoiceId String,
+ refundId String,
+ paymentId String,
+ sequenceId UInt64,
+ ip String,
+ fingerprint String,
+ cardToken String,
+ paymentSystem String,
+ digitalWalletProvider String,
+ digitalWalletToken String,
+ cryptoCurrency String,
+ mobileOperator String,
+ paymentCountry String,
+ bankCountry String,
+ paymentTime UInt64,
+ providerId String,
+ terminal String
+) ENGINE = ReplacingMergeTree()
+PARTITION BY toYYYYMM(timestamp)
+ORDER BY (eventTimeHour, partyId, shopId, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, refundId, sequenceId);
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink_adjustment (
+ timestamp Date,
+ eventTime UInt64,
+ eventTimeHour UInt64,
+ partyId String,
+ shopId String,
+ email String,
+ providerName String,
+ amount UInt64,
+ guaranteeDeposit UInt64,
+ systemFee UInt64,
+ providerFee UInt64,
+ externalFee UInt64,
+ oldAmount UInt64,
+ oldGuaranteeDeposit UInt64,
+ oldSystemFee UInt64,
+ oldProviderFee UInt64,
+ oldExternalFee UInt64,
+ currency String,
+ reason String,
+ status Enum8('captured' = 1, 'cancelled' = 2),
+ errorCode String,
+ errorReason String,
+ invoiceId String,
+ adjustmentId String,
+ paymentId String,
+ sequenceId UInt64,
+ ip String,
+ fingerprint String,
+ cardToken String,
+ paymentSystem String,
+ digitalWalletProvider String,
+ digitalWalletToken String,
+ cryptoCurrency String,
+ mobileOperator String,
+ paymentCountry String,
+ bankCountry String,
+ paymentTime UInt64,
+ providerId String,
+ terminal String
+) ENGINE = ReplacingMergeTree()
+PARTITION BY toYYYYMM(timestamp)
+ORDER BY (eventTimeHour, partyId, shopId, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, adjustmentId, sequenceId);
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink_chargeback (
+ timestamp Date,
+ eventTime UInt64,
+ eventTimeHour UInt64,
+ partyId String,
+ shopId String,
+ email String,
+ providerName String,
+ amount UInt64,
+ guaranteeDeposit UInt64,
+ systemFee UInt64,
+ providerFee UInt64,
+ externalFee UInt64,
+ currency String,
+ chargebackCode String,
+ stage Enum8('chargeback' = 1, 'pre_arbitration' = 2, 'arbitration' = 3),
+ status Enum8('accepted' = 1, 'rejected' = 2, 'cancelled' = 3),
+ category Enum8('fraud' = 1, 'dispute' = 2, 'authorisation' = 3, 'processing_error' = 4),
+ invoiceId String,
+ chargebackId String,
+ paymentId String,
+ sequenceId UInt64,
+ ip String,
+ fingerprint String,
+ cardToken String,
+ paymentSystem String,
+ digitalWalletProvider String,
+ digitalWalletToken String,
+ cryptoCurrency String,
+ mobileOperator String,
+ paymentCountry String,
+ bankCountry String,
+ paymentTime UInt64,
+ providerId String,
+ terminal String
+) ENGINE = ReplacingMergeTree()
+PARTITION BY toYYYYMM(timestamp)
+ORDER BY (eventTimeHour, partyId, shopId, category, status, stage, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, chargebackId, sequenceId);
diff --git a/src/main/resources/db/migration-clickhouse/non-sharded/V2__create_dictionaries.sql b/src/main/resources/db/migration-clickhouse/non-sharded/V2__create_dictionaries.sql
new file mode 100644
index 00000000..b6977de8
--- /dev/null
+++ b/src/main/resources/db/migration-clickhouse/non-sharded/V2__create_dictionaries.sql
@@ -0,0 +1,179 @@
+DROP DICTIONARY IF EXISTS analytic.party_dictionary;
+DROP DICTIONARY IF EXISTS analytic.shop_dictionary;
+DROP DICTIONARY IF EXISTS analytic.category_dictionary;
+DROP DICTIONARY IF EXISTS analytic.country_dictionary;
+DROP DICTIONARY IF EXISTS analytic.trade_bloc_dictionary;
+DROP DICTIONARY IF EXISTS analytic.rate_dictionary;
+
+CREATE DICTIONARY IF NOT EXISTS analytic.party_dictionary (
+ id UInt64,
+ event_id UInt64,
+ event_time DateTime,
+ party_id String,
+ created_at DateTime,
+ email String,
+ blocking String,
+ blocked_reason String,
+ blocked_since DateTime,
+ unblocked_reason String,
+ unblocked_since DateTime,
+ suspension String,
+ suspension_active_since DateTime,
+ suspension_suspended_since DateTime,
+ version_id UInt64,
+ changed_by_id String,
+ changed_by_email String,
+ changed_by_name String,
+ deleted Bool
+)
+PRIMARY KEY id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'party'
+ ))
+LAYOUT(HASHED())
+LIFETIME(MIN 300 MAX 360);
+
+CREATE DICTIONARY IF NOT EXISTS analytic.shop_dictionary (
+ id UInt64,
+ event_id UInt64,
+ event_time DateTime,
+ party_id String,
+ shop_id String,
+ category_id Int32,
+ created_at DateTime,
+ blocking String,
+ blocked_reason String,
+ blocked_since DateTime,
+ unblocked_reason String,
+ unblocked_since DateTime,
+ suspension String,
+ suspension_active_since DateTime,
+ suspension_suspended_since DateTime,
+ details_name String,
+ details_description String,
+ location_url String,
+ account_currency_code String,
+ account_settlement String,
+ account_guarantee String,
+ international_legal_entity_country_code String,
+ version_id UInt64,
+ changed_by_id String,
+ changed_by_email String,
+ changed_by_name String,
+ deleted Bool
+)
+PRIMARY KEY id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'shop'
+ ))
+LAYOUT(HASHED())
+LIFETIME(MIN 300 MAX 360);
+
+CREATE DICTIONARY IF NOT EXISTS analytic.category_dictionary (
+ id UInt64,
+ version_id UInt64,
+ category_id Int32,
+ name String,
+ description String,
+ type String,
+ deleted Bool,
+ changed_by_id String,
+ changed_by_email String,
+ changed_by_name String
+)
+PRIMARY KEY id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'category'
+ ))
+LAYOUT(HASHED())
+LIFETIME(MIN 300 MAX 360);
+
+CREATE DICTIONARY IF NOT EXISTS analytic.country_dictionary (
+ id UInt64,
+ version_id UInt64,
+ country_id String,
+ name String,
+ trade_bloc Array(String),
+ deleted Bool,
+ changed_by_id String,
+ changed_by_email String,
+ changed_by_name String
+)
+PRIMARY KEY id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'country'
+ ))
+LAYOUT(HASHED())
+LIFETIME(MIN 300 MAX 360);
+
+CREATE DICTIONARY IF NOT EXISTS analytic.trade_bloc_dictionary (
+ id UInt64,
+ version_id UInt64,
+ trade_bloc_id String,
+ name String,
+ description String,
+ deleted Bool,
+ changed_by_id String,
+ changed_by_email String,
+ changed_by_name String
+)
+PRIMARY KEY id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'trade_bloc'
+ ))
+LAYOUT(HASHED())
+LIFETIME(MIN 300 MAX 360);
+
+CREATE DICTIONARY IF NOT EXISTS analytic.rate_dictionary (
+ id UInt64,
+ event_id String,
+ event_time String,
+ source_symbolic_code String,
+ source_exponent UInt32,
+ destination_symbolic_code String,
+ destination_exponent UInt32,
+ exchange_rate_rational_p UInt64,
+ exchange_rate_rational_q UInt64
+)
+PRIMARY KEY id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'rate'
+ ))
+LAYOUT(HASHED())
+LIFETIME(MIN 300 MAX 360);
diff --git a/src/main/resources/db/migration-clickhouse/sharded/V1__create_tables.sql b/src/main/resources/db/migration-clickhouse/sharded/V1__create_tables.sql
new file mode 100644
index 00000000..9acc1cda
--- /dev/null
+++ b/src/main/resources/db/migration-clickhouse/sharded/V1__create_tables.sql
@@ -0,0 +1,180 @@
+CREATE DATABASE IF NOT EXISTS analytic;
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink_local (
+ timestamp Date,
+ eventTime UInt64,
+ eventTimeHour UInt64,
+ partyId String,
+ shopId String,
+ email String,
+ providerName String,
+ amount UInt64,
+ guaranteeDeposit UInt64,
+ systemFee UInt64,
+ providerFee UInt64,
+ externalFee UInt64,
+ currency String,
+ status Enum8('pending' = 1, 'processed' = 2, 'captured' = 3, 'cancelled' = 4, 'failed' = 5),
+ errorReason String,
+ errorCode String,
+ invoiceId String,
+ paymentId String,
+ sequenceId UInt64,
+ ip String,
+ bin String,
+ maskedPan String,
+ paymentTool String,
+ fingerprint String,
+ cardToken String,
+ paymentSystem String,
+ digitalWalletProvider String,
+ digitalWalletToken String,
+ cryptoCurrency String,
+ mobileOperator String,
+ paymentCountry String,
+ bankCountry String,
+ paymentTime UInt64,
+ providerId String,
+ terminal String,
+ cardHolderName String DEFAULT 'UNKNOWN',
+ bankCardTokenProvider String,
+ riskScore String,
+ rrn String,
+ paymentTerminal String
+) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/<>/tables/<>/{database}/{table}', '<>')
+PARTITION BY toYYYYMM(timestamp)
+ORDER BY (eventTimeHour, partyId, shopId, paymentTool, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, sequenceId);
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink AS analytic.events_sink_local
+ENGINE = Distributed('<>', analytic, events_sink_local, cityHash64(timestamp, partyId));
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink_refund_local (
+ timestamp Date,
+ eventTime UInt64,
+ eventTimeHour UInt64,
+ partyId String,
+ shopId String,
+ email String,
+ providerName String,
+ amount UInt64,
+ guaranteeDeposit UInt64,
+ systemFee UInt64,
+ providerFee UInt64,
+ externalFee UInt64,
+ currency String,
+ reason String,
+ status Enum8('pending' = 1, 'succeeded' = 2, 'failed' = 3),
+ errorReason String,
+ errorCode String,
+ invoiceId String,
+ refundId String,
+ paymentId String,
+ sequenceId UInt64,
+ ip String,
+ fingerprint String,
+ cardToken String,
+ paymentSystem String,
+ digitalWalletProvider String,
+ digitalWalletToken String,
+ cryptoCurrency String,
+ mobileOperator String,
+ paymentCountry String,
+ bankCountry String,
+ paymentTime UInt64,
+ providerId String,
+ terminal String
+) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/<>/tables/<>/{database}/{table}', '<>')
+PARTITION BY toYYYYMM(timestamp)
+ORDER BY (eventTimeHour, partyId, shopId, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, refundId, sequenceId);
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink_refund AS analytic.events_sink_refund_local
+ENGINE = Distributed('<>', analytic, events_sink_refund_local, cityHash64(timestamp, partyId));
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink_adjustment_local (
+ timestamp Date,
+ eventTime UInt64,
+ eventTimeHour UInt64,
+ partyId String,
+ shopId String,
+ email String,
+ providerName String,
+ amount UInt64,
+ guaranteeDeposit UInt64,
+ systemFee UInt64,
+ providerFee UInt64,
+ externalFee UInt64,
+ oldAmount UInt64,
+ oldGuaranteeDeposit UInt64,
+ oldSystemFee UInt64,
+ oldProviderFee UInt64,
+ oldExternalFee UInt64,
+ currency String,
+ reason String,
+ status Enum8('captured' = 1, 'cancelled' = 2),
+ errorCode String,
+ errorReason String,
+ invoiceId String,
+ adjustmentId String,
+ paymentId String,
+ sequenceId UInt64,
+ ip String,
+ fingerprint String,
+ cardToken String,
+ paymentSystem String,
+ digitalWalletProvider String,
+ digitalWalletToken String,
+ cryptoCurrency String,
+ mobileOperator String,
+ paymentCountry String,
+ bankCountry String,
+ paymentTime UInt64,
+ providerId String,
+ terminal String
+) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/<>/tables/<>/{database}/{table}', '<>')
+PARTITION BY toYYYYMM(timestamp)
+ORDER BY (eventTimeHour, partyId, shopId, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, adjustmentId, sequenceId);
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink_adjustment AS analytic.events_sink_adjustment_local
+ENGINE = Distributed('<>', analytic, events_sink_adjustment_local, cityHash64(timestamp, partyId));
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink_chargeback_local (
+ timestamp Date,
+ eventTime UInt64,
+ eventTimeHour UInt64,
+ partyId String,
+ shopId String,
+ email String,
+ providerName String,
+ amount UInt64,
+ guaranteeDeposit UInt64,
+ systemFee UInt64,
+ providerFee UInt64,
+ externalFee UInt64,
+ currency String,
+ chargebackCode String,
+ stage Enum8('chargeback' = 1, 'pre_arbitration' = 2, 'arbitration' = 3),
+ status Enum8('accepted' = 1, 'rejected' = 2, 'cancelled' = 3),
+ category Enum8('fraud' = 1, 'dispute' = 2, 'authorisation' = 3, 'processing_error' = 4),
+ invoiceId String,
+ chargebackId String,
+ paymentId String,
+ sequenceId UInt64,
+ ip String,
+ fingerprint String,
+ cardToken String,
+ paymentSystem String,
+ digitalWalletProvider String,
+ digitalWalletToken String,
+ cryptoCurrency String,
+ mobileOperator String,
+ paymentCountry String,
+ bankCountry String,
+ paymentTime UInt64,
+ providerId String,
+ terminal String
+) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/<>/tables/<>/{database}/{table}', '<>')
+PARTITION BY toYYYYMM(timestamp)
+ORDER BY (eventTimeHour, partyId, shopId, category, status, stage, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, chargebackId, sequenceId);
+
+CREATE TABLE IF NOT EXISTS analytic.events_sink_chargeback AS analytic.events_sink_chargeback_local
+ENGINE = Distributed('<>', analytic, events_sink_chargeback_local, cityHash64(timestamp, partyId));
diff --git a/src/main/resources/db/migration-clickhouse/sharded/V2__create_dictionaries.sql b/src/main/resources/db/migration-clickhouse/sharded/V2__create_dictionaries.sql
new file mode 100644
index 00000000..b6977de8
--- /dev/null
+++ b/src/main/resources/db/migration-clickhouse/sharded/V2__create_dictionaries.sql
@@ -0,0 +1,179 @@
+DROP DICTIONARY IF EXISTS analytic.party_dictionary;
+DROP DICTIONARY IF EXISTS analytic.shop_dictionary;
+DROP DICTIONARY IF EXISTS analytic.category_dictionary;
+DROP DICTIONARY IF EXISTS analytic.country_dictionary;
+DROP DICTIONARY IF EXISTS analytic.trade_bloc_dictionary;
+DROP DICTIONARY IF EXISTS analytic.rate_dictionary;
+
+CREATE DICTIONARY IF NOT EXISTS analytic.party_dictionary (
+ id UInt64,
+ event_id UInt64,
+ event_time DateTime,
+ party_id String,
+ created_at DateTime,
+ email String,
+ blocking String,
+ blocked_reason String,
+ blocked_since DateTime,
+ unblocked_reason String,
+ unblocked_since DateTime,
+ suspension String,
+ suspension_active_since DateTime,
+ suspension_suspended_since DateTime,
+ version_id UInt64,
+ changed_by_id String,
+ changed_by_email String,
+ changed_by_name String,
+ deleted Bool
+)
+PRIMARY KEY id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'party'
+ ))
+LAYOUT(HASHED())
+LIFETIME(MIN 300 MAX 360);
+
+CREATE DICTIONARY IF NOT EXISTS analytic.shop_dictionary (
+ id UInt64,
+ event_id UInt64,
+ event_time DateTime,
+ party_id String,
+ shop_id String,
+ category_id Int32,
+ created_at DateTime,
+ blocking String,
+ blocked_reason String,
+ blocked_since DateTime,
+ unblocked_reason String,
+ unblocked_since DateTime,
+ suspension String,
+ suspension_active_since DateTime,
+ suspension_suspended_since DateTime,
+ details_name String,
+ details_description String,
+ location_url String,
+ account_currency_code String,
+ account_settlement String,
+ account_guarantee String,
+ international_legal_entity_country_code String,
+ version_id UInt64,
+ changed_by_id String,
+ changed_by_email String,
+ changed_by_name String,
+ deleted Bool
+)
+PRIMARY KEY id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'shop'
+ ))
+LAYOUT(HASHED())
+LIFETIME(MIN 300 MAX 360);
+
+CREATE DICTIONARY IF NOT EXISTS analytic.category_dictionary (
+ id UInt64,
+ version_id UInt64,
+ category_id Int32,
+ name String,
+ description String,
+ type String,
+ deleted Bool,
+ changed_by_id String,
+ changed_by_email String,
+ changed_by_name String
+)
+PRIMARY KEY id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'category'
+ ))
+LAYOUT(HASHED())
+LIFETIME(MIN 300 MAX 360);
+
+CREATE DICTIONARY IF NOT EXISTS analytic.country_dictionary (
+ id UInt64,
+ version_id UInt64,
+ country_id String,
+ name String,
+ trade_bloc Array(String),
+ deleted Bool,
+ changed_by_id String,
+ changed_by_email String,
+ changed_by_name String
+)
+PRIMARY KEY id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'country'
+ ))
+LAYOUT(HASHED())
+LIFETIME(MIN 300 MAX 360);
+
+CREATE DICTIONARY IF NOT EXISTS analytic.trade_bloc_dictionary (
+ id UInt64,
+ version_id UInt64,
+ trade_bloc_id String,
+ name String,
+ description String,
+ deleted Bool,
+ changed_by_id String,
+ changed_by_email String,
+ changed_by_name String
+)
+PRIMARY KEY id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'trade_bloc'
+ ))
+LAYOUT(HASHED())
+LIFETIME(MIN 300 MAX 360);
+
+CREATE DICTIONARY IF NOT EXISTS analytic.rate_dictionary (
+ id UInt64,
+ event_id String,
+ event_time String,
+ source_symbolic_code String,
+ source_exponent UInt32,
+ destination_symbolic_code String,
+ destination_exponent UInt32,
+ exchange_rate_rational_p UInt64,
+ exchange_rate_rational_q UInt64
+)
+PRIMARY KEY id
+SOURCE(POSTGRESQL(
+ HOST '<>'
+ PORT <>
+ USER '<>'
+ PASSWORD '<>'
+ DB '<>'
+ SCHEMA '<>'
+ TABLE 'rate'
+ ))
+LAYOUT(HASHED())
+LIFETIME(MIN 300 MAX 360);
diff --git a/src/main/resources/db/migration/V20__alter_rate_table_for_exrates.sql b/src/main/resources/db/migration/V20__alter_rate_table_for_exrates.sql
new file mode 100644
index 00000000..3837c9da
--- /dev/null
+++ b/src/main/resources/db/migration/V20__alter_rate_table_for_exrates.sql
@@ -0,0 +1,16 @@
+-- Migrate analytics.rate from the legacy xrates schema to exrates:
+-- event ids become opaque strings and per-source rate bounds are dropped.
+DROP INDEX IF EXISTS analytics.rate_source_id_idx;
+
+ALTER TABLE analytics.rate
+ALTER COLUMN event_id TYPE VARCHAR USING event_id::VARCHAR;
+
+ALTER TABLE analytics.rate
+DROP COLUMN source_id,
+DROP COLUMN lower_bound_inclusive,
+DROP COLUMN upper_bound_exclusive;
+
+-- Index name is kept under PostgreSQL's 63-byte identifier limit (NAMEDATALEN);
+-- the previous 67-character name would have been silently truncated by the server.
+CREATE UNIQUE INDEX rate_src_dst_symbolic_code_event_time_idx
+ ON analytics.rate (source_symbolic_code, destination_symbolic_code, event_time);
diff --git a/src/test/java/dev/vality/analytics/config/ClickhouseTest.java b/src/test/java/dev/vality/analytics/config/ClickhouseTest.java
index ecb2f59b..2c1e35fd 100644
--- a/src/test/java/dev/vality/analytics/config/ClickhouseTest.java
+++ b/src/test/java/dev/vality/analytics/config/ClickhouseTest.java
@@ -1,6 +1,6 @@
package dev.vality.analytics.config;
-import dev.vality.testcontainers.annotations.clickhouse.ClickhouseTestcontainerSingleton;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@@ -9,12 +9,6 @@
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
-@ClickhouseTestcontainerSingleton(
- dbNameShouldBeDropped = "analytic",
- migrations = {
- "sql/V1__db_init.sql",
- "sql/V2__add_fields.sql",
- "sql/V3__add_provider_field.sql",
- "sql/test.data/inserts_event_sink.sql"})
+@ExtendWith(ClickhouseTestExtension.class)
public @interface ClickhouseTest {
}
diff --git a/src/test/java/dev/vality/analytics/config/ClickhouseTestContainerHolder.java b/src/test/java/dev/vality/analytics/config/ClickhouseTestContainerHolder.java
new file mode 100644
index 00000000..ce2c678c
--- /dev/null
+++ b/src/test/java/dev/vality/analytics/config/ClickhouseTestContainerHolder.java
@@ -0,0 +1,25 @@
+package dev.vality.analytics.config;
+
+import org.testcontainers.clickhouse.ClickHouseContainer;
+
+final class ClickhouseTestContainerHolder {
+
+ private static final String IMAGE =
+ System.getProperty("clickhouse.test.image", "clickhouse/clickhouse-server:23.10.3");
+
+ private static final ClickHouseContainer CONTAINER = new ClickHouseContainer(IMAGE)
+ .withAccessToHost(true)
+ .withUsername("test")
+ .withPassword("test")
+ .withDatabaseName("default");
+
+ private ClickhouseTestContainerHolder() {
+ }
+
+ static synchronized ClickHouseContainer getContainer() {
+ if (!CONTAINER.isRunning()) {
+ CONTAINER.start();
+ }
+ return CONTAINER;
+ }
+}
diff --git a/src/test/java/dev/vality/analytics/config/ClickhouseTestExtension.java b/src/test/java/dev/vality/analytics/config/ClickhouseTestExtension.java
new file mode 100644
index 00000000..8db9feae
--- /dev/null
+++ b/src/test/java/dev/vality/analytics/config/ClickhouseTestExtension.java
@@ -0,0 +1,97 @@
+package dev.vality.analytics.config;
+
+import dev.vality.testcontainers.annotations.postgresql.PostgresqlContainerExtension;
+import dev.vality.testcontainers.annotations.postgresql.PostgresqlTestcontainerFactory;
+import org.flywaydb.core.Flyway;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.springframework.jdbc.datasource.DriverManagerDataSource;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.Testcontainers;
+
+import java.sql.DriverManager;
+import java.util.List;
+import java.util.Map;
+
+public class ClickhouseTestExtension implements BeforeAllCallback, BeforeEachCallback {
+
+ private static final String ANALYTIC_DB = "analytic";
+ private static final int POSTGRES_PORT = 5432;
+ private static final String POSTGRES_SCHEMA = "analytics";
+ private static final String CLICKHOUSE_MIGRATION_LOCATION = "classpath:db/migration-clickhouse/non-sharded";
+ private static final String CLICKHOUSE_TEST_MIGRATION_LOCATION = "classpath:db/test-migration-clickhouse";
+ private static final String TESTCONTAINERS_POSTGRES_HOST = "host.testcontainers.internal";
+
+ @Override
+ public void beforeAll(ExtensionContext context) {
+ var postgres = PostgresqlTestcontainerFactory.singletonContainer();
+ if (!postgres.isRunning()) {
+ postgres.start();
+ }
+ Testcontainers.exposeHostPorts(postgres.getMappedPort(POSTGRES_PORT));
+
+ var container = ClickhouseTestContainerHolder.getContainer();
+ System.setProperty("clickhouse.db.url", buildClickHouseDatabaseUrl(container.getJdbcUrl()));
+ System.setProperty("clickhouse.db.username", container.getUsername());
+ System.setProperty("clickhouse.db.password", container.getPassword());
+ System.setProperty("clickhouse.flyway.enabled", "false");
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ var postgres = PostgresqlTestcontainerFactory.singletonContainer();
+ migratePostgres(postgres);
+
+ var container = ClickhouseTestContainerHolder.getContainer();
+ try (var connection = DriverManager.getConnection(
+ container.getJdbcUrl(),
+ container.getUsername(),
+ container.getPassword())) {
+ try (var statement = connection.createStatement()) {
+ statement.execute("DROP DATABASE IF EXISTS " + ANALYTIC_DB);
+ statement.execute("DROP TABLE IF EXISTS clickhouse_flyway_schema_history");
+ }
+ }
+ migrateClickHouse(container, postgres);
+ }
+
+ private void migratePostgres(PostgresqlContainerExtension postgres) {
+ Flyway.configure()
+ .dataSource(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())
+ .schemas(POSTGRES_SCHEMA)
+ .load()
+ .migrate();
+ }
+
+ private void migrateClickHouse(ClickHouseContainer clickhouse, PostgresqlContainerExtension postgres) {
+ DriverManagerDataSource dataSource = new DriverManagerDataSource(
+ clickhouse.getJdbcUrl(),
+ clickhouse.getUsername(),
+ clickhouse.getPassword());
+ Map<String, String> placeholders = ClickHouseFlywaySupport.resolvePostgresPlaceholders(
+ TESTCONTAINERS_POSTGRES_HOST,
+ postgres.getMappedPort(POSTGRES_PORT),
+ postgres.getDatabaseName(),
+ postgres.getUsername(),
+ postgres.getPassword(),
+ POSTGRES_SCHEMA);
+ Flyway flyway = ClickHouseFlywaySupport.createFlyway(
+ dataSource,
+ List.of(CLICKHOUSE_MIGRATION_LOCATION, CLICKHOUSE_TEST_MIGRATION_LOCATION),
+ placeholders,
+ "clickhouse_flyway_schema_history");
+ flyway.migrate();
+ }
+
+ private String buildClickHouseDatabaseUrl(String jdbcUrl) {
+ int paramsStart = jdbcUrl.indexOf('?');
+ String base = paramsStart >= 0 ? jdbcUrl.substring(0, paramsStart) : jdbcUrl;
+ String params = paramsStart >= 0 ? jdbcUrl.substring(paramsStart) : "";
+ int databaseSeparator = base.lastIndexOf('/');
+ if (databaseSeparator < 0) {
+ throw new IllegalArgumentException("Unsupported ClickHouse JDBC URL: " + jdbcUrl);
+ }
+ return base.substring(0, databaseSeparator + 1) + ANALYTIC_DB + params;
+ }
+}
diff --git a/src/test/java/dev/vality/analytics/config/SpringBootITest.java b/src/test/java/dev/vality/analytics/config/SpringBootITest.java
index 779c5c25..12462fb9 100644
--- a/src/test/java/dev/vality/analytics/config/SpringBootITest.java
+++ b/src/test/java/dev/vality/analytics/config/SpringBootITest.java
@@ -1,6 +1,5 @@
package dev.vality.analytics.config;
-
import org.springframework.boot.test.context.SpringBootTest;
import java.lang.annotation.ElementType;
diff --git a/src/test/java/dev/vality/analytics/listener/RateEventTestUtils.java b/src/test/java/dev/vality/analytics/listener/RateEventTestUtils.java
new file mode 100644
index 00000000..4571e87c
--- /dev/null
+++ b/src/test/java/dev/vality/analytics/listener/RateEventTestUtils.java
@@ -0,0 +1,67 @@
+package dev.vality.analytics.listener;
+
+import dev.vality.exrates.base.Currency;
+import dev.vality.exrates.base.Rational;
+import dev.vality.exrates.events.CurrencyEvent;
+import dev.vality.exrates.events.CurrencyEventPayload;
+import dev.vality.exrates.events.CurrencyExchangeRate;
+import dev.vality.geck.common.util.TypeUtil;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Builders for exrates {@link CurrencyEvent} fixtures used by the rate listener tests.
+ */
+public final class RateEventTestUtils {
+
+    private RateEventTestUtils() {
+        // utility class, not instantiable
+    }
+
+    /**
+     * Creates a single currency exchange-rate event with a p/q rational rate
+     * between two currencies (both created with exponent 2).
+     */
+    public static CurrencyEvent createCurrencyEvent(String eventId, String sourceCode, String destCode,
+                                                    long p, long q, Instant timestamp) {
+        Currency sourceCurrency = new Currency()
+                .setSymbolicCode(sourceCode)
+                .setExponent((short) 2);
+        Currency destCurrency = new Currency()
+                .setSymbolicCode(destCode)
+                .setExponent((short) 2);
+        Rational rational = new Rational()
+                .setP(p)
+                .setQ(q);
+        CurrencyExchangeRate exchangeRate = new CurrencyExchangeRate();
+        exchangeRate.setSourceCurrency(sourceCurrency);
+        exchangeRate.setDestinationCurrency(destCurrency);
+        exchangeRate.setExchangeRate(rational);
+        exchangeRate.setTimestamp(TypeUtil.temporalToString(timestamp));
+        CurrencyEventPayload payload = new CurrencyEventPayload();
+        payload.setExchangeRate(exchangeRate);
+        CurrencyEvent event = new CurrencyEvent();
+        event.setEventId(eventId);
+        event.setEventCreatedAt(TypeUtil.temporalToString(timestamp));
+        event.setPayload(payload);
+        return event;
+    }
+
+    /**
+     * Creates {@code count} hourly USD->RUB events starting at {@code baseTime}.
+     */
+    public static List<CurrencyEvent> createCurrencyEvents(int count, Instant baseTime) {
+        return IntStream.range(0, count)
+                .mapToObj(i -> createCurrencyEvent(
+                        "event_" + i,
+                        "USD",
+                        "RUB",
+                        75L + i,
+                        1L,
+                        baseTime.plusSeconds(3600L * i)))
+                .collect(Collectors.toList());
+    }
+}
diff --git a/src/test/java/dev/vality/analytics/listener/RateListenerTest.java b/src/test/java/dev/vality/analytics/listener/RateListenerTest.java
index f5363266..e5789732 100644
--- a/src/test/java/dev/vality/analytics/listener/RateListenerTest.java
+++ b/src/test/java/dev/vality/analytics/listener/RateListenerTest.java
@@ -1,8 +1,6 @@
package dev.vality.analytics.listener;
import dev.vality.analytics.config.SpringBootITest;
-import dev.vality.analytics.utils.RateSinkEventTestUtils;
-import dev.vality.machinegun.eventsink.SinkEvent;
import dev.vality.testcontainers.annotations.kafka.config.KafkaProducer;
import org.apache.thrift.TBase;
import org.junit.jupiter.api.Test;
@@ -10,10 +8,11 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
+import java.time.Instant;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -28,22 +27,30 @@ public class RateListenerTest {
private KafkaProducer<TBase<?, ?>> testThriftKafkaProducer;
@Test
- public void handle() throws InterruptedException {
- String sourceId = "CBR";
+ public void handle() {
+ List<CurrencyEvent> currencyEvents = RateEventTestUtils.createCurrencyEvents(
+ 4, Instant.now());
- final List<SinkEvent> sinkEvents = RateSinkEventTestUtils.create(sourceId);
- sinkEvents.forEach(event -> testThriftKafkaProducer.send(rateTopic, event));
+ currencyEvents.forEach(event -> testThriftKafkaProducer.send(rateTopic, event));
- await().atMost(60, SECONDS).until(() -> {
- Integer count = postgresJdbcTemplate.queryForObject("SELECT count(*) FROM analytics.rate", Integer.class);
- if (count == 0) {
+ await().atMost(60, TimeUnit.SECONDS).until(() -> {
+ Integer count = postgresJdbcTemplate.queryForObject(
+ "SELECT count(*) FROM analytics.rate", Integer.class);
+ if (count < 4) {
Thread.sleep(1000);
return false;
}
return true;
});
- final List