writer
Worker-side shard writer for Vulcan.
Worker code never touches HuggingFace. write_shard() lands a
validated parquet file inside {staging_dir}/pending/ together with
a per-file .meta.json sidecar carrying the geometry, solver, and
provenance dicts. The sync tier (stringforge.vulcan.sync)
later batches these into HuggingFace commits.
Atomicity: each of the parquet body and the JSON sidecar is staged as a .tmp file and os.replace-d into place, so neither file is ever observed half-written. The two os.replace calls are sequential, not atomic as a pair: a crash between them can leave a parquet without a sidecar. list_pending() silently skips such orphan parquets, so the sync tier never uploads a half-pair, and a re-run of the same write rebuilds the missing sidecar.
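Below is a minimal, standard-library-only sketch of the per-file staging pattern described above. Several details are illustrative assumptions and not Vulcan's actual implementation: the helper names atomic_write_bytes and write_pair, the exact nonce format, and the fsync call. The body bytes stand in for an already-serialized parquet file.

```python
import json
import os
import uuid
from pathlib import Path


def atomic_write_bytes(path: Path, data: bytes) -> None:
    """Write data so readers only ever see the old file or the complete new one."""
    # Per-process, per-call nonce keeps concurrent writers off each other's .tmp files.
    tmp = path.with_name(f"{path.name}.{os.getpid()}.{uuid.uuid4().hex}.tmp")
    try:
        with open(tmp, "wb") as fh:
            fh.write(data)
            fh.flush()
            os.fsync(fh.fileno())  # make the bytes durable before the rename
        # Atomic rename on POSIX when tmp and path share a filesystem.
        os.replace(tmp, path)
    except BaseException:
        tmp.unlink(missing_ok=True)  # do not leave a stray .tmp file behind
        raise


def write_pair(pending: Path, stem: str, body: bytes, meta: dict) -> tuple[Path, Path]:
    """Stage the parquet body first, then its sidecar: two atomic steps, not one."""
    parquet_path = pending / f"{stem}.parquet"
    sidecar_path = pending / f"{stem}.parquet.meta.json"
    atomic_write_bytes(parquet_path, body)
    # A crash at this point leaves an orphan parquet that list_pending() skips.
    atomic_write_bytes(sidecar_path, json.dumps(meta, sort_keys=True).encode("utf-8"))
    return parquet_path, sidecar_path
```

The sidecar is written last on purpose. Its presence is the signal that the pair is complete, so any crash leaves at worst an orphan parquet and never an orphan sidecar.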
- stringforge.vulcan.writer.PROVENANCE_METADATA_KEY: bytes = b'stringforge.vulcan.provenance'
Parquet kv-metadata key under which we stash the full provenance blob. Mirrors the pattern at vacua_writer.py:1714-1727.
- stringforge.vulcan.writer.SCHEMA_VERSION_METADATA_KEY: bytes = b'schema_version'
Parquet kv-metadata key carrying the schema version.
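Parquet key-value metadata is byte-keyed, and pyarrow exposes a schema's metadata as a dict of bytes to bytes. That is why both keys above are bytes. The sketch below assembles such a dict, which pyarrow's Table.replace_schema_metadata could then attach. Some parts are hypothetical rather than Vulcan's documented format: the blob layout (solver and provenance side by side in one JSON object), the helper names, and the schema-version string.

```python
import json
from typing import Any, Mapping, Optional

PROVENANCE_METADATA_KEY = b"stringforge.vulcan.provenance"
SCHEMA_VERSION_METADATA_KEY = b"schema_version"


def build_kv_metadata(
    schema_version: str,
    solver: Optional[Mapping[str, Any]] = None,
    provenance: Optional[Mapping[str, Any]] = None,
) -> dict[bytes, bytes]:
    """Encode the provenance blob and schema version as parquet kv-metadata."""
    # Hypothetical blob layout: one JSON object holding both mappings.
    blob = {"solver": dict(solver or {}), "provenance": dict(provenance or {})}
    return {
        PROVENANCE_METADATA_KEY: json.dumps(blob, sort_keys=True).encode("utf-8"),
        SCHEMA_VERSION_METADATA_KEY: schema_version.encode("utf-8"),
    }


def read_provenance(metadata: Mapping[bytes, bytes]) -> dict[str, Any]:
    """Decode the blob back into a dict; return {} when the key is absent."""
    raw = metadata.get(PROVENANCE_METADATA_KEY)
    return json.loads(raw) if raw is not None else {}
```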
- class stringforge.vulcan.writer.StagedShard(run_id, parquet_path, sidecar_path, path_in_repo, n_rows)
Bases: object
Reference to a freshly staged shard on local disk.
- run_id
The run identifier carried on every row.
- parquet_path
Absolute path to the staged parquet file.
- sidecar_path
Absolute path to the per-file .meta.json sidecar.
- path_in_repo
Canonical HF-repo-relative target path.
- n_rows
Number of vacuum rows written.
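One plausible shape for this record is a frozen dataclass, sketched below. That choice is an assumption: the page only documents the base class as object. The field types are inferred from the attribute descriptions, and path_in_repo being a str is likewise assumed.

```python
from dataclasses import dataclass, replace
from pathlib import Path


@dataclass(frozen=True)
class StagedShard:
    """Reference to a freshly staged shard on local disk (sketch)."""

    run_id: str         # run identifier carried on every row
    parquet_path: Path  # absolute path to the staged parquet file
    sidecar_path: Path  # absolute path to the per-file .meta.json sidecar
    path_in_repo: str   # canonical HF-repo-relative target path
    n_rows: int         # number of vacuum rows written
```

Freezing the record fits move_to(), which returns a new record instead of mutating the old one. Under this sketch, dataclasses.replace(shard, parquet_path=...) builds the post-move copy.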
- stringforge.vulcan.writer.ensure_staging_layout(staging_dir)
Ensure the Vulcan staging directory carries the canonical pending/, synced/, and failed/ subdirectories.
Idempotent: safe to call before every write_shard() or sync_pending() invocation.
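The idempotence can come directly from Path.mkdir(exist_ok=True), as in the sketch below. SYNCED_DIRNAME and FAILED_DIRNAME are the constants named under move_to(), and their string values here are assumed. PENDING_DIRNAME is a hypothetical name introduced for symmetry. The None return type is also an assumption.

```python
from pathlib import Path

# Assumed values; PENDING_DIRNAME is a hypothetical constant name.
PENDING_DIRNAME = "pending"
SYNCED_DIRNAME = "synced"
FAILED_DIRNAME = "failed"


def ensure_staging_layout(staging_dir: Path) -> None:
    """Create pending/, synced/ and failed/ under staging_dir if they are missing."""
    for name in (PENDING_DIRNAME, SYNCED_DIRNAME, FAILED_DIRNAME):
        # exist_ok=True turns a repeated call into a no-op, hence idempotent.
        (Path(staging_dir) / name).mkdir(parents=True, exist_ok=True)
```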
- stringforge.vulcan.writer.list_pending(staging_dir)
Enumerate currently staged shards awaiting sync.
Each .parquet in pending/ is paired with its .parquet.meta.json sidecar. Shards without a sidecar (e.g. after a crashed write where the parquet was replaced into place but the sidecar was not) are skipped silently; a re-run of the same write recreates the missing sidecar.
- Parameters:
staging_dir (Path) – Root staging directory.
- Returns:
list[StagedShard] – All complete shard records found in pending/, sorted by filename for deterministic ordering.
- Return type:
list[StagedShard]
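The sketch below shows one way to do the pairing. Parts of it are assumptions. The sidecar is assumed to carry run_id, path_in_repo and n_rows as top-level JSON keys, whereas this page only says it carries the geometry, solver and provenance dicts. The pending/ directory name is also assumed, and StagedShard is redefined as a minimal dataclass so the block runs on its own.

```python
import json
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class StagedShard:
    run_id: str
    parquet_path: Path
    sidecar_path: Path
    path_in_repo: str
    n_rows: int


def list_pending(staging_dir: Path) -> list[StagedShard]:
    """Pair every pending/*.parquet with its sidecar, silently skipping orphans."""
    pending = Path(staging_dir) / "pending"
    if not pending.is_dir():
        return []
    shards = []
    # sorted() over the glob yields the deterministic, filename-ordered result;
    # "*.parquet" matches neither the .meta.json sidecars nor in-flight .tmp files.
    for parquet_path in sorted(pending.glob("*.parquet")):
        sidecar_path = parquet_path.with_name(parquet_path.name + ".meta.json")
        if not sidecar_path.is_file():
            continue  # orphan left by a crash between the two os.replace calls
        meta = json.loads(sidecar_path.read_text())  # hypothetical field layout
        shards.append(
            StagedShard(
                run_id=meta["run_id"],
                parquet_path=parquet_path.resolve(),
                sidecar_path=sidecar_path.resolve(),
                path_in_repo=meta["path_in_repo"],
                n_rows=int(meta["n_rows"]),
            )
        )
    return shards
```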
- stringforge.vulcan.writer.move_to(shard, staging_dir, target)
Move a staged shard’s (parquet, sidecar) pair into a sibling subdirectory.
The two files are moved sequentially, not as an atomic pair: a crash between the two shutil.move calls can leave the parquet in the destination directory while the sidecar remains in the source. Orphan recovery is the responsibility of list_pending() (which skips parquets without a sidecar) and of a future prune_orphans helper.
- Parameters:
shard (StagedShard) – The shard reference returned by write_shard().
staging_dir (Path) – Root staging directory.
target (str) – Target subdirectory name, typically SYNCED_DIRNAME or FAILED_DIRNAME.
- Returns:
StagedShard – A new shard record whose paths point at the post-move locations.
- Return type:
StagedShard
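A sketch of the sequential move follows. The parquet moves first, which matches the crash window described above. Two behaviours are assumptions and may not match the real function: creating the target directory on demand, and returning the new record via dataclasses.replace. StagedShard is redefined minimally so the block is self-contained.

```python
import shutil
from dataclasses import dataclass, replace
from pathlib import Path


@dataclass(frozen=True)
class StagedShard:
    run_id: str
    parquet_path: Path
    sidecar_path: Path
    path_in_repo: str
    n_rows: int


def move_to(shard: StagedShard, staging_dir: Path, target: str) -> StagedShard:
    """Move the (parquet, sidecar) pair into staging_dir/target, one file at a time."""
    dest = Path(staging_dir) / target
    dest.mkdir(parents=True, exist_ok=True)  # assumption: create target on demand
    new_parquet = dest / shard.parquet_path.name
    new_sidecar = dest / shard.sidecar_path.name
    # Parquet first: a crash before the second move strands only the sidecar in
    # the source directory, where list_pending() ignores it (no parquet to pair).
    shutil.move(str(shard.parquet_path), str(new_parquet))
    shutil.move(str(shard.sidecar_path), str(new_sidecar))
    return replace(shard, parquet_path=new_parquet, sidecar_path=new_sidecar)
```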
- stringforge.vulcan.writer.write_shard(df, *, staging_dir, run_id, geometry, tadpole_charge=None, solver=None, provenance=None, bucket_bits=4)
Write a single vacuum-batch parquet shard to staging_dir/pending/.
The writer is the authority on identity: run_id, geometry_id, and the geometry-key columns are unconditionally overwritten here, regardless of any per-row values the caller may have supplied. By contrast, tadpole_charge and created_at are filled in only when absent from df. The default run_id substitution dict used by the calling Vulcan handle is the union of geometry, solver['name'] (when supplied), provenance['seed'] (when supplied), and extra_kwargs.
Schema validation runs after this coercion. validate_schema() must succeed before any I/O happens, so a bad batch never leaves a half-written shard behind.
The on-disk write is atomic per file, not per pair: each of the parquet body and the JSON sidecar is staged as a .tmp file and os.replace-d into place, but the two replacements are sequential. A crash between them can leave a parquet without a sidecar; list_pending() silently skips such orphans, so the sync tier never uploads a half-pair, and a re-run of the same write rebuilds the missing sidecar. The .tmp suffix carries a per-process, per-call nonce so that concurrent writers on the same (geometry, run_id) do not corrupt each other’s staged files.
- Parameters:
df (DataFrame) – Per-vacuum DataFrame. Must carry the vault-floor columns (see REQUIRED_COLUMNS).
staging_dir (Path) – Root staging directory. Subdirectories are created on demand via ensure_staging_layout().
run_id (str) – Identifier carried on every row. Must satisfy is_safe_run_id().
geometry (Mapping[str, Any]) – {h11, h12, ks_id, triang_id, conifold_id, cicy_id} mapping. h11 and h12 are interpreted in the mirror (jaxvacua / lcs_tree) convention. Missing keys are filled with GEOMETRY_KEY_SENTINEL, but at least one key must be set to a non-sentinel value.
tadpole_charge (Optional[int]) – N_flux for the batch. Required unless df already carries it as a column.
solver (Optional[Mapping[str, Any]]) – Optional {name, config_hash, ...} mapping; embedded in the parquet kv-metadata blob.
provenance (Optional[Mapping[str, Any]]) – Optional {git_sha, seed, wall_clock_s, ...} mapping; embedded alongside solver in the same blob.
bucket_bits (int) – Width of the bucket prefix in the HF path layout.
- Returns:
StagedShard – Reference to the on-disk parquet + sidecar pair and the canonical HF-repo-relative path.
- Raises:
ValueError – If run_id is not slug-safe, df is empty, geometry carries no non-sentinel keys, or validate_schema() rejects the coerced DataFrame.
- Return type:
StagedShard
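The pre-I/O coercion rules above can be illustrated with plain dict rows, which keeps the example free of pandas. The sketch shows identity columns always being overwritten, fill-if-absent columns, sentinel-filled geometry, and early ValueErrors. Several pieces are hypothetical stand-ins, not Vulcan's definitions: GEOMETRY_KEY_SENTINEL's value, the slug regex standing in for is_safe_run_id(), the geometry_id format, and the ISO-8601 created_at. In the documented flow, validate_schema() would then run on the coerced rows before any file is opened.

```python
import re
from datetime import datetime, timezone
from typing import Any, Mapping, Optional

# Hypothetical stand-ins for Vulcan's own constants and run_id check.
GEOMETRY_KEYS = ("h11", "h12", "ks_id", "triang_id", "conifold_id", "cicy_id")
GEOMETRY_KEY_SENTINEL = -1
_SAFE_RUN_ID = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")


def coerce_rows(
    rows: list[dict[str, Any]],
    *,
    run_id: str,
    geometry: Mapping[str, Any],
    tadpole_charge: Optional[int] = None,
) -> list[dict[str, Any]]:
    """Apply write_shard-style identity rules to dict rows, before any I/O."""
    if not _SAFE_RUN_ID.fullmatch(run_id):
        raise ValueError(f"run_id is not slug-safe: {run_id!r}")
    if not rows:
        raise ValueError("refusing to write an empty batch")
    geo = {k: geometry.get(k, GEOMETRY_KEY_SENTINEL) for k in GEOMETRY_KEYS}
    if all(v == GEOMETRY_KEY_SENTINEL for v in geo.values()):
        raise ValueError("geometry carries no non-sentinel keys")
    if tadpole_charge is None and not all("tadpole_charge" in r for r in rows):
        raise ValueError("tadpole_charge is required when the column is absent")
    # Hypothetical geometry_id format: the six key values joined with '_'.
    geometry_id = "_".join(str(geo[k]) for k in GEOMETRY_KEYS)
    created_at = datetime.now(timezone.utc).isoformat()
    out = []
    for row in rows:
        new = dict(row)  # never mutate the caller's rows
        # Identity columns: always overwritten, whatever the caller supplied.
        new.update(geo, run_id=run_id, geometry_id=geometry_id)
        # Fill-if-absent columns: caller-supplied values win.
        new.setdefault("tadpole_charge", tadpole_charge)
        new.setdefault("created_at", created_at)
        out.append(new)
    return out
```

Raising before anything touches the filesystem is the point of this ordering. A rejected batch costs nothing on disk, so the staging directory never needs cleanup after a validation failure.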