From 66796b04a55b889f19e3c8f244f25317c6dc6914 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Fri, 3 Dec 2021 10:46:37 +0530 Subject: [PATCH 1/5] lighstep opentelemetry - tracing on message process --- .../outbound/consumers/OutboundKafkaController.java | 13 ++++++++++++- src/main/resources/application.properties | 7 +++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java index d19c838..bff39af 100644 --- a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java +++ b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java @@ -8,6 +8,11 @@ import lombok.extern.slf4j.Slf4j; import messagerosa.core.model.XMessage; import com.uci.dao.utils.XMessageDAOUtils; + +import io.grpc.Context; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; import messagerosa.xml.XMessageParser; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationStartedEvent; @@ -31,6 +36,9 @@ public class OutboundKafkaController { @Autowired private XMessageRepository xMessageRepo; + + @Autowired + private Tracer tracer; @EventListener(ApplicationStartedEvent.class) public void onMessage() { @@ -38,8 +46,10 @@ public void onMessage() { .doOnNext(new Consumer>() { @Override public void accept(ReceiverRecord msg) { + Span rootSpan = tracer.spanBuilder("outbound-processMessage").startSpan(); + Context currentContext = Context.current(); XMessage currentXmsg = null; - try { + try(Scope scope = rootSpan.makeCurrent()) { currentXmsg = XMessageParser.parse(new ByteArrayInputStream(msg.value().getBytes())); String channel = currentXmsg.getChannelURI(); String provider = currentXmsg.getProviderURI(); @@ -54,6 +64,7 @@ public void accept(XMessage xMessage) { @Override public void accept(XMessageDAO xMessageDAO) { log.info("XMessage Object saved is with sent user ID >> " + xMessageDAO.getUserId()); + rootSpan.end(); } }); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 5b9cebd..9a24941 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -51,3 +51,10 @@ spring.r2dbc.password=${FORMS_DB_PASSWORD} #Caffeine Cache caffeine.cache.max.size=${CAFFEINE_CACHE_MAX_SIZE:#{1000}} caffeine.cache.exprie.duration.seconds=${CAFFEINE_CACHE_EXPIRE_DURATION:#{300}} + +#Opentelemetry Lighstep Config +opentelemetry.lightstep.tracer=${OPENTELEMETERY_LIGHTSTEP_TRACER} +opentelemetry.lightstep.tracer.version=${OPENTELEMETERY_LIGHTSTEP_TRACER_VERSION} +opentelemetry.lightstep.service=${OPENTELEMETERY_LIGHTSTEP_SERVICE} +opentelemetry.lightstep.access.token=${OPENTELEMETERY_LIGHTSTEP_ACCESS_TOKEN} +opentelemetry.lightstep.end.point=${OPENTELEMETERY_LIGHTSTEP_END_POINT} From 7754894b84eecaac6a5a9462c12e8191788430e5 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Mon, 17 Jan 2022 18:44:19 +0530 Subject: [PATCH 2/5] opentelemetry spans removed --- .../uci/outbound/consumers/OutboundKafkaController.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java index 2ee39b7..fac5592 100644 --- a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java +++ b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java @@ -46,11 +46,9 @@ public void onMessage() { .doOnNext(new Consumer>() { @Override public void accept(ReceiverRecord msg) { - log.info("kafka message receieved"); - Span rootSpan = tracer.spanBuilder("outbound-processMessage").startSpan(); - Context currentContext = Context.current(); - XMessage currentXmsg = null; - try(Scope scope = rootSpan.makeCurrent()) { + log.info("kafka message receieved"); + XMessage currentXmsg = null; + try { currentXmsg = XMessageParser.parse(new ByteArrayInputStream(msg.value().getBytes())); String channel = currentXmsg.getChannelURI(); String provider = currentXmsg.getProviderURI(); @@ -65,7 +63,6 @@ public void accept(XMessage xMessage) { @Override public void accept(XMessageDAO xMessageDAO) { log.info("XMessage Object saved is with sent user ID >> " + xMessageDAO.getUserId()); - rootSpan.end(); } }); } From 037d3660fe00aea8f9447b3e4b033aaa19299684 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Wed, 19 Jan 2022 10:01:41 +0530 Subject: [PATCH 3/5] context propagation & extract --- .../Application/ReactiveKafkaConfiguration.java | 7 ++++--- .../consumers/OutboundKafkaController.java | 16 ++++++++++++---- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/uci/outbound/Application/ReactiveKafkaConfiguration.java b/src/main/java/com/uci/outbound/Application/ReactiveKafkaConfiguration.java index 091afbb..9a8421c 100644 --- a/src/main/java/com/uci/outbound/Application/ReactiveKafkaConfiguration.java +++ b/src/main/java/com/uci/outbound/Application/ReactiveKafkaConfiguration.java @@ -1,6 +1,7 @@ package com.uci.outbound.Application; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -38,12 +39,12 @@ Map kafkaConsumerConfiguration() { ReceiverOptions kafkaReceiverOptions(@Value("${outbound}") String[] inTopicName) { ReceiverOptions options = ReceiverOptions.create(kafkaConsumerConfiguration()); return options.subscription(Arrays.asList(inTopicName)) - .withKeyDeserializer(new JsonDeserializer<>()) + .withKeyDeserializer(new StringDeserializer()) .withValueDeserializer(new JsonDeserializer()); } @Bean - Flux> reactiveKafkaReceiver(ReceiverOptions kafkaReceiverOptions) { - return KafkaReceiver.create(kafkaReceiverOptions).receive(); + Flux> reactiveKafkaReceiver(ReceiverOptions kafkaReceiverOptions) { + return KafkaReceiver.create(kafkaReceiverOptions).receiveAtmostOnce(); } } \ No newline at end of file diff --git a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java index fac5592..6332dea 100644 --- a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java +++ b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java @@ -8,12 +8,17 @@ import lombok.extern.slf4j.Slf4j; import messagerosa.core.model.XMessage; import com.uci.dao.utils.XMessageDAOUtils; +import com.uci.utils.kafka.adapter.TextMapGetterAdapter; +import com.uci.utils.kafka.adapter.TextMapSetterAdapter; import io.grpc.Context; +import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Scope; import messagerosa.xml.XMessageParser; + +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.event.EventListener; @@ -29,7 +34,7 @@ @Slf4j public class OutboundKafkaController { - private final Flux> reactiveKafkaReceiver; + private final Flux> reactiveKafkaReceiver; @Autowired private ProviderFactory factoryProvider; @@ -43,12 +48,15 @@ public class OutboundKafkaController { @EventListener(ApplicationStartedEvent.class) public void onMessage() { reactiveKafkaReceiver - .doOnNext(new Consumer>() { + .doOnNext(new Consumer>() { @Override - public void accept(ReceiverRecord msg) { + public void accept(ConsumerRecord msg) { log.info("kafka message receieved"); XMessage currentXmsg = null; - try { + Context extractedContext = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), msg.headers(), TextMapGetterAdapter.getter); + log.info("Opentelemetry extracted context : "+extractedContext); + + try (Scope scope = extractedContext.makeCurrent()) { currentXmsg = XMessageParser.parse(new ByteArrayInputStream(msg.value().getBytes())); String channel = currentXmsg.getChannelURI(); String provider = currentXmsg.getProviderURI(); From 33dae5135e575f9a9f13c55c01cb42775bd98223 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Thu, 20 Jan 2022 11:32:25 +0530 Subject: [PATCH 4/5] changes --- pom.xml | 29 ++++++++++++++++++- .../consumers/OutboundKafkaController.java | 7 +++-- src/main/resources/application.properties | 12 ++++---- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/pom.xml b/pom.xml index 9e09d2d..63fbae2 100644 --- a/pom.xml +++ b/pom.xml @@ -111,7 +111,34 @@ 1.0 compile - + + + + com.google.protobuf + protobuf-java + 3.19.1 + + + com.google.guava + guava + 31.0.1-jre + + + com.lightstep.opentelemetry + opentelemetry-launcher + 1.5.0 + + + io.opentelemetry + opentelemetry-api + 1.7.1 + + + io.opentelemetry + opentelemetry-extension-annotations + 1.7.1 + + diff --git a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java index 6332dea..e3291ef 100644 --- a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java +++ b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java @@ -53,10 +53,11 @@ public void onMessage() { public void accept(ConsumerRecord msg) { log.info("kafka message receieved"); XMessage currentXmsg = null; - Context extractedContext = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), msg.headers(), TextMapGetterAdapter.getter); - log.info("Opentelemetry extracted context : "+extractedContext); +// Context extractedContext = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), msg.headers(), TextMapGetterAdapter.getter); +// log.info("Opentelemetry extracted context : "+extractedContext); - try (Scope scope = extractedContext.makeCurrent()) { +// try (Scope scope = extractedContext.makeCurrent()) { + try { currentXmsg = XMessageParser.parse(new ByteArrayInputStream(msg.value().getBytes())); String channel = currentXmsg.getChannelURI(); String provider = currentXmsg.getProviderURI(); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 38fb98d..cda1fa0 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -17,7 +17,7 @@ inbound-error=${KAFKA_INBOUND_ERROR_TOPIC} kafka.logs.topic=logs outbound=${KAFKA_OUTBOUND_TOPIC} -server.port=9090 +server.port=9096 spring.liquibase.enabled=false # Cassandra @@ -54,11 +54,11 @@ caffeine.cache.max.size=0 caffeine.cache.exprie.duration.seconds=${CAFFEINE_CACHE_EXPIRE_DURATION:#{300}} #Opentelemetry Lighstep Config -opentelemetry.lightstep.tracer=${OPENTELEMETERY_LIGHTSTEP_TRACER} -opentelemetry.lightstep.tracer.version=${OPENTELEMETERY_LIGHTSTEP_TRACER_VERSION} -opentelemetry.lightstep.service=${OPENTELEMETERY_LIGHTSTEP_SERVICE} -opentelemetry.lightstep.access.token=${OPENTELEMETERY_LIGHTSTEP_ACCESS_TOKEN} -opentelemetry.lightstep.end.point=${OPENTELEMETERY_LIGHTSTEP_END_POINT} +opentelemetry.lightstep.tracer=${LS_TRACER_NAME} +opentelemetry.lightstep.tracer.version=${LS_TRACER_VERSION} +opentelemetry.lightstep.service=${LS_SERVICE_NAME} +opentelemetry.lightstep.access.token=${LS_ACCESS_TOKEN} +opentelemetry.lightstep.end.point=${LS_END_POINT} #Sunbird Adapater Outbound URL adapter.sunbird.transport.url=${TRANSPORT_SOCKET_BASE_URL} From fc083552f7da8345d4cd256befe5d94c1b47b1f5 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Thu, 20 Jan 2022 11:38:17 +0530 Subject: [PATCH 5/5] changes --- src/main/resources/application.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index cda1fa0..9637ef8 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -17,7 +17,7 @@ inbound-error=${KAFKA_INBOUND_ERROR_TOPIC} kafka.logs.topic=logs outbound=${KAFKA_OUTBOUND_TOPIC} -server.port=9096 +server.port=9090 spring.liquibase.enabled=false # Cassandra