diff --git a/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java b/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java index e641eece7a..d2686484c7 100644 --- a/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java +++ b/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java @@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static org.openrewrite.internal.RecipeIntrospectionUtils.dataTableDescriptorFromDataTable; @@ -223,7 +224,6 @@ private Stream readRows(String dataTableName, @Nullable String group, @Nu } } - List allRows = new ArrayList<>(); //noinspection DataFlowIssue File[] files = outputDir.toFile().listFiles((dir, name) -> name.endsWith(fileExtension)); if (files == null) { @@ -237,54 +237,103 @@ private Stream readRows(String dataTableName, @Nullable String group, @Nu activeWriterPaths.add(outputDir.resolve(entry.getKey() + fileExtension)); } - int prefixCount = prefixColumns.size(); - int suffixCount = suffixColumns.size(); - + // Eagerly select the files belonging to this table by reading only their small + // comment header, then parse their rows lazily so a whole table's rows are never + // materialized in memory at once. + List matchingFiles = new ArrayList<>(); for (File file : files) { if (activeWriterPaths.contains(file.toPath())) { continue; } try (InputStream is = inputStreamFactory.apply(file.toPath())) { DataTableDescriptor descriptor = readDescriptor(is); - if (descriptor == null || - !descriptor.getName().equals(dataTableName) || - !Objects.equals(descriptor.getGroup(), group)) { - continue; + if (descriptor != null && + descriptor.getName().equals(dataTableName) && + Objects.equals(descriptor.getGroup(), group)) { + matchingFiles.add(file.toPath()); } - // readDescriptor consumed comment lines; now parse the remaining CSV - // (header + data rows). Re-read the full file with CsvParser. } catch (IOException e) { throw new UncheckedIOException(e); } + } - try (InputStream is = inputStreamFactory.apply(file.toPath())) { - CsvParserSettings settings = new CsvParserSettings(); - settings.setMaxCharsPerColumn(-1); - settings.setHeaderExtractionEnabled(true); - settings.getFormat().setComment('#'); - CsvParser parser = new CsvParser(settings); - parser.beginParsing(new InputStreamReader(is, StandardCharsets.UTF_8)); - - String[] row; - while ((row = parser.parseNext()) != null) { - // Strip prefix and suffix columns, returning only data table columns - int dataCount = row.length - prefixCount - suffixCount; - String[] dataRow; - if (dataCount <= 0) { - dataRow = row; - } else { - dataRow = new String[dataCount]; - System.arraycopy(row, prefixCount, dataRow, 0, dataCount); - } - allRows.add(meta != null ? meta.toRow(dataRow) : dataRow); - } - parser.stopParsing(); - } catch (IOException e) { - // Skip unreadable files + int prefixCount = prefixColumns.size(); + int suffixCount = suffixColumns.size(); + + return matchingFiles.stream() + .flatMap(path -> (Stream) streamRows(path, meta, prefixCount, suffixCount)); + } + + /** + * Lazily parse a single CSV file row-by-row. The returned stream owns its input + * stream and parser; fully consuming the stream (or closing it) releases them. + * Rows are produced one at a time, so a whole table's rows are never held in + * memory at once. + */ + private Stream streamRows(Path path, @Nullable RowMetadata meta, int prefixCount, int suffixCount) { + // Open and set up parsing eagerly so that open/parse failures surface to the + // caller exactly as the previous eager implementation did -- it caught only + // java.io.IOException, while the stream factories and the Univocity parser throw + // unchecked exceptions, so those always propagated. Only a stream-*close* + // IOException was swallowed there, which the onClose handler below preserves. + InputStream is = inputStreamFactory.apply(path); + CsvParser parser; + try { + CsvParserSettings settings = new CsvParserSettings(); + settings.setMaxCharsPerColumn(-1); + settings.setHeaderExtractionEnabled(true); + settings.getFormat().setComment('#'); + parser = new CsvParser(settings); + parser.beginParsing(new InputStreamReader(is, StandardCharsets.UTF_8)); + } catch (RuntimeException e) { + // Avoid leaking the stream if parser setup fails before the lazy stream + // (and its onClose) is returned. + try { + is.close(); + } catch (IOException ignored) { + // best effort } + throw e; } - return (Stream) allRows.stream(); + Iterator rows = new Iterator() { + private String @Nullable [] next = parser.parseNext(); + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public Object next() { + String[] row = next; + if (row == null) { + throw new NoSuchElementException(); + } + next = parser.parseNext(); + // Strip prefix and suffix columns, returning only data table columns + int dataCount = row.length - prefixCount - suffixCount; + String[] dataRow; + if (dataCount <= 0) { + dataRow = row; + } else { + dataRow = new String[dataCount]; + System.arraycopy(row, prefixCount, dataRow, 0, dataCount); + } + return meta != null ? meta.toRow(dataRow) : dataRow; + } + }; + + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(rows, Spliterator.ORDERED | Spliterator.NONNULL), false) + .onClose(() -> { + parser.stopParsing(); + try { + is.close(); + } catch (IOException ignored) { + // already closed by the parser at end-of-input + } + }); } @Override