What this builds. A four-stage pipeline (inputs → combine_text → text_formatter → LLM) streamed via pipeline.stream(...), so that each node's intermediate output shows up before the final result chunk.

You'll end up with. A live stream of [stream] lines (one per node) followed by a final [result] chunk, plus a pipeline.run(...) call for comparison that shows only the final outputs.

from typing import Any, cast

from vectorshift.pipeline import Pipeline

# ── Build a multi-node pipeline ──────────────────────────────────
PIPELINE_NAME = "intermediate_results_example"
# Reuse the pipeline if it already exists; otherwise create it.
try:
    pipeline = Pipeline.fetch(name=PIPELINE_NAME)
    print(f"Pipeline fetched: id={pipeline.id}, branch_id={pipeline.branch_id}")
except Exception as e:
    # Any fetch failure (e.g. the pipeline does not exist yet) falls back to creating it.
    print(f"Error fetching pipeline: {e}")
    pipeline = Pipeline.new(name=PIPELINE_NAME)
    print(f"Pipeline created: id={pipeline.id}, branch_id={pipeline.branch_id}")

# Two text inputs
topic = pipeline.add(name="topic", id="topic").input(input_type="string")
style = pipeline.add(name="style", id="style").input(input_type="string")

# Combine them into a single prompt string
combined = pipeline.add(name="combine", id="combine").combine_text(
    text=[topic.text, style.text],
)

# Uppercase the combined text (so we can see the formatter's intermediate output)
formatted = pipeline.add(name="formatter", id="formatter").text_formatter(
    text=combined.processed_text,
    formatter="To Uppercase",
)

# Expose the formatter's text as its own pipeline output
out1 = pipeline.add(name="output_1", id="output_1").output(
    output_type="string", value=formatted.output
)

# Feed into an LLM
llm = pipeline.add(name="llm", id="llm").llm(
    provider="openai",
    model="gpt-4o",
    stream=True,
    system="You are a creative writer. Respond in 2-3 sentences.",
    prompt=formatted.output,
)

# Final output
out = pipeline.add(name="output_0", id="output_0").output(
    output_type="string", value=llm.response
)

pipeline.save(deploy=True)
print("Pipeline created and deployed.\n")

# ── Run with streaming to see intermediate results ───────────────
# Intermediate node outputs (combine, formatter, llm) arrive as
# "stream" chunks before the final "result" chunk.
print("=== Streaming with intermediate results ===\n")
for chunk in pipeline.stream(
    inputs={"topic": "black holes", "style": "Write a haiku about:"},
):
    if chunk.type == "stream":
        print(
            f"  [stream] {chunk.output_name}: {str(chunk.output_value)[:200]}",
            flush=True,
        )
    elif chunk.type == "result":
        print(f"\n  [result] status={chunk.status}, run_id={chunk.run_id}")
        for key, value in chunk.outputs.items():
            print(f"    {key}: {str(value)[:200]}")

# ── Compare: non-streaming run (no intermediate results visible) ─
print("\n=== Normal run (final outputs only) ===")
result = cast(
    dict[str, Any],
    pipeline.run(
        inputs={"topic": "black holes", "style": "Write a haiku about:"},
    ),
)
for key, value in result.get("outputs", {}).items():
    print(f"  {key}: {str(value)[:200]}")

Expected output

Error fetching pipeline: ...
Pipeline created: id=..., branch_id=...
Pipeline created and deployed.

=== Streaming with intermediate results ===

  [stream] combine: Write a haiku about: black holes
  [stream] formatter: WRITE A HAIKU ABOUT: BLACK HOLES
  [stream] llm: ...
  [result] status=success, run_id=...
    output_1: WRITE A HAIKU ABOUT: BLACK HOLES
    output_0: ...

=== Normal run (final outputs only) ===
  output_1: WRITE A HAIKU ABOUT: BLACK HOLES
  output_0: ...
Intermediate results only appear in the streamed response — pipeline.run(...) returns just the final outputs.
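
If you want the intermediate values as data rather than printed lines, the streaming loop above can be wrapped in a small collector. The following is a minimal sketch, not part of the SDK: `collect_stream` is a hypothetical helper, and it assumes only the chunk attributes already used in the example (`type`, `output_name`, `output_value`, `outputs`). The `SimpleNamespace` objects are stand-ins for real chunks so the sketch runs without a deployed pipeline, and the LLM text and run ID are placeholders.

```python
from types import SimpleNamespace
from typing import Any, Iterable


def collect_stream(
    chunks: Iterable[Any],
) -> tuple[dict[str, list[Any]], dict[str, Any]]:
    """Split a chunk stream into per-node intermediate values and final outputs.

    Hypothetical helper (not part of the SDK). It relies only on the
    attributes used in the example: type, output_name, output_value, outputs.
    """
    intermediate: dict[str, list[Any]] = {}
    final: dict[str, Any] = {}
    for chunk in chunks:
        if chunk.type == "stream":
            # A node may emit more than one chunk, so keep every value
            # in arrival order instead of overwriting earlier ones.
            intermediate.setdefault(chunk.output_name, []).append(chunk.output_value)
        elif chunk.type == "result":
            final = dict(chunk.outputs)
    return intermediate, final


# Stand-in chunks with assumed shapes; real ones come from pipeline.stream(...).
fake_chunks = [
    SimpleNamespace(type="stream", output_name="combine",
                    output_value="Write a haiku about: black holes"),
    SimpleNamespace(type="stream", output_name="formatter",
                    output_value="WRITE A HAIKU ABOUT: BLACK HOLES"),
    SimpleNamespace(type="stream", output_name="llm",
                    output_value="<llm text>"),
    SimpleNamespace(type="result", status="success", run_id="<run id>",
                    outputs={"output_1": "WRITE A HAIKU ABOUT: BLACK HOLES",
                             "output_0": "<llm text>"}),
]

intermediate, final = collect_stream(fake_chunks)
print(list(intermediate))  # node names in arrival order
print(list(final))         # names of the final outputs
```

With a real pipeline, the same helper should accept `pipeline.stream(inputs=...)` in place of `fake_chunks`, provided the chunks expose the attributes listed above. Each node's values are kept in a list because this sketch makes no assumption about whether the LLM node emits one chunk or many.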

See also

Streaming

Plain token streaming from a single LLM node.

Streaming with chunks

Typed StreamChunk parsing for the same idea.

Pipeline reference

Full method surface.