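I am using Keras with the TensorFlow backend to train a simple 1D CNN that detects specific events in sensor data. The data, with tens of millions of samples, easily fits into RAM as a 1D float array. However, storing it as an N x inputDim array that can be passed to model.fit for training obviously requires a great deal of memory. I could use model.fit_generator or model.train_on_batch to generate the required minibatches on the fly. For some reason, though, I observe a huge performance gap between model.fit on the one hand and model.fit_generator and model.train_on_batch on the other, even though everything is stored in memory and minibatch generation is fast, since it basically consists only of reshaping the data. So I am wondering whether I am doing something wrong or whether this performance gap is to be expected. I am using the CPU version of TensorFlow 2.0 with Python 3.6.3 on macOS Mojave, on a 3.2 GHz Intel Core i7 processor (4 cores with multithreading).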
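To put the memory argument in numbers, the sketch below computes the footprint of the 1D source array versus the N x inputDim window matrix, using the dummy script's sizes and float64 values (the script's arrays are float64). The 50-million-sample case is a hypothetical stand-in for "tens of millions of samples"; `layout_bytes` is an illustrative helper, not part of the original script.

```python
# Rough memory footprint of the two data layouts (float64, 8 bytes per value).
N = 1049344          # sample count used in the dummy script
INPUT_DIM = 104      # window length (inputDim) used in the dummy script
ITEMSIZE = 8         # bytes per float64 value


def layout_bytes(n, input_dim, itemsize=ITEMSIZE):
    """Return (bytes of the 1D source array, bytes of the N x inputDim window matrix)."""
    return n * itemsize, n * input_dim * itemsize


flat, windowed = layout_bytes(N, INPUT_DIM)
print(f"1D source array : {flat / 1e6:9.1f} MB")
print(f"N x inputDim    : {windowed / 1e6:9.1f} MB ({windowed // flat}x larger)")

# Hypothetical "tens of millions" case: 50 million samples.
flat_big, windowed_big = layout_bytes(50_000_000, INPUT_DIM)
print(f"50M samples     : {flat_big / 1e9:.1f} GB vs {windowed_big / 1e9:.1f} GB")
```

The blow-up factor is exactly inputDim regardless of dtype; storing float32 instead of float64 halves both numbers but leaves the ratio unchanged.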
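In short, I created a dummy Python script that reproduces the problem. With a batch size of 64, running 10 epochs takes 407 seconds with model.fit, 1852 seconds with model.fit_generator and 1985 seconds with model.train_on_batch. The CPU loads are ~220%, ~130% and ~120%, respectively. This is particularly odd for model.fit_generator, which should be able to parallelize minibatch creation and model training, whereas model.train_on_batch definitely does not. In other words, model.fit (with its huge memory requirements) outperforms the other candidate solutions, whose memory requirements are manageable, by a factor of about four. As expected, CPU load increases and total training time decreases as the batch size grows, but model.fit is always the fastest, by at least a factor of two, up to a batch size of 8096.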
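The reported totals can be broken down per epoch and per training step. This sketch assumes the timings were taken with the dummy script's N = 1049344, so one epoch at batch size 64 is N // 64 = 16396 steps; the dictionary keys are just labels for the three reported runs.

```python
# Per-epoch and per-step cost implied by the reported timings (10 epochs, batch size 64).
N = 1049344   # assumption: timings were measured with the dummy script's N
BATCH = 64
EPOCHS = 10
reported_seconds = {
    "model.fit": 407.0,
    "model.fit_generator": 1852.0,
    "model.train_on_batch": 1985.0,
}

steps_per_epoch = N // BATCH
summary = {}
for name, total in reported_seconds.items():
    summary[name] = {
        "s_per_epoch": total / EPOCHS,
        "ms_per_step": 1000.0 * total / (EPOCHS * steps_per_epoch),
        "vs_fit": total / reported_seconds["model.fit"],
    }
    s = summary[name]
    print(f"{name:22s} {s['s_per_epoch']:7.1f} s/epoch "
          f"{s['ms_per_step']:6.2f} ms/step {s['vs_fit']:5.2f}x")
```

Under that assumption, model.fit spends about 2.5 ms per step, against roughly 11.3 ms and 12.1 ms for the other two, i.e. about 9 to 10 ms of extra time per step.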
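Is this behavior normal (when no GPU is involved), or can/should something be done to improve the computational performance of the less memory-intensive options at reasonable batch sizes? In particular, model.fit_generator fails to deliver decent performance. Something like splitting all the data into manageable chunks and then running model.fit iteratively on constantly changing training data does not seem to be an option either.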
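For concreteness, the chunked alternative mentioned above might look like the following NumPy sketch. `window_chunks` is a hypothetical helper: it builds the same wrapped windows as the script, `data[(i + 0..inputDim-1) % N]`, but only for `chunk_size` samples at a time, so peak memory is about chunk_size x inputDim values instead of N x inputDim. The commented training loop is illustrative and was not run; whether repeated short model.fit calls recover model.fit's throughput is not established here.

```python
import numpy as np


def window_chunks(data, input_dim, chunk_size, rng=None):
    """Yield (indices, X) pairs that cover every sample exactly once.

    Row k of X is the wrapped window data[(indices[k] + 0..input_dim-1) % N],
    with a trailing channel axis, so X has shape (len(indices), input_dim, 1).
    """
    n = data.size
    order = np.arange(n)
    if rng is not None:
        rng.shuffle(order)  # optional: randomize which samples land in which chunk
    offsets = np.arange(input_dim)
    for start in range(0, n, chunk_size):
        idx = order[start:start + chunk_size]
        # Broadcast (input_dim,) + (k, 1) -> (k, input_dim) index matrix, wrapped modulo n.
        X = data[np.mod(offsets + idx[:, np.newaxis], n)]
        yield idx, X[:, :, np.newaxis]

# Hypothetical training loop (model, inputData, outputData, inputDim as in the script):
# for epoch in range(epochCount):
#     for idx, X in window_chunks(inputData, inputDim, 200_000, np.random.default_rng(epoch)):
#         model.fit(X, outputData[idx], batch_size=batchSize, epochs=1, shuffle=True, verbose=0)
```

With `shuffle=True`, each model.fit call only shuffles within its chunk; the optional `rng` argument is what mixes samples across chunks from one outer epoch to the next.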
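Note that the provided dummy script is exactly what its name suggests, and the amount of data has been trimmed so that all three options are feasible. However, the model is similar to the one I actually use, to keep the scenario realistic.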
```python
from tqdm import tqdm
import numpy as np
import tensorflow as tf
import time
import sys
import argparse

inputData = None
outputData = None
batchIndices = None
opts = None

class DataGenerator(tf.keras.utils.Sequence):
    'Generates data for Keras'

    def __init__(self, batchSize, shuffle):
        'Initialization'
        self.batchIndices = batchIndices  # (batchSize, inputDim) relative offsets, set up in main()
        self.batchSize = batchSize
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(inputData.size / self.batchSize))

    def __getitem__(self, index):
        'Generate one batch of data'
        X, y = self.__data_generation(self.indexes[index * self.batchSize:(index + 1) * self.batchSize])
        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(inputData.size)
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, INDX):
        'Generates data containing batch_size samples'
        # Gather the wrapped windows (batchSize, inputDim), then add a channel axis -> (batchSize, inputDim, 1)
        X = np.expand_dims(inputData[np.mod(self.batchIndices + np.reshape(INDX, (INDX.size, 1)), inputData.size)], axis=2)
        y = outputData[INDX, :]
        return X, y

def main():
    global inputData
    global outputData
    global batchIndices
    global opts

    # Data generation
    print(' ')
    print('Generating data...')
    np.random.seed(0)     # For reproducible results
    inputDim = int(104)   # Input dimension
    outputDim = int(2)    # Output dimension
    N = int(1049344)      # Total number of samples
    M = int(5e4)          # Number of anomalies
    trainINDX = np.arange(N, dtype=np.uint32)
    inputData = np.sin(trainINDX) + np.random.normal(loc=0.0, scale=0.20, size=N)  # Source data stored in a single array
    anomalyLocations = np.random.choice(N, M, replace=False)
    inputData[anomalyLocations] += 0.5
    outputData = np.zeros((N, outputDim))  # One-hot encoded target array without ones
    for i in range(N):
        # Anomaly a lies in the wrapped window [i, i+inputDim) iff (a - i) mod N < inputDim,
        # matching the np.mod-wrapped input windows built below
        if np.any(np.mod(anomalyLocations - i, N) < inputDim):
            outputData[i, 1] = 1  # set class #2 to one if there is at least a single anomaly within range [i,i+inputDim)
        else:
            outputData[i, 0] = 1  # set class #1 to one if there are no anomalies within range [i,i+inputDim)
    print('...completed')
    print(' ')

    # Create a model for anomaly detection
    model = tf.keras.Sequential([
        tf.keras.layers.Conv1D(filters=24, kernel_size=9, strides=1, padding='valid', dilation_rate=1,
                               activation='relu', use_bias=True, kernel_initializer='glorot_uniform',
                               bias_initializer='zeros', input_shape=(inputDim, 1)),
        tf.keras.layers.MaxPooling1D(pool_size=4, strides=None, padding='valid'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(20, activation='relu', use_bias=True),
        tf.keras.layers.Dense(outputDim, activation='softmax')
    ])
    model.compile(tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.CategoricalCrossentropy(),
                  metrics=[tf.keras.metrics.CategoricalAccuracy()])
    print(' ')
    relativeIndices = np.arange(inputDim)  # Indices belonging to a single sample relative to current position
    batchIndices = np.tile(relativeIndices, (opts.batchSize, 1))  # Relative indices tiled into an array of size (batchSize, inputDim)
    stepsPerEpoch = int(np.floor(N / opts.batchSize))  # Steps per epoch

    # Create an instance of the DataGenerator class
    generator = DataGenerator(batchSize=opts.batchSize, shuffle=True)

    # Solve by gathering data into a large float64 array of size (N, inputDim, 1) and feeding it to model.fit
    startTime = time.time()
    X = np.expand_dims(inputData[np.mod(np.tile(relativeIndices, (N, 1)) + np.reshape(trainINDX, (N, 1)), N)], axis=2)
    y = outputData[trainINDX, :]
    history = model.fit(x=X, y=y, sample_weight=None, batch_size=opts.batchSize, verbose=1,
                        callbacks=None, validation_split=None, shuffle=True, epochs=opts.epochCount)
    referenceTime = time.time() - startTime
    print(' ')
    print('Total solution time with model.fit: %6.3f seconds' % referenceTime)
    print(' ')

    # Solve with the Sequence-based DataGenerator (reported as model.fit_generator)
    startTime = time.time()
    history = model.fit(x=generator, steps_per_epoch=stepsPerEpoch, verbose=1, callbacks=None,
                        epochs=opts.epochCount, max_queue_size=1024, use_multiprocessing=False)
    generatorTime = time.time() - startTime
    print(' ')
    print('Total solution time with model.fit_generator: %6.3f seconds (%6.2f %% more)'
          % (generatorTime, 100.0 * (generatorTime - referenceTime) / referenceTime))
    print(' ')

    # Solve by gathering data into batches of size (batchSize, inputDim) and feeding them to model.train_on_batch
    startTime = time.time()
    for epoch in range(opts.epochCount):
        print(' ')
        print('Training epoch # %2d ...' % (epoch + 1))
        print(' ')
        np.random.shuffle(trainINDX)
        epochStartTime = time.time()
        for step in tqdm(range(stepsPerEpoch)):
            INDX = trainINDX[step * opts.batchSize:(step + 1) * opts.batchSize]
            X = np.expand_dims(inputData[np.mod(batchIndices + np.reshape(INDX, (opts.batchSize, 1)), N)], axis=2)
            y = outputData[INDX, :]
            history = model.train_on_batch(x=X, y=y, sample_weight=None, class_weight=None, reset_metrics=False)
        print(' ')
        print('...completed with loss = %9.6e, accuracy = %6.2f %%, %6.2f ms/step'
              % (history[0], 100.0 * history[1],
                 1000 * (time.time() - epochStartTime) / np.floor(trainINDX.size / opts.batchSize)))
        print(' ')
    batchTime = time.time() - startTime
    print(' ')
    print('Total solution time with model.train_on_batch: %6.3f seconds (%6.2f %% more)'
          % (batchTime, 100.0 * (batchTime - referenceTime) / referenceTime))
    print(' ')

parser = argparse.ArgumentParser()
parser.add_argument('--batchSize', type=int, default=128, help='Batch size')
parser.add_argument('--epochCount', type=int, default=5, help='Epoch count')
opts, unparsed = parser.parse_known_args()

if __name__ == "__main__":
    main()
```
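The label loop in the script compares every sample against all M anomaly positions, about N·M ≈ 5.2·10^10 comparisons at the default sizes, which makes data generation slow. A vectorized sketch, assuming labels should describe the same wrapped windows (indices i to i+inputDim-1, modulo N) that are used to build the inputs, counts anomalies per window with a cumulative sum. For i < N - inputDim it agrees with a direct check of `[i, i+inputDim)`; for the last positions it also counts anomalies at the start of the array, as the wrapped input windows do. `window_labels` is a hypothetical name.

```python
import numpy as np


def window_labels(n, anomaly_locations, input_dim):
    """One-hot (n, 2) labels: class #2 if the wrapped window i..i+input_dim-1 (mod n) holds an anomaly."""
    hits = np.zeros(n, dtype=np.int64)
    hits[anomaly_locations] = 1
    # Append the first input_dim - 1 entries so windows near the end wrap around to the start.
    padded = np.concatenate([hits, hits[:input_dim - 1]])
    csum = np.concatenate([[0], np.cumsum(padded)])
    counts = csum[input_dim:input_dim + n] - csum[:n]  # number of anomalies in each window
    labels = np.zeros((n, 2))
    labels[counts > 0, 1] = 1   # class #2: at least one anomaly in the window
    labels[counts == 0, 0] = 1  # class #1: no anomaly in the window
    return labels
```

This runs in O(N + M) time instead of O(N·M); in the script it would be called as `window_labels(N, anomalyLocations, inputDim)` in place of the loop, assuming inputDim <= N.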