org/apache/accumulo/manager/Manager.java
@@ -111,6 +111,7 @@
import org.apache.accumulo.manager.merge.FindMergeableRangeTask;
import org.apache.accumulo.manager.metrics.ManagerMetrics;
import org.apache.accumulo.manager.recovery.RecoveryManager;
import org.apache.accumulo.manager.split.SplitFileCache;
import org.apache.accumulo.manager.split.Splitter;
import org.apache.accumulo.manager.state.TableCounts;
import org.apache.accumulo.manager.tableOps.FateEnv;
@@ -557,12 +558,17 @@ ManagerGoalState getManagerGoalState() {
  }

  private Splitter splitter;
  private SplitFileCache splitFileCache;

  @Override
  public Splitter getSplitter() {
    return splitter;
  }

  @Override
  public SplitFileCache getSplitFileCache() {
    return splitFileCache;
  }

  public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() {
    return upgradeCoordinator.getStatus();
  }
@@ -1118,6 +1124,7 @@ boolean canSuspendTablets() {

    this.splitter = new Splitter(this);
    this.splitter.start();
    this.splitFileCache = new SplitFileCache(context);

    try {
      Predicate<ZooUtil.LockID> isLockHeld =
org/apache/accumulo/manager/split/SplitFileCache.java (new file)
@@ -0,0 +1,143 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.accumulo.manager.split;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.util.cache.Caches;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;

public class SplitFileCache {

  private static final Logger LOG = LoggerFactory.getLogger(SplitFileCache.class);

  private static class CacheKey {

    final TableId tableId;
    final TabletFile tabletFile;

    public CacheKey(TableId tableId, TabletFile tabletFile) {
      this.tableId = tableId;
      this.tabletFile = tabletFile;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      CacheKey cacheKey = (CacheKey) o;
      return Objects.equals(tableId, cacheKey.tableId)
          && Objects.equals(tabletFile, cacheKey.tabletFile);
    }

    @Override
    public int hashCode() {
      return Objects.hash(tableId, tabletFile);
    }

  }

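  /**
   * Opens each of the given data files and reads its first and last row. Files that cannot be
   * read are logged and left out of the returned map.
   */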
  public static <T extends TabletFile> Map<T,FileSKVIterator.FileRange> tryToGetFirstAndLastRows(
      ServerContext context, TableConfiguration tableConf, Set<T> dataFiles) {

    HashMap<T,FileSKVIterator.FileRange> dataFilesInfo = new HashMap<>();

    long t1 = System.currentTimeMillis();

    for (T dataFile : dataFiles) {

      FileSKVIterator reader = null;
      FileSystem ns = context.getVolumeManager().getFileSystemByPath(dataFile.getPath());
      try {
        reader = FileOperations.getInstance().newReaderBuilder()
            .forFile(dataFile, ns, ns.getConf(), tableConf.getCryptoService())
            .withTableConfiguration(tableConf).build();

        dataFilesInfo.put(dataFile, reader.getFileRange());
      } catch (IOException ioe) {
        LOG.warn("Failed to read data file to determine first and last key : " + dataFile, ioe);
      } finally {
        if (reader != null) {
          try {
            reader.close();
          } catch (IOException ioe) {
            LOG.warn("failed to close " + dataFile, ioe);
          }
        }
      }

    }

    long t2 = System.currentTimeMillis();

    String message = String.format("Found first and last keys for %d data files in %6.2f secs",
        dataFiles.size(), (t2 - t1) / 1000.0);
    if (t2 - t1 > 500) {
      LOG.debug(message);
    } else {
      LOG.trace(message);
    }

    return dataFilesInfo;
  }

  final LoadingCache<CacheKey,FileSKVIterator.FileRange> splitFileCache;

  public SplitFileCache(ServerContext context) {
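    // Weigh each entry by the approximate byte size of its key and cached row range so that
    // maximumWeight below roughly bounds the cache's memory footprint.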
    Weigher<CacheKey,FileSKVIterator.FileRange> weigher = (key, frange) -> key.tableId.canonical()
        .length() + key.tabletFile.getPath().toString().length()
        + (frange.empty ? 0
            : frange.rowRange.getStartKey().getLength() + frange.rowRange.getEndKey().getLength());

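    // On a miss, open the file to read its first and last row. If the read fails, the map
    // lookup yields null and Caffeine caches nothing for the key.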
    CacheLoader<CacheKey,FileSKVIterator.FileRange> loader = key -> {
      TableConfiguration tableConf = context.getTableConfiguration(key.tableId);
      return tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile))
          .get(key.tabletFile);
    };

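    // Bound the cache by weight (~10MB) and drop entries not accessed for 10 minutes.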
    splitFileCache = context.getCaches().createNewBuilder(Caches.CacheName.SPLITTER_FILES, true)
        .expireAfterAccess(10, TimeUnit.MINUTES).maximumWeight(10_000_000L).weigher(weigher)
        .build(loader);
  }

  public FileSKVIterator.FileRange getCachedFileInfo(TableId tableId, TabletFile tabletFile) {
    return splitFileCache.get(new CacheKey(tableId, tabletFile));
  }
}
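For orientation, here is a minimal sketch of how a caller might use this cache; the `manager`, `tableId`, and `tabletFiles` names are assumptions for illustration, not identifiers from this PR:

```java
// Illustrative only: consult the cache for each of a tablet's data files.
SplitFileCache cache = manager.getSplitFileCache();
for (TabletFile file : tabletFiles) {
  FileSKVIterator.FileRange range = cache.getCachedFileInfo(tableId, file);
  if (range == null) {
    continue; // file could not be read, so the loader produced no entry
  }
  if (!range.empty) {
    // range.rowRange covers the first through last row stored in the file
  }
}
```

Repeat lookups for the same (table, file) pair are served from memory, so the file open in tryToGetFirstAndLastRows happens at most once per entry until it expires.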
org/apache/accumulo/manager/split/Splitter.java
@@ -20,37 +20,23 @@

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.util.cache.Caches.CacheName;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.split.FindSplits;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;

public class Splitter {

  private static final Logger LOG = LoggerFactory.getLogger(Splitter.class);
@@ -115,104 +101,13 @@ private void seedSplits(FateInstanceType instanceType, Map<Text,KeyExtent> split
    }
  }

  public static <T extends TabletFile> Map<T,FileSKVIterator.FileRange> tryToGetFirstAndLastRows(
      ServerContext context, TableConfiguration tableConf, Set<T> dataFiles) {

    HashMap<T,FileSKVIterator.FileRange> dataFilesInfo = new HashMap<>();

    long t1 = System.currentTimeMillis();

    for (T dataFile : dataFiles) {

      FileSKVIterator reader = null;
      FileSystem ns = context.getVolumeManager().getFileSystemByPath(dataFile.getPath());
      try {
        reader = FileOperations.getInstance().newReaderBuilder()
            .forFile(dataFile, ns, ns.getConf(), tableConf.getCryptoService())
            .withTableConfiguration(tableConf).build();

        dataFilesInfo.put(dataFile, reader.getFileRange());
      } catch (IOException ioe) {
        LOG.warn("Failed to read data file to determine first and last key : " + dataFile, ioe);
      } finally {
        if (reader != null) {
          try {
            reader.close();
          } catch (IOException ioe) {
            LOG.warn("failed to close " + dataFile, ioe);
          }
        }
      }

    }

    long t2 = System.currentTimeMillis();

    String message = String.format("Found first and last keys for %d data files in %6.2f secs",
        dataFiles.size(), (t2 - t1) / 1000.0);
    if (t2 - t1 > 500) {
      LOG.debug(message);
    } else {
      LOG.trace(message);
    }

    return dataFilesInfo;
  }

  private static class CacheKey {

    final TableId tableId;
    final TabletFile tabletFile;

    public CacheKey(TableId tableId, TabletFile tabletFile) {
      this.tableId = tableId;
      this.tabletFile = tabletFile;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      CacheKey cacheKey = (CacheKey) o;
      return Objects.equals(tableId, cacheKey.tableId)
          && Objects.equals(tabletFile, cacheKey.tabletFile);
    }

    @Override
    public int hashCode() {
      return Objects.hash(tableId, tabletFile);
    }

  }

  final LoadingCache<CacheKey,FileSKVIterator.FileRange> splitFileCache;

  public Splitter(Manager manager) {
    this.manager = manager;
    ServerContext context = manager.getContext();

    this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder").numCoreThreads(1)
        .numMaxThreads(1).withTimeOut(0L, TimeUnit.MILLISECONDS).enableThreadPoolMetrics().build();

    Weigher<CacheKey,FileSKVIterator.FileRange> weigher = (key, frange) -> key.tableId.canonical()
        .length() + key.tabletFile.getPath().toString().length()
        + (frange.empty ? 0
            : frange.rowRange.getStartKey().getLength() + frange.rowRange.getEndKey().getLength());

    CacheLoader<CacheKey,FileSKVIterator.FileRange> loader = key -> {
      TableConfiguration tableConf = context.getTableConfiguration(key.tableId);
      return tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile))
          .get(key.tabletFile);
    };

    splitFileCache = context.getCaches().createNewBuilder(CacheName.SPLITTER_FILES, true)
        .expireAfterAccess(10, TimeUnit.MINUTES).maximumWeight(10_000_000L).weigher(weigher)
        .build(loader);

  }

  public synchronized void start() {
@@ -223,10 +118,6 @@ public synchronized void stop() {
    splitExecutor.shutdownNow();
  }

  public FileSKVIterator.FileRange getCachedFileInfo(TableId tableId, TabletFile tabletFile) {
    return splitFileCache.get(new CacheKey(tableId, tabletFile));
  }

  public void initiateSplit(KeyExtent extent) {
    // Want to avoid queuing the same tablet multiple times, it would not cause bugs but would waste
    // work. Use the metadata row to identify a tablet because the KeyExtent also includes the prev
org/apache/accumulo/manager/tableOps/FateEnv.java
@@ -27,7 +27,7 @@
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.manager.EventPublisher;
import org.apache.accumulo.manager.split.Splitter;
import org.apache.accumulo.manager.split.SplitFileCache;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.tables.TableManager;
@@ -55,7 +55,7 @@ public interface FateEnv {

  ExecutorService getTabletRefreshThreadPool();

  Splitter getSplitter();
  SplitFileCache getSplitFileCache();

Contributor Author commented on this line:

This change is the driver behind this entire PR. Needed to expose a narrower type here for fate. Found this while experimenting with distributed fate.


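To make the narrowing concrete, a before/after sketch of a FATE call site; the `env`, `tableId`, and `file` variables are placeholders, and the real call site is in the last file of this diff:

```java
// Before: FATE steps reached through the whole Splitter, which also carries
// lifecycle methods and the split-seeding executor they never use.
FileSKVIterator.FileRange before = env.getSplitter().getCachedFileInfo(tableId, file);

// After: FATE steps depend only on the narrower SplitFileCache type, which
// can be supplied on its own, e.g. by a distributed fate environment.
FileSKVIterator.FileRange after = env.getSplitFileCache().getCachedFileInfo(tableId, file);
```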
  ExecutorService getRenamePool();
org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
@@ -111,7 +111,7 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
    var newTablets = splitInfo.getTablets();

    var newTabletsFiles = getNewTabletFiles(fateId, newTablets, tabletMetadata,
        file -> env.getSplitter().getCachedFileInfo(splitInfo.getOriginal().tableId(), file));
        file -> env.getSplitFileCache().getCachedFileInfo(splitInfo.getOriginal().tableId(), file));

    addNewTablets(fateId, env, tabletMetadata, opid, newTablets, newTabletsFiles);
