Unable to sleep in a custom source function in Apache Flink that is unioned with another source



I have two sources, a Kafka source and a custom source. I need the custom source to sleep for one hour, but the sleep keeps getting interrupted:

java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.hulu.hiveIngestion.HiveAddPartitionThread.run(HiveAddPartitionThread.java:48)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)

Code:

<kafka_Source>.union(<custom_source>)
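public class custom_source implements SourceFunction<String> {
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (true) {
            // Thread.sleep throws the checked InterruptedException
            Thread.sleep(1000);
            ctx.collect("string");
        }
    }

    @Override
    public void cancel() {
        // no cancellation handling
    }
}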

How can I make the custom source sleep while the Kafka source keeps streaming? And why am I getting the thread interrupted exception?

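This is more of a Java question than a Flink question. In short, you can never rely on Thread.sleep(x) to sleep for x milliseconds, because the thread can be interrupted and wake up early. It is also important to support interruption properly, otherwise you will not be able to shut down your job cleanly.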

import java.time.Duration;
import java.time.Instant;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class custom_source implements SourceFunction<String> {
    private static final Duration SLEEP_DURATION = Duration.ofHours(1);
    private volatile boolean isCanceled = false;

    @Override
    public void run(SourceContext<String> ctx) {
        while (!isCanceled) {
            // 1 hour wait time; Instant is used rather than LocalTime,
            // which wraps around at midnight
            Instant end = Instant.now().plus(SLEEP_DURATION);
            long remainingMillis;
            // this loop ensures that a random interruption does not end the wait prematurely;
            // the remaining time is recomputed on each pass so sleep never gets a negative value
            while ((remainingMillis = Duration.between(Instant.now(), end).toMillis()) > 0) {
                try {
                    Thread.sleep(remainingMillis);
                } catch (InterruptedException e) {
                    // swallow interruption unless source is canceled
                    if (isCanceled) {
                        Thread.interrupted();
                        return;
                    }
                }
            }
            ctx.collect("string");
        }
    }

    @Override
    public void cancel() {
        isCanceled = true;
    }
}
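
The waiting logic can also be pulled out of the source and tried without a Flink cluster. Below is a minimal sketch in plain Java, assuming a hypothetical helper class InterruptibleWait with a method await and a constant POLL_INTERVAL_NANOS (none of these names come from Flink or from the code above). It uses System.nanoTime() for the deadline, and it caps each sleep at one second so the cancel flag is noticed even if no interrupt arrives; that cap is my own assumption, not something the original answer does.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Flink: waits for a fixed duration,
// tolerating stray interrupts and stopping early only when canceled.
public class InterruptibleWait {
    // Assumed upper bound for a single sleep, so the flag is polled regularly.
    private static final long POLL_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);

    /**
     * Blocks until the duration has elapsed or canceled reports true.
     *
     * @return true if the full duration elapsed, false if canceled first
     */
    public static boolean await(Duration duration, BooleanSupplier canceled) {
        final long deadline = System.nanoTime() + duration.toNanos();
        while (!canceled.getAsBoolean()) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return true; // waited the whole duration
            }
            try {
                TimeUnit.NANOSECONDS.sleep(Math.min(remainingNanos, POLL_INTERVAL_NANOS));
            } catch (InterruptedException e) {
                // Interrupted early: the loop re-checks the flag and the remaining time.
            }
        }
        return false; // canceled before the deadline
    }
}
```

Inside run() of the source this would be used roughly as: if (!InterruptibleWait.await(SLEEP_DURATION, () -> isCanceled)) return; followed by ctx.collect("string").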
