diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 72a9c0e687d..1cb9a97188a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -111,6 +111,7 @@ import org.apache.accumulo.manager.merge.FindMergeableRangeTask; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; +import org.apache.accumulo.manager.split.SplitFileCache; import org.apache.accumulo.manager.split.Splitter; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.tableOps.FateEnv; @@ -557,12 +558,17 @@ ManagerGoalState getManagerGoalState() { } private Splitter splitter; + private SplitFileCache splitFileCache; - @Override public Splitter getSplitter() { return splitter; } + @Override + public SplitFileCache getSplitFileCache() { + return splitFileCache; + } + public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() { return upgradeCoordinator.getStatus(); } @@ -1118,6 +1124,7 @@ boolean canSuspendTablets() { this.splitter = new Splitter(this); this.splitter.start(); + this.splitFileCache = new SplitFileCache(context); try { Predicate isLockHeld = diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitFileCache.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitFileCache.java new file mode 100644 index 00000000000..4455cef553d --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitFileCache.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.split; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.util.cache.Caches; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Weigher; + +public class SplitFileCache { + + private static final Logger LOG = LoggerFactory.getLogger(SplitFileCache.class); + + private static class CacheKey { + + final TableId tableId; + final TabletFile tabletFile; + + public CacheKey(TableId tableId, TabletFile tabletFile) { + this.tableId = tableId; + this.tabletFile = tabletFile; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(tableId, cacheKey.tableId) + && Objects.equals(tabletFile, cacheKey.tabletFile); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, tabletFile); + } + + } + + public static Map tryToGetFirstAndLastRows( + ServerContext context, TableConfiguration tableConf, Set dataFiles) { + + HashMap dataFilesInfo = new HashMap<>(); + + long t1 = System.currentTimeMillis(); + + for (T dataFile : dataFiles) { + + FileSKVIterator reader = null; + FileSystem ns = context.getVolumeManager().getFileSystemByPath(dataFile.getPath()); + try { + reader = FileOperations.getInstance().newReaderBuilder() + .forFile(dataFile, ns, ns.getConf(), tableConf.getCryptoService()) + .withTableConfiguration(tableConf).build(); + + dataFilesInfo.put(dataFile, reader.getFileRange()); + } catch (IOException ioe) { + LOG.warn("Failed to read data file to determine first and last key : " + dataFile, ioe); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException ioe) { + LOG.warn("failed to close " + dataFile, ioe); + } + } + } + + } + + long t2 = System.currentTimeMillis(); + + String message = String.format("Found first and last keys for %d data files in %6.2f secs", + dataFiles.size(), (t2 - t1) / 1000.0); + if (t2 - t1 > 500) { + LOG.debug(message); + } else { + LOG.trace(message); + } + + return dataFilesInfo; + } + + final LoadingCache splitFileCache; + + public SplitFileCache(ServerContext context) { + Weigher weigher = (key, frange) -> key.tableId.canonical() + .length() + key.tabletFile.getPath().toString().length() + + (frange.empty ? 0 + : frange.rowRange.getStartKey().getLength() + frange.rowRange.getEndKey().getLength()); + + CacheLoader loader = key -> { + TableConfiguration tableConf = context.getTableConfiguration(key.tableId); + return tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile)) + .get(key.tabletFile); + }; + + splitFileCache = context.getCaches().createNewBuilder(Caches.CacheName.SPLITTER_FILES, true) + .expireAfterAccess(10, TimeUnit.MINUTES).maximumWeight(10_000_000L).weigher(weigher) + .build(loader); + } + + public FileSKVIterator.FileRange getCachedFileInfo(TableId tableId, TabletFile tabletFile) { + return splitFileCache.get(new CacheKey(tableId, tabletFile)); + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java index 58d02a4f81f..1f21fde170e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java @@ -20,37 +20,23 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.metadata.TabletFile; -import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.split.FindSplits; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.benmanes.caffeine.cache.CacheLoader; -import com.github.benmanes.caffeine.cache.LoadingCache; -import com.github.benmanes.caffeine.cache.Weigher; - public class Splitter { private static final Logger LOG = LoggerFactory.getLogger(Splitter.class); @@ -115,82 +101,6 @@ private void seedSplits(FateInstanceType instanceType, Map split } } - public static Map tryToGetFirstAndLastRows( - ServerContext context, TableConfiguration tableConf, Set dataFiles) { - - HashMap dataFilesInfo = new HashMap<>(); - - long t1 = System.currentTimeMillis(); - - for (T dataFile : dataFiles) { - - FileSKVIterator reader = null; - FileSystem ns = context.getVolumeManager().getFileSystemByPath(dataFile.getPath()); - try { - reader = FileOperations.getInstance().newReaderBuilder() - .forFile(dataFile, ns, ns.getConf(), tableConf.getCryptoService()) - .withTableConfiguration(tableConf).build(); - - dataFilesInfo.put(dataFile, reader.getFileRange()); - } catch (IOException ioe) { - LOG.warn("Failed to read data file to determine first and last key : " + dataFile, ioe); - } finally { - if (reader != null) { - try { - reader.close(); - } catch (IOException ioe) { - LOG.warn("failed to close " + dataFile, ioe); - } - } - } - - } - - long t2 = System.currentTimeMillis(); - - String message = String.format("Found first and last keys for %d data files in %6.2f secs", - dataFiles.size(), (t2 - t1) / 1000.0); - if (t2 - t1 > 500) { - LOG.debug(message); - } else { - LOG.trace(message); - } - - return dataFilesInfo; - } - - private static class CacheKey { - - final TableId tableId; - final TabletFile tabletFile; - - public CacheKey(TableId tableId, TabletFile tabletFile) { - this.tableId = tableId; - this.tabletFile = tabletFile; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - CacheKey cacheKey = (CacheKey) o; - return Objects.equals(tableId, cacheKey.tableId) - && Objects.equals(tabletFile, cacheKey.tabletFile); - } - - @Override - public int hashCode() { - return Objects.hash(tableId, tabletFile); - } - - } - - final LoadingCache splitFileCache; - public Splitter(Manager manager) { this.manager = manager; ServerContext context = manager.getContext(); @@ -198,21 +108,6 @@ public Splitter(Manager manager) { this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder").numCoreThreads(1) .numMaxThreads(1).withTimeOut(0L, TimeUnit.MILLISECONDS).enableThreadPoolMetrics().build(); - Weigher weigher = (key, frange) -> key.tableId.canonical() - .length() + key.tabletFile.getPath().toString().length() - + (frange.empty ? 0 - : frange.rowRange.getStartKey().getLength() + frange.rowRange.getEndKey().getLength()); - - CacheLoader loader = key -> { - TableConfiguration tableConf = context.getTableConfiguration(key.tableId); - return tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile)) - .get(key.tabletFile); - }; - - splitFileCache = context.getCaches().createNewBuilder(CacheName.SPLITTER_FILES, true) - .expireAfterAccess(10, TimeUnit.MINUTES).maximumWeight(10_000_000L).weigher(weigher) - .build(loader); - } public synchronized void start() { @@ -223,10 +118,6 @@ public synchronized void stop() { splitExecutor.shutdownNow(); } - public FileSKVIterator.FileRange getCachedFileInfo(TableId tableId, TabletFile tabletFile) { - return splitFileCache.get(new CacheKey(tableId, tabletFile)); - } - public void initiateSplit(KeyExtent extent) { // Want to avoid queuing the same tablet multiple times, it would not cause bugs but would waste // work. Use the metadata row to identify a tablet because the KeyExtent also includes the prev diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/FateEnv.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/FateEnv.java index 6b78e503e4c..e76a613f37b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/FateEnv.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/FateEnv.java @@ -27,7 +27,7 @@ import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.manager.EventPublisher; -import org.apache.accumulo.manager.split.Splitter; +import org.apache.accumulo.manager.split.SplitFileCache; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.tables.TableManager; @@ -55,7 +55,7 @@ public interface FateEnv { ExecutorService getTabletRefreshThreadPool(); - Splitter getSplitter(); + SplitFileCache getSplitFileCache(); ExecutorService getRenamePool(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index f3147165b16..4c9b1f11c47 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -111,7 +111,7 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { var newTablets = splitInfo.getTablets(); var newTabletsFiles = getNewTabletFiles(fateId, newTablets, tabletMetadata, - file -> env.getSplitter().getCachedFileInfo(splitInfo.getOriginal().tableId(), file)); + file -> env.getSplitFileCache().getCachedFileInfo(splitInfo.getOriginal().tableId(), file)); addNewTablets(fateId, env, tabletMetadata, opid, newTablets, newTabletsFiles); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index 0e242f75e04..eae62009f72 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -66,7 +66,7 @@ import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.split.Splitter; +import org.apache.accumulo.manager.split.SplitFileCache; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl; import org.apache.hadoop.fs.Path; @@ -237,12 +237,16 @@ public void testManyColumns() throws Exception { EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce(); Ample ample = EasyMock.mock(Ample.class); EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce(); - Splitter splitter = EasyMock.mock(Splitter.class); - EasyMock.expect(splitter.getCachedFileInfo(tableId, file1)).andReturn(newFileInfo("a", "z")); - EasyMock.expect(splitter.getCachedFileInfo(tableId, file2)).andReturn(newFileInfo("a", "b")); - EasyMock.expect(splitter.getCachedFileInfo(tableId, file3)).andReturn(newFileInfo("d", "f")); - EasyMock.expect(splitter.getCachedFileInfo(tableId, file4)).andReturn(newFileInfo("d", "j")); - EasyMock.expect(manager.getSplitter()).andReturn(splitter).atLeastOnce(); + SplitFileCache splitFileCache = EasyMock.mock(SplitFileCache.class); + EasyMock.expect(splitFileCache.getCachedFileInfo(tableId, file1)) + .andReturn(newFileInfo("a", "z")); + EasyMock.expect(splitFileCache.getCachedFileInfo(tableId, file2)) + .andReturn(newFileInfo("a", "b")); + EasyMock.expect(splitFileCache.getCachedFileInfo(tableId, file3)) + .andReturn(newFileInfo("d", "f")); + EasyMock.expect(splitFileCache.getCachedFileInfo(tableId, file4)) + .andReturn(newFileInfo("d", "j")); + EasyMock.expect(manager.getSplitFileCache()).andReturn(splitFileCache).atLeastOnce(); EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100_000, TimeUnit.SECONDS)) .atLeastOnce(); @@ -389,8 +393,8 @@ public void testManyColumns() throws Exception { tabletsMutator.close(); EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(manager, context, ample, tabletMeta, splitter, tabletsMutator, tablet1Mutator, - tablet2Mutator, tablet3Mutator, cr, compactions); + EasyMock.replay(manager, context, ample, tabletMeta, splitFileCache, tabletsMutator, + tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions); // Now we can actually test the split code that writes the new tablets with a bunch columns in // the original tablet SortedSet splits = new TreeSet<>(List.of(newExtent1.endRow(), newExtent2.endRow())); @@ -399,8 +403,8 @@ public void testManyColumns() throws Exception { List.of(dir1, dir2)); updateTablets.call(fateId, manager); - EasyMock.verify(manager, context, ample, tabletMeta, splitter, tabletsMutator, tablet1Mutator, - tablet2Mutator, tablet3Mutator, cr, compactions); + EasyMock.verify(manager, context, ample, tabletMeta, splitFileCache, tabletsMutator, + tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions); } @Test