diff --git a/src/main/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloader.java b/src/main/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloader.java index c2fd63e6bfa4e5..6e897d68df7d97 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloader.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.List; @@ -67,8 +68,15 @@ private List getChunkDigests(RemoteActionExecutionContext context, Diges private void downloadAndReassembleChunks( RemoteActionExecutionContext context, List chunkDigests, OutputStream out) throws IOException, InterruptedException { + OutputStream chunkOut = + new FilterOutputStream(out) { + @Override + public void write(byte[] b, int off, int len) throws IOException { + this.out.write(b, off, len); + } + }; for (Digest chunkDigest : chunkDigests) { - getFromFuture(combinedCache.downloadBlob(context, chunkDigest, out)); + getFromFuture(combinedCache.downloadBlob(context, chunkDigest, chunkOut)); } } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloaderTest.java index c3c01b23cd00d8..4c963e816aaa9c 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloaderTest.java @@ -23,14 +23,20 @@ import build.bazel.remote.execution.v2.Digest; import build.bazel.remote.execution.v2.SplitBlobResponse; import com.google.common.util.concurrent.Futures; +import com.google.devtools.build.lib.clock.JavaClock; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import com.google.devtools.build.lib.remote.common.MaybePathBacked; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.vfs.DigestHashFunction; +import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.SyscallCache; +import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.util.Map; +import javax.annotation.Nullable; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -79,13 +85,7 @@ public void downloadChunked_singleChunk_downloadsAndReassembles() throws Excepti SplitBlobResponse.newBuilder().addChunkDigests(chunkDigest).build(); when(grpcCacheClient.splitBlob(any(), eq(blobDigest))) .thenReturn(Futures.immediateFuture(splitResponse)); - when(combinedCache.downloadBlob(any(), eq(chunkDigest), any())) - .thenAnswer( - invocation -> { - OutputStream out = invocation.getArgument(2); - out.write(chunkData); - return Futures.immediateFuture(null); - }); + whenDownloadBlobWrites(chunkDigest, chunkData); ByteArrayOutputStream out = new ByteArrayOutputStream(); downloader.downloadChunked(context, blobDigest, out); @@ -111,45 +111,38 @@ public void downloadChunked_multipleChunks_downloadsAndReassemblesInOrder() thro .build(); when(grpcCacheClient.splitBlob(any(), eq(blobDigest))) .thenReturn(Futures.immediateFuture(splitResponse)); - when(combinedCache.downloadBlob(any(), eq(chunk1Digest), any())) - .thenAnswer( - invocation -> { - OutputStream out = invocation.getArgument(2); - out.write(chunk1Data); - return Futures.immediateFuture(null); - }); - when(combinedCache.downloadBlob(any(), eq(chunk2Digest), any())) - .thenAnswer( - invocation -> { - OutputStream out = invocation.getArgument(2); - out.write(chunk2Data); - return Futures.immediateFuture(null); - }); - when(combinedCache.downloadBlob(any(), eq(chunk3Digest), any())) + Map chunks = + Map.of( + chunk1Digest, chunk1Data, + chunk2Digest, chunk2Data, + chunk3Digest, chunk3Data); + + // Per-chunk downloads must not see the final output's backing path. + when(combinedCache.downloadBlob(any(), any(Digest.class), any(OutputStream.class))) .thenAnswer( invocation -> { + byte[] data = chunks.get(invocation.getArgument(1)); + assertThat(data).isNotNull(); OutputStream out = invocation.getArgument(2); - out.write(chunk3Data); + assertThat(out).isNotInstanceOf(MaybePathBacked.class); + out.write(data); return Futures.immediateFuture(null); }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); + InMemoryFileSystem fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256); + PathBackedOutputStream out = new PathBackedOutputStream(fs.getPath("/output")); downloader.downloadChunked(context, blobDigest, out); assertThat(out.toByteArray()).isEqualTo(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9}); - verify(combinedCache).downloadBlob(any(), eq(chunk1Digest), any()); - verify(combinedCache).downloadBlob(any(), eq(chunk2Digest), any()); - verify(combinedCache).downloadBlob(any(), eq(chunk3Digest), any()); + verify(combinedCache).downloadBlob(any(), eq(chunk1Digest), any(OutputStream.class)); + verify(combinedCache).downloadBlob(any(), eq(chunk2Digest), any(OutputStream.class)); + verify(combinedCache).downloadBlob(any(), eq(chunk3Digest), any(OutputStream.class)); } @Test public void downloadChunked_emptyChunkList_producesEmptyOutput() throws Exception { Digest blobDigest = DIGEST_UTIL.compute(new byte[0]); - SplitBlobResponse splitResponse = SplitBlobResponse.getDefaultInstance(); - when(grpcCacheClient.splitBlob(any(), eq(blobDigest))) - .thenReturn(Futures.immediateFuture(splitResponse)); - ByteArrayOutputStream out = new ByteArrayOutputStream(); downloader.downloadChunked(context, blobDigest, out); @@ -171,17 +164,36 @@ public void downloadChunked_chunkFailsAfterPartialWrite_throwsIOException() thro .build(); when(grpcCacheClient.splitBlob(any(), eq(blobDigest))) .thenReturn(Futures.immediateFuture(splitResponse)); - when(combinedCache.downloadBlob(any(), eq(chunk1Digest), any())) + whenDownloadBlobWrites(chunk1Digest, chunk1Data); + when(combinedCache.downloadBlob(any(), eq(chunk2Digest), any(OutputStream.class))) + .thenReturn(Futures.immediateFailedFuture(new IOException("connection reset"))); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + assertThrows(IOException.class, () -> downloader.downloadChunked(context, blobDigest, out)); + } + + private void whenDownloadBlobWrites(Digest digest, byte[] data) { + when(combinedCache.downloadBlob(any(), eq(digest), any(OutputStream.class))) .thenAnswer( invocation -> { OutputStream out = invocation.getArgument(2); - out.write(chunk1Data); - return Futures.immediateFuture(null); + out.write(data); + return Futures.immediateVoidFuture(); }); - when(combinedCache.downloadBlob(any(), eq(chunk2Digest), any())) - .thenReturn(Futures.immediateFailedFuture(new IOException("connection reset"))); + } - ByteArrayOutputStream out = new ByteArrayOutputStream(); - assertThrows(IOException.class, () -> downloader.downloadChunked(context, blobDigest, out)); + private static class PathBackedOutputStream extends ByteArrayOutputStream + implements MaybePathBacked { + private final Path path; + + PathBackedOutputStream(Path path) { + this.path = path; + } + + @Nullable + @Override + public Path maybeGetPath() { + return path; + } } }