Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ tokio={version="1", features = ["full", "tracing"] }
parking_lot="0.11.2"
tokio-util={version="^0.6", features = ["full"] }
crc32fast="1.3.2"
float-ord="^0.3"
futures = { version = "0.3.0", features = ["thread-pool"]}
hex = "0.4.3"
git-version = "0.3.5"
Expand Down
1 change: 1 addition & 0 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod metrics;
pub mod pubsub;
pub mod server;
pub mod set;
pub mod sorted_set;
pub mod string;
pub mod transaction;

Expand Down
230 changes: 230 additions & 0 deletions src/cmd/sorted_set.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
//! # Sorted Set command handlers
use crate::{
connection::Connection,
error::Error,
value::{
bytes_to_number, bytes_to_range_floatord,
sorted_set::{IOption, IResult},
},
value::{sorted_set::SortedSet, Value},
};
use bytes::Bytes;
use float_ord::FloatOrd;
use std::collections::VecDeque;

/// Adds all the specified members with the specified scores to the sorted set
/// stored at key. It is possible to specify multiple score / member pairs. If a
/// specified member is already a member of the sorted set, the score is updated
/// and the element reinserted at the right position to ensure the correct
/// ordering.
///
/// If key does not exist, a new sorted set with the specified members as sole
/// members is created, like if the sorted set was empty. If the key exists but
/// does not hold a sorted set, an error is returned.
///
/// The score values should be the string representation of a double precision
/// floating point number. +inf and -inf values are valid values as well.
pub async fn zadd(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
let key = args.pop_front().ok_or(Error::Syntax)?;
let option = IOption::new(&mut args)?;
if args.is_empty() {
return Err(Error::InvalidArgsCount("ZADD".to_owned()));
}
if args.len() % 2 != 0 {
return Err(Error::Syntax);
}
if args.len() != 2 && option.incr {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personal opinion but why not do a match?

    match v {
        _ if v.is_empty() => return 1,
        _ if v.len() % 2 != 0 => return 2,
        _ if v.len() != 2 && option.incr => 3,
        _ => return 4,
    };

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's rather interesting, I like it 😄

return Err(Error::OptsNotCompatible(
"INCR option supports a single increment-element pair".to_owned(),
));
}
let result = conn
.db()
.get(&key)
.map_mut(|v| match v {
Value::SortedSet(x) => {
let mut insert: usize = 0;
let mut updated: usize = 0;

loop {
let score = match args.pop_front() {
Some(x) => bytes_to_number::<f64>(&x)?,
None => break,
};
let value = args.pop_front().ok_or(Error::Syntax)?;
match x.insert(FloatOrd(score), value, &option) {
IResult::Inserted => insert += 1,
IResult::Updated => updated += 1,
_ => {}
}
}

Ok(if option.return_change {
updated
} else {
insert
}
.into())
}
_ => Err(Error::WrongType),
})
.unwrap_or_else(|| {
let mut x = SortedSet::new();
let mut insert: usize = 0;
let mut updated: usize = 0;

loop {
let score = match args.pop_front() {
Some(x) => bytes_to_number::<f64>(&x)?,
None => break,
};
let value = args.pop_front().ok_or(Error::Syntax)?;
match x.insert(FloatOrd(score), value, &option) {
IResult::Inserted => insert += 1,
IResult::Updated => updated += 1,
_ => {}
}
}

conn.db().set(key.clone(), x.into(), None);

Ok(if option.return_change {
updated
} else {
insert
}
.into())
})?;

conn.db().bump_version(&key);

Ok(result)
}

/// Increments the score of member in the sorted set stored at key by increment.
/// If member does not exist in the sorted set, it is added with increment as
/// its score (as if its previous score was 0.0). If key does not exist, a new
/// sorted set with the specified member as its sole member is created.
pub async fn zincr_by(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
let key = args.pop_front().ok_or(Error::Syntax)?;
let score = bytes_to_number::<f64>(&args.pop_front().ok_or(Error::Syntax)?)?;
let value = args.pop_front().ok_or(Error::Syntax)?;
let option = IOption::incr();
let result = conn
.db()
.get(&key)
.map_mut(|v| match v {
Value::SortedSet(x) => {
let _ = x.insert(FloatOrd(score), value.clone(), &option);
Ok(x.get_score(&value).unwrap_or_default().into())
}
_ => Err(Error::WrongType),
})
.unwrap_or_else(|| {
#[allow(clippy::mutable_key_type)]
let mut x = SortedSet::new();
let _ = x.insert(FloatOrd(score), value.clone(), &option);
Ok(x.get_score(&value).unwrap_or_default().into())
})?;

conn.db().bump_version(&key);
Ok(result)
}

/// Returns the sorted set cardinality (number of elements) of the sorted set
/// stored at key.
pub async fn zcard(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
conn.db()
.get(&args[0])
.map(|v| match v {
Value::SortedSet(x) => Ok(x.len().into()),
_ => Err(Error::WrongType),
})
.unwrap_or(Ok(0.into()))
}

/// Returns the number of elements in the sorted set at key with a score between
/// min and max.
///
/// The min and max arguments have the same semantic as described for
/// ZRANGEBYSCORE.
pub async fn zcount(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
let min = bytes_to_range_floatord(&args[1])?;
let max = bytes_to_range_floatord(&args[2])?;
conn.db()
.get(&args[0])
.map(|v| match v {
Value::SortedSet(x) => Ok(x.count_values_by_score_range(min, max).into()),
_ => Err(Error::WrongType),
})
.unwrap_or(Ok(0.into()))
}

#[cfg(test)]
mod test {
use crate::{
cmd::test::{create_connection, run_command},
error::Error,
};

#[tokio::test]
async fn test_set_wrong_type() {
let c = create_connection();

let _ = run_command(&c, &["set", "foo", "1"]).await;

assert_eq!(
Err(Error::WrongType),
run_command(&c, &["zadd", "foo", "5", "bar", "1", "foo"]).await,
);
}

#[tokio::test]
async fn test_zadd() {
let c = create_connection();

assert_eq!(
Ok(2.into()),
run_command(&c, &["zadd", "foo", "5", "bar", "1", "foo"]).await,
);
assert_eq!(
Ok(0.into()),
run_command(&c, &["zadd", "foo", "5", "bar", "1", "foo"]).await,
);
assert_eq!(Ok(2.into()), run_command(&c, &["zcard", "foo"]).await,);
}

#[tokio::test]
async fn test_zcount() {
let c = create_connection();

assert_eq!(
Ok(3.into()),
run_command(
&c,
&["zadd", "foo", "5", "bar", "1", "foo", "5.9", "foobar"]
)
.await,
);
assert_eq!(
Ok(0.into()),
run_command(&c, &["zadd", "foo", "nx", "511", "bar", "10", "foo"]).await,
);
assert_eq!(
Ok(2.into()),
run_command(&c, &["zcount", "foo", "1", "5"]).await,
);
assert_eq!(
Ok(1.into()),
run_command(&c, &["zcount", "foo", "1", "(5"]).await,
);
assert_eq!(
Ok(0.into()),
run_command(&c, &["zcount", "foo", "(1", "(5"]).await,
);
assert_eq!(
Ok(3.into()),
run_command(&c, &["zcount", "foo", "-inf", "+inf"]).await,
);
}
}
38 changes: 38 additions & 0 deletions src/dispatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,44 @@ pub mod command;

// Returns the server time
dispatcher! {
sorted_set {
ZADD {
cmd::sorted_set::zadd,
[Flag::Write Flag::DenyOom Flag::Fast],
-4,
1,
1,
1,
true,
},
ZCARD {
cmd::sorted_set::zcard,
[Flag::ReadOnly Flag::Fast],
2,
1,
1,
1,
true,
},
ZCOUNT {
cmd::sorted_set::zcount,
[Flag::ReadOnly Flag::Fast],
4,
1,
1,
1,
true,
},
ZINCRBY {
cmd::sorted_set::zincr_by,
[Flag::Write Flag::DenyOom Flag::Fast],
4,
1,
1,
1,
true,
}
},
set {
SADD {
cmd::set::sadd,
Expand Down
35 changes: 34 additions & 1 deletion src/value/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ pub mod checksum;
pub mod cursor;
pub mod expiration;
pub mod float;
pub mod sorted_set;
pub mod typ;

use crate::{error::Error, value_try_from, value_vec_try_from};
use bytes::{Bytes, BytesMut};
use float_ord::FloatOrd;
use redis_zero_protocol_parser::Value as ParsedValue;
use sha2::{Digest, Sha256};
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{Bound, HashMap, HashSet, VecDeque},
convert::{TryFrom, TryInto},
str::FromStr,
};
Expand All @@ -30,6 +32,8 @@ pub enum Value {
List(VecDeque<checksum::Value>),
/// Set. This type cannot be serialized
Set(HashSet<Bytes>),
/// Sorted set
SortedSet(sorted_set::SortedSet),
/// Vector/Array of values
Array(Vec<Value>),
/// Bytes/Strings/Binary data
Expand Down Expand Up @@ -225,6 +229,29 @@ pub fn bytes_to_int<T: FromStr>(bytes: &[u8]) -> Result<T, Error> {
.map_err(|_| Error::NotANumberType("an integer".to_owned()))
}

/// Converts bytes to a Range number
pub fn bytes_to_range<T: FromStr>(bytes: &[u8]) -> Result<Bound<T>, Error> {
match bytes {
b"-inf" | b"+inf" | b"inf" => Ok(Bound::Unbounded),
_ => {
if bytes[0] == b'(' {
Ok(Bound::Excluded(bytes_to_number::<T>(&(bytes[1..]))?))
} else {
Ok(Bound::Included(bytes_to_number::<T>(bytes)?))
}
}
}
}

/// Converts bytes to a Range of float FloatOrd numbers
pub fn bytes_to_range_floatord(bytes: &[u8]) -> Result<Bound<sorted_set::Score>, Error> {
match bytes_to_range(bytes)? {
Bound::Included(n) => Ok(Bound::Included(FloatOrd(n))),
Bound::Excluded(n) => Ok(Bound::Excluded(FloatOrd(n))),
Bound::Unbounded => Ok(Bound::Unbounded),
}
}

impl<'a> From<&ParsedValue<'a>> for Value {
fn from(value: &ParsedValue) -> Self {
match value {
Expand Down Expand Up @@ -281,6 +308,12 @@ impl From<&str> for Value {
}
}

impl From<sorted_set::SortedSet> for Value {
fn from(value: sorted_set::SortedSet) -> Self {
Value::SortedSet(value)
}
}

impl From<HashMap<Bytes, Bytes>> for Value {
fn from(value: HashMap<Bytes, Bytes>) -> Value {
Value::Hash(value)
Expand Down
Loading