What this builds. An async streaming pipeline that exposes both a plain string output and a stream<string> output. The script iterates pipeline.astream(...) and prints tokens as they arrive, then the final outputs once the run completes.

You’ll end up with. Live token streaming inside an asyncio event loop, terminating with a Final: {...} dict.
import asyncio

from vectorshift.pipeline import Pipeline


PIPELINE_NAME = "stream_async_example"


async def main():
    # Reuse the pipeline if it already exists; otherwise create it.
    try:
        pipeline = await Pipeline.afetch(name=PIPELINE_NAME)
        print(f"Pipeline fetched: id={pipeline.id}, branch_id={pipeline.branch_id}")
    except Exception as e:
        print(f"Error fetching pipeline: {e}")
        pipeline = await Pipeline.anew(name=PIPELINE_NAME)
        print(f"Pipeline created: id={pipeline.id}, branch_id={pipeline.branch_id}")

    # One string input feeding an LLM node with streaming enabled.
    inp = pipeline.add(name="input_0", id="input_0").input(input_type="string")
    llm = pipeline.add(name="llm", id="llm").llm(
        provider="openai", model="gpt-4o", stream=True, prompt=inp.text
    )
    # Expose the same LLM response twice: as a plain string and as a token stream.
    pipeline.add(name="output_0", id="output_0").output(
        output_type="string", value=llm.response
    )
    pipeline.add(name="output_1", id="output_1").output(
        output_type="stream<string>", value=llm.response
    )
    await pipeline.asave(deploy=True)

    # "stream" chunks carry tokens as they arrive; the "result" chunk carries the final outputs.
    async for chunk in pipeline.astream(
        inputs={"input_0": "Tell me a joke about programming."}
    ):
        if chunk.type == "stream":
            print(chunk.output_value, end="", flush=True)
        elif chunk.type == "result":
            print(f"\n\nFinal: {chunk.outputs}")


asyncio.run(main())

Expected output

Error fetching pipeline: ...
Pipeline created: id=..., branch_id=...
Why do programmers prefer dark mode? Because light attracts bugs...

Final: {'output_0': '...', 'output_1': '...'}
StreamChunk has a discriminated type field ("stream" vs "result"). Branch on it to separate tokens, read from output_value, from the terminal outputs payload.
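
The same branching can also be used to accumulate the streamed text instead of printing it. The sketch below runs without the SDK or network access: FakeChunk and fake_astream are hypothetical stand-ins, not part of vectorshift, and they assume only the three fields the example above reads (type, output_value, outputs). The values placed in the final outputs dict are made up for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List, Optional, Tuple


@dataclass
class FakeChunk:
    """Hypothetical stand-in for StreamChunk; fields assumed from the example above."""
    type: str                          # "stream" or "result"
    output_value: Optional[str] = None
    outputs: Optional[dict] = None


async def fake_astream(tokens: List[str]) -> AsyncIterator[FakeChunk]:
    """Hypothetical replacement for pipeline.astream(...): tokens first, then one result."""
    for token in tokens:
        await asyncio.sleep(0)         # hand control back to the event loop between tokens
        yield FakeChunk(type="stream", output_value=token)
    text = "".join(tokens)
    # Illustrative payload only; real output values come from the pipeline run.
    yield FakeChunk(type="result", outputs={"output_0": text, "output_1": text})


async def collect(stream) -> Tuple[str, Optional[dict]]:
    """Branch on chunk.type: buffer tokens, keep the terminal outputs payload."""
    parts: List[str] = []
    final: Optional[dict] = None
    async for chunk in stream:
        if chunk.type == "stream":
            parts.append(chunk.output_value)
        elif chunk.type == "result":
            final = chunk.outputs
    return "".join(parts), final


async def demo() -> Tuple[str, Optional[dict]]:
    tokens = ["Why ", "do ", "programmers ", "prefer ", "dark mode?"]
    return await collect(fake_astream(tokens))


streamed_text, final_outputs = asyncio.run(demo())
print(streamed_text)
print(final_outputs)
```

In a real script, collect would presumably be passed pipeline.astream(inputs=...) directly, since it relies only on the fields the example above already uses. Buffering tokens in a list and joining once at the end avoids repeated string concatenation on long responses.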

See also

Streaming. Synchronous SSE consumption.

Streaming with chunks. Same chunk shape, synchronous iteration.

Pipeline reference. Full method surface.