Cluster-Parallel Flux Enumeration and Sampling

What’s in this notebook? It shows how to distribute a flux vacuum search across multiple cluster nodes using the export_cluster_job / process_chunk_from_disk / merge_cluster_results pipeline, in three phases:

  • Export — precompute the ISD pipeline and save pre-filtered h-vector chunks to disk.

  • Process — run each chunk independently (designed for SLURM array jobs).

  • Merge — collect results, deduplicate, optionally Newton-refine and write to database.

The per-chunk processing is embarrassingly parallel: every task reads the same read-only pipeline of pure arrays plus its own chunk, and no mutable state is shared between tasks.
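A practical consequence of having no shared mutable state is that chunks can be processed in any order, on any node, and still merge to the same result. The toy sketch below illustrates that property with plain NumPy; `process_chunk` and its norm-based filter are hypothetical stand-ins, not the jaxvacua implementation.

```python
import random

import numpy as np


def process_chunk(chunk, bound):
    """Pure function: keep rows whose squared norm is at most `bound`."""
    norms = np.sum(chunk.astype(np.int64) ** 2, axis=1)
    return chunk[norms <= bound]


def run(chunks, order, bound=6):
    """Process chunks in the given order, then merge into sorted unique rows."""
    parts = [process_chunk(chunks[i], bound) for i in order]
    merged = np.concatenate(parts, axis=0)
    return np.unique(merged, axis=0)


rng = np.random.default_rng(0)
vectors = rng.integers(-3, 4, size=(1000, 4))   # toy stand-in for h-vectors
chunks = np.array_split(vectors, 8)

in_order = list(range(len(chunks)))
shuffled = in_order[:]
random.Random(1).shuffle(shuffled)

# The merged result does not depend on the processing order.
same = np.array_equal(run(chunks, in_order), run(chunks, shuffled))
print(f"order-independent: {same}")
```

Because each call depends only on its inputs, the same function can be handed to separate array tasks without any locking or coordination.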

(Created: Andreas Schachner, 2026-03-25)

Imports and model setup

import warnings, os, shutil, time, math
import numpy as np

import jax
import jax.numpy as jnp
jax.config.update("jax_enable_x64", True)

import jaxvacua as jvc
from jaxvacua.flux_bounding import bounded_fluxes

warnings.filterwarnings('ignore')

h12 = 2
model = jvc.FluxVacuaFinder(
    h12=h12, model_ID=1, model_type="KS", maximum_degree=5)
model.lcs_tree.a_matrix = jnp.array([[4.5, 1.5], [1.5, 0.]])

sampler = jvc.data_sampler(
    model,
    moduli_bounds=(2., 5.),
    dilaton_bounds=(math.sqrt(3) / 2, 10.),
    axion_bounds=(-0.5, 0.5),
    seed=42,
)

Nmax = 10
bf = bounded_fluxes(model, sampler, Nmax=Nmax)
print(f"Model: h12={h12}, Nmax={Nmax}")

Phase 1: Export — prepare and save chunks

The export_cluster_job method:

  1. Calls _prepare_isd_pipeline() to precompute eigenvalue bounds, moduli slices, ISD matrices.

  2. Generates h-vectors (enumerate or sample mode) and applies cheap pre-filters (s_max, continuous tadpole).

  3. Saves pre-filtered h-chunks as .npy files, the pipeline as pipeline.npz, and config as config.json.

  4. Optionally generates a SLURM array job script.
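To make steps 2 and 3 concrete, here is a minimal sketch of pre-filtering candidate vectors and writing them as numbered chunk files plus a small config. It is an illustration under stated assumptions: `export_chunks`, the norm filter, and the exact keys stored in config.json are hypothetical, and only the `chunk_NNNN.npy` / `config.json` naming is taken from this notebook.

```python
import json
import math
import os
import tempfile

import numpy as np


def export_chunks(h_vectors, out_dir, chunk_size):
    """Write `h_vectors` as chunk_NNNN.npy files plus a small config.json."""
    chunk_dir = os.path.join(out_dir, "chunks")
    os.makedirs(chunk_dir, exist_ok=True)
    n_chunks = math.ceil(len(h_vectors) / chunk_size)
    for cid in range(n_chunks):
        block = h_vectors[cid * chunk_size:(cid + 1) * chunk_size]
        np.save(os.path.join(chunk_dir, f"chunk_{cid:04d}.npy"), block)
    # Hypothetical config layout; the real file may store more fields.
    config = {"n_chunks": n_chunks, "n_h_total": int(len(h_vectors)),
              "chunk_size": chunk_size}
    with open(os.path.join(out_dir, "config.json"), "w") as fh:
        json.dump(config, fh, indent=2)
    return config


# Toy candidates: all integer vectors in a small box.
grid = np.stack(np.meshgrid(*[np.arange(-2, 3)] * 3, indexing="ij"), axis=-1)
candidates = grid.reshape(-1, 3)
# Hypothetical cheap pre-filter: discard vectors with squared norm above 4.
kept = candidates[np.sum(candidates ** 2, axis=1) <= 4]

with tempfile.TemporaryDirectory() as tmp:
    cfg = export_chunks(kept, tmp, chunk_size=10)
    files = sorted(os.listdir(os.path.join(tmp, "chunks")))
    first = np.load(os.path.join(tmp, "chunks", files[0]))

print(cfg, files[:2], first.shape)
```

Filtering before writing keeps the chunk files small, so each array task spends its time on the expensive step rather than on candidates that a cheap test would reject anyway.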

output_dir = "./cluster_demo_enumerate"

info = bf.export_cluster_job(
    output_dir=output_dir,
    mode="enumerate",
    chunk_size=50_000,
    generate_slurm=True,
    slurm_opts={"partition": "cpu", "time": "01:00:00", "mem": "8G"},
    verbose=True,
)

print(f"\nSummary: {info}")

# Inspect the exported directory structure
for root, dirs, files in os.walk(output_dir):
    level = root.replace(output_dir, "").count(os.sep)
    indent = "  " * level
    # Show directory name
    print(f"{indent}{os.path.basename(root)}/")
    # Show files (max 5 per dir)
    sub_indent = "  " * (level + 1)
    for f in sorted(files)[:5]:
        size = os.path.getsize(os.path.join(root, f))
        print(f"{sub_indent}{f}  ({size / 1024:.1f} KB)")
    if len(files) > 5:
        print(f"{sub_indent}... and {len(files) - 5} more files")

# Look at the contents of a single chunk
chunk_path = os.path.join(output_dir, "chunks", "chunk_0010.npy")
if os.path.exists(chunk_path):
    h_chunk = np.load(chunk_path)
    print(f"Chunk 10: shape={h_chunk.shape}, dtype={h_chunk.dtype}")
    print(f"First 5 h-vectors:\n{h_chunk[:5]}")
else:
    print("Chunk 10 does not exist (model may have fewer chunks)")

# Show the generated SLURM script
slurm_path = os.path.join(output_dir, "submit_array.sh")
if os.path.exists(slurm_path):
    with open(slurm_path) as f:
        print(f.read())

Phase 2: Process chunks

On a cluster, each chunk would be processed by a separate SLURM array task via:

sbatch ./cluster_demo_enumerate/submit_array.sh

Each task calls bounded_fluxes.process_chunk_from_disk(output_dir, chunk_id, model), which:

  1. Loads the pipeline and chunk from disk.

  2. Reconstructs the ISD pipeline (including filter closures) from saved numerical data.

  3. Runs _process_h_chunk — ISD completion + bounds checking.

  4. Saves valid (flux, moduli, tau) triples to results/result_NNNN.npz.
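Inside an array job, SLURM exposes the task index through the `SLURM_ARRAY_TASK_ID` environment variable, which a worker can map to a chunk ID. The sketch below shows one way to do that mapping with validation; the helper names `chunk_id_from_env` and `result_path` are hypothetical and are not claimed to match the auto-generated worker.py.

```python
import os


def chunk_id_from_env(environ, n_chunks):
    """Map SLURM_ARRAY_TASK_ID to a validated chunk index."""
    raw = environ.get("SLURM_ARRAY_TASK_ID")
    if raw is None:
        raise RuntimeError("SLURM_ARRAY_TASK_ID is not set; pass a chunk id explicitly")
    cid = int(raw)
    if not 0 <= cid < n_chunks:
        raise ValueError(f"chunk id {cid} outside 0..{n_chunks - 1}")
    return cid


def result_path(output_dir, cid):
    """Result file name following the result_NNNN.npz pattern used above."""
    return os.path.join(output_dir, "results", f"result_{cid:04d}.npz")


# Simulate what array task 7 of a 20-chunk job would see.
fake_env = {"SLURM_ARRAY_TASK_ID": "7"}
cid = chunk_id_from_env(fake_env, n_chunks=20)
print(cid, result_path("./cluster_demo_enumerate", cid))
```

In a real worker one would pass `os.environ` instead of `fake_env`; taking the environment as an argument simply makes the mapping easy to test off-cluster.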

Here we simulate this locally by processing all chunks sequentially:

n_chunks = info['n_chunks']
t0 = time.perf_counter()

for i in range(n_chunks):
    bounded_fluxes.process_chunk_from_disk(
        output_dir=output_dir,
        chunk_id=i,
        model=model,
        verbose=(i % 50 == 0),  # print every 50th chunk
    )

dt = time.perf_counter() - t0
print(f"\nProcessed {n_chunks} chunks in {dt:.1f}s")
print(f"  ({dt / max(n_chunks, 1):.2f}s per chunk; "
      f"on a cluster these would run in parallel)")
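The per-chunk timing measured above gives a rough handle on cluster wall time. As a back-of-the-envelope sketch, assume all chunks take about the same time and run in waves of `n_concurrent` tasks; the numbers used here are illustrative, not measurements, and the estimate ignores queue wait, JAX compilation, and I/O.

```python
import math


def estimate_wall_time(n_chunks, seconds_per_chunk, n_concurrent):
    """Idealised makespan: chunks run in waves of `n_concurrent` tasks."""
    if n_concurrent < 1:
        raise ValueError("n_concurrent must be at least 1")
    waves = math.ceil(n_chunks / n_concurrent)
    return waves * seconds_per_chunk


# Illustrative values only.
serial = estimate_wall_time(n_chunks=200, seconds_per_chunk=1.5, n_concurrent=1)
parallel = estimate_wall_time(n_chunks=200, seconds_per_chunk=1.5, n_concurrent=32)
print(f"serial: {serial:.1f}s, 32 concurrent tasks: {parallel:.1f}s")
```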

Phase 3: Merge results

merge_cluster_results loads all result files, deduplicates by flux vector, and optionally:

  • Newton-refines the candidates to find exact SUSY vacua (DW = 0).

  • Writes the final results to a vacua database via _VacuaWriter.
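Deduplicating by flux vector can be sketched with `np.unique` on the rows of the flux array, keeping the first occurrence of each row and the matching moduli and tau entries. This is a minimal illustration on made-up data; `dedup_by_flux` is a hypothetical helper, and the library's own deduplication may differ.

```python
import numpy as np


def dedup_by_flux(flux, moduli, tau):
    """Keep the first occurrence of every distinct flux row."""
    _, first_idx = np.unique(flux, axis=0, return_index=True)
    keep = np.sort(first_idx)          # restore the original ordering
    return flux[keep], moduli[keep], tau[keep]


# Made-up candidates: row 2 repeats the flux vector of row 0.
flux = np.array([[1, 0, 2, -1],
                 [0, 3, 1, 1],
                 [1, 0, 2, -1],
                 [2, 2, 0, 0]])
moduli = np.arange(8, dtype=float).reshape(4, 2)
tau = np.array([1j, 2j, 3j, 4j])

f, m, t = dedup_by_flux(flux, moduli, tau)
print(f.shape[0], "unique flux vectors")
```

Sorting the first-occurrence indices keeps the surviving candidates in the order they were loaded, which makes merged output easier to compare between runs.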

# Merge without Newton refinement (just raw ISD candidates)
results_raw = bounded_fluxes.merge_cluster_results(
    output_dir,
    verbose=True,
)
print(f"\nRaw candidates: {len(results_raw)}")

# Merge with Newton refinement → exact flux vacua
results = bounded_fluxes.merge_cluster_results(
    output_dir,
    model=model,
    sampler=sampler,
    refine=True,
    verbose=True,
)

print(f"\nRefined flux vacua: {len(results)}")
if results:
    print("\nExample vacuum:")
    r = results[0]
    print(f"  flux = {r['flux']}")
    print(f"  moduli = {r['moduli']}")
    print(f"  tau = {r['tau']:.4f}")
    print(f"  residual = {r['residual']:.2e}")
    # Verify tadpole
    tad = abs(float(jnp.real(model.tadpole(jnp.array(r['flux'])))))
    print(f"  N_flux = {tad:.0f} (Nmax = {Nmax})")
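The idea behind Newton refinement is to start from an approximate candidate and iterate until the residual of the defining equations is negligible. The sketch below runs a generic Newton iteration on a toy two-variable system that stands in for the conditions DW = 0; the functions `f` and `jac` are illustrative assumptions and have nothing to do with the actual superpotential.

```python
import numpy as np


def newton_refine(f, jac, x0, tol=1e-12, max_iter=50):
    """Newton iteration x <- x - J(x)^{-1} f(x); returns (x, residual norm)."""
    x = np.asarray(x0, dtype=float)
    for _ in range(max_iter):
        fx = f(x)
        if np.linalg.norm(fx) < tol:
            break
        x = x - np.linalg.solve(jac(x), fx)
    return x, float(np.linalg.norm(f(x)))


def f(x):
    """Toy system: a circle of radius 2 intersected with the line x0 = x1."""
    return np.array([x[0] ** 2 + x[1] ** 2 - 4.0, x[0] - x[1]])


def jac(x):
    """Analytic Jacobian of the toy system."""
    return np.array([[2.0 * x[0], 2.0 * x[1]], [1.0, -1.0]])


root, residual = newton_refine(f, jac, x0=[1.0, 2.0])
print(f"root = {root}, residual = {residual:.2e}")
```

The returned residual plays the same role as the `residual` field printed for the example vacuum: it measures how well the refined point satisfies the equations.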

Writing merged results to the vault

The merge step can also write results to a CYDatabase (session tier) and, with designate=True, label=..., and committed_by=..., promote them to the permanent vacua_vault/. The vault is the canonical store for curated solutions: it is queryable via db.load_local_vacua(model) across sessions and is never touched by clear_cache().

# Point the vault at a scratch directory for this demo
os.environ["STRINGFORGE_VAULT"] = os.path.abspath("./cluster_demo_vault")

db = jvc.CYDatabase()

# Re-merge with database + designate path
results_designated = bounded_fluxes.merge_cluster_results(
    output_dir,
    model=model,
    sampler=sampler,
    refine=True,
    database=db,                             # session-tier write
    designate=True,                          # promote to permanent vault
    label="demo_cluster_run",
    committed_by="demo@example.com",
    validate_before_designate=False,         # skip full F-term check (demo)
    tags=["demo", f"Nmax_{Nmax}"],
    verbose=True,
)

# Query back from the vault
df = db.load_local_vacua(model=model, label="demo_cluster_run")
print(f"\nLoaded {len(df)} vacua from the vault:")
print(df[["flux", "moduli_re", "moduli_im", "tau_re", "tau_im"]].head())

Sample mode

Instead of exhaustively enumerating h-vectors, mode="sample" generates them stochastically using Gaussian importance sampling weighted by the ISD matrix. This is useful when the bounding box is too large to enumerate (e.g. at high \(N_{\max}\) or \(h^{1,2}\)).
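As a rough picture of what Gaussian sampling weighted by a matrix can look like, the sketch below draws continuous vectors from a Gaussian with covariance proportional to the inverse of a positive-definite matrix `M` and rounds them to integers. It covers only the proposal step (no importance weights), `M_toy` is a made-up matrix rather than an ISD matrix, and nothing here is claimed to match the sampler used by mode="sample".

```python
import numpy as np


def sample_integer_vectors(M, n_samples, scale=1.0, seed=0):
    """Draw x ~ N(0, scale^2 * inv(M)), round to integers, drop duplicates."""
    rng = np.random.default_rng(seed)
    L = np.linalg.cholesky(M)                       # M = L @ L.T
    z = rng.standard_normal((n_samples, M.shape[0]))
    # x = scale * L^{-T} z has covariance scale^2 * inv(M).
    x = scale * np.linalg.solve(L.T, z.T).T
    h = np.rint(x).astype(np.int64)
    return np.unique(h, axis=0)


# Made-up symmetric positive-definite matrix.
M_toy = np.array([[2.0, 0.5], [0.5, 1.0]])
h_a = sample_integer_vectors(M_toy, n_samples=2000, scale=3.0, seed=42)
h_b = sample_integer_vectors(M_toy, n_samples=2000, scale=3.0, seed=42)
print(h_a.shape, np.array_equal(h_a, h_b))
```

Directions in which `M` is large receive a small variance, so the draws concentrate where the quadratic form x^T M x stays small. Fixing the seed makes the exported chunks reproducible.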

output_dir_sample = "./cluster_demo_sample"

info_s = bf.export_cluster_job(
    output_dir=output_dir_sample,
    mode="sample",
    n_total_samples=50_000,
    chunk_size=10_000,
    seed=42,
    verbose=True,
)

# Process all chunks
for i in range(info_s['n_chunks']):
    bounded_fluxes.process_chunk_from_disk(
        output_dir_sample, i, model, verbose=False)

# Merge
results_s = bounded_fluxes.merge_cluster_results(
    output_dir_sample, verbose=True)

print(f"\nSample mode: {len(results_s)} unique candidates "
      f"from {info_s['n_h_total']:,} h-vectors")

Handling missing chunks

If some cluster jobs fail (e.g. OOM or timeout), merge_cluster_results reports the missing chunk IDs so you can resubmit them:

sbatch --array=3,7,15 ./cluster_demo_enumerate/submit_array.sh

# Simulate missing chunks by deleting a few result files

results_dir = os.path.join(output_dir, "results")
# Delete chunks 5, 10, 15
for cid in [5, 10, 15]:
    p = os.path.join(results_dir, f"result_{cid:04d}.npz")
    if os.path.exists(p):
        os.remove(p)

# Merge with missing chunks
results_partial = bounded_fluxes.merge_cluster_results(
    output_dir, verbose=True)
print(f"Partial merge: {len(results_partial)} candidates")
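The missing IDs can also be worked out directly from the results directory, which is handy for building the `--array=` argument for a resubmission. The sketch below assumes the `result_NNNN.npz` naming shown earlier; `missing_chunk_ids` and `array_spec` are hypothetical helpers, and the demo uses empty placeholder files in a temporary directory.

```python
import os
import re
import tempfile


def missing_chunk_ids(results_dir, n_chunks):
    """Return the sorted chunk ids that have no result_NNNN.npz file."""
    pattern = re.compile(r"result_(\d+)\.npz$")
    done = set()
    for name in os.listdir(results_dir):
        match = pattern.match(name)
        if match:
            done.add(int(match.group(1)))
    return sorted(set(range(n_chunks)) - done)


def array_spec(ids):
    """Format chunk ids as a comma-separated list for sbatch --array."""
    return ",".join(str(i) for i in ids)


with tempfile.TemporaryDirectory() as tmp:
    # Create empty placeholder results for all chunks except 3, 7 and 15.
    for cid in range(20):
        if cid not in (3, 7, 15):
            open(os.path.join(tmp, f"result_{cid:04d}.npz"), "w").close()
    missing = missing_chunk_ids(tmp, n_chunks=20)

print(f"sbatch --array={array_spec(missing)} submit_array.sh")
```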

Full cluster workflow

On a real cluster, the workflow is:

# === Head node: prepare ===
bf = bounded_fluxes(model, sampler, Nmax=34)
info = bf.export_cluster_job(
    output_dir="./flux_search_run1",
    mode="enumerate",
    chunk_size=100_000,
    generate_slurm=True,
    slurm_opts={"partition": "gpu", "time": "04:00:00", "mem": "16G"},
)
# === Submit to the cluster (shell command, run outside Python) ===
#   sbatch ./flux_search_run1/submit_array.sh

# === Head node: after all jobs complete ===
results = bounded_fluxes.merge_cluster_results(
    "./flux_search_run1",
    model=model, sampler=sampler,
    refine=True,
)
print(f"Found {len(results)} flux vacua")

The auto-generated worker.py needs the model construction code filled in. Edit the MODEL SETUP section to match your model.

# Show the auto-generated worker script
worker_path = os.path.join(output_dir, "worker.py")
with open(worker_path) as f:
    print(f.read())

# Clean up demo directories (including the scratch vault)
for d in [output_dir, output_dir_sample, "./cluster_demo_vault"]:
    if os.path.isdir(d):
        shutil.rmtree(d)
        print(f"Cleaned up {d}")

# Also clear the STRINGFORGE_VAULT env override so later work
# resumes using the default vault location.
os.environ.pop("STRINGFORGE_VAULT", None)