Python SDK to generate Flux batch jobs and services
- flux-mcp: MCP functions for Flux.
- flux-mcp-server: MCP server.
- fractale-mcp: (fractale) MCP orchestration (agents, databases, ui interfaces).
- hpc-mcp: HPC tools for a larger set of HPC and converged computing use cases.
- flux-scribe: Write job events to a local sqlite database via the JournalConsumer (written but not yet added; needs testing)
This is a small Flux utility that makes it easy to create Flux batch jobs and services. The use case is to submit work (one or more jobs) under an instance and run a custom service, or prolog and epilog commands. We provision several services here, and you can also provide the name of your own service to start and stop.
Install the library and start (or be in) a flux instance.
```bash
flux start
pip install -e . --break-system-packages
```

We have a few simple examples:
```bash
python3 ./examples/save_logs.py
```

For the flux-scribe example, point it at a sqlite database first:

```bash
export FLUX_SCRIBE_DATABASE=sqlite:///flux-batch-job.db
python3 ./examples/flux_scribe_module.py
```
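If you want a quick look at what flux-scribe recorded, the `sqlite:///flux-batch-job.db` URI above points (in the usual SQLAlchemy convention) at a local `flux-batch-job.db` file. Here is a minimal sketch using only the standard library; it just lists the tables rather than assuming the schema that flux-scribe writes:

```python
import sqlite3

# Open the database written by the flux-scribe example above.
connection = sqlite3.connect("flux-batch-job.db")

# The table layout belongs to flux-scribe, so only list what is there.
tables = connection.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'"
).fetchall()
for (name,) in tables:
    print(name)
connection.close()
```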
Or run the controlled example to see a batch job with prolog and epilog run and complete:

```bash
python3 ./tests/test_flux_batch.py
```

```
Flux Batch Module Test
[OK] Connected to Flux.
[*] Creating batch jobs...
[*] Mapping attributes to BatchJobspecV1...
[*] Previewing submission (Dryrun -> Wrapper)...
#!/bin/bash
echo 'Batch Wrapper Starting'
flux submit --wait /bin/echo 'Job 1 starting'
flux submit --wait /bin/sleep 5
flux submit --wait /bin/echo 'Job 2 finished'
flux job wait --all
echo 'Batch Wrapper Finished'
[*] Performing submission (Dryrun -> Wrapper -> Submit)...
[SUCCESS] Batch submitted! Flux Job ID: ƒMX29AwFu
```

```console
$ flux jobs -a
JOBID USER NAME ST NTASKS NNODES TIME INFO
ƒMX29AwFu vscode test-batch R 1 1 4.213s 68e8c4399c15
$ flux jobs -a
JOBID USER NAME ST NTASKS NNODES TIME INFO
ƒMX29AwFu vscode test-batch CD 1 1 6.354s 68e8c4399c15
```

Here is an explicit (manual) example to do the same:

```python
import flux
import flux_batch
# for pretty printing
# from rich import print
handle = flux.Flux()
# Create your batch job with some number of commands
batch = flux_batch.BatchJobV1()
batch.add_job(["echo", "Job 1 starting"])
batch.add_job(["sleep", "5"])
batch.add_job(["echo", "Job 2 finished"])
# Wrap it up into a jobspec
spec = flux_batch.BatchJobspecV1.from_jobs(
    batch,
    nodes=1,
    nslots=1,
    time_limit="10m",
    job_name="test-batch",
    # Add saving of logs, info, and metadata
    logs_dir="./logs",
)
# Add a prolog and epilog
spec.add_prolog("echo 'Batch Wrapper Starting'")
spec.add_epilog("echo 'Batch Wrapper Finished'")
# Add a service (this assumes a user-level service with this name exists)
spec.add_service("flux-scribe")
# Preview it (batch wrapper), or generate the jobspec (json)
print(flux_batch.submit(handle, spec, dry_run=True))
jobspec = flux_batch.jobspec(spec)
# Submit that bad boi.
jobid = flux_batch.submit(handle, jobspec)
```
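Once submitted, the batch job is a normal Flux job, so you can watch it with the flux-core Python bindings rather than flux_batch. A minimal sketch, assuming `flux.job.event_wait` is available in your flux-core install and reusing `handle` and `jobid` from the example above:

```python
import flux.job

# `handle` and `jobid` come from the flux_batch example above.
# Block until the batch job posts its "clean" event, meaning it has
# finished and been cleaned up by Flux.
flux.job.event_wait(handle, jobid, "clean")
print(f"Batch job {jobid} is complete")
```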
Planned next steps:

- Option for controlled output (that we can easily get after)
- Create flux-scribe service, add example (need to test on cluster with systemctl)
HPCIC DevTools is distributed under the terms of the MIT license. All new contributions must be made under this license.
See LICENSE, COPYRIGHT, and NOTICE for details.
SPDX-License-Identifier: (MIT)
LLNL-CODE-842614
