Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion server/src/instant/db/datalog.clj
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
:variable symbol?
:function (s/keys :req-un [(or ::$not ::$isNull ::$comparator ::$entityIdStartsWith)])))

(s/def ::idx-key #{:ea :eav :av :ave :vae})
(s/def ::idx-key #{:ea :eav :av :ave :vae :mutated})
(s/def ::data-type #{:string :number :boolean :date})
(s/def ::index-map (s/keys :req-un [::idx-key ::data-type]))
(s/def ::index (s/or :keyword ::idx-key
Expand Down Expand Up @@ -2775,6 +2775,21 @@
(let [{:keys [cte-cols symbol-fields pattern page-info]} pattern-meta
{:keys [symbol-values symbol-values-for-topics]} acc
topics (named-pattern->topics pattern symbol-values-for-topics)
;; The page-info pattern (order/limit) re-sorts the result, so its
;; topic exists to catch a row whose sort key *changes* (an update),
;; including a row reordering into the window. A *new* row entering
;; the window is already caught by the where/enumeration topic, so
;; the page topic doesn't need to fire on inserts -- and firing on
;; them is what made it app-wide spam (e.g. for
;; `messages where conversation.id = X order by createdAt`, the
;; topic is `[:ave _ createdAt _]`, which every new message anywhere
;; matched). Rewrite the page topic's index to `:mutated`, which
;; only value-changed updates emit (see
;; topics/topics-for-triple-update), so it matches reorders but not
;; inserts.
topics (if (and page-info (flags/order-topics-match-updates-only?))
(mapv (fn [topic] (assoc topic 0 #{:mutated})) topics)
topics)
{:keys [join-rows page-info-rows symbol-values symbol-values-for-topics]}
(reduce (fn [acc row]
(let [join-row (sql-row->triple row cte-cols coerce-uuids?)
Expand Down
4 changes: 3 additions & 1 deletion server/src/instant/db/model/triple.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
#(s/gen #{"foo" (UUID/randomUUID) 25 nil true})))

(s/def ::triple (s/cat :e ::lookup :a ::attr-id :v ::value))
(s/def ::index #{:ea :eav :av :ave :vae})
;; `:ea`..`:vae` are the triple index permutations; `:mutated` is a topic-only
;; marker meaning "this attr's value changed" (see topics/topics-for-triple-update).
(s/def ::index #{:ea :eav :av :ave :vae :mutated})
(s/def ::md5 ::uspec/non-blank-string)

(s/def ::enhanced-triple
Expand Down
14 changes: 14 additions & 0 deletions server/src/instant/flags.clj
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,20 @@
(defn use-coarse-topics? [app-id]
(contains? (flag :coarse-topics-apps) app-id))

(defn order-topics-match-updates-only?
"When true (the default), an ordered+limited query's page-info topic only
matches sort-key changes on rows that *already exist* -- value updates, plus
first setting or clearing the value (see topics/lifecycle-entities) -- not the
writes that *create* a brand-new row. New rows are caught by the
where/enumeration topic, so the page topic doesn't need to fire on them, and
not firing keeps a new row in some *other* parent (e.g. a new message in
another conversation for `messages where conversation.id = X order by
createdAt`) from re-running the query, while still catching a row reordering
into the window. Set the `skip-order-topic-updates-only` toggle to revert to
the old app-wide behavior."
[]
(not (toggled? :skip-order-topic-updates-only)))

(defn use-get-datalog-queries-for-topics-v3? []
(toggled? :use-get-datalog-queries-for-topics-v3? true))

Expand Down
90 changes: 74 additions & 16 deletions server/src/instant/reactive/topics.clj
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,23 @@
:else
v-parsed)))

(defn- topics-for-triple-insert [change]
(defn- topics-for-triple-insert [change created-entities]
(let [m (columns->map (:columns change) true)
e (UUID/fromString (:entity_id m))
a (UUID/fromString (:attr_id m))
v (parse-v m)
ks (->> #{:ea :eav :av :ave :vae}
(filter m)
set)]
set)
;; A value appearing on an entity that *already existed* (e.g. setting an
;; order attr for the first time) is a re-sort an ordered+limited query
;; must catch, so we add the `:mutated` marker. A value written while
;; *creating* the entity is already caught by the where/enumeration topic
;; (the new row's id/link triples), so we leave it off -- that's what
;; keeps a new row in some other parent from re-running the query.
ks (if (contains? created-entities e)
ks
(conj ks :mutated))]
[[ks #{e} #{a} #{v}]]))

(defn- topics-for-triple-update
Expand Down Expand Up @@ -114,34 +123,83 @@

(and (= e old-e)
(= a old-a))
[[ks #{e} #{a} (set [v old-v])]]
;; `:mutated` marks that this attr's value changed (vs was just
;; inserted). Ordered+limited queries subscribe to it so they catch a
;; row reordering into the window without also re-running on every
;; insert (new rows are caught by the where/enumeration topic).
[[(conj ks :mutated) #{e} #{a} (set [v old-v])]]

;; We shouldn't hit this, but just in case
:else
[[ks #{e} #{a} #{v}]
[ks #{e} #{a} #{old-v}]])))
[[(conj ks :mutated) #{e} #{a} #{v}]
[(conj ks :mutated) #{e} #{a} #{old-v}]])))

(defn- topics-for-triple-delete [change]
(defn- topics-for-triple-delete [change deleted-entities]
(let [m (columns->map (:identity change) true)
e (UUID/fromString (:entity_id m))
a (UUID/fromString (:attr_id m))
v (parse-v m)
ks (->> #{:ea :eav :av :ave :vae}
(filter m)
set)]
set)
;; Retracting one of an entity's values (without deleting the whole
;; entity) can drop a row out of an ordered window, so we mark it
;; `:mutated` like an update. Deleting the *entity* instead removes its
;; id triple and is caught by the where/enumeration topic, so its value
;; retractions don't need the marker (see lifecycle-entities).
ks (if (contains? deleted-entities e)
ks
(conj ks :mutated))]
[[ks #{e} #{a} #{v}]]))

(defn topics-for-change [{:keys [action] :as change}]
(case action
:insert (topics-for-triple-insert change)
:update (topics-for-triple-update change)
:delete (topics-for-triple-delete change)
[]))
(defn- id-self-triple?
"An entity's id triple is its self-triple `[e id-attr e]`: the value equals the
entity and it's an object (non-ref) attr. Its insert/delete is our marker that
the *entity itself* was created/destroyed in this batch (the client always
writes the id triple on create, and we skip no-op id updates otherwise)."
[m]
(boolean
(and (not (:eav m))
(:value m)
(= (:entity_id m) (<-json (:value m))))))

(defn- lifecycle-entities
"Scans a batch of triple changes for entities whose existence changed in it,
keyed off the id self-triple. Returns `{:created #{..} :deleted #{..}}`. A
value insert/delete on an entity *outside* these sets is a value appearing or
disappearing on a surviving row -- a re-sort -- so it earns `:mutated`; one
*inside* them is part of creating/destroying the row and is already caught by
the where/enumeration topic."
[changes]
(reduce (fn [acc {:keys [action columns identity]}]
(case action
:insert (let [m (columns->map columns true)]
(cond-> acc
(id-self-triple? m)
(update :created conj (UUID/fromString (:entity_id m)))))
:delete (let [m (columns->map identity true)]
(cond-> acc
(id-self-triple? m)
(update :deleted conj (UUID/fromString (:entity_id m)))))
acc))
{:created #{} :deleted #{}}
changes))

(defn topics-for-change
([change]
(topics-for-change change nil))
([{:keys [action] :as change} {:keys [created deleted]}]
(case action
:insert (topics-for-triple-insert change (or created #{}))
:update (topics-for-triple-update change)
:delete (topics-for-triple-delete change (or deleted #{}))
[])))

(defn topics-for-triple-changes [changes]
(->> changes
(mapcat topics-for-change)
set))
(let [lifecycle (lifecycle-entities changes)]
(->> changes
(mapcat #(topics-for-change % lifecycle))
set)))

(defn- topics-for-ident-upsert [{:keys [columns]}]
(let [indexes #{:ea :eav :av :ave :vae}
Expand Down
Loading
Loading