Skip to content

Commit 2f69e4b

Browse files
authored
Merge pull request #285 from redknightlois/RDBC-1048
RDBC-1048 Fix silent data corruption bugs in Document Session
2 parents f0b2928 + 398f400 commit 2f69e4b

8 files changed

Lines changed: 241 additions & 14 deletions

File tree

.github/pull_request_template.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
### Issue link
2+
3+
https://issues.hibernatingrhinos.com/issue/RDBC-...
4+
5+
### Additional description
6+
7+
...Include details of the change made in this Pull Request or additional notes for the solution. Anything that can be useful for reviewers of this PR...
8+
9+
### Type of change
10+
11+
- [ ] Bug fix
12+
- [ ] Regression bug fix
13+
- [ ] Optimization
14+
- [ ] New feature
15+
16+
### How risky is the change?
17+
18+
- [ ] Low
19+
- [ ] Moderate
20+
- [ ] High
21+
- [ ] Not relevant
22+
23+
### Backward compatibility
24+
25+
- [ ] Non breaking change
26+
- [ ] Ensured. Please explain how has it been implemented?
27+
- [ ] Breaking change
28+
- [ ] Not relevant
29+
30+
### Is it platform specific issue?
31+
32+
- [ ] Yes. Please list the affected platforms.
33+
- [ ] No
34+
35+
### Documentation update
36+
37+
- [ ] This change requires a documentation update. Please mark the issue on YouTrack using `Python Documentation Required` tag.
38+
- [ ] No documentation update is needed
39+
40+
### Testing by Contributor
41+
42+
- [ ] Tests have been added that prove the fix is effective or that the feature works
43+
- [ ] It has been verified by manual testing
44+
- [ ] Existing tests verify the correct behavior
45+
46+
### Is there any existing behavior change of other features due to this change?
47+
48+
- [ ] Yes. Please list the affected features/subsystems and provide appropriate explanation
49+
- [ ] No

ravendb/documents/operations/batch.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,9 @@ def get_command_type(obj_node: dict) -> CommandType:
127127
elif command_type == CommandType.COUNTERS:
128128
self._handle_counters(batch_result)
129129
elif command_type == CommandType.TIME_SERIES:
130-
break # todo: RavenDB-13474 add to time series cache
130+
continue # todo: RavenDB-13474 add to time series cache
131131
elif command_type == CommandType.TIME_SERIES_COPY or command_type == CommandType.BATCH_PATCH:
132-
break
132+
continue
133133
else:
134134
raise ValueError(f"Command {command_type} is not supported")
135135

@@ -190,7 +190,7 @@ def _handle_patch(self, batch_result: dict) -> None:
190190
self._throw_missing_field(CommandType.PATCH, "PatchStatus")
191191

192192
status = PatchStatus(patch_status)
193-
if status == PatchStatus.CREATED or PatchStatus.PATCHED:
193+
if status in (PatchStatus.CREATED, PatchStatus.PATCHED):
194194
document = batch_result.get("ModifiedDocument")
195195
if not document:
196196
return

ravendb/documents/operations/compare_exchange/compare_exchange.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def delete(self, index: int) -> None:
133133
self._state = CompareExchangeValueState.DELETED
134134

135135
def __assert_state(self) -> None:
136-
if self._state == CompareExchangeValueState.NONE or CompareExchangeValueState.MISSING:
136+
if self._state in (CompareExchangeValueState.NONE, CompareExchangeValueState.MISSING):
137137
return
138138
elif self._state == CompareExchangeValueState.CREATED:
139139
raise RuntimeError(f"The compare exchange value with key {self._key} was already stored.")
@@ -144,7 +144,7 @@ def get_command(
144144
self, conventions: DocumentConventions
145145
) -> Optional[Union[DeleteCompareExchangeCommandData, PutCompareExchangeCommandData]]:
146146
s = self._state
147-
if s == CompareExchangeValueState.NONE or CompareExchangeValueState.CREATED:
147+
if s in (CompareExchangeValueState.NONE, CompareExchangeValueState.CREATED):
148148
if not self.__value:
149149
return None
150150

ravendb/documents/session/document_session.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -841,7 +841,8 @@ def wait_for_replication_after_save_changes(
841841
builder_options = builder.get_options()
842842
replication_options = builder_options.replication_options
843843
if replication_options is None:
844-
builder_options.replication_options = ReplicationBatchOptions()
844+
replication_options = ReplicationBatchOptions()
845+
builder_options.replication_options = replication_options
845846

846847
if replication_options.wait_for_replicas_timeout is None:
847848
replication_options.wait_for_replicas_timeout = (
@@ -860,7 +861,8 @@ def wait_for_indexes_after_save_changes(
860861
index_options = builder_options.index_options
861862

862863
if index_options is None:
863-
builder_options.index_options = IndexBatchOptions()
864+
index_options = IndexBatchOptions()
865+
builder_options.index_options = index_options
864866

865867
if index_options.wait_for_indexes_timeout is None:
866868
index_options.wait_for_indexes_timeout = (

ravendb/documents/session/document_session_operations/in_memory_document_session_operations.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -983,13 +983,13 @@ def validate_cluster_transaction(self, result: SaveChangesData) -> None:
983983
)
984984

985985
for command_data in result.session_commands:
986-
if command_data.command_type == CommandType.PUT or CommandType.DELETE:
986+
if command_data.command_type in (CommandType.PUT, CommandType.DELETE):
987987
if command_data.change_vector is not None:
988988
raise ValueError(
989989
f"Optimistic concurrency for {command_data.key} "
990990
f"is not supported when using a cluster transaction"
991991
)
992-
elif command_data.command_type == CommandType.COMPARE_EXCHANGE_DELETE or CommandType.COMPARE_EXCHANGE_PUT:
992+
elif command_data.command_type in (CommandType.COMPARE_EXCHANGE_DELETE, CommandType.COMPARE_EXCHANGE_PUT):
993993
pass
994994
else:
995995
raise ValueError(f"The command '{command_data.command_type}' is not supported in a cluster session.")
@@ -1874,9 +1874,10 @@ def _get_operation_result(self, object_type: Type[_T], result: _T) -> _T:
18741874
# todo: cast result on object_type
18751875
raise TypeError(f"Unable to cast {result.__class__.__name__} to {object_type.__name__}")
18761876

1877-
# todo: implement method below
18781877
def update_session_after_save_changes(self, result: BatchCommandResult):
18791878
returned_transaction_index = result.transaction_index
1879+
if returned_transaction_index is not None:
1880+
self.session_info.last_cluster_transaction_index = returned_transaction_index
18801881

18811882
def _process_query_parameters(
18821883
self, object_type: type, index_name: str, collection_name: str, conventions: DocumentConventions
@@ -1913,7 +1914,7 @@ def get_options(self) -> BatchOptions:
19131914
def with_timeout(
19141915
self, timeout: datetime.timedelta
19151916
) -> InMemoryDocumentSessionOperations.ReplicationWaitOptsBuilder:
1916-
self.get_options().replication_options.wait_for_indexes_timeout = timeout
1917+
self.get_options().replication_options.wait_for_replicas_timeout = timeout
19171918
return self
19181919

19191920
def throw_on_timeout(self, should_throw: bool) -> InMemoryDocumentSessionOperations.ReplicationWaitOptsBuilder:
@@ -1946,11 +1947,13 @@ def with_timeout(self, timeout: datetime.timedelta) -> InMemoryDocumentSessionOp
19461947
return self
19471948

19481949
def throw_on_timeout(self, should_throw: bool) -> InMemoryDocumentSessionOperations.IndexesWaitOptsBuilder:
1949-
self.get_options().index_options.throw_on_timeout_in_wait_for_replicas = should_throw
1950+
self.get_options().index_options.throw_on_timeout_in_wait_for_indexes = should_throw
19501951
return self
19511952

1952-
def wait_for_indexes(self, *indexes: str) -> InMemoryDocumentSessionOperations.IndexesWaitOptsBuilder:
1953-
self.get_options().index_options.wait_for_indexes = indexes
1953+
def wait_for_indexes(
1954+
self, indexes: Optional[List[str]] = None
1955+
) -> InMemoryDocumentSessionOperations.IndexesWaitOptsBuilder:
1956+
self.get_options().index_options.wait_for_specific_indexes = indexes
19541957
return self
19551958

19561959
class SaveChangesData:

ravendb/tests/jvm_migrated_tests/cluster_tests/test_cluster_transaction.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import unittest
2+
3+
from ravendb.documents.commands.batches import CommandType, CountersBatchCommandData
4+
from ravendb.documents.operations.counters import CounterOperation, CounterOperationType
15
from ravendb.documents.session.misc import SessionOptions, TransactionMode
26
from ravendb.infrastructure.entities import User
37
from ravendb.tests.test_base import TestBase
@@ -74,3 +78,64 @@ def test_session_sequence(self):
7478
user1.age = 10
7579
session.store(user1, "users/1")
7680
session.save_changes()
81+
82+
def test_throw_on_unsupported_operations(self):
83+
session_options = SessionOptions(
84+
transaction_mode=TransactionMode.CLUSTER_WIDE,
85+
disable_atomic_document_writes_in_cluster_wide_transaction=True,
86+
)
87+
88+
with self.store.open_session(session_options=session_options) as session:
89+
from ravendb.documents.session.document_session_operations.in_memory_document_session_operations import (
90+
InMemoryDocumentSessionOperations,
91+
)
92+
93+
counter_op = CounterOperation("likes", CounterOperationType.INCREMENT, 1)
94+
counter_cmd = CountersBatchCommandData("docs/1", counter_op)
95+
96+
save_changes_data = InMemoryDocumentSessionOperations.SaveChangesData(session)
97+
save_changes_data.session_commands.append(counter_cmd)
98+
99+
with self.assertRaises(ValueError) as ctx:
100+
session.validate_cluster_transaction(save_changes_data)
101+
102+
self.assertIn("not supported", str(ctx.exception))
103+
104+
def test_compare_exchange_double_create_raises(self):
105+
session_options = SessionOptions(
106+
transaction_mode=TransactionMode.CLUSTER_WIDE,
107+
disable_atomic_document_writes_in_cluster_wide_transaction=True,
108+
)
109+
110+
with self.store.open_session(session_options=session_options) as session:
111+
session.advanced.cluster_transaction.create_compare_exchange_value("users/emails/john", "john@doe.com")
112+
113+
with self.assertRaises(RuntimeError):
114+
session.advanced.cluster_transaction.create_compare_exchange_value("users/emails/john", "other@doe.com")
115+
116+
117+
class TestClusterTransactionValidation(unittest.TestCase):
118+
def test_cluster_tx_rejects_unsupported_command_types(self):
119+
import inspect
120+
from ravendb.documents.session.document_session_operations.in_memory_document_session_operations import (
121+
InMemoryDocumentSessionOperations,
122+
)
123+
124+
src = inspect.getsource(InMemoryDocumentSessionOperations.validate_cluster_transaction)
125+
self.assertNotIn(
126+
"== CommandType.PUT or CommandType.DELETE",
127+
src,
128+
"Cluster TX validation uses 'x == A or B' (always True). Must use 'x in (A, B)'.",
129+
)
130+
131+
def test_compare_exchange_rejects_double_create(self):
132+
from ravendb.documents.operations.compare_exchange.compare_exchange import (
133+
CompareExchangeSessionValue,
134+
CompareExchangeValueState,
135+
)
136+
137+
value = CompareExchangeSessionValue.__new__(CompareExchangeSessionValue)
138+
value._key = "test"
139+
value._state = CompareExchangeValueState.CREATED
140+
with self.assertRaises(RuntimeError):
141+
value._CompareExchangeSessionValue__assert_state()

ravendb/tests/session_tests/test_advanced.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from ravendb.documents.operations.indexes import PutIndexesOperation
44
from ravendb.tests.test_base import TestBase
55
from ravendb.exceptions.exceptions import InvalidOperationException
6+
import datetime
67
import unittest
78
import pathlib
89
import os
@@ -126,6 +127,74 @@ def test_try_delete_attachment_putted_in_the_same_session(self):
126127
with self.assertRaises(InvalidOperationException):
127128
session.advanced.attachments.delete("users/1-A", "my_text_file")
128129

130+
def test_wait_for_replication_timeout_propagates(self):
131+
with self.store.open_session() as session:
132+
session.store(User("Idan", 30), "users/1-A")
133+
session.advanced.wait_for_replication_after_save_changes(
134+
lambda opts: opts.with_timeout(datetime.timedelta(seconds=5))
135+
)
136+
batch_options = session._save_changes_options
137+
self.assertIsNotNone(batch_options)
138+
self.assertIsNotNone(batch_options.replication_options)
139+
self.assertEqual(batch_options.replication_options.wait_for_replicas_timeout, datetime.timedelta(seconds=5))
140+
141+
def test_wait_for_indexes_throw_on_timeout_propagates(self):
142+
with self.store.open_session() as session:
143+
session.store(User("Idan", 30), "users/1-A")
144+
session.advanced.wait_for_indexes_after_save_changes(lambda opts: opts.throw_on_timeout(False))
145+
batch_options = session._save_changes_options
146+
self.assertIsNotNone(batch_options)
147+
self.assertIsNotNone(batch_options.index_options)
148+
self.assertIs(batch_options.index_options.throw_on_timeout_in_wait_for_indexes, False)
149+
150+
def test_wait_for_indexes_specific_indexes_propagates(self):
151+
with self.store.open_session() as session:
152+
session.store(User("Idan", 30), "users/1-A")
153+
session.advanced.wait_for_indexes_after_save_changes(lambda opts: opts.wait_for_indexes(["MyIndex"]))
154+
batch_options = session._save_changes_options
155+
self.assertIsNotNone(batch_options)
156+
self.assertIsNotNone(batch_options.index_options)
157+
self.assertIn("MyIndex", batch_options.index_options.wait_for_specific_indexes)
158+
159+
160+
class _FakeSession:
161+
def __init__(self):
162+
self._save_changes_options = None
163+
164+
165+
class TestWaitForOptions(unittest.TestCase):
166+
def test_replication_timeout_propagates(self):
167+
from ravendb.documents.session.document_session_operations.in_memory_document_session_operations import (
168+
InMemoryDocumentSessionOperations,
169+
)
170+
171+
session = _FakeSession()
172+
builder = InMemoryDocumentSessionOperations.ReplicationWaitOptsBuilder(session)
173+
builder.with_timeout(datetime.timedelta(seconds=5))
174+
self.assertEqual(
175+
session._save_changes_options.replication_options.wait_for_replicas_timeout, datetime.timedelta(seconds=5)
176+
)
177+
178+
def test_indexes_throw_on_timeout_propagates(self):
179+
from ravendb.documents.session.document_session_operations.in_memory_document_session_operations import (
180+
InMemoryDocumentSessionOperations,
181+
)
182+
183+
session = _FakeSession()
184+
builder = InMemoryDocumentSessionOperations.IndexesWaitOptsBuilder(session)
185+
builder.throw_on_timeout(False)
186+
self.assertIs(session._save_changes_options.index_options.throw_on_timeout_in_wait_for_indexes, False)
187+
188+
def test_specific_indexes_propagates(self):
189+
from ravendb.documents.session.document_session_operations.in_memory_document_session_operations import (
190+
InMemoryDocumentSessionOperations,
191+
)
192+
193+
session = _FakeSession()
194+
builder = InMemoryDocumentSessionOperations.IndexesWaitOptsBuilder(session)
195+
builder.wait_for_indexes(["MyIndex"])
196+
self.assertIn("MyIndex", session._save_changes_options.index_options.wait_for_specific_indexes)
197+
129198

130199
if __name__ == "__main__":
131200
unittest.main()

ravendb/tests/session_tests/test_time_series.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import unittest
2+
13
from ravendb.tests.test_base import TestBase, User
24
from datetime import datetime, timedelta
35

@@ -65,3 +67,40 @@ def test_time_series_cache(self):
6567

6668
tsf.get(base + timedelta(days=2), base + timedelta(days=6))
6769
self.assertEqual(session.advanced.number_of_requests, 4)
70+
71+
def test_batch_processes_all_results_after_time_series(self):
72+
with self.store.open_session() as session:
73+
session.store(User("Target"), "users/ts-target")
74+
session.save_changes()
75+
76+
with self.store.open_session() as session:
77+
session.store(User("NewDoc"), "users/new-doc")
78+
tsf = session.time_series_for("users/ts-target", "HeartRate")
79+
tsf.append_single(datetime.now(), 70, "watch")
80+
session.save_changes()
81+
82+
with self.store.open_session() as session:
83+
new_doc = session.load("users/new-doc", User)
84+
self.assertIsNotNone(new_doc, "PUT after time series operation must be processed.")
85+
self.assertEqual(new_doc.name, "NewDoc")
86+
87+
88+
class TestBatchResultProcessing(unittest.TestCase):
89+
def test_batch_does_not_break_after_time_series(self):
90+
import inspect
91+
from ravendb.documents.operations.batch import BatchOperation
92+
93+
src = inspect.getsource(BatchOperation)
94+
lines = src.split("\n")
95+
for i, line in enumerate(lines):
96+
if "CommandType.TIME_SERIES" in line:
97+
for j in range(i + 1, min(i + 3, len(lines))):
98+
next_line = lines[j].strip()
99+
if next_line and not next_line.startswith("#"):
100+
self.assertNotEqual(
101+
next_line,
102+
"break",
103+
"Batch processing uses 'break' after TIME_SERIES, "
104+
"dropping all subsequent results. Must use 'continue'.",
105+
)
106+
break

0 commit comments

Comments
 (0)