Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 42 additions & 41 deletions src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@
import net.spy.memcached.transcoders.TranscoderUtils;
import net.spy.memcached.v2.vo.BKey;
import net.spy.memcached.v2.vo.BTreeElement;
import net.spy.memcached.v2.vo.BTreeElements;
import net.spy.memcached.v2.vo.BTreeGetResult;
import net.spy.memcached.v2.vo.BTreePositionElement;
import net.spy.memcached.v2.vo.BTreeSMGetResult;
import net.spy.memcached.v2.vo.BTreeUpdateElement;
import net.spy.memcached.v2.vo.BopDeleteArgs;
import net.spy.memcached.v2.vo.BopGetArgs;
import net.spy.memcached.v2.vo.GetOption;
import net.spy.memcached.v2.vo.SMGetElements;

public class AsyncArcusCommands<T> implements AsyncArcusCommandsIF<T> {

Expand Down Expand Up @@ -1353,12 +1353,12 @@ public void gotData(String bKey, int flags, byte[] data, byte[] eFlag) {
}

@Override
public ArcusFuture<BTreeElements<T>> bopGet(String key, BKey from, BKey to, BopGetArgs args) {
public ArcusFuture<BTreeGetResult<T>> bopGet(String key, BKey from, BKey to, BopGetArgs args) {
verifyBKeyTypesMatch(from, to);

AbstractArcusResult<BTreeElements<T>> result =
new AbstractArcusResult<>(new AtomicReference<>(new BTreeElements<>(new ArrayList<>())));
ArcusFutureImpl<BTreeElements<T>> future = new ArcusFutureImpl<>(result);
AbstractArcusResult<BTreeGetResult<T>> result =
new AbstractArcusResult<>(new AtomicReference<>(new BTreeGetResult<>(new ArrayList<>())));
ArcusFutureImpl<BTreeGetResult<T>> future = new ArcusFutureImpl<>(result);
BTreeGet get = createBTreeGet(from, to, args);
ArcusClient client = arcusClientSupplier.get();

Expand Down Expand Up @@ -1426,9 +1426,9 @@ private static BTreeGet createBTreeGet(BKey from, BKey to, BopGetArgs args) {
}

@Override
public ArcusFuture<Map<String, BTreeElements<T>>> bopMultiGet(List<String> keys,
BKey from, BKey to,
BopGetArgs args) {
public ArcusFuture<Map<String, BTreeGetResult<T>>> bopMultiGet(List<String> keys,
BKey from, BKey to,
BopGetArgs args) {
keyValidator.validateKey(keys);
keyValidator.checkDupKey(keys);
verifyBKeyTypesMatch(from, to);
Expand All @@ -1439,28 +1439,28 @@ public ArcusFuture<Map<String, BTreeElements<T>>> bopMultiGet(List<String> keys,
client.groupingKeys(keys, ArcusClient.BOPGET_BULK_CHUNK_SIZE, APIType.BOP_GET);

Collection<CompletableFuture<?>> futures = new ArrayList<>();
Map<CompletableFuture<Map<String, BTreeElements<T>>>, List<String>> futureToKeys =
Map<CompletableFuture<Map<String, BTreeGetResult<T>>>, List<String>> futureToKeys =
new HashMap<>();

for (Map.Entry<MemcachedNode, List<String>> entry : arrangedKeys) {
BTreeGetBulk<T> getBulk =
createBTreeGetBulk(entry.getKey(), entry.getValue(), from, to, args);
CompletableFuture<Map<String, BTreeElements<T>>> future =
CompletableFuture<Map<String, BTreeGetResult<T>>> future =
bopMultiGetPerNode(client, getBulk).toCompletableFuture();
futureToKeys.put(future, entry.getValue());
futures.add(future);
}

return new ArcusMultiFuture<>(futures, () -> {
Map<String, BTreeElements<T>> results = new HashMap<>();
for (Map.Entry<CompletableFuture<Map<String, BTreeElements<T>>>, List<String>> entry
Map<String, BTreeGetResult<T>> results = new HashMap<>();
for (Map.Entry<CompletableFuture<Map<String, BTreeGetResult<T>>>, List<String>> entry
: futureToKeys.entrySet()) {
if (entry.getKey().isCompletedExceptionally()) {
for (String key : entry.getValue()) {
results.put(key, null);
}
} else {
Map<String, BTreeElements<T>> result = entry.getKey().join();
Map<String, BTreeGetResult<T>> result = entry.getKey().join();
if (result != null) {
results.putAll(result);
}
Expand All @@ -1470,11 +1470,11 @@ public ArcusFuture<Map<String, BTreeElements<T>>> bopMultiGet(List<String> keys,
});
}

private ArcusFuture<Map<String, BTreeElements<T>>> bopMultiGetPerNode(ArcusClient client,
BTreeGetBulk<T> getBulk) {
AbstractArcusResult<Map<String, BTreeElements<T>>> result =
private ArcusFuture<Map<String, BTreeGetResult<T>>> bopMultiGetPerNode(ArcusClient client,
BTreeGetBulk<T> getBulk) {
AbstractArcusResult<Map<String, BTreeGetResult<T>>> result =
new AbstractArcusResult<>(new AtomicReference<>(new HashMap<>()));
ArcusFutureImpl<Map<String, BTreeElements<T>>> future = new ArcusFutureImpl<>(result);
ArcusFutureImpl<Map<String, BTreeGetResult<T>>> future = new ArcusFutureImpl<>(result);

BTreeGetBulkOperation.Callback cb = new BTreeGetBulkOperation.Callback() {
@Override
Expand Down Expand Up @@ -1504,18 +1504,18 @@ public void complete() {
public void gotKey(String key, int elementCount, OperationStatus status) {
switch (status.getStatusCode()) {
case SUCCESS:
result.get().put(key, new BTreeElements<>(new ArrayList<>()));
result.get().put(key, new BTreeGetResult<>(new ArrayList<>()));
break;
case TRIMMED:
BTreeElements<T> elements = new BTreeElements<>(new ArrayList<>());
BTreeGetResult<T> elements = new BTreeGetResult<>(new ArrayList<>());
elements.trimmed();
result.get().put(key, elements);
break;
case ERR_NOT_FOUND:
break;
case ERR_NOT_FOUND_ELEMENT:
// Put empty BTreeElements for the BTree item key
result.get().put(key, new BTreeElements<>(new ArrayList<>()));
result.get().put(key, new BTreeGetResult<>(new ArrayList<>()));
break;
default:
/*
Expand All @@ -1529,7 +1529,7 @@ public void gotKey(String key, int elementCount, OperationStatus status) {
@Override
public void gotElement(String key, int flags, Object bKey, byte[] eFlag, byte[] data) {
CachedData cachedData = new CachedData(flags, data, tcForCollection.getMaxSize());
BTreeElements<T> elements = result.get().get(key);
BTreeGetResult<T> elements = result.get().get(key);
elements.addElement(new BTreeElement<>(
BKey.of(bKey), tcForCollection.decode(cachedData), eFlag));
}
Expand All @@ -1555,8 +1555,8 @@ private BTreeGetBulk<T> createBTreeGetBulk(MemcachedNode node, List<String> keys
}

@Override
public ArcusFuture<SMGetElements<T>> bopSortMergeGet(List<String> keys, BKey from, BKey to,
boolean unique, BopGetArgs args) {
public ArcusFuture<BTreeSMGetResult<T>> bopSortMergeGet(List<String> keys, BKey from, BKey to,
boolean unique, BopGetArgs args) {
keyValidator.validateKey(keys);
keyValidator.checkDupKey(keys);
verifyBKeyTypesMatch(from, to);
Expand All @@ -1566,11 +1566,11 @@ public ArcusFuture<SMGetElements<T>> bopSortMergeGet(List<String> keys, BKey fro
Collection<Map.Entry<MemcachedNode, List<String>>> arrangedKeys =
client.groupingKeys(keys, ArcusClient.SMGET_CHUNK_SIZE, APIType.BOP_SMGET);

List<CompletableFuture<SMGetElements<T>>> smGetFutures = new ArrayList<>();
List<CompletableFuture<BTreeSMGetResult<T>>> smGetFutures = new ArrayList<>();

for (Map.Entry<MemcachedNode, List<String>> entry : arrangedKeys) {
BTreeSMGet<T> smGet = createBTreeSMGet(from, to, args, unique, entry);
CompletableFuture<SMGetElements<T>> future =
CompletableFuture<BTreeSMGetResult<T>> future =
bopSortMergeGetPerNode(client, smGet).toCompletableFuture();
smGetFutures.add(future);
}
Expand All @@ -1579,29 +1579,30 @@ public ArcusFuture<SMGetElements<T>> bopSortMergeGet(List<String> keys, BKey fro
Collection<CompletableFuture<?>> futures =
(Collection<CompletableFuture<?>>) (Collection<?>) smGetFutures;
return new ArcusMultiFuture<>(futures, () -> {
List<SMGetElements<T>> results = new ArrayList<>();
for (CompletableFuture<SMGetElements<T>> future : smGetFutures) {
List<BTreeSMGetResult<T>> results = new ArrayList<>();
for (CompletableFuture<BTreeSMGetResult<T>> future : smGetFutures) {
if (!future.isCompletedExceptionally()) {
results.add(future.join());
}
}
return SMGetElements.mergeSMGetElements(results, from.compareTo(to) <= 0, unique,
return BTreeSMGetResult.mergeSMGetElements(results, from.compareTo(to) <= 0, unique,
args.getCount());
});
}

private ArcusFuture<SMGetElements<T>> bopSortMergeGetPerNode(ArcusClient client,
BTreeSMGet<T> smGet) {
List<SMGetElements.Element<T>> elementList = new ArrayList<>();
List<SMGetElements.MissedKey> missedKeys = new ArrayList<>();
List<SMGetElements.TrimmedKey> trimmedKeys = new ArrayList<>();
SMGetElements<T> smGetElements = new SMGetElements<>(elementList, missedKeys, trimmedKeys);
private ArcusFuture<BTreeSMGetResult<T>> bopSortMergeGetPerNode(ArcusClient client,
BTreeSMGet<T> smGet) {
List<BTreeSMGetResult.Element<T>> elementList = new ArrayList<>();
List<BTreeSMGetResult.MissedKey> missedKeys = new ArrayList<>();
List<BTreeSMGetResult.TrimmedKey> trimmedKeys = new ArrayList<>();
BTreeSMGetResult<T> smGetElements = new BTreeSMGetResult<>(
elementList, missedKeys, trimmedKeys);

AtomicReference<SMGetElements<T>> atomicReference = new AtomicReference<>(smGetElements);
AbstractArcusResult<SMGetElements<T>> result =
AtomicReference<BTreeSMGetResult<T>> atomicReference = new AtomicReference<>(smGetElements);
AbstractArcusResult<BTreeSMGetResult<T>> result =
new AbstractArcusResult<>(atomicReference);

ArcusFutureImpl<SMGetElements<T>> future = new ArcusFutureImpl<>(result);
ArcusFutureImpl<BTreeSMGetResult<T>> future = new ArcusFutureImpl<>(result);

BTreeSortMergeGetOperation.Callback cb = new BTreeSortMergeGetOperation.Callback() {
@Override
Expand Down Expand Up @@ -1635,17 +1636,17 @@ public void gotData(String key, int flags, Object bKey, byte[] eFlag, byte[] dat
CachedData cachedData = new CachedData(flags, data, tcForCollection.getMaxSize());
BTreeElement<T> btreeElement = new BTreeElement<>(
BKey.of(bKey), tcForCollection.decode(cachedData), eFlag);
elementList.add(new SMGetElements.Element<>(key, btreeElement));
elementList.add(new BTreeSMGetResult.Element<>(key, btreeElement));
}

@Override
public void gotMissedKey(String key, OperationStatus cause) {
missedKeys.add(new SMGetElements.MissedKey(key, cause.getStatusCode()));
missedKeys.add(new BTreeSMGetResult.MissedKey(key, cause.getStatusCode()));
}

@Override
public void gotTrimmedKey(String key, Object bKey) {
trimmedKeys.add(new SMGetElements.TrimmedKey(key, BKey.of(bKey)));
trimmedKeys.add(new BTreeSMGetResult.TrimmedKey(key, BKey.of(bKey)));
}
};
Operation op = client.getOpFact().bopsmget(smGet, cb);
Expand Down
22 changes: 11 additions & 11 deletions src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import net.spy.memcached.collection.ElementValueType;
import net.spy.memcached.v2.vo.BKey;
import net.spy.memcached.v2.vo.BTreeElement;
import net.spy.memcached.v2.vo.BTreeElements;
import net.spy.memcached.v2.vo.BTreeGetResult;
import net.spy.memcached.v2.vo.BTreePositionElement;
import net.spy.memcached.v2.vo.BTreeSMGetResult;
import net.spy.memcached.v2.vo.BTreeUpdateElement;
import net.spy.memcached.v2.vo.BopDeleteArgs;
import net.spy.memcached.v2.vo.BopGetArgs;
import net.spy.memcached.v2.vo.GetOption;
import net.spy.memcached.v2.vo.SMGetElements;

public interface AsyncArcusCommandsIF<T> {

Expand Down Expand Up @@ -690,11 +690,11 @@ ArcusFuture<Map.Entry<Boolean, BTreeElement<T>>> bopUpsertAndGetTrimmed(
* @param from BKey range start
* @param to BKey range end
* @param args arguments for get operation
* @return {@code BTreeElements} with found elements,
* empty {@code BTreeElements} if no elements are found in the range but key exists,
* @return {@code BTreeGetResult} with found elements,
* empty {@code BTreeGetResult} if no elements are found in the range but key exists,
* {@code null} if key is not found
*/
ArcusFuture<BTreeElements<T>> bopGet(String key, BKey from, BKey to, BopGetArgs args);
ArcusFuture<BTreeGetResult<T>> bopGet(String key, BKey from, BKey to, BopGetArgs args);

/**
* Get elements from multiple btree items.
Expand All @@ -703,11 +703,11 @@ ArcusFuture<Map.Entry<Boolean, BTreeElement<T>>> bopUpsertAndGetTrimmed(
* @param from BKey range start
* @param to BKey range end
* @param args arguments for get operation
* @return map of key to {@code BTreeElements} with found elements,
* empty {@code BTreeElements} if no elements are found in the range but key exists,
* @return map of key to {@code BTreeGetResult} with found elements,
* empty {@code BTreeGetResult} if no elements are found in the range but key exists,
* no {@code Map.Entry} in the map if the key is not found
*/
ArcusFuture<Map<String, BTreeElements<T>>> bopMultiGet(
ArcusFuture<Map<String, BTreeGetResult<T>>> bopMultiGet(
List<String> keys, BKey from, BKey to, BopGetArgs args);

/**
Expand All @@ -718,10 +718,10 @@ ArcusFuture<Map<String, BTreeElements<T>>> bopMultiGet(
* @param to BKey range end
* @param unique whether to return unique elements only
* @param args arguments for get operation
* @return {@code SMGetElements} containing sort-merged elements,
* empty {@code SMGetElements} if no matching elements exist
* @return {@code BTreeSMGetResult} containing sort-merged elements,
* empty {@code BTreeSMGetResult} if no matching elements exist
*/
ArcusFuture<SMGetElements<T>> bopSortMergeGet(
ArcusFuture<BTreeSMGetResult<T>> bopSortMergeGet(
List<String> keys, BKey from, BKey to, boolean unique, BopGetArgs args);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import java.util.Collections;
import java.util.List;

public final class BTreeElements<V> {
public final class BTreeGetResult<V> {
private boolean isTrimmed;
private final List<BTreeElement<V>> elements;

public BTreeElements(List<BTreeElement<V>> elements) {
public BTreeGetResult(List<BTreeElement<V>> elements) {
if (elements == null) {
throw new IllegalArgumentException("Elements map cannot be null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@

import net.spy.memcached.ops.StatusCode;

public final class SMGetElements<V> {
public final class BTreeSMGetResult<V> {
private final List<Element<V>> elements;
private final List<MissedKey> missedKeys;
private final List<TrimmedKey> trimmedKeys;

public SMGetElements(List<Element<V>> elements,
List<MissedKey> missedKeys,
List<TrimmedKey> trimmedKeys) {
public BTreeSMGetResult(List<Element<V>> elements,
List<MissedKey> missedKeys,
List<TrimmedKey> trimmedKeys) {
if (elements == null || missedKeys == null || trimmedKeys == null) {
throw new IllegalArgumentException("Arguments cannot be null");
}
Expand All @@ -25,9 +25,9 @@ public SMGetElements(List<Element<V>> elements,
this.trimmedKeys = trimmedKeys;
}

public static <T> SMGetElements<T> mergeSMGetElements(List<SMGetElements<T>> smGetElementsList,
boolean ascending,
boolean unique, int count) {
public static <T> BTreeSMGetResult<T> mergeSMGetElements(
List<BTreeSMGetResult<T>> smGetElementsList,
boolean ascending, boolean unique, int count) {
List<Element<T>> elements = new ArrayList<>();
List<MissedKey> missedKeys = new ArrayList<>();
List<TrimmedKey> trimmedKeys = new ArrayList<>();
Expand All @@ -49,11 +49,11 @@ public static <T> SMGetElements<T> mergeSMGetElements(List<SMGetElements<T>> smG
});
}

return new SMGetElements<>(elements, missedKeys, trimmedKeys);
return new BTreeSMGetResult<>(elements, missedKeys, trimmedKeys);
}

private static <T> void mergeSMGetElements(
List<SMGetElements<T>> smGetElementsList,
List<BTreeSMGetResult<T>> smGetElementsList,
List<Element<T>> elements,
List<MissedKey> missedKeys,
List<TrimmedKey> trimmedKeys,
Expand All @@ -67,7 +67,7 @@ private static <T> void mergeSMGetElements(
// 2) Initialize the priority queue with the first element from each list
// and collect missed keys and trimmed keys
for (int i = 0; i < smGetElementsList.size(); i++) {
SMGetElements<T> smGetElements = smGetElementsList.get(i);
BTreeSMGetResult<T> smGetElements = smGetElementsList.get(i);
List<Element<T>> eachElements = smGetElements.getElements();
if (!eachElements.isEmpty()) {
pq.offer(new ElementWithIndex<>(eachElements.get(0), i, 0));
Expand Down
Loading
Loading