我是Spark中的乞eg。我正在关注Pyspark上的视频课程。我正在尝试使用以下代码将JSON字符串转换为数据框。
import pyspark as ps
from pyspark.sql import HiveContext # to interface dataframe API
sc = ps.SparkContext()
hive_context = HiveContext(sc)
# some code .... and build meals_json
meals_json.take(1) # below is output of this code
#['{"meal_id": 1, "dt": "2013-01-01", "type": "french", "price": 10}']
# some more code
meals_dataframe = hive_context.jsonRDD(meals_json)
meals_dataframe.first()
在运行最后一行时,我要低于错误。
AttributeError Traceback (most recent call last)
<ipython-input-19-43e4f3006ac3> in <module>()
----> 1 meals_dataframe = hive_context.jsonRDD(meals_json)
2 meals_dataframe.first()
AttributeError: 'HiveContext' object has no attribute 'jsonRDD'
我搜索了网络,我无法在讨论此问题的地方找到任何资源。我正在使用Python 3.5上的Jupyter笔记本上使用Spark 2.1.1运行此代码。
从文档中,我可以看到jsonrdd是从 class org.apache.spark.sql.sql.sqlcontext 继承的。我不太确定,这可能是什么原因。任何建议都会有所帮助。谢谢。
sqlContext.jsonRDD
被弃用。从1.4.0开始,它已被read().json()
替换。我在下面包括一个在Spark 2.1.1
import json
from pyspark.sql.types import StructField, StructType, IntegerType, StringType
r = [{'a': 'aaa', 'b': 'bbb', 'c': 'ccc'},
{'a': 'aaaa','b': 'bbbb','c': 'cccc','d': 'dddd'}]
r = [json.dumps(d) for d in r]
# known schema
schema = ['a', 'b', 'c', 'd']
fields = [StructField(field_name, StringType(), True) for field_name in schema]
schema = StructType(fields)
rdd = sc.parallelize(r)
df = sqlContext.read.schema(schema).json(rdd)
df.collect()
这给出了Spark 2.1.1的以下输出:
[Row(a=u'aaa', b=u'bbb', c=u'ccc', d=None),
Row(a=u'aaaa', b=u'bbbb', c=u'cccc', d=u'dddd')]
请注意,此片段的第一部分是受Apache Spark用户列表上的这个问题的重大启发