Documentation Index
Fetch the complete documentation index at: https://docs.vectorshift.ai/llms.txt
Use this file to discover all available pages before exploring further.
import json
import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode
# Authenticate the SDK; replace the placeholder with a real key before running.
vectorshift.api_key = "your api key here"
# Entry point of the pipeline: its value is supplied at run time under "input_0".
input_node = InputNode(node_name="input_0")

# LLM node fed by the input text, configured with stream=True.
llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7,
    stream=True,
)

# Output node wired to the LLM response and declared as a string stream.
output_node = OutputNode(
    node_name="output_0",
    value=llm_node.response,
    output_type="stream<string>",
)
# Look the pipeline up by name first; if the lookup fails, create it from the
# nodes defined above.
PIPELINE_NAME = "streaming-llm-pipeline-1"

try:
    pipeline = Pipeline.fetch(name=PIPELINE_NAME)
    print(f"Pipeline fetched: id={pipeline.id}, branch_id={pipeline.branch_id}")
except Exception as exc:
    # NOTE(review): every failure in the try body (not only "pipeline does not
    # exist") lands here and triggers creation -- confirm whether the SDK
    # exposes a narrower exception type for the not-found case.
    print(f"Error fetching pipeline: {exc}")
    pipeline = Pipeline.new(
        name=PIPELINE_NAME,
        nodes=[input_node, llm_node, output_node],
    )
    print(f"Pipeline created: id={pipeline.id}, branch_id={pipeline.branch_id}")
# Inputs are keyed by input node name.
input_data = {"input_0": "Tell me a story about a brave adventurer"}

# Consume the run chunk by chunk. Only chunks of the form "data: <json>" are
# parsed; text belonging to "output_0" is echoed immediately without a newline.
for chunk in pipeline.run(input_data, stream=True):
    try:
        # Chunks may arrive as bytes or as already-decoded objects.
        text = chunk.decode("utf-8") if isinstance(chunk, bytes) else str(chunk)
        if not text.startswith("data: "):
            continue
        # Drop the 'data: ' prefix and parse the remainder as JSON.
        event = json.loads(text[len("data: "):])
        if event.get("output_name") == "output_0":
            print(event.get("output_value", ""), end="", flush=True)
    except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
        # Malformed or unexpected chunk: skip it and keep streaming.
        continue
Source:
examples/pipelines/streaming.py in the SDK repo.