diff --git a/pom.xml b/pom.xml
index 9e09d2d..63fbae2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,7 +111,34 @@
1.0
compile
-
+
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.19.1
+
+
+ com.google.guava
+ guava
+ 31.0.1-jre
+
+
+ com.lightstep.opentelemetry
+ opentelemetry-launcher
+ 1.5.0
+
+
+ io.opentelemetry
+ opentelemetry-api
+ 1.7.1
+
+
+ io.opentelemetry
+ opentelemetry-extension-annotations
+ 1.7.1
+
+
diff --git a/src/main/java/com/uci/outbound/Application/ReactiveKafkaConfiguration.java b/src/main/java/com/uci/outbound/Application/ReactiveKafkaConfiguration.java
index 091afbb..9a8421c 100644
--- a/src/main/java/com/uci/outbound/Application/ReactiveKafkaConfiguration.java
+++ b/src/main/java/com/uci/outbound/Application/ReactiveKafkaConfiguration.java
@@ -1,6 +1,7 @@
package com.uci.outbound.Application;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
@@ -38,12 +39,12 @@ Map kafkaConsumerConfiguration() {
ReceiverOptions kafkaReceiverOptions(@Value("${outbound}") String[] inTopicName) {
ReceiverOptions options = ReceiverOptions.create(kafkaConsumerConfiguration());
return options.subscription(Arrays.asList(inTopicName))
- .withKeyDeserializer(new JsonDeserializer<>())
+ .withKeyDeserializer(new StringDeserializer())
.withValueDeserializer(new JsonDeserializer());
}
@Bean
- Flux> reactiveKafkaReceiver(ReceiverOptions kafkaReceiverOptions) {
- return KafkaReceiver.create(kafkaReceiverOptions).receive();
+ Flux> reactiveKafkaReceiver(ReceiverOptions kafkaReceiverOptions) {
+ return KafkaReceiver.create(kafkaReceiverOptions).receiveAtmostOnce();
}
}
\ No newline at end of file
diff --git a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java
index b97b2ca..e3291ef 100644
--- a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java
+++ b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java
@@ -8,7 +8,17 @@
import lombok.extern.slf4j.Slf4j;
import messagerosa.core.model.XMessage;
import com.uci.dao.utils.XMessageDAOUtils;
+import com.uci.utils.kafka.adapter.TextMapGetterAdapter;
+import com.uci.utils.kafka.adapter.TextMapSetterAdapter;
+
+import io.grpc.Context;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
import messagerosa.xml.XMessageParser;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
@@ -24,23 +34,30 @@
@Slf4j
public class OutboundKafkaController {
- private final Flux> reactiveKafkaReceiver;
+ private final Flux> reactiveKafkaReceiver;
@Autowired
private ProviderFactory factoryProvider;
@Autowired
private XMessageRepository xMessageRepo;
+
+ @Autowired
+ private Tracer tracer;
@EventListener(ApplicationStartedEvent.class)
public void onMessage() {
reactiveKafkaReceiver
- .doOnNext(new Consumer>() {
+ .doOnNext(new Consumer>() {
@Override
- public void accept(ReceiverRecord msg) {
- log.info("kafka message receieved");
- XMessage currentXmsg = null;
- try {
+ public void accept(ConsumerRecord msg) {
+ log.info("kafka message receieved");
+ XMessage currentXmsg = null;
+// Context extractedContext = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), msg.headers(), TextMapGetterAdapter.getter);
+// log.info("Opentelemetry extracted context : "+extractedContext);
+
+// try (Scope scope = extractedContext.makeCurrent()) {
+ try {
currentXmsg = XMessageParser.parse(new ByteArrayInputStream(msg.value().getBytes()));
String channel = currentXmsg.getChannelURI();
String provider = currentXmsg.getProviderURI();
@@ -74,4 +91,4 @@ public void accept(Throwable e) {
})
.subscribe();
}
-}
\ No newline at end of file
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 38fb98d..9637ef8 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -54,11 +54,11 @@ caffeine.cache.max.size=0
caffeine.cache.exprie.duration.seconds=${CAFFEINE_CACHE_EXPIRE_DURATION:#{300}}
#Opentelemetry Lighstep Config
-opentelemetry.lightstep.tracer=${OPENTELEMETERY_LIGHTSTEP_TRACER}
-opentelemetry.lightstep.tracer.version=${OPENTELEMETERY_LIGHTSTEP_TRACER_VERSION}
-opentelemetry.lightstep.service=${OPENTELEMETERY_LIGHTSTEP_SERVICE}
-opentelemetry.lightstep.access.token=${OPENTELEMETERY_LIGHTSTEP_ACCESS_TOKEN}
-opentelemetry.lightstep.end.point=${OPENTELEMETERY_LIGHTSTEP_END_POINT}
+opentelemetry.lightstep.tracer=${LS_TRACER_NAME}
+opentelemetry.lightstep.tracer.version=${LS_TRACER_VERSION}
+opentelemetry.lightstep.service=${LS_SERVICE_NAME}
+opentelemetry.lightstep.access.token=${LS_ACCESS_TOKEN}
+opentelemetry.lightstep.end.point=${LS_END_POINT}
#Sunbird Adapater Outbound URL
adapter.sunbird.transport.url=${TRANSPORT_SOCKET_BASE_URL}