我在pyspark
控制台中遵循此示例,一切正常。
之后,我将其编写为PySpark应用程序,如下所示:
# -*- coding: utf-8 -*-
import sys
import click
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import *
@click.command()
@click.option('--master')
def most_idiotic_bi_query(master):
spark = SparkSession
.builder
.master(master)
.appName("stream-test")
.getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
some_schema = .... # Schema removed
some_stream = spark
.readStream
.option("sep", ",")
.schema(some_schema)
.option("maxFilesPerTrigger", 1)
.csv("/data/some_stream", header=True)
streaming_counts = (
linkage_stream.groupBy(some_stream.field_1).count()
)
query = streaming_counts.writeStream
.format("memory")
.queryName("counts")
.outputMode("complete")
.start()
query.awaitTermination()
if __name__ == "__main__":
logging.getLogger("py4j").setLevel(logging.ERROR)
most_idiotic_bi_query()
应用按以下方式执行:
spark-submit test_stream.py --master spark://master:7077
现在,如果我在另一个终端中打开一个新的火花驱动程序:
pyspark --master spark://master:7077
并尝试运行:
spark.sql("select * from counts")
它失败并显示:
During handling of the above exception, another exception occurred:
AnalysisExceptionTraceback (most recent call last)
<ipython-input-3-732b22f02ef6> in <module>()
----> 1 spark.sql("select * from id_counts").show()
/usr/spark-2.0.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
541 [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
542 """
--> 543 return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
544
545 @since(2.0)
/usr/local/lib/python3.4/dist-packages/py4j-0.10.4-py3.4.egg/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/spark-2.0.2/python/pyspark/sql/utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: 'Table or view not found: counts; line 1 pos 14'
我不明白发生了什么。
这是预期的行为。如果查看内存接收器的文档:
输出作为内存中表存储在内存中。支持追加和完整输出模式。这应该用于在低数据量上进行调试,因为整个输出都收集并存储在驱动程序的内存中。因此,请谨慎使用它。
如您所见,内存接收器不会创建持久表或全局临时视图,而是创建仅限于驱动程序的本地结构。因此,无法从另一个 Spark 应用程序查询它。
因此,必须从写入内存输出的驱动程序查询内存输出。例如,您可以模拟console
模式,如下所示。
虚拟作家:
import pandas as pd
import numpy as np
import tempfile
import shutil
def producer(path):
temp_path = tempfile.mkdtemp()
def producer(i):
df = pd.DataFrame({
"group": np.random.randint(10, size=1000)
})
df["val"] = (
np.random.randn(1000) +
np.random.random(1000) * df["group"] +
np.random.random(1000) * i % 7
)
f = tempfile.mktemp(dir=temp_path)
df.to_csv(f, index=False)
shutil.move(f, path)
return producer
火花应用:
from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField
schema = StructType([
StructField("group", IntegerType()),
StructField("val", DoubleType())
])
path = tempfile.mkdtemp()
query_name = "foo"
stream = (spark.readStream
.schema(schema)
.format("csv")
.option("header", "true")
.load(path))
query = (stream
.groupBy("group")
.avg("val")
.writeStream
.format("memory")
.queryName(query_name)
.outputMode("complete")
.start())
还有一些事件:
from rx import Observable
timer = Observable.timer(5000, 5000)
timer.subscribe(producer(path))
timer.skip(1).subscribe(lambda *_: spark.table(query_name).show())
query.awaitTermination()