pyspark streaming DStreams to kafka topic



Is it possible to stream a DStream to a Kafka topic, as simply as that?

I have a Spark Streaming job that does all the data processing, and now I want to push the resulting data to a Kafka topic. Can this be done in pyspark?

It is best to convert the data to JSON before writing to Kafka; otherwise, specify the key and value columns that should be written to Kafka.

# Serialize all columns into a single JSON string and write it as the Kafka message value
query = jdf.selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "test-spark") \
    .option("checkpointLocation", "/root/") \
    .outputMode("append") \
    .start()
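
For the second approach mentioned above, a minimal sketch of writing explicit key and value columns could look like the following; the column names id and payload and the checkpoint path are assumptions, and both columns must be cast to STRING (or BINARY) for the Kafka sink.

# Write explicit key/value columns instead of a single JSON value.
# "id" and "payload" are hypothetical column names on the streaming DataFrame jdf.
query = jdf.selectExpr("CAST(id AS STRING) AS key",
                       "CAST(payload AS STRING) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "test-spark") \
    .option("checkpointLocation", "/tmp/kafka-key-value-checkpoint") \
    .outputMode("append") \
    .start()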

If your messages are in Avro format, we can serialize the messages and write them to Kafka directly.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from confluent_kafka.avro import AvroProducer

# Target Kafka broker and Schema Registry settings for the AvroProducer
var_kafka_parms_tgt = {'bootstrap.servers': var_bootstrap_servr,
                       'schema.registry.url': var_schema_url}

def handler(message):
    records = message.collect()
    avroProducer = AvroProducer(var_kafka_parms_tgt,
                                default_key_schema=key_schema,
                                default_value_schema=value_schema)
    for record in records:
        # <Data processing whatever you want, creating the var_val_value, var_val_key pair>
        avroProducer.produce(topic=var_topic_tgt_name, value=var_val_value, key=var_val_key)
    avroProducer.flush()

# sc is an existing SparkContext (e.g. the one provided by the pyspark shell)
ssc = StreamingContext(sc, 2)
topic = "test"
brokers = "localhost:9092"
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
kvs.foreachRDD(handler)
ssc.start()
ssc.awaitTermination()
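
The snippet above assumes that var_bootstrap_servr, var_schema_url, var_topic_tgt_name, key_schema and value_schema are already defined. As an illustration only, with hypothetical connection settings and a hypothetical record schema, they could be set up like this:

from confluent_kafka import avro

# Hypothetical values; replace them with your own broker, registry URL, topic and schemas.
var_bootstrap_servr = "localhost:9092"
var_schema_url = "http://localhost:8081"
var_topic_tgt_name = "test-avro"

key_schema = avro.loads('{"type": "string"}')
value_schema = avro.loads("""
{
  "type": "record",
  "name": "Record",
  "fields": [
    {"name": "id",    "type": "string"},
    {"name": "value", "type": "double"}
  ]
}
""")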
