-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcreateSnapshottingAggregateRootRepository.ts
More file actions
96 lines (86 loc) · 3.66 KB
/
createSnapshottingAggregateRootRepository.ts
File metadata and controls
96 lines (86 loc) · 3.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import type { Event, EventsRaisedByAggregateRoots, EventStore } from "../../eventStore/EventStore.ts";
import type { AggregateRootRepository } from "../AggregateRootRepository.ts";
import type {
AggregateRootDefinitionMap,
AggregateRootDefinitionMapTypes,
} from "../AggregateRootDefinition.ts";
import type { SnapshotStorage } from "../SnapshotStorage.ts";
/**
* Some aggregates have very large event streams. It can be helpful to take a snapshot of the aggregate to avoid loading
* a large number of events when retrieving an aggregate.
*/
export function createSnapshottingAggregateRootRepository<
TAggregateDefinitionMap extends AggregateRootDefinitionMap<TAggregateMapTypes>,
TAggregateMapTypes extends AggregateRootDefinitionMapTypes = AggregateRootDefinitionMapTypes,
>(
{ eventStore, aggregateRoots, snapshotStorage }: {
eventStore: EventStore<EventsRaisedByAggregateRoots<TAggregateDefinitionMap, TAggregateMapTypes>>;
aggregateRoots: TAggregateDefinitionMap;
snapshotStorage: SnapshotStorage<TAggregateDefinitionMap, TAggregateMapTypes>;
},
): AggregateRootRepository<TAggregateDefinitionMap, TAggregateMapTypes> {
return {
retrieve: async (
{ aggregateRootId, aggregateRootType },
) => {
const definition = aggregateRoots[aggregateRootType];
const snapshot = await snapshotStorage.retrieve({
aggregateRootId,
aggregateRootType,
stateVersion: definition.state.version,
});
const events = eventStore.retrieve({
aggregateRootId,
aggregateRootType: aggregateRootType as string,
fromVersion: snapshot && snapshot.aggregateVersion,
});
let state = structuredClone(snapshot ? snapshot.state : definition.state.initialState);
let aggregateVersion = snapshot ? snapshot.aggregateVersion : undefined;
for await (const event of events) {
state = definition.state.reducer(state, event.payload);
aggregateVersion = event.aggregateVersion;
}
return {
aggregateRootId,
aggregateRootType,
aggregateVersion,
state,
};
},
persist: async ({ aggregateRoot, pendingEventPayloads }) => {
const events: Event[] = pendingEventPayloads.map(
(payload, i) => ({
aggregateRootType: aggregateRoot.aggregateRootType as string,
aggregateRootId: aggregateRoot.aggregateRootId,
recordedAt: new Date(),
aggregateVersion: (aggregateRoot.aggregateVersion ?? 0) + (i + 1),
payload,
}),
);
const definition = aggregateRoots[aggregateRoot.aggregateRootType];
let state = aggregateRoot.state;
for await (const event of events) {
state = definition.state.reducer(state, event.payload);
}
const aggregateRootVersion: number | undefined = events.length > 0
? events.at(-1)!.aggregateVersion
: aggregateRoot.aggregateVersion;
// Persist events first, such that we can be sure there were no integrity
// violations.
await eventStore.persist(events);
// Persist a snapshot for this aggregate to storage. Depending on the size of
// the state, we could choose to do this less often only persist snapshots for
// every N events raised. This does not require a transaction, since the system
// will self recover in the event a snapshot cannot be persisted.
await snapshotStorage.persist({
stateVersion: definition.state.version,
aggregateRoot: {
state,
aggregateRootId: aggregateRoot.aggregateRootId,
aggregateVersion: aggregateRootVersion,
aggregateRootType: aggregateRoot.aggregateRootType,
},
});
},
};
}