diff --git a/CHANGELOG.md b/CHANGELOG.md
index 99740055b..b2066b02f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,17 @@
# Changelog
All notable changes to this project will be documented in this file.
+## [9.6.0]
+### Added
+ - Implement for Document ingestion;
+ - Implement Document tags ingestion;
+ - add Content news tags upsert;
+ - add currencies upsert;
+
+### Changed
+ - use latest investment service api 1.3.0;
+ - move customization of specs into an investment package;
+
## [9.5.0]
### Added
- investment service intraday generation and ingestion function
diff --git a/stream-dbs-clients/pom.xml b/stream-dbs-clients/pom.xml
index a604aa11f..31bae8860 100644
--- a/stream-dbs-clients/pom.xml
+++ b/stream-dbs-clients/pom.xml
@@ -106,14 +106,6 @@
generate-sources
-
- com.backbase.investment
- investment-service-api
- api
- zip
- ${project.build.directory}/yaml
- true
-
com.backbase.dbs.accesscontrol
access-control
@@ -248,25 +240,6 @@
boat-maven-plugin
0.17.66
-
- generate-investment-service-api-code
-
- generate-webclient-embedded
-
- generate-sources
-
- ${project.build.directory}/yaml/investment-service-api/investment-service-api-v1*.yaml
- com.backbase.investment.api.service.v1
- com.backbase.investment.api.service.v1.model
-
- Etc/GMT-12=ETC_GMT_1222
-
-
- false
- false
-
-
-
generate-accesscontrol-datagroup-service-api-v1-code
diff --git a/stream-dbs-clients/src/main/java/com/backbase/stream/clients/autoconfigure/DbsApiClientsAutoConfiguration.java b/stream-dbs-clients/src/main/java/com/backbase/stream/clients/autoconfigure/DbsApiClientsAutoConfiguration.java
index a06f4af96..e2e23b076 100644
--- a/stream-dbs-clients/src/main/java/com/backbase/stream/clients/autoconfigure/DbsApiClientsAutoConfiguration.java
+++ b/stream-dbs-clients/src/main/java/com/backbase/stream/clients/autoconfigure/DbsApiClientsAutoConfiguration.java
@@ -7,7 +7,6 @@
import com.backbase.stream.clients.config.CustomerProfileClientConfig;
import com.backbase.stream.clients.config.IdentityIntegrationClientConfig;
import com.backbase.stream.clients.config.InstrumentApiConfiguration;
-import com.backbase.stream.clients.config.InvestmentClientConfig;
import com.backbase.stream.clients.config.LimitsClientConfig;
import com.backbase.stream.clients.config.PaymentOrderClientConfig;
import com.backbase.stream.clients.config.PortfolioApiConfiguration;
@@ -45,8 +44,7 @@
InstrumentApiConfiguration.class,
PortfolioApiConfiguration.class,
PlanManagerClientConfig.class,
- CustomerProfileClientConfig.class,
- InvestmentClientConfig.class
+ CustomerProfileClientConfig.class
})
@EnableConfigurationProperties
public class DbsApiClientsAutoConfiguration {
diff --git a/stream-investment/investment-core/pom.xml b/stream-investment/investment-core/pom.xml
index 24a832ea2..fb3bdeefd 100644
--- a/stream-investment/investment-core/pom.xml
+++ b/stream-investment/investment-core/pom.xml
@@ -15,17 +15,11 @@
true
+ 1.3.0
-
- com.backbase
- backbase-bom
- ${backbase-bom.version}
- pom
- import
-
com.backbase.buildingblocks
backbase-building-blocks-release
@@ -105,12 +99,13 @@
unpack
- generate-resources
+ generate-sources
com.backbase.investment
investment-service-api
+ ${investment-service-api.version}
api
zip
${project.build.directory}/yaml
@@ -129,12 +124,31 @@
generate-investment-service-api-code
+
+ generate-webclient-embedded
+
+ generate-resources
+
+ ${project.build.directory}/yaml/investment-service-api/investment-service-api-v${investment-service-api.version}.yaml
+ com.backbase.investment.api.service.v1
+ com.backbase.investment.api.service.v1.model
+
+ Etc/GMT-12=ETC_GMT_1222
+
+
+ false
+ false
+
+
+
+
+ generate-investment-service-rest-sync-api-code
generate-rest-template-embedded
generate-resources
- ${project.build.directory}/yaml/investment-service-api/investment-service-api-v1*.yaml
+ ${project.build.directory}/yaml/investment-service-api/investment-service-api-v${investment-service-api.version}.yaml
com.backbase.investment.api.service.sync.v1
com.backbase.investment.api.service.sync.v1.model
diff --git a/stream-dbs-clients/src/main/java/com/backbase/stream/clients/config/InvestmentClientConfig.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java
similarity index 90%
rename from stream-dbs-clients/src/main/java/com/backbase/stream/clients/config/InvestmentClientConfig.java
rename to stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java
index ae2bd2f22..d5c6289c8 100644
--- a/stream-dbs-clients/src/main/java/com/backbase/stream/clients/config/InvestmentClientConfig.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java
@@ -1,4 +1,4 @@
-package com.backbase.stream.clients.config;
+package com.backbase.stream.configuration;
import com.backbase.investment.api.service.ApiClient;
import com.backbase.investment.api.service.v1.AllocationsApi;
@@ -6,15 +6,18 @@
import com.backbase.investment.api.service.v1.AsyncBulkGroupsApi;
import com.backbase.investment.api.service.v1.ClientApi;
import com.backbase.investment.api.service.v1.ContentApi;
+import com.backbase.investment.api.service.v1.CurrencyApi;
import com.backbase.investment.api.service.v1.FinancialAdviceApi;
import com.backbase.investment.api.service.v1.InvestmentApi;
import com.backbase.investment.api.service.v1.InvestmentProductsApi;
import com.backbase.investment.api.service.v1.PaymentsApi;
import com.backbase.investment.api.service.v1.PortfolioApi;
+import com.backbase.stream.clients.config.CompositeApiClientConfig;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.text.DateFormat;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
@@ -29,6 +32,7 @@
* Configuration for Investment service REST client (ClientApi).
*/
@Configuration
+@ConditionalOnBean(InvestmentServiceConfiguration.class)
@ConfigurationProperties("backbase.communication.services.investment")
public class InvestmentClientConfig extends CompositeApiClientConfig {
@@ -113,6 +117,12 @@ public PaymentsApi paymentsApi(ApiClient investmentApiClient) {
return new PaymentsApi(investmentApiClient);
}
+ @Bean
+ @ConditionalOnMissingBean
+ public CurrencyApi currencyApi(ApiClient investmentApiClient) {
+ return new CurrencyApi(investmentApiClient);
+ }
+
@Bean
@ConditionalOnMissingBean
public AsyncBulkGroupsApi asyncBulkGroupsApi(ApiClient investmentApiClient) {
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java
index bcdf8e8a8..53bb897db 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java
@@ -2,12 +2,14 @@
package com.backbase.stream.configuration;
import lombok.Data;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* Configuration properties governing investment client ingestion behavior.
*/
@Data
+@ConditionalOnBean(InvestmentServiceConfiguration.class)
@ConfigurationProperties(prefix = "backbase.bootstrap.ingestions.investment")
public class InvestmentIngestionConfigurationProperties {
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java
index 45d6bb906..e4de101a5 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java
@@ -3,6 +3,7 @@
import com.backbase.investment.api.service.sync.v1.AssetUniverseApi;
import com.backbase.investment.api.service.sync.v1.ContentApi;
import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService;
+import com.backbase.stream.investment.service.resttemplate.InvestmentRestDocumentContentService;
import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -82,6 +83,11 @@ public InvestmentRestNewsContentService investmentNewsContentService(ContentApi
return new InvestmentRestNewsContentService(restContentApi, restInvestmentApiClient);
}
+ @Bean
+ public InvestmentRestDocumentContentService investmentRestContentDocumentService(ContentApi restContentApi,
+ com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient) {
+ return new InvestmentRestDocumentContentService(restContentApi, restInvestmentApiClient);
+ }
@Bean
public InvestmentRestAssetUniverseService investmentRestAssetUniverseService(AssetUniverseApi assetUniverseApi,
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java
index b36469f97..a6fbe808c 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java
@@ -5,6 +5,7 @@
import com.backbase.investment.api.service.v1.AssetUniverseApi;
import com.backbase.investment.api.service.v1.AsyncBulkGroupsApi;
import com.backbase.investment.api.service.v1.ClientApi;
+import com.backbase.investment.api.service.v1.CurrencyApi;
import com.backbase.investment.api.service.v1.FinancialAdviceApi;
import com.backbase.investment.api.service.v1.InvestmentApi;
import com.backbase.investment.api.service.v1.InvestmentProductsApi;
@@ -19,11 +20,13 @@
import com.backbase.stream.investment.service.InvestmentAssetPriceService;
import com.backbase.stream.investment.service.InvestmentAssetUniverseService;
import com.backbase.stream.investment.service.InvestmentClientService;
+import com.backbase.stream.investment.service.InvestmentCurrencyService;
import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService;
import com.backbase.stream.investment.service.InvestmentModelPortfolioService;
import com.backbase.stream.investment.service.InvestmentPortfolioAllocationService;
import com.backbase.stream.investment.service.InvestmentPortfolioService;
import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService;
+import com.backbase.stream.investment.service.resttemplate.InvestmentRestDocumentContentService;
import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -34,6 +37,7 @@
@Import({
DbsApiClientsAutoConfiguration.class,
+ InvestmentClientConfig.class
})
@EnableConfigurationProperties({
InvestmentIngestionConfigurationProperties.class
@@ -96,6 +100,11 @@ public InvestmentPortfolioAllocationService investmentPortfolioAllocationService
customIntegrationApiService);
}
+ @Bean
+ public InvestmentCurrencyService investmentCurrencyService(CurrencyApi currencyApi) {
+ return new InvestmentCurrencyService(currencyApi);
+ }
+
@Bean
public InvestmentSaga investmentSaga(InvestmentClientService investmentClientService,
InvestmentPortfolioService investmentPortfolioService,
@@ -112,17 +121,20 @@ public InvestmentAssetUniverseSaga investmentStaticDataSaga(
InvestmentAssetUniverseService investmentAssetUniverseService,
InvestmentAssetPriceService investmentAssetPriceService,
InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService,
+ InvestmentCurrencyService investmentCurrencyService,
AsyncTaskService asyncTaskService,
InvestmentIngestionConfigurationProperties coreConfigurationProperties) {
return new InvestmentAssetUniverseSaga(investmentAssetUniverseService, investmentAssetPriceService,
- investmentIntradayAssetPriceService, asyncTaskService, coreConfigurationProperties);
+ investmentIntradayAssetPriceService, investmentCurrencyService, asyncTaskService,
+ coreConfigurationProperties);
}
@Bean
public InvestmentContentSaga investmentContentSaga(
InvestmentRestNewsContentService investmentRestNewsContentService,
+ InvestmentRestDocumentContentService investmentRestDocumentContentService,
InvestmentIngestionConfigurationProperties coreConfigurationProperties) {
- return new InvestmentContentSaga(investmentRestNewsContentService, coreConfigurationProperties);
+ return new InvestmentContentSaga(investmentRestNewsContentService, investmentRestDocumentContentService, coreConfigurationProperties);
}
}
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java
index 78a8c2630..0409debb3 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java
@@ -1,6 +1,7 @@
package com.backbase.stream.investment;
import com.backbase.investment.api.service.v1.model.AssetCategoryType;
+import com.backbase.investment.api.service.v1.model.Currency;
import com.backbase.investment.api.service.v1.model.GroupResult;
import com.backbase.investment.api.service.v1.model.Market;
import com.backbase.investment.api.service.v1.model.MarketSpecialDay;
@@ -20,6 +21,7 @@
@Builder
public class InvestmentAssetData {
+ private List currencies;
private List markets;
private List marketSpecialDays;
private List assetCategoryTypes;
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java
index 6739a3d31..8a30a2581 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java
@@ -1,5 +1,7 @@
package com.backbase.stream.investment;
+import com.backbase.stream.investment.model.ContentDocumentEntry;
+import com.backbase.stream.investment.model.ContentTag;
import com.backbase.stream.investment.model.MarketNewsEntry;
import java.util.List;
import lombok.Builder;
@@ -11,6 +13,9 @@
@Builder
public class InvestmentContentData {
+ private List marketNewsTags;
private List marketNews;
+ private List documentTags;
+ private List documents;
}
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ContentDocumentEntry.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ContentDocumentEntry.java
new file mode 100644
index 000000000..69224c18d
--- /dev/null
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ContentDocumentEntry.java
@@ -0,0 +1,30 @@
+package com.backbase.stream.investment.model;
+
+import com.backbase.stream.investment.ModelAsset;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.springframework.core.io.Resource;
+
+@Setter
+@Getter
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class ContentDocumentEntry {
+
+ private String name;
+ private String description;
+ private List tags;
+ private List assets;
+ @JsonProperty("extra_data")
+ private Map extraData;
+ private String document;
+ private Resource documentResource;
+
+}
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ContentTag.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ContentTag.java
new file mode 100644
index 000000000..5c0703498
--- /dev/null
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ContentTag.java
@@ -0,0 +1,17 @@
+package com.backbase.stream.investment.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Setter
+@Getter
+@NoArgsConstructor
+@AllArgsConstructor
+public class ContentTag {
+
+ private String code;
+ private String value;
+
+}
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/UpsertPartition.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/UpsertPartition.java
new file mode 100644
index 000000000..d7dc0898d
--- /dev/null
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/UpsertPartition.java
@@ -0,0 +1,14 @@
+package com.backbase.stream.investment.model;
+
+/**
+ * Partitions entities into those to be created and those to be updated during an upsert operation. Exactly one of
+ * create or update must be non-null.
+ *
+ */
+public record UpsertPartition(ID id, T entity) {
+
+ public static UpsertPartition createPartition(T entity) {
+ return new UpsertPartition<>(null, entity);
+ }
+
+}
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java
index d562cbb99..06537f940 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java
@@ -10,6 +10,7 @@
import com.backbase.stream.investment.service.InvestmentAssetPriceService;
import com.backbase.stream.investment.service.InvestmentAssetUniverseService;
import com.backbase.stream.investment.service.InvestmentClientService;
+import com.backbase.stream.investment.service.InvestmentCurrencyService;
import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService;
import com.backbase.stream.investment.service.InvestmentPortfolioService;
import com.backbase.stream.worker.StreamTaskExecutor;
@@ -61,6 +62,7 @@ public class InvestmentAssetUniverseSaga implements StreamTaskExecutor executeTask(InvestmentAssetsTask streamTask) {
}
log.info("Starting investment asset universe saga execution: taskId={}, taskName={}",
streamTask.getId(), streamTask.getName());
- return upsertMarkets(streamTask)
+ return upsertCurrencies(streamTask)
+ .flatMap(this::upsertMarkets)
.flatMap(this::upsertMarketSpecialDays)
.flatMap(this::upsertAssetCategoryTypes)
.flatMap(this::upsertAssetCategories)
@@ -94,18 +97,6 @@ public Mono executeTask(InvestmentAssetsTask streamTask) {
.onErrorResume(throwable -> Mono.just(streamTask));
}
- private Mono upsertPrices(InvestmentAssetsTask investmentTask) {
- return investmentAssetPriceService.ingestPrices(investmentTask.getData().getAssets(), investmentTask.getData()
- .getPriceByAsset())
- .map(investmentTask::setPriceTasks);
- }
-
- private Mono createIntradayPrices(InvestmentAssetsTask investmentTask) {
- return asyncTaskService.checkPriceAsyncTasksFinished(investmentTask.getData().getPriceAsyncTasks())
- .then(investmentIntradayAssetPriceService.ingestIntradayPrices()
- .map(investmentTask::setIntradayPriceTasks));
- }
-
/**
* Rollback is not implemented for investment saga.
*
@@ -122,6 +113,54 @@ public Mono rollBack(InvestmentAssetsTask streamTask) {
return Mono.empty();
}
+ private Mono upsertCurrencies(InvestmentAssetsTask investmentTask) {
+ InvestmentAssetData investmentData = investmentTask.getData();
+ int currencyCount = investmentData.getCurrencies() != null ? investmentData.getCurrencies().size() : 0;
+ log.info("Starting investment currency creation: taskId={}, currencies={}",
+ investmentTask.getId(), currencyCount);
+ // Log the start of market creation and set task state to IN_PROGRESS
+ investmentTask.info(INVESTMENT, OP_CREATE, null, investmentTask.getName(), investmentTask.getId(),
+ PROCESSING_PREFIX + currencyCount + " investment currencies");
+ investmentTask.setState(State.IN_PROGRESS);
+
+ if (currencyCount == 0) {
+ log.warn("No currencies to create for taskId={}", investmentTask.getId());
+ investmentTask.setState(State.COMPLETED);
+ return Mono.just(investmentTask);
+ }
+ return investmentCurrencyService.upsertCurrencies(investmentData.getCurrencies())
+ .map(currencies -> {
+ // Update the task with the created markets
+ // Log completion and set task state to COMPLETED
+ investmentTask.info(INVESTMENT, OP_CREATE, OP_UPSERT, investmentTask.getName(),
+ investmentTask.getId(),
+ OP_UPSERT + " " + currencies.size() + " Investment Currencies");
+ investmentTask.setState(State.COMPLETED);
+ log.info("Successfully processed all currencies: taskId={}, marketCount={}",
+ investmentTask.getId(), currencies.size());
+ return investmentTask;
+ })
+ .doOnError(throwable -> {
+ log.error("Failed to create/upsert investment currencies: taskId={}, marketCount={}",
+ investmentTask.getId(), currencyCount, throwable);
+ investmentTask.error(INVESTMENT, OP_CREATE, RESULT_FAILED, investmentTask.getName(),
+ investmentTask.getId(),
+ "Failed to create investment currencies: " + throwable.getMessage());
+ });
+ }
+
+ private Mono upsertPrices(InvestmentAssetsTask investmentTask) {
+ return investmentAssetPriceService.ingestPrices(investmentTask.getData().getAssets(), investmentTask.getData()
+ .getPriceByAsset())
+ .map(investmentTask::setPriceTasks);
+ }
+
+ private Mono createIntradayPrices(InvestmentAssetsTask investmentTask) {
+ return asyncTaskService.checkPriceAsyncTasksFinished(investmentTask.getData().getPriceAsyncTasks())
+ .then(investmentIntradayAssetPriceService.ingestIntradayPrices()
+ .map(investmentTask::setIntradayPriceTasks));
+ }
+
public Mono upsertMarkets(InvestmentAssetsTask investmentTask) {
InvestmentAssetData investmentData = investmentTask.getData();
int marketCount = investmentData.getMarkets() != null ? investmentData.getMarkets().size() : 0;
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java
index 4021c9a7d..9120b29d8 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java
@@ -2,12 +2,16 @@
import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties;
import com.backbase.stream.investment.InvestmentContentTask;
+import com.backbase.stream.investment.model.ContentDocumentEntry;
import com.backbase.stream.investment.service.InvestmentClientService;
import com.backbase.stream.investment.service.InvestmentPortfolioService;
+import com.backbase.stream.investment.service.resttemplate.InvestmentRestDocumentContentService;
import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService;
import com.backbase.stream.worker.StreamTaskExecutor;
import com.backbase.stream.worker.model.StreamTask;
import com.backbase.stream.worker.model.StreamTask.State;
+import java.util.List;
+import java.util.Objects;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@@ -47,6 +51,7 @@ public class InvestmentContentSaga implements StreamTaskExecutor executeTask(InvestmentContentTask streamTask)
streamTask.getId(), streamTask.getName());
log.info("Starting investment saga execution: taskId={}, taskName={}",
streamTask.getId(), streamTask.getName());
- return upsertNewsContent(streamTask)
+ return upsertNewsTags(streamTask)
+ .flatMap(this::upsertNewsContent)
+ .flatMap(this::upsertDocumentTags)
+ .flatMap(this::upsertContentDocuments)
.doOnSuccess(completedTask -> log.info(
- "Successfully completed investment saga: taskId={}, taskName={}, state={}",
+ "Successfully completed investment content saga: taskId={}, taskName={}, state={}",
completedTask.getId(), completedTask.getName(), completedTask.getState()))
.doOnError(throwable -> {
- log.error("Failed to execute investment saga: taskId={}, taskName={}",
+ log.error("Failed to execute investment content saga: taskId={}, taskName={}",
streamTask.getId(), streamTask.getName(), throwable);
streamTask.error(INVESTMENT, OP_UPSERT, RESULT_FAILED,
streamTask.getName(), streamTask.getId(),
- "Investment saga failed: " + throwable.getMessage());
+ "Investment content saga failed: " + throwable.getMessage());
streamTask.setState(State.FAILED);
})
.onErrorResume(throwable -> Mono.just(streamTask));
}
private Mono upsertNewsContent(InvestmentContentTask investmentContentTask) {
- return investmentRestNewsContentService.upsertContent(investmentContentTask.getData().getMarketNews())
+ return investmentRestNewsContentService
+ .upsertContent(Objects.requireNonNullElse(investmentContentTask.getData().getMarketNews(), List.of()))
.thenReturn(investmentContentTask);
}
+ private Mono upsertNewsTags(InvestmentContentTask investmentContentTask) {
+ return investmentRestNewsContentService
+ .upsertTags(Objects.requireNonNullElse(investmentContentTask.getData().getMarketNewsTags(), List.of()))
+ .thenReturn(investmentContentTask);
+ }
+
+ private Mono upsertDocumentTags(InvestmentContentTask investmentContentTask) {
+ return investmentRestDocumentContentService
+ .upsertContentTags(Objects.requireNonNullElse(investmentContentTask.getData().getDocumentTags(), List.of()))
+ .thenReturn(investmentContentTask);
+ }
+
+ private Mono upsertContentDocuments(InvestmentContentTask investmentContentTask) {
+ List documents = investmentContentTask.getData().getDocuments();
+ return investmentRestDocumentContentService
+ .upsertDocuments(Objects.requireNonNullElse(documents, List.of()))
+ .thenReturn(investmentContentTask);
+ }
/**
* Rollback is not implemented for investment saga.
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentCurrencyService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentCurrencyService.java
new file mode 100644
index 000000000..6fb279207
--- /dev/null
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentCurrencyService.java
@@ -0,0 +1,129 @@
+package com.backbase.stream.investment.service;
+
+import com.backbase.investment.api.service.v1.CurrencyApi;
+import com.backbase.investment.api.service.v1.model.Currency;
+import com.backbase.investment.api.service.v1.model.CurrencyRequest;
+import java.util.List;
+import java.util.Objects;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@Slf4j
+@RequiredArgsConstructor
+public class InvestmentCurrencyService {
+
+ public static final int CONTENT_RETRIEVE_LIMIT = 100;
+ private final CurrencyApi currencyApi;
+
+ /**
+ * Upserts a batch of currency entries. For each currency, checks if it exists by code and updates it, otherwise
+ * creates a new entry. Continues processing remaining entries even if individual entries fail.
+ *
+ * @param currencies List of currencies to upsert
+ * @return Mono that completes when all currencies have been processed
+ */
+ public Mono> upsertCurrencies(List currencies) {
+ log.info("Starting currency upsert batch operation: totalEntries={}", currencies.size());
+ log.debug("Currency upsert batch details: entries={}", currencies);
+
+ return Flux.fromIterable(currencies)
+ .flatMap(this::upsertSingleCurrency)
+ .doOnComplete(() -> log.info("Currency upsert batch completed successfully: totalEntriesProcessed={}",
+ currencies.size()))
+ .doOnError(
+ error -> log.error("Currency upsert batch failed: totalEntries={}, errorType={}, errorMessage={}",
+ currencies.size(), error.getClass().getSimpleName(), error.getMessage(), error))
+ .collectList();
+ }
+
+ /**
+ * Upserts a single currency entry using the CurrencyApi endpoints. Implementation follows the upsert pattern:
+ *
+ * - List existing currencies to check if the currency code already exists
+ * - If currency exists, update it with PUT
+ * - If not found, create a new currency entry
+ *
+ *
+ * @param currency The currency to upsert
+ * @return Mono that completes with the currency when processed, or empty if validation fails
+ */
+ private Mono upsertSingleCurrency(Currency currency) {
+ log.debug("Processing currency: code='{}'", currency.getCode());
+
+ // Validation
+ if (currency.getCode() == null || currency.getCode().isBlank()) {
+ log.warn("Skipping currency with empty code");
+ return Mono.empty();
+ }
+
+ log.debug("Checking if currency exists: code='{}'", currency.getCode());
+
+ // Check if currency already exists
+ return currencyApi.listCurrencies(CONTENT_RETRIEVE_LIMIT, 0)
+ .map(paginatedList -> paginatedList.getResults().stream()
+ .filter(Objects::nonNull)
+ .filter(entry -> currency.getCode().equals(entry.getCode()))
+ .findFirst())
+ .flatMap(existingCurrency -> {
+ if (existingCurrency.isPresent()) {
+ log.info("Currency already exists: code='{}', updating", currency.getCode());
+ return updateCurrency(currency);
+ } else {
+ log.debug("Creating new currency: code='{}'", currency.getCode());
+ return createCurrency(currency);
+ }
+ })
+ .doOnSuccess(curr -> log.info("Currency upsert completed successfully: code='{}'", curr.getCode()))
+ .doOnError(error -> log.error(
+ "Currency upsert failed: code='{}', errorType={}, errorMessage={}",
+ currency.getCode(), error.getClass().getSimpleName(), error.getMessage(), error))
+ .onErrorResume(error -> {
+ log.warn("Continuing without currency: code='{}', reason={}",
+ currency.getCode(), error.getMessage());
+ return Mono.empty();
+ });
+ }
+
+ /**
+ * Creates a new currency entry using the CurrencyApi.
+ *
+ * @param currency The currency to create
+ * @return Mono of the created currency
+ */
+ private Mono createCurrency(Currency currency) {
+ CurrencyRequest currencyRequest = new CurrencyRequest();
+ currencyRequest.code(currency.getCode());
+ currencyRequest.name(currency.getName());
+ currencyRequest.symbol(currency.getSymbol());
+ return currencyApi.createCurrency(currencyRequest)
+ .doOnSuccess(created -> log.info(
+ "Currency created successfully: code='{}', name='{}'",
+ created.getCode(), created.getName()))
+ .doOnError(error -> log.error(
+ "Currency creation failed: code='{}', errorType={}, errorMessage={}",
+ currency.getCode(), error.getClass().getSimpleName(), error.getMessage(), error));
+ }
+
+ /**
+ * Updates an existing currency entry with new values.
+ *
+ * @param currency The currency with updated values
+ * @return Mono of the updated currency
+ */
+ private Mono updateCurrency(Currency currency) {
+ CurrencyRequest currencyRequest = new CurrencyRequest();
+ currencyRequest.code(currency.getCode());
+ currencyRequest.name(currency.getName());
+ currencyRequest.symbol(currency.getSymbol());
+ return currencyApi.updateCurrency(currency.getCode(), currencyRequest)
+ .doOnSuccess(updated -> log.info(
+ "Currency updated successfully: code='{}', name='{}'",
+ updated.getCode(), updated.getName()))
+ .doOnError(error -> log.error(
+ "Currency update failed: code='{}', errorType={}, errorMessage={}",
+ currency.getCode(), error.getClass().getSimpleName(), error.getMessage(), error));
+ }
+
+}
\ No newline at end of file
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/ContentMapper.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/ContentMapper.java
index 9e1957e1a..717c82b0f 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/ContentMapper.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/ContentMapper.java
@@ -1,14 +1,33 @@
package com.backbase.stream.investment.service.resttemplate;
import com.backbase.investment.api.service.sync.v1.model.EntryCreateUpdateRequest;
+import com.backbase.investment.api.service.sync.v1.model.OASDocumentRequestDataRequest;
+import com.backbase.stream.investment.AssetKey;
+import com.backbase.stream.investment.ModelAsset;
+import com.backbase.stream.investment.model.ContentDocumentEntry;
import com.backbase.stream.investment.model.MarketNewsEntry;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
+import org.mapstruct.Named;
@Mapper
public interface ContentMapper {
@Mapping(target = "thumbnail", ignore = true)
+ @Mapping(target = "status", constant = "PUBLISHED")
EntryCreateUpdateRequest map(MarketNewsEntry entry);
+ @Mapping(target = "assets", source = "assets", qualifiedByName = "mapRawAsserts")
+ OASDocumentRequestDataRequest map(ContentDocumentEntry request);
+
+ @Named("mapRawAsserts")
+ default List