Link Search Menu Expand Document

Preprocess with Analysis Result Design

This document describes the design about how to use analysis result while preprocessing feature inputs.

Motivation

Before preprocessing the feature inputs, we need to analyze the training dataset to collect the feature statistical results. For example, we need the mean and standard deviation to normalize a numeric value, vocabulary to lookup a string value to an integer id and boundary to discretize a numeric value. Using SQLFlow, the training dataset is usually a table saved in MySQL or MaxCompute and other databases, so we can use SQL to analyze the training table. During data transformation pipeline, we may launch a pod to analyze the training table and then submit the ElasticDL training job. So, the design is to solve how to pass the analysis results into the pods of an ElasticDL training job.

Define preprocess layers with analysis result

1. Persist the analysis result collected in the analysis pod

For MySQL or MaxCompute table, we can use SQL to analyze each column. For example, the table is

age education marital
34 Master Divorced
54 Doctor Never-married
42 Bachelor Never-married
49 Bachelor Divorced

For numeric column, we can get the min, max, mean, standard deviation and bucket boundaries using

SELECT
    MIN(age) as age_min,
    MAX(age) as age_max,
    AVG(age) AS age_avg,
    STDDEV(age) AS age_stddev,
    PERCENTILE(age, ARRAY(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) AS age_boundaries
FROM ${training_table}

The SQL expression use MaxCompute SQL syntax and PERCENTILE is a function in MaxCompute SQL

For feature to hash with bucket size, we can get the count of distinct values by

SELECT count(distinct(marital)) AS martial_distinct_count FROM ${training_table}

For feature to lookup with vocabulary, we can get the vocabulary by

SELECT value FROM (
    SELECT education AS value, count(education) AS _count
    FROM {training_table}
    GROUP BY education
    ORDER BY _count DESC
)
WHERE _count >= {threshold};

SELECT value FROM (
    SELECT martial AS value, count(martial) AS _count
    FROM {training_table}
    GROUP BY martial
    ORDER BY _count DESC
)
WHERE _count >= {threshold};

The WHERE _count >= {threshold} will filter the values whose count is less than threshold to avoid overfitting.

Besides vocabulary, other analysis results are a number or a list of number like bucket boundaries. So we can save them into a table like:

feature_stats value
age_min 10
age_max 90
age_mean 44.75
age_std_dev 56.6875
age_bucket_boundries 30,40,50
martial-status-count 2

Because the vocabulary size may be huge, we cannot save it into a record like:

feature_stats value
education_vocab Master,Doctor,Bachelor
martial_vocab Divorced,Never-married

So, we save the vocabulary into a column and each record has an element, like

education martial
Master Divorced
Doctor Never-married
Bachelor  

After analysis, we get two tables with the analysis results. One is the statistics table which saves the mean, standard deviation, bucket boundaries and distinct count. And another is vocabulary table which saves the vocabulary.

Pass analysis results to build a model in training pods

For the values in the statistics table, we can write them into environment variables for the training pod to build model. For example:

envs='_age_mean=44.75,_age_std=56.6875,_age_boundaries="30,40,50"'

In preprocessing layers, we can get the statistics from environment variables like:

import os
from elasticdl_preprocessing.layers import Discretization
age_boundaries = list(
    map(float, os.getenv("_age_boundaries", "30,50").split(","))
)
layer = Discretization(bins=age_boundaries)

Further, we can provide an analyzer_utils in elasticdl_preprocessing to get the statistics from environment variables like:

def get_bucket_boundaries(feature_name, default_value):
    env_name = "_" + feature_name + "_boundaries"
    boundaries = os.getenv(env_name, None)
    if boundaries is None:
        return default_value
    else:
        return list(map(float, boundaries.split(",")))

def get_distinct_count(feature_name, default_value):
    env_name = "_" + feature_name + "_distinct_count"
    count = os.getenv(env_name, None)
    if count is None:
        return default_value
    else:
        return int(count)

Using the default values in analyzer_utils, users can debug the model without analysis. So, we can define the preprocessing layers like:

import os
from elasticdl_preprocessing.layers import Discretization, Hashing
from elasticdl_preprocessing import analyzer_utils

discretize_layer = Discretization(
    bins=analyzer_utils.get_bucket_boundaries(
        feature_name="age", default_value=[10, 30])
)
hash_layer = Hashing(
    num_bins=analyzer_utils.get_distinct_count(
        feature_name="martial", default_value=100
    )
)

For the values in the vocabulary table, we cannot save the vocabulary into environment variables because the vocabulary size may be huge. However, we can save the vocabulary into the shared storage like glusterfs and write the path into the environment variables of a training pod. After the training job completes, we can clear the vocabulary files in the storage.

envs="_education_vocab=/testdata/elasticdl/vocabulary/education.txt"
def get_vocabulary(feature_name, default_value):
    env_name = feature_name + "_vocab"
    vocabulary_path = os.getenv(env_name, None)
    if vocabulary_path is None:
        return default_value
    else:
        return vocabulary_path

lookup_layer = IndexLookup(
    vocabulary=get_vocabulary("education", ["Master"])
)