Pythonic Data Orchestration with Prefect 3
A podcast-style walkthrough of Prefect, Airflow migration, and the AWS/Snowflake/Pulumi stack.
Python-first orchestration for data work.
Prefect is a workflow orchestrator. You write normal Python functions, mark the important pieces with @flow and @task, then let Prefect schedule, run, retry, log, and observe them.
This version is written for an engineer coming from Airflow 3.0 whose stack is AWS, Snowflake, Snowpark, and Pulumi, with data organized in bronze/silver/gold layers.
The honest mental model
Prefect is not a warehouse, not a compute platform, and not a data modeling tool. It is the traffic controller for work that should run reliably.
In Airflow, the DAG is the object you design first. In Prefect, the Python function is the thing you design first. A function decorated with @flow becomes the workflow entrypoint. A function decorated with @task becomes an observed step inside that workflow.
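```python
from datetime import datetime, timezone

from prefect import flow, task

# call_canton_api, write_json_to_s3, copy_json_into_snowflake_bronze,
# read_cdc_watermark, and write_cdc_watermark are project helpers that are
# assumed to be defined or imported elsewhere.


@task(retries=3, retry_delay_seconds=30)
def extract_transactions(since_ts: str) -> list[dict]:
    return call_canton_api(since_ts=since_ts)


@task
def land_raw_json(rows: list[dict]) -> str:
    key = write_json_to_s3(rows)
    return key


@task
def load_bronze(s3_key: str) -> int:
    return copy_json_into_snowflake_bronze(s3_key)


@flow(name="canton-transactions-bronze", log_prints=True)
def bronze_transactions_flow() -> None:
    since_ts = read_cdc_watermark("bronze_canton_transactions")
    # Capture the upper bound before extracting so the next run resumes here.
    until_ts = datetime.now(timezone.utc).isoformat()
    rows = extract_transactions(since_ts)
    s3_key = land_raw_json(rows)
    loaded = load_bronze(s3_key)
    print(f"Loaded {loaded} rows into bronze")
    # Advance the watermark to a timestamp, not to the loaded row count.
    write_cdc_watermark("bronze_canton_transactions", until_ts)


if __name__ == "__main__":
    bronze_transactions_flow()
```
Core Prefect pieces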
The objects you actually need
| Piece | Plain meaning | Airflow translation |
|---|---|---|
| Flow | The top-level workflow function. | DAG role, but written as normal Python. |
| Task | A step Prefect tracks, retries, caches, logs, and maps. | Operator or TaskFlow function. |
| Deployment | A named runnable version of a flow, with schedule and params. | DAG registration plus schedule metadata. |
| Work pool | The queue/channel that says where runs should execute. | Executor/queue/infrastructure boundary. |
| Worker | The process that polls a work pool and starts flow runs. | Worker process, but attached to Prefect pools. |
| Blocks | Reusable connection/config objects, often secrets or infra. | Connections and Variables, but typed. |
| Assets and artifacts | Human-readable run outputs and lineage hints. | Task logs, XCom summaries, lineage views. |
The backend stores orchestration metadata: states, logs, events, assets, blocks, variables, automations, schedules, work pools, and deployments. Your data work can still run in AWS, ECS/Fargate, Kubernetes, local Docker, or Prefect-managed infrastructure.
Airflow migration
The shift is from declaring a static graph up front to writing a Python program that Prefect observes as it runs.
Airflow habit
- Define the DAG file first.
- Choose Operators for every step.
- Pass small values through XCom.
- The DAG processor re-parses DAG files on an interval.
- Acyclic graph is enforced by the framework.
Prefect habit
- Write importable Python functions first.
- Use tasks for steps worth observing or retrying.
- Pass normal Python values between calls.
- Deploy the flow when it should be scheduled remotely.
- Acyclic behavior is a design discipline, not a graph rule.
For a clean migration, start with one Airflow DAG and rewrite only the workflow shell. Keep the business logic functions intact. Then add Prefect retries, schedules, blocks, and work pools around that code.
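One way to picture "rewrite only the workflow shell" is the minimal sketch below. The business logic stays in plain functions with no orchestrator imports, and a thin function composes them. All names here (`parse_amount`, `transform_rows`, `run_pipeline`) are hypothetical, and the Prefect decorators are deliberately left out so the snippet runs anywhere. In a real migration `run_pipeline` would be the function that gets @flow, and the steps worth retrying would get @task.

```python
from typing import Callable, Iterable


def parse_amount(raw: str) -> float:
    """Business logic carried over unchanged from the old DAG (hypothetical)."""
    return round(float(raw.replace(",", "")), 2)


def transform_rows(rows: Iterable[dict]) -> list[dict]:
    """Pure transformation: no Airflow or Prefect imports needed."""
    return [{**row, "amount": parse_amount(row["amount"])} for row in rows]


def run_pipeline(
    extract: Callable[[], list[dict]],
    load: Callable[[list[dict]], int],
) -> int:
    """The workflow shell: the only part that changes during migration."""
    rows = extract()
    cleaned = transform_rows(rows)
    return load(cleaned)


if __name__ == "__main__":
    sink: list[dict] = []
    loaded = run_pipeline(
        extract=lambda: [{"id": "a1", "amount": "1,250.50"}],
        # list.extend returns None, so the lambda returns the row count.
        load=lambda rows: sink.extend(rows) or len(rows),
    )
    print(loaded, sink)
```

Because `transform_rows` never touches the orchestrator, the same unit tests should pass before and after the move.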
AWS, Snowflake, Snowpark, Pulumi
Recommended boundary
Let Pulumi own infrastructure, Snowflake own analytical storage and compute, S3 own raw landing files, and Prefect own orchestration. That separation keeps each tool in its lane.
| Layer | Tool | Responsibility |
|---|---|---|
| IaC | Pulumi | S3 buckets, IAM roles, ECS task roles, alerts, secrets. |
| Landing | S3 | Raw API extracts as immutable JSON files. |
| Warehouse | Snowflake | Bronze, silver, and gold tables. |
| Transforms | Snowpark | Python transformations that execute close to warehouse data. |
| Control plane | Prefect | Schedules, retries, state, logs, alerts, and run history. |
```typescript
import * as aws from "@pulumi/aws";

// Versioned, encrypted bucket for raw API extracts.
const rawBucket = new aws.s3.Bucket("canton-raw-api", {
    versioning: { enabled: true },
    serverSideEncryptionConfiguration: {
        rule: {
            applyServerSideEncryptionByDefault: { sseAlgorithm: "AES256" },
        },
    },
});

// IAM role that ECS tasks running Prefect flow runs can assume.
const prefectTaskRole = new aws.iam.Role("prefect-task-role", {
    assumeRolePolicy: aws.iam.assumeRolePolicyForPrincipal({
        Service: "ecs-tasks.amazonaws.com",
    }),
});
```
Medallion architecture
Bronze keeps source truth, silver standardizes it, and gold shapes it for analytics.
| Layer | What goes there | Prefect role |
|---|---|---|
| Bronze | Raw API payload plus ingestion metadata. | CDC check, API pull, S3 landing, quality gate, load. |
| Silver | Typed, deduped, conformed transaction rows. | Transform, validate, merge, publish lineage/artifacts. |
| Gold | Dashboard-ready facts and aggregates. | Business rules, rollups, SLA checks, alerts. |
```sql
create table if not exists bronze.canton_transactions_raw (
    bronze_record_id string default uuid_string(),
    source_system    string,
    source_api_url   string,
    source_event_id  string,
    cdc_from_ts      timestamp_ntz,
    cdc_to_ts        timestamp_ntz,
    s3_object_key    string,
    raw_payload      variant,
    ingest_run_id    string,
    created_at       timestamp_ntz default current_timestamp(),
    updated_at       timestamp_ntz default current_timestamp()
);
```
Canton transaction example
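To make the ingestion-metadata columns concrete, here is a minimal sketch of shaping one API event into a row for the bronze table defined above. The payload field `event_id`, the `SOURCE_SYSTEM` value, and the function name `to_bronze_row` are assumptions for illustration, not the real Canton API schema. `bronze_record_id`, `created_at`, and `updated_at` are left to the column defaults.

```python
import json
from datetime import datetime, timezone

SOURCE_SYSTEM = "canton"  # assumed label, not taken from a real config


def to_bronze_row(
    event: dict,
    *,
    api_url: str,
    cdc_from: datetime,
    cdc_to: datetime,
    s3_key: str,
    run_id: str,
) -> dict:
    """Wrap one raw event with the metadata columns of the bronze table."""
    return {
        "source_system": SOURCE_SYSTEM,
        "source_api_url": api_url,
        "source_event_id": str(event["event_id"]),  # hypothetical payload field
        "cdc_from_ts": cdc_from.isoformat(),
        "cdc_to_ts": cdc_to.isoformat(),
        "s3_object_key": s3_key,
        # Keep the payload untouched; it is serialized here so it can be
        # parsed into the VARIANT column at load time.
        "raw_payload": json.dumps(event, sort_keys=True),
        "ingest_run_id": run_id,
    }


if __name__ == "__main__":
    row = to_bronze_row(
        {"event_id": 7, "amount": "1.5"},
        api_url="https://example.invalid/transactions",  # placeholder URL
        cdc_from=datetime(2024, 1, 1, tzinfo=timezone.utc),
        cdc_to=datetime(2024, 1, 1, 1, tzinfo=timezone.utc),
        s3_key="canton/transactions/run-123.json",
        run_id="run-123",
    )
    print(row)
```

Recording the CDC window and the S3 key on every row is what makes a bad load traceable back to the exact raw file and extract window.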
Bronze workflow
- Read the CDC watermark from Snowflake.
- Call the Canton Network transaction API for the window from the last watermark to now.
- Write the raw response to S3 as JSON.
- Read S3 back, run schema and quality checks, then load bronze.
- Run a post-write integration check and send a Slack alert if it fails.
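The schema and quality check step in the list above can be sketched as a small gate that separates passing rows from rejected ones before anything is loaded. The required field names and types below are assumptions for illustration, not the real Canton API schema, and `max_reject_ratio` is a hypothetical knob.

```python
# Assumed schema: field name -> accepted Python type(s).
REQUIRED_FIELDS = {"event_id": str, "account_id": str, "amount": (int, float)}


def check_row(row: dict) -> list[str]:
    """Return a list of problems for one row; an empty list means it passes."""
    problems = []
    for field, expected in REQUIRED_FIELDS.items():
        value = row.get(field)
        if value is None:
            problems.append(f"missing {field}")
        # bool is a subclass of int, so reject it explicitly.
        elif isinstance(value, bool) or not isinstance(value, expected):
            problems.append(f"bad type for {field}")
    return problems


def quality_gate(rows: list[dict], max_reject_ratio: float = 0.0) -> list[dict]:
    """Return the rows that pass, or raise if too many are rejected."""
    good, rejected = [], []
    for row in rows:
        (rejected if check_row(row) else good).append(row)
    if rows and len(rejected) / len(rows) > max_reject_ratio:
        raise ValueError(f"{len(rejected)} of {len(rows)} rows failed checks")
    return good
```

Raising is deliberate: inside a Prefect task an uncaught exception fails the task, which stops the load step from running on bad data.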
Silver and gold workflows
Silver repeats the same operating pattern, but transforms bronze data into typed and conformed rows. Gold turns silver transactions into dashboard models: round timestamps to the nearest minute, count transactions by account, sum the Canton amounts, and calculate USD value with a configured conversion rate.
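The gold rollup described above can be sketched in plain Python to pin down the arithmetic. In the pipeline itself this would more likely be a Snowpark or SQL aggregation. The row fields (`event_ts`, `account_id`, `amount`) and the conversion rate are placeholders.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from decimal import Decimal


def round_to_minute(ts: datetime) -> datetime:
    """Round to the nearest minute (30 seconds and above rounds up)."""
    return (ts + timedelta(seconds=30)).replace(second=0, microsecond=0)


def gold_rollup(rows: list[dict], usd_rate: Decimal) -> list[dict]:
    """Aggregate silver rows per (minute, account)."""
    buckets: dict = defaultdict(lambda: {"txn_count": 0, "amount": Decimal("0")})
    for row in rows:
        key = (round_to_minute(row["event_ts"]), row["account_id"])
        buckets[key]["txn_count"] += 1
        buckets[key]["amount"] += row["amount"]
    return [
        {
            "minute": minute,
            "account_id": account,
            "txn_count": agg["txn_count"],
            "amount": agg["amount"],
            "usd_value": agg["amount"] * usd_rate,
        }
        # Keys are unique, so sorting only ever compares (minute, account).
        for (minute, account), agg in sorted(buckets.items())
    ]


if __name__ == "__main__":
    start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    silver = [
        {"event_ts": start + timedelta(seconds=10), "account_id": "a", "amount": Decimal("1.5")},
        {"event_ts": start + timedelta(seconds=45), "account_id": "a", "amount": Decimal("1")},
    ]
    for line in gold_rollup(silver, usd_rate=Decimal("0.5")):
        print(line)
```

`Decimal` is used instead of `float` so that summed amounts and the USD conversion do not pick up binary rounding error.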
Deploy and operate
A deployment is the point where a local flow becomes a named thing Prefect can schedule and workers can run. For AWS data infra, the usual production target is an ECS/Fargate or Kubernetes work pool.
```shell
prefect deploy flows/bronze_transactions.py:bronze_transactions_flow \
  --name canton-bronze-hourly \
  --pool aws-ecs-prod \
  --cron "0 * * * *"
```
- Use retries on tasks that talk to APIs or cloud services.
- Use alerts for failed runs, late runs, and freshness failures.
- Store API credentials and Slack webhooks in blocks or your secret manager.
- Keep raw S3 objects immutable so failed loads can be replayed.
- Promote dev to prod by changing deployment config, not flow logic.
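The last two points, immutable raw objects and promotion by config, can be sketched as a small settings object that the flow reads at run time. The environment variable names, default values, and key layout are hypothetical. In a Prefect deployment the same values could equally arrive as deployment parameters.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class PipelineSettings:
    """Everything that differs between dev and prod lives here."""

    env: str
    raw_bucket: str
    snowflake_database: str

    @classmethod
    def from_env(cls, environ: Mapping[str, str] = os.environ) -> "PipelineSettings":
        env = environ.get("PIPELINE_ENV", "dev")
        return cls(
            env=env,
            raw_bucket=environ.get("RAW_BUCKET", f"canton-raw-api-{env}"),
            snowflake_database=environ.get("SNOWFLAKE_DATABASE", f"ANALYTICS_{env.upper()}"),
        )


def raw_object_key(window_end: str, run_id: str) -> str:
    """One key per run, so a rerun never overwrites an earlier raw file."""
    return f"canton/transactions/{window_end}/{run_id}.json"


if __name__ == "__main__":
    settings = PipelineSettings.from_env()
    print(settings.raw_bucket, raw_object_key("2024-01-01T00:00:00Z", "run-123"))
```

With this shape the flow code is identical in every environment, and only the values injected by the deployment change.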