Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
Expand Down Expand Up @@ -67,8 +68,15 @@ private List<Digest> getChunkDigests(RemoteActionExecutionContext context, Diges
private void downloadAndReassembleChunks(
RemoteActionExecutionContext context, List<Digest> chunkDigests, OutputStream out)
throws IOException, InterruptedException {
OutputStream chunkOut =
new FilterOutputStream(out) {
@Override
public void write(byte[] b, int off, int len) throws IOException {
this.out.write(b, off, len);
}
};
for (Digest chunkDigest : chunkDigests) {
getFromFuture(combinedCache.downloadBlob(context, chunkDigest, out));
getFromFuture(combinedCache.downloadBlob(context, chunkDigest, chunkOut));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,20 @@
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.SplitBlobResponse;
import com.google.common.util.concurrent.Futures;
import com.google.devtools.build.lib.clock.JavaClock;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.MaybePathBacked;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.SyscallCache;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import javax.annotation.Nullable;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -79,13 +85,7 @@ public void downloadChunked_singleChunk_downloadsAndReassembles() throws Excepti
SplitBlobResponse.newBuilder().addChunkDigests(chunkDigest).build();
when(grpcCacheClient.splitBlob(any(), eq(blobDigest)))
.thenReturn(Futures.immediateFuture(splitResponse));
when(combinedCache.downloadBlob(any(), eq(chunkDigest), any()))
.thenAnswer(
invocation -> {
OutputStream out = invocation.getArgument(2);
out.write(chunkData);
return Futures.immediateFuture(null);
});
whenDownloadBlobWrites(chunkDigest, chunkData);

ByteArrayOutputStream out = new ByteArrayOutputStream();
downloader.downloadChunked(context, blobDigest, out);
Expand All @@ -111,45 +111,38 @@ public void downloadChunked_multipleChunks_downloadsAndReassemblesInOrder() thro
.build();
when(grpcCacheClient.splitBlob(any(), eq(blobDigest)))
.thenReturn(Futures.immediateFuture(splitResponse));
when(combinedCache.downloadBlob(any(), eq(chunk1Digest), any()))
.thenAnswer(
invocation -> {
OutputStream out = invocation.getArgument(2);
out.write(chunk1Data);
return Futures.immediateFuture(null);
});
when(combinedCache.downloadBlob(any(), eq(chunk2Digest), any()))
.thenAnswer(
invocation -> {
OutputStream out = invocation.getArgument(2);
out.write(chunk2Data);
return Futures.immediateFuture(null);
});
when(combinedCache.downloadBlob(any(), eq(chunk3Digest), any()))
Map<Digest, byte[]> chunks =
Map.of(
chunk1Digest, chunk1Data,
chunk2Digest, chunk2Data,
chunk3Digest, chunk3Data);

// Per-chunk downloads must not see the final output's backing path.
when(combinedCache.downloadBlob(any(), any(Digest.class), any(OutputStream.class)))
.thenAnswer(
invocation -> {
byte[] data = chunks.get(invocation.getArgument(1));
assertThat(data).isNotNull();
OutputStream out = invocation.getArgument(2);
out.write(chunk3Data);
assertThat(out).isNotInstanceOf(MaybePathBacked.class);
out.write(data);
return Futures.immediateFuture(null);
});

ByteArrayOutputStream out = new ByteArrayOutputStream();
InMemoryFileSystem fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256);
PathBackedOutputStream out = new PathBackedOutputStream(fs.getPath("/output"));
downloader.downloadChunked(context, blobDigest, out);

assertThat(out.toByteArray()).isEqualTo(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9});
verify(combinedCache).downloadBlob(any(), eq(chunk1Digest), any());
verify(combinedCache).downloadBlob(any(), eq(chunk2Digest), any());
verify(combinedCache).downloadBlob(any(), eq(chunk3Digest), any());
verify(combinedCache).downloadBlob(any(), eq(chunk1Digest), any(OutputStream.class));
verify(combinedCache).downloadBlob(any(), eq(chunk2Digest), any(OutputStream.class));
verify(combinedCache).downloadBlob(any(), eq(chunk3Digest), any(OutputStream.class));
}

@Test
public void downloadChunked_emptyChunkList_producesEmptyOutput() throws Exception {
Digest blobDigest = DIGEST_UTIL.compute(new byte[0]);

SplitBlobResponse splitResponse = SplitBlobResponse.getDefaultInstance();
when(grpcCacheClient.splitBlob(any(), eq(blobDigest)))
.thenReturn(Futures.immediateFuture(splitResponse));

ByteArrayOutputStream out = new ByteArrayOutputStream();
downloader.downloadChunked(context, blobDigest, out);

Expand All @@ -171,17 +164,36 @@ public void downloadChunked_chunkFailsAfterPartialWrite_throwsIOException() thro
.build();
when(grpcCacheClient.splitBlob(any(), eq(blobDigest)))
.thenReturn(Futures.immediateFuture(splitResponse));
when(combinedCache.downloadBlob(any(), eq(chunk1Digest), any()))
whenDownloadBlobWrites(chunk1Digest, chunk1Data);
when(combinedCache.downloadBlob(any(), eq(chunk2Digest), any(OutputStream.class)))
.thenReturn(Futures.immediateFailedFuture(new IOException("connection reset")));

ByteArrayOutputStream out = new ByteArrayOutputStream();
assertThrows(IOException.class, () -> downloader.downloadChunked(context, blobDigest, out));
}

private void whenDownloadBlobWrites(Digest digest, byte[] data) {
when(combinedCache.downloadBlob(any(), eq(digest), any(OutputStream.class)))
.thenAnswer(
invocation -> {
OutputStream out = invocation.getArgument(2);
out.write(chunk1Data);
return Futures.immediateFuture(null);
out.write(data);
return Futures.immediateVoidFuture();
});
when(combinedCache.downloadBlob(any(), eq(chunk2Digest), any()))
.thenReturn(Futures.immediateFailedFuture(new IOException("connection reset")));
}

ByteArrayOutputStream out = new ByteArrayOutputStream();
assertThrows(IOException.class, () -> downloader.downloadChunked(context, blobDigest, out));
private static class PathBackedOutputStream extends ByteArrayOutputStream
implements MaybePathBacked {
private final Path path;

PathBackedOutputStream(Path path) {
this.path = path;
}

@Nullable
@Override
public Path maybeGetPath() {
return path;
}
}
}
Loading