writer#

Worker-side shard writer for Vulcan.

Worker code never touches HuggingFace. write_shard() lands a validated parquet file inside {staging_dir}/pending/ together with a per-file .meta.json sidecar carrying the geometry, solver, and provenance dicts. The sync tier (stringforge.vulcan.sync) later batches these into HuggingFace commits.

Atomicity: each of the parquet body and the JSON sidecar is staged as a .tmp file and os.replace-d into place, so neither file is ever observed half-written. The two os.replace calls are sequential, not atomic as a pair: a crash between them can leave a parquet without a sidecar. list_pending() silently skips such orphan parquets, so the sync tier never uploads a half-pair, and a re-run of the same write rebuilds the missing sidecar.

stringforge.vulcan.writer.PROVENANCE_METADATA_KEY: bytes = b'stringforge.vulcan.provenance'#

Parquet kv-metadata key under which we stash the full provenance blob. Mirrors the pattern at vacua_writer.py:1714-1727.

stringforge.vulcan.writer.SCHEMA_VERSION_METADATA_KEY: bytes = b'schema_version'#

Parquet kv-metadata key carrying the schema version.

class stringforge.vulcan.writer.StagedShard(run_id, parquet_path, sidecar_path, path_in_repo, n_rows)#

Bases: object

Reference to a freshly staged shard on local disk.

run_id#

The run identifier carried on every row.

parquet_path#

Absolute path to the staged parquet file.

sidecar_path#

Absolute path to the per-file .meta.json.

path_in_repo#

Canonical HF-repo-relative target path.

n_rows#

Number of vacuum rows written.

stringforge.vulcan.writer.ensure_staging_layout(staging_dir)#

Ensure the Vulcan staging directory carries the canonical pending/, synced/, and failed/ subdirectories.

Idempotent: safe to call before every write_shard() or sync_pending() invocation.

Parameters:

staging_dir (Path) – Root staging directory.

Returns:
  • Path – The resolved staging root (with the three subdirectories

  • guaranteed to exist).

Return type:

Path

stringforge.vulcan.writer.list_pending(staging_dir)#

Enumerate currently-staged shards awaiting sync.

Each .parquet in pending/ is paired with its .parquet.meta.json sidecar. Shards without a sidecar (e.g. after a crashed write where the parquet flushed but the sidecar did not) are skipped silently – the next worker call recreates them.

Parameters:

staging_dir (Path) – Root staging directory.

Returns:
  • list[StagedShard] – All complete shard records found in

  • ``pending/``, sorted by filename for deterministic ordering.

Return type:

list[StagedShard]

stringforge.vulcan.writer.move_to(shard, staging_dir, target)#

Move a staged shard’s (parquet, sidecar) pair into a sibling subdirectory.

The two files are moved sequentially, not as an atomic pair: a crash between the two shutil.move calls can leave the parquet in the destination directory while the sidecar remains in the source. Orphan recovery is the responsibility of list_pending() (which skips parquets without a sidecar) and of a future prune_orphans helper.

Parameters:
  • shard (StagedShard) – The shard reference returned by write_shard().

  • staging_dir (Path) – Root staging directory.

  • target (str) – Target subdirectory name – typically SYNCED_DIRNAME or FAILED_DIRNAME.

Returns:
  • StagedShard – A new shard record whose paths point at the

  • post-move locations.

Return type:

StagedShard

stringforge.vulcan.writer.write_shard(df, *, staging_dir, run_id, geometry, tadpole_charge=None, solver=None, provenance=None, bucket_bits=4)#

Write a single vacuum-batch parquet shard to staging_dir/pending/.

The writer is the authority on identity: run_id, geometry_id, and the geometry-key columns are unconditionally overwritten here, regardless of any per-row values the caller may have supplied. tadpole_charge and created_at, by contrast, are filled in only when absent from df. The default run_id substitution dict used by the calling Vulcan handle is the union of geometry, solver['name'] (when supplied), provenance['seed'] (when supplied), and extra_kwargs.

Schema validation runs after this coercion – validate_schema() must succeed before any I/O happens, so a bad batch never leaves a half-written shard behind.

The on-disk write is atomic per-file – not per-pair: each of the parquet body and the JSON sidecar is staged as a .tmp file and os.replace-d into place, but the two replacements are sequential. A crash between them can leave a parquet without a sidecar; list_pending() silently skips such orphans, so the sync tier never uploads a half-pair, and a re-run of the same write rebuilds the missing sidecar. The .tmp suffix carries a per-process, per-call nonce so concurrent writers on the same (geometry, run_id) do not corrupt each other’s staged files.

Parameters:
  • df (DataFrame) – Per-vacuum DataFrame. Must carry the vault-floor columns (see REQUIRED_COLUMNS).

  • staging_dir (Path) – Root staging directory. Subdirs are created on demand via ensure_staging_layout().

  • run_id (str) – Identifier carried on every row. Must satisfy is_safe_run_id().

  • geometry (Mapping[str, Any]) – {h11, h12, ks_id, triang_id, conifold_id, cicy_id} mapping. h11, h12 are interpreted in mirror (jaxvacua / lcs_tree) convention. Missing keys are filled with GEOMETRY_KEY_SENTINEL, but at least one key must be set to a non-sentinel value.

  • tadpole_charge (Optional[int]) – N_flux for the batch. Required unless already present on df as a column.

  • solver (Optional[Mapping[str, Any]]) – Optional {name, config_hash, ...} mapping; embedded in the parquet kv-metadata blob.

  • provenance (Optional[Mapping[str, Any]]) – Optional {git_sha, seed, wall_clock_s, ...} mapping; embedded alongside solver in the same blob.

  • bucket_bits (int) – Width of the bucket prefix in the HF path layout.

Returns:
  • StagedShard – Reference to the on-disk parquet + sidecar pair

  • and the canonical HF-repo-relative path.

Raises:

ValueErrorrun_id is not slug-safe, df is empty, geometry carries no non-sentinel keys, or validate_schema() rejects the coerced DataFrame.

Return type:

StagedShard