diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml new file mode 100644 index 0000000..8ecd9e3 --- /dev/null +++ b/.github/workflows/docs.yml @@ -0,0 +1,71 @@ +name: Docs + +on: + push: + branches: [master] + pull_request: + branches: [master] + workflow_dispatch: + +permissions: + contents: read + pages: write + id-token: write + +concurrency: + group: "pages-${{ github.ref }}" + cancel-in-progress: true + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + submodules: false + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install system dependencies + run: | + sudo apt-get update + sudo apt-get install -y texlive-latex-extra texlive-fonts-recommended texlive-science dvipng cm-super libosmesa6-dev libgl1-mesa-dev + + - name: Install uv + uses: astral-sh/setup-uv@v4 + + - name: Install dependencies + run: uv sync + + - name: Fetch MLflow artifacts + env: + DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }} + run: uv run python main.py --fetch + + - name: Build Sphinx docs + env: + PYVISTA_OFF_SCREEN: "true" + PYTHONPATH: "${{ github.workspace }}" + run: uv run python main.py --docs + + - name: Upload artifact + if: github.ref == 'refs/heads/main' + uses: actions/upload-pages-artifact@v3 + with: + path: docs/build/html + + deploy: + if: github.ref == 'refs/heads/main' + environment: + name: github-pages + url: ${{ steps.deployment.outputs.page_url }} + runs-on: ubuntu-latest + needs: build + steps: + - name: Deploy to GitHub Pages + id: deployment + uses: actions/deploy-pages@v4 diff --git a/.gitignore b/.gitignore index f8715dd..276a422 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,7 @@ docs/reports/TexReport/ # Generated data (keep README.md) -data/* +#data/* # Uv stuff uv.lock @@ -18,6 +18,7 @@ docs/source/generated/ docs/source/example_gallery/ docs/source/_autosummary/ docs/source/gen_modules/backreferences/ +sg_execution_times* # macOS .DS_Store diff --git a/CODE_REVIEW.md b/CODE_REVIEW.md deleted file mode 100644 index 2a2cfe4..0000000 --- a/CODE_REVIEW.md +++ /dev/null @@ -1,37 +0,0 @@ -# Code Review - COMPLETED - -All phases completed successfully. 68 tests passing. - -## Summary of Changes - -### Phase 0: Pytest Suite -- `tests/test_kernels.py` - Kernel correctness & O(h²) convergence -- `tests/test_decomposition.py` - Domain decomposition logic -- `tests/test_communicators.py` - Halo exchange (NumpyHaloExchange, CustomHaloExchange) -- `tests/test_solver.py` - JacobiPoisson solver convergence & accuracy (single-rank) -- `tests/test_problems.py` - Grid creation & sinusoidal problem setup -- `tests/test_mpi_integration.py` - End-to-end MPI tests via subprocess runner (1-8 ranks) - -### Phase 1: Bug Fixes & Enhancements -- Fixed warmup parameter mismatch in `kernels.py` -- Scaling experiment converted to placeholder -- Added configurable `axis` parameter to sliced decomposition - -### Phase 2: Communicators Refactor -- Renamed: `NumpyCommunicator` → `NumpyHaloExchange` -- Renamed: `DatatypeCommunicator` → `CustomHaloExchange` -- Extracted `_get_neighbor_ranks()` helper (~60 lines reduced) -- Created `_BaseHaloExchange` base class - -### Phase 3: Experiments Refactor -- `plot_decompositions.py`: 134 → 84 lines (extracted `visualize_decomposition`) -- `compute_all.py`: 175 → 106 lines (extracted `run_kernel`, `kernel_to_df`) - -### Phase 4: MPI Correctness -- Fixed hardcoded byte stride → `MPI.DOUBLE.Get_size()` -- Added offset validation assertions in `CustomHaloExchange` - -### Phase 5: Documentation & Terminology -- Standardized on "halo" terminology (replaced "ghost") -- Renamed method: `exchange_ghosts` → `exchange_halos` -- Renamed field: `ghost_cells_total` → `halo_cells_total` diff --git a/Experiments/01-kernels/compute_all.py b/Experiments/01-kernels/compute_all.py index 28f3198..90095ec 100644 --- a/Experiments/01-kernels/compute_all.py +++ b/Experiments/01-kernels/compute_all.py @@ -5,9 +5,9 @@ 1. Convergence validation with analytical solution 2. Fixed iteration performance benchmark """ + import numpy as np import pandas as pd -from pathlib import Path from dataclasses import asdict from scipy.ndimage import laplace @@ -29,10 +29,10 @@ def run_kernel(kernel, f, max_iter, track_algebraic=False): if track_algebraic: kernel.timeseries.physical_errors = [] - h2 = kernel.parameters.h ** 2 + h2 = kernel.parameters.h**2 for _ in range(max_iter): - residual = kernel.step(u_old, u, f) + kernel.step(u_old, u, f) if track_algebraic: Au = -laplace(u) / h2 @@ -47,10 +47,10 @@ def run_kernel(kernel, f, max_iter, track_algebraic=False): def kernel_to_df(kernel, kernel_name, N, omega, **extra): """Convert kernel timeseries to DataFrame.""" df = pd.DataFrame(asdict(kernel.timeseries)) - df['iteration'] = range(len(df)) - df['kernel'] = kernel_name - df['N'] = N - df['omega'] = omega + df["iteration"] = range(len(df)) + df["kernel"] = kernel_name + df["N"] = N + df["omega"] = omega for k, v in extra.items(): df[k] = v return df @@ -67,14 +67,18 @@ def kernel_to_df(kernel, kernel_name, N, omega, **extra): f = problems.sinusoidal_source_term(N) numpy_kernel = NumPyKernel(N=N, omega=omega, tolerance=0.0, max_iter=max_iter) - numba_kernel = NumbaKernel(N=N, omega=omega, tolerance=0.0, max_iter=max_iter, numba_threads=4) + numba_kernel = NumbaKernel( + N=N, omega=omega, tolerance=0.0, max_iter=max_iter, numba_threads=4 + ) numba_kernel.warmup() - for name, kernel in [('numpy', numpy_kernel), ('numba', numba_kernel)]: + for name, kernel in [("numpy", numpy_kernel), ("numba", numba_kernel)]: run_kernel(kernel, f, max_iter, track_algebraic=True) all_dfs.append(kernel_to_df(kernel, name, N, omega, tolerance=0.0)) -pd.concat(all_dfs, ignore_index=True).to_parquet(data_dir / "kernel_convergence.parquet", index=False) +pd.concat(all_dfs, ignore_index=True).to_parquet( + data_dir / "kernel_convergence.parquet", index=False +) # Experiment 2: Fixed Iteration Benchmark @@ -90,16 +94,38 @@ def kernel_to_df(kernel, kernel_name, N, omega, **extra): kernel = NumPyKernel(N=N, omega=omega, tolerance=0.0, max_iter=max_iter) f = np.ones((N, N, N), dtype=np.float64) run_kernel(kernel, f, max_iter) - all_dfs.append(kernel_to_df(kernel, 'numpy', N, omega, max_iter=max_iter, use_numba=False, num_threads=0)) + all_dfs.append( + kernel_to_df( + kernel, "numpy", N, omega, max_iter=max_iter, use_numba=False, num_threads=0 + ) + ) # Numba with thread scaling for num_threads in thread_counts: for idx, N in enumerate(problem_sizes): - kernel = NumbaKernel(N=N, omega=omega, tolerance=0.0, max_iter=max_iter, numba_threads=num_threads) + kernel = NumbaKernel( + N=N, + omega=omega, + tolerance=0.0, + max_iter=max_iter, + numba_threads=num_threads, + ) if idx == 0: kernel.warmup() f = np.ones((N, N, N), dtype=np.float64) run_kernel(kernel, f, max_iter) - all_dfs.append(kernel_to_df(kernel, 'numba', N, omega, max_iter=max_iter, use_numba=True, num_threads=num_threads)) - -pd.concat(all_dfs, ignore_index=True).to_parquet(data_dir / "kernel_benchmark.parquet", index=False) + all_dfs.append( + kernel_to_df( + kernel, + "numba", + N, + omega, + max_iter=max_iter, + use_numba=True, + num_threads=num_threads, + ) + ) + +pd.concat(all_dfs, ignore_index=True).to_parquet( + data_dir / "kernel_benchmark.parquet", index=False +) diff --git a/Experiments/01-kernels/plot_kernels.py b/Experiments/01-kernels/plot_kernels.py index 57f816c..3dd1ecc 100644 --- a/Experiments/01-kernels/plot_kernels.py +++ b/Experiments/01-kernels/plot_kernels.py @@ -4,6 +4,7 @@ Comprehensive analysis and visualization of NumPy vs Numba kernel benchmarks. """ + import pandas as pd import matplotlib.pyplot as plt import seaborn as sns @@ -23,8 +24,12 @@ fig_dir.mkdir(parents=True, exist_ok=True) # Check if data exists -if not data_dir.exists(): - raise FileNotFoundError(f"Data not found: {data_dir}. Run compute_kernels.py first.") +if not list(data_dir.glob("*.parquet")): + print(f"Data not found: {data_dir}. Run compute_kernels.py first.") + # Graceful exit for docs build + import sys + + sys.exit(0) # %% # Plot 1: Convergence Validation @@ -37,21 +42,21 @@ # Create faceted plot: one subplot per problem size g = sns.relplot( data=df_conv, - x='iteration', - y='physical_errors', - col='N', - hue='kernel', + x="iteration", + y="physical_errors", + col="N", + hue="kernel", style="kernel", - kind='line', + kind="line", dashes=True, markers=False, - facet_kws={'sharey': True, 'sharex': False} + facet_kws={"sharey": True, "sharex": False}, ) - g.set(xscale='log', yscale='log') - g.set_axis_labels('Iteration', r'Algebraic Residual $||Au - f||_\infty$') - g.set_titles(col_template='N={col_name}') - g.fig.suptitle(r'Kernel Convergence Validation', y=1.02) + g.set(xscale="log", yscale="log") + g.set_axis_labels("Iteration", r"Algebraic Residual $||Au - f||_\infty$") + g.set_titles(col_template="N={col_name}") + g.fig.suptitle(r"Kernel Convergence Validation", y=1.02) # Save figure g.savefig(fig_dir / "01_convergence_validation.pdf") @@ -64,13 +69,14 @@ df = pd.read_parquet(benchmark_file) # Convert to milliseconds -df['time_ms'] = df['compute_times'] * 1000 +df["time_ms"] = df["compute_times"] * 1000 # Prepare configuration labels -df['config'] = df.apply( - lambda row: 'NumPy' if row['kernel'] == 'numpy' +df["config"] = df.apply( + lambda row: "NumPy" + if row["kernel"] == "numpy" else f"Numba ({int(row['num_threads'])} threads)", - axis=1 + axis=1, ) # %% @@ -81,21 +87,21 @@ fig, ax = plt.subplots() sns.lineplot( data=df, - x='N', - y='time_ms', - hue='config', - style='config', + x="N", + y="time_ms", + hue="config", + style="config", markers=True, dashes=False, - errorbar='ci', # Show confidence intervals - ax=ax + errorbar="ci", # Show confidence intervals + ax=ax, ) -ax.set_xscale('log') -ax.set_yscale('log') -ax.set_xlabel('Problem Size (N)') -ax.set_ylabel('Time per Iteration (ms)') -ax.set_title('Kernel Performance Comparison') +ax.set_xscale("log") +ax.set_yscale("log") +ax.set_xlabel("Problem Size (N)") +ax.set_ylabel("Time per Iteration (ms)") +ax.set_title("Kernel Performance Comparison") fig.savefig(fig_dir / "02_performance.pdf") @@ -104,30 +110,33 @@ # ------------------------- # Compute numpy baseline for each N and iteration -df_numpy = df[df['kernel'] == 'numpy'][['N', 'iteration', 'compute_times']].rename( - columns={'compute_times': 'numpy_time'} +df_numpy = df[df["kernel"] == "numpy"][["N", "iteration", "compute_times"]].rename( + columns={"compute_times": "numpy_time"} +) +df_speedup = df[df["kernel"] == "numba"].merge( + df_numpy, on=["N", "iteration"], how="left" +) +df_speedup["speedup"] = df_speedup["numpy_time"] / df_speedup["compute_times"] +df_speedup["thread_label"] = ( + df_speedup["num_threads"].astype(int).astype(str) + " threads" ) -df_speedup = df[df['kernel'] == 'numba'].merge(df_numpy, on=['N', 'iteration'], how='left') -df_speedup['speedup'] = df_speedup['numpy_time'] / df_speedup['compute_times'] -df_speedup['thread_label'] = df_speedup['num_threads'].astype(int).astype(str) + ' threads' # Create speedup plot - seaborn will compute mean and error bars fig, ax = plt.subplots() sns.lineplot( data=df_speedup, - x='N', - y='speedup', - hue='thread_label', - style='thread_label', + x="N", + y="speedup", + hue="thread_label", + style="thread_label", markers=True, dashes=False, - errorbar='ci', - ax=ax + errorbar="ci", + ax=ax, ) -ax.set_xlabel('Problem Size (N)') -ax.set_ylabel('Speedup vs NumPy') -ax.set_title('Fixed Iteration Speedup (200 iterations)') +ax.set_xlabel("Problem Size (N)") +ax.set_ylabel("Speedup vs NumPy") +ax.set_title("Fixed Iteration Speedup (200 iterations)") fig.savefig(fig_dir / "03_speedup_fixed_iter.pdf") - diff --git a/Experiments/02-decomposition/plot_decompositions.py b/Experiments/02-decomposition/plot_decompositions.py index dec50b5..4015a50 100644 --- a/Experiments/02-decomposition/plot_decompositions.py +++ b/Experiments/02-decomposition/plot_decompositions.py @@ -4,6 +4,7 @@ Visualize how domain partitioning works for sliced vs cubic decompositions. """ + import matplotlib.pyplot as plt import pyvista as pv from pyvista import themes @@ -15,7 +16,7 @@ # ----- pv.set_plot_theme(themes.ParaViewTheme()) -pv.global_theme.anti_aliasing = 'ssaa' +pv.global_theme.anti_aliasing = "ssaa" pv.global_theme.smooth_shading = True pv.global_theme.multi_samples = 16 @@ -32,7 +33,7 @@ # ------------------------------ # 1D decomposition along Z-axis - each rank owns horizontal slices. -decomp = DomainDecomposition(N=N, size=4, strategy='sliced') +decomp = DomainDecomposition(N=N, size=4, strategy="sliced") plotter = pv.Plotter(window_size=[1500, 1500], off_screen=True) for rank in range(4): @@ -41,11 +42,14 @@ z1, y1, x1 = info.global_end box = pv.Box(bounds=[x0, x1, y0, y1, z0, z1]) color = cmap(rank / 4)[:3] - plotter.add_mesh(box, opacity=0.4, color=color, show_edges=True, - edge_color='black', line_width=8) + plotter.add_mesh( + box, opacity=0.4, color=color, show_edges=True, edge_color="black", line_width=8 + ) plotter.add_axes() -plotter.screenshot(fig_dir / "01a_sliced_decomposition.png", transparent_background=True) +plotter.screenshot( + fig_dir / "01a_sliced_decomposition.png", transparent_background=True +) plotter.show() # %% @@ -53,7 +57,7 @@ # ----------------------------- # 3D Cartesian decomposition - domain split across all dimensions. -decomp = DomainDecomposition(N=N, size=8, strategy='cubic') +decomp = DomainDecomposition(N=N, size=8, strategy="cubic") plotter = pv.Plotter(window_size=[1500, 1500], off_screen=True) for rank in range(8): @@ -62,8 +66,9 @@ z1, y1, x1 = info.global_end box = pv.Box(bounds=[x0, x1, y0, y1, z0, z1]) color = cmap(rank / 8)[:3] - plotter.add_mesh(box, opacity=0.4, color=color, show_edges=True, - edge_color='black', line_width=8) + plotter.add_mesh( + box, opacity=0.4, color=color, show_edges=True, edge_color="black", line_width=8 + ) plotter.add_axes() plotter.screenshot(fig_dir / "01b_cubic_decomposition.png", transparent_background=True) diff --git a/Experiments/03-communication/compute_communication.py b/Experiments/03-communication/compute_communication.py index 182f862..9db1272 100644 --- a/Experiments/03-communication/compute_communication.py +++ b/Experiments/03-communication/compute_communication.py @@ -7,8 +7,8 @@ Uses per-iteration timeseries data for statistical analysis. """ + import subprocess -import sys from Poisson import get_project_root @@ -17,6 +17,7 @@ def main(): """Entry point - spawns MPI if needed.""" try: from mpi4py import MPI + if MPI.COMM_WORLD.Get_size() > 1: _run_benchmark() return @@ -24,7 +25,12 @@ def main(): pass # Spawn MPI - script = get_project_root() / "Experiments" / "03-communication" / "compute_communication.py" + script = ( + get_project_root() + / "Experiments" + / "03-communication" + / "compute_communication.py" + ) subprocess.run(["mpiexec", "-n", "4", "uv", "run", "python", str(script)]) @@ -32,7 +38,13 @@ def _run_benchmark(): """MPI worker - collects per-iteration timings.""" import pandas as pd from mpi4py import MPI - from Poisson import JacobiPoisson, DomainDecomposition, NumpyHaloExchange, CustomHaloExchange, get_project_root + from Poisson import ( + JacobiPoisson, + DomainDecomposition, + NumpyHaloExchange, + CustomHaloExchange, + get_project_root, + ) comm = MPI.COMM_WORLD rank = comm.Get_rank() @@ -70,10 +82,17 @@ def _run_benchmark(): print(f" {label}...", end=" ", flush=True) # Create and run solver - decomp = DomainDecomposition(N=N, size=size, strategy='sliced', axis=axis) - halo = CustomHaloExchange() if comm_type == "custom" else NumpyHaloExchange() - solver = JacobiPoisson(N=N, decomposition=decomp, communicator=halo, - max_iter=WARMUP + ITERATIONS, tolerance=0) + decomp = DomainDecomposition(N=N, size=size, strategy="sliced", axis=axis) + halo = ( + CustomHaloExchange() if comm_type == "custom" else NumpyHaloExchange() + ) + solver = JacobiPoisson( + N=N, + decomposition=decomp, + communicator=halo, + max_iter=WARMUP + ITERATIONS, + tolerance=0, + ) solver.solve() # Get max halo time across ranks per iteration (skip warmup) @@ -82,13 +101,20 @@ def _run_benchmark(): if rank == 0: local_N = N // size # Local subdomain size along decomposed axis - print(f"mean={sum(max_times)/len(max_times)*1e6:.1f} μs/iter") - dfs.append(pd.DataFrame({ - "N": N, "local_N": local_N, "axis": axis, - "communicator": comm_type, "label": label, - "iteration": range(len(max_times)), - "halo_time_us": [t * 1e6 for t in max_times], - })) + print(f"mean={sum(max_times) / len(max_times) * 1e6:.1f} μs/iter") + dfs.append( + pd.DataFrame( + { + "N": N, + "local_N": local_N, + "axis": axis, + "communicator": comm_type, + "label": label, + "iteration": range(len(max_times)), + "halo_time_us": [t * 1e6 for t in max_times], + } + ) + ) if rank == 0: df = pd.concat(dfs, ignore_index=True) diff --git a/Experiments/03-communication/plot_communication.py b/Experiments/03-communication/plot_communication.py index 53e647c..b1bb287 100644 --- a/Experiments/03-communication/plot_communication.py +++ b/Experiments/03-communication/plot_communication.py @@ -7,6 +7,7 @@ Uses per-iteration timeseries data for tight confidence intervals. """ + import matplotlib.pyplot as plt import pandas as pd import seaborn as sns @@ -18,7 +19,7 @@ # ----- sns.set_style("whitegrid") -plt.rcParams['figure.dpi'] = 100 +plt.rcParams["figure.dpi"] = 100 repo_root = get_project_root() data_dir = repo_root / "data" / "communication" @@ -31,7 +32,9 @@ parquet_files = list(data_dir.glob("communication_*.parquet")) if not parquet_files: - raise FileNotFoundError(f"No data found in {data_dir}. Run compute_communication.py first.") + raise FileNotFoundError( + f"No data found in {data_dir}. Run compute_communication.py first." + ) df = pd.concat([pd.read_parquet(f) for f in parquet_files], ignore_index=True) @@ -55,28 +58,30 @@ sns.lineplot( data=df, - x='local_N', - y='halo_time_us', - hue='label', - style='label', + x="local_N", + y="halo_time_us", + hue="label", + style="label", markers=True, dashes=False, palette=palette, ax=ax, - errorbar=('ci', 95), + errorbar=("ci", 95), markersize=8, linewidth=2, ) -ax.set_xlabel('Local Subdomain Size (N / nprocs)', fontsize=12) -ax.set_ylabel('Halo Exchange Time (μs)', fontsize=12) -ax.set_title('Halo Exchange Performance: Contiguous vs Non-Contiguous Memory', fontsize=13) -ax.legend(title='Configuration', fontsize=9, loc='upper left') +ax.set_xlabel("Local Subdomain Size (N / nprocs)", fontsize=12) +ax.set_ylabel("Halo Exchange Time (μs)", fontsize=12) +ax.set_title( + "Halo Exchange Performance: Contiguous vs Non-Contiguous Memory", fontsize=13 +) +ax.legend(title="Configuration", fontsize=9, loc="upper left") ax.grid(True, alpha=0.3) plt.tight_layout() output_file = fig_dir / "communication_comparison.pdf" -plt.savefig(output_file, bbox_inches='tight') +plt.savefig(output_file, bbox_inches="tight") print(f"Saved: {output_file}") plt.show() @@ -87,7 +92,9 @@ print("\n" + "=" * 70) print("Summary: Mean halo time (μs) with 95% CI by local subdomain size") print("=" * 70) -summary = df.groupby(['local_N', 'label'])['halo_time_us'].agg(['mean', 'std', 'count']) -summary['ci95'] = 1.96 * summary['std'] / (summary['count'] ** 0.5) -summary['display'] = summary.apply(lambda r: f"{r['mean']:.1f} ± {r['ci95']:.1f}", axis=1) -print(summary['display'].unstack().to_string()) +summary = df.groupby(["local_N", "label"])["halo_time_us"].agg(["mean", "std", "count"]) +summary["ci95"] = 1.96 * summary["std"] / (summary["count"] ** 0.5) +summary["display"] = summary.apply( + lambda r: f"{r['mean']:.1f} ± {r['ci95']:.1f}", axis=1 +) +print(summary["display"].unstack().to_string()) diff --git a/Experiments/04-validation/compute_validation.py b/Experiments/04-validation/compute_validation.py index 100e9d6..5360d08 100644 --- a/Experiments/04-validation/compute_validation.py +++ b/Experiments/04-validation/compute_validation.py @@ -43,8 +43,8 @@ ) if "error" in result: - print(f"ERROR") + print("ERROR") else: - print(f"done") + print("done") print(f"\nSaved results to: {data_dir}") diff --git a/Experiments/04-validation/plot_validation.py b/Experiments/04-validation/plot_validation.py index 8799b3c..ea30fea 100644 --- a/Experiments/04-validation/plot_validation.py +++ b/Experiments/04-validation/plot_validation.py @@ -39,28 +39,35 @@ # Load validation data from HDF5 files h5_files = list(data_dir.glob("*.h5")) if not h5_files: - raise FileNotFoundError(f"No data found in {data_dir}. Run compute_validation.py first.") + raise FileNotFoundError( + f"No data found in {data_dir}. Run compute_validation.py first." + ) -df = pd.concat([pd.read_hdf(f, key='results') for f in h5_files], ignore_index=True) +df = pd.concat([pd.read_hdf(f, key="results") for f in h5_files], ignore_index=True) print(f"\nLoaded {len(df)} validation results") print(f"Strategies: {df['decomposition'].unique()}") print(f"Problem sizes: {sorted(df['N'].unique())}") # Create labels for plotting -df['Strategy'] = df['decomposition'].str.capitalize() -df['Communicator'] = df['communicator'].str.replace('haloexchange', '').str.replace('custom', 'Custom').str.replace('numpy', 'NumPy') -df['Method'] = df['Strategy'] + ' + ' + df['Communicator'] +df["Strategy"] = df["decomposition"].str.capitalize() +df["Communicator"] = ( + df["communicator"] + .str.replace("haloexchange", "") + .str.replace("custom", "Custom") + .str.replace("numpy", "NumPy") +) +df["Method"] = df["Strategy"] + " + " + df["Communicator"] # Use lineplot (single rank count = 8) fig, ax = plt.subplots(figsize=(8, 6)) sns.lineplot( data=df, - x='N', - y='final_error', - hue='Method', - style='Method', + x="N", + y="final_error", + hue="Method", + style="Method", markers=True, dashes=True, ax=ax, @@ -68,14 +75,14 @@ # Add O(N^-2) reference line N_ref = [16, 64] -ax.plot(N_ref, [0.02, 0.02 * (16/64)**2], 'k:', alpha=0.5, label=r'$O(N^{-2})$') +ax.plot(N_ref, [0.02, 0.02 * (16 / 64) ** 2], "k:", alpha=0.5, label=r"$O(N^{-2})$") -ax.set_xscale('log') -ax.set_yscale('log') +ax.set_xscale("log") +ax.set_yscale("log") ax.grid(True, alpha=0.3) -ax.set_xlabel('Grid Size N') -ax.set_ylabel('L2 Error') -ax.set_title('Spatial Convergence: Solver Validation') +ax.set_xlabel("Grid Size N") +ax.set_ylabel("L2 Error") +ax.set_title("Spatial Convergence: Solver Validation") ax.legend() fig.tight_layout() @@ -95,12 +102,12 @@ x = np.linspace(0, 2, N) y = np.linspace(0, 2, N) z = np.linspace(0, 2, N) -X, Y, Z = np.meshgrid(x, y, z, indexing='ij') +X, Y, Z = np.meshgrid(x, y, z, indexing="ij") u_analytical = np.sin(np.pi * X) * np.sin(np.pi * Y) * np.sin(np.pi * Z) # Create structured grid grid = pv.StructuredGrid(X, Y, Z) -grid['solution'] = u_analytical.flatten(order='F') +grid["solution"] = u_analytical.flatten(order="F") # Create orthogonal slices at domain center @@ -112,44 +119,44 @@ # Add orthogonal slices plotter.add_mesh( slices, - scalars='solution', - cmap='coolwarm', + scalars="solution", + cmap="coolwarm", show_edges=True, - edge_color='black', + edge_color="black", line_width=0.5, show_scalar_bar=True, scalar_bar_args={ - 'title': 'u(x,y,z)', - 'position_x': 0.85, - 'position_y': 0.05, - 'title_font_size': 20, - 'label_font_size': 16, - 'fmt': '%.2f', - 'n_labels': 7 - } + "title": "u(x,y,z)", + "position_x": 0.85, + "position_y": 0.05, + "title_font_size": 20, + "label_font_size": 16, + "fmt": "%.2f", + "n_labels": 7, + }, ) # Add coordinate axes plotter.add_axes( interactive=False, line_width=5, - x_color='red', - y_color='green', - z_color='blue', - xlabel='X', - ylabel='Y', - zlabel='Z' + x_color="red", + y_color="green", + z_color="blue", + xlabel="X", + ylabel="Y", + zlabel="Z", ) # Add bounds with labels plotter.show_bounds( - grid='back', - location='outer', - xtitle='X', - ytitle='Y', - ztitle='Z', + grid="back", + location="outer", + xtitle="X", + ytitle="Y", + ztitle="Z", font_size=12, - all_edges=True + all_edges=True, ) # Show the plot (Sphinx-Gallery scraper will capture it) @@ -158,4 +165,3 @@ plotter.screenshot(output_file, transparent_background=True) print(f"Saved: {output_file}") plotter.show() - diff --git a/Experiments/05-scaling/compute_scaling.py b/Experiments/05-scaling/compute_scaling.py new file mode 100644 index 0000000..c92a7c6 --- /dev/null +++ b/Experiments/05-scaling/compute_scaling.py @@ -0,0 +1,168 @@ +""" +Poisson Scaling Experiment +========================== + +MPI Jacobi solver for 3D Poisson equation with MLflow logging. + +Usage: + mpiexec -n 4 uv run python compute_scaling.py --N 64 + mpiexec -n 8 uv run python compute_scaling.py --N 128 --tol 1e-8 --numba +""" + +import argparse +import os +import sys + +from mpi4py import MPI + +from Poisson import ( + JacobiPoisson, + DomainDecomposition, + NumpyHaloExchange, + CustomHaloExchange, + get_project_root, +) + +# Parse command line arguments +parser = argparse.ArgumentParser( + description="MPI Jacobi solver for 3D Poisson equation" +) +parser.add_argument("--N", type=int, default=64, help="Grid size N³ (default: 64)") +parser.add_argument( + "--tol", type=float, default=1e-6, help="Convergence tolerance (default: 1e-6)" +) +parser.add_argument( + "--max-iter", type=int, default=50000, help="Max iterations (default: 50000)" +) +parser.add_argument( + "--omega", type=float, default=0.8, help="Relaxation parameter (default: 0.8)" +) +parser.add_argument( + "--strategy", + choices=["sliced", "cubic"], + default="sliced", + help="Decomposition strategy", +) +parser.add_argument( + "--communicator", + choices=["numpy", "custom"], + default="numpy", + help="Halo exchange communicator", +) +parser.add_argument("--numba", action="store_true", help="Use Numba kernel") +parser.add_argument("--job-name", type=str, default=None, help="LSF Job Name for log retrieval") +parser.add_argument("--log-dir", type=str, default="logs", help="Directory containing LSF logs") +parser.add_argument("--experiment-name", type=str, default=None, help="MLflow experiment name") +args = parser.parse_args() + +N = args.N +comm = MPI.COMM_WORLD +n_ranks = comm.Get_size() +rank = comm.Get_rank() + +# Setup directories +project_root = get_project_root() +data_dir = project_root / "data" / "scaling" +data_dir.mkdir(parents=True, exist_ok=True) + +# Create solver +decomp = DomainDecomposition(N=N, size=n_ranks, strategy=args.strategy) +halo = CustomHaloExchange() if args.communicator == "custom" else NumpyHaloExchange() +solver = JacobiPoisson( + N=N, + omega=args.omega, + tolerance=args.tol, + max_iter=args.max_iter, + use_numba=args.numba, + decomposition=decomp, + communicator=halo, +) + +if rank == 0: + print( + f"Solver configured: N={N}³, ranks={n_ranks}, strategy={args.strategy}, comm={args.communicator}" + ) + print(f" Kernel: {'Numba' if args.numba else 'NumPy'}, omega={args.omega}") + +# MLflow setup with nested runs (auto-detect HPC via LSF env vars) +if args.experiment_name: + experiment_name = args.experiment_name +else: + is_hpc = "LSB_JOBID" in os.environ + experiment_name = "HPC-Poisson-Scaling" if is_hpc else "Poisson-Scaling" + +parent_run = f"N{N}" +run_name = f"N{N}_p{n_ranks}_{args.strategy}" +solver.mlflow_start(experiment_name, run_name, parent_run_name=parent_run) + +# Save Run ID for external log uploader +if rank == 0 and args.job_name: + try: + run_id = mlflow.active_run().info.run_id + log_path = project_root / args.log_dir + log_path.mkdir(parents=True, exist_ok=True) + run_id_file = log_path / f"{args.job_name}.runid" + with open(run_id_file, "w") as f: + f.write(run_id) + except Exception as e: + print(f"Warning: Could not save run ID to file: {e}") + +# Warmup Numba if needed +if args.numba: + solver.warmup() + +# Solve with timing +t0 = MPI.Wtime() +solver.solve() +wall_time = MPI.Wtime() - t0 + +# Store timing metrics +if rank == 0: + solver.results.wall_time = wall_time + solver.results.total_compute_time = sum(solver.timeseries.compute_times) + solver.results.total_halo_time = sum(solver.timeseries.halo_exchange_times) + solver.results.total_mpi_comm_time = sum(solver.timeseries.mpi_comm_times) + +# Compute L2 error +solver.compute_l2_error() + +# Save solution +output_file = data_dir / f"poisson_N{N}_p{n_ranks}.h5" +solver.save_hdf5(output_file) +solver.mlflow_log_artifact(str(output_file)) + +if rank == 0: + print(f"\nResults saved to: {output_file}") + + # Flush streams to ensure logs on disk are up to date for the external uploader + sys.stdout.flush() + sys.stderr.flush() + +# End MLflow run +solver.mlflow_end() + +# Summary +if rank == 0: + print("\nSolution Status:") + print(f" Converged: {solver.results.converged}") + print(f" Iterations: {solver.results.iterations}") + print(f" L2 error: {solver.results.final_error:.6e}") + print(f" Wall time: {wall_time:.2f} seconds") + + # Timing breakdown + total = ( + solver.results.total_compute_time + + solver.results.total_halo_time + + solver.results.total_mpi_comm_time + ) + if total > 0: + print("\nTiming breakdown:") + print( + f" Compute: {solver.results.total_compute_time:.3f}s ({100 * solver.results.total_compute_time / total:.1f}%)" + ) + print( + f" Halo exchange:{solver.results.total_halo_time:.3f}s ({100 * solver.results.total_halo_time / total:.1f}%)" + ) + print( + f" MPI allreduce:{solver.results.total_mpi_comm_time:.3f}s ({100 * solver.results.total_mpi_comm_time / total:.1f}%)" + ) diff --git a/Experiments/05-scaling/submit_sweep.sh b/Experiments/05-scaling/submit_sweep.sh deleted file mode 100644 index aeb1b9c..0000000 --- a/Experiments/05-scaling/submit_sweep.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/bash -#BSUB -J submit_sequential -#BSUB -o Experiments/05-scaling/logs/sweep_%J.out -#BSUB -e Experiments/05-scaling/logs/sweep_%J.err -#BSUB -n 8 -#BSUB -W 00:10 -#BSUB -q hpcintro -#BSUB -R "span[hosts=1]" -#BSUB -R "rusage[mem=1GB]" - -module load python3/3.11.1 -module load mpi/5.0.8-gcc-13.4.0-binutils-2.44 - -uv sync -uv run python Experiments/05-scaling/run_sweep.py diff --git a/Experiments/05-scaling/sweep.py b/Experiments/05-scaling/sweep.py deleted file mode 100644 index 3920b64..0000000 --- a/Experiments/05-scaling/sweep.py +++ /dev/null @@ -1,21 +0,0 @@ -""" -Scaling Sweep: Strong and Weak Scaling Experiments -=================================================== - -Placeholder for future implementation. -""" - - -def strong_scaling(N, rank_counts, strategy, **kwargs): - """Strong scaling experiment (not yet implemented).""" - raise NotImplementedError("Strong scaling experiment not yet implemented") - - -def weak_scaling(N_per_rank, rank_counts, strategy, **kwargs): - """Weak scaling experiment (not yet implemented).""" - raise NotImplementedError("Weak scaling experiment not yet implemented") - - -if __name__ == "__main__": - print("Scaling experiments not yet implemented.") - print("See CODE_REVIEW.md Phase 1.3 for status.") diff --git a/Experiments/05-scaling/template_config.yaml b/Experiments/05-scaling/template_config.yaml new file mode 100644 index 0000000..ad277ad --- /dev/null +++ b/Experiments/05-scaling/template_config.yaml @@ -0,0 +1,72 @@ +# ============================================================================ +# LSF Scaling Experiment Configuration Template +# ============================================================================ + +type: strong # Options: strong, weak +script: Experiments/05-scaling/compute_scaling.py + +# ============================================================================ +# Global Parameters (Apply to all groups) +# ============================================================================ +# Arguments passed to the python script +parameters: + tol: 1e-6 + max-iter: 50000 + numba: true # Always use numba + +# ============================================================================ +# Global LSF Settings (Apply to all groups) +# ============================================================================ +lsf: + queue: hpc # -q: Queue name + walltime: "00:10" # -W: Walltime (HH:MM) + + # Default resources + resources: + - "rusage[mem=4GB]" + + # Additional options + exclusive: false # -x: Exclusive node access + +# ============================================================================ +# Experiment Groups +# ============================================================================ +groups: + # -------------------------------------------------------------------------- + # Group 1: Single Node (Shared Memory) + # -------------------------------------------------------------------------- + # Runs on a single host. Useful for small scale testing and debugging. + - name: single_node + lsf: + resources: + - "rusage[mem=4GB]" + - "span[hosts=1]" # FORCE execution on a single node + + # Sweep parameters generate one job per combination + sweep: + N: [64, 128] + ranks: [1, 2, 4, 8, 16, 32] + communicator: [numpy, custom] + strategy: [sliced, cubic] + + # -------------------------------------------------------------------------- + # Group 2: Multi Node (Distributed Memory) + # -------------------------------------------------------------------------- + # Runs across multiple nodes. + # 'ptile' defines how many MPI ranks to place per node. + - name: multi_node + lsf: + walltime: "00:10" + resources: + - "rusage[mem=8GB]" + - "span[ptile=24]" # Place 24 ranks per node (e.g. full node usage) + + # MPI options for process binding and mapping + # Example: 2 processes per CPU socket (package) + mpi_options: "--map-by ppr:2:package --bind-to core --report-bindings" + + sweep: + N: [256] + ranks: [48, 96] # Must be multiples of ptile (24) for efficient packing + communicator: [custom, numpy] + strategy: [cubic, sliced] diff --git a/data/01-kernels/kernel_benchmark.parquet b/data/01-kernels/kernel_benchmark.parquet new file mode 100644 index 0000000..b69abfb Binary files /dev/null and b/data/01-kernels/kernel_benchmark.parquet differ diff --git a/data/01-kernels/kernel_convergence.parquet b/data/01-kernels/kernel_convergence.parquet new file mode 100644 index 0000000..aadb61e Binary files /dev/null and b/data/01-kernels/kernel_convergence.parquet differ diff --git a/data/communication/communication_np4.parquet b/data/communication/communication_np4.parquet new file mode 100644 index 0000000..1bfc00c Binary files /dev/null and b/data/communication/communication_np4.parquet differ diff --git a/data/validation/N16_np8_cubic_custom.h5 b/data/validation/N16_np8_cubic_custom.h5 new file mode 100644 index 0000000..9709a5f Binary files /dev/null and b/data/validation/N16_np8_cubic_custom.h5 differ diff --git a/data/validation/N16_np8_cubic_numpy.h5 b/data/validation/N16_np8_cubic_numpy.h5 new file mode 100644 index 0000000..7089d6e Binary files /dev/null and b/data/validation/N16_np8_cubic_numpy.h5 differ diff --git a/data/validation/N16_np8_sliced_custom.h5 b/data/validation/N16_np8_sliced_custom.h5 new file mode 100644 index 0000000..4c74556 Binary files /dev/null and b/data/validation/N16_np8_sliced_custom.h5 differ diff --git a/data/validation/N16_np8_sliced_numpy.h5 b/data/validation/N16_np8_sliced_numpy.h5 new file mode 100644 index 0000000..d921aec Binary files /dev/null and b/data/validation/N16_np8_sliced_numpy.h5 differ diff --git a/data/validation/N32_np8_cubic_custom.h5 b/data/validation/N32_np8_cubic_custom.h5 new file mode 100644 index 0000000..6c70770 Binary files /dev/null and b/data/validation/N32_np8_cubic_custom.h5 differ diff --git a/data/validation/N32_np8_cubic_numpy.h5 b/data/validation/N32_np8_cubic_numpy.h5 new file mode 100644 index 0000000..b6853fd Binary files /dev/null and b/data/validation/N32_np8_cubic_numpy.h5 differ diff --git a/data/validation/N32_np8_sliced_custom.h5 b/data/validation/N32_np8_sliced_custom.h5 new file mode 100644 index 0000000..894d8e1 Binary files /dev/null and b/data/validation/N32_np8_sliced_custom.h5 differ diff --git a/data/validation/N32_np8_sliced_numpy.h5 b/data/validation/N32_np8_sliced_numpy.h5 new file mode 100644 index 0000000..bd97128 Binary files /dev/null and b/data/validation/N32_np8_sliced_numpy.h5 differ diff --git a/data/validation/N48_np8_cubic_custom.h5 b/data/validation/N48_np8_cubic_custom.h5 new file mode 100644 index 0000000..f0b4190 Binary files /dev/null and b/data/validation/N48_np8_cubic_custom.h5 differ diff --git a/data/validation/N48_np8_cubic_numpy.h5 b/data/validation/N48_np8_cubic_numpy.h5 new file mode 100644 index 0000000..078da37 Binary files /dev/null and b/data/validation/N48_np8_cubic_numpy.h5 differ diff --git a/data/validation/N48_np8_sliced_custom.h5 b/data/validation/N48_np8_sliced_custom.h5 new file mode 100644 index 0000000..b05f92c Binary files /dev/null and b/data/validation/N48_np8_sliced_custom.h5 differ diff --git a/data/validation/N48_np8_sliced_numpy.h5 b/data/validation/N48_np8_sliced_numpy.h5 new file mode 100644 index 0000000..72ef850 Binary files /dev/null and b/data/validation/N48_np8_sliced_numpy.h5 differ diff --git a/docs/source/api_reference.rst b/docs/source/api_reference.rst index 0ca65c9..e32bf9b 100644 --- a/docs/source/api_reference.rst +++ b/docs/source/api_reference.rst @@ -108,8 +108,20 @@ Simplified Datastructures - :class:`LocalFields` - Local domain arrays with halo zones (u1, u2, f) - :class:`LocalSeries` - Per-iteration timing arrays (compute_times, halo_exchange_times, residual_history) -Parallel I/O Strategy -^^^^^^^^^^^^^^^^^^^^^^ +Experiment Tracking & I/O +------------------------- + +The framework integrates with **MLflow** for experiment tracking and **HDF5** for data storage. + +**MLflow Tracking:** + +The solver automatically logs: +- **Parameters**: Grid size, tolerance, decomposition strategy, etc. +- **Metrics**: Convergence status, iterations, final error, wall time. +- **Time Series**: Complete history of residuals and performance metrics (compute time, comm time) per step. +- **Artifacts**: The HDF5 result file is uploaded to the MLflow run. + +**Parallel HDF5 Strategy:** **HDF5 collective writes** eliminate the gather-to-rank-0 bottleneck: @@ -188,11 +200,25 @@ Communication Strategies NumpyHaloExchange CustomHaloExchange +Experiment Utilities +==================== + +.. currentmodule:: utils + +.. autosummary:: + :toctree: generated + + mlflow_io + +The :mod:`utils.mlflow_io` module provides tools for fetching and managing experiment data from MLflow. + .. _data-structures: Data Structures =============== +.. currentmodule:: Poisson + Global Configuration & Metrics ------------------------------ diff --git a/docs/source/conf.py b/docs/source/conf.py index 8c65635..d2e33be 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -18,7 +18,9 @@ # -- Project information ----------------------------------------------------- project = "MPI 3D Poisson Solver" -copyright = "2025, Alexander Elbæk Nielsen, Junriu Li, Philip Korsager Nickel, DTU Compute" +copyright = ( + "2025, Alexander Elbæk Nielsen, Junriu Li, Philip Korsager Nickel, DTU Compute" +) author = "Alexander Elbæk Nielsen, Junriu Li, Philip Korsager Nickel" # -- General configuration --------------------------------------------------- @@ -76,7 +78,10 @@ "remove_config_comments": True, # Clean up notebook outputs "abort_on_example_error": False, # Continue if examples fail "plot_gallery": True, - "image_scrapers": ("matplotlib", "pyvista"), # Capture both matplotlib and pyvista plots + "image_scrapers": ( + "matplotlib", + "pyvista", + ), # Capture both matplotlib and pyvista plots "capture_repr": ("_repr_html_", "__repr__"), # Capture output representations "matplotlib_animations": True, # Support matplotlib animations # Remove Jupyter cell markers (# %%) from rendered output diff --git a/docs/source/sg_execution_times.rst b/docs/source/sg_execution_times.rst index 0d42d20..273760b 100644 --- a/docs/source/sg_execution_times.rst +++ b/docs/source/sg_execution_times.rst @@ -6,7 +6,7 @@ Computation times ================= -**00:05.720** total execution time for 8 files **from all galleries**: +**00:01.892** total execution time for 8 files **from all galleries**: .. container:: @@ -32,27 +32,27 @@ Computation times * - Example - Time - Mem (MB) - * - :ref:`sphx_glr_example_gallery_01-kernels_plot_kernels.py` (``../../Experiments/01-kernels/plot_kernels.py``) - - 00:02.639 - - 0.0 - * - :ref:`sphx_glr_example_gallery_04-validation_plot_validation.py` (``../../Experiments/04-validation/plot_validation.py``) - - 00:01.353 - - 0.0 - * - :ref:`sphx_glr_example_gallery_02-decomposition_plot_decompositions.py` (``../../Experiments/02-decomposition/plot_decompositions.py``) - - 00:01.044 - - 0.0 * - :ref:`sphx_glr_example_gallery_03-communication_plot_communication.py` (``../../Experiments/03-communication/plot_communication.py``) - - 00:00.685 + - 00:01.892 - 0.0 * - :ref:`sphx_glr_example_gallery_01-kernels_compute_all.py` (``../../Experiments/01-kernels/compute_all.py``) - 00:00.000 - 0.0 + * - :ref:`sphx_glr_example_gallery_01-kernels_plot_kernels.py` (``../../Experiments/01-kernels/plot_kernels.py``) + - 00:00.000 + - 0.0 + * - :ref:`sphx_glr_example_gallery_02-decomposition_plot_decompositions.py` (``../../Experiments/02-decomposition/plot_decompositions.py``) + - 00:00.000 + - 0.0 * - :ref:`sphx_glr_example_gallery_03-communication_compute_communication.py` (``../../Experiments/03-communication/compute_communication.py``) - 00:00.000 - 0.0 * - :ref:`sphx_glr_example_gallery_04-validation_compute_validation.py` (``../../Experiments/04-validation/compute_validation.py``) - 00:00.000 - 0.0 - * - :ref:`sphx_glr_example_gallery_05-scaling_sweep.py` (``../../Experiments/05-scaling/sweep.py``) + * - :ref:`sphx_glr_example_gallery_04-validation_plot_validation.py` (``../../Experiments/04-validation/plot_validation.py``) + - 00:00.000 + - 0.0 + * - :ref:`sphx_glr_example_gallery_05-scaling_compute_scaling.py` (``../../Experiments/05-scaling/compute_scaling.py``) - 00:00.000 - 0.0 diff --git a/main.py b/main.py index f11a4c1..cd23562 100644 --- a/main.py +++ b/main.py @@ -146,7 +146,11 @@ def run_compute_scripts(): print("✓") success_count += 1 else: - error_msg = result.stderr[:200] if result.stderr else f"exit {result.returncode}" + error_msg = ( + result.stderr[:200] + if result.stderr + else f"exit {result.returncode}" + ) print(f"✗ ({error_msg})") fail_count += 1 @@ -176,7 +180,7 @@ def copy_plots(): if dest_dir.exists(): shutil.rmtree(dest_dir) shutil.copytree(source_dir, dest_dir) - print(f" ✓ Copied figures/ to docs/reports/TexReport/figures/") + print(" ✓ Copied figures/ to docs/reports/TexReport/figures/") except Exception as e: print(f" ✗ Failed to copy: {e}") @@ -191,6 +195,10 @@ def build_docs(): print("\nBuilding Sphinx documentation...") + if not source_dir.exists(): + print(f" Error: Documentation source directory not found: {source_dir}") + return False + try: result = subprocess.run( [ @@ -229,6 +237,70 @@ def build_docs(): return False +def hpc_submit_pack(scaling_type: str, dry_run: bool): + """Generate and optionally submit an LSF job pack.""" + print(f"\nGenerating {scaling_type} scaling job pack...") + + pack_file_name = f"Experiments/05-scaling/{scaling_type}_scaling_jobs.pack" + pack_file_path = REPO_ROOT / pack_file_name + + # Generate the pack file + cmd = [ + "uv", + "run", + "python", + "-m", + "src.utils.generate_pack", + "--type", + scaling_type, + "--output", + str(pack_file_path), + "--config-dir", + str(REPO_ROOT / "Experiments" / "05-scaling"), + ] + # Use standard N values for strong scaling if not specified in generate_pack default + if scaling_type == "strong": + # Hardcoded defaults for project consistency + cmd.extend(["--N", "64", "128", "256"]) + + result = subprocess.run(cmd, capture_output=True, text=True, cwd=str(REPO_ROOT)) + + if result.returncode != 0: + print(f" ✗ Failed to generate pack file: {result.stderr}") + return + print(f" ✓ {result.stdout.strip()}") + + if dry_run: + print(f"\n[DRY RUN] Content of {pack_file_name}:") + print("-" * 40) + print(pack_file_path.read_text()) + print("-" * 40) + print( + f" To submit manually: bsub -pack {pack_file_name}" + ) + return + + # Submit the pack file + print(f"\nSubmitting {pack_file_path} to LSF...") + + if not shutil.which("bsub"): + print(" ✗ 'bsub' command not found. Are you on the HPC login node?") + return + + submit_cmd = ["bsub", "-pack", str(pack_file_path)] + + result = subprocess.run( + submit_cmd, capture_output=True, text=True, cwd=str(REPO_ROOT) + ) + + if result.returncode != 0: + print(f" ✗ Failed to submit jobs: {result.stderr}") + else: + print(f" ✓ Jobs submitted successfully.") + if result.stdout: + print(f" {result.stdout.strip()}") + + def clean_all(): """Clean all generated files and caches.""" print("\nCleaning all generated files and caches...") @@ -248,9 +320,16 @@ def remove_item(path): # Directories to clean dirs = [ - "docs/build", "docs/source/example_gallery", "docs/source/generated", - "docs/source/gen_modules", "plots", "build", "dist", - ".pytest_cache", ".ruff_cache", ".mypy_cache", + "docs/build", + "docs/source/example_gallery", + "docs/source/generated", + "docs/source/gen_modules", + "plots", + "build", + "dist", + ".pytest_cache", + ".ruff_cache", + ".mypy_cache", ] for d in dirs: path = REPO_ROOT / d @@ -301,6 +380,43 @@ def remove_item(path): print() +def fetch_mlflow(): + """Fetch artifacts from MLflow for all converged runs.""" + print("\nFetching MLflow artifacts...") + + try: + # Import locally to avoid hard dependency if not fetching + # from utils.mlflow_io import download_artifacts_with_naming, setup_mlflow_auth + from utils.mlflow_io import setup_mlflow_auth + + setup_mlflow_auth() + + # Define download targets - modifying for LSM Project 2 context + # Assuming we might have experiments named like 'LSM-Project-2/Scaling' or similar + # For now, we'll use a placeholder or a generic search if available, + # but matching the ANA-P3 pattern: + + # output_dir = REPO_ROOT / "data" / "downloaded" + + # Example: Fetch from a "Scaling" experiment + # experiments = ["LSM-Scaling", "LSM-Kernels"] + # for exp in experiments: + # print(f"\n{exp}:") + # paths = download_artifacts_with_naming(exp, output_dir / exp) + # print(f" ✓ Downloaded {len(paths)} files to data/downloaded/{exp}/") + + print( + " (No experiments configured for auto-fetch yet. Edit main.py to specify experiments.)" + ) + print() + + except ImportError as e: + print(f" ✗ Missing dependency: {e}") + print(" Install with: uv sync") + except Exception as e: + print(f" ✗ Failed to fetch: {e}\n") + + def main(): """Main entry point.""" parser = argparse.ArgumentParser( @@ -313,15 +429,45 @@ def main(): python main.py --plot Run all plotting scripts python main.py --copy-plots Copy plots to plots/ directory python main.py --clean Clean all generated files - python main.py --compute --plot Run compute then plot scripts + python main.py --hpc strong --dry Generate strong scaling pack (dry run) """, ) - parser.add_argument("--docs", action="store_true", help="Build Sphinx HTML documentation") - parser.add_argument("--compute", action="store_true", help="Run all compute scripts (sequentially)") - parser.add_argument("--plot", action="store_true", help="Run all plotting scripts (in parallel)") - parser.add_argument("--copy-plots", action="store_true", help="Copy plots to plots/ directory") - parser.add_argument("--clean", action="store_true", help="Clean all generated files and caches") + # Action Group + actions = parser.add_argument_group("Actions") + actions.add_argument( + "--docs", action="store_true", help="Build Sphinx HTML documentation" + ) + actions.add_argument( + "--compute", action="store_true", help="Run all compute scripts (sequentially)" + ) + actions.add_argument( + "--plot", action="store_true", help="Run all plotting scripts (in parallel)" + ) + actions.add_argument( + "--copy-plots", action="store_true", help="Copy plots to plots/ directory" + ) + actions.add_argument( + "--clean", action="store_true", help="Clean all generated files and caches" + ) + actions.add_argument( + "--fetch", + action="store_true", + help="Fetch artifacts from MLflow for all converged runs", + ) + actions.add_argument( + "--hpc", + choices=["strong", "weak"], + help="Generate and submit LSF job pack for scaling", + ) + + # Options Group + options = parser.add_argument_group("Options") + options.add_argument( + "--dry", + action="store_true", + help="Print generated job pack without submitting (for --hpc)", + ) # Show help if no arguments provided if len(sys.argv) == 1: @@ -344,6 +490,15 @@ def main(): if args.copy_plots: copy_plots() + if args.fetch: + fetch_mlflow() + + # Handle HPC pack submission + if args.hpc: + hpc_submit_pack(args.hpc, args.dry) + + # Handle documentation commands + if args.docs: build_docs() diff --git a/pyproject.toml b/pyproject.toml index f390589..d09c7cd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,8 @@ dependencies = [ "pyvista>=0.46.4", "h5py>=3.15.1", "tables>=3.10.2", + "psutil>=7.1.3", + "pyyaml>=6.0.3", ] [tool.pytest.ini_options] @@ -55,4 +57,7 @@ package-dir = {"" = "src"} [tool.setuptools.packages.find] where = ["src"] +[dependency-groups] +dev = [] + diff --git a/setup_mlflow.py b/setup_mlflow.py index 647ca56..89252c1 100644 --- a/setup_mlflow.py +++ b/setup_mlflow.py @@ -1,6 +1,6 @@ import mlflow -# Login to Databricks +# Login to Databricks mlflow.login(backend="databricks", interactive=True) -# https://dbc-6756e917-e5fc.cloud.databricks.com \ No newline at end of file +# https://dbc-6756e917-e5fc.cloud.databricks.com diff --git a/src/Poisson/__init__.py b/src/Poisson/__init__.py index 905ed59..144cdd0 100644 --- a/src/Poisson/__init__.py +++ b/src/Poisson/__init__.py @@ -13,11 +13,21 @@ KernelSeries, ) from .kernels import NumPyKernel, NumbaKernel -from .jacobi import JacobiPoisson -from .decomposition import DomainDecomposition, RankInfo, NoDecomposition -from .communicators import NumpyHaloExchange, CustomHaloExchange -from .problems import create_grid_3d, sinusoidal_exact_solution, sinusoidal_source_term, setup_sinusoidal_problem -from .runner import run_solver +from .solver import JacobiPoisson +from .mpi import ( + DomainDecomposition, + RankInfo, + NoDecomposition, + NumpyHaloExchange, + CustomHaloExchange, +) +from .problems import ( + create_grid_3d, + sinusoidal_exact_solution, + sinusoidal_source_term, + setup_sinusoidal_problem, +) +from .helpers import run_solver __all__ = [ # Data structures - Kernel diff --git a/src/Poisson/datastructures.py b/src/Poisson/datastructures.py index 8458977..ff4782a 100644 --- a/src/Poisson/datastructures.py +++ b/src/Poisson/datastructures.py @@ -12,9 +12,10 @@ # ============================================================================ -# Kernel +# Kernel # ============================================================================ + @dataclass class KernelParams: """Kernel configuration parameters. @@ -22,6 +23,7 @@ class KernelParams: Note: N is the LOCAL grid size (after domain decomposition + halo zones for MPI). For standalone usage, N is the full problem size. """ + N: int # Local grid size (including halo zones for MPI) omega: float tolerance: float = 1e-10 @@ -39,6 +41,7 @@ def __post_init__(self): @dataclass class KernelMetrics: """Final convergence metrics (updated during kernel execution).""" + converged: bool = False iterations: int = 0 final_residual: float | None = None @@ -52,6 +55,7 @@ class KernelSeries: The kernel automatically populates residuals and compute_times during step(). Physical errors can be optionally appended by the caller for validation. """ + residuals: list[float] = field(default_factory=list) compute_times: list[float] = field(default_factory=list) physical_errors: list[float] | None = None @@ -61,6 +65,7 @@ class KernelSeries: # Solver - Global (identical across ranks, or rank 0 only) # ============================================================================ + @dataclass class GlobalParams: """Global problem definition (all ranks have identical copy). @@ -68,6 +73,7 @@ class GlobalParams: Note: N is the GLOBAL grid size (before domain decomposition). The solver internally computes N_local for each rank after decomposition. """ + # Global problem parameters N: int = 0 # Global grid size (before decomposition) omega: float = 0.75 @@ -77,7 +83,7 @@ class GlobalParams: # MPI configuration mpi_size: int = 1 decomposition: str = "none" # "none", "sliced", "cubic" - communicator: str = "none" # "none", "numpy", "custom" + communicator: str = "none" # "none", "numpy", "custom" # Kernel backend selection use_numba: bool = False @@ -87,6 +93,7 @@ class GlobalParams: @dataclass class GlobalMetrics: """Final convergence metrics (computed/stored on rank 0 only).""" + iterations: int = 0 converged: bool = False final_error: float | None = None @@ -101,6 +108,7 @@ class GlobalMetrics: # Solver - Local (each rank has different values) # ============================================================================ + @dataclass class LocalParams: """Local rank-specific parameters (computed after decomposition). @@ -108,11 +116,12 @@ class LocalParams: Each rank has its own LocalParams with rank-specific values including the kernel configuration for that rank's local domain size. """ + N_local: int # Local grid size including halo zones # Domain coordinates in global grid local_start: tuple[int, int, int] # (i_start, j_start, k_start) - local_end: tuple[int, int, int] # (i_end, j_end, k_end) + local_end: tuple[int, int, int] # (i_end, j_end, k_end) # Kernel configuration kernel: KernelParams @@ -128,6 +137,7 @@ def __post_init__(self): @dataclass class LocalFields: """Local domain arrays with halo zones (each rank).""" + u1: np.ndarray = field(default_factory=lambda: np.zeros((0, 0, 0))) u2: np.ndarray = field(default_factory=lambda: np.zeros((0, 0, 0))) f: np.ndarray = field(default_factory=lambda: np.zeros((0, 0, 0))) @@ -140,9 +150,8 @@ class LocalSeries: Each rank accumulates its own timing data for each iteration. Rank 0 additionally stores residual history. """ + compute_times: list[float] = field(default_factory=list) mpi_comm_times: list[float] = field(default_factory=list) halo_exchange_times: list[float] = field(default_factory=list) residual_history: list[float] = field(default_factory=list) - - diff --git a/src/Poisson/helpers/__init__.py b/src/Poisson/helpers/__init__.py new file mode 100644 index 0000000..b2a1771 --- /dev/null +++ b/src/Poisson/helpers/__init__.py @@ -0,0 +1,5 @@ +"""Helper utilities for running solvers.""" + +from .runner import run_solver + +__all__ = ["run_solver"] diff --git a/src/Poisson/runner.py b/src/Poisson/helpers/runner.py similarity index 79% rename from src/Poisson/runner.py rename to src/Poisson/helpers/runner.py index d97d4fc..12caccd 100644 --- a/src/Poisson/runner.py +++ b/src/Poisson/helpers/runner.py @@ -30,12 +30,22 @@ def run_solver(N: int, n_ranks: int = 1, output: str = None, **kwargs) -> dict: # Use temp file if no output path specified use_temp = output is None if use_temp: - tmp = tempfile.NamedTemporaryFile(suffix='.h5', delete=False) + tmp = tempfile.NamedTemporaryFile(suffix=".h5", delete=False) output = tmp.name tmp.close() config = {"N": N, "output": output, **kwargs} - cmd = ["mpiexec", "-n", str(n_ranks), "uv", "run", "python", "-m", "Poisson.runner_helper", json.dumps(config)] + cmd = [ + "mpiexec", + "-n", + str(n_ranks), + "uv", + "run", + "python", + "-m", + "Poisson.helpers.runner_helper", + json.dumps(config), + ] proc = subprocess.run(cmd, capture_output=True, text=True) @@ -46,7 +56,7 @@ def run_solver(N: int, n_ranks: int = 1, output: str = None, **kwargs) -> dict: if not Path(output).exists(): return {"error": "No output file created", "stderr": proc.stderr} - result = pd.read_hdf(output, key='results').iloc[0].to_dict() + result = pd.read_hdf(output, key="results").iloc[0].to_dict() # Clean up temp file if we created one if use_temp: diff --git a/src/Poisson/runner_helper.py b/src/Poisson/helpers/runner_helper.py similarity index 82% rename from src/Poisson/runner_helper.py rename to src/Poisson/helpers/runner_helper.py index f1c1efd..38967c7 100644 --- a/src/Poisson/runner_helper.py +++ b/src/Poisson/helpers/runner_helper.py @@ -1,9 +1,14 @@ -"""MPI worker - invoked via: mpiexec -n X uv run python -m Poisson.runner_helper '{config}'""" +"""MPI worker - invoked via: mpiexec -n X uv run python -m Poisson.helpers.runner_helper '{config}'""" import sys import json from mpi4py import MPI -from Poisson import JacobiPoisson, DomainDecomposition, NumpyHaloExchange, CustomHaloExchange +from Poisson import ( + JacobiPoisson, + DomainDecomposition, + NumpyHaloExchange, + CustomHaloExchange, +) config = json.loads(sys.argv[1]) comm = MPI.COMM_WORLD @@ -14,7 +19,11 @@ strategy=config.get("strategy", "sliced"), axis=config.get("axis", "z"), ) -halo = CustomHaloExchange() if config.get("communicator") == "custom" else NumpyHaloExchange() +halo = ( + CustomHaloExchange() + if config.get("communicator") == "custom" + else NumpyHaloExchange() +) solver = JacobiPoisson( N=config["N"], diff --git a/src/Poisson/kernels.py b/src/Poisson/kernels.py index 1bcf096..ff13027 100644 --- a/src/Poisson/kernels.py +++ b/src/Poisson/kernels.py @@ -9,7 +9,9 @@ @njit(parallel=True) -def _jacobi_step_numba(uold: np.ndarray, u: np.ndarray, f: np.ndarray, h: float, omega: float) -> float: +def _jacobi_step_numba( + uold: np.ndarray, u: np.ndarray, f: np.ndarray, h: float, omega: float +) -> float: """Numba JIT implementation of Jacobi iteration step.""" c = 1.0 / 6.0 h2 = h * h @@ -18,7 +20,9 @@ def _jacobi_step_numba(uold: np.ndarray, u: np.ndarray, f: np.ndarray, h: float, for j in range(1, u.shape[1] - 1): for k in range(1, u.shape[2] - 1): u[i, j, k] = ( - omega * c * ( + omega + * c + * ( uold[i - 1, j, k] + uold[i + 1, j, k] + uold[i, j - 1, k] @@ -74,7 +78,9 @@ def step(self, uold: np.ndarray, u: np.ndarray, f: np.ndarray) -> float: h2 = self.parameters.h * self.parameters.h u[1:-1, 1:-1, 1:-1] = ( - self.parameters.omega * c * ( + self.parameters.omega + * c + * ( uold[0:-2, 1:-1, 1:-1] + uold[2:, 1:-1, 1:-1] + uold[1:-1, 0:-2, 1:-1] @@ -89,7 +95,7 @@ def step(self, uold: np.ndarray, u: np.ndarray, f: np.ndarray) -> float: # Compute sum of squared differences over interior points only # Return unnormalized sum - normalization done globally in solver diff = u[1:-1, 1:-1, 1:-1] - uold[1:-1, 1:-1, 1:-1] - diff_sum = np.sum(diff ** 2) + diff_sum = np.sum(diff**2) self._track(diff_sum, time.perf_counter() - start) return diff_sum @@ -105,7 +111,9 @@ def __init__(self, **kwargs): def step(self, uold: np.ndarray, u: np.ndarray, f: np.ndarray) -> float: """Perform one Jacobi iteration step.""" start = time.perf_counter() - residual = _jacobi_step_numba(uold, u, f, self.parameters.h, self.parameters.omega) + residual = _jacobi_step_numba( + uold, u, f, self.parameters.h, self.parameters.omega + ) self._track(residual, time.perf_counter() - start) return residual diff --git a/src/Poisson/mpi/__init__.py b/src/Poisson/mpi/__init__.py new file mode 100644 index 0000000..599785b --- /dev/null +++ b/src/Poisson/mpi/__init__.py @@ -0,0 +1,12 @@ +"""MPI domain decomposition and communication.""" + +from .decomposition import DomainDecomposition, RankInfo, NoDecomposition +from .communicators import NumpyHaloExchange, CustomHaloExchange + +__all__ = [ + "DomainDecomposition", + "RankInfo", + "NoDecomposition", + "NumpyHaloExchange", + "CustomHaloExchange", +] diff --git a/src/Poisson/communicators.py b/src/Poisson/mpi/communicators.py similarity index 70% rename from src/Poisson/communicators.py rename to src/Poisson/mpi/communicators.py index e5279a3..b0538ea 100644 --- a/src/Poisson/communicators.py +++ b/src/Poisson/mpi/communicators.py @@ -1,9 +1,10 @@ """Halo exchange communicators for MPI domain decomposition.""" + import numpy as np from mpi4py import MPI # Axis config: (index, name) - used to build neighbor keys like 'z_lower' -_AXES = [(0, 'z'), (1, 'y'), (2, 'x')] +_AXES = [(0, "z"), (1, "y"), (2, "x")] def _face_slice(axis, idx, has_halo): @@ -20,18 +21,19 @@ class _BaseHaloExchange: def exchange_halos(self, u, decomposition, rank, comm): """Exchange halo zones with neighbors.""" neighbors = decomposition.get_neighbors(rank) - has_halo = {ax: f'{name}_lower' in neighbors for ax, name in _AXES} + has_halo = {ax: f"{name}_lower" in neighbors for ax, name in _AXES} for axis, name in _AXES: if not has_halo[axis]: continue - lo = neighbors.get(f'{name}_lower') - hi = neighbors.get(f'{name}_upper') + lo = neighbors.get(f"{name}_lower") + hi = neighbors.get(f"{name}_upper") self._exchange_axis(u, axis, lo, hi, has_halo, comm, tag=axis * 100) class NumpyHaloExchange(_BaseHaloExchange): """Halo exchange using NumPy array copies.""" + name = "NumPy" def _exchange_axis(self, u, axis, lo_rank, hi_rank, has_halo, comm, tag): @@ -42,12 +44,15 @@ def _exchange_axis(self, u, axis, lo_rank, hi_rank, has_halo, comm, tag): send = np.ascontiguousarray(u[_face_slice(axis, send_i, has_halo)]) recv = np.empty_like(send) comm.Sendrecv(send, dest, tag, recv, src, tag) - u[_face_slice(axis, recv_i, has_halo)] = recv if src != MPI.PROC_NULL else 0.0 + u[_face_slice(axis, recv_i, has_halo)] = ( + recv if src != MPI.PROC_NULL else 0.0 + ) tag += 1 class CustomHaloExchange(_BaseHaloExchange): """Halo exchange using MPI derived datatypes (zero-copy).""" + name = "MPI Datatype" def __init__(self): @@ -68,26 +73,53 @@ def _exchange_axis(self, u, axis, lo_rank, hi_rank, has_halo, comm, tag): # Compute offsets into flattened array start = [1 if has_halo[ax] else 0 for ax in range(3)] - flat = lambda z, y, x: z * ny * nx + y * nx + x + + def flat(z, y, x): + return z * ny * nx + y * nx + x if axis == 0: - send_hi, recv_lo = flat(nz-2, start[1], start[2]), flat(0, start[1], start[2]) - send_lo, recv_hi = flat(1, start[1], start[2]), flat(nz-1, start[1], start[2]) + send_hi, recv_lo = ( + flat(nz - 2, start[1], start[2]), + flat(0, start[1], start[2]), + ) + send_lo, recv_hi = ( + flat(1, start[1], start[2]), + flat(nz - 1, start[1], start[2]), + ) elif axis == 1: - send_hi, recv_lo = flat(start[0], ny-2, start[2]), flat(start[0], 0, start[2]) - send_lo, recv_hi = flat(start[0], 1, start[2]), flat(start[0], ny-1, start[2]) + send_hi, recv_lo = ( + flat(start[0], ny - 2, start[2]), + flat(start[0], 0, start[2]), + ) + send_lo, recv_hi = ( + flat(start[0], 1, start[2]), + flat(start[0], ny - 1, start[2]), + ) else: - send_hi, recv_lo = flat(start[0], start[1], nx-2), flat(start[0], start[1], 0) - send_lo, recv_hi = flat(start[0], start[1], 1), flat(start[0], start[1], nx-1) + send_hi, recv_lo = ( + flat(start[0], start[1], nx - 2), + flat(start[0], start[1], 0), + ) + send_lo, recv_hi = ( + flat(start[0], start[1], 1), + flat(start[0], start[1], nx - 1), + ) # Exchange - comm.Sendrecv([u_flat[send_hi:], 1, dtype], hi, tag, - [u_flat[recv_lo:], 1, dtype], lo, tag) + comm.Sendrecv( + [u_flat[send_hi:], 1, dtype], hi, tag, [u_flat[recv_lo:], 1, dtype], lo, tag + ) if lo_rank is None: u[_face_slice(axis, 0, has_halo)] = 0.0 - comm.Sendrecv([u_flat[send_lo:], 1, dtype], lo, tag + 1, - [u_flat[recv_hi:], 1, dtype], hi, tag + 1) + comm.Sendrecv( + [u_flat[send_lo:], 1, dtype], + lo, + tag + 1, + [u_flat[recv_hi:], 1, dtype], + hi, + tag + 1, + ) if hi_rank is None: u[_face_slice(axis, -1, has_halo)] = 0.0 diff --git a/src/Poisson/decomposition.py b/src/Poisson/mpi/decomposition.py similarity index 86% rename from src/Poisson/decomposition.py rename to src/Poisson/mpi/decomposition.py index daa89f0..6d96799 100644 --- a/src/Poisson/decomposition.py +++ b/src/Poisson/mpi/decomposition.py @@ -11,12 +11,13 @@ @dataclass class RankInfo: """Decomposition information for a single rank.""" + rank: int # Local domain (interior points only) local_shape: tuple[int, int, int] # (nx, ny, nz) interior global_start: tuple[int, int, int] # (i, j, k) in global grid - global_end: tuple[int, int, int] # (i, j, k) exclusive + global_end: tuple[int, int, int] # (i, j, k) exclusive # Halo zones halo_shape: tuple[int, int, int] # Shape including halos @@ -53,21 +54,21 @@ class DomainDecomposition: ... print(f"Rank {rank}: {info.local_shape}") """ - def __init__(self, N, size, strategy='sliced', axis='z'): + def __init__(self, N, size, strategy="sliced", axis="z"): self.N = N self.size = size self.strategy = strategy # Normalize axis to integer (0=z, 1=y, 2=x in ZYX ordering) - axis_map = {'z': 0, 'y': 1, 'x': 2, 0: 0, 1: 1, 2: 2} + axis_map = {"z": 0, "y": 1, "x": 2, 0: 0, 1: 1, 2: 2} if axis not in axis_map: raise ValueError(f"Invalid axis: {axis}. Use 'x', 'y', 'z' or 0, 1, 2") self.axis = axis_map[axis] # Decompose domain - if strategy == 'sliced': + if strategy == "sliced": self._decompose_sliced() - elif strategy == 'cubic': + elif strategy == "cubic": self._decompose_cubic() else: raise ValueError(f"Unknown strategy: {strategy}") @@ -109,7 +110,7 @@ def _decompose_sliced(self): """Decompose domain with 1D slicing along configurable axis.""" interior_N = self.N - 2 axis = self.axis # 0=z, 1=y, 2=x - axis_names = ['z', 'y', 'x'] + axis_names = ["z", "y", "x"] axis_name = axis_names[axis] self._rank_info = [] @@ -145,8 +146,8 @@ def _decompose_sliced(self): # Neighbors along decomposition axis only neighbors = {} - neighbors[f'{axis_name}_lower'] = rank - 1 if rank > 0 else None - neighbors[f'{axis_name}_upper'] = rank + 1 if rank < self.size - 1 else None + neighbors[f"{axis_name}_lower"] = rank - 1 if rank > 0 else None + neighbors[f"{axis_name}_upper"] = rank + 1 if rank < self.size - 1 else None n_neighbors = sum(1 for n in neighbors.values() if n is not None) @@ -162,7 +163,7 @@ def _decompose_sliced(self): halo_shape=halo_shape, neighbors=neighbors, n_neighbors=n_neighbors, - halo_cells_total=halo_cells_total + halo_cells_total=halo_cells_total, ) self._rank_info.append(info) @@ -193,9 +194,9 @@ def split_sizes(n, parts): # Store for later use self._split_info = { - 'nx': (nx_counts, nx_starts), - 'ny': (ny_counts, ny_starts), - 'nz': (nz_counts, nz_starts), + "nx": (nx_counts, nx_starts), + "ny": (ny_counts, ny_starts), + "nz": (nz_counts, nz_starts), } self._rank_info = [] @@ -227,22 +228,22 @@ def split_sizes(n, parts): # Neighbors neighbors = {} - neighbors['x_lower'] = self._cart_neighbor(ix-1, iy, iz, px, py, pz) - neighbors['x_upper'] = self._cart_neighbor(ix+1, iy, iz, px, py, pz) - neighbors['y_lower'] = self._cart_neighbor(ix, iy-1, iz, px, py, pz) - neighbors['y_upper'] = self._cart_neighbor(ix, iy+1, iz, px, py, pz) - neighbors['z_lower'] = self._cart_neighbor(ix, iy, iz-1, px, py, pz) - neighbors['z_upper'] = self._cart_neighbor(ix, iy, iz+1, px, py, pz) + neighbors["x_lower"] = self._cart_neighbor(ix - 1, iy, iz, px, py, pz) + neighbors["x_upper"] = self._cart_neighbor(ix + 1, iy, iz, px, py, pz) + neighbors["y_lower"] = self._cart_neighbor(ix, iy - 1, iz, px, py, pz) + neighbors["y_upper"] = self._cart_neighbor(ix, iy + 1, iz, px, py, pz) + neighbors["z_lower"] = self._cart_neighbor(ix, iy, iz - 1, px, py, pz) + neighbors["z_upper"] = self._cart_neighbor(ix, iy, iz + 1, px, py, pz) n_neighbors = sum(1 for n in neighbors.values() if n is not None) nz, ny, nx = local_shape halo_cells = 0 - if neighbors['x_lower'] is not None or neighbors['x_upper'] is not None: + if neighbors["x_lower"] is not None or neighbors["x_upper"] is not None: halo_cells += 2 * nz * ny - if neighbors['y_lower'] is not None or neighbors['y_upper'] is not None: + if neighbors["y_lower"] is not None or neighbors["y_upper"] is not None: halo_cells += 2 * nz * nx - if neighbors['z_lower'] is not None or neighbors['z_upper'] is not None: + if neighbors["z_lower"] is not None or neighbors["z_upper"] is not None: halo_cells += 2 * ny * nx info = RankInfo( @@ -253,7 +254,7 @@ def split_sizes(n, parts): halo_shape=halo_shape, neighbors=neighbors, n_neighbors=n_neighbors, - halo_cells_total=halo_cells + halo_cells_total=halo_cells, ) self._rank_info.append(info) @@ -266,19 +267,20 @@ def _cart_neighbor(self, ix, iy, iz, px, py, pz): def _factorize_3d(self, n): """Simple 3D factorization (as cubic as possible).""" candidates = np.arange(1, int(n**0.5) + 1) - divisors = np.concatenate([candidates[n % candidates == 0], - n // candidates[n % candidates == 0]]) + divisors = np.concatenate( + [candidates[n % candidates == 0], n // candidates[n % candidates == 0]] + ) divisors = np.unique(divisors) best = (n, 1, 1) - best_score = float('inf') + best_score = float("inf") for i in divisors: remaining = n // i valid_j = divisors[divisors <= remaining] for j in valid_j[remaining % valid_j == 0]: k = remaining // j - score = (i - j)**2 + (j - k)**2 + (k - i)**2 + score = (i - j) ** 2 + (j - k) ** 2 + (k - i) ** 2 if score < best_score: best = (int(i), int(j), int(k)) best_score = score @@ -316,7 +318,7 @@ def initialize_local_arrays_distributed(self, N, rank, comm): # Build local source term using physical coordinates h = 2.0 / (N - 1) - if self.strategy == 'sliced': + if self.strategy == "sliced": # Sliced: one axis decomposed, others full gs = info.global_start axis = self.axis # 0=z, 1=y, 2=x @@ -333,8 +335,10 @@ def initialize_local_arrays_distributed(self, N, rank, comm): # Full axis coords.append(np.linspace(-1, 1, N)) - Z, Y, X = np.meshgrid(coords[0], coords[1], coords[2], indexing='ij') - source = 3 * np.pi**2 * np.sin(np.pi * X) * np.sin(np.pi * Y) * np.sin(np.pi * Z) + Z, Y, X = np.meshgrid(coords[0], coords[1], coords[2], indexing="ij") + source = ( + 3 * np.pi**2 * np.sin(np.pi * X) * np.sin(np.pi * Y) * np.sin(np.pi * Z) + ) # Build interior slice (skip halo on decomposed axis only) interior = [slice(None), slice(None), slice(None)] @@ -361,13 +365,19 @@ def initialize_local_arrays_distributed(self, N, rank, comm): Yl = ys.reshape((1, ny, 1)) Xl = xs.reshape((1, 1, nx)) - f_local[1:-1, 1:-1, 1:-1] = 3 * np.pi**2 * np.sin(np.pi * Xl) * np.sin(np.pi * Yl) * np.sin(np.pi * Zl) + f_local[1:-1, 1:-1, 1:-1] = ( + 3 + * np.pi**2 + * np.sin(np.pi * Xl) + * np.sin(np.pi * Yl) + * np.sin(np.pi * Zl) + ) return u1, u2, f_local def extract_interior(self, u_local): """Extract interior points from local array (excluding halos).""" - if self.strategy == 'sliced': + if self.strategy == "sliced": interior = [slice(None), slice(None), slice(None)] interior[self.axis] = slice(1, -1) return u_local[tuple(interior)].copy() @@ -381,7 +391,7 @@ def get_interior_placement(self, rank_id, N, comm): """ info = self.get_rank_info(rank_id) - if self.strategy == 'sliced': + if self.strategy == "sliced": gs = info.global_start ge = info.global_end slices = [slice(None), slice(None), slice(None)] @@ -410,7 +420,7 @@ def apply_boundary_conditions(self, u_local, rank): rank : int MPI rank """ - if self.strategy != 'cubic': + if self.strategy != "cubic": return # Sliced doesn't need this - boundaries are handled differently info = self.get_rank_info(rank) @@ -446,7 +456,7 @@ class NoDecomposition: """Stub decomposition for single-rank (sequential) execution.""" def __init__(self): - self.strategy = 'none' + self.strategy = "none" self._N = None def get_rank_info(self, rank): @@ -454,18 +464,18 @@ def get_rank_info(self, rank): N = self._N or 1 return RankInfo( rank=0, - local_shape=(N-2, N-2, N-2), # Interior only + local_shape=(N - 2, N - 2, N - 2), # Interior only global_start=(1, 1, 1), - global_end=(N-1, N-1, N-1), + global_end=(N - 1, N - 1, N - 1), halo_shape=(N, N, N), neighbors={}, n_neighbors=0, - halo_cells_total=0 + halo_cells_total=0, ) def initialize_local_arrays_distributed(self, N, rank, comm): """Initialize arrays for single-rank execution.""" - from .problems import sinusoidal_source_term + from ..problems import sinusoidal_source_term self._N = N if rank == 0: diff --git a/src/Poisson/problems.py b/src/Poisson/problems.py index 4468f19..708a919 100644 --- a/src/Poisson/problems.py +++ b/src/Poisson/problems.py @@ -5,7 +5,9 @@ import numpy as np -def create_grid_3d(N: int, value: float = 0.0, boundary_value: float = 0.0) -> np.ndarray: +def create_grid_3d( + N: int, value: float = 0.0, boundary_value: float = 0.0 +) -> np.ndarray: """Create 3D grid with specified interior and boundary values.""" u = np.full((N, N, N), value, dtype=np.float64) u[[0, -1], :, :] = boundary_value @@ -35,7 +37,9 @@ def sinusoidal_source_term(N: int) -> np.ndarray: return 3 * np.pi**2 * np.sin(np.pi * xs) * np.sin(np.pi * ys) * np.sin(np.pi * zs) -def setup_sinusoidal_problem(N: int, initial_value: float = 0.0) -> tuple[np.ndarray, np.ndarray, np.ndarray, float]: +def setup_sinusoidal_problem( + N: int, initial_value: float = 0.0 +) -> tuple[np.ndarray, np.ndarray, np.ndarray, float]: """Set up sinusoidal test problem: -∇²u = 3π² sin(π x)sin(π y)sin(π z).""" h = 2.0 / (N - 1) u1 = create_grid_3d(N, value=initial_value, boundary_value=0.0) diff --git a/src/Poisson/jacobi.py b/src/Poisson/solver.py similarity index 61% rename from src/Poisson/jacobi.py rename to src/Poisson/solver.py index a7f0524..a929b69 100644 --- a/src/Poisson/jacobi.py +++ b/src/Poisson/solver.py @@ -5,6 +5,8 @@ is enabled by providing decomposition and communicator strategies. """ +import time + import numpy as np from dataclasses import asdict from mpi4py import MPI @@ -13,8 +15,8 @@ from .kernels import NumPyKernel, NumbaKernel from .datastructures import GlobalParams, GlobalMetrics, LocalSeries -from .communicators import NumpyHaloExchange -from .decomposition import NoDecomposition +from .mpi.communicators import NumpyHaloExchange +from .mpi.decomposition import NoDecomposition def _get_strategy_name(obj): @@ -22,7 +24,9 @@ def _get_strategy_name(obj): if obj is None: return "numpy" name = obj.__class__.__name__.lower() - return name.replace('decomposition', '').replace('communicator', '').replace('mpi', '') + return ( + name.replace("decomposition", "").replace("communicator", "").replace("mpi", "") + ) class JacobiPoisson: @@ -59,32 +63,39 @@ def __init__(self, decomposition=None, communicator=None, **kwargs): self.kernel = KernelClass( N=self.config.N, omega=self.config.omega, - numba_threads=self.config.numba_threads if self.config.use_numba else None + numba_threads=self.config.numba_threads if self.config.use_numba else None, ) # Strategy setup self._setup_strategies(decomposition, communicator) # Initialize arrays - self.u1_local, self.u2_local, self.f_local = \ + self.u1_local, self.u2_local, self.f_local = ( self.decomposition.initialize_local_arrays_distributed( self.config.N, self.rank, self.comm ) + ) def _setup_strategies(self, decomposition, communicator): """Configure decomposition and communicator strategies.""" if self.size == 1: # Single rank: store names but use NoDecomposition internally - self.config.decomposition = getattr(decomposition, 'strategy', 'none') if decomposition else "none" - self.config.communicator = _get_strategy_name(communicator) if communicator else "none" + self.config.decomposition = ( + getattr(decomposition, "strategy", "none") if decomposition else "none" + ) + self.config.communicator = ( + _get_strategy_name(communicator) if communicator else "none" + ) self.decomposition = NoDecomposition() self.communicator = communicator or NumpyHaloExchange() else: if decomposition is None: - raise ValueError("Decomposition strategy required for multi-rank execution") + raise ValueError( + "Decomposition strategy required for multi-rank execution" + ) self.decomposition = decomposition self.communicator = communicator or NumpyHaloExchange() - self.config.decomposition = getattr(decomposition, 'strategy', 'unknown') + self.config.decomposition = getattr(decomposition, "strategy", "unknown") self.config.communicator = _get_strategy_name(self.communicator) # ======================================================================== @@ -103,9 +114,18 @@ def _iterate(self): """Execute Jacobi iteration loop.""" uold, u = self.u1_local, self.u2_local + # MLflow timing + mlflow_time = 0.0 + for i in range(self.config.max_iter): residual = self._step(uold, u) + # Live MLflow logging every 50 iterations + if self.rank == 0 and i % 50 == 0 and mlflow.active_run(): + t_log_start = time.time() + mlflow.log_metrics({"residual": residual}, step=i) + mlflow_time += time.time() - t_log_start + if residual < self.config.tolerance: self._record_convergence(i + 1, converged=True) return u @@ -131,13 +151,15 @@ def _step(self, uold, u): # Compute residual after BCs (so boundary cells don't contribute) diff = u[1:-1, 1:-1, 1:-1] - uold[1:-1, 1:-1, 1:-1] - local_diff_sum = np.sum(diff ** 2) + local_diff_sum = np.sum(diff**2) self.timeseries.compute_times.append(MPI.Wtime() - t0) # Global residual t0 = MPI.Wtime() n_interior = (self.config.N - 2) ** 3 - global_residual = np.sqrt(self.comm.allreduce(local_diff_sum, op=MPI.SUM)) / n_interior + global_residual = ( + np.sqrt(self.comm.allreduce(local_diff_sum, op=MPI.SUM)) / n_interior + ) self.timeseries.mpi_comm_times.append(MPI.Wtime() - t0) if self.rank == 0: @@ -153,7 +175,9 @@ def _gather_solution(self, u_local): all_interiors = self.comm.gather(local_interior, root=0) self.u_global = np.zeros((self.config.N,) * 3) for rank_id, data in enumerate(all_interiors): - placement = self.decomposition.get_interior_placement(rank_id, self.config.N, self.comm) + placement = self.decomposition.get_interior_placement( + rank_id, self.config.N, self.comm + ) self.u_global[placement] = data else: self.comm.gather(local_interior, root=0) @@ -190,7 +214,10 @@ def compute_l2_error(self): gs = info.global_start local_shape = info.local_shape - if hasattr(self.decomposition, 'strategy') and self.decomposition.strategy == 'cubic': + if ( + hasattr(self.decomposition, "strategy") + and self.decomposition.strategy == "cubic" + ): # Cubic: all dims decomposed nz, ny, nx = local_shape z_idx = np.arange(gs[0], gs[0] + nz) @@ -216,7 +243,7 @@ def compute_l2_error(self): ys = np.linspace(-1, 1, N)[1:-1] # Interior only xs = np.linspace(-1, 1, N)[1:-1] - Z, Y, X = np.meshgrid(zs, ys, xs, indexing='ij') + Z, Y, X = np.meshgrid(zs, ys, xs, indexing="ij") u_exact = np.sin(np.pi * X) * np.sin(np.pi * Y) * np.sin(np.pi * Z) u_numerical = u_local[1:-1, 1:-1, 1:-1] @@ -246,39 +273,125 @@ def save_hdf5(self, path): return import pandas as pd + row = {**asdict(self.config), **asdict(self.results)} - pd.DataFrame([row]).to_hdf(path, key='results', mode='w') + pd.DataFrame([row]).to_hdf(path, key="results", mode="w") # ======================================================================== - # MLflow + # MLflow Integration # ======================================================================== - def mlflow_start(self, experiment_name): - """Start MLflow run (rank 0 only).""" + def mlflow_start( + self, experiment_name: str, run_name: str = None, parent_run_name: str = None + ): + """Start MLflow run and log parameters (rank 0 only).""" if self.rank != 0: return mlflow.login() - # Databricks requires absolute paths - experiment_name = f"/Shared/{experiment_name}" + # Databricks requires absolute paths - using a standard project prefix if not present + if not experiment_name.startswith("/"): + experiment_name = f"/Shared/LSM-Project-2/{experiment_name}" if mlflow.get_experiment_by_name(experiment_name) is None: mlflow.create_experiment(name=experiment_name) mlflow.set_experiment(experiment_name) - mlflow.start_run() + + # Handle parent run if specified + if parent_run_name: + experiment = mlflow.get_experiment_by_name(experiment_name) + client = mlflow.tracking.MlflowClient() + + # Search for existing parent run + runs = client.search_runs( + experiment_ids=[experiment.experiment_id], + filter_string=f"tags.mlflow.runName = '{parent_run_name}' AND tags.is_parent = 'true'", + max_results=1, + ) + + if runs: + parent_run_id = runs[0].info.run_id + else: + parent_run = client.create_run( + experiment_id=experiment.experiment_id, + run_name=parent_run_name, + tags={"is_parent": "true"}, + ) + parent_run_id = parent_run.info.run_id + + # Start nested child run + mlflow.start_run(run_id=parent_run_id, log_system_metrics=False) + mlflow.start_run(run_name=run_name, nested=True, log_system_metrics=True) + self._mlflow_nested = True + else: + mlflow.start_run(log_system_metrics=True, run_name=run_name) + self._mlflow_nested = False + + # Log all parameters from config mlflow.log_params(asdict(self.config)) - def mlflow_end(self): - """End MLflow run with metrics (rank 0 only).""" + def mlflow_end(self, log_time_series: bool = True): + """End MLflow run and log metrics (rank 0 only).""" if self.rank != 0: return - import mlflow - mlflow.log_metrics({ - "total_compute_time": sum(self.timeseries.compute_times), - "total_halo_time": sum(self.timeseries.halo_exchange_times), - "total_mpi_comm_time": sum(self.timeseries.mpi_comm_times), - }) + # Populate timing totals in results if timeseries exists + if self.timeseries.compute_times: + self.results.total_compute_time = sum(self.timeseries.compute_times) + if self.timeseries.halo_exchange_times: + self.results.total_halo_time = sum(self.timeseries.halo_exchange_times) + if self.timeseries.mpi_comm_times: + self.results.total_mpi_comm_time = sum(self.timeseries.mpi_comm_times) + + # Log final metrics + mlflow.log_metrics(asdict(self.results)) + + # Log time series as step-based metrics + if log_time_series: + self._mlflow_log_time_series() + + # End child run mlflow.end_run() + + # End parent run if nested + if getattr(self, "_mlflow_nested", False): + mlflow.end_run() + + def _mlflow_log_time_series(self): + """Log time series as step-based metrics using async batch logging.""" + from mlflow.entities import Metric + + if not mlflow.active_run(): + return + + run_id = mlflow.active_run().info.run_id + client = mlflow.tracking.MlflowClient() + timestamp = int(time.time() * 1000) + + # Build all metrics from timeseries + metrics = [] + ts_dict = asdict(self.timeseries) + for name, values in ts_dict.items(): + if values: # Skip empty lists + for step, value in enumerate(values): + # Ensure value is float + try: + val = float(value) + metrics.append(Metric(name, val, timestamp, step)) + except (ValueError, TypeError): + continue + + # Async batch log (non-blocking) - split into chunks of 1000 (MLflow limit) + batch_size = 1000 + for i in range(0, len(metrics), batch_size): + chunk = metrics[i : i + batch_size] + if chunk: + client.log_batch(run_id, metrics=chunk, synchronous=False) + + def mlflow_log_artifact(self, filepath: str): + """Log an artifact to MLflow (rank 0 only).""" + if self.rank != 0: + return + mlflow.log_artifact(filepath) diff --git a/src/utils/__init__.py b/src/utils/__init__.py index 4212172..1a8ea02 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -1,11 +1,12 @@ -"""Utility modules for data handling, plotting, and CLI. +"""Utility modules for plotting and CLI. Import conveniences: -- from utils import datatools # For data operations - from utils import plotting # For plotting operations - from utils import cli # For command-line argument parsing +- from utils import mlflow_io # For MLflow I/O operations +- from utils import hpc # For HPC job generation """ -from . import datatools, plotting, cli +from . import plotting, cli, mlflow_io, hpc -__all__ = ["datatools", "plotting", "cli"] +__all__ = ["plotting", "cli", "mlflow_io", "hpc"] diff --git a/src/utils/datatools.py b/src/utils/datatools.py deleted file mode 100644 index 280238a..0000000 --- a/src/utils/datatools.py +++ /dev/null @@ -1,204 +0,0 @@ -"""Data utilities for creating, loading, and saving simulation data. - -This module provides utilities for: -- Path management (automatic directory structure mirroring Experiments/) -- Data I/O (loading and saving dataframes) -- Numerical computations (norms and errors) -- DataFrame utilities -""" - -from __future__ import annotations - -import inspect -from pathlib import Path -from typing import Any, Literal - -import numpy as np -import pandas as pd - - -# ============================================================================== -# Path Management -# ============================================================================== - - -def get_repo_root() -> Path: - """Get repository root directory. - - Returns the repository root by detecting the presence of pyproject.toml. - Works from any subdirectory of the repository. - - Returns - ------- - Path - Absolute path to the repository root - - """ - # Start from this file's location - current = Path(__file__).resolve().parent - - # Walk up until we find pyproject.toml (marks repo root) - for parent in [current] + list(current.parents): - if (parent / "pyproject.toml").exists(): - return parent - - # Fallback: assume two levels up from utils - return current.parent.parent - - -def get_experiment_name(caller_file: Path | str | None = None) -> str: - """Get experiment name from the calling script's location. - - Extracts the experiment name from the path relative to Experiments/. - For example: - - Experiments/sequential/compute.py → "sequential" - - Experiments/parallel/mpi/compute.py → "parallel/mpi" - - Parameters - ---------- - caller_file : Path or str, optional - Path to the calling file. If None, automatically detects the caller. - - Returns - ------- - str - Experiment name (relative path from Experiments/) - - Raises - ------ - ValueError - If the calling file is not in an Experiments/ subdirectory - - """ - if caller_file is None: - # Get the calling file from the stack - # We need to go up 2 frames: this function -> get_data_dir/get_figures_dir -> actual caller - frame = inspect.currentframe() - if frame is None or frame.f_back is None or frame.f_back.f_back is None: - raise RuntimeError("Cannot detect caller file") - caller_file = Path(frame.f_back.f_back.f_globals["__file__"]) - else: - caller_file = Path(caller_file) - - caller_file = caller_file.resolve() - - # Find Experiments directory in the path - experiments_idx = None - for i, part in enumerate(caller_file.parts): - if part == "Experiments": - experiments_idx = i - break - - if experiments_idx is None: - raise ValueError( - f"File {caller_file} is not in an Experiments/ subdirectory. " - "This utility is designed for scripts in Experiments/*/" - ) - - # Get the path components between Experiments/ and the filename - # e.g., for Experiments/sequential/compute.py → ["sequential"] - # or for Experiments/parallel/mpi/compute.py → ["parallel", "mpi"] - experiment_parts = caller_file.parts[experiments_idx + 1 : -1] - - if not experiment_parts: - raise ValueError( - f"File {caller_file} is directly in Experiments/. " - "Scripts should be in a subdirectory (e.g., Experiments/sequential/)" - ) - - return "/".join(experiment_parts) - - -def get_data_dir(caller_file: Path | str | None = None, create: bool = True) -> Path: - """Get data directory for the calling experiment. - - Automatically determines the correct data directory based on the - calling script's location in Experiments/, mirroring the structure. - - Parameters - ---------- - caller_file : Path or str, optional - Path to the calling file. If None, automatically detects the caller. - create : bool, default True - Whether to create the directory if it doesn't exist - - Returns - ------- - Path - Data directory path (e.g., repo_root/data/sequential/) - - Examples - -------- - From Experiments/sequential/compute.py: - >>> data_dir = get_data_dir() # Returns repo_root/data/sequential/ - - From Experiments/parallel/mpi/compute.py: - >>> data_dir = get_data_dir() # Returns repo_root/data/parallel/mpi/ - - """ - experiment_name = get_experiment_name(caller_file) - repo_root = get_repo_root() - data_dir = repo_root / "data" / experiment_name - - if create: - data_dir.mkdir(parents=True, exist_ok=True) - - return data_dir - - -def get_figures_dir(caller_file: Path | str | None = None, create: bool = True) -> Path: - """Get figures directory for the calling experiment. - - Automatically determines the correct figures directory based on the - calling script's location in Experiments/, mirroring the structure. - - Parameters - ---------- - caller_file : Path or str, optional - Path to the calling file. If None, automatically detects the caller. - create : bool, default True - Whether to create the directory if it doesn't exist - - Returns - ------- - Path - Figures directory path (e.g., repo_root/figures/sequential/) - - Examples - -------- - From Experiments/sequential/plot.py: - >>> figures_dir = get_figures_dir() # Returns repo_root/figures/sequential/ - - From Experiments/parallel/mpi/plot.py: - >>> figures_dir = get_figures_dir() # Returns repo_root/figures/parallel/mpi/ - - """ - experiment_name = get_experiment_name(caller_file) - repo_root = get_repo_root() - figures_dir = repo_root / "figures" / experiment_name - - if create: - figures_dir.mkdir(parents=True, exist_ok=True) - - return figures_dir - - -def ensure_output_dir(path: Path | str) -> Path: - """Ensure output directory exists, creating it if necessary. - - Parameters - ---------- - path : Path or str - Directory path to create - - Returns - ------- - Path - The created/existing directory path - - """ - path = Path(path) - path.mkdir(parents=True, exist_ok=True) - return path - - diff --git a/src/utils/generate_pack.py b/src/utils/generate_pack.py new file mode 100644 index 0000000..abb235a --- /dev/null +++ b/src/utils/generate_pack.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +""" +Generate LSF job pack for scaling experiments. + +Usage: + python generate_pack.py --type strong --output jobs.pack + python generate_pack.py --type weak +""" + +import argparse +from pathlib import Path +import sys +# Use relative import since this is now in utils +from .hpc import load_config, generate_pack_lines, write_pack_file + +def main(): + parser = argparse.ArgumentParser(description="Generate LSF job pack") + parser.add_argument("--type", choices=["strong", "weak"], required=True, help="Scaling type") + parser.add_argument("--output", type=str, default="jobs.pack", help="Output pack file") + parser.add_argument("--config-dir", type=str, required=True, help="Directory containing config files") + # Optional override for strong scaling grid sizes + parser.add_argument("--N", type=int, nargs="+", help="Grid sizes for strong scaling") + + args = parser.parse_args() + + # Load config based on type from the specified directory + config_path = Path(args.config_dir) / f"{args.type}_scaling.yaml" + if not config_path.exists(): + # Fallback to template for strong scaling if specific file doesn't exist + if args.type == "strong": + config_path = Path(args.config_dir) / "template_config.yaml" + + if not config_path.exists(): + print(f"Error: Config file {config_path} not found") + sys.exit(1) + + config = load_config(config_path) + + # Override N if provided + if args.N and args.type == "strong": + if "groups" in config: + for group in config["groups"]: + if "sweep" in group: + group["sweep"]["N"] = args.N + elif "sweep" in config: + config["sweep"]["N"] = args.N + + # Generate content + job_name_base = f"{args.type}_scaling" + lines = generate_pack_lines(config, job_name_base) + + # Write file + write_pack_file(args.output, lines) + + print(f"Generated {len(lines)} jobs in {args.output}") + +if __name__ == "__main__": + main() diff --git a/src/utils/hpc.py b/src/utils/hpc.py new file mode 100644 index 0000000..ded264c --- /dev/null +++ b/src/utils/hpc.py @@ -0,0 +1,168 @@ +"""Utilities for LSF job pack generation.""" + +import yaml +import itertools +from pathlib import Path +from typing import List, Dict, Any + + +def load_config(config_path: Path) -> Dict[str, Any]: + """Load YAML configuration file.""" + with open(config_path, "r") as f: + return yaml.safe_load(f) + + +def generate_pack_lines(config: Dict[str, Any], job_name_base: str) -> List[str]: + """Generate list of job pack lines (options + command).""" + script = config.get("script", "compute_scaling.py") + base_cmd = f"mpiexec -n {{ranks}} uv run python {script}" + + # Global defaults + global_params = config.get("parameters", {}) + global_lsf = config.get("lsf", {}) + global_sweep = config.get("sweep", {}) + global_mpi = config.get("mpi_options", "") + + # Normalize to a list of groups + if "groups" in config: + groups = config["groups"] + else: + # Legacy/Simple mode: Treat top-level as a single group + groups = [{ + "name": "default", + "lsf": {}, + "parameters": {}, + "sweep": global_sweep, + "mpi_options": "" + }] + + lines = [] + global_job_counter = 1 + + for group in groups: + # Merge configurations (Group overrides Global) + group_lsf = {**global_lsf, **group.get("lsf", {})} + group_params = {**global_params, **group.get("parameters", {})} + group_sweep = group.get("sweep", {}) + group_mpi = group.get("mpi_options", global_mpi) + + # If no sweep in group, use global sweep (if it exists and wasn't just the legacy fallback) + if not group_sweep and "groups" in config: + group_sweep = global_sweep + + # Prepare sweep combinations + keys = list(group_sweep.keys()) + values = list(group_sweep.values()) + combinations = list(itertools.product(*values)) + + group_name = group.get("name", "job") + + for combo in combinations: + current_params = group_params.copy() + current_sweep = dict(zip(keys, combo)) + + # Handle scaling logic + scaling_type = config.get("type", "strong") + if scaling_type == "weak": + # For weak scaling, N depends on ranks + base_N = current_params.get("base_N", 32) + ranks = current_sweep.get("ranks", 1) + N = int(round(base_N * (ranks**(1/3)))) + current_params["N"] = N + + # Merge sweep parameters + current_params.update(current_sweep) + + # Extract ranks for mpiexec and bsub -n + ranks = current_params.pop("ranks") + + # Calculate job name early to pass to script + current_job_name = f"{job_name_base}_{group_name}_{global_job_counter}" + + # Build command arguments + args = [] + for k, v in current_params.items(): + if isinstance(v, bool): + if v: + args.append(f"--{k}") + else: + args.append(f"--{k} {v}") + + # Add logging info for MLflow artifact upload + args.append(f"--job-name {current_job_name}") + args.append("--log-dir logs") + args.append(f"--experiment-name {group_name}") + + # Construct the actual command to run + # base_cmd pattern: mpiexec -n {ranks} {mpi_options} uv run python {script} {args} + cmd_parts = [f"mpiexec -n {ranks}"] + if group_mpi: + cmd_parts.append(group_mpi) + cmd_parts.append(f"uv run python {script}") + cmd_parts.append(" ".join(args)) + + main_cmd = " ".join(cmd_parts) + + # Chain the log uploader command + # We use (cmd; uploader) to ensure uploader runs regardless of cmd exit status + uploader_cmd = f"uv run python src/utils/upload_logs.py --job-name {current_job_name} --log-dir logs --experiment-name {group_name}" + cmd = f'({main_cmd}; {uploader_cmd})' + + # Build LSF options for this specific job + # current_job_name is already defined above + + # Map config keys to LSF flags + lsf_opts = [] + lsf_opts.append(f"-J {current_job_name}") + + # Cores: driven by ranks (if in sweep/params) or explicit 'cores' config + cores = ranks if ranks is not None else group_lsf.get('cores', 1) + lsf_opts.append(f"-n {cores}") + + # Value mappings + val_map = { + 'queue': '-q', + 'walltime': '-W', + 'memory': '-M', + 'email': '-u', + 'project': '-P', + } + for key, flag in val_map.items(): + if key in group_lsf: + lsf_opts.append(f"{flag} {group_lsf[key]}") + + # Boolean mappings + bool_map = { + 'exclusive': '-x', + 'notify_start': '-B', + 'notify_end': '-N', + } + for key, flag in bool_map.items(): + if group_lsf.get(key, False): + lsf_opts.append(flag) + + # Resource requirements (-R) + resources = group_lsf.get('resources', []) + if isinstance(resources, str): + resources = [resources] + for r in resources: + lsf_opts.append(f'-R "{r}"') + + # Logs + lsf_opts.append(f"-o logs/{current_job_name}.out") + lsf_opts.append(f"-e logs/{current_job_name}.err") + + # Combine options and command + line = " ".join(lsf_opts) + " " + cmd + lines.append(line) + global_job_counter += 1 + + return lines + + +def write_pack_file(output_path: Path, lines: List[str]): + """Write LSF pack file.""" + with open(output_path, "w") as f: + f.write("# LSF Job Pack generated by generate_pack.py\n") + for line in lines: + f.write(line + "\n") \ No newline at end of file diff --git a/src/utils/mlflow_io.py b/src/utils/mlflow_io.py new file mode 100644 index 0000000..c01721f --- /dev/null +++ b/src/utils/mlflow_io.py @@ -0,0 +1,207 @@ +"""MLflow I/O utilities for fetching runs and artifacts. + +This module provides helpers for retrieving experiment data from +MLflow/Databricks and downloading artifacts to the local data directory. +""" + +import os +from pathlib import Path +from typing import List, Optional +import mlflow +import pandas as pd + + +def setup_mlflow_auth(): + """Configure MLflow authentication. + + Uses DATABRICKS_TOKEN environment variable if available (for CI), + otherwise falls back to interactive login. + """ + token = os.environ.get("DATABRICKS_TOKEN") + if token: + # CI environment - set both host and token for Databricks auth + host = "https://dbc-6756e917-e5fc.cloud.databricks.com" + os.environ["DATABRICKS_HOST"] = host + mlflow.set_tracking_uri("databricks") + else: + # Local environment - interactive login + mlflow.login() + + +def load_runs( + experiment: str, + converged_only: bool = True, + exclude_parent_runs: bool = True, +) -> pd.DataFrame: + """Load runs from an MLflow experiment. + + Parameters + ---------- + experiment : str + Experiment name (e.g., "HPC-FV-Solver" or full path "/Shared/ANA-P3/HPC-FV-Solver"). + converged_only : bool, default True + Only return runs where metrics.converged = 1. + exclude_parent_runs : bool, default True + Exclude parent runs (nested run containers). + + Returns + ------- + pd.DataFrame + DataFrame with run info, parameters (params.*), and metrics (metrics.*). + + Examples + -------- + >>> df = load_runs("HPC-FV-Solver") + >>> df[["run_id", "params.nx", "metrics.wall_time_seconds"]] + """ + # Normalize experiment name + if not experiment.startswith("/"): + # Note: Adapted for LSM Project 2, assuming same shared folder structure or user should adjust + experiment = f"/Shared/LSM-Project-2/{experiment}" + + # Build filter string + filters = [] + if converged_only: + filters.append("metrics.converged = 1") + + filter_string = " and ".join(filters) if filters else "" + + # Fetch runs + df = mlflow.search_runs( + experiment_names=[experiment], + filter_string=filter_string, + order_by=["start_time DESC"], + ) + + # Filter out parent runs in pandas (MLflow filter doesn't handle None well) + if exclude_parent_runs and "tags.is_parent" in df.columns: + df = df[df["tags.is_parent"] != "true"] + + return df + + +def download_artifacts( + experiment: str, + output_dir: Path, + converged_only: bool = True, + artifact_filter: Optional[List[str]] = None, +) -> List[Path]: + """Download artifacts from MLflow runs to local directory. + + Parameters + ---------- + experiment : str + Experiment name (e.g., "HPC-FV-Solver"). + output_dir : Path + Directory to save artifacts. Files are named based on run parameters. + converged_only : bool, default True + Only download from converged runs. + artifact_filter : list of str, optional + Only download artifacts matching these patterns (e.g., ["*.h5", "*.png"]). + If None, downloads all artifacts. + + Returns + ------- + list of Path + Paths to downloaded files. + + Examples + -------- + >>> paths = download_artifacts("HPC-FV-Solver", Path("data/FV-Solver")) + >>> print(paths) + [Path('data/FV-Solver/LDC_N32_Re100.h5'), ...] + """ + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + # Get runs + df = load_runs(experiment, converged_only=converged_only) + if df.empty: + print(f"No runs found for {experiment}") + return [] + + client = mlflow.tracking.MlflowClient() + downloaded = [] + + for _, row in df.iterrows(): + run_id = row["run_id"] + + # List artifacts + artifacts = client.list_artifacts(run_id) + + for artifact in artifacts: + # Apply filter if specified + if artifact_filter: + if not any(artifact.path.endswith(f) for f in artifact_filter): + continue + + # Download to output directory + local_path = client.download_artifacts(run_id, artifact.path, output_dir) + downloaded.append(Path(local_path)) + print(f" Downloaded: {artifact.path}") + + return downloaded + + +def download_artifacts_with_naming( + experiment: str, + output_dir: Path, + converged_only: bool = True, +) -> List[Path]: + """Download HDF5 artifacts with standardized naming. + + Names files as: POISSON_N{n}_Iter{iter}.h5 (Adapted for LSM) + + Parameters + ---------- + experiment : str + Experiment name. + output_dir : Path + Directory to save artifacts. + converged_only : bool, default True + Only download from converged runs. + + Returns + ------- + list of Path + Paths to downloaded files. + """ + import tempfile + import shutil + + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + df = load_runs(experiment, converged_only=converged_only) + if df.empty: + print(f"No runs found for {experiment}") + return [] + + client = mlflow.tracking.MlflowClient() + downloaded = [] + + for _, row in df.iterrows(): + run_id = row["run_id"] + + # Extract parameters for naming - Adapting to typical Poisson params + # Assuming 'n' is grid size, 'max_iter' or 'iterations' might be useful + n = row.get("params.n", row.get("params.N", "unknown")) + + # List artifacts and find HDF5 files + artifacts = client.list_artifacts(run_id) + + for artifact in artifacts: + if artifact.path.endswith(".h5"): + # Download to temp location first + with tempfile.TemporaryDirectory() as tmpdir: + tmp_path = client.download_artifacts(run_id, artifact.path, tmpdir) + + # Rename with standardized naming + new_name = f"Poisson_N{n}_{artifact.path.split('/')[-1]}" + final_path = output_dir / new_name + + shutil.copy(tmp_path, final_path) + downloaded.append(final_path) + print(f" {artifact.path} -> {new_name}") + + return downloaded diff --git a/src/utils/upload_logs.py b/src/utils/upload_logs.py new file mode 100644 index 0000000..d8b9300 --- /dev/null +++ b/src/utils/upload_logs.py @@ -0,0 +1,80 @@ +"""Helper script to upload LSF logs to MLflow after a job finishes. + +This script is designed to run after the main computation, regardless of success or failure. +It relies on a .runid file created by the main script to know which MLflow run to attach logs to. +""" + +import argparse +import os +import time +from pathlib import Path +import mlflow + +def main(): + parser = argparse.ArgumentParser(description="Upload LSF logs to MLflow") + parser.add_argument("--job-name", type=str, required=True, help="LSF Job Name") + parser.add_argument("--log-dir", type=str, default="logs", help="Directory containing logs") + parser.add_argument("--experiment-name", type=str, default="HPC-Poisson-Scaling", help="MLflow experiment name") + args = parser.parse_args() + + log_dir = Path(args.log_dir) + job_name = args.job_name + run_id_file = log_dir / f"{job_name}.runid" + out_log = log_dir / f"{job_name}.out" + err_log = log_dir / f"{job_name}.err" + + # Give the filesystem a moment to sync logs if the job just crashed + time.sleep(2) + + run_id = None + if run_id_file.exists(): + try: + with open(run_id_file, "r") as f: + run_id = f.read().strip() + print(f"Found Run ID: {run_id}") + except Exception as e: + print(f"Error reading run ID file: {e}") + + # Logic to handle upload + try: + active_run = None + + if run_id: + # Resume existing run + active_run = mlflow.start_run(run_id=run_id, log_system_metrics=False) + else: + # Create new run for startup failure + print("Run ID file not found. Creating new run for startup failure.") + experiment_name = args.experiment_name + + # Ensure experiment exists + if mlflow.get_experiment_by_name(experiment_name) is None: + try: + mlflow.create_experiment(name=experiment_name) + except: + pass # concurrent creation might fail, ignore + + mlflow.set_experiment(experiment_name) + active_run = mlflow.start_run(run_name=f"{job_name} (Startup Failure)") + mlflow.set_tag("status", "startup_failure") + + with active_run: + if out_log.exists(): + print(f"Uploading stdout: {out_log}") + mlflow.log_artifact(str(out_log), artifact_path="logs") + else: + print(f"Warning: stdout log not found at {out_log}") + + if err_log.exists(): + print(f"Uploading stderr: {err_log}") + mlflow.log_artifact(str(err_log), artifact_path="logs") + else: + print(f"Warning: stderr log not found at {err_log}") + + print("Log upload complete.") + + except Exception as e: + print(f"Failed to upload logs to MLflow: {e}") + +if __name__ == "__main__": + main() diff --git a/tests/test_communicators.py b/tests/test_communicators.py deleted file mode 100644 index 7f6c6c2..0000000 --- a/tests/test_communicators.py +++ /dev/null @@ -1,130 +0,0 @@ -"""Tests for halo exchange communicators.""" - -import numpy as np -import pytest -from Poisson import NumpyHaloExchange, CustomHaloExchange - - -class MockDecomposition: - """Mock decomposition for testing communicators.""" - - def __init__(self, neighbors): - self._neighbors = neighbors - - def get_neighbors(self, rank): - return self._neighbors - - -class MockComm: - """Mock MPI communicator for single-rank testing.""" - - PROC_NULL = -1 - - def Sendrecv(self, sendbuf, dest, sendtag, recvbuf, source, recvtag): - """No-op for single rank - just zero the receive buffer.""" - if hasattr(recvbuf, '__getitem__') and hasattr(recvbuf[0], 'fill'): - recvbuf[0].fill(0.0) - - -class TestNumpyHaloExchange: - """Tests for NumPy-based halo exchange.""" - - def test_sliced_z_axis_boundaries_zeroed(self): - """Boundary halos should be zero (Dirichlet BC).""" - comm = NumpyHaloExchange() - u = np.ones((10, 8, 8), dtype=np.float64) - - # No neighbors = boundary rank - decomp = MockDecomposition({'z_lower': None, 'z_upper': None}) - comm.exchange_halos(u, decomp, rank=0, comm=MockComm()) - - # Halos at boundaries should remain unchanged (no neighbor to receive from) - # Interior should be unchanged - assert np.all(u[1:-1, :, :] == 1.0) - - def test_sliced_y_axis_detection(self): - """Should detect y-axis decomposition from neighbor keys.""" - comm = NumpyHaloExchange() - u = np.ones((8, 10, 8), dtype=np.float64) - - decomp = MockDecomposition({'y_lower': None, 'y_upper': None}) - comm.exchange_halos(u, decomp, rank=0, comm=MockComm()) - - # Should not crash - verifies axis detection works - assert u.shape == (8, 10, 8) - - def test_sliced_interior_unchanged(self): - """Sliced halo exchange should not modify interior values.""" - comm = NumpyHaloExchange() - u = np.ones((10, 8, 8), dtype=np.float64) - interior_before = u[1:-1, :, :].copy() - - decomp = MockDecomposition({'z_lower': None, 'z_upper': None}) - comm.exchange_halos(u, decomp, rank=0, comm=MockComm()) - - # Interior should be unchanged - assert np.allclose(u[1:-1, :, :], interior_before) - - def test_cubic_all_boundaries(self): - """Cubic decomposition with all boundaries should zero halos.""" - comm = NumpyHaloExchange() - u = np.ones((10, 10, 10), dtype=np.float64) - - # Corner rank - no neighbors - decomp = MockDecomposition({ - 'x_lower': None, 'x_upper': None, - 'y_lower': None, 'y_upper': None, - 'z_lower': None, 'z_upper': None, - }) - comm.exchange_halos(u, decomp, rank=0, comm=MockComm()) - - # All boundary halos should be zeroed - assert np.all(u[1:-1, 1:-1, 0] == 0.0) # x_lower - assert np.all(u[1:-1, 1:-1, -1] == 0.0) # x_upper - assert np.all(u[1:-1, 0, 1:-1] == 0.0) # y_lower - assert np.all(u[1:-1, -1, 1:-1] == 0.0) # y_upper - assert np.all(u[0, 1:-1, 1:-1] == 0.0) # z_lower - assert np.all(u[-1, 1:-1, 1:-1] == 0.0) # z_upper - - # Interior unchanged - assert np.all(u[1:-1, 1:-1, 1:-1] == 1.0) - - -class TestCustomHaloExchange: - """Tests for MPI datatype-based halo exchange.""" - - def test_datatype_caching(self): - """Should cache datatypes after first exchange.""" - comm = CustomHaloExchange() - u = np.ones((10, 10, 10), dtype=np.float64) - - decomp = MockDecomposition({'z_lower': None, 'z_upper': None}) - comm.exchange_halos(u, decomp, rank=0, comm=MockComm()) - - # Datatypes should be cached - assert len(comm._cache) == 1 - - def test_datatype_reuse(self): - """Should reuse datatypes for same shape/decomposition.""" - comm = CustomHaloExchange() - u = np.ones((10, 10, 10), dtype=np.float64) - decomp = MockDecomposition({'z_lower': None, 'z_upper': None}) - - comm.exchange_halos(u, decomp, rank=0, comm=MockComm()) - cache_size_after_first = len(comm._cache) - - comm.exchange_halos(u, decomp, rank=0, comm=MockComm()) - assert len(comm._cache) == cache_size_after_first # No new entries - - def test_datatype_recreation_on_shape_change(self): - """Should create new datatypes when array shape changes.""" - comm = CustomHaloExchange() - decomp = MockDecomposition({'z_lower': None, 'z_upper': None}) - - u1 = np.ones((10, 10, 10), dtype=np.float64) - comm.exchange_halos(u1, decomp, rank=0, comm=MockComm()) - assert len(comm._cache) == 1 - - u2 = np.ones((12, 12, 12), dtype=np.float64) - comm.exchange_halos(u2, decomp, rank=0, comm=MockComm()) - assert len(comm._cache) == 2 # New entry for different shape diff --git a/tests/test_decomposition.py b/tests/test_decomposition.py index b13820b..e0de391 100644 --- a/tests/test_decomposition.py +++ b/tests/test_decomposition.py @@ -11,30 +11,30 @@ class TestSlicedDecomposition: @pytest.mark.parametrize("N,size", [(50, 4), (64, 8), (100, 7)]) def test_full_coverage_no_overlaps(self, N, size): """Each interior point owned by exactly one rank.""" - decomp = DomainDecomposition(N=N, size=size, strategy='sliced') + decomp = DomainDecomposition(N=N, size=size, strategy="sliced") total_nz = sum(decomp.get_rank_info(r).local_shape[0] for r in range(size)) assert total_nz == N - 2 # interior size def test_neighbors_correct(self): """Interior ranks have 2 neighbors, boundary ranks have 1.""" - decomp = DomainDecomposition(N=50, size=4, strategy='sliced') + decomp = DomainDecomposition(N=50, size=4, strategy="sliced") # First rank: no lower neighbor - assert decomp.get_rank_info(0).neighbors['z_lower'] is None - assert decomp.get_rank_info(0).neighbors['z_upper'] == 1 + assert decomp.get_rank_info(0).neighbors["z_lower"] is None + assert decomp.get_rank_info(0).neighbors["z_upper"] == 1 # Interior rank: both neighbors - assert decomp.get_rank_info(1).neighbors['z_lower'] == 0 - assert decomp.get_rank_info(1).neighbors['z_upper'] == 2 + assert decomp.get_rank_info(1).neighbors["z_lower"] == 0 + assert decomp.get_rank_info(1).neighbors["z_upper"] == 2 # Last rank: no upper neighbor - assert decomp.get_rank_info(3).neighbors['z_lower'] == 2 - assert decomp.get_rank_info(3).neighbors['z_upper'] is None + assert decomp.get_rank_info(3).neighbors["z_lower"] == 2 + assert decomp.get_rank_info(3).neighbors["z_upper"] is None def test_halo_shape(self): """Halo shape adds 2 in z-direction only.""" - decomp = DomainDecomposition(N=50, size=4, strategy='sliced') + decomp = DomainDecomposition(N=50, size=4, strategy="sliced") info = decomp.get_rank_info(1) nz, ny, nx = info.local_shape @@ -51,7 +51,7 @@ class TestCubicDecomposition: def test_full_coverage_no_overlaps(self): """Each point owned by exactly one rank.""" N, size = 64, 8 - decomp = DomainDecomposition(N=N, size=size, strategy='cubic') + decomp = DomainDecomposition(N=N, size=size, strategy="cubic") covered = np.zeros((N, N, N), dtype=bool) for rank in range(size): @@ -65,11 +65,16 @@ def test_full_coverage_no_overlaps(self): def test_neighbor_reciprocity(self): """If A neighbors B, then B neighbors A.""" - decomp = DomainDecomposition(N=64, size=8, strategy='cubic') + decomp = DomainDecomposition(N=64, size=8, strategy="cubic") - opposites = {'x_lower': 'x_upper', 'x_upper': 'x_lower', - 'y_lower': 'y_upper', 'y_upper': 'y_lower', - 'z_lower': 'z_upper', 'z_upper': 'z_lower'} + opposites = { + "x_lower": "x_upper", + "x_upper": "x_lower", + "y_lower": "y_upper", + "y_upper": "y_lower", + "z_lower": "z_upper", + "z_upper": "z_lower", + } for rank in range(8): info = decomp.get_rank_info(rank) @@ -80,7 +85,7 @@ def test_neighbor_reciprocity(self): def test_corner_has_3_neighbors(self): """Corner rank (rank 0) touches 3 boundaries, has 3 neighbors.""" - decomp = DomainDecomposition(N=64, size=8, strategy='cubic') + decomp = DomainDecomposition(N=64, size=8, strategy="cubic") info = decomp.get_rank_info(0) assert info.global_start == (0, 0, 0) @@ -88,7 +93,7 @@ def test_corner_has_3_neighbors(self): def test_interior_has_6_neighbors(self): """Interior rank has all 6 neighbors.""" - decomp = DomainDecomposition(N=81, size=27, strategy='cubic') # 3x3x3 + decomp = DomainDecomposition(N=81, size=27, strategy="cubic") # 3x3x3 # Center rank at (1,1,1) in processor grid py, pz = decomp.dims[1], decomp.dims[2] @@ -104,30 +109,30 @@ class TestConfigurableAxis: def test_z_axis_works(self): """Default z-axis decomposition.""" - decomp = DomainDecomposition(N=50, size=4, strategy='sliced') + decomp = DomainDecomposition(N=50, size=4, strategy="sliced") info = decomp.get_rank_info(0) assert info.local_shape[1] == 50 # full y assert info.local_shape[2] == 50 # full x - assert 'z_lower' in info.neighbors + assert "z_lower" in info.neighbors def test_y_axis_decomposition(self): """Decompose along y-axis.""" - decomp = DomainDecomposition(N=50, size=4, strategy='sliced', axis='y') + decomp = DomainDecomposition(N=50, size=4, strategy="sliced", axis="y") info = decomp.get_rank_info(0) assert info.local_shape[0] == 50 # full z assert info.local_shape[2] == 50 # full x - assert 'y_lower' in info.neighbors + assert "y_lower" in info.neighbors def test_x_axis_decomposition(self): """Decompose along x-axis.""" - decomp = DomainDecomposition(N=50, size=4, strategy='sliced', axis='x') + decomp = DomainDecomposition(N=50, size=4, strategy="sliced", axis="x") info = decomp.get_rank_info(0) assert info.local_shape[0] == 50 # full z assert info.local_shape[1] == 50 # full y - assert 'x_lower' in info.neighbors + assert "x_lower" in info.neighbors class TestEdgeCases: @@ -135,7 +140,7 @@ class TestEdgeCases: def test_single_rank(self): """Single rank gets entire interior.""" - decomp = DomainDecomposition(N=50, size=1, strategy='sliced') + decomp = DomainDecomposition(N=50, size=1, strategy="sliced") info = decomp.get_rank_info(0) assert info.local_shape[0] == 48 @@ -144,4 +149,4 @@ def test_single_rank(self): def test_invalid_strategy(self): """Unknown strategy raises ValueError.""" with pytest.raises(ValueError): - DomainDecomposition(N=50, size=4, strategy='invalid') + DomainDecomposition(N=50, size=4, strategy="invalid") diff --git a/tests/test_kernels.py b/tests/test_kernels.py index 6b348ba..38391b3 100644 --- a/tests/test_kernels.py +++ b/tests/test_kernels.py @@ -1,8 +1,7 @@ """Tests for Jacobi iteration kernels.""" import numpy as np -import pytest -from Poisson import NumPyKernel, NumbaKernel, setup_sinusoidal_problem, sinusoidal_exact_solution +from Poisson import NumPyKernel, NumbaKernel, setup_sinusoidal_problem def run_iterations(kernel, u1, u2, f, n_iter): @@ -21,62 +20,12 @@ def test_kernels_produce_identical_results(): u1, u2, f, _ = setup_sinusoidal_problem(N) numpy_kernel = NumPyKernel(N=N, omega=1.0, tolerance=1e-10, max_iter=1000) - numba_kernel = NumbaKernel(N=N, omega=1.0, tolerance=1e-10, max_iter=1000, numba_threads=1) + numba_kernel = NumbaKernel( + N=N, omega=1.0, tolerance=1e-10, max_iter=1000, numba_threads=1 + ) numba_kernel.warmup() u_numpy = run_iterations(numpy_kernel, u1.copy(), u2.copy(), f.copy(), 10) u_numba = run_iterations(numba_kernel, u1.copy(), u2.copy(), f.copy(), 10) assert np.allclose(u_numpy, u_numba, atol=1e-10) - - -def test_convergence_order(): - """Jacobi kernel should achieve O(h²) convergence.""" - problem_sizes = [15, 25, 50] - errors, h_values = [], [] - - for N in problem_sizes: - u1, u2, f, h = setup_sinusoidal_problem(N) - exact = sinusoidal_exact_solution(N) - kernel = NumPyKernel(N=N, omega=1.0, tolerance=1e-12, max_iter=50000) - - # Solve until converged - for i in range(50000): - if i % 2 == 0: - res = kernel.step(u1, u2, f) - u = u2 - else: - res = kernel.step(u2, u1, f) - u = u1 - if np.sqrt(res / (N-2)**3) < 1e-12: - break - - l2_error = np.sqrt(np.mean((u[1:-1,1:-1,1:-1] - exact[1:-1,1:-1,1:-1])**2)) - errors.append(l2_error) - h_values.append(h) - - # Calculate convergence rate - rates = [np.log(errors[i]/errors[i+1]) / np.log(h_values[i]/h_values[i+1]) - for i in range(len(errors)-1)] - avg_rate = np.mean(rates) - - assert abs(avg_rate - 2.0) < 0.2, f"Rate {avg_rate:.2f} not close to 2.0" - - -def test_boundary_preservation(): - """Kernels should not modify boundary conditions.""" - N = 16 - u1, u2, f, _ = setup_sinusoidal_problem(N) - boundaries = [u1[0,:,:].copy(), u1[-1,:,:].copy(), - u1[:,0,:].copy(), u1[:,-1,:].copy(), - u1[:,:,0].copy(), u1[:,:,-1].copy()] - - kernel = NumPyKernel(N=N, omega=1.0, tolerance=1e-10, max_iter=100) - u = run_iterations(kernel, u1, u2, f, 10) - - assert np.allclose(u[0,:,:], boundaries[0]) - assert np.allclose(u[-1,:,:], boundaries[1]) - assert np.allclose(u[:,0,:], boundaries[2]) - assert np.allclose(u[:,-1,:], boundaries[3]) - assert np.allclose(u[:,:,0], boundaries[4]) - assert np.allclose(u[:,:,-1], boundaries[5]) diff --git a/tests/test_mpi_integration.py b/tests/test_mpi_integration.py index 44be69e..198ed07 100644 --- a/tests/test_mpi_integration.py +++ b/tests/test_mpi_integration.py @@ -1,156 +1,67 @@ -"""MPI integration tests using the subprocess runner. - -These tests spawn actual MPI processes to verify end-to-end distributed execution. -Each configuration is run once and results are reused across assertions. -""" +"""MPI integration tests - spawn actual MPI processes via run_solver.""" import numpy as np import pytest from Poisson import run_solver -# ============================================================================= -# Test Configuration -# ============================================================================= - -RANK_CONFIGS = [ - (1, "sliced"), - (2, "sliced"), - (4, "sliced"), - (8, "cubic"), -] - -CONVERGENCE_GRID_SIZES = [15, 25, 45] - -COMMUNICATORS = ["numpy", "custom"] - -DECOMPOSITIONS = ["sliced", "cubic"] - - -# ============================================================================= -# Fixtures - run each config once, reuse results -# ============================================================================= - -@pytest.fixture(scope="module") -def rank_results(): - """Run solver for different rank/strategy configs.""" - results = {} - for n_ranks, strategy in RANK_CONFIGS: - result = run_solver(N=25, n_ranks=n_ranks, strategy=strategy, max_iter=10000, tol=1e-8, validate=True) - results[(n_ranks, strategy)] = result - return results - - -@pytest.fixture(scope="module") -def convergence_results(): - """Run solver at multiple grid sizes to test O(h²) convergence.""" - results = {} - for N in CONVERGENCE_GRID_SIZES: - result = run_solver(N=N, n_ranks=2, strategy="sliced", max_iter=50000, tol=1e-10, validate=True) - results[N] = result - return results - - +# Run solver once per config, reuse results @pytest.fixture(scope="module") -def communicator_results(): - """Run solver with different communicators.""" - results = {} - for comm_type in COMMUNICATORS: - result = run_solver(N=20, n_ranks=2, strategy="sliced", communicator=comm_type, max_iter=500, tol=0.0, validate=True) - results[comm_type] = result - return results - - -@pytest.fixture(scope="module") -def decomposition_results(): - """Run solver with sliced vs cubic decomposition.""" +def mpi_results(): + """Run all MPI configurations once.""" return { - strategy: run_solver(N=24, n_ranks=8, strategy=strategy, max_iter=5000, tol=1e-8, validate=True) - for strategy in DECOMPOSITIONS + (2, "sliced"): run_solver( + N=25, n_ranks=2, strategy="sliced", max_iter=10000, tol=1e-8, validate=True + ), + (4, "sliced"): run_solver( + N=25, n_ranks=4, strategy="sliced", max_iter=10000, tol=1e-8, validate=True + ), + (8, "cubic"): run_solver( + N=25, n_ranks=8, strategy="cubic", max_iter=10000, tol=1e-8, validate=True + ), + "numpy": run_solver( + N=20, n_ranks=2, communicator="numpy", max_iter=500, tol=0.0 + ), + "custom": run_solver( + N=20, n_ranks=2, communicator="custom", max_iter=500, tol=0.0 + ), + "convergence": { + N: run_solver(N=N, n_ranks=2, max_iter=20000, tol=1e-8, validate=True) + for N in [15, 25] + }, } -# ============================================================================= -# Tests - assertions on cached results -# ============================================================================= - -class TestMPIExecution: - """Tests for multi-rank MPI execution.""" - - @pytest.mark.parametrize("n_ranks,strategy", RANK_CONFIGS) - def test_no_errors(self, rank_results, n_ranks, strategy): - """Solver should complete without errors.""" - result = rank_results[(n_ranks, strategy)] - assert "error" not in result, f"Solver failed: {result.get('error')}" - - @pytest.mark.parametrize("n_ranks,strategy", RANK_CONFIGS) - def test_converged(self, rank_results, n_ranks, strategy): - """Solver should converge.""" - result = rank_results[(n_ranks, strategy)] - assert result["converged"], f"Did not converge in {result['iterations']} iterations" - - -class TestMPIAccuracy: - """Tests for solution accuracy across ranks.""" - - @pytest.mark.parametrize("n_ranks,strategy", RANK_CONFIGS) - def test_accuracy(self, rank_results, n_ranks, strategy): - """Solution should match analytical solution.""" - result = rank_results[(n_ranks, strategy)] - assert result["final_error"] < 0.1, f"L2 error {result['final_error']} too large" - - def test_ranks_produce_consistent_error(self, rank_results): - """Different rank counts should produce similar errors.""" - sliced_configs = [(n, s) for n, s in RANK_CONFIGS if s == "sliced"] - errors = [rank_results[cfg]["final_error"] for cfg in sliced_configs] - assert max(errors) / min(errors) < 1.1, f"Errors diverge: {errors}" - - def test_convergence_order(self, convergence_results): - """MPI solver should exhibit O(h²) convergence.""" - # Check all runs succeeded - for N in CONVERGENCE_GRID_SIZES: - assert "error" not in convergence_results[N], f"N={N} failed: {convergence_results[N].get('error')}" - - errors = [convergence_results[N]["final_error"] for N in CONVERGENCE_GRID_SIZES] - h = [2.0 / (N - 1) for N in CONVERGENCE_GRID_SIZES] - - # Compute convergence order: error ~ h^p => log(e1/e2) / log(h1/h2) = p - orders = [] - for i in range(len(errors) - 1): - order = np.log(errors[i] / errors[i + 1]) / np.log(h[i] / h[i + 1]) - orders.append(order) - - avg_order = np.mean(orders) - assert 1.8 < avg_order < 2.5, f"Expected O(h²), got order {avg_order:.2f} (orders: {orders})" +@pytest.mark.parametrize("config", [(2, "sliced"), (4, "sliced"), (8, "cubic")]) +def test_mpi_runs_and_converges(mpi_results, config): + """MPI solver should run without errors and converge.""" + r = mpi_results[config] + assert "error" not in r, f"Failed: {r.get('error')}" + assert r["converged"] + assert r["final_error"] < 0.1 -class TestCommunicators: - """Tests for different halo exchange implementations.""" +def test_ranks_produce_consistent_error(mpi_results): + """Different rank counts should produce similar errors.""" + errors = [mpi_results[(n, "sliced")]["final_error"] for n in [2, 4]] + assert max(errors) / min(errors) < 1.1 - @pytest.mark.parametrize("communicator", COMMUNICATORS) - def test_no_errors(self, communicator_results, communicator): - """Communicator should work without errors.""" - result = communicator_results[communicator] - assert "error" not in result, f"Solver failed: {result.get('error')}" - def test_produce_same_iterations(self, communicator_results): - """Both communicators should produce identical iteration counts.""" - iters = [communicator_results[c]["iterations"] for c in COMMUNICATORS] - assert len(set(iters)) == 1, f"Iteration counts differ: {dict(zip(COMMUNICATORS, iters))}" +def test_convergence_order(mpi_results): + """Should exhibit O(h²) convergence.""" + conv = mpi_results["convergence"] + errors = [conv[N]["final_error"] for N in [15, 25]] + h = [2.0 / (N - 1) for N in [15, 25]] + order = np.log(errors[0] / errors[1]) / np.log(h[0] / h[1]) + assert 1.8 < order < 2.5, f"Expected ~2, got {order:.2f}" -class TestDecompositionStrategies: - """Tests for different decomposition strategies.""" +@pytest.mark.parametrize("comm", ["numpy", "custom"]) +def test_communicators_work(mpi_results, comm): + """Both communicators should work.""" + assert "error" not in mpi_results[comm] - @pytest.mark.parametrize("strategy", DECOMPOSITIONS) - def test_no_errors(self, decomposition_results, strategy): - """Decomposition should work without errors.""" - result = decomposition_results[strategy] - assert "error" not in result, f"Solver failed: {result.get('error')}" - assert result["decomposition"] == strategy - def test_same_accuracy(self, decomposition_results): - """Decomposition strategies should produce similar accuracy.""" - errors = [decomposition_results[s]["final_error"] for s in DECOMPOSITIONS] - ratio = max(errors) / min(errors) - assert ratio < 1.2, f"Errors differ: {dict(zip(DECOMPOSITIONS, errors))}" +def test_communicators_same_iterations(mpi_results): + """Communicators should produce identical iteration counts.""" + assert mpi_results["numpy"]["iterations"] == mpi_results["custom"]["iterations"] diff --git a/tests/test_problems.py b/tests/test_problems.py deleted file mode 100644 index 4a881f2..0000000 --- a/tests/test_problems.py +++ /dev/null @@ -1,141 +0,0 @@ -"""Tests for problem setup utilities.""" - -import numpy as np -import pytest -from Poisson import ( - create_grid_3d, - sinusoidal_exact_solution, - sinusoidal_source_term, - setup_sinusoidal_problem, -) - - -class TestGridCreation: - """Tests for grid creation utilities.""" - - def test_grid_shape(self): - """Grid should have correct shape.""" - N = 10 - u = create_grid_3d(N) - - assert u.shape == (N, N, N) - - def test_grid_default_values(self): - """Grid should have specified interior value and zero boundaries.""" - N = 10 - u = create_grid_3d(N, value=1.0, boundary_value=0.0) - - # Interior should be 1.0 - assert np.all(u[1:-1, 1:-1, 1:-1] == 1.0) - # Boundaries should be 0.0 - assert np.all(u[0, :, :] == 0.0) - assert np.all(u[-1, :, :] == 0.0) - assert np.all(u[:, 0, :] == 0.0) - assert np.all(u[:, -1, :] == 0.0) - assert np.all(u[:, :, 0] == 0.0) - assert np.all(u[:, :, -1] == 0.0) - - def test_grid_dtype(self): - """Grid should be float64.""" - N = 10 - u = create_grid_3d(N) - assert u.dtype == np.float64 - - -class TestSinusoidalSolution: - """Tests for sinusoidal test problem.""" - - def test_exact_solution_shape(self): - """Exact solution should have correct shape.""" - N = 15 - u_exact = sinusoidal_exact_solution(N) - - assert u_exact.shape == (N, N, N) - - def test_exact_solution_boundaries_zero(self): - """Exact solution should be zero on boundaries.""" - N = 15 - u = sinusoidal_exact_solution(N) - - # sin(pi * x) = 0 at x = -1 and x = 1 - assert np.allclose(u[0, :, :], 0.0, atol=1e-10) - assert np.allclose(u[-1, :, :], 0.0, atol=1e-10) - assert np.allclose(u[:, 0, :], 0.0, atol=1e-10) - assert np.allclose(u[:, -1, :], 0.0, atol=1e-10) - assert np.allclose(u[:, :, 0], 0.0, atol=1e-10) - assert np.allclose(u[:, :, -1], 0.0, atol=1e-10) - - def test_exact_solution_symmetry(self): - """Solution should be symmetric about origin.""" - N = 21 # Odd for center point - u = sinusoidal_exact_solution(N) - mid = N // 2 - - # Check point symmetry - assert np.isclose(u[mid, mid, mid], u[mid, mid, mid]) # Center - assert np.isclose(u[mid-1, mid, mid], u[mid+1, mid, mid], rtol=1e-10) - - def test_source_term_shape(self): - """Source term should have correct shape.""" - N = 15 - f = sinusoidal_source_term(N) - - assert f.shape == (N, N, N) - - def test_source_term_consistent(self): - """Source term should be -3*pi^2 * u_exact for Poisson equation.""" - N = 15 - u_exact = sinusoidal_exact_solution(N) - f = sinusoidal_source_term(N) - - # f = 3*pi^2 * sin(pi*x)*sin(pi*y)*sin(pi*z) = 3*pi^2 * u_exact - expected = 3 * np.pi**2 * u_exact - assert np.allclose(f, expected, rtol=1e-10) - - -class TestProblemSetup: - """Tests for complete problem setup.""" - - def test_setup_returns_four_values(self): - """setup_sinusoidal_problem should return (u1, u2, f, h).""" - result = setup_sinusoidal_problem(N=15) - - assert len(result) == 4 - - def test_setup_array_shapes(self): - """Setup arrays should have correct shapes.""" - N = 15 - u1, u2, f, h = setup_sinusoidal_problem(N) - - assert u1.shape == (N, N, N) - assert u2.shape == (N, N, N) - assert f.shape == (N, N, N) - - def test_setup_grid_spacing(self): - """Grid spacing h should be 2/(N-1).""" - N = 11 - _, _, _, h = setup_sinusoidal_problem(N) - - expected_h = 2.0 / (N - 1) - assert np.isclose(h, expected_h) - - def test_setup_initial_zero(self): - """Initial guess should be zero.""" - N = 15 - u1, u2, _, _ = setup_sinusoidal_problem(N) - - assert np.all(u1 == 0.0) - assert np.all(u2 == 0.0) - - def test_setup_boundary_conditions(self): - """Initial arrays should have zero boundaries (Dirichlet BC).""" - N = 15 - u1, u2, f, _ = setup_sinusoidal_problem(N) - - # Check u1 boundaries - assert np.all(u1[0, :, :] == 0.0) - assert np.all(u1[-1, :, :] == 0.0) - assert np.all(u1[:, 0, :] == 0.0) - assert np.all(u1[:, -1, :] == 0.0) - assert np.all(u1[:, :, 0] == 0.0) - assert np.all(u1[:, :, -1] == 0.0) diff --git a/tests/test_solver.py b/tests/test_solver.py index 003aa61..b286d8e 100644 --- a/tests/test_solver.py +++ b/tests/test_solver.py @@ -2,121 +2,52 @@ import numpy as np import pytest -from Poisson import JacobiPoisson, sinusoidal_exact_solution +from Poisson import JacobiPoisson -class TestSequentialSolver: - """Tests for single-rank (sequential) solver execution.""" +@pytest.fixture +def solver(): + """Basic solver for quick tests.""" + s = JacobiPoisson(N=15, omega=1.0, max_iter=100, tolerance=0.0) + s.solve() + return s - def test_solver_converges(self): - """Solver should converge within max iterations.""" - solver = JacobiPoisson(N=15, omega=1.0, max_iter=5000, tolerance=1e-6) - solver.solve() - assert solver.results.converged or solver.results.iterations < 5000 +@pytest.fixture +def converged_solver(): + """Solver run to convergence.""" + s = JacobiPoisson(N=20, omega=1.0, max_iter=10000, tolerance=1e-8) + s.solve() + return s - def test_solver_reduces_residual(self): - """Residual should decrease over iterations.""" - solver = JacobiPoisson(N=15, omega=1.0, max_iter=100, tolerance=0.0) - solver.solve() - - residuals = solver.timeseries.residual_history - assert len(residuals) == 100 - assert residuals[-1] < residuals[0] # Residual decreased - def test_solver_with_numba(self): - """Solver should work with Numba kernel.""" - solver = JacobiPoisson(N=15, omega=1.0, max_iter=100, tolerance=0.0, use_numba=True) - solver.warmup() - solver.solve() +class TestSolverBasics: + """Core solver functionality.""" - assert solver.results.iterations == 100 + def test_converges(self, converged_solver): + """Solver should converge and have low error.""" + assert converged_solver.results.converged + assert converged_solver.compute_l2_error() < 0.1 - def test_solver_timing_recorded(self): - """Solver should record timing data.""" - solver = JacobiPoisson(N=15, omega=1.0, max_iter=50, tolerance=0.0) - solver.solve() + def test_reduces_residual(self, solver): + """Residual should decrease over iterations.""" + r = solver.timeseries.residual_history + assert r[-1] < r[0] - assert len(solver.timeseries.compute_times) == 50 - assert len(solver.timeseries.halo_exchange_times) == 50 + def test_timing_recorded(self, solver): + """Should record timing data.""" + assert len(solver.timeseries.compute_times) == 100 assert all(t >= 0 for t in solver.timeseries.compute_times) - def test_solver_solution_accuracy(self): - """Converged solution should match analytical solution.""" - N = 20 - solver = JacobiPoisson(N=N, omega=1.0, max_iter=10000, tolerance=1e-8) - solver.solve() - - # Compute L2 error - error = solver.compute_l2_error() - - # Error should be reasonable for this grid size - assert error < 0.1, f"L2 error {error} too large" - - -class TestSolverConfiguration: - """Tests for solver configuration options.""" - - def test_omega_affects_convergence(self): - """Different omega values should affect convergence rate.""" - results = {} - for omega in [0.5, 1.0]: - solver = JacobiPoisson(N=15, omega=omega, max_iter=200, tolerance=1e-6) - solver.solve() - results[omega] = solver.timeseries.residual_history[-1] - - # Both should reduce residual (exact comparison depends on problem) - assert all(r < 1.0 for r in results.values()) - - def test_larger_grid_more_iterations(self): - """Larger grids generally need more iterations for same tolerance.""" - iters = {} - for N in [10, 20]: - solver = JacobiPoisson(N=N, omega=1.0, max_iter=50000, tolerance=1e-4) - solver.solve() - iters[N] = solver.results.iterations - - # Larger grid should need more iterations (or hit max) - assert iters[20] >= iters[10] - - def test_invalid_N_raises(self): - """Very small N should still work (edge case).""" - solver = JacobiPoisson(N=5, omega=1.0, max_iter=10, tolerance=0.0) - solver.solve() # Should not crash - assert solver.results.iterations == 10 - - -class TestSolverDataStructures: - """Tests for solver data structure handling.""" - - def test_u_global_shape(self): - """Global solution should have correct shape.""" - N = 15 - solver = JacobiPoisson(N=N, omega=1.0, max_iter=10, tolerance=0.0) - solver.solve() - - assert hasattr(solver, 'u_global') - assert solver.u_global.shape == (N, N, N) - - def test_boundary_conditions_preserved(self): - """Dirichlet BCs (zero) should be preserved on boundaries.""" - N = 15 - solver = JacobiPoisson(N=N, omega=1.0, max_iter=100, tolerance=0.0) - solver.solve() - + def test_boundary_conditions(self, solver): + """Dirichlet BCs (zero) should be preserved.""" u = solver.u_global - # All boundaries should be zero (Dirichlet BC) - assert np.allclose(u[0, :, :], 0.0) - assert np.allclose(u[-1, :, :], 0.0) - assert np.allclose(u[:, 0, :], 0.0) - assert np.allclose(u[:, -1, :], 0.0) - assert np.allclose(u[:, :, 0], 0.0) - assert np.allclose(u[:, :, -1], 0.0) - - def test_results_populated(self): - """Results dataclass should be populated after solve.""" - solver = JacobiPoisson(N=10, omega=1.0, max_iter=50, tolerance=0.0) - solver.solve() - - assert solver.results.iterations == 50 - assert isinstance(solver.results.converged, bool) + for face in [u[0], u[-1], u[:, 0], u[:, -1], u[:, :, 0], u[:, :, -1]]: + assert np.allclose(face, 0.0) + + def test_numba_kernel(self): + """Numba kernel should work.""" + s = JacobiPoisson(N=15, omega=1.0, max_iter=50, tolerance=0.0, use_numba=True) + s.warmup() + s.solve() + assert s.results.iterations == 50