Documentation Index
Fetch the complete documentation index at: https://docs.vectorshift.ai/llms.txt
Use this file to discover all available pages before exploring further.
Builds a multi-node pipeline (input → combine_text → text_formatter → LLM → output)
and streams it to see intermediate node outputs as they arrive.
Note: intermediate results are delivered via SSE streaming — the server processes
them when send_intermediate_results=True (the default), but they are only visible
in the streaming response, not the regular run() response.
from typing import Any, cast
from vectorshift.pipeline import Pipeline
# ── Build a multi-node pipeline ──────────────────────────────────
PIPELINE_NAME = "intermediate_results_example"
try:
pipeline = Pipeline.fetch(name=PIPELINE_NAME)
print(f"Pipeline fetched: id={pipeline.id}, branch_id={pipeline.branch_id}")
except Exception as e:
print(f"Error fetching pipeline: {e}")
pipeline = Pipeline.new(name=PIPELINE_NAME)
print(f"Pipeline created: id={pipeline.id}, branch_id={pipeline.branch_id}")
# Two text inputs
topic = pipeline.add(name="topic", id="topic").input(input_type="string")
style = pipeline.add(name="style", id="style").input(input_type="string")
# Combine them into a single prompt string
combined = pipeline.add(name="combine", id="combine").combine_text(
text=[topic.text, style.text],
)
# Uppercase the combined text (so we can see the formatter's intermediate output)
formatted = pipeline.add(name="formatter", id="formatter").text_formatter(
text=combined.processed_text,
formatter="To Uppercase",
)
out1 = pipeline.add(name="output_1", id="output_1").output(
output_type="string", value=formatted.output
)
# Feed into an LLM
llm = pipeline.add(name="llm", id="llm").llm(
provider="openai",
model="gpt-4o",
stream=True,
system="You are a creative writer. Respond in 2-3 sentences.",
prompt=formatted.output,
)
# Final output
out = pipeline.add(name="output_0", id="output_0").output(
output_type="string", value=llm.response
)
pipeline.save(deploy=True)
print("Pipeline created and deployed.\n")
# ── Run with streaming to see intermediate results ───────────────
# Intermediate node outputs (combine, formatter, llm) arrive as
# "stream" chunks before the final "result" chunk.
print("=== Streaming with intermediate results ===\n")
for chunk in pipeline.stream(
inputs={"topic": "black holes", "style": "Write a haiku about:"},
):
if chunk.type == "stream":
print(
f" [stream] {chunk.output_name}: {str(chunk.output_value)[:200]}",
flush=True,
)
elif chunk.type == "result":
print(f"\n [result] status={chunk.status}, run_id={chunk.run_id}")
for key, value in chunk.outputs.items():
print(f" {key}: {str(value)[:200]}")
# ── Compare: non-streaming run (no intermediate results visible) ─
print("\n=== Normal run (final outputs only) ===")
result = cast(
dict[str, Any],
pipeline.run(
inputs={"topic": "black holes", "style": "Write a haiku about:"},
),
)
for key, value in result.get("outputs", {}).items():
print(f" {key}: {str(value)[:200]}")
Source: examples/pipelines/intermediate_results.py in the SDK repo.