From dc317858c2294ed7bfbf2c142b83b05949d9e03a Mon Sep 17 00:00:00 2001 From: VinoFx <37253811+VinoFx@users.noreply.github.com> Date: Wed, 20 May 2026 23:27:50 -0500 Subject: [PATCH] Support IN subqueries in SQL selection --- dozer-sql/src/builder/mod.rs | 157 +++++- dozer-sql/src/builder/tests.rs | 21 + dozer-sql/src/selection/in_subquery.rs | 521 ++++++++++++++++++ dozer-sql/src/selection/mod.rs | 1 + .../src/sql_tests/full/in_subquery.test | 75 +++ .../src/sql_tests/prototype/in_subquery.test | 58 ++ 6 files changed, 828 insertions(+), 5 deletions(-) create mode 100644 dozer-sql/src/selection/in_subquery.rs create mode 100644 dozer-tests/src/sql_tests/full/in_subquery.test create mode 100644 dozer-tests/src/sql_tests/prototype/in_subquery.test diff --git a/dozer-sql/src/builder/mod.rs b/dozer-sql/src/builder/mod.rs index 3ccf3ed356..33bc66d337 100644 --- a/dozer-sql/src/builder/mod.rs +++ b/dozer-sql/src/builder/mod.rs @@ -2,11 +2,16 @@ use crate::aggregation::factory::AggregationProcessorFactory; use crate::builder::PipelineError::InvalidQuery; use crate::errors::PipelineError; use crate::selection::factory::SelectionProcessorFactory; +use crate::selection::in_subquery::{ + InSubqueryProcessorFactory, LEFT_IN_SUBQUERY_PORT, RIGHT_IN_SUBQUERY_PORT, +}; use dozer_core::app::AppPipeline; use dozer_core::node::PortHandle; use dozer_core::DEFAULT_PORT_HANDLE; use dozer_sql_expression::builder::{ExpressionBuilder, NameOrAlias}; -use dozer_sql_expression::sqlparser::ast::{SetOperator, SetQuantifier, TableFactor}; +use dozer_sql_expression::sqlparser::ast::{ + BinaryOperator, Expr as SqlExpr, SetOperator, SetQuantifier, TableFactor, +}; use dozer_types::models::udf_config::UdfConfig; use dozer_sql_expression::sqlparser::{ @@ -121,6 +126,11 @@ struct TableInfo { override_name: Option, } +struct InSubquerySelection { + expr: Box, + subquery: Box, +} + fn query_to_pipeline( table_info: TableInfo, query: Query, @@ -242,6 +252,11 @@ fn select_to_pipeline( pipeline_idx: usize, is_top_select: bool, ) -> Result { + let mut select = select; + let (in_subquery_selection, residual_selection) = + split_in_subquery_selection(select.selection.take())?; + select.selection = residual_selection; + // FROM clause let Some(from) = select.from.into_iter().next() else { return Err(PipelineError::UnsupportedSqlError( @@ -258,6 +273,8 @@ fn select_to_pipeline( let gen_selection_name = format!("select--{}", query_ctx.get_next_processor_id()); let (gen_product_name, product_output_port) = output_node; + let mut upstream_node = gen_product_name; + let mut upstream_port = product_output_port; for (source_name, processor_name, processor_port) in input_nodes { if let Some(table_info) = query_ctx @@ -276,6 +293,66 @@ fn select_to_pipeline( } } + if let Some(in_subquery_selection) = in_subquery_selection { + let in_subquery_source_name = format!("in_subquery_{}", query_ctx.get_next_processor_id()); + query_to_pipeline( + TableInfo { + name: NameOrAlias(in_subquery_source_name.clone(), None), + override_name: None, + }, + *in_subquery_selection.subquery, + pipeline, + query_ctx, + pipeline_idx, + false, + )?; + + let in_subquery_output = query_ctx + .pipeline_map + .get(&(pipeline_idx, in_subquery_source_name.clone())) + .ok_or_else(|| { + PipelineError::InvalidQuery(format!( + "Invalid IN subquery {in_subquery_source_name}" + )) + })? + .clone(); + let in_subquery_processor_name = + format!("in_subquery--{}", query_ctx.get_next_processor_id()); + if !query_ctx + .processors_list + .insert(in_subquery_processor_name.clone()) + { + return Err(PipelineError::ProcessorAlreadyExists( + in_subquery_processor_name, + )); + } + + let in_subquery_processor = InSubqueryProcessorFactory::new( + in_subquery_processor_name.clone(), + *in_subquery_selection.expr, + query_ctx.udfs.clone(), + query_ctx.runtime.clone(), + ); + pipeline.add_processor( + Box::new(in_subquery_processor), + in_subquery_processor_name.clone(), + ); + pipeline.connect_nodes( + upstream_node, + upstream_port, + in_subquery_processor_name.clone(), + LEFT_IN_SUBQUERY_PORT, + ); + pipeline.connect_nodes( + in_subquery_output.node, + in_subquery_output.port, + in_subquery_processor_name.clone(), + RIGHT_IN_SUBQUERY_PORT, + ); + upstream_node = in_subquery_processor_name; + upstream_port = DEFAULT_PORT_HANDLE; + } + let aggregation = AggregationProcessorFactory::new( gen_agg_name.clone(), select.projection, @@ -304,8 +381,8 @@ fn select_to_pipeline( pipeline.add_processor(Box::new(selection), gen_selection_name.clone()); pipeline.connect_nodes( - gen_product_name, - product_output_port, + upstream_node, + upstream_port, gen_selection_name.clone(), DEFAULT_PORT_HANDLE, ); @@ -318,8 +395,8 @@ fn select_to_pipeline( ); } else { pipeline.connect_nodes( - gen_product_name, - product_output_port, + upstream_node, + upstream_port, gen_agg_name.clone(), DEFAULT_PORT_HANDLE, ); @@ -360,6 +437,76 @@ fn select_to_pipeline( Ok(gen_agg_name) } +fn split_in_subquery_selection( + selection: Option, +) -> Result<(Option, Option), PipelineError> { + match selection { + Some(selection) => extract_in_subquery_selection(selection), + None => Ok((None, None)), + } +} + +fn extract_in_subquery_selection( + selection: SqlExpr, +) -> Result<(Option, Option), PipelineError> { + match selection { + SqlExpr::InSubquery { + expr, + subquery, + negated, + } => { + if negated { + return Err(PipelineError::InvalidQuery( + "NOT IN subqueries are not supported".to_string(), + )); + } + Ok((Some(InSubquerySelection { expr, subquery }), None)) + } + SqlExpr::BinaryOp { + left, + op: BinaryOperator::And, + right, + } => { + let (left_in_subquery, left_residual) = extract_in_subquery_selection(*left)?; + let (right_in_subquery, right_residual) = extract_in_subquery_selection(*right)?; + let in_subquery = match (left_in_subquery, right_in_subquery) { + (Some(_), Some(_)) => { + return Err(PipelineError::InvalidQuery( + "Only one IN subquery is supported in a WHERE clause".to_string(), + )) + } + (Some(in_subquery), None) | (None, Some(in_subquery)) => Some(in_subquery), + (None, None) => None, + }; + + Ok(( + in_subquery, + combine_residual_and(left_residual, right_residual), + )) + } + SqlExpr::Nested(expr) => { + let (in_subquery, residual) = extract_in_subquery_selection(*expr)?; + Ok(( + in_subquery, + residual.map(|expr| SqlExpr::Nested(Box::new(expr))), + )) + } + _ => Ok((None, Some(selection))), + } +} + +fn combine_residual_and(left: Option, right: Option) -> Option { + match (left, right) { + (Some(left), Some(right)) => Some(SqlExpr::BinaryOp { + left: Box::new(left), + op: BinaryOperator::And, + right: Box::new(right), + }), + (Some(expr), None) | (None, Some(expr)) => Some(expr), + (None, None) => None, + } +} + #[allow(clippy::too_many_arguments)] fn set_to_pipeline( table_info: TableInfo, diff --git a/dozer-sql/src/builder/tests.rs b/dozer-sql/src/builder/tests.rs index 0426c4dc84..6b603f4431 100644 --- a/dozer-sql/src/builder/tests.rs +++ b/dozer-sql/src/builder/tests.rs @@ -145,6 +145,27 @@ fn test_correct_into_clause() { assert!(result.is_ok()); } +#[test] +fn test_in_subquery_where_clause() { + let sql = r#" + SELECT id + INTO matched_users + FROM users + WHERE id IN (SELECT id FROM allowed_users) AND active = 1 + "#; + let runtime = create_test_runtime(); + let context = statement_to_pipeline( + sql, + &mut AppPipeline::new_with_default_flags(), + None, + vec![], + runtime, + ) + .unwrap(); + + assert!(context.output_tables_map.contains_key("matched_users")); +} + #[test] fn test_missing_into_in_nested_from_clause() { let sql = r#"SELECT a FROM (SELECT a from b)"#; diff --git a/dozer-sql/src/selection/in_subquery.rs b/dozer-sql/src/selection/in_subquery.rs new file mode 100644 index 0000000000..fd3787dcb3 --- /dev/null +++ b/dozer-sql/src/selection/in_subquery.rs @@ -0,0 +1,521 @@ +use std::{collections::HashMap, sync::Arc}; + +use crate::errors::PipelineError; +use dozer_core::{ + channels::ProcessorChannelForwarder, + epoch::Epoch, + event::EventHub, + node::{PortHandle, Processor, ProcessorFactory}, + DEFAULT_PORT_HANDLE, +}; +use dozer_sql_expression::{ + builder::ExpressionBuilder, execution::Expression, sqlparser::ast::Expr as SqlExpr, +}; +use dozer_types::{ + errors::internal::BoxedError, + models::udf_config::UdfConfig, + tonic::async_trait, + types::{Field, Operation, Record, Schema, TableOperation}, +}; +use tokio::runtime::Runtime; + +pub(crate) const LEFT_IN_SUBQUERY_PORT: PortHandle = 0; +pub(crate) const RIGHT_IN_SUBQUERY_PORT: PortHandle = 1; + +#[derive(Debug)] +pub struct InSubqueryProcessorFactory { + id: String, + left_expr: SqlExpr, + udfs: Vec, + runtime: Arc, +} + +impl InSubqueryProcessorFactory { + pub fn new( + id: String, + left_expr: SqlExpr, + udfs: Vec, + runtime: Arc, + ) -> Self { + Self { + id, + left_expr, + udfs, + runtime, + } + } +} + +#[async_trait] +impl ProcessorFactory for InSubqueryProcessorFactory { + fn id(&self) -> String { + self.id.clone() + } + + fn type_name(&self) -> String { + "InSubquery".to_string() + } + + fn get_input_ports(&self) -> Vec { + vec![LEFT_IN_SUBQUERY_PORT, RIGHT_IN_SUBQUERY_PORT] + } + + fn get_output_ports(&self) -> Vec { + vec![DEFAULT_PORT_HANDLE] + } + + async fn get_output_schema( + &self, + _output_port: &PortHandle, + input_schemas: &HashMap, + ) -> Result { + input_schemas + .get(&LEFT_IN_SUBQUERY_PORT) + .cloned() + .ok_or_else(|| PipelineError::InvalidPortHandle(LEFT_IN_SUBQUERY_PORT).into()) + } + + async fn build( + &self, + input_schemas: HashMap, + _output_schemas: HashMap, + _event_hub: EventHub, + ) -> Result, BoxedError> { + let left_schema = input_schemas + .get(&LEFT_IN_SUBQUERY_PORT) + .ok_or(PipelineError::InvalidPortHandle(LEFT_IN_SUBQUERY_PORT))? + .clone(); + let right_schema = input_schemas + .get(&RIGHT_IN_SUBQUERY_PORT) + .ok_or(PipelineError::InvalidPortHandle(RIGHT_IN_SUBQUERY_PORT))? + .clone(); + + if right_schema.fields.len() != 1 { + return Err(PipelineError::InvalidQuery( + "IN subquery must return exactly one column".to_string(), + ) + .into()); + } + + let left_expr = ExpressionBuilder::new(left_schema.fields.len(), self.runtime.clone()) + .build(false, &self.left_expr, &left_schema, &self.udfs) + .await?; + let right_expr = Expression::Column { index: 0 }; + + Ok(Box::new(InSubqueryProcessor::new( + left_schema, + right_schema, + left_expr, + right_expr, + ))) + } +} + +#[derive(Debug)] +pub struct InSubqueryProcessor { + left_schema: Schema, + right_schema: Schema, + left_expr: Expression, + right_expr: Expression, + left_records: HashMap>, + right_counts: HashMap, +} + +impl InSubqueryProcessor { + fn new( + left_schema: Schema, + right_schema: Schema, + left_expr: Expression, + right_expr: Expression, + ) -> Self { + Self { + left_schema, + right_schema, + left_expr, + right_expr, + left_records: HashMap::new(), + right_counts: HashMap::new(), + } + } + + fn left_key(&mut self, record: &Record) -> Result { + Ok(self.left_expr.evaluate(record, &self.left_schema)?) + } + + fn right_key(&mut self, record: &Record) -> Result { + Ok(self.right_expr.evaluate(record, &self.right_schema)?) + } + + fn right_contains(&self, key: &Field) -> bool { + self.right_counts.get(key).copied().unwrap_or_default() > 0 + } + + fn add_left_record(&mut self, key: Field, record: Record) { + self.left_records.entry(key).or_default().push(record); + } + + fn remove_left_record(&mut self, key: &Field, record: &Record) -> bool { + let Some(records) = self.left_records.get_mut(key) else { + return false; + }; + + let Some(index) = records.iter().position(|stored| stored == record) else { + return false; + }; + + records.remove(index); + let should_remove_key = records.is_empty(); + if should_remove_key { + self.left_records.remove(key); + } + true + } + + fn matching_left_records(&self, key: &Field) -> Vec { + self.left_records.get(key).cloned().unwrap_or_default() + } + + fn process_left_insert( + &mut self, + new: Record, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), PipelineError> { + let key = self.left_key(&new)?; + let matched = self.right_contains(&key); + self.add_left_record(key, new.clone()); + if matched { + fw.send(TableOperation::without_id( + Operation::Insert { new }, + DEFAULT_PORT_HANDLE, + )); + } + Ok(()) + } + + fn process_left_delete( + &mut self, + old: Record, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), PipelineError> { + let key = self.left_key(&old)?; + let matched = self.right_contains(&key); + let existed = self.remove_left_record(&key, &old); + if existed && matched { + fw.send(TableOperation::without_id( + Operation::Delete { old }, + DEFAULT_PORT_HANDLE, + )); + } + Ok(()) + } + + fn process_left_update( + &mut self, + old: Record, + new: Record, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), PipelineError> { + let old_key = self.left_key(&old)?; + let new_key = self.left_key(&new)?; + let old_matched = self.right_contains(&old_key); + let new_matched = self.right_contains(&new_key); + let existed = self.remove_left_record(&old_key, &old); + self.add_left_record(new_key, new.clone()); + + if !existed { + return Ok(()); + } + + match (old_matched, new_matched) { + (true, true) => fw.send(TableOperation::without_id( + Operation::Update { old, new }, + DEFAULT_PORT_HANDLE, + )), + (true, false) => fw.send(TableOperation::without_id( + Operation::Delete { old }, + DEFAULT_PORT_HANDLE, + )), + (false, true) => fw.send(TableOperation::without_id( + Operation::Insert { new }, + DEFAULT_PORT_HANDLE, + )), + (false, false) => {} + } + Ok(()) + } + + fn process_right_insert( + &mut self, + new: Record, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), PipelineError> { + let key = self.right_key(&new)?; + let count = self.right_counts.entry(key.clone()).or_default(); + let was_present = *count > 0; + *count += 1; + + if !was_present { + for record in self.matching_left_records(&key) { + fw.send(TableOperation::without_id( + Operation::Insert { new: record }, + DEFAULT_PORT_HANDLE, + )); + } + } + Ok(()) + } + + fn process_right_delete( + &mut self, + old: Record, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), PipelineError> { + let key = self.right_key(&old)?; + let Some(count) = self.right_counts.get_mut(&key) else { + return Ok(()); + }; + + *count -= 1; + if *count == 0 { + self.right_counts.remove(&key); + for record in self.matching_left_records(&key) { + fw.send(TableOperation::without_id( + Operation::Delete { old: record }, + DEFAULT_PORT_HANDLE, + )); + } + } + Ok(()) + } + + fn process_right_update( + &mut self, + old: Record, + new: Record, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), PipelineError> { + let old_key = self.right_key(&old)?; + let new_key = self.right_key(&new)?; + if old_key == new_key { + return Ok(()); + } + + self.process_right_delete(old, fw)?; + self.process_right_insert(new, fw) + } +} + +impl Processor for InSubqueryProcessor { + fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> { + Ok(()) + } + + fn process( + &mut self, + op: TableOperation, + fw: &mut dyn ProcessorChannelForwarder, + ) -> Result<(), BoxedError> { + match (op.port, op.op) { + (LEFT_IN_SUBQUERY_PORT, Operation::Insert { new }) => { + self.process_left_insert(new, fw)? + } + (LEFT_IN_SUBQUERY_PORT, Operation::Delete { old }) => { + self.process_left_delete(old, fw)? + } + (LEFT_IN_SUBQUERY_PORT, Operation::Update { old, new }) => { + self.process_left_update(old, new, fw)? + } + (LEFT_IN_SUBQUERY_PORT, Operation::BatchInsert { new }) => { + for record in new { + self.process_left_insert(record, fw)?; + } + } + (RIGHT_IN_SUBQUERY_PORT, Operation::Insert { new }) => { + self.process_right_insert(new, fw)? + } + (RIGHT_IN_SUBQUERY_PORT, Operation::Delete { old }) => { + self.process_right_delete(old, fw)? + } + (RIGHT_IN_SUBQUERY_PORT, Operation::Update { old, new }) => { + self.process_right_update(old, new, fw)? + } + (RIGHT_IN_SUBQUERY_PORT, Operation::BatchInsert { new }) => { + for record in new { + self.process_right_insert(record, fw)?; + } + } + (port, _) => return Err(PipelineError::InvalidPortHandle(port).into()), + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use dozer_core::channels::ProcessorChannelForwarder; + use dozer_core::node::Processor; + use dozer_sql_expression::execution::Expression; + use dozer_types::types::{ + Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, + TableOperation, + }; + + use super::{InSubqueryProcessor, LEFT_IN_SUBQUERY_PORT, RIGHT_IN_SUBQUERY_PORT}; + + struct TestForwarder { + operations: Vec, + } + + impl ProcessorChannelForwarder for TestForwarder { + fn send(&mut self, op: TableOperation) { + self.operations.push(op); + } + } + + fn left_schema() -> Schema { + Schema::default() + .field( + FieldDefinition::new( + "id".to_string(), + FieldType::Int, + false, + SourceDefinition::Dynamic, + ), + false, + ) + .field( + FieldDefinition::new( + "name".to_string(), + FieldType::String, + false, + SourceDefinition::Dynamic, + ), + false, + ) + .clone() + } + + fn right_schema() -> Schema { + Schema::default() + .field( + FieldDefinition::new( + "id".to_string(), + FieldType::Int, + false, + SourceDefinition::Dynamic, + ), + false, + ) + .clone() + } + + fn left_record(id: i64, name: &str) -> Record { + Record::new(vec![Field::Int(id), Field::String(name.to_string())]) + } + + fn right_record(id: i64) -> Record { + Record::new(vec![Field::Int(id)]) + } + + #[test] + fn streams_left_rows_when_right_membership_changes() { + let mut processor = InSubqueryProcessor::new( + left_schema(), + right_schema(), + Expression::Column { index: 0 }, + Expression::Column { index: 0 }, + ); + let mut forwarder = TestForwarder { operations: vec![] }; + + processor + .process( + TableOperation::without_id( + Operation::Insert { + new: right_record(1), + }, + RIGHT_IN_SUBQUERY_PORT, + ), + &mut forwarder, + ) + .unwrap(); + processor + .process( + TableOperation::without_id( + Operation::Insert { + new: left_record(1, "penelope"), + }, + LEFT_IN_SUBQUERY_PORT, + ), + &mut forwarder, + ) + .unwrap(); + processor + .process( + TableOperation::without_id( + Operation::Insert { + new: left_record(2, "jack"), + }, + LEFT_IN_SUBQUERY_PORT, + ), + &mut forwarder, + ) + .unwrap(); + processor + .process( + TableOperation::without_id( + Operation::Insert { + new: right_record(2), + }, + RIGHT_IN_SUBQUERY_PORT, + ), + &mut forwarder, + ) + .unwrap(); + processor + .process( + TableOperation::without_id( + Operation::Update { + old: left_record(1, "penelope"), + new: left_record(3, "penelope"), + }, + LEFT_IN_SUBQUERY_PORT, + ), + &mut forwarder, + ) + .unwrap(); + processor + .process( + TableOperation::without_id( + Operation::Insert { + new: right_record(3), + }, + RIGHT_IN_SUBQUERY_PORT, + ), + &mut forwarder, + ) + .unwrap(); + + let operations = forwarder + .operations + .into_iter() + .map(|op| op.op) + .collect::>(); + assert_eq!( + operations, + vec![ + Operation::Insert { + new: left_record(1, "penelope") + }, + Operation::Insert { + new: left_record(2, "jack") + }, + Operation::Delete { + old: left_record(1, "penelope") + }, + Operation::Insert { + new: left_record(3, "penelope") + }, + ] + ); + } +} diff --git a/dozer-sql/src/selection/mod.rs b/dozer-sql/src/selection/mod.rs index 9a12ba12cc..ccfcabaef9 100644 --- a/dozer-sql/src/selection/mod.rs +++ b/dozer-sql/src/selection/mod.rs @@ -1,2 +1,3 @@ pub mod factory; +pub mod in_subquery; pub mod processor; diff --git a/dozer-tests/src/sql_tests/full/in_subquery.test b/dozer-tests/src/sql_tests/full/in_subquery.test new file mode 100644 index 0000000000..86f7951f94 --- /dev/null +++ b/dozer-tests/src/sql_tests/full/in_subquery.test @@ -0,0 +1,75 @@ +control sortmode rowsort + + +statement ok +CREATE TABLE actor( + actor_id integer NOT NULL, + first_name text NOT NULL +) + +statement ok +CREATE TABLE allowed_actor( + actor_id integer NOT NULL +) + +statement ok +INSERT INTO actor(actor_id, first_name) VALUES (1, 'penelope'); + +statement ok +INSERT INTO actor(actor_id, first_name) VALUES (2, 'jack'); + +statement ok +INSERT INTO actor(actor_id, first_name) VALUES (3, 'angelina'); + +statement ok +INSERT INTO allowed_actor(actor_id) VALUES (1); + +statement ok +INSERT INTO allowed_actor(actor_id) VALUES (3); + +query I +select actor_id from actor where actor_id in (1, 3) +---- +1 +3 + +query IT +select actor_id, first_name from actor where actor_id in (select actor_id from allowed_actor) +---- +1 penelope +3 angelina + +statement ok +INSERT INTO allowed_actor(actor_id) VALUES (2); + +query IT +select actor_id, first_name from actor where actor_id in (select actor_id from allowed_actor) +---- +1 penelope +2 jack +3 angelina + +statement ok +DELETE FROM allowed_actor WHERE actor_id = 1; + +query IT +select actor_id, first_name from actor where actor_id in (select actor_id from allowed_actor) +---- +2 jack +3 angelina + +statement ok +UPDATE actor SET actor_id = 30 WHERE actor_id = 3; + +query IT +select actor_id, first_name from actor where actor_id in (select actor_id from allowed_actor) +---- +2 jack + +statement ok +INSERT INTO allowed_actor(actor_id) VALUES (30); + +query IT +select actor_id, first_name from actor where actor_id in (select actor_id from allowed_actor) and first_name != 'jack' +---- +30 angelina diff --git a/dozer-tests/src/sql_tests/prototype/in_subquery.test b/dozer-tests/src/sql_tests/prototype/in_subquery.test new file mode 100644 index 0000000000..e88eed3489 --- /dev/null +++ b/dozer-tests/src/sql_tests/prototype/in_subquery.test @@ -0,0 +1,58 @@ +control sortmode rowsort + + +statement ok +CREATE TABLE actor( + actor_id integer NOT NULL, + first_name text NOT NULL +) + +statement ok +CREATE TABLE allowed_actor( + actor_id integer NOT NULL +) + +statement ok +INSERT INTO actor(actor_id, first_name) VALUES (1, 'penelope'); + +statement ok +INSERT INTO actor(actor_id, first_name) VALUES (2, 'jack'); + +statement ok +INSERT INTO actor(actor_id, first_name) VALUES (3, 'angelina'); + +statement ok +INSERT INTO allowed_actor(actor_id) VALUES (1); + +statement ok +INSERT INTO allowed_actor(actor_id) VALUES (3); + +query I +select actor_id from actor where actor_id in (1, 3) + +query IT +select actor_id, first_name from actor where actor_id in (select actor_id from allowed_actor) + +statement ok +INSERT INTO allowed_actor(actor_id) VALUES (2); + +query IT +select actor_id, first_name from actor where actor_id in (select actor_id from allowed_actor) + +statement ok +DELETE FROM allowed_actor WHERE actor_id = 1; + +query IT +select actor_id, first_name from actor where actor_id in (select actor_id from allowed_actor) + +statement ok +UPDATE actor SET actor_id = 30 WHERE actor_id = 3; + +query IT +select actor_id, first_name from actor where actor_id in (select actor_id from allowed_actor) + +statement ok +INSERT INTO allowed_actor(actor_id) VALUES (30); + +query IT +select actor_id, first_name from actor where actor_id in (select actor_id from allowed_actor) and first_name != 'jack'