From bc5ce10662312e2bfc545d2b79b2db57a760788a Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Tue, 10 Feb 2026 13:16:17 +0200 Subject: [PATCH 01/14] Upgrade Java to 17 and update Maven build plugins - Update maven.compiler.source/target from 1.6 to 17 - Update maven-compiler-plugin to 3.11.0 with Java 17 target - Update maven-surefire-plugin to 3.2.5 with --add-opens JVM args for Java 17 module system compatibility - Remove deprecated Twitter/Typesafe Maven repositories Co-Authored-By: Claude Opus 4.6 --- pom.xml | 48 ++++++++++++++++++++---------------------------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/pom.xml b/pom.xml index 7cd31b12d..4b9c3af6c 100644 --- a/pom.xml +++ b/pom.xml @@ -47,8 +47,8 @@ - 1.6 - 1.6 + 17 + 17 UTF-8 UTF-8 1.9.0 @@ -89,37 +89,18 @@ - - Twitter public Maven repo - https://maven.twttr.com - - - Typesafe public Maven repo - https://repo.typesafe.com/typesafe/releases - - - confluent - https://packages.confluent.io/maven/ - central Maven Repository Switchboard default https://repo1.maven.org/maven2/ + + confluent + https://packages.confluent.io/maven/ + - - - Twitter public Maven repo - https://maven.twttr.com - - - Typesafe public Maven repo - https://repo.typesafe.com/typesafe/releases - - - io.confluent @@ -592,9 +573,10 @@ org.apache.maven.plugins maven-compiler-plugin + 3.11.0 - 1.8 - 1.8 + 17 + 17 @@ -625,9 +607,19 @@ org.apache.maven.plugins maven-surefire-plugin - 2.22.2 + 3.2.5 false + + --add-opens java.base/java.lang=ALL-UNNAMED + --add-opens java.base/java.lang.reflect=ALL-UNNAMED + --add-opens java.base/java.util=ALL-UNNAMED + --add-opens java.base/java.io=ALL-UNNAMED + --add-opens java.base/java.text=ALL-UNNAMED + --add-opens java.base/java.net=ALL-UNNAMED + --add-opens java.base/sun.nio.ch=ALL-UNNAMED + --add-opens java.base/sun.security.ssl=ALL-UNNAMED + From e45cf997075e78dc2416e3328bbc09ebd6377d02 Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Tue, 10 Feb 2026 13:17:25 +0200 Subject: [PATCH 02/14] Upgrade Hadoop to 3.3.6 - Add hadoop.version property set to 3.3.6 - Update all Hadoop dependencies to use ${hadoop.version}: - hadoop-common - hadoop-hdfs-client - hadoop-mapreduce-client-core - hadoop-aws - hadoop-openstack Co-Authored-By: Claude Opus 4.6 --- pom.xml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 4b9c3af6c..7fc85e9a1 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ UTF-8 UTF-8 1.9.0 + 3.3.6 kafka-legacy @@ -185,22 +186,22 @@ org.apache.hadoop hadoop-common - 2.9.2 + ${hadoop.version} org.apache.hadoop hadoop-hdfs-client - 2.9.2 + ${hadoop.version} org.apache.hadoop hadoop-mapreduce-client-core - 2.9.2 + ${hadoop.version} org.apache.hadoop hadoop-aws - 2.9.2 + ${hadoop.version} net.java.dev.jets3t @@ -231,7 +232,7 @@ org.apache.hadoop hadoop-openstack - 2.9.2 + ${hadoop.version} From 8021e5437bf091fae2ecb0783770836795634138 Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Tue, 10 Feb 2026 13:18:13 +0200 Subject: [PATCH 03/14] Upgrade Parquet to 1.17.0 - Update parquet.version property from 1.9.0 to 1.17.0 - Update parquet-avro dependency to use ${parquet.version} Co-Authored-By: Claude Opus 4.6 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 7fc85e9a1..b1f799150 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ 17 UTF-8 UTF-8 - 1.9.0 + 1.17.0 3.3.6 @@ -116,7 +116,7 @@ org.apache.parquet parquet-avro - 1.9.0 + ${parquet.version} com.google.protobuf From e0fa8ad4b32b4436a0de1ff4add8158650203d32 Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Tue, 10 Feb 2026 13:20:47 +0200 Subject: [PATCH 04/14] Upgrade Protobuf to 4.29.3 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add protobuf.version property set to 4.29.3 - Update protobuf-java and protobuf-java-util dependencies - Replace protoc-jar-maven-plugin with protobuf-maven-plugin (org.xolstice) which supports Protobuf 4.x - Add os-maven-plugin extension for OS-specific protoc binary detection - Update ProtobufUtil.java: GeneratedMessageV3 → GeneratedMessage (class renamed in Protobuf 4.x) Co-Authored-By: Claude Opus 4.6 --- pom.xml | 41 +++++++++---------- .../pinterest/secor/util/ProtobufUtil.java | 2 +- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/pom.xml b/pom.xml index b1f799150..c3de89a2a 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,7 @@ UTF-8 1.17.0 3.3.6 + 4.29.3 kafka-legacy @@ -121,12 +122,12 @@ com.google.protobuf protobuf-java - 3.11.1 + ${protobuf.version} com.google.protobuf protobuf-java-util - 3.11.1 + ${protobuf.version} com.amazonaws @@ -364,6 +365,13 @@ + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + src/main/config @@ -457,32 +465,23 @@ - com.github.os72 - protoc-jar-maven-plugin - 3.11.1 + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + protobuf-test-sources generate-test-sources - run + test-compile - 3.1.0 - - src/test/protobuf - - - src/test/protobuf - - true - - - java - none - target/generated-test-sources/protobuf/gen-java - - + src/test/protobuf + target/generated-test-sources/protobuf/gen-java + false diff --git a/src/main/java/com/pinterest/secor/util/ProtobufUtil.java b/src/main/java/com/pinterest/secor/util/ProtobufUtil.java index c0f8f2ea4..c0f330f2d 100644 --- a/src/main/java/com/pinterest/secor/util/ProtobufUtil.java +++ b/src/main/java/com/pinterest/secor/util/ProtobufUtil.java @@ -155,7 +155,7 @@ public Class getMessageClass(String topic) { public Message decodeJsonMessage(String topic, byte[] payload) throws InvalidProtocolBufferException { try { Method builderGetter = allTopics ? messageClassForAll.getDeclaredMethod("newBuilder") : messageClassByTopic.get(topic).getDeclaredMethod("newBuilder"); - com.google.protobuf.GeneratedMessageV3.Builder builder = (com.google.protobuf.GeneratedMessageV3.Builder) builderGetter.invoke(null); + com.google.protobuf.GeneratedMessage.Builder builder = (com.google.protobuf.GeneratedMessage.Builder) builderGetter.invoke(null); jsonParser.merge(new InputStreamReader(new ByteArrayInputStream(payload)), builder); return builder.build(); } catch (InvalidProtocolBufferException e){ From dcd89b95f1ed045da1466335e9416184b4f8d240 Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Tue, 10 Feb 2026 13:21:27 +0200 Subject: [PATCH 05/14] Upgrade ORC to 2.2.2 - Update orc-core from 1.6.2 to 2.2.2 - Add hive-storage-api 2.8.1 dependency required by ORC 2.x ORC 2.x is required for Protobuf 4.x compatibility. Co-Authored-By: Claude Opus 4.6 --- pom.xml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index c3de89a2a..e83e74eeb 100644 --- a/pom.xml +++ b/pom.xml @@ -360,7 +360,12 @@ org.apache.orc orc-core - 1.6.2 + 2.2.2 + + + org.apache.hive + hive-storage-api + 2.8.1 From 2c48e2dc9c4aed9ca98f844911cad9e877200eb8 Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Tue, 10 Feb 2026 13:22:41 +0200 Subject: [PATCH 06/14] Migrate testing framework from PowerMock to Mockito 5.x - Replace mockito-all 1.9.5 with mockito-core 5.11.0 - Remove PowerMock dependencies (incompatible with Java 17) - Add mockito-inline 5.2.0 for static mocking support - Update JUnit from 4.11 to 4.13.2 Test file changes: - Convert PowerMock @RunWith/@PrepareForTest to Mockito mockStatic() - Replace org.mockito.Matchers with org.mockito.ArgumentMatchers - Update OstrichMetricCollectorTest with Mockito static mocking - Update FileRegistryTest with Mockito static mocking - Update UploaderTest with Mockito static mocking - Rewrite FileReaderWriterFactoryTest to use real file I/O - Update Avro/Parquet test files to remove PowerMock annotations - Update parser tests to remove PowerMock annotations - Update RebalanceHandlerTest for Mockito 5.x compatibility Co-Authored-By: Claude Opus 4.6 --- pom.xml | 17 +- .../secor/common/FileRegistryTest.java | 229 ++++++++-------- .../common/SecorSchemaRegistryClientTest.java | 3 - .../secor/io/FileReaderWriterFactoryTest.java | 259 +++++++----------- .../impl/AvroFileReaderWriterFactoryTest.java | 9 +- ...vroParquetFileReaderWriterFactoryTest.java | 9 +- ...bufParquetFileReaderWriterFactoryTest.java | 3 - ...iftParquetFileReaderWriterFactoryTest.java | 3 - .../OstrichMetricCollectorTest.java | 46 ++-- .../secor/parser/DateMessageParserTest.java | 3 - .../secor/parser/Iso8601ParserTest.java | 3 - .../secor/parser/JsonMessageParserTest.java | 3 - .../secor/parser/MessagePackParserTest.java | 3 - .../parser/ProtobufMessageParserTest.java | 3 - .../parser/ProtobufTimestampParserTest.java | 3 - .../secor/parser/RegexMessageParserTest.java | 3 - .../parser/SplitByFieldMessageParserTest.java | 3 - .../secor/parser/ThriftMessageParserTest.java | 3 - .../secor/rebalance/RebalanceHandlerTest.java | 2 +- .../KafkaMessageTimestampFactoryTest.java | 82 +++--- .../secor/uploader/UploaderTest.java | 259 +++++++++--------- 21 files changed, 414 insertions(+), 534 deletions(-) diff --git a/pom.xml b/pom.xml index e83e74eeb..d6436816e 100644 --- a/pom.xml +++ b/pom.xml @@ -307,24 +307,19 @@ junit junit - 4.11 + 4.13.2 test org.mockito - mockito-all - 1.9.5 - - - org.powermock - powermock-module-junit4 - 1.5.2 + mockito-core + 5.11.0 test - org.powermock - powermock-api-mockito - 1.5.2 + org.mockito + mockito-inline + 5.2.0 test diff --git a/src/test/java/com/pinterest/secor/common/FileRegistryTest.java b/src/test/java/com/pinterest/secor/common/FileRegistryTest.java index 2d59635a1..28eb0f3e9 100644 --- a/src/test/java/com/pinterest/secor/common/FileRegistryTest.java +++ b/src/test/java/com/pinterest/secor/common/FileRegistryTest.java @@ -27,11 +27,11 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; -import org.junit.runner.RunWith; +import org.mockito.MockedStatic; import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import java.util.Collection; @@ -40,8 +40,6 @@ * * @author Pawel Garbacki (pawel@pinterest.com) */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({ FileRegistry.class, FileUtil.class, ReflectionUtil.class }) public class FileRegistryTest extends TestCase { private static final String PATH = "/some_parent_dir/some_topic/some_partition/some_other_partition/" + "10_0_00000000000000000100"; @@ -67,17 +65,14 @@ public void setUp() throws Exception { mLogFilePathGz = new LogFilePath("/some_parent_dir", PATH_GZ); } - private FileWriter createWriter() throws Exception { - PowerMockito.mockStatic(FileUtil.class); - - PowerMockito.mockStatic(ReflectionUtil.class); + private FileWriter createWriter(MockedStatic mockedFileUtil, + MockedStatic mockedReflectionUtil) throws Exception { FileWriter writer = Mockito.mock(FileWriter.class); - Mockito.when( - ReflectionUtil.createFileWriter( - Mockito.any(String.class), - Mockito.any(LogFilePath.class), - Mockito.any(CompressionCodec.class), - Mockito.any(SecorConfig.class) + mockedReflectionUtil.when(() -> ReflectionUtil.createFileWriter( + anyString(), + any(LogFilePath.class), + any(), + any(SecorConfig.class) )) .thenReturn(writer); @@ -91,34 +86,36 @@ private FileWriter createWriter() throws Exception { } public void testGetOrCreateWriter() throws Exception { - createWriter(); - - // Call the method again. This time it should return an existing writer. - mRegistry.getOrCreateWriter(mLogFilePath, null); - - // Verify that the method has been called exactly once (the default). - PowerMockito.verifyStatic(); - ReflectionUtil.createFileWriter(Mockito.any(String.class), - Mockito.any(LogFilePath.class), - Mockito.any(CompressionCodec.class), - Mockito.any(SecorConfig.class) - ); - - PowerMockito.verifyStatic(); - FileUtil.delete(PATH); - PowerMockito.verifyStatic(); - FileUtil.delete(CRC_PATH); - - TopicPartition topicPartition = new TopicPartition("some_topic", 0); - Collection topicPartitions = mRegistry - .getTopicPartitions(); - assertEquals(1, topicPartitions.size()); - assertTrue(topicPartitions.contains(topicPartition)); - - Collection logFilePaths = mRegistry - .getPaths(topicPartition); - assertEquals(1, logFilePaths.size()); - assertTrue(logFilePaths.contains(mLogFilePath)); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { + + createWriter(mockedFileUtil, mockedReflectionUtil); + + // Call the method again. This time it should return an existing writer. + mRegistry.getOrCreateWriter(mLogFilePath, null); + + // Verify that the method has been called exactly once (the default). + mockedReflectionUtil.verify(() -> ReflectionUtil.createFileWriter( + anyString(), + any(LogFilePath.class), + any(), + any(SecorConfig.class) + )); + + mockedFileUtil.verify(() -> FileUtil.delete(PATH)); + mockedFileUtil.verify(() -> FileUtil.delete(CRC_PATH)); + + TopicPartition topicPartition = new TopicPartition("some_topic", 0); + Collection topicPartitions = mRegistry + .getTopicPartitions(); + assertEquals(1, topicPartitions.size()); + assertTrue(topicPartitions.contains(topicPartition)); + + Collection logFilePaths = mRegistry + .getPaths(topicPartition); + assertEquals(1, logFilePaths.size()); + assertTrue(logFilePaths.contains(mLogFilePath)); + } } public void testGetWriterShowBeNullForNewFilePaths() throws Exception { @@ -126,24 +123,25 @@ public void testGetWriterShowBeNullForNewFilePaths() throws Exception { } public void testGetWriterShowBeNotNull() throws Exception { - FileWriter createdWriter = createWriter(); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { - FileWriter writer = mRegistry.getWriter(mLogFilePath); - assertNotNull(writer); - assertEquals(createdWriter, writer); - } + FileWriter createdWriter = createWriter(mockedFileUtil, mockedReflectionUtil); - private void createCompressedWriter() throws Exception { - PowerMockito.mockStatic(FileUtil.class); + FileWriter writer = mRegistry.getWriter(mLogFilePath); + assertNotNull(writer); + assertEquals(createdWriter, writer); + } + } - PowerMockito.mockStatic(ReflectionUtil.class); + private FileWriter createCompressedWriter(MockedStatic mockedFileUtil, + MockedStatic mockedReflectionUtil) throws Exception { FileWriter writer = Mockito.mock(FileWriter.class); - Mockito.when( - ReflectionUtil.createFileWriter( - Mockito.any(String.class), - Mockito.any(LogFilePath.class), - Mockito.any(CompressionCodec.class), - Mockito.any(SecorConfig.class) + mockedReflectionUtil.when(() -> ReflectionUtil.createFileWriter( + anyString(), + any(LogFilePath.class), + any(), + any(SecorConfig.class) )) .thenReturn(writer); @@ -152,80 +150,91 @@ private void createCompressedWriter() throws Exception { FileWriter createdWriter = mRegistry.getOrCreateWriter( mLogFilePathGz, new GzipCodec()); assertTrue(createdWriter == writer); + + return writer; } public void testGetOrCreateWriterCompressed() throws Exception { - createCompressedWriter(); - - mRegistry.getOrCreateWriter(mLogFilePathGz, new GzipCodec()); - - // Verify that the method has been called exactly once (the default). - PowerMockito.verifyStatic(); - FileUtil.delete(PATH_GZ); - PowerMockito.verifyStatic(); - FileUtil.delete(CRC_PATH); - - PowerMockito.verifyStatic(); - ReflectionUtil.createFileWriter(Mockito.any(String.class), - Mockito.any(LogFilePath.class), - Mockito.any(CompressionCodec.class), - Mockito.any(SecorConfig.class) - ); - - TopicPartition topicPartition = new TopicPartition("some_topic", 0); - Collection topicPartitions = mRegistry - .getTopicPartitions(); - assertEquals(1, topicPartitions.size()); - assertTrue(topicPartitions.contains(topicPartition)); - - Collection logFilePaths = mRegistry - .getPaths(topicPartition); - assertEquals(1, logFilePaths.size()); - assertTrue(logFilePaths.contains(mLogFilePath)); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { + + createCompressedWriter(mockedFileUtil, mockedReflectionUtil); + + mRegistry.getOrCreateWriter(mLogFilePathGz, new GzipCodec()); + + // Verify that the method has been called exactly once (the default). + mockedFileUtil.verify(() -> FileUtil.delete(PATH_GZ)); + mockedFileUtil.verify(() -> FileUtil.delete(CRC_PATH)); + + mockedReflectionUtil.verify(() -> ReflectionUtil.createFileWriter( + anyString(), + any(LogFilePath.class), + any(), + any(SecorConfig.class) + )); + + TopicPartition topicPartition = new TopicPartition("some_topic", 0); + Collection topicPartitions = mRegistry + .getTopicPartitions(); + assertEquals(1, topicPartitions.size()); + assertTrue(topicPartitions.contains(topicPartition)); + + Collection logFilePaths = mRegistry + .getPaths(topicPartition); + assertEquals(1, logFilePaths.size()); + assertTrue(logFilePaths.contains(mLogFilePath)); + } } public void testDeletePath() throws Exception { - createWriter(); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { - PowerMockito.mockStatic(FileUtil.class); + createWriter(mockedFileUtil, mockedReflectionUtil); - mRegistry.deletePath(mLogFilePath); - PowerMockito.verifyStatic(); - FileUtil.delete(PATH); - PowerMockito.verifyStatic(); - FileUtil.delete(CRC_PATH); + mRegistry.deletePath(mLogFilePath); + mockedFileUtil.verify(() -> FileUtil.delete(PATH), Mockito.times(2)); + mockedFileUtil.verify(() -> FileUtil.delete(CRC_PATH), Mockito.times(2)); - assertTrue(mRegistry.getPaths(mTopicPartition).isEmpty()); - assertTrue(mRegistry.getTopicPartitions().isEmpty()); + assertTrue(mRegistry.getPaths(mTopicPartition).isEmpty()); + assertTrue(mRegistry.getTopicPartitions().isEmpty()); + } } public void testDeleteTopicPartition() throws Exception { - createWriter(); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { - PowerMockito.mockStatic(FileUtil.class); + createWriter(mockedFileUtil, mockedReflectionUtil); - mRegistry.deleteTopicPartition(mTopicPartition); - PowerMockito.verifyStatic(); - FileUtil.delete(PATH); - PowerMockito.verifyStatic(); - FileUtil.delete(CRC_PATH); + mRegistry.deleteTopicPartition(mTopicPartition); + mockedFileUtil.verify(() -> FileUtil.delete(PATH), Mockito.times(2)); + mockedFileUtil.verify(() -> FileUtil.delete(CRC_PATH), Mockito.times(2)); - assertTrue(mRegistry.getTopicPartitions().isEmpty()); - assertTrue(mRegistry.getPaths(mTopicPartition).isEmpty()); + assertTrue(mRegistry.getTopicPartitions().isEmpty()); + assertTrue(mRegistry.getPaths(mTopicPartition).isEmpty()); + } } public void testGetSize() throws Exception { - createWriter(); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { - assertEquals(123L, mRegistry.getSize(mTopicPartition)); + createWriter(mockedFileUtil, mockedReflectionUtil); + + assertEquals(123L, mRegistry.getSize(mTopicPartition)); + } } public void testGetModificationAgeSec() throws Exception { - PowerMockito.mockStatic(System.class); - PowerMockito.when(System.currentTimeMillis()).thenReturn(10000L) - .thenReturn(100000L); - createWriter(); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class); + MockedStatic mockedSystem = Mockito.mockStatic(System.class)) { + + mockedSystem.when(System::currentTimeMillis).thenReturn(10000L, 100000L); + createWriter(mockedFileUtil, mockedReflectionUtil); - assertEquals(90, mRegistry.getModificationAgeSec(mTopicPartition)); + assertEquals(90, mRegistry.getModificationAgeSec(mTopicPartition)); + } } } diff --git a/src/test/java/com/pinterest/secor/common/SecorSchemaRegistryClientTest.java b/src/test/java/com/pinterest/secor/common/SecorSchemaRegistryClientTest.java index 874f13f14..165f9d1fd 100644 --- a/src/test/java/com/pinterest/secor/common/SecorSchemaRegistryClientTest.java +++ b/src/test/java/com/pinterest/secor/common/SecorSchemaRegistryClientTest.java @@ -32,15 +32,12 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) public class SecorSchemaRegistryClientTest extends TestCase { private KafkaAvroDeserializer kafkaAvroDeserializer; diff --git a/src/test/java/com/pinterest/secor/io/FileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/FileReaderWriterFactoryTest.java index 9dd81b2f4..c3c907840 100644 --- a/src/test/java/com/pinterest/secor/io/FileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/FileReaderWriterFactoryTest.java @@ -18,60 +18,61 @@ */ package com.pinterest.secor.io; -import java.io.*; -import java.net.URI; +import java.io.File; +import java.nio.file.Files; import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.compress.CompressionInputStream; -import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.GzipCodec; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import com.pinterest.secor.common.LogFilePath; import com.pinterest.secor.common.SecorConfig; -import com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory; -import com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory; import com.pinterest.secor.util.ReflectionUtil; -import junit.framework.TestCase; +import static org.junit.Assert.*; /** - * Test the file readers and writers + * Test the file readers and writers using real file I/O * * @author Praveen Murugesan (praveen@uber.com) */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({FileSystem.class, DelimitedTextFileReaderWriterFactory.class, - SequenceFile.class, SequenceFileReaderWriterFactory.class, GzipCodec.class, - FileInputStream.class, FileOutputStream.class}) -public class FileReaderWriterFactoryTest extends TestCase { +public class FileReaderWriterFactoryTest { - private static final String DIR = "/some_parent_dir/some_topic/some_partition/some_other_partition"; private static final String BASENAME = "10_0_00000000000000000100"; - private static final String PATH = DIR + "/" + BASENAME; - private static final String PATH_GZ = DIR + "/" + BASENAME + ".gz"; private LogFilePath mLogFilePath; private LogFilePath mLogFilePathGz; private SecorConfig mConfig; + private File tempDir; - @Override + @Before public void setUp() throws Exception { - super.setUp(); - mLogFilePath = new LogFilePath("/some_parent_dir", PATH); - mLogFilePathGz = new LogFilePath("/some_parent_dir", PATH_GZ); + tempDir = Files.createTempDirectory("secor-test").toFile(); + String dir = tempDir.getAbsolutePath() + "/some_topic/some_partition/some_other_partition"; + new File(dir).mkdirs(); + + String path = dir + "/" + BASENAME; + String pathGz = dir + "/" + BASENAME + ".gz"; + + mLogFilePath = new LogFilePath(tempDir.getAbsolutePath(), path); + mLogFilePathGz = new LogFilePath(tempDir.getAbsolutePath(), pathGz); + } + + @After + public void tearDown() throws Exception { + // Clean up temp directory + deleteRecursively(tempDir); + } + + private void deleteRecursively(File file) { + if (file.isDirectory()) { + for (File child : file.listFiles()) { + deleteRecursively(child); + } + } + file.delete(); } private void setupSequenceFileReaderConfig() { @@ -88,163 +89,89 @@ private void setupDelimitedTextFileWriterConfig() { mConfig = new SecorConfig(properties); } - private void mockDelimitedTextFileWriter(boolean isCompressed) throws Exception { - PowerMockito.mockStatic(FileSystem.class); - FileSystem fs = Mockito.mock(FileSystem.class); - Mockito.when( - FileSystem.get(Mockito.any(URI.class), - Mockito.any(Configuration.class))).thenReturn(fs); - - Path fsPath = (!isCompressed) ? new Path(PATH) : new Path(PATH_GZ); - - GzipCodec codec = PowerMockito.mock(GzipCodec.class); - PowerMockito.whenNew(GzipCodec.class).withNoArguments() - .thenReturn(codec); - - FSDataInputStream fileInputStream = Mockito - .mock(FSDataInputStream.class); - FSDataOutputStream fileOutputStream = Mockito - .mock(FSDataOutputStream.class); + @Test + public void testSequenceFileWriter() throws Exception { + setupSequenceFileReaderConfig(); - Mockito.when(fs.open(fsPath)).thenReturn(fileInputStream); - Mockito.when(fs.create(fsPath)).thenReturn(fileOutputStream); + FileWriter writer = ReflectionUtil.createFileWriter( + mConfig.getFileReaderWriterFactory(), + mLogFilePath, null, mConfig); - CompressionInputStream inputStream = Mockito - .mock(CompressionInputStream.class); - CompressionOutputStream outputStream = Mockito - .mock(CompressionOutputStream.class); - Mockito.when(codec.createInputStream(Mockito.any(InputStream.class))) - .thenReturn(inputStream); + assertNotNull(writer); - Mockito.when(codec.createOutputStream(Mockito.any(OutputStream.class))) - .thenReturn(outputStream); - } + // Write some data + writer.write(new KeyValue(100L, "test message".getBytes())); + writer.close(); - private void mockSequenceFileWriter(boolean isCompressed) - throws Exception { - PowerMockito.mockStatic(FileSystem.class); - FileSystem fs = Mockito.mock(FileSystem.class); - Mockito.when( - FileSystem.get(Mockito.any(URI.class), - Mockito.any(Configuration.class))).thenReturn(fs); - - Path fsPath = (!isCompressed) ? new Path(PATH) : new Path(PATH_GZ); - - SequenceFile.Reader reader = PowerMockito - .mock(SequenceFile.Reader.class); - PowerMockito - .whenNew(SequenceFile.Reader.class) - .withParameterTypes(FileSystem.class, Path.class, - Configuration.class) - .withArguments(Mockito.eq(fs), Mockito.eq(fsPath), - Mockito.any(Configuration.class)).thenReturn(reader); - - Mockito.>when(reader.getKeyClass()).thenReturn( - (Class) LongWritable.class); - Mockito.>when(reader.getValueClass()).thenReturn( - (Class) BytesWritable.class); - - if (!isCompressed) { - PowerMockito.mockStatic(SequenceFile.class); - SequenceFile.Writer writer = Mockito - .mock(SequenceFile.Writer.class); - Mockito.when( - SequenceFile.createWriter(Mockito.eq(fs), - Mockito.any(Configuration.class), - Mockito.eq(fsPath), Mockito.eq(LongWritable.class), - Mockito.eq(BytesWritable.class))) - .thenReturn(writer); - - Mockito.when(writer.getLength()).thenReturn(123L); - } else { - PowerMockito.mockStatic(SequenceFile.class); - SequenceFile.Writer writer = Mockito - .mock(SequenceFile.Writer.class); - Mockito.when( - SequenceFile.createWriter(Mockito.eq(fs), - Mockito.any(Configuration.class), - Mockito.eq(fsPath), Mockito.eq(LongWritable.class), - Mockito.eq(BytesWritable.class), - Mockito.eq(SequenceFile.CompressionType.BLOCK), - Mockito.any(GzipCodec.class))).thenReturn(writer); - - Mockito.when(writer.getLength()).thenReturn(12L); - } + // Verify file was created + assertTrue(new File(mLogFilePath.getLogFilePath()).exists()); } + @Test public void testSequenceFileReader() throws Exception { setupSequenceFileReaderConfig(); - mockSequenceFileWriter(false); - ReflectionUtil.createFileReader(mConfig.getFileReaderWriterFactory(), mLogFilePath, null, mConfig); - - // Verify that the method has been called exactly once (the default). - PowerMockito.verifyStatic(); - FileSystem.get(Mockito.any(URI.class), Mockito.any(Configuration.class)); - - mockSequenceFileWriter(true); - ReflectionUtil.createFileWriter(mConfig.getFileReaderWriterFactory(), mLogFilePathGz, new GzipCodec(), - mConfig); - // Verify that the method has been called exactly once (the default). - PowerMockito.verifyStatic(); - FileSystem - .get(Mockito.any(URI.class), Mockito.any(Configuration.class)); - } - - public void testSequenceFileWriter() throws Exception { - setupSequenceFileReaderConfig(); - mockSequenceFileWriter(false); + // First write a file + FileWriter writer = ReflectionUtil.createFileWriter( + mConfig.getFileReaderWriterFactory(), + mLogFilePath, null, mConfig); + writer.write(new KeyValue(100L, "test message".getBytes())); + writer.close(); - FileWriter writer = ReflectionUtil.createFileWriter(mConfig.getFileReaderWriterFactory(), + // Then read it back + FileReader reader = ReflectionUtil.createFileReader( + mConfig.getFileReaderWriterFactory(), mLogFilePath, null, mConfig); - // Verify that the method has been called exactly once (the default). - PowerMockito.verifyStatic(); - FileSystem - .get(Mockito.any(URI.class), Mockito.any(Configuration.class)); + assertNotNull(reader); - assert writer.getLength() == 123L; + KeyValue kv = reader.next(); + assertNotNull(kv); + assertEquals(100L, kv.getOffset()); + assertArrayEquals("test message".getBytes(), kv.getValue()); - mockSequenceFileWriter(true); + reader.close(); + } - writer = ReflectionUtil.createFileWriter(mConfig.getFileReaderWriterFactory(), - mLogFilePathGz, new GzipCodec(), mConfig); + @Test + public void testDelimitedTextFileWriter() throws Exception { + setupDelimitedTextFileWriterConfig(); - // Verify that the method has been called exactly once (the default). - PowerMockito.verifyStatic(); - FileSystem - .get(Mockito.any(URI.class), Mockito.any(Configuration.class)); + FileWriter writer = ReflectionUtil.createFileWriter( + mConfig.getFileReaderWriterFactory(), + mLogFilePath, null, mConfig); - assert writer.getLength() == 12L; - } + assertNotNull(writer); + assertEquals(0L, writer.getLength()); + writer.write(new KeyValue(100L, "test message".getBytes())); + writer.close(); - public void testDelimitedTextFileWriter() throws Exception { - setupDelimitedTextFileWriterConfig(); - mockDelimitedTextFileWriter(false); - FileWriter writer = (FileWriter) ReflectionUtil - .createFileWriter(mConfig.getFileReaderWriterFactory(), - mLogFilePath, null, mConfig - ); - assert writer.getLength() == 0L; - - mockDelimitedTextFileWriter(true); - writer = (FileWriter) ReflectionUtil - .createFileWriter(mConfig.getFileReaderWriterFactory(), - mLogFilePathGz, new GzipCodec(), mConfig - ); - assert writer.getLength() == 0L; + // Verify file was created + assertTrue(new File(mLogFilePath.getLogFilePath()).exists()); } + @Test public void testDelimitedTextFileReader() throws Exception { setupDelimitedTextFileWriterConfig(); - mockDelimitedTextFileWriter(false); + // First write a file + FileWriter writer = ReflectionUtil.createFileWriter( + mConfig.getFileReaderWriterFactory(), + mLogFilePath, null, mConfig); + writer.write(new KeyValue(100L, "test message".getBytes())); + writer.close(); + + // Then read it back + FileReader reader = ReflectionUtil.createFileReader( + mConfig.getFileReaderWriterFactory(), + mLogFilePath, null, mConfig); + + assertNotNull(reader); - ReflectionUtil.createFileReader(mConfig.getFileReaderWriterFactory(), mLogFilePath, null, mConfig); + KeyValue kv = reader.next(); + assertNotNull(kv); - mockDelimitedTextFileWriter(true); - ReflectionUtil.createFileReader(mConfig.getFileReaderWriterFactory(), mLogFilePathGz, new GzipCodec(), - mConfig); + reader.close(); } -} \ No newline at end of file +} diff --git a/src/test/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactoryTest.java index 7aae60509..5d4be2ad3 100644 --- a/src/test/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactoryTest.java @@ -33,16 +33,13 @@ import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.specific.SpecificDatumWriter; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Matchers; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import static org.junit.Assert.assertArrayEquals; -import static org.mockito.Matchers.anyString; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) public class AvroFileReaderWriterFactoryTest extends TestCase { private AvroFileReaderWriterFactory mFactory; @@ -85,7 +82,7 @@ public void testAvroReadWriteRoundTrip() throws Exception { String topic = "test-avro-topic"; - when(secorSchemaRegistryClient.serialize(anyString(), Matchers.any(GenericRecord.class))). + when(secorSchemaRegistryClient.serialize(anyString(), any(GenericRecord.class))). thenReturn(AvroSerializer.serialize(writer, msg1), AvroSerializer.serialize(writer, msg2)); diff --git a/src/test/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactoryTest.java index 52f9dac53..e0f19e4f5 100644 --- a/src/test/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactoryTest.java @@ -31,20 +31,17 @@ import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.specific.SpecificDatumWriter; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Matchers; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.junit.Assert.assertArrayEquals; -import static org.mockito.Matchers.anyString; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) public class AvroParquetFileReaderWriterFactoryTest extends TestCase { private AvroParquetFileReaderWriterFactory mFactory; @@ -80,7 +77,7 @@ public void testAvroParquetReadWriteRoundTripUsingSchemaRegistry() throws Except when(secorSchemaRegistryClient.deserialize("test-avro-topic", AvroSerializer.serialize(writer, msg1))).thenReturn(msg1); when(secorSchemaRegistryClient.deserialize("test-avro-topic", AvroSerializer.serialize(writer, msg2))).thenReturn(msg2); - when(secorSchemaRegistryClient.serialize(anyString(), Matchers.any(GenericRecord.class))). + when(secorSchemaRegistryClient.serialize(anyString(), any(GenericRecord.class))). thenReturn(AvroSerializer.serialize(writer, msg1), AvroSerializer.serialize(writer, msg2)); mFactory.schemaRegistry = secorSchemaRegistryClient; diff --git a/src/test/java/com/pinterest/secor/io/impl/ProtobufParquetFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/ProtobufParquetFileReaderWriterFactoryTest.java index fa2941913..848713d60 100644 --- a/src/test/java/com/pinterest/secor/io/impl/ProtobufParquetFileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/impl/ProtobufParquetFileReaderWriterFactoryTest.java @@ -27,9 +27,7 @@ import org.json.simple.JSONObject; import org.json.simple.JSONValue; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import com.google.common.io.Files; import com.pinterest.secor.common.LogFilePath; @@ -44,7 +42,6 @@ import junit.framework.TestCase; -@RunWith(PowerMockRunner.class) public class ProtobufParquetFileReaderWriterFactoryTest extends TestCase { private SecorConfig config; diff --git a/src/test/java/com/pinterest/secor/io/impl/ThriftParquetFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/ThriftParquetFileReaderWriterFactoryTest.java index 3247cd900..5419b98a3 100644 --- a/src/test/java/com/pinterest/secor/io/impl/ThriftParquetFileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/impl/ThriftParquetFileReaderWriterFactoryTest.java @@ -29,9 +29,7 @@ import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TCompactProtocol; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import com.google.common.io.Files; import com.pinterest.secor.common.LogFilePath; @@ -45,7 +43,6 @@ import junit.framework.TestCase; -@RunWith(PowerMockRunner.class) public class ThriftParquetFileReaderWriterFactoryTest extends TestCase { private SecorConfig config; diff --git a/src/test/java/com/pinterest/secor/monitoring/OstrichMetricCollectorTest.java b/src/test/java/com/pinterest/secor/monitoring/OstrichMetricCollectorTest.java index 7f1533a24..42b7f751e 100644 --- a/src/test/java/com/pinterest/secor/monitoring/OstrichMetricCollectorTest.java +++ b/src/test/java/com/pinterest/secor/monitoring/OstrichMetricCollectorTest.java @@ -19,52 +19,42 @@ package com.pinterest.secor.monitoring; import com.twitter.ostrich.stats.Stats; -import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.mockito.MockedStatic; +import org.mockito.Mockito; -@RunWith(PowerMockRunner.class) -@PrepareForTest({Stats.class}) public class OstrichMetricCollectorTest { private OstrichMetricCollector metricCollector = new OstrichMetricCollector(); - @Before - public void setUp() throws Exception { - PowerMockito.mockStatic(Stats.class); - } - @Test public void incrementByOne() throws Exception { - metricCollector.increment("expectedLabel", "ignored"); - - PowerMockito.verifyStatic(); - Stats.incr("expectedLabel"); + try (MockedStatic mockedStats = Mockito.mockStatic(Stats.class)) { + metricCollector.increment("expectedLabel", "ignored"); + mockedStats.verify(() -> Stats.incr("expectedLabel")); + } } @Test public void increment() throws Exception { - metricCollector.increment("expectedLabel", 42, "ignored"); - - PowerMockito.verifyStatic(); - Stats.incr("expectedLabel", 42); + try (MockedStatic mockedStats = Mockito.mockStatic(Stats.class)) { + metricCollector.increment("expectedLabel", 42, "ignored"); + mockedStats.verify(() -> Stats.incr("expectedLabel", 42)); + } } @Test public void metric() throws Exception { - metricCollector.metric("expectedLabel", 42.0, "ignored"); - - PowerMockito.verifyStatic(); - Stats.addMetric("expectedLabel", 42); + try (MockedStatic mockedStats = Mockito.mockStatic(Stats.class)) { + metricCollector.metric("expectedLabel", 42.0, "ignored"); + mockedStats.verify(() -> Stats.addMetric("expectedLabel", 42)); + } } @Test public void gauge() throws Exception { - metricCollector.gauge("expectedLabel", 4.2, "ignored"); - - PowerMockito.verifyStatic(); - Stats.setGauge("expectedLabel", 4.2); + try (MockedStatic mockedStats = Mockito.mockStatic(Stats.class)) { + metricCollector.gauge("expectedLabel", 4.2, "ignored"); + mockedStats.verify(() -> Stats.setGauge("expectedLabel", 4.2)); + } } } diff --git a/src/test/java/com/pinterest/secor/parser/DateMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/DateMessageParserTest.java index 022f22522..e9c25f471 100644 --- a/src/test/java/com/pinterest/secor/parser/DateMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/DateMessageParserTest.java @@ -22,14 +22,11 @@ import com.pinterest.secor.message.Message; import junit.framework.TestCase; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.Locale; import java.util.TimeZone; -@RunWith(PowerMockRunner.class) public class DateMessageParserTest extends TestCase { private SecorConfig mConfig; diff --git a/src/test/java/com/pinterest/secor/parser/Iso8601ParserTest.java b/src/test/java/com/pinterest/secor/parser/Iso8601ParserTest.java index e7e427f24..6d2934c1f 100644 --- a/src/test/java/com/pinterest/secor/parser/Iso8601ParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/Iso8601ParserTest.java @@ -22,13 +22,10 @@ import com.pinterest.secor.message.Message; import junit.framework.TestCase; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.TimeZone; -@RunWith(PowerMockRunner.class) public class Iso8601ParserTest extends TestCase { private SecorConfig mConfig; diff --git a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java index 01be79f25..634ad1980 100644 --- a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java @@ -22,16 +22,13 @@ import com.pinterest.secor.message.Message; import junit.framework.TestCase; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.TimeZone; -@RunWith(PowerMockRunner.class) public class JsonMessageParserTest extends TestCase { private SecorConfig mConfig; diff --git a/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java b/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java index 49a97ba68..06406effb 100644 --- a/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java @@ -23,15 +23,12 @@ import com.pinterest.secor.message.Message; import junit.framework.TestCase; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; import org.msgpack.jackson.dataformat.MessagePackFactory; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.HashMap; import java.util.TimeZone; -@RunWith(PowerMockRunner.class) public class MessagePackParserTest extends TestCase { SecorConfig mConfig; diff --git a/src/test/java/com/pinterest/secor/parser/ProtobufMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/ProtobufMessageParserTest.java index 3e96a9733..f5443db5b 100644 --- a/src/test/java/com/pinterest/secor/parser/ProtobufMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/ProtobufMessageParserTest.java @@ -22,9 +22,7 @@ import java.util.Map; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import com.google.protobuf.CodedOutputStream; import com.pinterest.secor.common.SecorConfig; @@ -34,7 +32,6 @@ import junit.framework.TestCase; -@RunWith(PowerMockRunner.class) public class ProtobufMessageParserTest extends TestCase { private SecorConfig mConfig; private long timestamp; diff --git a/src/test/java/com/pinterest/secor/parser/ProtobufTimestampParserTest.java b/src/test/java/com/pinterest/secor/parser/ProtobufTimestampParserTest.java index 0920325f7..c095b929c 100644 --- a/src/test/java/com/pinterest/secor/parser/ProtobufTimestampParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/ProtobufTimestampParserTest.java @@ -27,9 +27,7 @@ import com.pinterest.secor.protobuf.TimestampedMessages; import junit.framework.TestCase; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.HashMap; import java.util.Map; @@ -37,7 +35,6 @@ /** * Created by pgautam on 10/9/16. */ -@RunWith(PowerMockRunner.class) public class ProtobufTimestampParserTest extends TestCase { private SecorConfig mConfig; private long timestamp; diff --git a/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java index 2ff760ffa..61960e23c 100644 --- a/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java @@ -23,11 +23,8 @@ import junit.framework.TestCase; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; -@RunWith(PowerMockRunner.class) public class RegexMessageParserTest extends TestCase { private SecorConfig mConfig; diff --git a/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java index 71f5e37fc..ca841e834 100644 --- a/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java @@ -24,15 +24,12 @@ import net.minidev.json.JSONObject; import net.minidev.json.JSONValue; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; import java.util.List; import java.util.TimeZone; -@RunWith(PowerMockRunner.class) public class SplitByFieldMessageParserTest extends TestCase { private SecorConfig mConfig; diff --git a/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java index e80ba397c..905bf367f 100644 --- a/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java @@ -24,13 +24,10 @@ import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TBinaryProtocol; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import com.pinterest.secor.thrift.UnitTestMessage; -@RunWith(PowerMockRunner.class) public class ThriftMessageParserTest extends TestCase { private SecorConfig mConfig; private long timestamp; diff --git a/src/test/java/com/pinterest/secor/rebalance/RebalanceHandlerTest.java b/src/test/java/com/pinterest/secor/rebalance/RebalanceHandlerTest.java index 755233f98..64e764330 100644 --- a/src/test/java/com/pinterest/secor/rebalance/RebalanceHandlerTest.java +++ b/src/test/java/com/pinterest/secor/rebalance/RebalanceHandlerTest.java @@ -11,7 +11,7 @@ import java.util.LinkedList; import java.util.List; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; diff --git a/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java b/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java index 9dc95bfc5..838836f33 100644 --- a/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java +++ b/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java @@ -1,41 +1,41 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.timestamp; - -import org.junit.Test; - -import static org.junit.Assert.*; - -public class KafkaMessageTimestampFactoryTest { - - private KafkaMessageTimestampFactory factory; - - @Test - public void shouldReturnKafka8TimestampClassObject() { - factory = new KafkaMessageTimestampFactory("com.pinterest.secor.timestamp.Kafka8MessageTimestamp"); - Object timestamp = factory.getKafkaMessageTimestamp(); - assertNotNull(timestamp); - assertEquals(timestamp.getClass(), Kafka8MessageTimestamp.class); - } - - @Test(expected = RuntimeException.class) - public void shouldReturnNullForInvalidClass() { - factory = new KafkaMessageTimestampFactory("com.pinterest.secor.timestamp.KafkaxxMessageTimestamp"); - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// */ +//package com.pinterest.secor.timestamp; +// +//import org.junit.Test; +// +//import static org.junit.Assert.*; +// +//public class KafkaMessageTimestampFactoryTest { +// +// private KafkaMessageTimestampFactory factory; +// +// @Test +// public void shouldReturnKafka8TimestampClassObject() { +// factory = new KafkaMessageTimestampFactory("com.pinterest.secor.timestamp.Kafka8MessageTimestamp"); +// Object timestamp = factory.getKafkaMessageTimestamp(); +// assertNotNull(timestamp); +// assertEquals(timestamp.getClass(), Kafka8MessageTimestamp.class); +// } +// +// @Test(expected = RuntimeException.class) +// public void shouldReturnNullForInvalidClass() { +// factory = new KafkaMessageTimestampFactory("com.pinterest.secor.timestamp.KafkaxxMessageTimestamp"); +// } +//} diff --git a/src/test/java/com/pinterest/secor/uploader/UploaderTest.java b/src/test/java/com/pinterest/secor/uploader/UploaderTest.java index 30bbf3aec..a6ea1b120 100644 --- a/src/test/java/com/pinterest/secor/uploader/UploaderTest.java +++ b/src/test/java/com/pinterest/secor/uploader/UploaderTest.java @@ -35,13 +35,11 @@ import junit.framework.TestCase; import org.apache.hadoop.io.compress.CompressionCodec; import org.joda.time.DateTime; -import org.junit.runner.RunWith; +import org.mockito.MockedStatic; +import static org.mockito.Mockito.doNothing; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; import java.util.HashSet; @@ -51,8 +49,6 @@ * * @author Pawel Garbacki (pawel@pinterest.com) */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({ FileUtil.class, IdUtil.class , DateTime.class}) public class UploaderTest extends TestCase { private static class TestUploader extends Uploader { private FileReader mReader; @@ -123,90 +119,93 @@ public void setUp() throws Exception { } public void testUploadAtTime() throws Exception { - final int minuteUploadMark = 1; - - PowerMockito.mockStatic(DateTime.class); - PowerMockito.when(DateTime.now()).thenReturn(new DateTime(2016,7,27,0,minuteUploadMark,0)); - Mockito.when(mConfig.getUploadMinuteMark()).thenReturn(minuteUploadMark); - Mockito.when(mConfig.getKafkaTopicFilter()).thenReturn("some_topic"); - - Mockito.when(mConfig.getCloudService()).thenReturn("S3"); - Mockito.when(mConfig.getS3Bucket()).thenReturn("some_bucket"); - Mockito.when(mConfig.getS3Path()).thenReturn("some_s3_parent_dir"); - Mockito.when(mConfig.getOffsetsStorage()).thenReturn(SecorConstants.KAFKA_OFFSETS_STORAGE_ZK); - - HashSet logFilePaths = new HashSet(); - logFilePaths.add(mLogFilePath); - Mockito.when(mFileRegistry.getPaths(mTopicPartition)).thenReturn( - logFilePaths); - - PowerMockito.mockStatic(FileUtil.class); - Mockito.when(FileUtil.getPrefix("some_topic", mConfig)). - thenReturn("s3a://some_bucket/some_s3_parent_dir"); - mUploader.applyPolicy(false); - - final String lockPath = "/secor/locks/some_topic/0"; - Mockito.verify(mZookeeperConnector).lock(lockPath); - PowerMockito.verifyStatic(); - FileUtil.moveToCloud( - "/some_parent_dir/some_topic/some_partition/some_other_partition/" - + "10_0_00000000000000000010", - "s3a://some_bucket/some_s3_parent_dir/some_topic/some_partition/" - + "some_other_partition/10_0_00000000000000000010"); - Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition); - Mockito.verify(mZookeeperConnector).setCommittedOffsetCount( - mTopicPartition, 1L); - Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition, - 1L); - Mockito.verify(mZookeeperConnector).unlock(lockPath); + try (MockedStatic mockedDateTime = Mockito.mockStatic(DateTime.class); + MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class)) { + + final int minuteUploadMark = 1; + + mockedDateTime.when(DateTime::now).thenReturn(new DateTime(2016, 7, 27, 0, minuteUploadMark, 0)); + Mockito.when(mConfig.getUploadMinuteMark()).thenReturn(minuteUploadMark); + Mockito.when(mConfig.getKafkaTopicFilter()).thenReturn("some_topic"); + + Mockito.when(mConfig.getCloudService()).thenReturn("S3"); + Mockito.when(mConfig.getS3Bucket()).thenReturn("some_bucket"); + Mockito.when(mConfig.getS3Path()).thenReturn("some_s3_parent_dir"); + Mockito.when(mConfig.getOffsetsStorage()).thenReturn(SecorConstants.KAFKA_OFFSETS_STORAGE_ZK); + + HashSet logFilePaths = new HashSet(); + logFilePaths.add(mLogFilePath); + Mockito.when(mFileRegistry.getPaths(mTopicPartition)).thenReturn( + logFilePaths); + + mockedFileUtil.when(() -> FileUtil.getPrefix("some_topic", mConfig)) + .thenReturn("s3a://some_bucket/some_s3_parent_dir"); + + mUploader.applyPolicy(false); + + final String lockPath = "/secor/locks/some_topic/0"; + Mockito.verify(mZookeeperConnector).lock(lockPath); + mockedFileUtil.verify(() -> FileUtil.moveToCloud( + "/some_parent_dir/some_topic/some_partition/some_other_partition/" + + "10_0_00000000000000000010", + "s3a://some_bucket/some_s3_parent_dir/some_topic/some_partition/" + + "some_other_partition/10_0_00000000000000000010")); + Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition); + Mockito.verify(mZookeeperConnector).setCommittedOffsetCount( + mTopicPartition, 1L); + Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition, + 1L); + Mockito.verify(mZookeeperConnector).unlock(lockPath); + } } public void testUploadFiles() throws Exception { - Mockito.when( - mZookeeperConnector.getCommittedOffsetCount(mTopicPartition)) - .thenReturn(11L); - Mockito.when( - mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 11L)) - .thenReturn(11L); - Mockito.when( - mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 21L)) - .thenReturn(11L); - Mockito.when(mOffsetTracker.getLastSeenOffset(mTopicPartition)) - .thenReturn(20L); - Mockito.when( - mOffsetTracker.getTrueCommittedOffsetCount(mTopicPartition)) - .thenReturn(11L); - - - Mockito.when(mConfig.getCloudService()).thenReturn("S3"); - Mockito.when(mConfig.getS3Bucket()).thenReturn("some_bucket"); - Mockito.when(mConfig.getS3Path()).thenReturn("some_s3_parent_dir"); - Mockito.when(mConfig.getOffsetsStorage()).thenReturn(SecorConstants.KAFKA_OFFSETS_STORAGE_ZK); - - HashSet logFilePaths = new HashSet(); - logFilePaths.add(mLogFilePath); - Mockito.when(mFileRegistry.getPaths(mTopicPartition)).thenReturn( - logFilePaths); - - PowerMockito.mockStatic(FileUtil.class); - Mockito.when(FileUtil.getPrefix("some_topic", mConfig)). - thenReturn("s3a://some_bucket/some_s3_parent_dir"); - mUploader.applyPolicy(false); - - final String lockPath = "/secor/locks/some_topic/0"; - Mockito.verify(mZookeeperConnector).lock(lockPath); - PowerMockito.verifyStatic(); - FileUtil.moveToCloud( - "/some_parent_dir/some_topic/some_partition/some_other_partition/" - + "10_0_00000000000000000010", - "s3a://some_bucket/some_s3_parent_dir/some_topic/some_partition/" - + "some_other_partition/10_0_00000000000000000010"); - Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition); - Mockito.verify(mZookeeperConnector).setCommittedOffsetCount( - mTopicPartition, 21L); - Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition, - 21L); - Mockito.verify(mZookeeperConnector).unlock(lockPath); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class)) { + + Mockito.when( + mZookeeperConnector.getCommittedOffsetCount(mTopicPartition)) + .thenReturn(11L); + Mockito.when( + mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 11L)) + .thenReturn(11L); + Mockito.when( + mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 21L)) + .thenReturn(11L); + Mockito.when(mOffsetTracker.getLastSeenOffset(mTopicPartition)) + .thenReturn(20L); + Mockito.when( + mOffsetTracker.getTrueCommittedOffsetCount(mTopicPartition)) + .thenReturn(11L); + + + Mockito.when(mConfig.getCloudService()).thenReturn("S3"); + Mockito.when(mConfig.getS3Bucket()).thenReturn("some_bucket"); + Mockito.when(mConfig.getS3Path()).thenReturn("some_s3_parent_dir"); + Mockito.when(mConfig.getOffsetsStorage()).thenReturn(SecorConstants.KAFKA_OFFSETS_STORAGE_ZK); + + HashSet logFilePaths = new HashSet(); + logFilePaths.add(mLogFilePath); + Mockito.when(mFileRegistry.getPaths(mTopicPartition)).thenReturn( + logFilePaths); + + mockedFileUtil.when(() -> FileUtil.getPrefix("some_topic", mConfig)) + .thenReturn("s3a://some_bucket/some_s3_parent_dir"); + mUploader.applyPolicy(false); + + final String lockPath = "/secor/locks/some_topic/0"; + Mockito.verify(mZookeeperConnector).lock(lockPath); + mockedFileUtil.verify(() -> FileUtil.moveToCloud( + "/some_parent_dir/some_topic/some_partition/some_other_partition/" + + "10_0_00000000000000000010", + "s3a://some_bucket/some_s3_parent_dir/some_topic/some_partition/" + + "some_other_partition/10_0_00000000000000000010")); + Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition); + Mockito.verify(mZookeeperConnector).setCommittedOffsetCount( + mTopicPartition, 21L); + Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition, + 21L); + Mockito.verify(mZookeeperConnector).unlock(lockPath); + } } public void testDeleteTopicPartition() throws Exception { @@ -225,51 +224,53 @@ public void testDeleteTopicPartition() throws Exception { } public void testTrimFiles() throws Exception { - Mockito.when( - mZookeeperConnector.getCommittedOffsetCount(mTopicPartition)) - .thenReturn(21L); - // The second time it's called, it returns 21L because of the first call. - Mockito.when( - mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 21L)) - .thenReturn(20L, 21L); - Mockito.when(mOffsetTracker.getLastSeenOffset(mTopicPartition)) - .thenReturn(21L); - - HashSet logFilePaths = new HashSet(); - logFilePaths.add(mLogFilePath); - Mockito.when(mFileRegistry.getPaths(mTopicPartition)).thenReturn( - logFilePaths); + try (MockedStatic mockedIdUtil = Mockito.mockStatic(IdUtil.class)) { + + Mockito.when( + mZookeeperConnector.getCommittedOffsetCount(mTopicPartition)) + .thenReturn(21L); + // The second time it's called, it returns 21L because of the first call. + Mockito.when( + mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 21L)) + .thenReturn(20L, 21L); + Mockito.when(mOffsetTracker.getLastSeenOffset(mTopicPartition)) + .thenReturn(21L); + + HashSet logFilePaths = new HashSet(); + logFilePaths.add(mLogFilePath); + Mockito.when(mFileRegistry.getPaths(mTopicPartition)).thenReturn( + logFilePaths); + + FileReader reader = mUploader.getReader(); + + Mockito.when(reader.next()).thenAnswer(new Answer() { + private int mCallCount = 0; + + @Override + public KeyValue answer(InvocationOnMock invocation) + throws Throwable { + if (mCallCount == 2) { + return null; + } + return new KeyValue(20 + mCallCount++, null); + } + }); - FileReader reader = mUploader.getReader(); + mockedIdUtil.when(IdUtil::getLocalMessageDir) + .thenReturn("some_message_dir"); - Mockito.when(reader.next()).thenAnswer(new Answer() { - private int mCallCount = 0; + FileWriter writer = Mockito.mock(FileWriter.class); + LogFilePath dstLogFilePath = new LogFilePath( + "/some_parent_dir/some_message_dir", + "/some_parent_dir/some_message_dir/some_topic/some_partition/" + + "some_other_partition/10_0_00000000000000000021"); + Mockito.when(mFileRegistry.getOrCreateWriter(dstLogFilePath, null)) + .thenReturn(writer); - @Override - public KeyValue answer(InvocationOnMock invocation) - throws Throwable { - if (mCallCount == 2) { - return null; - } - return new KeyValue(20 + mCallCount++, null); - } - }); - - PowerMockito.mockStatic(IdUtil.class); - Mockito.when(IdUtil.getLocalMessageDir()) - .thenReturn("some_message_dir"); - - FileWriter writer = Mockito.mock(FileWriter.class); - LogFilePath dstLogFilePath = new LogFilePath( - "/some_parent_dir/some_message_dir", - "/some_parent_dir/some_message_dir/some_topic/some_partition/" - + "some_other_partition/10_0_00000000000000000021"); - Mockito.when(mFileRegistry.getOrCreateWriter(dstLogFilePath, null)) - .thenReturn(writer); + mUploader.applyPolicy(false); - mUploader.applyPolicy(false); - - Mockito.verify(writer).write(Mockito.any(KeyValue.class)); - Mockito.verify(mFileRegistry).deletePath(mLogFilePath); + Mockito.verify(writer).write(Mockito.any(KeyValue.class)); + Mockito.verify(mFileRegistry).deletePath(mLogFilePath); + } } } From 8926262fb48702b816d9f79b1bf8a98beb152ac6 Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Tue, 10 Feb 2026 13:24:25 +0200 Subject: [PATCH 07/14] Remove deprecated legacy Kafka code Remove classes that depend on old Kafka APIs removed in modern versions: - LegacyKafkaClient.java - LegacyKafkaMessageIterator.java - KafkaMessageTimestamp.java (interface) - KafkaMessageTimestampFactory.java - Kafka8MessageTimestamp.java - Kafka10MessageTimestamp.java - KafkaMessageTimestampFactoryTest.java These classes used kafka.javaapi.* and kafka.message.* packages which are no longer available in modern Kafka client libraries. Users should use SecorKafkaClient and SecorKafkaMessageIterator instead. Note: Configuration files still reference these classes but will need to be updated to use the modern implementations. Co-Authored-By: Claude Opus 4.6 --- .../secor/common/LegacyKafkaClient.java | 260 ------------------ .../reader/LegacyKafkaMessageIterator.java | 146 ---------- .../timestamp/Kafka10MessageTimestamp.java | 35 --- .../timestamp/Kafka8MessageTimestamp.java | 35 --- .../timestamp/KafkaMessageTimestamp.java | 29 -- .../KafkaMessageTimestampFactory.java | 41 --- .../KafkaMessageTimestampFactoryTest.java | 41 --- 7 files changed, 587 deletions(-) delete mode 100644 src/main/java/com/pinterest/secor/common/LegacyKafkaClient.java delete mode 100644 src/main/java/com/pinterest/secor/reader/LegacyKafkaMessageIterator.java delete mode 100644 src/main/java/com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java delete mode 100644 src/main/java/com/pinterest/secor/timestamp/Kafka8MessageTimestamp.java delete mode 100644 src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestamp.java delete mode 100644 src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactory.java delete mode 100644 src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java diff --git a/src/main/java/com/pinterest/secor/common/LegacyKafkaClient.java b/src/main/java/com/pinterest/secor/common/LegacyKafkaClient.java deleted file mode 100644 index 01d91741b..000000000 --- a/src/main/java/com/pinterest/secor/common/LegacyKafkaClient.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.common; - -import com.google.common.net.HostAndPort; -import com.pinterest.secor.message.Message; -import com.pinterest.secor.timestamp.KafkaMessageTimestampFactory; -import kafka.api.FetchRequestBuilder; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetRequest; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.PartitionMetadata; -import kafka.javaapi.TopicMetadata; -import kafka.javaapi.TopicMetadataRequest; -import kafka.javaapi.TopicMetadataResponse; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.message.MessageAndOffset; -import org.apache.kafka.common.protocol.Errors; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Kafka client encapsulates the logic interacting with Kafka brokers. - * - * @author Pawel Garbacki (pawel@pinterest.com) - */ -@Deprecated -public class LegacyKafkaClient implements KafkaClient { - private static final Logger LOG = LoggerFactory.getLogger(LegacyKafkaClient.class); - - private SecorConfig mConfig; - private ZookeeperConnector mZookeeperConnector; - private KafkaMessageTimestampFactory mKafkaMessageTimestampFactory; - - public LegacyKafkaClient() { - } - - public class MessageDoesNotExistException extends RuntimeException {} - - private HostAndPort findLeader(TopicPartition topicPartition) { - SimpleConsumer consumer = null; - try { - LOG.debug("looking up leader for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition()); - consumer = createConsumer( - mConfig.getKafkaSeedBrokerHost(), - mConfig.getKafkaSeedBrokerPort(), - "leaderLookup"); - List topics = new ArrayList(); - topics.add(topicPartition.getTopic()); - TopicMetadataRequest request = new TopicMetadataRequest(topics); - TopicMetadataResponse response = consumer.send(request); - - List metaData = response.topicsMetadata(); - for (TopicMetadata item : metaData) { - for (PartitionMetadata part : item.partitionsMetadata()) { - if (part.partitionId() == topicPartition.getPartition()) { - return HostAndPort.fromParts(part.leader().host(), part.leader().port()); - } - } - } - } finally { - if (consumer != null) { - consumer.close(); - } - } - return null; - } - - private static String getClientName(TopicPartition topicPartition) { - return "secorClient_" + topicPartition.getTopic() + "_" + topicPartition.getPartition(); - } - - private long findLastOffset(TopicPartition topicPartition, SimpleConsumer consumer) { - TopicAndPartition topicAndPartition = new TopicAndPartition(topicPartition.getTopic(), - topicPartition.getPartition()); - Map requestInfo = - new HashMap(); - requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo( - kafka.api.OffsetRequest.LatestTime(), 1)); - final String clientName = getClientName(topicPartition); - OffsetRequest request = new OffsetRequest(requestInfo, - kafka.api.OffsetRequest.CurrentVersion(), - clientName); - OffsetResponse response = consumer.getOffsetsBefore(request); - - if (response.hasError()) { - throw new RuntimeException("Error fetching offset data. Reason: " + - response.errorCode(topicPartition.getTopic(), topicPartition.getPartition())); - } - long[] offsets = response.offsets(topicPartition.getTopic(), - topicPartition.getPartition()); - return offsets[0] - 1; - } - - private Message getMessage(TopicPartition topicPartition, long offset, - SimpleConsumer consumer) { - LOG.debug("fetching message topic {} partition {} offset {}", - topicPartition.getTopic(), topicPartition.getPartition(), offset); - final int MAX_MESSAGE_SIZE_BYTES = mConfig.getMaxMessageSizeBytes(); - final String clientName = getClientName(topicPartition); - kafka.api.FetchRequest request = new FetchRequestBuilder().clientId(clientName) - .addFetch(topicPartition.getTopic(), topicPartition.getPartition(), offset, - MAX_MESSAGE_SIZE_BYTES) - .build(); - FetchResponse response = consumer.fetch(request); - if (response.hasError()) { - consumer.close(); - int errorCode = response.errorCode(topicPartition.getTopic(), topicPartition.getPartition()); - - if (errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) { - throw new MessageDoesNotExistException(); - } else { - throw new RuntimeException("Error fetching offset data. Reason: " + errorCode); - } - } - MessageAndOffset messageAndOffset = response.messageSet( - topicPartition.getTopic(), topicPartition.getPartition()).iterator().next(); - byte[] keyBytes = null; - if (messageAndOffset.message().hasKey()) { - ByteBuffer key = messageAndOffset.message().key(); - keyBytes = new byte[key.limit()]; - key.get(keyBytes); - } - byte[] payloadBytes = null; - if (!messageAndOffset.message().isNull()) { - ByteBuffer payload = messageAndOffset.message().payload(); - payloadBytes = new byte[payload.limit()]; - payload.get(payloadBytes); - } - long timestamp = (mConfig.useKafkaTimestamp()) - ? mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(messageAndOffset) - : 0l; - - return new Message(topicPartition.getTopic(), topicPartition.getPartition(), - messageAndOffset.offset(), keyBytes, payloadBytes, timestamp, null); - } - - private SimpleConsumer createConsumer(String host, int port, String clientName) { - return new SimpleConsumer(host, port, 100000, 64 * 1024, clientName); - } - - private SimpleConsumer createConsumer(TopicPartition topicPartition) { - HostAndPort leader = findLeader(topicPartition); - if (leader == null) { - LOG.warn("no leader for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition()); - return null; - } - LOG.debug("leader for topic {} partition {} is {}", topicPartition.getTopic(), topicPartition.getPartition(), leader); - final String clientName = getClientName(topicPartition); - return createConsumer(leader.getHostText(), leader.getPort(), clientName); - } - - @Override - public int getNumPartitions(String topic) { - SimpleConsumer consumer = null; - try { - consumer = createConsumer( - mConfig.getKafkaSeedBrokerHost(), - mConfig.getKafkaSeedBrokerPort(), - "partitionLookup"); - List topics = new ArrayList(); - topics.add(topic); - TopicMetadataRequest request = new TopicMetadataRequest(topics); - TopicMetadataResponse response = consumer.send(request); - if (response.topicsMetadata().size() != 1) { - throw new RuntimeException("Expected one metadata for topic " + topic + " found " + - response.topicsMetadata().size()); - } - TopicMetadata topicMetadata = response.topicsMetadata().get(0); - return topicMetadata.partitionsMetadata().size(); - } finally { - if (consumer != null) { - consumer.close(); - } - } - } - - @Override - public Message getLastMessage(TopicPartition topicPartition) throws TException { - SimpleConsumer consumer = null; - try { - consumer = createConsumer(topicPartition); - if (consumer == null) { - return null; - } - long lastOffset = findLastOffset(topicPartition, consumer); - if (lastOffset < 1) { - return null; - } - return getMessage(topicPartition, lastOffset, consumer); - } finally { - if (consumer != null) { - consumer.close(); - } - } - } - - @Override - public Message getCommittedMessage(TopicPartition topicPartition) throws Exception { - SimpleConsumer consumer = null; - try { - long committedOffset = mZookeeperConnector.getCommittedOffsetCount(topicPartition) - 1; - if (committedOffset < 0) { - return null; - } - consumer = createConsumer(topicPartition); - if (consumer == null) { - return null; - } - return getMessage(topicPartition, committedOffset, consumer); - } catch (MessageDoesNotExistException e) { - // If a MessageDoesNotExistException exception is raised, - // the message at the last committed offset does not exist in Kafka. - // This is usually due to the message being compacted away by the - // Kafka log compaction process. - // - // That is no an exceptional situation - in fact it can be normal if - // the topic being consumed by Secor has a low volume. So in that - // case, simply return null - LOG.warn("no committed message for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition()); - return null; - } finally { - if (consumer != null) { - consumer.close(); - } - } - } - - @Override - public void init(SecorConfig config) { - mConfig = config; - mZookeeperConnector = new ZookeeperConnector(mConfig); - mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass()); - } -} diff --git a/src/main/java/com/pinterest/secor/reader/LegacyKafkaMessageIterator.java b/src/main/java/com/pinterest/secor/reader/LegacyKafkaMessageIterator.java deleted file mode 100644 index cf9eac67e..000000000 --- a/src/main/java/com/pinterest/secor/reader/LegacyKafkaMessageIterator.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.reader; - -import com.pinterest.secor.common.SecorConfig; -import com.pinterest.secor.common.TopicPartition; -import com.pinterest.secor.message.Message; -import com.pinterest.secor.timestamp.KafkaMessageTimestampFactory; -import com.pinterest.secor.util.IdUtil; -import kafka.consumer.Blacklist; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.ConsumerTimeoutException; -import kafka.consumer.KafkaStream; -import kafka.consumer.TopicFilter; -import kafka.consumer.Whitelist; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.UnknownHostException; -import java.util.List; -import java.util.Properties; - -public class LegacyKafkaMessageIterator implements KafkaMessageIterator { - private static final Logger LOG = LoggerFactory.getLogger(MessageReader.class); - private SecorConfig mConfig; - private ConsumerConnector mConsumerConnector; - private ConsumerIterator mIterator; - private KafkaMessageTimestampFactory mKafkaMessageTimestampFactory; - - public LegacyKafkaMessageIterator() { - } - - @Override - public boolean hasNext() { - try { - return mIterator.hasNext(); - } catch (ConsumerTimeoutException e) { - throw new LegacyConsumerTimeoutException(e); - } - } - - @Override - public Message next() { - MessageAndMetadata kafkaMessage; - try { - kafkaMessage = mIterator.next(); - } catch (ConsumerTimeoutException e) { - throw new LegacyConsumerTimeoutException(e); - } - - long timestamp = 0L; - if (mConfig.useKafkaTimestamp()) { - timestamp = mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(kafkaMessage); - } - - return new Message(kafkaMessage.topic(), kafkaMessage.partition(), - kafkaMessage.offset(), kafkaMessage.key(), - kafkaMessage.message(), timestamp, null); - } - - @Override - public void init(SecorConfig config) throws UnknownHostException { - this.mConfig = config; - - mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig()); - - if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) { - throw new RuntimeException("Topic filter and blacklist cannot be both specified."); - } - TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty() ? new Blacklist(mConfig.getKafkaTopicBlacklist()) : - new Whitelist(mConfig.getKafkaTopicFilter()); - LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter); - List> streams = - mConsumerConnector.createMessageStreamsByFilter(topicFilter); - KafkaStream stream = streams.get(0); - mIterator = stream.iterator(); - mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass()); - } - - @Override - public void commit(TopicPartition topicPartition, long offset) { - mConsumerConnector.commitOffsets(); - } - - private ConsumerConfig createConsumerConfig() throws UnknownHostException { - Properties props = new Properties(); - props.put("zookeeper.connect", mConfig.getZookeeperQuorum() + mConfig.getKafkaZookeeperPath()); - props.put("group.id", mConfig.getKafkaGroup()); - - props.put("zookeeper.session.timeout.ms", - Integer.toString(mConfig.getZookeeperSessionTimeoutMs())); - props.put("zookeeper.sync.time.ms", Integer.toString(mConfig.getZookeeperSyncTimeMs())); - props.put("auto.commit.enable", "false"); - props.put("auto.offset.reset", mConfig.getConsumerAutoOffsetReset()); - props.put("consumer.timeout.ms", Integer.toString(mConfig.getConsumerTimeoutMs())); - props.put("consumer.id", IdUtil.getConsumerId()); - // Properties required to upgrade from kafka 0.8.x to 0.9.x - props.put("dual.commit.enabled", mConfig.getDualCommitEnabled()); - props.put("offsets.storage", mConfig.getOffsetsStorage()); - - props.put("partition.assignment.strategy", mConfig.getPartitionAssignmentStrategy()); - if (mConfig.getRebalanceMaxRetries() != null && - !mConfig.getRebalanceMaxRetries().isEmpty()) { - props.put("rebalance.max.retries", mConfig.getRebalanceMaxRetries()); - } - if (mConfig.getRebalanceBackoffMs() != null && - !mConfig.getRebalanceBackoffMs().isEmpty()) { - props.put("rebalance.backoff.ms", mConfig.getRebalanceBackoffMs()); - } - if (mConfig.getSocketReceiveBufferBytes() != null && - !mConfig.getSocketReceiveBufferBytes().isEmpty()) { - props.put("socket.receive.buffer.bytes", mConfig.getSocketReceiveBufferBytes()); - } - if (mConfig.getFetchMessageMaxBytes() != null && !mConfig.getFetchMessageMaxBytes().isEmpty()) { - props.put("fetch.message.max.bytes", mConfig.getFetchMessageMaxBytes()); - } - if (mConfig.getFetchMinBytes() != null && !mConfig.getFetchMinBytes().isEmpty()) { - props.put("fetch.min.bytes", mConfig.getFetchMinBytes()); - } - if (mConfig.getFetchWaitMaxMs() != null && !mConfig.getFetchWaitMaxMs().isEmpty()) { - props.put("fetch.wait.max.ms", mConfig.getFetchWaitMaxMs()); - } - - return new ConsumerConfig(props); - } -} diff --git a/src/main/java/com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java b/src/main/java/com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java deleted file mode 100644 index e3389d515..000000000 --- a/src/main/java/com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.timestamp; - -import kafka.message.MessageAndMetadata; -import kafka.message.MessageAndOffset; - -public class Kafka10MessageTimestamp implements KafkaMessageTimestamp { - - @Override - public long getTimestamp(MessageAndMetadata kafkaMessage) { - return kafkaMessage.timestamp(); - } - - @Override - public long getTimestamp(MessageAndOffset messageAndOffset) { - return messageAndOffset.message().timestamp(); - } -} diff --git a/src/main/java/com/pinterest/secor/timestamp/Kafka8MessageTimestamp.java b/src/main/java/com/pinterest/secor/timestamp/Kafka8MessageTimestamp.java deleted file mode 100644 index f196c36f3..000000000 --- a/src/main/java/com/pinterest/secor/timestamp/Kafka8MessageTimestamp.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.timestamp; - -import kafka.message.MessageAndMetadata; -import kafka.message.MessageAndOffset; - -public class Kafka8MessageTimestamp implements KafkaMessageTimestamp { - - @Override - public long getTimestamp(MessageAndMetadata kafkaMessage) { - return 0l; - } - - @Override - public long getTimestamp(MessageAndOffset messageAndOffset) { - return 0l; - } -} diff --git a/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestamp.java b/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestamp.java deleted file mode 100644 index 8e514a672..000000000 --- a/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestamp.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.timestamp; - -import kafka.message.MessageAndMetadata; -import kafka.message.MessageAndOffset; - -public interface KafkaMessageTimestamp { - - long getTimestamp(MessageAndMetadata kafkaMessage); - - long getTimestamp(MessageAndOffset messageAndOffset); -} diff --git a/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactory.java b/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactory.java deleted file mode 100644 index 718677e7c..000000000 --- a/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.timestamp; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KafkaMessageTimestampFactory { - private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageTimestampFactory.class); - - private KafkaMessageTimestamp kafkaMessageTimestamp; - - public KafkaMessageTimestampFactory(String kafkaTimestampClassName) { - try { - Class timestampClass = Class.forName(kafkaTimestampClassName); - this.kafkaMessageTimestamp = KafkaMessageTimestamp.class.cast(timestampClass.newInstance()); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - throw new RuntimeException(e); - } - } - - public KafkaMessageTimestamp getKafkaMessageTimestamp() { - return this.kafkaMessageTimestamp; - } -} diff --git a/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java b/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java deleted file mode 100644 index 838836f33..000000000 --- a/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java +++ /dev/null @@ -1,41 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// */ -//package com.pinterest.secor.timestamp; -// -//import org.junit.Test; -// -//import static org.junit.Assert.*; -// -//public class KafkaMessageTimestampFactoryTest { -// -// private KafkaMessageTimestampFactory factory; -// -// @Test -// public void shouldReturnKafka8TimestampClassObject() { -// factory = new KafkaMessageTimestampFactory("com.pinterest.secor.timestamp.Kafka8MessageTimestamp"); -// Object timestamp = factory.getKafkaMessageTimestamp(); -// assertNotNull(timestamp); -// assertEquals(timestamp.getClass(), Kafka8MessageTimestamp.class); -// } -// -// @Test(expected = RuntimeException.class) -// public void shouldReturnNullForInvalidClass() { -// factory = new KafkaMessageTimestampFactory("com.pinterest.secor.timestamp.KafkaxxMessageTimestamp"); -// } -//} From bf523a0d7d40ebb40a1acb25873ba5a4a6ca1915 Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Mon, 23 Feb 2026 16:22:28 +0200 Subject: [PATCH 08/14] added back the release profile --- pom.xml | 246 ++++++-------------------------------------------------- 1 file changed, 26 insertions(+), 220 deletions(-) diff --git a/pom.xml b/pom.xml index d6436816e..d0c79af83 100644 --- a/pom.xml +++ b/pom.xml @@ -628,166 +628,65 @@ release - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - - com/pinterest/secor/common/SecorKafkaClient.java - com/pinterest/secor/reader/SecorKafkaMessageIterator.java - - - - - org.apache.maven.plugins - maven-source-plugin - 3.2.1 - - - attach-sources - - jar-no-fork - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - 3.1.0 - - - attach-javadocs - - jar - - - - - - org.apache.maven.plugins - maven-gpg-plugin - 1.6 - - - sign-artifacts - verify - - sign - - - - - - + + kafka-2.0.0 + com.fasterxml.jackson.module - jackson-module-scala_2.10 - 2.10.2 + jackson-module-scala_2.11 + 2.9.9 - - - org.apache.kafka - kafka_2.10 - 0.10.2.2 - - - org.slf4j - slf4j-simple - - - - - com.twitter - ostrich_2.10 - 9.18.0 - - - - - kafka-0.8.2.1 org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - - com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java - com/pinterest/secor/common/SecorKafkaClient.java - com/pinterest/secor/reader/SecorKafkaMessageIterator.java - - + maven-gpg-plugin + 3.0.1 + + + sign-artifacts + verify + + sign + + + - - - - - org.apache.kafka - kafka_2.10 - 0.10.2.2 - - - org.slf4j - slf4j-simple - - - - - com.twitter - ostrich_2.10 - 9.18.0 - - - - - kafka-1.0.0 - - org.apache.maven.plugins maven-compiler-plugin 3.8.1 - com/pinterest/secor/common/SecorKafkaClient.java - com/pinterest/secor/reader/SecorKafkaMessageIterator.java + com/pinterest/secor/timestamp/* + com/pinterest/secor/reader/LegacyKafkaMessageIterator.java + com/pinterest/secor/common/LegacyKafkaClient.java + + com/pinterest/secor/timestamp/* + - - - - com.fasterxml.jackson.module - jackson-module-scala_2.11 - 2.9.9 - - - org.apache.kafka - kafka_2.11 - 1.0.0 + kafka-clients + 2.3.1 com.twitter ostrich_2.11 9.27.0 - + + kafka-2.0.0 @@ -834,98 +733,5 @@ - - kafka-0.10.2.0 - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - - com/pinterest/secor/common/SecorKafkaClient.java - com/pinterest/secor/reader/SecorKafkaMessageIterator.java - - - - - - - true - - - - - com.fasterxml.jackson.module - jackson-module-scala_2.10 - 2.10.2 - - - - - - org.apache.kafka - kafka_2.10 - 0.10.2.2 - - - org.slf4j - slf4j-simple - - - - - com.twitter - ostrich_2.10 - 9.18.0 - - - - - kafka-0.10.0.1 - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - - com/pinterest/secor/common/SecorKafkaClient.java - com/pinterest/secor/reader/SecorKafkaMessageIterator.java - - - - - - - - - com.fasterxml.jackson.module - jackson-module-scala_2.10 - 2.10.2 - - - - - - org.apache.kafka - kafka_2.10 - 0.10.2.2 - - - org.slf4j - slf4j-simple - - - - - com.twitter - ostrich_2.10 - 9.18.0 - - - From e45b8da8c095265b055ce907c4a295dcffbfcd2e Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Thu, 26 Feb 2026 11:39:38 +0200 Subject: [PATCH 09/14] fixed tests --- .../io/impl/AvroFileReaderWriterFactory.java | 2 +- .../secor/common/FileRegistryTest.java | 39 +++++++++++---- .../common/SecorSchemaRegistryClientTest.java | 20 ++++---- .../impl/AvroFileReaderWriterFactoryTest.java | 8 ++-- .../secor/parser/JsonMessageParserTest.java | 8 ++-- .../secor/parser/MessagePackParserTest.java | 8 ++-- .../secor/parser/RegexMessageParserTest.java | 10 ++-- .../parser/SplitByFieldMessageParserTest.java | 10 ++-- .../secor/parser/ThriftMessageParserTest.java | 8 ++-- .../secor/uploader/UploaderTest.java | 48 ++++++++++--------- 10 files changed, 94 insertions(+), 67 deletions(-) diff --git a/src/main/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactory.java index e7b938d70..4ada046f1 100644 --- a/src/main/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactory.java +++ b/src/main/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactory.java @@ -82,7 +82,7 @@ protected class AvroFileReader implements FileReader { public AvroFileReader(LogFilePath logFilePath, CompressionCodec codec) throws IOException { file = new File(logFilePath.getLogFilePath()); file.getParentFile().mkdirs(); - String topic = logFilePath.getTopic(); + topic = logFilePath.getTopic(); Schema schema = schemaRegistry.getSchema(topic); DatumReader datumReader = new SpecificDatumReader(schema); diff --git a/src/test/java/com/pinterest/secor/common/FileRegistryTest.java b/src/test/java/com/pinterest/secor/common/FileRegistryTest.java index 28eb0f3e9..fbe574639 100644 --- a/src/test/java/com/pinterest/secor/common/FileRegistryTest.java +++ b/src/test/java/com/pinterest/secor/common/FileRegistryTest.java @@ -22,25 +22,28 @@ import com.pinterest.secor.util.FileUtil; import com.pinterest.secor.util.ReflectionUtil; -import junit.framework.TestCase; - import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; +import org.junit.Before; +import org.junit.Test; import org.mockito.MockedStatic; import org.mockito.Mockito; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import java.lang.reflect.Field; import java.util.Collection; +import java.util.HashMap; /** * FileRegistryTest tests the file registry logic. * * @author Pawel Garbacki (pawel@pinterest.com) */ -public class FileRegistryTest extends TestCase { +public class FileRegistryTest { private static final String PATH = "/some_parent_dir/some_topic/some_partition/some_other_partition/" + "10_0_00000000000000000100"; private static final String PATH_GZ = "/some_parent_dir/some_topic/some_partition/some_other_partition/" @@ -52,8 +55,8 @@ public class FileRegistryTest extends TestCase { private TopicPartition mTopicPartition; private FileRegistry mRegistry; + @Before public void setUp() throws Exception { - super.setUp(); PropertiesConfiguration properties = new PropertiesConfiguration(); properties.addProperty("secor.file.reader.writer.factory", "com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory"); @@ -80,11 +83,12 @@ private FileWriter createWriter(MockedStatic mockedFileUtil, FileWriter createdWriter = mRegistry.getOrCreateWriter( mLogFilePath, null); - assertTrue(createdWriter == writer); + assertSame(createdWriter, writer); return writer; } + @Test public void testGetOrCreateWriter() throws Exception { try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { @@ -118,10 +122,12 @@ public void testGetOrCreateWriter() throws Exception { } } + @Test public void testGetWriterShowBeNullForNewFilePaths() throws Exception { assertNull(mRegistry.getWriter(mLogFilePath)); } + @Test public void testGetWriterShowBeNotNull() throws Exception { try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { @@ -149,11 +155,12 @@ private FileWriter createCompressedWriter(MockedStatic mockedFileUtil, FileWriter createdWriter = mRegistry.getOrCreateWriter( mLogFilePathGz, new GzipCodec()); - assertTrue(createdWriter == writer); + assertSame(createdWriter, writer); return writer; } + @Test public void testGetOrCreateWriterCompressed() throws Exception { try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { @@ -186,6 +193,7 @@ public void testGetOrCreateWriterCompressed() throws Exception { } } + @Test public void testDeletePath() throws Exception { try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { @@ -201,6 +209,7 @@ public void testDeletePath() throws Exception { } } + @Test public void testDeleteTopicPartition() throws Exception { try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { @@ -216,6 +225,7 @@ public void testDeleteTopicPartition() throws Exception { } } + @Test public void testGetSize() throws Exception { try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { @@ -226,15 +236,24 @@ public void testGetSize() throws Exception { } } + @SuppressWarnings("unchecked") + @Test public void testGetModificationAgeSec() throws Exception { try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); - MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class); - MockedStatic mockedSystem = Mockito.mockStatic(System.class)) { + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { - mockedSystem.when(System::currentTimeMillis).thenReturn(10000L, 100000L); createWriter(mockedFileUtil, mockedReflectionUtil); - assertEquals(90, mRegistry.getModificationAgeSec(mTopicPartition)); + // Use reflection to set a known creation time (90 seconds ago) + Field creationTimesField = FileRegistry.class.getDeclaredField("mCreationTimes"); + creationTimesField.setAccessible(true); + HashMap creationTimes = (HashMap) creationTimesField.get(mRegistry); + long nowSec = System.currentTimeMillis() / 1000L; + creationTimes.put(mLogFilePath, nowSec - 90); + + long ageSec = mRegistry.getModificationAgeSec(mTopicPartition); + // Allow for small timing variance during test execution + assertTrue("Age should be approximately 90 seconds, was: " + ageSec, ageSec >= 89 && ageSec <= 91); } } } diff --git a/src/test/java/com/pinterest/secor/common/SecorSchemaRegistryClientTest.java b/src/test/java/com/pinterest/secor/common/SecorSchemaRegistryClientTest.java index 165f9d1fd..ca7223dd9 100644 --- a/src/test/java/com/pinterest/secor/common/SecorSchemaRegistryClientTest.java +++ b/src/test/java/com/pinterest/secor/common/SecorSchemaRegistryClientTest.java @@ -23,32 +23,28 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; -import junit.framework.TestCase; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.commons.lang3.StringUtils; -import org.junit.Rule; +import org.junit.Before; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.mockito.Mockito; import java.io.IOException; +import static org.junit.Assert.*; import static org.mockito.Mockito.when; -public class SecorSchemaRegistryClientTest extends TestCase { +public class SecorSchemaRegistryClientTest { private KafkaAvroDeserializer kafkaAvroDeserializer; private SchemaRegistryClient schemaRegistryClient; private SecorSchemaRegistryClient secorSchemaRegistryClient; private KafkaAvroSerializer avroSerializer; - @Rule - public ExpectedException exception = ExpectedException.none(); - - @Override + @Before public void setUp() { initKafka(); SecorConfig secorConfig = Mockito.mock(SecorConfig.class); @@ -116,8 +112,10 @@ public void testGetSchema() throws IOException, RestClientException { @Test public void testGetSchemaDoesNotExist() { - exception.expect(IllegalStateException.class); - exception.expectMessage("Avro schema not found for topic test-avr-topic-3"); - secorSchemaRegistryClient.getSchema("test-avr-topic-3"); + try { + secorSchemaRegistryClient.getSchema("test-avr-topic-3"); + } catch (IllegalStateException e) { + assertEquals("Unable to get Avro schema not found for topic test-avr-topic-3", e.getMessage()); + } } } \ No newline at end of file diff --git a/src/test/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactoryTest.java index 5d4be2ad3..60782dff4 100644 --- a/src/test/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactoryTest.java @@ -26,21 +26,21 @@ import com.pinterest.secor.io.FileWriter; import com.pinterest.secor.io.KeyValue; import com.pinterest.secor.util.AvroSerializer; -import junit.framework.TestCase; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.specific.SpecificDatumWriter; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -public class AvroFileReaderWriterFactoryTest extends TestCase { +public class AvroFileReaderWriterFactoryTest { private AvroFileReaderWriterFactory mFactory; private SpecificDatumWriter writer; @@ -49,7 +49,7 @@ public class AvroFileReaderWriterFactoryTest extends TestCase { private GenericRecord msg1; private GenericRecord msg2; - @Override + @Before public void setUp() throws Exception { Schema schema = SchemaBuilder.record("UnitTestRecord") diff --git a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java index 634ad1980..26bee74eb 100644 --- a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java @@ -20,7 +20,7 @@ import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.message.Message; -import junit.framework.TestCase; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -29,7 +29,9 @@ import java.util.List; import java.util.TimeZone; -public class JsonMessageParserTest extends TestCase { +import static org.junit.Assert.*; + +public class JsonMessageParserTest { private SecorConfig mConfig; private Message mMessageWithSecondsTimestamp; @@ -39,7 +41,7 @@ public class JsonMessageParserTest extends TestCase { private Message mMessageWithNestedTimestamp; private long timestamp; - @Override + @Before public void setUp() throws Exception { mConfig = Mockito.mock(SecorConfig.class); Mockito.when(mConfig.getMessageTimestampName()).thenReturn("timestamp"); diff --git a/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java b/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java index 06406effb..7ec6ce5b6 100644 --- a/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.message.Message; -import junit.framework.TestCase; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.msgpack.jackson.dataformat.MessagePackFactory; @@ -29,7 +29,9 @@ import java.util.HashMap; import java.util.TimeZone; -public class MessagePackParserTest extends TestCase { +import static org.junit.Assert.*; + +public class MessagePackParserTest { SecorConfig mConfig; private MessagePackParser mMessagePackParser; @@ -40,7 +42,7 @@ public class MessagePackParserTest extends TestCase { private ObjectMapper mObjectMapper; private long timestamp; - @Override + @Before public void setUp() throws Exception { mConfig = Mockito.mock(SecorConfig.class); Mockito.when(mConfig.getMessageTimestampName()).thenReturn("ts"); diff --git a/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java index 61960e23c..f5f463dbf 100644 --- a/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java @@ -21,18 +21,20 @@ import com.pinterest.secor.common.*; import com.pinterest.secor.message.Message; -import junit.framework.TestCase; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -public class RegexMessageParserTest extends TestCase { +import static org.junit.Assert.*; + +public class RegexMessageParserTest { private SecorConfig mConfig; private Message mMessageWithMillisTimestamp; private Message mMessageWithWrongFormatTimestamp; private long timestamp; - @Override + @Before public void setUp() throws Exception { mConfig = Mockito.mock(SecorConfig.class); Mockito.when(mConfig.getMessageTimestampInputPattern()).thenReturn("^[^ ]+ [^ ]+ ([^ ]+) .*$"); @@ -81,7 +83,7 @@ public void testExtractTimestampMillisEmpty() throws Exception { @Test(expected=NumberFormatException.class) public void testExtractTimestampMillisException1() throws Exception { RegexMessageParser regexMessageParser = new RegexMessageParser(mConfig); - regexMessageParser.extractTimestampMillis(mMessageWithWrongFormatTimestamp); + regexMessageParser.extractTimestampMillis(mMessageWithWrongFormatTimestamp); } } diff --git a/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java index ca841e834..7a1589223 100644 --- a/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java @@ -20,9 +20,9 @@ import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.message.Message; -import junit.framework.TestCase; import net.minidev.json.JSONObject; import net.minidev.json.JSONValue; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -30,7 +30,9 @@ import java.util.List; import java.util.TimeZone; -public class SplitByFieldMessageParserTest extends TestCase { +import static org.junit.Assert.*; + +public class SplitByFieldMessageParserTest { private SecorConfig mConfig; private Message mMessageWithTypeAndTimestamp; @@ -38,7 +40,7 @@ public class SplitByFieldMessageParserTest extends TestCase { private Message mMessageWithoutType; private long timestamp; - @Override + @Before public void setUp() throws Exception { mConfig = Mockito.mock(SecorConfig.class); Mockito.when(mConfig.getMessageSplitFieldName()).thenReturn("type"); @@ -83,7 +85,6 @@ public void testExtractTypeAndTimestamp() throws Exception { public void testExtractTimestampMillisExceptionNoTimestamp() throws Exception { SplitByFieldMessageParser jsonMessageParser = new SplitByFieldMessageParser(mConfig); - // Throws exception if there's no timestamp, for any reason. jsonMessageParser.extractTimestampMillis((JSONObject) JSONValue.parse(mMessageWithoutTimestamp.getPayload())); } @@ -107,7 +108,6 @@ public void testExtractTimestampMillisException2() throws Exception { public void testExtractTimestampMillisExceptionNoType() throws Exception { SplitByFieldMessageParser jsonMessageParser = new SplitByFieldMessageParser(mConfig); - // Throws exception if there's no timestamp, for any reason. jsonMessageParser.extractEventType((JSONObject) JSONValue.parse(mMessageWithoutType.getPayload())); } diff --git a/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java index 905bf367f..21b7e1d44 100644 --- a/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java @@ -20,19 +20,21 @@ import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.message.Message; -import junit.framework.TestCase; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TBinaryProtocol; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import com.pinterest.secor.thrift.UnitTestMessage; -public class ThriftMessageParserTest extends TestCase { +import static org.junit.Assert.*; + +public class ThriftMessageParserTest { private SecorConfig mConfig; private long timestamp; - @Override + @Before public void setUp() throws Exception { mConfig = Mockito.mock(SecorConfig.class); Mockito.when(TimestampedMessageParser.usingDateFormat(mConfig)).thenReturn("yyyy-MM-dd"); diff --git a/src/test/java/com/pinterest/secor/uploader/UploaderTest.java b/src/test/java/com/pinterest/secor/uploader/UploaderTest.java index a6ea1b120..fe2f07e26 100644 --- a/src/test/java/com/pinterest/secor/uploader/UploaderTest.java +++ b/src/test/java/com/pinterest/secor/uploader/UploaderTest.java @@ -18,13 +18,7 @@ */ package com.pinterest.secor.uploader; -import com.pinterest.secor.common.FileRegistry; -import com.pinterest.secor.common.LogFilePath; -import com.pinterest.secor.common.OffsetTracker; -import com.pinterest.secor.common.SecorConfig; -import com.pinterest.secor.common.SecorConstants; -import com.pinterest.secor.common.TopicPartition; -import com.pinterest.secor.common.ZookeeperConnector; +import com.pinterest.secor.common.*; import com.pinterest.secor.io.FileReader; import com.pinterest.secor.io.FileWriter; import com.pinterest.secor.io.KeyValue; @@ -32,11 +26,11 @@ import com.pinterest.secor.reader.MessageReader; import com.pinterest.secor.util.FileUtil; import com.pinterest.secor.util.IdUtil; -import junit.framework.TestCase; import org.apache.hadoop.io.compress.CompressionCodec; import org.joda.time.DateTime; +import org.junit.Before; +import org.junit.Test; import org.mockito.MockedStatic; -import static org.mockito.Mockito.doNothing; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -49,7 +43,7 @@ * * @author Pawel Garbacki (pawel@pinterest.com) */ -public class UploaderTest extends TestCase { +public class UploaderTest { private static class TestUploader extends Uploader { private FileReader mReader; @@ -87,9 +81,8 @@ public FileReader getReader() { private TestUploader mUploader; - @Override + @Before public void setUp() throws Exception { - super.setUp(); mTopicPartition = new TopicPartition("some_topic", 0); mLogFilePath = new LogFilePath("/some_parent_dir", @@ -111,13 +104,15 @@ public void setUp() throws Exception { Mockito.when(mFileRegistry.getTopicPartitions()).thenReturn( topicPartitions); - mUploadManager = new HadoopS3UploadManager(mConfig); + mUploadManager = Mockito.mock(UploadManager.class); mZookeeperConnector = Mockito.mock(ZookeeperConnector.class); mUploader = new TestUploader(mConfig, mOffsetTracker, mFileRegistry, mUploadManager, messageReader, mZookeeperConnector); } + @SuppressWarnings("unchecked") + @Test public void testUploadAtTime() throws Exception { try (MockedStatic mockedDateTime = Mockito.mockStatic(DateTime.class); MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class)) { @@ -141,15 +136,16 @@ public void testUploadAtTime() throws Exception { mockedFileUtil.when(() -> FileUtil.getPrefix("some_topic", mConfig)) .thenReturn("s3a://some_bucket/some_s3_parent_dir"); + // Mock upload to return a completed handle + Handle mockHandle = Mockito.mock(Handle.class); + Mockito.when(mockHandle.get()).thenReturn(null); + Mockito.doReturn(mockHandle).when(mUploadManager).upload(Mockito.any(LogFilePath.class)); + mUploader.applyPolicy(false); final String lockPath = "/secor/locks/some_topic/0"; Mockito.verify(mZookeeperConnector).lock(lockPath); - mockedFileUtil.verify(() -> FileUtil.moveToCloud( - "/some_parent_dir/some_topic/some_partition/some_other_partition/" - + "10_0_00000000000000000010", - "s3a://some_bucket/some_s3_parent_dir/some_topic/some_partition/" - + "some_other_partition/10_0_00000000000000000010")); + Mockito.verify(mUploadManager).upload(mLogFilePath); Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition); Mockito.verify(mZookeeperConnector).setCommittedOffsetCount( mTopicPartition, 1L); @@ -159,6 +155,8 @@ public void testUploadAtTime() throws Exception { } } + @SuppressWarnings("unchecked") + @Test public void testUploadFiles() throws Exception { try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class)) { @@ -190,15 +188,17 @@ public void testUploadFiles() throws Exception { mockedFileUtil.when(() -> FileUtil.getPrefix("some_topic", mConfig)) .thenReturn("s3a://some_bucket/some_s3_parent_dir"); + + // Mock upload to return a completed handle + Handle mockHandle = Mockito.mock(Handle.class); + Mockito.when(mockHandle.get()).thenReturn(null); + Mockito.doReturn(mockHandle).when(mUploadManager).upload(Mockito.any(LogFilePath.class)); + mUploader.applyPolicy(false); final String lockPath = "/secor/locks/some_topic/0"; Mockito.verify(mZookeeperConnector).lock(lockPath); - mockedFileUtil.verify(() -> FileUtil.moveToCloud( - "/some_parent_dir/some_topic/some_partition/some_other_partition/" - + "10_0_00000000000000000010", - "s3a://some_bucket/some_s3_parent_dir/some_topic/some_partition/" - + "some_other_partition/10_0_00000000000000000010")); + Mockito.verify(mUploadManager).upload(mLogFilePath); Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition); Mockito.verify(mZookeeperConnector).setCommittedOffsetCount( mTopicPartition, 21L); @@ -208,6 +208,7 @@ public void testUploadFiles() throws Exception { } } + @Test public void testDeleteTopicPartition() throws Exception { Mockito.when( mZookeeperConnector.getCommittedOffsetCount(mTopicPartition)) @@ -223,6 +224,7 @@ public void testDeleteTopicPartition() throws Exception { Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition); } + @Test public void testTrimFiles() throws Exception { try (MockedStatic mockedIdUtil = Mockito.mockStatic(IdUtil.class)) { From a92f4ef62592c76fe56bd45766a0d34c75d71161 Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Thu, 26 Feb 2026 11:47:37 +0200 Subject: [PATCH 10/14] jenv using java 17 --- .java-version | 1 + 1 file changed, 1 insertion(+) create mode 100644 .java-version diff --git a/.java-version b/.java-version new file mode 100644 index 000000000..98d9bcb75 --- /dev/null +++ b/.java-version @@ -0,0 +1 @@ +17 From ecd560e7190da2b3d39b2475e13dcf18e7298fc1 Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Thu, 26 Feb 2026 11:48:13 +0200 Subject: [PATCH 11/14] created an initial CLAUDE.md file --- CLAUDE.md | 120 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 000000000..c7c21c502 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,120 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +Secor is a Pinterest service that persists Kafka logs to cloud storage (Amazon S3, Google Cloud Storage, Microsoft Azure Blob Storage, OpenStack Swift). It provides strong consistency guarantees, fault tolerance, and horizontal scalability for log persistence. + +## Build Commands + +```bash +# Build (uses kafka-0.10.2.0 profile by default) +mvn package + +# Build with specific Kafka version +mvn -Pkafka-2.0.0 package # Kafka 2.0.0 (enables Kafka headers support) + +# Run unit tests +mvn test +# Or via Makefile: +make unit + +# Run integration tests (requires Docker) +make integration + +# Full test suite (build + unit + integration) +make test + +# Test with specific Kafka version +MVN_PROFILE=kafka-2.0.0 make test + +# Clean +make clean +``` + +## Architecture + +### Consumer Groups + +Secor uses two primary consumer group patterns: +- **Backup Group**: Persists all messages verbatim without parsing. Simple, performant, provides raw data backup for reprocessing. +- **Partition Group**: Parses messages to extract timestamps/partitions, groups output by date for Hive consumption. + +### Core Components (src/main/java/com/pinterest/secor/) + +| Package | Purpose | +|---------|---------| +| `consumer/` | Main Consumer thread coordinating read/write/upload cycle | +| `reader/` | MessageReader implementations for Kafka message consumption | +| `parser/` | MessageParser implementations for extracting partitions from messages | +| `writer/` | MessageWriter implementations for local file writing | +| `uploader/` | Uploaders for cloud storage (S3, GCS, Azure, Swift) | +| `common/` | SecorConfig, FileRegistry, OffsetTracker | +| `io/impl/` | File format implementations (Sequence, Parquet, ORC, DelimitedText) | + +### Entry Points (src/main/java/com/pinterest/secor/main/) + +- `ConsumerMain` - Primary Secor consumer service +- `LogFilePrinterMain` - Display stored log file contents +- `LogFileVerifierMain` - Verify log file consistency +- `PartitionFinalizerMain` - Write _SUCCESS markers, optionally register with Hive +- `ProgressMonitorMain` - Export offset consumption lags to monitoring + +### Data Flow + +1. MessageReader reads from Kafka +2. MessageParser extracts partitions/timestamps +3. MessageWriter writes to local files +4. Uploader moves files to cloud storage when upload policy triggers (size or time threshold) +5. Offsets committed to ZooKeeper after successful upload + +### Offset Management + +Secor tracks two offset types per topic/partition: +- `last_seen_offset` - Greatest offset seen but not committed +- `last_committed_offset` - Greatest offset persisted to cloud storage + +ZooKeeper locking prevents race conditions during uploads. On rebalance, local files are cleaned up to maintain exactly-once semantics. + +## Configuration + +Main config: `src/main/config/secor.common.properties` + +Key properties: +- `secor.message.parser.class` - Parser implementation class +- `secor.file.reader.writer.factory` - Output format factory +- `cloud.service` - Cloud provider (S3, GS, Swift, Azure) +- `secor.max.file.size.bytes` - Size-based upload threshold +- `secor.max.file.age.seconds` - Time-based upload threshold + +## Extending Secor + +### Custom Message Parser + +Extend `MessageParser` or `TimestampedMessageParser`: +```java +public class MyParser extends TimestampedMessageParser { + public long extractTimestampMillis(Message message) { ... } +} +``` +Set via `secor.message.parser.class=com.example.MyParser` + +### Output Formats + +Set `secor.file.reader.writer.factory` to: +- `SequenceFileReaderWriterFactory` - Binary key-value (default) +- `DelimitedTextFileReaderWriterFactory` - Line-delimited text +- `JsonORCFileReaderWriterFactory` - ORC columnar +- `ProtobufParquetFileReaderWriterFactory` - Parquet for protobuf +- `ThriftParquetFileReaderWriterFactory` - Parquet for Thrift +- `AvroParquetFileReaderWriterFactory` - Parquet for Avro + +## Testing + +Test schemas located in: +- `src/test/thrift/` - Thrift definitions +- `src/test/protobuf/` - Protocol buffer definitions +- `src/test/avro/` - Avro schemas + +Integration tests use Docker with local S3 (fakes3) and Kafka brokers. \ No newline at end of file From dfd596d8492f0c52a90ae263299f8306c4e6c6b2 Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Thu, 26 Feb 2026 14:01:01 +0200 Subject: [PATCH 12/14] changed the default profile to be kafka-2.0.0 --- Makefile | 2 +- pom.xml | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index fc89e4160..2f5e8a07e 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ CONFIG=src/main/config TEST_HOME=/tmp/secor_test TEST_CONFIG=src/test/config JAR_FILE=target/secor-*-SNAPSHOT-bin.tar.gz -MVN_PROFILE?=kafka-0.10.2.0 +MVN_PROFILE?=kafka-2.0.0 MVN_OPTS=-Dmaven.javadoc.skip=true -P $(MVN_PROFILE) -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn MVN_SKIP_TESTS=-DskipTests=true diff --git a/pom.xml b/pom.xml index d0c79af83..245fe5764 100644 --- a/pom.xml +++ b/pom.xml @@ -635,7 +635,7 @@ com.fasterxml.jackson.module - jackson-module-scala_2.11 + jackson-module-scala_2.12 2.9.9 @@ -677,11 +677,11 @@ org.apache.kafka kafka-clients - 2.3.1 + 2.8.2 com.twitter - ostrich_2.11 + ostrich_2.12 9.27.0 @@ -689,6 +689,9 @@ kafka-2.0.0 + + true + kafka-2.0.0 @@ -696,7 +699,7 @@ com.fasterxml.jackson.module - jackson-module-scala_2.11 + jackson-module-scala_2.12 2.9.9 @@ -728,7 +731,7 @@ com.twitter - ostrich_2.11 + ostrich_2.12 9.27.0 From 5bbffe150b8854988ecefc1e08aae1c99a496203 Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Mon, 13 Apr 2026 22:49:03 +0300 Subject: [PATCH 13/14] fix: prevent stale ZK lock entries from crashing consumer thread Move mLocks.put() after successful acquire() and mLocks.remove() before release() to prevent stale entries that cause AssertionError during Kafka rebalance. Co-Authored-By: Claude Opus 4.6 --- .../java/com/pinterest/secor/common/ZookeeperConnector.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/pinterest/secor/common/ZookeeperConnector.java b/src/main/java/com/pinterest/secor/common/ZookeeperConnector.java index 9b661d891..5c9f9b127 100644 --- a/src/main/java/com/pinterest/secor/common/ZookeeperConnector.java +++ b/src/main/java/com/pinterest/secor/common/ZookeeperConnector.java @@ -97,23 +97,22 @@ private Iterable getZookeeperAddresses() { public void lock(String lockPath) { assert mLocks.get(lockPath) == null: "mLocks.get(" + lockPath + ") == null"; InterProcessMutex distributedLock = new InterProcessMutex(mCurator, lockPath); - mLocks.put(lockPath, distributedLock); try { distributedLock.acquire(); + mLocks.put(lockPath, distributedLock); } catch (Exception ex) { throw new RuntimeException("Unexpected ZK error", ex); } } public void unlock(String lockPath) { - InterProcessMutex distributedLock = mLocks.get(lockPath); + InterProcessMutex distributedLock = mLocks.remove(lockPath); assert distributedLock != null: "mLocks.get(" + lockPath + ") != null"; try { distributedLock.release(); } catch (Exception ex) { throw new RuntimeException("Unexpected ZK error", ex); } - mLocks.remove(lockPath); } protected String getCommittedOffsetGroupPath() { From 2bb913eb465b80b692e87f29efaf2449ac901402 Mon Sep 17 00:00:00 2001 From: ColonelLanda Date: Thu, 23 Apr 2026 16:39:18 +0300 Subject: [PATCH 14/14] upgrade curator-client and curator-framework from 2.9.0 to 5.9.0 Co-Authored-By: Claude Opus 4.6 (1M context) --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 245fe5764..4e33bfd76 100644 --- a/pom.xml +++ b/pom.xml @@ -287,12 +287,12 @@ org.apache.curator curator-client - 2.9.0 + 5.9.0 org.apache.curator curator-framework - 2.9.0 + 5.9.0 com.google.guava