What this builds. A four-stage pipeline (inputs → combine_text → text_formatter → LLM) streamed via pipeline.stream(...) so each node’s intermediate output shows up before the final result chunk.
You’ll end up with. A live stream of [stream] lines (one per node) followed by a final [result] chunk, plus a side-by-side pipeline.run(...) for comparison showing only the final outputs.
from typing import Any, cast

from vectorshift.pipeline import Pipeline

# ── Build a multi-node pipeline ──────────────────────────────────
PIPELINE_NAME = "intermediate_results_example"

try:
    pipeline = Pipeline.fetch(name=PIPELINE_NAME)
    print(f"Pipeline fetched: id={pipeline.id}, branch_id={pipeline.branch_id}")
except Exception as e:
    print(f"Error fetching pipeline: {e}")
    pipeline = Pipeline.new(name=PIPELINE_NAME)
    print(f"Pipeline created: id={pipeline.id}, branch_id={pipeline.branch_id}")

# Two text inputs
topic = pipeline.add(name="topic", id="topic").input(input_type="string")
style = pipeline.add(name="style", id="style").input(input_type="string")

# Combine them into a single prompt string
combined = pipeline.add(name="combine", id="combine").combine_text(
    text=[topic.text, style.text],
)

# Uppercase the combined text (so we can see the formatter's intermediate output)
formatted = pipeline.add(name="formatter", id="formatter").text_formatter(
    text=combined.processed_text,
    formatter="To Uppercase",
)
out1 = pipeline.add(name="output_1", id="output_1").output(
    output_type="string", value=formatted.output
)

# Feed into an LLM
llm = pipeline.add(name="llm", id="llm").llm(
    provider="openai",
    model="gpt-4o",
    stream=True,
    system="You are a creative writer. Respond in 2-3 sentences.",
    prompt=formatted.output,
)

# Final output
out = pipeline.add(name="output_0", id="output_0").output(
    output_type="string", value=llm.response
)

pipeline.save(deploy=True)
print("Pipeline created and deployed.\n")

# ── Run with streaming to see intermediate results ───────────────
# Intermediate node outputs (combine, formatter, llm) arrive as
# "stream" chunks before the final "result" chunk.
print("=== Streaming with intermediate results ===\n")
for chunk in pipeline.stream(
    inputs={"topic": "black holes", "style": "Write a haiku about:"},
):
    if chunk.type == "stream":
        print(
            f" [stream] {chunk.output_name}: {str(chunk.output_value)[:200]}",
            flush=True,
        )
    elif chunk.type == "result":
        print(f"\n [result] status={chunk.status}, run_id={chunk.run_id}")
        for key, value in chunk.outputs.items():
            print(f" {key}: {str(value)[:200]}")

# ── Compare: non-streaming run (no intermediate results visible) ─
print("\n=== Normal run (final outputs only) ===")
result = cast(
    dict[str, Any],
    pipeline.run(
        inputs={"topic": "black holes", "style": "Write a haiku about:"},
    ),
)
for key, value in result.get("outputs", {}).items():
    print(f" {key}: {str(value)[:200]}")
Pipeline created: id=..., branch_id=...
Pipeline created and deployed.

=== Streaming with intermediate results ===

 [stream] combine: Write a haiku about: black holes
 [stream] formatter: WRITE A HAIKU ABOUT: BLACK HOLES
 [stream] llm: ...

 [result] status=success, run_id=...
 output_1: WRITE A HAIKU ABOUT: BLACK HOLES
 output_0: ...

=== Normal run (final outputs only) ===
 output_1: WRITE A HAIKU ABOUT: BLACK HOLES
 output_0: ...
Intermediate results only appear in the streamed response — pipeline.run(...) returns just the final outputs.
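If you want to keep the intermediate values rather than just print them, one option is to collect the chunks as they arrive. The sketch below is not part of the SDK: `collect_stream` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins that only mimic the chunk attributes used in the example above (`type`, `output_name`, `output_value`, `outputs`). It also assumes a node may emit more than one stream chunk, so values are stored in a list per node.

```python
from types import SimpleNamespace
from typing import Any, Iterable


def collect_stream(
    chunks: Iterable[Any],
) -> tuple[dict[str, list[Any]], dict[str, Any]]:
    """Split streamed chunks into intermediate values and final outputs.

    Hypothetical helper: relies only on the attributes shown in the
    example above (type, output_name, output_value, outputs).
    """
    intermediate: dict[str, list[Any]] = {}
    final_outputs: dict[str, Any] = {}
    for chunk in chunks:
        if chunk.type == "stream":
            # Assumption: a node may emit several chunks, so keep them all.
            intermediate.setdefault(chunk.output_name, []).append(
                chunk.output_value
            )
        elif chunk.type == "result":
            final_outputs = dict(chunk.outputs)
    return intermediate, final_outputs


# Stand-in chunks for illustration only; in practice you would pass
# pipeline.stream(inputs=...) instead of this list.
fake_chunks = [
    SimpleNamespace(
        type="stream",
        output_name="combine",
        output_value="Write a haiku about: black holes",
    ),
    SimpleNamespace(
        type="stream",
        output_name="formatter",
        output_value="WRITE A HAIKU ABOUT: BLACK HOLES",
    ),
    SimpleNamespace(type="stream", output_name="llm", output_value="..."),
    SimpleNamespace(
        type="result",
        outputs={
            "output_1": "WRITE A HAIKU ABOUT: BLACK HOLES",
            "output_0": "...",
        },
    ),
]

intermediate, final_outputs = collect_stream(fake_chunks)
print(sorted(intermediate))
print(final_outputs["output_1"])
```

Because the helper only reads attributes, it should work with any iterable of chunk-like objects, which also makes it easy to test without deploying a pipeline.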