Skip to content

Commit fc03aba

Browse files
committed
RDBC-1035 RDBC-1036 Add aggressive cache context managers and store-level events
- Add add/remove_before_store, add/remove_after_save_changes, add/remove_before_delete, add/remove_before_query to DocumentStoreBase; register_events_for_session wires them into every new session
- Add aggressively_cache_for(duration, mode) and disable_aggressive_caching() context managers on DocumentStore; AggressiveCacheOptions is thread-local via threading.local(), mirroring C# AsyncLocal<T>
- Add _AggressiveCacheInvalidator inner class: subscribes to the Changes() feed and increments HttpCache.generation on DocumentChange / IndexChange events
- Fix _listen_to_changes_and_update_cache: construct _AggressiveCacheInvalidator outside __add_change_lock to prevent a deadlock when store.changes() (which acquires the same lock) is called during construction
- Remove dead HttpCache.ReleaseCacheItem inner class; fix _get_from_cache to import and return the module-level ReleaseCacheItem for non-cacheable command paths (was an AttributeError at runtime)
- Fix before_delete event timing for key-based session.delete(): move BeforeDeleteEventArgs dispatch from delete() into prepare_for_save_changes() via __prepare_for_key_deletes, matching C# PrepareForEntitiesDeletion timing and making both delete paths (key vs entity) consistent
- Fix session-level remove_before_query: it was appending instead of removing, making it impossible to unregister a query event handler on an open session
- Rename session-level remove_before_delete_entity to remove_before_delete, matching the naming convention of all other remove methods and the store-level API
- Add no_caching guard to the _check_aggressive_cache_guard test helper so it exactly mirrors the condition in execute(); add a test for the no_caching bypass
- Add integration tests verifying event args contents: args.entity identity in OnBeforeStore and args.key in OnBeforeDelete (per C# Events.cs spec)
- Remove the test for the unreachable RAW-command cache-hit path: _get_from_cache never returns a populated item for non-OBJECT commands, so the scenario the test described could not occur in production
1 parent 556d1e0 commit fc03aba

5 files changed

Lines changed: 1097 additions & 39 deletions

File tree

ravendb/documents/session/document_session_operations/in_memory_document_session_operations.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -542,14 +542,14 @@ def remove_after_save_changes(self, event: Callable[[AfterSaveChangesEventArgs],
542542
def add_before_delete(self, event: Callable[[BeforeDeleteEventArgs], None]):
543543
self._before_delete.append(event)
544544

545-
def remove_before_delete_entity(self, event: Callable[[BeforeDeleteEventArgs], None]):
545+
def remove_before_delete(self, event: Callable[[BeforeDeleteEventArgs], None]):
546546
self._before_delete.remove(event)
547547

548548
def add_before_query(self, event: Callable[[BeforeQueryEventArgs], None]):
549549
self._before_query.append(event)
550550

551551
def remove_before_query(self, event: Callable[[BeforeQueryEventArgs], None]):
552-
self._before_query.append(event)
552+
self._before_query.remove(event)
553553

554554
def before_store_invoke(self, before_store_event_args: BeforeStoreEventArgs):
555555
for event in self._before_store:

ravendb/documents/store/definition.py

Lines changed: 176 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,30 @@ def operations(self) -> OperationExecutor:
175175
def open_session(self, database: Optional[str] = None, session_options: Optional = None):
176176
pass
177177

178+
def add_before_store(self, event: Callable[[BeforeStoreEventArgs], None]):
    """Register ``event`` as a store-level before-store handler by appending it to the handler collection."""
    self.__before_store.append(event)
180+
181+
def remove_before_store(self, event: Callable[[BeforeStoreEventArgs], None]):
    """Unregister a store-level before-store handler previously added with ``add_before_store``."""
    # NOTE(review): if the handler collection is a list, remove() raises ValueError
    # for a handler that was never registered — confirm callers expect that.
    self.__before_store.remove(event)
183+
184+
def add_after_save_changes(self, event: Callable[[AfterSaveChangesEventArgs], None]):
    """Register ``event`` as a store-level after-save-changes handler by appending it to the handler collection."""
    self.__after_save_changes.append(event)
186+
187+
def remove_after_save_changes(self, event: Callable[[AfterSaveChangesEventArgs], None]):
    """Unregister a store-level after-save-changes handler previously added with ``add_after_save_changes``."""
    # NOTE(review): if the handler collection is a list, remove() raises ValueError
    # for a handler that was never registered — confirm callers expect that.
    self.__after_save_changes.remove(event)
189+
190+
def add_before_delete(self, event: Callable[[BeforeDeleteEventArgs], None]):
    """Register ``event`` as a store-level before-delete handler by appending it to the handler collection."""
    self.__before_delete.append(event)
192+
193+
def remove_before_delete(self, event: Callable[[BeforeDeleteEventArgs], None]):
    """Unregister a store-level before-delete handler previously added with ``add_before_delete``."""
    # NOTE(review): if the handler collection is a list, remove() raises ValueError
    # for a handler that was never registered — confirm callers expect that.
    self.__before_delete.remove(event)
195+
196+
def add_before_query(self, event: Callable[[BeforeQueryEventArgs], None]):
    """Register ``event`` as a store-level before-query handler by appending it to the handler collection."""
    self.__before_query.append(event)
198+
199+
def remove_before_query(self, event: Callable[[BeforeQueryEventArgs], None]):
    """Unregister a store-level before-query handler previously added with ``add_before_query``."""
    # NOTE(review): if the handler collection is a list, remove() raises ValueError
    # for a handler that was never registered — confirm callers expect that.
    self.__before_query.remove(event)
201+
178202
def add_on_session_creation(self, event: Callable[[SessionCreatedEventArgs], None]):
179203
self.__on_session_creation.append(event)
180204

@@ -313,7 +337,7 @@ def __init__(self, urls: Union[str, List[str]] = None, database: Optional[str] =
313337
self.urls = [urls] if isinstance(urls, str) else urls
314338
self.database = database
315339
self.__request_executors: Dict[str, Lazy[RequestExecutor]] = CaseInsensitiveDict()
316-
# todo: aggressive cache
340+
self.__aggressive_cache_changes: Dict[str, "DocumentStore._AggressiveCacheInvalidator"] = {}
317341
self.__maintenance_operation_executor: Optional[MaintenanceOperationExecutor] = None
318342
self.__operation_executor: Optional[OperationExecutor] = None
319343
# todo: database smuggler
@@ -379,7 +403,9 @@ def close(self):
379403
for event in self.__before_close:
380404
event()
381405

382-
# todo: evict items from cache based on changes
406+
for eviction in list(self.__aggressive_cache_changes.values()):
407+
eviction.close()
408+
self.__aggressive_cache_changes.clear()
383409

384410
while len(self.__database_changes) > 0:
385411
self.__database_changes.popitem()[1].close()
@@ -529,7 +555,154 @@ def initialize(self) -> DocumentStore:
529555
self._initialized = True
530556
return self
531557

532-
# todo: aggressively cache
558+
def aggressively_cache_for(
    self,
    cache_duration: datetime.timedelta,
    database: Optional[str] = None,
    mode: Optional["AggressiveCacheMode"] = None,
) -> "DocumentStore._AggressiveCacheContext":
    """Build a context manager that enables aggressive caching for ``database``.

    :param cache_duration: duration passed into the ``AggressiveCacheOptions``
        that the returned context installs when entered.
    :param database: target database; resolved through ``get_effective_database``.
    :param mode: cache mode; ``None`` selects ``AggressiveCacheMode.TRACK_CHANGES``.
    :return: the context produced by ``_set_aggressive_cache`` after it has been
        passed through ``_finalize_aggressive_cache``.
    """
    from ravendb.http.misc import AggressiveCacheMode

    effective_mode = AggressiveCacheMode.TRACK_CHANGES if mode is None else mode
    return self._finalize_aggressive_cache(
        self._set_aggressive_cache(cache_duration, effective_mode, database),
        effective_mode,
        database,
    )
570+
571+
def _set_aggressive_cache(
    self,
    cache_duration: datetime.timedelta,
    mode: "AggressiveCacheMode",
    database: Optional[str] = None,
) -> "DocumentStore._AggressiveCacheContext":
    """Create (but do not enter) the aggressive-cache context for ``database``.

    Asserts the store is initialized, resolves the effective database and its
    request executor, and pairs that executor with fresh
    ``AggressiveCacheOptions(cache_duration, mode)``.
    """
    from ravendb.http.misc import AggressiveCacheOptions

    self.assert_initialized()
    executor = self.get_request_executor(self.get_effective_database(database))
    cache_options = AggressiveCacheOptions(cache_duration, mode)
    return DocumentStore._AggressiveCacheContext(executor, cache_options)
584+
585+
def _finalize_aggressive_cache(
    self,
    context: "DocumentStore._AggressiveCacheContext",
    mode: "AggressiveCacheMode",
    database: Optional[str] = None,
) -> "DocumentStore._AggressiveCacheContext":
    """Start change tracking for ``database`` (when the mode asks for it) and return ``context``.

    :param context: the not-yet-entered aggressive-cache context to hand back.
    :param mode: any mode other than ``AggressiveCacheMode.DO_NOT_TRACK_CHANGES``
        registers a change listener for the database.
    :param database: target database; resolved through ``get_effective_database``.
    :return: ``context``, unchanged.
    :raises Exception: whatever ``_listen_to_changes_and_update_cache`` raises.
    """
    from ravendb.http.misc import AggressiveCacheMode

    if mode != AggressiveCacheMode.DO_NOT_TRACK_CHANGES:
        # ``context`` only touches the request executor inside ``__enter__``, and it
        # has not been entered yet. A failure here therefore leaves nothing to unwind,
        # so the error propagates as-is instead of entering and exiting the context
        # (which briefly installed the options and could mask the original error).
        self._listen_to_changes_and_update_cache(self.get_effective_database(database))
    return context
602+
603+
def _listen_to_changes_and_update_cache(self, database: str) -> None:
    """Ensure exactly one cache invalidator is registered for ``database``.

    :param database: name of the database whose changes should bump the cache generation.
    """
    # Unlocked fast path: an invalidator for this database is already registered.
    if database in self.__aggressive_cache_changes:
        return

    # Built outside the lock on purpose: the constructor calls store.changes(),
    # which (per the commit notes) acquires __add_change_lock itself.
    candidate = DocumentStore._AggressiveCacheInvalidator(self, database)

    # The lock gives the dict ConcurrentDictionary-like "add if absent" behavior.
    # Only the dict update happens under it, so no subscription callbacks run
    # while the lock is held.
    with self.__add_change_lock:
        registered = self.__aggressive_cache_changes.setdefault(database, candidate)

    if registered is candidate:
        candidate.ensure_connected()
    else:
        # Another thread registered first; drop the redundant subscriptions.
        candidate.close()
614+
615+
def disable_aggressive_caching(
    self, database: Optional[str] = None
) -> "DocumentStore._DisableAggressiveCachingContext":
    """Build a context manager that switches aggressive caching off for ``database``.

    Asserts the store is initialized, then wraps the request executor of the
    effective database in a ``_DisableAggressiveCachingContext``.
    """
    self.assert_initialized()
    executor = self.get_request_executor(self.get_effective_database(database))
    return DocumentStore._DisableAggressiveCachingContext(executor)
622+
623+
class _AggressiveCacheContext:
624+
def __init__(self, request_executor, options):
625+
self._request_executor = request_executor
626+
self._options = options
627+
self._old_options = None
628+
629+
def __enter__(self):
630+
self._old_options = self._request_executor.aggressive_caching
631+
self._request_executor.aggressive_caching = self._options
632+
return self
633+
634+
def __exit__(self, exc_type, exc_val, exc_tb):
635+
self._request_executor.aggressive_caching = self._old_options
636+
637+
class _DisableAggressiveCachingContext:
638+
def __init__(self, request_executor):
639+
self._request_executor = request_executor
640+
self._old_options = None
641+
642+
def __enter__(self):
643+
self._old_options = self._request_executor.aggressive_caching
644+
self._request_executor.aggressive_caching = None
645+
return self
646+
647+
def __exit__(self, exc_type, exc_val, exc_tb):
648+
self._request_executor.aggressive_caching = self._old_options
649+
650+
class _AggressiveCacheInvalidator:
651+
"""Subscribes to document/index changes and invalidates the request executor's aggressive cache."""
652+
653+
def __init__(self, store: "DocumentStore", database: str):
654+
from ravendb.changes.observers import ActionObserver
655+
from ravendb.changes.types import DocumentChangeType, IndexChangeTypes
656+
657+
self._request_executor = store.get_request_executor(database)
658+
self._changes = store.changes(database)
659+
self._unsubscribers: List[Callable[[], None]] = []
660+
661+
# Capture by reference so the lambdas below always see the live cache object,
662+
# even if RequestExecutor.cache is replaced. (It isn't today, but be explicit.)
663+
cache_ref = self._request_executor.cache
664+
665+
def _invalidate() -> None:
666+
cache_ref.generation += 1
667+
668+
def on_document_change(change) -> None:
669+
# Only Put and Delete affect query results; ConflictResolved etc. do not.
670+
if change.type_of_change in (DocumentChangeType.PUT, DocumentChangeType.DELETE):
671+
_invalidate()
672+
673+
def on_index_change(change) -> None:
674+
# BatchCompleted means new index results are available; IndexRemoved means
675+
# stale queries might have been using it.
676+
if change.type_of_change in (IndexChangeTypes.BATCH_COMPLETED, IndexChangeTypes.INDEX_REMOVED):
677+
_invalidate()
678+
679+
# subscribe_with_observer (not subscribe) so we can attach an on_error callback.
680+
# subscribe() creates an ActionObserver with no on_error, which means a WebSocket
681+
# disconnect silently swallows the error — cache.generation is never bumped and
682+
# the aggressive cache serves stale data indefinitely after the connection dies.
683+
self._unsubscribers.append(
684+
self._changes.for_all_documents().subscribe_with_observer(
685+
ActionObserver(on_next=on_document_change, on_error=lambda e: _invalidate())
686+
)
687+
)
688+
self._unsubscribers.append(
689+
self._changes.for_all_indexes().subscribe_with_observer(
690+
ActionObserver(on_next=on_index_change, on_error=lambda e: _invalidate())
691+
)
692+
)
693+
694+
def ensure_connected(self) -> None:
695+
# todo: DatabaseChanges auto-connects on __init__; add explicit
696+
# ensure_connected_now() for parity with C# EnsureConnectedNow()
697+
pass
698+
699+
def close(self) -> None:
700+
for unsub in self._unsubscribers:
701+
try:
702+
unsub()
703+
except Exception:
704+
pass
705+
self._unsubscribers.clear()
533706

534707
def bulk_insert(self, database_name: str = None, options: BulkInsertOptions = None) -> BulkInsertOperation:
535708
self.assert_initialized()

ravendb/http/http_cache.py

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ def age(self) -> datetime.timedelta:
4141

4242
@property
4343
def might_have_been_modified(self) -> bool:
44+
if self.item is None:
45+
return True
4446
return self.item.generation != self.__cache_generation
4547

4648
def close(self):
@@ -73,7 +75,7 @@ def __setitem__(self, key, value):
7375
self.__items.__setitem__(key, value)
7476

7577
def __getitem__(self, item):
76-
self.__items.__getitem__(item)
78+
return self.__items.__getitem__(item)
7779

7880
def close(self):
7981
self.__items.clear()
@@ -108,29 +110,3 @@ def set_not_found(self, url: str, aggressively_cached: bool) -> None:
108110
{ItemFlags.AGGRESSIVELY_CACHED, ItemFlags.NOT_FOUND} if aggressively_cached else {ItemFlags.NOT_FOUND}
109111
)
110112
self.__items[url] = http_cache_item
111-
112-
class ReleaseCacheItem:
113-
def __init__(self, item: HttpCacheItem = None):
114-
self.item: Union[None, HttpCacheItem] = item
115-
self.__cache_generation = item.cache.generation if item else 0
116-
117-
def __enter__(self):
118-
return self
119-
120-
def __exit__(self, exc_type, exc_val, exc_tb):
121-
pass
122-
123-
def not_modified(self) -> None:
124-
if self.item is not None:
125-
self.item.last_server_update = datetime.datetime.now()
126-
self.item.generation = self.__cache_generation
127-
128-
@property
129-
def age(self) -> datetime.timedelta:
130-
if self.item is None:
131-
return datetime.timedelta.max
132-
return datetime.datetime.now() - self.item.last_server_update
133-
134-
@property
135-
def might_have_been_modified(self) -> bool:
136-
return self.item.generation != self.__cache_generation

ravendb/http/request_executor.py

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from concurrent.futures import ThreadPoolExecutor, Future, FIRST_COMPLETED, wait, ALL_COMPLETED
99
import uuid
1010
from json import JSONDecodeError
11-
from threading import Timer, Semaphore, Lock
11+
from threading import Timer, Semaphore, Lock, local
1212

1313
import requests
1414
from copy import copy
@@ -27,8 +27,14 @@
2727
from ravendb.exceptions.raven_exceptions import ClientVersionMismatchException
2828

2929

30-
from ravendb.http.http_cache import HttpCache
31-
from ravendb.http.misc import ReadBalanceBehavior, ResponseDisposeHandling, LoadBalanceBehavior, Broadcast
30+
from ravendb.http.http_cache import HttpCache, ItemFlags, ReleaseCacheItem
31+
from ravendb.http.misc import (
32+
ReadBalanceBehavior,
33+
ResponseDisposeHandling,
34+
LoadBalanceBehavior,
35+
Broadcast,
36+
AggressiveCacheMode,
37+
)
3238
from ravendb.http.raven_command import RavenCommand, RavenCommandResponseType
3339
from ravendb.http.server_node import ServerNode
3440
from ravendb.http.topology import Topology, NodeStatus, NodeSelector, CurrentIndexAndNode, UpdateTopologyParameters
@@ -106,13 +112,25 @@ def __init__(
106112
self._disposed: Union[None, bool] = None
107113

108114
self.__synchronized_lock = Lock()
115+
self._aggressive_caching_local = local()
109116

110117
# --- events ---
111118
self._on_before_request: List[Callable[[BeforeRequestEventArgs], Any]] = []
112119
self.__on_failed_request: List[Callable[[FailedRequestEventArgs], None]] = []
113120
self.__on_succeed_request: List[Callable[[SucceedRequestEventArgs], None]] = []
114121
self._on_topology_updated: List[Callable[[Topology], None]] = []
115122

123+
@property
def aggressive_caching(self) -> Optional["AggressiveCacheOptions"]:
    """Aggressive-cache options set on the calling thread, or ``None`` if none were set."""
    # threading.local mirrors C#'s AsyncLocal<AggressiveCacheOptions>: each thread
    # (each request context) gets its own setting. Without this, one thread enabling
    # aggressive caching would bleed into every other thread on the same executor.
    per_thread = self._aggressive_caching_local
    try:
        return per_thread.value
    except AttributeError:
        # Nothing has been assigned on this thread yet.
        return None

@aggressive_caching.setter
def aggressive_caching(self, value: Optional["AggressiveCacheOptions"]) -> None:
    """Store ``value`` as the calling thread's aggressive-cache options."""
    setattr(self._aggressive_caching_local, "value", value)
133+
116134
def __enter__(self):
117135
return self
118136

@@ -522,7 +540,38 @@ def execute(
522540
no_caching = session_info.no_caching if session_info else False
523541

524542
cached_item, change_vector, cached_value = self._get_from_cache(command, not no_caching, url)
525-
# todo: if change_vector exists try get from cache - aggressive caching
543+
544+
# Aggressive-cache short-circuit: serve from local cache without touching the server.
545+
# All five conditions must hold:
546+
# 1. Session didn't disable caching.
547+
# 2. Aggressive caching is active on this thread.
548+
# 3. The command doesn't opt out (streaming commands set can_cache_aggressively=False).
549+
# 4. The item is actually in cache AND young enough.
550+
# 5. Under TRACK_CHANGES mode, the cache generation must not have advanced since we
551+
# retrieved the item — if it has, _AggressiveCacheInvalidator saw a server change and
552+
# bumped the generation, so we must revalidate.
553+
if (
554+
not no_caching
555+
and self.aggressive_caching is not None
556+
and command.can_cache_aggressively
557+
and cached_item.item is not None
558+
and cached_item.age < self.aggressive_caching.duration
559+
and (
560+
not cached_item.might_have_been_modified
561+
or self.aggressive_caching.mode != AggressiveCacheMode.TRACK_CHANGES
562+
)
563+
):
564+
if ItemFlags.NOT_FOUND in cached_item.item.flags:
565+
# Cached 404: only trust it when it was itself received inside an aggressive-
566+
# cache context (AGGRESSIVELY_CACHED flag set by set_not_found). A 404 cached
567+
# outside aggressive mode might have been a transient error; re-fetch it.
568+
if ItemFlags.AGGRESSIVELY_CACHED in cached_item.item.flags:
569+
command.set_response(None, True)
570+
return
571+
elif cached_value is not None:
572+
command.set_response(cached_value, True)
573+
return
574+
526575
with cached_item:
527576
# todo: try get from cache
528577
self._set_request_headers(session_info, change_vector, request)
@@ -785,7 +834,7 @@ def _set_request_headers(
785834

786835
def _get_from_cache(
787836
self, command: RavenCommand, use_cache: bool, url: str
788-
) -> Tuple[HttpCache.ReleaseCacheItem, Optional[str], Optional[str]]:
837+
) -> Tuple[ReleaseCacheItem, Optional[str], Optional[str]]:
789838
if (
790839
use_cache
791840
and command.can_cache
@@ -794,7 +843,7 @@ def _get_from_cache(
794843
):
795844
return self._cache.get(url)
796845

797-
return HttpCache.ReleaseCacheItem(), None, None
846+
return ReleaseCacheItem(), None, None
798847

799848
@staticmethod
800849
def __try_get_server_version(response: requests.Response) -> Union[None, str]:
@@ -1014,7 +1063,7 @@ def _handle_unsuccessful_response(
10141063
should_retry: bool,
10151064
) -> bool:
10161065
if response.status_code == HTTPStatus.NOT_FOUND:
1017-
self._cache.set_not_found(url, False) # todo : check if aggressively cached, don't just pass False
1066+
self._cache.set_not_found(url, self.aggressive_caching is not None)
10181067
if command.response_type == RavenCommandResponseType.EMPTY:
10191068
return True
10201069
elif command.response_type == RavenCommandResponseType.OBJECT:

0 commit comments

Comments
 (0)