使用Samza使用远程Kafka主题



我正在尝试将hello-samza教程修改为:

(1) 从远程代理(即非本地主机)上的 kafka 主题读取 (2) 将消息写入文件

我修改了WikipediaFeedStreamTask.java如下所示:

public class WikipediaFeedStreamTask implements StreamTask {
private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-ra
w");
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoo
rdinator coordinator) {
//System.out.println("Message Received!");
//System.out.println(envelope.getMessage());
try{
PrintWriter writer = new PrintWriter("test.txt", "UTF-8");
writer.println(envelope.getMessage());
writer.println("The second line");
writer.close();}
catch(IOException e)
{}
Map<String, Object> outgoingMap = WikipediaFeedEvent.toMap((WikipediaFeedEvent) envel
ope.getMessage());
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
}
}

这只是标准文件,添加了将消息写入文件的功能。

我修改了属性文件

,如下所示:
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=wikipedia-feed
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask
task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
# Wikipedia System
systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
systems.wikipedia.host=irc.wikimedia.org
systems.wikipedia.port=6667
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=REMOTE-ZOOKEEPER-IP:2181/
systems.kafka.producer.bootstrap.servers=REMOTE-BROKER-IP:9092
# Job Coordinator
job.coordinator.system=kafka

当我运行作业时(就像这样),我在test.txt中看到来自维基百科流的数据。我的假设显然是错误的,即简单地修改 .properties 文件中的 kafka 消费者值将强制 samza 从该代理读取。那么我需要改变什么呢?

在哪里指定 samza 应该侦听的主题名称?

我看到您已经修改了kafka系统的连接字符串。但是,您的StreamTask的输入仍然引用维基百科中的流:task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews

您应该将task.inputs的值更改为"kafka.$yourInputStreamName"。请试一试。我认为这应该可以解决您的问题。

最新更新