Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,29 @@

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.SplitBlobResponse;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.util.DigestOutputStream;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.Utils;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import javax.annotation.Nullable;

/** Downloads blobs by sequentially fetching chunks via the SplitBlob API. */
public class ChunkedBlobDownloader {
private final GrpcCacheClient grpcCacheClient;
private final CombinedCache combinedCache;
private final DigestUtil digestUtil;

public ChunkedBlobDownloader(GrpcCacheClient grpcCacheClient, CombinedCache combinedCache) {
public ChunkedBlobDownloader(
GrpcCacheClient grpcCacheClient, CombinedCache combinedCache, DigestUtil digestUtil) {
this.grpcCacheClient = grpcCacheClient;
this.combinedCache = combinedCache;
this.digestUtil = digestUtil;
}

/**
Expand All @@ -43,14 +51,23 @@ public ChunkedBlobDownloader(GrpcCacheClient grpcCacheClient, CombinedCache comb
public void downloadChunked(
RemoteActionExecutionContext context, Digest blobDigest, OutputStream out)
throws IOException, InterruptedException {
@Nullable DigestOutputStream digestOut = null;
if (grpcCacheClient.shouldVerifyDownloads()) {
digestOut = digestUtil.newDigestOutputStream(out);
out = digestOut;
}

List<Digest> chunkDigests = getChunkDigests(context, blobDigest);
downloadAndReassembleChunks(context, chunkDigests, out);
if (digestOut != null) {
Utils.verifyBlobContents(blobDigest, digestOut.digest());
}
}

private List<Digest> getChunkDigests(RemoteActionExecutionContext context, Digest blobDigest)
throws IOException, InterruptedException {
if (blobDigest.getSizeBytes() == 0) {
return List.of();
return ImmutableList.of();
}
ListenableFuture<SplitBlobResponse> splitResponseFuture =
grpcCacheClient.splitBlob(context, blobDigest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ boolean supported() throws IOException {
synchronized (this) {
config = ChunkingConfig.fromServerCapabilities(getRemoteServerCapabilities());
if (config != null) {
downloader = new ChunkedBlobDownloader(grpcClient, CombinedCache.this);
downloader = new ChunkedBlobDownloader(grpcClient, CombinedCache.this, digestUtil);
uploader = new ChunkedBlobUploader(grpcClient, CombinedCache.this, config, digestUtil);
}
initialized = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public class GrpcCacheClient implements RemoteCacheClient, MissingDigestsFinder

private final AtomicBoolean closed = new AtomicBoolean();

boolean shouldVerifyDownloads() {
return options.remoteVerifyDownloads;
}

@VisibleForTesting
public GrpcCacheClient(
ReferenceCountedChannel channel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import build.bazel.remote.execution.v2.SplitBlobResponse;
import com.google.common.util.concurrent.Futures;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.OutputDigestMismatchException;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
Expand Down Expand Up @@ -56,7 +57,8 @@ public class ChunkedBlobDownloaderTest {

@Before
public void setUp() {
downloader = new ChunkedBlobDownloader(grpcCacheClient, combinedCache);
when(grpcCacheClient.shouldVerifyDownloads()).thenReturn(true);
downloader = new ChunkedBlobDownloader(grpcCacheClient, combinedCache, DIGEST_UTIL);
}

@Test
Expand Down Expand Up @@ -184,4 +186,56 @@ public void downloadChunked_chunkFailsAfterPartialWrite_throwsIOException() thro
ByteArrayOutputStream out = new ByteArrayOutputStream();
assertThrows(IOException.class, () -> downloader.downloadChunked(context, blobDigest, out));
}

@Test
public void downloadChunked_blobDigestMismatch_throwsOutputDigestMismatch() throws Exception {
byte[] chunkData = new byte[] {1, 2, 3};
Digest chunkDigest = DIGEST_UTIL.compute(chunkData);
Digest blobDigest = DIGEST_UTIL.compute(new byte[] {4, 5, 6});

SplitBlobResponse splitResponse =
SplitBlobResponse.newBuilder().addChunkDigests(chunkDigest).build();
when(grpcCacheClient.splitBlob(any(), eq(blobDigest)))
.thenReturn(Futures.immediateFuture(splitResponse));
when(combinedCache.downloadBlob(any(), eq(chunkDigest), any()))
.thenAnswer(
invocation -> {
OutputStream out = invocation.getArgument(2);
out.write(chunkData);
return Futures.immediateFuture(null);
});

OutputDigestMismatchException e =
assertThrows(
OutputDigestMismatchException.class,
() -> downloader.downloadChunked(context, blobDigest, new ByteArrayOutputStream()));

assertThat(e).hasMessageThat().contains(blobDigest.getHash());
assertThat(e).hasMessageThat().contains(chunkDigest.getHash());
}

@Test
public void downloadChunked_blobDigestMismatchVerificationDisabled_succeeds() throws Exception {
when(grpcCacheClient.shouldVerifyDownloads()).thenReturn(false);
byte[] chunkData = new byte[] {1, 2, 3};
Digest chunkDigest = DIGEST_UTIL.compute(chunkData);
Digest blobDigest = DIGEST_UTIL.compute(new byte[] {4, 5, 6});

SplitBlobResponse splitResponse =
SplitBlobResponse.newBuilder().addChunkDigests(chunkDigest).build();
when(grpcCacheClient.splitBlob(any(), eq(blobDigest)))
.thenReturn(Futures.immediateFuture(splitResponse));
when(combinedCache.downloadBlob(any(), eq(chunkDigest), any()))
.thenAnswer(
invocation -> {
OutputStream out = invocation.getArgument(2);
out.write(chunkData);
return Futures.immediateFuture(null);
});

ByteArrayOutputStream out = new ByteArrayOutputStream();
downloader.downloadChunked(context, blobDigest, out);

assertThat(out.toByteArray()).isEqualTo(chunkData);
}
}