Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@
import de.ii.xtraplatform.blobs.domain.BlobSource;
import de.ii.xtraplatform.blobs.domain.BlobWriter;
import de.ii.xtraplatform.blobs.domain.ImmutableBlob;
import io.minio.GetObjectArgs;
import io.minio.GetObjectArgs.Builder;
import io.minio.ListObjectsArgs;
import io.minio.MinioClient;
import io.minio.PutObjectArgs;
import io.minio.RemoveObjectArgs;
import io.minio.Result;
import io.minio.StatObjectArgs;
import io.minio.StatObjectResponse;
import io.minio.messages.Item;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
Expand All @@ -39,20 +37,29 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import javax.ws.rs.core.EntityTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlobSourceS3 implements BlobSource, BlobWriter, BlobLocals {

public class BlobSourceS3 implements BlobSource, BlobWriter, BlobLocals, Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(BlobSourceS3.class);

private final MinioClient minioClient;
private final String bucket;
private final Path root;
@Nullable private final Path prefix;
private final BlobCache cache;
private final S3OperationsHelper s3Operations;
private final CacheOperationsHelper cacheOperations;
private final PathHelper pathHelper;

@Override
public void close() throws IOException {
if (minioClient != null) {
try {
minioClient.close();
} catch (Exception e) {
throw new IOException("Failed to close MinioClient", e);
}
}
}

public BlobSourceS3(MinioClient minioClient, String bucket, Path root, BlobCache cache) {
this(minioClient, bucket, root, cache, null);
Expand All @@ -62,69 +69,38 @@ public BlobSourceS3(
MinioClient minioClient, String bucket, Path root, BlobCache cache, Path prefix) {
this.minioClient = minioClient;
this.bucket = bucket;
this.root = root;
this.cache = cache;
this.prefix = prefix;
this.s3Operations = new S3OperationsHelper(minioClient, bucket);
this.cacheOperations = new CacheOperationsHelper(cache);
this.pathHelper = new PathHelper(root, prefix);
}

@Override
public boolean canHandle(Path path) {
return Objects.isNull(prefix) || path.startsWith(prefix);
return pathHelper.canHandle(path);
}

@Override
public boolean has(Path path) throws IOException {
return getStat(path).isPresent();
return s3Operations.getStat(pathHelper.full(path)).isPresent();
}

@Override
public Optional<InputStream> content(Path path) throws IOException {
return getCurrent(path);
return s3Operations.getCurrent(pathHelper.full(path));
}

@Override
public Optional<Blob> get(Path path) throws IOException {
return getStat(path)
.map(
stat ->
ImmutableBlob.of(
path,
stat.size(),
stat.lastModified().toInstant().toEpochMilli(),
Optional.of(new EntityTag(stat.etag())),
Optional.ofNullable(stat.contentType()),
supplierMayThrow(
() ->
content(path)
.orElseThrow(
() ->
new IOException(
"Unexpected error, could not get " + path)))));
}

@Override
public long size(Path path) throws IOException {
return getStat(path).map(StatObjectResponse::size).orElse(-1L);
}

@Override
public long lastModified(Path path) throws IOException {
return getStat(path).map(stat -> stat.lastModified().toInstant().toEpochMilli()).orElse(-1L);
}

// TODO: walkInfo
@Override
public Stream<Path> walk(Path path, int maxDepth, BiPredicate<Path, PathAttributes> matcher)
throws IOException {
if (!canHandle(path) || maxDepth <= 0) {
if (!pathHelper.canHandle(path) || maxDepth <= 0) {
return Stream.empty();
}

if (LOGGER.isDebugEnabled(MARKER.S3)) {
LOGGER.debug(MARKER.S3, "S3 walk {}", path);
}

Path prefix = Path.of(full(path));
Path prefix = Path.of(pathHelper.full(path));

Set<Path> paths = new HashSet<>();

Expand All @@ -138,7 +114,6 @@ public Stream<Path> walk(Path path, int maxDepth, BiPredicate<Path, PathAttribut
.build())
.spliterator();

// TODO: concat path?
return StreamSupport.stream(results, false)
.flatMap(
result -> {
Expand Down Expand Up @@ -195,7 +170,7 @@ public boolean isHidden() {

@Override
public void put(Path path, InputStream content) throws IOException {
if (!canHandle(path)) {
if (!pathHelper.canHandle(path)) {
return;
}

Expand All @@ -205,7 +180,7 @@ public void put(Path path, InputStream content) throws IOException {
}

minioClient.putObject(
PutObjectArgs.builder().bucket(bucket).object(full(path)).stream(
PutObjectArgs.builder().bucket(bucket).object(pathHelper.full(path)).stream(
buffer, buffer.available(), -1)
.build());
} catch (Throwable e) {
Expand All @@ -215,7 +190,7 @@ public void put(Path path, InputStream content) throws IOException {

@Override
public void delete(Path path) throws IOException {
if (!canHandle(path)) {
if (!pathHelper.canHandle(path)) {
return;
}

Expand All @@ -225,105 +200,61 @@ public void delete(Path path) throws IOException {
}

minioClient.removeObject(
RemoveObjectArgs.builder().bucket(bucket).object(full(path)).build());
RemoveObjectArgs.builder().bucket(bucket).object(pathHelper.full(path)).build());
} catch (Throwable e) {
throw new IOException("S3 Driver", e);
}
}

@Override
public Optional<Path> asLocalPath(Path path, boolean writable) throws IOException {
if (writable) {
throw new IOException("Local resources from S3 cannot be written to");
}

Optional<StatObjectResponse> stat = getStat(path);

if (stat.isPresent()) {
String eTag = stat.get().etag();
Optional<Path> cachePath = cache.get(path, eTag);

if (cachePath.isPresent()) {
if (LOGGER.isDebugEnabled(MARKER.S3)) {
LOGGER.debug(MARKER.S3, "S3 using local cache {}", cachePath.get());
}
return cachePath;
}

Optional<InputStream> content = content(path);

if (content.isPresent()) {
if (LOGGER.isDebugEnabled(MARKER.S3)) {
LOGGER.debug(MARKER.S3, "S3 updating local cache for {}", path);
}

return Optional.of(cache.put(path, eTag, content.get()))
.map(
p -> {
if (LOGGER.isDebugEnabled(MARKER.S3)) {
LOGGER.debug(MARKER.S3, "S3 updated local cache {}", p);
}

return p;
});
}
}

return Optional.empty();
public Optional<Blob> get(Path path) throws IOException {
return s3Operations
.getStat(pathHelper.full(path))
.map(
stat ->
ImmutableBlob.of(
path,
stat.size(),
stat.lastModified().toInstant().toEpochMilli(),
Optional.of(new EntityTag(stat.etag())),
Optional.ofNullable(stat.contentType()),
supplierMayThrow(
() ->
content(path)
.orElseThrow(
() ->
new IOException(
"Unexpected error, could not get " + path)))));
}

private Optional<StatObjectResponse> getStat(Path path) {
if (!canHandle(path)) {
return Optional.empty();
}

if (LOGGER.isDebugEnabled(MARKER.S3)) {
LOGGER.debug(MARKER.S3, "S3 get stat {}", path);
}

try {
return Optional.of(
minioClient.statObject(
StatObjectArgs.builder().bucket(bucket).object(full(path)).build()));
} catch (Throwable e) {
return Optional.empty();
}
@Override
public long size(Path path) throws IOException {
return s3Operations.getStat(pathHelper.full(path)).map(StatObjectResponse::size).orElse(-1L);
}

public Optional<InputStream> getCurrent(Path path) throws IOException {
return getByETag(path, null);
@Override
public long lastModified(Path path) throws IOException {
return s3Operations
.getStat(pathHelper.full(path))
.map(stat -> stat.lastModified().toInstant().toEpochMilli())
.orElse(-1L);
}

public Optional<InputStream> getByETag(Path path, String eTag) {
if (!canHandle(path)) {
return Optional.empty();
}

if (LOGGER.isDebugEnabled(MARKER.S3)) {
LOGGER.debug(
MARKER.S3,
"S3 get content {} {}",
path,
Objects.nonNull(eTag) ? "if-none-match " + eTag : "");
}

Builder builder = GetObjectArgs.builder().bucket(bucket).object(full(path));

if (Objects.nonNull(eTag)) {
builder.notMatchETag(eTag);
@Override
public Optional<Path> asLocalPath(Path path, boolean writable) throws IOException {
if (writable) {
throw new IOException("Local resources from S3 cannot be written to");
}

try {
return Optional.of(minioClient.getObject(builder.build()));
} catch (Throwable e) {
Optional<StatObjectResponse> stat = s3Operations.getStat(pathHelper.full(path));
if (stat.isEmpty()) {
return Optional.empty();
// throw new IOException("S3 Driver", e);
}
}

private String full(Path path) {
return Objects.isNull(prefix)
? root.resolve(path).toString()
: root.resolve(prefix.relativize(path)).toString();
String eTag = stat.get().etag();
Optional<Path> cachePath = cacheOperations.getCachedPath(path, eTag);
if (cachePath.isPresent()) {
return cachePath;
}
return cacheOperations.updateCacheAndReturnPath(path, eTag, content(path));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ public boolean isAvailable(StoreSource storeSource) {
if (storeSource instanceof StoreSourceS3) {
try {
Tuple<MinioClient, String> client = getClient((StoreSourceS3) storeSource);
String bucket = client.second();

return client.first().bucketExists(BucketExistsArgs.builder().bucket(bucket).build());
try (MinioClient minioClient = client.first()) {
String bucket = client.second();
return minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucket).build());
}
} catch (Throwable e) {
LogContext.error(LOGGER, e, "S3 Driver");
return false;
Expand All @@ -63,38 +64,39 @@ public boolean isAvailable(StoreSource storeSource) {
@Override
public BlobSource init(StoreSource storeSource, Content contentType) throws IOException {
Tuple<MinioClient, String> client = getClient((StoreSourceS3) storeSource);
MinioClient minioClient = client.first();
String bucket = client.second();
Path root = Path.of("");

if (!storeSource.isSingleContent()) {
root = root.resolve(contentType.getPrefix());
}

BlobSource blobSource =
storeSource.isSingleContent() && storeSource.getPrefix().isPresent()
? new BlobSourceS3(
client.first(), bucket, root, cache, Path.of(storeSource.getPrefix().get()))
: new BlobSourceS3(client.first(), bucket, root, cache);

return blobSource;
if (storeSource.isSingleContent() && storeSource.getPrefix().isPresent()) {
return new BlobSourceS3(
minioClient, bucket, root, cache, Path.of(storeSource.getPrefix().get()));
}
return new BlobSourceS3(minioClient, bucket, root, cache);
}

@SuppressWarnings("PMD.CloseResource")
private Tuple<MinioClient, String> getClient(StoreSourceS3 storeSource) {
boolean hasScheme = storeSource.getSrc().matches("^[a-zA-Z0-9]+://.*");
String scheme = storeSource.getInsecure() ? "http://" : "https://";
String source =
hasScheme ? storeSource.getSrc().replaceFirst("[a-zA-Z0-9]+://", "") : storeSource.getSrc();
URI uri = URI.create(scheme + source);

String host = uri.getHost();
int port = uri.getPort() == -1 ? (storeSource.getInsecure() ? 80 : 443) : uri.getPort();
String bucket = uri.getPath().replaceFirst("^/", "").replaceFirst("/$", "");

if (bucket.isEmpty()) {
throw new IllegalArgumentException(
"Bucket name cannot be empty (" + storeSource.getSrc() + ")");
}

String host = uri.getHost();
int port = uri.getPort() == -1 ? storeSource.getInsecure() ? 80 : 443 : uri.getPort();

MinioClient minioClient =
MinioClient.builder()
.endpoint(host, port, !storeSource.getInsecure())
Expand Down
Loading