PySpark中的列筛选



我有一个从Hive表加载的数据帧df,它有一个时间戳列,比如ts,字符串类型为dd-MMM-yy hh.mm.ss.MS a格式(转换为python日期时间库,这是%d-%b-%y %I.%M.%S.%f %p(。

现在我想过滤数据帧中最后五分钟的行:

only_last_5_minutes = df.filter(
    datetime.strptime(df.ts, '%d-%b-%y %I.%M.%S.%f %p') > datetime.now() - timedelta(minutes=5)
)

然而,这不起作用,我收到这个消息

TypeError: strptime() argument 1 must be string, not Column

我似乎有错误的列操作应用程序,在我看来,我必须创建一个lambda函数来过滤每个满足所需条件的列,但作为Python和lambda表达式的新手,我不知道如何正确创建过滤器。请告知。

p.S。我更喜欢将我的过滤器表示为Python本机(或SparkSQL(,而不是Hive sql查询表达式"WHERE"中的过滤器。

首选:

df = sqlContext.sql("SELECT * FROM my_table")
df.filter( // filter here)

不优选:

df = sqlContext.sql("SELECT * FROM my_table WHERE...")

Spark>=1.5

由于Spark 1.5,您可以按如下方式解析日期字符串:

from pyspark.sql.functions import expr, from_unixtime, lit, unix_timestamp
from pyspark.sql.types import TimestampType
parsed_df = df.select((from_unixtime(unix_timestamp(
    # Note: am-pm: pattern length should be 1 for Spark >= 3.0
    df.datetime, "dd-MMM-yy h.mm.ss.SSSSSS a"  
))).cast(TimestampType()).alias("datetime"))
parsed_df.where(col("datetime") >= lit(now) - expr("INTERVAL 5 minutes"))

然后应用间隔:

from pyspark.sql.functions import current_timestamp, expr

火花<1.5

可以使用用户定义的功能。

from datetime import datetime, timedelta
from pyspark.sql.types import BooleanType, TimestampType
from pyspark.sql.functions import udf, col
def in_last_5_minutes(now):
    def _in_last_5_minutes(then):
        then_parsed = datetime.strptime(then, '%d-%b-%y %I.%M.%S.%f %p')
        return then_parsed > now - timedelta(minutes=5)
    return udf(_in_last_5_minutes, BooleanType())

使用一些伪数据:

df = sqlContext.createDataFrame([
    (1, '14-Jul-15 11.34.29.000000 AM'),
    (2, '14-Jul-15 11.34.27.000000 AM'),
    (3, '14-Jul-15 11.32.11.000000 AM'),
    (4, '14-Jul-15 11.29.00.000000 AM'),
    (5, '14-Jul-15 11.28.29.000000 AM')
], ('id', 'datetime'))
now = datetime(2015, 7, 14, 11, 35)
df.where(in_last_5_minutes(now)(col("datetime"))).show()

正如预期的那样,我们只得到3个条目:

+--+--------------------+
|id|            datetime|
+--+--------------------+
| 1|14-Jul-15 11.34.2...|
| 2|14-Jul-15 11.34.2...|
| 3|14-Jul-15 11.32.1...|
+--+--------------------+

重新分析日期时间字符串的效率相当低,因此您可以考虑存储TimestampType

def parse_dt():
    def _parse(dt):
        return datetime.strptime(dt, '%d-%b-%y %I.%M.%S.%f %p')
    return udf(_parse, TimestampType())
df_with_timestamp = df.withColumn("timestamp", parse_dt()(df.datetime))
def in_last_5_minutes(now):
    def _in_last_5_minutes(then):
        return then > now - timedelta(minutes=5)
    return udf(_in_last_5_minutes, BooleanType())
df_with_timestamp.where(in_last_5_minutes(now)(col("timestamp")))

结果:

+--+--------------------+--------------------+
|id|            datetime|           timestamp|
+--+--------------------+--------------------+
| 1|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 2|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 3|14-Jul-15 11.32.1...|2015-07-14 11:32:...|
+--+--------------------+--------------------+

最后,可以使用带有时间戳的原始SQL查询:

query = """SELECT * FROM df
     WHERE unix_timestamp(datetime, 'dd-MMM-yy HH.mm.ss.SSSSSS a') > {0}
     """.format(time.mktime((now - timedelta(minutes=5)).timetuple()))
sqlContext.sql(query)

和上面一样,一次性解析日期字符串会更有效。

如果列已经是timestamp,则可以使用datetime文字:

from pyspark.sql.functions import lit
df_with_timestamp.where(
    df_with_timestamp.timestamp > lit(now - timedelta(minutes=5)))
from pyspark.sql.functions import *
df.withColumn("seconds_from_now", current_timestamp() - col("ts").cast("long"))
df = df.filter(df.seconds_from_now <= 5*60).drop("seconds_from_now")

df是包含最后五分钟结果的结果数据帧。

相关内容

  • 没有找到相关文章

最新更新