What this builds. A minimal streaming LLM pipeline (input_0 → llm_node → output_0, where output_0 has a stream<string> output type) consumed via pipeline.run(..., stream=True), which yields raw SSE byte chunks. What you’ll end up with. A live, token-by-token printout of the model’s response on stdout, the same typing effect you see in a chat app.
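For a concrete picture of those raw SSE chunks: the parsing code in this example looks for lines that start with data: and carry a JSON object with output_name and output_value keys. The sketch below decodes one such line. parse_sse_data_line is a hypothetical helper, not part of the vectorshift SDK, and the payload shape is inferred from this example's parsing code rather than from a documented wire format. The Server-Sent Events format makes the space after data: optional, so the sketch accepts both forms.

```python
import json
from typing import Optional


def parse_sse_data_line(line: bytes) -> Optional[dict]:
    """Return the JSON object carried by one SSE `data:` line, or None.

    Hypothetical helper. Assumes the payload is a JSON object with
    `output_name` / `output_value` keys, as this example's parser expects.
    Raises json.JSONDecodeError if the data field is not valid JSON.
    """
    text = line.decode("utf-8").rstrip("\r\n")
    if not text.startswith("data:"):
        return None  # blank event separator or another SSE field
    value = text[len("data:"):]
    if value.startswith(" "):
        value = value[1:]  # SSE strips one optional space after the colon
    payload = json.loads(value)
    return payload if isinstance(payload, dict) else None


event = parse_sse_data_line(b'data: {"output_name": "output_0", "output_value": "Once"}\n')
print(event["output_value"])  # Once
```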
import json
import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode

# Set API key
vectorshift.api_key = 'your api key here'

# Create input node
input_node = InputNode(node_name="input_0")

# Create LLM node that will stream responses
llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7,
    stream=True,  # Enable streaming
)

# Create output node connected to LLM response
output_node = OutputNode(
    node_name="output_0", value=llm_node.response, output_type="stream<string>"
)

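# Fetch the pipeline if it already exists; otherwise create and save it.
# A fetched pipeline keeps its saved definition, so edits to the nodes above
# are not applied to it.
PIPELINE_NAME = "streaming-llm-pipeline-1"
try:
    pipeline = Pipeline.fetch(name=PIPELINE_NAME)
    print(f"Pipeline fetched: id={pipeline.id}, branch_id={pipeline.branch_id}")
except Exception as e:
    # Expected on the first run, when no pipeline with this name exists yet
    print(f"Could not fetch pipeline ({e}); creating it")
    pipeline = Pipeline.new(
        name=PIPELINE_NAME, nodes=[input_node, llm_node, output_node]
    )
    print(f"Pipeline created: id={pipeline.id}, branch_id={pipeline.branch_id}")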

# Run pipeline with streaming enabled
input_data = {"input_0": "Tell me a story about a brave adventurer"}

# Stream the response chunks. A chunk may carry more than one SSE line,
# so parse each line separately.
for chunk in pipeline.run(input_data, stream=True):
    chunk_str = chunk.decode('utf-8', errors='replace') if isinstance(chunk, bytes) else str(chunk)
    for line in chunk_str.splitlines():
        if not line.startswith('data: '):
            continue  # skip blank separators and non-data SSE fields
        try:
            data = json.loads(line[len('data: '):])  # strip the 'data: ' prefix
        except json.JSONDecodeError:
            continue  # skip lines that are not complete JSON
        if isinstance(data, dict) and data.get('output_name') == 'output_0':
            print(data.get('output_value', ''), end="", flush=True)
print()  # finish the streamed text with a newline

Expected output

Pipeline created: id=..., branch_id=...
Once upon a time, there was a brave adventurer named ...
The output is the LLM response, printed incrementally with no newline between tokens. For a typed-chunk API instead of raw SSE parsing, see Streaming with chunks.
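If you stay with raw SSE parsing, keep in mind that the loop in this example decodes and parses each chunk on its own. This page does not say whether pipeline.run(..., stream=True) ever splits an event, or a multi-byte UTF-8 character, across two chunks; the sketch below is a defensive variant that assumes it might. iter_output_tokens and _token_from_line are hypothetical helpers, not vectorshift APIs. They decode incrementally with Python's codecs module, buffer any incomplete line until its newline arrives, and yield the output_value of each matching data: line.

```python
import codecs
import json
from typing import Iterable, Iterator, Optional, Union


def _token_from_line(line: str, output_name: str) -> Optional[str]:
    """Return output_value from one SSE `data:` line if it targets output_name."""
    if not line.startswith("data:"):
        return None  # blank separators and non-data SSE fields
    value = line[len("data:"):]
    if value.startswith(" "):
        value = value[1:]  # SSE allows one optional space after the colon
    try:
        data = json.loads(value)
    except json.JSONDecodeError:
        return None
    if isinstance(data, dict) and data.get("output_name") == output_name:
        token = data.get("output_value")
        return "" if token is None else str(token)
    return None


def iter_output_tokens(
    chunks: Iterable[Union[bytes, str]], output_name: str = "output_0"
) -> Iterator[str]:
    """Yield streamed tokens for output_name from raw SSE chunks.

    Hypothetical helper. Assumes newline-delimited events whose JSON payload
    fits on a single `data:` line with `output_name` / `output_value` keys.
    """
    # The incremental decoder holds back a partial multi-byte character
    # until the rest of it arrives in the next chunk.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    buffer = ""
    for chunk in chunks:
        buffer += decoder.decode(chunk) if isinstance(chunk, bytes) else chunk
        # Process complete lines; keep the last (possibly partial) one buffered.
        *lines, buffer = buffer.split("\n")
        for line in lines:
            token = _token_from_line(line.rstrip("\r"), output_name)
            if token is not None:
                yield token
    # Flush whatever remains once the stream ends.
    buffer += decoder.decode(b"", final=True)
    token = _token_from_line(buffer.rstrip("\r"), output_name)
    if token is not None:
        yield token


if __name__ == "__main__":
    # Simulated stream: an event and the two bytes of "é" are split across chunks.
    fake_chunks = [
        b'data: {"output_name": "output_0", "output_value": "Caf"}\n\ndata: {"output_na',
        b'me": "output_0", "output_value": "\xc3',
        b'\xa9"}\n\n',
    ]
    print("".join(iter_output_tokens(fake_chunks)))  # Café
```

Under the same assumptions, the printing loop in this example would become: for token in iter_output_tokens(pipeline.run(input_data, stream=True)): print(token, end="", flush=True). When chunks already hold whole lines, the buffering adds little overhead, so it is a reasonable default either way.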

See also

- Streaming (async): async for chunk in pipeline.astream(...).
- Streaming with chunks: typed StreamChunk objects instead of raw SSE.
- Pipeline reference: the full method surface.