如何为 TFX 时间序列创建窗口序列

数据挖掘 时间序列
2022-03-06 14:39:55

我正在使用 uci 开源数据https://archive.ics.uci.edu/ml/datasets/Appliances+energy+prediction为时间序列构建 TFX 管道。由于时间序列训练和评估需要数据窗口序列,我在 Trainer.py 模块文件中tf.keras.preprocessing.timeseries_dataset_from_array的函数中使用 APIinput_fn来创建训练和评估窗口,但我得到了一个TypeError: dataset length is infinite.

Trainer.py 模块文件包含以下代码:

from typing import List, Text

import os
import absl
import datetime
import tensorflow as tf
import tensorflow_transform as tft

from tfx.components.trainer.executor import TrainerFnArgs
from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx_bsl.tfxio import dataset_options

LABEL_KEY = 'Appliances'

_DENSE_FLOAT_FEATURE_KEYS = ['lights', 'T1', 'RH_1', 'T2', 'RH_2', 'T3',
       'RH_3', 'T4', 'RH_4', 'T5', 'RH_5', 'T6', 'RH_6', 'T7', 'RH_7', 'T8',
       'RH_8', 'T9', 'RH_9', 'T_out', 'Press_mm_hg', 'RH_out', 'Windspeed',
       'Visibility', 'Tdewpoint', 'rv1', 'rv2']


def _transformed_name(key):
    return key + '_xf'


def _transformed_names(keys):
    return [_transformed_name(key) for key in keys]


def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Returns a function that parses a serialized tf.Example and applies TFT."""

    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Returns the output to be used in the serving signature."""
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(LABEL_KEY)
        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        return model(transformed_features)

    return serve_tf_examples_fn


**def _input_fn(file_pattern: List[Text],
              data_accessor: DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
    import numpy as np
    """Generates features and label for tuning/training.

    Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

    Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
    """
    dataset = data_accessor.tf_dataset_factory(
      file_pattern,
      dataset_options.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_transformed_name(LABEL_KEY)),
      tf_transform_output.transformed_metadata.schema)
  
    sequence_generator = tf.keras.preprocessing.timeseries_dataset_from_array(
    dataset, dataset, sequence_length=144, batch_size=1)
      
    
    return sequence_generator**


def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model:
    """Creates a DNN Keras model for classifying taxi data.

    Args:
    hidden_units: [int], the layer sizes of the DNN (input layer first).

    Returns:
    A keras Model.
    """
    real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
    ]
    
    model = regressor_model(
      deep_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25])
    
    return model


def regressor_model(deep_columns, dnn_hidden_units):
    """Build a simple keras regressor model.

    Args:
    deep_columns: Feature columns for deep part of the model.
    dnn_hidden_units: [int], the layer sizes of the hidden DNN.

    Returns:
    A Deep Keras model
    """
    # Following values are hard coded for simplicity in this example,
    # However prefarably they should be passsed in as hparams.

    # Keras needs the feature definitions at compile time.
    input_layers = {
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32)
      for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
    }

    deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)
    for numnodes in dnn_hidden_units:
        deep = tf.keras.layers.Dense(numnodes)(deep)


        output = tf.keras.layers.Dense(1)(deep)

        model = tf.keras.Model(input_layers, output)
        model.compile(
          loss='mean_absolute_error',
          optimizer=tf.keras.optimizers.Adam(lr=0.001),
          metrics=[tf.keras.metrics.MeanAbsoluteError()])
        model.summary(print_fn=absl.logging.info)
    return model


# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
    """Train the model based on given args.

    Args:
    fn_args: Holds args used to train the model as name/value pairs.
    """
    # Number of nodes in the first layer of the DNN
    first_dnn_layer_size = 100
    num_dnn_layers = 4
    dnn_decay_factor = 0.7

    sequence_length = 144
    batch_size = 1

    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
    
    train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor, 
                            tf_transform_output, 40)
    
    print('fn_args.train_files:', fn_args.train_files)
    print('fn_args.data_accessor:', fn_args.data_accessor)
    print('tf_transform_output:', tf_transform_output)
    print('train_dataset:')
    #for example in train_dataset.take(1):
    #  print(example)

    eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor, 
                           tf_transform_output, 40)
    
    print('fn_args.eval_files:', fn_args.eval_files)
    print('fn_args.data_accessor:', fn_args.data_accessor)
    print('tf_transform_output:', tf_transform_output)
    print('eval_dataset:', eval_dataset)

    model = _build_keras_model(
      # Construct layers sizes with exponetial decay
      hidden_units=[
          max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
          for i in range(num_dnn_layers)
      ])

    tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=fn_args.model_run_dir, update_freq='batch')
    model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

    signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
    }
    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

培训师配置:

trainer = Trainer(
    module_file=os.path.abspath(trainer_module_file),
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000))
context.run(trainer)

错误:

--------------------------------------------------------------------------- 
TypeError Traceback (most recent call last) <ipython-input-53-cc5cfdc341eb> in <module>() 
7 train_args=trainer_pb2.TrainArgs(num_steps=10000), 
8 eval_args=trainer_pb2.EvalArgs(num_steps=5000))
 ----> 9 context.run(trainer) 
/usr/local/lib/python3.6/dist-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run_if_ipython(*args, **kwargs) 
65 # __IPYTHON__ variable is set by IPython, see 
66 # https://ipython.org/ipython-doc/rel-0.10.2/html/interactive/reference.html#embedding-ipython. ---> 67 return fn(*args, **kwargs) 
68 else: 
69 absl.logging.warning(
 /usr/local/lib/python3.6/dist-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run(self, component, enable_cache, beam_pipeline_args) 
180 telemetry_utils.LABEL_TFX_RUNNER: runner_label, 
181 }): 
--> 182 execution_id = launcher.launch().execution_id 
183 
184 return execution_result.ExecutionResult( 
/usr/local/lib/python3.6/dist-packages/tfx/orchestration/launcher/base_component_launcher.py in launch(self) 
203 execution_decision.input_dict, 
204 execution_decision.output_dict, 
--> 205 execution_decision.exec_properties) 
206 
207 absl.logging.info('Running publisher for %s', /usr/local/lib/python3.6/dist-packages/tfx/orchestration/launcher/in_process_component_launcher.py in _run_executor(self, execution_id, input_dict, output_dict, exec_properties) 
65 executor_context) # type: ignore 
66 
---> 67 executor.Do(input_dict, output_dict, exec_properties) 

/usr/local/lib/python3.6/dist-packages/tfx/components/trainer/executor.py in Do(self, input_dict, output_dict, exec_properties) 
217 # Train the model 
218 absl.logging.info('Training model.') 
--> 219 run_fn(fn_args) 
220 
221 # Note: If trained with multi-node distribution workers, it is the user /content/trainer.py in run_fn(fn_args) 
160 
161 train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor, 
--> 162 tf_transform_output, 40) 
163 
164 print('fn_args.train_files:', fn_args.train_files) /content/trainer.py in _input_fn(file_pattern, data_accessor, tf_transform_output, batch_size) 
70 
71 dataset = tf.keras.preprocessing.timeseries_dataset_from_array(
 ---> 72 dataset, dataset, sequence_length=144, batch_size=1) 
73 
74 #label_key=_transformed_name(LABEL_KEY) 
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/preprocessing/timeseries.py in timeseries_dataset_from_array(data, targets, sequence_length, sequence_stride, sampling_rate, batch_size, shuffle, seed, start_index, end_index) 
117 """ 
118 # Validate the shape of data and targets --> 
119 if targets is not None and len(targets) != len(data): 
120 raise ValueError('Expected data and targets to have the same number of ' 
121 'time steps (axis 0) but got ' 

/usr/local/lib/python3.6/dist-packages/tensorflow/python/data/ops/dataset_ops.py in __len__(self) 
443 length = self.cardinality() 
444 if length.numpy() == INFINITE: --> 
445 raise TypeError("dataset length is infinite.") 
446 if length.numpy() == UNKNOWN: 
447 raise TypeError("dataset length is unknown.") 

TypeError: dataset length is infinite.
0个回答
没有发现任何回复~