Pyspark 将时间戳从 UTC 转换为多个时区

数据挖掘 pyspark
2021-09-27 05:29:14

这是将 python 与 Spark 1.6.1 和数据帧一起使用。

我有想要转换为本地时间的 UTC 时间戳,但给定的行可能位于多个时区中的任何一个。我有一个“偏移量”值(或者,本地时区缩写。我可以很容易地将所有时间戳调整到一个区域或一个偏移量,但我不知道如何根据“ offset' 或 'tz' 列。

似乎有两种调整时间戳的主要方法:使用 'INTERVAL' 方法,或使用pyspark.sql.from_utc_timestamp.

这是一个例子:

data = [
  ("2015-01-01 23:59:59", "2015-01-02 00:01:02", 1, 300,"MST"),  
  ("2015-01-02 23:00:00", "2015-01-02 23:59:59", 2, 60, "EST"),  
  ("2015-01-02 22:59:58", "2015-01-02 23:59:59", 3, 120,"EST"),  
  ("2015-03-02 15:59:58", "2015-01-02 23:59:59", 4, 120,"PST"),  
  ("2015-03-16 15:15:58", "2015-01-02 23:59:59", 5, 120,"PST"),  
  ("2015-10-02 18:59:58", "2015-01-02 23:59:59", 4, 120,"PST"),  
  ("2015-11-16 18:58:58", "2015-01-02 23:59:59", 5, 120,"PST"),  
  ("2015-03-02 15:59:58", "2015-01-02 23:59:59", 4, 120,"MST"),  
  ("2015-03-16 15:15:58", "2015-01-02 23:59:59", 5, 120,"MST"),  
  ("2015-10-02 18:59:58", "2015-01-02 23:59:59", 4, 120,"MST"),  
  ("2015-11-16 18:58:58", "2015-01-02 23:59:59", 5, 120,"MST"),
  ...
]

(我意识到 offset 和 tz 列不一致 - 这不是真实数据)

df = sqlCtx.createDataFrame(data, ["start_time", "end_time", "id","offset","tz"])  
from pyspark.sql import functions as F

这两个选项都符合预期:

df.withColumn('testthis', F.from_utc_timestamp(df.start_time, "PST")).show()
df.withColumn('testThat', df.start_time.cast("timestamp") - F.expr("INTERVAL 50 MINUTES")).show()

但是,如果我尝试将“PST”字符串替换为 df.tz,或者将“50”字符串替换为 df.offset.cast('string'),则会出现类型错误:

TypeError:“列”对象不可调用

我已经尝试过这方面的变化,但无济于事。

3个回答

你可以使用 SQL 接口来得到你想要的:

> df.selectExpr("from_utc_timestamp(start_time, tz) as testthis").show()
+--------------------+
|            testthis|
+--------------------+
|2015-01-01 16:59:...|
|2015-01-02 18:00:...|
|2015-01-02 17:59:...|
|2015-03-02 07:59:...|
|2015-03-16 08:15:...|
|2015-10-02 11:59:...|
|2015-11-16 10:58:...|
|2015-03-02 08:59:...|
|2015-03-16 08:15:...|
|2015-10-02 11:59:...|
|2015-11-16 11:58:...|
+--------------------+

如果您想使用相同的数据框并添加一个带有转换时间戳的新列,您可以使用exprwithColumn以一种非常有效的方式。

df = df.withColumn('localTimestamp',
                   expr("from_utc_timestamp(utcTimestamp, timezone)"))

数据框中的列在哪里utcTimestamp和是。timezone这将添加一个localTimestamp包含转换时间的新列。

如果您的“tz”列的数据类型是字符串,那么您可以执行以下操作:

df.select(from_utc_timestamp(df.start_time,tz).alias('start_time')).show()