edit2:最后,我使用Java制作了自己的生产者,并且效果很好,所以问题在于Kafka-Console-producer 。Kafka-Console-Consumer效果很好。
编辑:我已经尝试了版本0.9.0.1并且具有相同的行为。
我正在研究我的单身汉的最终项目,这是Spark Streaming和Flink之间的比较。在两个框架之前,我都使用Kafka和脚本来生成数据(下面说明(。我的第一个测试是将两个框架之间的延迟与简单的工作负载进行比较,而Kafka为我提供了很高的延迟(不断1秒(。为简单起见,目前我只在一台机器上跑步的卡夫卡和火花。
我已经在寻找并发现了类似的问题,并尝试了他们提供的解决方案,但没有改变。我还检查了官方文档中的所有KAFKA配置,并在配置文件中放置了延迟的重要信息,这是我的配置:
kafka 0.10.2.1 -Spark 2.1.0
server.properties:
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=2
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=1000
log.flush.interval.ms=50
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
flush.messages=100
flush.ms=10
Producer.properties:
compression.type=none
max.block.ms=200
linger.ms=50
batch.size=0
火花流程序:(它打印收到的数据,以及在创建数据时与何时为函数进行处理之间的差异(
package com.tfg.spark1.spark1;
import java.util.Map;
import java.util.HashMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.apache.spark.streaming.kafka.*;
public final class Timestamp {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: Timestamp <topics> <numThreads>");
System.exit(1);
}
SparkConf conf = new SparkConf().setMaster("spark://192.168.0.155:7077").setAppName("Timestamp");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(100));
Map<String, Integer> topicMap = new HashMap<String, Integer>();
int numThreads = Integer.parseInt(args[1]);
topicMap.put(args[0], numThreads);
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "192.168.0.155:2181", "grupo-spark", topicMap); //Map<"test", 2>
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;
public String call (Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<String> newLine = lines.map(new Function<String, String>() {
private static final long serialVersionUID = 1L;
public String call(String line) {
String[] tuple = line.split(" ");
String totalTime = String.valueOf(System.currentTimeMillis() - Long.valueOf(tuple[1]));
//String newLine = line.concat(" " + String.valueOf(System.currentTimeMillis()) + " " + totalTime);
return totalTime;
}
});
lines.print();
newLine.print();
jssc.start();
jssc.awaitTermination();
}
}
生成的数据具有此格式:
"Random bits" + " " + "current time in ms"
01 1496421618634
11 1496421619044
00 1496421619451
00 1496421618836
10 1496421619247
最终,当我运行火花流程序和脚本生成器时,每200ms每200ms生成数据时,Spark(批次间隔= 100ms(会打印9个空批次,每秒(总是900ms时刻,如此示例:时间:时间:时间:时间:时间:时间:时间:时间:时间:时间:1496421619 900 ms(此结果:
-------------------------------------------
Time: 1496421619900 ms
-------------------------------------------
01 1496421618634
11 1496421619044
00 1496421619451
00 1496421618836
10 1496421619247
-------------------------------------------
Time: 1496421619900 ms
-------------------------------------------
1416
1006
599
1214
803
另外,如果我运行一个kafka命令行 - 生产商和另一个命令行 - 消费者,它总是需要一些时间才能在消费者中打印产生的数据。
事先感谢您的帮助!
我刚刚更新了您打开的jira,原因是您始终看到1000 ms延迟的原因。
https://issues.apache.org/jira/browse/kafka-5426
我在这里报告原因...
使用命令行上的--timeout
选项设置Linger.ms参数,如果未指定为1000 ms。同时,使用命令行上的--max-partition-memory-bytes
选项设置了batch.size参数,如果未指定为16384。这意味着,即使您指定了linger.ms and batch.size使用-producer-property或-producer.config,它们也将始终被上述"特定"选项覆盖。