Merged
2 changes: 1 addition & 1 deletion docs/geneva/udfs/index.mdx
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ Geneva provides three types of user-defined functions for transforming data. Eac
| **Refresh** | Incremental | Incremental | Full |
| **Parallelism** | Fragment-parallel | Fragment-parallel | Partition-parallel |
| **Inherited columns** | N/A — adds to existing rows | Automatic from query | Independent output schema |
| **Registration** | `table.add_columns()` | `db.create_materialized_view(udtf=)` | `db.create_udtf_view()` |
| **Registration** | [`table.add_columns()`](https://lancedb.github.io/geneva/api/table/#geneva.table.Table.add_columns) | [`db.create_scalar_udtf_view()`](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_scalar_udtf_view) | [`db.create_udtf_view()`](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_udtf_view) |
Contributor Author:
The main thing that changed in this PR is that you now use create_scalar_udtf_view() instead of create_materialized_view(udtf=) to make a scalar UDTF.

I couldn't actually pull these out to their own snippets because they're in table cells, but I did add test_geneva_udfs_index just so a test will fail if we change one of these methods' names.

(I also added links just for convenience)


## UDFs (1:1)

188 changes: 55 additions & 133 deletions docs/geneva/udfs/scalar-udtfs.mdx
@@ -5,6 +5,17 @@ description: Use scalar UDTFs for 1:N row expansion — split videos into clips,
icon: diagram-subtask
---

import {
PyScalarUdtfIterator,
PyScalarUdtfList,
PyScalarUdtfBatch,
PyCreateScalarUdtfView,
PyAddColumnsScalarUdtf,
PyIncrementalRefresh,
PyChainingUdtfViews,
PyDocumentChunkingFull,
} from '/snippets/geneva_scalar_udtfs.mdx';

<Badge>Beta — introduced in Geneva 0.11.0</Badge>

Standard UDFs produce exactly **one output value per input row**. Scalar UDTFs enable **1:N row expansion** — each source row can produce multiple output rows. The results are stored as a materialized view with MV-style incremental refresh.
Expand All @@ -19,24 +30,11 @@ Standard UDFs produce exactly **one output value per input row**. Scalar UDTFs e

Use the `@scalar_udtf` decorator on a function that **yields** output rows. Geneva infers the output schema from the return type annotation.

```python
from geneva import scalar_udtf
from typing import Iterator, NamedTuple

class Clip(NamedTuple):
clip_start: float
clip_end: float
clip_bytes: bytes

@scalar_udtf
def extract_clips(video_path: str, duration: float) -> Iterator[Clip]:
"""Yields multiple clips per video."""
clip_length = 10.0
for start in range(0, int(duration), int(clip_length)):
end = min(start + clip_length, duration)
clip_data = extract_video_segment(video_path, start, end)
yield Clip(clip_start=start, clip_end=end, clip_bytes=clip_data)
```
<CodeGroup>
<CodeBlock filename="Python" language="python" icon="python">
{PyScalarUdtfIterator}
Contributor Author:
this code is now in tests/py/test_geneva_scalar_udtfs.py, as is the rest of this file's snippets

</CodeBlock>
</CodeGroup>

Input parameters are bound to source columns **by name** — the parameter `video_path` binds to source column `video_path`, just like standard UDFs.
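Conceptually, by-name binding can be sketched in plain Python with `inspect.signature` (a minimal illustration of the behavior, not Geneva's internals):

```python
import inspect

def bind_by_name(fn, row: dict):
    """Pick arguments for fn out of a source row, matching parameter
    names to column names -- extra columns are simply ignored."""
    params = inspect.signature(fn).parameters
    return fn(**{name: row[name] for name in params})

def extract_clips(video_path: str, duration: float):
    # Toy 1:N function: yields child dicts for one source row
    yield {"clip": f"{video_path}@0", "duration": duration}

row = {"video_path": "/v/a.mp4", "duration": 42.0, "metadata": "unused"}
clips = list(bind_by_name(extract_clips, row))
```

Here `video_path` and `duration` are pulled from the row because the parameter names match; the `metadata` column is never passed.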

@@ -48,61 +46,34 @@ A scalar UDTF can yield **zero rows** for a source row. The source row is still

If you prefer to build the full list in memory rather than yielding, you can return a `list` instead of an `Iterator`:

```python
@scalar_udtf
def extract_clips(video_path: str, duration: float) -> list[Clip]:
clips = []
for start in range(0, int(duration), 10):
end = min(start + 10, duration)
clips.append(Clip(clip_start=start, clip_end=end, clip_bytes=b"..."))
return clips
```
<CodeGroup>
<CodeBlock filename="Python" language="python" icon="python">
{PyScalarUdtfList}
</CodeBlock>
</CodeGroup>

### Batched scalar UDTF

For vectorized processing, use `batch=True`. The function receives Arrow arrays and returns a `RecordBatch` of expanded rows:
For vectorized processing, use `batch=True`. The function receives Arrow arrays and returns a `RecordBatch` of expanded rows. Because the return type `pa.RecordBatch` cannot be inferred, you must supply `output_schema` explicitly:

```python
@scalar_udtf(batch=True)
def extract_clips(batch: pa.RecordBatch) -> pa.RecordBatch:
"""Process rows in batches. Same 1:N semantic per row."""
...
```
<CodeGroup>
<CodeBlock filename="Python" language="python" icon="python">
{PyScalarUdtfBatch}
</CodeBlock>
</CodeGroup>

## Creating a Scalar UDTF View

Scalar UDTFs use the existing `create_materialized_view` API with a `udtf=` parameter:

```python
import geneva

db = geneva.connect("/data/mydb")
videos = db.open_table("videos")
Scalar UDTFs use the `create_scalar_udtf_view` API:

# Create the 1:N materialized view
clips = db.create_materialized_view(
"clips",
query=videos.search(None).select(["video_path", "metadata"]),
udtf=extract_clips,
)

# Populate — runs the UDTF on every source row
clips.refresh()
```
<CodeGroup>
<CodeBlock filename="Python" language="python" icon="python">
{PyCreateScalarUdtfView}
</CodeBlock>
</CodeGroup>

The `query` parameter controls which source columns are inherited. Columns listed in `.select()` are carried into every child row automatically.

### Inheriting source columns

```python
# Only video_path and metadata are inherited into the clips table
clips = db.create_materialized_view(
"clips",
query=videos.search(None).select(["video_path", "metadata"]),
udtf=extract_clips,
)
```

## Inherited Columns

Child rows automatically include the parent's columns — no manual join required. The columns available in the child table are determined by the query's `.select()`:
@@ -131,17 +102,11 @@ The first three rows come from the `/v/a.mp4` source row, the last two from `/v/

Since scalar UDTF views are materialized views, you can add UDF-computed columns to the child table and backfill them:

```python
@udf(data_type=pa.list_(pa.float32(), 512))
def clip_embedding(clip_bytes: bytes) -> list[float]:
return embed_model.encode(clip_bytes)

# Add an embedding column to the clips table
clips.add_columns({"embedding": clip_embedding})

# Backfill computes embeddings for all existing clips
clips.backfill("embedding")
```
<CodeGroup>
<CodeBlock filename="Python" language="python" icon="python">
{PyAddColumnsScalarUdtf}
</CodeBlock>
</CodeGroup>

This is a powerful pattern: expand source rows with a scalar UDTF, then enrich the expanded rows with standard UDFs.
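The expand-then-enrich pattern can be sketched without Geneva at all, over plain dict rows (all names here are illustrative):

```python
def chunk_words(text: str):
    # 1:N expansion: yield one child dict per 3-word chunk
    words = text.split()
    for i in range(0, len(words), 3):
        yield {"chunk_index": i // 3, "chunk_text": " ".join(words[i:i + 3])}

def expand(rows, udtf, inherit):
    # Each child row automatically carries the selected parent columns
    for row in rows:
        parent = {k: row[k] for k in inherit}
        for child in udtf(row["text"]):
            yield {**parent, **child}

def enrich(rows, column, fn):
    # Standard-UDF step: one computed value per existing child row
    for row in rows:
        yield {**row, column: fn(row["chunk_text"])}

docs = [{"doc_id": 1, "text": "a b c d e"}]
chunks = list(
    enrich(expand(docs, chunk_words, ["doc_id"]), "n_words",
           lambda t: len(t.split()))
)
```

The single source document expands into two chunks, each inheriting `doc_id`, and each then gains an enrichment column.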

@@ -153,77 +118,34 @@ Scalar UDTFs support **incremental refresh**, just like standard materialized vi
- **Deleted source rows**: Child rows linked to the deleted parent are cascade-deleted.
- **Updated source rows**: Old children are deleted, UDTF re-runs, new children inserted.
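The three bullet points amount to a diff-and-cascade loop. A minimal sketch over plain dicts (illustrative only, not Geneva's actual refresh machinery):

```python
def refresh(view, source, udtf, key="id"):
    """Sketch of MV-style incremental refresh.
    view maps parent key -> {"parent": row, "children": [child rows]}."""
    current = {row[key]: row for row in source}
    # Deleted parents: cascade-delete their children.
    # Updated parents: drop stale children so the UDTF re-runs below.
    for k in list(view):
        if k not in current or view[k]["parent"] != current[k]:
            del view[k]
    # New (or just-invalidated) parents: run the UDTF, insert fresh children.
    for k, row in current.items():
        if k not in view:
            view[k] = {"parent": row, "children": list(udtf(row))}
    return view

def split_words(row):
    for w in row["text"].split():
        yield {"word": w}

view = {}
refresh(view, [{"id": 1, "text": "a b"}], split_words)   # initial build
refresh(view, [{"id": 1, "text": "a b"},
               {"id": 2, "text": "c"}], split_words)     # only id=2 runs
refresh(view, [{"id": 2, "text": "c d"}], split_words)   # id=1 cascades away; id=2 re-runs
```

After the third call only parent 2 remains, with freshly regenerated children.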

```python
# Add new videos to the source table
videos.add(new_video_data)

# Incremental refresh — only processes the new videos
clips.refresh()
```
<CodeGroup>
<CodeBlock filename="Python" language="python" icon="python">
{PyIncrementalRefresh}
</CodeBlock>
</CodeGroup>

Only the new source rows are processed. Existing clips from previous refreshes are untouched.

## Chaining UDTF Views

Scalar UDTF views are standard materialized views, so they can serve as the source for further views:

```python
# videos → clips (1:N)
clips = db.create_materialized_view(
"clips", query=videos.search(None), udtf=extract_clips
)

# clips → frames (1:N)
frames = db.create_materialized_view(
"frames", query=clips.search(None), udtf=extract_frames
)
```
<CodeGroup>
<CodeBlock filename="Python" language="python" icon="python">
{PyChainingUdtfViews}
</CodeBlock>
</CodeGroup>

## Full Example: Document Chunking

```python
from geneva import connect, scalar_udtf, udf
from typing import Iterator, NamedTuple
import pyarrow as pa

class Chunk(NamedTuple):
chunk_index: int
chunk_text: str

@scalar_udtf
def chunk_document(text: str) -> Iterator[Chunk]:
"""Split a document into overlapping chunks."""
words = text.split()
chunk_size = 500
overlap = 50
for i, start in enumerate(range(0, len(words), chunk_size - overlap)):
chunk_words = words[start:start + chunk_size]
yield Chunk(chunk_index=i, chunk_text=" ".join(chunk_words))

db = connect("/data/mydb")
docs = db.open_table("documents")

# Create chunked view — inherits doc_id, title, etc. from source
chunks = db.create_materialized_view(
"doc_chunks",
query=docs.search(None).select(["doc_id", "title", "text"]),
udtf=chunk_document,
)
chunks.refresh()

# Add embeddings to chunks for semantic search
@udf(data_type=pa.list_(pa.float32(), 1536))
def embed_text(chunk_text: str) -> list[float]:
return embedding_model.encode(chunk_text)

chunks.add_columns({"embedding": embed_text})
chunks.backfill("embedding") # Backfills embeddings on all existing chunks

# Query — parent columns available alongside chunk columns
chunks.search(None).select(["doc_id", "title", "chunk_text", "embedding"]).to_pandas()
```
<CodeGroup>
<CodeBlock filename="Python" language="python" icon="python">
{PyDocumentChunkingFull}
</CodeBlock>
</CodeGroup>

For a comparison of all three function types (UDFs, Scalar UDTFs, Batch UDTFs), see [Understanding Transforms](/geneva/udfs).

Reference:
* [`create_materialized_view` API](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_materialized_view)
* [`scalar_udtf` API](https://lancedb.github.io/geneva/api/udtf/#geneva.scalar_udtf)
* [`create_scalar_udtf_view` API](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_scalar_udtf_view)
20 changes: 20 additions & 0 deletions docs/snippets/geneva_scalar_udtfs.mdx
@@ -0,0 +1,20 @@
{/* Auto-generated by scripts/mdx_snippets_gen.py. Do not edit manually. */}

export const PyAddColumnsScalarUdtf = "@udf(data_type=pa.list_(pa.float32(), 512))\ndef clip_embedding(clip_bytes: bytes) -> list[float]:\n return embed_model.encode(clip_bytes)\n\n# Add an embedding column to the clips table\nclips.add_columns({\"embedding\": clip_embedding})\n\n# Backfill computes embeddings for all existing clips\nclips.backfill(\"embedding\")\n";

export const PyChainingUdtfViews = "# videos → clips (1:N)\nclips = db.create_scalar_udtf_view(\n \"clips\", source=videos.search(None), scalar_udtf=extract_clips\n)\n\n# clips → frames (1:N)\nframes = db.create_scalar_udtf_view(\n \"frames\", source=clips.search(None), scalar_udtf=extract_frames\n)\n";

export const PyCreateScalarUdtfView = "import geneva\n\ndb = geneva.connect(\"/data/mydb\")\nvideos = db.open_table(\"videos\")\n\n# Create the 1:N materialized view\nclips = db.create_scalar_udtf_view(\n \"clips\",\n source=videos.search(None).select([\"video_path\", \"metadata\"]),\n scalar_udtf=extract_clips,\n)\n\n# Populate — runs the UDTF on every source row\nclips.refresh()\n";

export const PyDocumentChunkingFull = "from geneva import connect, scalar_udtf, udf\nfrom typing import Iterator, NamedTuple\nimport pyarrow as pa\n\nclass Chunk(NamedTuple):\n chunk_index: int\n chunk_text: str\n\n@scalar_udtf\ndef chunk_document(text: str) -> Iterator[Chunk]:\n \"\"\"Split a document into overlapping chunks.\"\"\"\n words = text.split()\n chunk_size = 500\n overlap = 50\n for i, start in enumerate(range(0, len(words), chunk_size - overlap)):\n chunk_words = words[start:start + chunk_size]\n yield Chunk(chunk_index=i, chunk_text=\" \".join(chunk_words))\n\ndb = connect(\"/data/mydb\")\ndocs = db.open_table(\"documents\")\n\n# Create chunked view — inherits doc_id, title, etc. from source\nchunks = db.create_scalar_udtf_view(\n \"doc_chunks\",\n source=docs.search(None).select([\"doc_id\", \"title\", \"text\"]),\n scalar_udtf=chunk_document,\n)\nchunks.refresh()\n\n# Add embeddings to chunks for semantic search\n@udf(data_type=pa.list_(pa.float32(), 1536))\ndef embed_text(chunk_text: str) -> list[float]:\n return embedding_model.encode(chunk_text)\n\nchunks.add_columns({\"embedding\": embed_text})\nchunks.backfill(\"embedding\") # Backfills embeddings on all existing chunks\n\n# Query — parent columns available alongside chunk columns\nchunks.search(None).select([\"doc_id\", \"title\", \"chunk_text\", \"embedding\"]).to_pandas()\n";

export const PyDocumentChunkingUdtf = "from geneva import scalar_udtf\nfrom typing import Iterator, NamedTuple\n\nclass Chunk(NamedTuple):\n chunk_index: int\n chunk_text: str\n\n@scalar_udtf\ndef chunk_document(text: str) -> Iterator[Chunk]:\n \"\"\"Split a document into overlapping chunks.\"\"\"\n words = text.split()\n chunk_size = 500\n overlap = 50\n for i, start in enumerate(range(0, len(words), chunk_size - overlap)):\n chunk_words = words[start:start + chunk_size]\n yield Chunk(chunk_index=i, chunk_text=\" \".join(chunk_words))\n";

export const PyIncrementalRefresh = "# Add new videos to the source table\nvideos.add(new_video_data)\n\n# Incremental refresh — only processes the new videos\nclips.refresh()\n";

export const PyScalarUdtfBatch = "@scalar_udtf(batch=True, output_schema=clip_schema)\ndef extract_clips(batch: pa.RecordBatch) -> pa.RecordBatch:\n \"\"\"Process rows in batches. Same 1:N semantic per row.\"\"\"\n ...\n";

export const PyScalarUdtfIterator = "from geneva import scalar_udtf\nfrom typing import Iterator, NamedTuple\n\nclass Clip(NamedTuple):\n clip_start: float\n clip_end: float\n clip_bytes: bytes\n\n@scalar_udtf\ndef extract_clips(video_path: str, duration: float) -> Iterator[Clip]:\n \"\"\"Yields multiple clips per video.\"\"\"\n clip_length = 10.0\n for start in range(0, int(duration), int(clip_length)):\n end = min(start + clip_length, duration)\n clip_data = extract_video_segment(video_path, start, end)\n yield Clip(clip_start=start, clip_end=end, clip_bytes=clip_data)\n";

export const PyScalarUdtfList = "@scalar_udtf\ndef extract_clips(video_path: str, duration: float) -> list[Clip]:\n clips = []\n for start in range(0, int(duration), 10):\n end = min(start + 10, duration)\n clips.append(Clip(clip_start=start, clip_end=end, clip_bytes=b\"...\"))\n return clips\n";

8 changes: 8 additions & 0 deletions docs/snippets/geneva_udfs_index.mdx
@@ -0,0 +1,8 @@
{/* Auto-generated by scripts/mdx_snippets_gen.py. Do not edit manually. */}

export const PyRegistrationScalarUdtf = "db = geneva.connect(\"/data/mydb\")\ndb.create_scalar_udtf_view(\"my_view\", source=my_source, scalar_udtf=my_scalar_udtf)\n";

export const PyRegistrationUdf = "mock_table.add_columns({\"col\": my_udf})\n";

export const PyRegistrationUdtf = "db = geneva.connect(\"/data/mydb\")\ndb.create_udtf_view(\"my_view\", source=my_source, udtf=my_udtf)\n";

4 changes: 2 additions & 2 deletions docs/snippets/tables.mdx
@@ -1,5 +1,7 @@
{/* Auto-generated by scripts/mdx_snippets_gen.py. Do not edit manually. */}

export const PyWriteWithConcurrency = "import pyarrow as pa\nfrom lancedb.scannable import Scannable\nfrom random import random\n\nVECTOR_DIM = 4\n\nschema = pa.schema(\n [\n pa.field(\"id\", pa.int64()),\n pa.field(\"vector\", pa.list_(pa.float32(), VECTOR_DIM)),\n ]\n)\n\ndef make_batch(batch_idx: int, rows_per_batch: int) -> pa.RecordBatch:\n start = batch_idx * rows_per_batch\n stop = start + rows_per_batch\n row_ids = list(range(start, stop))\n vectors = pa.array(\n [[random() for _ in range(VECTOR_DIM)] for _ in row_ids],\n type=pa.list_(pa.float32(), VECTOR_DIM),\n )\n return pa.RecordBatch.from_arrays(\n [\n pa.array(row_ids, type=pa.int64()),\n vectors,\n ],\n schema=schema,\n )\n\ndef make_batch_reader(\n num_batches: int, rows_per_batch: int\n) -> pa.RecordBatchReader:\n return pa.RecordBatchReader.from_batches(\n schema,\n (make_batch(batch_idx, rows_per_batch) for batch_idx in range(num_batches)),\n )\n\ndef make_large_scannable(num_batches: int, rows_per_batch: int) -> Scannable:\n total_rows = num_batches * rows_per_batch\n return Scannable(\n schema=schema,\n num_rows=total_rows,\n reader=lambda: make_batch_reader(num_batches, rows_per_batch),\n )\n\ndb = tmp_db\ntable = db.create_table(\n \"bulk_ingest_concurrent\",\n make_large_scannable(num_batches=1000, rows_per_batch=10000),\n mode=\"overwrite\",\n)\n";

export const PyAddColumnsCalculated = "# Add a discounted price column (10% discount)\ntable.add_columns({\"discounted_price\": \"cast((price * 0.9) as float)\"})\n";

export const PyAddColumnsDefaultValues = "# Add a stock status column with default value\ntable.add_columns({\"in_stock\": \"cast(true as boolean)\"})\n";
@@ -116,8 +118,6 @@ export const PyVersioningRollback = "table.restore(version_after_mod)\nversions

export const PyVersioningUpdateData = "table.update(where=\"author='Richard'\", values={\"author\": \"Richard Daniel Sanchez\"})\nrows_after_update = table.count_rows(\"author = 'Richard Daniel Sanchez'\")\nprint(f\"Rows updated to Richard Daniel Sanchez: {rows_after_update}\")\n";

export const PyWriteWithConcurrency = "import pyarrow as pa\nfrom lancedb.scannable import Scannable\nfrom random import random\n\nVECTOR_DIM = 4\n\nschema = pa.schema(\n [\n pa.field(\"id\", pa.int64()),\n pa.field(\"vector\", pa.list_(pa.float32(), VECTOR_DIM)),\n ]\n)\n\ndef make_batch(batch_idx: int, rows_per_batch: int) -> pa.RecordBatch:\n start = batch_idx * rows_per_batch\n stop = start + rows_per_batch\n row_ids = list(range(start, stop))\n vectors = pa.array(\n [[random() for _ in range(VECTOR_DIM)] for _ in row_ids],\n type=pa.list_(pa.float32(), VECTOR_DIM),\n )\n return pa.RecordBatch.from_arrays(\n [\n pa.array(row_ids, type=pa.int64()),\n vectors,\n ],\n schema=schema,\n )\n\ndef make_batch_reader(\n num_batches: int, rows_per_batch: int\n) -> pa.RecordBatchReader:\n return pa.RecordBatchReader.from_batches(\n schema,\n (make_batch(batch_idx, rows_per_batch) for batch_idx in range(num_batches)),\n )\n\ndef make_large_scannable(num_batches: int, rows_per_batch: int) -> Scannable:\n total_rows = num_batches * rows_per_batch\n return Scannable(\n schema=schema,\n num_rows=total_rows,\n reader=lambda: make_batch_reader(num_batches, rows_per_batch),\n )\n\ndb = tmp_db\ntable = db.create_table(\n \"bulk_ingest_concurrent\",\n make_large_scannable(num_batches=1000, rows_per_batch=10000),\n mode=\"overwrite\",\n)\n";

export const TsAddColumnsCalculated = "// Add a discounted price column (10% discount)\nawait schemaAddTable.addColumns([\n {\n name: \"discounted_price\",\n valueSql: \"cast((price * 0.9) as float)\",\n },\n]);\n";

export const TsAddColumnsDefaultValues = "// Add a stock status column with default value\nawait schemaAddTable.addColumns([\n {\n name: \"in_stock\",\n valueSql: \"cast(true as boolean)\",\n },\n]);\n";
6 changes: 1 addition & 5 deletions pyproject.toml
@@ -12,10 +12,6 @@ dependencies = [
"polars>=1.39.2",
"pytest>=9.0.1",
"pytest-asyncio>=1.3.0",
<<<<<<< Updated upstream
"Pillow>=11.0.0",
"geneva>=0.11.0",
=======
"Pillow>=12.1.1",
>>>>>>> Stashed changes
"geneva>=0.12.0",
Contributor Author:
updated this and the uv.lock because they had previously gotten committed mid-rebase

]
2 changes: 0 additions & 2 deletions tests/py/test_geneva_defaults.py
@@ -8,8 +8,6 @@
import pytest


# TODO: remove skip once geneva 0.12.0 is on PyPI (head node defaults changed to 4 CPU / 8Gi)
@pytest.mark.skip(reason="requires geneva>=0.12.0")
Contributor Author:
yay, while I was here I remembered to do this

def test_head_node_defaults():
# If these change, update the Head Node table in performance.mdx
from geneva.cluster.builder import KubeRayClusterBuilder
5 changes: 3 additions & 2 deletions tests/py/test_geneva_dependency_verification.py
@@ -40,8 +40,9 @@ def test_env_vars_via_cluster(monkeypatch):


def test_pip_manifest(monkeypatch):
from unittest.mock import MagicMock
mock_conn = MagicMock()
import geneva
from unittest.mock import MagicMock, create_autospec
mock_conn = create_autospec(geneva.db.Connection, instance=True)
Contributor Author:
OK, here's the important thing: a MagicMock lets you call anything with any params, defeating the point of the test! But create_autospec will fail if the methods or params don't exist. So, updated this here.
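The difference is easy to demonstrate with the standard library (`Connection` here is a stand-in class, not Geneva's actual API):

```python
from unittest.mock import MagicMock, create_autospec

class Connection:
    # Stand-in for the class being mocked
    def open_table(self, name: str): ...

loose = MagicMock()
loose.no_such_method(1, 2, 3)  # a MagicMock accepts any call -- nothing is verified

strict = create_autospec(Connection, instance=True)
strict.open_table("videos")    # matches the real signature, so this passes

def fails(call):
    try:
        call()
    except (AttributeError, TypeError):
        return True
    return False

# autospec rejects methods that don't exist and calls with bad signatures
assert fails(lambda: strict.no_such_method())
assert fails(lambda: strict.open_table())
```

If `Connection.open_table` were later renamed, the autospecced test would fail instead of silently passing.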

monkeypatch.setattr("geneva.connect", MagicMock(return_value=mock_conn))

# --8<-- [start:pip_manifest]