createDataFrame(rdd,schema)只返回空值



我的RDD(来自ElasticSearch(是这样的。

[
('rty456ui', {'@timestamp': '2022-10-10T24:56:10.000259+0000', 'host': {'id': 'test-host-id-1'}, 'watchlists': {'ioc': {'summary': '127.0.0.1', 'tags': ('Dummy Tag',)}}, 'source': {'ip': '127.0.0.1'}, 'event': {'created': '2022-10-10T13:56:10+00:00', 'id': 'rty456ui'}, 'tags': ('Mon',)}),
('cxs980qw', {'@timestamp': '2022-10-10T13:56:10.000259+0000', 'host': {'id': 'test-host-id-2'}, 'watchlists': {'ioc': {'summary': '0.0.0.1', 'tags': ('Dummy Tag',)}}, 'source': {'ip': '0.0.0.1'}, 'event': {'created': '2022-10-10T24:56:10+00:00', 'id': 'cxs980qw'}, 'tags': ('Mon', 'Tue')})
]

(我觉得有趣的是ES中的列表在RDD中转换为元组(

我正试图把它变成这样。

+---------------+-----------+-----------+---------------------------+-----------------------+-----------------------+---------------+
|host.id        |event.id   |source.ip  |event.created              |watchlists.ioc.summary |watchlists.ioc.tags    |tags           |
+---------------+-----------+-----------+---------------------------+-----------------------+-----------------------+---------------+
|test-host-id-1 |rty456ui   |127.0.0.1  |2022-10-10T13:56:10+00:00  |127.0.0.1              |[Dummy Tag]            |[Mon]          |
|test-host-id-2 |cxs980qw   |0.0.0.1    |2022-10-10T24:56:10+00:00  |127.0.0.1              |[Dummy Tag]            |[Mon, Tue]     |
+---------------+-----------+-----------+---------------------------+-----------------------+-----------------------+---------------+

然而,得到这个。

+-------+--------+---------+-------------+----------------------+-------------------+-------------------------------+
|host.id|event.id|source.ip|event.created|watchlists.ioc.summary|watchlists.ioc.tags|tags                           |
+-------+--------+---------+-------------+----------------------+-------------------+-------------------------------+
|null   |null    |null     |null         |null                  |null               |[Ljava.lang.Object;@6c704e6e   |
|null   |null    |null     |null         |null                  |null               |[Ljava.lang.Object;@701ea4c8   |
+-------+--------+---------+-------------+----------------------+-------------------+-------------------------------+

代码

from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([ 
StructField("host.id",StringType(), True), 
StructField("event.id",StringType(), True), 
StructField("source.ip",StringType(), True), 
StructField("event.created", StringType(), True), 
StructField("watchlists.ioc.summary", StringType(), True), 
StructField("watchlists.ioc.tags", StringType(), True),
StructField("tags", StringType(), True)
])

df = spark.createDataFrame(es_rdd.map(lambda x: x[1]),schema)
df.show(truncate=False)

我正在尝试将rdd转换为Dataframe。此外,我想为它定义模式。然而,即使rdd有数据,pyspark.createDataFrame(rdd, schema)也只返回null值。此外,我在输出中也得到了[Ljava.lang.Object;@701ea4c8。那么我在这里错过了什么?

您的文章包含2个问题:

  1. 为什么当我将RDD转换为数据帧时,即使我声明了架构,所有列也将为null:在schema中,您使用StructTypeColumn。StructFiedColumn(例如host.id(以获取RDD中的值。然而,这种类型的选择语句只能在使用Spark SQL选择语句时工作,我认为这里没有这样的解析。为了实现您的目标,您可能需要在map函数中更新lambda函数,以提取类似rdd_trans = rdd.map(lambda x: (x[1]['host']['id'], x[1]['event']['id'], ))

  2. 为什么tag列的输出没有按预期显示:这是因为当您声明tag列时,您将其声明为字符串列,您应该使用ArrayType。

最新更新