Lifecycle

new

Pipeline.new(name: str, nodes: Optional[list[Node]] = None) -> Pipeline
Create a new pipeline with the specified parameters.
Parameters
name
str
required
The name of the pipeline.
nodes
Optional[list[Node]]
default:"None"
List of nodes to include in the pipeline.
Returns
returns
Pipeline
A new Pipeline instance.

save

Pipeline.save(deploy: bool = True, bump: Optional[BumpLevel] = None, description: Optional[str] = None)
Save the pipeline with its current configuration. Updates the pipeline in the database with the current nodes, inputs and outputs configuration.

deploy=True vs deploy=False — the branch model

Think of a pipeline like a Git repo:
  • Working tree — the Pipeline object in your Python process. Every pipeline.add(...) mutation lives here.
  • Main branch — the branch_id on the server. save(deploy=False) writes your working tree to this branch as a new commit. Anything reading by branch_id (the platform editor, ad-hoc pipeline.run(...) from this SDK process) immediately sees the change.
  • Deployed version — what callers of the published pipeline get. Chatbots, integrations, scheduled runs, and the platform’s “Run published” button all read this version. save(deploy=True) (the default) promotes the current main-branch state to be the new deployed version.
bump attaches a semantic-version tag ("major", "minor", or "patch") to the deploy. Consumers can then pin to that version via Pipeline.fetch(id=..., version="1.2.0") instead of always tracking latest.
In practice:
  • Iterating on a draft? save(deploy=False) — keeps the deployed version stable while you tweak.
  • Ready to ship? save(deploy=True) (default) — optionally with bump="patch" + description="..." for a labelled release.
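The three states above can be pictured with a toy in-memory model. This is a minimal sketch of the described behaviour, not SDK code: the `BranchModel` class and its attribute names are hypothetical and exist only for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class BranchModel:
    """Toy model of working tree, main branch, and deployed version (hypothetical)."""
    working_tree: list[str] = field(default_factory=list)  # local Pipeline object
    main_branch: list[str] = field(default_factory=list)   # server-side branch state
    deployed: list[str] = field(default_factory=list)      # what published callers see

    def add(self, node_name: str) -> None:
        # Mutations only touch the local working tree.
        self.working_tree.append(node_name)

    def save(self, deploy: bool = True) -> None:
        # Every save writes the working tree to the main branch.
        self.main_branch = list(self.working_tree)
        if deploy:
            # Only deploy=True promotes the main branch to the deployed version.
            self.deployed = list(self.main_branch)


model = BranchModel()
model.add("input")
model.save(deploy=True)    # shipped: the deployed version now contains "input"
model.add("llm")
model.save(deploy=False)   # draft: main branch moves, deployed version stays put
```

After the last call, `model.main_branch` holds both nodes while `model.deployed` still holds only the first, which mirrors the "iterating on a draft" case.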
Parameters
deploy
bool
default:"True"
When True, promotes the saved state to be the new deployed version (the one chatbots, integrations, and the “Run published” path will hit). When False, only updates the working/main branch — the deployed version is untouched until the next save(deploy=True).
bump
Optional[BumpLevel]
default:"None"
Semantic-version bump level: "major", "minor", or "patch". Only meaningful with deploy=True. See BumpLevel.
description
Optional[str]
default:"None"
Updates the pipeline description; also used as the changelog entry when bumping a version.
Returns
returns
dict
A dictionary containing the status of the save operation.
Raises
Exception
exception
If the pipeline update fails.

fetch

Pipeline.fetch(id: Optional[str] = None, name: Optional[str] = None, branch_id: Optional[str] = None, version: Optional[str] = None, username: Optional[str] = None, org_name: Optional[str] = None) -> Pipeline
Fetches an existing pipeline by id or name. branch_id and version only apply when fetching by id.
Parameters
id
Optional[str]
default:"None"
The unique identifier of the pipeline to fetch.
name
Optional[str]
default:"None"
The name of the pipeline to fetch. When multiple pipelines share the same name, the most recently modified one is returned.
branch_id
Optional[str]
default:"None"
Fetch the pipeline for a specific branch (id-fetch only).
version
Optional[str]
default:"None"
Semantic version to fetch (e.g. "1.2.0"), or "latest" (id-fetch only).
username
Optional[str]
default:"None"
The username of the pipeline owner.
org_name
Optional[str]
default:"None"
The organization name of the pipeline owner.
Returns
returns
Pipeline
The fetched Pipeline instance.
Raises
ValueError
exception
If neither id nor name is provided.

list

Pipeline.list(folder_id: Optional[str] = None, include_shared: Optional[bool] = None, verbose: Optional[bool] = None, offset: Optional[int] = None, limit: Optional[int] = None) -> "list[Pipeline]"
List pipelines.
Parameters
folder_id
Optional[str]
default:"None"
Filter pipelines by folder.
include_shared
Optional[bool]
default:"None"
Include shared pipelines.
verbose
Optional[bool]
default:"None"
Include full pipeline details.
offset
Optional[int]
default:"None"
Pagination offset.
limit
Optional[int]
default:"None"
Maximum number of pipelines to return.
Returns
returns
list[Pipeline]
A list of Pipeline instances.
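The offset and limit parameters suggest page-by-page listing. The sketch below shows one way to walk all pages. The `iter_pages` helper and the `fake_list` stand-in are hypothetical, and the stopping rule (a short or empty page means the end) is an assumption, since the page does not say how the end of the listing is signalled.

```python
from typing import Any, Callable, Iterator


def iter_pages(
    list_fn: Callable[..., list[Any]],
    page_size: int = 50,
    **filters: Any,
) -> Iterator[Any]:
    """Yield items from a list-style call that accepts offset and limit."""
    offset = 0
    while True:
        page = list_fn(offset=offset, limit=page_size, **filters)
        yield from page
        if len(page) < page_size:
            # Assumption: a short (or empty) page means nothing is left.
            return
        offset += len(page)


def fake_list(offset: int = 0, limit: int = 10, **_: Any) -> list[str]:
    # Stand-in for Pipeline.list so the sketch runs without a server.
    names = [f"pipeline-{i}" for i in range(7)]
    return names[offset:offset + limit]


all_names = list(iter_pages(fake_list, page_size=3))
```

With the SDK, the same helper would presumably be called as `iter_pages(Pipeline.list, folder_id=...)`, assuming `Pipeline.list` can be called on the class as the signature suggests.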

delete

Pipeline.delete()
Deletes an existing pipeline.
Returns
returns
dict
A dictionary containing the status of the deletion operation.
Raises
Exception
exception
If the pipeline couldn’t be deleted.

duplicate

Pipeline.duplicate(name: Optional[str] = None, description: Optional[str] = None, folder_id: Optional[str] = None) -> Pipeline
Duplicate this pipeline.
Parameters
name
Optional[str]
default:"None"
Name for the duplicate. Auto-generated if omitted.
description
Optional[str]
default:"None"
Description for the duplicate.
folder_id
Optional[str]
default:"None"
Folder to place the duplicate in.
Returns
returns
Pipeline
A new Pipeline instance for the duplicated pipeline.

Building

add

Pipeline.add -> NodeAdder  # property
Fluent builder for adding nodes to this pipeline.
Usage
inp = pipeline.add.input(name="query", input_type="string")
llm = pipeline.add.llm(provider="openai", model="gpt-4o", prompt=inp.text)
Returns
returns
NodeAdder

add_batch

Pipeline.add_batch -> BatchNodeAdder  # property
Fluent builder that adds nodes with execution_mode='batch'.
Usage
llm = pipeline.add_batch.llm(provider="openai", model="gpt-5.1", prompt=inp.text)
# equivalent to: pipeline.add.llm(..., execution_mode="batch")
Returns
returns
BatchNodeAdder

Node catalogue

pipeline.add(name=..., id=...).<node_type>(...) exposes every node type as a typed builder method. There are roughly 500 node types in total. The biggest categories:
| Category | Representative builders |
| --- | --- |
| I/O | input, output, text, file, image, audio |
| LLM & AI | llm, ai_routing, ai_filter_list, ai_operations, ai_text_to_image, ai_image_to_text, ai_text_to_speech, ai_speech_to_text, ai_fill_pdf, categorizer |
| Knowledge & retrieval | knowledge_base_reader, knowledge_base_writer, web_scraper, arxiv, wikipedia, deep_research, semantic_search, parallel_ai_search |
| Control flow | condition, ai_routing, merge, combine_text, pipeline (sub-pipeline), agent (agent-as-tool) |
| Transforms | text_formatter, combine_text, split_text, transformation (custom Python), generate_chart |
| Data sources | read_json, read_csv, dataframe_*, excel_*, append_files, file_save |
| Integrations | gmail, slack, notion, airtable, salesforce, hubspot, … (every connected service exposes a node) |
| Interaction | button, chat_*, browser_extension, api |
Each builder returns a typed node object with attributes (.text, .response, .results, .path_0, etc.) you wire into downstream nodes. The full surface ships as a .pyi stub (vectorshift/pipeline/node_adder.pyi) — your editor’s autocomplete is the canonical browseable catalogue.
inp = pipeline.add(name="q", id="q").input(input_type="string")
llm = pipeline.add(name="llm", id="llm").llm(
    provider="openai", model="gpt-5.1", prompt=inp.text,
)
pipeline.add(name="out", id="out").output(output_type="string", value=llm.response)
For runnable examples covering each category, see the pipeline examples gallery — the sidebar groups them by purpose (Core, Logic & routing, Composition, Streaming, Background, Integrations, Lifecycle).

add_node

Pipeline.add_node(node: Node) -> None
Add a node to the pipeline. If a node with the same name already exists, it will be replaced in-place.
Parameters
node
Node
required
The node to add to the pipeline.
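The replace-in-place rule can be illustrated with plain dictionaries. This is a sketch of the described semantics only: `upsert_by_name` is a hypothetical helper, nodes are modelled as dicts rather than Node objects, and it assumes "in-place" means the replaced node keeps its position.

```python
from typing import Any


def upsert_by_name(nodes: list[dict[str, Any]], node: dict[str, Any]) -> None:
    """Replace the node with the same name at its current position, else append."""
    for index, existing in enumerate(nodes):
        if existing["name"] == node["name"]:
            nodes[index] = node
            return
    nodes.append(node)


nodes = [{"name": "q", "type": "input"}, {"name": "llm", "model": "a"}]
upsert_by_name(nodes, {"name": "llm", "model": "b"})      # same name: replaced at index 1
upsert_by_name(nodes, {"name": "out", "type": "output"})  # new name: appended
```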

remove_node

Pipeline.remove_node(node_id: Optional[str] = None, node_name: Optional[str] = None) -> dict[str, Any]
Remove a node (and its connected edges) from the pipeline. Provide either node_id or node_name.
Parameters
node_id
Optional[str]
default:"None"
ID of the node to remove.
node_name
Optional[str]
default:"None"
Name of the node to remove.
Returns
returns
dict[str, Any]
{"status": "success", "node_id": "<resolved_node_id>"}

Running

run

Pipeline.run(inputs: dict[str, Any], stream: bool = False, stream_all_outputs: bool = False, session_id: Optional[str] = None, node_input_overrides: Optional[dict[str, Any]] = None, send_intermediate_results: Optional[bool] = None) -> Union[dict[str, Any], Generator]
Run the pipeline with the specified inputs.
Parameters
inputs
dict[str, Any]
required
Dictionary mapping input node names to input values for the pipeline, e.g. {"input_node": "Hello, world!"}
stream
bool
default:"False"
Whether to stream the response. Set to True only when the pipeline has an output node fed by a streaming LLM.
stream_all_outputs
bool
default:"False"
Whether to stream all outputs as they arrive.
session_id
Optional[str]
default:"None"
Optional session ID for run grouping and tracing. Groups multiple runs under the same session for analytics and observability. Note: this does not provide conversational memory to LLM nodes — each run’s LLM nodes only see the current inputs.
node_input_overrides
Optional[dict[str, Any]]
default:"None"
Optional per-node input overrides.
send_intermediate_results
Optional[bool]
default:"None"
Whether to send intermediate results (default: True on server).
Returns
returns
Union[dict[str, Any], Generator]
A dictionary containing pipeline outputs and run_id. If stream is True, returns a generator that yields response chunks.
Raises
Exception
exception
If the pipeline execution fails.
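Because run can return either a dictionary or a generator, calling code may want to handle both shapes in one place. The sketch below is one possible approach. `consume_run` is a hypothetical helper, and the chunk values are placeholder strings because the chunk type is not specified here.

```python
from typing import Any, Callable, Iterable, Optional, Union


def consume_run(
    result: Union[dict, Iterable[Any]],
    on_chunk: Callable[[Any], None],
) -> Optional[dict]:
    """Return the result dict for a plain run; pass streamed chunks to on_chunk."""
    if isinstance(result, dict):
        return result
    for chunk in result:
        on_chunk(chunk)
    return None


# Stand-ins for the two return shapes, so the sketch runs without the SDK.
plain = {"run_id": "run-1", "outputs": {"out": "Hello"}}
streamed = (piece for piece in ["Hel", "lo"])

chunks: list[Any] = []
plain_result = consume_run(plain, chunks.append)
stream_result = consume_run(streamed, chunks.append)
```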

bulk_run

Pipeline.bulk_run(inputs: list[dict[str, Any]]) -> list[dict[str, Any]]
Run the pipeline with a list of specified inputs.
Parameters
inputs
list[dict[str, Any]]
required
List of dictionaries of input values for the pipeline.
Returns
returns
list[dict[str, Any]]
A list of dictionaries containing the run_id and outputs for each set of inputs.
Raises
Exception
exception
If the pipeline execution fails.
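A common way to prepare the inputs list is to map tabular rows onto input-node names. The following is a minimal sketch. `rows_to_inputs`, the column names, and the node names "query" and "language" are all hypothetical.

```python
from typing import Any


def rows_to_inputs(
    rows: list[dict[str, Any]],
    mapping: dict[str, str],
) -> list[dict[str, Any]]:
    """Rename row columns to input-node names, producing one inputs dict per row."""
    return [
        {node: row[column] for column, node in mapping.items()}
        for row in rows
    ]


rows = [
    {"question": "What is X?", "lang": "en"},
    {"question": "Was ist Y?", "lang": "de"},
]
# Keys are row columns, values are the (hypothetical) input-node names.
batch = rows_to_inputs(rows, {"question": "query", "lang": "language"})
```

The resulting `batch` has the list-of-dicts shape that bulk_run expects as its inputs argument.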

run_status

Pipeline.run_status(run_id: str) -> RunStatus
Check the status of an async pipeline run.
Parameters
run_id
str
required
The run/task ID returned by start.
Returns
returns
RunStatus
A RunStatus dict with keys task_id, status, and optionally error or result. See RunStatus.
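A typical use of run_status is a polling loop that waits for a terminal status. The sketch below assumes the status values listed under RunStatus ("in_progress", "completed", "failed"). `wait_for_run`, its interval and timeout parameters, and the `FakePipeline` stand-in are hypothetical.

```python
import time
from typing import Any


def wait_for_run(
    pipeline: Any,
    run_id: str,
    interval: float = 2.0,
    timeout: float = 300.0,
) -> dict:
    """Poll pipeline.run_status(run_id) until the run completes or fails."""
    deadline = time.monotonic() + timeout
    while True:
        status = pipeline.run_status(run_id)
        if status["status"] == "completed":
            return status.get("result", {})
        if status["status"] == "failed":
            raise RuntimeError(status.get("error", "run failed"))
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run {run_id} still in progress after {timeout}s")
        time.sleep(interval)


class FakePipeline:
    """Stand-in that reports in_progress twice, then completed."""

    def __init__(self) -> None:
        self.calls = 0

    def run_status(self, run_id: str) -> dict:
        self.calls += 1
        if self.calls < 3:
            return {"task_id": run_id, "status": "in_progress"}
        return {"task_id": run_id, "status": "completed", "result": {"answer": "ok"}}


fake = FakePipeline()
result = wait_for_run(fake, "task-1", interval=0.0)
```

For long-running jobs, the webhook_url parameter of start may be a better fit than polling.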

Background runs

start

Pipeline.start(inputs: dict[str, Any], session_id: Optional[str] = None, node_input_overrides: Optional[dict[str, Any]] = None, send_intermediate_results: Optional[bool] = None, webhook_url: Optional[str] = None, version: Optional[str] = None, trace_id: Optional[str] = None) -> RunHandler
Start the pipeline in the background and return a RunHandler.
Parameters
inputs
dict[str, Any]
required
Dictionary of input values for the pipeline.
session_id
Optional[str]
default:"None"
Optional session ID for run grouping and tracing. Groups multiple runs under the same session for analytics and observability. Note: this does not provide conversational memory to LLM nodes — each run’s LLM nodes only see the current inputs.
node_input_overrides
Optional[dict[str, Any]]
default:"None"
Optional per-node input overrides.
send_intermediate_results
Optional[bool]
default:"None"
Whether to send intermediate results.
webhook_url
Optional[str]
default:"None"
URL to call when the run completes.
version
Optional[str]
default:"None"
Pipeline version to run.
trace_id
Optional[str]
default:"None"
Trace ID for observability.
Returns
returns
RunHandler
A RunHandler for the background run.

terminate

Pipeline.terminate(run_id: str) -> dict[str, Any]
Terminate an active pipeline run.
Parameters
run_id
str
required
The pipeline run ID to terminate.
Returns
returns
dict[str, Any]
{"status": "success"}

Streaming

stream

Pipeline.stream(inputs: dict[str, Any], stream_all_outputs: bool = False, session_id: Optional[str] = None, node_input_overrides: Optional[dict[str, Any]] = None, version: Optional[str] = None) -> Generator[StreamChunk, None, None]
Run the pipeline with streaming, yielding StreamChunk objects.
Parameters
inputs
dict[str, Any]
required
Dictionary of input values for the pipeline.
stream_all_outputs
bool
default:"False"
Whether to stream all outputs as they arrive.
session_id
Optional[str]
default:"None"
Optional session ID for run grouping and tracing. Groups multiple runs under the same session for analytics and observability. Note: this does not provide conversational memory to LLM nodes — each run’s LLM nodes only see the current inputs.
node_input_overrides
Optional[dict[str, Any]]
default:"None"
Optional per-node input overrides.
version
Optional[str]
default:"None"
Pipeline version to run.
Returns
returns
Generator[StreamChunk, None, None]
A generator of StreamChunk objects.

Sharing & publishing

share

Pipeline.share(user_id: Optional[str] = None, org_id: Optional[str] = None, role: Literal["viewer", "editor"] = "viewer", name: Optional[str] = None) -> dict[str, Any]
Share this pipeline with a user or organization. At least one of user_id or org_id is required.
Parameters
user_id
Optional[str]
default:"None"
User ID to share with.
org_id
Optional[str]
default:"None"
Organization ID to share with.
role
Literal["viewer", "editor"]
default:"'viewer'"
Permission role ("viewer" or "editor").
name
Optional[str]
default:"None"
Display name for the shared entity.
Returns
returns
dict[str, Any]
{"status": "success"}

unshare

Pipeline.unshare(user_id: Optional[str] = None, org_id: Optional[str] = None) -> dict[str, Any]
Remove sharing for this pipeline. At least one of user_id or org_id is required.
Parameters
user_id
Optional[str]
default:"None"
User ID to unshare.
org_id
Optional[str]
default:"None"
Organization ID to unshare.
Returns
returns
dict[str, Any]
{"status": "success"}

publish

Pipeline.publish(title: str, description: str = '', tags: Optional[list[str]] = None, internal: bool = False) -> dict[str, Any]
Publish this pipeline to the marketplace.
Parameters
title
str
required
Marketplace listing title.
description
str
default:"''"
Marketplace listing description.
tags
Optional[list[str]]
default:"None"
Optional list of tags.
internal
bool
default:"False"
If True, publish as internal-only.
Returns
returns
dict[str, Any]
{"status": "success", "id": "<marketplace_object_id>"}

unpublish

Pipeline.unpublish(marketplace_id: str) -> dict[str, Any]
Remove this pipeline from the marketplace.
Parameters
marketplace_id
str
required
The marketplace listing ID to remove.
Returns
returns
dict[str, Any]
{"status": "success"}

move_to_folder

Pipeline.move_to_folder(folder_id: str) -> dict[str, Any]
Move the pipeline to a different folder.
Parameters
folder_id
str
required
Target folder ID.
Returns
returns
dict[str, Any]
{"status": "success"}

Versioning

revert

Pipeline.revert(version: str) -> dict[str, Any]
Revert the pipeline to a specific version.
Note: The local Pipeline object is not updated after this call. Call Pipeline.fetch(id=...) to get the reverted state.
Parameters
version
str
required
Semantic version string, e.g. "1.2.3".
Returns
returns
dict[str, Any]
{"status": "success"}
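Since the local object is stale after revert, it can help to pair the two calls. The sketch below shows the revert-then-refetch pattern with stand-ins. `revert_and_reload`, `FakePipeline`, and `fake_fetch` are hypothetical, and the pipeline ID is passed in explicitly because the attribute that holds it is not documented here.

```python
from typing import Any, Callable


def revert_and_reload(
    pipeline: Any,
    pipeline_id: str,
    version: str,
    fetch: Callable[..., Any],
) -> Any:
    """Revert on the server, then return a freshly fetched object."""
    response = pipeline.revert(version)
    if response.get("status") != "success":
        raise RuntimeError(f"revert to {version} failed: {response}")
    # The object passed in is stale after revert, so fetch a new one.
    return fetch(id=pipeline_id)


class FakePipeline:
    """Stand-in with the documented revert() return shape."""

    def __init__(self, version: str) -> None:
        self.version = version

    def revert(self, version: str) -> dict:
        return {"status": "success"}


def fake_fetch(id: str) -> FakePipeline:
    # Stand-in for Pipeline.fetch; pretends the server now holds 1.2.3.
    return FakePipeline("1.2.3")


stale = FakePipeline("2.0.0")
fresh = revert_and_reload(stale, "pipe-123", "1.2.3", fake_fetch)
```

With the SDK, `Pipeline.fetch` would be passed as the `fetch` argument.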

Serialization

to_dict

Pipeline.to_dict() -> dict
Convert Pipeline to ObjectInfo dict format for use in other nodes.
Returns
returns
dict

from_json

Pipeline.from_json(data: dict) -> Pipeline
Parameters
data
dict
required
Returns
returns
Pipeline

serialize_inputs

Pipeline.serialize_inputs(inputs: dict[str, Any]) -> dict[str, Any]
Parameters
inputs
dict[str, Any]
required
Returns
returns
dict[str, Any]

Types

Configuration objects, response shapes, and enums used by the methods above.

BumpLevel

Members
  • PATCH = "patch"
  • MINOR = "minor"
  • MAJOR = "major"
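For intuition about what each level means, the sketch below applies the standard semantic-versioning bump rules to a version string. How the server actually computes the next version is not documented here, so `next_version` is a hypothetical helper, and the enum is re-declared locally (with a `str` mixin as an assumption) so the block runs without the SDK.

```python
from enum import Enum


class BumpLevel(str, Enum):
    # Values mirror the documented members; the str mixin is an assumption.
    PATCH = "patch"
    MINOR = "minor"
    MAJOR = "major"


def next_version(current: str, level: BumpLevel) -> str:
    """Apply standard semantic-versioning bump rules to 'MAJOR.MINOR.PATCH'."""
    major, minor, patch = (int(part) for part in current.split("."))
    if level is BumpLevel.MAJOR:
        return f"{major + 1}.0.0"      # breaking change: reset minor and patch
    if level is BumpLevel.MINOR:
        return f"{major}.{minor + 1}.0"  # new feature: reset patch
    return f"{major}.{minor}.{patch + 1}"  # fix only


bumped = next_version("1.2.3", BumpLevel("minor"))
```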

RunResult

Response from Pipeline.run / Pipeline.arun.
Fields
run_id
str
required
outputs
dict[str, Any]
required
trace_id
str
required

RunStatus

Response from Pipeline.run_status / RunHandler.run_status.
Fields
task_id
str
required
status
Literal["in_progress", "completed", "failed"]
required
error
str
required
result
dict[str, Any]
required

StartResult

Response from Pipeline.start when a background run is initiated.
Fields
task_id
str
required
trace_id
str
required