
Data Analysis and Transformation

The Problem

We want to add data analysis, in addition to data transformation, into the end-to-end machine learning pipeline generated by SQLFlow.

End-to-end machine learning means a pipeline from raw data to model training and applications. With SQLFlow, we propose to describe complex pipelines concisely in SQL. An essential part of the solution is describing how raw data is processed into model inputs. SQLFlow provides a COLUMN clause for this purpose.

Currently, SQLFlow converts the content of the COLUMN clause into Python source code for data transformation. The following example calls the feature column API tf.feature_column.categorical_column_with_hash_bucket from TensorFlow to convert a string address from the database into a single-element tensor of an integer ID.

SELECT * FROM welfare TO TRAIN DNNRegressor COLUMN hash_bucket(address, 1000) LABEL income;
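For reference, the generated transformation code is roughly equivalent to the following sketch. It is a minimal illustration using the standard TensorFlow feature column API; the code SQLFlow actually emits may differ.

import tensorflow as tf

# hash_bucket(address, 1000): hash the string field "address" into one of
# 1000 buckets, producing an integer ID per row.
address = tf.feature_column.categorical_column_with_hash_bucket(
    key="address", hash_bucket_size=1000)
# DNNRegressor needs dense inputs, so SQLFlow would further wrap this
# categorical column, e.g. with an indicator or embedding column.
address_for_dnn = tf.feature_column.indicator_column(address)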

However, data transformation often requires parameters. The above example requires the bucket size, 1000. Other examples include

standardize(age, mean, stdev)
normalize(size, min, max)
categorize(city, city_name_list)

The challenge is that users don’t want to specify these parameters; instead, they want SQLFlow to compute the data statistics and derive these parameters from the data automatically. So, the above examples become

standardize(age)
normalize(size)
categorize(city)

In the terminology of TFX Transform, analysis refers to the data statistics that derive the above parameters; given these parameters, transform refers to the conversion of raw data into model inputs. This design document is about making SQLFlow support data analysis in addition to transformation.

The Syntax Extension

Without any syntax extension, users can already write SQL statements for data analysis. For example, the following SQL statement works with MySQL and normalizes the field size of the table plates.

SELECT 1.00 * (t1.size - t2.size_min) / t2.size_range
FROM plates t1
JOIN
(
    SELECT
        MIN(size) AS size_min,
        MAX(size) - MIN(size) AS size_range
    FROM plates
) t2
ON 1 = 1

Unfortunately, the above code is tricky and hard to read. Users might also have to write it multiple times, once before training and once before prediction, which doubles the source code complexity.

To make deep learning more straightforward, we hope our users can write the following statement.

SELECT * FROM plates TO TRAIN DNNRegressor COLUMN normalize(size) LABEL price INTO a_model;

This syntax implies that SQLFlow can convert the above statement into a Python program that does data analysis and data transformation.

To use the trained model to make predictions, we hope users don’t have to rewrite the COLUMN clause. For example, the following statement assumes that the table new_plates has a field size. The converted Python code should normalize size before feeding it to the input of a_model.

SELECT * FROM new_plates TO PREDICT price USING a_model;

If the user doesn’t want normalization but the raw value as the input, she could write the following.

SELECT * FROM new_plates TO PREDICT price USING a_model COLUMN size

Or, if she wants standardization instead of normalization, she could write the following.

SELECT * FROM new_plates TO PREDICT price USING a_model COLUMN standardize(size)

What if the table new_plates doesn’t have the field used to train the model? What if there is no field named size, but a field named diameter? We should allow users to use the latter as well.

SELECT * FROM new_plates TO PREDICT price USING a_model COLUMN normalize(diameter)

Please be aware that if the user wants normalization, she must write normalize(diameter) instead of diameter. The following example uses the raw value of diameter without normalization.

SELECT * FROM new_plates TO PREDICT price USING a_model COLUMN diameter

The Challenge

From the above examples, we see a challenge: the TO TRAIN clause must be able to save the input field names, the data analysis results, and the transformation steps together with the model, so that prediction uses the same data transformation as training.

TensorFlow Transform

TensorFlow Transform is the open source solution for data transformation in TensorFlow Extended (TFX). Users need to write a Python function preprocess_fn to define the preprocessing logic. The preprocessing function contains two groups of API calls: TensorFlow Transform analyzers and TensorFlow ops. An analyzer does the statistical work on the training dataset once and converts the results to constant tensors. The statistical values and the TensorFlow ops then form the concrete transform logic as a TensorFlow graph, which converts the data records one by one. The graph is used for both training and serving.
Let’s take min-max normalization of the column capital_gain in the census income dataset as an example. The following is the preprocess_fn definition with TensorFlow Transform:

import tensorflow_transform as tft

def preprocess_fn(inputs):
    outputs = inputs.copy()
    outputs["capital_gain"] = tft.scale_to_0_1(inputs["capital_gain"])
    return outputs

From the users’ perspective, writing SQL is preferable to writing Python. Integrating TF Transform with SQLFlow directly would therefore not be user-friendly.

Internal System

The feature engineering library in the internal system is configuration driven. It contains some primitive transform ops, and users compose the transform logic with a configuration file. Some of the parameters in the configuration are statistical values. Users need to do the analysis work on the dataset manually using SQL first and then complete the configuration. What’s more, the development work on automatic analysis is still ongoing.

Our Approach

Data transformation contains two key parts: the analyzer and the transformer. The analyzer scans the entire dataset and calculates statistical values such as the mean, max, min, etc. The transformer takes the statistical values, if any, as parameters to build the concrete transform logic, and then transforms the data records one by one. The transform logic must be consistent between training and inference.
[Figure: transform_training_inference]

From the perspective of SQLFlow, SQL naturally supports the statistical work of the analyzer. The feature column API and the Keras preprocessing layers can take charge of the transform work as the transformer. We plan to use SQL together with feature columns / Keras preprocessing layers to do the data analysis and transformation work.
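As a rough sketch of this division of labor (illustrative only; the generated code may differ), the analyzer is a SQL query and the transformer is a feature column parameterized by its results:

import tensorflow as tf

# Analyzer: one SQL query computes the statistics over the whole table.
analysis_sql = "SELECT MIN(size) AS size_min, MAX(size) AS size_max FROM plates"
size_min, size_max = run_query(analysis_sql)  # run_query is a hypothetical helper

# Transformer: the statistics parameterize the per-record transform logic,
# which then runs inside the training (and serving) graph.
size_column = tf.feature_column.numeric_column(
    "size", normalizer_fn=lambda x: (x - size_min) / (size_max - size_min))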

Since we use SQL to do the analysis work, the table schema needs to be wide, with one feature per column. So, we will normalize the table schema first.

Normalize Table Schema to Be Wide

A wide table stores only one feature or label per column, which is friendlier to SQL for data analysis.

Why

Let’s take this analysis task as an example: calculate the max of age and the mean of education_num.
If the data is in a wide table as follows:

| age | education_num | income_category |
|-----|---------------|-----------------|
| 39  | 10            | 0               |
| 52  | 9             | 1               |
| 28  | 13            | 0               |

The SQL statement for analysis is straightforward:

SELECT
    MAX(age) AS age_max,
    AVG(education_num) AS education_num_mean
FROM census_income

Sometimes users may encode multiple feature values as a key-value string and store it in one column of the table, just like the following table:

features income_category
age:39;education_num:10 0
age:52;education_num:9 1
age:28;education_num:13 0

We can’t use SQL directly to do the same analysis work as above.

Proposal

We can provide common tools to normalize the table schema. If the data is stored in a MaxCompute table, we can use PyODPS + UDF to complete the task. Please look at the doc flatten_odps_key_value_table for the detailed design.
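Independent of the storage engine, the core of such a tool is the per-row parsing logic. Here is a minimal Python sketch, assuming ";" separates key-value pairs and ":" separates a key from its value, as in the example table above:

def flatten_kv(features):
    # Parse a key-value encoded string such as "age:39;education_num:10"
    # into one value per feature: {"age": "39", "education_num": "10"}.
    result = {}
    for pair in features.split(";"):
        if not pair:
            continue
        key, value = pair.split(":", 1)
        result[key] = value
    return result

print(flatten_kv("age:39;education_num:10"))  # {'age': '39', 'education_num': '10'}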

After normalizing the table schema, we can do data analysis and transformation on the normalized table. The preprocessing pipeline is described using a SQLFlow statement. The logic can be very flexible, and the current syntax of the COLUMN clause cannot cover all the scenarios, such as standardizing age. We want to design a SQLFlow syntax extension to fully express the analysis and transform process elegantly.

SQLFlow Syntax Extension

We can extend the SQLFlow syntax and enrich the COLUMN expression. We propose to add some built-in functions to describe the transform process and will implement the commonly used functions in the first stage.

| Name | Transformation | Statistical Parameter | Input Type | Output Type |
|------|----------------|-----------------------|------------|-------------|
| NORMALIZE(x) | Scale the inputs to the range [0, 1]. out = (x - x_min) / (x_max - x_min) | x_min, x_max | number (int, float) | float64 |
| STANDARDIZE(x) | Scale the inputs to z-scores; subtract the mean and divide by the standard deviation. out = (x - x_mean) / x_stddev | x_mean, x_stddev | number | float64 |
| BUCKETIZE(x, num_buckets, boundaries) | Transform the numeric features into categorical ids using a set of thresholds. | boundaries | number | int64 |
| HASH_BUCKET(x, hash_bucket_size) | Map the inputs into a finite number of buckets by hashing. out_id = Hash(input_feature) % bucket_size | hash_bucket_size | string, int32, int64 | int64 |
| VOCABULARIZE(x) | Map the inputs to integer ids by looking up the vocabulary. | vocabulary_list | string, int32, int64 | int64 |
| EMBEDDING(x, dimension) | Map the inputs to embedding vectors. | N/A | int32, int64 | float32 |
| CROSS(x1, x2, …, xn, hash_bucket_size) | Hash(cartesian product of features) % hash_bucket_size | N/A | string, number | int64 |
| CONCAT(x1, x2, …, xn) | Concatenate multiple tensors representing categorical ids into one tensor. | N/A | int32, int64 | int64 |

Please check the discussion about the CONCAT transform function.

Let’s take the following SQLFlow statement for example.

SELECT *
FROM census_income
TO TRAIN DNNClassifier
WITH model.hidden_units = [10, 20]
COLUMN NORMALIZE(capital_gain), STANDARDIZE(age), EMBEDDING(hours_per_week, dimension=32)
LABEL label

It trains a DNN model to classify someone’s income level using the census income dataset. The transform expression is COLUMN NORMALIZE(capital_gain), STANDARDIZE(age), EMBEDDING(hours_per_week, dimension=32). It normalizes the column capital_gain, standardizes the column age, and maps hours_per_week to an embedding vector.

Next, let’s look at a more complicated scenario. The following SQL statement trains a wide-and-deep model using the same dataset.

SELECT *
FROM census_income
TO TRAIN WideAndDeepClassifier
COLUMN
    EMBEDDING(CONCAT(VOCABULARIZE(workclass), BUCKETIZE(capital_gain, num_buckets=5), BUCKETIZE(capital_loss, num_buckets=5), BUCKETIZE(hours_per_week, num_buckets=6)) AS group_1, 8),
    EMBEDDING(CONCAT(HASH(education), HASH(occupation), VOCABULARIZE(marital_status), VOCABULARIZE(relationship)) AS group_2, 8),
    EMBEDDING(CONCAT(BUCKETIZE(age, num_buckets=5), HASH(native_country), VOCABULARIZE(race), VOCABULARIZE(sex)) AS group_3, 8)
    FOR deep_embeddings
COLUMN
    EMBEDDING(group_1, 1),
    EMBEDDING(group_2, 1)
    FOR wide_embeddings
LABEL label

SQLFlow will convert the COLUMN expression into the Python code of data transformation. However, this code requires parameters derived from the data, so the next step is the analysis work.

Generate Analysis SQL From SQLFlow Statement

SQLFlow will generate the analysis SQL to calculate the statistical values. For the clause COLUMN NORMALIZE(capital_gain), STANDARDIZE(age), the corresponding analysis SQL is as follows:

SELECT
    MIN(capital_gain) AS capital_gain_min,
    MAX(capital_gain) AS capital_gain_max,
    AVG(age) AS age_mean,
    STDDEV(age) AS age_stddev
FROM census_income;

Please check the discussion.

After calculating the statistical values, SQLFlow can generate the concrete Python source code for data transformation.

Generate Transform Code From SQLFlow Statement

Please check the discussion.

At this point, we have the full transform code and can prepare for model training. For the clause TO TRAIN DNNClassifier, we combine the transform code and the DNNClassifier from the model zoo into the final submitter program.

Combine Transform Code And Model Definition

The model definition in the model zoo is a Python class derived from tf.keras.Model. It is compatible with various feature inputs. Please check the sample model.

The transform code generated from COLUMN clause specifies how to convert the SELECT result into model inputs in the form of tensors.

The submitter program combines these two and feeds the output tensors of the transform code into the model definition.
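To make the wiring concrete, here is a hedged sketch of how a submitter program could feed feature columns into a Keras model. The feature columns and the model body below are placeholders, not the actual model zoo code:

import tensorflow as tf

# Placeholder for the feature columns produced by the generated transform code.
feature_columns = [
    tf.feature_column.numeric_column(
        "age", normalizer_fn=lambda x: (x - 18.0) / 6.0),
]

# DenseFeatures converts the raw input dict into the dense tensor that the
# model definition consumes.
inputs = {"age": tf.keras.Input(shape=(1,), name="age")}
transformed = tf.keras.layers.DenseFeatures(feature_columns)(inputs)
outputs = tf.keras.layers.Dense(1, activation="sigmoid")(transformed)
model = tf.keras.Model(inputs=inputs, outputs=outputs)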

Implementation (To Be Improved)

Data transformation contains two stages: analyze and transform. In our design, we do the analysis using SQL as the first step and generate the feature column definitions as the second step. The feature columns contain the transform logic and execute along with the model training process.
We choose to convert the transform expression into two steps of the workflow described by Couler: analysis and feature column generation. Couler is a programming language for describing workflows; its compiler translates a workflow represented by a Python program into an Argo YAML file. The output of the feature column generation step is passed to the next step, model training.
[Figure: data_transform_pipeline]

Let’s take STANDARDIZE(age) as an example; the following figure describes how the data transform pipeline works in detail.

[Figure: transform_steps]

A transform API has two members: analyzers and a feature column template. An analyzer is the statistical operation that needs to run first to complete the whole transform logic. The feature column template is used to build the concrete feature column definition.

The Analyze step and the Feature Column Generation step are two Couler steps. The Analyze Result and the Generated Feature Column Definition are the outputs of these two steps.
In the Analyze step, we parse the transform expression and collect the statistics requirements as a dictionary of {statistic_variable_name} -> ({analyze_operation_name}, {column_name_in_source_table}). From this dictionary, the SQL generator produces the analysis SQL expression containing built-in aggregate functions for different data sources such as Hive, MaxCompute, and so on. After executing the SQL, the statistical results are written to the standard output of the container.
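A minimal sketch of such a SQL generator, following the dictionary layout described above (function and variable names are illustrative):

def generate_analysis_sql(stats_requirements, table):
    # stats_requirements: {statistic_variable_name:
    #     (analyze_operation_name, column_name_in_source_table)}
    select_items = [
        "{op}({column}) AS {var}".format(op=op, column=column, var=var)
        for var, (op, column) in stats_requirements.items()
    ]
    return "SELECT\n    {}\nFROM {};".format(",\n    ".join(select_items), table)

# STANDARDIZE(age) needs the MEAN and STDDEV of the column age.
print(generate_analysis_sql(
    {"age_mean": ("AVG", "age"), "age_stddev": ("STDDEV", "age")},
    "census_income"))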
In the Feature Column Generation step, we format the feature column template with the variable name and the statistical values to get the complete feature column definition for the transform logic.
The generated feature column definitions are passed to the next Couler step: model training. We combine them with the COLUMN expression to generate the final feature column definitions and then pass them to the model. Taking NUMERIC(STANDARDIZE(age)) as an example, the final definition would be numeric_column('age', normalizer_fn=lambda x: (x - 18.0) / 6.0).
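For example, the Feature Column Generation step could be little more than a string format over the template (a sketch; the real code generation may differ):

# Feature column template for STANDARDIZE(x), filled with the analyze results.
TEMPLATE = ("tf.feature_column.numeric_column("
            "'{var_name}', normalizer_fn=lambda x: (x - {mean}) / {std})")
fc_definition = TEMPLATE.format(var_name="age", mean=18.0, std=6.0)
# -> "tf.feature_column.numeric_column('age', normalizer_fn=lambda x: (x - 18.0) / 6.0)"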

We plan to implement the following commonly used transform APIs in the first step and will add more according to further requirements.

| Name | Feature Column Template | Analyzer |
|------|-------------------------|----------|
| STANDARDIZE(x) | numeric_column({var_name}, normalizer_fn=lambda x: (x - {mean}) / {std}) | MEAN, STDDEV |
| NORMALIZE(x) | numeric_column({var_name}, normalizer_fn=lambda x: (x - {min}) / ({max} - {min})) | MAX, MIN |
| LOG(x) | numeric_column({var_name}, normalizer_fn=lambda x: tf.math.log(x)) | N/A |
| BUCKETIZE(x, num_buckets=y) | bucketized_column({var_name}, boundaries={percentiles}) | PERCENTILE |

Further Consideration

In the design above, we generate the concrete feature column definition for data transformation in the Transform stage. The actual transform logic on the raw data executes along with the model training process. Based on this design, we can further consider transforming the raw data and writing the transformed result into a new table in this stage.
After analyzing the data, we would construct a TF graph for the transform instead of a feature column definition and export it to a SavedModel. We would then submit a data processing job that transforms the raw data by executing a UDF with the SavedModel. The whole process also matches the TFX pipeline.
This solution can bring the following benefits:

  1. We can reuse the transformed data in the temporary table to execute multiple model training runs for different hyperparameter combinations and across all the epochs. Data transformation is executed only once.
  2. We can support more flexible transform logic such as inter-column calculation. Feature columns have some limitations on inter-column calculation. Please check the Wiki for more details.

We need to figure out the following points for this further solution:

  1. Model Export: Upgrade the Keras API to support exporting the transform logic and the model definition together to a SavedModel for inference. Issue
  2. Transform Execution: We transform the data records one by one using the transform logic in the SavedModel format and then write them to a new table. We also need to write a JAR that packages the TensorFlow library, loads the SavedModel into memory, and processes the input data. We then register it as a UDF in Hive or MaxCompute and use it to transform the data.