Link

Argo Workflow on SQLFLow

Motivations

SQLFlow translates a SQL program, perhaps with extended SQL syntax for AI, into a workflow. Argo is a Kubernetes native workflow engine. When deploying SQLFlow on Kubernetes, SQLFlow leverages Argo to do workflow management.

When SQLFLow server receives a gRPC Run request that contains a SQL program, it:

  1. Translates the SQL program to an Argo workflow YAML file.
  2. Submits the YAML file to Kubernetes and receives an Argo workflow ID. <---(1)
  3. Returns the workflow ID as Job ID to the client.

When SQLFLow server receives a gRPC Fetch request contains a job ID, it:

  1. Looks up the associated Argo workflow and fetches most recent logs. <---(2)
  2. Returns the fetched log content and offset as FetchResponse to the client.

The package pkg/argo contains two functions Submit and Fetch corresponding to the above steps marked (1) and (2) respectively.

We expect the user to call Submit once then keep calling Fetch until completion. For example,

func WorkflowDemo() {
  wfJob := Submit(argoYAML)
  req := newFetchRequest(wfJob)
  for {
    res := Fetch(req)
    fmt.Println(res.Logs)
    if isComplete(res) {
      break
    }
    req = res.NewFetchRequest
    time.Sleep(time.Second)
  }
}

API Design

In pkg/argo, we design Submit as Submit(argoYAML string) *Job, where

  • argoYAML describes the Argo workflow in YAML format.
  • Job contains the Argo workflow ID. The function calls kubectl create to submit the workflow, captures its standard output, and extracts the workflow ID.

For example, when submitting the argoYAML, the function returns steps-xxxx as the workflow ID.

We design Fetch as Fetch(request FetchRequest) FetchResponse where

  • FetchRequest contains the workflow ID and the current fetching status.
  • FetchResponse contains the most recent logs and refreshed FetchRequest.

Implementation

Submit API

We implement the Submit API as simple as the following logic:

func Submit(argoYAML string) *Job {
  workflowID, err := createWorkflowFromYAML(argoYAML)
  return newJob(workflowID)
}

where

  • createWorkflowFromYAML calls Kubernetes Create Resource API to submit the workflow YAML file.

  • newJob packages the workflowID into a Job structure:

      message Job {
        string id = 1;
      }
    

Fetch API

We implement the Fetch API as the following pseudo-code:

func Fetch(req *FetchRequest) *FetchResponse {
  // kubectl get workflow ...
  wf := getWorkflow(req)

  // return the next step ID if necessary
  stepID := req.stepID

  // kubectl get pod ...
  pod := getPod(wf, stepID)

  // return step phase logs if the phase changed
  stepPhaseLogs := fetchStepPhaseLogs(wf, stepID)

  // append step phase status logs to Response
  responses := []pb.Response{&Message{stepPhaseLogs}}

  eof := false // True for end of job

  if podComplete(pod) {
    // if the current step execute a standard SQL, the query result would be output
    // as protobuf message string format with prefix SQLFLOW_PROTOBUF:
    //
    // SQLFLOW_PROTOBUF: head:<column_names:"col1" column_names:"col2" column_names:"col3" ... >
    // SQLFLOW_PROTOBUF: row:<data:<type_url:"type.googleapis.com/google.protobuf.DoubleValue" value:"\t\232\231\231\231\231\231\031@" >
    // ...
    responses = append(responses, unmarshalResponseFromPodLogs(pod))
    if isLastStep(wf, stepID) {
      eof = true
    } else {
      stepID = nextStep(wf, stepID)
    }
  }

  return newFetchResponse(responses, newUpdatedFetchRequest(wf, stepID), eof)
}

where

  • the FetchRequest and FetchResponse structure would as the following:

      message FetchRequest {
        Job job = 1;
        // the following fields keep the fetching state
        string step_id = 2;
        string step_phase = 3;
      }
    
      message Response {
        oneof response {
            Head head = 1;
            Row row = 2;
            Message message = 3;
            EndOfExecution eoe = 4;
            Job job = 5;
        }
      }
    
      message FetchResponse {
        message Responses {
          repeated Response responses = 1;
        }
        FetchRequest updated_fetch_since = 2;
        bool eof = 2;
      }
    
  • getPod calls Kubernetes Read Pod API to read the specified Pod resource.
  • getWorkflow calls Kubernetes Read Resource API to read the specified Argo workflow resource which is a CRD on Kubernetes.
  • fetchStepPhaseLogs returns the format step phase logs like:

      Step [1/3] Execute Code: echo hello1
      Step [1/3] Log: http://localhost:8001/workflows/default/steps-bdpff?nodeId=steps-bdpff-xx1
      Step [1/3] Status: Pending
      Step [1/3] Status: Running
      Step [1/3] Status: Succeed/Failed
      Step [2/3] Execute Code: echo hello2
      Step [2/3] Log: http://localhost:8001/workflows/default/steps-bdpff?nodeId=steps-bdpff-xx2
    

    the log URL can be the log panel on Argo UI.

  • unmarshalResponseFromPodLogs unmarshal protobuf message from Pod logs as following pseudo-code:

    ``` go func unmarshalResponseFromPodLogs(pod Pod) ([]pb.Response, error) { responses := []*pb.Response{} offset := “” logs, newOffset := k8s.readPodLogs(pod.Name, offset) while(!isNoMoreLogs(pod, offset, newOffset)) { offset = newOffset for _, line := range(logs) { // using prefix to identify the protobuf string format can avoid // performance degradation caused by unmarshaling all logs. if strings.HasPrefix(SQLFlowProtobufPrefix, line) { res := new(pb.Response) // Unmarshal the protobuf message from pod logs to Response Message proto.Unmarshal(line, res) response = append(response, res) } } log, newOffset = k8s.ReadPodLogs(pod.Name, offset) } return responses, nil }