diff --git a/libs/vectorized-exec-spi/build.gradle b/libs/vectorized-exec-spi/build.gradle new file mode 100644 index 0000000000000..dfb95964d01f5 --- /dev/null +++ b/libs/vectorized-exec-spi/build.gradle @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +apply plugin: 'opensearch.build' + +description = 'Vectorized engine common interfaces for OpenSearch' + +dependencies { + api project(':libs:opensearch-core') + api project(':libs:opensearch-common') + + testImplementation(project(":test:framework")) { + exclude group: 'org.opensearch', module: 'vectorized-exec-spi' + } +} + +tasks.named('forbiddenApisMain').configure { + replaceSignatureFiles 'jdk-signatures' +} + +jarHell.enabled = false + +test { + systemProperty 'tests.security.manager', 'false' +} diff --git a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/package-info.java b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/package-info.java new file mode 100644 index 0000000000000..8d91260830538 --- /dev/null +++ b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * DataFusion integration for OpenSearch. + * Provides JNI bindings and core functionality for DataFusion query engine. + */ +package org.opensearch.vectorized.execution; diff --git a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/spi/DataSourceCodec.java b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/spi/DataSourceCodec.java new file mode 100644 index 0000000000000..c42b5d67c8791 --- /dev/null +++ b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/spi/DataSourceCodec.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.vectorized.execution.spi; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Service Provider Interface for DataFusion data source codecs. + * Implementations provide access to different data formats (CSV, Parquet, etc.) + * through the DataFusion query engine. + */ +public interface DataSourceCodec { + + /** + * Register a directory containing data files with the runtime environment to prewarm cache + * This ideally should be used as part of each refresh - equivalent of acquire searcher + * where we register the files associated with this particular refresh point + * @param directoryPath the path to the directory containing data files + * @param fileNames the list of file names to register + * @param runtimeId the runtime environment ID + * @return a CompletableFuture that completes when registration is done + */ + CompletableFuture registerDirectory(String directoryPath, List fileNames, long runtimeId); + + /** + * Create a new session context for query execution. + * + * @param globalRuntimeEnvId the global runtime environment ID + * @return a CompletableFuture containing the session context ID + */ + CompletableFuture createSessionContext(long globalRuntimeEnvId); + + /** + * Execute a Substrait query plan. + * + * @param sessionContextId the session context ID + * @param substraitPlanBytes the serialized Substrait query plan + * @return a CompletableFuture containing the result stream + */ + CompletableFuture executeSubstraitQuery(long sessionContextId, byte[] substraitPlanBytes); + + /** + * Close a session context and free associated resources. + * + * @param sessionContextId the session context ID to close + * @return a CompletableFuture that completes when the context is closed + */ + CompletableFuture closeSessionContext(long sessionContextId); +} diff --git a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/spi/RecordBatchStream.java b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/spi/RecordBatchStream.java new file mode 100644 index 0000000000000..b79f895c243b9 --- /dev/null +++ b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/spi/RecordBatchStream.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.vectorized.execution.spi; + +import java.util.concurrent.CompletableFuture; + +/** + * Represents a stream of record batches from a DataFusion query execution. + * This interface provides access to query results in a streaming fashion. + */ +public interface RecordBatchStream extends AutoCloseable { + + /** + * Check if there are more record batches available in the stream. + * + * @return true if more batches are available, false otherwise + */ + boolean hasNext(); + + /** + * Get the schema of the record batches in this stream. + * @return the schema object + */ + Object getSchema(); + + /** + * Get the next record batch from the stream. + * + * @return the next record batch as a byte array, or null if no more batches + */ + CompletableFuture next(); + + /** + * Close the stream and free associated resources. + */ + @Override + void close(); +} diff --git a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/spi/package-info.java b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/spi/package-info.java new file mode 100644 index 0000000000000..9402386b8746b --- /dev/null +++ b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/spi/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Service Provider Interface (SPI) for DataFusion data source codecs. + * Defines interfaces for implementing different data format support. + */ +package org.opensearch.vectorized.execution.spi; diff --git a/plugins/dataformat-csv/build.gradle b/plugins/dataformat-csv/build.gradle new file mode 100644 index 0000000000000..99860394bff22 --- /dev/null +++ b/plugins/dataformat-csv/build.gradle @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +apply plugin: 'opensearch.opensearchplugin' + +opensearchplugin { + name = 'dataformat-csv' + description = 'CSV data format plugin for OpenSearch DataFusion' + classname = 'org.opensearch.datafusion.csv.CsvDataFormatPlugin' + hasNativeController = false +} + +dependencies { + api project(':libs:opensearch-vectorized-exec-spi') + api project(':libs:opensearch-core') + api project(':libs:opensearch-common') + + testImplementation(project(":test:framework")) { + exclude group: 'org.opensearch', module: 'opensearch-dataformat-csv' + } +} + +// JNI library configuration +task buildJni(type: Exec) { + description = 'Build the Rust JNI library using Cargo' + group = 'build' + + workingDir 'jni' + + // Determine the target directory and library name based on OS + def osName = System.getProperty('os.name').toLowerCase() + def libPrefix = osName.contains('windows') ? '' : 'lib' + def libExtension = osName.contains('windows') ? '.dll' : (osName.contains('mac') ? '.dylib' : '.so') + + // Find cargo executable - try common locations + def cargoExecutable = 'cargo' + def possibleCargoPaths = [ + System.getenv('HOME') + '/.cargo/bin/cargo', + '/usr/local/bin/cargo', + 'cargo' + ] + + for (String path : possibleCargoPaths) { + if (new File(path).exists()) { + cargoExecutable = path + break + } + } + + // Use release build + //def cargoArgs = ['cargo', 'build', '--release'] + + def cargoArgs = [cargoExecutable, 'build', '--release'] + + if (osName.contains('windows')) { + commandLine cargoArgs + } else { + commandLine cargoArgs + } + + // Set environment variables for cross-compilation if needed + environment 'CARGO_TARGET_DIR', file('jni/target').absolutePath + + inputs.files fileTree('jni/src') + inputs.file 'jni/Cargo.toml' + outputs.files file("jni/target/release/${libPrefix}opensearch_datafusion_csv_jni${libExtension}") +} + +task copyJniLib(type: Copy, dependsOn: buildJni) { + from 'jni/target/release' + into 'src/main/resources' + include '*.dylib', '*.so', '*.dll' + + doLast { + // Remove executable permissions from copied native libraries + fileTree('src/main/resources').matching { + include '*.dylib', '*.so', '*.dll' + }.each { file -> + file.setExecutable(false, false) + file.setReadable(true, false) + file.setWritable(true, false) + } + } +} + +processResources.dependsOn copyJniLib +sourcesJar.dependsOn copyJniLib + +// Ensure file permissions check runs after JNI library is copied +tasks.named('filepermissions').configure { + dependsOn copyJniLib +} + +// Ensure forbidden patterns check runs after JNI library is copied +tasks.named('forbiddenPatterns').configure { + dependsOn copyJniLib + exclude '**/*.dylib', '**/*.so', '**/*.dll' +} + +// Ensure spotless check runs after JNI library is copied +tasks.named('spotlessJava').configure { + dependsOn copyJniLib +} + +test { + systemProperty 'tests.security.manager', 'false' +} diff --git a/plugins/dataformat-csv/jni/Cargo.toml b/plugins/dataformat-csv/jni/Cargo.toml new file mode 100644 index 0000000000000..be5b6c92bfa66 --- /dev/null +++ b/plugins/dataformat-csv/jni/Cargo.toml @@ -0,0 +1,53 @@ +[package] +name = "opensearch-datafusion-csv-jni" +version = "0.1.0" +edition = "2021" + +[lib] +name = "opensearch_datafusion_csv_jni" +crate-type = ["cdylib"] + +[dependencies] +# DataFusion dependencies +datafusion = "49.0.0" +datafusion-substrait = "49.0.0" +arrow = "54.0.0" +arrow-array = "54.0.0" +arrow-schema = "54.0.0" +arrow-buffer = "54.0.0" + +# JNI dependencies +jni = "0.21" + +# Async runtime +tokio = { version = "1.0", features = ["full"] } +futures = "0.3" +futures-util = "0.3" + +# Serialization +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" + +# Error handling +anyhow = "1.0" +thiserror = "1.0" + +# Logging +log = "0.4" + +# Parquet support +parquet = "54.0.0" + +# Object store for file access +object_store = "0.11" +url = "2.0" + +# Substrait support +substrait = "0.47" +prost = "0.13" + +# Temporary directory support +tempfile = "3.0" + +[build-dependencies] +cbindgen = "0.27" diff --git a/plugins/dataformat-csv/jni/src/context.rs b/plugins/dataformat-csv/jni/src/context.rs new file mode 100644 index 0000000000000..0878254479201 --- /dev/null +++ b/plugins/dataformat-csv/jni/src/context.rs @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +use datafusion::prelude::*; +use datafusion::execution::context::SessionContext; +use std::collections::HashMap; +use std::sync::Arc; +use anyhow::Result; + +/// Manages DataFusion session contexts +pub struct SessionContextManager { + contexts: HashMap<*mut SessionContext, Arc>, + next_runtime_id: u64, +} + +impl SessionContextManager { + pub fn new() -> Self { + Self { + contexts: HashMap::new(), + next_runtime_id: 1, + } + } + + pub async fn register_directory( + &mut self, + table_name: &str, + directory_path: &str, + options: HashMap, + ) -> Result { + // Placeholder implementation - would register csv directory as table + log::info!("Registering directory: {} at path: {} with options: {:?}", + table_name, directory_path, options); + + let runtime_id = self.next_runtime_id; + self.next_runtime_id += 1; + Ok(runtime_id) + } + + pub async fn create_session_context( + &mut self, + config: HashMap, + ) -> Result<*mut SessionContext> { + // Create actual DataFusion session context + let mut session_config = SessionConfig::new(); + + // Apply configuration options + if let Some(batch_size) = config.get("batch_size") { + if let Ok(size) = batch_size.parse::() { + session_config = session_config.with_batch_size(size); + } + } + + let ctx = Arc::new(SessionContext::new_with_config(session_config)); + let ctx_ptr = Arc::as_ptr(&ctx) as *mut SessionContext; + + self.contexts.insert(ctx_ptr, ctx); + + Ok(ctx_ptr) + } + + pub async fn close_session_context(&mut self, ctx_ptr: *mut SessionContext) -> Result<()> { + self.contexts.remove(&ctx_ptr); + Ok(()) + } + + pub fn get_context(&self, ctx_ptr: *mut SessionContext) -> Option<&Arc> { + self.contexts.get(&ctx_ptr) + } +} diff --git a/plugins/dataformat-csv/jni/src/csv_exec.rs b/plugins/dataformat-csv/jni/src/csv_exec.rs new file mode 100644 index 0000000000000..2043be331b35a --- /dev/null +++ b/plugins/dataformat-csv/jni/src/csv_exec.rs @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +use anyhow::Result; + +/// Csv-specific execution utilities - placeholder implementation +pub struct CsvExecutor; + +impl CsvExecutor { + pub fn new() -> Self { + Self + } + + /// Create a listing table for Csv files - placeholder + pub async fn create_csv_table( + &self, + table_path: &str, + ) -> Result { + // Placeholder implementation + log::info!("Creating csv table for path: {}", table_path); + Ok(1) // Return dummy table ID + } +} diff --git a/plugins/dataformat-csv/jni/src/lib.rs b/plugins/dataformat-csv/jni/src/lib.rs new file mode 100644 index 0000000000000..34618f94a9372 --- /dev/null +++ b/plugins/dataformat-csv/jni/src/lib.rs @@ -0,0 +1,198 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +//! OpenSearch DataFusion Csv JNI Library +//! +//! This library provides JNI bindings for DataFusion query execution, + +use jni::JNIEnv; +use jni::objects::{JClass, JString, JObjectArray, JByteArray}; +use jni::sys::{jlong, jstring}; +use std::ptr; +use std::collections::HashMap; + +mod context; +mod runtime; +mod stream; +mod substrait; +mod util; +mod csv_exec; + +use context::SessionContextManager; +use runtime::RuntimeManager; +use stream::RecordBatchStreamWrapper; +use substrait::SubstraitExecutor; +use datafusion::execution::context::SessionContext; +use datafusion::execution::runtime_env::RuntimeEnv; + +/** +TODO : Put more thought into this +**/ +static mut RUNTIME_MANAGER: Option = None; + +static mut SESSION_MANAGER: Option = None; + +/// Initialize the managers (call once) +fn init_managers() { + unsafe { + if RUNTIME_MANAGER.is_none() { + RUNTIME_MANAGER = Some(RuntimeManager::new()); + } + if SESSION_MANAGER.is_none() { + SESSION_MANAGER = Some(SessionContextManager::new()); + } + } +} +static mut RUNTIME_ENVIRONMENTS: Option> = None; + + +/// Register a directory as a table in the global context and return runtime environment ID +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_csv_CsvDataSourceCodec_nativeRegisterDirectory( + mut env: JNIEnv, + _class: JClass, + table_name: JString, + directory_path: JString, + files: JObjectArray, + runtime_id: jlong +) { + let runtimeEnv = unsafe { &mut *(runtime_id as *mut RuntimeEnv) }; + // placeholder +} + +/// Create a new session context +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_csv_CsvDataSourceCodec_nativeCreateSessionContext( + mut env: JNIEnv, + _class: JClass, + config_keys: JObjectArray, + config_values: JObjectArray, +) -> jlong { + // Initialize managers if not already done + init_managers(); + + // PLACEHOLDER + // Parse configuration from JNI arrays + let config = match util::parse_string_map(&mut env, config_keys, config_values) { + Ok(cfg) => cfg, + Err(e) => { + util::throw_exception(&mut env, &format!("Failed to parse config: {}", e)); + return 0; + } + }; + + // Create session context + match unsafe { + RUNTIME_MANAGER.as_ref().unwrap().block_on(async { + SESSION_MANAGER.as_mut().unwrap().create_session_context(config).await + }) + } { + Ok(context_ptr) => context_ptr as jlong, + Err(e) => { + util::throw_exception(&mut env, &format!("Failed to create session context: {}", e)); + 0 + } + } +} + +/// Execute a Substrait query plan +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_csv_CsvDataSourceCodec_nativeExecuteSubstraitQuery( + mut env: JNIEnv, + _class: JClass, + session_context_ptr: jlong, + substrait_plan: JByteArray, +) -> jlong { + + // Convert JByteArray to Vec + let substrait_plan_bytes = match env.convert_byte_array(substrait_plan) { + Ok(bytes) => bytes, + Err(e) => { + util::throw_exception(&mut env, &format!("Failed to convert substrait plan: {}", e)); + return 0; + } + }; + + // Execute the query + match unsafe { + RUNTIME_MANAGER.as_ref().unwrap().block_on(async { + let executor = SubstraitExecutor::new(); + executor.execute_plan(session_context_ptr as *mut SessionContext, &substrait_plan_bytes).await + }) + } { + Ok(stream_ptr) => stream_ptr as jlong, + Err(e) => { + util::throw_exception(&mut env, &format!("Failed to execute query: {}", e)); + 0 + } + } +} + +/// Close a session context +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_csv_CsvDataSourceCodec_nativeCloseSessionContext( + mut env: JNIEnv, + _class: JClass, + session_context_ptr: jlong, +) { + + if let Err(e) = unsafe { + RUNTIME_MANAGER.as_ref().unwrap().block_on(async { + SESSION_MANAGER.as_mut().unwrap() + .close_session_context(session_context_ptr as *mut SessionContext) + .await + }) + } { + util::throw_exception(&mut env, &format!("Failed to close session context: {}", e)); + } +} + +/// Get the next record batch from a stream +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_csv_CsvRecordBatchStream_nativeNextBatch( + mut env: JNIEnv, + _class: JClass, + stream_ptr: jlong, +) -> jstring { + + let stream = unsafe { &mut *(stream_ptr as *mut RecordBatchStreamWrapper) }; + + match unsafe { + RUNTIME_MANAGER.as_ref().unwrap().block_on(async { + stream.next_batch().await + }) + } { + Ok(Some(batch_json)) => { + match env.new_string(&batch_json) { + Ok(jstr) => jstr.into_raw(), + Err(e) => { + util::throw_exception(&mut env, &format!("Failed to create Java string: {}", e)); + ptr::null_mut() + } + } + } + Ok(None) => ptr::null_mut(), // End of stream + Err(e) => { + util::throw_exception(&mut env, &format!("Failed to get next batch: {}", e)); + ptr::null_mut() + } + } +} + +/// Close a record batch stream +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_csv_CsvRecordBatchStream_nativeCloseStream( + _env: JNIEnv, + _class: JClass, + stream_ptr: jlong, +) { + if stream_ptr != 0 { + let stream = unsafe { Box::from_raw(stream_ptr as *mut RecordBatchStreamWrapper) }; + drop(stream); + } +} diff --git a/plugins/dataformat-csv/jni/src/runtime.rs b/plugins/dataformat-csv/jni/src/runtime.rs new file mode 100644 index 0000000000000..bcd48a7dee58b --- /dev/null +++ b/plugins/dataformat-csv/jni/src/runtime.rs @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +use tokio::runtime::Runtime; +use std::future::Future; + +/// Manages the Tokio runtime for async operations +pub struct RuntimeManager { + runtime: Runtime, +} + +impl RuntimeManager { + pub fn new() -> Self { + // Placeholder + + let runtime = Runtime::new().expect("Failed to create Tokio runtime"); + Self { runtime } + } + + pub fn block_on(&self, future: F) -> F::Output + where + F: Future, + { + self.runtime.block_on(future) + } +} diff --git a/plugins/dataformat-csv/jni/src/stream.rs b/plugins/dataformat-csv/jni/src/stream.rs new file mode 100644 index 0000000000000..2fe30f941223b --- /dev/null +++ b/plugins/dataformat-csv/jni/src/stream.rs @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +use anyhow::Result; +use serde_json; + +/// Wrapper for DataFusion record batch streams - placeholder implementation +pub struct RecordBatchStreamWrapper { + batch_count: u32, + is_placeholder: bool, +} + +impl RecordBatchStreamWrapper { + pub fn new_placeholder() -> Self { + Self { + batch_count: 0, + is_placeholder: true, + } + } + + pub async fn next_batch(&mut self) -> Result> { + // Return placeholder data for first few calls, then None + if self.is_placeholder { + if self.batch_count < 2 { + self.batch_count += 1; + let placeholder_data = serde_json::json!({ + "rows": [ + {"id": self.batch_count, "name": format!("placeholder_row_{}", self.batch_count)} + ], + "num_rows": 1, + "num_columns": 2 + }); + Ok(Some(serde_json::to_string(&placeholder_data)?)) + } else { + Ok(None) // End of stream + } + } else { + // Real implementation would go here + Ok(None) + } + } +} diff --git a/plugins/dataformat-csv/jni/src/substrait.rs b/plugins/dataformat-csv/jni/src/substrait.rs new file mode 100644 index 0000000000000..d8ca0f2846fd7 --- /dev/null +++ b/plugins/dataformat-csv/jni/src/substrait.rs @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +use datafusion::execution::context::SessionContext; +use crate::stream::RecordBatchStreamWrapper; +use anyhow::Result; + +/// Executes Substrait query plans +pub struct SubstraitExecutor; + +impl SubstraitExecutor { + pub fn new() -> Self { + Self + } + + pub async fn execute_plan( + &self, + session_context_ptr: *mut SessionContext, + substrait_plan_bytes: &[u8], + ) -> Result<*mut RecordBatchStreamWrapper> { + // Placeholder implementation - would normally: + // 1. Parse Substrait plan from substrait_plan_bytes + // 2. Convert to DataFusion logical plan using datafusion-substrait + // 3. Execute using the session context + // 4. Return actual record batch stream + + log::info!("Executing Substrait plan with {} bytes for session: {:?}", + substrait_plan_bytes.len(), session_context_ptr); + + // For now, return a placeholder stream + let wrapper = RecordBatchStreamWrapper::new_placeholder(); + let wrapper_ptr = Box::into_raw(Box::new(wrapper)); + + Ok(wrapper_ptr) + } +} diff --git a/plugins/dataformat-csv/jni/src/util.rs b/plugins/dataformat-csv/jni/src/util.rs new file mode 100644 index 0000000000000..5055c1312791a --- /dev/null +++ b/plugins/dataformat-csv/jni/src/util.rs @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +use jni::JNIEnv; +use jni::objects::{JObjectArray, JString}; +use std::collections::HashMap; +use anyhow::Result; + +/// Parse a string map from JNI arrays +pub fn parse_string_map( + env: &mut JNIEnv, + keys: JObjectArray, + values: JObjectArray, +) -> Result> { + let mut map = HashMap::new(); + + let keys_len = env.get_array_length(&keys)?; + let values_len = env.get_array_length(&values)?; + + if keys_len != values_len { + return Err(anyhow::anyhow!("Keys and values arrays must have the same length")); + } + + for i in 0..keys_len { + let key_obj = env.get_object_array_element(&keys, i)?; + let value_obj = env.get_object_array_element(&values, i)?; + + let key_jstring = JString::from(key_obj); + let value_jstring = JString::from(value_obj); + + let key_str = env.get_string(&key_jstring)?; + let value_str = env.get_string(&value_jstring)?; + + map.insert(key_str.to_string_lossy().to_string(), value_str.to_string_lossy().to_string()); + } + + Ok(map) +} + +// Parse a string map from JNI arrays +pub fn parse_string_arr( + env: &mut JNIEnv, + files: JObjectArray, +) -> Result> { + let length = env.get_array_length(&files).unwrap(); + let mut rust_strings: Vec = Vec::with_capacity(length as usize); + for i in 0..length { + let file_obj = env.get_object_array_element(&files, i).unwrap(); + let jstring = JString::from(file_obj); + let rust_str: String = env + .get_string(&jstring) + .expect("Couldn't get java string!") + .into(); + rust_strings.push(rust_str); + } + Ok(rust_strings) +} + +/// Throw a Java exception +pub fn throw_exception(env: &mut JNIEnv, message: &str) { + let _ = env.throw_new("java/lang/RuntimeException", message); +} diff --git a/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/CsvDataFormatPlugin.java b/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/CsvDataFormatPlugin.java new file mode 100644 index 0000000000000..e8f0d2306d2e6 --- /dev/null +++ b/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/CsvDataFormatPlugin.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.csv; + +import org.opensearch.plugins.DataSourcePlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.vectorized.execution.spi.DataSourceCodec; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Plugin for CSV data format support in OpenSearch DataFusion. + * This plugin provides CSV data source codec through ServiceLoader mechanism. + * + * Todo: implement vectorized exec specific plugin + */ +public class CsvDataFormatPlugin extends Plugin implements DataSourcePlugin { + + /** + * Creates a new CSV data format plugin. + */ + public CsvDataFormatPlugin() { + // Plugin initialization + } + + // TODO : move to vectorized exec specific plugin + @Override + public Optional> getDataSourceCodecs() { + Map codecs = new HashMap<>(); + // TODO : version it correctly - similar to lucene codecs? + codecs.put("csv-v1", new CsvDataSourceCodec()); + return Optional.of(codecs); + // return Optional.empty(); + } +} diff --git a/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/CsvDataSourceCodec.java b/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/CsvDataSourceCodec.java new file mode 100644 index 0000000000000..80622fbda6e31 --- /dev/null +++ b/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/CsvDataSourceCodec.java @@ -0,0 +1,142 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.csv; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.vectorized.execution.spi.DataSourceCodec; +import org.opensearch.vectorized.execution.spi.RecordBatchStream; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Datasource codec implementation for CSV files + */ +public class CsvDataSourceCodec implements DataSourceCodec { + + private static final Logger logger = LogManager.getLogger(CsvDataSourceCodec.class); + private static final AtomicLong runtimeIdGenerator = new AtomicLong(0); + private static final AtomicLong sessionIdGenerator = new AtomicLong(0); + private final ConcurrentHashMap sessionContexts = new ConcurrentHashMap<>(); + + // JNI library loading + static { + try { + JniLibraryLoader.loadLibrary(); + logger.info("DataFusion JNI library loaded successfully"); + } catch (Exception e) { + logger.error("Failed to load DataFusion JNI library", e); + throw new RuntimeException("Failed to initialize DataFusion JNI library", e); + } + } + + @Override + public CompletableFuture registerDirectory(String directoryPath, List fileNames, long runtimeId) { + return CompletableFuture.supplyAsync(() -> { + try { + logger.debug("Registering directory: {} with {} files", directoryPath, fileNames.size()); + + // Convert file names to arrays for JNI + String[] fileArray = fileNames.toArray(new String[0]); + + // Call native method to register directory + nativeRegisterDirectory("csv_table", directoryPath, fileArray, runtimeId); + return null; + } catch (Exception e) { + logger.error("Failed to register directory: " + directoryPath, e); + throw new CompletionException("Failed to register directory", e); + } + }); + } + + @Override + public CompletableFuture createSessionContext(long globalRuntimeEnvId) { + return CompletableFuture.supplyAsync(() -> { + try { + long sessionId = sessionIdGenerator.incrementAndGet(); + logger.debug("Creating session context with ID: {} for runtime: {}", sessionId, globalRuntimeEnvId); + + // Default configuration + String[] configKeys = { "batch_size", "target_partitions" }; + String[] configValues = { "1024", "4" }; + + // Create native session context + long nativeContextPtr = nativeCreateSessionContext(configKeys, configValues); + sessionContexts.put(sessionId, nativeContextPtr); + + logger.info("Created session context with ID: {}", sessionId); + return sessionId; + } catch (Exception e) { + logger.error("Failed to create session context for runtime: " + globalRuntimeEnvId, e); + throw new CompletionException("Failed to create session context", e); + } + }); + } + + @Override + public CompletableFuture executeSubstraitQuery(long sessionContextId, byte[] substraitPlanBytes) { + return CompletableFuture.supplyAsync(() -> { + try { + logger.debug("Executing Substrait query for session: {}", sessionContextId); + + Long nativeContextPtr = sessionContexts.get(sessionContextId); + if (nativeContextPtr == null) { + throw new IllegalArgumentException("Invalid session context ID: " + sessionContextId); + } + + // Execute query and get native stream pointer + long nativeStreamPtr = nativeExecuteSubstraitQuery(nativeContextPtr, substraitPlanBytes); + + // Create Java wrapper for the native stream + RecordBatchStream stream = new CsvRecordBatchStream(nativeStreamPtr); + + logger.info("Successfully executed Substrait query for session: {}", sessionContextId); + return stream; + } catch (Exception e) { + logger.error("Failed to execute Substrait query for session: " + sessionContextId, e); + throw new CompletionException("Failed to execute Substrait query", e); + } + }); + } + + @Override + public CompletableFuture closeSessionContext(long sessionContextId) { + return CompletableFuture.supplyAsync(() -> { + try { + logger.debug("Closing session context: {}", sessionContextId); + + Long nativeContextPtr = sessionContexts.remove(sessionContextId); + if (nativeContextPtr != null) { + nativeCloseSessionContext(nativeContextPtr); + logger.info("Successfully closed session context: {}", sessionContextId); + } else { + logger.warn("Session context not found: {}", sessionContextId); + } + + return null; + } catch (Exception e) { + logger.error("Failed to close session context: " + sessionContextId, e); + throw new CompletionException("Failed to close session context", e); + } + }); + } + + // Native method declarations - these will be implemented in the JNI library + private static native void nativeRegisterDirectory(String tableName, String directoryPath, String[] files, long runtimeId); + + private static native long nativeCreateSessionContext(String[] configKeys, String[] configValues); + + private static native long nativeExecuteSubstraitQuery(long sessionContextPtr, byte[] substraitPlan); + + private static native void nativeCloseSessionContext(long sessionContextPtr); +} diff --git a/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/CsvRecordBatchStream.java b/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/CsvRecordBatchStream.java new file mode 100644 index 0000000000000..56738a87cbddf --- /dev/null +++ b/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/CsvRecordBatchStream.java @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.csv; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.vectorized.execution.spi.RecordBatchStream; + +import java.util.concurrent.CompletableFuture; + +/** + * TODO : this need not be here - nothing specific to CSV - move to LIB ? + * Native implementation of RecordBatchStream that wraps a JNI stream pointer. + * This class provides a Java interface over native DataFusion record batches. + */ +public class CsvRecordBatchStream implements RecordBatchStream { + + private static final Logger logger = LogManager.getLogger(CsvRecordBatchStream.class); + + private final long nativeStreamPtr; + private volatile boolean closed = false; + private volatile boolean hasNextCached = false; + private volatile boolean hasNextValue = false; + + /** + * Creates a new CsvRecordBatchStream wrapping the given native stream pointer. + * + * @param nativeStreamPtr Pointer to the native DataFusion RecordBatch stream + */ + public CsvRecordBatchStream(long nativeStreamPtr) { + if (nativeStreamPtr == 0) { + throw new IllegalArgumentException("Invalid native stream pointer"); + } + this.nativeStreamPtr = nativeStreamPtr; + logger.debug("Created CsvRecordBatchStream with pointer: {}", nativeStreamPtr); + } + + @Override + public Object getSchema() { + return "CsvSchema"; // Placeholder + } + + @Override + public CompletableFuture next() { + // PlaceholderImpl + return CompletableFuture.supplyAsync(() -> { + if (closed) { + return null; + } + + try { + // Get the next batch from native code + String batch = nativeNextBatch(nativeStreamPtr); + + // Reset cached hasNext value since we consumed a batch + hasNextCached = false; + + logger.trace("Retrieved next batch from stream pointer: {}", nativeStreamPtr); + return batch; + } catch (Exception e) { + logger.error("Error getting next batch from stream", e); + return null; + } + }); + } + + @Override + public boolean hasNext() { + // Placeholder impl + if (closed) { + return false; + } + + if (hasNextCached) { + return hasNextValue; + } + + try { + // Check if there's a next batch available + // This is a simplified implementation - in practice, you might want to + // peek at the stream without consuming the batch + String nextBatch = nativeNextBatch(nativeStreamPtr); + hasNextValue = (nextBatch != null); + hasNextCached = true; + + logger.trace("hasNext() = {} for stream pointer: {}", hasNextValue, nativeStreamPtr); + return hasNextValue; + } catch (Exception e) { + logger.error("Error checking for next batch in stream", e); + return false; + } + } + + @Override + public void close() { + if (!closed) { + logger.debug("Closing CsvRecordBatchStream with pointer: {}", nativeStreamPtr); + try { + nativeCloseStream(nativeStreamPtr); + closed = true; + logger.debug("Successfully closed CsvRecordBatchStream"); + } catch (Exception e) { + logger.error("Error closing CsvRecordBatchStream", e); + throw e; + } + } + } + + // Native method declarations + private static native String nativeNextBatch(long streamPtr); + + private static native void nativeCloseStream(long streamPtr); +} diff --git a/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/JniLibraryLoader.java b/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/JniLibraryLoader.java new file mode 100644 index 0000000000000..6f3e68baa10d1 --- /dev/null +++ b/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/JniLibraryLoader.java @@ -0,0 +1,172 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.csv; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.PosixFilePermission; +import java.util.Locale; +import java.util.Set; + +/** + * Utility class for loading the data source JNI library. + */ +public class JniLibraryLoader { + + private static final Logger logger = LogManager.getLogger(JniLibraryLoader.class); + private static volatile boolean libraryLoaded = false; + + private static final String LIBRARY_NAME = "opensearch_datafusion_csv_jni"; + + /** + * Private constructor to prevent instantiation of utility class. + */ + private JniLibraryLoader() { + // Utility class + } + + /** + * Loads the DataFusion JNI library. This method is thread-safe and will only + * load the library once. + */ + public static synchronized void loadLibrary() { + if (libraryLoaded) { + return; + } + + try { + // First try to load from system library path + System.loadLibrary(LIBRARY_NAME); + logger.info("Loaded DataFusion JNI library from system path"); + libraryLoaded = true; + return; + } catch (UnsatisfiedLinkError e) { + logger.debug("Could not load library from system path, trying to extract from JAR", e); + } + + // Try to extract and load from JAR resources + String libraryPath = extractLibraryFromJar(); + if (libraryPath != null) { + try { + System.load(libraryPath); + logger.info("Loaded DataFusion JNI library from extracted path: {}", libraryPath); + libraryLoaded = true; + return; + } catch (UnsatisfiedLinkError e) { + logger.error("Failed to load extracted library from: " + libraryPath, e); + } + } + + throw new RuntimeException("Failed to load DataFusion JNI library"); + } + + /** + * Extracts the platform-specific JNI library from JAR resources to a temporary file. + * + * @return Path to the extracted library file, or null if extraction failed + */ + private static String extractLibraryFromJar() { + String osName = System.getProperty("os.name").toLowerCase(Locale.ROOT); + String osArch = System.getProperty("os.arch").toLowerCase(Locale.ROOT); + + logger.debug("Detecting platform: OS={}, Arch={}", osName, osArch); + + String libraryFileName = getLibraryFileName(osName); + if (libraryFileName == null) { + logger.error("Unsupported platform: {}", osName); + return null; + } + + String resourcePath = "/" + libraryFileName; + logger.debug("Looking for library resource: {}", resourcePath); + + try (InputStream inputStream = JniLibraryLoader.class.getResourceAsStream(resourcePath)) { + if (inputStream == null) { + logger.error("Library resource not found: {}", resourcePath); + return null; + } + + // Create temporary file in system temp directory + Path tempDir = Files.createTempDirectory(Path.of(System.getProperty("java.io.tmpdir")), "datafusion-jni"); + Path tempLibrary = tempDir.resolve(libraryFileName); + + // Extract library to temporary file + Files.copy(inputStream, tempLibrary, StandardCopyOption.REPLACE_EXISTING); + + // Make executable on Unix-like systems using NIO + if (!osName.contains("windows")) { + Set permissions = Files.getPosixFilePermissions(tempLibrary); + permissions.add(PosixFilePermission.OWNER_EXECUTE); + permissions.add(PosixFilePermission.GROUP_EXECUTE); + permissions.add(PosixFilePermission.OTHERS_EXECUTE); + Files.setPosixFilePermissions(tempLibrary, permissions); + } + + // Register for cleanup on JVM shutdown using NIO + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + Files.deleteIfExists(tempLibrary); + Files.deleteIfExists(tempDir); + } catch (IOException e) { + logger.debug("Failed to cleanup temporary files", e); + } + })); + + String libraryPath = tempLibrary.toAbsolutePath().toString(); + logger.debug("Extracted library to: {}", libraryPath); + return libraryPath; + + } catch (IOException e) { + logger.error("Failed to extract library from JAR", e); + return null; + } + } + + /** + * Gets the platform-specific library file name. + * + * @param osName Operating system name + * @return Library file name, or null if platform is unsupported + */ + private static String getLibraryFileName(String osName) { + String prefix; + String extension; + + if (osName.contains("windows")) { + prefix = ""; + extension = ".dll"; + } else if (osName.contains("mac") || osName.contains("darwin")) { + prefix = "lib"; + extension = ".dylib"; + } else if (osName.contains("linux") || osName.contains("unix")) { + prefix = "lib"; + extension = ".so"; + } else { + return null; + } + + return prefix + LIBRARY_NAME + extension; + + } + + /** + * Checks if the JNI library has been loaded. + * + * @return true if the library is loaded, false otherwise + */ + public static boolean isLibraryLoaded() { + return libraryLoaded; + } +} diff --git a/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/package-info.java b/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/package-info.java new file mode 100644 index 0000000000000..35fd564c68e51 --- /dev/null +++ b/plugins/dataformat-csv/src/main/java/org/opensearch/datafusion/csv/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * CSV data format implementation for DataFusion integration. + * Provides CSV file reading capabilities through DataFusion query engine. + */ +package org.opensearch.datafusion.csv; diff --git a/plugins/dataformat-csv/src/main/resources/META-INF/services/org.opensearch.vectorized.execution.spi.DataSourceCodec b/plugins/dataformat-csv/src/main/resources/META-INF/services/org.opensearch.vectorized.execution.spi.DataSourceCodec new file mode 100644 index 0000000000000..452b39dc4abf7 --- /dev/null +++ b/plugins/dataformat-csv/src/main/resources/META-INF/services/org.opensearch.vectorized.execution.spi.DataSourceCodec @@ -0,0 +1 @@ +org.opensearch.datafusion.csv.CsvDataSourceCodec diff --git a/plugins/dataformat-csv/src/main/resources/plugin-descriptor.properties b/plugins/dataformat-csv/src/main/resources/plugin-descriptor.properties new file mode 100644 index 0000000000000..713d226cce94a --- /dev/null +++ b/plugins/dataformat-csv/src/main/resources/plugin-descriptor.properties @@ -0,0 +1,7 @@ +# Plugin descriptor for CSV data format plugin +description=CSV data format plugin for OpenSearch DataFusion +version=${project.version} +name=dataformat-csv +classname=org.opensearch.datafusion.csv.CsvDataFormatPlugin +java.version=${versions.java} +opensearch.version=${opensearch_version} diff --git a/plugins/dataformat-csv/src/test/java/org/opensearch/datafusion/csv/CsvDataFormatPluginTests.java b/plugins/dataformat-csv/src/test/java/org/opensearch/datafusion/csv/CsvDataFormatPluginTests.java new file mode 100644 index 0000000000000..27ea2251e66b6 --- /dev/null +++ b/plugins/dataformat-csv/src/test/java/org/opensearch/datafusion/csv/CsvDataFormatPluginTests.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.csv; + +import org.opensearch.test.OpenSearchTestCase; + +/** + * Tests for the CSV data format plugin. + */ +public class CsvDataFormatPluginTests extends OpenSearchTestCase { + + /** + * Test that the plugin can be instantiated. + */ + public void testPluginInstantiation() { + CsvDataFormatPlugin plugin = new CsvDataFormatPlugin(); + assertNotNull("Plugin should not be null", plugin); + } +} diff --git a/plugins/engine-datafusion/.gitignore b/plugins/engine-datafusion/.gitignore new file mode 100644 index 0000000000000..8e535981ee076 --- /dev/null +++ b/plugins/engine-datafusion/.gitignore @@ -0,0 +1,38 @@ +# Gradle +.gradle/ +build/ + +# Java +*.class +*.jar +*.war +*.ear +hs_err_pid* + +# IDE +.idea/ +*.iml +*.ipr +*.iws +.vscode/ +.settings/ +.project +.classpath + +# OS +.DS_Store +Thumbs.db + +# Rust +jni/target/ +jni/Cargo.lock + +# Native libraries +src/main/resources/native/ + +# Logs +*.log + +# Temporary files +*.tmp +*.temp diff --git a/plugins/engine-datafusion/build.gradle b/plugins/engine-datafusion/build.gradle new file mode 100644 index 0000000000000..1b4c41371af14 --- /dev/null +++ b/plugins/engine-datafusion/build.gradle @@ -0,0 +1,136 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +apply plugin: 'java' +apply plugin: 'idea' +apply plugin: 'opensearch.internal-cluster-test' +apply plugin: 'opensearch.yaml-rest-test' +apply plugin: 'opensearch.pluginzip' + +def pluginName = 'engine-datafusion' +def pluginDescription = 'OpenSearch plugin providing access to DataFusion via JNI' +def projectPath = 'org.opensearch' +def pathToPlugin = 'datafusion.DataFusionPlugin' +def pluginClassName = 'DataFusionPlugin' + +opensearchplugin { + name = pluginName + description = pluginDescription + classname = "${projectPath}.${pathToPlugin}" + licenseFile = rootProject.file('LICENSE.txt') + noticeFile = rootProject.file('NOTICE.txt') +} + +dependencies { + api project(':libs:opensearch-vectorized-exec-spi') + implementation "org.apache.logging.log4j:log4j-api:${versions.log4j}" + implementation "org.apache.logging.log4j:log4j-core:${versions.log4j}" + testImplementation "junit:junit:${versions.junit}" + testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}" + testImplementation "org.mockito:mockito-core:${versions.mockito}" + + // Add CSV plugin for testing + // testImplementation project(':plugins:dataformat-csv') +} + +// Task to build the Rust JNI library +task buildRustLibrary(type: Exec) { + description = 'Build the Rust JNI library using Cargo' + group = 'build' + + workingDir file('jni') + + // Determine the target directory and library name based on OS + def osName = System.getProperty('os.name').toLowerCase() + def libPrefix = osName.contains('windows') ? '' : 'lib' + def libExtension = osName.contains('windows') ? '.dll' : (osName.contains('mac') ? '.dylib' : '.so') + + // Use debug build for development, release for production + def buildType = project.hasProperty('rustRelease') ? 'release' : 'debug' + def targetDir = "target/${buildType}" + + // Find cargo executable - try common locations + def cargoExecutable = 'cargo' + def possibleCargoPaths = [ + System.getenv('HOME') + '/.cargo/bin/cargo', + '/usr/local/bin/cargo', + 'cargo' + ] + + for (String path : possibleCargoPaths) { + if (new File(path).exists()) { + cargoExecutable = path + break + } + } + + def cargoArgs = [cargoExecutable, 'build'] + if (buildType == 'release') { + cargoArgs.add('--release') + } + + if (osName.contains('windows')) { + commandLine cargoArgs + } else { + commandLine cargoArgs + } + + // Set environment variables for cross-compilation if needed + environment 'CARGO_TARGET_DIR', file('jni/target').absolutePath + + inputs.files fileTree('jni/src') + inputs.file 'jni/Cargo.toml' + outputs.files file("jni/${targetDir}/${libPrefix}opensearch_datafusion_jni${libExtension}") + System.out.println("Building Rust library in ${buildType} mode"); +} + +// Task to copy the native library to resources +task copyNativeLibrary(type: Copy, dependsOn: buildRustLibrary) { + description = 'Copy the native library to Java resources' + group = 'build' + + def osName = System.getProperty('os.name').toLowerCase() + def libPrefix = osName.contains('windows') ? '' : 'lib' + def libExtension = osName.contains('windows') ? '.dll' : (osName.contains('mac') ? '.dylib' : '.so') + def buildType = project.hasProperty('rustRelease') ? 'release' : 'debug' + + from file("jni/target/${buildType}/${libPrefix}opensearch_datafusion_jni${libExtension}") + into file('src/main/resources/native') + + // Rename to a standard name for Java to load + rename { filename -> + "libopensearch_datafusion_jni${libExtension}" + } +} + +// Ensure native library is built before Java compilation +compileJava.dependsOn copyNativeLibrary + +// Ensure processResources depends on copyNativeLibrary +processResources.dependsOn copyNativeLibrary +sourcesJar.dependsOn copyNativeLibrary + +// Ensure filepermissions task depends on copyNativeLibrary +tasks.named('filepermissions').configure { + dependsOn copyNativeLibrary +} + +// Clean task should also clean Rust artifacts +clean { + delete file('jni/target') + delete file('src/main/resources/native') +} + +test { + // Set system property to help tests find the native library + systemProperty 'java.library.path', file('src/main/resources/native').absolutePath +} + +yamlRestTest { + systemProperty 'tests.security.manager', 'false' +} diff --git a/plugins/engine-datafusion/jni/Cargo.toml b/plugins/engine-datafusion/jni/Cargo.toml new file mode 100644 index 0000000000000..75097bb55e70c --- /dev/null +++ b/plugins/engine-datafusion/jni/Cargo.toml @@ -0,0 +1,51 @@ +[package] +name = "opensearch-datafusion-jni" +version = "0.1.0" +edition = "2021" + +[lib] +name = "opensearch_datafusion_jni" +crate-type = ["cdylib"] + +[dependencies] +# DataFusion dependencies +datafusion = "49.0.0" +datafusion-substrait = "49.0.0" +arrow = "55.2.0" +arrow-array = "55.2.0" +arrow-schema = "55.2.0" +arrow-buffer = "55.2.0" + +# JNI dependencies +jni = "0.21" + +# Async runtime +tokio = { version = "1.0", features = ["full"] } + +# Serialization +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" + +# Error handling +anyhow = "1.0" +thiserror = "1.0" + +# Logging +log = "0.4" + +# Parquet support +parquet = "53.0.0" + +# Object store for file access +object_store = "0.11" +url = "2.0" + +# Substrait support +substrait = "0.47" +prost = "0.13" + +# Temporary directory support +tempfile = "3.0" + +[build-dependencies] +cbindgen = "0.27" diff --git a/plugins/engine-datafusion/jni/src/lib.rs b/plugins/engine-datafusion/jni/src/lib.rs new file mode 100644 index 0000000000000..1e9981e9abae3 --- /dev/null +++ b/plugins/engine-datafusion/jni/src/lib.rs @@ -0,0 +1,115 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +use jni::objects::JClass; +use jni::sys::{jlong, jstring}; +use jni::JNIEnv; +use std::sync::Arc; + +use datafusion::execution::context::SessionContext; + +use datafusion::DATAFUSION_VERSION; +use datafusion::execution::cache::cache_manager::{CacheManager, CacheManagerConfig, FileStatisticsCache}; +use datafusion::execution::disk_manager::DiskManagerConfig; +use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder}; +use datafusion::prelude::SessionConfig; + +/// Create a new DataFusion session context +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createContext( + _env: JNIEnv, + _class: JClass, +) -> jlong { + let config = SessionConfig::new().with_repartition_aggregations(true); + let context = SessionContext::new_with_config(config); + let ctx = Box::into_raw(Box::new(context)) as jlong; + ctx +} + +/// Close and cleanup a DataFusion context +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_closeContext( + _env: JNIEnv, + _class: JClass, + context_id: jlong, +) { + let _ = unsafe { Box::from_raw(context_id as *mut SessionContext) }; +} + +/// Get version information +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_getVersionInfo( + env: JNIEnv, + _class: JClass, +) -> jstring { + let version_info = format!(r#"{{"version": "{}", "codecs": ["CsvDataSourceCodec"]}}"#, DATAFUSION_VERSION); + env.new_string(version_info).expect("Couldn't create Java string").as_raw() +} + +/// Get version information (legacy method name) +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_getVersion( + env: JNIEnv, + _class: JClass, +) -> jstring { + env.new_string(DATAFUSION_VERSION).expect("Couldn't create Java string").as_raw() +} + +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createGlobalRuntime( + _env: JNIEnv, + _class: JClass, +) -> jlong { + let runtime_env = RuntimeEnvBuilder::default().build().unwrap(); + /** + // We can copy global runtime to local runtime - file statistics cache, and most of the things + // will be shared across session contexts. But list files cache will be specific to session + // context + + let fsCache = runtimeEnv.clone().cache_manager.get_file_statistic_cache().unwrap(); + let localCacheManagerConfig = CacheManagerConfig::default().with_files_statistics_cache(Option::from(fsCache)); + let localCacheManager = CacheManager::try_new(&localCacheManagerConfig); + let localRuntimeEnv = RuntimeEnvBuilder::new() + .with_cache_manager(localCacheManagerConfig) + .with_disk_manager(DiskManagerConfig::new_existing(runtimeEnv.disk_manager)) + .with_memory_pool(runtimeEnv.memory_pool) + .with_object_store_registry(runtimeEnv.object_store_registry) + .build(); + let config = SessionConfig::new().with_repartition_aggregations(true); + let context = SessionContext::new_with_config(config); + **/ + let ctx = Box::into_raw(Box::new(runtime_env)) as jlong; + ctx +} + +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createSessionContext( + _env: JNIEnv, + _class: JClass, + runtime_id: jlong, +) -> jlong { + let runtimeEnv = unsafe { &mut *(runtime_id as *mut RuntimeEnv) }; + let config = SessionConfig::new().with_repartition_aggregations(true); + let context = SessionContext::new_with_config_rt(config, Arc::new(runtimeEnv.clone())); + let ctx = Box::into_raw(Box::new(context)) as jlong; + ctx +} + +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_closeSessionContext( + _env: JNIEnv, + _class: JClass, + context_id: jlong, +) { + let _ = unsafe { Box::from_raw(context_id as *mut SessionContext) }; +} + + + + + diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java new file mode 100644 index 0000000000000..224075b9c2414 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java @@ -0,0 +1,143 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion; + +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.datafusion.action.DataFusionAction; +import org.opensearch.datafusion.action.NodesDataFusionInfoAction; +import org.opensearch.datafusion.action.TransportNodesDataFusionInfoAction; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.DataSourceAwarePlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.client.Client; +import org.opensearch.vectorized.execution.spi.DataSourceCodec; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** + * Main plugin class for OpenSearch DataFusion integration. + * + */ +public class DataFusionPlugin extends Plugin implements ActionPlugin, DataSourceAwarePlugin { + + private DataFusionService dataFusionService; + private final boolean isDataFusionEnabled; + + /** + * Constructor for DataFusionPlugin. + * @param settings The settings for the DataFusionPlugin. + */ + public DataFusionPlugin(Settings settings) { + // For now, DataFusion is always enabled if the plugin is loaded + // In the future, this could be controlled by a feature flag + this.isDataFusionEnabled = true; + } + + /** + * Creates components for the DataFusion plugin. + * @param client The client instance. + * @param clusterService The cluster service instance. + * @param threadPool The thread pool instance. + * @param resourceWatcherService The resource watcher service instance. + * @param scriptService The script service instance. + * @param xContentRegistry The named XContent registry. + * @param environment The environment instance. + * @param nodeEnvironment The node environment instance. + * @param namedWriteableRegistry The named writeable registry. + * @param indexNameExpressionResolver The index name expression resolver instance. + * @param repositoriesServiceSupplier The supplier for the repositories service. + * @return Collection of created components + */ + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier, + Map dataSourceCodecs + ) { + if (!isDataFusionEnabled) { + return Collections.emptyList(); + } + dataFusionService = new DataFusionService(dataSourceCodecs); + // return Collections.emptyList(); + return Collections.singletonList(dataFusionService); + } + + /** + * Gets the REST handlers for the DataFusion plugin. + * @param settings The settings for the plugin. + * @param restController The REST controller instance. + * @param clusterSettings The cluster settings instance. + * @param indexScopedSettings The index scoped settings instance. + * @param settingsFilter The settings filter instance. + * @param indexNameExpressionResolver The index name expression resolver instance. + * @param nodesInCluster The supplier for the discovery nodes. + * @return A list of REST handlers. + */ + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + if (!isDataFusionEnabled) { + return Collections.emptyList(); + } + return List.of(new DataFusionAction()); + } + + /** + * Gets the list of action handlers for the DataFusion plugin. + * @return A list of action handlers. + */ + @Override + public List> getActions() { + if (!isDataFusionEnabled) { + return Collections.emptyList(); + } + return List.of(new ActionHandler<>(NodesDataFusionInfoAction.INSTANCE, TransportNodesDataFusionInfoAction.class)); + } + + @Override + public void registerDataSources(Map dataSourceCodecs) { + dataFusionService = new DataFusionService(dataSourceCodecs); + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java new file mode 100644 index 0000000000000..48578c987226d --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java @@ -0,0 +1,141 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion; + +/** + * JNI wrapper for DataFusion operations + */ +public class DataFusionQueryJNI { + + private static boolean libraryLoaded = false; + + static { + loadNativeLibrary(); + } + + /** + * Private constructor to prevent instantiation of utility class. + */ + private DataFusionQueryJNI() { + // Utility class + } + + /** + * Load the native library from resources + */ + private static synchronized void loadNativeLibrary() { + if (libraryLoaded) { + return; + } + + try { + // Try to load the library directly + System.loadLibrary("opensearch_datafusion_jni"); + libraryLoaded = true; + } catch (UnsatisfiedLinkError e) { + // Try loading from resources + try { + String osName = System.getProperty("os.name").toLowerCase(); + String libExtension = osName.contains("windows") ? ".dll" : (osName.contains("mac") ? ".dylib" : ".so"); + String libName = "libopensearch_datafusion_jni" + libExtension; + + java.io.InputStream is = DataFusionQueryJNI.class.getResourceAsStream("/native/" + libName); + if (is != null) { + java.io.File tempFile = java.io.File.createTempFile("libopensearch_datafusion_jni", libExtension); + tempFile.deleteOnExit(); + + try (java.io.FileOutputStream fos = new java.io.FileOutputStream(tempFile)) { + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { + fos.write(buffer, 0, bytesRead); + } + } + + System.load(tempFile.getAbsolutePath()); + libraryLoaded = true; + } else { + throw new RuntimeException("Native library not found: " + libName, e); + } + } catch (Exception ex) { + throw new RuntimeException("Failed to load native library", ex); + } + } + } + + /** + * Create a new global runtime environment + * @return runtime env pointer for subsequent operations + */ + public static native long createGlobalRuntime(); + + /** + * Closes global runtime environment + * @param pointer the runtime environment pointer to close + * @return status code + */ + public static native long closeGlobalRuntime(long pointer); + + /** + * Get version information + * @return JSON string with version information + */ + public static native String getVersionInfo(); + + /** + * Create a new DataFusion session context + * @param runtimeId the global runtime environment ID + * @return context ID for subsequent operations + */ + public static native long createSessionContext(long runtimeId); + + /** + * Close and cleanup a DataFusion context + * @param contextId the context ID to close + */ + public static native void closeSessionContext(long contextId); + + /** + * Execute a Substrait query plan + * @param contextId the session context ID + * @param substraitPlan the serialized Substrait query plan + * @return stream pointer for result iteration + */ + public static native long executeSubstraitQuery(long contextId, byte[] substraitPlan); + + /** + * Register a directory with CSV files + * @param contextId the session context ID + * @param tableName the table name to register + * @param directoryPath the directory path containing CSV files + * @param fileNames array of file names to register + * @return status code + */ + public static native int registerCsvDirectory(long contextId, String tableName, String directoryPath, String[] fileNames); + + /** + * Check if stream has more data + * @param streamPtr the stream pointer + * @return true if more data available + */ + public static native boolean streamHasNext(long streamPtr); + + /** + * Get next batch from stream + * @param streamPtr the stream pointer + * @return byte array containing the next batch, or null if no more data + */ + public static native byte[] streamNext(long streamPtr); + + /** + * Close and cleanup a result stream + * @param streamPtr the stream pointer to close + */ + public static native void closeStream(long streamPtr); +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java new file mode 100644 index 0000000000000..099ae90d20599 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java @@ -0,0 +1,201 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.common.util.concurrent.ConcurrentMapLong; +import org.opensearch.datafusion.core.GlobalRuntimeEnv; +import org.opensearch.vectorized.execution.spi.DataSourceCodec; +import org.opensearch.vectorized.execution.spi.RecordBatchStream; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Service for managing DataFusion contexts and operations - essentially like SearchService + */ +public class DataFusionService extends AbstractLifecycleComponent { + + private static final Logger logger = LogManager.getLogger(DataFusionService.class); + private final ConcurrentMapLong sessionEngines = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); + + private final DataSourceRegistry dataSourceRegistry; + private final GlobalRuntimeEnv globalRuntimeEnv; + + /** + * Creates a new DataFusion service instance. + */ + public DataFusionService(Map dataSourceCodecs) { + this.dataSourceRegistry = new DataSourceRegistry(dataSourceCodecs); + + // to verify jni + String version = DataFusionQueryJNI.getVersionInfo(); + this.globalRuntimeEnv = new GlobalRuntimeEnv(); + } + + @Override + protected void doStart() { + logger.info("Starting DataFusion service"); + try { + // Initialize the data source registry + // Test that at least one data source is available + if (!dataSourceRegistry.hasCodecs()) { + logger.warn("No data sources available"); + } else { + logger.info( + "DataFusion service started successfully with {} data sources: {}", + dataSourceRegistry.getCodecNames().size(), + dataSourceRegistry.getCodecNames() + ); + + } + } catch (Exception e) { + logger.error("Failed to start DataFusion service", e); + throw new RuntimeException("Failed to initialize DataFusion service", e); + } + } + + @Override + protected void doStop() { + logger.info("Stopping DataFusion service"); + + // Close all session contexts + for (Long sessionId : sessionEngines.keySet()) { + try { + closeSessionContext(sessionId).get(); + } catch (Exception e) { + logger.warn("Error closing session context {}", sessionId, e); + } + } + sessionEngines.clear(); + globalRuntimeEnv.close(); + logger.info("DataFusion service stopped"); + } + + @Override + protected void doClose() { + doStop(); + } + + /** + * Register a directory with list of files to create a runtime environment + * with listing files cache of DataFusion + * + * @param directoryPath path to the directory containing files + * @param fileNames list of file names in the directory + * @return runtime environment ID + */ + public CompletableFuture registerDirectory(String directoryPath, List fileNames) { + DataSourceCodec engine = dataSourceRegistry.getDefaultEngine(); + if (engine == null) { + return CompletableFuture.failedFuture(new IllegalStateException("No DataFusion engine available")); + } + + logger.debug( + "Registering directory {} with {} files using engine {}", + directoryPath, + fileNames.size(), + engine.getClass().getSimpleName() + ); + + return engine.registerDirectory(directoryPath, fileNames, globalRuntimeEnv.getPointer()); + } + + /** + * Create a session context + * + * @return session context ID + */ + public CompletableFuture createSessionContext() { + long runtimeEnvironmentId = globalRuntimeEnv.getPointer(); + DataSourceCodec codec = dataSourceRegistry.getDefaultEngine(); + if (codec == null) { + return CompletableFuture.failedFuture(new IllegalArgumentException("Runtime environment not found: " + runtimeEnvironmentId)); + } + + logger.debug( + "Creating session context for runtime environment {} using engine {}", + runtimeEnvironmentId, + codec.getClass().getSimpleName() + ); + + return codec.createSessionContext(runtimeEnvironmentId).thenApply(sessionId -> { + // Track which engine created this session context + sessionEngines.put(sessionId, codec); + logger.debug("Created session context {} with engine {}", sessionId, codec.getClass().getSimpleName()); + return sessionId; + }); + } + + /** + * Execute a query accepting substrait plan bytes and run via session context + * + * @param sessionContextId the session context ID + * @param substraitPlanBytes the substrait plan as byte array + * @return record batch stream containing query results + */ + public CompletableFuture executeSubstraitQuery(long sessionContextId, byte[] substraitPlanBytes) { + DataSourceCodec engine = sessionEngines.get(sessionContextId); + if (engine == null) { + return CompletableFuture.failedFuture(new IllegalArgumentException("Session context not found: " + sessionContextId)); + } + + logger.debug( + "Executing substrait query for session {} with plan size {} bytes using engine {}", + sessionContextId, + substraitPlanBytes.length, + engine.getClass().getSimpleName() + ); + + return engine.executeSubstraitQuery(sessionContextId, substraitPlanBytes); + } + + /** + * Close the session context and clean up resources + * + * @param sessionContextId the session context ID to close + * @return future that completes when cleanup is done + */ + public CompletableFuture closeSessionContext(long sessionContextId) { + DataSourceCodec engine = sessionEngines.remove(sessionContextId); + if (engine == null) { + logger.debug("Session context {} not found or already closed", sessionContextId); + return CompletableFuture.completedFuture(null); + } + + logger.debug("Closing session context {} using engine {}", sessionContextId, engine.getClass().getSimpleName()); + + return engine.closeSessionContext(sessionContextId); + } + + /** + * Get version information from available codecs + * @return JSON version string + */ + public String getVersion() { + StringBuilder version = new StringBuilder(); + version.append("{\"codecs\":["); + + boolean first = true; + for (String engineName : this.dataSourceRegistry.getCodecNames()) { + if (!first) { + version.append(","); + } + version.append("{\"name\":\"").append(engineName).append("\"}"); + first = false; + } + + version.append("]}"); + return version.toString(); + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataSourceRegistry.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataSourceRegistry.java new file mode 100644 index 0000000000000..9229b861ceef3 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataSourceRegistry.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.vectorized.execution.spi.DataSourceCodec; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Registry for DataFusion data source codecs. + */ +public class DataSourceRegistry { + + private static final Logger logger = LogManager.getLogger(DataSourceRegistry.class); + + private final ConcurrentHashMap codecs = new ConcurrentHashMap<>(); + + public DataSourceRegistry(Map dataSourceCodecMap) { + codecs.putAll(dataSourceCodecMap); + } + + /** + * Check if any codecs are available. + * + * @return true if codecs are available, false otherwise + */ + public boolean hasCodecs() { + return !codecs.isEmpty(); + } + + /** + * Get the names of all registered codecs. + * + * @return list of codec names + */ + public List getCodecNames() { + return new ArrayList<>(codecs.keySet()); + } + + /** + * Get the default codec (first available codec). + * + * @return the default codec, or null if none available + */ + public DataSourceCodec getDefaultEngine() { + if (codecs.isEmpty()) { + return null; + } + return codecs.values().iterator().next(); + } + + /** + * Get a codec by name. + * + * @param name the codec name + * @return the codec, or null if not found + */ + public DataSourceCodec getCodec(String name) { + return codecs.get(name); + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/DataFusionAction.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/DataFusionAction.java new file mode 100644 index 0000000000000..99695d2c96266 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/DataFusionAction.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.action; + +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; +import org.opensearch.transport.client.node.NodeClient; + +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * REST handler for DataFusion information operations. + * It handles GET requests for retrieving DataFusion server information. + */ +public class DataFusionAction extends BaseRestHandler { + + /** + * Constructor for DataFusionRestHandler. + */ + public DataFusionAction() {} + + /** + * Returns the name of the action. + * @return The name of the action. + */ + @Override + public String getName() { + return "datafusion_info_action"; + } + + /** + * Returns the list of routes for the action. + * @return The list of routes for the action. + */ + @Override + public List routes() { + return List.of(new Route(GET, "/_plugins/datafusion/info"), new Route(GET, "/_plugins/datafusion/info/{nodeId}")); + } + + /** + * Prepares the request for the action. + * @param request The REST request. + * @param client The node client. + * @return The rest channel consumer. + */ + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + String nodeId = request.param("nodeId"); + if (nodeId != null) { + // Query specific node + NodesDataFusionInfoRequest nodesRequest = new NodesDataFusionInfoRequest(nodeId); + return channel -> client.execute(NodesDataFusionInfoAction.INSTANCE, nodesRequest, new RestToXContentListener<>(channel)); + } else { + NodesDataFusionInfoRequest nodesRequest = new NodesDataFusionInfoRequest(); + return channel -> client.execute(NodesDataFusionInfoAction.INSTANCE, nodesRequest, new RestToXContentListener<>(channel)); + } + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/NodeDataFusionInfo.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/NodeDataFusionInfo.java new file mode 100644 index 0000000000000..5512110c576da --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/NodeDataFusionInfo.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.action; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Information about DataFusion on a specific node + */ +public class NodeDataFusionInfo extends BaseNodeResponse implements ToXContentFragment { + + private final String dataFusionVersion; + + /** + * Constructor for NodeDataFusionInfo. + * @param node The discovery node. + * @param dataFusionVersion The DataFusion version. + */ + public NodeDataFusionInfo(DiscoveryNode node, String dataFusionVersion) { + super(node); + this.dataFusionVersion = dataFusionVersion; + } + + /** + * Constructor for NodeDataFusionInfo from stream input. + * @param in The stream input. + * @throws IOException If an I/O error occurs. + */ + public NodeDataFusionInfo(StreamInput in) throws IOException { + super(in); + this.dataFusionVersion = in.readString(); + } + + /** + * Writes the node info to the stream output. + * @param out The stream output. + * @throws IOException If an I/O error occurs. + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(dataFusionVersion); + } + + /** + * Converts the node info to XContent. + * @param builder The XContent builder. + * @param params The parameters. + * @return The XContent builder. + * @throws IOException If an I/O error occurs. + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("data_fusion_info"); + builder.field("datafusion_version", dataFusionVersion); + builder.endObject(); + builder.endObject(); + return builder; + } + + /** + * Gets the DataFusion version. + * @return The DataFusion version. + */ + public String getDataFusionVersion() { + return dataFusionVersion; + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/NodesDataFusionInfoAction.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/NodesDataFusionInfoAction.java new file mode 100644 index 0000000000000..198c7973e6a9c --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/NodesDataFusionInfoAction.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.action; + +import org.opensearch.action.ActionType; + +/** + * Action to retrieve DataFusion info from nodes + */ +public class NodesDataFusionInfoAction extends ActionType { + /** + * Singleton instance of NodesDataFusionInfoAction. + */ + public static final NodesDataFusionInfoAction INSTANCE = new NodesDataFusionInfoAction(); + /** + * Name of this action. + */ + public static final String NAME = "cluster:admin/datafusion/info"; + + NodesDataFusionInfoAction() { + super(NAME, NodesDataFusionInfoResponse::new); + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/NodesDataFusionInfoRequest.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/NodesDataFusionInfoRequest.java new file mode 100644 index 0000000000000..4e32bb3b0f18c --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/NodesDataFusionInfoRequest.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.action; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Request for retrieving DataFusion information from nodes + */ +public class NodesDataFusionInfoRequest extends BaseNodesRequest { + + /** + * Default constructor for NodesDataFusionInfoRequest. + */ + public NodesDataFusionInfoRequest() { + super((String[]) null); + } + + /** + * Constructor for NodesDataFusionInfoRequest with specific node IDs. + * @param nodeIds The node IDs to query. + */ + public NodesDataFusionInfoRequest(String... nodeIds) { + super(nodeIds); + } + + /** + * Constructor for NodesDataFusionInfoRequest from stream input. + * @param in The stream input. + * @throws IOException If an I/O error occurs. + */ + public NodesDataFusionInfoRequest(StreamInput in) throws IOException { + super(in); + } + + /** + * Writes the request to the stream output. + * @param out The stream output. + * @throws IOException If an I/O error occurs. + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + /** + * Node-level request for DataFusion information + */ + public static class NodeDataFusionInfoRequest extends org.opensearch.transport.TransportRequest { + + /** + * Default constructor for NodeDataFusionInfoRequest. + */ + public NodeDataFusionInfoRequest() {} + + /** + * Constructor for NodeDataFusionInfoRequest from stream input. + * @param in The stream input. + * @throws IOException If an I/O error occurs. + */ + public NodeDataFusionInfoRequest(StreamInput in) throws IOException { + super(in); + } + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/NodesDataFusionInfoResponse.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/NodesDataFusionInfoResponse.java new file mode 100644 index 0000000000000..fca186749cde6 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/NodesDataFusionInfoResponse.java @@ -0,0 +1,93 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.action; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +/** + * Response containing DataFusion information from multiple nodes + */ +public class NodesDataFusionInfoResponse extends BaseNodesResponse implements ToXContentObject { + + /** + * Constructor for NodesDataFusionInfoResponse. + * @param clusterName The cluster name. + * @param nodes The list of node DataFusion info. + * @param failures The list of failed node exceptions. + */ + public NodesDataFusionInfoResponse(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeDataFusionInfo::new); + } + + /** + * Constructor for NodesDataFusionInfoResponse from stream input. + * @param in The stream input. + * @throws IOException If an I/O error occurs. + */ + public NodesDataFusionInfoResponse(StreamInput in) throws IOException { + super(in); + } + + /** + * Writes the node response to stream output. + * @param out The stream output. + * @throws IOException If an I/O error occurs. + */ + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + /** + * Converts the response to XContent. + * @param builder The XContent builder. + * @param params The parameters. + * @return The XContent builder. + * @throws IOException If an I/O error occurs. + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("nodes"); + for (NodeDataFusionInfo nodeInfo : getNodes()) { + builder.field(nodeInfo.getNode().getId()); + // builder.field("name", nodeInfo.getNode().getName()); + // builder.field("transport_address", nodeInfo.getNode().getAddress().toString()); + nodeInfo.toXContent(builder, params); + } + builder.endObject(); + + if (!failures().isEmpty()) { + builder.startArray("failures"); + for (FailedNodeException failure : failures()) { + builder.startObject(); + builder.field("node_id", failure.nodeId()); + builder.field("reason", failure.getMessage()); + builder.endObject(); + } + builder.endArray(); + } + builder.endObject(); + return builder; + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/TransportNodesDataFusionInfoAction.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/TransportNodesDataFusionInfoAction.java new file mode 100644 index 0000000000000..8a659f29230d6 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/TransportNodesDataFusionInfoAction.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.action; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.datafusion.DataFusionService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +/** + * Transport action for retrieving DataFusion information from nodes + */ +public class TransportNodesDataFusionInfoAction extends TransportNodesAction< + NodesDataFusionInfoRequest, + NodesDataFusionInfoResponse, + NodesDataFusionInfoRequest.NodeDataFusionInfoRequest, + NodeDataFusionInfo> { + + private final DataFusionService dataFusionService; + + /** + * Constructor for TransportNodesDataFusionInfoAction. + * @param threadPool The thread pool. + * @param clusterService The cluster service. + * @param transportService The transport service. + * @param actionFilters The action filters. + * @param dataFusionService The DataFusion service. + */ + @Inject + public TransportNodesDataFusionInfoAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + DataFusionService dataFusionService + ) { + super( + NodesDataFusionInfoAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + NodesDataFusionInfoRequest::new, + NodesDataFusionInfoRequest.NodeDataFusionInfoRequest::new, + ThreadPool.Names.MANAGEMENT, + NodeDataFusionInfo.class + ); + this.dataFusionService = dataFusionService; + } + + /** + * Creates a new nodes response. + * @param request The nodes request. + * @param responses The list of node responses. + * @param failures The list of failed node exceptions. + * @return The nodes response. + */ + @Override + protected NodesDataFusionInfoResponse newResponse( + NodesDataFusionInfoRequest request, + List responses, + List failures + ) { + return new NodesDataFusionInfoResponse(clusterService.getClusterName(), responses, failures); + } + + /** + * Creates a new node request. + * @param request The nodes request. + * @return The node request. + */ + @Override + protected NodesDataFusionInfoRequest.NodeDataFusionInfoRequest newNodeRequest(NodesDataFusionInfoRequest request) { + return new NodesDataFusionInfoRequest.NodeDataFusionInfoRequest(); + } + + @Override + protected NodeDataFusionInfo newNodeResponse(StreamInput in) throws IOException { + return new NodeDataFusionInfo(in); + } + + /** + * Handles the node request and returns the node response. + * @param request The node request. + * @return The node response. + */ + @Override + protected NodeDataFusionInfo nodeOperation(NodesDataFusionInfoRequest.NodeDataFusionInfoRequest request) { + try { + System.out.println(this.dataFusionService.getVersion()); + return new NodeDataFusionInfo(clusterService.localNode(), dataFusionService.getVersion()); + } catch (Exception e) { + return new NodeDataFusionInfo(clusterService.localNode(), "unknown"); + } + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/package-info.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/package-info.java new file mode 100644 index 0000000000000..d3542f4dfe9dc --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/action/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * REST actions and transport handlers for DataFusion plugin. + * Provides API endpoints for DataFusion functionality. + */ +package org.opensearch.datafusion.action; diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/GlobalRuntimeEnv.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/GlobalRuntimeEnv.java new file mode 100644 index 0000000000000..1867028fcb945 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/GlobalRuntimeEnv.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.core; + +import static org.opensearch.datafusion.DataFusionQueryJNI.closeGlobalRuntime; +import static org.opensearch.datafusion.DataFusionQueryJNI.createGlobalRuntime; + +/** + * Global runtime environment for DataFusion operations. + * Manages the lifecycle of the native DataFusion runtime. + */ +public class GlobalRuntimeEnv implements AutoCloseable { + // ptr to runtime environment in df + private final long ptr; + + /** + * Creates a new global runtime environment. + */ + public GlobalRuntimeEnv() { + this.ptr = createGlobalRuntime(); + } + + /** + * Gets the native pointer to the runtime environment. + * @return the native pointer + */ + public long getPointer() { + return ptr; + } + + @Override + public void close() { + closeGlobalRuntime(this.ptr); + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/SessionContext.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/SessionContext.java new file mode 100644 index 0000000000000..956aa78fdaa30 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/SessionContext.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.core; + +/** + * Session context for datafusion + */ +public class SessionContext implements AutoCloseable { + + // ptr to context in df + private final long ptr; + + /** + * Create a new DataFusion session context + * @return context ID for subsequent operations + */ + static native long createContext(); + + /** + * Close and cleanup a DataFusion context + * @param contextId the context ID to close + */ + public static native void closeContext(long contextId); + + /** + * Creates a new session context. + */ + public SessionContext() { + this.ptr = createContext(); + } + + @Override + public void close() throws Exception { + closeContext(this.ptr); + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/package-info.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/package-info.java new file mode 100644 index 0000000000000..2c6e72ef3a582 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Core DataFusion runtime and session management classes. + * Provides runtime environment and session context management. + */ +package org.opensearch.datafusion.core; diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/package-info.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/package-info.java new file mode 100644 index 0000000000000..81017da49c16c --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * DataFusion query engine integration for OpenSearch. + * Provides the main plugin and service classes for DataFusion functionality. + */ +package org.opensearch.datafusion; diff --git a/plugins/engine-datafusion/src/main/resources/META-INF/services/org.opensearch.vectorized.execution.spi.DataSourceCodec b/plugins/engine-datafusion/src/main/resources/META-INF/services/org.opensearch.vectorized.execution.spi.DataSourceCodec new file mode 100644 index 0000000000000..9b1ec055f7ea2 --- /dev/null +++ b/plugins/engine-datafusion/src/main/resources/META-INF/services/org.opensearch.vectorized.execution.spi.DataSourceCodec @@ -0,0 +1,5 @@ +# DataFusion Engine implementations +# Add your custom implementations here, e.g.: +# com.example.CustomCsvDataFusionEngine + +# Note: Built-in csv engine is now in separate library diff --git a/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/TestDataFusionServiceTests.java b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/TestDataFusionServiceTests.java new file mode 100644 index 0000000000000..395e2fae52e2f --- /dev/null +++ b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/TestDataFusionServiceTests.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.List; + +/** + * Unit tests for DataFusionService + * + * Note: These tests require the native library to be available. + * They are disabled by default and can be enabled by setting the system property: + * -Dtest.native.enabled=true + */ +public class TestDataFusionServiceTests extends OpenSearchTestCase { + + private DataFusionService service; + + @Override + public void setUp() throws Exception { + super.setUp(); + service = new DataFusionService(Collections.emptyMap()); + service.doStart(); + } + + public void testGetVersion() { + String version = service.getVersion(); + assertNotNull(version); + // The service returns codec information in JSON format + assertTrue("Version should contain codecs", version.contains("codecs")); + assertTrue("Version should contain CsvDataSourceCodec", version.contains("CsvDataSourceCodec")); + } + + public void testCreateAndCloseContext() { + service.registerDirectory("/Users/gbh/Documents", List.of("parquet-nested.csv")); + long contextId = service.createSessionContext().join(); + // Create context + assertTrue(contextId > 0); + + service.getVersion(); + } + + public void testCodecDiscovery() { + // Test that the CSV codec can be discovered via SPI + // TODO : test with dummy plugin and dummy codec + } +} diff --git a/server/build.gradle b/server/build.gradle index 803d791295e71..100ff7be5b49b 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -71,6 +71,7 @@ dependencies { api project(":libs:opensearch-geo") api project(":libs:opensearch-telemetry") api project(":libs:opensearch-task-commons") + api project(':libs:opensearch-vectorized-exec-spi') compileOnly project(":libs:agent-sm:bootstrap") compileOnly project(':libs:opensearch-plugin-classloader') @@ -114,6 +115,7 @@ dependencies { api libs.protobuf api libs.jakartaannotation + // https://mvnrepository.com/artifact/org.roaringbitmap/RoaringBitmap api libs.roaringbitmap testImplementation 'org.awaitility:awaitility:4.3.0' diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index b972457ee085a..1c2701e95a72e 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -210,6 +210,8 @@ import org.opensearch.plugins.ClusterPlugin; import org.opensearch.plugins.CryptoKeyProviderPlugin; import org.opensearch.plugins.CryptoPlugin; +import org.opensearch.plugins.DataSourceAwarePlugin; +import org.opensearch.plugins.DataSourcePlugin; import org.opensearch.plugins.DiscoveryPlugin; import org.opensearch.plugins.EnginePlugin; import org.opensearch.plugins.ExtensionAwarePlugin; @@ -285,6 +287,7 @@ import org.opensearch.transport.client.Client; import org.opensearch.transport.client.node.NodeClient; import org.opensearch.usage.UsageService; +import org.opensearch.vectorized.execution.spi.DataSourceCodec; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.wlm.WorkloadGroupService; import org.opensearch.wlm.WorkloadGroupsStateAccessor; @@ -1093,10 +1096,38 @@ protected Node(final Environment initialEnvironment, Collection clas ).stream() ) .collect(Collectors.toList()); - // Add the telemetryAwarePlugin components to the existing pluginComponents collection. pluginComponents.addAll(telemetryAwarePluginComponents); + Map dataSourceCodecMap = new HashMap<>(); + for (DataSourcePlugin dataSourcePlugin : pluginsService.filterPlugins(DataSourcePlugin.class)) { + if (dataSourcePlugin.getDataSourceCodecs().isPresent()) { + dataSourceCodecMap.putAll(dataSourcePlugin.getDataSourceCodecs().get()); + } + } + + Collection dataSourceAwareComponents = pluginsService.filterPlugins(DataSourceAwarePlugin.class) + .stream() + .flatMap( + p -> p.createComponents( + client, + clusterService, + threadPool, + resourceWatcherService, + scriptService, + xContentRegistry, + environment, + nodeEnvironment, + namedWriteableRegistry, + clusterModule.getIndexNameExpressionResolver(), + repositoriesServiceReference::get, + dataSourceCodecMap + ).stream() + ) + .collect(Collectors.toList()); + + // Add all dataSourceAwarePlugin components to the existing pluginComponents + pluginComponents.addAll(dataSourceAwareComponents); List identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class); identityService.initializeIdentityAwarePlugins(identityAwarePlugins); diff --git a/server/src/main/java/org/opensearch/plugins/DataSourceAwarePlugin.java b/server/src/main/java/org/opensearch/plugins/DataSourceAwarePlugin.java new file mode 100644 index 0000000000000..1b2a4d0d05e52 --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/DataSourceAwarePlugin.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.client.Client; +import org.opensearch.vectorized.execution.spi.DataSourceCodec; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.function.Supplier; + +public interface DataSourceAwarePlugin { + void registerDataSources(Map dataSourceCodecs); + + /** + * Make dataSourceCodecs available for the DataSourceAwarePlugin(s) + */ + default Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier, + Map dataSourceCodecs + ) { + return Collections.emptyList(); + } +} diff --git a/server/src/main/java/org/opensearch/plugins/DataSourcePlugin.java b/server/src/main/java/org/opensearch/plugins/DataSourcePlugin.java new file mode 100644 index 0000000000000..3118e3d1e7d90 --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/DataSourcePlugin.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.vectorized.execution.spi.DataSourceCodec; + +import java.util.Map; +import java.util.Optional; + +public interface DataSourcePlugin { + // TODO : move to vectorized exec specific plugin + default Optional> getDataSourceCodecs() { + return Optional.empty(); + } +}