diff --git a/pom.xml b/pom.xml index 32cb04ae..a2e69dc4 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ dev.vality service-parent-pom - 3.1.9 + 3.1.10 analytics @@ -26,8 +26,11 @@ analytics analytics jdbc:postgresql://localhost:${db.port}/${db.name} + 10.22.0 + + @@ -64,8 +67,8 @@ dev.vality - xrates-proto - 1.19-e7ad3f5 + exrates-proto + 1.9-5d53aec dev.vality @@ -146,11 +149,17 @@ org.flywaydb flyway-core + ${flyway.version} org.flywaydb flyway-database-postgresql + + org.flywaydb + flyway-database-clickhouse + 10.24.0 + org.apache.kafka kafka-clients @@ -179,7 +188,7 @@ dev.vality testcontainers-annotations - 3.3.2 + 4.1.3 test @@ -254,6 +263,7 @@ org.flywaydb flyway-maven-plugin + ${flyway.version} ${db.url} ${db.user} @@ -343,6 +353,19 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + + + org.projectlombok + lombok + 1.18.42 + + + + diff --git a/src/main/java/dev/vality/analytics/config/ClickHouseConfig.java b/src/main/java/dev/vality/analytics/config/ClickHouseConfig.java index 0735889b..bd17a232 100644 --- a/src/main/java/dev/vality/analytics/config/ClickHouseConfig.java +++ b/src/main/java/dev/vality/analytics/config/ClickHouseConfig.java @@ -1,17 +1,41 @@ package dev.vality.analytics.config; import dev.vality.analytics.config.properties.ClickHouseDbProperties; +import org.flywaydb.core.Flyway; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; import org.springframework.jdbc.core.JdbcTemplate; import javax.sql.DataSource; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; @Configuration public class ClickHouseConfig { + private static final String NON_SHARDED_SCHEMA_MODE = "non-sharded"; + private static final String SHARDED_SCHEMA_MODE = "sharded"; + private static final String NON_SHARDED_MIGRATION_LOCATION = "classpath:db/migration-clickhouse/non-sharded"; + private static final String SHARDED_MIGRATION_LOCATION = "classpath:db/migration-clickhouse/sharded"; + private static final String CLUSTER_PLACEHOLDER = "cluster"; + private static final String SHARD_PLACEHOLDER = "shard"; + private static final String REPLICA_PLACEHOLDER = "replica"; + private static final String CLICKHOUSE_FLYWAY_SHARDED_PROPERTY_PREFIX = "clickhouse.flyway.sharded."; + private static final String SHARDED_CLUSTER_PROPERTY = + CLICKHOUSE_FLYWAY_SHARDED_PROPERTY_PREFIX + CLUSTER_PLACEHOLDER; + private static final String SHARDED_SHARD_PROPERTY = + CLICKHOUSE_FLYWAY_SHARDED_PROPERTY_PREFIX + SHARD_PLACEHOLDER; + private static final String SHARDED_REPLICA_PROPERTY = + CLICKHOUSE_FLYWAY_SHARDED_PROPERTY_PREFIX + REPLICA_PLACEHOLDER; + @Bean(name = "clickHouseDataSourceProperties") @ConfigurationProperties(prefix = "clickhouse.db") public ClickHouseDbProperties clickHouseDataSourceProperties() { @@ -34,6 +58,34 @@ public JdbcTemplate clickHouseJdbcTemplate(@Qualifier("clickHouseDataSource") Da return new JdbcTemplate(clickHouseDataSource); } + @Bean + @DependsOn("flyway") + @ConditionalOnProperty(prefix = "clickhouse.flyway", name = "enabled", havingValue = "true", matchIfMissing = true) + public Flyway clickHouseFlyway( + @Qualifier("clickHouseDataSource") DataSource clickHouseDataSource, + @Qualifier("dataSourceProperties") DataSourceProperties postgresDataSourceProperties, + @Value("${clickhouse.flyway.schema-mode:non-sharded}") String schemaMode, + @Value("${" + SHARDED_CLUSTER_PROPERTY + ":}") String shardedCluster, + @Value("${" + SHARDED_SHARD_PROPERTY + ":}") String shardedShard, + @Value("${" + SHARDED_REPLICA_PROPERTY + ":}") String shardedReplica, + @Value("${postgres.db.schema}") String postgresSchema) { + String normalizedSchemaMode = schemaMode.toLowerCase(Locale.ROOT); + Map placeholders = new LinkedHashMap<>(resolveFlywayShardedPlaceholders( + normalizedSchemaMode, shardedCluster, shardedShard, shardedReplica)); + placeholders.putAll(ClickHouseFlywaySupport.resolvePostgresPlaceholders( + postgresDataSourceProperties.getUrl(), + postgresDataSourceProperties.getUsername(), + postgresDataSourceProperties.getPassword(), + postgresSchema)); + final var flyway = ClickHouseFlywaySupport.createFlyway( + clickHouseDataSource, + List.of(resolveClickHouseMigrationLocation(normalizedSchemaMode)), + placeholders, + "clickhouse_flyway_schema_history"); + flyway.migrate(); + return flyway; + } + private String buildClickHouseJdbcUrl(ClickHouseDbProperties properties) { var urlBuilder = new StringBuilder(); urlBuilder.append(properties.getUrl()); @@ -49,4 +101,44 @@ private String buildClickHouseJdbcUrl(ClickHouseDbProperties properties) { } return urlBuilder.toString(); } + + private String resolveClickHouseMigrationLocation(String schemaMode) { + return switch (schemaMode) { + case NON_SHARDED_SCHEMA_MODE -> NON_SHARDED_MIGRATION_LOCATION; + case SHARDED_SCHEMA_MODE -> SHARDED_MIGRATION_LOCATION; + default -> throw new IllegalArgumentException( + String.format("Unsupported clickhouse.flyway.schema-mode: %s. Allowed values: %s, %s", + schemaMode, NON_SHARDED_SCHEMA_MODE, SHARDED_SCHEMA_MODE)); + }; + } + + private Map resolveFlywayShardedPlaceholders( + String schemaMode, + String shardedCluster, + String shardedShard, + String shardedReplica) { + if (NON_SHARDED_SCHEMA_MODE.equals(schemaMode)) { + return Map.of(); + } + + if (SHARDED_SCHEMA_MODE.equals(schemaMode)) { + validateRequiredShardedProperty(SHARDED_CLUSTER_PROPERTY, shardedCluster); + validateRequiredShardedProperty(SHARDED_SHARD_PROPERTY, shardedShard); + validateRequiredShardedProperty(SHARDED_REPLICA_PROPERTY, shardedReplica); + return Map.of( + CLUSTER_PLACEHOLDER, shardedCluster, + SHARD_PLACEHOLDER, shardedShard, + REPLICA_PLACEHOLDER, shardedReplica); + } + + return Map.of(); + } + + private void validateRequiredShardedProperty(String propertyName, String value) { + if (value == null || value.isBlank()) { + throw new IllegalArgumentException( + String.format("Property '%s' must be configured when clickhouse.flyway.schema-mode=sharded", + propertyName)); + } + } } diff --git a/src/main/java/dev/vality/analytics/config/ClickHouseFlywaySupport.java b/src/main/java/dev/vality/analytics/config/ClickHouseFlywaySupport.java new file mode 100644 index 00000000..2c23c8db --- /dev/null +++ b/src/main/java/dev/vality/analytics/config/ClickHouseFlywaySupport.java @@ -0,0 +1,77 @@ +package dev.vality.analytics.config; + +import lombok.experimental.UtilityClass; +import org.flywaydb.core.Flyway; + +import javax.sql.DataSource; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@UtilityClass +final class ClickHouseFlywaySupport { + + private static final String POSTGRES_HOST = "postgresHost"; + private static final String POSTGRES_PORT = "postgresPort"; + private static final String POSTGRES_USER = "postgresUser"; + private static final String POSTGRES_PASSWORD = "postgresPassword"; + private static final String POSTGRES_DATABASE = "postgresDatabase"; + private static final String POSTGRES_SCHEMA = "postgresSchema"; + private static final Pattern POSTGRES_JDBC_URL_PATTERN = Pattern.compile( + "^jdbc:postgresql://(?[^:/?]+)(?::(?\\d+))?/(?[^?]+).*$"); + + static Flyway createFlyway( + DataSource dataSource, + List locations, + Map placeholders, + String schemaHistoryTable) { + return Flyway.configure() + .dataSource(dataSource) + .locations(locations.toArray(String[]::new)) + .placeholderPrefix("<<") + .placeholderSuffix(">>") + .placeholders(placeholders) + .table(schemaHistoryTable) + .baselineOnMigrate(true) + .load(); + } + + static Map resolvePostgresPlaceholders( + String jdbcUrl, + String username, + String password, + String schema) { + Matcher matcher = POSTGRES_JDBC_URL_PATTERN.matcher(jdbcUrl); + if (!matcher.matches()) { + throw new IllegalArgumentException("Unsupported postgres.db.url format: " + jdbcUrl); + } + + String port = matcher.group("port"); + return resolvePostgresPlaceholders( + matcher.group("host"), + port == null ? 5432 : Integer.parseInt(port), + matcher.group("database"), + username, + password, + schema); + } + + static Map resolvePostgresPlaceholders( + String host, + int port, + String database, + String username, + String password, + String schema) { + Map placeholders = new LinkedHashMap<>(); + placeholders.put(POSTGRES_HOST, host); + placeholders.put(POSTGRES_PORT, Integer.toString(port)); + placeholders.put(POSTGRES_USER, username); + placeholders.put(POSTGRES_PASSWORD, password); + placeholders.put(POSTGRES_DATABASE, database); + placeholders.put(POSTGRES_SCHEMA, schema); + return placeholders; + } +} diff --git a/src/main/java/dev/vality/analytics/config/KafkaConfig.java b/src/main/java/dev/vality/analytics/config/KafkaConfig.java index 45940403..600b39e8 100644 --- a/src/main/java/dev/vality/analytics/config/KafkaConfig.java +++ b/src/main/java/dev/vality/analytics/config/KafkaConfig.java @@ -2,6 +2,8 @@ import dev.vality.analytics.serde.HistoricalCommitDeserializer; import dev.vality.analytics.serde.MachineEventDeserializer; +import dev.vality.analytics.serde.CurrencyEventKafkaDeserializer; +import dev.vality.exrates.events.CurrencyEvent; import dev.vality.analytics.service.ConsumerGroupIdService; import dev.vality.damsel.domain_config_v2.HistoricalCommit; import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory; @@ -87,10 +89,10 @@ public ConcurrentKafkaListenerContainerFactory dominan } @Bean - public ConcurrentKafkaListenerContainerFactory rateContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = + public ConcurrentKafkaListenerContainerFactory rateContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - initDefaultListenerProperties(factory, rateGroupId, new MachineEventDeserializer(), + initDefaultListenerProperties(factory, rateGroupId, new CurrencyEventKafkaDeserializer(), maxPollRecordsRatesListener); return factory; } diff --git a/src/main/java/dev/vality/analytics/config/SerializeConfig.java b/src/main/java/dev/vality/analytics/config/SerializeConfig.java index 41d903fb..e601706d 100644 --- a/src/main/java/dev/vality/analytics/config/SerializeConfig.java +++ b/src/main/java/dev/vality/analytics/config/SerializeConfig.java @@ -1,11 +1,12 @@ package dev.vality.analytics.config; import dev.vality.damsel.domain_config_v2.HistoricalCommit; +import dev.vality.exrates.events.CurrencyEvent; import dev.vality.geck.serializer.Geck; import dev.vality.sink.common.parser.impl.MachineEventParser; import dev.vality.sink.common.serialization.BinaryDeserializer; import dev.vality.sink.common.serialization.impl.AbstractThriftBinaryDeserializer; -import dev.vality.xrates.rate.Change; + import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -13,11 +14,11 @@ public class SerializeConfig { @Bean - public BinaryDeserializer rateEventDataBinaryDeserializer() { + public BinaryDeserializer currencyEventBinaryDeserializer() { return new AbstractThriftBinaryDeserializer<>() { @Override - public Change deserialize(byte[] bytes) { - return Geck.msgPackToTBase(bytes, Change.class); + public CurrencyEvent deserialize(byte[] bytes) { + return Geck.msgPackToTBase(bytes, CurrencyEvent.class); } }; } diff --git a/src/main/java/dev/vality/analytics/dao/repository/postgres/RateDao.java b/src/main/java/dev/vality/analytics/dao/repository/postgres/RateDao.java index 06f749ae..4aa976cb 100644 --- a/src/main/java/dev/vality/analytics/dao/repository/postgres/RateDao.java +++ b/src/main/java/dev/vality/analytics/dao/repository/postgres/RateDao.java @@ -28,19 +28,9 @@ public void saveRateBatch(List rates) { .map(rate -> getDslContext().newRecord(RATE, rate)) .map(rateRecord -> getDslContext() .insertInto(RATE).set(rateRecord) - .onConflict(RATE.SOURCE_ID, RATE.SOURCE_SYMBOLIC_CODE, RATE.DESTINATION_SYMBOLIC_CODE, - RATE.LOWER_BOUND_INCLUSIVE, RATE.UPPER_BOUND_EXCLUSIVE) + .onConflict(RATE.SOURCE_SYMBOLIC_CODE, RATE.DESTINATION_SYMBOLIC_CODE, RATE.EVENT_TIME) .doNothing()) .collect(Collectors.toList()); batchExecute(queries); } - - public Rate getRate(String sourceId, String sourceCode, String destinationCode) { - Query query = getDslContext().selectFrom(RATE) - .where(RATE.SOURCE_ID.eq(sourceId)) - .and(RATE.SOURCE_SYMBOLIC_CODE.eq(sourceCode) - .and(RATE.DESTINATION_SYMBOLIC_CODE.eq(destinationCode))); - return fetchOne(query, rateRowMapper); - } - } diff --git a/src/main/java/dev/vality/analytics/listener/RateListener.java b/src/main/java/dev/vality/analytics/listener/RateListener.java index 13d24618..93d4d8c1 100644 --- a/src/main/java/dev/vality/analytics/listener/RateListener.java +++ b/src/main/java/dev/vality/analytics/listener/RateListener.java @@ -1,7 +1,7 @@ package dev.vality.analytics.listener; -import dev.vality.analytics.listener.handler.rate.RateMachineEventHandler; -import dev.vality.machinegun.eventsink.MachineEvent; +import dev.vality.analytics.listener.handler.rate.CurrencyEventHandler; +import dev.vality.exrates.events.CurrencyEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; @@ -17,18 +17,18 @@ @RequiredArgsConstructor public class RateListener { - private final RateMachineEventHandler rateMachineEventHandler; + private final CurrencyEventHandler currencyEventHandler; @KafkaListener(autoStartup = "${kafka.listener.rate.enabled}", topics = "${kafka.topic.rate.initial}", containerFactory = "rateContainerFactory") - public void handle(List batch, + public void handle(List batch, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.OFFSET) int offsets, Acknowledgment ack) throws InterruptedException { log.info("Got RateListener listen offsets: {}, partition: {}, batch.size: {}", offsets, partition, batch.size()); - rateMachineEventHandler.handle(batch, ack); + currencyEventHandler.handle(batch, ack); log.info("Batch RateListener has been committed, size={}", batch.size()); } diff --git a/src/main/java/dev/vality/analytics/listener/handler/rate/RateMachineEventHandler.java b/src/main/java/dev/vality/analytics/listener/handler/rate/CurrencyEventHandler.java similarity index 54% rename from src/main/java/dev/vality/analytics/listener/handler/rate/RateMachineEventHandler.java rename to src/main/java/dev/vality/analytics/listener/handler/rate/CurrencyEventHandler.java index c9b37cad..e3f529a4 100644 --- a/src/main/java/dev/vality/analytics/listener/handler/rate/RateMachineEventHandler.java +++ b/src/main/java/dev/vality/analytics/listener/handler/rate/CurrencyEventHandler.java @@ -2,10 +2,8 @@ import dev.vality.analytics.dao.repository.postgres.RateDao; import dev.vality.analytics.domain.db.tables.pojos.Rate; -import dev.vality.analytics.listener.mapper.Mapper; -import dev.vality.machinegun.eventsink.MachineEvent; -import dev.vality.sink.common.parser.impl.MachineEventParser; -import dev.vality.xrates.rate.Change; +import dev.vality.analytics.listener.mapper.rate.CurrencyEventMapper; +import dev.vality.exrates.events.CurrencyEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -16,43 +14,43 @@ import org.springframework.util.CollectionUtils; import java.time.Duration; -import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; @Service @Slf4j @RequiredArgsConstructor -public class RateMachineEventHandler { +public class CurrencyEventHandler { - private final MachineEventParser eventParser; - private final List>> mappers; + private final CurrencyEventMapper currencyEventMapper; private final RateDao rateDao; @Value("${kafka.consumer.throttling-timeout-ms}") private int throttlingTimeout; @Transactional(propagation = Propagation.REQUIRED) - public void handle(List batch, Acknowledgment ack) throws InterruptedException { + public void handle( + final List batch, + final Acknowledgment ack) throws InterruptedException { try { if (CollectionUtils.isEmpty(batch)) { + ack.acknowledge(); return; } - for (MachineEvent machineEvent : batch) { - final Change change = eventParser.parse(machineEvent); - final List rates = mappers.stream() - .filter(mapper -> mapper.accept(change)) - .map(mapper -> mapper.map(change, machineEvent)) - .flatMap(Collection::stream) - .collect(Collectors.toList()); + List rates = batch.stream() + .map(currencyEventMapper::map) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + if (!rates.isEmpty()) { rateDao.saveRateBatch(rates); } ack.acknowledge(); } catch (Exception e) { - log.error("Exception during PartyListener process", e); + log.error("Exception during CurrencyEventHandler process", e); ack.nack(Duration.ofMillis(throttlingTimeout)); throw e; } } - } diff --git a/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java new file mode 100644 index 00000000..f351a540 --- /dev/null +++ b/src/main/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapper.java @@ -0,0 +1,105 @@ +package dev.vality.analytics.listener.mapper.rate; + +import dev.vality.analytics.domain.db.tables.pojos.Rate; +import dev.vality.exrates.base.Currency; +import dev.vality.exrates.base.Rational; +import dev.vality.exrates.events.CurrencyEvent; +import dev.vality.exrates.events.CurrencyEventPayload; +import dev.vality.exrates.events.CurrencyExchangeRate; +import dev.vality.geck.common.util.TypeUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +@Component +@Slf4j +public class CurrencyEventMapper { + + public Rate map(CurrencyEvent event) { + String eventId = event.getEventId(); + CurrencyExchangeRate exchangeRate = validateAndGetExchangeRate(event, eventId); + if (exchangeRate == null) { + return null; + } + + Currency sourceCurrency = exchangeRate.getSourceCurrency(); + Currency destinationCurrency = exchangeRate.getDestinationCurrency(); + Rational rational = exchangeRate.getExchangeRate(); + var eventTime = parseEventTime(exchangeRate.getTimestamp(), eventId); + if (eventTime == null) { + return null; + } + + return buildRate(eventId, eventTime, sourceCurrency, destinationCurrency, rational); + } + + private CurrencyExchangeRate validateAndGetExchangeRate(CurrencyEvent event, String eventId) { + if (!event.isSetPayload()) { + log.warn("CurrencyEvent payload not set, eventId={}", eventId); + return null; + } + + CurrencyEventPayload payload = event.getPayload(); + if (!payload.isSetExchangeRate()) { + log.warn("CurrencyEvent payload is not exchange_rate, eventId={}", eventId); + return null; + } + + CurrencyExchangeRate exchangeRate = payload.getExchangeRate(); + Currency sourceCurrency = exchangeRate.getSourceCurrency(); + if (!isCurrencySet(sourceCurrency)) { + log.warn("CurrencyEvent source currency incomplete, eventId={}", eventId); + return null; + } + + Currency destinationCurrency = exchangeRate.getDestinationCurrency(); + if (!isCurrencySet(destinationCurrency)) { + log.warn("CurrencyEvent destination currency incomplete, eventId={}", eventId); + return null; + } + + Rational rational = exchangeRate.getExchangeRate(); + if (!isRationalSet(rational)) { + log.warn("CurrencyEvent exchange rate rational incomplete, eventId={}", eventId); + return null; + } + + return exchangeRate; + } + + private LocalDateTime parseEventTime(String timestamp, String eventId) { + try { + return TypeUtil.stringToLocalDateTime(timestamp); + } catch (Exception e) { + log.warn("Failed to parse timestamp '{}' for CurrencyEvent, eventId={}", timestamp, eventId, e); + return null; + } + } + + private Rate buildRate( + String eventId, + LocalDateTime eventTime, + Currency sourceCurrency, + Currency destinationCurrency, + Rational rational) { + Rate rate = new Rate(); + rate.setEventId(eventId); + rate.setEventTime(eventTime); + rate.setSourceSymbolicCode(sourceCurrency.getSymbolicCode()); + rate.setSourceExponent(sourceCurrency.getExponent()); + rate.setDestinationSymbolicCode(destinationCurrency.getSymbolicCode()); + rate.setDestinationExponent(destinationCurrency.getExponent()); + rate.setExchangeRateRationalP(rational.getP()); + rate.setExchangeRateRationalQ(rational.getQ()); + return rate; + } + + private boolean isCurrencySet(Currency currency) { + return currency != null && currency.isSetSymbolicCode() && currency.isSetExponent(); + } + + private boolean isRationalSet(Rational rational) { + return rational != null && rational.isSetP() && rational.isSetQ(); + } +} diff --git a/src/main/java/dev/vality/analytics/listener/mapper/rate/RateCreatedMapper.java b/src/main/java/dev/vality/analytics/listener/mapper/rate/RateCreatedMapper.java deleted file mode 100644 index 81e8d17f..00000000 --- a/src/main/java/dev/vality/analytics/listener/mapper/rate/RateCreatedMapper.java +++ /dev/null @@ -1,74 +0,0 @@ -package dev.vality.analytics.listener.mapper.rate; - -import dev.vality.analytics.constant.EventType; -import dev.vality.analytics.domain.db.tables.pojos.Rate; -import dev.vality.analytics.listener.mapper.Mapper; -import dev.vality.geck.common.util.TypeUtil; -import dev.vality.machinegun.eventsink.MachineEvent; -import dev.vality.xrates.base.Rational; -import dev.vality.xrates.base.TimestampInterval; -import dev.vality.xrates.rate.*; -import jakarta.validation.constraints.NotNull; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.List; -import java.util.stream.Collectors; - -@Slf4j -@Component -@RequiredArgsConstructor -public class RateCreatedMapper implements Mapper> { - - @Override - public List map(Change change, MachineEvent event) { - if (change.getCreated().getExchangeRateData().getQuotes().isEmpty()) { - log.warn("Quotes is empty, SinkEvent will not be saved, eventId={}, sourceId={}", - event.getEventId(), event.getSourceId()); - return List.of(); - } - log.info("Start rate created handling, eventId={}, sourceId={}", event.getEventId(), event.getSourceId()); - - ExchangeRateCreated exchangeRateCreated = change.getCreated(); - ExchangeRateData exchangeRateData = exchangeRateCreated.getExchangeRateData(); - TimestampInterval interval = exchangeRateData.getInterval(); - - return exchangeRateData.getQuotes().stream() - .map(quote -> initRate(event, interval, quote)) - .collect(Collectors.toList()); - } - - @NotNull - private Rate initRate(MachineEvent event, TimestampInterval interval, Quote quote) { - Rate rate = new Rate(); - rate.setSourceId(event.getSourceId()); - rate.setEventId(event.getEventId()); - rate.setEventTime(TypeUtil.stringToLocalDateTime(event.getCreatedAt())); - - // Quote - Currency source = quote.getSource(); - Currency destination = quote.getDestination(); - Rational exchangeRate = quote.getExchangeRate(); - - // Currency - rate.setSourceSymbolicCode(source.getSymbolicCode()); - rate.setSourceExponent(source.getExponent()); - rate.setDestinationSymbolicCode(destination.getSymbolicCode()); - rate.setDestinationExponent(destination.getExponent()); - - // ExchangeRate - rate.setExchangeRateRationalP(exchangeRate.getP()); - rate.setExchangeRateRationalQ(exchangeRate.getQ()); - - rate.setLowerBoundInclusive(TypeUtil.stringToLocalDateTime(interval.getLowerBoundInclusive())); - rate.setUpperBoundExclusive(TypeUtil.stringToLocalDateTime(interval.getUpperBoundExclusive())); - - return rate; - } - - @Override - public EventType getChangeType() { - return EventType.RATE_CREATED; - } -} diff --git a/src/main/java/dev/vality/analytics/parser/RateMachineEventParser.java b/src/main/java/dev/vality/analytics/parser/RateMachineEventParser.java deleted file mode 100644 index 96566281..00000000 --- a/src/main/java/dev/vality/analytics/parser/RateMachineEventParser.java +++ /dev/null @@ -1,14 +0,0 @@ -package dev.vality.analytics.parser; - -import dev.vality.sink.common.parser.impl.MachineEventParser; -import dev.vality.sink.common.serialization.BinaryDeserializer; -import dev.vality.xrates.rate.Change; -import org.springframework.stereotype.Component; - -@Component -public class RateMachineEventParser extends MachineEventParser { - - public RateMachineEventParser(BinaryDeserializer deserializer) { - super(deserializer); - } -} diff --git a/src/main/java/dev/vality/analytics/serde/CurrencyEventDeserializer.java b/src/main/java/dev/vality/analytics/serde/CurrencyEventDeserializer.java new file mode 100644 index 00000000..86f8f6fa --- /dev/null +++ b/src/main/java/dev/vality/analytics/serde/CurrencyEventDeserializer.java @@ -0,0 +1,16 @@ +package dev.vality.analytics.serde; + +import dev.vality.exrates.events.CurrencyEvent; +import dev.vality.geck.serializer.Geck; +import dev.vality.sink.common.serialization.BinaryDeserializer; +import dev.vality.sink.common.serialization.impl.AbstractThriftBinaryDeserializer; +import org.springframework.stereotype.Component; + +@Component +public class CurrencyEventDeserializer extends AbstractThriftBinaryDeserializer { + + @Override + public CurrencyEvent deserialize(byte[] bytes) { + return Geck.msgPackToTBase(bytes, CurrencyEvent.class); + } +} \ No newline at end of file diff --git a/src/main/java/dev/vality/analytics/serde/CurrencyEventKafkaDeserializer.java b/src/main/java/dev/vality/analytics/serde/CurrencyEventKafkaDeserializer.java new file mode 100644 index 00000000..6d1adde1 --- /dev/null +++ b/src/main/java/dev/vality/analytics/serde/CurrencyEventKafkaDeserializer.java @@ -0,0 +1,45 @@ +package dev.vality.analytics.serde; + +import dev.vality.exrates.events.CurrencyEvent; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TTransportException; + +import java.util.Map; + +@Slf4j +public class CurrencyEventKafkaDeserializer implements Deserializer { + + ThreadLocal thriftDeserializerThreadLocal = + ThreadLocal.withInitial(() -> { + try { + return new TDeserializer(new TBinaryProtocol.Factory()); + } catch (TTransportException e) { + throw new RuntimeException(); + } + }); + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public CurrencyEvent deserialize(String topic, byte[] data) { + log.debug("Message, topic: {}, byteLength: {}", topic, data.length); + CurrencyEvent currencyEvent = new CurrencyEvent(); + + try { + thriftDeserializerThreadLocal.get().deserialize(currencyEvent, data); + } catch (Exception e) { + log.error("Error when deserialize CurrencyEvent data: {} ", data, e); + } + + return currencyEvent; + } + + @Override + public void close() { + } +} \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index cc837ca9..4f88a505 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -53,6 +53,14 @@ clickhouse.db: connection-timeout: 10000 leak-detection-threshold: 5000 +clickhouse.flyway: + schema-mode: non-sharded + sharded: + # Required only when clickhouse.flyway.schema-mode=sharded + cluster: + shard: + replica: + postgres.db: schema: analytics type: com.zaxxer.hikari.HikariDataSource @@ -84,7 +92,7 @@ kafka: initial: mg-events-dominant max.poll.records: 50 rate: - initial: mg-events-rates + initial: etl-exchange-rate groupId: analytics-rate-group max.poll.records: 50 consumer: diff --git a/src/main/resources/db/migration-clickhouse/non-sharded/V1__create_tables.sql b/src/main/resources/db/migration-clickhouse/non-sharded/V1__create_tables.sql new file mode 100644 index 00000000..8d45f9f5 --- /dev/null +++ b/src/main/resources/db/migration-clickhouse/non-sharded/V1__create_tables.sql @@ -0,0 +1,168 @@ +CREATE DATABASE IF NOT EXISTS analytic; + +CREATE TABLE IF NOT EXISTS analytic.events_sink ( + timestamp Date, + eventTime UInt64, + eventTimeHour UInt64, + partyId String, + shopId String, + email String, + providerName String, + amount UInt64, + guaranteeDeposit UInt64, + systemFee UInt64, + providerFee UInt64, + externalFee UInt64, + currency String, + status Enum8('pending' = 1, 'processed' = 2, 'captured' = 3, 'cancelled' = 4, 'failed' = 5), + errorReason String, + errorCode String, + invoiceId String, + paymentId String, + sequenceId UInt64, + ip String, + bin String, + maskedPan String, + paymentTool String, + fingerprint String, + cardToken String, + paymentSystem String, + digitalWalletProvider String, + digitalWalletToken String, + cryptoCurrency String, + mobileOperator String, + paymentCountry String, + bankCountry String, + paymentTime UInt64, + providerId String, + terminal String, + cardHolderName String DEFAULT 'UNKNOWN', + bankCardTokenProvider String, + riskScore String, + rrn String, + paymentTerminal String +) ENGINE = ReplacingMergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (eventTimeHour, partyId, shopId, paymentTool, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, sequenceId); + +CREATE TABLE IF NOT EXISTS analytic.events_sink_refund ( + timestamp Date, + eventTime UInt64, + eventTimeHour UInt64, + partyId String, + shopId String, + email String, + providerName String, + amount UInt64, + guaranteeDeposit UInt64, + systemFee UInt64, + providerFee UInt64, + externalFee UInt64, + currency String, + reason String, + status Enum8('pending' = 1, 'succeeded' = 2, 'failed' = 3), + errorReason String, + errorCode String, + invoiceId String, + refundId String, + paymentId String, + sequenceId UInt64, + ip String, + fingerprint String, + cardToken String, + paymentSystem String, + digitalWalletProvider String, + digitalWalletToken String, + cryptoCurrency String, + mobileOperator String, + paymentCountry String, + bankCountry String, + paymentTime UInt64, + providerId String, + terminal String +) ENGINE = ReplacingMergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (eventTimeHour, partyId, shopId, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, refundId, sequenceId); + +CREATE TABLE IF NOT EXISTS analytic.events_sink_adjustment ( + timestamp Date, + eventTime UInt64, + eventTimeHour UInt64, + partyId String, + shopId String, + email String, + providerName String, + amount UInt64, + guaranteeDeposit UInt64, + systemFee UInt64, + providerFee UInt64, + externalFee UInt64, + oldAmount UInt64, + oldGuaranteeDeposit UInt64, + oldSystemFee UInt64, + oldProviderFee UInt64, + oldExternalFee UInt64, + currency String, + reason String, + status Enum8('captured' = 1, 'cancelled' = 2), + errorCode String, + errorReason String, + invoiceId String, + adjustmentId String, + paymentId String, + sequenceId UInt64, + ip String, + fingerprint String, + cardToken String, + paymentSystem String, + digitalWalletProvider String, + digitalWalletToken String, + cryptoCurrency String, + mobileOperator String, + paymentCountry String, + bankCountry String, + paymentTime UInt64, + providerId String, + terminal String +) ENGINE = ReplacingMergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (eventTimeHour, partyId, shopId, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, adjustmentId, sequenceId); + +CREATE TABLE IF NOT EXISTS analytic.events_sink_chargeback ( + timestamp Date, + eventTime UInt64, + eventTimeHour UInt64, + partyId String, + shopId String, + email String, + providerName String, + amount UInt64, + guaranteeDeposit UInt64, + systemFee UInt64, + providerFee UInt64, + externalFee UInt64, + currency String, + chargebackCode String, + stage Enum8('chargeback' = 1, 'pre_arbitration' = 2, 'arbitration' = 3), + status Enum8('accepted' = 1, 'rejected' = 2, 'cancelled' = 3), + category Enum8('fraud' = 1, 'dispute' = 2, 'authorisation' = 3, 'processing_error' = 4), + invoiceId String, + chargebackId String, + paymentId String, + sequenceId UInt64, + ip String, + fingerprint String, + cardToken String, + paymentSystem String, + digitalWalletProvider String, + digitalWalletToken String, + cryptoCurrency String, + mobileOperator String, + paymentCountry String, + bankCountry String, + paymentTime UInt64, + providerId String, + terminal String +) ENGINE = ReplacingMergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (eventTimeHour, partyId, shopId, category, status, stage, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, chargebackId, sequenceId); diff --git a/src/main/resources/db/migration-clickhouse/non-sharded/V2__create_dictionaries.sql b/src/main/resources/db/migration-clickhouse/non-sharded/V2__create_dictionaries.sql new file mode 100644 index 00000000..b6977de8 --- /dev/null +++ b/src/main/resources/db/migration-clickhouse/non-sharded/V2__create_dictionaries.sql @@ -0,0 +1,179 @@ +DROP DICTIONARY IF EXISTS analytic.party_dictionary; +DROP DICTIONARY IF EXISTS analytic.shop_dictionary; +DROP DICTIONARY IF EXISTS analytic.category_dictionary; +DROP DICTIONARY IF EXISTS analytic.country_dictionary; +DROP DICTIONARY IF EXISTS analytic.trade_bloc_dictionary; +DROP DICTIONARY IF EXISTS analytic.rate_dictionary; + +CREATE DICTIONARY IF NOT EXISTS analytic.party_dictionary ( + id UInt64, + event_id UInt64, + event_time DateTime, + party_id String, + created_at DateTime, + email String, + blocking String, + blocked_reason String, + blocked_since DateTime, + unblocked_reason String, + unblocked_since DateTime, + suspension String, + suspension_active_since DateTime, + suspension_suspended_since DateTime, + version_id UInt64, + changed_by_id String, + changed_by_email String, + changed_by_name String, + deleted Bool +) +PRIMARY KEY id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'party' + )) +LAYOUT(HASHED()) +LIFETIME(MIN 300 MAX 360); + +CREATE DICTIONARY IF NOT EXISTS analytic.shop_dictionary ( + id UInt64, + event_id UInt64, + event_time DateTime, + party_id String, + shop_id String, + category_id Int32, + created_at DateTime, + blocking String, + blocked_reason String, + blocked_since DateTime, + unblocked_reason String, + unblocked_since DateTime, + suspension String, + suspension_active_since DateTime, + suspension_suspended_since DateTime, + details_name String, + details_description String, + location_url String, + account_currency_code String, + account_settlement String, + account_guarantee String, + international_legal_entity_country_code String, + version_id UInt64, + changed_by_id String, + changed_by_email String, + changed_by_name String, + deleted Bool +) +PRIMARY KEY id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'shop' + )) +LAYOUT(HASHED()) +LIFETIME(MIN 300 MAX 360); + +CREATE DICTIONARY IF NOT EXISTS analytic.category_dictionary ( + id UInt64, + version_id UInt64, + category_id Int32, + name String, + description String, + type String, + deleted Bool, + changed_by_id String, + changed_by_email String, + changed_by_name String +) +PRIMARY KEY id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'category' + )) +LAYOUT(HASHED()) +LIFETIME(MIN 300 MAX 360); + +CREATE DICTIONARY IF NOT EXISTS analytic.country_dictionary ( + id UInt64, + version_id UInt64, + country_id String, + name String, + trade_bloc Array(String), + deleted Bool, + changed_by_id String, + changed_by_email String, + changed_by_name String +) +PRIMARY KEY id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'country' + )) +LAYOUT(HASHED()) +LIFETIME(MIN 300 MAX 360); + +CREATE DICTIONARY IF NOT EXISTS analytic.trade_bloc_dictionary ( + id UInt64, + version_id UInt64, + trade_bloc_id String, + name String, + description String, + deleted Bool, + changed_by_id String, + changed_by_email String, + changed_by_name String +) +PRIMARY KEY id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'trade_bloc' + )) +LAYOUT(HASHED()) +LIFETIME(MIN 300 MAX 360); + +CREATE DICTIONARY IF NOT EXISTS analytic.rate_dictionary ( + id UInt64, + event_id String, + event_time String, + source_symbolic_code String, + source_exponent UInt32, + destination_symbolic_code String, + destination_exponent UInt32, + exchange_rate_rational_p UInt64, + exchange_rate_rational_q UInt64 +) +PRIMARY KEY id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'rate' + )) +LAYOUT(HASHED()) +LIFETIME(MIN 300 MAX 360); diff --git a/src/main/resources/db/migration-clickhouse/sharded/V1__create_tables.sql b/src/main/resources/db/migration-clickhouse/sharded/V1__create_tables.sql new file mode 100644 index 00000000..9acc1cda --- /dev/null +++ b/src/main/resources/db/migration-clickhouse/sharded/V1__create_tables.sql @@ -0,0 +1,180 @@ +CREATE DATABASE IF NOT EXISTS analytic; + +CREATE TABLE IF NOT EXISTS analytic.events_sink_local ( + timestamp Date, + eventTime UInt64, + eventTimeHour UInt64, + partyId String, + shopId String, + email String, + providerName String, + amount UInt64, + guaranteeDeposit UInt64, + systemFee UInt64, + providerFee UInt64, + externalFee UInt64, + currency String, + status Enum8('pending' = 1, 'processed' = 2, 'captured' = 3, 'cancelled' = 4, 'failed' = 5), + errorReason String, + errorCode String, + invoiceId String, + paymentId String, + sequenceId UInt64, + ip String, + bin String, + maskedPan String, + paymentTool String, + fingerprint String, + cardToken String, + paymentSystem String, + digitalWalletProvider String, + digitalWalletToken String, + cryptoCurrency String, + mobileOperator String, + paymentCountry String, + bankCountry String, + paymentTime UInt64, + providerId String, + terminal String, + cardHolderName String DEFAULT 'UNKNOWN', + bankCardTokenProvider String, + riskScore String, + rrn String, + paymentTerminal String +) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/<>/tables/<>/{database}/{table}', '<>') +PARTITION BY toYYYYMM(timestamp) +ORDER BY (eventTimeHour, partyId, shopId, paymentTool, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, sequenceId); + +CREATE TABLE IF NOT EXISTS analytic.events_sink AS analytic.events_sink_local +ENGINE = Distributed('<>', analytic, events_sink_local, cityHash64(timestamp, partyId)); + +CREATE TABLE IF NOT EXISTS analytic.events_sink_refund_local ( + timestamp Date, + eventTime UInt64, + eventTimeHour UInt64, + partyId String, + shopId String, + email String, + providerName String, + amount UInt64, + guaranteeDeposit UInt64, + systemFee UInt64, + providerFee UInt64, + externalFee UInt64, + currency String, + reason String, + status Enum8('pending' = 1, 'succeeded' = 2, 'failed' = 3), + errorReason String, + errorCode String, + invoiceId String, + refundId String, + paymentId String, + sequenceId UInt64, + ip String, + fingerprint String, + cardToken String, + paymentSystem String, + digitalWalletProvider String, + digitalWalletToken String, + cryptoCurrency String, + mobileOperator String, + paymentCountry String, + bankCountry String, + paymentTime UInt64, + providerId String, + terminal String +) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/<>/tables/<>/{database}/{table}', '<>') +PARTITION BY toYYYYMM(timestamp) +ORDER BY (eventTimeHour, partyId, shopId, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, refundId, sequenceId); + +CREATE TABLE IF NOT EXISTS analytic.events_sink_refund AS analytic.events_sink_refund_local +ENGINE = Distributed('<>', analytic, events_sink_refund_local, cityHash64(timestamp, partyId)); + +CREATE TABLE IF NOT EXISTS analytic.events_sink_adjustment_local ( + timestamp Date, + eventTime UInt64, + eventTimeHour UInt64, + partyId String, + shopId String, + email String, + providerName String, + amount UInt64, + guaranteeDeposit UInt64, + systemFee UInt64, + providerFee UInt64, + externalFee UInt64, + oldAmount UInt64, + oldGuaranteeDeposit UInt64, + oldSystemFee UInt64, + oldProviderFee UInt64, + oldExternalFee UInt64, + currency String, + reason String, + status Enum8('captured' = 1, 'cancelled' = 2), + errorCode String, + errorReason String, + invoiceId String, + adjustmentId String, + paymentId String, + sequenceId UInt64, + ip String, + fingerprint String, + cardToken String, + paymentSystem String, + digitalWalletProvider String, + digitalWalletToken String, + cryptoCurrency String, + mobileOperator String, + paymentCountry String, + bankCountry String, + paymentTime UInt64, + providerId String, + terminal String +) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/<>/tables/<>/{database}/{table}', '<>') +PARTITION BY toYYYYMM(timestamp) +ORDER BY (eventTimeHour, partyId, shopId, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, adjustmentId, sequenceId); + +CREATE TABLE IF NOT EXISTS analytic.events_sink_adjustment AS analytic.events_sink_adjustment_local +ENGINE = Distributed('<>', analytic, events_sink_adjustment_local, cityHash64(timestamp, partyId)); + +CREATE TABLE IF NOT EXISTS analytic.events_sink_chargeback_local ( + timestamp Date, + eventTime UInt64, + eventTimeHour UInt64, + partyId String, + shopId String, + email String, + providerName String, + amount UInt64, + guaranteeDeposit UInt64, + systemFee UInt64, + providerFee UInt64, + externalFee UInt64, + currency String, + chargebackCode String, + stage Enum8('chargeback' = 1, 'pre_arbitration' = 2, 'arbitration' = 3), + status Enum8('accepted' = 1, 'rejected' = 2, 'cancelled' = 3), + category Enum8('fraud' = 1, 'dispute' = 2, 'authorisation' = 3, 'processing_error' = 4), + invoiceId String, + chargebackId String, + paymentId String, + sequenceId UInt64, + ip String, + fingerprint String, + cardToken String, + paymentSystem String, + digitalWalletProvider String, + digitalWalletToken String, + cryptoCurrency String, + mobileOperator String, + paymentCountry String, + bankCountry String, + paymentTime UInt64, + providerId String, + terminal String +) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/<>/tables/<>/{database}/{table}', '<>') +PARTITION BY toYYYYMM(timestamp) +ORDER BY (eventTimeHour, partyId, shopId, category, status, stage, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, chargebackId, sequenceId); + +CREATE TABLE IF NOT EXISTS analytic.events_sink_chargeback AS analytic.events_sink_chargeback_local +ENGINE = Distributed('<>', analytic, events_sink_chargeback_local, cityHash64(timestamp, partyId)); diff --git a/src/main/resources/db/migration-clickhouse/sharded/V2__create_dictionaries.sql b/src/main/resources/db/migration-clickhouse/sharded/V2__create_dictionaries.sql new file mode 100644 index 00000000..b6977de8 --- /dev/null +++ b/src/main/resources/db/migration-clickhouse/sharded/V2__create_dictionaries.sql @@ -0,0 +1,179 @@ +DROP DICTIONARY IF EXISTS analytic.party_dictionary; +DROP DICTIONARY IF EXISTS analytic.shop_dictionary; +DROP DICTIONARY IF EXISTS analytic.category_dictionary; +DROP DICTIONARY IF EXISTS analytic.country_dictionary; +DROP DICTIONARY IF EXISTS analytic.trade_bloc_dictionary; +DROP DICTIONARY IF EXISTS analytic.rate_dictionary; + +CREATE DICTIONARY IF NOT EXISTS analytic.party_dictionary ( + id UInt64, + event_id UInt64, + event_time DateTime, + party_id String, + created_at DateTime, + email String, + blocking String, + blocked_reason String, + blocked_since DateTime, + unblocked_reason String, + unblocked_since DateTime, + suspension String, + suspension_active_since DateTime, + suspension_suspended_since DateTime, + version_id UInt64, + changed_by_id String, + changed_by_email String, + changed_by_name String, + deleted Bool +) +PRIMARY KEY id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'party' + )) +LAYOUT(HASHED()) +LIFETIME(MIN 300 MAX 360); + +CREATE DICTIONARY IF NOT EXISTS analytic.shop_dictionary ( + id UInt64, + event_id UInt64, + event_time DateTime, + party_id String, + shop_id String, + category_id Int32, + created_at DateTime, + blocking String, + blocked_reason String, + blocked_since DateTime, + unblocked_reason String, + unblocked_since DateTime, + suspension String, + suspension_active_since DateTime, + suspension_suspended_since DateTime, + details_name String, + details_description String, + location_url String, + account_currency_code String, + account_settlement String, + account_guarantee String, + international_legal_entity_country_code String, + version_id UInt64, + changed_by_id String, + changed_by_email String, + changed_by_name String, + deleted Bool +) +PRIMARY KEY id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'shop' + )) +LAYOUT(HASHED()) +LIFETIME(MIN 300 MAX 360); + +CREATE DICTIONARY IF NOT EXISTS analytic.category_dictionary ( + id UInt64, + version_id UInt64, + category_id Int32, + name String, + description String, + type String, + deleted Bool, + changed_by_id String, + changed_by_email String, + changed_by_name String +) +PRIMARY KEY id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'category' + )) +LAYOUT(HASHED()) +LIFETIME(MIN 300 MAX 360); + +CREATE DICTIONARY IF NOT EXISTS analytic.country_dictionary ( + id UInt64, + version_id UInt64, + country_id String, + name String, + trade_bloc Array(String), + deleted Bool, + changed_by_id String, + changed_by_email String, + changed_by_name String +) +PRIMARY KEY id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'country' + )) +LAYOUT(HASHED()) +LIFETIME(MIN 300 MAX 360); + +CREATE DICTIONARY IF NOT EXISTS analytic.trade_bloc_dictionary ( + id UInt64, + version_id UInt64, + trade_bloc_id String, + name String, + description String, + deleted Bool, + changed_by_id String, + changed_by_email String, + changed_by_name String +) +PRIMARY KEY id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'trade_bloc' + )) +LAYOUT(HASHED()) +LIFETIME(MIN 300 MAX 360); + +CREATE DICTIONARY IF NOT EXISTS analytic.rate_dictionary ( + id UInt64, + event_id String, + event_time String, + source_symbolic_code String, + source_exponent UInt32, + destination_symbolic_code String, + destination_exponent UInt32, + exchange_rate_rational_p UInt64, + exchange_rate_rational_q UInt64 +) +PRIMARY KEY id +SOURCE(POSTGRESQL( + HOST '<>' + PORT <> + USER '<>' + PASSWORD '<>' + DB '<>' + SCHEMA '<>' + TABLE 'rate' + )) +LAYOUT(HASHED()) +LIFETIME(MIN 300 MAX 360); diff --git a/src/main/resources/db/migration/V20__alter_rate_table_for_exrates.sql b/src/main/resources/db/migration/V20__alter_rate_table_for_exrates.sql new file mode 100644 index 00000000..3837c9da --- /dev/null +++ b/src/main/resources/db/migration/V20__alter_rate_table_for_exrates.sql @@ -0,0 +1,12 @@ +DROP INDEX IF EXISTS analytics.rate_source_id_idx; + +ALTER TABLE analytics.rate +ALTER COLUMN event_id TYPE VARCHAR USING event_id::VARCHAR; + +ALTER TABLE analytics.rate +DROP COLUMN source_id, +DROP COLUMN lower_bound_inclusive, +DROP COLUMN upper_bound_exclusive; + +CREATE UNIQUE INDEX rate_source_symbolic_code_destination_symbolic_code_event_time_idx + ON analytics.rate (source_symbolic_code, destination_symbolic_code, event_time); diff --git a/src/test/java/dev/vality/analytics/config/ClickhouseTest.java b/src/test/java/dev/vality/analytics/config/ClickhouseTest.java index ecb2f59b..2c1e35fd 100644 --- a/src/test/java/dev/vality/analytics/config/ClickhouseTest.java +++ b/src/test/java/dev/vality/analytics/config/ClickhouseTest.java @@ -1,6 +1,6 @@ package dev.vality.analytics.config; -import dev.vality.testcontainers.annotations.clickhouse.ClickhouseTestcontainerSingleton; +import org.junit.jupiter.api.extension.ExtendWith; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -9,12 +9,6 @@ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) -@ClickhouseTestcontainerSingleton( - dbNameShouldBeDropped = "analytic", - migrations = { - "sql/V1__db_init.sql", - "sql/V2__add_fields.sql", - "sql/V3__add_provider_field.sql", - "sql/test.data/inserts_event_sink.sql"}) +@ExtendWith(ClickhouseTestExtension.class) public @interface ClickhouseTest { } diff --git a/src/test/java/dev/vality/analytics/config/ClickhouseTestContainerHolder.java b/src/test/java/dev/vality/analytics/config/ClickhouseTestContainerHolder.java new file mode 100644 index 00000000..ce2c678c --- /dev/null +++ b/src/test/java/dev/vality/analytics/config/ClickhouseTestContainerHolder.java @@ -0,0 +1,25 @@ +package dev.vality.analytics.config; + +import org.testcontainers.clickhouse.ClickHouseContainer; + +final class ClickhouseTestContainerHolder { + + private static final String IMAGE = + System.getProperty("clickhouse.test.image", "clickhouse/clickhouse-server:23.10.3"); + + private static final ClickHouseContainer CONTAINER = new ClickHouseContainer(IMAGE) + .withAccessToHost(true) + .withUsername("test") + .withPassword("test") + .withDatabaseName("default"); + + private ClickhouseTestContainerHolder() { + } + + static synchronized ClickHouseContainer getContainer() { + if (!CONTAINER.isRunning()) { + CONTAINER.start(); + } + return CONTAINER; + } +} diff --git a/src/test/java/dev/vality/analytics/config/ClickhouseTestExtension.java b/src/test/java/dev/vality/analytics/config/ClickhouseTestExtension.java new file mode 100644 index 00000000..8db9feae --- /dev/null +++ b/src/test/java/dev/vality/analytics/config/ClickhouseTestExtension.java @@ -0,0 +1,97 @@ +package dev.vality.analytics.config; + +import dev.vality.testcontainers.annotations.postgresql.PostgresqlContainerExtension; +import dev.vality.testcontainers.annotations.postgresql.PostgresqlTestcontainerFactory; +import org.flywaydb.core.Flyway; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.springframework.jdbc.datasource.DriverManagerDataSource; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.Testcontainers; + +import java.sql.DriverManager; +import java.util.List; +import java.util.Map; + +public class ClickhouseTestExtension implements BeforeAllCallback, BeforeEachCallback { + + private static final String ANALYTIC_DB = "analytic"; + private static final int POSTGRES_PORT = 5432; + private static final String POSTGRES_SCHEMA = "analytics"; + private static final String CLICKHOUSE_MIGRATION_LOCATION = "classpath:db/migration-clickhouse/non-sharded"; + private static final String CLICKHOUSE_TEST_MIGRATION_LOCATION = "classpath:db/test-migration-clickhouse"; + private static final String TESTCONTAINERS_POSTGRES_HOST = "host.testcontainers.internal"; + + @Override + public void beforeAll(ExtensionContext context) { + var postgres = PostgresqlTestcontainerFactory.singletonContainer(); + if (!postgres.isRunning()) { + postgres.start(); + } + Testcontainers.exposeHostPorts(postgres.getMappedPort(POSTGRES_PORT)); + + var container = ClickhouseTestContainerHolder.getContainer(); + System.setProperty("clickhouse.db.url", buildClickHouseDatabaseUrl(container.getJdbcUrl())); + System.setProperty("clickhouse.db.username", container.getUsername()); + System.setProperty("clickhouse.db.password", container.getPassword()); + System.setProperty("clickhouse.flyway.enabled", "false"); + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + var postgres = PostgresqlTestcontainerFactory.singletonContainer(); + migratePostgres(postgres); + + var container = ClickhouseTestContainerHolder.getContainer(); + try (var connection = DriverManager.getConnection( + container.getJdbcUrl(), + container.getUsername(), + container.getPassword())) { + try (var statement = connection.createStatement()) { + statement.execute("DROP DATABASE IF EXISTS " + ANALYTIC_DB); + statement.execute("DROP TABLE IF EXISTS clickhouse_flyway_schema_history"); + } + } + migrateClickHouse(container, postgres); + } + + private void migratePostgres(PostgresqlContainerExtension postgres) { + Flyway.configure() + .dataSource(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword()) + .schemas(POSTGRES_SCHEMA) + .load() + .migrate(); + } + + private void migrateClickHouse(ClickHouseContainer clickhouse, PostgresqlContainerExtension postgres) { + DriverManagerDataSource dataSource = new DriverManagerDataSource( + clickhouse.getJdbcUrl(), + clickhouse.getUsername(), + clickhouse.getPassword()); + Map placeholders = ClickHouseFlywaySupport.resolvePostgresPlaceholders( + TESTCONTAINERS_POSTGRES_HOST, + postgres.getMappedPort(POSTGRES_PORT), + postgres.getDatabaseName(), + postgres.getUsername(), + postgres.getPassword(), + POSTGRES_SCHEMA); + Flyway flyway = ClickHouseFlywaySupport.createFlyway( + dataSource, + List.of(CLICKHOUSE_MIGRATION_LOCATION, CLICKHOUSE_TEST_MIGRATION_LOCATION), + placeholders, + "clickhouse_flyway_schema_history"); + flyway.migrate(); + } + + private String buildClickHouseDatabaseUrl(String jdbcUrl) { + int paramsStart = jdbcUrl.indexOf('?'); + String base = paramsStart >= 0 ? jdbcUrl.substring(0, paramsStart) : jdbcUrl; + String params = paramsStart >= 0 ? jdbcUrl.substring(paramsStart) : ""; + int databaseSeparator = base.lastIndexOf('/'); + if (databaseSeparator < 0) { + throw new IllegalArgumentException("Unsupported ClickHouse JDBC URL: " + jdbcUrl); + } + return base.substring(0, databaseSeparator + 1) + ANALYTIC_DB + params; + } +} diff --git a/src/test/java/dev/vality/analytics/config/SpringBootITest.java b/src/test/java/dev/vality/analytics/config/SpringBootITest.java index 779c5c25..12462fb9 100644 --- a/src/test/java/dev/vality/analytics/config/SpringBootITest.java +++ b/src/test/java/dev/vality/analytics/config/SpringBootITest.java @@ -1,6 +1,5 @@ package dev.vality.analytics.config; - import org.springframework.boot.test.context.SpringBootTest; import java.lang.annotation.ElementType; diff --git a/src/test/java/dev/vality/analytics/listener/RateEventTestUtils.java b/src/test/java/dev/vality/analytics/listener/RateEventTestUtils.java new file mode 100644 index 00000000..4571e87c --- /dev/null +++ b/src/test/java/dev/vality/analytics/listener/RateEventTestUtils.java @@ -0,0 +1,53 @@ +package dev.vality.analytics.listener; + +import dev.vality.exrates.base.Currency; +import dev.vality.exrates.base.Rational; +import dev.vality.exrates.events.CurrencyEvent; +import dev.vality.exrates.events.CurrencyEventPayload; +import dev.vality.exrates.events.CurrencyExchangeRate; +import dev.vality.geck.common.util.TypeUtil; + +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class RateEventTestUtils { + + public static CurrencyEvent createCurrencyEvent(String eventId, String sourceCode, String destCode, + long p, long q, Instant timestamp) { + Currency sourceCurrency = new Currency() + .setSymbolicCode(sourceCode) + .setExponent((short) 2); + Currency destCurrency = new Currency() + .setSymbolicCode(destCode) + .setExponent((short) 2); + Rational rational = new Rational() + .setP(p) + .setQ(q); + CurrencyExchangeRate exchangeRate = new CurrencyExchangeRate(); + exchangeRate.setSourceCurrency(sourceCurrency); + exchangeRate.setDestinationCurrency(destCurrency); + exchangeRate.setExchangeRate(rational); + exchangeRate.setTimestamp(TypeUtil.temporalToString(timestamp)); + CurrencyEventPayload payload = new CurrencyEventPayload(); + payload.setExchangeRate(exchangeRate); + CurrencyEvent event = new CurrencyEvent(); + event.setEventId(eventId); + event.setEventCreatedAt(TypeUtil.temporalToString(timestamp)); + event.setPayload(payload); + return event; + } + + public static List createCurrencyEvents(int count, Instant baseTime) { + return IntStream.range(0, count) + .mapToObj(i -> createCurrencyEvent( + "event_" + i, + "USD", + "RUB", + 75L + i, + 1L, + baseTime.plusSeconds(i * 3600))) + .collect(Collectors.toList()); + } +} \ No newline at end of file diff --git a/src/test/java/dev/vality/analytics/listener/RateListenerTest.java b/src/test/java/dev/vality/analytics/listener/RateListenerTest.java index f5363266..e5789732 100644 --- a/src/test/java/dev/vality/analytics/listener/RateListenerTest.java +++ b/src/test/java/dev/vality/analytics/listener/RateListenerTest.java @@ -1,8 +1,6 @@ package dev.vality.analytics.listener; import dev.vality.analytics.config.SpringBootITest; -import dev.vality.analytics.utils.RateSinkEventTestUtils; -import dev.vality.machinegun.eventsink.SinkEvent; import dev.vality.testcontainers.annotations.kafka.config.KafkaProducer; import org.apache.thrift.TBase; import org.junit.jupiter.api.Test; @@ -10,10 +8,11 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; +import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -28,22 +27,30 @@ public class RateListenerTest { private KafkaProducer> testThriftKafkaProducer; @Test - public void handle() throws InterruptedException { - String sourceId = "CBR"; + public void handle() { + List currencyEvents = RateEventTestUtils.createCurrencyEvents( + 4, Instant.now()); - final List sinkEvents = RateSinkEventTestUtils.create(sourceId); - sinkEvents.forEach(event -> testThriftKafkaProducer.send(rateTopic, event)); + currencyEvents.forEach(event -> testThriftKafkaProducer.send(rateTopic, event)); - await().atMost(60, SECONDS).until(() -> { - Integer count = postgresJdbcTemplate.queryForObject("SELECT count(*) FROM analytics.rate", Integer.class); - if (count == 0) { + await().atMost(60, TimeUnit.SECONDS).until(() -> { + Integer count = postgresJdbcTemplate.queryForObject( + "SELECT count(*) FROM analytics.rate", Integer.class); + if (count < 4) { Thread.sleep(1000); return false; } return true; }); - final List> maps = postgresJdbcTemplate.queryForList("SELECT * FROM analytics.rate"); + final List> maps = postgresJdbcTemplate.queryForList("SELECT * FROM analytics.rate"); assertEquals(4, maps.size()); + + maps.forEach(row -> { + assertEquals("USD", row.get("source_symbolic_code")); + assertEquals("RUB", row.get("destination_symbolic_code")); + assertEquals(2, row.get("source_exponent")); + assertEquals(2, row.get("destination_exponent")); + }); } } diff --git a/src/test/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapperTest.java b/src/test/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapperTest.java new file mode 100644 index 00000000..3e6a03e4 --- /dev/null +++ b/src/test/java/dev/vality/analytics/listener/mapper/rate/CurrencyEventMapperTest.java @@ -0,0 +1,141 @@ +package dev.vality.analytics.listener.mapper.rate; + +import dev.vality.analytics.domain.db.tables.pojos.Rate; +import dev.vality.exrates.base.Currency; +import dev.vality.exrates.base.Rational; +import dev.vality.exrates.events.CurrencyEvent; +import dev.vality.exrates.events.CurrencyEventPayload; +import dev.vality.exrates.events.CurrencyExchangeRate; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.time.LocalDateTime; + +import static org.junit.jupiter.api.Assertions.*; + +class CurrencyEventMapperTest { + + private CurrencyEventMapper mapper; + + @BeforeEach + void setUp() { + mapper = new CurrencyEventMapper(); + } + + @Test + void mapValidEvent() { + CurrencyEvent event = createValidCurrencyEvent("event1", "USD", "RUB", 75L, 1L, + Instant.parse("2024-01-01T12:00:00Z")); + + Rate rate = mapper.map(event); + + assertNotNull(rate); + assertEquals("event1", rate.getEventId()); + assertEquals(LocalDateTime.of(2024, 1, 1, 12, 0, 0), rate.getEventTime()); + assertEquals("USD", rate.getSourceSymbolicCode()); + assertEquals((short) 2, rate.getSourceExponent()); + assertEquals("RUB", rate.getDestinationSymbolicCode()); + assertEquals((short) 2, rate.getDestinationExponent()); + assertEquals(75L, rate.getExchangeRateRationalP()); + assertEquals(1L, rate.getExchangeRateRationalQ()); + } + + @Test + void mapEventMissingPayload() { + CurrencyEvent event = new CurrencyEvent(); + event.setEventId("event1"); + + Rate rate = mapper.map(event); + + assertNull(rate); + } + + @Test + void mapEventPayloadNotExchangeRate() { + CurrencyEvent event = new CurrencyEvent(); + event.setEventId("event1"); + CurrencyEventPayload payload = new CurrencyEventPayload(); + // payload not set + event.setPayload(payload); + + Rate rate = mapper.map(event); + + assertNull(rate); + } + + @Test + void mapEventMissingSourceCurrency() { + CurrencyEvent event = createValidCurrencyEvent("event1", "USD", "RUB", 75L, 1L, + Instant.parse("2024-01-01T12:00:00Z")); + // Clear source currency + event.getPayload().getExchangeRate().unsetSourceCurrency(); + + Rate rate = mapper.map(event); + + assertNull(rate); + } + + @Test + void mapEventMissingDestinationCurrency() { + CurrencyEvent event = createValidCurrencyEvent("event1", "USD", "RUB", 75L, 1L, + Instant.parse("2024-01-01T12:00:00Z")); + event.getPayload().getExchangeRate().unsetDestinationCurrency(); + + Rate rate = mapper.map(event); + + assertNull(rate); + } + + @Test + void mapEventMissingRational() { + CurrencyEvent event = createValidCurrencyEvent("event1", "USD", "RUB", 75L, 1L, + Instant.parse("2024-01-01T12:00:00Z")); + event.getPayload().getExchangeRate().unsetExchangeRate(); + + Rate rate = mapper.map(event); + + assertNull(rate); + } + + @Test + void mapEventInvalidTimestamp() { + CurrencyEvent event = createValidCurrencyEvent("event1", "USD", "RUB", 75L, 1L, + Instant.parse("2024-01-01T12:00:00Z")); + event.getPayload().getExchangeRate().setTimestamp("not-a-valid-timestamp"); + + Rate rate = mapper.map(event); + + assertNull(rate); + } + + private CurrencyEvent createValidCurrencyEvent(String eventId, String sourceCode, String destCode, + long p, long q, Instant timestamp) { + Currency sourceCurrency = new Currency(); + sourceCurrency.setSymbolicCode(sourceCode); + sourceCurrency.setExponent((short) 2); + + Currency destCurrency = new Currency(); + destCurrency.setSymbolicCode(destCode); + destCurrency.setExponent((short) 2); + + Rational rational = new Rational(); + rational.setP(p); + rational.setQ(q); + + CurrencyExchangeRate exchangeRate = new CurrencyExchangeRate(); + exchangeRate.setSourceCurrency(sourceCurrency); + exchangeRate.setDestinationCurrency(destCurrency); + exchangeRate.setExchangeRate(rational); + exchangeRate.setTimestamp(timestamp.toString()); + + CurrencyEventPayload payload = new CurrencyEventPayload(); + payload.setExchangeRate(exchangeRate); + + CurrencyEvent event = new CurrencyEvent(); + event.setEventId(eventId); + event.setPayload(payload); + + return event; + } +} \ No newline at end of file diff --git a/src/test/java/dev/vality/analytics/utils/RateSinkEventTestUtils.java b/src/test/java/dev/vality/analytics/utils/RateSinkEventTestUtils.java deleted file mode 100644 index abe7fe81..00000000 --- a/src/test/java/dev/vality/analytics/utils/RateSinkEventTestUtils.java +++ /dev/null @@ -1,46 +0,0 @@ -package dev.vality.analytics.utils; - -import dev.vality.geck.serializer.Geck; -import dev.vality.machinegun.eventsink.MachineEvent; -import dev.vality.machinegun.eventsink.SinkEvent; -import dev.vality.machinegun.msgpack.Value; -import dev.vality.xrates.base.TimestampInterval; -import dev.vality.xrates.rate.Change; -import dev.vality.xrates.rate.ExchangeRateCreated; -import dev.vality.xrates.rate.ExchangeRateData; -import dev.vality.xrates.rate.Quote; - -import java.time.Instant; -import java.util.Collections; -import java.util.List; - -import static dev.vality.testcontainers.annotations.util.RandomBeans.randomListOf; - -public class RateSinkEventTestUtils { - - public static List create(String sourceId, String... excludedFields) { - List quotes = randomListOf(4, Quote.class, excludedFields); - quotes.forEach(quote -> { - quote.getDestination().setExponent((short) 2); - quote.getSource().setExponent((short) 2); - quote.getExchangeRate().setQ(1L); - quote.getExchangeRate().setP(1L); - }); - SinkEvent sinkEvent = new SinkEvent(); - sinkEvent.setEvent(new MachineEvent() - .setEventId(123L) - .setCreatedAt("2016-03-22T06:12:27Z") - .setSourceId(sourceId) - .setSourceNs(sourceId) - .setData(Value.bin(Geck.toMsgPack(Change.created( - new ExchangeRateCreated( - new ExchangeRateData( - new TimestampInterval( - Instant.now().toString(), - Instant.now().toString() - ), - quotes - ))))))); - return Collections.singletonList(sinkEvent); - } -} diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties new file mode 100644 index 00000000..9bfd0ae6 --- /dev/null +++ b/src/test/resources/application.properties @@ -0,0 +1 @@ +clickhouse.flyway.enabled=false diff --git a/src/test/resources/sql/test.data/inserts_event_sink.sql b/src/test/resources/db/test-migration-clickhouse/V1000__seed_events_sink.sql similarity index 100% rename from src/test/resources/sql/test.data/inserts_event_sink.sql rename to src/test/resources/db/test-migration-clickhouse/V1000__seed_events_sink.sql diff --git a/src/test/resources/sql/V1__db_init.sql b/src/test/resources/sql/V1__db_init.sql deleted file mode 100644 index 6ff26a71..00000000 --- a/src/test/resources/sql/V1__db_init.sql +++ /dev/null @@ -1,216 +0,0 @@ -CREATE DATABASE IF NOT EXISTS analytic; - -DROP TABLE IF EXISTS analytic.events_sink; - -create table analytic.events_sink -( - timestamp Date, - eventTime UInt64, - eventTimeHour UInt64, - - partyId String, - shopId String, - - email String, - providerName String, - - amount UInt64, - guaranteeDeposit UInt64, - systemFee UInt64, - providerFee UInt64, - externalFee UInt64, - currency String, - - status Enum8('pending' = 1, 'processed' = 2, 'captured' = 3, 'cancelled' = 4, 'failed' = 5), - errorReason String, - errorCode String, - - invoiceId String, - paymentId String, - sequenceId UInt64, - - ip String, - bin String, - maskedPan String, - paymentTool String, - fingerprint String, - cardToken String, - paymentSystem String, - digitalWalletProvider String, - digitalWalletToken String, - cryptoCurrency String, - mobileOperator String, - - paymentCountry String, - bankCountry String, - - paymentTime UInt64, - providerId String, - terminal String -) ENGINE = ReplacingMergeTree() -PARTITION BY toYYYYMM (timestamp) -ORDER BY (eventTimeHour, partyId, shopId, paymentTool, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, sequenceId); - -DROP TABLE IF EXISTS analytic.events_sink_refund; - -create table analytic.events_sink_refund -( - timestamp Date, - eventTime UInt64, - eventTimeHour UInt64, - - partyId String, - shopId String, - - email String, - providerName String, - - amount UInt64, - guaranteeDeposit UInt64, - systemFee UInt64, - providerFee UInt64, - externalFee UInt64, - currency String, - - reason String, - - status Enum8('pending' = 1, 'succeeded' = 2, 'failed' = 3), - errorReason String, - errorCode String, - - invoiceId String, - refundId String, - paymentId String, - sequenceId UInt64, - - ip String, - fingerprint String, - cardToken String, - paymentSystem String, - digitalWalletProvider String, - digitalWalletToken String, - cryptoCurrency String, - mobileOperator String, - - paymentCountry String, - bankCountry String, - - paymentTime UInt64, - providerId String, - terminal String -) ENGINE = ReplacingMergeTree() -PARTITION BY toYYYYMM (timestamp) -ORDER BY (eventTimeHour, partyId, shopId, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, refundId, sequenceId); - -DROP TABLE IF EXISTS analytic.events_sink_adjustment; - -create table analytic.events_sink_adjustment -( - timestamp Date, - eventTime UInt64, - eventTimeHour UInt64, - - partyId String, - shopId String, - - email String, - providerName String, - - amount UInt64, - guaranteeDeposit UInt64, - systemFee UInt64, - providerFee UInt64, - externalFee UInt64, - - oldAmount UInt64, - oldGuaranteeDeposit UInt64, - oldSystemFee UInt64, - oldProviderFee UInt64, - oldExternalFee UInt64, - - currency String, - - reason String, - - status Enum8('captured' = 1, 'cancelled' = 2), - errorCode String, - errorReason String, - - invoiceId String, - adjustmentId String, - paymentId String, - sequenceId UInt64, - - ip String, - fingerprint String, - cardToken String, - paymentSystem String, - digitalWalletProvider String, - digitalWalletToken String, - cryptoCurrency String, - mobileOperator String, - - paymentCountry String, - bankCountry String, - - paymentTime UInt64, - providerId String, - terminal String - -) ENGINE = ReplacingMergeTree() -PARTITION BY toYYYYMM (timestamp) -ORDER BY (eventTimeHour, partyId, shopId, status, currency, providerName, fingerprint, cardToken, invoiceId, paymentId, adjustmentId, sequenceId); - -DROP TABLE IF EXISTS analytic.events_sink_chargeback; - -create table analytic.events_sink_chargeback -( - timestamp Date, - eventTime UInt64, - eventTimeHour UInt64, - - partyId String, - shopId String, - - email String, - providerName String, - - amount UInt64, - guaranteeDeposit UInt64, - systemFee UInt64, - providerFee UInt64, - externalFee UInt64, - - currency String, - - chargebackCode String, - - stage Enum8('chargeback' = 1, 'pre_arbitration' = 2, 'arbitration' = 3), - status Enum8('accepted' = 1, 'rejected' = 2, 'cancelled' = 3), - category Enum8('fraud' = 1, 'dispute' = 2, 'authorisation' = 3, 'processing_error' = 4), - - invoiceId String, - chargebackId String, - paymentId String, - sequenceId UInt64, - - ip String, - fingerprint String, - cardToken String, - paymentSystem String, - digitalWalletProvider String, - digitalWalletToken String, - cryptoCurrency String, - mobileOperator String, - - paymentCountry String, - bankCountry String, - - paymentTime UInt64, - providerId String, - terminal String - -) ENGINE = ReplacingMergeTree() -PARTITION BY toYYYYMM (timestamp) -ORDER BY (eventTimeHour, partyId, shopId, category, status, stage, currency, providerName, fingerprint, cardToken, -invoiceId, paymentId, chargebackId, sequenceId); diff --git a/src/test/resources/sql/V2__add_fields.sql b/src/test/resources/sql/V2__add_fields.sql deleted file mode 100644 index 5f6e8a45..00000000 --- a/src/test/resources/sql/V2__add_fields.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE analytic.events_sink ADD COLUMN cardHolderName String DEFAULT 'UNKNOWN'; diff --git a/src/test/resources/sql/V3__add_provider_field.sql b/src/test/resources/sql/V3__add_provider_field.sql deleted file mode 100644 index 160fd227..00000000 --- a/src/test/resources/sql/V3__add_provider_field.sql +++ /dev/null @@ -1,4 +0,0 @@ -ALTER TABLE analytic.events_sink ADD COLUMN bankCardTokenProvider String; -ALTER TABLE analytic.events_sink ADD COLUMN riskScore String; -ALTER TABLE analytic.events_sink ADD COLUMN rrn String; -ALTER TABLE analytic.events_sink ADD COLUMN paymentTerminal String; diff --git a/src/test/resources/sql/V4__party_shop_category_dictionary.sql b/src/test/resources/sql/V4__party_shop_category_dictionary.sql deleted file mode 100644 index e46091cf..00000000 --- a/src/test/resources/sql/V4__party_shop_category_dictionary.sql +++ /dev/null @@ -1,100 +0,0 @@ -CREATE DICTIONARY party_dictionary ( - id UInt64, - event_id String, - event_time String, - party_id String, - created_at String, - email String, - blocking String, - blocked_reason String, - blocked_since String, - unblocked_reason String, - unblocked_since String, - suspension String, - suspension_active_since String, - suspension_suspended_since String, - revision_id String, - revision_changed_at String -) -PRIMARY KEY id -SOURCE(ODBC(connection_string 'DSN=myconnection;UID=user;PWD=password;HOST=host;PORT=5432;DATABASE=analytics' table 'analytics.party')) -LAYOUT(HASHED()) -LIFETIME(MIN 300 MAX 360) - -CREATE DICTIONARY shop_dictionary ( - id UInt64, - event_id String, - event_time String, - party_id String, - shop_id String, - - category_id Int32, - contract_id String, - payout_tool_id String, - payout_schedule_id Int32, - created_at String, - blocked_reason String, - blocked_since String, - unblocked_reason String, - unblocked_since String, - suspension String, - suspension_active_since String, - suspension_suspended_since String, - details_name String, - details_description String, - location_url String, - account_currency_code String, - account_settlement String, - account_guarantee String, - account_payout String, - - contractor_id String, - contractor_type String, - reg_user_email String, - legal_entity_type String, - russian_legal_entity_name String, - russian_legal_entity_registered_number String, - russian_legal_entity_inn String, - russian_legal_entity_actual_address String, - russian_legal_entity_post_address String, - russian_legal_entity_representative_position String, - russian_legal_entity_representative_full_name String, - russian_legal_entity_representative_document String, - russian_legal_entity_bank_account String, - russian_legal_entity_bank_name String, - russian_legal_entity_bank_post_account String, - russian_legal_entity_bank_bik String, - - international_legal_entity_name String, - international_legal_entity_trading_name String, - international_actual_address String, - international_legal_entity_registered_address String, - international_legal_entity_registered_number String, - - private_entity_type String, - russian_private_entity_first_name String, - russian_private_entity_second_name String, - russian_private_entity_middle_name String, - russian_private_entity_phone_number String, - russian_private_entity_email String, - - contractor_identification_level String -) -PRIMARY KEY id -SOURCE(ODBC(connection_string 'DSN=myconnection;UID=user;PWD=password;HOST=host;PORT=5432;DATABASE=analytics' table 'analytics.shop')) -LAYOUT(HASHED()) -LIFETIME(MIN 300 MAX 360) - -CREATE DICTIONARY category_dictionary ( - id UInt64, - version_id UInt64, - category_id UInt32, - name String, - description String, - type String, - deleted String -) -PRIMARY KEY id -SOURCE(ODBC(connection_string 'DSN=myconnection;UID=user;PWD=password;HOST=host;PORT=5432;DATABASE=analytics' table 'analytics.category')) -LAYOUT(HASHED()) -LIFETIME(MIN 300 MAX 360) diff --git a/src/test/resources/sql/V5__rate_dictionary.sql b/src/test/resources/sql/V5__rate_dictionary.sql deleted file mode 100644 index 321305a2..00000000 --- a/src/test/resources/sql/V5__rate_dictionary.sql +++ /dev/null @@ -1,18 +0,0 @@ -CREATE DICTIONARY rate_dictionary ( - id UInt64, - event_id UInt64, - event_time String, - source_id String, - lower_bound_inclusive String, - upper_bound_exclusive String, - source_symbolic_code String, - source_exponent UInt32, - destination_symbolic_code String, - destination_exponent UInt32, - exchange_rate_rational_p UInt64, - exchange_rate_rational_q UInt64 -) -PRIMARY KEY id -SOURCE(ODBC(connection_string 'DSN=myconnection;UID=user;PWD=password;HOST=host;PORT=5432;DATABASE=analytics' table 'analytics.rate')) -LAYOUT(HASHED()) -LIFETIME(MIN 300 MAX 360) diff --git a/src/test/resources/sql/V6__add _country_and_trade_bloc.sql b/src/test/resources/sql/V6__add _country_and_trade_bloc.sql deleted file mode 100644 index a5c6a430..00000000 --- a/src/test/resources/sql/V6__add _country_and_trade_bloc.sql +++ /dev/null @@ -1,96 +0,0 @@ ---country-- -CREATE -DICTIONARY country_dictionary ( - id UInt64, - version_id UInt64, - country_id UInt32, - name String, - trade_bloc Array(String), - deleted String -) -PRIMARY KEY id -SOURCE(ODBC(connection_string 'DSN=myconnection;UID=user;PWD=password;HOST=host;PORT=5432;DATABASE=analytics' table 'analytics.country')) -LAYOUT(HASHED()) -LIFETIME(MIN 300 MAX 360) - ---trade_bloc-- -CREATE -DICTIONARY trade_bloc_dictionary ( - id UInt64, - version_id UInt64, - trade_bloc_id UInt32, - name String, - description String, - deleted String -) -PRIMARY KEY id -SOURCE(ODBC(connection_string 'DSN=myconnection;UID=user;PWD=password;HOST=host;PORT=5432;DATABASE=analytics' table 'analytics.trade_bloc')) -LAYOUT(HASHED()) -LIFETIME(MIN 300 MAX 360) - ---shop-- -CREATE -DICTIONARY shop_dictionary ( - id UInt64, - event_id String, - event_time String, - party_id String, - shop_id String, - - category_id Int32, - contract_id String, - payout_tool_id String, - payout_schedule_id Int32, - created_at String, - blocked_reason String, - blocked_since String, - unblocked_reason String, - unblocked_since String, - suspension String, - suspension_active_since String, - suspension_suspended_since String, - details_name String, - details_description String, - location_url String, - account_currency_code String, - account_settlement String, - account_guarantee String, - account_payout String, - - contractor_id String, - contractor_type String, - reg_user_email String, - legal_entity_type String, - russian_legal_entity_name String, - russian_legal_entity_registered_number String, - russian_legal_entity_inn String, - russian_legal_entity_actual_address String, - russian_legal_entity_post_address String, - russian_legal_entity_representative_position String, - russian_legal_entity_representative_full_name String, - russian_legal_entity_representative_document String, - russian_legal_entity_bank_account String, - russian_legal_entity_bank_name String, - russian_legal_entity_bank_post_account String, - russian_legal_entity_bank_bik String, - - international_legal_entity_name String, - international_legal_entity_trading_name String, - international_actual_address String, - international_legal_entity_registered_address String, - international_legal_entity_registered_number String, - international_legal_entity_country_code String, - - private_entity_type String, - russian_private_entity_first_name String, - russian_private_entity_second_name String, - russian_private_entity_middle_name String, - russian_private_entity_phone_number String, - russian_private_entity_email String, - - contractor_identification_level String -) -PRIMARY KEY id -SOURCE(ODBC(connection_string 'DSN=myconnection;UID=user;PWD=password;HOST=host;PORT=5432;DATABASE=analytics' table 'analytics.shop')) -LAYOUT(HASHED()) -LIFETIME(MIN 300 MAX 360) diff --git a/src/test/resources/sql/V7__new_payouts.sql b/src/test/resources/sql/V7__new_payouts.sql deleted file mode 100644 index 4c10060e..00000000 --- a/src/test/resources/sql/V7__new_payouts.sql +++ /dev/null @@ -1,24 +0,0 @@ -DROP TABLE IF EXISTS analytic.events_sink_payout; - -CREATE TABLE analytic.events_sink_payout -( - payoutId String, - status Enum8('unpaid' = 1, 'paid' = 2, 'cancelled' = 3, 'confirmed' = 4), - payoutToolId String, - statusCancelledDetails String, - isCancelledAfterBeingPaid UInt8, - - timestamp Date, - eventTime UInt64, - eventTimeHour UInt64, - payoutTime UInt64, - - shopId String, - partyId String, - - amount UInt64, - fee UInt64, - currency String -) ENGINE = ReplacingMergeTree() -PARTITION BY toYYYYMM (timestamp) -ORDER BY (eventTimeHour, partyId, shopId, status, payoutId, currency);