From 1dd5025b0cff5131ce6d388c20d84221e0c8a9bf Mon Sep 17 00:00:00 2001 From: Surabhi Date: Mon, 9 Aug 2021 16:03:48 +0530 Subject: [PATCH 1/3] stream logs to kakfa --- pom.xml | 18 +++++++++-- src/main/java/com/uci/inbound/Inbound.java | 7 +++-- .../java/com/uci/inbound/KakfaLogTopics.java | 26 ++++++++++++++++ src/main/resources/log4j2.xml | 30 +++++++++++++++++++ 4 files changed, 76 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/uci/inbound/KakfaLogTopics.java create mode 100644 src/main/resources/log4j2.xml diff --git a/pom.xml b/pom.xml index ae45bec..7b6ea2f 100644 --- a/pom.xml +++ b/pom.xml @@ -23,6 +23,12 @@ org.springframework.boot spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-logging + + org.springframework.boot @@ -75,7 +81,7 @@ jaxb-impl 2.2.11 - + org.springframework.boot spring-boot-starter-test @@ -98,7 +104,15 @@ 1.0 compile - + + org.springframework.boot + spring-boot-starter-log4j2 + + + org.apache.kafka + kafka-log4j-appender + 1.0.0 + diff --git a/src/main/java/com/uci/inbound/Inbound.java b/src/main/java/com/uci/inbound/Inbound.java index 28792d9..198f059 100644 --- a/src/main/java/com/uci/inbound/Inbound.java +++ b/src/main/java/com/uci/inbound/Inbound.java @@ -10,8 +10,6 @@ import org.springframework.data.cassandra.repository.config.EnableReactiveCassandraRepositories; import org.springframework.kafka.annotation.EnableKafka; -import com.uci.dao.service.HealthService; - @EnableKafka @EnableReactiveCassandraRepositories("com.uci.dao") @EntityScan("com.uci.dao") @@ -24,7 +22,10 @@ @SpringBootApplication @ComponentScan(basePackages = {"com.uci.inbound", "com.uci.adapter", "com.uci.utils"}) public class Inbound { - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { SpringApplication.run(Inbound.class, args); + + KakfaLogTopics object = new KakfaLogTopics(); + object.createTopic(); } } \ No newline at end of file diff --git a/src/main/java/com/uci/inbound/KakfaLogTopics.java b/src/main/java/com/uci/inbound/KakfaLogTopics.java new file mode 100644 index 0000000..5fe18d8 --- /dev/null +++ b/src/main/java/com/uci/inbound/KakfaLogTopics.java @@ -0,0 +1,26 @@ +package com.uci.inbound; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.stereotype.Component; + +@Component +public class KakfaLogTopics { + + public void createTopic() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "165.232.182.146:9094"); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("group.id", "logs"); + + KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); + List topics = new ArrayList(); + topics.add("inbound-logs"); + kafkaConsumer.subscribe(topics); + } +} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..70b9c0b --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,30 @@ + + + + + + + 165.232.182.146:9094 + + + + + + + + + + + + + + + + + + + + \ No newline at end of file From dbcb00a23ece5038e1068db9c6c7e2ecb3ad2fad Mon Sep 17 00:00:00 2001 From: Surabhi Date: Mon, 9 Aug 2021 17:55:23 +0530 Subject: [PATCH 2/3] push logs to kafka via utils --- pom.xml | 9 ------ src/main/java/com/uci/inbound/Inbound.java | 3 -- .../java/com/uci/inbound/KakfaLogTopics.java | 26 ---------------- src/main/resources/application.properties | 2 ++ src/main/resources/log4j2.xml | 30 ------------------- 5 files changed, 2 insertions(+), 68 deletions(-) delete mode 100644 src/main/java/com/uci/inbound/KakfaLogTopics.java delete mode 100644 src/main/resources/log4j2.xml diff --git a/pom.xml b/pom.xml index 7b6ea2f..c8177df 100644 --- a/pom.xml +++ b/pom.xml @@ -104,15 +104,6 @@ 1.0 compile - - org.springframework.boot - spring-boot-starter-log4j2 - - - org.apache.kafka - kafka-log4j-appender - 1.0.0 - diff --git a/src/main/java/com/uci/inbound/Inbound.java b/src/main/java/com/uci/inbound/Inbound.java index 198f059..7370a3e 100644 --- a/src/main/java/com/uci/inbound/Inbound.java +++ b/src/main/java/com/uci/inbound/Inbound.java @@ -24,8 +24,5 @@ public class Inbound { public static void main(String[] args) throws InterruptedException { SpringApplication.run(Inbound.class, args); - - KakfaLogTopics object = new KakfaLogTopics(); - object.createTopic(); } } \ No newline at end of file diff --git a/src/main/java/com/uci/inbound/KakfaLogTopics.java b/src/main/java/com/uci/inbound/KakfaLogTopics.java deleted file mode 100644 index 5fe18d8..0000000 --- a/src/main/java/com/uci/inbound/KakfaLogTopics.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.uci.inbound; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.springframework.stereotype.Component; - -@Component -public class KakfaLogTopics { - - public void createTopic() { - Properties properties = new Properties(); - properties.put("bootstrap.servers", "165.232.182.146:9094"); - properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put("group.id", "logs"); - - KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); - List topics = new ArrayList(); - topics.add("inbound-logs"); - kafkaConsumer.subscribe(topics); - } -} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 965059e..202fb28 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -28,6 +28,8 @@ provider.gupshup.whatsapp.appname=Ekstep campaign.url = ${CAMPAIGN_URL} campaign.admin.token = ${CAMPAIGN_ADMIN_TOKEN} +# log4j2 log topic config +kafka.logs.topic=${KAFKA_LOGS_TOPIC} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml deleted file mode 100644 index 70b9c0b..0000000 --- a/src/main/resources/log4j2.xml +++ /dev/null @@ -1,30 +0,0 @@ - - - - - - - 165.232.182.146:9094 - - - - - - - - - - - - - - - - - - - - \ No newline at end of file From 1d4b1a79e1e016835fa97f891e66c5286834c948 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Thu, 19 Aug 2021 17:46:45 +0530 Subject: [PATCH 3/3] custom kafka appender for logs with custom layout --- .../health/ServiceStatusController.java | 109 ++++++++++++------ src/main/resources/application.properties | 2 + src/main/resources/log4j2.xml | 35 ++++++ 3 files changed, 108 insertions(+), 38 deletions(-) create mode 100644 src/main/resources/log4j2.xml diff --git a/src/main/java/com/uci/inbound/health/ServiceStatusController.java b/src/main/java/com/uci/inbound/health/ServiceStatusController.java index 9a80d13..a6a771b 100644 --- a/src/main/java/com/uci/inbound/health/ServiceStatusController.java +++ b/src/main/java/com/uci/inbound/health/ServiceStatusController.java @@ -10,6 +10,9 @@ import com.uci.utils.BotService; import com.uci.dao.service.HealthService; import com.uci.utils.kafka.KafkaConfig; +import com.uci.utils.telemetry.LogTelemetryBuilder; +import com.uci.utils.telemetry.LogTelemetryMessage; +import com.uci.utils.telemetry.TelemetryLogger; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -18,6 +21,9 @@ import java.io.IOException; import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.actuate.cassandra.CassandraHealthIndicator; @@ -32,43 +38,70 @@ @RestController @RequestMapping(value = "/service") public class ServiceStatusController { - @Autowired + + @Autowired private HealthService healthService; - - @RequestMapping(value = "/health/cassandra", method = RequestMethod.GET, produces = { "application/json", "text/json" }) - public ResponseEntity cassandraStatusCheck() throws IOException, JsonProcessingException { - JsonNode jsonNode = getResponseJsonNode(); - ((ObjectNode) jsonNode).put("result", healthService.getCassandraHealthNode()); - - return ResponseEntity.ok(jsonNode); - } - - @RequestMapping(value = "/health/kafka", method = RequestMethod.GET, produces = { "application/json", "text/json" }) - public ResponseEntity kafkaStatusCheck() throws IOException, JsonProcessingException { - JsonNode jsonNode = getResponseJsonNode(); - ((ObjectNode) jsonNode).put("result", healthService.getKafkaHealthNode()); - - return ResponseEntity.ok(jsonNode); - } - - @RequestMapping(value = "/health/campaign", method = RequestMethod.GET, produces = { "application/json", "text/json" }) - public ResponseEntity campaignUrlStatusCheck() throws JsonProcessingException, IOException { - JsonNode jsonNode = getResponseJsonNode(); - ((ObjectNode) jsonNode).put("result", healthService.getCampaignUrlHealthNode()); - - return ResponseEntity.ok(jsonNode); - } - - /** - * Returns json node for service response - * - * @return JsonNode - * @throws JsonMappingException - * @throws JsonProcessingException - */ - private JsonNode getResponseJsonNode() throws JsonMappingException, JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - JsonNode jsonNode = mapper.readTree("{\"id\":\"api.content.service.health\",\"ver\":\"3.0\",\"ts\":null,\"params\":{\"resmsgid\":null,\"msgid\":null,\"err\":null,\"status\":\"successful\",\"errmsg\":null},\"responseCode\":\"OK\",\"result\":{\"healthy\":false}}"); - return jsonNode; - } + + @RequestMapping(value = "/health/cassandra", method = RequestMethod.GET, produces = { "application/json", + "text/json" }) + public ResponseEntity cassandraStatusCheck() throws IOException, JsonProcessingException { + JsonNode jsonNode = getResponseJsonNode(); + ((ObjectNode) jsonNode).put("result", healthService.getCassandraHealthNode()); + + return ResponseEntity.ok(jsonNode); + } + + @RequestMapping(value = "/health/kafka", method = RequestMethod.GET, produces = { "application/json", "text/json" }) + public ResponseEntity kafkaStatusCheck() + throws IOException, JsonProcessingException, InterruptedException { + JsonNode jsonNode = getResponseJsonNode(); + ((ObjectNode) jsonNode).put("result", healthService.getKafkaHealthNode()); + + return ResponseEntity.ok(jsonNode); + } + + @RequestMapping(value = "/health/campaign", method = RequestMethod.GET, produces = { "application/json", + "text/json" }) + public ResponseEntity campaignUrlStatusCheck() throws JsonProcessingException, IOException { + JsonNode jsonNode = getResponseJsonNode(); + ((ObjectNode) jsonNode).put("result", healthService.getCampaignUrlHealthNode()); + + return ResponseEntity.ok(jsonNode); + } + + private static final Logger logger = LogManager.getLogger(TelemetryLogger.class); + + /* + * Test with custom kafka appender & custom layout, + * telemetry object build internally via custom layout mentioned in xml by sent message + */ + @RequestMapping(value = "/test/logs", method = RequestMethod.GET, produces = { "application/json", "text/json" }) + public ResponseEntity testKafkaLogAppender() throws JsonProcessingException, IOException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree("{\"responseCode\":\"OK\"}"); + + logger.fatal("Fatal Test Message 1"); + + logger.info("Info Test Message 1"); + + logger.error("Error Test Message 1"); + + logger.warn("Warn Test Message 1"); + + return ResponseEntity.ok(jsonNode); + } + + /** + * Returns json node for service response + * + * @return JsonNode + * @throws JsonMappingException + * @throws JsonProcessingException + */ + private JsonNode getResponseJsonNode() throws JsonMappingException, JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree( + "{\"id\":\"api.content.service.health\",\"ver\":\"3.0\",\"ts\":null,\"params\":{\"resmsgid\":null,\"msgid\":null,\"err\":null,\"status\":\"successful\",\"errmsg\":null},\"responseCode\":\"OK\",\"result\":{\"healthy\":false}}"); + return jsonNode; + } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 202fb28..ae05601 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -31,5 +31,7 @@ campaign.admin.token = ${CAMPAIGN_ADMIN_TOKEN} # log4j2 log topic config kafka.logs.topic=${KAFKA_LOGS_TOPIC} +log4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector + diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..61e2c0a --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,35 @@ + + + + ${env:KAFKA_LOGS_TOPIC} + ${env:BOOTSTRAP_SERVERS} + + + + + ${kafka.bootstrap.servers} + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file