Spark Streaming application cannot receive messages from Kafka



I am using the following Python producer to publish some messages to my Kafka topic (I can also receive the data I publish just fine with a Python consumer in Jupyter).

from kafka import KafkaProducer
import json, time

userdata = {
    "ipaddress": "172.16.0.57",
    "logtype": "",
    "mid": "",
    "name": "TJ"
}

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range(10):
    print("adding", i)
    producer.send('test', userdata)
    time.sleep(3)
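
For reference, the consumer I run in Jupyter looks roughly like this (a minimal sketch; the auto_offset_reset and JSON-decoding choices here are assumptions rather than the exact notebook code):

from kafka import KafkaConsumer
import json

# Subscribe to the same 'test' topic and decode the JSON payloads
consumer = KafkaConsumer('test',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))

for message in consumer:
    print(message.value)  # e.g. {u'ipaddress': u'172.16.0.57', u'name': u'TJ', ...}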

However, when I try to run the Kafka streaming example in Spark, I get nothing at all (I should note that Spark itself works on my workstation, since I can run the network streaming example without any problems):

from __future__ import print_function
from pyspark.streaming.kafka import KafkaUtils
import sys
import os 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import json

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.10:2.0.2 pyspark-shell'  
sc = SparkContext("local[2]", "KafkaSTREAMWordCount")
ssc = StreamingContext(sc, 2)
kafka_stream = KafkaUtils.createStream(ssc,"localhost:2181","raw-event-streaming-consumer",{"test":1})
parsed = kafka_stream.map(lambda (k, v): json.loads(v))
parsed.pprint()
ssc.start()
ssc.awaitTermination()

Here is a sample of the output:

-------------------------------------------
Time: 2017-08-28 14:08:32
-------------------------------------------
-------------------------------------------
Time: 2017-08-28 14:08:33
-------------------------------------------
-------------------------------------------
Time: 2017-08-28 14:08:34
-------------------------------------------

Note: my system specs are as follows:

Ubuntu 16.04
Spark: spark-2.2.0-bin-hadoop2.7
Jupyter Notebook (Python 2.7)
Kafka: kafka_2.11-0.11.0.0

I have the following lines in my .bashrc:

export PATH="/home/myubuntu/anaconda3/bin:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/bin:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/jars:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/python:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/python/pyspark:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/python/pyspark/streaming:$PATH"

function snotebook () 
{
#Spark path (based on your computer)
SPARK_PATH=~/spark-2.0.0-bin-hadoop2.7
export PYSPARK_DRIVER_PYTHON="jupyter"
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
# For python 3 users, you have to add the line below or you will get an error 
#export PYSPARK_PYTHON=python3
#$SPARK_PATH/bin/pyspark --master local[2]
/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/bin/pyspark  --master local[2]
}
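
(After sourcing .bashrc, I launch everything with the snotebook function above, which starts pyspark with Jupyter Notebook as the driver.)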

I found the error. For spark-2.2.0-bin-hadoop2.7 we need to use the following jar; the artifact's Scala version (2.11) and release number (2.2.0) have to match the installed Spark build, which the spark-streaming-kafka-0-8_2.10:2.0.2 package I was loading did not:

--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
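
So the only change needed in the streaming script is that one line (a sketch of the corrected setup, everything else stays the same):

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 pyspark-shell'

With that change the batches printed by pprint() are no longer empty for me.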
