Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions meta/src/meta/grammar.y
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
%nonterm csv_data logic.CSVData
%nonterm csv_locator_inline_data String
%nonterm csv_locator_paths Sequence[String]
%nonterm csv_table logic.CSVTarget
%nonterm csvlocator logic.CSVLocator
%nonterm data logic.Data
%nonterm date logic.DateValue
Expand Down Expand Up @@ -1104,14 +1105,23 @@ gnf_columns
csv_asof
: "(" "asof" STRING ")"

csv_table
: "(" "table" relation_id "[" STRING* "]" "[" type* "]" ")"
construct: $$ = logic.CSVTarget(target_id=$3, column_names=$5, types=$8)
deconstruct:
$3: logic.RelationId = $$.target_id
$5: Sequence[String] = $$.column_names
$8: Sequence[logic.Type] = $$.types

csv_data
: "(" "csv_data" csvlocator csv_config gnf_columns csv_asof ")"
construct: $$ = logic.CSVData(locator=$3, config=$4, columns=$5, asof=$6)
: "(" "csv_data" csvlocator csv_config gnf_columns? csv_table? csv_asof ")"
construct: $$ = construct_csv_data($3, $4, $5, $6, $7)
deconstruct:
$3: logic.CSVLocator = $$.locator
$4: logic.CSVConfig = $$.config
$5: Sequence[logic.GNFColumn] = $$.columns
$6: String = $$.asof
$5: Optional[Sequence[logic.GNFColumn]] = deconstruct_csv_data_columns_optional($$)
$6: Optional[logic.CSVTarget] = deconstruct_csv_data_target_optional($$)
$7: String = $$.asof

csv_locator_paths
: "(" "paths" STRING* ")"
Expand Down Expand Up @@ -1751,6 +1761,34 @@ def deconstruct_iceberg_data_to_snapshot_optional(msg: logic.IcebergData) -> Opt
return builtin.none()


def construct_csv_data(
locator: logic.CSVLocator,
config: logic.CSVConfig,
columns_opt: Optional[Sequence[logic.GNFColumn]],
target_opt: Optional[logic.CSVTarget],
asof: String,
) -> logic.CSVData:
return logic.CSVData(
locator=locator,
config=config,
columns=builtin.unwrap_option_or(columns_opt, list[logic.GNFColumn]()),
asof=asof,
target=target_opt,
)


def deconstruct_csv_data_columns_optional(msg: logic.CSVData) -> Optional[Sequence[logic.GNFColumn]]:
if not builtin.has_proto_field(msg, "target"):
return builtin.some(msg.columns)
return builtin.none()


def deconstruct_csv_data_target_optional(msg: logic.CSVData) -> Optional[logic.CSVTarget]:
if builtin.has_proto_field(msg, "target"):
return builtin.some(builtin.unwrap_option(msg.target))
return builtin.none()


def construct_export_iceberg_config_full(
locator: logic.IcebergLocator,
config: logic.IcebergCatalogConfig,
Expand Down
7 changes: 7 additions & 0 deletions proto/relationalai/lqp/v1/logic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,18 @@ message BeTreeLocator {
int64 tree_height = 3;
}

message CSVTarget {
RelationId target_id = 1; // Output relation path
repeated string column_names = 2; // CSV column names in order (no METADATA$ columns)
repeated Type types = 3; // Column types in order (same length as column_names)
}

message CSVData {
CSVLocator locator = 1;
CSVConfig config = 2;
repeated GNFColumn columns = 3;
string asof = 4; // Blob storage timestamp for freshness requirements
optional CSVTarget target = 5; // If present, single-relation (table) mode; mutually exclusive with columns
}

message CSVLocator {
Expand Down
Loading
Loading