From e6067a2965a1721315c43c8d249716ca21d0c8e6 Mon Sep 17 00:00:00 2001 From: jamilahmadzai Date: Sun, 24 May 2026 00:34:56 +0200 Subject: [PATCH] Support streaming IN subqueries Lower top-level IN (SELECT ...) predicates into a two-input selection processor so right-side stream changes can reveal or hide stored left rows. Cover duplicate right membership, right-side updates, NULL keys, operation ids, residual AND filters, chained IN subqueries, and invalid subquery shapes. --- dozer-sql/src/builder/mod.rs | 159 +++++- dozer-sql/src/builder/tests.rs | 94 ++++ dozer-sql/src/selection/in_subquery.rs | 733 +++++++++++++++++++++++++ dozer-sql/src/selection/mod.rs | 1 + 4 files changed, 981 insertions(+), 6 deletions(-) create mode 100644 dozer-sql/src/selection/in_subquery.rs diff --git a/dozer-sql/src/builder/mod.rs b/dozer-sql/src/builder/mod.rs index 3ccf3ed356..2aab855a2c 100644 --- a/dozer-sql/src/builder/mod.rs +++ b/dozer-sql/src/builder/mod.rs @@ -2,11 +2,16 @@ use crate::aggregation::factory::AggregationProcessorFactory; use crate::builder::PipelineError::InvalidQuery; use crate::errors::PipelineError; use crate::selection::factory::SelectionProcessorFactory; +use crate::selection::in_subquery::{ + InSubqueryProcessorFactory, LEFT_IN_SUBQUERY_PORT, RIGHT_IN_SUBQUERY_PORT, +}; use dozer_core::app::AppPipeline; use dozer_core::node::PortHandle; use dozer_core::DEFAULT_PORT_HANDLE; use dozer_sql_expression::builder::{ExpressionBuilder, NameOrAlias}; -use dozer_sql_expression::sqlparser::ast::{SetOperator, SetQuantifier, TableFactor}; +use dozer_sql_expression::sqlparser::ast::{ + BinaryOperator, Expr as SqlExpr, SelectItem, SetOperator, SetQuantifier, TableFactor, +}; use dozer_types::models::udf_config::UdfConfig; use dozer_sql_expression::sqlparser::{ @@ -121,6 +126,11 @@ struct TableInfo { override_name: Option, } +struct InSubquerySelection { + expr: Box, + subquery: Box, +} + fn query_to_pipeline( table_info: TableInfo, query: Query, @@ -236,12 +246,16 @@ fn query_to_pipeline( fn select_to_pipeline( table_info: TableInfo, - select: Select, + mut select: Select, pipeline: &mut AppPipeline, query_ctx: &mut QueryContext, pipeline_idx: usize, is_top_select: bool, ) -> Result { + let (in_subquery_selections, residual_selection) = + split_in_subquery_selection(select.selection.take())?; + select.selection = residual_selection; + // FROM clause let Some(from) = select.from.into_iter().next() else { return Err(PipelineError::UnsupportedSqlError( @@ -258,6 +272,8 @@ fn select_to_pipeline( let gen_selection_name = format!("select--{}", query_ctx.get_next_processor_id()); let (gen_product_name, product_output_port) = output_node; + let mut upstream_node = gen_product_name; + let mut upstream_port = product_output_port; for (source_name, processor_name, processor_port) in input_nodes { if let Some(table_info) = query_ctx @@ -276,6 +292,57 @@ fn select_to_pipeline( } } + for in_subquery_selection in in_subquery_selections { + let subquery_name = format!("in_subquery_{}", query_ctx.get_next_processor_id()); + query_to_pipeline( + TableInfo { + name: NameOrAlias(subquery_name.clone(), None), + override_name: None, + }, + *in_subquery_selection.subquery, + pipeline, + query_ctx, + pipeline_idx, + false, + )?; + + let subquery_output = query_ctx + .pipeline_map + .get(&(pipeline_idx, subquery_name.clone())) + .cloned() + .ok_or_else(|| { + PipelineError::InvalidQuery(format!("Invalid IN subquery {subquery_name}")) + })?; + + let in_subquery_node = format!("in_subquery--{}", query_ctx.get_next_processor_id()); + if !query_ctx.processors_list.insert(in_subquery_node.clone()) { + return Err(PipelineError::ProcessorAlreadyExists(in_subquery_node)); + } + + let in_subquery = InSubqueryProcessorFactory::new( + in_subquery_node.clone(), + *in_subquery_selection.expr, + query_ctx.udfs.clone(), + query_ctx.runtime.clone(), + ); + pipeline.add_processor(Box::new(in_subquery), in_subquery_node.clone()); + pipeline.connect_nodes( + upstream_node, + upstream_port, + in_subquery_node.clone(), + LEFT_IN_SUBQUERY_PORT, + ); + pipeline.connect_nodes( + subquery_output.node, + subquery_output.port, + in_subquery_node.clone(), + RIGHT_IN_SUBQUERY_PORT, + ); + + upstream_node = in_subquery_node; + upstream_port = DEFAULT_PORT_HANDLE; + } + let aggregation = AggregationProcessorFactory::new( gen_agg_name.clone(), select.projection, @@ -304,8 +371,8 @@ fn select_to_pipeline( pipeline.add_processor(Box::new(selection), gen_selection_name.clone()); pipeline.connect_nodes( - gen_product_name, - product_output_port, + upstream_node, + upstream_port, gen_selection_name.clone(), DEFAULT_PORT_HANDLE, ); @@ -318,8 +385,8 @@ fn select_to_pipeline( ); } else { pipeline.connect_nodes( - gen_product_name, - product_output_port, + upstream_node, + upstream_port, gen_agg_name.clone(), DEFAULT_PORT_HANDLE, ); @@ -360,6 +427,86 @@ fn select_to_pipeline( Ok(gen_agg_name) } +fn split_in_subquery_selection( + selection: Option, +) -> Result<(Vec, Option), PipelineError> { + match selection { + Some(selection) => extract_in_subquery_selection(selection), + None => Ok((vec![], None)), + } +} + +fn extract_in_subquery_selection( + selection: SqlExpr, +) -> Result<(Vec, Option), PipelineError> { + match selection { + SqlExpr::InSubquery { + expr, + subquery, + negated, + } => { + if negated { + return Err(PipelineError::InvalidQuery( + "NOT IN subqueries are not supported".to_string(), + )); + } + validate_in_subquery_projection(&subquery)?; + Ok((vec![InSubquerySelection { expr, subquery }], None)) + } + SqlExpr::BinaryOp { + left, + op: BinaryOperator::And, + right, + } => { + let (mut left_in_subqueries, left_residual) = extract_in_subquery_selection(*left)?; + let (mut right_in_subqueries, right_residual) = extract_in_subquery_selection(*right)?; + left_in_subqueries.append(&mut right_in_subqueries); + Ok(( + left_in_subqueries, + combine_residual_and(left_residual, right_residual), + )) + } + SqlExpr::Nested(expr) => { + let (in_subqueries, residual) = extract_in_subquery_selection(*expr)?; + Ok(( + in_subqueries, + residual.map(|expr| SqlExpr::Nested(Box::new(expr))), + )) + } + other => Ok((vec![], Some(other))), + } +} + +fn combine_residual_and(left: Option, right: Option) -> Option { + match (left, right) { + (Some(left), Some(right)) => Some(SqlExpr::BinaryOp { + left: Box::new(left), + op: BinaryOperator::And, + right: Box::new(right), + }), + (Some(expr), None) | (None, Some(expr)) => Some(expr), + (None, None) => None, + } +} + +fn validate_in_subquery_projection(query: &Query) -> Result<(), PipelineError> { + match query.body.as_ref() { + SetExpr::Select(select) if select.projection.len() != 1 => Err( + PipelineError::InvalidQuery("IN subquery must return exactly one column".to_string()), + ), + SetExpr::Select(select) => match select.projection.first() { + Some(SelectItem::Wildcard(_)) | Some(SelectItem::QualifiedWildcard(_, _)) => { + Err(PipelineError::InvalidQuery( + "IN subquery must project a single expression or column".to_string(), + )) + } + _ => Ok(()), + }, + SetExpr::Query(query) => validate_in_subquery_projection(query), + _ => Ok(()), + } +} + #[allow(clippy::too_many_arguments)] fn set_to_pipeline( table_info: TableInfo, diff --git a/dozer-sql/src/builder/tests.rs b/dozer-sql/src/builder/tests.rs index 0426c4dc84..63d5e62544 100644 --- a/dozer-sql/src/builder/tests.rs +++ b/dozer-sql/src/builder/tests.rs @@ -1,6 +1,18 @@ use super::statement_to_pipeline; use crate::{errors::PipelineError, tests::utils::create_test_runtime}; use dozer_core::app::AppPipeline; + +fn build_pipeline(sql: &str) -> Result { + let runtime = create_test_runtime(); + statement_to_pipeline( + sql, + &mut AppPipeline::new_with_default_flags(), + None, + vec![], + runtime, + ) +} + #[test] #[should_panic] fn disallow_zero_outgoing_ndes() { @@ -145,6 +157,88 @@ fn test_correct_into_clause() { assert!(result.is_ok()); } +#[test] +fn test_in_subquery_where_clause_keeps_residual_predicates() { + let sql = r#" + SELECT users.CustomerID + INTO matched_customers + FROM users + WHERE users.Spending > 10 + AND users.CustomerID IN ( + SELECT allowed.CustomerID FROM allowed + ) + "#; + + let context = build_pipeline(sql).unwrap(); + + assert!(context.output_tables_map.contains_key("matched_customers")); + assert!(context.used_sources.contains(&"users".to_string())); + assert!(context.used_sources.contains(&"allowed".to_string())); +} + +#[test] +fn test_multiple_in_subqueries_can_be_chained() { + let sql = r#" + SELECT users.CustomerID + INTO matched_customers + FROM users + WHERE users.CustomerID IN ( + SELECT allowed.CustomerID FROM allowed + ) + AND users.Country IN ( + SELECT allowed_countries.Country FROM allowed_countries + ) + "#; + + let context = build_pipeline(sql).unwrap(); + + assert!(context.output_tables_map.contains_key("matched_customers")); + assert!(context.used_sources.contains(&"users".to_string())); + assert!(context.used_sources.contains(&"allowed".to_string())); + assert!(context + .used_sources + .contains(&"allowed_countries".to_string())); +} + +#[test] +fn test_not_in_subquery_is_rejected() { + let sql = r#" + SELECT users.CustomerID + INTO matched_customers + FROM users + WHERE users.CustomerID NOT IN ( + SELECT blocked.CustomerID FROM blocked + ) + "#; + + let result = build_pipeline(sql); + + assert!(matches!( + result, + Err(PipelineError::InvalidQuery(message)) if message.contains("NOT IN subqueries") + )); +} + +#[test] +fn test_in_subquery_rejects_multi_column_projection() { + let sql = r#" + SELECT users.CustomerID + INTO matched_customers + FROM users + WHERE users.CustomerID IN ( + SELECT allowed.CustomerID, allowed.Country FROM allowed + ) + "#; + + let result = build_pipeline(sql); + + assert!(matches!( + result, + Err(PipelineError::InvalidQuery(message)) + if message.contains("exactly one column") + )); +} + #[test] fn test_missing_into_in_nested_from_clause() { let sql = r#"SELECT a FROM (SELECT a from b)"#; diff --git a/dozer-sql/src/selection/in_subquery.rs b/dozer-sql/src/selection/in_subquery.rs new file mode 100644 index 0000000000..8d0160339c --- /dev/null +++ b/dozer-sql/src/selection/in_subquery.rs @@ -0,0 +1,733 @@ +use std::{collections::HashMap, sync::Arc}; + +use dozer_core::{ + channels::ProcessorChannelForwarder, + epoch::Epoch, + event::EventHub, + node::{PortHandle, Processor, ProcessorFactory}, + DEFAULT_PORT_HANDLE, +}; +use dozer_sql_expression::{ + builder::ExpressionBuilder, execution::Expression, sqlparser::ast::Expr as SqlExpr, +}; +use dozer_types::{ + errors::internal::BoxedError, + models::udf_config::UdfConfig, + node::OpIdentifier, + tonic::async_trait, + types::{Field, Operation, Record, Schema, TableOperation}, +}; +use tokio::runtime::Runtime; + +use crate::errors::PipelineError; + +pub(crate) const LEFT_IN_SUBQUERY_PORT: PortHandle = 0; +pub(crate) const RIGHT_IN_SUBQUERY_PORT: PortHandle = 1; + +#[derive(Debug)] +pub(crate) struct InSubqueryProcessorFactory { + id: String, + left_expr: SqlExpr, + udfs: Vec, + runtime: Arc, +} + +impl InSubqueryProcessorFactory { + pub(crate) fn new( + id: String, + left_expr: SqlExpr, + udfs: Vec, + runtime: Arc, + ) -> Self { + Self { + id, + left_expr, + udfs, + runtime, + } + } +} + +#[async_trait] +impl ProcessorFactory for InSubqueryProcessorFactory { + fn id(&self) -> String { + self.id.clone() + } + + fn type_name(&self) -> String { + "InSubquery".to_string() + } + + fn get_input_ports(&self) -> Vec { + vec![LEFT_IN_SUBQUERY_PORT, RIGHT_IN_SUBQUERY_PORT] + } + + fn get_output_ports(&self) -> Vec { + vec![DEFAULT_PORT_HANDLE] + } + + async fn get_output_schema( + &self, + _output_port: &PortHandle, + input_schemas: &HashMap, + ) -> Result { + let left_schema = input_schemas + .get(&LEFT_IN_SUBQUERY_PORT) + .ok_or(PipelineError::InvalidPortHandle(LEFT_IN_SUBQUERY_PORT))?; + let right_schema = input_schemas + .get(&RIGHT_IN_SUBQUERY_PORT) + .ok_or(PipelineError::InvalidPortHandle(RIGHT_IN_SUBQUERY_PORT))?; + + validate_right_schema(right_schema)?; + Ok(left_schema.clone()) + } + + async fn build( + &self, + input_schemas: HashMap, + _output_schemas: HashMap, + _event_hub: EventHub, + ) -> Result, BoxedError> { + let left_schema = input_schemas + .get(&LEFT_IN_SUBQUERY_PORT) + .ok_or(PipelineError::InvalidPortHandle(LEFT_IN_SUBQUERY_PORT))? + .clone(); + let right_schema = input_schemas + .get(&RIGHT_IN_SUBQUERY_PORT) + .ok_or(PipelineError::InvalidPortHandle(RIGHT_IN_SUBQUERY_PORT))?; + + validate_right_schema(right_schema)?; + + let left_expr = match ExpressionBuilder::new(left_schema.fields.len(), self.runtime.clone()) + .build(false, &self.left_expr, &left_schema, &self.udfs) + .await + { + Ok(expression) => expression, + Err(error) => return Err(error.into()), + }; + + Ok(Box::new(InSubqueryProcessor::new(left_schema, left_expr))) + } +} + +fn validate_right_schema(schema: &Schema) -> Result<(), PipelineError> { + if schema.fields.len() == 1 { + Ok(()) + } else { + Err(PipelineError::InvalidQuery( + "IN subquery must return exactly one column".to_string(), + )) + } +} + +#[derive(Debug)] +struct InSubqueryProcessor { + left_schema: Schema, + left_expr: Expression, + left_records: HashMap>, + right_counts: HashMap, +} + +impl InSubqueryProcessor { + fn new(left_schema: Schema, left_expr: Expression) -> Self { + Self { + left_schema, + left_expr, + left_records: HashMap::new(), + right_counts: HashMap::new(), + } + } + + fn left_key(&mut self, record: &Record) -> Result { + Ok(self.left_expr.evaluate(record, &self.left_schema)?) + } + + fn right_key(record: &Record) -> Result { + record.values.first().cloned().ok_or_else(|| { + PipelineError::InvalidQuery("IN subquery record has no value".to_string()) + }) + } + + fn is_matchable_key(key: &Field) -> bool { + !matches!(key, Field::Null) + } + + fn right_contains(&self, key: &Field) -> bool { + Self::is_matchable_key(key) && self.right_counts.get(key).copied().unwrap_or_default() > 0 + } + + fn store_left(&mut self, key: Field, record: Record) { + self.left_records.entry(key).or_default().push(record); + } + + fn remove_left(&mut self, key: &Field, record: &Record) { + let Some(records) = self.left_records.get_mut(key) else { + return; + }; + + if let Some(index) = records.iter().position(|stored| stored == record) { + records.remove(index); + } + + if records.is_empty() { + self.left_records.remove(key); + } + } + + fn send(id: Option, op: Operation, fw: &mut dyn ProcessorChannelForwarder) { + fw.send(TableOperation { + id, + op, + port: DEFAULT_PORT_HANDLE, + }); + } + + fn send_left_records( + &self, + key: &Field, + visible: bool, + fw: &mut dyn ProcessorChannelForwarder, + ) { + let Some(records) = self.left_records.get(key) else { + return; + }; + + for record in records { + let op = if visible { + Operation::Insert { + new: record.clone(), + } + } else { + Operation::Delete { + old: record.clone(), + } + }; + Self::send(None, op, fw); + } + } + + fn process_left_insert( + &mut self, + id: Option, + new: Record, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), PipelineError> { + let key = self.left_key(&new)?; + if self.right_contains(&key) { + Self::send(id, Operation::Insert { new: new.clone() }, fw); + } + self.store_left(key, new); + Ok(()) + } + + fn process_left_delete( + &mut self, + id: Option, + old: Record, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), PipelineError> { + let key = self.left_key(&old)?; + if self.right_contains(&key) { + Self::send(id, Operation::Delete { old: old.clone() }, fw); + } + self.remove_left(&key, &old); + Ok(()) + } + + fn process_left_update( + &mut self, + id: Option, + old: Record, + new: Record, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), PipelineError> { + let old_key = self.left_key(&old)?; + let old_visible = self.right_contains(&old_key); + self.remove_left(&old_key, &old); + + let new_key = self.left_key(&new)?; + let new_visible = self.right_contains(&new_key); + self.store_left(new_key, new.clone()); + + match (old_visible, new_visible) { + (true, true) => Self::send(id, Operation::Update { old, new }, fw), + (true, false) => Self::send(id, Operation::Delete { old }, fw), + (false, true) => Self::send(id, Operation::Insert { new }, fw), + (false, false) => {} + } + Ok(()) + } + + fn add_right_key(&mut self, key: Field) -> bool { + if !Self::is_matchable_key(&key) { + return false; + } + + let count = self.right_counts.entry(key).or_default(); + let became_present = *count == 0; + *count += 1; + became_present + } + + fn remove_right_key(&mut self, key: &Field) -> bool { + if !Self::is_matchable_key(key) { + return false; + } + + let Some(count) = self.right_counts.get_mut(key) else { + return false; + }; + + if *count == 0 { + return false; + } + + *count -= 1; + if *count == 0 { + self.right_counts.remove(key); + true + } else { + false + } + } + + fn process_right_insert( + &mut self, + new: Record, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), PipelineError> { + let key = Self::right_key(&new)?; + if self.add_right_key(key.clone()) { + self.send_left_records(&key, true, fw); + } + Ok(()) + } + + fn process_right_delete( + &mut self, + old: Record, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), PipelineError> { + let key = Self::right_key(&old)?; + if self.remove_right_key(&key) { + self.send_left_records(&key, false, fw); + } + Ok(()) + } + + fn process_right_update( + &mut self, + old: Record, + new: Record, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), PipelineError> { + let old_key = Self::right_key(&old)?; + let new_key = Self::right_key(&new)?; + if old_key == new_key { + return Ok(()); + } + + if self.remove_right_key(&old_key) { + self.send_left_records(&old_key, false, fw); + } + if self.add_right_key(new_key.clone()) { + self.send_left_records(&new_key, true, fw); + } + Ok(()) + } +} + +impl Processor for InSubqueryProcessor { + fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> { + Ok(()) + } + + fn process( + &mut self, + op: TableOperation, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), BoxedError> { + let id = op.id; + match (op.port, op.op) { + (LEFT_IN_SUBQUERY_PORT, Operation::Insert { new }) => { + self.process_left_insert(id, new, fw)? + } + (LEFT_IN_SUBQUERY_PORT, Operation::Delete { old }) => { + self.process_left_delete(id, old, fw)? + } + (LEFT_IN_SUBQUERY_PORT, Operation::Update { old, new }) => { + self.process_left_update(id, old, new, fw)? + } + (LEFT_IN_SUBQUERY_PORT, Operation::BatchInsert { new }) => { + for record in new { + self.process_left_insert(id, record, fw)?; + } + } + (RIGHT_IN_SUBQUERY_PORT, Operation::Insert { new }) => { + self.process_right_insert(new, fw)? + } + (RIGHT_IN_SUBQUERY_PORT, Operation::Delete { old }) => { + self.process_right_delete(old, fw)? + } + (RIGHT_IN_SUBQUERY_PORT, Operation::Update { old, new }) => { + self.process_right_update(old, new, fw)? + } + (RIGHT_IN_SUBQUERY_PORT, Operation::BatchInsert { new }) => { + for record in new { + self.process_right_insert(record, fw)?; + } + } + (port, _) => return Err(PipelineError::InvalidPortHandle(port).into()), + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use dozer_core::{channels::ProcessorChannelForwarder, node::Processor}; + use dozer_sql_expression::execution::Expression; + use dozer_types::{ + node::OpIdentifier, + types::{ + Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, + TableOperation, + }, + }; + + use super::{InSubqueryProcessor, LEFT_IN_SUBQUERY_PORT, RIGHT_IN_SUBQUERY_PORT}; + + #[derive(Default)] + struct TestForwarder { + operations: Vec, + } + + impl ProcessorChannelForwarder for TestForwarder { + fn send(&mut self, op: TableOperation) { + self.operations.push(op); + } + } + + fn schema(fields: &[(&str, FieldType)]) -> Schema { + let mut schema = Schema::default(); + for (name, typ) in fields { + schema.field( + FieldDefinition::new((*name).to_string(), *typ, false, SourceDefinition::Dynamic), + false, + ); + } + schema + } + + fn left_record(city: &str, id: i64) -> Record { + Record::new(vec![Field::String(city.to_string()), Field::Int(id)]) + } + + fn left_null_record(id: i64) -> Record { + Record::new(vec![Field::Null, Field::Int(id)]) + } + + fn right_record(city: &str) -> Record { + Record::new(vec![Field::String(city.to_string())]) + } + + fn right_null_record() -> Record { + Record::new(vec![Field::Null]) + } + + fn processor() -> InSubqueryProcessor { + InSubqueryProcessor::new( + schema(&[("city", FieldType::String), ("id", FieldType::Int)]), + Expression::Column { index: 0 }, + ) + } + + fn apply( + processor: &mut InSubqueryProcessor, + port: u16, + op: Operation, + forwarder: &mut TestForwarder, + ) -> Vec { + processor + .process(TableOperation::without_id(op, port), forwarder) + .unwrap(); + let operations = forwarder + .operations + .drain(..) + .map(|operation| operation.op) + .collect::>(); + operations + } + + #[test] + fn right_membership_changes_emit_existing_left_rows_once() { + let mut processor = processor(); + let mut forwarder = TestForwarder::default(); + let paris_order = left_record("Paris", 1); + + assert_eq!( + apply( + &mut processor, + LEFT_IN_SUBQUERY_PORT, + Operation::Insert { + new: paris_order.clone() + }, + &mut forwarder, + ), + vec![] + ); + assert_eq!( + apply( + &mut processor, + RIGHT_IN_SUBQUERY_PORT, + Operation::Insert { + new: right_record("Paris") + }, + &mut forwarder, + ), + vec![Operation::Insert { + new: paris_order.clone() + }] + ); + assert_eq!( + apply( + &mut processor, + RIGHT_IN_SUBQUERY_PORT, + Operation::Insert { + new: right_record("Paris") + }, + &mut forwarder, + ), + vec![] + ); + assert_eq!( + apply( + &mut processor, + RIGHT_IN_SUBQUERY_PORT, + Operation::Delete { + old: right_record("Paris") + }, + &mut forwarder, + ), + vec![] + ); + assert_eq!( + apply( + &mut processor, + RIGHT_IN_SUBQUERY_PORT, + Operation::Delete { + old: right_record("Paris") + }, + &mut forwarder, + ), + vec![Operation::Delete { old: paris_order }] + ); + } + + #[test] + fn right_updates_toggle_existing_left_rows() { + let mut processor = processor(); + let mut forwarder = TestForwarder::default(); + let paris_order = left_record("Paris", 1); + + assert_eq!( + apply( + &mut processor, + LEFT_IN_SUBQUERY_PORT, + Operation::Insert { + new: paris_order.clone() + }, + &mut forwarder, + ), + vec![] + ); + assert_eq!( + apply( + &mut processor, + RIGHT_IN_SUBQUERY_PORT, + Operation::Insert { + new: right_record("London") + }, + &mut forwarder, + ), + vec![] + ); + assert_eq!( + apply( + &mut processor, + RIGHT_IN_SUBQUERY_PORT, + Operation::Update { + old: right_record("London"), + new: right_record("Paris"), + }, + &mut forwarder, + ), + vec![Operation::Insert { + new: paris_order.clone() + }] + ); + assert_eq!( + apply( + &mut processor, + RIGHT_IN_SUBQUERY_PORT, + Operation::Update { + old: right_record("Paris"), + new: right_record("Berlin"), + }, + &mut forwarder, + ), + vec![Operation::Delete { old: paris_order }] + ); + } + + #[test] + fn left_updates_follow_where_transition_rules() { + let mut processor = processor(); + let mut forwarder = TestForwarder::default(); + let london_order = left_record("London", 1); + let paris_order = left_record("Paris", 1); + let paris_order_updated = left_record("Paris", 2); + let berlin_order = left_record("Berlin", 2); + + assert_eq!( + apply( + &mut processor, + RIGHT_IN_SUBQUERY_PORT, + Operation::Insert { + new: right_record("Paris") + }, + &mut forwarder, + ), + vec![] + ); + assert_eq!( + apply( + &mut processor, + LEFT_IN_SUBQUERY_PORT, + Operation::Insert { + new: london_order.clone() + }, + &mut forwarder, + ), + vec![] + ); + assert_eq!( + apply( + &mut processor, + LEFT_IN_SUBQUERY_PORT, + Operation::Update { + old: london_order, + new: paris_order.clone(), + }, + &mut forwarder, + ), + vec![Operation::Insert { + new: paris_order.clone() + }] + ); + assert_eq!( + apply( + &mut processor, + LEFT_IN_SUBQUERY_PORT, + Operation::Update { + old: paris_order.clone(), + new: paris_order_updated.clone(), + }, + &mut forwarder, + ), + vec![Operation::Update { + old: paris_order, + new: paris_order_updated.clone(), + }] + ); + assert_eq!( + apply( + &mut processor, + LEFT_IN_SUBQUERY_PORT, + Operation::Update { + old: paris_order_updated, + new: berlin_order, + }, + &mut forwarder, + ), + vec![Operation::Delete { + old: left_record("Paris", 2) + }] + ); + } + + #[test] + fn null_keys_never_match() { + let mut processor = processor(); + let mut forwarder = TestForwarder::default(); + let null_order = left_null_record(1); + + assert_eq!( + apply( + &mut processor, + LEFT_IN_SUBQUERY_PORT, + Operation::Insert { + new: null_order.clone() + }, + &mut forwarder, + ), + vec![] + ); + assert_eq!( + apply( + &mut processor, + RIGHT_IN_SUBQUERY_PORT, + Operation::Insert { + new: right_null_record() + }, + &mut forwarder, + ), + vec![] + ); + assert_eq!( + apply( + &mut processor, + RIGHT_IN_SUBQUERY_PORT, + Operation::Delete { + old: right_null_record() + }, + &mut forwarder, + ), + vec![] + ); + } + + #[test] + fn left_side_operations_preserve_operation_ids() { + let mut processor = processor(); + let mut forwarder = TestForwarder::default(); + let id = OpIdentifier::new(7, 11); + + processor + .process( + TableOperation::without_id( + Operation::Insert { + new: right_record("Paris"), + }, + RIGHT_IN_SUBQUERY_PORT, + ), + &mut forwarder, + ) + .unwrap(); + processor + .process( + TableOperation { + id: Some(id), + op: Operation::Insert { + new: left_record("Paris", 1), + }, + port: LEFT_IN_SUBQUERY_PORT, + }, + &mut forwarder, + ) + .unwrap(); + + assert_eq!(forwarder.operations.len(), 1); + assert_eq!(forwarder.operations[0].id, Some(id)); + } +} diff --git a/dozer-sql/src/selection/mod.rs b/dozer-sql/src/selection/mod.rs index 9a12ba12cc..289680d8ed 100644 --- a/dozer-sql/src/selection/mod.rs +++ b/dozer-sql/src/selection/mod.rs @@ -1,2 +1,3 @@ pub mod factory; +pub(crate) mod in_subquery; pub mod processor;