diff --git a/CHANGELOG.md b/CHANGELOG.md index 99740055b..b2066b02f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,17 @@ # Changelog All notable changes to this project will be documented in this file. +## [9.6.0] +### Added + - Implement for Document ingestion; + - Implement Document tags ingestion; + - add Content news tags upsert; + - add currencies upsert; + +### Changed + - use latest investment service api 1.3.0; + - move customization of specs into an investment package; + ## [9.5.0] ### Added - investment service intraday generation and ingestion function diff --git a/stream-dbs-clients/pom.xml b/stream-dbs-clients/pom.xml index a604aa11f..31bae8860 100644 --- a/stream-dbs-clients/pom.xml +++ b/stream-dbs-clients/pom.xml @@ -106,14 +106,6 @@ generate-sources - - com.backbase.investment - investment-service-api - api - zip - ${project.build.directory}/yaml - true - com.backbase.dbs.accesscontrol access-control @@ -248,25 +240,6 @@ boat-maven-plugin 0.17.66 - - generate-investment-service-api-code - - generate-webclient-embedded - - generate-sources - - ${project.build.directory}/yaml/investment-service-api/investment-service-api-v1*.yaml - com.backbase.investment.api.service.v1 - com.backbase.investment.api.service.v1.model - - Etc/GMT-12=ETC_GMT_1222 - - - false - false - - - generate-accesscontrol-datagroup-service-api-v1-code diff --git a/stream-dbs-clients/src/main/java/com/backbase/stream/clients/autoconfigure/DbsApiClientsAutoConfiguration.java b/stream-dbs-clients/src/main/java/com/backbase/stream/clients/autoconfigure/DbsApiClientsAutoConfiguration.java index a06f4af96..e2e23b076 100644 --- a/stream-dbs-clients/src/main/java/com/backbase/stream/clients/autoconfigure/DbsApiClientsAutoConfiguration.java +++ b/stream-dbs-clients/src/main/java/com/backbase/stream/clients/autoconfigure/DbsApiClientsAutoConfiguration.java @@ -7,7 +7,6 @@ import com.backbase.stream.clients.config.CustomerProfileClientConfig; import com.backbase.stream.clients.config.IdentityIntegrationClientConfig; import com.backbase.stream.clients.config.InstrumentApiConfiguration; -import com.backbase.stream.clients.config.InvestmentClientConfig; import com.backbase.stream.clients.config.LimitsClientConfig; import com.backbase.stream.clients.config.PaymentOrderClientConfig; import com.backbase.stream.clients.config.PortfolioApiConfiguration; @@ -45,8 +44,7 @@ InstrumentApiConfiguration.class, PortfolioApiConfiguration.class, PlanManagerClientConfig.class, - CustomerProfileClientConfig.class, - InvestmentClientConfig.class + CustomerProfileClientConfig.class }) @EnableConfigurationProperties public class DbsApiClientsAutoConfiguration { diff --git a/stream-investment/investment-core/pom.xml b/stream-investment/investment-core/pom.xml index 24a832ea2..fb3bdeefd 100644 --- a/stream-investment/investment-core/pom.xml +++ b/stream-investment/investment-core/pom.xml @@ -15,17 +15,11 @@ true + 1.3.0 - - com.backbase - backbase-bom - ${backbase-bom.version} - pom - import - com.backbase.buildingblocks backbase-building-blocks-release @@ -105,12 +99,13 @@ unpack - generate-resources + generate-sources com.backbase.investment investment-service-api + ${investment-service-api.version} api zip ${project.build.directory}/yaml @@ -129,12 +124,31 @@ generate-investment-service-api-code + + generate-webclient-embedded + + generate-resources + + ${project.build.directory}/yaml/investment-service-api/investment-service-api-v${investment-service-api.version}.yaml + com.backbase.investment.api.service.v1 + com.backbase.investment.api.service.v1.model + + Etc/GMT-12=ETC_GMT_1222 + + + false + false + + + + + generate-investment-service-rest-sync-api-code generate-rest-template-embedded generate-resources - ${project.build.directory}/yaml/investment-service-api/investment-service-api-v1*.yaml + ${project.build.directory}/yaml/investment-service-api/investment-service-api-v${investment-service-api.version}.yaml com.backbase.investment.api.service.sync.v1 com.backbase.investment.api.service.sync.v1.model diff --git a/stream-dbs-clients/src/main/java/com/backbase/stream/clients/config/InvestmentClientConfig.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java similarity index 90% rename from stream-dbs-clients/src/main/java/com/backbase/stream/clients/config/InvestmentClientConfig.java rename to stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java index ae2bd2f22..d5c6289c8 100644 --- a/stream-dbs-clients/src/main/java/com/backbase/stream/clients/config/InvestmentClientConfig.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java @@ -1,4 +1,4 @@ -package com.backbase.stream.clients.config; +package com.backbase.stream.configuration; import com.backbase.investment.api.service.ApiClient; import com.backbase.investment.api.service.v1.AllocationsApi; @@ -6,15 +6,18 @@ import com.backbase.investment.api.service.v1.AsyncBulkGroupsApi; import com.backbase.investment.api.service.v1.ClientApi; import com.backbase.investment.api.service.v1.ContentApi; +import com.backbase.investment.api.service.v1.CurrencyApi; import com.backbase.investment.api.service.v1.FinancialAdviceApi; import com.backbase.investment.api.service.v1.InvestmentApi; import com.backbase.investment.api.service.v1.InvestmentProductsApi; import com.backbase.investment.api.service.v1.PaymentsApi; import com.backbase.investment.api.service.v1.PortfolioApi; +import com.backbase.stream.clients.config.CompositeApiClientConfig; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import java.text.DateFormat; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -29,6 +32,7 @@ * Configuration for Investment service REST client (ClientApi). */ @Configuration +@ConditionalOnBean(InvestmentServiceConfiguration.class) @ConfigurationProperties("backbase.communication.services.investment") public class InvestmentClientConfig extends CompositeApiClientConfig { @@ -113,6 +117,12 @@ public PaymentsApi paymentsApi(ApiClient investmentApiClient) { return new PaymentsApi(investmentApiClient); } + @Bean + @ConditionalOnMissingBean + public CurrencyApi currencyApi(ApiClient investmentApiClient) { + return new CurrencyApi(investmentApiClient); + } + @Bean @ConditionalOnMissingBean public AsyncBulkGroupsApi asyncBulkGroupsApi(ApiClient investmentApiClient) { diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java index bcdf8e8a8..53bb897db 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java @@ -2,12 +2,14 @@ package com.backbase.stream.configuration; import lombok.Data; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.context.properties.ConfigurationProperties; /** * Configuration properties governing investment client ingestion behavior. */ @Data +@ConditionalOnBean(InvestmentServiceConfiguration.class) @ConfigurationProperties(prefix = "backbase.bootstrap.ingestions.investment") public class InvestmentIngestionConfigurationProperties { diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java index 45d6bb906..e4de101a5 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java @@ -3,6 +3,7 @@ import com.backbase.investment.api.service.sync.v1.AssetUniverseApi; import com.backbase.investment.api.service.sync.v1.ContentApi; import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService; +import com.backbase.stream.investment.service.resttemplate.InvestmentRestDocumentContentService; import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.databind.ObjectMapper; @@ -82,6 +83,11 @@ public InvestmentRestNewsContentService investmentNewsContentService(ContentApi return new InvestmentRestNewsContentService(restContentApi, restInvestmentApiClient); } + @Bean + public InvestmentRestDocumentContentService investmentRestContentDocumentService(ContentApi restContentApi, + com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient) { + return new InvestmentRestDocumentContentService(restContentApi, restInvestmentApiClient); + } @Bean public InvestmentRestAssetUniverseService investmentRestAssetUniverseService(AssetUniverseApi assetUniverseApi, diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java index b36469f97..a6fbe808c 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java @@ -5,6 +5,7 @@ import com.backbase.investment.api.service.v1.AssetUniverseApi; import com.backbase.investment.api.service.v1.AsyncBulkGroupsApi; import com.backbase.investment.api.service.v1.ClientApi; +import com.backbase.investment.api.service.v1.CurrencyApi; import com.backbase.investment.api.service.v1.FinancialAdviceApi; import com.backbase.investment.api.service.v1.InvestmentApi; import com.backbase.investment.api.service.v1.InvestmentProductsApi; @@ -19,11 +20,13 @@ import com.backbase.stream.investment.service.InvestmentAssetPriceService; import com.backbase.stream.investment.service.InvestmentAssetUniverseService; import com.backbase.stream.investment.service.InvestmentClientService; +import com.backbase.stream.investment.service.InvestmentCurrencyService; import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService; import com.backbase.stream.investment.service.InvestmentModelPortfolioService; import com.backbase.stream.investment.service.InvestmentPortfolioAllocationService; import com.backbase.stream.investment.service.InvestmentPortfolioService; import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService; +import com.backbase.stream.investment.service.resttemplate.InvestmentRestDocumentContentService; import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService; import lombok.RequiredArgsConstructor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -34,6 +37,7 @@ @Import({ DbsApiClientsAutoConfiguration.class, + InvestmentClientConfig.class }) @EnableConfigurationProperties({ InvestmentIngestionConfigurationProperties.class @@ -96,6 +100,11 @@ public InvestmentPortfolioAllocationService investmentPortfolioAllocationService customIntegrationApiService); } + @Bean + public InvestmentCurrencyService investmentCurrencyService(CurrencyApi currencyApi) { + return new InvestmentCurrencyService(currencyApi); + } + @Bean public InvestmentSaga investmentSaga(InvestmentClientService investmentClientService, InvestmentPortfolioService investmentPortfolioService, @@ -112,17 +121,20 @@ public InvestmentAssetUniverseSaga investmentStaticDataSaga( InvestmentAssetUniverseService investmentAssetUniverseService, InvestmentAssetPriceService investmentAssetPriceService, InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService, + InvestmentCurrencyService investmentCurrencyService, AsyncTaskService asyncTaskService, InvestmentIngestionConfigurationProperties coreConfigurationProperties) { return new InvestmentAssetUniverseSaga(investmentAssetUniverseService, investmentAssetPriceService, - investmentIntradayAssetPriceService, asyncTaskService, coreConfigurationProperties); + investmentIntradayAssetPriceService, investmentCurrencyService, asyncTaskService, + coreConfigurationProperties); } @Bean public InvestmentContentSaga investmentContentSaga( InvestmentRestNewsContentService investmentRestNewsContentService, + InvestmentRestDocumentContentService investmentRestDocumentContentService, InvestmentIngestionConfigurationProperties coreConfigurationProperties) { - return new InvestmentContentSaga(investmentRestNewsContentService, coreConfigurationProperties); + return new InvestmentContentSaga(investmentRestNewsContentService, investmentRestDocumentContentService, coreConfigurationProperties); } } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java index 78a8c2630..0409debb3 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java @@ -1,6 +1,7 @@ package com.backbase.stream.investment; import com.backbase.investment.api.service.v1.model.AssetCategoryType; +import com.backbase.investment.api.service.v1.model.Currency; import com.backbase.investment.api.service.v1.model.GroupResult; import com.backbase.investment.api.service.v1.model.Market; import com.backbase.investment.api.service.v1.model.MarketSpecialDay; @@ -20,6 +21,7 @@ @Builder public class InvestmentAssetData { + private List currencies; private List markets; private List marketSpecialDays; private List assetCategoryTypes; diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java index 6739a3d31..8a30a2581 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java @@ -1,5 +1,7 @@ package com.backbase.stream.investment; +import com.backbase.stream.investment.model.ContentDocumentEntry; +import com.backbase.stream.investment.model.ContentTag; import com.backbase.stream.investment.model.MarketNewsEntry; import java.util.List; import lombok.Builder; @@ -11,6 +13,9 @@ @Builder public class InvestmentContentData { + private List marketNewsTags; private List marketNews; + private List documentTags; + private List documents; } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ContentDocumentEntry.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ContentDocumentEntry.java new file mode 100644 index 000000000..69224c18d --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ContentDocumentEntry.java @@ -0,0 +1,30 @@ +package com.backbase.stream.investment.model; + +import com.backbase.stream.investment.ModelAsset; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.springframework.core.io.Resource; + +@Setter +@Getter +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class ContentDocumentEntry { + + private String name; + private String description; + private List tags; + private List assets; + @JsonProperty("extra_data") + private Map extraData; + private String document; + private Resource documentResource; + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ContentTag.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ContentTag.java new file mode 100644 index 000000000..5c0703498 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ContentTag.java @@ -0,0 +1,17 @@ +package com.backbase.stream.investment.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Setter +@Getter +@NoArgsConstructor +@AllArgsConstructor +public class ContentTag { + + private String code; + private String value; + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/UpsertPartition.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/UpsertPartition.java new file mode 100644 index 000000000..d7dc0898d --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/UpsertPartition.java @@ -0,0 +1,14 @@ +package com.backbase.stream.investment.model; + +/** + * Partitions entities into those to be created and those to be updated during an upsert operation. Exactly one of + * create or update must be non-null. + * + */ +public record UpsertPartition(ID id, T entity) { + + public static UpsertPartition createPartition(T entity) { + return new UpsertPartition<>(null, entity); + } + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java index d562cbb99..06537f940 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java @@ -10,6 +10,7 @@ import com.backbase.stream.investment.service.InvestmentAssetPriceService; import com.backbase.stream.investment.service.InvestmentAssetUniverseService; import com.backbase.stream.investment.service.InvestmentClientService; +import com.backbase.stream.investment.service.InvestmentCurrencyService; import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService; import com.backbase.stream.investment.service.InvestmentPortfolioService; import com.backbase.stream.worker.StreamTaskExecutor; @@ -61,6 +62,7 @@ public class InvestmentAssetUniverseSaga implements StreamTaskExecutor executeTask(InvestmentAssetsTask streamTask) { } log.info("Starting investment asset universe saga execution: taskId={}, taskName={}", streamTask.getId(), streamTask.getName()); - return upsertMarkets(streamTask) + return upsertCurrencies(streamTask) + .flatMap(this::upsertMarkets) .flatMap(this::upsertMarketSpecialDays) .flatMap(this::upsertAssetCategoryTypes) .flatMap(this::upsertAssetCategories) @@ -94,18 +97,6 @@ public Mono executeTask(InvestmentAssetsTask streamTask) { .onErrorResume(throwable -> Mono.just(streamTask)); } - private Mono upsertPrices(InvestmentAssetsTask investmentTask) { - return investmentAssetPriceService.ingestPrices(investmentTask.getData().getAssets(), investmentTask.getData() - .getPriceByAsset()) - .map(investmentTask::setPriceTasks); - } - - private Mono createIntradayPrices(InvestmentAssetsTask investmentTask) { - return asyncTaskService.checkPriceAsyncTasksFinished(investmentTask.getData().getPriceAsyncTasks()) - .then(investmentIntradayAssetPriceService.ingestIntradayPrices() - .map(investmentTask::setIntradayPriceTasks)); - } - /** * Rollback is not implemented for investment saga. * @@ -122,6 +113,54 @@ public Mono rollBack(InvestmentAssetsTask streamTask) { return Mono.empty(); } + private Mono upsertCurrencies(InvestmentAssetsTask investmentTask) { + InvestmentAssetData investmentData = investmentTask.getData(); + int currencyCount = investmentData.getCurrencies() != null ? investmentData.getCurrencies().size() : 0; + log.info("Starting investment currency creation: taskId={}, currencies={}", + investmentTask.getId(), currencyCount); + // Log the start of market creation and set task state to IN_PROGRESS + investmentTask.info(INVESTMENT, OP_CREATE, null, investmentTask.getName(), investmentTask.getId(), + PROCESSING_PREFIX + currencyCount + " investment currencies"); + investmentTask.setState(State.IN_PROGRESS); + + if (currencyCount == 0) { + log.warn("No currencies to create for taskId={}", investmentTask.getId()); + investmentTask.setState(State.COMPLETED); + return Mono.just(investmentTask); + } + return investmentCurrencyService.upsertCurrencies(investmentData.getCurrencies()) + .map(currencies -> { + // Update the task with the created markets + // Log completion and set task state to COMPLETED + investmentTask.info(INVESTMENT, OP_CREATE, OP_UPSERT, investmentTask.getName(), + investmentTask.getId(), + OP_UPSERT + " " + currencies.size() + " Investment Currencies"); + investmentTask.setState(State.COMPLETED); + log.info("Successfully processed all currencies: taskId={}, marketCount={}", + investmentTask.getId(), currencies.size()); + return investmentTask; + }) + .doOnError(throwable -> { + log.error("Failed to create/upsert investment currencies: taskId={}, marketCount={}", + investmentTask.getId(), currencyCount, throwable); + investmentTask.error(INVESTMENT, OP_CREATE, RESULT_FAILED, investmentTask.getName(), + investmentTask.getId(), + "Failed to create investment currencies: " + throwable.getMessage()); + }); + } + + private Mono upsertPrices(InvestmentAssetsTask investmentTask) { + return investmentAssetPriceService.ingestPrices(investmentTask.getData().getAssets(), investmentTask.getData() + .getPriceByAsset()) + .map(investmentTask::setPriceTasks); + } + + private Mono createIntradayPrices(InvestmentAssetsTask investmentTask) { + return asyncTaskService.checkPriceAsyncTasksFinished(investmentTask.getData().getPriceAsyncTasks()) + .then(investmentIntradayAssetPriceService.ingestIntradayPrices() + .map(investmentTask::setIntradayPriceTasks)); + } + public Mono upsertMarkets(InvestmentAssetsTask investmentTask) { InvestmentAssetData investmentData = investmentTask.getData(); int marketCount = investmentData.getMarkets() != null ? investmentData.getMarkets().size() : 0; diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java index 4021c9a7d..9120b29d8 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java @@ -2,12 +2,16 @@ import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; import com.backbase.stream.investment.InvestmentContentTask; +import com.backbase.stream.investment.model.ContentDocumentEntry; import com.backbase.stream.investment.service.InvestmentClientService; import com.backbase.stream.investment.service.InvestmentPortfolioService; +import com.backbase.stream.investment.service.resttemplate.InvestmentRestDocumentContentService; import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService; import com.backbase.stream.worker.StreamTaskExecutor; import com.backbase.stream.worker.model.StreamTask; import com.backbase.stream.worker.model.StreamTask.State; +import java.util.List; +import java.util.Objects; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; @@ -47,6 +51,7 @@ public class InvestmentContentSaga implements StreamTaskExecutor executeTask(InvestmentContentTask streamTask) streamTask.getId(), streamTask.getName()); log.info("Starting investment saga execution: taskId={}, taskName={}", streamTask.getId(), streamTask.getName()); - return upsertNewsContent(streamTask) + return upsertNewsTags(streamTask) + .flatMap(this::upsertNewsContent) + .flatMap(this::upsertDocumentTags) + .flatMap(this::upsertContentDocuments) .doOnSuccess(completedTask -> log.info( - "Successfully completed investment saga: taskId={}, taskName={}, state={}", + "Successfully completed investment content saga: taskId={}, taskName={}, state={}", completedTask.getId(), completedTask.getName(), completedTask.getState())) .doOnError(throwable -> { - log.error("Failed to execute investment saga: taskId={}, taskName={}", + log.error("Failed to execute investment content saga: taskId={}, taskName={}", streamTask.getId(), streamTask.getName(), throwable); streamTask.error(INVESTMENT, OP_UPSERT, RESULT_FAILED, streamTask.getName(), streamTask.getId(), - "Investment saga failed: " + throwable.getMessage()); + "Investment content saga failed: " + throwable.getMessage()); streamTask.setState(State.FAILED); }) .onErrorResume(throwable -> Mono.just(streamTask)); } private Mono upsertNewsContent(InvestmentContentTask investmentContentTask) { - return investmentRestNewsContentService.upsertContent(investmentContentTask.getData().getMarketNews()) + return investmentRestNewsContentService + .upsertContent(Objects.requireNonNullElse(investmentContentTask.getData().getMarketNews(), List.of())) .thenReturn(investmentContentTask); } + private Mono upsertNewsTags(InvestmentContentTask investmentContentTask) { + return investmentRestNewsContentService + .upsertTags(Objects.requireNonNullElse(investmentContentTask.getData().getMarketNewsTags(), List.of())) + .thenReturn(investmentContentTask); + } + + private Mono upsertDocumentTags(InvestmentContentTask investmentContentTask) { + return investmentRestDocumentContentService + .upsertContentTags(Objects.requireNonNullElse(investmentContentTask.getData().getDocumentTags(), List.of())) + .thenReturn(investmentContentTask); + } + + private Mono upsertContentDocuments(InvestmentContentTask investmentContentTask) { + List documents = investmentContentTask.getData().getDocuments(); + return investmentRestDocumentContentService + .upsertDocuments(Objects.requireNonNullElse(documents, List.of())) + .thenReturn(investmentContentTask); + } /** * Rollback is not implemented for investment saga. diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentCurrencyService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentCurrencyService.java new file mode 100644 index 000000000..6fb279207 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentCurrencyService.java @@ -0,0 +1,129 @@ +package com.backbase.stream.investment.service; + +import com.backbase.investment.api.service.v1.CurrencyApi; +import com.backbase.investment.api.service.v1.model.Currency; +import com.backbase.investment.api.service.v1.model.CurrencyRequest; +import java.util.List; +import java.util.Objects; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Slf4j +@RequiredArgsConstructor +public class InvestmentCurrencyService { + + public static final int CONTENT_RETRIEVE_LIMIT = 100; + private final CurrencyApi currencyApi; + + /** + * Upserts a batch of currency entries. For each currency, checks if it exists by code and updates it, otherwise + * creates a new entry. Continues processing remaining entries even if individual entries fail. + * + * @param currencies List of currencies to upsert + * @return Mono that completes when all currencies have been processed + */ + public Mono> upsertCurrencies(List currencies) { + log.info("Starting currency upsert batch operation: totalEntries={}", currencies.size()); + log.debug("Currency upsert batch details: entries={}", currencies); + + return Flux.fromIterable(currencies) + .flatMap(this::upsertSingleCurrency) + .doOnComplete(() -> log.info("Currency upsert batch completed successfully: totalEntriesProcessed={}", + currencies.size())) + .doOnError( + error -> log.error("Currency upsert batch failed: totalEntries={}, errorType={}, errorMessage={}", + currencies.size(), error.getClass().getSimpleName(), error.getMessage(), error)) + .collectList(); + } + + /** + * Upserts a single currency entry using the CurrencyApi endpoints. Implementation follows the upsert pattern: + *
    + *
  1. List existing currencies to check if the currency code already exists
  2. + *
  3. If currency exists, update it with PUT
  4. + *
  5. If not found, create a new currency entry
  6. + *
+ * + * @param currency The currency to upsert + * @return Mono that completes with the currency when processed, or empty if validation fails + */ + private Mono upsertSingleCurrency(Currency currency) { + log.debug("Processing currency: code='{}'", currency.getCode()); + + // Validation + if (currency.getCode() == null || currency.getCode().isBlank()) { + log.warn("Skipping currency with empty code"); + return Mono.empty(); + } + + log.debug("Checking if currency exists: code='{}'", currency.getCode()); + + // Check if currency already exists + return currencyApi.listCurrencies(CONTENT_RETRIEVE_LIMIT, 0) + .map(paginatedList -> paginatedList.getResults().stream() + .filter(Objects::nonNull) + .filter(entry -> currency.getCode().equals(entry.getCode())) + .findFirst()) + .flatMap(existingCurrency -> { + if (existingCurrency.isPresent()) { + log.info("Currency already exists: code='{}', updating", currency.getCode()); + return updateCurrency(currency); + } else { + log.debug("Creating new currency: code='{}'", currency.getCode()); + return createCurrency(currency); + } + }) + .doOnSuccess(curr -> log.info("Currency upsert completed successfully: code='{}'", curr.getCode())) + .doOnError(error -> log.error( + "Currency upsert failed: code='{}', errorType={}, errorMessage={}", + currency.getCode(), error.getClass().getSimpleName(), error.getMessage(), error)) + .onErrorResume(error -> { + log.warn("Continuing without currency: code='{}', reason={}", + currency.getCode(), error.getMessage()); + return Mono.empty(); + }); + } + + /** + * Creates a new currency entry using the CurrencyApi. + * + * @param currency The currency to create + * @return Mono of the created currency + */ + private Mono createCurrency(Currency currency) { + CurrencyRequest currencyRequest = new CurrencyRequest(); + currencyRequest.code(currency.getCode()); + currencyRequest.name(currency.getName()); + currencyRequest.symbol(currency.getSymbol()); + return currencyApi.createCurrency(currencyRequest) + .doOnSuccess(created -> log.info( + "Currency created successfully: code='{}', name='{}'", + created.getCode(), created.getName())) + .doOnError(error -> log.error( + "Currency creation failed: code='{}', errorType={}, errorMessage={}", + currency.getCode(), error.getClass().getSimpleName(), error.getMessage(), error)); + } + + /** + * Updates an existing currency entry with new values. + * + * @param currency The currency with updated values + * @return Mono of the updated currency + */ + private Mono updateCurrency(Currency currency) { + CurrencyRequest currencyRequest = new CurrencyRequest(); + currencyRequest.code(currency.getCode()); + currencyRequest.name(currency.getName()); + currencyRequest.symbol(currency.getSymbol()); + return currencyApi.updateCurrency(currency.getCode(), currencyRequest) + .doOnSuccess(updated -> log.info( + "Currency updated successfully: code='{}', name='{}'", + updated.getCode(), updated.getName())) + .doOnError(error -> log.error( + "Currency update failed: code='{}', errorType={}, errorMessage={}", + currency.getCode(), error.getClass().getSimpleName(), error.getMessage(), error)); + } + +} \ No newline at end of file diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/ContentMapper.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/ContentMapper.java index 9e1957e1a..717c82b0f 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/ContentMapper.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/ContentMapper.java @@ -1,14 +1,33 @@ package com.backbase.stream.investment.service.resttemplate; import com.backbase.investment.api.service.sync.v1.model.EntryCreateUpdateRequest; +import com.backbase.investment.api.service.sync.v1.model.OASDocumentRequestDataRequest; +import com.backbase.stream.investment.AssetKey; +import com.backbase.stream.investment.ModelAsset; +import com.backbase.stream.investment.model.ContentDocumentEntry; import com.backbase.stream.investment.model.MarketNewsEntry; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; import org.mapstruct.Mapper; import org.mapstruct.Mapping; +import org.mapstruct.Named; @Mapper public interface ContentMapper { @Mapping(target = "thumbnail", ignore = true) + @Mapping(target = "status", constant = "PUBLISHED") EntryCreateUpdateRequest map(MarketNewsEntry entry); + @Mapping(target = "assets", source = "assets", qualifiedByName = "mapRawAsserts") + OASDocumentRequestDataRequest map(ContentDocumentEntry request); + + @Named("mapRawAsserts") + default List> mapRawAsserts(List assets) { + return Objects.requireNonNullElse(assets, new ArrayList()) + .stream().map(AssetKey::getAssetMap).toList(); + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestDocumentContentService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestDocumentContentService.java new file mode 100644 index 000000000..7dd60c62c --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestDocumentContentService.java @@ -0,0 +1,298 @@ +package com.backbase.stream.investment.service.resttemplate; + + +import com.backbase.investment.api.service.sync.ApiClient; +import com.backbase.investment.api.service.sync.v1.ContentApi; +import com.backbase.investment.api.service.sync.v1.model.DocumentTagRequest; +import com.backbase.investment.api.service.sync.v1.model.OASDocumentRequestDataRequest; +import com.backbase.investment.api.service.sync.v1.model.OASDocumentResponse; +import com.backbase.investment.api.service.sync.v1.model.PatchedDocumentTagRequest; +import com.backbase.stream.investment.model.ContentDocumentEntry; +import com.backbase.stream.investment.model.ContentTag; +import com.backbase.stream.investment.model.UpsertPartition; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.mapstruct.factory.Mappers; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.core.io.Resource; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.RestClientException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Slf4j +@RequiredArgsConstructor +public class InvestmentRestDocumentContentService { + + private static final Integer CONTENT_RETRIEVE_LIMIT = 100; + private final ContentApi contentApi; + private final ApiClient apiClient; + private final ContentMapper contentMapper = Mappers.getMapper(ContentMapper.class); + + public Mono upsertContentTags(List documentTags) { + log.info("Starting document tag upsert batch operation: totalEntries={}", documentTags.size()); + log.debug("Tag document upsert batch details: entries={}", documentTags); + + return Flux.fromIterable(documentTags) + .flatMap(this::upsertSingleTag) + .doOnComplete(() -> log.info("Document Tag upsert batch completed successfully: totalEntriesProcessed={}", + documentTags.size())) + .doOnError( + error -> log.error("Document Tag upsert batch failed: totalEntries={}, errorType={}, errorMessage={}", + documentTags.size(), error.getClass().getSimpleName(), error.getMessage(), error)) + .then(); + } + + private Mono upsertSingleTag(ContentTag documentTag) { + log.debug("Processing Document tag: code='{}', value='{}'", documentTag.getCode(), documentTag.getValue()); + + // Validation + if (documentTag.getCode() == null || documentTag.getCode().isBlank()) { + log.warn("Skipping Document tag with empty code: value='{}'", documentTag.getValue()); + return Mono.empty(); + } + + if (documentTag.getValue() == null || documentTag.getValue().isBlank()) { + log.warn("Skipping Document tag with empty value: code='{}'", documentTag.getCode()); + return Mono.empty(); + } + + log.debug("Checking if Document tag entry exists: code='{}', value='{}'", + documentTag.getCode(), documentTag.getValue()); + + // Check if tag entry already exists + return Mono.fromCallable(() -> + contentApi.contentDocumentTagList(CONTENT_RETRIEVE_LIMIT, 0)) + .map(paginatedList -> paginatedList.getResults().stream() + .filter(Objects::nonNull) + .filter(entry -> documentTag.getCode().equals(entry.getCode())) + .findFirst()) + .flatMap(existingEntry -> { + if (existingEntry.isPresent()) { + log.info("Document Tag entry already exists: code='{}', value='{}'", + documentTag.getCode(), documentTag.getValue()); + return patchTagEntry(documentTag); + } else { + // Create new tag entry + log.debug("Creating new Document tag entry: code='{}', value='{}'", + documentTag.getCode(), documentTag.getValue()); + return createTagEntry(documentTag); + } + }) + .doOnSuccess(tag -> log.info("Document Tag upsert completed successfully: code='{}', value='{}'", + tag.getCode(), tag.getValue())) + .doOnError(error -> log.error( + "Document Tag upsert failed: code='{}', value='{}', errorType={}, errorMessage={}", + documentTag.getCode(), documentTag.getValue(), + error.getClass().getSimpleName(), error.getMessage(), error)) + .onErrorResume(error -> { + log.warn("Continuing without Document tag: code='{}', reason={}", + documentTag.getCode(), error.getMessage()); + return Mono.empty(); + }); + } + + private Mono patchTagEntry(ContentTag contentTag) { + PatchedDocumentTagRequest request = new PatchedDocumentTagRequest() + .code(contentTag.getCode()) + .value(contentTag.getValue()); + + return Mono.defer(() -> Mono.just(contentApi.contentDocumentTagPartialUpdate(contentTag.getCode(), request))) + .doOnSuccess(patched -> log.info( + "Document Tag entry patched successfully: code='{}', value='{}'", + patched.getCode(), patched.getValue())) + .doOnError(error -> log.error( + "Document Tag entry patch failed: code='{}', value='{}', errorType={}, errorMessage={}", + contentTag.getCode(), contentTag.getValue(), + error.getClass().getSimpleName(), error.getMessage(), error)) + .thenReturn(contentTag); + } + + private Mono createTagEntry(ContentTag contentTag) { + DocumentTagRequest request = new DocumentTagRequest() + .code(contentTag.getCode()) + .value(contentTag.getValue()); + + return Mono.defer(() -> Mono.just(contentApi.contentDocumentTagCreate(request))) + .doOnSuccess(created -> log.info( + "Document Tag entry created successfully: code='{}', value='{}'", + created.getCode(), created.getValue())) + .doOnError(error -> log.error( + "Document Tag entry creation failed: code='{}', value='{}', errorType={}, errorMessage={}", + contentTag.getCode(), contentTag.getValue(), + error.getClass().getSimpleName(), error.getMessage(), error)) + .thenReturn(contentTag); + } + + public Mono upsertDocuments(List documents) { + log.info("Starting document upsert batch operation: totalEntries={}", documents.size()); + log.debug("Document upsert batch details: entries={}", documents); + + return findUpsertDocuments(documents) + .flatMap(this::upsertDocument) + .doOnComplete( + () -> log.info("Document upsert batch completed successfully: totalEntriesProcessed={}", + documents.size())) + .doOnError( + error -> log.error("Document upsert batch failed: totalEntries={}, errorType={}, errorMessage={}", + documents.size(), error.getClass().getSimpleName(), error.getMessage(), error)) + .then(); + } + + private Mono upsertDocument(UpsertPartition request) { + if (request.id() == null) { + return insertDocument(request.entity()); + } + return patchDocument(request.id(), request.entity()); + } + + private Mono patchDocument(UUID uuid, ContentDocumentEntry request) { + log.debug("Patching document entry: title='{}', hasDocument={}", request.getName(), + request.getDocumentResource() != null); + OASDocumentRequestDataRequest createDocumentRequest = contentMapper.map(request); + log.debug("Document entry request mapped: {}", createDocumentRequest); + return Mono.defer(() -> Mono.just(patchDocument(uuid, createDocumentRequest, request.getDocumentResource())) + .doOnSuccess( + created -> log.info("Document entry created successfully: title='{}', uuid={}, documentAttached={}", + request.getName(), created.getUuid(), request.getDocumentResource() != null)) + .doOnError( + error -> log.error("Document entry creation failed: title='{}', errorType={}, errorMessage={}", + request.getName(), error.getClass().getSimpleName(), error.getMessage(), error)) + .onErrorResume(error -> Mono.empty()) + .thenReturn(request)); + } + + private Mono insertDocument(ContentDocumentEntry request) { + log.debug("Processing document entry: title='{}', hasDocument={}", request.getName(), + request.getDocumentResource() != null); + + OASDocumentRequestDataRequest createDocumentRequest = contentMapper.map(request); + log.debug("Document entry request mapped: {}", createDocumentRequest); + return Mono.defer(() -> Mono.just(createContentDocument(createDocumentRequest, request.getDocumentResource())) + .doOnSuccess( + created -> log.info("Document entry created successfully: title='{}', uuid={}, documentAttached={}", + request.getName(), created.getUuid(), request.getDocumentResource() != null)) + .doOnError( + error -> log.error("Document entry creation failed: title='{}', errorType={}, errorMessage={}", + request.getName(), error.getClass().getSimpleName(), error.getMessage(), error)) + .onErrorResume(error -> Mono.empty()) + .thenReturn(request)); + } + + private Flux> findUpsertDocuments( + List documents) { + List existsNews = contentApi.listContentDocuments(null, CONTENT_RETRIEVE_LIMIT, null, 0, + null, null) + .getResults().stream().filter(Objects::nonNull).toList(); + + if (existsNews.isEmpty()) { + log.info("No existing document found in system: newEntries={}", + documents.size()); + return Flux.fromIterable(documents.stream() + .map(UpsertPartition::createPartition) + .toList()); + } + + Map existTitles = existsNews.stream() + .collect(Collectors.toMap(OASDocumentResponse::getName, OASDocumentResponse::getUuid, + (existing, replacement) -> existing)); + + List> newEntries = documents.stream() + .map(d -> new UpsertPartition<>(existTitles.get(d.getName()), d)) + .toList(); + + return Flux.fromIterable(newEntries); + } + + public OASDocumentResponse createContentDocument(OASDocumentRequestDataRequest data, Resource document) + throws RestClientException { + + HttpHeaders localVarHeaderParams = new HttpHeaders(); + MultiValueMap localVarCookieParams = new LinkedMultiValueMap<>(); + MultiValueMap localVarFormParams = new LinkedMultiValueMap<>(); + + if (data != null) { + localVarFormParams.add("data", data); + } + if (document != null) { + localVarFormParams.add("path", document); + } + + final String[] localVarAccepts = { + "application/json" + }; + final List localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + final String[] localVarContentTypes = { + "multipart/form-data" + }; + final MediaType localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + + String[] localVarAuthNames = new String[]{}; + + ParameterizedTypeReference localReturnType = new ParameterizedTypeReference<>() { + }; + return apiClient.invokeAPI("/service-api/v2/content/documents/", HttpMethod.POST, + Collections.emptyMap(), new LinkedMultiValueMap<>(), null, localVarHeaderParams, + localVarCookieParams, localVarFormParams, localVarAccept, localVarContentType, localVarAuthNames, + localReturnType) + .getBody(); + } + + public OASDocumentResponse patchDocument(UUID uuid, + OASDocumentRequestDataRequest data, Resource document) throws RestClientException { + + // verify the required parameter 'uuid' is set + if (uuid == null) { + throw new HttpClientErrorException(HttpStatus.BAD_REQUEST, + "Missing the required parameter 'uuid' when calling patchContentDocument"); + } + + // create path and map variables + final Map uriVariables = new HashMap(); + uriVariables.put("uuid", uuid.toString()); + + final MultiValueMap localVarQueryParams = new LinkedMultiValueMap(); + final HttpHeaders localVarHeaderParams = new HttpHeaders(); + final MultiValueMap localVarCookieParams = new LinkedMultiValueMap(); + final MultiValueMap localVarFormParams = new LinkedMultiValueMap(); + + if (data != null) { + localVarFormParams.add("data", data); + } + if (document != null) { + localVarFormParams.add("path", document); + } + + final String[] localVarAccepts = { + "application/json" + }; + final List localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + final String[] localVarContentTypes = { + "multipart/form-data" + }; + final MediaType localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + + String[] localVarAuthNames = new String[]{}; + + ParameterizedTypeReference localReturnType = new ParameterizedTypeReference() { + }; + return apiClient.invokeAPI("/service-api/v2/content/documents/{uuid}/", HttpMethod.PATCH, uriVariables, + localVarQueryParams, null, localVarHeaderParams, localVarCookieParams, localVarFormParams, + localVarAccept, localVarContentType, localVarAuthNames, localReturnType) + .getBody(); + } + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestNewsContentService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestNewsContentService.java index 52146eed3..051a89da2 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestNewsContentService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestNewsContentService.java @@ -7,7 +7,10 @@ import com.backbase.investment.api.service.sync.v1.model.Entry; import com.backbase.investment.api.service.sync.v1.model.EntryCreateUpdate; import com.backbase.investment.api.service.sync.v1.model.EntryCreateUpdateRequest; +import com.backbase.investment.api.service.sync.v1.model.EntryTagRequest; +import com.backbase.investment.api.service.sync.v1.model.PatchedEntryTagRequest; import com.backbase.stream.investment.model.MarketNewsEntry; +import com.backbase.stream.investment.model.ContentTag; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,6 +50,125 @@ public class InvestmentRestNewsContentService { private final ApiClient apiClient; private final ContentMapper contentMapper = Mappers.getMapper(ContentMapper.class); + public Mono upsertTags(List tagEntries) { + log.info("Starting tag upsert batch operation: totalEntries={}", tagEntries.size()); + log.debug("Tag upsert batch details: entries={}", tagEntries); + + return Flux.fromIterable(tagEntries) + .flatMap(this::upsertSingleTag) + .doOnComplete(() -> log.info("Tag upsert batch completed successfully: totalEntriesProcessed={}", + tagEntries.size())) + .doOnError(error -> log.error("Tag upsert batch failed: totalEntries={}, errorType={}, errorMessage={}", + tagEntries.size(), error.getClass().getSimpleName(), error.getMessage(), error)) + .then(); + + } + + /** + * Upserts a single tag entry using the ContentApi tag endpoints. Implementation follows the upsert pattern: + *
    + *
  1. List existing tag entries to check if the tag code already exists
  2. + *
  3. If tag exists, patch it with the new value
  4. + *
  5. If not found, create a new tag entry
  6. + *
+ * + * @param marketNewsTag The tag to upsert + * @return Mono that completes with the tag when processed, or empty if validation fails + */ + private Mono upsertSingleTag(ContentTag marketNewsTag) { + log.debug("Processing tag: code='{}', value='{}'", marketNewsTag.getCode(), marketNewsTag.getValue()); + + // Validation + if (marketNewsTag.getCode() == null || marketNewsTag.getCode().isBlank()) { + log.warn("Skipping tag with empty code: value='{}'", marketNewsTag.getValue()); + return Mono.empty(); + } + + if (marketNewsTag.getValue() == null || marketNewsTag.getValue().isBlank()) { + log.warn("Skipping tag with empty value: code='{}'", marketNewsTag.getCode()); + return Mono.empty(); + } + + log.debug("Checking if tag entry exists: code='{}', value='{}'", + marketNewsTag.getCode(), marketNewsTag.getValue()); + + // Check if tag entry already exists + return Mono.fromCallable(() -> + contentApi.contentEntryTagList(CONTENT_RETRIEVE_LIMIT, 0)) + .map(paginatedList -> paginatedList.getResults().stream() + .filter(Objects::nonNull) + .filter(entry -> marketNewsTag.getCode().equals(entry.getCode())) + .findFirst()) + .flatMap(existingEntry -> { + if (existingEntry.isPresent()) { + log.info("Tag entry already exists: code='{}', value='{}'", + marketNewsTag.getCode(), marketNewsTag.getValue()); + return patchTagEntry(marketNewsTag); + } else { + // Create new tag entry + log.debug("Creating new tag entry: code='{}', value='{}'", + marketNewsTag.getCode(), marketNewsTag.getValue()); + return createTagEntry(marketNewsTag); + } + }) + .doOnSuccess(tag -> log.info("Tag upsert completed successfully: code='{}', value='{}'", + tag.getCode(), tag.getValue())) + .doOnError(error -> log.error( + "Tag upsert failed: code='{}', value='{}', errorType={}, errorMessage={}", + marketNewsTag.getCode(), marketNewsTag.getValue(), + error.getClass().getSimpleName(), error.getMessage(), error)) + .onErrorResume(error -> { + log.warn("Continuing without tag: code='{}', reason={}", + marketNewsTag.getCode(), error.getMessage()); + return Mono.empty(); + }); + } + + /** + * Creates a new tag entry using the ContentApi. + * + * @param contentTag The tag to create an entry for + * @return Mono of the created tag + */ + private Mono createTagEntry(ContentTag contentTag) { + EntryTagRequest request = new EntryTagRequest() + .code(contentTag.getCode()) + .value(contentTag.getValue()); + + return Mono.defer(() -> Mono.just(contentApi.contentEntryTagCreate(request))) + .doOnSuccess(created -> log.info( + "Tag entry created successfully: code='{}', value='{}'", + created.getCode(), created.getValue())) + .doOnError(error -> log.error( + "Tag entry creation failed: code='{}', value='{}', errorType={}, errorMessage={}", + contentTag.getCode(), contentTag.getValue(), + error.getClass().getSimpleName(), error.getMessage(), error)) + .thenReturn(contentTag); + } + + /** + * Patches an existing tag entry with updated values. + * + * @param contentTag The tag with updated values to patch + * @return Mono of the patched tag + */ + private Mono patchTagEntry(ContentTag contentTag) { + PatchedEntryTagRequest request = new PatchedEntryTagRequest() + .code(contentTag.getCode()) + .value(contentTag.getValue()); + + return Mono.defer(() -> Mono.just(contentApi.contentEntryTagPartialUpdate(contentTag.getCode(), request))) + .doOnSuccess(patched -> log.info( + "Tag entry patched successfully: code='{}', value='{}'", + patched.getCode(), patched.getValue())) + .doOnError(error -> log.error( + "Tag entry patch failed: code='{}', value='{}', errorType={}, errorMessage={}", + contentTag.getCode(), contentTag.getValue(), + error.getClass().getSimpleName(), error.getMessage(), error)) + .thenReturn(contentTag); + } + + /** * Upserts a list of content entries. For each entry, checks if content with the same title exists. If exists, * updates it; otherwise creates a new entry. Continues processing remaining entries even if individual entries @@ -56,7 +178,7 @@ public class InvestmentRestNewsContentService { * @return Mono that completes when all entries have been processed */ public Mono upsertContent(List contentEntries) { - log.info("Starting content upsert batch operation:, totalEntries={}", contentEntries.size()); + log.info("Starting content upsert batch operation: totalEntries={}", contentEntries.size()); log.debug("Content upsert batch details: entries={}", contentEntries); return findEntriesNewContent(contentEntries).flatMap(this::upsertSingleEntry).doOnComplete( @@ -80,7 +202,7 @@ private Mono upsertSingleEntry(MarketNewsEntry request) { log.debug("Creating new content entry: title='{}', hasThumbnail={}", request.getTitle(), request.getThumbnailResource() != null); EntryCreateUpdateRequest createUpdateRequest = contentMapper.map(request); - log.debug("Content entry processing : {}", createUpdateRequest); + log.debug("Content entry request mapped: {}", createUpdateRequest); return Mono.defer(() -> Mono.just(contentApi.createContentEntry(createUpdateRequest))) .flatMap(e -> addThumbnail(e, request.getThumbnailResource())) .doOnSuccess( @@ -101,7 +223,7 @@ private Flux findEntriesNewContent(List conten .getResults().stream().filter(Objects::nonNull).toList(); if (existsNews.isEmpty()) { - log.info("No existing content found in system:requestedEntries={}, existingEntries=0, newEntries={}", + log.info("No existing content found in system: requestedEntries={}, existingEntries=0, newEntries={}", entryByTitle.size(), entryByTitle.size()); return Flux.fromIterable(entryByTitle.values()); } @@ -111,7 +233,8 @@ private Flux findEntriesNewContent(List conten .filter(c -> existTitles.stream().noneMatch(e -> c.getTitle().contains(e))).toList(); log.info( - "Content filtering completed: requestedEntries={}, existingEntriesFound={}, newEntriesToCreate={}, duplicatesSkipped={}", + "Content filtering completed: requestedEntries={}, existingEntriesFound={}, " + + "newEntriesToCreate={}, duplicatesSkipped={}", entryByTitle.size(), existsNews.size(), newEntries.size(), entryByTitle.size() - newEntries.size()); log.debug("Filtered new content titles: newTitles={}", newEntries.stream().map(MarketNewsEntry::getTitle).collect(Collectors.toList())); @@ -131,40 +254,40 @@ private Mono addThumbnail(EntryCreateUpdate entry, Resource t getFileNameForLog(thumbnail)); return Mono.defer(() -> { - // create path and map variables - Map uriVariables = new HashMap<>(); - uriVariables.put("uuid", uuid); + // create path and map variables + Map uriVariables = new HashMap<>(); + uriVariables.put("uuid", uuid); - MultiValueMap localVarQueryParams = new LinkedMultiValueMap<>(); - HttpHeaders localVarHeaderParams = new HttpHeaders(); - MultiValueMap localVarCookieParams = new LinkedMultiValueMap<>(); - MultiValueMap localVarFormParams = new LinkedMultiValueMap<>(); + MultiValueMap localVarQueryParams = new LinkedMultiValueMap<>(); + HttpHeaders localVarHeaderParams = new HttpHeaders(); + MultiValueMap localVarCookieParams = new LinkedMultiValueMap<>(); + MultiValueMap localVarFormParams = new LinkedMultiValueMap<>(); - localVarFormParams.add("thumbnail", thumbnail); + localVarFormParams.add("thumbnail", thumbnail); - final String[] localVarAccepts = {"application/json"}; - final List localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); - final String[] localVarContentTypes = {"multipart/form-data"}; - final MediaType localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + final String[] localVarAccepts = {"application/json"}; + final List localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + final String[] localVarContentTypes = {"multipart/form-data"}; + final MediaType localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); - String[] localVarAuthNames = new String[]{}; + String[] localVarAuthNames = new String[]{}; - ParameterizedTypeReference localReturnType = new ParameterizedTypeReference<>() { - }; - apiClient.invokeAPI("/service-api/v2/content/entries/{uuid}/", HttpMethod.PATCH, uriVariables, - localVarQueryParams, null, localVarHeaderParams, localVarCookieParams, localVarFormParams, - localVarAccept, localVarContentType, localVarAuthNames, localReturnType); + ParameterizedTypeReference localReturnType = new ParameterizedTypeReference<>() { + }; + apiClient.invokeAPI("/service-api/v2/content/entries/{uuid}/", HttpMethod.PATCH, uriVariables, + localVarQueryParams, null, localVarHeaderParams, localVarCookieParams, localVarFormParams, + localVarAccept, localVarContentType, localVarAuthNames, localReturnType); - log.info("Thumbnail attached successfully: uuid={}, thumbnailFile='{}'", uuid, - getFileNameForLog(thumbnail)); - return Mono.just(entry); - }).doOnError(error -> log.error( - "Thumbnail attachment failed: uuid={}, thumbnailFile='{}', errorType={}, errorMessage={}", uuid, - getFileNameForLog(thumbnail), error.getClass().getSimpleName(), error.getMessage(), error)) - .onErrorResume(error -> { - log.warn("Content entry created without thumbnail: uuid={}, reason={}", uuid, error.getMessage()); - return Mono.just(entry); - }); + log.info("Thumbnail attached successfully: uuid={}, thumbnailFile='{}'", uuid, + getFileNameForLog(thumbnail)); + return Mono.just(entry); + }).doOnError(error -> log.error( + "Thumbnail attachment failed: uuid={}, thumbnailFile='{}', errorType={}, errorMessage={}", uuid, + getFileNameForLog(thumbnail), error.getClass().getSimpleName(), error.getMessage(), error)) + .onErrorResume(error -> { + log.warn("Content entry created without thumbnail: uuid={}, reason={}", uuid, error.getMessage()); + return Mono.just(entry); + }); } } diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java index 3825365e9..d3e13e1ec 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java @@ -21,6 +21,7 @@ import com.backbase.stream.investment.service.AsyncTaskService; import com.backbase.stream.investment.service.InvestmentAssetPriceService; import com.backbase.stream.investment.service.InvestmentAssetUniverseService; +import com.backbase.stream.investment.service.InvestmentCurrencyService; import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService; import java.time.Duration; import java.util.Collections; @@ -56,6 +57,7 @@ class InvestmentAssetUniverseSagaTest { @Mock private InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService; + private InvestmentCurrencyService investmentCurrencyService; private AsyncTaskService asyncTaskService; @Mock @@ -70,6 +72,7 @@ void setUp() { assetUniverseService, investmentAssetPriceService, investmentIntradayAssetPriceService, + investmentCurrencyService, asyncTaskService, configurationProperties ); diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentRestNewsContentServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentRestNewsContentServiceTest.java index 3748f2e60..692c4029a 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentRestNewsContentServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentRestNewsContentServiceTest.java @@ -28,13 +28,13 @@ void setUp() { void upsertContent_createsNewEntry_whenNotExists() { // Given EntryCreateUpdateRequest request = new EntryCreateUpdateRequest() - .title("New Article") - .excerpt("Excerpt") - .tags(List.of("tag1")); + .title("New Article") + .excerpt("Excerpt") + .tags(List.of("tag1")); PaginatedEntryList emptyList = new PaginatedEntryList() - .count(0) - .results(List.of()); + .count(0) + .results(List.of()); EntryCreateUpdate created = new EntryCreateUpdate();