使用 model.fit、model.fit_generator 和 model.train_on_batch 训练 tf.keras.Sequential 模型的预期性能

数据挖掘 喀拉斯 张量流 python-3.x
2022-02-14 20:11:46

我正在使用带有 Tensorflow 后端的 Keras 来训练一个简单的 1D CNN 来检测传感器数据中的特定事件。虽然具有数千万样本的数据很容易以一维浮点数组的形式放入 ram,但显然将数据存储为 N x inputDim 数组需要大量内存,该数组可以传递给 model.fit 进行训练. 虽然我可以使用 model.fit_generator 或 model.train_on_batch 动态生成所需的小批量,但出于某种原因,我观察到 model.fit 和 model.fit_generator 和 model.train_on_batch 之间存在巨大的性能差距,即使所有内容都存储在内存中小批量生成速度很快,因为它基本上只包括重塑数据。因此,我想知道我是否做错了什么,或者这种性能差距是否可以预料。我正在使用带有 3.2 GHz Intel Core i7 处理器(支持多线程的 4 个内核)和 Python 3.6.3 的 CPU 版本的 Tensorflow 2.0。在 Mac Os X Mojave 上。

简而言之,我创建了一个虚拟 python 脚本来重新创建问题,它显示,批量大小为 64 时,使用 model.fit 运行 10 个 epoch 需要 407 秒,使用 model.fit_generator 需要 1852 秒,使用 model.fit_generator 需要 1985 秒。 train_on_batch。CPU 负载分别为 ~220%、~130% 和 ~120%,而且 model.fit_generator 和 model.train_on_batch 实际上相当奇怪,而 model.fit_generator 应该能够并行化小批量创建和模型。 train_on_batch 绝对没有。也就是说,model.fit(具有巨大的内存需求)比其他具有易于管理的内存需求的候选解决方案高出四倍。显然,CPU 负载会随着批大小的增加而增加,总训练时间会减少,但 model.fit 总是最快的,边距至少为 2,直到批大小为 8096。

这种行为是正常的(当不涉及 GPU 时)还是可以/应该做些什么来提高具有合理批量大小的较少内存密集型选项的计算性能?特别是 model.fit_generator 无法提供不错的性能。似乎没有这样的选项可以将所有数据分成可管理的部分,然后以迭代方式运行 model.fit 并不断变化的训练数据。

请注意,提供的虚拟脚本正如其名称所暗示的那样,并且数据量已被修剪,以便使所有三个选项都可行。但是,使用的模型与我实际使用的模型相似(以提供现实情况)。

from tqdm       import tqdm

import numpy as np
import tensorflow as tf

import time
import sys
import argparse

inputData    = None
outputData   = None
batchIndices = None
opts         = None

class DataGenerator(tf.keras.utils.Sequence):

    global inputData
    global outputData
    global batchIndices

    'Generates data for Keras'
    def __init__(self, batchSize, shuffle):
        'Initialization'
        self.batchIndices = batchIndices
        self.batchSize    = batchSize
        self.shuffle      = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int( np.floor( inputData.size / self.batchSize ) )

    def __getitem__(self, index):
        'Generate one batch of data'

        # Generate data
        X, y = self.__data_generation(self.indexes[index*self.batchSize:(index+1)*self.batchSize])

        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(inputData.size)
        if self.shuffle == True:
            np.random.shuffle(self.indexes)

    def __data_generation(self, INDX):
        'Generates data containing batch_size samples'

        # Generate data
        X = np.expand_dims( inputData[ np.mod( batchIndices + np.reshape(INDX,(INDX.size,1)) , inputData.size ) ], axis=2)
        y = outputData[INDX,:] 

        return X, y

def main( ):

    global inputData
    global outputData
    global batchIndices
    global opts

    # Data generation

    print(' ')
    print('Generating data...')

    np.random.seed(0) # For reproducible results

    inputDim  = int(104)                      # Input  dimension
    outputDim = int(  2)                      # Output dimension
    N         = int(1049344)                  # Total number of samples
    M         = int(5e4)                      # Number of anomalies
    trainINDX = np.arange(N, dtype=np.uint32)

    inputData = np.sin(trainINDX) + np.random.normal(loc=0.0, scale=0.20, size=N) # Source data stored in a single array

    anomalyLocations = np.random.choice(N, M, replace=False)

    inputData[anomalyLocations] += 0.5

    outputData = np.zeros((N,outputDim)) # One-hot encoded target array without ones

    for i in range(N):
        if( np.any( np.logical_and( anomalyLocations >= i, anomalyLocations < np.mod(i+inputDim,N) ) ) ): 
            outputData[i,1] = 1 # set class #2 to one if there is at least a single anomaly within range [i,i+inputDim)
        else:
            outputData[i,0] = 1 # set class #1 to one if there are no anomalies within range [i,i+inputDim)

    print('...completed')
    print(' ')

    # Create a model for anomaly detection

    model = tf.keras.Sequential([
        tf.keras.layers.Conv1D(filters=24, kernel_size=9, strides=1, padding='valid', dilation_rate=1, activation='relu', use_bias=True, kernel_initializer='glorot_uniform', bias_initializer='zeros', input_shape=(inputDim,1)),
        tf.keras.layers.MaxPooling1D(pool_size=4, strides=None, padding='valid'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(20, activation='relu', use_bias=True),
        tf.keras.layers.Dense(outputDim, activation='softmax')
    ])

    model.compile( tf.keras.optimizers.Adam(),
                   loss=tf.keras.losses.CategoricalCrossentropy(),
                   metrics=[tf.keras.metrics.CategoricalAccuracy()])

    print(' ')

    relativeIndices = np.arange(inputDim)                            # Indices belonging to a single sample relative to current position
    batchIndices    = np.tile( relativeIndices, (opts.batchSize,1) ) # Relative indices tiled into an array of size ( batchSize , inputDim )  
    stepsPerEpoch   = int( np.floor( N / opts.batchSize ) )          # Steps per epoch

    # Create an intance of dataGenerator class
    generator = DataGenerator(batchSize=opts.batchSize, shuffle=True)

    # Solve by gathering data into a large float32 array of size ( N , inputDim ) and feeding it to model.fit

    startTime = time.time()

    X = np.expand_dims( inputData[ np.mod( np.tile(relativeIndices,(N,1)) + np.reshape(trainINDX,(N,1)) , N ) ], axis=2)
    y = outputData[trainINDX, :]

    history = model.fit(x=X, y=y, sample_weight=None, batch_size=opts.batchSize, verbose=1, callbacks=None, validation_split=None, shuffle=True, epochs=opts.epochCount)

    referenceTime = time.time() - startTime
    print(' ')
    print('Total solution time with model.fit: %6.3f seconds' % referenceTime)
    print(' ')

    # Solve with model.fit_generator  

    startTime = time.time()

    history = model.fit(x=generator, steps_per_epoch=stepsPerEpoch, verbose=1, callbacks=None, epochs=opts.epochCount, max_queue_size=1024, use_multiprocessing=False)

    generatorTime = time.time() - startTime
    print(' ')
    print('Total solution time with model.fit_generator: %6.3f seconds (%6.2f %% more)' % (generatorTime, 100.0 * generatorTime/referenceTime))
    print(' ')

    # Solve by gathering data into batches of size ( batchSize , inputDim ) and feeding it to model.train_on_batch

    startTime = time.time()

    for epoch in range(opts.epochCount):

        print(' ')
        print('Training epoch # %2d ...' % (epoch+1))
        print(' ')

        np.random.shuffle(trainINDX)

        epochStartTime = time.time()

        for step in tqdm( range( stepsPerEpoch ) ):

            INDX = trainINDX[ step*opts.batchSize : (step+1)*opts.batchSize ]

            X = np.expand_dims( inputData[ np.mod( batchIndices + np.reshape(INDX,(opts.batchSize,1)) , N ) ], axis=2)
            y = outputData[INDX,:]

            history = model.train_on_batch(x=X, y=y, sample_weight=None, class_weight=None, reset_metrics=False)

        print(' ')
        print('...completed with loss = %9.6e, accuracy = %6.2f %%, %6.2f ms/step' % (history[0], 100.0*history[1], (1000*(time.time() - epochStartTime)/np.floor(trainINDX.size / opts.batchSize))))
        print(' ')

    batchTime = time.time() - startTime
    print(' ')
    print('Total solution time with model.train_on_batch: %6.3f seconds (%6.2f %% more)' % (batchTime, 100.0 * batchTime/referenceTime))
    print(' ')

parser = argparse.ArgumentParser()


parser.add_argument('--batchSize', type=int,
                default=128,
                help='Batch size')
parser.add_argument('--epochCount', type=int,
                default=5,
                help='Epoch count')

opts, unparsed = parser.parse_known_args()

if __name__== "__main__":
  main( )
```
1个回答

为了自己回答这个问题,我最近更新到 Python 3.7.7 和 TensorFlow 2.2.0 rc2,突然间我所有的问题都消失了。现在,在默认批量大小为 128 的情况下运行 5 个 epoch,使用明确形成的 numpy 数组的 model.fit 需要 126.162 秒,使用提供的生成器的 model.fit 需要 149.053 秒,而 model.train_on_batch 需要 240.698 秒。这与我的 CPU 支持的不支持 AVX2 和 FMA 指令的默认 TensorFlow 版本一起使用。