How do Flink and the Beam SDK handle windowing - which is more efficient?



I am comparing the Apache Beam SDK with the Flink SDK for stream processing, in order to establish the cost/benefit of using Beam as an additional framework.

I have a very simple setup where a stream of data is read from a Kafka source and processed in parallel by a cluster of nodes running Flink.

From my understanding of how these SDKs work, the simplest way to process a stream of data window by window is:

  1. Using Apache Beam (running on Flink):

    1.1. Create a Pipeline object.

    1.2. Create a PCollection of Kafka records.

    1.3. Apply the windowing function.

    1.4. Transform the pipeline to key by window.

    1.5. Group the records by key (window).

    1.6. Apply whatever function is required to the windowed records.

  2. Using the Flink SDK:

    2.1. Create a DataStream from the Kafka source.

    2.2. Transform it into a keyed stream by providing a key function.

    2.3. Apply the windowing function.

    2.4. Apply whatever function is required to the windowed records.

While the Flink solution looks more concise programmatically, in my experience it is less efficient at high volumes of data. I can only imagine the overhead is introduced by the key extraction function, since Beam does not require this step.

My question is: am I comparing like with like? Are these processes not equivalent? What could explain the Beam approach being more efficient, given that it uses Flink as the runner (and all other conditions being equal)?

This is the code using the Beam SDK:

PipelineOptions options = PipelineOptionsFactory.create();

//Run with Flink
FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class);
flinkPipelineOptions.setRunner(FlinkRunner.class);
flinkPipelineOptions.setStreaming(true);
flinkPipelineOptions.setParallelism(-1); //Pick this up from the user interface at runtime

// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(flinkPipelineOptions);

// Create a PCollection of Kafka records
PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = p.apply(KafkaIO.<Long, String>readBytes()
        .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
        .withTopics(ImmutableList.of(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC))
        .updateConsumerProperties(ImmutableMap.of("group.id", CONSUMER_GROUP)));

//Apply Windowing Function
PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
        Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));

//Transform the pipeline to key by window
PCollection<KV<IntervalWindow, KafkaRecord<byte[], byte[]>>> keyedByWindow =
        windowedKafkaCollection.apply(
                ParDo.of(
                        new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
                            @ProcessElement
                            public void processElement(ProcessContext context, IntervalWindow window) {
                                context.output(KV.of(window, context.element()));
                            }
                        }));

//Group records by key (window)
PCollection<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>> groupedByWindow = keyedByWindow
        .apply(GroupByKey.<IntervalWindow, KafkaRecord<byte[], byte[]>>create());

//Process windowed data
PCollection<KV<IIntervalWindowResult, IPueResult>> processed = groupedByWindow
        .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

// Run the pipeline.
p.run().waitUntilFinish();

This is the code using the Flink SDK:

// Create a Streaming Execution Environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(6);

//Connect to Kafka
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", KAFKA_IP + ":" + KAFKA_PORT);
properties.setProperty("group.id", CONSUMER_GROUP);
DataStream<ObjectNode> stream = env
        .addSource(new FlinkKafkaConsumer010<>(Arrays.asList(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC), new JSONDeserializationSchema(), properties));

//Key by id
stream.keyBy((KeySelector<ObjectNode, Integer>) jsonNode -> jsonNode.get("id").asInt())
        //Set the windowing function.
        .timeWindow(Time.seconds(5L), Time.seconds(1L))
        //Process Windowed Data
        .process(new PueCalculatorFn(), TypeInformation.of(ImmutablePair.class));

// execute program
env.execute("Using Flink SDK");

Many thanks in advance for any insight.

Edit

I thought I should add some metrics that may be relevant.

Network bytes received

Flink SDK

  • taskmanager.2: 2644786446
  • taskmanager.3: 2645765232
  • taskmanager.1: 2827676598
  • taskmanager.6: 22422309148
  • taskmanager.4: 2428570491
  • taskmanager.5: 2431368644

Beam

  • taskmanager.2: 4092154160
  • taskmanager.3: 4435132862
  • taskmanager.1: 4766399314
  • taskmanager.6: 4425190393
  • taskmanager.4: 4096576110
  • taskmanager.5: 4092849114

CPU utilisation (max)

Flink SDK

  • taskmanager.2: 93.00%
  • taskmanager.3: 92.00%
  • taskmanager.1: 91.00%
  • taskmanager.6: 90.00%
  • taskmanager.4: 90.00%
  • taskmanager.5: 92.00%

Beam

  • taskmanager.2: 52.0%
  • taskmanager.3: 71.0%
  • taskmanager.1: 72.0%
  • taskmanager.6: 40.0%
  • taskmanager.4: 56.0%
  • taskmanager.5: 26.0%

Beam seems to use considerably more network, whereas Flink uses significantly more CPU. Could this suggest that Beam is parallelising the processing in a more efficient way?

Edit 2

I am pretty sure the PueCalculatorFn classes are equivalent, but I will share the code here to see whether there are any obvious discrepancies between the two processes.

Beam

public class PueCalculatorFn extends DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>> implements Serializable {

    private transient List<IKafkaConsumption> realEnergyRecords;
    private transient List<IKafkaConsumption> itEnergyRecords;

    @ProcessElement
    public void procesElement(DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>>.ProcessContext c, BoundedWindow w) {
        KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>> element = c.element();
        Instant windowStart = Instant.ofEpochMilli(element.getKey().start().getMillis());
        Instant windowEnd = Instant.ofEpochMilli(element.getKey().end().getMillis());
        Iterable<KafkaRecord<byte[], byte[]>> records = element.getValue();
        //Calculate Pue
        IPueResult result = calculatePue(element.getKey(), records);
        //Create IntervalWindowResult object to return
        DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
        IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
                formatter.format(windowEnd), realEnergyRecords, itEnergyRecords);
        //Return Pue keyed by Window
        c.output(KV.of(intervalWindowResult, result));
    }

    private PueResult calculatePue(IntervalWindow window, Iterable<KafkaRecord<byte[], byte[]>> records) {
        //Define accumulators to gather readings
        final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
        final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
        //Declare variable to store the result
        BigDecimal pue = BigDecimal.ZERO;
        //Initialise transient lists
        realEnergyRecords = new ArrayList<>();
        itEnergyRecords = new ArrayList<>();
        //Transform the results into a stream
        Stream<KafkaRecord<byte[], byte[]>> streamOfRecords = StreamSupport.stream(records.spliterator(), false);
        //Iterate through each reading and add to the increment count
        streamOfRecords
                .map(record -> {
                    byte[] valueBytes = record.getKV().getValue();
                    assert valueBytes != null;
                    String valueString = new String(valueBytes);
                    assert !valueString.isEmpty();
                    return KV.of(record, valueString);
                }).map(kv -> {
                    Gson gson = new GsonBuilder().registerTypeAdapter(KafkaConsumption.class, new KafkaConsumptionDeserialiser()).create();
                    KafkaConsumption consumption = gson.fromJson(kv.getValue(), KafkaConsumption.class);
                    return KV.of(kv.getKey(), consumption);
                }).forEach(consumptionRecord -> {
                    switch (consumptionRecord.getKey().getTopic()) {
                        case REAL_ENERGY_TOPIC:
                            totalRealIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                            realEnergyRecords.add(consumptionRecord.getValue());
                            break;
                        case IT_ENERGY_TOPIC:
                            totalItIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                            itEnergyRecords.add(consumptionRecord.getValue());
                            break;
                    }
                });
        assert totalRealIncrement.doubleValue() > 0.0;
        assert totalItIncrement.doubleValue() > 0.0;
        //Beware of division by zero
        if (totalItIncrement.doubleValue() != 0.0) {
            //Calculate PUE
            pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
        }
        //Create a PueResult object to return
        IWindow intervalWindow = new Window(window.start().getMillis(), window.end().getMillis());
        return new PueResult(intervalWindow, pue.stripTrailingZeros());
    }

    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        RecordSenderFactory.closeSender();
        WindowSenderFactory.closeSender();
    }
}

Flink

public class PueCalculatorFn extends ProcessWindowFunction<ObjectNode, ImmutablePair, Integer, TimeWindow> {

    private transient List<KafkaConsumption> realEnergyRecords;
    private transient List<KafkaConsumption> itEnergyRecords;

    @Override
    public void process(Integer integer, Context context, Iterable<ObjectNode> iterable, Collector<ImmutablePair> collector) throws Exception {
        Instant windowStart = Instant.ofEpochMilli(context.window().getStart());
        Instant windowEnd = Instant.ofEpochMilli(context.window().getEnd());
        BigDecimal pue = calculatePue(iterable);
        //Create IntervalWindowResult object to return
        DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
        IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
                formatter.format(windowEnd), realEnergyRecords
                        .stream()
                        .map(e -> (IKafkaConsumption) e)
                        .collect(Collectors.toList()), itEnergyRecords
                        .stream()
                        .map(e -> (IKafkaConsumption) e)
                        .collect(Collectors.toList()));

        //Create PueResult object to return
        IPueResult pueResult = new PueResult(new Window(windowStart.toEpochMilli(), windowEnd.toEpochMilli()), pue.stripTrailingZeros());
        //Collect result
        collector.collect(new ImmutablePair<>(intervalWindowResult, pueResult));
    }

    protected BigDecimal calculatePue(Iterable<ObjectNode> iterable) {
        //Define accumulators to gather readings
        final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
        final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
        //Declare variable to store the result
        BigDecimal pue = BigDecimal.ZERO;
        //Initialise transient lists
        realEnergyRecords = new ArrayList<>();
        itEnergyRecords = new ArrayList<>();
        //Iterate through each reading and add to the increment count
        StreamSupport.stream(iterable.spliterator(), false)
                .forEach(object -> {
                    switch (object.get("topic").textValue()) {
                        case REAL_ENERGY_TOPIC:
                            totalRealIncrement.accumulate(object.get("energyConsumed").asDouble());
                            realEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                            break;
                        case IT_ENERGY_TOPIC:
                            totalItIncrement.accumulate(object.get("energyConsumed").asDouble());
                            itEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                            break;
                    }
                });
        assert totalRealIncrement.doubleValue() > 0.0;
        assert totalItIncrement.doubleValue() > 0.0;
        //Beware of division by zero
        if (totalItIncrement.doubleValue() != 0.0) {
            //Calculate PUE
            pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
        }
        return pue;
    }
}

This is the custom deserialiser I am using in the Beam example.

KafkaConsumptionDeserialiser

public class KafkaConsumptionDeserialiser implements JsonDeserializer<KafkaConsumption> {

    public KafkaConsumption deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
        if (jsonElement == null) {
            return null;
        } else {
            JsonObject jsonObject = jsonElement.getAsJsonObject();
            JsonElement id = jsonObject.get("id");
            JsonElement energyConsumed = jsonObject.get("energyConsumed");
            Gson gson = (new GsonBuilder()).registerTypeAdapter(Duration.class, new DurationDeserialiser()).registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeDeserialiser()).create();
            Duration duration = (Duration) gson.fromJson(jsonObject.get("duration"), Duration.class);
            JsonElement topic = jsonObject.get("topic");
            Instant eventTime = (Instant) gson.fromJson(jsonObject.get("eventTime"), Instant.class);
            return new KafkaConsumption(Integer.valueOf(id != null ? id.getAsInt() : 0), Double.valueOf(energyConsumed != null ? energyConsumed.getAsDouble() : 0.0D), duration, topic != null ? topic.getAsString() : "", eventTime);
        }
    }
}

I am not sure why the Beam pipeline you wrote is faster, but semantically it is not equivalent to the Flink job. Similar to how windowing works in Flink, once you assign windows in Beam, all following operations automatically take the windowing into account, so you do not need to group by window.

Your Beam pipeline definition can be simplified as follows:

// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(flinkPipelineOptions);

// Create a PCollection of Kafka records
PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = ...

//Apply Windowing Function
PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
        Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));

//Process windowed data
PCollection<KV<IIntervalWindowResult, IPueResult>> processed = windowedKafkaCollection
        .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

// Run the pipeline.
p.run().waitUntilFinish();

As for performance, it depends on many factors, but keep in mind that Beam is an abstraction layer on top of Flink. Generally speaking, I would be surprised if you saw improved performance with Beam on Flink.

edit: To clarify further, you are not grouping by the JSON "id" field in the Beam pipeline, which you are doing in the Flink snippet.
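
For reference, keying the Beam pipeline by the same field could look roughly like the sketch below. This is only an illustration, not part of the original job: it assumes the Kafka record value carries the same JSON payload, and it parses it with Jackson's ObjectMapper/JsonNode purely to pull out the "id"; the names keyedById and groupedById are made up here.

//Key each record by the "id" field of its JSON value (illustrative only;
//creating an ObjectMapper per element is kept for brevity, not efficiency)
PCollection<KV<Integer, KafkaRecord<byte[], byte[]>>> keyedById = windowedKafkaCollection.apply(
        "keyById", ParDo.of(
                new DoFn<KafkaRecord<byte[], byte[]>, KV<Integer, KafkaRecord<byte[], byte[]>>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        JsonNode json = new ObjectMapper().readTree(c.element().getKV().getValue());
                        c.output(KV.of(json.get("id").asInt(), c.element()));
                    }
                }));

//Group records by id; because the PCollection is already windowed, this grouping
//is per id and per sliding window
PCollection<KV<Integer, Iterable<KafkaRecord<byte[], byte[]>>>> groupedById = keyedById
        .apply(GroupByKey.<Integer, KafkaRecord<byte[], byte[]>>create());

That keyed-and-windowed grouping is the closest analogue to keyBy(...).timeWindow(...) in the Flink snippet, so adding it would make the two jobs more comparable.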

It is also worth mentioning that if the windowed processing can be pre-aggregated via reduce() or aggregate(), the native Flink job should perform better than it currently does.
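
As a rough sketch of what that could look like (assuming a Flink version whose AggregateFunction returns the accumulator from add(), and dropping the per-record lists that the current ProcessWindowFunction collects), the PUE could be computed incrementally per window instead of buffering every record:

//Incremental pre-aggregation: keep running (real, IT) energy sums per window
stream.keyBy((KeySelector<ObjectNode, Integer>) jsonNode -> jsonNode.get("id").asInt())
        .timeWindow(Time.seconds(5L), Time.seconds(1L))
        .aggregate(new AggregateFunction<ObjectNode, Tuple2<Double, Double>, BigDecimal>() {
            @Override
            public Tuple2<Double, Double> createAccumulator() {
                return Tuple2.of(0.0, 0.0); //(real energy sum, IT energy sum)
            }

            @Override
            public Tuple2<Double, Double> add(ObjectNode record, Tuple2<Double, Double> acc) {
                double energy = record.get("energyConsumed").asDouble();
                return REAL_ENERGY_TOPIC.equals(record.get("topic").textValue())
                        ? Tuple2.of(acc.f0 + energy, acc.f1)
                        : Tuple2.of(acc.f0, acc.f1 + energy);
            }

            @Override
            public BigDecimal getResult(Tuple2<Double, Double> acc) {
                return acc.f1 == 0.0 ? BigDecimal.ZERO
                        : BigDecimal.valueOf(acc.f0).divide(BigDecimal.valueOf(acc.f1), 9, BigDecimal.ROUND_HALF_UP);
            }

            @Override
            public Tuple2<Double, Double> merge(Tuple2<Double, Double> a, Tuple2<Double, Double> b) {
                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
            }
        });

This keeps only two doubles of state per window instead of the full list of records; if the window start/end or other metadata is needed in the output, aggregate() can also be combined with a ProcessWindowFunction, though the state saving then depends on what you still buffer.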

Many details, such as the choice of state backend, serialisation, checkpointing, and so on, can also have a significant impact on performance.
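
For example, pinning these down explicitly on the Flink side makes the two runs easier to compare; the interval and path below are placeholders, not values from your setup:

//Make checkpointing and state backend explicit so both jobs run with the same settings
env.enableCheckpointing(10000L); //checkpoint every 10 seconds (placeholder interval)
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints")); //placeholder path
env.getConfig().enableObjectReuse(); //reuse objects between chained operators instead of copying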

Are you using the same Flink in both cases, i.e. the same version and the same configuration?
