Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
356 changes: 253 additions & 103 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions dozer-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,5 @@ mongodb = ["dozer-ingestion/mongodb"]
onnx = ["dozer-sql/onnx"]
tokio-console = ["dozer-tracing/tokio-console"]
javascript = ["dozer-ingestion/javascript", "dozer-sql/javascript"]
wasm = ["dozer-sql/wasm"]
datafusion = ["dozer-ingestion/datafusion"]
1 change: 1 addition & 0 deletions dozer-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ proptest = "1.3.1"
python = ["dozer-sql-expression/python"]
onnx = ["dozer-sql-expression/onnx"]
javascript = ["dozer-sql-expression/javascript"]
wasm = ["dozer-sql-expression/wasm"]
3 changes: 3 additions & 0 deletions dozer-sql/expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ async-recursion = "1.0.5"

dozer-deno = { path = "../../dozer-deno", optional = true }
deno_core = { workspace = true, optional = true }
wasmi = { version = "1.0.9", optional = true }

[dev-dependencies]
proptest = "1.2.0"
wat = "1.248.0"

[features]
bigdecimal = ["dep:bigdecimal", "sqlparser/bigdecimal"]
python = ["dozer-types/python-auto-initialize"]
onnx = ["dep:ort", "dep:ndarray", "dep:half"]
javascript = ["dep:dozer-deno", "dep:deno_core"]
wasm = ["dep:wasmi"]
163 changes: 163 additions & 0 deletions dozer-sql/expression/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,26 @@ impl ExpressionBuilder {
Err(Error::JavaScriptNotEnabled)
}
}

UdfType::Wasm(config) => {
#[cfg(feature = "wasm")]
{
self.parse_wasm_udf(
function_name.clone(),
config,
sql_function,
schema,
udfs,
)
.await
}

#[cfg(not(feature = "wasm"))]
{
let _ = config;
Err(Error::WasmNotEnabled)
}
}
};
}

Expand Down Expand Up @@ -995,6 +1015,34 @@ impl ExpressionBuilder {
Ok(Expression::JavaScriptUdf(udf))
}

#[cfg(feature = "wasm")]
async fn parse_wasm_udf(
&mut self,
name: String,
config: &dozer_types::models::udf_config::WasmConfig,
function: &Function,
schema: &Schema,
udfs: &[UdfConfig],
) -> Result<Expression, Error> {
let mut args = vec![];
for argument in &function.args {
let arg = self
.parse_sql_function_arg(false, argument, schema, udfs)
.await?;
args.push(arg);
}

let udf = crate::wasm::Udf::new(
name,
config.module.clone(),
config.function.clone(),
FieldType::try_from(config.return_type.as_str())
.map_err(|_| Error::InvalidWasmReturnType(config.return_type.clone()))?,
args,
)?;
Ok(Expression::WasmUdf(udf))
}

async fn parse_sql_in_list_operator(
&mut self,
parse_aggregations: bool,
Expand Down Expand Up @@ -1062,3 +1110,118 @@ pub fn extend_schema_source_def(schema: &Schema, name: &NameOrAlias) -> Schema {

output_schema
}

#[cfg(test)]
mod wasm_builder_tests {
use super::*;
#[cfg(feature = "wasm")]
use dozer_types::types::{Field, Record};
use dozer_types::{
models::udf_config::{UdfConfig, UdfType, WasmConfig},
types::FieldDefinition,
};
use sqlparser::{
ast::{SelectItem, SetExpr, Statement},
dialect::GenericDialect,
parser::Parser,
};

fn first_projection_expr(sql: &str) -> Expr {
let dialect = GenericDialect {};
let ast = Parser::parse_sql(&dialect, sql).unwrap();
let Statement::Query(query) = ast.into_iter().next().unwrap() else {
panic!("expected query");
};
let SetExpr::Select(select) = *query.body else {
panic!("expected select");
};
match select.projection.into_iter().next().unwrap() {
SelectItem::UnnamedExpr(expr) => expr,
_ => panic!("expected unnamed expression"),
}
}

fn schema(field_type: FieldType) -> Schema {
let mut schema = Schema::new();
schema.field(
FieldDefinition::new(
"value".to_string(),
field_type,
false,
SourceDefinition::Dynamic,
),
false,
);
schema
}

fn wasm_udf(module: String) -> Vec<UdfConfig> {
vec![UdfConfig {
name: "add_one".to_string(),
config: UdfType::Wasm(WasmConfig {
module,
function: None,
return_type: "int".to_string(),
}),
}]
}

#[cfg(feature = "wasm")]
fn write_wasm_module() -> String {
let wasm = wat::parse_str(
r#"
(module
(func (export "add_one") (param i64) (result i64)
local.get 0
i64.const 1
i64.add))
"#,
)
.unwrap();
let path = std::env::temp_dir().join(format!(
"dozer-expression-builder-wasm-{}.wasm",
std::process::id()
));
std::fs::write(&path, wasm).unwrap();
path.to_string_lossy().to_string()
}

#[cfg(feature = "wasm")]
#[test]
fn builds_and_evaluates_wasm_udf() {
let sql_expr = first_projection_expr("SELECT add_one(value) FROM t");
let schema = schema(FieldType::Int);
let runtime = Arc::new(Runtime::new().unwrap());
let mut builder = ExpressionBuilder::new(schema.fields.len(), runtime.clone());
let mut expression = runtime
.block_on(builder.build(false, &sql_expr, &schema, &wasm_udf(write_wasm_module())))
.unwrap();
let record = Record::new(vec![Field::Int(41)]);

assert_eq!(
expression.evaluate(&record, &schema).unwrap(),
Field::Int(42)
);
assert_eq!(
expression.get_type(&schema).unwrap().return_type,
FieldType::Int
);
}

#[cfg(not(feature = "wasm"))]
#[test]
fn wasm_udf_requires_feature() {
let sql_expr = first_projection_expr("SELECT add_one(value) FROM t");
let schema = schema(FieldType::Int);
let runtime = Arc::new(Runtime::new().unwrap());
let mut builder = ExpressionBuilder::new(schema.fields.len(), runtime.clone());
let result = runtime.block_on(builder.build(
false,
&sql_expr,
&schema,
&wasm_udf("./missing.wasm".to_string()),
));

assert!(matches!(result, Err(Error::WasmNotEnabled)));
}
}
9 changes: 9 additions & 0 deletions dozer-sql/expression/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ pub enum Error {
#[error("JavaScript UDF error: {0}")]
JavaScript(#[from] crate::javascript::Error),

#[error("WASM UDF is not enabled")]
WasmNotEnabled,
#[error("Invalid WASM UDF return type: {0}")]
InvalidWasmReturnType(String),

#[cfg(feature = "wasm")]
#[error("WASM UDF error: {0}")]
Wasm(#[from] crate::wasm::Error),

// Legacy error types.
#[error("Sql error: {0}")]
SqlError(#[source] OperationError),
Expand Down
8 changes: 8 additions & 0 deletions dozer-sql/expression/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ pub enum Expression {
},
#[cfg(feature = "javascript")]
JavaScriptUdf(crate::javascript::Udf),
#[cfg(feature = "wasm")]
WasmUdf(crate::wasm::Udf),
}

impl Expression {
Expand Down Expand Up @@ -285,6 +287,8 @@ impl Expression {
}
#[cfg(feature = "javascript")]
Expression::JavaScriptUdf(udf) => udf.to_string(schema),
#[cfg(feature = "wasm")]
Expression::WasmUdf(udf) => udf.to_string(schema),
Expression::IsNull { arg } => arg.to_string(schema) + " IS NULL ",
Expression::IsNotNull { arg } => arg.to_string(schema) + " IS NOT NULL ",
}
Expand Down Expand Up @@ -378,6 +382,8 @@ impl Expression {
Expression::IsNotNull { arg } => evaluate_is_not_null(schema, arg, record),
#[cfg(feature = "javascript")]
Expression::JavaScriptUdf(udf) => udf.evaluate(record, schema),
#[cfg(feature = "wasm")]
Expression::WasmUdf(udf) => udf.evaluate(record, schema),
}
}

Expand Down Expand Up @@ -487,6 +493,8 @@ impl Expression {
)),
#[cfg(feature = "javascript")]
Expression::JavaScriptUdf(udf) => Ok(udf.get_type()),
#[cfg(feature = "wasm")]
Expression::WasmUdf(udf) => Ok(udf.get_type()),
Expression::IsNull { arg: _ } => Ok(ExpressionType::new(
FieldType::Boolean,
false,
Expand Down
2 changes: 2 additions & 0 deletions dozer-sql/expression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ mod javascript;
mod onnx;
#[cfg(feature = "python")]
mod python_udf;
#[cfg(feature = "wasm")]
mod wasm;

pub use num_traits;
pub use sqlparser;
Expand Down
Loading