I use the following Python producer to publish some messages to my Kafka topic (I can also receive the published data perfectly with a Python consumer in Jupyter):
from kafka import KafkaProducer
import json, time

userdata = {
    "ipaddress": "172.16.0.57",
    "logtype": "",
    "mid": "",
    "name": "TJ"
}

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range(10):
    print("adding", i)
    producer.send('test', userdata)
    time.sleep(3)
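The `value_serializer` above turns each dict into UTF-8 JSON bytes before it goes on the wire. A minimal standalone check of that round trip (no broker needed, just the same lambda as in the producer):

```python
import json

# The same serializer passed to KafkaProducer above: dict -> JSON -> UTF-8 bytes.
serialize = lambda v: json.dumps(v).encode('utf-8')

userdata = {"ipaddress": "172.16.0.57", "logtype": "", "mid": "", "name": "TJ"}

raw = serialize(userdata)                   # bytes as they would appear on the topic
decoded = json.loads(raw.decode('utf-8'))   # what a consumer would recover

print(isinstance(raw, bytes))   # True
print(decoded == userdata)      # True
```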
However, when I try to run the Kafka streaming example in Spark, I get nothing at all (I should note that Spark is operational on my workstation, since I can run the network streaming example without any problems):
from __future__ import print_function
from pyspark.streaming.kafka import KafkaUtils
import sys
import os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import json

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.10:2.0.2 pyspark-shell'

sc = SparkContext("local[2]", "KafkaSTREAMWordCount")
ssc = StreamingContext(sc, 2)

kafka_stream = KafkaUtils.createStream(ssc, "localhost:2181",
                                       "raw-event-streaming-consumer", {"test": 1})

parsed = kafka_stream.map(lambda (k, v): json.loads(v))
parsed.pprint()

ssc.start()
ssc.awaitTermination()
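A side note on the `map` call above: `lambda (k, v): json.loads(v)` relies on Python 2 tuple parameter unpacking, which was removed in Python 3 (PEP 3113). It works under the Python 2.7 setup here, but a Python-3-compatible sketch of the same parsing step would index into the (key, value) pair instead:

```python
import json

# Python-3-compatible equivalent of `lambda (k, v): json.loads(v)`:
# index into the (key, value) tuple rather than unpacking it in the signature.
parse_record = lambda kv: json.loads(kv[1])

# Simulated Kafka record as KafkaUtils.createStream delivers it:
# a (key, value) pair whose value is a JSON string.
record = (None, '{"ipaddress": "172.16.0.57", "name": "TJ"}')
print(parse_record(record)["name"])  # TJ
```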
Here is a sample of the output:
-------------------------------------------
Time: 2017-08-28 14:08:32
-------------------------------------------
-------------------------------------------
Time: 2017-08-28 14:08:33
-------------------------------------------
-------------------------------------------
Time: 2017-08-28 14:08:34
-------------------------------------------
Note: my system specs are as follows:

Ubuntu 16.04
Spark: spark-2.2.0-bin-hadoop2.7
Jupyter Notebook (Python 2.7)
Kafka: kafka_2.11-0.11.0.0
I have the following lines in my .bashrc:
export PATH="/home/myubuntu/anaconda3/bin:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/bin:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/jars:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/python:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/python/pyspark:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/python/pyspark/streaming:$PATH"
function snotebook ()
{
    # Spark path (based on your computer)
    SPARK_PATH=~/spark-2.0.0-bin-hadoop2.7

    export PYSPARK_DRIVER_PYTHON="jupyter"
    export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

    # For Python 3 users, you have to add the line below or you will get an error
    # export PYSPARK_PYTHON=python3

    # $SPARK_PATH/bin/pyspark --master local[2]
    /home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/bin/pyspark --master local[2]
}
I found the error. For spark-2.2.0-bin-hadoop2.7, we need to use the following package instead:

--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
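The fix works because the connector's Scala suffix and version must match the Spark build: spark-2.2.0-bin-hadoop2.7 ships with Scala 2.11, so `_2.10:2.0.2` silently fails while `_2.11:2.2.0` loads. A small, hypothetical sanity check of the corrected coordinates before setting PYSPARK_SUBMIT_ARGS:

```python
import os

# Hypothetical sanity check: the Kafka connector artifact must match the
# Spark build's Scala version (2.11) and Spark version (2.2.0).
spark_build = "spark-2.2.0-bin-hadoop2.7"
package = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0"

group, artifact, version = package.split(":")
scala_version = artifact.rsplit("_", 1)[1]

assert scala_version == "2.11"   # Scala build shipped with Spark 2.2.0
assert version in spark_build    # connector version matches the Spark version

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages %s pyspark-shell' % package
print(os.environ['PYSPARK_SUBMIT_ARGS'])
```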