Documentation Index Fetch the complete documentation index at: https://docs.vectorshift.ai/llms.txt
Use this file to discover all available pages before exploring further.
What this builds. An async streaming pipeline that exposes both a plain string output and a stream<string> output. The script iterates pipeline.astream(...) and prints tokens as they arrive, then the final outputs once the run completes.
You’ll end up with. Live token streaming inside an asyncio event loop, terminating with a Final: {...} dict.
import asyncio
from vectorshift.pipeline import Pipeline
PIPELINE_NAME = "stream_async_example"
async def main ():
try :
pipeline = await Pipeline.afetch( name = PIPELINE_NAME )
print ( f "Pipeline fetched: id= { pipeline.id } , branch_id= { pipeline.branch_id } " )
except Exception as e:
print ( f "Error fetching pipeline: { e } " )
pipeline = await Pipeline.anew( name = PIPELINE_NAME )
print ( f "Pipeline created: id= { pipeline.id } , branch_id= { pipeline.branch_id } " )
inp = pipeline.add( name = "input_0" , id = "input_0" ).input( input_type = "string" )
llm = pipeline.add( name = "llm" , id = "llm" ).llm(
provider = "openai" , model = "gpt-4o" , stream = True , prompt = inp.text
)
pipeline.add( name = "output_0" , id = "output_0" ).output(
output_type = "string" , value = llm.response
)
pipeline.add( name = "output_1" , id = "output_1" ).output(
output_type = "stream<string>" , value = llm.response
)
await pipeline.asave( deploy = True )
async for chunk in pipeline.astream(
inputs = { "input_0" : "Tell me a joke about programming." }
):
if chunk.type == "stream" :
print (chunk.output_value, end = "" , flush = True )
elif chunk.type == "result" :
print ( f " \n\n Final: { chunk.outputs } " )
asyncio.run(main())
Expected output
Pipeline created: id=..., branch_id=...
Why do programmers prefer dark mode? Because light attracts bugs...
Final: {'output_0': '...', 'output_1': '...'}
StreamChunk has a discriminated type field ("stream" vs "result") — branch on it to separate tokens from the terminal outputs payload.
See also
Streaming Synchronous SSE consumption.
Streaming with chunks Same chunk shape, synchronous iteration.
Pipeline reference Full method surface.