Getting this error when running Python code in Databricks



The error

Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().

The code
import itertools

from pyspark.sql.functions import explode, col
# Read the JSON file from Databricks storage
df_json = spark.read.json("/mnt/BigData_JSONFiles/new_test.json")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

# Convert the dataframe to a dictionary
data = df_json.toPandas().to_dict()
# Split the data into two parts
d1 = dict(itertools.islice(data.items(), 8))
d2 = dict(itertools.islice(data.items(), 8, len(data.items())))
# Convert the first part of the data back to a dataframe
df1 = spark.createDataFrame([d1])
# Write the first part of the data to a JSON file in Databricks storage
df1.write.format("json").save("/mnt/BigData_JSONFiles/new_test_header.json")
# Convert the second part of the data back to a dataframe
df2 = spark.createDataFrame([d2])
# Write the second part of the data to a JSON file in Databricks storage
df2.write.format("json").save("/mnt/BigData_JSONFiles/new_test_detail.json")

A sample of the large JSON file

{
  "reporting_entity_name": "launcher",
  "reporting_entity_type": "launcher",
  "plan_name": "launched",
  "plan_id_type": "hios",
  "plan_id": "1111111111",
  "plan_market_type": "individual",
  "last_updated_on": "2020-08-27",
  "version": "1.0.0",
  "in_network": [
    {
      "negotiation_arrangement": "ffs",
      "name": "Boosters",
      "billing_code_type": "CPT",
      "billing_code_type_version": "2020",
      "billing_code": "27447",
      "description": "Boosters On Demand",
      "negotiated_rates": [
        {
          "provider_groups": [
            {
              "npi": [
                0
              ],
              "tin": {
                "type": "ein",
                "value": "11-1111111"
              }
            }
          ],
          "negotiated_prices": [
            {
              "negotiated_type": "negotiated",
              "negotiated_rate": 123.45,
              "expiration_date": "2022-01-01",
              "billing_class": "organizational"
            }
          ]
        }
      ]
    }
  ]
}

Hi, I am trying to split a large JSON file into two formats, which is what the code above does. The error message says to cache the parsed results, so I added .cache() at the end of the load, but I still get the same error. Please tell me how to resolve it.
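The error message's own example is Scala; translated to PySpark, the cache workaround it suggests presumably looks roughly like this (a minimal sketch, reusing the path from the code above):

# Cache the parsed result and materialize it before querying it further,
# as the error message recommends.
df_json = spark.read.json("/mnt/BigData_JSONFiles/new_test.json").cache()
df_json.count()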

I was able to resolve the error by changing this line:

df_json = spark.read.json("/mnt/BigData_JSONFiles/new_test.json")

to this:

df_json = spark.read.option("multiline","true").json("/mnt/BigData_JSONFiles/new_test.json")
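This works because Spark's JSON reader expects JSON Lines by default, i.e. one complete JSON object per line. The sample file above is a single pretty-printed object spread over many lines, so line-by-line parsing fails, every row lands in the _corrupt_record column, and any query that touches only that column hits the Spark 2.3+ restriction quoted in the error. With multiline set to true, Spark parses the whole file as one JSON document. A minimal sketch of the working read, reusing the same mount path:

# Parse the entire file as a single JSON document rather than line by line.
df_json = (
    spark.read
    .option("multiline", "true")
    .json("/mnt/BigData_JSONFiles/new_test.json")
)
df_json.printSchema()  # should now show the real fields instead of _corrupt_record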
