Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions frontend/src/core/packages/useInstallPackage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ export function useInstallPackages(): {
setLoading(true);

try {
for (const [idx, packageName] of packages.entries()) {
const response = await addPackage({ package: packageName });
if (response.success) {
// Batch all packages into a single install call.
// The worker splits by space and passes the full list to micropip,
// which resolves and downloads in parallel internally.
const response = await addPackage({ package: packages.join(" ") });
if (response.success) {
for (const packageName of packages) {
showAddPackageToast(packageName);
} else {
showAddPackageToast(packageName, response.error);
}
// Wait 1s if there are more packages to install
if (idx < packages.length - 1) {
await new Promise((resolve) => setTimeout(resolve, 1000));
} else {
for (const packageName of packages) {
showAddPackageToast(packageName, response.error);
}
}
onSuccess?.();
Expand Down
91 changes: 66 additions & 25 deletions marimo/_runtime/callbacks/packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,37 @@ def __init__(self, kernel: Kernel):
def register(self, router: RequestRouter) -> None:
router.register(InstallPackagesCommand, self._handle_install)

def _notebook_index_urls(self) -> list[str]:
"""Read PEP 723 index config from the current notebook.

Returns a flat list with the primary `index-url` first, then any
`extra-index-url` entries, then `[[tool.uv.index]]` URLs.
Returns `[]` if there's no filename or no config — the receiving
backend will fall back to its default index.
"""
filename = self._kernel.app_metadata.filename
if not filename:
return []
try:
from marimo._utils.inline_script_metadata import PyProjectReader

reader = PyProjectReader.from_filename(filename)
except Exception:
return []
urls: list[str] = []
if isinstance(reader.index_url, str) and reader.index_url:
urls.append(reader.index_url)
for extra in reader.extra_index_urls:
if isinstance(extra, str) and extra and extra not in urls:
urls.append(extra)
for entry in reader.index_configs:
if not isinstance(entry, dict):
continue
url = entry.get("url")
if isinstance(url, str) and url and url not in urls:
urls.append(url)
return urls

async def _handle_install(self, request: InstallPackagesCommand) -> None:
await self.install_missing_packages(request)
broadcast_notification(CompletedRunNotification())
Expand Down Expand Up @@ -151,7 +182,9 @@ def missing_packages_hook(
version = {pkg: "" for pkg in packages}
self._kernel.enqueue_control_request(
InstallPackagesCommand(
manager=self.package_manager.name, versions=version
manager=self.package_manager.name,
versions=version,
index_urls=self._notebook_index_urls(),
)
)
else:
Expand Down Expand Up @@ -227,34 +260,43 @@ def log_callback(log_line: str) -> None:

return log_callback

# Mark every still-installable package as "installing" up-front so the
# UI can render the batch state before any wheel completes.
for pkg in missing_packages:
if self.package_manager.attempted_to_install(package=pkg):
# Already attempted an installation; it must have failed.
# Skip the installation.
continue
package_statuses[pkg] = "installing"
broadcast_notification(
InstallingPackageAlertNotification(
packages=package_statuses, source=request.source
)
if not self.package_manager.attempted_to_install(package=pkg):
package_statuses[pkg] = "installing"
broadcast_notification(
InstallingPackageAlertNotification(
packages=package_statuses, source=request.source
)

# Send initial "start" log
broadcast_notification(
InstallingPackageAlertNotification(
packages=package_statuses,
logs={pkg: f"Installing {pkg}...\n"},
log_status="start",
source=request.source,
)
for pkg in missing_packages:
if package_statuses.get(pkg) == "installing":
broadcast_notification(
InstallingPackageAlertNotification(
packages=package_statuses,
logs={pkg: f"Installing {pkg}...\n"},
log_status="start",
source=request.source,
)
)
)

version = request.versions.get(pkg)
if await self.package_manager.install(
pkg, version=version, log_callback=create_log_callback(pkg)
):
installable = [
pkg
for pkg in missing_packages
if not self.package_manager.attempted_to_install(package=pkg)
]
versions: dict[str, str | None] = {
pkg: request.versions.get(pkg) for pkg in installable
}
async for pkg, success in self.package_manager.stream_install(
installable,
versions=versions,
index_urls=request.index_urls or None,
log_callback_factory=create_log_callback,
):
if success:
package_statuses[pkg] = "installed"
# Send final "done" log
broadcast_notification(
InstallingPackageAlertNotification(
packages=package_statuses,
Expand All @@ -267,7 +309,6 @@ def log_callback(log_line: str) -> None:
package_statuses[pkg] = "failed"
mod = self.package_manager.package_to_module(pkg)
self._kernel.module_registry.excluded_modules.add(mod)
# Send final "done" log with error
broadcast_notification(
InstallingPackageAlertNotification(
packages=package_statuses,
Expand Down
7 changes: 6 additions & 1 deletion marimo/_runtime/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,20 +608,25 @@ class InstallPackagesCommand(Command):
manager: Package manager to use ('pip', 'conda', 'uv', etc.).
versions: Package names mapped to version specifiers. Empty version
means install latest.
index_urls: Alternative package index URLs. Primary index first,
then extras. Honored by backends that support custom
indexes (currently micropip); other backends ignore it.
source: Where to install. "kernel" (default) dispatches to the kernel
subprocess; "server" installs directly into the server's Python
environment (sys.executable), used when the server itself needs
a package (e.g. nbformat for IPYNB auto-export in sandbox mode).
"""

# TODO: index URL (index/channel/...)
manager: str

# Map from package name to desired version
# If the package name is not in the map, the latest version
# will be installed
versions: dict[str, str]

# Alternative package index URLs (primary + extras, primary first).
index_urls: list[str] = msgspec.field(default_factory=list)

source: Literal["kernel", "server"] = "kernel"


Expand Down
188 changes: 188 additions & 0 deletions marimo/_runtime/packages/_micropip_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
# Copyright 2026 Marimo. All rights reserved.
# This module is intentionally marimo-free so it can be contributed
# back to pyodide/micropip as a streaming install primitive.
# Do NOT import from marimo.* here.
from __future__ import annotations

import asyncio
import importlib
import importlib.metadata
from pathlib import Path
from site import getsitepackages
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from collections.abc import AsyncIterator


def _append_version(pkg_name: str, version: str | None) -> str:
"""Qualify a version string with a leading '==' if it doesn't have one."""
if version is None or version in ("", "latest"):
return pkg_name
return f"{pkg_name}=={version}"


def _split_packages(package: str) -> list[str]:
"""Split a whitespace-joined package spec into individual requirements.

Handles editable installs (`-e <path>`), direct URLs (`pkg @ url`),
and PEP 508 environment markers (`pkg==1.0; python_version>'3.6'`).
"""
packages: list[str] = []
current: list[str] = []
in_marker = False

for part in package.split():
if (
part in ("-e", "--editable", "@")
or current
and current[-1] in ("-e", "--editable", "@")
):
current.append(part)
elif part.endswith(";"):
if current:
packages.append(" ".join(current))
current = []
in_marker = True
current.append(part)
elif in_marker:
current.append(part)
if part.endswith(("'", '"')):
in_marker = False
packages.append(" ".join(current))
current = []
else:
if current:
packages.append(" ".join(current))
current = [part]

if current:
packages.append(" ".join(current))

return [pkg.strip() for pkg in packages]


async def stream_transaction_install(
packages: list[str],
*,
versions: dict[str, str | None] | None = None,
index_urls: list[str] | None = None,
constraints: list[str] | None = None,
) -> AsyncIterator[tuple[str, bool]]:
"""Install via micropip's Transaction API, yielding (name, success) per wheel.

Mirrors what `micropip.install()` does internally, but exposes the resolved
wheel list so callers can stream progress as each wheel finishes installing
(via `asyncio.as_completed`) instead of waiting for an opaque end-to-end call.

`index_urls` and `constraints`, when None, fall back to the values on
`micropip._micropip` (the global singleton); when provided, they override
the singleton for this transaction only.

Yields `(package_name, success)` per requested package. Packages that
resolve to native pyodide-distribution entries are loaded via
`loadPackage`; pure-python wheels are downloaded and extracted in parallel.
"""
# Lazy / dynamic import so this module loads cleanly outside Pyodide
# (where micropip isn't installed). A ModuleNotFoundError here propagates
# as ImportError, which the marimo-side wrapper catches and falls back.
micropip = importlib.import_module("micropip")
default_environment = importlib.import_module(
"micropip._utils"
).default_environment
Transaction = importlib.import_module("micropip.transaction").Transaction
from packaging.requirements import Requirement
from packaging.utils import canonicalize_name

mgr = micropip._micropip # singleton PackageManager
ctx = default_environment()
wheel_base = Path(getsitepackages()[0])

flat_requirements: list[str] = []
for pkg in packages:
version = (versions or {}).get(pkg)
versioned = _append_version(pkg, version)
flat_requirements.extend(_split_packages(versioned))

transaction = Transaction(
_compat_layer=mgr.compat_layer,
ctx=ctx,
ctx_extras=[],
keep_going=True,
deps=True,
pre=False,
fetch_kwargs={},
verbose=False,
index_urls=index_urls if index_urls is not None else mgr.index_urls,
constraints=constraints
if constraints is not None
else mgr.constraints,
reinstall=False,
)

await transaction.gather_requirements(flat_requirements)

# Map normalized base-package-name -> original spec as the caller spelled
# it. The caller's strings may include version specifiers, URL specs, or
# markers (`foo==1.0`, `foo @ git+…@ref`, `foo; python_version>'3.10'`),
# so parse out the base name before canonicalizing — otherwise the wheel
# name micropip yields back ("foo") will fail to match the spec string.
requested: dict[str, str] = {}
for spec in packages:
try:
base_name = Requirement(spec).name
except Exception:
# Editable/path specs and other oddities Requirement can't parse;
# fall back to canonicalizing the whole string. Worst case it
# gets reconciled by the importlib.metadata pass at the end.
base_name = spec
requested[canonicalize_name(base_name)] = spec

for failed_name in transaction.failed:
normalized = canonicalize_name(failed_name)
original = requested.pop(normalized, failed_name)
yield (original, False)

async def _install_wheel(wheel: Any) -> tuple[str, Exception | None]:
try:
await wheel.install(wheel_base, mgr.compat_layer)
except Exception as exc:
return wheel.name, exc
return wheel.name, None

if transaction.wheels:
tasks = [
asyncio.create_task(_install_wheel(w)) for w in transaction.wheels
]
for future in asyncio.as_completed(tasks):
name, exc = await future
normalized = canonicalize_name(name)
if normalized in requested:
original = requested.pop(normalized)
yield (original, exc is None)

if transaction.pyodide_packages:
names = [name for name, _, _ in transaction.pyodide_packages]
try:
await mgr.compat_layer.loadPackage(mgr.compat_layer.to_js(names))
except Exception:
load_succeeded = False
else:
load_succeeded = True
for name in names:
normalized = canonicalize_name(name)
if normalized in requested:
original = requested.pop(normalized)
yield (original, load_succeeded)

importlib.invalidate_caches()

# Anything still in `requested` may have been pulled in transitively under
# a different name (e.g. caller asked for `foo`, micropip installed `foo-impl`).
# If it's importable now, treat it as installed.
for _normalized, original in list(requested.items()):
try:
importlib.metadata.version(original)
yield (original, True)
except importlib.metadata.PackageNotFoundError:
yield (original, False)
Loading
Loading