Conversation
oshai
left a comment
There was a problem hiding this comment.
Please talk with me before doing any changes, to decide how to proceed with this pull request.
| } | ||
| } | ||
|
|
||
| private class InternalCacheLoader extends com.google.common.cache.CacheLoader<K, ComposableFuture<V>> { |
There was a problem hiding this comment.
check if this can be a static class
| public ListenableFuture<ComposableFuture<V>> reload(final K key, final ComposableFuture<V> oldValue) { | ||
| ListenableFutureTask<ComposableFuture<V>> task = ListenableFutureTask.create(() -> { | ||
| ComposableFuture<V> loadFuture = loader.load(cacheName, key).recoverWith(e -> oldValue); | ||
| V value = loadFuture.get(loadTimeout, loadTimeoutUnit); |
There was a problem hiding this comment.
Blocking contradicts the idea of that class, let's discuss how we can avoid that.
| /** | ||
| * A wrapper of LoadingCacheDelegate that adds the ability to refresh cache values in the configured interval on access | ||
| */ | ||
| public class RefreshLoadingCacheDelegate<K, V> implements TypedCache<K, V> { |
There was a problem hiding this comment.
We should think about the design here, because it feels like mostly a copy of LoadingCacheDelegate
| public ListenableFuture<ComposableFuture<V>> reload(final K key, final ComposableFuture<V> oldValue) { | ||
| ListenableFutureTask<ComposableFuture<V>> task = ListenableFutureTask.create(() -> { | ||
| ComposableFuture<V> loadFuture = loader.load(cacheName, key).recoverWith(e -> oldValue); | ||
| V value = loadFuture.get(loadTimeout, loadTimeoutUnit); |
There was a problem hiding this comment.
You can implement reload this way:
final SettableFuture<ComposableFuture<V>> future = SettableFuture.create();
loader.load(cacheName, key).materialize().andThen(t -> {
if (t.isSuccess()) {
future.set(ComposableFutures.fromValue(t.getValue()));
} else {
future.setException(t.getError());
}
});
return future;
|
|
||
| @Override | ||
| public ComposableFuture<V> load(@Nonnull final K key) { | ||
| return loader.load(cacheName, key).withTimeout(loadTimeout, loadTimeoutUnit).materialize(); |
There was a problem hiding this comment.
Why did you add withTimeout?
There was a problem hiding this comment.
I added timeout because LoadingCacheDelegate has the option to set timeout to the load and i felt that it was missing here.
Don't you agree?
| return loadElements(Lists.newArrayList(keys)); | ||
| } | ||
| }); | ||
| this.loadingCache = builder.build(new InternalCacheLoader(loader, cacheName, failOnMissingEntries)); |
There was a problem hiding this comment.
Add diamond: new InternalCacheLoader<>
| private final Counter refreshErrors; | ||
| private final Counter refreshTimeouts; | ||
|
|
||
| public RefreshLoadingCacheDelegate(final TypedCache<K, ValueWithWriteTime<V>> cache, final CacheLoader<K, V> loader, final String cacheName, final MetricFactory metricFactory, |
There was a problem hiding this comment.
consider making private / package private
| public ComposableFuture<V> getAsync(final K key) { | ||
| ComposableFuture<ValueWithWriteTime<V>> futureValue = cache.getAsync(key); | ||
| futureValue.consume(value -> { | ||
| if (value.isSuccess() && shouldRefresh(value.getValue(), System.currentTimeMillis())) { |
There was a problem hiding this comment.
use a time supplier instead of System.currentTimeMillis() and avoid Thread.sleep in tests.
| } | ||
|
|
||
| private boolean shouldRefresh(final ValueWithWriteTime<V> value, long currentTimeMillis) { | ||
| return value != null && currentTimeMillis - value.getWriteTime() >= refreshAfterWriteDuration; |
There was a problem hiding this comment.
consider randomizing the TTL
| return extractValues(resultMap); | ||
| } | ||
|
|
||
| private List<K> collectKeysToRefresh(final Map<K, ValueWithWriteTime<V>> result, final long currentTimeMillis) { |
There was a problem hiding this comment.
You can return a Stream here
| return cache.deleteAsync(key); | ||
| } | ||
|
|
||
| private class InternalCacheLoader implements CacheLoader<K, ValueWithWriteTime<V>> { |
| } | ||
| } | ||
|
|
||
| private class InternalEntityMapper implements EntryMapper<K, ValueWithWriteTime<V>> { |
| if (alreadyRefreshing == null) { | ||
| incRefreshCount(); | ||
| internalCacheLoader.load(cacheName, key) | ||
| .withTimeout(loadDuration, loadTimeUnit, "RefreshLoadingCacheDelegate fetch from loader; cache name: " + cacheName) |
There was a problem hiding this comment.
why use withTimeout? it's in LoadingCacheDelegate only for historical reasons.
| .collect(Collectors.toList()); | ||
|
|
||
| if (!keysToRefresh.isEmpty()) { | ||
| incRefreshCount(); |
There was a problem hiding this comment.
inc by the number of keys?
| internalCacheLoader.load(cacheName, keysToRefresh) | ||
| .withTimeout(loadDuration, loadTimeUnit, "RefreshLoadingCacheDelegate fetch bulk from loader; cache name: " + cacheName) | ||
| .consume(res -> { | ||
| keysToRefresh.forEach(refreshingKeys::remove); |
There was a problem hiding this comment.
Alternatively: refreshingKeys.keySet().removeAll(keysToRefresh);
a wrapper of LoadingCacheDelegate that adds the ability to refresh cache values in a configured time
LocalAsyncCache - Support refresh of a cache value
Introduce new RefreshLoadingCacheDelegate - a wrapper of LoadingCacheDelegate that adds the ability to refresh cache values in a configured time