31 changes: 27 additions & 4 deletions pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>dev.vality</groupId>
<artifactId>service-parent-pom</artifactId>
<version>3.1.9</version>
<version>3.1.10</version>
</parent>

<artifactId>analytics</artifactId>
@@ -26,8 +26,11 @@
<db.name>analytics</db.name>
<db.schema>analytics</db.schema>
<db.url>jdbc:postgresql://localhost:${db.port}/${db.name}</db.url>
<flyway.version>10.22.0</flyway.version>
</properties>



<dependencies>
<!--dev.vality-->
<dependency>
@@ -64,8 +67,8 @@
</dependency>
<dependency>
<groupId>dev.vality</groupId>
<artifactId>xrates-proto</artifactId>
<version>1.19-e7ad3f5</version>
<artifactId>exrates-proto</artifactId>
<version>1.9-5d53aec</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
@@ -146,11 +149,17 @@
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
<version>${flyway.version}</version>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-database-postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-database-clickhouse</artifactId>
<version>10.24.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
@@ -179,7 +188,7 @@
<dependency>
<groupId>dev.vality</groupId>
<artifactId>testcontainers-annotations</artifactId>
<version>3.3.2</version>
<version>4.1.3</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -254,6 +263,7 @@
<plugin>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-maven-plugin</artifactId>
<version>${flyway.version}</version>
<configuration>
<url>${db.url}</url>
<user>${db.user}</user>
@@ -343,6 +353,19 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.42</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>
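
For context, a minimal sketch of the application configuration the new ClickHouse Flyway bean reads. Property names are taken from `ClickHouseConfig` below; all values are illustrative placeholders, not settings from this PR:

```yaml
# Illustrative values only; property names come from ClickHouseConfig.
clickhouse:
  flyway:
    enabled: true              # matchIfMissing = true, so the bean also runs when omitted
    schema-mode: sharded       # "non-sharded" (default) or "sharded"
    sharded:                   # required only when schema-mode=sharded
      cluster: analytics_cluster
      shard: shard_01
      replica: replica_01

postgres:
  db:
    schema: analytics          # injected as ${postgres.db.schema}
```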
92 changes: 92 additions & 0 deletions src/main/java/dev/vality/analytics/config/ClickHouseConfig.java
@@ -1,17 +1,41 @@
package dev.vality.analytics.config;

import dev.vality.analytics.config.properties.ClickHouseDbProperties;
import org.flywaydb.core.Flyway;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

@Configuration
public class ClickHouseConfig {

private static final String NON_SHARDED_SCHEMA_MODE = "non-sharded";
private static final String SHARDED_SCHEMA_MODE = "sharded";
private static final String NON_SHARDED_MIGRATION_LOCATION = "classpath:db/migration-clickhouse/non-sharded";
private static final String SHARDED_MIGRATION_LOCATION = "classpath:db/migration-clickhouse/sharded";
private static final String CLUSTER_PLACEHOLDER = "cluster";
private static final String SHARD_PLACEHOLDER = "shard";
private static final String REPLICA_PLACEHOLDER = "replica";
private static final String CLICKHOUSE_FLYWAY_SHARDED_PROPERTY_PREFIX = "clickhouse.flyway.sharded.";
private static final String SHARDED_CLUSTER_PROPERTY =
CLICKHOUSE_FLYWAY_SHARDED_PROPERTY_PREFIX + CLUSTER_PLACEHOLDER;
private static final String SHARDED_SHARD_PROPERTY =
CLICKHOUSE_FLYWAY_SHARDED_PROPERTY_PREFIX + SHARD_PLACEHOLDER;
private static final String SHARDED_REPLICA_PROPERTY =
CLICKHOUSE_FLYWAY_SHARDED_PROPERTY_PREFIX + REPLICA_PLACEHOLDER;

@Bean(name = "clickHouseDataSourceProperties")
@ConfigurationProperties(prefix = "clickhouse.db")
public ClickHouseDbProperties clickHouseDataSourceProperties() {
@@ -34,6 +58,34 @@ public JdbcTemplate clickHouseJdbcTemplate(@Qualifier("clickHouseDataSource") Da
return new JdbcTemplate(clickHouseDataSource);
}

@Bean
@DependsOn("flyway")
@ConditionalOnProperty(prefix = "clickhouse.flyway", name = "enabled", havingValue = "true", matchIfMissing = true)
public Flyway clickHouseFlyway(
@Qualifier("clickHouseDataSource") DataSource clickHouseDataSource,
@Qualifier("dataSourceProperties") DataSourceProperties postgresDataSourceProperties,
@Value("${clickhouse.flyway.schema-mode:non-sharded}") String schemaMode,
@Value("${" + SHARDED_CLUSTER_PROPERTY + ":}") String shardedCluster,
@Value("${" + SHARDED_SHARD_PROPERTY + ":}") String shardedShard,
@Value("${" + SHARDED_REPLICA_PROPERTY + ":}") String shardedReplica,
@Value("${postgres.db.schema}") String postgresSchema) {
String normalizedSchemaMode = schemaMode.toLowerCase(Locale.ROOT);
Map<String, String> placeholders = new LinkedHashMap<>(resolveFlywayShardedPlaceholders(
normalizedSchemaMode, shardedCluster, shardedShard, shardedReplica));
placeholders.putAll(ClickHouseFlywaySupport.resolvePostgresPlaceholders(
postgresDataSourceProperties.getUrl(),
postgresDataSourceProperties.getUsername(),
postgresDataSourceProperties.getPassword(),
postgresSchema));
final var flyway = ClickHouseFlywaySupport.createFlyway(
clickHouseDataSource,
List.of(resolveClickHouseMigrationLocation(normalizedSchemaMode)),
placeholders,
"clickhouse_flyway_schema_history");
flyway.migrate();
return flyway;
}

private String buildClickHouseJdbcUrl(ClickHouseDbProperties properties) {
var urlBuilder = new StringBuilder();
urlBuilder.append(properties.getUrl());
@@ -49,4 +101,44 @@ private String buildClickHouseJdbcUrl(ClickHouseDbProperties properties) {
}
return urlBuilder.toString();
}

private String resolveClickHouseMigrationLocation(String schemaMode) {
return switch (schemaMode) {
case NON_SHARDED_SCHEMA_MODE -> NON_SHARDED_MIGRATION_LOCATION;
case SHARDED_SCHEMA_MODE -> SHARDED_MIGRATION_LOCATION;
default -> throw new IllegalArgumentException(
String.format("Unsupported clickhouse.flyway.schema-mode: %s. Allowed values: %s, %s",
schemaMode, NON_SHARDED_SCHEMA_MODE, SHARDED_SCHEMA_MODE));
};
}

private Map<String, String> resolveFlywayShardedPlaceholders(
String schemaMode,
String shardedCluster,
String shardedShard,
String shardedReplica) {
if (NON_SHARDED_SCHEMA_MODE.equals(schemaMode)) {
return Map.of();
}

if (SHARDED_SCHEMA_MODE.equals(schemaMode)) {
validateRequiredShardedProperty(SHARDED_CLUSTER_PROPERTY, shardedCluster);
validateRequiredShardedProperty(SHARDED_SHARD_PROPERTY, shardedShard);
validateRequiredShardedProperty(SHARDED_REPLICA_PROPERTY, shardedReplica);
return Map.of(
CLUSTER_PLACEHOLDER, shardedCluster,
SHARD_PLACEHOLDER, shardedShard,
REPLICA_PLACEHOLDER, shardedReplica);
}

return Map.of();
}

private void validateRequiredShardedProperty(String propertyName, String value) {
if (value == null || value.isBlank()) {
throw new IllegalArgumentException(
String.format("Property '%s' must be configured when clickhouse.flyway.schema-mode=sharded",
propertyName));
}
}
}
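
The `<<...>>` placeholder delimiters configured in `ClickHouseFlywaySupport` (next file) imply migration scripts that reference the cluster topology and the Postgres connection. The actual scripts under `db/migration-clickhouse/` are not part of this diff; the following is a hypothetical sketch of what a sharded migration could look like, with all table and column names invented for illustration:

```sql
-- Hypothetical sharded migration; <<cluster>>, <<shard>> and <<replica>> are
-- injected from clickhouse.flyway.sharded.*, the postgres* placeholders from
-- the Spring DataSourceProperties. Not a file from this PR.
CREATE TABLE IF NOT EXISTS analytic.events ON CLUSTER '<<cluster>>'
(
    event_time DateTime,
    currency   String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/<<shard>>/events', '<<replica>>')
ORDER BY event_time;

CREATE TABLE IF NOT EXISTS analytic.rate_pg
(
    source_symbolic_code      String,
    destination_symbolic_code String,
    event_time                DateTime
)
ENGINE = PostgreSQL('<<postgresHost>>:<<postgresPort>>', '<<postgresDatabase>>',
                    'rate', '<<postgresUser>>', '<<postgresPassword>>', '<<postgresSchema>>');
```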
77 changes: 77 additions & 0 deletions src/main/java/dev/vality/analytics/config/ClickHouseFlywaySupport.java
@@ -0,0 +1,77 @@
package dev.vality.analytics.config;

import lombok.experimental.UtilityClass;
import org.flywaydb.core.Flyway;

import javax.sql.DataSource;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@UtilityClass
final class ClickHouseFlywaySupport {

private static final String POSTGRES_HOST = "postgresHost";
private static final String POSTGRES_PORT = "postgresPort";
private static final String POSTGRES_USER = "postgresUser";
private static final String POSTGRES_PASSWORD = "postgresPassword";
private static final String POSTGRES_DATABASE = "postgresDatabase";
private static final String POSTGRES_SCHEMA = "postgresSchema";
private static final Pattern POSTGRES_JDBC_URL_PATTERN = Pattern.compile(
"^jdbc:postgresql://(?<host>[^:/?]+)(?::(?<port>\\d+))?/(?<database>[^?]+).*$");

static Flyway createFlyway(
DataSource dataSource,
List<String> locations,
Map<String, String> placeholders,
String schemaHistoryTable) {
return Flyway.configure()
.dataSource(dataSource)
.locations(locations.toArray(String[]::new))
.placeholderPrefix("<<")
.placeholderSuffix(">>")
.placeholders(placeholders)
.table(schemaHistoryTable)
.baselineOnMigrate(true)
.load();
}

static Map<String, String> resolvePostgresPlaceholders(
String jdbcUrl,
String username,
String password,
String schema) {
Matcher matcher = POSTGRES_JDBC_URL_PATTERN.matcher(jdbcUrl);
if (!matcher.matches()) {
throw new IllegalArgumentException("Unsupported postgres.db.url format: " + jdbcUrl);
}

String port = matcher.group("port");
return resolvePostgresPlaceholders(
matcher.group("host"),
port == null ? 5432 : Integer.parseInt(port),
matcher.group("database"),
username,
password,
schema);
}

static Map<String, String> resolvePostgresPlaceholders(
String host,
int port,
String database,
String username,
String password,
String schema) {
Map<String, String> placeholders = new LinkedHashMap<>();
placeholders.put(POSTGRES_HOST, host);
placeholders.put(POSTGRES_PORT, Integer.toString(port));
placeholders.put(POSTGRES_USER, username);
placeholders.put(POSTGRES_PASSWORD, password);
placeholders.put(POSTGRES_DATABASE, database);
placeholders.put(POSTGRES_SCHEMA, schema);
return placeholders;
}
}
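
A quick illustration of what the URL parsing above yields; this demo class is hypothetical (placed in the same package so the package-private method is visible) and is not part of the PR:

```java
package dev.vality.analytics.config;

import java.util.Map;

// Hypothetical demo of ClickHouseFlywaySupport.resolvePostgresPlaceholders.
class ClickHouseFlywaySupportDemo {

    public static void main(String[] args) {
        Map<String, String> placeholders = ClickHouseFlywaySupport.resolvePostgresPlaceholders(
                "jdbc:postgresql://localhost/analytics", // no explicit port -> defaults to 5432
                "postgres", "secret", "analytics");
        // Prints, in insertion order:
        // {postgresHost=localhost, postgresPort=5432, postgresUser=postgres,
        //  postgresPassword=secret, postgresDatabase=analytics, postgresSchema=analytics}
        System.out.println(placeholders);
    }
}
```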
8 changes: 5 additions & 3 deletions src/main/java/dev/vality/analytics/config/KafkaConfig.java
@@ -2,6 +2,8 @@

import dev.vality.analytics.serde.HistoricalCommitDeserializer;
import dev.vality.analytics.serde.MachineEventDeserializer;
import dev.vality.analytics.serde.CurrencyEventKafkaDeserializer;
import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.analytics.service.ConsumerGroupIdService;
import dev.vality.damsel.domain_config_v2.HistoricalCommit;
import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
@@ -87,10 +89,10 @@ public ConcurrentKafkaListenerContainerFactory<String, HistoricalCommit> dominan
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> rateContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MachineEvent> factory =
public ConcurrentKafkaListenerContainerFactory<String, CurrencyEvent> rateContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, CurrencyEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
initDefaultListenerProperties(factory, rateGroupId, new MachineEventDeserializer(),
initDefaultListenerProperties(factory, rateGroupId, new CurrencyEventKafkaDeserializer(),
maxPollRecordsRatesListener);
return factory;
}
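`CurrencyEventKafkaDeserializer` is referenced here but its source is not in this excerpt. Given the imports and the matching `SerializeConfig` change below, it plausibly adapts the same Geck msgpack-to-Thrift decoding to Kafka's `Deserializer` contract — a sketch under that assumption:

```java
package dev.vality.analytics.serde;

import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.geck.serializer.Geck;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical sketch of the deserializer wired into rateContainerFactory;
// the real implementation is not shown in this diff.
@Slf4j
public class CurrencyEventKafkaDeserializer implements Deserializer<CurrencyEvent> {

    @Override
    public CurrencyEvent deserialize(String topic, byte[] data) {
        try {
            // Same decoding as SerializeConfig: msgpack bytes -> Thrift CurrencyEvent
            return Geck.msgPackToTBase(data, CurrencyEvent.class);
        } catch (Exception e) {
            log.error("Failed to deserialize CurrencyEvent from topic {}", topic, e);
            return null;
        }
    }
}
```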
src/main/java/dev/vality/analytics/config/SerializeConfig.java
@@ -1,23 +1,24 @@
package dev.vality.analytics.config;

import dev.vality.damsel.domain_config_v2.HistoricalCommit;
import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.geck.serializer.Geck;
import dev.vality.sink.common.parser.impl.MachineEventParser;
import dev.vality.sink.common.serialization.BinaryDeserializer;
import dev.vality.sink.common.serialization.impl.AbstractThriftBinaryDeserializer;
import dev.vality.xrates.rate.Change;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SerializeConfig {

@Bean
public BinaryDeserializer<Change> rateEventDataBinaryDeserializer() {
public BinaryDeserializer<CurrencyEvent> currencyEventBinaryDeserializer() {
return new AbstractThriftBinaryDeserializer<>() {
@Override
public Change deserialize(byte[] bytes) {
return Geck.msgPackToTBase(bytes, Change.class);
public CurrencyEvent deserialize(byte[] bytes) {
return Geck.msgPackToTBase(bytes, CurrencyEvent.class);
}
};
}
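Usage-wise the bean behaves like any other `BinaryDeserializer`; a hypothetical consumption sketch (the wiring shown is illustrative):

```java
// Hypothetical usage of the bean above; serializeConfig and payloadBytes are illustrative.
BinaryDeserializer<CurrencyEvent> deserializer = serializeConfig.currencyEventBinaryDeserializer();
CurrencyEvent event = deserializer.deserialize(payloadBytes); // msgpack-encoded Thrift payload
```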
RateDao.java
@@ -28,19 +28,9 @@ public void saveRateBatch(List<Rate> rates) {
.map(rate -> getDslContext().newRecord(RATE, rate))
.map(rateRecord -> getDslContext()
.insertInto(RATE).set(rateRecord)
.onConflict(RATE.SOURCE_ID, RATE.SOURCE_SYMBOLIC_CODE, RATE.DESTINATION_SYMBOLIC_CODE,
RATE.LOWER_BOUND_INCLUSIVE, RATE.UPPER_BOUND_EXCLUSIVE)
.onConflict(RATE.SOURCE_SYMBOLIC_CODE, RATE.DESTINATION_SYMBOLIC_CODE, RATE.EVENT_TIME)
.doNothing())
.collect(Collectors.toList());
batchExecute(queries);
}

public Rate getRate(String sourceId, String sourceCode, String destinationCode) {
Query query = getDslContext().selectFrom(RATE)
.where(RATE.SOURCE_ID.eq(sourceId))
.and(RATE.SOURCE_SYMBOLIC_CODE.eq(sourceCode)
.and(RATE.DESTINATION_SYMBOLIC_CODE.eq(destinationCode)));
return fetchOne(query, rateRowMapper);
}

}
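
One point worth flagging: Postgres applies `ON CONFLICT ... DO NOTHING` with an explicit column list only when a matching unique constraint or index exists, so the new conflict target implies a schema change along these lines. The actual Flyway script is not in this diff; a hypothetical sketch:

```sql
-- Hypothetical index matching the new ON CONFLICT target
-- (source_symbolic_code, destination_symbolic_code, event_time);
-- the real migration is not part of this diff excerpt.
CREATE UNIQUE INDEX IF NOT EXISTS rate_source_destination_event_time_key
    ON analytics.rate (source_symbolic_code, destination_symbolic_code, event_time);
```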
10 changes: 5 additions & 5 deletions src/main/java/dev/vality/analytics/listener/RateListener.java
@@ -1,7 +1,7 @@
package dev.vality.analytics.listener;

import dev.vality.analytics.listener.handler.rate.RateMachineEventHandler;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.analytics.listener.handler.rate.CurrencyEventHandler;
import dev.vality.exrates.events.CurrencyEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
@@ -17,18 +17,18 @@
@RequiredArgsConstructor
public class RateListener {

private final RateMachineEventHandler rateMachineEventHandler;
private final CurrencyEventHandler currencyEventHandler;

@KafkaListener(autoStartup = "${kafka.listener.rate.enabled}",
topics = "${kafka.topic.rate.initial}",
containerFactory = "rateContainerFactory")
public void handle(List<MachineEvent> batch,
public void handle(List<CurrencyEvent> batch,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) int offsets,
Acknowledgment ack) throws InterruptedException {
log.info("Got RateListener listen offsets: {}, partition: {}, batch.size: {}",
offsets, partition, batch.size());
rateMachineEventHandler.handle(batch, ack);
currencyEventHandler.handle(batch, ack);
log.info("Batch RateListener has been committed, size={}", batch.size());
}
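
For completeness, the listener's toggle and topic are configuration-driven; the property names come from the `@KafkaListener` annotation above, the values are illustrative:

```yaml
# Illustrative values; property names come from the @KafkaListener annotation.
kafka:
  listener:
    rate:
      enabled: true
  topic:
    rate:
      initial: exrates   # topic name is a placeholder
```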
