So I run the following on the pyspark shell:
>>> data = spark.read.csv("annotations_000", header=False, mode="DROPMALFORMED", schema=schema)
>>> data.show(3)
+----------+--------------------+--------------------+---------+---------+--------+-----------------+
| item_id| review_id| text| aspect|sentiment|comments| annotation_round|
+----------+--------------------+--------------------+---------+---------+--------+-----------------+
|9999900031|9999900031/custom...|Just came back to...|breakfast| 3| null|ASE_OpeNER_round2|
|9999900031|9999900031/custom...|Just came back to...| staff| 3| null|ASE_OpeNER_round2|
|9999900031|9999900031/custom...|The hotel was loc...| noise| 2| null|ASE_OpeNER_round2|
+----------+--------------------+--------------------+---------+---------+--------+-----------------+
>>> data.registerTempTable("temp")
>>> df = sqlContext.sql("select first(item_id), review_id, first(text), concat_ws(';', collect_list(aspect)) as aspect from temp group by review_id")
>>> df.show(3)
+---------------------+--------------------+--------------------+--------------------+
|first(item_id, false)| review_id| first(text, false)| aspect|
+---------------------+--------------------+--------------------+--------------------+
| 100012|100012/tripadviso...|We stayed here la...| staff;room|
| 100013|100013/tripadviso...|We stayed for two...| breakfast|
| 100031|100031/tripadviso...|We stayed two nig...|noise;breakfast;room|
+---------------------+--------------------+--------------------+--------------------+
It works perfectly with the shell's sqlContext variable.
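(For reference, the same aggregation can also be written with the DataFrame API instead of SQL; a minimal equivalent sketch using the data DataFrame from above:)

from pyspark.sql import functions as F

df = (data.groupBy("review_id")
          .agg(F.first("item_id").alias("item_id"),
               F.first("text").alias("text"),
               F.concat_ws(";", F.collect_list("aspect")).alias("aspect")))
df.show(3)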
But when I write it as a script:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

sc = SparkContext(appName="AspectDetector")
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

data = spark.read.csv("annotations_000", header=False, mode="DROPMALFORMED", schema=schema)
data.registerTempTable("temp")
df = sqlContext.sql("select first(item_id), review_id, first(text), concat_ws(';', collect_list(aspect)) as aspect from temp group by review_id")
and run it, I get the following:
pyspark.sql.utils.AnalysisException: u'Table or view not found: temp; line 1 pos 99'
How is that possible? Am I doing something wrong in how I set up the sqlContext?
First, you need to initialize Spark with Hive support, for example:
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder \
    .master("yarn") \
    .appName("AspectDetector") \
    .enableHiveSupport() \
    .getOrCreate()

sqlContext = SQLContext(spark.sparkContext)
However, you will need to run your queries with spark.sql() rather than sqlContext.sql().
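Putting this together, a minimal corrected version of the script might look like the sketch below. The schema here is an assumption reconstructed from the column names in the shell output above (it was not shown in the question), and the non-string types are guesses:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
    .master("yarn") \
    .appName("AspectDetector") \
    .enableHiveSupport() \
    .getOrCreate()

# Assumed schema, reconstructed from the shell output; adjust types as needed
schema = StructType([
    StructField("item_id", StringType()),
    StructField("review_id", StringType()),
    StructField("text", StringType()),
    StructField("aspect", StringType()),
    StructField("sentiment", IntegerType()),
    StructField("comments", StringType()),
    StructField("annotation_round", StringType()),
])

data = spark.read.csv("annotations_000", header=False, mode="DROPMALFORMED", schema=schema)
data.createOrReplaceTempView("temp")  # non-deprecated replacement for registerTempTable

df = spark.sql("select first(item_id), review_id, first(text), "
               "concat_ws(';', collect_list(aspect)) as aspect "
               "from temp group by review_id")
df.show(3)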
I found this confusing too, but I think it is because when you do data.registerTempTable("temp") you are actually working in the Spark session, not the sqlContext. If you need to query a Hive table, you should still use sqlContext.sql().
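One way to see where the temp view lives is through the session catalog; a quick sketch, assuming the spark session and data DataFrame from the corrected script above:

data.createOrReplaceTempView("temp")
# The view is registered in the SparkSession's catalog, so it shows up here
print(spark.catalog.listTables())
# ...and is visible to queries run through that same session
spark.sql("select count(*) from temp").show()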