Time series grouped cross-validation

data-mining machine-learning time-series cross-validation
2021-09-19 10:27:22

I have data with the following structure:

created_at | customer_id | features | target
2019-01-01             2   xxxxxxxx       y  
2019-01-02             3   xxxxxxxx       y  
2019-01-03             3   xxxxxxxx       y  
...

That is: a session timestamp, a customer ID, some features, and a target. I want to build a machine learning model to predict this target, but I'm having trouble setting up cross-validation correctly.

The idea is to deploy this model and use it on new customers. For this reason, I need the cross-validation setup to satisfy the following properties:

  • It must be done in a time series fashion: that is, for every train-validation split in the cross-validation, every created_at in the validation set must be later than every created_at in the training set.
  • It must split customers: that is, for every train-validation split in the cross-validation, no customer may appear in both train and validation.

Can you think of a way to do this? Is there an implementation in Python or the scikit-learn ecosystem?

4 Answers

As @NoahWeber mentioned, one solution is to:

  • split by customer ID (A);
  • do a time series split on the whole dataset (B);
  • keep in the training (resp. testing) set only the data from the training (resp. testing) customers of split (A) that fall in the training (resp. testing) part of the time series split (B).

Below is a code example I wrote at the same time as his answer.

import pandas as pd
import numpy as np
from sklearn.model_selection import RepeatedKFold
from sklearn.model_selection import TimeSeriesSplit

# Generate n random timestamps between start and end
def pp(start, end, n):
    start_u = start.value//10**9
    end_u = end.value//10**9

    return pd.DatetimeIndex((10**9*np.random.randint(start_u, end_u, n, dtype=np.int64)).view('M8[ns]'))

start = pd.to_datetime('2015-01-01')
end = pd.to_datetime('2018-01-01')
fake_date = pp(start, end, 500)

# Fake dataframe
df = pd.DataFrame(data=np.random.random((500,5)), index=fake_date, columns=['feat'+str(i) for i in range(5)])
df['customer_id'] = np.random.randint(0, 5, 500)
df['label'] = np.random.randint(0, 3, 500)

# First split by customer
customer_ids = df['customer_id'].unique()
rkf = RepeatedKFold(n_splits=2, n_repeats=5, random_state=42)
for train_pos, test_pos in rkf.split(customer_ids):
    # rkf.split yields positions into customer_ids, not the ids themselves
    train_cust, test_cust = customer_ids[train_pos], customer_ids[test_pos]
    print("training/testing with customers : " + str(train_cust) + "/" + str(test_cust))

    # Then sort all the data by date (if not already sorted)
    sorted_df = df.sort_index()

    # Then do the time series split
    tscv = TimeSeriesSplit(max_train_size=None, n_splits=5)
    for train_index, test_index in tscv.split(sorted_df.values):
        df_train, df_test = sorted_df.iloc[train_index], sorted_df.iloc[test_index]

        # Keep only the right customers for training/testing
        # (isin avoids a KeyError when a customer is absent from a time slice)
        df_train_final = df_train[df_train['customer_id'].isin(train_cust)]
        df_test_final = df_test[df_test['customer_id'].isin(test_cust)]

Note: the random date generation is based on this post.

Second note: I tested the generated training/testing dataframes for cross-validation with the sample code below, which you can add right after the df_test_final line.

# Test condition 1: temporality
for i in range(len(df_test_final)):
    for j in range(len(df_train_final)):
        if df_test_final.index[i] < df_train_final.index[j]:
            print("Error with " + str(i) + "/" + str(j))

# Test condition 2: training customers are not in the final testing df
for i in train_cust:
    if i in df_test_final['customer_id'].values:
        print("Error in df_test_final with customer " + str(i))

# Test condition 2 (reverse): testing customers are not in the final training df
for i in test_cust:
    if i in df_train_final['customer_id'].values:
        print("Error in df_train_final with customer " + str(i))

Here is a pseudocode implementation:

function keep_customer_ids( data, ids ):
    goal: this function returns the subset of data containing only the events
          whose customer id tag is in ids
    data: labeled events containing features, a date and a customer id tag
    ids: the customer ids you want to keep
    for event in data:
        if event has a customer id tag that is in ids, keep it
        else, drop it
    return data

algorithm:
    for each cross-validation split you want:
        customer_train_ids, customer_test_ids = split_by_customers( customer_ids )
        train_data, test_data = split_data_in_time_series_way( data )
        final_train_data = keep_customer_ids( train_data, customer_train_ids )
        final_test_data = keep_customer_ids( test_data, customer_test_ids )
        do_the_fit_predict_things( final_train_data, final_test_data )
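
In pandas, keep_customer_ids reduces to a boolean mask. Below is a minimal sketch, assuming data is a DataFrame with a customer_id column (the other helpers above remain pseudocode):

import pandas as pd

def keep_customer_ids(data: pd.DataFrame, ids) -> pd.DataFrame:
    # keep only the events whose customer id tag is in `ids`
    return data[data['customer_id'].isin(ids)]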

Here is a solution based on @NoahWeber's and @etiennedm's answers. It is based on a juxtaposition of splits: 1) a repeated k-fold split, to obtain the training customers and testing customers, and 2) a time series split on each k-fold.

This strategy uses a custom CV split iterator based on dates for the time series split (whereas the usual CV split iterators are based on sample size / number of folds).

An implementation within the sklearn ecosystem is provided.

Let's restate the problem.

Say you have 10 periods and 3 customers, indexed as follows:

example_data = pd.DataFrame({
    'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29],
    'customer': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
    'date': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
})

We do a repeated k-fold with 2 folds and 2 iterations (4 folds in total), and within each k-fold split we split again with a time series split, such that each time series split has 2 folds:

kfold split 1: training customers are [0, 1] and testing customers are [2]

kfold split 1, time series split 1: train indices are [0, 1, 2, 3, 10, 11, 12, 13] and test indices are [24, 25, 26]

kfold split 1, time series split 2: train indices are [0, 1, 2, 3, 4, 5, 6, 10, 11, 12, 13, 14, 15, 16] and test indices are [27, 28, 29]

kfold split 2: training customers are [2] and testing customers are [0, 1]

kfold split 2, time series split 1: train indices are [20, 21, 22, 23] and test indices are [4, 5, 6, 7, 15, 16, 17]

kfold split 2, time series split 2: train indices are [20, 21, 22, 23, 24, 25, 26] and test indices are [7, 8, 9, 17, 18, 19]

kfold split 3: training customers are [0, 2] and testing customers are [1]

kfold split 3, time series split 1: train indices are [0, 1, 2, 3, 20, 21, 22, 23] and test indices are [14, 15, 16]

kfold split 3, time series split 2: train indices are [0, 1, 2, 3, 4, 5, 6, 20, 21, 22, 23, 24, 25, 26] and test indices are [17, 18, 19]

kfold split 4: training customers are [1] and testing customers are [0, 2]

kfold split 4, time series split 1: train indices are [10, 11, 12, 13] and test indices are [4, 5, 6, 24, 25, 26]

kfold split 4, time series split 2: train indices are [10, 11, 12, 13, 14, 15, 16] and test indices are [7, 8, 9, 27, 28, 29]
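
For reference, here is a minimal sketch of how the repeated k-fold stage above can be generated (which customers land in which fold depends on the random seed):

import numpy as np
from sklearn.model_selection import RepeatedKFold

customers = np.array([0, 1, 2])
# 2 folds x 2 repeats = 4 kfold splits over the customers
rkf = RepeatedKFold(n_splits=2, n_repeats=2, random_state=42)
for i, (train_idx, test_idx) in enumerate(rkf.split(customers), start=1):
    print("kfold split", i, ": training customers are", customers[train_idx],
          "and testing customers are", customers[test_idx])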

Usually, cross-validation iterators, such as those in sklearn, are based on the number of folds, i.e., the sample size in each fold. Unfortunately, these are not suited to our k-fold / time series split with real data: nothing guarantees that the data is perfectly distributed over time and over groups, as we assumed in the previous example.

For example, the 4th observation in the training consumer sample (say customers 0 and 1 in kfold split 1 of the example) could come after the 4th observation in the test sample (say customer 2). This violates condition 1.

Here is a CV split strategy based on dates for the folds (rather than on sample size or number of folds). Say you have the previous data but with random dates. Define an initial_training_rolling_months and a rolling_window_months, e.g. 6 months and 1 month.

kfold split 1: training customers are [0, 1] and testing customers are [2]

kfold split 1, time series split 1: the training sample is the first 6 months of customers [0, 1] and the test sample is the month starting right after the training sample, for customer [2]

kfold split 1, time series split 2: the training sample is the first 7 months of customers [0, 1] and the test sample is the month starting right after the training sample, for customer [2]
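
The rolling of these windows is plain pandas DateOffset arithmetic. A minimal sketch of the bookkeeping (the dates are illustrative):

import pandas as pd

initial_training_rolling_months, rolling_window_months = 6, 1

start_training = pd.Timestamp('2019-01-01')
end_training = start_training + pd.DateOffset(months=initial_training_rolling_months)
start_testing = end_training  # the test window starts where the training window ends
end_testing = start_testing + pd.DateOffset(months=rolling_window_months)

# next time series split: the training window grows by one month,
# and the one-month test window slides forward accordingly
end_training += pd.DateOffset(months=rolling_window_months)
start_testing += pd.DateOffset(months=rolling_window_months)
end_testing += pd.DateOffset(months=rolling_window_months)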

Below is a suggested implementation for building such a time series split iterator.

The returned iterator is a list of tuples that you can use like any other cross-validation iterator.

With simple generated data, as in our previous example, to debug the fold generation, noting that customer 1's (resp. 2's) data begins at index 366 (resp. 732):

from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
df = generate_happy_case_dataframe()
grouped_ts_validation_iterator = build_grouped_ts_validation_iterator(df)
gridsearch = GridSearchCV(estimator=RandomForestClassifier(), cv=grouped_ts_validation_iterator, param_grid={})
gridsearch.fit(df[['feat0', 'feat1', 'feat2', 'feat3', 'feat4']].values, df['label'].values)
gridsearch.predict([[0.1, 0.2, 0.1, 0.4, 0.1]])

With randomly generated data, as in @etiennedm's example (to debug the splits, I covered simple cases, such as when the test sample begins before or after the training samples):

from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
df = generate_fake_random_dataframe()
grouped_ts_validation_iterator = build_grouped_ts_validation_iterator(df)
gridsearch = GridSearchCV(estimator=RandomForestClassifier(), cv=grouped_ts_validation_iterator, param_grid={})
gridsearch.fit(df[['feat0', 'feat1', 'feat2', 'feat3', 'feat4']].values, df['label'].values)
gridsearch.predict([[0.1, 0.2, 0.1, 0.4, 0.1]])

The implementation:

import pandas as pd
import numpy as np
from sklearn.model_selection import RepeatedKFold


def generate_fake_random_dataframe(start=pd.to_datetime('2015-01-01'), end=pd.to_datetime('2018-01-01')):
    fake_date = generate_fake_dates(start, end, 500)
    df = pd.DataFrame(data=np.random.random((500,5)), columns=['feat'+str(i) for i in range(5)])
    df['customer_id'] = np.random.randint(0, 5, 500)
    df['label'] = np.random.randint(0, 3, 500)
    df['dates'] = fake_date
    df = df.reset_index() # important since df.index will be used as split index 
    return df


def generate_fake_dates(start, end, n):
    start_u = start.value//10**9
    end_u = end.value//10**9
    return pd.DatetimeIndex((10**9*np.random.randint(start_u, end_u, n, dtype=np.int64)).view('M8[ns]'))


def generate_happy_case_dataframe(start=pd.to_datetime('2019-01-01'), end=pd.to_datetime('2020-01-01')):
    dates = pd.date_range(start, end)
    length_year = len(dates)
    length_df = length_year * 3
    df = pd.DataFrame(data=np.random.random((length_df, 5)), columns=['feat'+str(i) for i in range(5)])
    df['label'] = np.random.randint(0, 3, length_df)
    df['dates'] = list(dates) * 3
    df['customer_id'] = [0] * length_year + [1] * length_year + [2] * length_year
    return df


def build_grouped_ts_validation_iterator(df, kfold_n_split=2, kfold_n_repeats=5, initial_training_rolling_months=6, rolling_window_months=1):
    rkf = RepeatedKFold(n_splits=kfold_n_split, n_repeats=kfold_n_repeats, random_state=42)
    CV_iterator = list()
    for train_customers_ids, test_customers_ids in rkf.split(df['customer_id'].unique()):
        # note: rkf.split yields positions into the unique() array; treating them
        # as customer ids works here because the generated ids are exactly 0..n-1
        print("rkf training/testing with customers : " + str(train_customers_ids)+"/"+str(test_customers_ids))
        this_k_fold_ts_split = split_with_dates_for_validation(df=df,
                                                               train_customers_ids=train_customers_ids, 
                                                               test_customers_ids=test_customers_ids, 
                                                               initial_training_rolling_months=initial_training_rolling_months, 
                                                               rolling_window_months=rolling_window_months)
        print("In this k fold, there is", len(this_k_fold_ts_split), 'time series splits')
        for split_i, split in enumerate(this_k_fold_ts_split) :
            print("for this ts split number", str(split_i))
            print("train ids is len", len(split[0]), 'and are:', split[0])
            print("test ids is len", len(split[1]), 'and are:', split[1])
        CV_iterator.extend(this_k_fold_ts_split)
        print('***')

    return tuple(CV_iterator)


def split_with_dates_for_validation(df, train_customers_ids, test_customers_ids, initial_training_rolling_months=6, rolling_window_months=1):
    start_train_df_date, end_train_df_date, start_test_df_date, end_test_df_date = \
        fetch_extremas_train_test_df_dates(df, train_customers_ids, test_customers_ids)
    
    start_training_date, end_training_date, start_testing_date, end_testing_date = \
        initialize_training_dates(start_train_df_date, start_test_df_date, initial_training_rolling_months, rolling_window_months)
    
    ts_splits = list()
    while not stop_time_series_split_decision(end_train_df_date, end_test_df_date, end_training_date, end_testing_date, rolling_window_months):
        # The while implies that if the testing sample covers less than one month, the process stops
        this_ts_split_training_indices = fetch_this_split_training_indices(df, train_customers_ids, start_training_date, end_training_date)
        this_ts_split_testing_indices = fetch_this_split_testing_indices(df, test_customers_ids, start_testing_date, end_testing_date)
        if this_ts_split_testing_indices:
            # If testing data is not empty, i.e. there is something to evaluate
            ts_splits.append((this_ts_split_training_indices, this_ts_split_testing_indices))
        start_training_date, end_training_date, start_testing_date, end_testing_date =\
            update_testing_training_dates(start_training_date, end_training_date, start_testing_date, end_testing_date, rolling_window_months)
    return ts_splits


def fetch_extremas_train_test_df_dates(df, train_customers_ids, test_customers_ids):
    train_df, test_df = df.loc[df['customer_id'].isin(train_customers_ids)], df.loc[df['customer_id'].isin(test_customers_ids)]
    start_train_df_date, end_train_df_date = min(train_df['dates']), max(train_df['dates'])
    start_test_df_date, end_test_df_date = min(test_df['dates']), max(test_df['dates'])
    return start_train_df_date, end_train_df_date, start_test_df_date, end_test_df_date 


def initialize_training_dates(start_train_df_date, start_test_df_date, initial_training_rolling_months, rolling_window_months):
    start_training_date = start_train_df_date 
    # cover the case where test consumers begins long after (initial_training_rolling_months after) train consumers
    if start_training_date + pd.DateOffset(months=initial_training_rolling_months) < start_test_df_date:
        start_training_date = start_test_df_date - pd.DateOffset(months=initial_training_rolling_months)
    # the initial training window ends initial_training_rolling_months after its (possibly shifted) start
    end_training_date = start_training_date + pd.DateOffset(months=initial_training_rolling_months)
    start_testing_date = end_training_date
    end_testing_date = start_testing_date + pd.DateOffset(months=rolling_window_months)
    return start_training_date, end_training_date, start_testing_date, end_testing_date


def stop_time_series_split_decision(end_train_df_date, end_test_df_date, end_training_date, end_testing_date, rolling_window_months):
    no_more_training_data_stopping_condition = end_training_date + pd.DateOffset(months=rolling_window_months) > end_train_df_date
    no_more_testing_data_stopping_condition = end_testing_date + pd.DateOffset(months=rolling_window_months) > end_test_df_date
    stopping_condition = no_more_training_data_stopping_condition or no_more_testing_data_stopping_condition
    return stopping_condition


def update_testing_training_dates(start_training_date, end_training_date, start_testing_date, end_testing_date, rolling_window_months):
    # start_training_date stays fixed: the training window expands, the test window slides
    end_training_date += pd.DateOffset(months=rolling_window_months)
    start_testing_date += pd.DateOffset(months=rolling_window_months)
    end_testing_date += pd.DateOffset(months=rolling_window_months)
    return start_training_date, end_training_date, start_testing_date, end_testing_date


def fetch_this_split_training_indices(df, train_customers_ids, start_training_date, end_training_date):
    train_df = df.loc[df['customer_id'].isin(train_customers_ids)]
    in_training_period_df = train_df.loc[(train_df['dates'] >= start_training_date) & (train_df['dates'] < end_training_date)]
    this_ts_split_training_indices = in_training_period_df.index.to_list()
    return this_ts_split_training_indices


def fetch_this_split_testing_indices(df, test_customers_ids, start_testing_date, end_testing_date):
    test_df = df.loc[df['customer_id'].isin(test_customers_ids)]
    in_testing_period_df = test_df.loc[(test_df['dates'] >= start_testing_date) & (test_df['dates'] < end_testing_date)]
    this_ts_split_testing_indices = in_testing_period_df.index.to_list()
    return this_ts_split_testing_indices
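
As a sanity check, you can verify both conditions on every generated split. A minimal sketch, assuming df and grouped_ts_validation_iterator from the usage examples above:

# Condition 1: all test dates come strictly after all training dates.
# Condition 2: training and testing customers are disjoint.
for train_idx, test_idx in grouped_ts_validation_iterator:
    if train_idx and test_idx:
        assert df.loc[train_idx, 'dates'].max() < df.loc[test_idx, 'dates'].min()
    assert not set(df.loc[train_idx, 'customer_id']) & set(df.loc[test_idx, 'customer_id'])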

First, when you say "the idea is to deploy this model and use it on new customers", I guess you mean using it to infer on new customers, right? I can think of two possible options:

  1. Following the properties you impose, you can first use scikit-learn's TimeSeriesSplit cross-validator, with which you obtain the time-ordered indices of each train-validation split. You can then filter each split by the customer IDs you assign to each side, so that the second condition is also fulfilled (see the sketch after this list).

  2. As a second option, you could apply clustering on your customers based on some features and build as many models as the customer types you obtain (each cluster having the history data of n customers). This would solve a possible problem I see in your approach: because of the second restriction, a customer's whole history is never used for both training and validation.
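
A minimal sketch of option 1, assuming df is sorted by date and has a customer_id column (the random customer assignment is purely illustrative):

import numpy as np
from sklearn.model_selection import TimeSeriesSplit

rng = np.random.default_rng(42)
customers = df['customer_id'].unique()

tscv = TimeSeriesSplit(n_splits=5)
for train_index, test_index in tscv.split(df):
    # assign a disjoint set of customers to the test side of this fold
    test_customers = rng.choice(customers, size=len(customers) // 2, replace=False)
    df_train = df.iloc[train_index]
    df_test = df.iloc[test_index]
    df_train = df_train[~df_train['customer_id'].isin(test_customers)]
    df_test = df_test[df_test['customer_id'].isin(test_customers)]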

Sort by customer ID, then do the time series split. If there is any overlap, drop those rows if possible.

These two conditions are mutually exclusive in places: if a customer (say, customer 2) has data both at the very beginning and right at the end of the time series, you cannot expect to avoid dropping some of those rows at the beginning, because keeping them would violate one of the two posed conditions.