Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.vectorshift.ai/llms.txt

Use this file to discover all available pages before exploring further.

This example creates a streaming LLM pipeline and uses pipeline.stream(), which yields typed StreamChunk objects instead of raw SSE lines.
from vectorshift.pipeline import Pipeline, StreamChunk

# Build (or reuse) the streaming LLM pipeline, then deploy it.
PIPELINE_NAME = "stream_chunks_example"
try:
    # Prefer an existing pipeline with this name; fall back to creating one.
    pipeline = Pipeline.fetch(name=PIPELINE_NAME)
except Exception as e:
    print(f"Error fetching pipeline: {e}")
    pipeline = Pipeline.new(name=PIPELINE_NAME)
    print(f"Pipeline created: id={pipeline.id}, branch_id={pipeline.branch_id}")
else:
    print(f"Pipeline fetched: id={pipeline.id}, branch_id={pipeline.branch_id}")

# Wire up the graph: string input -> streaming LLM -> two outputs.
prompt_input = pipeline.add(name="input_0", id="input_0").input(input_type="string")
llm_node = pipeline.add(name="llm", id="llm").llm(
    provider="openai",
    model="gpt-4o",
    stream=True,
    prompt=prompt_input.text,
)
# One output streams the tokens, the other captures the full string.
streamed_output = pipeline.add(name="output_1", id="output_1").output(
    output_type="stream<string>", value=llm_node.response
)
final_output = pipeline.add(name="output_0", id="output_0").output(
    output_type="string", value=llm_node.response
)
pipeline.save(deploy=True)

# pipeline.stream() yields typed StreamChunk objects rather than raw SSE lines.
print("Streaming response:\n")
run_inputs = {"input_0": "Tell me a story about a brave adventurer in 100 words."}
for chunk in pipeline.stream(inputs=run_inputs):
    if chunk.type == "result":
        # Terminal chunk: the run has finished; report outputs and metadata.
        print(f"\n\nFinal outputs: {chunk.outputs}")
        print(f"Status: {chunk.status}")
        print(f"Run ID: {chunk.run_id}")
    elif chunk.type == "stream":
        # Incremental token chunk: echo it without a newline.
        print(chunk.output_value, end="", flush=True)
Source: examples/pipelines/streaming_with_chunks.py in the SDK repo.