diff --git a/plugins/engine-datafusion/jni/src/lib.rs b/plugins/engine-datafusion/jni/src/lib.rs index 2e4b0450e6bb3..0e7d047d9c721 100644 --- a/plugins/engine-datafusion/jni/src/lib.rs +++ b/plugins/engine-datafusion/jni/src/lib.rs @@ -209,6 +209,8 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute let table_path = shard_view.table_path(); let files_meta = shard_view.files_meta(); + println!("Table path: {}", table_path); + println!("Files: {:?}", files_meta); let list_file_cache = Arc::new(DefaultListFilesCache::default()); list_file_cache.put(table_path.prefix(), files_meta); @@ -252,7 +254,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute // Create a new TableProvider let provider = Arc::new(ListingTable::try_new(config).unwrap()); let shard_id = table_path.prefix().filename().expect("error in fetching Path"); - ctx.register_table("logs", provider) + ctx.register_table("hits", provider) .expect("Failed to attach the Table"); }); diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java index dc7ef2b75ff94..d123f39b5a104 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java @@ -65,7 +65,7 @@ public class DatafusionEngine extends SearchExecEngine formatCatalogSnapshot, DataFusionService dataFusionService) throws IOException { this.dataFormat = dataFormat; - this.datafusionReaderManager = new DatafusionReaderManager("TODO://FigureOutPath", formatCatalogSnapshot); + this.datafusionReaderManager = new DatafusionReaderManager("/Users/abandeji/Public/workplace/OpenSearch/build/testclusters/runTask-0/data/nodes/0/indices/7xU89OS-Tn2_nO7CboVqMg/0/parquet", formatCatalogSnapshot); this.datafusionService = dataFusionService; } diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java index 435d4a7b481b6..2c7baa68018e5 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java @@ -45,14 +45,14 @@ public class DatafusionReader implements Closeable { public DatafusionReader(String directoryPath, Collection files) { this.directoryPath = directoryPath; this.files = files; - String[] fileNames = Objects.isNull(files) ? new String[]{} : files.stream().map(FileMetadata::fileName).toArray(String[]::new); + String[] fileNames = Objects.isNull(files) ? new String[]{"hits_data.parquet"} : files.stream().map(FileMetadata::fileName).toArray(String[]::new); this.cachePtr = DataFusionQueryJNI.createDatafusionReader(directoryPath, fileNames); incRef(); } /** * Gets the cache pointer. - * + * * @return the cache pointer */ public long getCachePtr() { @@ -68,7 +68,7 @@ public void incRef() { /** * Decrements the reference count. - * + * * @throws IOException if an I/O error occurs */ public void decRef() throws IOException { diff --git a/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java index c0ca934323b13..47547007a8e73 100644 --- a/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java +++ b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java @@ -101,7 +101,7 @@ public void testQueryPhaseExecutor() throws IOException { Map finalRes = new HashMap<>(); DatafusionSearcher datafusionSearcher = null; try { - DatafusionEngine engine = new DatafusionEngine(DataFormat.CSV, List.of(new FileMetadata(new TextDF(), "generation-1-optimized.parquet")), service); + DatafusionEngine engine = new DatafusionEngine(DataFormat.CSV, List.of(new FileMetadata(new TextDF(), "hits_data.parquet")), service); datafusionSearcher = engine.acquireSearcher("Search"); diff --git a/server/src/main/java/org/opensearch/index/engine/exec/composite/CompositeIndexingExecutionEngine.java b/server/src/main/java/org/opensearch/index/engine/exec/composite/CompositeIndexingExecutionEngine.java index 3c30e40682c84..5f5efdf01d8eb 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/composite/CompositeIndexingExecutionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/composite/CompositeIndexingExecutionEngine.java @@ -55,7 +55,7 @@ public CompositeIndexingExecutionEngine(PluginsService pluginsService, Any dataf public CompositeIndexingExecutionEngine(PluginsService pluginsService) { try { DataSourcePlugin plugin = pluginsService.filterPlugins(DataSourcePlugin.class).stream() - .findFirst() + .findAny() .orElseThrow(() -> new IllegalArgumentException("dataformat [" + DataFormat.TEXT + "] is not registered.")); delegates.add(plugin.indexingEngine()); } catch (NullPointerException e) { diff --git a/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java b/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java index a6f02ad619cb4..6c2b40567f4d6 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java @@ -60,7 +60,9 @@ public CompositeEngine(MapperService mapperService, PluginsService pluginsServic for(org.opensearch.vectorized.execution.search.DataFormat dataFormat : searchEnginePlugin.getSupportedFormats()) { SearchExecEngine searchExecEngine = searchEnginePlugin.createEngine(dataFormat, catalogSnapshot.getSearchableFiles(dataFormat.toString())); - readEngines.getOrDefault(dataFormat, new ArrayList<>()).add(searchExecEngine); + List> readEngine = readEngines.getOrDefault(dataFormat, new ArrayList<>()); + readEngine.add(searchExecEngine); + readEngines.put(dataFormat, readEngine); // TODO : figure out how to do internal and external refresh listeners // Maybe external refresh should be managed in opensearch core and plugins should always give // internal refresh managers