From b38975d8a3ffadece359900b74104393c17541e9 Mon Sep 17 00:00:00 2001 From: dmadisetti Date: Wed, 27 May 2026 12:51:48 -0700 Subject: [PATCH 1/3] feat: stream initial package installations for WASM --- .../src/core/packages/useInstallPackage.ts | 17 +- marimo/_runtime/callbacks/packages.py | 87 ++++-- marimo/_runtime/commands.py | 7 +- .../_runtime/packages/_micropip_streaming.py | 167 ++++++++++++ marimo/_runtime/packages/package_manager.py | 33 ++- .../_runtime/packages/pypi_package_manager.py | 61 +++++ .../packages/test_micropip_streaming.py | 257 ++++++++++++++++++ tests/_runtime/test_manage_script_metadata.py | 33 +++ 8 files changed, 627 insertions(+), 35 deletions(-) create mode 100644 marimo/_runtime/packages/_micropip_streaming.py create mode 100644 tests/_runtime/packages/test_micropip_streaming.py diff --git a/frontend/src/core/packages/useInstallPackage.ts b/frontend/src/core/packages/useInstallPackage.ts index 3d4d0869e71..ba658948184 100644 --- a/frontend/src/core/packages/useInstallPackage.ts +++ b/frontend/src/core/packages/useInstallPackage.ts @@ -22,16 +22,17 @@ export function useInstallPackages(): { setLoading(true); try { - for (const [idx, packageName] of packages.entries()) { - const response = await addPackage({ package: packageName }); - if (response.success) { + // Batch all packages into a single install call. + // The worker splits by space and passes the full list to micropip, + // which resolves and downloads in parallel internally. + const response = await addPackage({ package: packages.join(" ") }); + if (response.success) { + for (const packageName of packages) { showAddPackageToast(packageName); - } else { - showAddPackageToast(packageName, response.error); } - // Wait 1s if there are more packages to install - if (idx < packages.length - 1) { - await new Promise((resolve) => setTimeout(resolve, 1000)); + } else { + for (const packageName of packages) { + showAddPackageToast(packageName, response.error); } } onSuccess?.(); diff --git a/marimo/_runtime/callbacks/packages.py b/marimo/_runtime/callbacks/packages.py index dae5416f2ef..5465e690061 100644 --- a/marimo/_runtime/callbacks/packages.py +++ b/marimo/_runtime/callbacks/packages.py @@ -45,6 +45,33 @@ def __init__(self, kernel: Kernel): def register(self, router: RequestRouter) -> None: router.register(InstallPackagesCommand, self._handle_install) + def _notebook_index_urls(self) -> list[str]: + """Read PEP 723 index config from the current notebook. + + Returns a flat list with the primary `index-url` first, then any + `extra-index-url` entries, then `[[tool.uv.index]]` URLs. + Returns `[]` if there's no filename or no config — the receiving + backend will fall back to its default index. + """ + filename = self._kernel.app_metadata.filename + if not filename: + return [] + try: + from marimo._utils.inline_script_metadata import PyProjectReader + + reader = PyProjectReader.from_filename(filename) + except Exception: + return [] + urls: list[str] = [] + if reader.index_url: + urls.append(reader.index_url) + urls.extend(reader.extra_index_urls) + for entry in reader.index_configs: + url = entry.get("url") + if url and url not in urls: + urls.append(url) + return urls + async def _handle_install(self, request: InstallPackagesCommand) -> None: await self.install_missing_packages(request) broadcast_notification(CompletedRunNotification()) @@ -151,7 +178,9 @@ def missing_packages_hook( version = {pkg: "" for pkg in packages} self._kernel.enqueue_control_request( InstallPackagesCommand( - manager=self.package_manager.name, versions=version + manager=self.package_manager.name, + versions=version, + index_urls=self._notebook_index_urls(), ) ) else: @@ -227,34 +256,43 @@ def log_callback(log_line: str) -> None: return log_callback + # Mark every still-installable package as "installing" up-front so the + # UI can render the batch state before any wheel completes. for pkg in missing_packages: - if self.package_manager.attempted_to_install(package=pkg): - # Already attempted an installation; it must have failed. - # Skip the installation. - continue - package_statuses[pkg] = "installing" - broadcast_notification( - InstallingPackageAlertNotification( - packages=package_statuses, source=request.source - ) + if not self.package_manager.attempted_to_install(package=pkg): + package_statuses[pkg] = "installing" + broadcast_notification( + InstallingPackageAlertNotification( + packages=package_statuses, source=request.source ) - - # Send initial "start" log - broadcast_notification( - InstallingPackageAlertNotification( - packages=package_statuses, - logs={pkg: f"Installing {pkg}...\n"}, - log_status="start", - source=request.source, + ) + for pkg in missing_packages: + if package_statuses.get(pkg) == "installing": + broadcast_notification( + InstallingPackageAlertNotification( + packages=package_statuses, + logs={pkg: f"Installing {pkg}...\n"}, + log_status="start", + source=request.source, + ) ) - ) - version = request.versions.get(pkg) - if await self.package_manager.install( - pkg, version=version, log_callback=create_log_callback(pkg) - ): + installable = [ + pkg + for pkg in missing_packages + if not self.package_manager.attempted_to_install(package=pkg) + ] + versions: dict[str, str | None] = { + pkg: request.versions.get(pkg) for pkg in installable + } + async for pkg, success in self.package_manager.stream_install( + installable, + versions=versions, + index_urls=request.index_urls or None, + log_callback_factory=create_log_callback, + ): + if success: package_statuses[pkg] = "installed" - # Send final "done" log broadcast_notification( InstallingPackageAlertNotification( packages=package_statuses, @@ -267,7 +305,6 @@ def log_callback(log_line: str) -> None: package_statuses[pkg] = "failed" mod = self.package_manager.package_to_module(pkg) self._kernel.module_registry.excluded_modules.add(mod) - # Send final "done" log with error broadcast_notification( InstallingPackageAlertNotification( packages=package_statuses, diff --git a/marimo/_runtime/commands.py b/marimo/_runtime/commands.py index 32d6397a7b7..568c651c005 100644 --- a/marimo/_runtime/commands.py +++ b/marimo/_runtime/commands.py @@ -608,13 +608,15 @@ class InstallPackagesCommand(Command): manager: Package manager to use ('pip', 'conda', 'uv', etc.). versions: Package names mapped to version specifiers. Empty version means install latest. + index_urls: Alternative package index URLs. Primary index first, + then extras. Honored by backends that support custom + indexes (currently micropip); other backends ignore it. source: Where to install. "kernel" (default) dispatches to the kernel subprocess; "server" installs directly into the server's Python environment (sys.executable), used when the server itself needs a package (e.g. nbformat for IPYNB auto-export in sandbox mode). """ - # TODO: index URL (index/channel/...) manager: str # Map from package name to desired version @@ -622,6 +624,9 @@ class InstallPackagesCommand(Command): # will be installed versions: dict[str, str] + # Alternative package index URLs (primary + extras, primary first). + index_urls: list[str] = msgspec.field(default_factory=list) + source: Literal["kernel", "server"] = "kernel" diff --git a/marimo/_runtime/packages/_micropip_streaming.py b/marimo/_runtime/packages/_micropip_streaming.py new file mode 100644 index 00000000000..0ae75b4ea71 --- /dev/null +++ b/marimo/_runtime/packages/_micropip_streaming.py @@ -0,0 +1,167 @@ +# Copyright 2026 Marimo. All rights reserved. +# This module is intentionally marimo-free so it can be contributed +# back to pyodide/micropip as a streaming install primitive. +# Do NOT import from marimo.* here. +from __future__ import annotations + +import asyncio +import importlib +import importlib.metadata +from pathlib import Path +from site import getsitepackages +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + + +def _append_version(pkg_name: str, version: str | None) -> str: + """Qualify a version string with a leading '==' if it doesn't have one.""" + if version is None or version in ("", "latest"): + return pkg_name + return f"{pkg_name}=={version}" + + +def _split_packages(package: str) -> list[str]: + """Split a whitespace-joined package spec into individual requirements. + + Handles editable installs (`-e `), direct URLs (`pkg @ url`), + and PEP 508 environment markers (`pkg==1.0; python_version>'3.6'`). + """ + packages: list[str] = [] + current: list[str] = [] + in_marker = False + + for part in package.split(): + if ( + part in ("-e", "--editable", "@") + or current + and current[-1] in ("-e", "--editable", "@") + ): + current.append(part) + elif part.endswith(";"): + if current: + packages.append(" ".join(current)) + current = [] + in_marker = True + current.append(part) + elif in_marker: + current.append(part) + if part.endswith(("'", '"')): + in_marker = False + packages.append(" ".join(current)) + current = [] + else: + if current: + packages.append(" ".join(current)) + current = [part] + + if current: + packages.append(" ".join(current)) + + return [pkg.strip() for pkg in packages] + + +async def stream_transaction_install( + packages: list[str], + *, + versions: dict[str, str | None] | None = None, + index_urls: list[str] | None = None, + constraints: list[str] | None = None, +) -> AsyncIterator[tuple[str, bool]]: + """Install via micropip's Transaction API, yielding (name, success) per wheel. + + Mirrors what `micropip.install()` does internally, but exposes the resolved + wheel list so callers can stream progress as each wheel finishes installing + (via `asyncio.as_completed`) instead of waiting for an opaque end-to-end call. + + `index_urls` and `constraints`, when None, fall back to the values on + `micropip._micropip` (the global singleton); when provided, they override + the singleton for this transaction only. + + Yields `(package_name, success)` per requested package. Packages that + resolve to native pyodide-distribution entries are loaded via + `loadPackage`; pure-python wheels are downloaded and extracted in parallel. + """ + import micropip # type: ignore[import-not-found] + from micropip._utils import default_environment # type: ignore[import-not-found] + from micropip.transaction import Transaction # type: ignore[import-not-found] + from packaging.utils import canonicalize_name + + mgr = micropip._micropip # singleton PackageManager + ctx = default_environment() + wheel_base = Path(getsitepackages()[0]) + + flat_requirements: list[str] = [] + for pkg in packages: + version = (versions or {}).get(pkg) + versioned = _append_version(pkg, version) + flat_requirements.extend(_split_packages(versioned)) + + transaction = Transaction( + _compat_layer=mgr.compat_layer, + ctx=ctx, + ctx_extras=[], + keep_going=True, + deps=True, + pre=False, + fetch_kwargs={}, + verbose=False, + index_urls=index_urls if index_urls is not None else mgr.index_urls, + constraints=constraints + if constraints is not None + else mgr.constraints, + reinstall=False, + ) + + await transaction.gather_requirements(flat_requirements) + + # Map normalized name -> original name as the caller spelled it. + requested = {canonicalize_name(p): p for p in packages} + + for failed_name in transaction.failed: + normalized = canonicalize_name(failed_name) + original = requested.pop(normalized, failed_name) + yield (original, False) + + async def _install_wheel(wheel: Any) -> tuple[str, Exception | None]: + try: + await wheel.install(wheel_base, mgr.compat_layer) + except Exception as exc: + return wheel.name, exc + return wheel.name, None + + if transaction.wheels: + tasks = [ + asyncio.create_task(_install_wheel(w)) for w in transaction.wheels + ] + for future in asyncio.as_completed(tasks): + name, exc = await future + normalized = canonicalize_name(name) + if normalized in requested: + original = requested.pop(normalized) + yield (original, exc is None) + + if transaction.pyodide_packages: + await mgr.compat_layer.loadPackage( + mgr.compat_layer.to_js( + [name for name, _, _ in transaction.pyodide_packages] + ) + ) + for name, _, _ in transaction.pyodide_packages: + normalized = canonicalize_name(name) + if normalized in requested: + original = requested.pop(normalized) + yield (original, True) + + importlib.invalidate_caches() + + # Anything still in `requested` may have been pulled in transitively under + # a different name (e.g. caller asked for `foo`, micropip installed `foo-impl`). + # If it's importable now, treat it as installed. + for _normalized, original in list(requested.items()): + try: + importlib.metadata.version(original) + yield (original, True) + except importlib.metadata.PackageNotFoundError: + yield (original, False) diff --git a/marimo/_runtime/packages/package_manager.py b/marimo/_runtime/packages/package_manager.py index b38178468d5..d909143592a 100644 --- a/marimo/_runtime/packages/package_manager.py +++ b/marimo/_runtime/packages/package_manager.py @@ -4,7 +4,7 @@ import abc import subprocess import sys -from collections.abc import Callable +from collections.abc import AsyncIterator, Callable from typing import TYPE_CHECKING import msgspec @@ -115,6 +115,37 @@ async def install( log_callback=log_callback, ) + async def stream_install( + self, + packages: list[str], + *, + versions: dict[str, str | None] | None = None, + index_urls: list[str] | None = None, + log_callback_factory: Callable[[str], LogCallback] | None = None, + ) -> AsyncIterator[tuple[str, bool]]: + """Install packages and yield (name, success) as each completes. + + The default implementation installs sequentially. Subclasses (e.g. + `MicropipPackageManager`) may override to batch installs and stream + real progress. `index_urls` is honored by backends that support it + (currently only micropip); other backends ignore it. + """ + # `index_urls` honored by overriding subclasses (micropip); + # the sequential default ignores it. + del index_urls + for pkg in packages: + if self.attempted_to_install(package=pkg): + yield (pkg, False) + continue + version = (versions or {}).get(pkg) + cb = log_callback_factory(pkg) if log_callback_factory else None + success = await self.install( + pkg, + version=version, + log_callback=cb, + ) + yield (pkg, success) + @abc.abstractmethod async def uninstall(self, package: str, group: str | None = None) -> bool: """Attempt to uninstall a package diff --git a/marimo/_runtime/packages/pypi_package_manager.py b/marimo/_runtime/packages/pypi_package_manager.py index 0dd6ca6f769..3e982f20fb1 100644 --- a/marimo/_runtime/packages/pypi_package_manager.py +++ b/marimo/_runtime/packages/pypi_package_manager.py @@ -8,9 +8,16 @@ import tempfile from functools import cached_property from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import AsyncIterator, Callable from marimo import _loggers from marimo._dependencies.dependencies import DependencyManager +from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, +) from marimo._runtime.packages.module_name_to_pypi_name import ( module_name_to_pypi_name, ) @@ -239,6 +246,60 @@ async def _install( log_callback(f"Failed to install {package}: {e}\n") return False + async def stream_install( + self, + packages: list[str], + *, + versions: dict[str, str | None] | None = None, + index_urls: list[str] | None = None, + log_callback_factory: Callable[[str], LogCallback] | None = None, + ) -> AsyncIterator[tuple[str, bool]]: + """Batch-install via micropip Transaction internals, streaming progress. + + Wraps `stream_transaction_install` with marimo bookkeeping + (`_attempted_packages`) and log-callback glue. Falls back to the + base sequential path if micropip's internal API has shifted. + """ + assert is_pyodide() + + if log_callback_factory: + for pkg in packages: + log_callback_factory(pkg)(f"Resolving {pkg}...\n") + + try: + async for pkg, success in stream_transaction_install( + packages, + versions=versions, + index_urls=index_urls, + ): + # Mark only as the engine resolves each package — if the + # engine raises before any yields, the fallback path needs + # to start clean (it will mark via `install()`). + self._attempted_packages.add(pkg) + if log_callback_factory: + msg = ( + f"Successfully installed {pkg}\n" + if success + else f"Failed to install {pkg}\n" + ) + log_callback_factory(pkg)(msg) + yield (pkg, success) + except (AttributeError, ImportError, TypeError): + # micropip's private Transaction API shifted; fall back to the + # base sequential path. Narrow catch: install errors should + # surface, only API-shape mismatches trigger the fallback. + LOGGER.warning( + "micropip Transaction API unavailable, falling back to sequential installs", + exc_info=True, + ) + async for result in super().stream_install( + packages, + versions=versions, + index_urls=index_urls, + log_callback_factory=log_callback_factory, + ): + yield result + async def uninstall(self, package: str, group: str | None = None) -> bool: # The `group` parameter is accepted for interface compatibility, but is ignored. del group diff --git a/tests/_runtime/packages/test_micropip_streaming.py b/tests/_runtime/packages/test_micropip_streaming.py new file mode 100644 index 00000000000..5a3a42d94c7 --- /dev/null +++ b/tests/_runtime/packages/test_micropip_streaming.py @@ -0,0 +1,257 @@ +# Copyright 2026 Marimo. All rights reserved. +"""Unit tests for the marimo-free `stream_transaction_install` engine. + +micropip itself only runs inside Pyodide, so we monkeypatch +`micropip._micropip` + `micropip.transaction.Transaction` with stubs that +simulate the resolution / install steps deterministically. +""" + +from __future__ import annotations + +import asyncio +import sys +import types +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + import pytest + + +@dataclass +class _FakeWheel: + name: str + install_error: Exception | None = None + install_event: asyncio.Event | None = None + + async def install(self, _wheel_base: Any, _compat: Any) -> None: + if self.install_event is not None: + await self.install_event.wait() + if self.install_error is not None: + raise self.install_error + + +@dataclass +class _FakeTransaction: + wheels: list[_FakeWheel] = field(default_factory=list) + failed: list[str] = field(default_factory=list) + pyodide_packages: list[tuple[str, str, str]] = field(default_factory=list) + seen_requirements: list[str] = field(default_factory=list) + init_kwargs: dict[str, Any] = field(default_factory=dict) + + async def gather_requirements(self, requirements: list[str]) -> None: + self.seen_requirements.extend(requirements) + + +class _FakeCompatLayer: + def __init__(self) -> None: + self.loaded: list[list[str]] = [] + + def to_js(self, value: Any) -> Any: + return value + + async def loadPackage(self, names: list[str]) -> None: + self.loaded.append(list(names)) + + +@dataclass +class _FakeMicropipManager: + compat_layer: _FakeCompatLayer = field(default_factory=_FakeCompatLayer) + index_urls: list[str] = field(default_factory=lambda: ["https://default/"]) + constraints: list[str] = field(default_factory=list) + + +def _install_fake_micropip( + monkeypatch: Any, fake_tx: _FakeTransaction +) -> _FakeMicropipManager: + """Install fake `micropip`, `micropip._utils`, `micropip.transaction`, and + `packaging.utils` modules into sys.modules so the engine resolves to them. + """ + fake_mgr = _FakeMicropipManager() + + micropip_mod = types.ModuleType("micropip") + micropip_mod._micropip = fake_mgr # type: ignore[attr-defined] + + utils_mod = types.ModuleType("micropip._utils") + utils_mod.default_environment = dict # type: ignore[attr-defined] + + txn_mod = types.ModuleType("micropip.transaction") + + def _Transaction(**kwargs: Any) -> _FakeTransaction: + fake_tx.init_kwargs = kwargs + return fake_tx + + txn_mod.Transaction = _Transaction # type: ignore[attr-defined] + + monkeypatch.setitem(sys.modules, "micropip", micropip_mod) + monkeypatch.setitem(sys.modules, "micropip._utils", utils_mod) + monkeypatch.setitem(sys.modules, "micropip.transaction", txn_mod) + return fake_mgr + + +async def _drain(agen: Any) -> list[tuple[str, bool]]: + out: list[tuple[str, bool]] = [] + async for item in agen: + out.append(item) + return out + + +async def test_two_wheels_both_succeed( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, + ) + + tx = _FakeTransaction(wheels=[_FakeWheel("foo"), _FakeWheel("bar")]) + _install_fake_micropip(monkeypatch, tx) + + results = await _drain(stream_transaction_install(["foo", "bar"])) + assert sorted(results) == [("bar", True), ("foo", True)] + + +async def test_wheel_install_failure_no_double_yield( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, + ) + + tx = _FakeTransaction( + wheels=[ + _FakeWheel("foo", install_error=RuntimeError("boom")), + _FakeWheel("bar"), + ] + ) + _install_fake_micropip(monkeypatch, tx) + + results = await _drain(stream_transaction_install(["foo", "bar"])) + assert sorted(results) == [("bar", True), ("foo", False)] + # Each requested package yields exactly once. + assert [name for name, _ in results].count("foo") == 1 + + +async def test_resolution_failure_yields_false( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, + ) + + tx = _FakeTransaction(failed=["foo"]) + _install_fake_micropip(monkeypatch, tx) + + results = await _drain(stream_transaction_install(["foo"])) + assert results == [("foo", False)] + + +async def test_pyodide_package_only_loadpackage_once( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, + ) + + tx = _FakeTransaction( + pyodide_packages=[("numpy", "1.26.0", "default")], + ) + fake_mgr = _install_fake_micropip(monkeypatch, tx) + + results = await _drain(stream_transaction_install(["numpy"])) + assert results == [("numpy", True)] + assert fake_mgr.compat_layer.loaded == [["numpy"]] + + +async def test_index_urls_override_passed_to_transaction( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, + ) + + tx = _FakeTransaction(wheels=[_FakeWheel("foo")]) + fake_mgr = _install_fake_micropip(monkeypatch, tx) + + override = ["https://my.private/simple", "https://extra/simple"] + await _drain(stream_transaction_install(["foo"], index_urls=override)) + assert tx.init_kwargs["index_urls"] == override + # And the singleton's was NOT used. + assert tx.init_kwargs["index_urls"] is not fake_mgr.index_urls + + +async def test_index_urls_none_falls_back_to_singleton( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, + ) + + tx = _FakeTransaction(wheels=[_FakeWheel("foo")]) + fake_mgr = _install_fake_micropip(monkeypatch, tx) + + await _drain(stream_transaction_install(["foo"], index_urls=None)) + assert tx.init_kwargs["index_urls"] is fake_mgr.index_urls + + +async def test_url_spec_passthrough(monkeypatch: pytest.MonkeyPatch) -> None: + """PEP 508 URL specs in the packages list must reach gather_requirements + unaltered — the engine shouldn't try to be clever about them.""" + from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, + ) + + tx = _FakeTransaction(wheels=[_FakeWheel("foo")]) + _install_fake_micropip(monkeypatch, tx) + + spec = "foo @ git+https://example.com/repo@deadbeef" + await _drain(stream_transaction_install([spec])) + assert spec in tx.seen_requirements + + +async def test_versions_appended(monkeypatch: pytest.MonkeyPatch) -> None: + from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, + ) + + tx = _FakeTransaction(wheels=[_FakeWheel("foo")]) + _install_fake_micropip(monkeypatch, tx) + + await _drain( + stream_transaction_install(["foo"], versions={"foo": "1.2.3"}) + ) + assert "foo==1.2.3" in tx.seen_requirements + + +async def test_streaming_order_first_done_first_yielded( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The engine should yield as each wheel completes, not in submission order.""" + from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, + ) + + fast_done = asyncio.Event() + slow_done = asyncio.Event() + tx = _FakeTransaction( + wheels=[ + _FakeWheel("slow", install_event=slow_done), + _FakeWheel("fast", install_event=fast_done), + ] + ) + _install_fake_micropip(monkeypatch, tx) + + async def driver() -> list[tuple[str, bool]]: + results: list[tuple[str, bool]] = [] + agen = stream_transaction_install(["slow", "fast"]) + # Let `fast` finish first. + fast_done.set() + async for item in agen: + results.append(item) + if item[0] == "fast": + # Now release `slow` so the generator can complete. + slow_done.set() + return results + + results = await driver() + assert results[0] == ("fast", True) + assert results[1] == ("slow", True) diff --git a/tests/_runtime/test_manage_script_metadata.py b/tests/_runtime/test_manage_script_metadata.py index ae00199ac5b..ec89876173f 100644 --- a/tests/_runtime/test_manage_script_metadata.py +++ b/tests/_runtime/test_manage_script_metadata.py @@ -19,6 +19,7 @@ CommandMessage, InstallPackagesCommand, ) +from marimo._runtime.packages.package_manager import LogCallback from marimo._runtime.packages.package_managers import create_package_manager from marimo._runtime.packages.pypi_package_manager import ( MicropipPackageManager, @@ -30,10 +31,38 @@ if TYPE_CHECKING: import pathlib + from collections.abc import AsyncIterator, Callable HAS_UV = DependencyManager.which("uv") +def _make_stream_install(mock_pm: Mock) -> Any: + """Create an async generator ``stream_install`` that delegates to + the mock's ``install`` method, matching the base-class default + behaviour so the existing test expectations hold.""" + + async def stream_install( + packages: list[str], + *, + versions: dict[str, str | None] | None = None, + index_urls: list[str] | None = None, + log_callback_factory: Callable[[str], LogCallback] | None = None, + ) -> AsyncIterator[tuple[str, bool]]: + del index_urls + for pkg in packages: + if mock_pm.attempted_to_install(package=pkg): + yield (pkg, False) + continue + version = (versions or {}).get(pkg) + cb = log_callback_factory(pkg) if log_callback_factory else None + success = await mock_pm.install( + pkg, version=version, log_callback=cb, + ) + yield (pkg, success) + + return stream_install + + @pytest.mark.skipif(not HAS_UV, reason="uv not installed") @patch( "marimo._runtime.packages.pypi_package_manager.UvPackageManager.is_in_uv_project", @@ -528,6 +557,7 @@ async def mock_install(pkg: str, version=None, log_callback=None): return True mock_package_manager.install = AsyncMock(side_effect=mock_install) + mock_package_manager.stream_install = _make_stream_install(mock_package_manager) # Set up packages callbacks k.packages_callbacks.package_manager = mock_package_manager @@ -604,6 +634,7 @@ async def mock_install_fail(pkg: str, version=None, log_callback=None): return False # Installation failed mock_package_manager.install = AsyncMock(side_effect=mock_install_fail) + mock_package_manager.stream_install = _make_stream_install(mock_package_manager) k.packages_callbacks.package_manager = mock_package_manager with ( @@ -668,6 +699,7 @@ async def mock_install(pkg: str, version=None, log_callback=None): return True mock_package_manager.install = AsyncMock(side_effect=mock_install) + mock_package_manager.stream_install = _make_stream_install(mock_package_manager) k.packages_callbacks.package_manager = mock_package_manager with ( @@ -743,6 +775,7 @@ async def mock_install_old_style(pkg: str, version=None, **kwargs: Any): mock_package_manager.install = AsyncMock( side_effect=mock_install_old_style ) + mock_package_manager.stream_install = _make_stream_install(mock_package_manager) k.packages_callbacks.package_manager = mock_package_manager with ( From 52b3b5bc0e19779957fd1871cb1be714da38111b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 27 May 2026 20:15:15 +0000 Subject: [PATCH 2/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../_runtime/packages/_micropip_streaming.py | 8 ++++++-- tests/_runtime/test_manage_script_metadata.py | 20 ++++++++++++++----- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/marimo/_runtime/packages/_micropip_streaming.py b/marimo/_runtime/packages/_micropip_streaming.py index 0ae75b4ea71..5945445503c 100644 --- a/marimo/_runtime/packages/_micropip_streaming.py +++ b/marimo/_runtime/packages/_micropip_streaming.py @@ -84,8 +84,12 @@ async def stream_transaction_install( `loadPackage`; pure-python wheels are downloaded and extracted in parallel. """ import micropip # type: ignore[import-not-found] - from micropip._utils import default_environment # type: ignore[import-not-found] - from micropip.transaction import Transaction # type: ignore[import-not-found] + from micropip._utils import ( + default_environment, # type: ignore[import-not-found] + ) + from micropip.transaction import ( + Transaction, # type: ignore[import-not-found] + ) from packaging.utils import canonicalize_name mgr = micropip._micropip # singleton PackageManager diff --git a/tests/_runtime/test_manage_script_metadata.py b/tests/_runtime/test_manage_script_metadata.py index ec89876173f..f6baaf052ee 100644 --- a/tests/_runtime/test_manage_script_metadata.py +++ b/tests/_runtime/test_manage_script_metadata.py @@ -56,7 +56,9 @@ async def stream_install( version = (versions or {}).get(pkg) cb = log_callback_factory(pkg) if log_callback_factory else None success = await mock_pm.install( - pkg, version=version, log_callback=cb, + pkg, + version=version, + log_callback=cb, ) yield (pkg, success) @@ -557,7 +559,9 @@ async def mock_install(pkg: str, version=None, log_callback=None): return True mock_package_manager.install = AsyncMock(side_effect=mock_install) - mock_package_manager.stream_install = _make_stream_install(mock_package_manager) + mock_package_manager.stream_install = _make_stream_install( + mock_package_manager + ) # Set up packages callbacks k.packages_callbacks.package_manager = mock_package_manager @@ -634,7 +638,9 @@ async def mock_install_fail(pkg: str, version=None, log_callback=None): return False # Installation failed mock_package_manager.install = AsyncMock(side_effect=mock_install_fail) - mock_package_manager.stream_install = _make_stream_install(mock_package_manager) + mock_package_manager.stream_install = _make_stream_install( + mock_package_manager + ) k.packages_callbacks.package_manager = mock_package_manager with ( @@ -699,7 +705,9 @@ async def mock_install(pkg: str, version=None, log_callback=None): return True mock_package_manager.install = AsyncMock(side_effect=mock_install) - mock_package_manager.stream_install = _make_stream_install(mock_package_manager) + mock_package_manager.stream_install = _make_stream_install( + mock_package_manager + ) k.packages_callbacks.package_manager = mock_package_manager with ( @@ -775,7 +783,9 @@ async def mock_install_old_style(pkg: str, version=None, **kwargs: Any): mock_package_manager.install = AsyncMock( side_effect=mock_install_old_style ) - mock_package_manager.stream_install = _make_stream_install(mock_package_manager) + mock_package_manager.stream_install = _make_stream_install( + mock_package_manager + ) k.packages_callbacks.package_manager = mock_package_manager with ( From 4307799340ee7bc776c624205270587e05ab8164 Mon Sep 17 00:00:00 2001 From: dmadisetti Date: Wed, 27 May 2026 15:32:45 -0700 Subject: [PATCH 3/3] fix: address cubic review + CI failures on micropip streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Track requested packages by parsed PEP 508 base name (via packaging.requirements.Requirement), so versioned / URL specs like `foo==1.0` and `foo @ git+…@ref` correctly match the wheel names micropip yields back instead of getting reported as failed. - Replace `# type: ignore` micropip imports with importlib.import_module so the module loads cleanly outside Pyodide and mypy doesn't need to see the optional dep at all. - Isolate loadPackage failures so one pyodide-batch error yields per-package False rather than terminating the generator. - Fallback path now retries only packages the engine didn't yield, avoiding double-yield + (pkg, False) reports for packages that already succeeded. - Defensive isinstance(url, str) checks when reading PEP 723 index config from the notebook (guards malformed pyproject entries). - Regenerate packages/openapi/api.yaml for the new index_urls field. --- marimo/_runtime/callbacks/packages.py | 10 ++- .../_runtime/packages/_micropip_streaming.py | 49 ++++++++---- .../_runtime/packages/pypi_package_manager.py | 5 +- packages/openapi/api.yaml | 13 ++++ packages/openapi/src/api.ts | 7 ++ .../packages/test_micropip_streaming.py | 75 +++++++++++++++++++ 6 files changed, 139 insertions(+), 20 deletions(-) diff --git a/marimo/_runtime/callbacks/packages.py b/marimo/_runtime/callbacks/packages.py index 5465e690061..b3a8c1e13af 100644 --- a/marimo/_runtime/callbacks/packages.py +++ b/marimo/_runtime/callbacks/packages.py @@ -63,12 +63,16 @@ def _notebook_index_urls(self) -> list[str]: except Exception: return [] urls: list[str] = [] - if reader.index_url: + if isinstance(reader.index_url, str) and reader.index_url: urls.append(reader.index_url) - urls.extend(reader.extra_index_urls) + for extra in reader.extra_index_urls: + if isinstance(extra, str) and extra and extra not in urls: + urls.append(extra) for entry in reader.index_configs: + if not isinstance(entry, dict): + continue url = entry.get("url") - if url and url not in urls: + if isinstance(url, str) and url and url not in urls: urls.append(url) return urls diff --git a/marimo/_runtime/packages/_micropip_streaming.py b/marimo/_runtime/packages/_micropip_streaming.py index 5945445503c..1019fa63d56 100644 --- a/marimo/_runtime/packages/_micropip_streaming.py +++ b/marimo/_runtime/packages/_micropip_streaming.py @@ -83,13 +83,15 @@ async def stream_transaction_install( resolve to native pyodide-distribution entries are loaded via `loadPackage`; pure-python wheels are downloaded and extracted in parallel. """ - import micropip # type: ignore[import-not-found] - from micropip._utils import ( - default_environment, # type: ignore[import-not-found] - ) - from micropip.transaction import ( - Transaction, # type: ignore[import-not-found] - ) + # Lazy / dynamic import so this module loads cleanly outside Pyodide + # (where micropip isn't installed). A ModuleNotFoundError here propagates + # as ImportError, which the marimo-side wrapper catches and falls back. + micropip = importlib.import_module("micropip") + default_environment = importlib.import_module( + "micropip._utils" + ).default_environment + Transaction = importlib.import_module("micropip.transaction").Transaction + from packaging.requirements import Requirement from packaging.utils import canonicalize_name mgr = micropip._micropip # singleton PackageManager @@ -120,8 +122,21 @@ async def stream_transaction_install( await transaction.gather_requirements(flat_requirements) - # Map normalized name -> original name as the caller spelled it. - requested = {canonicalize_name(p): p for p in packages} + # Map normalized base-package-name -> original spec as the caller spelled + # it. The caller's strings may include version specifiers, URL specs, or + # markers (`foo==1.0`, `foo @ git+…@ref`, `foo; python_version>'3.10'`), + # so parse out the base name before canonicalizing — otherwise the wheel + # name micropip yields back ("foo") will fail to match the spec string. + requested: dict[str, str] = {} + for spec in packages: + try: + base_name = Requirement(spec).name + except Exception: + # Editable/path specs and other oddities Requirement can't parse; + # fall back to canonicalizing the whole string. Worst case it + # gets reconciled by the importlib.metadata pass at the end. + base_name = spec + requested[canonicalize_name(base_name)] = spec for failed_name in transaction.failed: normalized = canonicalize_name(failed_name) @@ -147,16 +162,18 @@ async def _install_wheel(wheel: Any) -> tuple[str, Exception | None]: yield (original, exc is None) if transaction.pyodide_packages: - await mgr.compat_layer.loadPackage( - mgr.compat_layer.to_js( - [name for name, _, _ in transaction.pyodide_packages] - ) - ) - for name, _, _ in transaction.pyodide_packages: + names = [name for name, _, _ in transaction.pyodide_packages] + try: + await mgr.compat_layer.loadPackage(mgr.compat_layer.to_js(names)) + except Exception: + load_succeeded = False + else: + load_succeeded = True + for name in names: normalized = canonicalize_name(name) if normalized in requested: original = requested.pop(normalized) - yield (original, True) + yield (original, load_succeeded) importlib.invalidate_caches() diff --git a/marimo/_runtime/packages/pypi_package_manager.py b/marimo/_runtime/packages/pypi_package_manager.py index 3e982f20fb1..06768f50271 100644 --- a/marimo/_runtime/packages/pypi_package_manager.py +++ b/marimo/_runtime/packages/pypi_package_manager.py @@ -266,6 +266,7 @@ async def stream_install( for pkg in packages: log_callback_factory(pkg)(f"Resolving {pkg}...\n") + yielded: set[str] = set() try: async for pkg, success in stream_transaction_install( packages, @@ -276,6 +277,7 @@ async def stream_install( # engine raises before any yields, the fallback path needs # to start clean (it will mark via `install()`). self._attempted_packages.add(pkg) + yielded.add(pkg) if log_callback_factory: msg = ( f"Successfully installed {pkg}\n" @@ -292,8 +294,9 @@ async def stream_install( "micropip Transaction API unavailable, falling back to sequential installs", exc_info=True, ) + remaining = [p for p in packages if p not in yielded] async for result in super().stream_install( - packages, + remaining, versions=versions, index_urls=index_urls, log_callback_factory=log_callback_factory, diff --git a/packages/openapi/api.yaml b/packages/openapi/api.yaml index 91ca46def5c..623cd00f26c 100644 --- a/packages/openapi/api.yaml +++ b/packages/openapi/api.yaml @@ -2222,12 +2222,20 @@ components: \ or manually by the user.\n\n Attributes:\n manager: Package manager\ \ to use ('pip', 'conda', 'uv', etc.).\n versions: Package names mapped\ \ to version specifiers. Empty version\n means install latest.\n\ + \ index_urls: Alternative package index URLs. Primary index first,\n\ + \ then extras. Honored by backends that support custom\n\ + \ indexes (currently micropip); other backends ignore it.\n\ \ source: Where to install. \"kernel\" (default) dispatches to the\ \ kernel\n subprocess; \"server\" installs directly into the\ \ server's Python\n environment (sys.executable), used when\ \ the server itself needs\n a package (e.g. nbformat for IPYNB\ \ auto-export in sandbox mode)." properties: + indexUrls: + default: [] + items: + type: string + type: array manager: type: string source: @@ -2250,6 +2258,11 @@ components: type: object InstallPackagesRequest: properties: + indexUrls: + default: [] + items: + type: string + type: array manager: type: string source: diff --git a/packages/openapi/src/api.ts b/packages/openapi/src/api.ts index 3cf49d21074..772a59df554 100644 --- a/packages/openapi/src/api.ts +++ b/packages/openapi/src/api.ts @@ -4704,12 +4704,17 @@ export interface components { * manager: Package manager to use ('pip', 'conda', 'uv', etc.). * versions: Package names mapped to version specifiers. Empty version * means install latest. + * index_urls: Alternative package index URLs. Primary index first, + * then extras. Honored by backends that support custom + * indexes (currently micropip); other backends ignore it. * source: Where to install. "kernel" (default) dispatches to the kernel * subprocess; "server" installs directly into the server's Python * environment (sys.executable), used when the server itself needs * a package (e.g. nbformat for IPYNB auto-export in sandbox mode). */ InstallPackagesCommand: { + /** @default [] */ + indexUrls?: string[]; manager: string; /** * @default kernel @@ -4724,6 +4729,8 @@ export interface components { }; /** InstallPackagesRequest */ InstallPackagesRequest: { + /** @default [] */ + indexUrls?: string[]; manager: string; /** * @default kernel diff --git a/tests/_runtime/packages/test_micropip_streaming.py b/tests/_runtime/packages/test_micropip_streaming.py index 5a3a42d94c7..db9c00f7c2d 100644 --- a/tests/_runtime/packages/test_micropip_streaming.py +++ b/tests/_runtime/packages/test_micropip_streaming.py @@ -255,3 +255,78 @@ async def driver() -> list[tuple[str, bool]]: results = await driver() assert results[0] == ("fast", True) assert results[1] == ("slow", True) + + +async def test_versioned_spec_tracked_correctly( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A versioned spec ("foo==1.0") must be tracked by its base name so the + wheel install ("foo") is recognized as fulfilling the request.""" + from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, + ) + + tx = _FakeTransaction(wheels=[_FakeWheel("foo")]) + _install_fake_micropip(monkeypatch, tx) + + results = await _drain(stream_transaction_install(["foo==1.0"])) + # Yields the original spec string, not the bare name. + assert results == [("foo==1.0", True)] + + +async def test_url_spec_tracked_correctly( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A PEP 508 URL spec ("foo @ git+…") must be tracked by base name too.""" + from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, + ) + + tx = _FakeTransaction(wheels=[_FakeWheel("foo")]) + _install_fake_micropip(monkeypatch, tx) + + spec = "foo @ git+https://example.com/repo@deadbeef" + results = await _drain(stream_transaction_install([spec])) + assert results == [(spec, True)] + + +async def test_loadpackage_failure_yields_false_no_terminate( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A loadPackage exception must yield (name, False) per pyodide package + rather than crashing the generator mid-stream.""" + from marimo._runtime.packages._micropip_streaming import ( + stream_transaction_install, + ) + + class _BadCompatLayer(_FakeCompatLayer): + async def loadPackage(self, names: list[str]) -> None: + del names + raise RuntimeError("pyodide load failed") + + fake_mgr = _FakeMicropipManager(compat_layer=_BadCompatLayer()) + tx = _FakeTransaction( + wheels=[_FakeWheel("foo")], + pyodide_packages=[("numpy", "1.26.0", "default")], + ) + + micropip_mod = types.ModuleType("micropip") + micropip_mod._micropip = fake_mgr # type: ignore[attr-defined] + utils_mod = types.ModuleType("micropip._utils") + utils_mod.default_environment = dict # type: ignore[attr-defined] + txn_mod = types.ModuleType("micropip.transaction") + + def _Transaction(**kwargs: Any) -> _FakeTransaction: + tx.init_kwargs = kwargs + return tx + + txn_mod.Transaction = _Transaction # type: ignore[attr-defined] + monkeypatch.setitem(sys.modules, "micropip", micropip_mod) + monkeypatch.setitem(sys.modules, "micropip._utils", utils_mod) + monkeypatch.setitem(sys.modules, "micropip.transaction", txn_mod) + + results = await _drain(stream_transaction_install(["foo", "numpy"])) + # Generator completed (no crash); numpy yielded False from the failure. + assert ("numpy", False) in results + # foo wheel still succeeded independently. + assert ("foo", True) in results