Use this file to discover all available pages before exploring further.
What this builds. A streaming LLM pipeline consumed via pipeline.stream(...), which yields typed StreamChunk objects with a .type discriminator ("stream" for token chunks, "result" for the terminal outputs payload).
You’ll end up with. A live response in stdout followed by a Final outputs: {...} dict and the run id — no manual SSE parsing required.
from vectorshift.pipeline import Pipeline, StreamChunk# Create a streaming LLM pipelinePIPELINE_NAME = "stream_chunks_example"try: pipeline = Pipeline.fetch(name=PIPELINE_NAME) print(f"Pipeline fetched: id={pipeline.id}, branch_id={pipeline.branch_id}")except Exception as e: print(f"Error fetching pipeline: {e}") pipeline = Pipeline.new(name=PIPELINE_NAME) print(f"Pipeline created: id={pipeline.id}, branch_id={pipeline.branch_id}")inp = pipeline.add(name="input_0", id="input_0").input(input_type="string")llm = pipeline.add(name="llm", id="llm").llm( provider="openai", model="gpt-4o", stream=True, prompt=inp.text)out1 = pipeline.add(name="output_1", id="output_1").output( output_type="stream<string>", value=llm.response)out = pipeline.add(name="output_0", id="output_0").output( output_type="string", value=llm.response)pipeline.save(deploy=True)# Stream returns typed StreamChunk objectsprint("Streaming response:\n")for chunk in pipeline.stream( inputs={"input_0": "Tell me a story about a brave adventurer in 100 words."}): if chunk.type == "stream": print(chunk.output_value, end="", flush=True) elif chunk.type == "result": print(f"\n\nFinal outputs: {chunk.outputs}") print(f"Status: {chunk.status}") print(f"Run ID: {chunk.run_id}")
Pipeline created: id=..., branch_id=...Streaming response:Once upon a time, in a kingdom...Final outputs: {'output_0': '...', 'output_1': '...'}Status: successRun ID: ...
Stream chunks always arrive before the single result chunk — use that ordering to drive UI updates and final-state handling.