Data Ingestion with TensorFlow eXtended (TFX)

tfx-components

The first step in a ML pipeline is data ingestion which consists of reading data from raw format and formatting it into a binary format suitable for ML (e.g. TFRecord). TFX provides a standard component called ExampleGen which is responsible for generating training examples from different data sources. This article will explain usage of this component in different scenarios:

For an overview of TFX standard components read this post.

To be able to test the code snippets in the rest of this article, make sure TFX is installed (simply pip install tfx) and a runtime context is available. TFX provides the InteractiveContext class to use when running a TFX component (or pipeline) interactively.

from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext(
  pipeline_name='mypipeline',
  pipeline_root='.'
  )

Using local data

Generating TFRecord from CSV files

The basic example of using the ExampleGen component to generate TFRecords is with local CSV files as inputs:

from tfx.components import CsvExampleGen
from tfx.utils import dsl_utils

examples = dsl_utils.external_input('data')
example_gen = CsvExampleGen(input=examples, instance_name='ingestion')

context.run(example_gen)

After the component is run successfully, an artifact representing metadata about the run is generated in addition to the TFRecords. We can inspect this artifact as follows:

for artifact in example_gen.outputs['examples'].get():
  print(artifact)

An example output would look like the following example. Among the metadata, notice the notice pipeline name, the eval and train splits. Also, among the metadata is the fingerprint of the original raw data which can be very useful when inspecting what data was given to the pipeline:

Artifact(artifact: id: 3
type_id: 5
uri: "./CsvExampleGen.ingestion/examples/3"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:150828752,xor_checksum:1568937884,sum_checksum:1568937884"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "examples"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "pipeline_name"
  value {
    string_value: "mypipeline"
  }
}
custom_properties {
  key: "producer_component"
  value {
    string_value: "CsvExampleGen.ingestion"
  }
}
custom_properties {
  key: "span"
  value {
    string_value: "0"
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
, artifact_type: id: 5
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)

On disk the resulting TFRecods data would have a structure that looks like this (notice the eval and train splits):

./CsvExampleGen.ingestion/
└── examples
    └── 1
        ├── eval
        │   └── data_tfrecord-00000-of-00001.gz
        └── train
            └── data_tfrecord-00000-of-00001.gz

4 directories, 2 files

Note: by default the root folder of the output TFRecords is CsvExampleGen if the instance name of the component (i.e. the instance_name parameter) is not set.

Generating TFRecord from binary files

With TFX, we can generate TFRecord from binary serialized data using the generic FileBasedExampleGen class. This is done by overriding the component’s executor_class with the right implementation that can ingest the raw data.

For example, to generate TFRecords from a Parquet dataset:

# Write some Parquet formatted data for testing
import pyarrow as pa
import pyarrow.parquet as pq
df = pd.read_csv('data/creditcard.csv')
table = pa.Table.from_pandas(df)
pq.write_table(table, 'parquet_data/creditcard.parquet')

# Import generic file loader component and Parquet-specific executor
from tfx.components.example_gen.component import FileBasedExampleGen
from tfx.components.example_gen.custom_executors import parquet_executor
from tfx.dsl.components.base.executor_spec import BeamExecutorSpec
from tfx.utils.dsl_utils import external_input

examples = external_input('parquet_data/')
executor_spec = BeamExecutorSpec(parquet_executor.Executor)
example_gen = FileBasedExampleGen(input_base=examples, custom_executor_spec=executor_spec)

context.run(example_gen)

Similarly, to generate TFRecords from an Avro dataset:

# Write some AVRO formatted data for testing
import pandavro as pdx

df = pd.read_csv('data/creditcard.csv')
pdx.to_avro('avro_data/creditcard.avro', df)

# Import generic file loader component and Avro-specific executor
from tfx.components import FileBasedExampleGen
from tfx.components.example_gen.custom_executors import avro_executor
from tfx.utils.dsl_utils import external_input

examples = external_input('avro_data/')
executor_spec = ExecutorClassSpec(avro_executor.Executor)
example_gen = FileBasedExampleGen(input=examples, custom_executor_spec=executor_spec)

context.run(example_gen)

Generating TFRecord from TFRecord files

TFX also let us ingest existing TFRecords (e.g. previously serialised images or text dataset as tf.Example) into a pipeline using the ImportExampleGen component without a need for conversion.

This can be achieved as follows:

from tfx.components import ImportExampleGen
from tfx.utils import dsl_utils

examples = dsl_utils.external_input('tfrecord_data')
example_gen = ImportExampleGen(input=examples)

context.run(example_gen)

Using remote data

Generating TFRecord from cloud storage

In addition to reading local files of differnet format, the ExampleGen component can be used to read files stored on a cloud storage service (e.g. AWS or GCP).

# read from Google storage
examples = dsl_utils.external_input("gs://bucket/path/to/data")
example_gen = CsvExampleGen(input=examples)

# read from AWS S3
examples = dsl_utils.external_input("s3://bucket/path/to/data")
example_gen = CsvExampleGen(input=examples)

Note: to access a private bucket valid credentials of the cloud provider are required. For instance, to access private bucket on GCP you can set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the location of GCP account credential file (see documentation).

Generating TFRecord from databases

The ExampleGen component has specific implementations for reading from files, currently only BigQuery through and Presto db are supported.

For generating TFRecord examples from Big Query use BigQueryExampleGen component as follows (for testing try public datasets)

from tfx.extensions.google_cloud_big_query.example_gen.component import BigQueryExampleGen

query = "SELECT * FROM <project_id>.<database>.<table_name>"
example_gen = BigQueryExampleGen(query=query)

Note: you will need to set the GOOGLE_APPLICATION_CREDENTIALS environment variable.

Similarly, to read from a Presto database use PrestoExampleGen as follows

# Import PrestoExampleGen and config class PrestoConnConfig
from tfx.examples.custom_components.presto_example_gen.proto import presto_config_pb2
from tfx.examples.custom_components.presto_example_gen.presto_component.component import PrestoExampleGen

# Create a config object with Presto DB connection information
presto_config = presto_config_pb2.PrestoConnConfig(host='localhost', port=8080)
# Create an example generator for a query
query = "SELECT * FROM <table_name>"
example_gen = PrestoExampleGen(presto_config, query=query)

The prestodb component requires a special package tfx-presto-example-gen to be installed (learn more here)

$ git clone https://github.com/tensorflow/tfx
$ cd tfx/tfx/examples/custom_components/presto_example_gen
$ pip install -e .

Advanced configuration

The ExampleGen component provides two parameters that control how input data should be expected (with input_config parameter) and how the output data should look like (with output_config parameter). For instance, for incremental data we could use input_config, and for train/eval splits we would use output_config.

Splitting

With a SplitConfig we can specify in how many parts the data have to be splits, in the following example we split the input data into TFRecords with a ration of 8:1:1 between training, evaluation and test set.

from tfx.components import CsvExampleGen
from tfx.proto import example_gen_pb2
from tfx.utils.dsl_utils import external_input

output = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=8),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1),
        example_gen_pb2.SplitConfig.Split(name='test', hash_buckets=1)
    ]))

examples = dsl_utils.external_input('data')
example_gen = CsvExampleGen(input=examples, output_config=output)

context.run(example_gen)

The resulting TFRecods data would have a structure that looks like this with a dedicated folder per split (i.e. eval, test and train):

CsvExampleGen/
└── examples
    └── 1
        ├── eval
        │   └── data_tfrecord-00000-of-00001.gz
        ├── test
        │   └── data_tfrecord-00000-of-00001.gz
        └── train
            └── data_tfrecord-00000-of-00001.gz

5 directories, 3 files

Note: TFX uses a default split of ratio 2:1 between train and eval outpout if no output configuration is provided.

We can also preserve an existent input data split using Input.Split config which we pass to the component input_config parameter as follows:

from tfx.components import CsvExampleGen
from tfx.proto import example_gen_pb2
from tfx.utils.dsl_utils import external_input

input = example_gen_pb2.Input(splits=[
  example_gen_pb2.Input.Split(name='train', pattern='train/*'),
  example_gen_pb2.Input.Split(name='eval', pattern='eval/*'),
  example_gen_pb2.Input.Split(name='test', pattern='test/*')
])

examples = external_input('data')
example_gen = CsvExampleGen(input=examples, input_config=input)

Spanning

There are cases where the input data arrives periodically and is supposed to be used to train a model incrementally. For example, in the following folder strucutre each input-{SPAN} folder represents a subset of the dataset that is created periorically and have to be ingested as it comes.

└── data
    ├── input-0
    │   └─ data.csv
    ├── input-1
    │   └─ data.csv
    └── input-2
        └─ data.csv
...

The ExampleGen provides a feature called spanning that can be used for this use case. We can configure the input_config parameter so that it takes Input.Split with the pattern of the input data as follows:

input = example_gen_pb2.Input(splits=[
  example_gen_pb2.Input.Split(pattern='input-{SPAN}/*')
])

examples = external_input('data')
example_gen = CsvExampleGen(input=examples, input_config=input)
context.run(example_gen)

If the input data comes with a train/eval split, we can ingest it as follows by just updating the data folders pattern:

input_cfg = example_gen_pb2.Input(splits=[
  example_gen_pb2.Input.Split(name='train', pattern='input-{SPAN}/train/*'),
  example_gen_pb2.Input.Split(name='eval', pattern='input-{SPAN}/eval/*')
])
...

If the data folders contain date information, we can use {YYYY} to match years, {MM} to match months and {DD} to match dates. For instance, to ingest data from folders like input-2020-01-01 we can use the following span configuration:

input = example_gen_pb2.Input(splits=[
  example_gen_pb2.Input.Split(name='train', pattern='input-{YYYY}-{MM}-{DD}/train/*'),
  example_gen_pb2.Input.Split(name='eval', pattern='input-{YYYY}-{MM}-{DD}/eval/*')
])

examples = external_input('data')
example_gen = CsvExampleGen(input=examples, input_config=input)

Versioning

In addition to the span and date information, the input data can be versioned and TFX provides a pattern the properly handle with using {VERSION}. Here is an configuration example that can be used to ingestion train/eval data with shpae like root-folder/span-1/version-0:

input = example_gen_pb2.Input(splits=[
  example_gen_pb2.Input.Split(name='train', pattern='span-{SPAN}/version-{VERSION}/train/*'),
  example_gen_pb2.Input.Split(name='eval', pattern='span-{SPAN}/version-{VERSION}/eval/*')
])

examples = external_input('data')
example_gen = CsvExampleGen(input=examples, input_config=input)