Build a typed schema with NumberFormat, SingleSelectFormat, and TimestampFormat, then attach a Pipeline and an Agent that fill cells for you.
What this builds. A vendor table whose schema is seeded with three typed columns (vendor: StringFormat, amount: NumberFormat(currency), status: SingleSelectFormat), evolved with add_column / rename_column / delete_column, and finally extended with two AI-generated columns — one filled by a Pipeline, one by an Agent.
You’ll end up with. A working pattern for both the static-schema and AI-fill paths, including a run_and_wait(...) call that triggers the generators on the rows you’ve inserted.
Stage 3 — add an AI-filled column backed by a Pipeline
A PipelineGenerator on a ColumnSpec binds a pipeline to that column. When you call table.run(columns=[...]), the engine runs the pipeline once per row, reading inputs from the other columns and writing the named output into the cell.
```python
from vectorshift.pipeline import Pipeline
from vectorshift.table import PipelineGenerator

# Fetch a pipeline you've built and deployed via the SDK or platform UI.
# It should accept `vendor_name: str` and `raw_notes: str` inputs and emit
# a `summary: str` output.
summarizer = Pipeline.fetch(name="vendor-summarizer")

# Add a 'notes' column to capture free-text context for each vendor,
# plus a 'summary' column that the pipeline fills.
t.add_column("notes", StringFormat())
t.add_column(
    "summary",
    StringFormat(),
    generation=PipelineGenerator(
        pipeline=summarizer,
        # Keys are the pipeline's input names; values are the table-column
        # names the engine reads for each row.
        input_mapping={"vendor_name": "vendor", "raw_notes": "notes"},
        output_name="summary",
    ),
)

# Populate notes on the existing rows.
from vectorshift.table import RelationalOperator

t.update_rows(
    {"notes": "fast shipping, premium pricing"},
    filters=CompoundFilter.of(FilterCondition("vendor", TableFilterOperator.EQ, "Acme")),
)
t.update_rows(
    {"notes": "slow but cheap"},
    filters=CompoundFilter.of(FilterCondition("vendor", TableFilterOperator.EQ, "Beta")),
)
t.update_rows(
    {"notes": "reliable mid-tier vendor"},
    filters=CompoundFilter.of(FilterCondition("vendor", TableFilterOperator.EQ, "Gamma")),
)

# Fill the 'summary' column for every row, synchronously.
task = t.run_and_wait(columns=["summary"], timeout=120)
print(f"6. summary fill: status={task.status} rows_processed={task.rows_processed}")
```
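As a mental model for the per-row fill described in this stage, the loop below mimics the behavior in plain Python. It is only a sketch: `fill_column` and `fake_summarizer` are hypothetical stand-ins, not part of the vectorshift SDK, and the real engine may batch, parallelize, or retry rows differently.

```python
from typing import Callable, Dict, List

Row = Dict[str, str]


def fill_column(
    rows: List[Row],
    column: str,
    input_mapping: Dict[str, str],
    output_name: str,
    run_pipeline: Callable[..., Dict[str, str]],
) -> int:
    """Run `run_pipeline` once per row and store one named output in `column`."""
    processed = 0
    for row in rows:
        # Keys are pipeline input names; values are the columns to read.
        inputs = {name: row[col] for name, col in input_mapping.items()}
        outputs = run_pipeline(**inputs)
        # Only the output named by `output_name` is written into the cell.
        row[column] = outputs[output_name]
        processed += 1
    return processed


def fake_summarizer(vendor_name: str, raw_notes: str) -> Dict[str, str]:
    # Hypothetical stand-in for the deployed 'vendor-summarizer' pipeline.
    return {"summary": f"{vendor_name}: {raw_notes}"}


rows = [
    {"vendor": "Acme", "notes": "fast shipping, premium pricing"},
    {"vendor": "Beta", "notes": "slow but cheap"},
]
count = fill_column(
    rows,
    column="summary",
    input_mapping={"vendor_name": "vendor", "raw_notes": "notes"},
    output_name="summary",
    run_pipeline=fake_summarizer,
)
```

After the call, every row dict carries a `summary` key and `count` equals the number of rows, which is the role `rows_processed` appears to play on the task object above.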
Stage 4 — add an AI-filled column backed by an Agent
An AgentGenerator swaps the pipeline for a conversational Agent, with optional KnowledgeBase retrieval context. The run / run_and_wait flow is the same as in Stage 3.
```python
from vectorshift.agent import Agent
from vectorshift.knowledge_base import KnowledgeBase
from vectorshift.table import AgentGenerator

reviewer = Agent.fetch(name="vendor-risk-reviewer")
policies = KnowledgeBase.fetch(name="vendor-policies")

t.add_column(
    "risk",
    StringFormat(),
    generation=AgentGenerator(
        agent=reviewer,
        knowledge_base=policies,
        instructions="Classify the vendor as low / medium / high risk based on the notes; cite the relevant policy.",
        input_mapping={"vendor_notes": "notes"},
    ),
)

task = t.run_and_wait(columns=["risk"], timeout=240)
print(f"7. risk fill: status={task.status} rows_processed={task.rows_processed}")

# Cleanup
t.delete()
print("8. Deleted Table.\n\nDone.")
```
input_mapping direction. Keys are the Pipeline/Agent input names; values are the table-column names the engine reads for each row. So {"vendor_name": "vendor"} means “send the value of the vendor column into the vendor_name input.”
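Because the direction is easy to reverse, it can help to sanity-check a mapping before triggering a run. The helper below is a hypothetical sketch in plain Python, not an SDK feature; the input names and column names are typed in by hand for illustration, so adjust them to your own schema.

```python
from typing import Dict, List, Set


def check_mapping(
    input_mapping: Dict[str, str],
    generator_inputs: Set[str],
    table_columns: Set[str],
) -> List[str]:
    """Return human-readable problems; an empty list means the mapping looks right."""
    problems = []
    for input_name, column in input_mapping.items():
        if input_name not in generator_inputs:
            problems.append(f"key {input_name!r} is not a generator input")
        if column not in table_columns:
            problems.append(f"value {column!r} is not a table column")
    return problems


# Illustrative names only: the inputs the pipeline accepts and the table's columns.
generator_inputs = {"vendor_name", "raw_notes"}
table_columns = {"vendor", "amount", "status", "notes"}

ok = check_mapping(
    {"vendor_name": "vendor", "raw_notes": "notes"}, generator_inputs, table_columns
)
reversed_mapping = check_mapping(
    {"vendor": "vendor_name"}, generator_inputs, table_columns
)
```

Here `ok` is empty, while `reversed_mapping` reports both the unknown key and the unknown column, which is the typical signature of a flipped mapping.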
This example assumes vendor-summarizer, vendor-risk-reviewer, and vendor-policies already exist in your account. Build them via the Pipeline / Agent / Knowledge Base SDKs (or the platform UI) first, or swap the names for objects you already have.
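One way to surface a missing prerequisite early is to attempt every fetch up front and collect the names that fail. The sketch below is hypothetical: `preflight` is not an SDK function, and it assumes a failed fetch raises some exception, which this page does not confirm. Stand-in fetchers are used so the block runs without an account.

```python
from typing import Callable, Dict, List, Tuple


def preflight(
    fetchers: Dict[str, Callable[[], object]],
) -> Tuple[Dict[str, object], List[str]]:
    """Call every fetcher; return the fetched objects and the names that failed."""
    found: Dict[str, object] = {}
    missing: List[str] = []
    for name, fetch in fetchers.items():
        try:
            found[name] = fetch()
        except Exception:  # assumption: a failed fetch raises some exception
            missing.append(name)
    return found, missing


def _not_found() -> object:
    # Stand-in for a fetch that fails because the object does not exist.
    raise LookupError("no such object")


# With the SDK you would pass callables such as
# lambda: Pipeline.fetch(name="vendor-summarizer") instead of these stand-ins.
found, missing = preflight({
    "vendor-summarizer": lambda: "pipeline-object",
    "vendor-policies": _not_found,
})
```

If `missing` is non-empty, build or rename those objects before adding the generated columns.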