Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion ravendb/changes/database_changes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import base64
import ssl
from threading import Lock
from threading import Event, Lock
from typing import TYPE_CHECKING, Dict, Optional, Callable, Any, List

from websocket import WebSocket
Expand Down Expand Up @@ -59,6 +59,7 @@ def __init__(

self._command_id = 0
self._immediate_connection = 0
self._connected_event = Event()

self._logger = logging.getLogger("database_changes")
handler = logging.FileHandler("changes.log")
Expand Down Expand Up @@ -114,6 +115,7 @@ def do_work(self):
try:
if not self.client_websocket.connected:
self._ensure_websocket_connected(url)
self._connected_event.set()
self.process_changes()
except ChangeProcessingException as e:
self.notify_about_error(e)
Expand Down Expand Up @@ -180,6 +182,11 @@ def _notify_subscribers(type_of_change: str, change_json_dict: Dict[str, Any], o
for observable in observables.values():
observable.send(result)

def ensure_connected_now(self, timeout: float = 15.0) -> None:
    """Wait until the websocket connection has been established.

    :param timeout: maximum number of seconds to wait (default 15).
    :raises TimeoutError: if the connection is not up within *timeout* seconds.
    """
    connected = self._connected_event.wait(timeout)
    if not connected:
        raise TimeoutError(f"DatabaseChanges failed to connect within {timeout}s")

def close(self):
self._closed = True
self.client_websocket.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,14 +542,14 @@ def remove_after_save_changes(self, event: Callable[[AfterSaveChangesEventArgs],
def add_before_delete(self, event: "Callable[[BeforeDeleteEventArgs], None]"):
    """Register a listener invoked before an entity is deleted."""
    self._before_delete.append(event)

def remove_before_delete(self, event: "Callable[[BeforeDeleteEventArgs], None]"):
    """Unregister a previously added before-delete listener.

    Renamed from remove_before_delete_entity to mirror add_before_delete.
    """
    self._before_delete.remove(event)

def add_before_query(self, event: "Callable[[BeforeQueryEventArgs], None]"):
    """Register a listener invoked before a query is executed."""
    self._before_query.append(event)

def remove_before_query(self, event: "Callable[[BeforeQueryEventArgs], None]"):
    """Unregister a previously added before-query listener.

    Bug fix: this previously appended instead of removing, so "removing" a
    listener actually made it fire twice.
    """
    self._before_query.remove(event)

def before_store_invoke(self, before_store_event_args: BeforeStoreEventArgs):
for event in self._before_store:
Expand Down
177 changes: 174 additions & 3 deletions ravendb/documents/store/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,30 @@ def operations(self) -> OperationExecutor:
def open_session(self, database: Optional[str] = None, session_options: Optional = None):
pass

def add_before_store(self, event: Callable[[BeforeStoreEventArgs], None]):
    """Register a listener invoked before an entity is stored."""
    self.__before_store.append(event)

def remove_before_store(self, event: Callable[[BeforeStoreEventArgs], None]):
    """Unregister a previously added before-store listener."""
    self.__before_store.remove(event)

def add_after_save_changes(self, event: Callable[[AfterSaveChangesEventArgs], None]):
    """Register a listener invoked after save_changes completes."""
    self.__after_save_changes.append(event)

def remove_after_save_changes(self, event: Callable[[AfterSaveChangesEventArgs], None]):
    """Unregister a previously added after-save-changes listener."""
    self.__after_save_changes.remove(event)

def add_before_delete(self, event: Callable[[BeforeDeleteEventArgs], None]):
    """Register a listener invoked before an entity is deleted."""
    self.__before_delete.append(event)

def remove_before_delete(self, event: Callable[[BeforeDeleteEventArgs], None]):
    """Unregister a previously added before-delete listener."""
    self.__before_delete.remove(event)

def add_before_query(self, event: Callable[[BeforeQueryEventArgs], None]):
    """Register a listener invoked before a query is executed."""
    self.__before_query.append(event)

def remove_before_query(self, event: Callable[[BeforeQueryEventArgs], None]):
    """Unregister a previously added before-query listener."""
    self.__before_query.remove(event)

def add_on_session_creation(self, event: Callable[[SessionCreatedEventArgs], None]):
    """Register a listener invoked whenever a new session is created."""
    self.__on_session_creation.append(event)

Expand Down Expand Up @@ -313,7 +337,7 @@ def __init__(self, urls: Union[str, List[str]] = None, database: Optional[str] =
self.urls = [urls] if isinstance(urls, str) else urls
self.database = database
self.__request_executors: Dict[str, Lazy[RequestExecutor]] = CaseInsensitiveDict()
# todo: aggressive cache
self.__aggressive_cache_changes: Dict[str, "DocumentStore._AggressiveCacheInvalidator"] = {}
self.__maintenance_operation_executor: Optional[MaintenanceOperationExecutor] = None
self.__operation_executor: Optional[OperationExecutor] = None
# todo: database smuggler
Expand Down Expand Up @@ -379,7 +403,9 @@ def close(self):
for event in self.__before_close:
event()

# todo: evict items from cache based on changes
for cache_invalidator in list(self.__aggressive_cache_changes.values()):
cache_invalidator.close()
self.__aggressive_cache_changes.clear()

while len(self.__database_changes) > 0:
self.__database_changes.popitem()[1].close()
Expand Down Expand Up @@ -529,7 +555,152 @@ def initialize(self) -> DocumentStore:
self._initialized = True
return self

# todo: aggressively cache
def aggressively_cache_for(
    self,
    cache_duration: datetime.timedelta,
    database: Optional[str] = None,
    mode: Optional["AggressiveCacheMode"] = None,
) -> "DocumentStore._AggressiveCacheContext":
    """Enable aggressive client-side caching for *cache_duration*.

    :param cache_duration: how long responses may be served from the cache.
    :param database: target database; defaults to the store's database.
    :param mode: caching mode; defaults to AggressiveCacheMode.TRACK_CHANGES.
    :return: a context manager that restores the previous options on exit.
    """
    from ravendb.http.misc import AggressiveCacheMode

    effective_mode = AggressiveCacheMode.TRACK_CHANGES if mode is None else mode
    ctx = self._set_aggressive_cache(cache_duration, effective_mode, database)
    return self._finalize_aggressive_cache(ctx, effective_mode, database)

def _set_aggressive_cache(
    self,
    cache_duration: datetime.timedelta,
    mode: "AggressiveCacheMode",
    database: Optional[str] = None,
) -> "DocumentStore._AggressiveCacheContext":
    """Build the aggressive-cache context for *database* (store must be initialized)."""
    from ravendb.http.misc import AggressiveCacheOptions

    self.assert_initialized()
    db_name = self.get_effective_database(database)
    executor = self.get_request_executor(db_name)
    cache_options = AggressiveCacheOptions(cache_duration, mode)
    return DocumentStore._AggressiveCacheContext(executor, cache_options)

def _finalize_aggressive_cache(
    self,
    context: "DocumentStore._AggressiveCacheContext",
    mode: "AggressiveCacheMode",
    database: Optional[str] = None,
) -> "DocumentStore._AggressiveCacheContext":
    """Attach change-tracking (unless disabled by *mode*) and return *context*."""
    from ravendb.http.misc import AggressiveCacheMode

    try:
        if mode != AggressiveCacheMode.DO_NOT_TRACK_CHANGES:
            self._listen_to_changes_and_update_cache(self.get_effective_database(database))
        return context
    except Exception:
        # NOTE(review): entering then immediately exiting re-applies and restores
        # the executor's previous caching options before the error propagates —
        # confirm this is the intended best-effort rollback.
        context.__enter__()
        context.__exit__(None, None, None)
        raise

def _listen_to_changes_and_update_cache(self, database: str) -> None:
    """Start, at most once per database, a change listener that invalidates
    the aggressive cache whenever documents or indexes change."""
    if database in self.__aggressive_cache_changes:
        return
    invalidator = DocumentStore._AggressiveCacheInvalidator(self, database)
    # this lock is here to achieve ConcurrentDict-like behavior
    with self.__add_change_lock:
        if database not in self.__aggressive_cache_changes:
            self.__aggressive_cache_changes[database] = invalidator
            invalidator.ensure_connected()
        else:
            # lost the race: another thread registered an invalidator first
            invalidator.close()

def disable_aggressive_caching(
    self, database: Optional[str] = None
) -> "DocumentStore._DisableAggressiveCachingContext":
    """Return a context manager that temporarily turns aggressive caching off
    for *database* (defaults to the store's database)."""
    self.assert_initialized()
    db_name = self.get_effective_database(database)
    return DocumentStore._DisableAggressiveCachingContext(self.get_request_executor(db_name))

class _AggressiveCacheContext:
    """Context manager that installs aggressive-caching options on a request
    executor on enter and restores the previous options on exit.

    NOTE(review): structurally identical to _DisableAggressiveCachingContext
    (which installs None instead of options); consider merging the two.
    """

    def __init__(self, request_executor, options):
        self._request_executor = request_executor
        self._options = options
        self._old_options = None

    def __enter__(self):
        executor = self._request_executor
        self._old_options = executor.aggressive_caching
        executor.aggressive_caching = self._options
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always restore, whether or not an exception occurred.
        self._request_executor.aggressive_caching = self._old_options

class _DisableAggressiveCachingContext:
    """Context manager that clears a request executor's aggressive-caching
    options on enter and restores the previous options on exit."""

    def __init__(self, request_executor):
        self._request_executor = request_executor
        self._old_options = None

    def __enter__(self):
        executor = self._request_executor
        self._old_options = executor.aggressive_caching
        executor.aggressive_caching = None
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always restore, whether or not an exception occurred.
        self._request_executor.aggressive_caching = self._old_options

class _AggressiveCacheInvalidator:
    """Listens for document/index change notifications and bumps the HTTP
    cache generation, forcing revalidation of aggressively cached responses."""

    def __init__(self, store: "DocumentStore", database: str):
        from ravendb.changes.observers import ActionObserver
        from ravendb.changes.types import DocumentChangeType, IndexChangeTypes

        self._request_executor = store.get_request_executor(database)
        self._changes = store.changes(database)
        self._unsubscribers: List[Callable[[], None]] = []

        # Capture by reference so the callbacks below always see the live cache
        # object, even if RequestExecutor.cache is replaced. (It isn't today,
        # but be explicit.)
        cache_ref = self._request_executor.cache

        def _invalidate() -> None:
            cache_ref.generation += 1

        # Only Put and Delete affect query results; ConflictResolved etc. do not.
        relevant_document_changes = (DocumentChangeType.PUT, DocumentChangeType.DELETE)
        # BatchCompleted means new index results are available; IndexRemoved
        # means stale queries might have been using it.
        relevant_index_changes = (IndexChangeTypes.BATCH_COMPLETED, IndexChangeTypes.INDEX_REMOVED)

        def _on_document_change(change) -> None:
            if change.type_of_change in relevant_document_changes:
                _invalidate()

        def _on_index_change(change) -> None:
            if change.type_of_change in relevant_index_changes:
                _invalidate()

        # subscribe_with_observer (not subscribe) so we can attach an on_error
        # callback. subscribe() creates an ActionObserver with no on_error,
        # which means a WebSocket disconnect silently swallows the error —
        # cache.generation is never bumped and the aggressive cache serves
        # stale data indefinitely after the connection dies.
        for observable, handler in (
            (self._changes.for_all_documents(), _on_document_change),
            (self._changes.for_all_indexes(), _on_index_change),
        ):
            observer = ActionObserver(on_next=handler, on_error=lambda e: _invalidate())
            self._unsubscribers.append(observable.subscribe_with_observer(observer))

    def ensure_connected(self) -> None:
        """Block until the change-notification websocket is connected."""
        self._changes.ensure_connected_now()

    def close(self) -> None:
        """Detach every change subscription; failures to detach are ignored."""
        for unsubscribe in self._unsubscribers:
            try:
                unsubscribe()
            except Exception:
                pass
        del self._unsubscribers[:]

def bulk_insert(self, database_name: str = None, options: BulkInsertOptions = None) -> BulkInsertOperation:
self.assert_initialized()
Expand Down
30 changes: 3 additions & 27 deletions ravendb/http/http_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def age(self) -> datetime.timedelta:

@property
def might_have_been_modified(self) -> bool:
    """True when the cached item may be stale: either no item is held, or its
    generation no longer matches the generation captured at creation time."""
    return self.item is None or self.item.generation != self.__cache_generation

def close(self):
Expand Down Expand Up @@ -73,7 +75,7 @@ def __setitem__(self, key, value):
self.__items.__setitem__(key, value)

def __getitem__(self, item):
self.__items.__getitem__(item)
return self.__items.__getitem__(item)

def close(self):
self.__items.clear()
Expand Down Expand Up @@ -108,29 +110,3 @@ def set_not_found(self, url: str, aggressively_cached: bool) -> None:
{ItemFlags.AGGRESSIVELY_CACHED, ItemFlags.NOT_FOUND} if aggressively_cached else {ItemFlags.NOT_FOUND}
)
self.__items[url] = http_cache_item

class ReleaseCacheItem:
def __init__(self, item: HttpCacheItem = None):
self.item: Union[None, HttpCacheItem] = item
self.__cache_generation = item.cache.generation if item else 0

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
pass

def not_modified(self) -> None:
if self.item is not None:
self.item.last_server_update = datetime.datetime.now()
self.item.generation = self.__cache_generation

@property
def age(self) -> datetime.timedelta:
if self.item is None:
return datetime.timedelta.max
return datetime.datetime.now() - self.item.last_server_update

@property
def might_have_been_modified(self) -> bool:
return self.item.generation != self.__cache_generation
Loading
Loading