Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 152 additions & 5 deletions dozer-sql/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ use crate::aggregation::factory::AggregationProcessorFactory;
use crate::builder::PipelineError::InvalidQuery;
use crate::errors::PipelineError;
use crate::selection::factory::SelectionProcessorFactory;
use crate::selection::in_subquery::{
InSubqueryProcessorFactory, LEFT_IN_SUBQUERY_PORT, RIGHT_IN_SUBQUERY_PORT,
};
use dozer_core::app::AppPipeline;
use dozer_core::node::PortHandle;
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_sql_expression::builder::{ExpressionBuilder, NameOrAlias};
use dozer_sql_expression::sqlparser::ast::{SetOperator, SetQuantifier, TableFactor};
use dozer_sql_expression::sqlparser::ast::{
BinaryOperator, Expr as SqlExpr, SetOperator, SetQuantifier, TableFactor,
};
use dozer_types::models::udf_config::UdfConfig;

use dozer_sql_expression::sqlparser::{
Expand Down Expand Up @@ -121,6 +126,11 @@ struct TableInfo {
override_name: Option<String>,
}

/// The two halves of an `<expr> IN (<subquery>)` predicate pulled out of a WHERE clause.
///
/// Built by `extract_in_subquery_selection` from a non-negated `SqlExpr::InSubquery` node.
struct InSubquerySelection {
/// Expression on the left-hand side of `IN`; handed to the IN-subquery processor factory.
expr: Box<SqlExpr>,
/// The query on the right-hand side of `IN`; built into its own sub-pipeline.
subquery: Box<Query>,
}

fn query_to_pipeline(
table_info: TableInfo,
query: Query,
Expand Down Expand Up @@ -242,6 +252,11 @@ fn select_to_pipeline(
pipeline_idx: usize,
is_top_select: bool,
) -> Result<String, PipelineError> {
let mut select = select;
let (in_subquery_selection, residual_selection) =
split_in_subquery_selection(select.selection.take())?;
select.selection = residual_selection;

// FROM clause
let Some(from) = select.from.into_iter().next() else {
return Err(PipelineError::UnsupportedSqlError(
Expand All @@ -258,6 +273,8 @@ fn select_to_pipeline(

let gen_selection_name = format!("select--{}", query_ctx.get_next_processor_id());
let (gen_product_name, product_output_port) = output_node;
let mut upstream_node = gen_product_name;
let mut upstream_port = product_output_port;

for (source_name, processor_name, processor_port) in input_nodes {
if let Some(table_info) = query_ctx
Expand All @@ -276,6 +293,66 @@ fn select_to_pipeline(
}
}

if let Some(in_subquery_selection) = in_subquery_selection {
let in_subquery_source_name = format!("in_subquery_{}", query_ctx.get_next_processor_id());
query_to_pipeline(
TableInfo {
name: NameOrAlias(in_subquery_source_name.clone(), None),
override_name: None,
},
*in_subquery_selection.subquery,
pipeline,
query_ctx,
pipeline_idx,
false,
)?;

let in_subquery_output = query_ctx
.pipeline_map
.get(&(pipeline_idx, in_subquery_source_name.clone()))
.ok_or_else(|| {
PipelineError::InvalidQuery(format!(
"Invalid IN subquery {in_subquery_source_name}"
))
})?
.clone();
let in_subquery_processor_name =
format!("in_subquery--{}", query_ctx.get_next_processor_id());
if !query_ctx
.processors_list
.insert(in_subquery_processor_name.clone())
{
return Err(PipelineError::ProcessorAlreadyExists(
in_subquery_processor_name,
));
}

let in_subquery_processor = InSubqueryProcessorFactory::new(
in_subquery_processor_name.clone(),
*in_subquery_selection.expr,
query_ctx.udfs.clone(),
query_ctx.runtime.clone(),
);
pipeline.add_processor(
Box::new(in_subquery_processor),
in_subquery_processor_name.clone(),
);
pipeline.connect_nodes(
upstream_node,
upstream_port,
in_subquery_processor_name.clone(),
LEFT_IN_SUBQUERY_PORT,
);
pipeline.connect_nodes(
in_subquery_output.node,
in_subquery_output.port,
in_subquery_processor_name.clone(),
RIGHT_IN_SUBQUERY_PORT,
);
upstream_node = in_subquery_processor_name;
upstream_port = DEFAULT_PORT_HANDLE;
}

let aggregation = AggregationProcessorFactory::new(
gen_agg_name.clone(),
select.projection,
Expand Down Expand Up @@ -304,8 +381,8 @@ fn select_to_pipeline(
pipeline.add_processor(Box::new(selection), gen_selection_name.clone());

pipeline.connect_nodes(
gen_product_name,
product_output_port,
upstream_node,
upstream_port,
gen_selection_name.clone(),
DEFAULT_PORT_HANDLE,
);
Expand All @@ -318,8 +395,8 @@ fn select_to_pipeline(
);
} else {
pipeline.connect_nodes(
gen_product_name,
product_output_port,
upstream_node,
upstream_port,
gen_agg_name.clone(),
DEFAULT_PORT_HANDLE,
);
Expand Down Expand Up @@ -360,6 +437,76 @@ fn select_to_pipeline(
Ok(gen_agg_name)
}

/// Splits an optional WHERE expression into an IN-subquery part and a residual predicate.
///
/// A missing WHERE clause yields `(None, None)`; otherwise the work is delegated to
/// `extract_in_subquery_selection`, whose result (or error) is returned unchanged.
fn split_in_subquery_selection(
    selection: Option<SqlExpr>,
) -> Result<(Option<InSubquerySelection>, Option<SqlExpr>), PipelineError> {
    let Some(predicate) = selection else {
        // No WHERE clause at all: nothing to extract and nothing left over.
        return Ok((None, None));
    };
    extract_in_subquery_selection(predicate)
}

/// Separates a single `IN (subquery)` predicate from the rest of a WHERE expression.
///
/// Only `AND` conjunctions and parenthesised (`Nested`) expressions are descended into.
/// The result is the extracted IN-subquery, if one was found, paired with whatever
/// predicate remains once it is removed (`None` when nothing is left). Every other
/// expression shape is returned untouched as the residual.
///
/// # Errors
///
/// Returns `PipelineError::InvalidQuery` when the subquery is negated (`NOT IN`), or when
/// both operands of an `AND` contain an IN-subquery.
fn extract_in_subquery_selection(
    selection: SqlExpr,
) -> Result<(Option<InSubquerySelection>, Option<SqlExpr>), PipelineError> {
    match selection {
        SqlExpr::InSubquery { negated: true, .. } => Err(PipelineError::InvalidQuery(
            "NOT IN subqueries are not supported".to_string(),
        )),
        SqlExpr::InSubquery {
            expr,
            subquery,
            negated: false,
        } => {
            // The whole expression is the IN predicate, so no residual remains.
            Ok((Some(InSubquerySelection { expr, subquery }), None))
        }
        SqlExpr::BinaryOp {
            left: lhs,
            op: BinaryOperator::And,
            right: rhs,
        } => {
            // Both operands are processed (left first) before the duplicate check, so an
            // error raised inside either operand takes precedence.
            let (lhs_found, lhs_rest) = extract_in_subquery_selection(*lhs)?;
            let (rhs_found, rhs_rest) = extract_in_subquery_selection(*rhs)?;

            if lhs_found.is_some() && rhs_found.is_some() {
                return Err(PipelineError::InvalidQuery(
                    "Only one IN subquery is supported in a WHERE clause".to_string(),
                ));
            }

            let found = lhs_found.or(rhs_found);
            let rest = combine_residual_and(lhs_rest, rhs_rest);
            Ok((found, rest))
        }
        SqlExpr::Nested(inner) => {
            let (found, rest) = extract_in_subquery_selection(*inner)?;
            // Keep the parentheses around whatever predicate survives.
            let rest = rest.map(|kept| SqlExpr::Nested(Box::new(kept)));
            Ok((found, rest))
        }
        other => Ok((None, Some(other))),
    }
}

/// Joins two optional residual predicates with `AND`.
///
/// When only one side is present it is returned as-is; when both are present they become
/// the operands of a new `AND` binary expression (left first); when neither is present the
/// result is `None`.
fn combine_residual_and(left: Option<SqlExpr>, right: Option<SqlExpr>) -> Option<SqlExpr> {
    let Some(lhs) = left else {
        // Nothing on the left: the right side (possibly `None`) is the whole answer.
        return right;
    };
    let Some(rhs) = right else {
        return Some(lhs);
    };

    Some(SqlExpr::BinaryOp {
        left: Box::new(lhs),
        op: BinaryOperator::And,
        right: Box::new(rhs),
    })
}

#[allow(clippy::too_many_arguments)]
fn set_to_pipeline(
table_info: TableInfo,
Expand Down
21 changes: 21 additions & 0 deletions dozer-sql/src/builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,27 @@ fn test_correct_into_clause() {
assert!(result.is_ok());
}

/// A WHERE clause that combines an `IN (subquery)` predicate with an ordinary condition
/// must build successfully and register the `INTO` table among the output tables.
#[test]
fn test_in_subquery_where_clause() {
    let runtime = create_test_runtime();
    let mut pipeline = AppPipeline::new_with_default_flags();
    let sql = r#"
SELECT id
INTO matched_users
FROM users
WHERE id IN (SELECT id FROM allowed_users) AND active = 1
"#;

    let context = statement_to_pipeline(sql, &mut pipeline, None, Vec::new(), runtime).unwrap();

    assert!(context.output_tables_map.contains_key("matched_users"));
}

#[test]
fn test_missing_into_in_nested_from_clause() {
let sql = r#"SELECT a FROM (SELECT a from b)"#;
Expand Down
Loading