diff --git a/pom.xml b/pom.xml index 9e09d2d..63fbae2 100644 --- a/pom.xml +++ b/pom.xml @@ -111,7 +111,34 @@ 1.0 compile - + + + + com.google.protobuf + protobuf-java + 3.19.1 + + + com.google.guava + guava + 31.0.1-jre + + + com.lightstep.opentelemetry + opentelemetry-launcher + 1.5.0 + + + io.opentelemetry + opentelemetry-api + 1.7.1 + + + io.opentelemetry + opentelemetry-extension-annotations + 1.7.1 + + diff --git a/src/main/java/com/uci/outbound/Application/ReactiveKafkaConfiguration.java b/src/main/java/com/uci/outbound/Application/ReactiveKafkaConfiguration.java index 091afbb..9a8421c 100644 --- a/src/main/java/com/uci/outbound/Application/ReactiveKafkaConfiguration.java +++ b/src/main/java/com/uci/outbound/Application/ReactiveKafkaConfiguration.java @@ -1,6 +1,7 @@ package com.uci.outbound.Application; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -38,12 +39,12 @@ Map kafkaConsumerConfiguration() { ReceiverOptions kafkaReceiverOptions(@Value("${outbound}") String[] inTopicName) { ReceiverOptions options = ReceiverOptions.create(kafkaConsumerConfiguration()); return options.subscription(Arrays.asList(inTopicName)) - .withKeyDeserializer(new JsonDeserializer<>()) + .withKeyDeserializer(new StringDeserializer()) .withValueDeserializer(new JsonDeserializer()); } @Bean - Flux> reactiveKafkaReceiver(ReceiverOptions kafkaReceiverOptions) { - return KafkaReceiver.create(kafkaReceiverOptions).receive(); + Flux> reactiveKafkaReceiver(ReceiverOptions kafkaReceiverOptions) { + return KafkaReceiver.create(kafkaReceiverOptions).receiveAtmostOnce(); } } \ No newline at end of file diff --git a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java index b97b2ca..e3291ef 100644 --- a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java +++ b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java @@ -8,7 +8,17 @@ import lombok.extern.slf4j.Slf4j; import messagerosa.core.model.XMessage; import com.uci.dao.utils.XMessageDAOUtils; +import com.uci.utils.kafka.adapter.TextMapGetterAdapter; +import com.uci.utils.kafka.adapter.TextMapSetterAdapter; + +import io.grpc.Context; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; import messagerosa.xml.XMessageParser; + +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.event.EventListener; @@ -24,23 +34,30 @@ @Slf4j public class OutboundKafkaController { - private final Flux> reactiveKafkaReceiver; + private final Flux> reactiveKafkaReceiver; @Autowired private ProviderFactory factoryProvider; @Autowired private XMessageRepository xMessageRepo; + + @Autowired + private Tracer tracer; @EventListener(ApplicationStartedEvent.class) public void onMessage() { reactiveKafkaReceiver - .doOnNext(new Consumer>() { + .doOnNext(new Consumer>() { @Override - public void accept(ReceiverRecord msg) { - log.info("kafka message receieved"); - XMessage currentXmsg = null; - try { + public void accept(ConsumerRecord msg) { + log.info("kafka message receieved"); + XMessage currentXmsg = null; +// Context extractedContext = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), msg.headers(), TextMapGetterAdapter.getter); +// log.info("Opentelemetry extracted context : "+extractedContext); + +// try (Scope scope = extractedContext.makeCurrent()) { + try { currentXmsg = XMessageParser.parse(new ByteArrayInputStream(msg.value().getBytes())); String channel = currentXmsg.getChannelURI(); String provider = currentXmsg.getProviderURI(); @@ -74,4 +91,4 @@ public void accept(Throwable e) { }) .subscribe(); } -} \ No newline at end of file +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 38fb98d..9637ef8 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -54,11 +54,11 @@ caffeine.cache.max.size=0 caffeine.cache.exprie.duration.seconds=${CAFFEINE_CACHE_EXPIRE_DURATION:#{300}} #Opentelemetry Lighstep Config -opentelemetry.lightstep.tracer=${OPENTELEMETERY_LIGHTSTEP_TRACER} -opentelemetry.lightstep.tracer.version=${OPENTELEMETERY_LIGHTSTEP_TRACER_VERSION} -opentelemetry.lightstep.service=${OPENTELEMETERY_LIGHTSTEP_SERVICE} -opentelemetry.lightstep.access.token=${OPENTELEMETERY_LIGHTSTEP_ACCESS_TOKEN} -opentelemetry.lightstep.end.point=${OPENTELEMETERY_LIGHTSTEP_END_POINT} +opentelemetry.lightstep.tracer=${LS_TRACER_NAME} +opentelemetry.lightstep.tracer.version=${LS_TRACER_VERSION} +opentelemetry.lightstep.service=${LS_SERVICE_NAME} +opentelemetry.lightstep.access.token=${LS_ACCESS_TOKEN} +opentelemetry.lightstep.end.point=${LS_END_POINT} #Sunbird Adapater Outbound URL adapter.sunbird.transport.url=${TRANSPORT_SOCKET_BASE_URL}