Idempotent Ingestion
Every ingestion run is governed by a manifest: a small JSON record, written to S3 at the end of the run, that serves as both the audit record and the control signal. Four properties follow from that single primitive.
Idempotency. Before it opens a database connection or an SFTP transport, or lists a single S3 object, each ingestion path checks whether the latest manifest for that source and date records success, and returns early if it does (Snippet 1, Snippet 2). The check fails open: if the latest manifest cannot be parsed, or parses to something other than a JSON object, the run re-ingests instead of skipping, so malformed state is never mistaken for success. Because the skip is keyed on business date, a scheduler that fires repeatedly does work only for dates that have not yet been ingested successfully.
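The skip decision can be sketched without S3 at all. This is a minimal sketch, assuming a plain dict stands in for the bucket; `FakeObject`, `latest_key`, and `should_skip` are hypothetical names, not the codebase's API. It picks the newest manifest under a per-date prefix by LastModified and fails open on anything that is not a JSON object with `status == "success"`. One detail differs from Snippet 1 on purpose: bytes that are not valid UTF-8 make `json.loads` raise `UnicodeDecodeError`, which is a `ValueError` but not a `json.JSONDecodeError`, so the sketch catches `ValueError` to keep failing open in that case too.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class FakeObject:
    """Hypothetical stand-in for one S3 object: its body and LastModified."""

    body: bytes
    last_modified: datetime


def latest_key(store: dict[str, FakeObject], prefix: str) -> str | None:
    """Hypothetical helper: key under `prefix` with the newest LastModified, or None."""
    keys = [k for k in store if k.startswith(prefix)]
    if not keys:
        return None
    return max(keys, key=lambda k: store[k].last_modified)


def should_skip(store: dict[str, FakeObject], prefix: str) -> bool:
    """Hypothetical: skip only when the latest manifest is a dict with status "success"."""
    key = latest_key(store, prefix)
    if key is None:
        return False  # never ingested: do the work
    try:
        manifest = json.loads(store[key].body)
    except ValueError:  # covers json.JSONDecodeError and UnicodeDecodeError
        return False  # fail open: re-ingest rather than trust bad state
    return isinstance(manifest, dict) and manifest.get("status") == "success"


prefix = "_manifests/source=crm/ingest_date=2024-05-01/"
store: dict[str, FakeObject] = {}
print(should_skip(store, prefix))  # False: no manifest yet
store[prefix + "run_id=a.json"] = FakeObject(
    b'{"status": "success"}', datetime(2024, 5, 2, tzinfo=timezone.utc)
)
print(should_skip(store, prefix))  # True
store[prefix + "run_id=b.json"] = FakeObject(
    b"\x80 not json", datetime(2024, 5, 3, tzinfo=timezone.utc)
)
print(should_skip(store, prefix))  # False: newest manifest is corrupt, so fail open
```

Note that the last call returns False even though an older manifest succeeded: like Snippet 1, the sketch trusts only the most recent manifest for the date.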
Append-only auditability. Append-only is structural rather than enforced by a policy (Snippet 3). Every run mints a fresh uuid4 run_id and writes its manifest to a key namespaced by that id, so two runs never collide, and nothing in the codebase deletes or overwrites a manifest. A failed run and its successful re-run therefore coexist as a complete audit trail, and "latest wins" is simply max(LastModified) over that date's manifest prefix, which is exactly what the idempotency check reads back.
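The append-only behavior fits in a few lines. This is a minimal sketch, assuming a dict of `key -> (LastModified, body)` stands in for the bucket; `put_if_absent`, `record_run`, and `latest_status` are hypothetical, and the explicit overwrite guard is an addition for illustration (the pipeline relies on the fresh uuid4 key alone). `build_manifest_key` repeats Snippet 3's key format so the sketch runs standalone. Two runs for the same date land on two distinct keys, and the current status is read back as the max over LastModified.

```python
from __future__ import annotations

import json
import uuid
from datetime import datetime, timedelta, timezone

Bucket = dict[str, tuple[datetime, bytes]]  # key -> (LastModified, body)


def build_manifest_key(*, source: str, ingest_date: str, run_id: str) -> str:
    # Same layout as Snippet 3: run_id namespaces every manifest object.
    return f"_manifests/source={source}/ingest_date={ingest_date}/run_id={run_id}.json"


def put_if_absent(bucket: Bucket, key: str, body: bytes, at: datetime) -> None:
    # Hypothetical guard that makes "never overwrite" explicit in the sketch.
    if key in bucket:
        raise KeyError(f"refusing to overwrite {key}")
    bucket[key] = (at, body)


def record_run(bucket: Bucket, *, source: str, ingest_date: str, status: str, at: datetime) -> str:
    # Hypothetical: one manifest per run, under a key no other run can produce.
    run_id = str(uuid.uuid4())
    key = build_manifest_key(source=source, ingest_date=ingest_date, run_id=run_id)
    body = json.dumps({"run_id": run_id, "status": status}).encode("utf-8")
    put_if_absent(bucket, key, body, at)
    return key


def latest_status(bucket: Bucket, *, source: str, ingest_date: str) -> str | None:
    # Hypothetical: "latest wins" is max(LastModified) over the date's prefix.
    prefix = f"_manifests/source={source}/ingest_date={ingest_date}/"
    keys = [k for k in bucket if k.startswith(prefix)]
    if not keys:
        return None
    newest = max(keys, key=lambda k: bucket[k][0])
    return json.loads(bucket[newest][1])["status"]


bucket: Bucket = {}
t0 = datetime(2024, 5, 2, 6, 0, tzinfo=timezone.utc)
record_run(bucket, source="crm", ingest_date="2024-05-01", status="failed", at=t0)
record_run(bucket, source="crm", ingest_date="2024-05-01", status="success", at=t0 + timedelta(hours=1))
print(len(bucket))  # 2: the failed run and its re-run coexist
print(latest_status(bucket, source="crm", ingest_date="2024-05-01"))  # success
```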
Source-matched integrity. Each manifest records source, environment, run ID, business date, status, start and end times, an error list, and a per-entity outputs block. Integrity controls are matched to the source rather than applied uniformly (Snippet 4). SFTP is an opaque third-party file drop with no native integrity metadata, so every file is hashed with SHA-256 as it is read, fingerprinting exactly the bytes that landed. S3 objects already carry an ETag content hash. The Postgres path reads a live query result, where a byte digest of the derived Parquet would be meaningless, since it would fingerprint the pipeline's own output rather than anything the source provided. The natural control for that path is a row count; it has not been implemented yet and sits in the backlog.
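A sketch of how a per-entity outputs block could carry one source-matched control per entity. The `IntegrityRecord` shape, the function names, and the entity keys are hypothetical; only the choice of control per source comes from the text, and the Postgres row count is the backlog item rather than current behavior. For S3 the sketch reuses the stored ETag instead of recomputing anything: boto3 returns ETags wrapped in double quotes, and an ETag equals the MD5 of the object bytes only for some uploads (not multipart, not SSE-KMS or SSE-C), so it is treated as an opaque fingerprint.

```python
from __future__ import annotations

import hashlib
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class IntegrityRecord:
    """Hypothetical per-entity integrity entry for a manifest's outputs block."""

    method: str  # which control was applied: "sha256", "etag" or "row_count"
    value: str   # the fingerprint or count, stored as a string for JSON


def sftp_integrity(data: bytes) -> IntegrityRecord:
    # Opaque file drop: hash exactly the bytes that were read.
    return IntegrityRecord("sha256", hashlib.sha256(data).hexdigest())


def s3_integrity(etag: str) -> IntegrityRecord:
    # Reuse the fingerprint S3 already stores; strip the surrounding quotes.
    return IntegrityRecord("etag", etag.strip('"'))


def postgres_integrity(row_count: int) -> IntegrityRecord:
    # Backlog item, not current behavior: count rows in the query result.
    return IntegrityRecord("row_count", str(row_count))


# Illustrative inputs only; the ETag shown is the MD5 of b"abc".
outputs = {
    "example_file": asdict(sftp_integrity(b"abc")),
    "example_object": asdict(s3_integrity('"900150983cd24fb0d6963f7d28e17f72"')),
    "example_table": asdict(postgres_integrity(3)),
}
print(outputs["example_file"]["value"])
# ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad
```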
Incremental resume. Because the pipeline processes one business day at a time and keeps no state between sessions, every run must answer "which day do I process next?" without a database or a cursor file. The manifests are the cursor (Snippet 5). discover_latest_successful_date scans a source's durable manifest history newest-first and returns the most recent date whose latest manifest records success; discover_next_business_date then advances all sources together to min(latest_successful_date) + 1 day. The min is deliberate: if one source fails or lags, taking the minimum ensures no other source races ahead and leaves a gap, so dbt and the Gold layer always see complete days. The mechanism is stateless and crash-safe. An interrupted run leaves no successful manifest, so the next run rediscovers the same target date and continues, which lets a freshly redeployed pipeline pick up exactly where the previous session stopped.
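The resume rule can be sketched as two pure functions over a list of manifest records. This is a simplified, hypothetical re-implementation: `ManifestRecord` flattens what the real code reads from S3, and discover_next_business_date itself is not shown in this section, so its handling of a source with no successful history is an assumption here (a hypothetical `start` seed).

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone


@dataclass(frozen=True)
class ManifestRecord:
    """Hypothetical flattened view of one manifest object in the bucket."""

    source: str
    ingest_date: date
    last_modified: datetime
    status: str


def latest_successful_date(records: list[ManifestRecord], source: str) -> date | None:
    # Hypothetical re-implementation: newest date first, and a date counts only
    # if its most recently written manifest recorded success.
    mine = [r for r in records if r.source == source]
    for d in sorted({r.ingest_date for r in mine}, reverse=True):
        newest = max((r for r in mine if r.ingest_date == d), key=lambda r: r.last_modified)
        if newest.status == "success":
            return d
    return None


def next_business_date(records: list[ManifestRecord], sources: list[str], start: date) -> date:
    # `start` is a hypothetical seed: a source with no successful manifest is
    # treated as if its last good day were the day before it.
    floor = start - timedelta(days=1)
    latest = []
    for s in sources:
        d = latest_successful_date(records, s)
        latest.append(d if d is not None else floor)
    return min(latest) + timedelta(days=1)  # the slowest source sets the pace


t = datetime(2024, 5, 10, tzinfo=timezone.utc)
records = [
    ManifestRecord("crm", date(2024, 5, 2), t, "success"),
    ManifestRecord("crm", date(2024, 5, 3), t, "success"),
    ManifestRecord("billing", date(2024, 5, 2), t, "success"),
    ManifestRecord("billing", date(2024, 5, 3), t, "failed"),
]
print(next_business_date(records, ["crm", "billing"], start=date(2024, 5, 1)))  # 2024-05-03
```

Here crm has succeeded through 2024-05-03 but billing only through 2024-05-02, so both sources are pointed at 2024-05-03; the crm run for that date then returns early through the idempotency skip. An interrupted billing run adds no success record, so calling the function again yields the same date.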
Taken together, the manifest is a single primitive behind four properties — idempotency, append-only auditability, source-matched integrity, and incremental resume — rather than four mechanisms bolted on.
Snippet 1 (src/access_iq/ingestion/idempotency.py:25-42):

```python
def should_skip_if_already_successful(*, s3: Any, bucket: str, manifest_prefix: str) -> bool:
    manifest_prefix = normalize_manifest_prefix(manifest_prefix)
    key = _latest_manifest_key(s3=s3, bucket=bucket, prefix=manifest_prefix)
    if not key:
        return False
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    try:
        manifest = json.loads(body)
    except (TypeError, json.JSONDecodeError):
        log.warning("manifest_decode_failed", bucket=bucket, key=key)  # fail open: re-ingest, don't skip on bad data
        return False
    if not isinstance(manifest, dict):
        log.warning("manifest_not_dict", bucket=bucket, key=key)  # fail open: never treat a malformed manifest as success
        return False
    return bool(manifest.get("status") == "success")
```

Snippet 2 (src/access_iq/ingestion/postgres.py:104-115):
```python
if should_skip_if_already_successful(
    s3=s3, bucket=platform_bucket, manifest_prefix=manifest_prefix
):
    bound_log.info("ingest_skipped", reason="latest_manifest_success")
    return {
        "source": db,
        "run_id": run_id,
        "env": env,
        "ingest_date": ingest_date.isoformat(),
        "status": "skipped",
        "reason": "latest_manifest_success",
    }
```

Snippet 3 (src/access_iq/ingestion/manifests.py:48-49, 58-80):
```python
# run_id is defined as str(uuid.uuid4()): fresh per run, which guarantees a unique manifest key


def build_manifest_key(*, source: str, ingest_date: str, run_id: str) -> str:
    # run_id namespaces the key, so two runs can never target the same object
    return f"_manifests/source={source}/ingest_date={ingest_date}/run_id={run_id}.json"


def write_manifest(
    *, s3: Any, bucket: str, manifest: Manifest, kms_key_arn: str | None = None
) -> str:
    key = build_manifest_key(
        source=manifest.source,
        ingest_date=manifest.ingest_date,
        run_id=manifest.run_id,
    )
    body = json.dumps(manifest.model_dump(), indent=2, default=str).encode("utf-8")
    # only ever put_object to a fresh run_id key: no delete, no overwrite anywhere
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=body,
        ContentType="application/json",
        **s3_kms_args(kms_key_arn),
    )
    return key
```

Snippet 4 (src/access_iq/ingestion/sftp.py:31-32, 134-137):
```python
def sha256_bytes(b: bytes) -> str:
    return hashlib.sha256(b).hexdigest()


# ... within ingest_sftp_directory_to_bronze:
with sftp.open(remote_path, "rb") as f:
    data = f.read()
digest = sha256_bytes(data)  # fingerprint exactly what landed; SFTP gives no integrity metadata
```

Snippet 5 (src/access_iq/ingestion/manifests.py:83-119):
```python
def discover_latest_successful_date(*, s3: Any, bucket: str, source: str) -> date | None:
    """Find the latest ingest_date with a successful manifest for a source.
    Scans manifest prefixes by ingest_date, checks from newest to oldest,
    and returns the first date whose latest manifest has status=success.
    """
    prefix = normalize_manifest_prefix(f"_manifests/source={source}/")
    paginator = s3.get_paginator("list_objects_v2")
    dates: set[date] = set()
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/"):
        for cp in page.get("CommonPrefixes", []):
            match = _re.search(r"ingest_date=(\d{4}-\d{2}-\d{2})", cp["Prefix"])
            if match:
                dates.add(date.fromisoformat(match.group(1)))
    if not dates:
        return None
    for d in sorted(dates, reverse=True):  # newest first: stop at the first proven-good date
        mp = build_manifest_prefix(source=source, ingest_date=d.isoformat())
        latest: dict[str, Any] | None = None
        for page in paginator.paginate(Bucket=bucket, Prefix=mp):
            for obj in page.get("Contents", []):
                if latest is None or obj["LastModified"] > latest["LastModified"]:
                    latest = obj
        if not latest:
            continue
        body = s3.get_object(Bucket=bucket, Key=latest["Key"])["Body"].read()
        try:
            manifest_data = json.loads(body)
            if isinstance(manifest_data, dict) and manifest_data.get("status") == "success":
                return d
        except (TypeError, json.JSONDecodeError):
            continue
    return None
```
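The first loop in discover_latest_successful_date depends on how ListObjectsV2 groups keys when a Delimiter is supplied: each key under Prefix is cut just after the first "/" that follows the prefix, the distinct results come back as CommonPrefixes, and keys with no such "/" are returned in Contents instead. The pure-Python emulation below is a sketch for illustration only (`common_prefixes` is a hypothetical helper, not part of boto3); it shows why the listing yields one entry per ingest_date rather than one per run manifest, however many re-runs a date has.

```python
from __future__ import annotations

import re
from datetime import date


def common_prefixes(keys: list[str], prefix: str, delimiter: str = "/") -> list[str]:
    """Hypothetical emulation of ListObjectsV2 CommonPrefixes grouping."""
    groups: set[str] = set()
    for key in keys:
        if not key.startswith(prefix):
            continue
        rest = key[len(prefix):]
        cut = rest.find(delimiter)
        if cut != -1:
            # Everything up to and including the first delimiter collapses into one prefix.
            groups.add(prefix + rest[: cut + len(delimiter)])
        # Keys with no delimiter after the prefix would appear in Contents instead.
    return sorted(groups)


keys = [
    "_manifests/source=crm/ingest_date=2024-05-01/run_id=a.json",
    "_manifests/source=crm/ingest_date=2024-05-01/run_id=b.json",
    "_manifests/source=crm/ingest_date=2024-05-02/run_id=c.json",
]
prefixes = common_prefixes(keys, "_manifests/source=crm/")
dates = []
for p in prefixes:
    match = re.search(r"ingest_date=(\d{4}-\d{2}-\d{2})", p)
    if match:
        dates.append(date.fromisoformat(match.group(1)))
print(prefixes)  # two prefixes, one per ingest_date
print(dates)
```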