Workflow Package
Motivation
SQLFlow translates a SQL program, possibly written with extended SQL syntax for AI, into a workflow. Argo and Tekton are Kubernetes-native workflow engines. When deployed on Kubernetes, SQLFlow relies on Argo or Tekton for workflow management.
SQLFlow supports Argo and Tekton as workflow backends, and may support more in the future. Each engine has to be driven differently: they are defined by different CRDs on Kubernetes and use different YAML specs. A separate workflow package is therefore needed to talk to these engines through a uniform interface.
Design
To meet this need, the workflow package should provide the following functionality:
CodeGen: a Go interface that generates the Fluid/Couler program, which in turn generates the workflow YAML.
Submit/Fetch: APIs to submit the workflow and trace the status of each workflow step.
We propose the following code structure:
workflow/
|-workflow.go  # workflow interface
|-argo/        # submit/trace Argo workflow via k8s API
|-tekton/      # submit/trace Tekton workflow via k8s API
`-codegen/
  |-fluid/     # generate Tekton YAML using Fluid
  `-couler/    # generate Argo YAML using Couler
Workflow Codegen
Couler and Fluid let users write Argo and Tekton workflows, respectively, in Python rather than YAML. Python code is also easier to read and review.
SQLFlow implements a Fluid Codegen that translates []ir.SQLFlowStmt into Python code. The interface could look like this:
type Codegen interface {
	GenCode([]ir.SQLFlowStmt) string
	GenYAML(string) string
}
GenCode takes a SQL program as input and outputs the Fluid program in Python. GenYAML compiles the Fluid program and outputs the workflow YAML.
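To make the two-stage contract concrete, here is a minimal sketch of a type that satisfies the interface. It rests on several assumptions: Stmt is a hypothetical stand-in for ir.SQLFlowStmt, the step() call in the generated Python is a placeholder name rather than part of the Fluid or Couler API, and GenYAML only returns a placeholder document instead of running the Python program.

```go
package main

import (
	"fmt"
	"strings"
)

// Stmt is a hypothetical stand-in for ir.SQLFlowStmt; it only carries the
// original SQL text of one statement.
type Stmt struct {
	SQL string
}

// Codegen mirrors the interface above, with Stmt in place of ir.SQLFlowStmt.
type Codegen interface {
	GenCode([]Stmt) string
	GenYAML(string) string
}

// sketchCodegen is an illustrative implementation, not the real Fluid codegen.
type sketchCodegen struct{}

// GenCode emits one Python call per SQL statement. The step() function is a
// placeholder name, not part of the Fluid or Couler API.
func (sketchCodegen) GenCode(stmts []Stmt) string {
	var b strings.Builder
	for i, s := range stmts {
		fmt.Fprintf(&b, "step(name=%q, sql=%q)\n", fmt.Sprintf("step-%d", i+1), s.SQL)
	}
	return b.String()
}

// GenYAML would normally run the Python program and capture the YAML it
// prints. Here it only reports how many step lines the program contains.
func (sketchCodegen) GenYAML(program string) string {
	return fmt.Sprintf("# placeholder YAML for %d step(s)\n", strings.Count(program, "\n"))
}

func main() {
	var cg Codegen = sketchCodegen{}
	py := cg.GenCode([]Stmt{{SQL: "SELECT 1;"}, {SQL: "SELECT 2;"}})
	fmt.Print(py)
	fmt.Print(cg.GenYAML(py))
}
```

Keeping GenCode and GenYAML separate means the intermediate Python program can be inspected or logged before it is compiled into YAML.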
Workflow Interface
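type Workflow interface {
	Submit(yaml string) (workflowID string, err error)
	Fetch(FetchRequest) FetchResponse
}

// New returns the Codegen and Workflow implementations for the given backend.
func New(backend string) (Codegen, Workflow, error) {
	if backend == "tekton" {
		return NewCodegen("fluid"), NewWorkflow("tekton"), nil
	}
	if backend == "argo" {
		return NewCodegen("couler"), NewWorkflow("argo"), nil
	}
	// Without this final return, the function would not compile.
	return nil, nil, fmt.Errorf("unsupported workflow backend: %s", backend)
}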
Submit submits the input YAML content to a Kubernetes cluster and returns the workflow ID. Fetch fetches the step status and query result, which are packaged in a FetchResponse. New returns the corresponding Codegen and Workflow implementations.
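The Submit/Fetch contract can be illustrated with an in-memory implementation that never touches Kubernetes. This is only a sketch under stated assumptions: the fields of FetchRequest and FetchResponse (WorkflowID, NextStep, Message, Next) are guesses made for illustration, memWorkflow is a hypothetical type, and every submitted workflow is pretended to have the same two steps.

```go
package main

import (
	"errors"
	"fmt"
)

// FetchRequest and FetchResponse are simplified guesses at the real types:
// the request carries a workflow ID plus a cursor into the step list.
type FetchRequest struct {
	WorkflowID string
	NextStep   int
}

type FetchResponse struct {
	Message string       // status of the step that was just read
	Eof     bool         // true once every step has been reported
	Next    FetchRequest // request to send on the following Fetch call
}

type Workflow interface {
	Submit(yaml string) (workflowID string, err error)
	Fetch(FetchRequest) FetchResponse
}

// memWorkflow keeps submitted workflows in memory instead of calling the
// Kubernetes API.
type memWorkflow struct {
	steps map[string][]string
}

// Submit rejects empty YAML and otherwise records two canned step statuses.
func (m *memWorkflow) Submit(yaml string) (string, error) {
	if yaml == "" {
		return "", errors.New("empty workflow YAML")
	}
	id := fmt.Sprintf("wf-%d", len(m.steps)+1)
	m.steps[id] = []string{"step-1 Succeeded", "step-2 Succeeded"}
	return id, nil
}

// Fetch returns the step at the request's cursor and a request for the next
// one, or Eof when the cursor has passed the last step.
func (m *memWorkflow) Fetch(req FetchRequest) FetchResponse {
	steps := m.steps[req.WorkflowID]
	if req.NextStep >= len(steps) {
		return FetchResponse{Eof: true, Next: req}
	}
	next := FetchRequest{WorkflowID: req.WorkflowID, NextStep: req.NextStep + 1}
	return FetchResponse{Message: steps[req.NextStep], Next: next}
}

func main() {
	var wf Workflow = &memWorkflow{steps: map[string][]string{}}
	id, err := wf.Submit("kind: Workflow")
	if err != nil {
		panic(err)
	}
	req := FetchRequest{WorkflowID: id}
	for {
		resp := wf.Fetch(req)
		if resp.Eof {
			break
		}
		fmt.Println(resp.Message)
		req = resp.Next
	}
}
```

Because the response carries the next request, the caller holds the cursor and the Workflow implementation can stay stateless between Fetch calls. A fake like this may also be handy for unit-testing code that depends on the interface.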
Execution Example
// Create the codegen and workflow operator for the Tekton backend.
cg, wf, e := workflow.New("tekton")
if e != nil {
	log.Fatal(e)
}
// Generate the workflow YAML from the SQL program.
py := cg.GenCode(SQLProgram)
yaml := cg.GenYAML(py)
// Submit the workflow YAML, then retrieve the status of each workflow step.
wfID, e := wf.Submit(yaml)
if e != nil {
	log.Fatal(e)
}
fetchRequest := NewFetchRequest(wfID)
for {
	response := wf.Fetch(fetchRequest)
	// Handle response.Message, response.Rows, etc. here.
	// Eof means the workflow has completed, so leave the loop.
	if response.Eof {
		break
	}
	fetchRequest = response.updated_request_since
}
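The loop above calls Fetch back to back. A client may want to pause between polls and stop when the caller gives up. The following sketch shows one way to do that. It is not part of the proposed package: Response is a hypothetical, trimmed-down FetchResponse, poll and demo are illustrative names, and the fetch function stands in for wf.Fetch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Response is a hypothetical, trimmed-down FetchResponse.
type Response struct {
	Message string
	Eof     bool
}

// poll calls fetch until it reports Eof or ctx is cancelled, waiting for
// interval between calls, and returns the messages seen so far.
func poll(ctx context.Context, interval time.Duration, fetch func() Response) ([]string, error) {
	var msgs []string
	for {
		resp := fetch()
		if resp.Message != "" {
			msgs = append(msgs, resp.Message)
		}
		if resp.Eof {
			return msgs, nil
		}
		select {
		case <-ctx.Done():
			return msgs, ctx.Err()
		case <-time.After(interval):
		}
	}
}

// demo polls a canned sequence of responses that stands in for wf.Fetch.
func demo() ([]string, error) {
	canned := []Response{
		{Message: "step-1 Running"},
		{Message: "step-1 Succeeded"},
		{Eof: true},
	}
	i := 0
	fetch := func() Response {
		r := canned[i]
		if i < len(canned)-1 {
			i++ // keep returning the final Eof response once it is reached
		}
		return r
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return poll(ctx, 10*time.Millisecond, fetch)
}

func main() {
	msgs, err := demo()
	fmt.Println(msgs, err)
}
```

Passing a context lets the caller bound the total wait with a timeout or cancel it explicitly, while the interval keeps the client from flooding the workflow backend with requests.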