读取缓慢的更改查找和丰富流式处理输入集合的最佳方法是什么?



我使用的是Apache光束,具有1.5GB的流集合。 我的查找表是一个JDBCio mysql响应。

当我在没有侧输入的情况下运行管道时,我的作业将在大约 2 分钟内完成。当我使用侧面输入运行我的作业时,我的工作将永远不会完成,卡住并死亡。

这是我用来存储查找的代码(~1M 条记录(

PCollectionView<Map<String,String>> sideData = pipeline.apply(JdbcIO.<KV<String, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://ip")
.withUsername("username")
.withPassword("password"))
.withQuery("select a_number from cell")
.withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
.withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getString(1), resultSet.getString(1));
}
})).apply(View.asMap());

这是我的流媒体收藏的代码

pipeline
.apply("ReadMyFile", TextIO.read().from("/home/data/**")
.watchForNewFiles(Duration.standardSeconds(60),  Watch.Growth.<String>never()))
.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
.accumulatingFiredPanes()
.withAllowedLateness(ONE_DAY))

这是我的parDo代码,用于迭代每个事件行(10M条记录(

.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String,Integer> i = c.element();
String sideInputData = c.sideInput(sideData).get(i.getKey());
if (sideInputData == null) {
c.output(i);
} 
}
}).withSideInputs(sideData));

我使用的是 flink 集群,但使用直接运行器输出相同。

簇:

2 个中央处理器 6 核 24GB 内存

我做错了什么? 我已经关注了这个

解决方案是创建一个"缓存"MAP。

sideInput 只触发一次,然后我将其缓存到映射等效的 suctruture 中。

所以,我避免为每个进程元素做一个侧输入。

.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (isFirstTime) {
myList = c.sideInput(sideData);
}
isFirstTime = false;
boolean result = myList.containsKey(c.element().getKey());         
if (result == false) {
c.output(i);
} 
}
}).withSideInputs(sideData));

如果它运行的数据要少得多,我怀疑该程序正在耗尽 java 进程的所有内存。您可以通过JVisualVM或JConsole进行监控。有很多文章涵盖了这个问题,我只是通过快速的谷歌搜索偶然发现了这篇文章。

如果内存耗尽,您的 Java 进程主要忙于清理内存,并且您会看到性能大幅下降。在某些时候,Java放弃并失败了。

要解决此问题,增加 Java 堆大小应该就足够了。如何增加它取决于你执行它的方式和位置。查看 Java 的-Xmx参数或 beam 中的某个堆选项。

最新更新