Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 84 additions & 35 deletions rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.openrewrite.internal.RecipeIntrospectionUtils.dataTableDescriptorFromDataTable;

Expand Down Expand Up @@ -223,7 +224,6 @@ private <T> Stream<T> readRows(String dataTableName, @Nullable String group, @Nu
}
}

List<Object> allRows = new ArrayList<>();
//noinspection DataFlowIssue
File[] files = outputDir.toFile().listFiles((dir, name) -> name.endsWith(fileExtension));
if (files == null) {
Expand All @@ -237,54 +237,103 @@ private <T> Stream<T> readRows(String dataTableName, @Nullable String group, @Nu
activeWriterPaths.add(outputDir.resolve(entry.getKey() + fileExtension));
}

int prefixCount = prefixColumns.size();
int suffixCount = suffixColumns.size();

// Eagerly select the files belonging to this table by reading only their small
// comment header, then parse their rows lazily so a whole table's rows are never
// materialized in memory at once.
List<Path> matchingFiles = new ArrayList<>();
for (File file : files) {
if (activeWriterPaths.contains(file.toPath())) {
continue;
}
try (InputStream is = inputStreamFactory.apply(file.toPath())) {
DataTableDescriptor descriptor = readDescriptor(is);
if (descriptor == null ||
!descriptor.getName().equals(dataTableName) ||
!Objects.equals(descriptor.getGroup(), group)) {
continue;
if (descriptor != null &&
descriptor.getName().equals(dataTableName) &&
Objects.equals(descriptor.getGroup(), group)) {
matchingFiles.add(file.toPath());
}
// readDescriptor consumed comment lines; now parse the remaining CSV
// (header + data rows). Re-read the full file with CsvParser.
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

try (InputStream is = inputStreamFactory.apply(file.toPath())) {
CsvParserSettings settings = new CsvParserSettings();
settings.setMaxCharsPerColumn(-1);
settings.setHeaderExtractionEnabled(true);
settings.getFormat().setComment('#');
CsvParser parser = new CsvParser(settings);
parser.beginParsing(new InputStreamReader(is, StandardCharsets.UTF_8));

String[] row;
while ((row = parser.parseNext()) != null) {
// Strip prefix and suffix columns, returning only data table columns
int dataCount = row.length - prefixCount - suffixCount;
String[] dataRow;
if (dataCount <= 0) {
dataRow = row;
} else {
dataRow = new String[dataCount];
System.arraycopy(row, prefixCount, dataRow, 0, dataCount);
}
allRows.add(meta != null ? meta.toRow(dataRow) : dataRow);
}
parser.stopParsing();
} catch (IOException e) {
// Skip unreadable files
int prefixCount = prefixColumns.size();
int suffixCount = suffixColumns.size();

return matchingFiles.stream()
.flatMap(path -> (Stream<T>) streamRows(path, meta, prefixCount, suffixCount));
}

/**
* Lazily parse a single CSV file row-by-row. The returned stream owns its input
* stream and parser; fully consuming the stream (or closing it) releases them.
* Rows are produced one at a time, so a whole table's rows are never held in
* memory at once.
*/
private Stream<Object> streamRows(Path path, @Nullable RowMetadata meta, int prefixCount, int suffixCount) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since CsvDataTableStore is used directly in the CLI you'll have to do a release which includes this before this will work at runtime. Recipe modules built with this running on un-updated CLI may encounter NoSuchMethodException here... unless you've verified that isn't true, you might want to guard invocation of this method with reflection until CLI with this method have been released for a little while

// Open and set up parsing eagerly so that open/parse failures surface to the
// caller exactly as the previous eager implementation did -- it caught only
// java.io.IOException, while the stream factories and the Univocity parser throw
// unchecked exceptions, so those always propagated. Only a stream-*close*
// IOException was swallowed there, which the onClose handler below preserves.
InputStream is = inputStreamFactory.apply(path);
CsvParser parser;
try {
CsvParserSettings settings = new CsvParserSettings();
settings.setMaxCharsPerColumn(-1);
settings.setHeaderExtractionEnabled(true);
settings.getFormat().setComment('#');
parser = new CsvParser(settings);
parser.beginParsing(new InputStreamReader(is, StandardCharsets.UTF_8));
} catch (RuntimeException e) {
// Avoid leaking the stream if parser setup fails before the lazy stream
// (and its onClose) is returned.
try {
is.close();
} catch (IOException ignored) {
// best effort
}
throw e;
}

return (Stream<T>) allRows.stream();
Iterator<Object> rows = new Iterator<Object>() {
private String @Nullable [] next = parser.parseNext();

@Override
public boolean hasNext() {
return next != null;
}

@Override
public Object next() {
String[] row = next;
if (row == null) {
throw new NoSuchElementException();
}
next = parser.parseNext();
// Strip prefix and suffix columns, returning only data table columns
int dataCount = row.length - prefixCount - suffixCount;
String[] dataRow;
if (dataCount <= 0) {
dataRow = row;
} else {
dataRow = new String[dataCount];
System.arraycopy(row, prefixCount, dataRow, 0, dataCount);
}
return meta != null ? meta.toRow(dataRow) : dataRow;
}
};

return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(rows, Spliterator.ORDERED | Spliterator.NONNULL), false)
.onClose(() -> {
parser.stopParsing();
try {
is.close();
} catch (IOException ignored) {
// already closed by the parser at end-of-input
}
});
}

@Override
Expand Down