RunHandler, and lets you poll for status, block for the result, stream progress, retry on transient failures, or terminate runaways.
Prerequisites. Installed SDK · API key set · about 15 minutes.
What you’ll build
RunHandler — pipeline.start() returns one, and every other operation on a background run goes through it.
Build the pipeline
Any pipeline that takes more than a few seconds is a candidate for background execution. Use the fluent pipeline.add(name=, id=) builder.
The try / except → fetch / new idiom is the standard “create or update” pattern in every canonical example script.
Start the run in the background
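Because any process can pick a run back up from its task id, it helps to store that id somewhere durable as soon as start returns. A minimal sketch using SQLite from the standard library; the table layout, the job_id key, and the helper names are illustrative assumptions, not part of the SDK.

```python
import sqlite3


def open_store(path=":memory:"):
    """Open (or create) a tiny table that maps a job id to a task id."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS runs ("
        "job_id TEXT PRIMARY KEY, task_id TEXT NOT NULL)"
    )
    return conn


def save_task_id(conn, job_id, task_id):
    """Record the task id so another process can find the run later."""
    conn.execute(
        "INSERT OR REPLACE INTO runs (job_id, task_id) VALUES (?, ?)",
        (job_id, task_id),
    )
    conn.commit()


def load_task_id(conn, job_id):
    """Return the stored task id, or None if the job was never started."""
    row = conn.execute(
        "SELECT task_id FROM runs WHERE job_id = ?", (job_id,)
    ).fetchone()
    return row[0] if row else None


# Intended use (the job name here is hypothetical):
#   handler = pipeline.start(inputs=...)
#   save_task_id(conn, "nightly-report", handler.task_id)
```

Pass a file path instead of ":memory:" when the worker that polls is a different process from the one that starts the run.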
pipeline.start(inputs=...) returns immediately with a RunHandler. No blocking.
Persist handler.task_id (Redis, your DB, whatever): you can resume polling or terminate from any process with just the task id.
Poll for status
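A polling loop for a scheduled worker might look like the sketch below. It takes a check_status callable rather than a handler, because this page does not show the exact shape of the run_status() response; how you pull the status string out of it is left to the caller. The function name, the interval, and the check limit are assumptions.

```python
import time

# Status values listed in this guide; "running" is the only non-terminal one.
TERMINAL_STATUSES = {"succeeded", "failed", "cancelled"}


def poll_until_terminal(check_status, interval=2.0, max_checks=150, sleep=time.sleep):
    """Call check_status() until it returns a terminal status.

    check_status is any zero-argument callable that returns the status
    string, for example a small wrapper around handler.run_status().
    Raises TimeoutError if the run is still going after max_checks checks.
    """
    status = None
    for attempt in range(max_checks):
        status = check_status()
        if status in TERMINAL_STATUSES:
            return status
        if attempt < max_checks - 1:
            sleep(interval)  # no need to sleep after the final check
    raise TimeoutError(f"run still {status!r} after {max_checks} checks")
```

Each call is independent, so the same function works from a cron job that checks once (max_checks=1) and exits.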
handler.run_status() is a single non-blocking check. Use it for progress UIs, dashboards, or scheduled workers that can’t hold a long-lived connection.
The response carries the status ("running", "succeeded", "failed", "cancelled") and run metadata.
Block until done
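A sketch of blocking for the outputs while treating a timeout as "not done yet" rather than as a crash. PollTimeoutError and FakeHandler are local stand-ins so the example runs on its own; in real code the exception comes from the SDK (its import path is not shown on this page) and the handler comes from pipeline.start().

```python
class PollTimeoutError(Exception):
    """Stand-in for the SDK exception of the same name."""


def wait_for_outputs(handler, poll_interval=2.0, timeout=60.0):
    """Block for the final outputs; return None if the timeout elapses first."""
    try:
        result = handler.result(poll_interval=poll_interval, timeout=timeout)
    except PollTimeoutError:
        return None  # caller decides: keep waiting, or terminate the run
    # Output-node values live under the "result" key, keyed by node_name.
    return result.get("result") or {}


class FakeHandler:
    """Hypothetical test double that mimics handler.result()."""

    def __init__(self, payload=None):
        self.payload = payload

    def result(self, poll_interval, timeout):
        if self.payload is None:
            raise PollTimeoutError("still running")
        return self.payload
```

Returning None keeps the timeout decision with the caller, who still holds the task id and can poll again later.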
handler.result(poll_interval=, timeout=) polls internally and returns the final result, or raises PollTimeoutError if the timeout elapses.
result.get("result") is the dict of output-node values, keyed by the output node’s node_name.
Handle failures cleanly
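The split between transient API errors and genuine run failures can be sketched as a small retry wrapper with exponential backoff and jitter. The three exception classes are local stand-ins named after the SDK errors this guide mentions; the wrapper name, attempt count, and delays are assumptions.

```python
import random
import time


class RateLimitError(Exception):
    """Stand-in for the SDK error of the same name."""


class InternalServerError(Exception):
    """Stand-in for the SDK error of the same name."""


class PipelineRunFailedError(Exception):
    """Stand-in for the SDK error of the same name."""


# Errors worth retrying: rate limits, 5xx responses, timeouts.
TRANSIENT = (RateLimitError, InternalServerError, TimeoutError)


def call_with_retry(fn, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Run fn(), retrying transient errors with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except PipelineRunFailedError:
            raise  # the run itself failed: surface it, never retry
        except TRANSIENT:
            if attempt == max_attempts:
                raise  # out of attempts, let the caller see the error
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay / 2))  # jitter spreads retries
```

Wrap individual calls, for example call_with_retry(lambda: pipeline.start(inputs=...)), so a run failure raised by result() is never mistaken for a flaky request.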
Two failure modes matter: the run itself failed (PipelineRunFailedError) and the API rejected the call (RateLimitError, InternalServerError, …). Catch each explicitly and act differently: retry transient errors, surface run failures.
The rule: retry transient errors (rate limit, 5xx, timeouts); don’t retry on PipelineRunFailedError, because that’s a bug in your inputs or pipeline, not a flake.
Cancel a runaway run
Two ways: through the handler, or directly on the pipeline if you only have the task_id.
See the terminate-run example.
Stream progress (the alternative pattern)
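A sketch of consuming streamed chunks to report progress and gather per-node outputs. The Chunk dataclass is a local stand-in that mirrors the three attributes this guide names (.type, .output_name, .output_value); the "node" type value in the usage note is invented for illustration, since the real set of chunk types is not listed here.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable


@dataclass
class Chunk:
    """Stand-in for a streamed chunk with the attributes named in this guide."""

    type: str
    output_name: str
    output_value: Any


def collect_outputs(
    chunks: Iterable[Chunk], on_progress: Callable[[str], None] = print
) -> Dict[str, Any]:
    """Report each chunk as it arrives and keep the latest value per output."""
    outputs: Dict[str, Any] = {}
    for chunk in chunks:
        outputs[chunk.output_name] = chunk.output_value
        on_progress(f"[{chunk.type}] {chunk.output_name} ready")
    return outputs
```

In real use the iterable would be pipeline.stream(inputs=...); here any list of Chunk objects works, for example collect_outputs([Chunk("node", "draft", "v1")]).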
pipeline.stream(inputs=...) runs the pipeline inline but yields chunks as nodes complete, which is handy when you want to render progress as the pipeline executes, not after.
stream() is the only surface that exposes per-node intermediate outputs; regular run() only carries final-output values. See the intermediate-results example.
Streaming via run() for LLM token streams
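Raw SSE frames need a little parsing before they are useful. The sketch below extracts the data payload from one frame following the Server-Sent Events wire format. It assumes each frame arrives as a str or bytes block of "field: value" lines; whether the payload is plain token text or JSON is not stated on this page, so the example treats it as plain text.

```python
def sse_data(frame):
    """Return the data payload of one SSE frame, or None if it has none."""
    if isinstance(frame, bytes):
        frame = frame.decode("utf-8")
    data_lines = []
    for line in frame.splitlines():
        if line.startswith("data:"):
            value = line[len("data:"):]
            if value.startswith(" "):
                value = value[1:]  # the format strips one leading space
            data_lines.append(value)
    # Multiple data lines in one frame are joined with newlines.
    return "\n".join(data_lines) if data_lines else None


def join_tokens(frames):
    """Concatenate the data payloads of a sequence of frames."""
    payloads = (sse_data(frame) for frame in frames)
    return "".join(p for p in payloads if p is not None)
```

Comment-only frames such as ": keep-alive" carry no data line and are skipped, so join_tokens(["data: Hel\n\n", "data: lo\n\n"]) yields "Hello".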
If you want token-level streaming out of an LLM node, configure the node with stream=True and run the pipeline with run(inputs, stream=True). The pipeline yields raw SSE frames.
Three different streaming surfaces, three different jobs:
- pipeline.run(inputs, stream=True) → raw SSE frames; for token-level LLM streaming. The output node must declare output_type="stream<string>".
- pipeline.stream(inputs) → typed chunks with .type, .output_name, .output_value; for inspecting every node’s output.
- pipeline.start(...) + handler.result(...) → no streaming; for background execution that returns final outputs.
Operational tips
- Idempotency. Persist the input batch with a deterministic id before calling start. If your worker crashes mid-run, dedupe on retry instead of double-processing.
- Concurrency. start is non-blocking, but the platform applies per-org concurrency limits. Watch for RateLimitError and back off; don’t fan out 1,000 starts at once.
- Observability. Log task_id and trace_id on every start, result, terminate, and exception. When something goes wrong in production, those ids are the only handles support has.
- Async transport. pipeline.astart / aresult / aterminate / astream keep asyncio runtimes from blocking. Install vectorshift[async] to use them.
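The idempotency tip can be sketched with a content hash as the deterministic id. This is one possible scheme, not something the SDK prescribes: the helper names are hypothetical, the inputs are assumed to be JSON-serializable, and the plain dict stands in for durable storage such as a database table.

```python
import hashlib
import json


def batch_id(inputs):
    """Derive a stable id from the inputs, independent of key order."""
    canonical = json.dumps(inputs, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def start_once(store, inputs, start):
    """Start a run for this batch unless one was already recorded.

    store maps batch id -> record; start is any callable that launches
    the run and returns its task id (for example, a wrapper around
    pipeline.start that returns handler.task_id).
    """
    key = batch_id(inputs)
    # Record the batch before starting, so a retry after a crash sees it.
    record = store.setdefault(key, {"inputs": inputs, "task_id": None})
    if record["task_id"] is None:
        record["task_id"] = start(inputs)
    return record["task_id"]
```

A record whose task_id is still None after a crash marks a batch that may or may not have started, which is the case worth checking by hand or against the platform before retrying.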
What’s next
Content workflow
Branching pipelines with multiple LLM stages.
Streaming example
The pipeline-level streaming pattern, isolated.
Errors reference
Full exception hierarchy and retry guidance.
