Add these nodes with the pipeline builder: pipeline.add(name="...").<node>(...). Each entry lists the node’s configuration parameters. See the Pipeline reference for add, run, and lifecycle methods.

add_node — Add Node

Add a new node to the conversation.
pipeline.add(name="node").add_node()

ai_routing — AI Routing

Route the execution flow to different paths based on the value of the conditions.
Platform docs: AI Routing
pipeline.add(name="node").ai_routing(provider="anthropic", input_query="...")
Parameters
provider
str
required
The model provider
enable_shared_memory
bool
default:"False"
If enabled, Shared Memory will be included as context for routing decisions
model
str
default:"''"
The specific model for categorization
conditions
list[str]
default:"[]"
input_query
str
required
Specify the message that the AI router will classify
outputs
dict
default:"{}"
shared_memory
str
default:"''"
The shared memory that will be used as context for AI to determine the routing
sampling
SamplingConfig
max_tokens
int
The maximum number of tokens to generate
temperature
float
The temperature of the model
top_p
float
The top-p value

code_execution — Code Execution

Execute code in the language of your choice.
Platform docs: Code Execution
pipeline.add(name="node").code_execution(code="...")
Parameters
code
str
required
function_name
str
default:"'main'"
input_schema
ListType | list[NameTypeValueAny]
default:"[]"
language
int
default:"0"
output_schema
ListType | list[NameType]
default:"[]"
processed_inputs
dict
default:"{}"
processed_outputs
dict
default:"{}"
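As a rough illustration of how code and function_name fit together, the sketch below builds a source string that defines main and runs it locally to confirm the entry point exists. It assumes the snippet is Python and that the node calls the function named by function_name; the platform's actual calling convention and the meaning of the integer language values are not stated on this page, and run_locally is a hypothetical helper, not part of the SDK.

```python
# Source that would be passed as the `code` parameter.
# Assumption: the snippet is Python and exposes the entry point named by `function_name`.
CODE = '''
def main(a, b):
    return {"total": a + b}
'''


def run_locally(code: str, function_name: str = "main", **inputs):
    """Hypothetical local check: execute the snippet and call its entry point."""
    namespace = {}
    exec(code, namespace)  # defines the snippet's functions inside `namespace`
    if function_name not in namespace:
        raise KeyError(f"{function_name!r} is not defined in the snippet")
    return namespace[function_name](**inputs)


result = run_locally(CODE, a=2, b=3)
print(result)
```

Checking the snippet this way before handing it to code_execution(code=CODE) catches a misspelled entry point early.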

code_interpreter — Code Interpreter

Execute Python code in a sandboxed environment with file I/O support. Supports spreadsheet analysis, data visualization, and artifact generation.
To read or edit uploaded files, pass them in the files input as an actual array of file references. Those files are mounted at /work/inputs/{filename} (the curly braces mark a placeholder; replace it with the exact filename). Continued sessions restore prior inputs and generated files, so work can resume even if Python memory was reset.
Save output files to /work/outputs/ to have them collected as artifacts; call save_file(…) for durable/versioned user-visible files.
Example: files=["$history.69dd11a49033ae4d8e65ab33"] with pd.read_csv('/work/inputs/data.csv'); plt.savefig('/work/outputs/chart.png').
stdout and stderr are shown after each execution step. Include an existing session id to keep executing code within the same session.
pipeline.add(name="node").code_interpreter(code="...")
Parameters
code
str
required
files
AcceptsFileList
default:"[]"
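The file conventions described above can be sketched as a script string for the code parameter. The example only syntax-checks the script with compile, because /work/inputs and /work/outputs exist only inside the sandbox. The filenames data.csv and chart.png come from the description; the use of df.plot() and the Agg backend are assumptions made for illustration.

```python
# Script intended for the `code` parameter. It is only syntax-checked here,
# because /work/inputs and /work/outputs exist only inside the sandbox.
SCRIPT = """
import pandas as pd
import matplotlib
matplotlib.use("Agg")  # headless backend, assumed suitable for a sandbox
import matplotlib.pyplot as plt

df = pd.read_csv('/work/inputs/data.csv')   # uploaded file mounted by the node
print(df.describe())                        # stdout is reported after the step

df.plot()                                   # plots the numeric columns
plt.savefig('/work/outputs/chart.png')      # saved where artifacts are collected
"""

# compile() raises SyntaxError if the script is malformed; nothing is executed.
compiled = compile(SCRIPT, "<code_interpreter>", "exec")
print("script compiles:", compiled is not None)
```

The string would then be passed as code=SCRIPT, with the uploaded file supplied through files.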

condition — Condition

Platform docs: Condition
pipeline.add(name="node").condition()
Parameters
conditions
list[ConditionGroup]
paths
list[str]
outputs
dict

convert_type — Convert Type

Convert value from source type to target type.
pipeline.add(name="node").convert_type(source_type="string", value="...")
Parameters
target_type
str
default:"'int32'"
The type to convert the value to. One of: bool, float, int32, timestamp
source_type
str
required
The type of the value to convert. One of: string
value
str
required
The value to convert
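To make the target types concrete, here is a minimal pure-Python sketch of what a string conversion to each listed type could look like. The parsing rules are assumptions (accepted boolean spellings, ISO 8601 timestamps, range checking for int32); the node's real behavior may differ, and convert_string is a hypothetical helper.

```python
from datetime import datetime, timezone

INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def convert_string(value: str, target_type: str = "int32"):
    """Illustrative string conversion; the node's real parsing rules may differ."""
    text = value.strip()
    if target_type == "int32":
        number = int(text)
        if not INT32_MIN <= number <= INT32_MAX:
            raise OverflowError(f"{number} does not fit in int32")
        return number
    if target_type == "float":
        return float(text)
    if target_type == "bool":
        lowered = text.lower()
        if lowered in ("true", "1"):
            return True
        if lowered in ("false", "0"):
            return False
        raise ValueError(f"cannot interpret {value!r} as bool")
    if target_type == "timestamp":
        # Assumption: ISO 8601 input; naive values are treated as UTC.
        parsed = datetime.fromisoformat(text)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed
    raise ValueError(f"unsupported target_type: {target_type!r}")


print(convert_string("42"))
print(convert_string("3.5", "float"))
print(convert_string("True", "bool"))
print(convert_string("2024-01-15T09:30:00", "timestamp"))
```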

custom_group — Group multiple nodes together

Group multiple nodes together
pipeline.add(name="node").custom_group()

merge — Merge

Recombine paths created by a condition node. Note: if you are not using a condition node, you shouldn’t use a merge node.
Platform docs: Merge
pipeline.add(name="node").merge(fields=...)
Parameters
function
str
default:"'first'"
The function to apply to the input fields. One of: first, join
type
str
default:"'string'"
The expected type of the input and output fields
fields
ListType | list[Any]
required
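The two merge functions can be pictured with a small pure-Python sketch. It assumes that a path which did not run contributes None, that first returns the first value that is present, and that join concatenates the present values as strings; the separator argument is hypothetical, and none of these details are confirmed by this page.

```python
from typing import Any, Optional, Sequence


def merge_fields(fields: Sequence[Optional[Any]], function: str = "first",
                 separator: str = "") -> Any:
    """Illustrative merge; assumes skipped paths contribute None."""
    present = [field for field in fields if field is not None]
    if function == "first":
        # Take the value from whichever path actually produced one.
        return present[0] if present else None
    if function == "join":
        # Concatenate all present values as strings.
        return separator.join(str(field) for field in present)
    raise ValueError(f"unsupported function: {function!r}")


print(merge_fields([None, "from path B"]))
print(merge_fields(["a", "b"], function="join"))
```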

nl_to_sql — NL To SQL

Convert natural language queries to SQL queries.
pipeline.add(name="node").nl_to_sql(text="...", db_dialect="MySQL", schema="...")
Parameters
model
str
default:"'gpt-4o'"
text
str
required
db_dialect
str
required
One of: MySQL, PostgreSQL, SQLite3, Snowflake
schema
str
required

set_variable — Set Variable

Set a variable to a new value
Platform docs: Set Variable
pipeline.add(name="node").set_variable(variable_set=..., scope="...", variable_id="...", value="...")
Parameters
variable_set
AcceptsVariableSet
required
scope
str
required
variable_id
str
required
value
str
required

time — Time

Outputs the current time (often connected to an LLM node)
pipeline.add(name="node").time()
Parameters
delta_time_unit
str
default:"'Seconds'"
One of: days, hours, minutes, seconds, weeks
delta_value
int
default:"0"
is_positive
str
default:"'+'"
One of: +, -
is_positive_delta
bool
default:"True"
output_format
str
default:"'MM/DD/YYYY'"
One of: MM-DD-YYYY / HH:MM:SS, MM/DD/YYYY, Timestamp
time_node_zone
str
default:"'America/New_York'"
use_delta
bool
default:"False"
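The delta parameters can be read as simple date arithmetic. The sketch below is an assumption about how use_delta, delta_value, delta_time_unit, and is_positive_delta combine, and it formats the result as MM/DD/YYYY. Time zone resolution (time_node_zone) is left out, the unit is lower-cased so that both 'Seconds' and 'seconds' are accepted, and shifted_time and format_mmddyyyy are hypothetical helpers.

```python
from datetime import datetime, timedelta

ALLOWED_UNITS = {"days", "hours", "minutes", "seconds", "weeks"}


def shifted_time(now: datetime, delta_value: int = 0, delta_time_unit: str = "seconds",
                 is_positive_delta: bool = True, use_delta: bool = False) -> datetime:
    """Apply the optional offset to `now` (assumed semantics)."""
    if not use_delta:
        return now
    unit = delta_time_unit.lower()
    if unit not in ALLOWED_UNITS:
        raise ValueError(f"unsupported delta_time_unit: {delta_time_unit!r}")
    delta = timedelta(**{unit: delta_value})  # e.g. timedelta(days=2)
    return now + delta if is_positive_delta else now - delta


def format_mmddyyyy(moment: datetime) -> str:
    """Render a datetime in the MM/DD/YYYY output format."""
    return moment.strftime("%m/%d/%Y")


# A fixed reference time keeps the example reproducible.
now = datetime(2024, 3, 1, 12, 0, 0)
two_days_ago = shifted_time(now, delta_value=2, delta_time_unit="days",
                            is_positive_delta=False, use_delta=True)
print(format_mmddyyyy(two_days_ago))
```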

wait_node — Wait Node

Pause Pipeline execution for a specified duration
pipeline.add(name="node").wait_node(wait_time=0, wait_time_type="micros")
Parameters
wait_time
int
required
wait_time_type
str
required
One of: micros, milliseconds, nanos, seconds
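For a sense of how wait_time and wait_time_type relate, this sketch converts the pair into seconds and sleeps locally for that long. The unit names come from the list above; the mapping to nanoseconds is the conventional one and is assumed rather than confirmed, and wait_seconds is a hypothetical helper.

```python
import time

# Nanoseconds per unit, keyed by the documented wait_time_type values.
NANOS_PER_UNIT = {
    "nanos": 1,
    "micros": 1_000,
    "milliseconds": 1_000_000,
    "seconds": 1_000_000_000,
}


def wait_seconds(wait_time: int, wait_time_type: str) -> float:
    """Convert a (wait_time, wait_time_type) pair to seconds."""
    if wait_time < 0:
        raise ValueError("wait_time must be non-negative")
    if wait_time_type not in NANOS_PER_UNIT:
        raise ValueError(f"unsupported wait_time_type: {wait_time_type!r}")
    return wait_time * NANOS_PER_UNIT[wait_time_type] / 1_000_000_000


duration = wait_seconds(250, "milliseconds")
time.sleep(duration)  # local stand-in for the pause the node would introduce
print(duration)
```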