Flink 1.11中FlinkKafkaConsumer无法传播水印,而Flink 1.12可以



我遇到了一些奇怪的行为。我用Flink 1.12编写了一些Flink处理程序,并试图让它们在Amazon EMR上运行。然而,Amazon EMR目前只支持Flink 1.11.2。当我降级之后,却莫名其妙地发现水印不再传播了。

主题上只有一个分区,并行度设置为1。我这里是不是漏掉了什么?我觉得自己快要疯了。

以下是Flink 1.12的输出:

Topic:input partitions=1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
}, {
"id" : 2,
"type" : "Process",
"pact" : "Operator",
"contents" : "Process",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
Topic:input partitions=1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=86400000 watermark=-9223372036854775808] test message
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=864000000 watermark=0] test message
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=8640000000 watermark=777600000] test message
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=86400000000 watermark=8553600000] test message
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=9223372036854775807 watermark=86313600000] test message
Emitting watermark 9223372036768375807

这是Flink 1.11的输出:

Topic:input partitions=1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
}, {
"id" : 2,
"type" : "Process",
"pact" : "Operator",
"contents" : "Process",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
Topic:input partitions=1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 9223372036768375807

以下是暴露它的集成测试:

package mytest;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.*;
/**
 * Integration test that reproduces a difference in watermark propagation from a
 * {@link FlinkKafkaConsumer} between Flink versions.
 *
 * <p>The test starts an embedded ZooKeeper and Kafka broker, writes five records with
 * explicit timestamps to a single-partition topic, and runs a Flink job (parallelism 1)
 * that prints the timestamp and current watermark seen by a downstream
 * {@link ProcessFunction}.
 *
 * <p>NOTE(review): this class intentionally does not configure the stream time
 * characteristic, because its purpose is to expose the behavior. On Flink 1.11 the job
 * reportedly only propagates timestamps/watermarks after calling
 * {@code env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)}; confirm against
 * the Flink version in use.
 */
public class FailTest {

    /**
     * Builds the properties handed to the Flink Kafka consumer.
     *
     * @return consumer properties pointing at the local broker on port 9092
     */
    public Properties getKafkaConsumerProperties() {
        Properties result = new Properties();
        result.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
        result.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        result.put("group.id", "0");
        result.put("enable.auto.commit", "true");
        result.put("auto.commit.interval.ms", "1000");
        result.put("session.timeout.ms", "30000");
        return result;
    }

    /**
     * Builds the properties used for both the Kafka producer and the admin client.
     *
     * @return producer properties pointing at the local broker on port 9092
     */
    public Properties getProducerProperties() {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("bootstrap.servers", "localhost:9092");
        result.put("compression.type", "none");
        return result;
    }

    /**
     * Builds the configuration for the embedded Kafka broker.
     *
     * @param port port of the ZooKeeper instance the broker connects to
     * @return broker properties, including a log directory that is not yet in use
     */
    public Properties getServerProperties(int port) {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("broker.id", "0");
        result.put("num.network.threads", "3");
        result.put("num.io.threads", "8");
        result.put("socket.send.buffer.bytes", "102400");
        result.put("socket.recv.buffer.bytes", "102400");
        result.put("num.partitions", "1");
        result.put("offset.topic.replication.factor", "1");
        result.put("transaction.state.log.replication.factor", "1");
        result.put("transaction.state.log.min.isr", "1");
        result.put("log.retention.hours", "168");
        result.put("log.segment.bytes", "1073741824");
        result.put("log.retention.check.interval.ms", "300000");
        result.put("zookeeper.connect", "localhost:" + port);
        result.put("zookeeper.connection.timeout.ms", "18000");
        result.put("group.initial.rebalance.delay.ms", "0");

        // Pick the first "run.N" directory that is known not to exist, so every run
        // starts the broker with a fresh log directory.
        String path = "target/kafka-logs/run.";
        int index = 0;
        while (!Files.notExists(Paths.get(path + index))) {
            index += 1;
        }
        result.put("log.dirs", path + index);
        return result;
    }

    /**
     * Prints the name, partition count and full description of a topic to stdout.
     *
     * @param admin admin client used to describe the topic
     * @param inputTopic name of the topic to describe
     * @throws Exception if describing the topic fails
     */
    public void printTopics(AdminClient admin, String inputTopic) throws Exception {
        Map<String, TopicDescription> topics =
                admin.describeTopics(Arrays.asList(inputTopic)).all().get();
        for (TopicDescription description : topics.values()) {
            // %n terminates the line; the previous "%dn" printed a literal 'n' and no newline.
            System.out.printf(
                    "Topic:%s partitions=%d%n",
                    description.name(), description.partitions().size());
            System.out.println(description.toString());
        }
    }

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(1)
                            .setNumberTaskManagers(1)
                            .build());

    /**
     * Produces five timestamped records, consumes them with a Flink job, and prints the
     * timestamp and watermark observed downstream of the Kafka source.
     *
     * @throws Exception if the broker, the topic setup or the Flink job fails
     */
    @Test
    public void testFail() throws Exception {
        StringSerializer stringSerializer = new StringSerializer();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        EmbeddedZookeeper zooKeeper = new EmbeddedZookeeper();
        KafkaServer server = null;
        try {
            server = TestUtils.createServer(
                    new KafkaConfig(getServerProperties(zooKeeper.port())), new MockTime());

            // The admin client is closed before the broker is shut down.
            try (AdminClient admin = AdminClient.create(getProducerProperties())) {
                String inputTopic = "input";
                Map<String, String> configs = new HashMap<>();
                int partitions = 1;
                short replication = 1;
                CreateTopicsResult result = admin.createTopics(Arrays.asList(
                        new NewTopic(inputTopic, partitions, replication).configs(configs)));
                result.all().get();
                printTopics(admin, inputTopic);

                // Some subscription events
                try (KafkaProducer<String, String> producer =
                        new KafkaProducer<>(getProducerProperties(), stringSerializer, stringSerializer)) {
                    producer.send(new ProducerRecord<>(inputTopic, 0, Time.days(1).toMilliseconds(), "0", "test message"));
                    producer.send(new ProducerRecord<>(inputTopic, 0, Time.days(10).toMilliseconds(), "0", "test message"));
                    producer.send(new ProducerRecord<>(inputTopic, 0, Time.days(100).toMilliseconds(), "0", "test message"));
                    producer.send(new ProducerRecord<>(inputTopic, 0, Time.days(1000).toMilliseconds(), "0", "test message"));
                    producer.send(new ProducerRecord<>(inputTopic, 0, Long.MAX_VALUE, "0", "test message"));
                    producer.flush();
                }

                FlinkKafkaConsumer<String> source =
                        new FlinkKafkaConsumer<>(inputTopic, new SimpleStringSchema(), getKafkaConsumerProperties());
                source.setStartFromEarliest();
                source.assignTimestampsAndWatermarks(
                        new WatermarkStrategy<String>() {
                            @Override
                            public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                                return new TimestampAssigner<String>() {
                                    @Override
                                    public long extractTimestamp(String event, long recordTimestamp) {
                                        // Pass the Kafka record timestamp through unchanged.
                                        System.out.printf("Assigning timestamp %d%n", recordTimestamp);
                                        return recordTimestamp;
                                    }
                                };
                            }

                            @Override
                            public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                                return new WatermarkGenerator<String>() {
                                    private long latestWatermark = Long.MIN_VALUE;

                                    @Override
                                    public void onEvent(String event, long timestamp, WatermarkOutput output) {
                                        // Watermark trails each event by one day and only ever advances.
                                        long eventWatermark = timestamp - Time.days(1).toMilliseconds();
                                        if (eventWatermark > latestWatermark) {
                                            System.out.printf("Emitting watermark %d%n", eventWatermark);
                                            output.emitWatermark(new Watermark(eventWatermark));
                                            latestWatermark = eventWatermark;
                                        }
                                    }

                                    @Override
                                    public void onPeriodicEmit(WatermarkOutput output) {
                                        // Watermarks are emitted per event only.
                                    }
                                };
                            }
                        });

                env.addSource(source)
                        .process(new ProcessFunction<String, String>() {
                            @Override
                            public void processElement(String value, Context ctx, Collector<String> out) {
                                System.out.print("Source ");
                                if (ctx != null) {
                                    TimerService srv = ctx.timerService();
                                    // A missing element timestamp is reported as 0.
                                    Long timestampLong = ctx.timestamp();
                                    long timestamp = 0;
                                    if (timestampLong != null) {
                                        timestamp = timestampLong;
                                    }
                                    long watermark = 0;
                                    if (srv != null) {
                                        watermark = srv.currentWatermark();
                                    }
                                    System.out.printf("[timestamp=%d watermark=%d] ", timestamp, watermark);
                                }
                                System.out.println(value);
                                out.collect(value);
                            }
                        });
                System.out.println(env.getExecutionPlan());

                JobClient client = env.executeAsync("Fail Test");
                try {
                    printTopics(admin, inputTopic);
                    // Give the job time to consume and print the five records.
                    TimeUnit.SECONDS.sleep(5);
                } finally {
                    // Always cancel, so a failure above does not leave the job running.
                    client.cancel().get(5, TimeUnit.SECONDS);
                }
            }
        } finally {
            // Best-effort teardown: a failure in one shutdown must not skip the other.
            if (server != null) {
                try {
                    server.shutdown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            try {
                zooKeeper.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Flink 1.12将默认的TimeCharacteristic改为EventTime,并弃用了整套TimeCharacteristic相关的API。因此,要降级到Flink 1.11,您必须添加以下语句来配置StreamExecutionEnvironment。

// Needed on Flink 1.11; from Flink 1.12 on, EventTime is already the default time characteristic.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

最新更新