如何在Spark的数据帧中枚举列?如果列是嵌套的怎么办?



附带地,它没有items()方法。那怎么办?

我正在尝试将Row发送到数据库,代码为:

def write_row(table_name, cur, row):
data = []
for key, value in row.items():
data.append((key, value))
data = zip(*data)
columns = ", ".join(data[0])
values = data[1]
questionmarks = ", ".join(["?"] * len(columns))
query = f"INSERT INTO {table_name} ({columns}) VALUES ({questionmarks})"
cur.execute(query, values)

def write_data_frame(df, epoch1):
conn = mariadb.connect(**config["mariadb"])
cur = conn.cursor()
table_name = "pysparktest"
rows = df.collect()
for row in rows:
write_row(table_name, cur, row)
conn.commit()

它发誓

AttributeError: items

如果行是嵌套的呢?

root
|-- track: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- version: string (nullable = true)
|-- car: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- version: string (nullable = true)
|-- cnt: long (nullable = false)
|-- minBestLapTime: double (nullable = true)

就像编译器发誓的那样,没有一个方法叫做"items(("在Row类中。

你需要做的是使用";asDict"方法它将Row中的键、值输出为python dict.

在嵌套列的情况下,asDict函数中有一个名为recursive的参数,请将其设置为True。默认情况下,它设置为False。

例如:

row = Row(name="Alice", age=11)
row_as_dict = row.asDict()
row_as_dict

输出:

{'name': 'Alice', 'age': 11}

用于迭代:

for key in row_as_dict:
print("{} : {}".format(key, row_as_dict[key]))

输出:

name : Alice
age : 11

如果是嵌套列

row = Row(key=1, value=Row(name='a', age=2))
row_as_dict = row.asDict(recursive=True)
row_as_dict

输出:

{'key': 1, 'value': {'name': 'a', 'age': 2}}

最新更新