Documentation Index

Fetch the complete documentation index at: https://docs.vectorshift.ai/llms.txt

Use this file to discover all available pages before exploring further.
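
For example, here is a minimal sketch of downloading the index and listing the page links it contains. It uses only the Python standard library and assumes the index is plain text containing Markdown-style links, which may not match the actual file layout:

import re
import urllib.request

# Download the documentation index (plain text)
with urllib.request.urlopen("https://docs.vectorshift.ai/llms.txt") as resp:
    index_text = resp.read().decode("utf-8")

# Extract Markdown-style links of the form [title](url), if any are present
for title, url in re.findall(r"\[([^\]]+)\]\(([^)]+)\)", index_text):
    print(f"{title}: {url}")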

import json
import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode

# Set API key
vectorshift.api_key = 'your api key here'

# Create input node
input_node = InputNode(node_name="input_0")

# Create LLM node that will stream responses
llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7,
    stream=True,  # Enable streaming
)

# Create output node connected to LLM response
output_node = OutputNode(
    node_name="output_0", value=llm_node.response, output_type="stream<string>"
)

# Fetch the pipeline if it already exists; otherwise create and save it
PIPELINE_NAME = "streaming-llm-pipeline-1"
try:
    pipeline = Pipeline.fetch(name=PIPELINE_NAME)
    print(f"Pipeline fetched: id={pipeline.id}, branch_id={pipeline.branch_id}")
except Exception as e:
    print(f"Pipeline not found ({e}); creating it")
    pipeline = Pipeline.new(
        name=PIPELINE_NAME, nodes=[input_node, llm_node, output_node]
    )
    print(f"Pipeline created: id={pipeline.id}, branch_id={pipeline.branch_id}")

# Prepare inputs: keys must match the InputNode node_name values
input_data = {"input_0": "Tell me a story about a brave adventurer"}

# Run the pipeline with streaming enabled and consume the response chunks.
# Each chunk is an SSE-style line of the form 'data: {...}' whose JSON
# payload carries the output name and the streamed output value.
for chunk in pipeline.run(input_data, stream=True):
    try:
        # Parse the chunk as a JSON line
        chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
        if chunk_str.startswith('data: '):
            json_str = chunk_str[6:]  # Remove 'data: ' prefix
            data = json.loads(json_str)
            if data.get('output_name') == 'output_0':
                print(data.get('output_value', ''), end="", flush=True)
    except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
        # If parsing fails, just continue to next chunk
        continue
Source: examples/pipelines/streaming.py in the SDK repo.
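
If you also need the full text after streaming finishes, for example for logging, a small variation of the same loop can accumulate the parsed chunks into one string. This sketch reuses only the run(..., stream=True) call and the chunk format shown above; it assumes nothing further about the SDK:

# Accumulate the streamed output into a single string
full_response = []
for chunk in pipeline.run(input_data, stream=True):
    try:
        chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
        if chunk_str.startswith('data: '):
            data = json.loads(chunk_str[6:])  # Strip the 'data: ' prefix
            if data.get('output_name') == 'output_0':
                full_response.append(data.get('output_value', ''))
    except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
        continue  # Skip chunks that are not valid data lines

print("".join(full_response))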