Demonstrates:Documentation Index
Fetch the complete documentation index at: https://docs.vectorshift.ai/llms.txt
Use this file to discover all available pages before exploring further.
- ListenNode in button variant with multiple button choices
- Wiring downstream nodes to button paths via dependencies
- Merging button branches back together with MergeNode
from vectorshift.pipeline import Pipeline
from vectorshift.request import VectorshiftApiError
PIPELINE_NAME = "Button Node Example"
try:
pipeline = Pipeline.fetch(name=PIPELINE_NAME)
print(f"Pipeline fetched: id={pipeline.id}, branch_id={pipeline.branch_id}")
except Exception as e:
print(f"Error fetching pipeline: {e}")
pipeline = Pipeline.new(name=PIPELINE_NAME)
print(f"Pipeline created: id={pipeline.id}, branch_id={pipeline.branch_id}")
try:
# --- Conversational pipeline needs a start_flag instead of input nodes ---
start = pipeline.add(name="start", id="start").start_flag()
# --- Button prompt ---
talk = pipeline.add(name="talk", id="talk").talk(
variant="message",
content="What would you like to do?",
dependencies=[start.complete],
)
# --- Button node: present choices to the user ---
buttons = pipeline.add(name="buttons", id="buttons").listen(
variant="button",
buttons=[
{"name": "Summarize", "id": "button_1"},
{"name": "Translate", "id": "button_2"},
{"name": "Explain", "id": "button_3"},
],
processed_outputs={
"button_1": "path",
"button_2": "path",
"button_3": "path",
},
dependencies=[talk.complete],
)
# --- Downstream nodes gated by button selection ---
summarize_msg = pipeline.add(name="summarize_msg", id="summarize_msg").text(
text="You chose to summarize.",
dependencies=[buttons.button_1],
)
translate_msg = pipeline.add(name="translate_msg", id="translate_msg").text(
text="You chose to translate.",
dependencies=[buttons.button_2],
)
explain_msg = pipeline.add(name="explain_msg", id="explain_msg").text(
text="You chose to explain.",
dependencies=[buttons.button_3],
)
# --- Merge the button branches and respond ---
merge = pipeline.add(name="merge", id="merge").merge(
function="first",
type="string",
fields=[summarize_msg.text, translate_msg.text, explain_msg.text],
)
pipeline.add(name="respond", id="respond").talk(
variant="message",
content=merge.output,
)
pipeline.save()
print(f"Pipeline saved: id={pipeline.id}, branch_id={pipeline.branch_id}")
except VectorshiftApiError as e:
print(f"API error: {e.status_code} {e.method} {e.endpoint}")
print(f"Message: {e.error_message}")
Source:
examples/pipelines/button_node.py in the SDK repo.