Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,422 changes: 2,308 additions & 114 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,36 @@ arrow-array = "52"
arrow-ipc = "52"
arrow-schema = "52"
proctitle = "0.1"
unicase = "2.7"
petgraph = "0.7"
itertools = "0.14"
strum = { version = "0.26", features = ["derive"] }
datafusion-functions-aggregate = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}

typify = { git = 'https://github.com/ArroyoSystems/typify.git', branch = 'arroyo' }
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '55.2.0/parquet'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '55.2.0/json'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-functions-window = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}

sqlparser = { git = "https://github.com/FunctionStream/sqlparser-rs", branch = "0.6.0/function-sql-parser" }

cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
jiter = {git = "https://github.com/ArroyoSystems/jiter", branch = "disable_python" }


[features]
default = ["incremental-cache", "python"]
incremental-cache = ["wasmtime/incremental-cache"]
python = []

[patch."https://github.com/ArroyoSystems/sqlparser-rs"]
sqlparser = { git = "https://github.com/FunctionStream/sqlparser-rs", branch = "0.6.0/function-sql-parser" }
11 changes: 10 additions & 1 deletion src/coordinator/analyze/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::Analysis;
use crate::coordinator::execution_context::ExecutionContext;
use crate::coordinator::statement::{
CreateFunction, CreatePythonFunction, DropFunction, ShowFunctions, StartFunction, Statement,
StatementVisitor, StatementVisitorContext, StatementVisitorResult, StopFunction,
StatementVisitor, StatementVisitorContext, StatementVisitorResult, StopFunction, StreamingSql,
};
use std::fmt;

Expand Down Expand Up @@ -115,4 +115,13 @@ impl StatementVisitor for Analyzer<'_> {
) -> StatementVisitorResult {
StatementVisitorResult::Analyze(Box::new(stmt.clone()))
}

/// Analyzes a streaming SQL statement.
///
/// No semantic checks are performed yet: the wrapped statement is cloned into
/// a fresh `StreamingSql` and handed back as the analyzed statement.
fn visit_streaming_sql(
    &self,
    stmt: &StreamingSql,
    _context: &StatementVisitorContext,
) -> StatementVisitorResult {
    // TODO: add semantic analysis for streaming SQL (schema validation, etc.)
    let analyzed = StreamingSql::new(stmt.statement.clone());
    StatementVisitorResult::Analyze(Box::new(analyzed))
}
}
4 changes: 3 additions & 1 deletion src/coordinator/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::coordinator::execution::Executor;
use crate::coordinator::plan::{LogicalPlanVisitor, LogicalPlanner, PlanNode};
use crate::coordinator::statement::Statement;
use crate::runtime::taskexecutor::TaskManager;
use crate::sql::planner::StreamSchemaProvider;

use super::execution_context::ExecutionContext;

Expand Down Expand Up @@ -90,7 +91,8 @@ impl Coordinator {
}

fn step_build_logical_plan(&self, analysis: &Analysis) -> Result<Box<dyn PlanNode>> {
let visitor = LogicalPlanVisitor::new();
let schema_provider = StreamSchemaProvider::new();
let visitor = LogicalPlanVisitor::new(schema_provider);
let plan = visitor.visit(analysis);
Ok(plan)
}
Expand Down
14 changes: 14 additions & 0 deletions src/coordinator/execution/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::coordinator::dataset::{ExecuteResult, ShowFunctionsResult, empty_reco
use crate::coordinator::plan::{
CreateFunctionPlan, CreatePythonFunctionPlan, DropFunctionPlan, PlanNode, PlanVisitor,
PlanVisitorContext, PlanVisitorResult, ShowFunctionsPlan, StartFunctionPlan, StopFunctionPlan,
StreamingSqlPlan,
};
use crate::coordinator::statement::{ConfigSource, FunctionSource};
use crate::runtime::taskexecutor::TaskManager;
Expand Down Expand Up @@ -200,4 +201,17 @@ impl PlanVisitor for Executor {

PlanVisitorResult::Execute(result)
}

/// Handles a streaming SQL plan.
///
/// Execution is not implemented yet, so this always yields an
/// `ExecuteError::Internal` whose message embeds the indented rendering of
/// the plan's logical plan.
fn visit_streaming_sql_plan(
    &self,
    plan: &StreamingSqlPlan,
    _context: &PlanVisitorContext,
) -> PlanVisitorResult {
    // TODO: apply rewrite_plan for streaming transformations, then execute
    let rendered_plan = plan.logical_plan.display_indent();
    let message = format!(
        "Streaming SQL execution not yet implemented. LogicalPlan:\n{}",
        rendered_plan
    );
    PlanVisitorResult::Execute(Err(ExecuteError::Internal(message)))
}
}
2 changes: 1 addition & 1 deletion src/coordinator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ pub use coordinator::Coordinator;
pub use dataset::{DataSet, ShowFunctionsResult};
pub use statement::{
CreateFunction, CreatePythonFunction, DropFunction, PythonModule, ShowFunctions, StartFunction,
Statement, StopFunction,
Statement, StopFunction, StreamingSql,
};
35 changes: 28 additions & 7 deletions src/coordinator/plan/logical_plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use tracing::debug;

use crate::coordinator::analyze::analysis::Analysis;
use crate::coordinator::plan::{
CreateFunctionPlan, CreatePythonFunctionPlan, DropFunctionPlan, PlanNode, ShowFunctionsPlan,
StartFunctionPlan, StopFunctionPlan,
StartFunctionPlan, StopFunctionPlan, StreamingSqlPlan,
};
use crate::coordinator::statement::{
CreateFunction, CreatePythonFunction, DropFunction, ShowFunctions, StartFunction,
StatementVisitor, StatementVisitorContext, StatementVisitorResult, StopFunction,
StatementVisitor, StatementVisitorContext, StatementVisitorResult, StopFunction, StreamingSql,
};
use crate::sql::planner::StreamSchemaProvider;

#[derive(Debug, Default)]
pub struct LogicalPlanVisitor;
pub struct LogicalPlanVisitor {
schema_provider: StreamSchemaProvider,
}

impl LogicalPlanVisitor {
pub fn new() -> Self {
Self
pub fn new(schema_provider: StreamSchemaProvider) -> Self {
Self { schema_provider }
}

pub fn visit(&self, analysis: &Analysis) -> Box<dyn PlanNode> {
Expand All @@ -51,7 +55,6 @@ impl StatementVisitor for LogicalPlanVisitor {
let config_source = stmt.get_config_source().cloned();
let extra_props = stmt.get_extra_properties().clone();

// Name will be read from config file during execution
StatementVisitorResult::Plan(Box::new(CreateFunctionPlan::new(
function_source,
config_source,
Expand Down Expand Up @@ -106,4 +109,22 @@ impl StatementVisitor for LogicalPlanVisitor {
config_content,
)))
}

/// Converts a streaming SQL statement into a `StreamingSqlPlan`.
///
/// The statement is cloned and handed to DataFusion's `SqlToRel`, which
/// resolves it against this visitor's schema provider. The resulting logical
/// plan is logged at debug level in Graphviz form before being wrapped.
///
/// # Panics
///
/// Panics if DataFusion cannot convert the statement into a logical plan.
// NOTE(review): the statement presumably originates from user input, so this
// failure should become a recoverable error once the visitor result type can
// carry one — confirm with the owners of `StatementVisitorResult`.
fn visit_streaming_sql(
    &self,
    stmt: &StreamingSql,
    _context: &StatementVisitorContext,
) -> StatementVisitorResult {
    let planner = datafusion::sql::planner::SqlToRel::new(&self.schema_provider);

    let logical_plan = match planner.sql_statement_to_plan(stmt.statement.clone()) {
        Ok(converted) => converted,
        Err(e) => panic!("Failed to convert SQL statement to logical plan: {e}"),
    };

    debug!("Logical plan:\n{}", logical_plan.display_graphviz());
    StatementVisitorResult::Plan(Box::new(StreamingSqlPlan::new(logical_plan)))
}
}
2 changes: 2 additions & 0 deletions src/coordinator/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod optimizer;
mod show_functions_plan;
mod start_function_plan;
mod stop_function_plan;
mod streaming_sql_plan;
mod visitor;

pub use create_function_plan::CreateFunctionPlan;
Expand All @@ -28,6 +29,7 @@ pub use optimizer::LogicalPlanner;
pub use show_functions_plan::ShowFunctionsPlan;
pub use start_function_plan::StartFunctionPlan;
pub use stop_function_plan::StopFunctionPlan;
pub use streaming_sql_plan::StreamingSqlPlan;
pub use visitor::{PlanVisitor, PlanVisitorContext, PlanVisitorResult};

use std::fmt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod sql_parser;
use datafusion::logical_expr::LogicalPlan;

pub use sql_parser::SqlParser;
use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};

#[derive(Debug)]
pub struct ParseError {
pub message: String,
pub struct StreamingSqlPlan {
pub logical_plan: LogicalPlan,
}

impl std::fmt::Display for ParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Parse error: {}", self.message)
impl StreamingSqlPlan {
pub fn new(logical_plan: LogicalPlan) -> Self {
Self { logical_plan }
}
}

impl std::error::Error for ParseError {}

impl From<String> for ParseError {
fn from(message: String) -> Self {
ParseError { message }
}
}

impl ParseError {
pub fn new(message: impl Into<String>) -> Self {
Self {
message: message.into(),
}
impl PlanNode for StreamingSqlPlan {
fn accept(&self, visitor: &dyn PlanVisitor, context: &PlanVisitorContext) -> PlanVisitorResult {
visitor.visit_streaming_sql_plan(self, context)
}
}
8 changes: 7 additions & 1 deletion src/coordinator/plan/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

use super::{
CreateFunctionPlan, CreatePythonFunctionPlan, DropFunctionPlan, ShowFunctionsPlan,
StartFunctionPlan, StopFunctionPlan,
StartFunctionPlan, StopFunctionPlan, StreamingSqlPlan,
};

/// Context passed to PlanVisitor methods
Expand Down Expand Up @@ -84,4 +84,10 @@ pub trait PlanVisitor {
plan: &CreatePythonFunctionPlan,
context: &PlanVisitorContext,
) -> PlanVisitorResult;

/// Visits a `StreamingSqlPlan` node.
///
/// Called by `StreamingSqlPlan`'s `accept` implementation with the plan
/// itself and the visitor context.
fn visit_streaming_sql_plan(
&self,
plan: &StreamingSqlPlan,
context: &PlanVisitorContext,
) -> PlanVisitorResult;
}
2 changes: 2 additions & 0 deletions src/coordinator/statement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod drop_function;
mod show_functions;
mod start_function;
mod stop_function;
mod streaming_sql;
mod visitor;

pub use create_function::{ConfigSource, CreateFunction, FunctionSource};
Expand All @@ -24,6 +25,7 @@ pub use drop_function::DropFunction;
pub use show_functions::ShowFunctions;
pub use start_function::StartFunction;
pub use stop_function::StopFunction;
pub use streaming_sql::StreamingSql;
pub use visitor::{StatementVisitor, StatementVisitorContext, StatementVisitorResult};

use std::fmt;
Expand Down
39 changes: 39 additions & 0 deletions src/coordinator/statement/streaming_sql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use datafusion::sql::sqlparser::ast::Statement as DFStatement;

use super::{Statement, StatementVisitor, StatementVisitorContext, StatementVisitorResult};

/// Wraps a DataFusion SQL statement (SELECT, INSERT, CREATE TABLE, etc.)
/// so it can flow through the same Statement → StatementVisitor pipeline
/// as FunctionStream DDL commands.
#[derive(Debug)]
pub struct StreamingSql {
/// The DataFusion (sqlparser) AST statement carried by this wrapper.
pub statement: DFStatement,
}

impl StreamingSql {
pub fn new(statement: DFStatement) -> Self {
Self { statement }
}
}

impl Statement for StreamingSql {
/// Dispatches to the visitor's `visit_streaming_sql` method, passing this
/// statement and the supplied context, and returns the visitor's result.
fn accept(
&self,
visitor: &dyn StatementVisitor,
context: &StatementVisitorContext,
) -> StatementVisitorResult {
visitor.visit_streaming_sql(self, context)
}
}
7 changes: 7 additions & 0 deletions src/coordinator/statement/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

use super::{
CreateFunction, CreatePythonFunction, DropFunction, ShowFunctions, StartFunction, StopFunction,
StreamingSql,
};
use crate::coordinator::plan::PlanNode;
use crate::coordinator::statement::Statement;
Expand Down Expand Up @@ -87,4 +88,10 @@ pub trait StatementVisitor {
stmt: &CreatePythonFunction,
context: &StatementVisitorContext,
) -> StatementVisitorResult;

/// Visits a `StreamingSql` statement.
///
/// Called by `StreamingSql`'s `accept` implementation with the statement
/// itself and the visitor context.
fn visit_streaming_sql(
&self,
stmt: &StreamingSql,
context: &StatementVisitorContext,
) -> StatementVisitorResult;
}
Loading
Loading