Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 43 additions & 93 deletions syncstorage-spanner/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,13 @@ impl SpannerDb {
user: &UserIdentifier,
collection_id: i32,
) -> DbResult<SyncTimestamp> {
// This will also update the counts in user_collections, since `update_collection_sync`
// is called very early to ensure the record exists, and return the timestamp.
// This will also write the tombstone if there are no records and we're explicitly
// specifying a TOMBSTONE collection_id.
// This function should be called after any write operation.
// Upsert the `user_collections` row so `modified` is always refreshed
// after a write, including the just emptied collection case, where
// tombstone retention depends on the timestamp sticking around.
//
// When quota is enabled, count + total_bytes are recomputed from
// bsos. When disabled, those columns are omitted from
// the write to keep Spanner's mutation count down.
let timestamp = self.checked_timestamp()?;
let (mut sqlparams, mut sqltypes) = params! {
"fxa_uid" => user.fxa_uid.clone(),
Expand All @@ -343,105 +345,53 @@ impl SpannerDb {
};
sqltypes.insert("modified".to_owned(), as_type(TypeCode::TIMESTAMP));

self.metrics
.clone()
.start_timer("storage.quota.update_existing_totals", None);
let calc_sql = if self.quota.enabled {
"SELECT SUM(BYTE_LENGTH(payload)), COUNT(*)
FROM bsos
WHERE fxa_uid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection_id = @collection_id
GROUP BY fxa_uid"
} else {
"SELECT 1
FROM bsos
WHERE fxa_uid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection_id = @collection_id
LIMIT 1"
};
let upsert_sql = if self.quota.enabled {
self.metrics
.clone()
.start_timer("storage.quota.update_existing_totals", None);

let result = {
let (sqlparams, sqlparam_types) = params! {
let (q_params, q_types) = params! {
"fxa_uid" => user.fxa_uid.clone(),
"fxa_kid" => user.fxa_kid.clone(),
"collection_id" => collection_id,
};

self.sql(calc_sql)
.await?
.params(sqlparams)
.param_types(sqlparam_types)
.execute(&self.conn)?
.one_or_none()
.await?
};
let set_sql = if let Some(mut result) = result {
// Update the user_collections table to reflect current numbers.
// If there are BSOs, there are user_collections (or else something
// really bad already happened.)
if self.quota.enabled {
sqlparams.insert(
"total_bytes".to_owned(),
result[0].take_string_value().into_spanner_value(),
);
sqlparams.insert(
"count".to_owned(),
result[1].take_string_value().into_spanner_value(),
);
sqltypes.insert("total_bytes".to_owned(), support::as_type(TypeCode::INT64));
sqltypes.insert("count".to_owned(), support::as_type(TypeCode::INT64));
"UPDATE user_collections
SET modified = @modified,
count = @count,
total_bytes = @total_bytes
WHERE fxa_uid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection_id = @collection_id"
} else {
"UPDATE user_collections
SET modified = @modified
WHERE fxa_uid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection_id = @collection_id"
}
} else {
// Otherwise, there are no BSOs that match, check to see if there are
// any collections.
let result = self
// SUM/COUNT without GROUP BY always returns one row; COALESCE
// covers the empty-bsos case (collection just emptied 0, 0).
let mut result = self
.sql(
"SELECT 1 FROM user_collections
WHERE fxa_uid=@fxa_uid AND fxa_kid=@fxa_kid AND collection_id=@collection_id",
"SELECT COALESCE(SUM(BYTE_LENGTH(payload)), 0), COUNT(*)
FROM bsos
WHERE fxa_uid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection_id = @collection_id",
)
.await?
.params(sqlparams.clone())
.param_types(sqltypes.clone())
.params(q_params)
.param_types(q_types)
.execute(&self.conn)?
.one_or_none()
.one()
.await?;
if result.is_none() {
// No collections, so insert what we've got.
if self.quota.enabled {
"INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified, total_bytes, count)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified, 0, 0)"
} else {
"INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified)"
}
} else {
// there are collections, best modify what's there.
// NOTE, tombstone is a single collection id, it would have been created above.
if self.quota.enabled {
"UPDATE user_collections SET modified=@modified, total_bytes=0, count=0
WHERE fxa_uid=@fxa_uid AND fxa_kid=@fxa_kid AND collection_id=@collection_id"
} else {
"UPDATE user_collections SET modified=@modified
WHERE fxa_uid=@fxa_uid AND fxa_kid=@fxa_kid AND collection_id=@collection_id"
}
}
sqlparams.insert(
"total_bytes".to_owned(),
result[0].take_string_value().into_spanner_value(),
);
sqlparams.insert(
"count".to_owned(),
result[1].take_string_value().into_spanner_value(),
);
sqltypes.insert("total_bytes".to_owned(), as_type(TypeCode::INT64));
sqltypes.insert("count".to_owned(), as_type(TypeCode::INT64));

"INSERT OR UPDATE INTO user_collections
(fxa_uid, fxa_kid, collection_id, modified, total_bytes, count)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified, @total_bytes, @count)"
} else {
"INSERT OR UPDATE INTO user_collections
(fxa_uid, fxa_kid, collection_id, modified)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified)"
};
self.sql(set_sql)

self.sql(upsert_sql)
.await?
.params(sqlparams)
.param_types(sqltypes)
Expand Down