Pipelines

Compose Nodes together to form Pipelines, which you can save, run, and compose with the SDK.

Pipeline objects are created by composing Nodes. The connections formed when each node is initialized, as defined by that node's inputs, become edges in the pipeline graph. Each node is also automatically assigned a string ID. When printed, a Pipeline object lists all its nodes in order; the node IDs identify which nodes' outputs are passed as inputs into other nodes.

We support interoperability between no-code and SDK forms of Pipelines: they are equivalent representations.

Note: In the signatures below, the type annotation Node refers to any listed node class.

Construction

vectorshift.pipeline.Pipeline(
    name: str, 
    description: str,
    nodes: list[Node]
)

Create a new Pipeline object consisting of nodes and connections between them. If an output of node n1 is passed as an input into node n2, then there is an edge from n1 to n2 in the graph representing the Pipeline.

A few checks for well-formedness are made upon initialization. First, each node is typechecked: the data types of the outputs passed as arguments to a node's constructor are checked for compatibility with the node's expected input data types. If the types are definitely incompatible, an error is raised. If the types are possibly incompatible (e.g. the given input has data type Any), a warning is printed. Second, the graph is checked to ensure it contains no cycles. API calls may be made upon construction to retrieve data for nodes that reference objects on the VectorShift platform.
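The cycle check described above can be illustrated with a small standalone sketch. It is not the SDK's implementation; it assumes the graph is given as a plain dictionary from node ID to the IDs of the nodes consuming its outputs, and the function name and node IDs are made up for illustration. It uses Kahn's algorithm, one common way to detect cycles.

```python
from collections import deque


def topological_order(edges: dict[str, list[str]]) -> list[str]:
    """Return node IDs in dependency order, or raise ValueError on a cycle.

    `edges` maps a node ID to the IDs of the nodes that consume its outputs.
    This representation is an assumption made for the sketch.
    """
    # Collect every node that appears as a source or a target.
    nodes = set(edges)
    for targets in edges.values():
        nodes.update(targets)

    # Count incoming edges for each node.
    in_degree = {node: 0 for node in nodes}
    for targets in edges.values():
        for target in targets:
            in_degree[target] += 1

    # Kahn's algorithm: repeatedly take nodes with no unprocessed inputs.
    ready = deque(sorted(node for node in nodes if in_degree[node] == 0))
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for target in edges.get(node, []):
            in_degree[target] -= 1
            if in_degree[target] == 0:
                ready.append(target)

    # Any node left unprocessed is part of (or downstream of) a cycle.
    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle")
    return order


# Hypothetical node IDs: an input feeding an LLM node feeding an output.
order = topological_order({"input_1": ["llm_1"], "llm_1": ["output_1"]})
```

If two nodes feed each other, for example `{"a": ["b"], "b": ["a"]}`, the function raises ValueError, which mirrors the kind of failure the constructor reports for a cyclic graph.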

The resulting Pipeline has its overall inputs and outputs defined by the input and output nodes given, analogous to the no-code editor.

Arguments:

  • name: The name of the Pipeline.

  • description: A brief description of the Pipeline's functionality.

  • nodes: A list of all nodes to be included in the Pipeline. They need not be in any order, but should satisfy the checks noted above.

vectorshift.pipeline.Pipeline.fetch(
    pipeline_name: str = None,
    pipeline_id: str = None,
    username: str = None,
    org_name: str = None,
    api_key: str = None,
)

A static method that creates a Pipeline object representing an existing Pipeline on the VectorShift platform. The Pipeline must already exist on the platform so that it can be referenced by its name or ID. An API call is made to retrieve the Pipeline data, so an API key is required.

Arguments:

  • pipeline_name: The name of the Pipeline being represented.

  • pipeline_id: The ID of the Pipeline being represented. At least one of pipeline_id and pipeline_name should be provided. If both are provided, pipeline_id is used to search for the Pipeline.

  • username: The username of the user owning the Pipeline.

  • org_name: The organization name of the user owning the Pipeline, if applicable.

  • api_key: The VectorShift API key to make calls to retrieve the Pipeline data.
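The precedence between pipeline_id and pipeline_name described above can be sketched as a small helper. This is only an illustration of the documented rule, not SDK code; the function name lookup_key and its return format are hypothetical.

```python
from typing import Optional


def lookup_key(
    pipeline_name: Optional[str] = None,
    pipeline_id: Optional[str] = None,
) -> tuple[str, str]:
    """Decide which identifier a fetch-style lookup would search by."""
    # The ID takes precedence whenever it is given.
    if pipeline_id is not None:
        return ("id", pipeline_id)
    # Otherwise fall back to the name.
    if pipeline_name is not None:
        return ("name", pipeline_name)
    # Neither identifier was supplied, so there is nothing to search for.
    raise ValueError("provide at least one of pipeline_id and pipeline_name")


# Both given: the ID wins. The ID value here is a made-up placeholder.
key = lookup_key(pipeline_name="wikipedia-question-answering", pipeline_id="abc123")
```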

Retrieving Information

construction_str()

A method that returns a string mimicking Python code for constructing the Pipeline and its constituent nodes; useful if you want to decompose a Pipeline into code to copy and edit. The method assigns string IDs to all nodes and emits Python code that shows how to construct the Pipeline using the SDK functions: an instantiation of each node as a Python variable named after its string ID, followed by a Pipeline constructor.

get_nodes()

A method that returns a dictionary of constituent nodes. Keys are the assigned node string IDs, and values are the Node objects in the Pipeline.

Editing

add_node(node: Node)

A method that adds a node to the pipeline. Note that the inputs to the node are determined by the node's constructor, and the destination of the node's outputs must be determined via other node constructors.

replace_node(
    node_id: str,
    replacement_node: Node, 
    input_map: dict[str, str] = None, 
    output_map: dict[str, str] = None
)

A method to replace a node with another node in-place.

Arguments:

  • node_id: The node ID assigned to the node being replaced in the Pipeline.

  • replacement_node: The Node object to replace the existing node. It will be given the node_id.

  • input_map: An optional mapping of the existing node's input names to the replacement node's input names. If it is not provided, the replacement node's input names should be a subset of the existing node's input names, and the replacement node's inputs are carried over from the existing node's inputs. Otherwise, the keys of input_map should be a subset of the existing node's input names, and the replacement node's inputs are given by the mapped names from the existing node's inputs. Edges corresponding to unused inputs in the existing node are dropped.

  • output_map: Analogous to input_map, replacing the existing node's outputs.
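The input_map rules above are easier to follow with a concrete model. The sketch below is not the SDK's implementation; it assumes a node's inputs can be represented as a dictionary from input name to a string describing what feeds it, and the function name and all node and input names are hypothetical.

```python
from typing import Optional


def remap_inputs(
    existing_inputs: dict[str, str],
    replacement_input_names: set[str],
    input_map: Optional[dict[str, str]] = None,
) -> dict[str, str]:
    """Return the replacement node's inputs as {input name: upstream source}."""
    if input_map is None:
        # No map: every replacement input name must exist on the old node,
        # and its source is carried over unchanged.
        missing = replacement_input_names - set(existing_inputs)
        if missing:
            raise ValueError(f"inputs not on existing node: {sorted(missing)}")
        return {name: existing_inputs[name] for name in replacement_input_names}

    # With a map: keys are old input names, values are new input names.
    unknown = set(input_map) - set(existing_inputs)
    if unknown:
        raise ValueError(f"input_map keys not on existing node: {sorted(unknown)}")
    # Old inputs that are not mentioned in the map are dropped.
    return {new: existing_inputs[old] for old, new in input_map.items()}


old_inputs = {"query": "input_1.value", "context": "loader_1.documents"}

# Without a map, the shared name "query" is carried over and "context" is dropped.
carried = remap_inputs(old_inputs, {"query"})

# With a map, the old "query" input feeds the replacement's "prompt" input.
renamed = remap_inputs(old_inputs, {"prompt"}, input_map={"query": "prompt"})
```

In both calls the edge into the old "context" input has no counterpart on the replacement node, which corresponds to the dropped edges mentioned above.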

delete_node(node_id: str)

A method to delete a node from a Pipeline. Prints warnings for edges that are removed as a result of the deletion.

Arguments:

  • node_id: The node ID assigned to the node being deleted in the Pipeline.

Deployment

First make sure to save your pipeline.

save(
    update_existing: bool = False,
    api_key: str = None, 
)

A method to save or update a Pipeline object to the VectorShift platform.

Arguments:

  • update_existing: Whether to replace an existing Pipeline (True) or save the Pipeline as a new object (False). If set to True, the Pipeline should have an ID, and the existing Pipeline with that ID is replaced with the object's data. If set to False, the ID, if any, is ignored and a new Pipeline object is created with the object's data.

  • api_key: The VectorShift API key.

update(api_key: str = None)

A method to update a Pipeline object on the VectorShift platform. Akin to save with update_existing = True.

run(
    inputs: dict = {}, 
    api_key: str = None, 
)

Run a Pipeline. We currently only support running Pipelines that take text inputs.

Arguments:

  • inputs: A map of input names to corresponding input values for the Pipeline. Should match up with the Pipeline's defined inputs.

  • api_key: Optional API key for authentication if not configured elsewhere.
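Since inputs should match the Pipeline's defined inputs and only text inputs are supported, it can help to validate the dictionary before calling run. The helper below is a local sketch, not an SDK function; its name and the way the expected input names are supplied are assumptions.

```python
def check_run_inputs(inputs: dict, expected_names: set[str]) -> None:
    """Raise ValueError if `inputs` does not match the expected text inputs."""
    # Compare the provided names against the names the Pipeline defines.
    missing = expected_names - set(inputs)
    extra = set(inputs) - expected_names
    if missing or extra:
        raise ValueError(
            f"missing inputs: {sorted(missing)}; unexpected inputs: {sorted(extra)}"
        )
    # Only text values are accepted.
    non_text = sorted(name for name, value in inputs.items() if not isinstance(value, str))
    if non_text:
        raise ValueError(f"non-text values for inputs: {non_text}")


# Passes silently: one expected input, given as text.
check_run_inputs({"question": "What is the capital of Malaysia?"}, {"question"})
```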

Running A Pipeline

To run a pipeline through the SDK, you first have to identify an existing pipeline that you would like to run. For example, say we have a pipeline named wikipedia-question-answering; we can first use the fetch method to retrieve the pipeline:

from vectorshift.pipeline import Pipeline
pipeline = Pipeline.fetch(pipeline_name = 'wikipedia-question-answering')

Then we can run the pipeline using the run method:

output = pipeline.run(
    inputs = {'question': 'What is the capital of Malaysia?'},
    api_key = YOUR_API_KEY,
)

Running Pipelines Asynchronously

To launch pipeline runs asynchronously, use the run_async method.

For example, you can concurrently run the wikipedia-question-answering pipeline on a list of questions using asyncio:

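import asyncio
from typing import List

async def run_pipeline_async(pipeline, questions: List[str]):
    # Start one run per question, then wait for all of them together.
    async_runs = []
    for question in questions:
        async_runs.append(pipeline.run_async(inputs = {"question": question}))
    # gather returns the results in the same order as the questions.
    return await asyncio.gather(*async_runs)

# Example questions; replace with your own list.
questions = ['What is the capital of Malaysia?', 'What is the capital of Japan?']
results = asyncio.run(run_pipeline_async(pipeline, questions))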
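To try this pattern without network access or an API key, the pipeline can be swapped for a stand-in object. The sketch below assumes only that run_async returns an awaitable, as the example above implies; the FakePipeline class and the shape of its return value are hypothetical and are not part of the SDK.

```python
import asyncio


class FakePipeline:
    """Hypothetical stand-in for a Pipeline; not part of the SDK."""

    async def run_async(self, inputs: dict) -> dict:
        # Simulate a short network round trip.
        await asyncio.sleep(0.01)
        return {"answer": f"echo: {inputs['question']}"}


async def run_pipeline_async(pipeline, questions: list[str]):
    # Create one awaitable per question and run them concurrently.
    async_runs = [pipeline.run_async(inputs={"question": q}) for q in questions]
    # asyncio.gather preserves the order of its arguments in the result list.
    return await asyncio.gather(*async_runs)


questions = ["first", "second", "third"]
results = asyncio.run(run_pipeline_async(FakePipeline(), questions))
answers = [result["answer"] for result in results]
```

Because asyncio.gather returns results in argument order, each entry in results lines up with the question at the same index, regardless of which run finishes first.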
