Apache Flink:如何并行执行但保持消息顺序?



我有几个关于flink并行性的问题。这是我的设置:

我有 1 个主节点和 2 个从节点。在 flink 中,我创建了 3 个 kafka 消费者,每个消费者都从不同的主题消费。
由于元素的顺序对我来说很重要,因此每个主题只有一个分区,我有 flink 设置来使用事件时间。

然后,我在每个数据流上运行以下管道(伪代码):

source
.map(deserialize)
.window
.apply
.map(serialize)
.writeTo(sink)

到目前为止,我用参数启动我的 flink 程序-p 2假设这将允许我使用我的两个节点。结果不是我所希望的,因为我的输出顺序有时会乱七八糟。

在阅读了 flink 文档并试图更好地理解它之后,有人可以确认我的以下"学习"吗?

1.) 传递-p 2仅配置任务并行性,即任务(例如map(deserialize))将被拆分为的最大并行实例数。如果我想在整个管道中保持订单,我必须使用-p 1.

2.)这对我来说似乎是矛盾/令人困惑的:即使并行度设置为1,不同的任务仍然可以并行运行(同时)。因此,如果我通过-p 1,我的 3 条管道也将并行运行。

作为后续问题:有没有办法弄清楚哪些任务映射到哪个任务槽,以便我可以自己确认并行执行?

我将不胜感激任何意见!

更新

这是 flink 对-p 2的执行计划。

在Apache Flink用户电子邮件列表中提出了这个问题之后,答案如下:

1.)-p选项定义每个作业的任务并行度。如果选择的并行度高于 1 并且数据被重新分发(例如,通过 rebalance() 或 keyBy()),则无法保证顺序。

2.)-p设置为 1 时,仅使用 1 个任务插槽,即 1 个 CPU 内核。因此,可能有多个线程同时在一个内核上运行,但不并行运行。

至于我的要求:为了并行运行多个管道并仍然保持顺序,我可以只运行多个 Flink 作业,而不是在同一个 Flink 作业中运行所有管道。

我会尝试用我所知道的来回答。

1) 是的,对于 CLI 客户端,可以使用 -p 指定并行度参数。您说这是并行实例的最大数量是正确的。但是,我没有看到并行性和顺序之间的联系?据我所知,订单由 Flink 管理,使用事件中提供的时间戳或他自己的摄取时间戳。如果您想在不同的数据源中保持顺序,这对我来说似乎很复杂,或者您可以将这些不同的数据源合并为一个。

2) 如果将并行度设置为 3,则 3 个管道可以并行运行。我认为这里的并行性意味着在不同的插槽上。

跟进问题)您可以在 http://localhost:8081 时检查哪些任务映射到 JobManager 的 Web 前端上的哪个任务槽。

请在下面找到使用侧输出和插槽组在本地缩放的示例。

package org.example
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* This example shows an implementation of WordCount with data from a text socket.
* To run the example make sure that the service providing the text data is already up and running.
*
* To start an example socket text stream on your local machine run netcat from a command line,
* where the parameter specifies the port number:
*
* {{{
*   nc -lk 9999
* }}}
*
* Usage:
* {{{
*   SocketTextStreamWordCount <hostname> <port> <output path>
* }}}
*
* This example shows how to:
*
*   - use StreamExecutionEnvironment.socketTextStream
*   - write a simple Flink Streaming program in scala.
*   - write and use user-defined functions.
*/
object SocketTextStreamWordCount {
def main(args: Array[String]) {
if (args.length != 2) {
System.err.println("USAGE:nSocketTextStreamWordCount <hostname> <port>")
return
}
val hostName = args(0)
val port = args(1).toInt
val outputTag1 = OutputTag[String]("side-1")
val outputTag2 = OutputTag[String]("side-2")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
//Create streams for names and ages by mapping the inputs to the corresponding objects
val text = env.socketTextStream(hostName, port).slotSharingGroup("processElement")
val counts = text.flatMap {
_.toLowerCase.split("\W+") filter {
_.nonEmpty
}
}
.process(new ProcessFunction[String, String] {
override def processElement(
value: String,
ctx: ProcessFunction[String, String]#Context,
out: Collector[String]): Unit = {
if (value.head <= 'm') ctx.output(outputTag1, String.valueOf(value))
else ctx.output(outputTag2, String.valueOf(value))
}
})
val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1)
val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2)
val output1 = sideOutputStream1.map {
(_, 1)
}.slotSharingGroup("map1")
.keyBy(0)
.sum(1)
val output2 = sideOutputStream2.map {
(_, 1)
}.slotSharingGroup("map2")
.keyBy(0)
.sum(1)
output1.print()
output2.print()
env.execute("Scala SocketTextStreamWordCount Example")
}
}

相关内容

  • 没有找到相关文章

最新更新