Cluster-Parallel Flux Enumeration and Sampling
What’s in this notebook? This notebook demonstrates how to distribute a flux vacuum search across multiple cluster nodes using the export_cluster_job / process_chunk_from_disk / merge_cluster_results pipeline.
Export — precompute the ISD pipeline and save pre-filtered h-vector chunks to disk.
Process — run each chunk independently (designed for SLURM array jobs).
Merge — collect results, deduplicate, optionally Newton-refine and write to database.
The per-chunk processing is embarrassingly parallel — each chunk reads from a shared pipeline of pure arrays with no shared mutable state.
(Created: Andreas Schachner, 2026-03-25)
Imports and model setup
import warnings, os, shutil, time, math
import numpy as np
import jax
import jax.numpy as jnp
jax.config.update("jax_enable_x64", True)
import jaxvacua as jvc
from jaxvacua.flux_bounding import bounded_fluxes
warnings.filterwarnings("ignore")

# --- Model: KS model with model_ID=1 and h^{1,2} = 2 ---
h12 = 2
model = jvc.FluxVacuaFinder(h12=h12, model_ID=1, model_type="KS",
                            maximum_degree=5)
# Overwrite the LCS tree's `a_matrix` with a fixed symmetric 2x2 matrix
# for this demo (presumably the a_ij data of the prepotential — confirm).
model.lcs_tree.a_matrix = jnp.array([[4.5, 1.5],
                                     [1.5, 0.0]])

# --- Sampler over moduli / dilaton / axion ranges, seeded for reproducibility ---
# NOTE(review): the dilaton lower bound sqrt(3)/2 together with axion in
# [-1/2, 1/2] presumably mirrors the SL(2, Z) fundamental domain — confirm.
sampler = jvc.data_sampler(
    model,
    seed=42,
    moduli_bounds=(2.0, 5.0),
    dilaton_bounds=(math.sqrt(3) / 2, 10.0),
    axion_bounds=(-0.5, 0.5),
)

# --- Flux bounding helper; Nmax is later compared against the tadpole ---
Nmax = 10
bf = bounded_fluxes(model, sampler, Nmax=Nmax)
print(f"Model: h12={h12}, Nmax={Nmax}")
Phase 1: Export — prepare and save chunks
The export_cluster_job method:
1. Calls _prepare_isd_pipeline() to precompute eigenvalue bounds, moduli slices, and ISD matrices.
2. Generates h-vectors (enumerate or sample mode) and applies cheap pre-filters (s_max, continuous tadpole).
3. Saves the pre-filtered h-chunks as .npy files, the pipeline as pipeline.npz, and the config as config.json.
4. Optionally generates a SLURM array job script.
# Phase 1 (enumerate mode): precompute the ISD pipeline, write the
# pre-filtered h-vector chunks (50k per file) plus pipeline/config files,
# and also emit a SLURM array script with the given resource options.
output_dir = "./cluster_demo_enumerate"
info = bf.export_cluster_job(
    mode="enumerate",
    output_dir=output_dir,
    chunk_size=50000,
    generate_slurm=True,
    slurm_opts=dict(partition="cpu", time="01:00:00", mem="8G"),
    verbose=True,
)
print(f"\nSummary: {info}")
# Inspect the exported directory structure as an indented tree.
# Directories and files are listed in sorted order so the output is
# deterministic; at most `max_files_shown` files are printed per directory.
max_files_shown = 5
for root, dirs, files in os.walk(output_dir):
    # Sorting `dirs` in place makes os.walk (top-down) descend in sorted order.
    dirs.sort()
    # Depth relative to output_dir: relpath is "." for the top level itself.
    # (root.replace(output_dir, "") would also strip any *later* occurrence
    # of the output_dir string inside the path.)
    rel = os.path.relpath(root, output_dir)
    level = 0 if rel == os.curdir else rel.count(os.sep) + 1
    indent = " " * level
    # Show directory name; normpath keeps a trailing-slash output_dir from
    # rendering as an empty basename.
    print(f"{indent}{os.path.basename(os.path.normpath(root))}/")
    # Show files (capped per directory)
    sub_indent = " " * (level + 1)
    for fname in sorted(files)[:max_files_shown]:
        size = os.path.getsize(os.path.join(root, fname))
        print(f"{sub_indent}{fname} ({size / 1024:.1f} KB)")
    if len(files) > max_files_shown:
        print(f"{sub_indent}... and {len(files) - max_files_shown} more files")
# Look at the contents of a single pre-filtered h-vector chunk.
# Chunk files are named chunk_NNNN.npy (chunk id zero-padded to 4 digits).
chunk_id = 10
chunk_path = os.path.join(output_dir, "chunks", f"chunk_{chunk_id:04d}.npy")
if os.path.exists(chunk_path):
    h_chunk = np.load(chunk_path)
    print(f"Chunk {chunk_id}: shape={h_chunk.shape}, dtype={h_chunk.dtype}")
    print(f"First 5 h-vectors:\n{h_chunk[:5]}")
else:
    print(f"Chunk {chunk_id} does not exist (model may have fewer chunks)")
# Show the generated SLURM array script (presumably only written when
# export_cluster_job was called with generate_slurm=True).
slurm_path = os.path.join(output_dir, "submit_array.sh")
if os.path.exists(slurm_path):
    with open(slurm_path, encoding="utf-8") as fh:
        print(fh.read())
else:
    print(f"No SLURM script found at {slurm_path}")
Phase 2: Process chunks
On a cluster, each chunk would be processed by a separate SLURM array task via:
sbatch ./cluster_demo_enumerate/submit_array.sh
Each task calls bounded_fluxes.process_chunk_from_disk(output_dir, chunk_id, model), which:
Loads the pipeline and chunk from disk.
Reconstructs the ISD pipeline (including filter closures) from saved numerical data.
Runs _process_h_chunk — ISD completion + bounds checking.
Saves valid (flux, moduli, tau) triples to results/result_NNNN.npz.
Here we simulate this locally by processing all chunks sequentially:
# Simulate the SLURM array locally: process every exported chunk in turn.
n_chunks = info['n_chunks']
t0 = time.perf_counter()
for i in range(n_chunks):
    bounded_fluxes.process_chunk_from_disk(
        output_dir=output_dir,
        chunk_id=i,
        model=model,
        verbose=(i % 50 == 0),  # print every 50th chunk
    )
dt = time.perf_counter() - t0
print(f"\nProcessed {n_chunks} chunks in {dt:.1f}s ")
# Guard the per-chunk average: the export may produce zero chunks
# (e.g. presumably if nothing survives the pre-filters), which would
# otherwise raise ZeroDivisionError.
if n_chunks:
    print(f" ({dt / n_chunks:.2f}s per chunk, "
          f"on a cluster these would run in parallel)")
Phase 3: Merge results
merge_cluster_results loads all result files, deduplicates by flux vector, and optionally:
Newton-refines the candidates to find exact SUSY vacua (DW = 0).
Writes the final results to a vacua database via _VacuaWriter.
# Raw merge: collect and deduplicate the unrefined ISD candidates from all
# result files (called without model/sampler, so no Newton refinement).
results_raw = bounded_fluxes.merge_cluster_results(output_dir, verbose=True)
print(f"\nRaw candidates: {len(results_raw)}")
# Merge with Newton refinement -> exact flux vacua (DW = 0).
results = bounded_fluxes.merge_cluster_results(
    output_dir,
    model=model,
    sampler=sampler,
    refine=True,
    verbose=True,
)
print(f"\nRefined flux vacua: {len(results)}")
if results:
    print("\nExample vacuum:")
    r = results[0]
    print(f" flux = {r['flux']}")
    print(f" moduli = {r['moduli']}")
    # NOTE(review): ':.4f' on tau assumes a Python/NumPy (complex) scalar,
    # not a JAX array — confirm the merge output type.
    print(f" tau = {r['tau']:.4f}")
    print(f" residual = {r['residual']:.2e}")
    # Verify tadpole: report |N_flux| of the example flux next to Nmax.
    tad = abs(float(jnp.real(model.tadpole(jnp.array(r['flux'])))))
    print(f" N_flux = {tad:.0f} (Nmax = {Nmax})")
Writing merged results to the vault
The merge step can simultaneously write results to a CYDatabase (session tier) and promote them to the permanent vacua_vault/ by passing designate=True, label=..., and committed_by=... . The vault is the canonical store for curated solutions: it is queryable via db.load_local_vacua(model) across sessions and is never touched by clear_cache().
# Redirect the vault to a scratch directory for this demo.
# NOTE(review): this assumes STRINGFORGE_VAULT is read when the database
# is created (not at `import jaxvacua` time) — confirm.
os.environ.update(STRINGFORGE_VAULT=os.path.abspath("./cluster_demo_vault"))
db = jvc.CYDatabase()

# Merge again, this time writing to the session-tier database and
# promoting the refined vacua to the permanent vault under a label.
results_designated = bounded_fluxes.merge_cluster_results(
    output_dir,
    model=model,
    sampler=sampler,
    refine=True,
    verbose=True,
    database=db,                      # session-tier write
    designate=True,                   # promote to permanent vault
    label="demo_cluster_run",
    committed_by="demo@example.com",
    tags=["demo", f"Nmax_{Nmax}"],
    validate_before_designate=False,  # skip full F-term check (demo)
)

# Read the designated vacua back from the vault by their label.
df = db.load_local_vacua(model=model, label="demo_cluster_run")
print(f"\nLoaded {len(df)} vacua from the vault:")
print(df[["flux", "moduli_re", "moduli_im", "tau_re", "tau_im"]].head())
Sample mode
Instead of exhaustively enumerating h-vectors, mode="sample" generates them stochastically using Gaussian importance sampling weighted by the ISD matrix. This is useful when the bounding box is too large to enumerate (e.g. at high \(N_{\max}\) or \(h^{1,2}\)).
# Sample mode: export 50k stochastically drawn h-vectors (seeded) in
# chunks of 10k, instead of enumerating the full bounding box.
output_dir_sample = "./cluster_demo_sample"
info_s = bf.export_cluster_job(
    output_dir=output_dir_sample,
    mode="sample",
    n_total_samples=50_000,
    chunk_size=10_000,
    seed=42,
    verbose=True,
)
# Process all chunks (keyword form, matching the Phase 2 loop)
for i in range(info_s['n_chunks']):
    bounded_fluxes.process_chunk_from_disk(
        output_dir=output_dir_sample,
        chunk_id=i,
        model=model,
        verbose=False,
    )
# Merge (raw candidates only: no model/sampler, so no refinement)
results_s = bounded_fluxes.merge_cluster_results(
    output_dir_sample, verbose=True)
print(f"\nSample mode: {len(results_s)} unique candidates "
      f"from {info_s['n_h_total']:,} h-vectors")
Handling missing chunks
If some cluster jobs fail (e.g. OOM or timeout), merge_cluster_results reports the missing chunk IDs so you can resubmit them:
sbatch --array=3,7,15 ./cluster_demo_enumerate/submit_array.sh
# Simulate missing chunks by deleting the result files of a few chunks.
# Only files that actually exist are removed, and those chunk ids are
# reported, so it is clear which gaps merge_cluster_results should flag.
results_dir = os.path.join(output_dir, "results")
deleted_ids = []
for cid in (5, 10, 15):
    p = os.path.join(results_dir, f"result_{cid:04d}.npz")
    if os.path.exists(p):
        os.remove(p)
        deleted_ids.append(cid)
print(f"Deleted result files for chunks: {deleted_ids}")
# Merge with missing chunks; the missing ids are reported for resubmission.
results_partial = bounded_fluxes.merge_cluster_results(
    output_dir, verbose=True)
print(f"Partial merge: {len(results_partial)} candidates")
Full cluster workflow
On a real cluster, the workflow is:
# === Head node: prepare ===
# Export the pipeline and chunks for a production-size search and
# generate the SLURM array script.
bf = bounded_fluxes(model, sampler, Nmax=34)
info = bf.export_cluster_job(
    output_dir="./flux_search_run1",
    mode="enumerate",
    chunk_size=100_000,
    generate_slurm=True,
    slurm_opts={"partition": "gpu", "time": "04:00:00", "mem": "16G"},
)
# Submit to cluster — run this from a shell, not from Python:
#   $ sbatch ./flux_search_run1/submit_array.sh
# === Head node: after all jobs complete ===
# Collect, deduplicate and Newton-refine all per-chunk results.
results = bounded_fluxes.merge_cluster_results(
    "./flux_search_run1",
    model=model, sampler=sampler,
    refine=True,
)
print(f"Found {len(results)} flux vacua")
The auto-generated worker.py needs the model construction code filled in. Edit the MODEL SETUP section to match your model.
# Show the auto-generated worker script. Guarded like the SLURM-script
# cell so a missing worker.py prints a message instead of raising
# FileNotFoundError.
worker_path = os.path.join(output_dir, "worker.py")
if os.path.exists(worker_path):
    with open(worker_path, encoding="utf-8") as fh:
        print(fh.read())
else:
    print(f"No worker script found at {worker_path}")
# Clean up demo directories (including the scratch vault); directories
# that do not exist are skipped silently.
for d in [output_dir, output_dir_sample, "./cluster_demo_vault"]:
    if os.path.isdir(d):
        shutil.rmtree(d)
        print(f"Cleaned up {d}")
# Also clear the STRINGFORGE_VAULT env override so later work
# resumes using the default vault location.
os.environ.pop("STRINGFORGE_VAULT", None)