diff --git a/.java-version b/.java-version new file mode 100644 index 000000000..98d9bcb75 --- /dev/null +++ b/.java-version @@ -0,0 +1 @@ +17 diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 000000000..c7c21c502 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,120 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +Secor is a Pinterest service that persists Kafka logs to cloud storage (Amazon S3, Google Cloud Storage, Microsoft Azure Blob Storage, OpenStack Swift). It provides strong consistency guarantees, fault tolerance, and horizontal scalability for log persistence. + +## Build Commands + +```bash +# Build (uses kafka-0.10.2.0 profile by default) +mvn package + +# Build with specific Kafka version +mvn -Pkafka-2.0.0 package # Kafka 2.0.0 (enables Kafka headers support) + +# Run unit tests +mvn test +# Or via Makefile: +make unit + +# Run integration tests (requires Docker) +make integration + +# Full test suite (build + unit + integration) +make test + +# Test with specific Kafka version +MVN_PROFILE=kafka-2.0.0 make test + +# Clean +make clean +``` + +## Architecture + +### Consumer Groups + +Secor uses two primary consumer group patterns: +- **Backup Group**: Persists all messages verbatim without parsing. Simple, performant, provides raw data backup for reprocessing. +- **Partition Group**: Parses messages to extract timestamps/partitions, groups output by date for Hive consumption. + +### Core Components (src/main/java/com/pinterest/secor/) + +| Package | Purpose | +|---------|---------| +| `consumer/` | Main Consumer thread coordinating read/write/upload cycle | +| `reader/` | MessageReader implementations for Kafka message consumption | +| `parser/` | MessageParser implementations for extracting partitions from messages | +| `writer/` | MessageWriter implementations for local file writing | +| `uploader/` | Uploaders for cloud storage (S3, GCS, Azure, Swift) | +| `common/` | SecorConfig, FileRegistry, OffsetTracker | +| `io/impl/` | File format implementations (Sequence, Parquet, ORC, DelimitedText) | + +### Entry Points (src/main/java/com/pinterest/secor/main/) + +- `ConsumerMain` - Primary Secor consumer service +- `LogFilePrinterMain` - Display stored log file contents +- `LogFileVerifierMain` - Verify log file consistency +- `PartitionFinalizerMain` - Write _SUCCESS markers, optionally register with Hive +- `ProgressMonitorMain` - Export offset consumption lags to monitoring + +### Data Flow + +1. MessageReader reads from Kafka +2. MessageParser extracts partitions/timestamps +3. MessageWriter writes to local files +4. Uploader moves files to cloud storage when upload policy triggers (size or time threshold) +5. Offsets committed to ZooKeeper after successful upload + +### Offset Management + +Secor tracks two offset types per topic/partition: +- `last_seen_offset` - Greatest offset seen but not committed +- `last_committed_offset` - Greatest offset persisted to cloud storage + +ZooKeeper locking prevents race conditions during uploads. On rebalance, local files are cleaned up to maintain exactly-once semantics. + +## Configuration + +Main config: `src/main/config/secor.common.properties` + +Key properties: +- `secor.message.parser.class` - Parser implementation class +- `secor.file.reader.writer.factory` - Output format factory +- `cloud.service` - Cloud provider (S3, GS, Swift, Azure) +- `secor.max.file.size.bytes` - Size-based upload threshold +- `secor.max.file.age.seconds` - Time-based upload threshold + +## Extending Secor + +### Custom Message Parser + +Extend `MessageParser` or `TimestampedMessageParser`: +```java +public class MyParser extends TimestampedMessageParser { + public long extractTimestampMillis(Message message) { ... } +} +``` +Set via `secor.message.parser.class=com.example.MyParser` + +### Output Formats + +Set `secor.file.reader.writer.factory` to: +- `SequenceFileReaderWriterFactory` - Binary key-value (default) +- `DelimitedTextFileReaderWriterFactory` - Line-delimited text +- `JsonORCFileReaderWriterFactory` - ORC columnar +- `ProtobufParquetFileReaderWriterFactory` - Parquet for protobuf +- `ThriftParquetFileReaderWriterFactory` - Parquet for Thrift +- `AvroParquetFileReaderWriterFactory` - Parquet for Avro + +## Testing + +Test schemas located in: +- `src/test/thrift/` - Thrift definitions +- `src/test/protobuf/` - Protocol buffer definitions +- `src/test/avro/` - Avro schemas + +Integration tests use Docker with local S3 (fakes3) and Kafka brokers. \ No newline at end of file diff --git a/Makefile b/Makefile index fc89e4160..2f5e8a07e 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ CONFIG=src/main/config TEST_HOME=/tmp/secor_test TEST_CONFIG=src/test/config JAR_FILE=target/secor-*-SNAPSHOT-bin.tar.gz -MVN_PROFILE?=kafka-0.10.2.0 +MVN_PROFILE?=kafka-2.0.0 MVN_OPTS=-Dmaven.javadoc.skip=true -P $(MVN_PROFILE) -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn MVN_SKIP_TESTS=-DskipTests=true diff --git a/pom.xml b/pom.xml index 7cd31b12d..4e33bfd76 100644 --- a/pom.xml +++ b/pom.xml @@ -47,11 +47,13 @@ - 1.6 - 1.6 + 17 + 17 UTF-8 UTF-8 - 1.9.0 + 1.17.0 + 3.3.6 + 4.29.3 kafka-legacy @@ -89,37 +91,18 @@ - - Twitter public Maven repo - https://maven.twttr.com - - - Typesafe public Maven repo - https://repo.typesafe.com/typesafe/releases - - - confluent - https://packages.confluent.io/maven/ - central Maven Repository Switchboard default https://repo1.maven.org/maven2/ + + confluent + https://packages.confluent.io/maven/ + - - - Twitter public Maven repo - https://maven.twttr.com - - - Typesafe public Maven repo - https://repo.typesafe.com/typesafe/releases - - - io.confluent @@ -134,17 +117,17 @@ org.apache.parquet parquet-avro - 1.9.0 + ${parquet.version} com.google.protobuf protobuf-java - 3.11.1 + ${protobuf.version} com.google.protobuf protobuf-java-util - 3.11.1 + ${protobuf.version} com.amazonaws @@ -204,22 +187,22 @@ org.apache.hadoop hadoop-common - 2.9.2 + ${hadoop.version} org.apache.hadoop hadoop-hdfs-client - 2.9.2 + ${hadoop.version} org.apache.hadoop hadoop-mapreduce-client-core - 2.9.2 + ${hadoop.version} org.apache.hadoop hadoop-aws - 2.9.2 + ${hadoop.version} net.java.dev.jets3t @@ -250,7 +233,7 @@ org.apache.hadoop hadoop-openstack - 2.9.2 + ${hadoop.version} @@ -304,12 +287,12 @@ org.apache.curator curator-client - 2.9.0 + 5.9.0 org.apache.curator curator-framework - 2.9.0 + 5.9.0 com.google.guava @@ -324,24 +307,19 @@ junit junit - 4.11 + 4.13.2 test org.mockito - mockito-all - 1.9.5 - - - org.powermock - powermock-module-junit4 - 1.5.2 + mockito-core + 5.11.0 test - org.powermock - powermock-api-mockito - 1.5.2 + org.mockito + mockito-inline + 5.2.0 test @@ -377,11 +355,23 @@ org.apache.orc orc-core - 1.6.2 + 2.2.2 + + + org.apache.hive + hive-storage-api + 2.8.1 + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + src/main/config @@ -475,32 +465,23 @@ - com.github.os72 - protoc-jar-maven-plugin - 3.11.1 + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + protobuf-test-sources generate-test-sources - run + test-compile - 3.1.0 - - src/test/protobuf - - - src/test/protobuf - - true - - - java - none - target/generated-test-sources/protobuf/gen-java - - + src/test/protobuf + target/generated-test-sources/protobuf/gen-java + false @@ -592,9 +573,10 @@ org.apache.maven.plugins maven-compiler-plugin + 3.11.0 - 1.8 - 1.8 + 17 + 17 @@ -625,9 +607,19 @@ org.apache.maven.plugins maven-surefire-plugin - 2.22.2 + 3.2.5 false + + --add-opens java.base/java.lang=ALL-UNNAMED + --add-opens java.base/java.lang.reflect=ALL-UNNAMED + --add-opens java.base/java.util=ALL-UNNAMED + --add-opens java.base/java.io=ALL-UNNAMED + --add-opens java.base/java.text=ALL-UNNAMED + --add-opens java.base/java.net=ALL-UNNAMED + --add-opens java.base/sun.nio.ch=ALL-UNNAMED + --add-opens java.base/sun.security.ssl=ALL-UNNAMED + @@ -636,168 +628,70 @@ release - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - - com/pinterest/secor/common/SecorKafkaClient.java - com/pinterest/secor/reader/SecorKafkaMessageIterator.java - - - - - org.apache.maven.plugins - maven-source-plugin - 3.2.1 - - - attach-sources - - jar-no-fork - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - 3.1.0 - - - attach-javadocs - - jar - - - - - - org.apache.maven.plugins - maven-gpg-plugin - 1.6 - - - sign-artifacts - verify - - sign - - - - - - + + kafka-2.0.0 + com.fasterxml.jackson.module - jackson-module-scala_2.10 - 2.10.2 + jackson-module-scala_2.12 + 2.9.9 - - - org.apache.kafka - kafka_2.10 - 0.10.2.2 - - - org.slf4j - slf4j-simple - - - - - com.twitter - ostrich_2.10 - 9.18.0 - - - - - kafka-0.8.2.1 org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - - com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java - com/pinterest/secor/common/SecorKafkaClient.java - com/pinterest/secor/reader/SecorKafkaMessageIterator.java - - + maven-gpg-plugin + 3.0.1 + + + sign-artifacts + verify + + sign + + + - - - - - org.apache.kafka - kafka_2.10 - 0.10.2.2 - - - org.slf4j - slf4j-simple - - - - - com.twitter - ostrich_2.10 - 9.18.0 - - - - - kafka-1.0.0 - - org.apache.maven.plugins maven-compiler-plugin 3.8.1 - com/pinterest/secor/common/SecorKafkaClient.java - com/pinterest/secor/reader/SecorKafkaMessageIterator.java + com/pinterest/secor/timestamp/* + com/pinterest/secor/reader/LegacyKafkaMessageIterator.java + com/pinterest/secor/common/LegacyKafkaClient.java + + com/pinterest/secor/timestamp/* + - - - - com.fasterxml.jackson.module - jackson-module-scala_2.11 - 2.9.9 - - - org.apache.kafka - kafka_2.11 - 1.0.0 + kafka-clients + 2.8.2 com.twitter - ostrich_2.11 + ostrich_2.12 9.27.0 - + + kafka-2.0.0 + + true + kafka-2.0.0 @@ -805,7 +699,7 @@ com.fasterxml.jackson.module - jackson-module-scala_2.11 + jackson-module-scala_2.12 2.9.9 @@ -837,103 +731,10 @@ com.twitter - ostrich_2.11 + ostrich_2.12 9.27.0 - - kafka-0.10.2.0 - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - - com/pinterest/secor/common/SecorKafkaClient.java - com/pinterest/secor/reader/SecorKafkaMessageIterator.java - - - - - - - true - - - - - com.fasterxml.jackson.module - jackson-module-scala_2.10 - 2.10.2 - - - - - - org.apache.kafka - kafka_2.10 - 0.10.2.2 - - - org.slf4j - slf4j-simple - - - - - com.twitter - ostrich_2.10 - 9.18.0 - - - - - kafka-0.10.0.1 - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - - com/pinterest/secor/common/SecorKafkaClient.java - com/pinterest/secor/reader/SecorKafkaMessageIterator.java - - - - - - - - - com.fasterxml.jackson.module - jackson-module-scala_2.10 - 2.10.2 - - - - - - org.apache.kafka - kafka_2.10 - 0.10.2.2 - - - org.slf4j - slf4j-simple - - - - - com.twitter - ostrich_2.10 - 9.18.0 - - - diff --git a/src/main/java/com/pinterest/secor/common/LegacyKafkaClient.java b/src/main/java/com/pinterest/secor/common/LegacyKafkaClient.java deleted file mode 100644 index 01d91741b..000000000 --- a/src/main/java/com/pinterest/secor/common/LegacyKafkaClient.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.common; - -import com.google.common.net.HostAndPort; -import com.pinterest.secor.message.Message; -import com.pinterest.secor.timestamp.KafkaMessageTimestampFactory; -import kafka.api.FetchRequestBuilder; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetRequest; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.PartitionMetadata; -import kafka.javaapi.TopicMetadata; -import kafka.javaapi.TopicMetadataRequest; -import kafka.javaapi.TopicMetadataResponse; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.message.MessageAndOffset; -import org.apache.kafka.common.protocol.Errors; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Kafka client encapsulates the logic interacting with Kafka brokers. - * - * @author Pawel Garbacki (pawel@pinterest.com) - */ -@Deprecated -public class LegacyKafkaClient implements KafkaClient { - private static final Logger LOG = LoggerFactory.getLogger(LegacyKafkaClient.class); - - private SecorConfig mConfig; - private ZookeeperConnector mZookeeperConnector; - private KafkaMessageTimestampFactory mKafkaMessageTimestampFactory; - - public LegacyKafkaClient() { - } - - public class MessageDoesNotExistException extends RuntimeException {} - - private HostAndPort findLeader(TopicPartition topicPartition) { - SimpleConsumer consumer = null; - try { - LOG.debug("looking up leader for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition()); - consumer = createConsumer( - mConfig.getKafkaSeedBrokerHost(), - mConfig.getKafkaSeedBrokerPort(), - "leaderLookup"); - List topics = new ArrayList(); - topics.add(topicPartition.getTopic()); - TopicMetadataRequest request = new TopicMetadataRequest(topics); - TopicMetadataResponse response = consumer.send(request); - - List metaData = response.topicsMetadata(); - for (TopicMetadata item : metaData) { - for (PartitionMetadata part : item.partitionsMetadata()) { - if (part.partitionId() == topicPartition.getPartition()) { - return HostAndPort.fromParts(part.leader().host(), part.leader().port()); - } - } - } - } finally { - if (consumer != null) { - consumer.close(); - } - } - return null; - } - - private static String getClientName(TopicPartition topicPartition) { - return "secorClient_" + topicPartition.getTopic() + "_" + topicPartition.getPartition(); - } - - private long findLastOffset(TopicPartition topicPartition, SimpleConsumer consumer) { - TopicAndPartition topicAndPartition = new TopicAndPartition(topicPartition.getTopic(), - topicPartition.getPartition()); - Map requestInfo = - new HashMap(); - requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo( - kafka.api.OffsetRequest.LatestTime(), 1)); - final String clientName = getClientName(topicPartition); - OffsetRequest request = new OffsetRequest(requestInfo, - kafka.api.OffsetRequest.CurrentVersion(), - clientName); - OffsetResponse response = consumer.getOffsetsBefore(request); - - if (response.hasError()) { - throw new RuntimeException("Error fetching offset data. Reason: " + - response.errorCode(topicPartition.getTopic(), topicPartition.getPartition())); - } - long[] offsets = response.offsets(topicPartition.getTopic(), - topicPartition.getPartition()); - return offsets[0] - 1; - } - - private Message getMessage(TopicPartition topicPartition, long offset, - SimpleConsumer consumer) { - LOG.debug("fetching message topic {} partition {} offset {}", - topicPartition.getTopic(), topicPartition.getPartition(), offset); - final int MAX_MESSAGE_SIZE_BYTES = mConfig.getMaxMessageSizeBytes(); - final String clientName = getClientName(topicPartition); - kafka.api.FetchRequest request = new FetchRequestBuilder().clientId(clientName) - .addFetch(topicPartition.getTopic(), topicPartition.getPartition(), offset, - MAX_MESSAGE_SIZE_BYTES) - .build(); - FetchResponse response = consumer.fetch(request); - if (response.hasError()) { - consumer.close(); - int errorCode = response.errorCode(topicPartition.getTopic(), topicPartition.getPartition()); - - if (errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) { - throw new MessageDoesNotExistException(); - } else { - throw new RuntimeException("Error fetching offset data. Reason: " + errorCode); - } - } - MessageAndOffset messageAndOffset = response.messageSet( - topicPartition.getTopic(), topicPartition.getPartition()).iterator().next(); - byte[] keyBytes = null; - if (messageAndOffset.message().hasKey()) { - ByteBuffer key = messageAndOffset.message().key(); - keyBytes = new byte[key.limit()]; - key.get(keyBytes); - } - byte[] payloadBytes = null; - if (!messageAndOffset.message().isNull()) { - ByteBuffer payload = messageAndOffset.message().payload(); - payloadBytes = new byte[payload.limit()]; - payload.get(payloadBytes); - } - long timestamp = (mConfig.useKafkaTimestamp()) - ? mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(messageAndOffset) - : 0l; - - return new Message(topicPartition.getTopic(), topicPartition.getPartition(), - messageAndOffset.offset(), keyBytes, payloadBytes, timestamp, null); - } - - private SimpleConsumer createConsumer(String host, int port, String clientName) { - return new SimpleConsumer(host, port, 100000, 64 * 1024, clientName); - } - - private SimpleConsumer createConsumer(TopicPartition topicPartition) { - HostAndPort leader = findLeader(topicPartition); - if (leader == null) { - LOG.warn("no leader for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition()); - return null; - } - LOG.debug("leader for topic {} partition {} is {}", topicPartition.getTopic(), topicPartition.getPartition(), leader); - final String clientName = getClientName(topicPartition); - return createConsumer(leader.getHostText(), leader.getPort(), clientName); - } - - @Override - public int getNumPartitions(String topic) { - SimpleConsumer consumer = null; - try { - consumer = createConsumer( - mConfig.getKafkaSeedBrokerHost(), - mConfig.getKafkaSeedBrokerPort(), - "partitionLookup"); - List topics = new ArrayList(); - topics.add(topic); - TopicMetadataRequest request = new TopicMetadataRequest(topics); - TopicMetadataResponse response = consumer.send(request); - if (response.topicsMetadata().size() != 1) { - throw new RuntimeException("Expected one metadata for topic " + topic + " found " + - response.topicsMetadata().size()); - } - TopicMetadata topicMetadata = response.topicsMetadata().get(0); - return topicMetadata.partitionsMetadata().size(); - } finally { - if (consumer != null) { - consumer.close(); - } - } - } - - @Override - public Message getLastMessage(TopicPartition topicPartition) throws TException { - SimpleConsumer consumer = null; - try { - consumer = createConsumer(topicPartition); - if (consumer == null) { - return null; - } - long lastOffset = findLastOffset(topicPartition, consumer); - if (lastOffset < 1) { - return null; - } - return getMessage(topicPartition, lastOffset, consumer); - } finally { - if (consumer != null) { - consumer.close(); - } - } - } - - @Override - public Message getCommittedMessage(TopicPartition topicPartition) throws Exception { - SimpleConsumer consumer = null; - try { - long committedOffset = mZookeeperConnector.getCommittedOffsetCount(topicPartition) - 1; - if (committedOffset < 0) { - return null; - } - consumer = createConsumer(topicPartition); - if (consumer == null) { - return null; - } - return getMessage(topicPartition, committedOffset, consumer); - } catch (MessageDoesNotExistException e) { - // If a MessageDoesNotExistException exception is raised, - // the message at the last committed offset does not exist in Kafka. - // This is usually due to the message being compacted away by the - // Kafka log compaction process. - // - // That is no an exceptional situation - in fact it can be normal if - // the topic being consumed by Secor has a low volume. So in that - // case, simply return null - LOG.warn("no committed message for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition()); - return null; - } finally { - if (consumer != null) { - consumer.close(); - } - } - } - - @Override - public void init(SecorConfig config) { - mConfig = config; - mZookeeperConnector = new ZookeeperConnector(mConfig); - mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass()); - } -} diff --git a/src/main/java/com/pinterest/secor/common/ZookeeperConnector.java b/src/main/java/com/pinterest/secor/common/ZookeeperConnector.java index 9b661d891..5c9f9b127 100644 --- a/src/main/java/com/pinterest/secor/common/ZookeeperConnector.java +++ b/src/main/java/com/pinterest/secor/common/ZookeeperConnector.java @@ -97,23 +97,22 @@ private Iterable getZookeeperAddresses() { public void lock(String lockPath) { assert mLocks.get(lockPath) == null: "mLocks.get(" + lockPath + ") == null"; InterProcessMutex distributedLock = new InterProcessMutex(mCurator, lockPath); - mLocks.put(lockPath, distributedLock); try { distributedLock.acquire(); + mLocks.put(lockPath, distributedLock); } catch (Exception ex) { throw new RuntimeException("Unexpected ZK error", ex); } } public void unlock(String lockPath) { - InterProcessMutex distributedLock = mLocks.get(lockPath); + InterProcessMutex distributedLock = mLocks.remove(lockPath); assert distributedLock != null: "mLocks.get(" + lockPath + ") != null"; try { distributedLock.release(); } catch (Exception ex) { throw new RuntimeException("Unexpected ZK error", ex); } - mLocks.remove(lockPath); } protected String getCommittedOffsetGroupPath() { diff --git a/src/main/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactory.java index e7b938d70..4ada046f1 100644 --- a/src/main/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactory.java +++ b/src/main/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactory.java @@ -82,7 +82,7 @@ protected class AvroFileReader implements FileReader { public AvroFileReader(LogFilePath logFilePath, CompressionCodec codec) throws IOException { file = new File(logFilePath.getLogFilePath()); file.getParentFile().mkdirs(); - String topic = logFilePath.getTopic(); + topic = logFilePath.getTopic(); Schema schema = schemaRegistry.getSchema(topic); DatumReader datumReader = new SpecificDatumReader(schema); diff --git a/src/main/java/com/pinterest/secor/reader/LegacyKafkaMessageIterator.java b/src/main/java/com/pinterest/secor/reader/LegacyKafkaMessageIterator.java deleted file mode 100644 index cf9eac67e..000000000 --- a/src/main/java/com/pinterest/secor/reader/LegacyKafkaMessageIterator.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.reader; - -import com.pinterest.secor.common.SecorConfig; -import com.pinterest.secor.common.TopicPartition; -import com.pinterest.secor.message.Message; -import com.pinterest.secor.timestamp.KafkaMessageTimestampFactory; -import com.pinterest.secor.util.IdUtil; -import kafka.consumer.Blacklist; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.ConsumerTimeoutException; -import kafka.consumer.KafkaStream; -import kafka.consumer.TopicFilter; -import kafka.consumer.Whitelist; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.UnknownHostException; -import java.util.List; -import java.util.Properties; - -public class LegacyKafkaMessageIterator implements KafkaMessageIterator { - private static final Logger LOG = LoggerFactory.getLogger(MessageReader.class); - private SecorConfig mConfig; - private ConsumerConnector mConsumerConnector; - private ConsumerIterator mIterator; - private KafkaMessageTimestampFactory mKafkaMessageTimestampFactory; - - public LegacyKafkaMessageIterator() { - } - - @Override - public boolean hasNext() { - try { - return mIterator.hasNext(); - } catch (ConsumerTimeoutException e) { - throw new LegacyConsumerTimeoutException(e); - } - } - - @Override - public Message next() { - MessageAndMetadata kafkaMessage; - try { - kafkaMessage = mIterator.next(); - } catch (ConsumerTimeoutException e) { - throw new LegacyConsumerTimeoutException(e); - } - - long timestamp = 0L; - if (mConfig.useKafkaTimestamp()) { - timestamp = mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(kafkaMessage); - } - - return new Message(kafkaMessage.topic(), kafkaMessage.partition(), - kafkaMessage.offset(), kafkaMessage.key(), - kafkaMessage.message(), timestamp, null); - } - - @Override - public void init(SecorConfig config) throws UnknownHostException { - this.mConfig = config; - - mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig()); - - if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) { - throw new RuntimeException("Topic filter and blacklist cannot be both specified."); - } - TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty() ? new Blacklist(mConfig.getKafkaTopicBlacklist()) : - new Whitelist(mConfig.getKafkaTopicFilter()); - LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter); - List> streams = - mConsumerConnector.createMessageStreamsByFilter(topicFilter); - KafkaStream stream = streams.get(0); - mIterator = stream.iterator(); - mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass()); - } - - @Override - public void commit(TopicPartition topicPartition, long offset) { - mConsumerConnector.commitOffsets(); - } - - private ConsumerConfig createConsumerConfig() throws UnknownHostException { - Properties props = new Properties(); - props.put("zookeeper.connect", mConfig.getZookeeperQuorum() + mConfig.getKafkaZookeeperPath()); - props.put("group.id", mConfig.getKafkaGroup()); - - props.put("zookeeper.session.timeout.ms", - Integer.toString(mConfig.getZookeeperSessionTimeoutMs())); - props.put("zookeeper.sync.time.ms", Integer.toString(mConfig.getZookeeperSyncTimeMs())); - props.put("auto.commit.enable", "false"); - props.put("auto.offset.reset", mConfig.getConsumerAutoOffsetReset()); - props.put("consumer.timeout.ms", Integer.toString(mConfig.getConsumerTimeoutMs())); - props.put("consumer.id", IdUtil.getConsumerId()); - // Properties required to upgrade from kafka 0.8.x to 0.9.x - props.put("dual.commit.enabled", mConfig.getDualCommitEnabled()); - props.put("offsets.storage", mConfig.getOffsetsStorage()); - - props.put("partition.assignment.strategy", mConfig.getPartitionAssignmentStrategy()); - if (mConfig.getRebalanceMaxRetries() != null && - !mConfig.getRebalanceMaxRetries().isEmpty()) { - props.put("rebalance.max.retries", mConfig.getRebalanceMaxRetries()); - } - if (mConfig.getRebalanceBackoffMs() != null && - !mConfig.getRebalanceBackoffMs().isEmpty()) { - props.put("rebalance.backoff.ms", mConfig.getRebalanceBackoffMs()); - } - if (mConfig.getSocketReceiveBufferBytes() != null && - !mConfig.getSocketReceiveBufferBytes().isEmpty()) { - props.put("socket.receive.buffer.bytes", mConfig.getSocketReceiveBufferBytes()); - } - if (mConfig.getFetchMessageMaxBytes() != null && !mConfig.getFetchMessageMaxBytes().isEmpty()) { - props.put("fetch.message.max.bytes", mConfig.getFetchMessageMaxBytes()); - } - if (mConfig.getFetchMinBytes() != null && !mConfig.getFetchMinBytes().isEmpty()) { - props.put("fetch.min.bytes", mConfig.getFetchMinBytes()); - } - if (mConfig.getFetchWaitMaxMs() != null && !mConfig.getFetchWaitMaxMs().isEmpty()) { - props.put("fetch.wait.max.ms", mConfig.getFetchWaitMaxMs()); - } - - return new ConsumerConfig(props); - } -} diff --git a/src/main/java/com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java b/src/main/java/com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java deleted file mode 100644 index e3389d515..000000000 --- a/src/main/java/com/pinterest/secor/timestamp/Kafka10MessageTimestamp.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.timestamp; - -import kafka.message.MessageAndMetadata; -import kafka.message.MessageAndOffset; - -public class Kafka10MessageTimestamp implements KafkaMessageTimestamp { - - @Override - public long getTimestamp(MessageAndMetadata kafkaMessage) { - return kafkaMessage.timestamp(); - } - - @Override - public long getTimestamp(MessageAndOffset messageAndOffset) { - return messageAndOffset.message().timestamp(); - } -} diff --git a/src/main/java/com/pinterest/secor/timestamp/Kafka8MessageTimestamp.java b/src/main/java/com/pinterest/secor/timestamp/Kafka8MessageTimestamp.java deleted file mode 100644 index f196c36f3..000000000 --- a/src/main/java/com/pinterest/secor/timestamp/Kafka8MessageTimestamp.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.timestamp; - -import kafka.message.MessageAndMetadata; -import kafka.message.MessageAndOffset; - -public class Kafka8MessageTimestamp implements KafkaMessageTimestamp { - - @Override - public long getTimestamp(MessageAndMetadata kafkaMessage) { - return 0l; - } - - @Override - public long getTimestamp(MessageAndOffset messageAndOffset) { - return 0l; - } -} diff --git a/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestamp.java b/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestamp.java deleted file mode 100644 index 8e514a672..000000000 --- a/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestamp.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.timestamp; - -import kafka.message.MessageAndMetadata; -import kafka.message.MessageAndOffset; - -public interface KafkaMessageTimestamp { - - long getTimestamp(MessageAndMetadata kafkaMessage); - - long getTimestamp(MessageAndOffset messageAndOffset); -} diff --git a/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactory.java b/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactory.java deleted file mode 100644 index 718677e7c..000000000 --- a/src/main/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.timestamp; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KafkaMessageTimestampFactory { - private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageTimestampFactory.class); - - private KafkaMessageTimestamp kafkaMessageTimestamp; - - public KafkaMessageTimestampFactory(String kafkaTimestampClassName) { - try { - Class timestampClass = Class.forName(kafkaTimestampClassName); - this.kafkaMessageTimestamp = KafkaMessageTimestamp.class.cast(timestampClass.newInstance()); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - throw new RuntimeException(e); - } - } - - public KafkaMessageTimestamp getKafkaMessageTimestamp() { - return this.kafkaMessageTimestamp; - } -} diff --git a/src/main/java/com/pinterest/secor/util/ProtobufUtil.java b/src/main/java/com/pinterest/secor/util/ProtobufUtil.java index c0f8f2ea4..c0f330f2d 100644 --- a/src/main/java/com/pinterest/secor/util/ProtobufUtil.java +++ b/src/main/java/com/pinterest/secor/util/ProtobufUtil.java @@ -155,7 +155,7 @@ public Class getMessageClass(String topic) { public Message decodeJsonMessage(String topic, byte[] payload) throws InvalidProtocolBufferException { try { Method builderGetter = allTopics ? messageClassForAll.getDeclaredMethod("newBuilder") : messageClassByTopic.get(topic).getDeclaredMethod("newBuilder"); - com.google.protobuf.GeneratedMessageV3.Builder builder = (com.google.protobuf.GeneratedMessageV3.Builder) builderGetter.invoke(null); + com.google.protobuf.GeneratedMessage.Builder builder = (com.google.protobuf.GeneratedMessage.Builder) builderGetter.invoke(null); jsonParser.merge(new InputStreamReader(new ByteArrayInputStream(payload)), builder); return builder.build(); } catch (InvalidProtocolBufferException e){ diff --git a/src/test/java/com/pinterest/secor/common/FileRegistryTest.java b/src/test/java/com/pinterest/secor/common/FileRegistryTest.java index 2d59635a1..fbe574639 100644 --- a/src/test/java/com/pinterest/secor/common/FileRegistryTest.java +++ b/src/test/java/com/pinterest/secor/common/FileRegistryTest.java @@ -22,27 +22,28 @@ import com.pinterest.secor.util.FileUtil; import com.pinterest.secor.util.ReflectionUtil; -import junit.framework.TestCase; - import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; -import org.junit.runner.RunWith; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockedStatic; import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; + +import java.lang.reflect.Field; import java.util.Collection; +import java.util.HashMap; /** * FileRegistryTest tests the file registry logic. * * @author Pawel Garbacki (pawel@pinterest.com) */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({ FileRegistry.class, FileUtil.class, ReflectionUtil.class }) -public class FileRegistryTest extends TestCase { +public class FileRegistryTest { private static final String PATH = "/some_parent_dir/some_topic/some_partition/some_other_partition/" + "10_0_00000000000000000100"; private static final String PATH_GZ = "/some_parent_dir/some_topic/some_partition/some_other_partition/" @@ -54,8 +55,8 @@ public class FileRegistryTest extends TestCase { private TopicPartition mTopicPartition; private FileRegistry mRegistry; + @Before public void setUp() throws Exception { - super.setUp(); PropertiesConfiguration properties = new PropertiesConfiguration(); properties.addProperty("secor.file.reader.writer.factory", "com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory"); @@ -67,17 +68,14 @@ public void setUp() throws Exception { mLogFilePathGz = new LogFilePath("/some_parent_dir", PATH_GZ); } - private FileWriter createWriter() throws Exception { - PowerMockito.mockStatic(FileUtil.class); - - PowerMockito.mockStatic(ReflectionUtil.class); + private FileWriter createWriter(MockedStatic mockedFileUtil, + MockedStatic mockedReflectionUtil) throws Exception { FileWriter writer = Mockito.mock(FileWriter.class); - Mockito.when( - ReflectionUtil.createFileWriter( - Mockito.any(String.class), - Mockito.any(LogFilePath.class), - Mockito.any(CompressionCodec.class), - Mockito.any(SecorConfig.class) + mockedReflectionUtil.when(() -> ReflectionUtil.createFileWriter( + anyString(), + any(LogFilePath.class), + any(), + any(SecorConfig.class) )) .thenReturn(writer); @@ -85,65 +83,71 @@ private FileWriter createWriter() throws Exception { FileWriter createdWriter = mRegistry.getOrCreateWriter( mLogFilePath, null); - assertTrue(createdWriter == writer); + assertSame(createdWriter, writer); return writer; } + @Test public void testGetOrCreateWriter() throws Exception { - createWriter(); - - // Call the method again. This time it should return an existing writer. - mRegistry.getOrCreateWriter(mLogFilePath, null); - - // Verify that the method has been called exactly once (the default). - PowerMockito.verifyStatic(); - ReflectionUtil.createFileWriter(Mockito.any(String.class), - Mockito.any(LogFilePath.class), - Mockito.any(CompressionCodec.class), - Mockito.any(SecorConfig.class) - ); - - PowerMockito.verifyStatic(); - FileUtil.delete(PATH); - PowerMockito.verifyStatic(); - FileUtil.delete(CRC_PATH); - - TopicPartition topicPartition = new TopicPartition("some_topic", 0); - Collection topicPartitions = mRegistry - .getTopicPartitions(); - assertEquals(1, topicPartitions.size()); - assertTrue(topicPartitions.contains(topicPartition)); - - Collection logFilePaths = mRegistry - .getPaths(topicPartition); - assertEquals(1, logFilePaths.size()); - assertTrue(logFilePaths.contains(mLogFilePath)); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { + + createWriter(mockedFileUtil, mockedReflectionUtil); + + // Call the method again. This time it should return an existing writer. + mRegistry.getOrCreateWriter(mLogFilePath, null); + + // Verify that the method has been called exactly once (the default). + mockedReflectionUtil.verify(() -> ReflectionUtil.createFileWriter( + anyString(), + any(LogFilePath.class), + any(), + any(SecorConfig.class) + )); + + mockedFileUtil.verify(() -> FileUtil.delete(PATH)); + mockedFileUtil.verify(() -> FileUtil.delete(CRC_PATH)); + + TopicPartition topicPartition = new TopicPartition("some_topic", 0); + Collection topicPartitions = mRegistry + .getTopicPartitions(); + assertEquals(1, topicPartitions.size()); + assertTrue(topicPartitions.contains(topicPartition)); + + Collection logFilePaths = mRegistry + .getPaths(topicPartition); + assertEquals(1, logFilePaths.size()); + assertTrue(logFilePaths.contains(mLogFilePath)); + } } + @Test public void testGetWriterShowBeNullForNewFilePaths() throws Exception { assertNull(mRegistry.getWriter(mLogFilePath)); } + @Test public void testGetWriterShowBeNotNull() throws Exception { - FileWriter createdWriter = createWriter(); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { - FileWriter writer = mRegistry.getWriter(mLogFilePath); - assertNotNull(writer); - assertEquals(createdWriter, writer); - } + FileWriter createdWriter = createWriter(mockedFileUtil, mockedReflectionUtil); - private void createCompressedWriter() throws Exception { - PowerMockito.mockStatic(FileUtil.class); + FileWriter writer = mRegistry.getWriter(mLogFilePath); + assertNotNull(writer); + assertEquals(createdWriter, writer); + } + } - PowerMockito.mockStatic(ReflectionUtil.class); + private FileWriter createCompressedWriter(MockedStatic mockedFileUtil, + MockedStatic mockedReflectionUtil) throws Exception { FileWriter writer = Mockito.mock(FileWriter.class); - Mockito.when( - ReflectionUtil.createFileWriter( - Mockito.any(String.class), - Mockito.any(LogFilePath.class), - Mockito.any(CompressionCodec.class), - Mockito.any(SecorConfig.class) + mockedReflectionUtil.when(() -> ReflectionUtil.createFileWriter( + anyString(), + any(LogFilePath.class), + any(), + any(SecorConfig.class) )) .thenReturn(writer); @@ -151,81 +155,105 @@ private void createCompressedWriter() throws Exception { FileWriter createdWriter = mRegistry.getOrCreateWriter( mLogFilePathGz, new GzipCodec()); - assertTrue(createdWriter == writer); + assertSame(createdWriter, writer); + + return writer; } + @Test public void testGetOrCreateWriterCompressed() throws Exception { - createCompressedWriter(); - - mRegistry.getOrCreateWriter(mLogFilePathGz, new GzipCodec()); - - // Verify that the method has been called exactly once (the default). - PowerMockito.verifyStatic(); - FileUtil.delete(PATH_GZ); - PowerMockito.verifyStatic(); - FileUtil.delete(CRC_PATH); - - PowerMockito.verifyStatic(); - ReflectionUtil.createFileWriter(Mockito.any(String.class), - Mockito.any(LogFilePath.class), - Mockito.any(CompressionCodec.class), - Mockito.any(SecorConfig.class) - ); - - TopicPartition topicPartition = new TopicPartition("some_topic", 0); - Collection topicPartitions = mRegistry - .getTopicPartitions(); - assertEquals(1, topicPartitions.size()); - assertTrue(topicPartitions.contains(topicPartition)); - - Collection logFilePaths = mRegistry - .getPaths(topicPartition); - assertEquals(1, logFilePaths.size()); - assertTrue(logFilePaths.contains(mLogFilePath)); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { + + createCompressedWriter(mockedFileUtil, mockedReflectionUtil); + + mRegistry.getOrCreateWriter(mLogFilePathGz, new GzipCodec()); + + // Verify that the method has been called exactly once (the default). + mockedFileUtil.verify(() -> FileUtil.delete(PATH_GZ)); + mockedFileUtil.verify(() -> FileUtil.delete(CRC_PATH)); + + mockedReflectionUtil.verify(() -> ReflectionUtil.createFileWriter( + anyString(), + any(LogFilePath.class), + any(), + any(SecorConfig.class) + )); + + TopicPartition topicPartition = new TopicPartition("some_topic", 0); + Collection topicPartitions = mRegistry + .getTopicPartitions(); + assertEquals(1, topicPartitions.size()); + assertTrue(topicPartitions.contains(topicPartition)); + + Collection logFilePaths = mRegistry + .getPaths(topicPartition); + assertEquals(1, logFilePaths.size()); + assertTrue(logFilePaths.contains(mLogFilePath)); + } } + @Test public void testDeletePath() throws Exception { - createWriter(); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { - PowerMockito.mockStatic(FileUtil.class); + createWriter(mockedFileUtil, mockedReflectionUtil); - mRegistry.deletePath(mLogFilePath); - PowerMockito.verifyStatic(); - FileUtil.delete(PATH); - PowerMockito.verifyStatic(); - FileUtil.delete(CRC_PATH); + mRegistry.deletePath(mLogFilePath); + mockedFileUtil.verify(() -> FileUtil.delete(PATH), Mockito.times(2)); + mockedFileUtil.verify(() -> FileUtil.delete(CRC_PATH), Mockito.times(2)); - assertTrue(mRegistry.getPaths(mTopicPartition).isEmpty()); - assertTrue(mRegistry.getTopicPartitions().isEmpty()); + assertTrue(mRegistry.getPaths(mTopicPartition).isEmpty()); + assertTrue(mRegistry.getTopicPartitions().isEmpty()); + } } + @Test public void testDeleteTopicPartition() throws Exception { - createWriter(); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { - PowerMockito.mockStatic(FileUtil.class); + createWriter(mockedFileUtil, mockedReflectionUtil); - mRegistry.deleteTopicPartition(mTopicPartition); - PowerMockito.verifyStatic(); - FileUtil.delete(PATH); - PowerMockito.verifyStatic(); - FileUtil.delete(CRC_PATH); + mRegistry.deleteTopicPartition(mTopicPartition); + mockedFileUtil.verify(() -> FileUtil.delete(PATH), Mockito.times(2)); + mockedFileUtil.verify(() -> FileUtil.delete(CRC_PATH), Mockito.times(2)); - assertTrue(mRegistry.getTopicPartitions().isEmpty()); - assertTrue(mRegistry.getPaths(mTopicPartition).isEmpty()); + assertTrue(mRegistry.getTopicPartitions().isEmpty()); + assertTrue(mRegistry.getPaths(mTopicPartition).isEmpty()); + } } + @Test public void testGetSize() throws Exception { - createWriter(); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { - assertEquals(123L, mRegistry.getSize(mTopicPartition)); + createWriter(mockedFileUtil, mockedReflectionUtil); + + assertEquals(123L, mRegistry.getSize(mTopicPartition)); + } } + @SuppressWarnings("unchecked") + @Test public void testGetModificationAgeSec() throws Exception { - PowerMockito.mockStatic(System.class); - PowerMockito.when(System.currentTimeMillis()).thenReturn(10000L) - .thenReturn(100000L); - createWriter(); - - assertEquals(90, mRegistry.getModificationAgeSec(mTopicPartition)); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class); + MockedStatic mockedReflectionUtil = Mockito.mockStatic(ReflectionUtil.class)) { + + createWriter(mockedFileUtil, mockedReflectionUtil); + + // Use reflection to set a known creation time (90 seconds ago) + Field creationTimesField = FileRegistry.class.getDeclaredField("mCreationTimes"); + creationTimesField.setAccessible(true); + HashMap creationTimes = (HashMap) creationTimesField.get(mRegistry); + long nowSec = System.currentTimeMillis() / 1000L; + creationTimes.put(mLogFilePath, nowSec - 90); + + long ageSec = mRegistry.getModificationAgeSec(mTopicPartition); + // Allow for small timing variance during test execution + assertTrue("Age should be approximately 90 seconds, was: " + ageSec, ageSec >= 89 && ageSec <= 91); + } } } diff --git a/src/test/java/com/pinterest/secor/common/SecorSchemaRegistryClientTest.java b/src/test/java/com/pinterest/secor/common/SecorSchemaRegistryClientTest.java index 874f13f14..ca7223dd9 100644 --- a/src/test/java/com/pinterest/secor/common/SecorSchemaRegistryClientTest.java +++ b/src/test/java/com/pinterest/secor/common/SecorSchemaRegistryClientTest.java @@ -23,35 +23,28 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; -import junit.framework.TestCase; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.commons.lang3.StringUtils; -import org.junit.Rule; +import org.junit.Before; import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; +import static org.junit.Assert.*; import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) -public class SecorSchemaRegistryClientTest extends TestCase { +public class SecorSchemaRegistryClientTest { private KafkaAvroDeserializer kafkaAvroDeserializer; private SchemaRegistryClient schemaRegistryClient; private SecorSchemaRegistryClient secorSchemaRegistryClient; private KafkaAvroSerializer avroSerializer; - @Rule - public ExpectedException exception = ExpectedException.none(); - - @Override + @Before public void setUp() { initKafka(); SecorConfig secorConfig = Mockito.mock(SecorConfig.class); @@ -119,8 +112,10 @@ public void testGetSchema() throws IOException, RestClientException { @Test public void testGetSchemaDoesNotExist() { - exception.expect(IllegalStateException.class); - exception.expectMessage("Avro schema not found for topic test-avr-topic-3"); - secorSchemaRegistryClient.getSchema("test-avr-topic-3"); + try { + secorSchemaRegistryClient.getSchema("test-avr-topic-3"); + } catch (IllegalStateException e) { + assertEquals("Unable to get Avro schema not found for topic test-avr-topic-3", e.getMessage()); + } } } \ No newline at end of file diff --git a/src/test/java/com/pinterest/secor/io/FileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/FileReaderWriterFactoryTest.java index 9dd81b2f4..c3c907840 100644 --- a/src/test/java/com/pinterest/secor/io/FileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/FileReaderWriterFactoryTest.java @@ -18,60 +18,61 @@ */ package com.pinterest.secor.io; -import java.io.*; -import java.net.URI; +import java.io.File; +import java.nio.file.Files; import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.compress.CompressionInputStream; -import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.GzipCodec; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import com.pinterest.secor.common.LogFilePath; import com.pinterest.secor.common.SecorConfig; -import com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory; -import com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory; import com.pinterest.secor.util.ReflectionUtil; -import junit.framework.TestCase; +import static org.junit.Assert.*; /** - * Test the file readers and writers + * Test the file readers and writers using real file I/O * * @author Praveen Murugesan (praveen@uber.com) */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({FileSystem.class, DelimitedTextFileReaderWriterFactory.class, - SequenceFile.class, SequenceFileReaderWriterFactory.class, GzipCodec.class, - FileInputStream.class, FileOutputStream.class}) -public class FileReaderWriterFactoryTest extends TestCase { +public class FileReaderWriterFactoryTest { - private static final String DIR = "/some_parent_dir/some_topic/some_partition/some_other_partition"; private static final String BASENAME = "10_0_00000000000000000100"; - private static final String PATH = DIR + "/" + BASENAME; - private static final String PATH_GZ = DIR + "/" + BASENAME + ".gz"; private LogFilePath mLogFilePath; private LogFilePath mLogFilePathGz; private SecorConfig mConfig; + private File tempDir; - @Override + @Before public void setUp() throws Exception { - super.setUp(); - mLogFilePath = new LogFilePath("/some_parent_dir", PATH); - mLogFilePathGz = new LogFilePath("/some_parent_dir", PATH_GZ); + tempDir = Files.createTempDirectory("secor-test").toFile(); + String dir = tempDir.getAbsolutePath() + "/some_topic/some_partition/some_other_partition"; + new File(dir).mkdirs(); + + String path = dir + "/" + BASENAME; + String pathGz = dir + "/" + BASENAME + ".gz"; + + mLogFilePath = new LogFilePath(tempDir.getAbsolutePath(), path); + mLogFilePathGz = new LogFilePath(tempDir.getAbsolutePath(), pathGz); + } + + @After + public void tearDown() throws Exception { + // Clean up temp directory + deleteRecursively(tempDir); + } + + private void deleteRecursively(File file) { + if (file.isDirectory()) { + for (File child : file.listFiles()) { + deleteRecursively(child); + } + } + file.delete(); } private void setupSequenceFileReaderConfig() { @@ -88,163 +89,89 @@ private void setupDelimitedTextFileWriterConfig() { mConfig = new SecorConfig(properties); } - private void mockDelimitedTextFileWriter(boolean isCompressed) throws Exception { - PowerMockito.mockStatic(FileSystem.class); - FileSystem fs = Mockito.mock(FileSystem.class); - Mockito.when( - FileSystem.get(Mockito.any(URI.class), - Mockito.any(Configuration.class))).thenReturn(fs); - - Path fsPath = (!isCompressed) ? new Path(PATH) : new Path(PATH_GZ); - - GzipCodec codec = PowerMockito.mock(GzipCodec.class); - PowerMockito.whenNew(GzipCodec.class).withNoArguments() - .thenReturn(codec); - - FSDataInputStream fileInputStream = Mockito - .mock(FSDataInputStream.class); - FSDataOutputStream fileOutputStream = Mockito - .mock(FSDataOutputStream.class); + @Test + public void testSequenceFileWriter() throws Exception { + setupSequenceFileReaderConfig(); - Mockito.when(fs.open(fsPath)).thenReturn(fileInputStream); - Mockito.when(fs.create(fsPath)).thenReturn(fileOutputStream); + FileWriter writer = ReflectionUtil.createFileWriter( + mConfig.getFileReaderWriterFactory(), + mLogFilePath, null, mConfig); - CompressionInputStream inputStream = Mockito - .mock(CompressionInputStream.class); - CompressionOutputStream outputStream = Mockito - .mock(CompressionOutputStream.class); - Mockito.when(codec.createInputStream(Mockito.any(InputStream.class))) - .thenReturn(inputStream); + assertNotNull(writer); - Mockito.when(codec.createOutputStream(Mockito.any(OutputStream.class))) - .thenReturn(outputStream); - } + // Write some data + writer.write(new KeyValue(100L, "test message".getBytes())); + writer.close(); - private void mockSequenceFileWriter(boolean isCompressed) - throws Exception { - PowerMockito.mockStatic(FileSystem.class); - FileSystem fs = Mockito.mock(FileSystem.class); - Mockito.when( - FileSystem.get(Mockito.any(URI.class), - Mockito.any(Configuration.class))).thenReturn(fs); - - Path fsPath = (!isCompressed) ? new Path(PATH) : new Path(PATH_GZ); - - SequenceFile.Reader reader = PowerMockito - .mock(SequenceFile.Reader.class); - PowerMockito - .whenNew(SequenceFile.Reader.class) - .withParameterTypes(FileSystem.class, Path.class, - Configuration.class) - .withArguments(Mockito.eq(fs), Mockito.eq(fsPath), - Mockito.any(Configuration.class)).thenReturn(reader); - - Mockito.>when(reader.getKeyClass()).thenReturn( - (Class) LongWritable.class); - Mockito.>when(reader.getValueClass()).thenReturn( - (Class) BytesWritable.class); - - if (!isCompressed) { - PowerMockito.mockStatic(SequenceFile.class); - SequenceFile.Writer writer = Mockito - .mock(SequenceFile.Writer.class); - Mockito.when( - SequenceFile.createWriter(Mockito.eq(fs), - Mockito.any(Configuration.class), - Mockito.eq(fsPath), Mockito.eq(LongWritable.class), - Mockito.eq(BytesWritable.class))) - .thenReturn(writer); - - Mockito.when(writer.getLength()).thenReturn(123L); - } else { - PowerMockito.mockStatic(SequenceFile.class); - SequenceFile.Writer writer = Mockito - .mock(SequenceFile.Writer.class); - Mockito.when( - SequenceFile.createWriter(Mockito.eq(fs), - Mockito.any(Configuration.class), - Mockito.eq(fsPath), Mockito.eq(LongWritable.class), - Mockito.eq(BytesWritable.class), - Mockito.eq(SequenceFile.CompressionType.BLOCK), - Mockito.any(GzipCodec.class))).thenReturn(writer); - - Mockito.when(writer.getLength()).thenReturn(12L); - } + // Verify file was created + assertTrue(new File(mLogFilePath.getLogFilePath()).exists()); } + @Test public void testSequenceFileReader() throws Exception { setupSequenceFileReaderConfig(); - mockSequenceFileWriter(false); - ReflectionUtil.createFileReader(mConfig.getFileReaderWriterFactory(), mLogFilePath, null, mConfig); - - // Verify that the method has been called exactly once (the default). - PowerMockito.verifyStatic(); - FileSystem.get(Mockito.any(URI.class), Mockito.any(Configuration.class)); - - mockSequenceFileWriter(true); - ReflectionUtil.createFileWriter(mConfig.getFileReaderWriterFactory(), mLogFilePathGz, new GzipCodec(), - mConfig); - // Verify that the method has been called exactly once (the default). - PowerMockito.verifyStatic(); - FileSystem - .get(Mockito.any(URI.class), Mockito.any(Configuration.class)); - } - - public void testSequenceFileWriter() throws Exception { - setupSequenceFileReaderConfig(); - mockSequenceFileWriter(false); + // First write a file + FileWriter writer = ReflectionUtil.createFileWriter( + mConfig.getFileReaderWriterFactory(), + mLogFilePath, null, mConfig); + writer.write(new KeyValue(100L, "test message".getBytes())); + writer.close(); - FileWriter writer = ReflectionUtil.createFileWriter(mConfig.getFileReaderWriterFactory(), + // Then read it back + FileReader reader = ReflectionUtil.createFileReader( + mConfig.getFileReaderWriterFactory(), mLogFilePath, null, mConfig); - // Verify that the method has been called exactly once (the default). - PowerMockito.verifyStatic(); - FileSystem - .get(Mockito.any(URI.class), Mockito.any(Configuration.class)); + assertNotNull(reader); - assert writer.getLength() == 123L; + KeyValue kv = reader.next(); + assertNotNull(kv); + assertEquals(100L, kv.getOffset()); + assertArrayEquals("test message".getBytes(), kv.getValue()); - mockSequenceFileWriter(true); + reader.close(); + } - writer = ReflectionUtil.createFileWriter(mConfig.getFileReaderWriterFactory(), - mLogFilePathGz, new GzipCodec(), mConfig); + @Test + public void testDelimitedTextFileWriter() throws Exception { + setupDelimitedTextFileWriterConfig(); - // Verify that the method has been called exactly once (the default). - PowerMockito.verifyStatic(); - FileSystem - .get(Mockito.any(URI.class), Mockito.any(Configuration.class)); + FileWriter writer = ReflectionUtil.createFileWriter( + mConfig.getFileReaderWriterFactory(), + mLogFilePath, null, mConfig); - assert writer.getLength() == 12L; - } + assertNotNull(writer); + assertEquals(0L, writer.getLength()); + writer.write(new KeyValue(100L, "test message".getBytes())); + writer.close(); - public void testDelimitedTextFileWriter() throws Exception { - setupDelimitedTextFileWriterConfig(); - mockDelimitedTextFileWriter(false); - FileWriter writer = (FileWriter) ReflectionUtil - .createFileWriter(mConfig.getFileReaderWriterFactory(), - mLogFilePath, null, mConfig - ); - assert writer.getLength() == 0L; - - mockDelimitedTextFileWriter(true); - writer = (FileWriter) ReflectionUtil - .createFileWriter(mConfig.getFileReaderWriterFactory(), - mLogFilePathGz, new GzipCodec(), mConfig - ); - assert writer.getLength() == 0L; + // Verify file was created + assertTrue(new File(mLogFilePath.getLogFilePath()).exists()); } + @Test public void testDelimitedTextFileReader() throws Exception { setupDelimitedTextFileWriterConfig(); - mockDelimitedTextFileWriter(false); + // First write a file + FileWriter writer = ReflectionUtil.createFileWriter( + mConfig.getFileReaderWriterFactory(), + mLogFilePath, null, mConfig); + writer.write(new KeyValue(100L, "test message".getBytes())); + writer.close(); + + // Then read it back + FileReader reader = ReflectionUtil.createFileReader( + mConfig.getFileReaderWriterFactory(), + mLogFilePath, null, mConfig); + + assertNotNull(reader); - ReflectionUtil.createFileReader(mConfig.getFileReaderWriterFactory(), mLogFilePath, null, mConfig); + KeyValue kv = reader.next(); + assertNotNull(kv); - mockDelimitedTextFileWriter(true); - ReflectionUtil.createFileReader(mConfig.getFileReaderWriterFactory(), mLogFilePathGz, new GzipCodec(), - mConfig); + reader.close(); } -} \ No newline at end of file +} diff --git a/src/test/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactoryTest.java index 7aae60509..60782dff4 100644 --- a/src/test/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/impl/AvroFileReaderWriterFactoryTest.java @@ -26,24 +26,21 @@ import com.pinterest.secor.io.FileWriter; import com.pinterest.secor.io.KeyValue; import com.pinterest.secor.util.AvroSerializer; -import junit.framework.TestCase; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.specific.SpecificDatumWriter; +import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Matchers; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; -import static org.junit.Assert.assertArrayEquals; -import static org.mockito.Matchers.anyString; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) -public class AvroFileReaderWriterFactoryTest extends TestCase { +public class AvroFileReaderWriterFactoryTest { private AvroFileReaderWriterFactory mFactory; private SpecificDatumWriter writer; @@ -52,7 +49,7 @@ public class AvroFileReaderWriterFactoryTest extends TestCase { private GenericRecord msg1; private GenericRecord msg2; - @Override + @Before public void setUp() throws Exception { Schema schema = SchemaBuilder.record("UnitTestRecord") @@ -85,7 +82,7 @@ public void testAvroReadWriteRoundTrip() throws Exception { String topic = "test-avro-topic"; - when(secorSchemaRegistryClient.serialize(anyString(), Matchers.any(GenericRecord.class))). + when(secorSchemaRegistryClient.serialize(anyString(), any(GenericRecord.class))). thenReturn(AvroSerializer.serialize(writer, msg1), AvroSerializer.serialize(writer, msg2)); diff --git a/src/test/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactoryTest.java index 52f9dac53..e0f19e4f5 100644 --- a/src/test/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactoryTest.java @@ -31,20 +31,17 @@ import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.specific.SpecificDatumWriter; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Matchers; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.junit.Assert.assertArrayEquals; -import static org.mockito.Matchers.anyString; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) public class AvroParquetFileReaderWriterFactoryTest extends TestCase { private AvroParquetFileReaderWriterFactory mFactory; @@ -80,7 +77,7 @@ public void testAvroParquetReadWriteRoundTripUsingSchemaRegistry() throws Except when(secorSchemaRegistryClient.deserialize("test-avro-topic", AvroSerializer.serialize(writer, msg1))).thenReturn(msg1); when(secorSchemaRegistryClient.deserialize("test-avro-topic", AvroSerializer.serialize(writer, msg2))).thenReturn(msg2); - when(secorSchemaRegistryClient.serialize(anyString(), Matchers.any(GenericRecord.class))). + when(secorSchemaRegistryClient.serialize(anyString(), any(GenericRecord.class))). thenReturn(AvroSerializer.serialize(writer, msg1), AvroSerializer.serialize(writer, msg2)); mFactory.schemaRegistry = secorSchemaRegistryClient; diff --git a/src/test/java/com/pinterest/secor/io/impl/ProtobufParquetFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/ProtobufParquetFileReaderWriterFactoryTest.java index fa2941913..848713d60 100644 --- a/src/test/java/com/pinterest/secor/io/impl/ProtobufParquetFileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/impl/ProtobufParquetFileReaderWriterFactoryTest.java @@ -27,9 +27,7 @@ import org.json.simple.JSONObject; import org.json.simple.JSONValue; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import com.google.common.io.Files; import com.pinterest.secor.common.LogFilePath; @@ -44,7 +42,6 @@ import junit.framework.TestCase; -@RunWith(PowerMockRunner.class) public class ProtobufParquetFileReaderWriterFactoryTest extends TestCase { private SecorConfig config; diff --git a/src/test/java/com/pinterest/secor/io/impl/ThriftParquetFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/ThriftParquetFileReaderWriterFactoryTest.java index 3247cd900..5419b98a3 100644 --- a/src/test/java/com/pinterest/secor/io/impl/ThriftParquetFileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/impl/ThriftParquetFileReaderWriterFactoryTest.java @@ -29,9 +29,7 @@ import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TCompactProtocol; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import com.google.common.io.Files; import com.pinterest.secor.common.LogFilePath; @@ -45,7 +43,6 @@ import junit.framework.TestCase; -@RunWith(PowerMockRunner.class) public class ThriftParquetFileReaderWriterFactoryTest extends TestCase { private SecorConfig config; diff --git a/src/test/java/com/pinterest/secor/monitoring/OstrichMetricCollectorTest.java b/src/test/java/com/pinterest/secor/monitoring/OstrichMetricCollectorTest.java index 7f1533a24..42b7f751e 100644 --- a/src/test/java/com/pinterest/secor/monitoring/OstrichMetricCollectorTest.java +++ b/src/test/java/com/pinterest/secor/monitoring/OstrichMetricCollectorTest.java @@ -19,52 +19,42 @@ package com.pinterest.secor.monitoring; import com.twitter.ostrich.stats.Stats; -import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.mockito.MockedStatic; +import org.mockito.Mockito; -@RunWith(PowerMockRunner.class) -@PrepareForTest({Stats.class}) public class OstrichMetricCollectorTest { private OstrichMetricCollector metricCollector = new OstrichMetricCollector(); - @Before - public void setUp() throws Exception { - PowerMockito.mockStatic(Stats.class); - } - @Test public void incrementByOne() throws Exception { - metricCollector.increment("expectedLabel", "ignored"); - - PowerMockito.verifyStatic(); - Stats.incr("expectedLabel"); + try (MockedStatic mockedStats = Mockito.mockStatic(Stats.class)) { + metricCollector.increment("expectedLabel", "ignored"); + mockedStats.verify(() -> Stats.incr("expectedLabel")); + } } @Test public void increment() throws Exception { - metricCollector.increment("expectedLabel", 42, "ignored"); - - PowerMockito.verifyStatic(); - Stats.incr("expectedLabel", 42); + try (MockedStatic mockedStats = Mockito.mockStatic(Stats.class)) { + metricCollector.increment("expectedLabel", 42, "ignored"); + mockedStats.verify(() -> Stats.incr("expectedLabel", 42)); + } } @Test public void metric() throws Exception { - metricCollector.metric("expectedLabel", 42.0, "ignored"); - - PowerMockito.verifyStatic(); - Stats.addMetric("expectedLabel", 42); + try (MockedStatic mockedStats = Mockito.mockStatic(Stats.class)) { + metricCollector.metric("expectedLabel", 42.0, "ignored"); + mockedStats.verify(() -> Stats.addMetric("expectedLabel", 42)); + } } @Test public void gauge() throws Exception { - metricCollector.gauge("expectedLabel", 4.2, "ignored"); - - PowerMockito.verifyStatic(); - Stats.setGauge("expectedLabel", 4.2); + try (MockedStatic mockedStats = Mockito.mockStatic(Stats.class)) { + metricCollector.gauge("expectedLabel", 4.2, "ignored"); + mockedStats.verify(() -> Stats.setGauge("expectedLabel", 4.2)); + } } } diff --git a/src/test/java/com/pinterest/secor/parser/DateMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/DateMessageParserTest.java index 022f22522..e9c25f471 100644 --- a/src/test/java/com/pinterest/secor/parser/DateMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/DateMessageParserTest.java @@ -22,14 +22,11 @@ import com.pinterest.secor.message.Message; import junit.framework.TestCase; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.Locale; import java.util.TimeZone; -@RunWith(PowerMockRunner.class) public class DateMessageParserTest extends TestCase { private SecorConfig mConfig; diff --git a/src/test/java/com/pinterest/secor/parser/Iso8601ParserTest.java b/src/test/java/com/pinterest/secor/parser/Iso8601ParserTest.java index e7e427f24..6d2934c1f 100644 --- a/src/test/java/com/pinterest/secor/parser/Iso8601ParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/Iso8601ParserTest.java @@ -22,13 +22,10 @@ import com.pinterest.secor.message.Message; import junit.framework.TestCase; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.TimeZone; -@RunWith(PowerMockRunner.class) public class Iso8601ParserTest extends TestCase { private SecorConfig mConfig; diff --git a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java index 01be79f25..26bee74eb 100644 --- a/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java @@ -20,19 +20,18 @@ import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.message.Message; -import junit.framework.TestCase; +import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.TimeZone; -@RunWith(PowerMockRunner.class) -public class JsonMessageParserTest extends TestCase { +import static org.junit.Assert.*; + +public class JsonMessageParserTest { private SecorConfig mConfig; private Message mMessageWithSecondsTimestamp; @@ -42,7 +41,7 @@ public class JsonMessageParserTest extends TestCase { private Message mMessageWithNestedTimestamp; private long timestamp; - @Override + @Before public void setUp() throws Exception { mConfig = Mockito.mock(SecorConfig.class); Mockito.when(mConfig.getMessageTimestampName()).thenReturn("timestamp"); diff --git a/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java b/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java index 49a97ba68..7ec6ce5b6 100644 --- a/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/MessagePackParserTest.java @@ -21,18 +21,17 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.message.Message; -import junit.framework.TestCase; +import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; import org.msgpack.jackson.dataformat.MessagePackFactory; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.HashMap; import java.util.TimeZone; -@RunWith(PowerMockRunner.class) -public class MessagePackParserTest extends TestCase { +import static org.junit.Assert.*; + +public class MessagePackParserTest { SecorConfig mConfig; private MessagePackParser mMessagePackParser; @@ -43,7 +42,7 @@ public class MessagePackParserTest extends TestCase { private ObjectMapper mObjectMapper; private long timestamp; - @Override + @Before public void setUp() throws Exception { mConfig = Mockito.mock(SecorConfig.class); Mockito.when(mConfig.getMessageTimestampName()).thenReturn("ts"); diff --git a/src/test/java/com/pinterest/secor/parser/ProtobufMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/ProtobufMessageParserTest.java index 3e96a9733..f5443db5b 100644 --- a/src/test/java/com/pinterest/secor/parser/ProtobufMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/ProtobufMessageParserTest.java @@ -22,9 +22,7 @@ import java.util.Map; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import com.google.protobuf.CodedOutputStream; import com.pinterest.secor.common.SecorConfig; @@ -34,7 +32,6 @@ import junit.framework.TestCase; -@RunWith(PowerMockRunner.class) public class ProtobufMessageParserTest extends TestCase { private SecorConfig mConfig; private long timestamp; diff --git a/src/test/java/com/pinterest/secor/parser/ProtobufTimestampParserTest.java b/src/test/java/com/pinterest/secor/parser/ProtobufTimestampParserTest.java index 0920325f7..c095b929c 100644 --- a/src/test/java/com/pinterest/secor/parser/ProtobufTimestampParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/ProtobufTimestampParserTest.java @@ -27,9 +27,7 @@ import com.pinterest.secor.protobuf.TimestampedMessages; import junit.framework.TestCase; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.HashMap; import java.util.Map; @@ -37,7 +35,6 @@ /** * Created by pgautam on 10/9/16. */ -@RunWith(PowerMockRunner.class) public class ProtobufTimestampParserTest extends TestCase { private SecorConfig mConfig; private long timestamp; diff --git a/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java index 2ff760ffa..f5f463dbf 100644 --- a/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/RegexMessageParserTest.java @@ -21,21 +21,20 @@ import com.pinterest.secor.common.*; import com.pinterest.secor.message.Message; -import junit.framework.TestCase; +import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; -@RunWith(PowerMockRunner.class) -public class RegexMessageParserTest extends TestCase { +import static org.junit.Assert.*; + +public class RegexMessageParserTest { private SecorConfig mConfig; private Message mMessageWithMillisTimestamp; private Message mMessageWithWrongFormatTimestamp; private long timestamp; - @Override + @Before public void setUp() throws Exception { mConfig = Mockito.mock(SecorConfig.class); Mockito.when(mConfig.getMessageTimestampInputPattern()).thenReturn("^[^ ]+ [^ ]+ ([^ ]+) .*$"); @@ -84,7 +83,7 @@ public void testExtractTimestampMillisEmpty() throws Exception { @Test(expected=NumberFormatException.class) public void testExtractTimestampMillisException1() throws Exception { RegexMessageParser regexMessageParser = new RegexMessageParser(mConfig); - regexMessageParser.extractTimestampMillis(mMessageWithWrongFormatTimestamp); + regexMessageParser.extractTimestampMillis(mMessageWithWrongFormatTimestamp); } } diff --git a/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java index 71f5e37fc..7a1589223 100644 --- a/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/SplitByFieldMessageParserTest.java @@ -20,20 +20,19 @@ import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.message.Message; -import junit.framework.TestCase; import net.minidev.json.JSONObject; import net.minidev.json.JSONValue; +import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; import java.util.List; import java.util.TimeZone; -@RunWith(PowerMockRunner.class) -public class SplitByFieldMessageParserTest extends TestCase { +import static org.junit.Assert.*; + +public class SplitByFieldMessageParserTest { private SecorConfig mConfig; private Message mMessageWithTypeAndTimestamp; @@ -41,7 +40,7 @@ public class SplitByFieldMessageParserTest extends TestCase { private Message mMessageWithoutType; private long timestamp; - @Override + @Before public void setUp() throws Exception { mConfig = Mockito.mock(SecorConfig.class); Mockito.when(mConfig.getMessageSplitFieldName()).thenReturn("type"); @@ -86,7 +85,6 @@ public void testExtractTypeAndTimestamp() throws Exception { public void testExtractTimestampMillisExceptionNoTimestamp() throws Exception { SplitByFieldMessageParser jsonMessageParser = new SplitByFieldMessageParser(mConfig); - // Throws exception if there's no timestamp, for any reason. jsonMessageParser.extractTimestampMillis((JSONObject) JSONValue.parse(mMessageWithoutTimestamp.getPayload())); } @@ -110,7 +108,6 @@ public void testExtractTimestampMillisException2() throws Exception { public void testExtractTimestampMillisExceptionNoType() throws Exception { SplitByFieldMessageParser jsonMessageParser = new SplitByFieldMessageParser(mConfig); - // Throws exception if there's no timestamp, for any reason. jsonMessageParser.extractEventType((JSONObject) JSONValue.parse(mMessageWithoutType.getPayload())); } diff --git a/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java index e80ba397c..21b7e1d44 100644 --- a/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java +++ b/src/test/java/com/pinterest/secor/parser/ThriftMessageParserTest.java @@ -20,22 +20,21 @@ import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.message.Message; -import junit.framework.TestCase; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TBinaryProtocol; +import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; import com.pinterest.secor.thrift.UnitTestMessage; -@RunWith(PowerMockRunner.class) -public class ThriftMessageParserTest extends TestCase { +import static org.junit.Assert.*; + +public class ThriftMessageParserTest { private SecorConfig mConfig; private long timestamp; - @Override + @Before public void setUp() throws Exception { mConfig = Mockito.mock(SecorConfig.class); Mockito.when(TimestampedMessageParser.usingDateFormat(mConfig)).thenReturn("yyyy-MM-dd"); diff --git a/src/test/java/com/pinterest/secor/rebalance/RebalanceHandlerTest.java b/src/test/java/com/pinterest/secor/rebalance/RebalanceHandlerTest.java index 755233f98..64e764330 100644 --- a/src/test/java/com/pinterest/secor/rebalance/RebalanceHandlerTest.java +++ b/src/test/java/com/pinterest/secor/rebalance/RebalanceHandlerTest.java @@ -11,7 +11,7 @@ import java.util.LinkedList; import java.util.List; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; diff --git a/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java b/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java deleted file mode 100644 index 9dc95bfc5..000000000 --- a/src/test/java/com/pinterest/secor/timestamp/KafkaMessageTimestampFactoryTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.pinterest.secor.timestamp; - -import org.junit.Test; - -import static org.junit.Assert.*; - -public class KafkaMessageTimestampFactoryTest { - - private KafkaMessageTimestampFactory factory; - - @Test - public void shouldReturnKafka8TimestampClassObject() { - factory = new KafkaMessageTimestampFactory("com.pinterest.secor.timestamp.Kafka8MessageTimestamp"); - Object timestamp = factory.getKafkaMessageTimestamp(); - assertNotNull(timestamp); - assertEquals(timestamp.getClass(), Kafka8MessageTimestamp.class); - } - - @Test(expected = RuntimeException.class) - public void shouldReturnNullForInvalidClass() { - factory = new KafkaMessageTimestampFactory("com.pinterest.secor.timestamp.KafkaxxMessageTimestamp"); - } -} diff --git a/src/test/java/com/pinterest/secor/uploader/UploaderTest.java b/src/test/java/com/pinterest/secor/uploader/UploaderTest.java index 30bbf3aec..fe2f07e26 100644 --- a/src/test/java/com/pinterest/secor/uploader/UploaderTest.java +++ b/src/test/java/com/pinterest/secor/uploader/UploaderTest.java @@ -18,13 +18,7 @@ */ package com.pinterest.secor.uploader; -import com.pinterest.secor.common.FileRegistry; -import com.pinterest.secor.common.LogFilePath; -import com.pinterest.secor.common.OffsetTracker; -import com.pinterest.secor.common.SecorConfig; -import com.pinterest.secor.common.SecorConstants; -import com.pinterest.secor.common.TopicPartition; -import com.pinterest.secor.common.ZookeeperConnector; +import com.pinterest.secor.common.*; import com.pinterest.secor.io.FileReader; import com.pinterest.secor.io.FileWriter; import com.pinterest.secor.io.KeyValue; @@ -32,16 +26,14 @@ import com.pinterest.secor.reader.MessageReader; import com.pinterest.secor.util.FileUtil; import com.pinterest.secor.util.IdUtil; -import junit.framework.TestCase; import org.apache.hadoop.io.compress.CompressionCodec; import org.joda.time.DateTime; -import org.junit.runner.RunWith; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; import java.util.HashSet; @@ -51,9 +43,7 @@ * * @author Pawel Garbacki (pawel@pinterest.com) */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({ FileUtil.class, IdUtil.class , DateTime.class}) -public class UploaderTest extends TestCase { +public class UploaderTest { private static class TestUploader extends Uploader { private FileReader mReader; @@ -91,9 +81,8 @@ public FileReader getReader() { private TestUploader mUploader; - @Override + @Before public void setUp() throws Exception { - super.setUp(); mTopicPartition = new TopicPartition("some_topic", 0); mLogFilePath = new LogFilePath("/some_parent_dir", @@ -115,100 +104,111 @@ public void setUp() throws Exception { Mockito.when(mFileRegistry.getTopicPartitions()).thenReturn( topicPartitions); - mUploadManager = new HadoopS3UploadManager(mConfig); + mUploadManager = Mockito.mock(UploadManager.class); mZookeeperConnector = Mockito.mock(ZookeeperConnector.class); mUploader = new TestUploader(mConfig, mOffsetTracker, mFileRegistry, mUploadManager, messageReader, mZookeeperConnector); } + @SuppressWarnings("unchecked") + @Test public void testUploadAtTime() throws Exception { - final int minuteUploadMark = 1; - - PowerMockito.mockStatic(DateTime.class); - PowerMockito.when(DateTime.now()).thenReturn(new DateTime(2016,7,27,0,minuteUploadMark,0)); - Mockito.when(mConfig.getUploadMinuteMark()).thenReturn(minuteUploadMark); - Mockito.when(mConfig.getKafkaTopicFilter()).thenReturn("some_topic"); - - Mockito.when(mConfig.getCloudService()).thenReturn("S3"); - Mockito.when(mConfig.getS3Bucket()).thenReturn("some_bucket"); - Mockito.when(mConfig.getS3Path()).thenReturn("some_s3_parent_dir"); - Mockito.when(mConfig.getOffsetsStorage()).thenReturn(SecorConstants.KAFKA_OFFSETS_STORAGE_ZK); - - HashSet logFilePaths = new HashSet(); - logFilePaths.add(mLogFilePath); - Mockito.when(mFileRegistry.getPaths(mTopicPartition)).thenReturn( - logFilePaths); - - PowerMockito.mockStatic(FileUtil.class); - Mockito.when(FileUtil.getPrefix("some_topic", mConfig)). - thenReturn("s3a://some_bucket/some_s3_parent_dir"); - mUploader.applyPolicy(false); - - final String lockPath = "/secor/locks/some_topic/0"; - Mockito.verify(mZookeeperConnector).lock(lockPath); - PowerMockito.verifyStatic(); - FileUtil.moveToCloud( - "/some_parent_dir/some_topic/some_partition/some_other_partition/" - + "10_0_00000000000000000010", - "s3a://some_bucket/some_s3_parent_dir/some_topic/some_partition/" - + "some_other_partition/10_0_00000000000000000010"); - Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition); - Mockito.verify(mZookeeperConnector).setCommittedOffsetCount( - mTopicPartition, 1L); - Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition, - 1L); - Mockito.verify(mZookeeperConnector).unlock(lockPath); + try (MockedStatic mockedDateTime = Mockito.mockStatic(DateTime.class); + MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class)) { + + final int minuteUploadMark = 1; + + mockedDateTime.when(DateTime::now).thenReturn(new DateTime(2016, 7, 27, 0, minuteUploadMark, 0)); + Mockito.when(mConfig.getUploadMinuteMark()).thenReturn(minuteUploadMark); + Mockito.when(mConfig.getKafkaTopicFilter()).thenReturn("some_topic"); + + Mockito.when(mConfig.getCloudService()).thenReturn("S3"); + Mockito.when(mConfig.getS3Bucket()).thenReturn("some_bucket"); + Mockito.when(mConfig.getS3Path()).thenReturn("some_s3_parent_dir"); + Mockito.when(mConfig.getOffsetsStorage()).thenReturn(SecorConstants.KAFKA_OFFSETS_STORAGE_ZK); + + HashSet logFilePaths = new HashSet(); + logFilePaths.add(mLogFilePath); + Mockito.when(mFileRegistry.getPaths(mTopicPartition)).thenReturn( + logFilePaths); + + mockedFileUtil.when(() -> FileUtil.getPrefix("some_topic", mConfig)) + .thenReturn("s3a://some_bucket/some_s3_parent_dir"); + + // Mock upload to return a completed handle + Handle mockHandle = Mockito.mock(Handle.class); + Mockito.when(mockHandle.get()).thenReturn(null); + Mockito.doReturn(mockHandle).when(mUploadManager).upload(Mockito.any(LogFilePath.class)); + + mUploader.applyPolicy(false); + + final String lockPath = "/secor/locks/some_topic/0"; + Mockito.verify(mZookeeperConnector).lock(lockPath); + Mockito.verify(mUploadManager).upload(mLogFilePath); + Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition); + Mockito.verify(mZookeeperConnector).setCommittedOffsetCount( + mTopicPartition, 1L); + Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition, + 1L); + Mockito.verify(mZookeeperConnector).unlock(lockPath); + } } + @SuppressWarnings("unchecked") + @Test public void testUploadFiles() throws Exception { - Mockito.when( - mZookeeperConnector.getCommittedOffsetCount(mTopicPartition)) - .thenReturn(11L); - Mockito.when( - mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 11L)) - .thenReturn(11L); - Mockito.when( - mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 21L)) - .thenReturn(11L); - Mockito.when(mOffsetTracker.getLastSeenOffset(mTopicPartition)) - .thenReturn(20L); - Mockito.when( - mOffsetTracker.getTrueCommittedOffsetCount(mTopicPartition)) - .thenReturn(11L); - - - Mockito.when(mConfig.getCloudService()).thenReturn("S3"); - Mockito.when(mConfig.getS3Bucket()).thenReturn("some_bucket"); - Mockito.when(mConfig.getS3Path()).thenReturn("some_s3_parent_dir"); - Mockito.when(mConfig.getOffsetsStorage()).thenReturn(SecorConstants.KAFKA_OFFSETS_STORAGE_ZK); - - HashSet logFilePaths = new HashSet(); - logFilePaths.add(mLogFilePath); - Mockito.when(mFileRegistry.getPaths(mTopicPartition)).thenReturn( - logFilePaths); - - PowerMockito.mockStatic(FileUtil.class); - Mockito.when(FileUtil.getPrefix("some_topic", mConfig)). - thenReturn("s3a://some_bucket/some_s3_parent_dir"); - mUploader.applyPolicy(false); - - final String lockPath = "/secor/locks/some_topic/0"; - Mockito.verify(mZookeeperConnector).lock(lockPath); - PowerMockito.verifyStatic(); - FileUtil.moveToCloud( - "/some_parent_dir/some_topic/some_partition/some_other_partition/" - + "10_0_00000000000000000010", - "s3a://some_bucket/some_s3_parent_dir/some_topic/some_partition/" - + "some_other_partition/10_0_00000000000000000010"); - Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition); - Mockito.verify(mZookeeperConnector).setCommittedOffsetCount( - mTopicPartition, 21L); - Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition, - 21L); - Mockito.verify(mZookeeperConnector).unlock(lockPath); + try (MockedStatic mockedFileUtil = Mockito.mockStatic(FileUtil.class)) { + + Mockito.when( + mZookeeperConnector.getCommittedOffsetCount(mTopicPartition)) + .thenReturn(11L); + Mockito.when( + mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 11L)) + .thenReturn(11L); + Mockito.when( + mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 21L)) + .thenReturn(11L); + Mockito.when(mOffsetTracker.getLastSeenOffset(mTopicPartition)) + .thenReturn(20L); + Mockito.when( + mOffsetTracker.getTrueCommittedOffsetCount(mTopicPartition)) + .thenReturn(11L); + + + Mockito.when(mConfig.getCloudService()).thenReturn("S3"); + Mockito.when(mConfig.getS3Bucket()).thenReturn("some_bucket"); + Mockito.when(mConfig.getS3Path()).thenReturn("some_s3_parent_dir"); + Mockito.when(mConfig.getOffsetsStorage()).thenReturn(SecorConstants.KAFKA_OFFSETS_STORAGE_ZK); + + HashSet logFilePaths = new HashSet(); + logFilePaths.add(mLogFilePath); + Mockito.when(mFileRegistry.getPaths(mTopicPartition)).thenReturn( + logFilePaths); + + mockedFileUtil.when(() -> FileUtil.getPrefix("some_topic", mConfig)) + .thenReturn("s3a://some_bucket/some_s3_parent_dir"); + + // Mock upload to return a completed handle + Handle mockHandle = Mockito.mock(Handle.class); + Mockito.when(mockHandle.get()).thenReturn(null); + Mockito.doReturn(mockHandle).when(mUploadManager).upload(Mockito.any(LogFilePath.class)); + + mUploader.applyPolicy(false); + + final String lockPath = "/secor/locks/some_topic/0"; + Mockito.verify(mZookeeperConnector).lock(lockPath); + Mockito.verify(mUploadManager).upload(mLogFilePath); + Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition); + Mockito.verify(mZookeeperConnector).setCommittedOffsetCount( + mTopicPartition, 21L); + Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition, + 21L); + Mockito.verify(mZookeeperConnector).unlock(lockPath); + } } + @Test public void testDeleteTopicPartition() throws Exception { Mockito.when( mZookeeperConnector.getCommittedOffsetCount(mTopicPartition)) @@ -224,52 +224,55 @@ public void testDeleteTopicPartition() throws Exception { Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition); } + @Test public void testTrimFiles() throws Exception { - Mockito.when( - mZookeeperConnector.getCommittedOffsetCount(mTopicPartition)) - .thenReturn(21L); - // The second time it's called, it returns 21L because of the first call. - Mockito.when( - mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 21L)) - .thenReturn(20L, 21L); - Mockito.when(mOffsetTracker.getLastSeenOffset(mTopicPartition)) - .thenReturn(21L); - - HashSet logFilePaths = new HashSet(); - logFilePaths.add(mLogFilePath); - Mockito.when(mFileRegistry.getPaths(mTopicPartition)).thenReturn( - logFilePaths); + try (MockedStatic mockedIdUtil = Mockito.mockStatic(IdUtil.class)) { + + Mockito.when( + mZookeeperConnector.getCommittedOffsetCount(mTopicPartition)) + .thenReturn(21L); + // The second time it's called, it returns 21L because of the first call. + Mockito.when( + mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 21L)) + .thenReturn(20L, 21L); + Mockito.when(mOffsetTracker.getLastSeenOffset(mTopicPartition)) + .thenReturn(21L); + + HashSet logFilePaths = new HashSet(); + logFilePaths.add(mLogFilePath); + Mockito.when(mFileRegistry.getPaths(mTopicPartition)).thenReturn( + logFilePaths); + + FileReader reader = mUploader.getReader(); + + Mockito.when(reader.next()).thenAnswer(new Answer() { + private int mCallCount = 0; + + @Override + public KeyValue answer(InvocationOnMock invocation) + throws Throwable { + if (mCallCount == 2) { + return null; + } + return new KeyValue(20 + mCallCount++, null); + } + }); - FileReader reader = mUploader.getReader(); + mockedIdUtil.when(IdUtil::getLocalMessageDir) + .thenReturn("some_message_dir"); - Mockito.when(reader.next()).thenAnswer(new Answer() { - private int mCallCount = 0; + FileWriter writer = Mockito.mock(FileWriter.class); + LogFilePath dstLogFilePath = new LogFilePath( + "/some_parent_dir/some_message_dir", + "/some_parent_dir/some_message_dir/some_topic/some_partition/" + + "some_other_partition/10_0_00000000000000000021"); + Mockito.when(mFileRegistry.getOrCreateWriter(dstLogFilePath, null)) + .thenReturn(writer); - @Override - public KeyValue answer(InvocationOnMock invocation) - throws Throwable { - if (mCallCount == 2) { - return null; - } - return new KeyValue(20 + mCallCount++, null); - } - }); - - PowerMockito.mockStatic(IdUtil.class); - Mockito.when(IdUtil.getLocalMessageDir()) - .thenReturn("some_message_dir"); - - FileWriter writer = Mockito.mock(FileWriter.class); - LogFilePath dstLogFilePath = new LogFilePath( - "/some_parent_dir/some_message_dir", - "/some_parent_dir/some_message_dir/some_topic/some_partition/" - + "some_other_partition/10_0_00000000000000000021"); - Mockito.when(mFileRegistry.getOrCreateWriter(dstLogFilePath, null)) - .thenReturn(writer); + mUploader.applyPolicy(false); - mUploader.applyPolicy(false); - - Mockito.verify(writer).write(Mockito.any(KeyValue.class)); - Mockito.verify(mFileRegistry).deletePath(mLogFilePath); + Mockito.verify(writer).write(Mockito.any(KeyValue.class)); + Mockito.verify(mFileRegistry).deletePath(mLogFilePath); + } } }