读取 CSV 时,PySpark 中与 pandas 的 na_filter=True 等价的做法



我有一个包含 10 列的 CSV 文件。每一列都含有字符串形式的 "NA" 值。所以在用 pandas 导入时,我是这样写的:

df = pd.read_csv(r'file_name.csv', na_filter=True)

我想知道 PySpark 中与 na_filter=True 等价的写法,以便把字符串形式的 "NA" 值转换为真正的空值(null)。

您可以使用类似下面的代码:


from __future__ import annotations
from pyspark.sql import functions as funcs, SparkSession, Column, DataFrame as SparkDataFrame
from typing import Any, List
# Module-level Spark session; `read_csv` below reads through `spark.read`.
spark = SparkSession.builder.getOrCreate()

def filter_na_values(df: SparkDataFrame, *patterns: str | bytes) -> SparkDataFrame:
    """Replace NA-like cell values with real nulls, like pandas' ``na_filter``.

    A cell is nulled only when its *entire* value (compared as a string)
    equals one of the given tokens. Tokens are matched literally, so
    characters such as ``?``, ``.`` or ``#`` carry no regex meaning and a
    token like ``NA`` never alters a longer value such as ``BANANA``.
    Empty strings are always converted to null as well.

    Parameters
    ----------
    df : pyspark.sql.dataframe.DataFrame
        The dataframe to clean.
    *patterns : str or bytes
        The literal null tokens. ``bytes`` tokens are decoded as UTF-8.

    Returns
    -------
    pyspark.sql.dataframe.DataFrame
        A dataframe with the same columns, in the same order. Column data
        types are kept as they were in ``df``.
    """
    # Normalise every token to ``str`` and de-duplicate; "" is always a null
    # token so that blank cells end up null too.
    null_tokens = {
        pattern if isinstance(pattern, str) else pattern.decode("utf8")
        for pattern in patterns
    }
    null_tokens.add("")
    ordered_tokens = sorted(null_tokens)

    # The string cast is used only for the comparison; the value kept in the
    # ``otherwise`` branch is the untouched original column, so its type
    # survives. A null cell makes ``isin`` evaluate to null, which ``when``
    # treats as "no match", leaving the cell null.
    cleaned_columns = [
        funcs.when(
            funcs.col(column).cast("string").isin(*ordered_tokens), None
        )
        .otherwise(funcs.col(column))
        .alias(column)
        for column in df.columns
    ]
    return df.select(*cleaned_columns)

def read_csv(path: str | List[str], **options: Any) -> SparkDataFrame:
    """Read a CSV file into a Spark DataFrame, optionally nulling NA tokens.

    Parameters
    ----------
    path : str | List[str]
        Path (or list of paths) to the CSV data.
    **options
        Options forwarded to ``spark.read.csv``. Two keys are consumed here
        and are *not* forwarded:

        na_filter : bool, default True
            When truthy, cells equal to an NA token are converted to null
            via ``filter_na_values``.
        na_values : str, bytes or iterable of those, optional
            Extra NA tokens to use in addition to the built-in list. A
            single ``str``/``bytes`` value is treated as one token.

    Returns
    -------
    df : pyspark.sql.DataFrame
        Spark DataFrame containing the CSV data.
    """
    na_filter = options.pop("na_filter", True)
    extra_na_values = options.pop("na_values", None)

    df = spark.read.csv(path, **options)
    if not na_filter:
        return df

    if extra_na_values is None:
        extra_na_values = ()
    elif isinstance(extra_na_values, (str, bytes)):
        # A bare string would otherwise be unpacked character by character.
        extra_na_values = (extra_na_values,)

    # Built-in NA tokens, plus "?" and "null".
    default_na_values = (
        "NaN",
        "1.#IND",
        "-1.#QNAN",
        "-nan",
        "1.#QNAN",
        "<NA>",
        "n/a",
        "-1.#IND",
        "#N/A N/A",
        "N/A",
        "#NA",
        "nan",
        "-NaN",
        "NULL",
        "#N/A",
        "NA",
        "?",
        "null",
    )
    return filter_na_values(df, *default_na_values, *extra_na_values)

示例:


import pyspark.pandas as ps
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session for this example.
spark = SparkSession.builder.getOrCreate()

# Sample rows mixing ordinary values with NA-like tokens ('NA', '?', 'nan',
# '') and one genuine None in the first column.
sample_rows = [
    [10, 'NA', '?', 'nan', ''],
    [20, 'NA', 'a', 'b', 'c'],
    [30, '?', 'a', 'b', 'c'],
    [None, '', 'a', 'b', 'c'],
    [22.2, 'nan', 'a', 'b', 'c'],
]

# Write the rows out as CSV under 'data.csv' for the read_csv calls below.
sample_frame = ps.DataFrame(sample_rows)
sample_frame.to_csv('data.csv', num_files=1)

使用 na_filter=True 调用 read_csv:

>>> read_csv('data.csv', inferSchema=True, header=True, na_filter=True).show()
+----+----+----+----+----+
|   0|   1|   2|   3|   4|
+----+----+----+----+----+
|10.0|null|null|null|null|
|20.0|null|   a|   b|   c|
|30.0|null|   a|   b|   c|
|null|null|   a|   b|   c|
|22.2|null|   a|   b|   c|
+----+----+----+----+----+

使用 na_filter=False 调用 read_csv:

>>> read_csv('data.csv', inferSchema=True, header=True, na_filter=False).show()
+----+----+---+---+----+
|   0|   1|  2|  3|   4|
+----+----+---+---+----+
|10.0|  NA|  ?|nan|null|
|20.0|  NA|  a|  b|   c|
|30.0|   ?|  a|  b|   c|
|null|null|  a|  b|   c|
|22.2| nan|  a|  b|   c|
+----+----+---+---+----+

最新更新