I have set up a minimal example here with N streams from N Kafka topics (N = 100 in the example below).
I want to finish each stream when an "EndofStream" message is seen. When all streams have finished, I expect the Flink program to finish gracefully.
This works when parallelism is set to 1, but in general it does not happen.
From another question, it seems that not all threads of the Kafka consumer group terminate.
Others suggested throwing an exception. However, the program would terminate on the first exception and would not wait for all streams to finish.
I am also adding a minimal Python program that writes messages to the Kafka topics, for reproducibility. Please fill in <IP>:<PORT> in each program.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String outputPath = "file://" + System.getProperty("user.dir") + "/out/output";

Properties kafkaProps = new Properties();
String brokers = "<IP>:<PORT>";
kafkaProps.setProperty("bootstrap.servers", brokers);
kafkaProps.setProperty("auto.offset.reset", "earliest");

ArrayList<FlinkKafkaConsumer<String>> consumersList = new ArrayList<>();
ArrayList<DataStream<String>> streamList = new ArrayList<>();

for (int i = 0; i < 100; i++) {
    consumersList.add(new FlinkKafkaConsumer<String>(Integer.toString(i),
            new SimpleStringSchema() {
                @Override
                public boolean isEndOfStream(String nextElement) {
                    if (nextElement.contains("EndofStream")) {
                        // throw new RuntimeException("End of Stream");
                        return true;
                    } else {
                        return false;
                    }
                }
            }, kafkaProps));
    consumersList.get(i).setStartFromEarliest();
    streamList.add(env.addSource(consumersList.get(i)));
    streamList.get(i).writeAsText(outputPath + Integer.toString(i), WriteMode.OVERWRITE);
}

// execute program
env.execute("Flink Streaming Java API Skeleton");
Python 3 program
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='<IP>:<PORT>')

for i in range(100):  # Channel number
    for j in range(100):  # Message number
        message = "Message: " + str(j) + " going on channel: " + str(i)
        producer.send(str(i), str.encode(message))
    message = "EndofStream on channel: " + str(i)
    producer.send(str(i), str.encode(message))

producer.flush()
Changing this line: streamList.add(env.addSource(consumersList.get(i)));
to streamList.add(env.addSource(consumersList.get(i)).setParallelism(1));
also does the job, but then Flink places all consumers on the same physical machine.
I would like the consumers to be distributed as well.

flink-conf.yaml
parallelism.default: 2
cluster.evenly-spread-out-slots: true

The last resort would be to write each topic into a separate file and use files as sources instead of Kafka consumers.
The final goal is to test how much time Flink needs to process certain workloads for certain programs.
Use the cancel method from FlinkKafkaConsumerBase, which is the parent class of FlinkKafkaConsumer.

public void cancel()
Description copied from interface: SourceFunction
Cancels the source. Most sources will have a while loop inside the SourceFunction.run(SourceContext) method. The implementation needs to ensure that the source will break out of that loop after this method is called. A typical pattern is to have a "volatile boolean isRunning" flag that is set to false in this method. That flag is checked in the loop condition.

When a source is canceled, the executing thread will also be interrupted (via Thread.interrupt()). The interruption happens strictly after this method has been called, so any interruption handler can rely on the fact that this method has completed. It is good practice to make any flags altered by this method "volatile", in order to guarantee the visibility of the effects of this method to any interruption handler.

Specified by: cancel in interface SourceFunction
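The "volatile boolean isRunning" pattern that the javadoc describes can be sketched without Flink, using a plain thread (the class and method names here are illustrative, not Flink API):

```java
// Illustration of the cancellation contract described above, without Flink:
// the worker loops on a volatile flag; cancel() flips the flag so the loop
// exits, and (as Flink does) the caller may additionally interrupt the thread.
class CancellableWorker implements Runnable {
    private volatile boolean isRunning = true;
    private long count = 0;

    @Override
    public void run() {
        while (isRunning) {
            count++; // stand-in for emitting records in SourceFunction.run()
        }
    }

    public void cancel() {
        isRunning = false; // checked in the loop condition, so run() returns
    }

    public long getCount() {
        return count;
    }
}

public class CancelDemo {
    public static void main(String[] args) throws InterruptedException {
        CancellableWorker worker = new CancellableWorker();
        Thread t = new Thread(worker);
        t.start();
        Thread.sleep(100);   // let the worker loop for a while
        worker.cancel();     // set the flag first...
        t.interrupt();       // ...then interrupt, mirroring Flink's ordering
        t.join(1000);
        System.out.println("worker stopped: " + !t.isAlive());
    }
}
```

The flag must be volatile: without it, the worker thread may never observe the write made by cancel() and the loop could spin forever.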
You are right: it is necessary to use the SimpleStringSchema. This is based on this answer https://stackoverflow.com/a/44247452/2096986. Look at the example below. First I send the string Flink code we saw also works in a cluster and the Kafka consumer consumes the message. Then I send SHUTDOWNDDDDDDD, which also has no effect on finishing the stream. Finally, I send SHUTDOWN and the stream job finishes. See the logs below the program.
package org.sense.flink.examples.stream.kafka;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaConsumerQuery {

    public KafkaConsumerQuery() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        // Subscribe to all topics matching the pattern "test".
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
                java.util.regex.Pattern.compile("test"),
                new MySimpleStringSchema(), properties);

        DataStream<String> stream = env.addSource(myConsumer);
        stream.print();

        System.out.println("Execution plan >>>\n" + env.getExecutionPlan());
        env.execute(KafkaConsumerQuery.class.getSimpleName());
    }

    private static class MySimpleStringSchema extends SimpleStringSchema {
        private static final long serialVersionUID = 1L;
        private final String SHUTDOWN = "SHUTDOWN";

        @Override
        public String deserialize(byte[] message) {
            return super.deserialize(message);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            // Finish the stream as soon as the SHUTDOWN message arrives.
            if (SHUTDOWN.equalsIgnoreCase(nextElement)) {
                return true;
            }
            return super.isEndOfStream(nextElement);
        }
    }

    public static void main(String[] args) throws Exception {
        new KafkaConsumerQuery();
    }
}
Logs:
2020-07-02 16:39:59,025 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-8, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
3> Flink code we saw also works in a cluster. To run this code in a cluster
3> SHUTDOWNDDDDDDD
2020-07-02 16:40:27,973 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e) switched from RUNNING to FINISHED.
2020-07-02 16:40:27,973 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e).
2020-07-02 16:40:27,974 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e) [FINISHED]
2020-07-02 16:40:27,975 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out (3/4) 5f47c2b3f55c5eb558484d49fb1fcf0e.
2020-07-02 16:40:27,979 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e) switched from RUNNING to FINISHED.