diff --git a/client/www/components/dash/Billing.tsx b/client/www/components/dash/Billing.tsx index a26247cbc3..a14aac4998 100644 --- a/client/www/components/dash/Billing.tsx +++ b/client/www/components/dash/Billing.tsx @@ -24,7 +24,14 @@ export function roundToDecimal(num: number, decimalPlaces: number) { export function friendlyUsage(usage: number) { if (usage < GB_1) { - return `${roundToDecimal(usage / (1024 * 1024), 2)} MB`; + const mb = roundToDecimal(usage / (1024 * 1024), 2); + if (mb === 0) { + const kb = roundToDecimal(usage / 1024, 2); + if (kb !== 0) { + return `${kb} KB`; + } + } + return `${mb} MB`; } return `${roundToDecimal(usage / (1024 * 1024 * 1024), 2)} GB`; } diff --git a/server/resources/migrations/113_triples_size_updates_table.down.sql b/server/resources/migrations/113_triples_size_updates_table.down.sql new file mode 100644 index 0000000000..023b66246c --- /dev/null +++ b/server/resources/migrations/113_triples_size_updates_table.down.sql @@ -0,0 +1,246 @@ +create or replace function triples_insert_batch_trigger() +returns trigger as $$ +declare + debug_info text; +begin + + -- Update pg_size on triples + update triples t + set pg_size = public.triples_column_size(t) + from newrows n + where t.app_id = n.app_id + and t.entity_id = n.entity_id + and t.attr_id = n.attr_id + and t.value_md5 = n.value_md5; + + return null; +end; +$$ language plpgsql; + +create or replace function triples_update_batch_trigger() +returns trigger as $$ +declare + ents_msg text; + app_id_setting uuid; + log_to_table_setting boolean; +begin + -- Don't let this trigger cause itself to fire. We let it fire + -- twice because postgres 16 has some bug (fixed in 17) where + -- pg_column_size on insert is different than pg_column_size on + -- update, possibly due to how it handles nulls? If that happens, + -- then the second time it fires it will get the right value. 
+ if pg_trigger_depth() > 2 then + return null; + end if; + + if pg_trigger_depth() <= 1 then + -- Update sweeper with deleted files + with old_files as ( + select app_id, value #>> '{}' as location_id + from oldrows + where attr_id = '96653230-13ff-ffff-2a34-b40fffffffff' + ), new_files as ( + select app_id, value #>> '{}' as location_id + from newrows + where attr_id = '96653230-13ff-ffff-2a34-b40fffffffff' + ) + insert into app_files_to_sweep (app_id, location_id) + select o.app_id, o.location_id + from old_files o + left join new_files n + on o.app_id = n.app_id + and o.location_id = n.location_id + where o.location_id is not null and n.location_id is null + on conflict do nothing; + + + end if; + + select case current_setting('instant.wal_msg_app_id', true) + when null then null + when '' then null + else current_setting('instant.wal_msg_app_id', true)::uuid + end + into app_id_setting; + + if app_id_setting is not null then + + -- Write the entities to the wal + with by_etype as ( + -- Forward entities + select a.etype, n.entity_id + from newrows n + join attrs a + on n.attr_id = a.id + and a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + union + -- Ref entities + select a.reverse_etype etype, json_uuid_to_uuid(n.value) entity_id + from newrows n + join attrs a + on n.attr_id = a.id + and a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + where n.vae + ), + etypes as ( + select distinct etype from by_etype + ), + -- Map of etype to attr + attr_map as ( + select a.etype, a.id + from attrs a + join etypes e on e.etype = a.etype + where a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + and a.cardinality = 'one' + ), + -- Get all of the ents, grouped by etype and entity_id + by_entity as ( + select e.etype, + t.entity_id, + json_object_agg(t.attr_id::text, t.value) as attrs + from triples t + join by_etype e + on t.entity_id = e.entity_id + join attr_map a + on a.etype = e.etype + and t.attr_id = a.id 
+ where t.app_id = app_id_setting + and t.ea + group by e.etype, t.entity_id + ) + select json_agg(json_build_array(etype, entity_id, attrs))::text + into ents_msg + from by_entity; + + select case current_setting('instant.wal_msg_log_to_table', true) + when null then null + when '' then null + else current_setting('instant.wal_msg_log_to_table', true)::boolean + end + into log_to_table_setting; + + + if ents_msg is not null then + if log_to_table_setting is not null and log_to_table_setting then + insert into wal_logs (id, created_at, hour_bucket, prefix, content) + values (gen_random_uuid(), now(), date_part('hour', now() at time zone 'UTC')::int % 8, 'update_ents', ents_msg); + else + perform pg_logical_emit_message(true, 'update_ents', ents_msg); + end if; + end if; + end if; + + update triples t + set pg_size = public.triples_column_size(t) + from newrows s + where s.app_id = t.app_id + and s.entity_id = t.entity_id + and s.attr_id = t.attr_id + and s.value_md5 = t.value_md5 + and public.triples_column_size(t) is distinct from t.pg_size; + + return null; +end; +$$ language plpgsql; + + +create or replace function triples_delete_batch_trigger() +returns trigger as $$ +declare + ents_msg text; + app_id_setting uuid; + log_to_table_setting boolean; +begin + -- Update sweeper with deleted files + insert into app_files_to_sweep (app_id, location_id) + select app_id, value #>> '{}' as location_id + from oldrows + -- This should match the attr_id for $files.location-id + where attr_id = '96653230-13ff-ffff-2a34-b40fffffffff' + on conflict do nothing; + + select case current_setting('instant.wal_msg_app_id', true) + when null then null + when '' then null + else current_setting('instant.wal_msg_app_id', true)::uuid + end + into app_id_setting; + + if app_id_setting is not null then + + -- Write the entities to the wal + with by_etype as ( + -- Forward entities + select a.etype, n.entity_id + from oldrows n + join attrs a + on n.attr_id = a.id + and a.app_id in 
(app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + union + -- Ref entities + select a.reverse_etype etype, json_uuid_to_uuid(n.value) entity_id + from oldrows n + join attrs a + on n.attr_id = a.id + and a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + where n.vae + ), + etypes as ( + select distinct etype from by_etype + ), + -- Map of etype to attr + attr_map as ( + select a.etype, a.id + from attrs a + join etypes e on e.etype = a.etype + where a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + and a.cardinality = 'one' + ), + -- Get all of the ents, grouped by etype and entity_id + by_entity as ( + select e.etype, + t.entity_id, + json_object_agg(t.attr_id::text, t.value) as attrs + from triples t + join by_etype e + on t.entity_id = e.entity_id + join attr_map a + on a.etype = e.etype + and t.attr_id = a.id + where t.app_id = app_id_setting + and t.ea + group by e.etype, t.entity_id + ) + select json_agg(json_build_array(etype, entity_id, attrs))::text + into ents_msg + from by_entity; + + select case current_setting('instant.wal_msg_log_to_table', true) + when null then null + when '' then null + else current_setting('instant.wal_msg_log_to_table', true)::boolean + end + into log_to_table_setting; + + + if ents_msg is not null then + if log_to_table_setting is not null and log_to_table_setting then + insert into wal_logs (id, created_at, hour_bucket, prefix, content) + values (gen_random_uuid(), now(), date_part('hour', now() at time zone 'UTC')::int % 8, 'delete_ents', ents_msg); + else + perform pg_logical_emit_message(true, 'delete_ents', ents_msg); + end if; + end if; + end if; + + + return null; +end; +$$ language plpgsql; + +drop trigger clean_triples_size_updates_trigger on attrs; +drop function clean_triples_size_updates(); + +drop table triples_size_aggregate; + +drop table triples_size_updates; diff --git a/server/resources/migrations/113_triples_size_updates_table.up.sql 
b/server/resources/migrations/113_triples_size_updates_table.up.sql new file mode 100644 index 0000000000..dc74973f51 --- /dev/null +++ b/server/resources/migrations/113_triples_size_updates_table.up.sql @@ -0,0 +1,364 @@ +create table triples_size_updates ( + id bigserial primary key, + -- Intentionally did not add a foreign key constraint to keep inserts fast + app_id uuid not null, + attr_id uuid not null, + pg_size bigint not null +); + +-- Minimize bottleneck on generating ids. Creates gaps in ids, but +-- it's no problem for us. +alter sequence triples_size_updates_id_seq cache 128; + +-- Make the autovacuum more aggressive to quickly clean up dead tuples. +ALTER TABLE triples_size_updates SET ( + autovacuum_vacuum_scale_factor = 0, + autovacuum_vacuum_threshold = 1000, + autovacuum_vacuum_insert_scale_factor = 0, + autovacuum_vacuum_insert_threshold = 1000, + autovacuum_analyze_scale_factor = 0, + autovacuum_analyze_threshold = 1000, + autovacuum_vacuum_cost_delay = 0, + autovacuum_vacuum_cost_limit = 10000 +); + +-- We run this trigger instead of using a foreign key to make the +-- insert path as fast as possible. We only insert from the triples +-- trigger, so there shouldn't be a way to get an attr_id that doesn't +-- exist. This trigger is to guard against inflated sizes if we happen +-- to delete an attr and then immediately create a new attr with the same id. 
+-- (we don't do it for apps, because regenerating an app is less likely) +create or replace function clean_triples_size_updates() +returns trigger as $$ +begin + -- There shouldn't be enough triples_size_updates for us to need an index on attr_id + delete from triples_size_updates where triples_size_updates.attr_id = old.id; + return old; +end; +$$ language plpgsql; + + +create trigger clean_triples_size_updates_trigger +before delete on attrs +for each row +execute function clean_triples_size_updates(); + +create table triples_size_aggregate ( + app_id uuid not null references apps (id) on delete cascade, + attr_id uuid not null references attrs (id) on delete cascade, + pg_size bigint not null, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + primary key (app_id, attr_id) +); + +create index on triples_size_aggregate (attr_id); + +create trigger update_updated_at_trigger +before update on triples_size_aggregate +for each row +execute function update_updated_at_column(); + +create or replace function triples_insert_batch_trigger() +returns trigger as $$ +declare + ents_msg text; + app_id_setting uuid; + log_to_table_setting boolean; +begin + select case current_setting('instant.wal_msg_app_id', true) + when null then null + when '' then null + else current_setting('instant.wal_msg_app_id', true)::uuid + end + into app_id_setting; + + -- This is exactly the same as the update trigger, but there's + -- not an easy way to extract it into a separate function because + -- it references newrows + if app_id_setting is not null then + + -- Write the entities to the wal + with by_etype as ( + -- Forward entities + select a.etype, n.entity_id + from newrows n + join attrs a + on n.attr_id = a.id + and a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + union + -- Ref entities + select a.reverse_etype etype, json_uuid_to_uuid(n.value) entity_id + from newrows n + join attrs a + on n.attr_id = a.id + and 
a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + where n.vae + ), + etypes as ( + select distinct etype from by_etype + ), + -- Map of etype to attr + attr_map as ( + select a.etype, a.id + from attrs a + join etypes e on e.etype = a.etype + where a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + and a.cardinality = 'one' + ), + -- Get all of the ents, grouped by etype and entity_id + by_entity as ( + select e.etype, + t.entity_id, + json_object_agg(t.attr_id::text, t.value) as attrs + from triples t + join by_etype e + on t.entity_id = e.entity_id + join attr_map a + on a.etype = e.etype + and t.attr_id = a.id + where t.app_id = app_id_setting + and t.ea + group by e.etype, t.entity_id + ) + select json_agg(json_build_array(etype, entity_id, attrs))::text + into ents_msg + from by_entity; + + select case current_setting('instant.wal_msg_log_to_table', true) + when null then null + when '' then null + else current_setting('instant.wal_msg_log_to_table', true)::boolean + end + into log_to_table_setting; + + + if ents_msg is not null then + if log_to_table_setting is not null and log_to_table_setting then + insert into wal_logs (id, created_at, hour_bucket, prefix, content) + values (gen_random_uuid(), now(), date_part('hour', now() at time zone 'UTC')::int % 8, 'update_ents', ents_msg); + else + perform pg_logical_emit_message(true, 'update_ents', ents_msg); + end if; + end if; + end if; + + + insert into triples_size_updates (app_id, attr_id, pg_size) + select n.app_id, n.attr_id, sum(pg_column_size(n.*))::bigint + from newrows n + group by n.app_id, n.attr_id; + + return null; +end; +$$ language plpgsql; + +create or replace function triples_update_batch_trigger() +returns trigger as $$ +declare + ents_msg text; + app_id_setting uuid; + log_to_table_setting boolean; +begin + -- Update sweeper with deleted files + with old_files as ( + select app_id, value #>> '{}' as location_id + from oldrows + where attr_id = 
'96653230-13ff-ffff-2a34-b40fffffffff' + ), new_files as ( + select app_id, value #>> '{}' as location_id + from newrows + where attr_id = '96653230-13ff-ffff-2a34-b40fffffffff' + ) + insert into app_files_to_sweep (app_id, location_id) + select o.app_id, o.location_id + from old_files o + left join new_files n + on o.app_id = n.app_id + and o.location_id = n.location_id + where o.location_id is not null and n.location_id is null + on conflict do nothing; + + select case current_setting('instant.wal_msg_app_id', true) + when null then null + when '' then null + else current_setting('instant.wal_msg_app_id', true)::uuid + end + into app_id_setting; + + if app_id_setting is not null then + + -- Write the entities to the wal + with by_etype as ( + -- Forward entities + select a.etype, n.entity_id + from newrows n + join attrs a + on n.attr_id = a.id + and a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + union + -- Ref entities + select a.reverse_etype etype, json_uuid_to_uuid(n.value) entity_id + from newrows n + join attrs a + on n.attr_id = a.id + and a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + where n.vae + ), + etypes as ( + select distinct etype from by_etype + ), + -- Map of etype to attr + attr_map as ( + select a.etype, a.id + from attrs a + join etypes e on e.etype = a.etype + where a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + and a.cardinality = 'one' + ), + -- Get all of the ents, grouped by etype and entity_id + by_entity as ( + select e.etype, + t.entity_id, + json_object_agg(t.attr_id::text, t.value) as attrs + from triples t + join by_etype e + on t.entity_id = e.entity_id + join attr_map a + on a.etype = e.etype + and t.attr_id = a.id + where t.app_id = app_id_setting + and t.ea + group by e.etype, t.entity_id + ) + select json_agg(json_build_array(etype, entity_id, attrs))::text + into ents_msg + from by_entity; + + select case current_setting('instant.wal_msg_log_to_table', 
true) + when null then null + when '' then null + else current_setting('instant.wal_msg_log_to_table', true)::boolean + end + into log_to_table_setting; + + + if ents_msg is not null then + if log_to_table_setting is not null and log_to_table_setting then + insert into wal_logs (id, created_at, hour_bucket, prefix, content) + values (gen_random_uuid(), now(), date_part('hour', now() at time zone 'UTC')::int % 8, 'update_ents', ents_msg); + else + perform pg_logical_emit_message(true, 'update_ents', ents_msg); + end if; + end if; + end if; + + insert into triples_size_updates (app_id, attr_id, pg_size) + select app_id, attr_id, sum(delta)::bigint + from (select n.app_id, n.attr_id, pg_column_size(n.*)::bigint as delta from newrows n + union all + select o.app_id, o.attr_id, -pg_column_size(o.*)::bigint as delta from oldrows o) x + group by app_id, attr_id + having sum(delta) <> 0; + + return null; +end; +$$ language plpgsql; + +create or replace function triples_delete_batch_trigger() +returns trigger as $$ +declare + ents_msg text; + app_id_setting uuid; + log_to_table_setting boolean; +begin + -- Update sweeper with deleted files + insert into app_files_to_sweep (app_id, location_id) + select app_id, value #>> '{}' as location_id + from oldrows + -- This should match the attr_id for $files.location-id + where attr_id = '96653230-13ff-ffff-2a34-b40fffffffff' + on conflict do nothing; + + select case current_setting('instant.wal_msg_app_id', true) + when null then null + when '' then null + else current_setting('instant.wal_msg_app_id', true)::uuid + end + into app_id_setting; + + if app_id_setting is not null then + + -- Write the entities to the wal + with by_etype as ( + -- Forward entities + select a.etype, n.entity_id + from oldrows n + join attrs a + on n.attr_id = a.id + and a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + union + -- Ref entities + select a.reverse_etype etype, json_uuid_to_uuid(n.value) entity_id + from oldrows n + join 
attrs a + on n.attr_id = a.id + and a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + where n.vae + ), + etypes as ( + select distinct etype from by_etype + ), + -- Map of etype to attr + attr_map as ( + select a.etype, a.id + from attrs a + join etypes e on e.etype = a.etype + where a.app_id in (app_id_setting, 'a1111111-1111-1111-1111-111111111ca7') + and a.cardinality = 'one' + ), + -- Get all of the ents, grouped by etype and entity_id + by_entity as ( + select e.etype, + t.entity_id, + json_object_agg(t.attr_id::text, t.value) as attrs + from triples t + join by_etype e + on t.entity_id = e.entity_id + join attr_map a + on a.etype = e.etype + and t.attr_id = a.id + where t.app_id = app_id_setting + and t.ea + group by e.etype, t.entity_id + ) + select json_agg(json_build_array(etype, entity_id, attrs))::text + into ents_msg + from by_entity; + + select case current_setting('instant.wal_msg_log_to_table', true) + when null then null + when '' then null + else current_setting('instant.wal_msg_log_to_table', true)::boolean + end + into log_to_table_setting; + + + if ents_msg is not null then + if log_to_table_setting is not null and log_to_table_setting then + insert into wal_logs (id, created_at, hour_bucket, prefix, content) + values (gen_random_uuid(), now(), date_part('hour', now() at time zone 'UTC')::int % 8, 'delete_ents', ents_msg); + else + perform pg_logical_emit_message(true, 'delete_ents', ents_msg); + end if; + end if; + end if; + + insert into triples_size_updates (app_id, attr_id, pg_size) + select app_id, attr_id, -sum(pg_column_size(o.*))::bigint + from oldrows o + group by app_id, attr_id; + + return null; +end; +$$ language plpgsql; diff --git a/server/src/instant/bootstrap_triples_size.clj b/server/src/instant/bootstrap_triples_size.clj new file mode 100644 index 0000000000..51e83e050d --- /dev/null +++ b/server/src/instant/bootstrap_triples_size.clj @@ -0,0 +1,99 @@ +(ns instant.bootstrap-triples-size + (:require + 
[instant.config :as config] + [instant.jdbc.aurora :as aurora] + [instant.jdbc.copy :as copy] + [instant.jdbc.sql :as sql] + [instant.jdbc.wal :as wal] + [instant.util.hsql :as uhsql] + [instant.util.tracer :as tracer]) + (:import + (java.util HashMap) + (java.util.function BiFunction Function) + (org.postgresql.jdbc PgConnection))) + +(def insert-triples-size-rows-q + (uhsql/preformat {:insert-into [[:triples-size-updates [:app-id :attr-id :pg-size]] + {:select [:app-id :attr-id :pg-size] + :from [[[:unnest :?app-id :?attr-id :?pg-size] + [:t [:composite :app-id :attr-id :pg-size]]]]}]})) +(defn insert-triples-size-rows [group] + (let [params (loop [app-id (transient []) + attr-id (transient []) + pg-size (transient []) + group group] + (if-let [item (first group)] + (recur (conj! app-id (:app_id item)) + (conj! attr-id (:attr_id item)) + (conj! pg-size (:pg_size item)) + (next group)) + {:app-id (with-meta (persistent! app-id) + {:pgtype "uuid[]"}) + :attr-id (with-meta (persistent! attr-id) + {:pgtype "uuid[]"}) + :pg-size (with-meta (persistent! pg-size) + {:pgtype "bigint[]"})}))] + (sql/do-execute! ::insert-triples-size-rows + (aurora/conn-pool :write) + (uhsql/formatp insert-triples-size-rows-q + params)))) + +(defn delete-triples-size-rows [ids] + (sql/do-execute! ::delete-triples-size-rows + (aurora/conn-pool :write) + ["delete from triples_size_updates where id = ANY(?::bigint[])" + (with-meta ids {:pgtype "bigint[]"})])) + +(defn collect-sizes [^PgConnection conn] + (let [m (HashMap.) 
+ app-map-fn (reify Function + (apply [_ _k] + (HashMap.)))] + (doseq [{:keys [app_id attr_id pg_size]} + (copy/copy-seq conn + "copy (select app_id, attr_id, pg_column_size(triples) from triples) to stdout with (format binary)" + [{:name :app_id + :pgtype "uuid"} + {:name :attr_id + :pgtype "uuid"} + {:name :pg_size + :pgtype "integer"}])] + (let [app-map (.computeIfAbsent m app_id app-map-fn)] + (HashMap/.compute app-map attr_id (reify BiFunction + (apply [_ _k v] + (if v + (+ pg_size v) + (long pg_size))))))) + (for [[app-id attr-map] m + [attr-id size] attr-map] + {:app_id app-id + :attr_id attr-id + :pg_size size}))) + +(def slot-name "bootstrap_triples_size") + +(defn bootstrap + "Run this in the repl on one production instance after running the migration that + adds the new tables." + [] + (let [{:keys [connection]} (wal/create-sync-db-replication-slot-and-connection + (config/get-aurora-config) + slot-name) + _ (when-not connection + (throw (ex-info "Did not create replication slot" {:slot-name slot-name}))) + _ (wal/drop-logical-replication-slot (aurora/conn-pool :write) + slot-name) + {:keys [initial-rows + ids-to-delete]} + (tracer/with-span! {:name ::get-initial-rows} + (with-open [conn ^PgConnection connection] + {:initial-rows (collect-sizes conn) + :ids-to-delete (map :id (sql/select ::ids-to-delete conn ["select id from triples_size_updates"]))}))] + + (tracer/with-span! {:name ::insert-initial-rows} + (doseq [group (partition-all 5000 initial-rows)] + (insert-triples-size-rows group))) + (tracer/with-span! 
{:name ::delete-existing} + (doseq [group (partition-all 5000 ids-to-delete)] + (delete-triples-size-rows group))) + (println "Set the disable-triples-size-collection flag to false to start the collection process."))) diff --git a/server/src/instant/core.clj b/server/src/instant/core.clj index 4a9317c81b..3a67f69e76 100644 --- a/server/src/instant/core.clj +++ b/server/src/instant/core.clj @@ -16,7 +16,6 @@ [instant.db.indexing-jobs :as indexing-jobs] [instant.db.hint-testing :as hint-testing] [instant.db.model.wal-log :as wal-log-model] - [instant.model.history :as history-model] [instant.db.model.transaction :as tx-model] [instant.demo-routes :as demo-routes] [instant.storage.sweeper :as storage-sweeper] @@ -33,6 +32,8 @@ [instant.log-config :as log-config] [instant.mma-example :as mma-example] [instant.machine-summaries] + [instant.model.history :as history-model] + [instant.model.triples-size-updates :as triples-size-updates] [instant.nippy] [instant.nrepl :as nrepl] [instant.oauth-apps.routes :as oauth-app-routes] @@ -345,7 +346,10 @@ (history-model/stop))) (future (tracer/with-span! {:name "stop-loadbalancer-listener"} - (loadbalancer-listener/stop))))) + (loadbalancer-listener/stop))) + (future + (tracer/with-span! {:name "stop-triples-size-updates"} + (triples-size-updates/stop-global))))) (tracer/shutdown)) (defn add-shutdown-hook [] @@ -464,6 +468,8 @@ (catch Throwable t (tracer/record-exception-span! t {:name "load-balancer-listener-init-error" :escaping? 
false}))) + (with-log-init :triples-size-updates + (triples-size-updates/start-global)) (log/info "Finished initializing")) (catch Throwable t (log/error t "Error in startup") diff --git a/server/src/instant/dash/admin.clj b/server/src/instant/dash/admin.clj index aebf4322b1..3dd5b000da 100644 --- a/server/src/instant/dash/admin.clj +++ b/server/src/instant/dash/admin.clj @@ -4,7 +4,7 @@ [instant.jdbc.aurora :as aurora] [instant.jdbc.sql :as sql] [clojure.core :as c] - [instant.flags :refer [get-emails]] + [instant.flags :as flags :refer [get-emails]] [instant.stripe :as stripe] [instant.model.app-file :as app-file-model])) @@ -109,17 +109,36 @@ [:customer-email :user_email] :monthly-revenue :start-timestamp - [{:select [[[:coalesce - [:* - [:sum :s.triples_pg_size] - [:case - [:= [:pg_relation_size "triples"] 0] 1 - :else [:/ - [:cast [:pg_total_relation_size "triples"] :numeric] - [:pg_relation_size "triples"]]]] - 0]]] - :from [[:attr-sketches :s]] - :where [:= :s.app_id :apps.id]} + [(if (flags/new-db-size?) 
+ {:select [[[:coalesce + [:* + [:sum :agg.pg_size] + [:case + [:= [:pg_relation_size "triples"] 0] 1 + :else [:/ + [:cast [:pg_total_relation_size "triples"] :numeric] + [:pg_relation_size "triples"]]]] + 0]]] + :from [[:triples-size-aggregate :agg]] + :join [[:attrs :a] [:= :a.id :agg.attr_id]] + :where [:and + [:= :agg.app_id :apps.id] + [:= nil :a.deletion_marked_at]]} + + {:select [[[:coalesce + [:* + [:sum :s.triples_pg_size] + [:case + [:= [:pg_relation_size "triples"] 0] 1 + :else [:/ + [:cast [:pg_total_relation_size "triples"] :numeric] + [:pg_relation_size "triples"]]]] + 0]]] + :from [[:attr-sketches :s]] + :join [[:attrs :a] [:= :a.id :s.attr_id]] + :where [:and + [:= :s.app_id :apps.id] + [:= nil :a.deletion_marked_at]]}) :usage] [{:select [[[:coalesce [:sum :s.total] 0]]] :from [[:attr_sketches :s]] @@ -135,17 +154,36 @@ [:customer-email :user_email] :monthly-revenue :start-timestamp - [{:select [[[:coalesce - [:* - [:sum :s.triples_pg_size] - [:case - [:= [:pg_relation_size "triples"] 0] 1 - :else [:/ - [:cast [:pg_total_relation_size "triples"] :numeric] - [:pg_relation_size "triples"]]]] - 0]]] - :from [[:attr-sketches :s]] - :where [:in :s.app_id {:select :id :from :apps :where [:= :apps.org_id :orgs.id]}]} + [(if (flags/new-db-size?) 
+ {:select [[[:coalesce + [:* + [:sum :agg.pg_size] + [:case + [:= [:pg_relation_size "triples"] 0] 1 + :else [:/ + [:cast [:pg_total_relation_size "triples"] :numeric] + [:pg_relation_size "triples"]]]] + 0]]] + :from [[:triples-size-aggregate :agg]] + :join [[:attrs :a] [:= :a.id :agg.attr_id]] + :where [:and + [:in :agg.app_id {:select :id :from :apps :where [:= :apps.org_id :orgs.id]}] + [:= nil :a.deletion_marked_at]]} + + {:select [[[:coalesce + [:* + [:sum :s.triples_pg_size] + [:case + [:= [:pg_relation_size "triples"] 0] 1 + :else [:/ + [:cast [:pg_total_relation_size "triples"] :numeric] + [:pg_relation_size "triples"]]]] + 0]]] + :from [[:attr-sketches :s]] + :join [[:attrs :a] [:= :a.id :s.attr_id]] + :where [:and + [:in :s.app_id {:select :id :from :apps :where [:= :apps.org_id :orgs.id]}] + [:= nil :a.deletion_marked_at]]}) :usage] [{:select [[[:coalesce [:sum :s.total] 0]]] :from [[:attr_sketches :s]] diff --git a/server/src/instant/db/attr_sketch.clj b/server/src/instant/db/attr_sketch.clj index 73da809940..22eba0fb27 100644 --- a/server/src/instant/db/attr_sketch.clj +++ b/server/src/instant/db/attr_sketch.clj @@ -332,6 +332,7 @@ :app-id (:app_id record) :attr-id (:attr_id record) :max-lsn (:max_lsn record) + ;; TODO(dww): Remove after deploying triples-size-updates :triples-pg-size (:triples_pg_size record)})) (defn- find-sketch-rows @@ -506,6 +507,7 @@ (update :reverse-total conj (:total reverse-sketch)) (update :reverse-bins conj (when reverse-sketch (compress-bins reverse-sketch))) + ;; TODO(dww): Remove after deploying triples-size-updates (update :triples-pg-size conj triples-pg-size)))) {:id (with-meta [] {:pgtype "uuid[]"}) :width (with-meta [] {:pgtype "integer[]"}) @@ -517,6 +519,7 @@ :reverse-depth (with-meta [] {:pgtype "integer[]"}) :reverse-total (with-meta [] {:pgtype "bigint[]"}) :reverse-bins (with-meta [] {:pgtype "bytea[]"}) + ;; TODO(dww): Remove after deploying triples-size-updates :triples-pg-size (with-meta [] {:pgtype 
"bigint[]"}) :lsn lsn :previous-lsn previous-lsn @@ -536,6 +539,7 @@ [[:unnest :?reverse-depth] :reverse-depth] [[:unnest :?reverse-total] :reverse-total] [[:unnest :?reverse-bins] :reverse-bins] + ;; TODO(dww): Remove after deploying triples-size-updates [[:unnest :?triples-pg-size] :triples-pg-size] [:?lsn :max-lsn]]}] [:update-sketches @@ -551,6 +555,7 @@ :reverse-depth :data.reverse-depth :reverse-total :data.reverse-total :reverse-bins :data.reverse-bins + ;; TODO(dww): Remove after deploying triples-size-updates :triples-pg-size :data.triples-pg-size} :where [:= :attr_sketches.id :data.id]}] [:update-wal-aggregator-status @@ -598,6 +603,7 @@ (update :reverse-total conj (:total reverse-sketch)) (update :reverse-bins conj (when reverse-sketch (compress-bins reverse-sketch))) + ;; TODO(dww): Remove after deploying triples-size-updates (update :triples-pg-size conj triples-pg-size)))) {:app-id (with-meta [] {:pgtype "uuid[]"}) :attr-id (with-meta [] {:pgtype "uuid[]"}) @@ -610,6 +616,7 @@ :reverse-depth (with-meta [] {:pgtype "integer[]"}) :reverse-total (with-meta [] {:pgtype "bigint[]"}) :reverse-bins (with-meta [] {:pgtype "bytea[]"}) + ;; TODO(dww): Remove after deploying triples-size-updates :triples-pg-size (with-meta [] {:pgtype "bigint[]"})} sketches) cols [:id :max-lsn :app-id :attr-id @@ -631,6 +638,7 @@ [[:unnest :?reverse-depth] :reverse-depth] [[:unnest :?reverse-total] :reverse-total] [[:unnest :?reverse-bins] :reverse-bins] + ;; TODO(dww): Remove after deploying triples-size-updates [[:unnest :?triples-pg-size] :triples-pg-size]]}] [:changes {:insert-into [[:attr-sketches cols] {:select (qualify-cols :data cols) diff --git a/server/src/instant/flags.clj b/server/src/instant/flags.clj index fb57fe26bf..f1bc620bb0 100644 --- a/server/src/instant/flags.clj +++ b/server/src/instant/flags.clj @@ -452,3 +452,18 @@ (defn use-reactive-cache-for-verify-token? [app-id] (contains? 
(flag :use-reactive-cache-for-verify-token-apps) app-id)) + +(defn triples-size-collection-batch-size [] + (flag :triples-size-collection-batch-size 5000)) + +(defn triples-size-collection-max-loops [] + (flag :triples-size-collection-max-loops 1000)) + +(defn disable-triples-size-collection? [] + ;; Defaults to disabled so that we can bootstrap before + ;; we start the process. + (flag :disable-triples-size-collection true)) + +;; TODO(dww): Remove after deploying triples-size-updates +(defn new-db-size? [] + (toggled? :use-new-db-size false)) diff --git a/server/src/instant/jdbc/failover.clj b/server/src/instant/jdbc/failover.clj index 8fc0ab37df..ab2cba2c4b 100644 --- a/server/src/instant/jdbc/failover.clj +++ b/server/src/instant/jdbc/failover.clj @@ -493,8 +493,10 @@ (do (println "Got a bad tx row" row) (quit)) - (sql/execute! next-pool ["SELECT setval('transactions_id_seq', ?::bigint, true)" - (+ (:id row) 1000)])) + (do + (sql/execute! next-pool ["SELECT setval('transactions_id_seq', ?::bigint, true)" + (+ (:id row) 1000)]) + (sql/execute! next-pool ["SELECT setval('triples_size_updates_id_seq', nextval('triples_size_updates_id_seq') + 100000, true)"]))) (do (when (> i 100) (println "Waited too long for data to sync") @@ -565,8 +567,10 @@ (do (println "Got a bad tx row" row) (quit)) - (sql/execute! next-pool ["SELECT setval('transactions_id_seq', ?::bigint, true)" - (+ (:id row) 1000)])) + (do + (sql/execute! next-pool ["SELECT setval('transactions_id_seq', ?::bigint, true)" + (+ (:id row) 1000)]) + (sql/execute! 
next-pool ["SELECT setval('triples_size_updates_id_seq', nextval('triples_size_updates_id_seq') + 100000, true)"]))) (do (when (> i 100) (println "Waited too long for data to sync") diff --git a/server/src/instant/model/app.clj b/server/src/instant/model/app.clj index 6ef87a68da..44be76f2f5 100644 --- a/server/src/instant/model/app.clj +++ b/server/src/instant/model/app.clj @@ -474,14 +474,23 @@ (sql/select-one ::app-usage conn - ["SELECT - (sum(s.triples_pg_size) * - CASE - WHEN pg_relation_size('triples') = 0 THEN 1 - ELSE pg_total_relation_size('triples')::numeric / pg_relation_size('triples') - END) as num_bytes - FROM attr_sketches s join attrs a on s.attr_id = a.id - WHERE a.deletion_marked_at is null and s.app_id = ?::uuid" app-id]))) + (if (flags/new-db-size?) ;; TODO(dww): Remove after deploying triples-size-updates + ["SELECT + (sum(agg.pg_size) * + CASE + WHEN pg_relation_size('triples') = 0 THEN 1 + ELSE pg_total_relation_size('triples')::numeric / pg_relation_size('triples') + END) as num_bytes + FROM triples_size_aggregate agg join attrs a on agg.attr_id = a.id + WHERE a.deletion_marked_at is null and agg.app_id = ?::uuid" app-id] + ["SELECT + (sum(s.triples_pg_size) * + CASE + WHEN pg_relation_size('triples') = 0 THEN 1 + ELSE pg_total_relation_size('triples')::numeric / pg_relation_size('triples') + END) as num_bytes + FROM attr_sketches s join attrs a on s.attr_id = a.id + WHERE a.deletion_marked_at is null and s.app_id = ?::uuid" app-id])))) (defn decrypt-connection-string [app-id encrypted-connection-string] (-> (crypt-util/aead-decrypt {:ciphertext encrypted-connection-string diff --git a/server/src/instant/model/org.clj b/server/src/instant/model/org.clj index 7d10307342..d9faec7f06 100644 --- a/server/src/instant/model/org.clj +++ b/server/src/instant/model/org.clj @@ -1,6 +1,7 @@ (ns instant.model.org (:require [instant.config :as config] + [instant.flags :as flags] [instant.jdbc.aurora :as aurora] [instant.jdbc.sql :as sql] 
[instant.model.app :as app-model] @@ -164,7 +165,6 @@ query (uhsql/formatp invites-for-org-q params)] (sql/select ::invites-for-org conn query)))) - (def org-for-user-q (uhsql/preformat {:with [[:membered {:select [:o.id :o.title @@ -227,7 +227,6 @@ (ex/assert-record! :org {:args [{:user-id user-id :org-id org-id}]}))))) - (def create-org-q (uhsql/preformat {:with [[:org {:insert-into :orgs :values [{:id :?org-id @@ -264,7 +263,27 @@ query (uhsql/formatp delete-org-q params)] (sql/execute-one! ::delete! conn query)))) -(def usage-q +(def usage-q-new + (uhsql/preformat {:select [[[:coalesce + [:* + [:sum :agg.pg_size] + [:case + [:= :0 [:pg_relation_size [:inline "triples"]]] :1 + :else [:/ + [:cast [:pg_total_relation_size [:inline "triples"]] :numeric] + [:pg_relation_size [:inline "triples"]]]]] + :0] + :num_bytes]] + :from [[:triples-size-aggregate :agg]] + :join [[:apps :app] [:= :agg.app_id :app.id] + [:attrs :attr] [:= :agg.attr_id :attr.id]] + :where [:and + [:= :app.org_id :?org-id] + [:= nil :app.deletion-marked-at] + [:= nil :attr.deletion-marked-at]]})) + +;; TODO(dww): Remove after deploying triples-size-updates +(def usage-q-old (uhsql/preformat {:select [[[:coalesce [:* [:sum :s.triples_pg_size] @@ -300,7 +319,10 @@ (sql/select-one ::org-usage conn - (uhsql/formatp usage-q {:org-id org-id})))) + (uhsql/formatp (if (flags/new-db-size?) 
+ usage-q-new + usage-q-old) + {:org-id org-id})))) (def rename-q (uhsql/preformat {:update :orgs :set {:title :?title} diff --git a/server/src/instant/model/triples_size_updates.clj b/server/src/instant/model/triples_size_updates.clj new file mode 100644 index 0000000000..001b9dedf5 --- /dev/null +++ b/server/src/instant/model/triples_size_updates.clj @@ -0,0 +1,89 @@ +(ns instant.model.triples-size-updates + (:require + [chime.core] + [instant.config :as config] + [instant.discord :as discord] + [instant.flags :as flags] + [instant.jdbc.aurora :as aurora] + [instant.jdbc.sql :as sql] + [instant.util.hsql :as uhsql] + [instant.util.tracer :as tracer]) + (:import + (java.time Duration Instant))) + +(defonce process (atom nil)) + +(def collect-batch-q + (uhsql/preformat + {:with [[:ids {:select :id + :from :triples-size-updates + :for [:update :skip-locked] + :limit '?batch-size}] + [:deletes {:delete-from :triples-size-updates + :using :ids + :where [:= :triples-size-updates.id :ids.id] + :returning [:app-id :attr-id :pg-size]}]] + + :insert-into [[:triples_size_aggregate [:app-id :attr-id :pg-size]] + {:select [:deletes.app-id :deletes.attr-id [[:sum :deletes.pg_size] :pg-size]] + :from :deletes + ;; Join filters out (app_id, attr_id) whose parent was deleted mid-batch. + :join [:apps [:= :apps.id :deletes.app-id] + :attrs [:= :attrs.id :deletes.attr-id]] + :group-by [:deletes.app-id :deletes.attr-id]}] + :on-conflict {:on-constraint :triples_size_aggregate_pkey} + :do-update-set {:pg-size [:+ + :triples_size_aggregate.pg_size + :excluded.pg_size]}})) +(defn collect-batch! + ([] (collect-batch! (aurora/conn-pool :write) (flags/triples-size-collection-batch-size))) + ([conn batch-size] + (sql/do-execute! ::collect-batch! + conn + (uhsql/formatp collect-batch-q {:batch-size batch-size})))) + +(defn collect-batches! + "Adds triples_size_updates to the triples_size_aggregate table and deletes them. 
+ Turn it off with the `disable-triples-size-collection` feature flag." + [max-loops] + (tracer/with-span! {:name ::collect-batches + :attributes {:max-loops max-loops}} + (loop [loops 0 + total-collected 0] + (if (= loops max-loops) + (when (config/prod?) + (discord/send-error-async! (str (:instateam discord/mention-constants) + " collect triples size is backed up after " loops " iterations."))) + (let [update-count (:next.jdbc/update-count (first (collect-batch!)))] + (if (zero? update-count) + (tracer/add-data! {:attributes {:total-collected total-collected + :loops loops}}) + (recur (inc loops) + (+ total-collected (long update-count))))))))) + +(defn start [] + (let [chime (chime.core/chime-at (chime.core/periodic-seq (Instant/now) + (Duration/ofMinutes (if (config/dev?) + 60 + 5))) + (fn [_] + (when-not (or (flags/failing-over?) + (flags/disable-triples-size-collection?)) + (collect-batches! (flags/triples-size-collection-max-loops)))))] + {:shutdown (fn [] + (.close chime))})) + +(defn stop [process] + ((:shutdown process))) + +(defn start-global [] + (reset! process (start))) + +(defn stop-global [] + (when-let [p @process] + (stop p) + (reset! 
process nil))) + +(defn restart [] + (stop-global) + (start-global)) diff --git a/server/src/instant/reactive/aggregator.clj b/server/src/instant/reactive/aggregator.clj index 71215f8c5a..6a76291309 100644 --- a/server/src/instant/reactive/aggregator.clj +++ b/server/src/instant/reactive/aggregator.clj @@ -27,7 +27,7 @@ ;; -------------- ;; Initialization -(def triples-copy-sql "copy (select app_id, attr_id, entity_id, value, checked_data_type, created_at, eav, ea, pg_size from triples order by app_id, attr_id) to stdout with (format binary)") +(def triples-copy-sql "copy (select app_id, attr_id, entity_id, value, checked_data_type, created_at, eav, ea from triples order by app_id, attr_id) to stdout with (format binary)") (defn initial-sketch-seq "Returns a lazy seq of sketches with app-id and attr-id, expects `copy-sql` to sort by @@ -50,9 +50,7 @@ {:name :eav :pgtype "boolean"} {:name :ea - :pgtype "boolean"} - {:name :pg-size - :pgtype "integer"}] + :pgtype "boolean"}] {:handle-json-parse-error (fn [e _props] ;; Replace objects that are too large to read ;; with an empty object. That will keep it out @@ -100,8 +98,7 @@ triples (transient {}) reverse-triples (transient {}) sketch (cms/make-sketch) - reverse-sketch (cms/make-sketch) - triples-pg-size 0] + reverse-sketch (cms/make-sketch)] (if (and (= app-id (:app-id (first s))) (= attr-id (:attr-id (first s)))) (let [triple (update-triple (first s)) @@ -118,8 +115,7 @@ (cond-> (transient {}) ref-k (assoc! ref-k 1)) (cms/add-batch sketch (persistent! triples)) - (cms/add-batch reverse-sketch (persistent! reverse-triples)) - (+ triples-pg-size (long (:pg-size triple)))) + (cms/add-batch reverse-sketch (persistent! reverse-triples))) (recur (rest s) app-id attr-id @@ -127,8 +123,7 @@ (cond-> reverse-triples ref-k (assoc! ref-k (inc (get reverse-triples ref-k 0)))) sketch - reverse-sketch - (+ triples-pg-size (long (:pg-size triple)))))) + reverse-sketch))) (let [forward-sketch (cms/add-batch sketch (persistent! 
triples)) reverse-sketch (cms/add-batch reverse-sketch (persistent! reverse-triples))] (vswap! sketch-count inc) @@ -136,8 +131,7 @@ :attr-id attr-id :sketch forward-sketch :reverse-sketch (when (pos? (:total reverse-sketch)) - reverse-sketch) - :triples-pg-size triples-pg-size} + reverse-sketch)} (collect s))))) (end-span true))))] (collect copy-seq))) @@ -190,6 +184,7 @@ "created_at" (assoc data :created-at value) "ea" (assoc data :ea value) "eav" (assoc data :eav value) + ;; TODO(dww): Remove after deploying triples-size-updates "pg_size" (assoc data :pg-size value) data)) {} @@ -310,9 +305,11 @@ :checked-data-type (:checked-data-type triples-data)} reverse-record (when (store-reverse? triples-data) {:value (:entity-id triples-data)}) + ;; TODO(dww): Remove after deploying triples-size-updates pg-size (:pg-size triples-data)] (cond-> acc true (update-in [key :records record] (fnil + 0) incr) + ;; TODO(dww): Remove after deploying triples-size-updates pg-size (update-in [key :triples-pg-size] (fnil + 0) (* incr pg-size)) true (update-in [key :max-lsn] lsn-max lsn) reverse-record (update-in [key :reverse-records reverse-record] (fnil + 0) incr)))) @@ -354,6 +351,7 @@ (conj acc (cond-> sketch true (update :sketch cms/add-batch records) true (assoc :max-lsn max-lsn) + ;; TODO(dww): Remove after deploying triples-size-updates triples-pg-size (update :triples-pg-size (fnil + 0) triples-pg-size) (seq reverse-records) (update :reverse-sketch (fnil cms/add-batch (cms/make-sketch)) reverse-records))) diff --git a/server/src/instant/util/test.clj b/server/src/instant/util/test.clj index 3d31a912ef..56560f9865 100644 --- a/server/src/instant/util/test.clj +++ b/server/src/instant/util/test.clj @@ -271,7 +271,7 @@ (assoc acc (select-keys sketch [:app-id :attr-id]) sketch)) {} (aggregator/initial-sketch-seq conn - (format "copy (select app_id, attr_id, entity_id, value, checked_data_type, created_at, eav, ea, pg_size from triples where app_id = '%s'::uuid order by app_id, 
attr_id) to stdout with (format binary)" + (format "copy (select app_id, attr_id, entity_id, value, checked_data_type, created_at, eav, ea from triples where app_id = '%s'::uuid order by app_id, attr_id) to stdout with (format binary)" app-id))))) (defn lookup-with-in-memory-sketches diff --git a/server/test/instant/model/triples_size_updates_test.clj b/server/test/instant/model/triples_size_updates_test.clj new file mode 100644 index 0000000000..fb6fa7897d --- /dev/null +++ b/server/test/instant/model/triples_size_updates_test.clj @@ -0,0 +1,154 @@ +(ns instant.model.triples-size-updates-test + (:require + [clojure.test :as test :refer [deftest is testing]] + [instant.db.model.attr :as attr-model] + [instant.db.transaction :as tx] + [instant.fixtures :refer [with-empty-app]] + [instant.jdbc.aurora :as aurora] + [instant.jdbc.sql :as sql] + [instant.model.triples-size-updates :as tsu])) + +(defn queue-rows [app-id] + (sql/select (aurora/conn-pool :read) + ["select attr_id, pg_size from triples_size_updates + where app_id = ?::uuid order by id" app-id])) + +(defn aggregate-by-attr [app-id] + (->> (sql/select (aurora/conn-pool :read) + ["select attr_id, pg_size from triples_size_aggregate + where app_id = ?::uuid" app-id]) + (reduce (fn [m {:keys [attr_id pg_size]}] + (assoc m attr_id pg_size)) + {}))) + +(defn actual-sizes-by-attr + "Computes SUM(pg_column_size(t)) per attr_id directly from the + triples table. This is the ground truth the aggregate should match." + [app-id] + (->> (sql/select (aurora/conn-pool :read) + ["select attr_id, sum(pg_column_size(t))::bigint as pg_size + from triples t where app_id = ?::uuid group by attr_id" app-id]) + (reduce (fn [m {:keys [attr_id pg_size]}] + (assoc m attr_id pg_size)) + {}))) + +(defn assert-aggregate-matches-actual! [app-id] + ;; A (app_id, attr_id) row in triples_size_aggregate persists with + ;; pg_size=0 after all its triples are deleted — actual-sizes-by-attr + ;; doesn't return those at all. 
Treat 0-valued aggregate rows as + ;; equivalent to absent. + (is (= (actual-sizes-by-attr app-id) + (into {} (remove (fn [[_ v]] (zero? v))) + (aggregate-by-attr app-id))))) + +(defn drain! [] + (tsu/collect-batches! 1000000)) + +(defn make-blob-attr [etype label] + {:id (random-uuid) + :forward-identity [(random-uuid) etype label] + :unique? false + :index? false + :value-type :blob + :cardinality :one}) + +(defn add-attrs! [app attrs] + (tx/transact! (aurora/conn-pool :write) + (attr-model/get-by-app-id (:id app)) + (:id app) + (mapv (fn [a] [:add-attr a]) attrs))) + +(defn transact! [app ops] + (tx/transact! (aurora/conn-pool :write) + (attr-model/get-by-app-id (:id app)) + (:id app) + ops)) + +(deftest insert-emits-positive-rows-and-collect-aggregates + (with-empty-app + (fn [app] + (let [name-attr (make-blob-attr "users" "name") + age-attr (make-blob-attr "users" "age") + eid (random-uuid)] + (add-attrs! app [name-attr age-attr]) + (drain!) ;; flush anything from app/attr creation + + (transact! app + [[:add-triple eid (:id name-attr) "Alice"] + [:add-triple eid (:id age-attr) 42]]) + + (testing "queue has a positive row per attr touched" + (let [rows (queue-rows (:id app)) + by-attr (group-by :attr_id rows)] + (is (= #{(:id name-attr) (:id age-attr)} + (set (keys by-attr)))) + (is (every? #(pos? (:pg_size %)) rows)))) + + (testing "collect drains the queue" + (drain!) + (is (empty? (queue-rows (:id app))))) + + (testing "aggregate matches SUM(pg_column_size) per attr" + (assert-aggregate-matches-actual! (:id app))))))) + +(deftest update-emits-delta-and-aggregate-stays-correct + (with-empty-app + (fn [app] + (let [name-attr (make-blob-attr "users" "name") + eid (random-uuid)] + (add-attrs! app [name-attr]) + (transact! app [[:add-triple eid (:id name-attr) "X"]]) + (drain!) + (assert-aggregate-matches-actual! 
(:id app)) + + (testing "updating to a larger value emits a delta" + (let [size-before (get (aggregate-by-attr (:id app)) (:id name-attr))] + (transact! app + [[:add-triple eid (:id name-attr) + "A much longer string than the previous one"]]) + (let [rows (queue-rows (:id app))] + (is (seq rows)) + (is (every? #(= (:id name-attr) (:attr_id %)) rows))) + (drain!) + (assert-aggregate-matches-actual! (:id app)) + (is (< size-before + (get (aggregate-by-attr (:id app)) (:id name-attr)))))) + + (testing "updating to an identical value emits no rows" + (transact! app + [[:add-triple eid (:id name-attr) + "A much longer string than the previous one"]]) + (is (empty? (queue-rows (:id app))))))))) + +(deftest delete-emits-negative-rows-and-zeros-out-aggregate + (with-empty-app + (fn [app] + (let [name-attr (make-blob-attr "users" "name") + eid (random-uuid)] + (add-attrs! app [name-attr]) + (transact! app [[:add-triple eid (:id name-attr) "Alice"]]) + (drain!) + (let [size-before (get (aggregate-by-attr (:id app)) (:id name-attr))] + (is (pos? size-before)) + + (testing "deleting the entity emits a negative row matching the insert" + (transact! app [[:delete-entity eid "users"]]) + (let [rows (queue-rows (:id app))] + (is (seq rows)) + (is (every? #(neg? (:pg_size %)) rows)) + (is (= (- size-before) + (reduce + (map :pg_size rows)))))) + + (testing "after collect, aggregate goes to zero" + (drain!) + (assert-aggregate-matches-actual! (:id app)) + (is (zero? (get (aggregate-by-attr (:id app)) (:id name-attr) 0))))))))) + +(deftest collect-is-noop-on-empty-queue + (with-empty-app + (fn [app] + (drain!) + (is (empty? (queue-rows (:id app)))) + ;; calling again on an empty queue should be a no-op, not error + (drain!) + (is (empty? 
(queue-rows (:id app))))))) diff --git a/server/test/instant/reactive/aggregator_test.clj b/server/test/instant/reactive/aggregator_test.clj index 1f914dfd27..85d7751b35 100644 --- a/server/test/instant/reactive/aggregator_test.clj +++ b/server/test/instant/reactive/aggregator_test.clj @@ -23,7 +23,7 @@ (defn copy-sql-for-app-ids "copy command that only copies the app we are interested in" [app-ids] - (format "copy (select app_id, attr_id, entity_id, value, checked_data_type, created_at, eav, ea, pg_size from triples where app_id = ANY('{%s}'::uuid[]) order by app_id, attr_id) to stdout with (format binary)" + (format "copy (select app_id, attr_id, entity_id, value, checked_data_type, created_at, eav, ea from triples where app_id = ANY('{%s}'::uuid[]) order by app_id, attr_id) to stdout with (format binary)" (string/join "," (map #(UUID/.toString %) app-ids)))) (defn check-sketches [app r] @@ -59,22 +59,7 @@ (cms/check (:reverse-sketch (get sketches attr_id)) nil entity_id)) - (str "count mismatch for reverse ref " (resolvers/->friendly r attr_id) " entity_id=" entity_id))) - - (testing "triples_pg_size is correct" - (is (= [] - (take 100 (keep (fn [t] - (when (not= (:pg_size t) - (:actual_pg_size t)) - (select-keys t [:attr_id :entity_id :value :pg_size :actual_pg_size :triples_column_size]))) - triples)))) - (doseq [attr (attr-model/get-by-app-id (:id app))] - (is (= (:sum (sql/select-one (aurora/conn-pool :read) - ["select sum(pg_column_size(triples)) sum from triples where app_id = ?::uuid and attr_id = ?::uuid" - (:id app) - (:id attr)])) - (get-in sketches [(:id attr) :triples-pg-size])) - (str "pg_size mismatch for " (resolvers/->friendly r (:id attr)))))))) + (str "count mismatch for reverse ref " (resolvers/->friendly r attr_id) " entity_id=" entity_id))))) (deftest bootstrap (with-empty-app