Pull out scalar UDTF code into snippets #202
@@ -5,6 +5,17 @@ description: Use scalar UDTFs for 1:N row expansion — split videos into clips,
icon: diagram-subtask
---

+ import {
+   PyScalarUdtfIterator,
+   PyScalarUdtfList,
+   PyScalarUdtfBatch,
+   PyCreateScalarUdtfView,
+   PyAddColumnsScalarUdtf,
+   PyIncrementalRefresh,
+   PyChainingUdtfViews,
+   PyDocumentChunkingFull,
+ } from '/snippets/geneva_scalar_udtfs.mdx';
+
<Badge>Beta — introduced in Geneva 0.11.0</Badge>

Standard UDFs produce exactly **one output value per input row**. Scalar UDTFs enable **1:N row expansion** — each source row can produce multiple output rows. The results are stored as a materialized view with MV-style incremental refresh.
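To make the 1:N contract concrete, here is a plain-Python toy model of row expansion (no Geneva involved; `expand` and `clip_bounds` are invented for illustration): each source row runs through a generator, and a row may contribute any number of output rows, including zero.

```python
def expand(rows, udtf):
    """1:N expansion: each source row may yield any number of output rows."""
    return [child for row in rows for child in udtf(**row)]

def clip_bounds(duration: float):
    """Yield (start, end) pairs in 10-second steps; a zero-length video yields no rows."""
    start = 0.0
    while start < duration:
        yield (start, min(start + 10.0, duration))
        start += 10.0

rows = [{"duration": 25.0}, {"duration": 5.0}, {"duration": 0.0}]
print(expand(rows, clip_bounds))
# [(0.0, 10.0), (10.0, 20.0), (20.0, 25.0), (0.0, 5.0)]
```

The third source row contributes nothing to the output, which is the "zero rows" case the doc describes below.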
@@ -19,24 +30,11 @@ Standard UDFs produce exactly **one output value per input row**. Scalar UDTFs e

Use the `@scalar_udtf` decorator on a function that **yields** output rows. Geneva infers the output schema from the return type annotation.

- ```python
- from geneva import scalar_udtf
- from typing import Iterator, NamedTuple
-
- class Clip(NamedTuple):
-     clip_start: float
-     clip_end: float
-     clip_bytes: bytes
-
- @scalar_udtf
- def extract_clips(video_path: str, duration: float) -> Iterator[Clip]:
-     """Yields multiple clips per video."""
-     clip_length = 10.0
-     for start in range(0, int(duration), int(clip_length)):
-         end = min(start + clip_length, duration)
-         clip_data = extract_video_segment(video_path, start, end)
-         yield Clip(clip_start=start, clip_end=end, clip_bytes=clip_data)
- ```
+ <CodeGroup>
+ <CodeBlock filename="Python" language="python" icon="python">
+ {PyScalarUdtfIterator}
+ </CodeBlock>
+ </CodeGroup>

> **Contributor (author):** this code is now in tests/py/test_geneva_scalar_udtfs.py; similarly to all the rest of this file

Input parameters are bound to source columns **by name** — the parameter `video_path` binds to source column `video_path`, just like standard UDFs.
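The doc says Geneva infers the output schema from the return type annotation. A sketch of how such inference can work using the standard `typing` introspection helpers (a guess at the mechanism, not Geneva's actual implementation; `infer_output_fields` is hypothetical):

```python
import collections.abc
from typing import Iterator, NamedTuple, get_args, get_origin, get_type_hints

class Clip(NamedTuple):
    clip_start: float
    clip_end: float

def extract_clips(video_path: str, duration: float) -> Iterator[Clip]:
    yield Clip(0.0, 10.0)

def infer_output_fields(fn):
    """Read the row type out of an Iterator[...] or list[...] return annotation."""
    ret = get_type_hints(fn)["return"]
    if get_origin(ret) not in (collections.abc.Iterator, list):
        raise TypeError(f"cannot infer schema from {ret!r}")
    (row_type,) = get_args(ret)
    # NamedTuple classes carry field-name -> type annotations
    return get_type_hints(row_type)

print(infer_output_fields(extract_clips))
# {'clip_start': <class 'float'>, 'clip_end': <class 'float'>}
```

This also shows why the batched form (below) needs an explicit `output_schema`: a bare `pa.RecordBatch` annotation carries no field information.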
@@ -48,61 +46,34 @@ A scalar UDTF can yield **zero rows** for a source row. The source row is still

If you prefer to build the full list in memory rather than yielding, you can return a `list` instead of an `Iterator`:

- ```python
- @scalar_udtf
- def extract_clips(video_path: str, duration: float) -> list[Clip]:
-     clips = []
-     for start in range(0, int(duration), 10):
-         end = min(start + 10, duration)
-         clips.append(Clip(clip_start=start, clip_end=end, clip_bytes=b"..."))
-     return clips
- ```
+ <CodeGroup>
+ <CodeBlock filename="Python" language="python" icon="python">
+ {PyScalarUdtfList}
+ </CodeBlock>
+ </CodeGroup>

### Batched scalar UDTF

- For vectorized processing, use `batch=True`. The function receives Arrow arrays and returns a `RecordBatch` of expanded rows:
+ For vectorized processing, use `batch=True`. The function receives Arrow arrays and returns a `RecordBatch` of expanded rows. Because the return type `pa.RecordBatch` cannot be inferred, you must supply `output_schema` explicitly:

- ```python
- @scalar_udtf(batch=True)
- def extract_clips(batch: pa.RecordBatch) -> pa.RecordBatch:
-     """Process rows in batches. Same 1:N semantic per row."""
-     ...
- ```
+ <CodeGroup>
+ <CodeBlock filename="Python" language="python" icon="python">
+ {PyScalarUdtfBatch}
+ </CodeBlock>
+ </CodeGroup>

## Creating a Scalar UDTF View

- Scalar UDTFs use the existing `create_materialized_view` API with a `udtf=` parameter:
+ Scalar UDTFs use the `create_scalar_udtf_view` API:

- ```python
- import geneva
-
- db = geneva.connect("/data/mydb")
- videos = db.open_table("videos")
-
- # Create the 1:N materialized view
- clips = db.create_materialized_view(
-     "clips",
-     query=videos.search(None).select(["video_path", "metadata"]),
-     udtf=extract_clips,
- )
-
- # Populate — runs the UDTF on every source row
- clips.refresh()
- ```
+ <CodeGroup>
+ <CodeBlock filename="Python" language="python" icon="python">
+ {PyCreateScalarUdtfView}
+ </CodeBlock>
+ </CodeGroup>

The `query` parameter controls which source columns are inherited. Columns listed in `.select()` are carried into every child row automatically.

### Inheriting source columns

```python
# Only video_path and metadata are inherited into the clips table
clips = db.create_materialized_view(
    "clips",
    query=videos.search(None).select(["video_path", "metadata"]),
    udtf=extract_clips,
)
```

## Inherited Columns

Child rows automatically include the parent's columns — no manual join required. The columns available in the child table are determined by the query's `.select()`:
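A minimal model of column inheritance (hypothetical helper, not the Geneva API): the parent columns named in the select list are copied into every child row, which is why no join is needed afterwards.

```python
def expand_with_parents(parents, udtf, select):
    """Copy the selected parent columns into every child row (toy model)."""
    out = []
    for parent in parents:
        inherited = {col: parent[col] for col in select}
        for child in udtf(parent):
            out.append({**inherited, **child})
    return out

videos = [{"video_path": "/v/a.mp4", "metadata": "m1", "codec": "h264"}]

def two_clips(row):
    yield {"clip_start": 0.0}
    yield {"clip_start": 10.0}

clips = expand_with_parents(videos, two_clips, select=["video_path", "metadata"])
print(clips[0])
# {'video_path': '/v/a.mp4', 'metadata': 'm1', 'clip_start': 0.0}; 'codec' was not selected
```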
@@ -131,17 +102,11 @@ The first three rows come from the `/v/a.mp4` source row, the last two from `/v/

Since scalar UDTF views are materialized views, you can add UDF-computed columns to the child table and backfill them:

- ```python
- @udf(data_type=pa.list_(pa.float32(), 512))
- def clip_embedding(clip_bytes: bytes) -> list[float]:
-     return embed_model.encode(clip_bytes)
-
- # Add an embedding column to the clips table
- clips.add_columns({"embedding": clip_embedding})
-
- # Backfill computes embeddings for all existing clips
- clips.backfill("embedding")
- ```
+ <CodeGroup>
+ <CodeBlock filename="Python" language="python" icon="python">
+ {PyAddColumnsScalarUdtf}
+ </CodeBlock>
+ </CodeGroup>

This is a powerful pattern: expand source rows with a scalar UDTF, then enrich the expanded rows with standard UDFs.
@@ -153,77 +118,34 @@ Scalar UDTFs support **incremental refresh**, just like standard materialized vi
- **Deleted source rows**: Child rows linked to the deleted parent are cascade-deleted.
- **Updated source rows**: Old children are deleted, the UDTF re-runs, new children are inserted.

- ```python
- # Add new videos to the source table
- videos.add(new_video_data)
-
- # Incremental refresh — only processes the new videos
- clips.refresh()
- ```
+ <CodeGroup>
+ <CodeBlock filename="Python" language="python" icon="python">
+ {PyIncrementalRefresh}
+ </CodeBlock>
+ </CodeGroup>

Only the new source rows are processed. Existing clips from previous refreshes are untouched.
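The refresh semantics can be sketched as a toy in-memory refresher (hypothetical code; it models only the new-row and deleted-row cases, keyed by a parent id, and omits updated-row handling):

```python
def refresh(source, view, udtf, key):
    """Toy incremental refresh: cascade-delete orphans, run the UDTF only on new parents."""
    src_keys = {row[key] for row in source}
    seen = {child["_parent"] for child in view}
    # deleted source rows: drop their children
    view[:] = [child for child in view if child["_parent"] in src_keys]
    # new source rows: expand them; already-seen parents are skipped
    for row in source:
        if row[key] not in seen:
            for child in udtf(row):
                view.append({"_parent": row[key], **child})

def make_clips(row):
    return ({"i": i} for i in range(row["n"]))

videos = [{"path": "a", "n": 2}]
clips = []
refresh(videos, clips, make_clips, key="path")  # initial populate: 2 children of "a"
videos.append({"path": "b", "n": 1})
refresh(videos, clips, make_clips, key="path")  # incremental: only "b" is processed
videos.pop(0)                                   # delete "a" from the source
refresh(videos, clips, make_clips, key="path")  # cascade-deletes a's children
print([c["_parent"] for c in clips])  # ['b']
```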
## Chaining UDTF Views

Scalar UDTF views are standard materialized views, so they can serve as the source for further views:

- ```python
- # videos → clips (1:N)
- clips = db.create_materialized_view(
-     "clips", query=videos.search(None), udtf=extract_clips
- )
-
- # clips → frames (1:N)
- frames = db.create_materialized_view(
-     "frames", query=clips.search(None), udtf=extract_frames
- )
- ```
+ <CodeGroup>
+ <CodeBlock filename="Python" language="python" icon="python">
+ {PyChainingUdtfViews}
+ </CodeBlock>
+ </CodeGroup>
## Full Example: Document Chunking

- ```python
- from geneva import connect, scalar_udtf, udf
- from typing import Iterator, NamedTuple
- import pyarrow as pa
-
- class Chunk(NamedTuple):
-     chunk_index: int
-     chunk_text: str
-
- @scalar_udtf
- def chunk_document(text: str) -> Iterator[Chunk]:
-     """Split a document into overlapping chunks."""
-     words = text.split()
-     chunk_size = 500
-     overlap = 50
-     for i, start in enumerate(range(0, len(words), chunk_size - overlap)):
-         chunk_words = words[start:start + chunk_size]
-         yield Chunk(chunk_index=i, chunk_text=" ".join(chunk_words))
-
- db = connect("/data/mydb")
- docs = db.open_table("documents")
-
- # Create chunked view — inherits doc_id, title, etc. from source
- chunks = db.create_materialized_view(
-     "doc_chunks",
-     query=docs.search(None).select(["doc_id", "title", "text"]),
-     udtf=chunk_document,
- )
- chunks.refresh()
-
- # Add embeddings to chunks for semantic search
- @udf(data_type=pa.list_(pa.float32(), 1536))
- def embed_text(chunk_text: str) -> list[float]:
-     return embedding_model.encode(chunk_text)
-
- chunks.add_columns({"embedding": embed_text})
- chunks.backfill("embedding")  # Backfills embeddings on all existing chunks
-
- # Query — parent columns available alongside chunk columns
- chunks.search(None).select(["doc_id", "title", "chunk_text", "embedding"]).to_pandas()
- ```
+ <CodeGroup>
+ <CodeBlock filename="Python" language="python" icon="python">
+ {PyDocumentChunkingFull}
+ </CodeBlock>
+ </CodeGroup>
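The stride logic in `chunk_document` (`range(0, len(words), chunk_size - overlap)`) guarantees that consecutive chunks share exactly `overlap` words. A small standalone check with toy sizes:

```python
def chunk_words(words, chunk_size=5, overlap=2):
    """Same stride as chunk_document: step = chunk_size - overlap."""
    step = chunk_size - overlap
    return [words[start:start + chunk_size] for start in range(0, len(words), step)]

words = [f"w{i}" for i in range(8)]
chunks = chunk_words(words)
print(chunks)
# [['w0', 'w1', 'w2', 'w3', 'w4'], ['w3', 'w4', 'w5', 'w6', 'w7'], ['w6', 'w7']]
```

Note the trailing chunk may be shorter than `chunk_size`; the snippet's `chunk_document` additionally yields a `chunk_index` per chunk.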
For a comparison of all three function types (UDFs, Scalar UDTFs, Batch UDTFs), see [Understanding Transforms](/geneva/udfs).

Reference:
* [`create_materialized_view` API](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_materialized_view)
* [`scalar_udtf` API](https://lancedb.github.io/geneva/api/udtf/#geneva.scalar_udtf)
* [`create_scalar_udtf_view` API](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_scalar_udtf_view)
@@ -0,0 +1,20 @@
{/* Auto-generated by scripts/mdx_snippets_gen.py. Do not edit manually. */}

export const PyAddColumnsScalarUdtf = "@udf(data_type=pa.list_(pa.float32(), 512))\ndef clip_embedding(clip_bytes: bytes) -> list[float]:\n    return embed_model.encode(clip_bytes)\n\n# Add an embedding column to the clips table\nclips.add_columns({\"embedding\": clip_embedding})\n\n# Backfill computes embeddings for all existing clips\nclips.backfill(\"embedding\")\n";

export const PyChainingUdtfViews = "# videos → clips (1:N)\nclips = db.create_scalar_udtf_view(\n    \"clips\", source=videos.search(None), scalar_udtf=extract_clips\n)\n\n# clips → frames (1:N)\nframes = db.create_scalar_udtf_view(\n    \"frames\", source=clips.search(None), scalar_udtf=extract_frames\n)\n";

export const PyCreateScalarUdtfView = "import geneva\n\ndb = geneva.connect(\"/data/mydb\")\nvideos = db.open_table(\"videos\")\n\n# Create the 1:N materialized view\nclips = db.create_scalar_udtf_view(\n    \"clips\",\n    source=videos.search(None).select([\"video_path\", \"metadata\"]),\n    scalar_udtf=extract_clips,\n)\n\n# Populate — runs the UDTF on every source row\nclips.refresh()\n";

export const PyDocumentChunkingFull = "from geneva import connect, scalar_udtf, udf\nfrom typing import Iterator, NamedTuple\nimport pyarrow as pa\n\nclass Chunk(NamedTuple):\n    chunk_index: int\n    chunk_text: str\n\n@scalar_udtf\ndef chunk_document(text: str) -> Iterator[Chunk]:\n    \"\"\"Split a document into overlapping chunks.\"\"\"\n    words = text.split()\n    chunk_size = 500\n    overlap = 50\n    for i, start in enumerate(range(0, len(words), chunk_size - overlap)):\n        chunk_words = words[start:start + chunk_size]\n        yield Chunk(chunk_index=i, chunk_text=\" \".join(chunk_words))\n\ndb = connect(\"/data/mydb\")\ndocs = db.open_table(\"documents\")\n\n# Create chunked view — inherits doc_id, title, etc. from source\nchunks = db.create_scalar_udtf_view(\n    \"doc_chunks\",\n    source=docs.search(None).select([\"doc_id\", \"title\", \"text\"]),\n    scalar_udtf=chunk_document,\n)\nchunks.refresh()\n\n# Add embeddings to chunks for semantic search\n@udf(data_type=pa.list_(pa.float32(), 1536))\ndef embed_text(chunk_text: str) -> list[float]:\n    return embedding_model.encode(chunk_text)\n\nchunks.add_columns({\"embedding\": embed_text})\nchunks.backfill(\"embedding\")  # Backfills embeddings on all existing chunks\n\n# Query — parent columns available alongside chunk columns\nchunks.search(None).select([\"doc_id\", \"title\", \"chunk_text\", \"embedding\"]).to_pandas()\n";

export const PyDocumentChunkingUdtf = "from geneva import scalar_udtf\nfrom typing import Iterator, NamedTuple\n\nclass Chunk(NamedTuple):\n    chunk_index: int\n    chunk_text: str\n\n@scalar_udtf\ndef chunk_document(text: str) -> Iterator[Chunk]:\n    \"\"\"Split a document into overlapping chunks.\"\"\"\n    words = text.split()\n    chunk_size = 500\n    overlap = 50\n    for i, start in enumerate(range(0, len(words), chunk_size - overlap)):\n        chunk_words = words[start:start + chunk_size]\n        yield Chunk(chunk_index=i, chunk_text=\" \".join(chunk_words))\n";

export const PyIncrementalRefresh = "# Add new videos to the source table\nvideos.add(new_video_data)\n\n# Incremental refresh — only processes the new videos\nclips.refresh()\n";

export const PyScalarUdtfBatch = "@scalar_udtf(batch=True, output_schema=clip_schema)\ndef extract_clips(batch: pa.RecordBatch) -> pa.RecordBatch:\n    \"\"\"Process rows in batches. Same 1:N semantic per row.\"\"\"\n    ...\n";

export const PyScalarUdtfIterator = "from geneva import scalar_udtf\nfrom typing import Iterator, NamedTuple\n\nclass Clip(NamedTuple):\n    clip_start: float\n    clip_end: float\n    clip_bytes: bytes\n\n@scalar_udtf\ndef extract_clips(video_path: str, duration: float) -> Iterator[Clip]:\n    \"\"\"Yields multiple clips per video.\"\"\"\n    clip_length = 10.0\n    for start in range(0, int(duration), int(clip_length)):\n        end = min(start + clip_length, duration)\n        clip_data = extract_video_segment(video_path, start, end)\n        yield Clip(clip_start=start, clip_end=end, clip_bytes=clip_data)\n";

export const PyScalarUdtfList = "@scalar_udtf\ndef extract_clips(video_path: str, duration: float) -> list[Clip]:\n    clips = []\n    for start in range(0, int(duration), 10):\n        end = min(start + 10, duration)\n        clips.append(Clip(clip_start=start, clip_end=end, clip_bytes=b\"...\"))\n    return clips\n";
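The snippet file above is produced by `scripts/mdx_snippets_gen.py`, which is not shown in this diff. One plausible shape for such a generator, using the `--8<--` region markers that appear in the test file later in this PR and `json.dumps` for JS-string escaping (the real script may work differently; `to_mdx_exports` is hypothetical):

```python
import json
import re

def to_mdx_exports(py_source: str) -> str:
    """Extract '# --8<-- [start:NAME] ... [end:NAME]' regions into MDX export consts."""
    exports = []
    for m in re.finditer(
        r"# --8<-- \[start:(\w+)\]\n(.*?)# --8<-- \[end:\1\]", py_source, re.S
    ):
        name, body = m.group(1), m.group(2)
        # json.dumps yields a safely escaped JS string literal
        exports.append(f"export const {name} = {json.dumps(body)};")
    return "\n\n".join(exports)

src = "# --8<-- [start:Demo]\nprint('hi')\n# --8<-- [end:Demo]\n"
print(to_mdx_exports(src))
# export const Demo = "print('hi')\n";
```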
@@ -0,0 +1,8 @@
{/* Auto-generated by scripts/mdx_snippets_gen.py. Do not edit manually. */}

export const PyRegistrationScalarUdtf = "db = geneva.connect(\"/data/mydb\")\ndb.create_scalar_udtf_view(\"my_view\", source=my_source, scalar_udtf=my_scalar_udtf)\n";

export const PyRegistrationUdf = "mock_table.add_columns({\"col\": my_udf})\n";

export const PyRegistrationUdtf = "db = geneva.connect(\"/data/mydb\")\ndb.create_udtf_view(\"my_view\", source=my_source, udtf=my_udtf)\n";
@@ -12,10 +12,6 @@ dependencies = [
    "polars>=1.39.2",
    "pytest>=9.0.1",
    "pytest-asyncio>=1.3.0",
-   <<<<<<< Updated upstream
-   "Pillow>=11.0.0",
-   "geneva>=0.11.0",
-   =======
    "Pillow>=12.1.1",
-   >>>>>>> Stashed changes
+   "geneva>=0.12.0",

> **Contributor (author):** updated this and the uv.lock bc I guess it had previously gotten committed mid-rebase

]
@@ -8,8 +8,6 @@
import pytest

- # TODO: remove skip once geneva 0.12.0 is on PyPI (head node defaults changed to 4 CPU / 8Gi)
- @pytest.mark.skip(reason="requires geneva>=0.12.0")

> **Contributor (author):** yay, while I was here I remembered to do this

def test_head_node_defaults():
    # If these change, update the Head Node table in performance.mdx
    from geneva.cluster.builder import KubeRayClusterBuilder
@@ -40,8 +40,9 @@ def test_env_vars_via_cluster(monkeypatch):

def test_pip_manifest(monkeypatch):
-     from unittest.mock import MagicMock
-     mock_conn = MagicMock()
+     import geneva
+     from unittest.mock import MagicMock, create_autospec
+     mock_conn = create_autospec(geneva.db.Connection, instance=True)

> **Contributor (author):** Ok, here's the important thing: MagicMocks let you call anything with any params... defeating the point of the test! But create_autospec will fail if the methods or params don't exist. So, updated this here.

    monkeypatch.setattr("geneva.connect", MagicMock(return_value=mock_conn))

    # --8<-- [start:pip_manifest]
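The reviewer's point about `create_autospec` vs `MagicMock` is easy to demonstrate with a stand-in class (the `Connection` here is a dummy, not `geneva.db.Connection`):

```python
from unittest.mock import MagicMock, create_autospec

class Connection:
    """Stand-in for the real connection class; only its shape matters here."""
    def open_table(self, name: str): ...

loose = MagicMock()
loose.opne_table("videos")  # typo passes silently: MagicMock accepts any attribute and args

strict = create_autospec(Connection, instance=True)
strict.open_table("videos")  # fine: the method exists with this signature

errors = []
try:
    strict.opne_table("videos")  # typo: attribute is not on the spec
except AttributeError:
    errors.append("AttributeError")
try:
    strict.open_table()  # missing the required 'name' argument
except TypeError:
    errors.append("TypeError")
print(errors)  # ['AttributeError', 'TypeError']
```

So a test against an autospecced mock fails when a method is renamed or its parameters change, which is exactly what this PR relies on.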
> The main thing that changed in this PR is that now you use `create_scalar_udtf_view()` instead of `create_materialized_view(udtf=)` to make a scalar UDTF view. I couldn't actually pull these out to their own snippets because they're in table cells, but I did add test_geneva_udfs_index just so a test will fail if we change one of these methods' names. (I also added links just for convenience.)