计划任务与Apache Flink



我有一个并行度为5的flink作业(目前!!)。其中一个richFlatMap流在open(Configuration parameters)方法中打开一个文件。在flatMap操作中没有任何打开动作,它只是读取文件来搜索一些东西。(有一个实用程序类,它有一个像utilityClass.searchText('abc')这样的方法)。下面是样板代码:

public class MyFlatMap extends RichFlatMapFunction<...> {
private MyUtilityFile myFile;
@Override
public void open(Configuration parameters) throws Exception {
myFile.Open("fileLocation");
}
@Override
public void flatMap(...) throws Exception {
String text = myFile.searchText('abc');
if (text != null) // take an action
else // another action
}
}

python脚本每天在特定时间更新此文件。因此,我还应该在flatMap流中打开新创建的文件(通过python脚本)。

我只是认为这可以通过ScheduledExecutorService只有一个线程池来完成。

我不能每次flatMap调用都打开这个文件,因为它太大了。

这是我想写的样板代码:

public class MyFlatMap extends RichFlatMapFunction<...> implements Runnable {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private MyUtilityFile myFile;
@Override
public void run() {
myFile.Open("fileLocation");     
}
@Override
public void open(Configuration parameters) throws Exception {
scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);
myFile.Open("fileLocation");
}
@Override
public void flatMap(...) throws Exception {
String text = myFile.searchText('abc');
if (text != null) // take an action
else // another action
}
}

这个样板适合Flink环境吗?如果没有,如何按预定的方式打开文件?(没有选项,如"更新文件后发送事件与kafka和读取事件的flink")

也许你可以直接实现ProcessingTimeCallback接口,它支持定时器操作

public class MyFlatMap extends RichFlatMapFunction<...> implements ProcessingTimeCallback { 
private MyUtilityFile myFile;

@Override
public void open(Configuration parameters) throws Exception {
scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);
final long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + 3600000, this);
myFile.Open("fileLocation");
}
@Override
public void flatMap(...) throws Exception {
String text = myFile.searchText('abc');
if (text != null) // take an action
else // another action
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
myFile.Open("fileLocation");
final long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + 3600000, this);
}
}

最新更新