Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.devtools.build.lib.remote;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;

import build.bazel.remote.execution.v2.Digest;
Expand All @@ -27,11 +28,20 @@
import com.google.devtools.build.lib.remote.util.Utils;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;

/** Downloads blobs by sequentially fetching chunks via the SplitBlob API. */
/** Downloads blobs by fetching chunks through a per-blob sliding window via the SplitBlob API. */
public class ChunkedBlobDownloader {
// Guard against pathological fanout from a single large chunked blob. This is only a per-blob
// cap; chunk requests still flow through CombinedCache and the shared remote cache transport
// stack below it, which is what bounds active remote RPC concurrency across blobs.
private static final int MAX_IN_FLIGHT_CHUNK_DOWNLOADS = 16;
Comment thread
tyler-french marked this conversation as resolved.

Comment thread
tyler-french marked this conversation as resolved.
private final GrpcCacheClient grpcCacheClient;
private final CombinedCache combinedCache;
private final DigestUtil digestUtil;
Expand All @@ -45,8 +55,8 @@ public ChunkedBlobDownloader(

/**
* Downloads a blob using chunked download via the SplitBlob API. This should be called with
* virtual threads, as it blocks on futures via {@link
* com.google.devtools.build.lib.remote.util.Utils#getFromFuture}.
* virtual threads, as it may block while waiting for chunk metadata and completed chunk
* downloads.
*/
public void downloadChunked(
RemoteActionExecutionContext context, Digest blobDigest, OutputStream out)
Expand Down Expand Up @@ -81,11 +91,135 @@ private List<Digest> getChunkDigests(RemoteActionExecutionContext context, Diges
return chunkDigests;
}

private static final class PendingDownload {
private final Digest digest;
private final ListenableFuture<byte[]> future;
private final List<Integer> chunkIndices = new ArrayList<>(1);

PendingDownload(Digest digest, ListenableFuture<byte[]> future, int firstChunkIndex) {
this.digest = digest;
this.future = future;
chunkIndices.add(firstChunkIndex);
}

void addChunkIndex(int chunkIndex) {
chunkIndices.add(chunkIndex);
}

Digest digest() {
return digest;
}

ListenableFuture<byte[]> future() {
return future;
}

List<Integer> chunkIndices() {
return chunkIndices;
}
}

private void downloadAndReassembleChunks(
RemoteActionExecutionContext context, List<Digest> chunkDigests, OutputStream out)
throws IOException, InterruptedException {
for (Digest chunkDigest : chunkDigests) {
getFromFuture(combinedCache.downloadBlob(context, chunkDigest, out));
new DownloadSession(context, chunkDigests, out).run();
}

private final class DownloadSession {
private final LinkedBlockingQueue<PendingDownload> completedDownloads =
new LinkedBlockingQueue<>();
private final Map<Digest, PendingDownload> activeDownloads =
new HashMap<>(MAX_IN_FLIGHT_CHUNK_DOWNLOADS);
private final Map<Integer, byte[]> readyChunks =
new HashMap<>(MAX_IN_FLIGHT_CHUNK_DOWNLOADS);
private final RemoteActionExecutionContext context;
private final List<Digest> chunkDigests;
private final OutputStream out;
private int nextToStart = 0;
private int nextToWrite = 0;

DownloadSession(
RemoteActionExecutionContext context, List<Digest> chunkDigests, OutputStream out) {
this.context = context;
this.chunkDigests = chunkDigests;
this.out = out;
}

void run() throws IOException, InterruptedException {
try {
fillWindow();
while (nextToWrite < chunkDigests.size()) {
drainCompletedDownloads();
drainReadyChunks();
fillWindow();
}
} finally {
cancelAllDownloads();
}
}

private void fillWindow() {
while (nextToStart < chunkDigests.size()) {
if (nextToStart - nextToWrite >= MAX_IN_FLIGHT_CHUNK_DOWNLOADS) {
return;
}
Digest chunkDigest = chunkDigests.get(nextToStart);
PendingDownload existing = activeDownloads.get(chunkDigest);
if (existing != null) {
existing.addChunkIndex(nextToStart);
nextToStart++;
continue;
Comment thread
tyler-french marked this conversation as resolved.
}
startDownload(chunkDigest, nextToStart);
nextToStart++;
}
}

private void startDownload(Digest chunkDigest, int chunkIndex) {
PendingDownload download =
new PendingDownload(
chunkDigest, combinedCache.downloadBlob(context, chunkDigest), chunkIndex);
activeDownloads.put(chunkDigest, download);
download.future().addListener(() -> completedDownloads.add(download), directExecutor());
}

private void drainCompletedDownloads() throws IOException, InterruptedException {
PendingDownload download = completedDownloads.take();
do {
processCompletedDownload(download);
download = completedDownloads.poll();
} while (download != null);
}

private void processCompletedDownload(PendingDownload download)
throws IOException, InterruptedException {
activeDownloads.remove(download.digest());
byte[] chunkData = getFromFuture(download.future());
for (int chunkIndex : download.chunkIndices()) {
Comment thread
tyler-french marked this conversation as resolved.
if (chunkIndex == nextToWrite) {
out.write(chunkData);
nextToWrite++;
} else {
readyChunks.put(chunkIndex, chunkData);
}
}
}

private void drainReadyChunks() throws IOException {
while (true) {
byte[] chunk = readyChunks.remove(nextToWrite);
if (chunk == null) {
return;
}
out.write(chunk);
nextToWrite++;
}
}

private void cancelAllDownloads() {
for (PendingDownload download : activeDownloads.values()) {
download.future().cancel(/* mayInterruptIfRunning= */ true);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,28 @@

package com.google.devtools.build.lib.remote;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;

import build.bazel.remote.execution.v2.Digest;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.remote.chunking.ChunkingConfig;
import com.google.devtools.build.lib.remote.chunking.FastCdcChunker;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Uploads blobs in chunks using Content-Defined Chunking with FastCDC 2020.
Expand All @@ -44,6 +50,10 @@
* </ol>
*/
public class ChunkedBlobUploader {
// Guard against pathological fanout from a single large chunked blob. This is only a per-blob
// cap; chunk uploads still flow through CombinedCache and the shared remote cache transport
// stack below it, which is what bounds active remote RPC concurrency across blobs.
private static final int MAX_IN_FLIGHT_CHUNK_UPLOADS = 16;
Comment thread
tyler-french marked this conversation as resolved.

private final GrpcCacheClient grpcCacheClient;
private final CombinedCache combinedCache;
Expand Down Expand Up @@ -104,18 +114,139 @@ private void uploadMissingChunks(
if (missingDigests.isEmpty()) {
return;
}
new UploadSession(context, missingDigests, chunkDigests).run(file);
}

Set<Digest> uploaded = new HashSet<>();
try (InputStream input = file.getInputStream()) {
for (Digest chunkDigest : chunkDigests) {
if (missingDigests.contains(chunkDigest) && uploaded.add(chunkDigest)) {
ByteString.Output out = ByteString.newOutput((int) chunkDigest.getSizeBytes());
ByteStreams.limit(input, chunkDigest.getSizeBytes()).transferTo(out);
getFromFuture(combinedCache.uploadBlob(context, chunkDigest, out.toByteString()));
} else {
input.skipNBytes(chunkDigest.getSizeBytes());
private final class UploadSession {
private final LinkedBlockingQueue<ListenableFuture<Void>> completedUploads =
new LinkedBlockingQueue<>();
private final Set<ListenableFuture<Void>> inFlightUploads =
new HashSet<>(MAX_IN_FLIGHT_CHUNK_UPLOADS);
private final Set<Digest> scheduledDigests = new HashSet<>();
private final RemoteActionExecutionContext context;
private final ImmutableSet<Digest> missingDigests;
private final List<Digest> chunkDigests;

UploadSession(
RemoteActionExecutionContext context,
ImmutableSet<Digest> missingDigests,
List<Digest> chunkDigests) {
this.context = context;
this.missingDigests = missingDigests;
this.chunkDigests = chunkDigests;
}

void run(Path file) throws IOException, InterruptedException {
try {
long offset = 0;
for (Digest chunkDigest : chunkDigests) {
drainCompletedUploads();
long chunkOffset = offset;
offset += chunkDigest.getSizeBytes();
if (!shouldScheduleUpload(chunkDigest)) {
continue;
}
if (inFlightUploads.size() >= MAX_IN_FLIGHT_CHUNK_UPLOADS) {
awaitCompletedUpload();
}
startUpload(file, chunkOffset, chunkDigest);
}
while (!inFlightUploads.isEmpty()) {
awaitCompletedUpload();
}
} finally {
cancelAllUploads();
}
}

private boolean shouldScheduleUpload(Digest chunkDigest) {
return missingDigests.contains(chunkDigest) && scheduledDigests.add(chunkDigest);
}

private void startUpload(Path file, long chunkOffset, Digest chunkDigest) {
ListenableFuture<Void> upload =
combinedCache.uploadBlob(
context, chunkDigest, new ChunkBlob(file, chunkOffset, chunkDigest));
inFlightUploads.add(upload);
upload.addListener(() -> completedUploads.add(upload), directExecutor());
}

private void drainCompletedUploads() throws IOException, InterruptedException {
while (true) {
ListenableFuture<Void> upload = completedUploads.poll();
if (upload == null) {
return;
}
finishUpload(upload);
}
}

private void awaitCompletedUpload() throws IOException, InterruptedException {
finishUpload(completedUploads.take());
drainCompletedUploads();
}

private void finishUpload(ListenableFuture<Void> upload)
throws IOException, InterruptedException {
inFlightUploads.remove(upload);
getFromFuture(upload);
}

private void cancelAllUploads() {
for (ListenableFuture<Void> upload : inFlightUploads) {
upload.cancel(/* mayInterruptIfRunning= */ true);
}
}
}

private static final class ChunkBlob implements Blob {
private final Path file;
private final long offset;
private final Digest digest;

private ChunkBlob(Path file, long offset, Digest digest) {
this.file = file;
this.offset = offset;
this.digest = digest;
}

@Override
public InputStream get() throws IOException {
InputStream input = file.getInputStream();
boolean success = false;
try {
seekOrSkip(input, offset);
InputStream limitedInput = ByteStreams.limit(input, digest.getSizeBytes());
success = true;
return limitedInput;
} catch (EOFException e) {
throw new IOException("file was concurrently modified during upload: " + file, e);
} finally {
if (!success) {
input.close();
}
}
}

@Override
public String description() {
return "chunk %s at offset %d of file %s"
.formatted(DigestUtil.toString(digest), offset, file);
}
}

private static void seekOrSkip(InputStream input, long offset) throws IOException {
if (offset == 0) {
return;
}
if (input instanceof FileInputStream fileInputStream) {
FileChannel channel = fileInputStream.getChannel();
if (channel.size() < offset) {
throw new EOFException();
Comment thread
tyler-french marked this conversation as resolved.
}
channel.position(offset);
return;
Comment thread
tyler-french marked this conversation as resolved.
}
input.skipNBytes(offset);
}
}
Loading
Loading