我的RDD(来自ElasticSearch(是这样的。
[
('rty456ui', {'@timestamp': '2022-10-10T24:56:10.000259+0000', 'host': {'id': 'test-host-id-1'}, 'watchlists': {'ioc': {'summary': '127.0.0.1', 'tags': ('Dummy Tag',)}}, 'source': {'ip': '127.0.0.1'}, 'event': {'created': '2022-10-10T13:56:10+00:00', 'id': 'rty456ui'}, 'tags': ('Mon',)}),
('cxs980qw', {'@timestamp': '2022-10-10T13:56:10.000259+0000', 'host': {'id': 'test-host-id-2'}, 'watchlists': {'ioc': {'summary': '0.0.0.1', 'tags': ('Dummy Tag',)}}, 'source': {'ip': '0.0.0.1'}, 'event': {'created': '2022-10-10T24:56:10+00:00', 'id': 'cxs980qw'}, 'tags': ('Mon', 'Tue')})
]
(我觉得有趣的是ES中的列表在RDD中转换为元组(
我正试图把它变成这样。
+---------------+-----------+-----------+---------------------------+-----------------------+-----------------------+---------------+
|host.id |event.id |source.ip |event.created |watchlists.ioc.summary |watchlists.ioc.tags |tags |
+---------------+-----------+-----------+---------------------------+-----------------------+-----------------------+---------------+
|test-host-id-1 |rty456ui |127.0.0.1 |2022-10-10T13:56:10+00:00 |127.0.0.1 |[Dummy Tag] |[Mon] |
|test-host-id-2 |cxs980qw |0.0.0.1 |2022-10-10T24:56:10+00:00 |127.0.0.1 |[Dummy Tag] |[Mon, Tue] |
+---------------+-----------+-----------+---------------------------+-----------------------+-----------------------+---------------+
然而,得到这个。
+-------+--------+---------+-------------+----------------------+-------------------+-------------------------------+
|host.id|event.id|source.ip|event.created|watchlists.ioc.summary|watchlists.ioc.tags|tags |
+-------+--------+---------+-------------+----------------------+-------------------+-------------------------------+
|null |null |null |null |null |null |[Ljava.lang.Object;@6c704e6e |
|null |null |null |null |null |null |[Ljava.lang.Object;@701ea4c8 |
+-------+--------+---------+-------------+----------------------+-------------------+-------------------------------+
代码
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
StructField("host.id",StringType(), True),
StructField("event.id",StringType(), True),
StructField("source.ip",StringType(), True),
StructField("event.created", StringType(), True),
StructField("watchlists.ioc.summary", StringType(), True),
StructField("watchlists.ioc.tags", StringType(), True),
StructField("tags", StringType(), True)
])
df = spark.createDataFrame(es_rdd.map(lambda x: x[1]),schema)
df.show(truncate=False)
我正在尝试将rdd转换为Dataframe。此外,我想为它定义模式。然而,即使rdd有数据,pyspark.createDataFrame(rdd, schema)
也只返回null值。此外,我在输出中也得到了[Ljava.lang.Object;@701ea4c8
。那么我在这里错过了什么?
您的文章包含2个问题:
-
为什么当我将RDD转换为数据帧时,即使我声明了架构,所有列也将为null:在
schema
中,您使用StructTypeColumn。StructFiedColumn(例如host.id
(以获取RDD中的值。然而,这种类型的选择语句只能在使用Spark SQL选择语句时工作,我认为这里没有这样的解析。为了实现您的目标,您可能需要在map
函数中更新lambda函数,以提取类似rdd_trans = rdd.map(lambda x: (x[1]['host']['id'], x[1]['event']['id'], ))
-
为什么
tag
列的输出没有按预期显示:这是因为当您声明tag
列时,您将其声明为字符串列,您应该使用ArrayType。