Flink missing windows generated on some partitions



I am trying to write a small Flink dataflow to understand how it works, but I am facing a strange situation where every time I run it I get inconsistent output. Sometimes some records that I am expecting are missing. Keep in mind this is just a toy example I am building to learn the concepts of the DataStream API.

I have a dataset of about 7,600 rows in CSV format that looks like this:

Date,Country,City,Specie,count,min,max,median,variance
28/06/2021,GR,Athens,no2,116,0.5,58.9,5.5,2824.39
28/06/2021,GR,Athens,wind-speed,133,0.1,11.2,3,96.69
28/06/2021,GR,Athens,dew,24,14,20,18,35.92
28/06/2021,GR,Athens,temperature,141,24.4,38.4,30.5,123.18
28/06/2021,GR,Athens,pm25,116,34,85,68,702.29

Full dataset here: https://pastebin.com/rknnRnPc

There are no special characters or quoting, so a simple String split will work fine.

The date range for each city spans from 28/06/2021 to 03/10/2021.

I am reading it using the DataStream API:

final DataStream<String> source = env.readTextFile("data.csv");

Each line is mapped to a simple POJO like the following:

import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class CityMetric {

    private static final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    private final LocalDate localDate;
    private final String country;
    private final String city;
    private final String reading;
    private final int count;
    private final double min;
    private final double max;
    private final double median;
    private final double variance;

    private CityMetric(LocalDate localDate, String country, String city, String reading,
                       int count, double min, double max, double median, double variance) {
        this.localDate = localDate;
        this.country = country;
        this.city = city;
        this.reading = reading;
        this.count = count;
        this.min = min;
        this.max = max;
        this.median = median;
        this.variance = variance;
    }

    public static CityMetric fromArray(String[] arr) {
        LocalDate date = LocalDate.parse(arr[0], dateFormatter);
        int count = Integer.parseInt(arr[4]);
        double min = Double.parseDouble(arr[5]);
        double max = Double.parseDouble(arr[6]);
        double median = Double.parseDouble(arr[7]);
        double variance = Double.parseDouble(arr[8]);
        return new CityMetric(date, arr[1], arr[2], arr[3], count, min, max, median, variance);
    }

    public long getTimestamp() {
        return getLocalDate()
            .atStartOfDay()
            .toInstant(ZoneOffset.UTC)
            .toEpochMilli();
    }

    // getters follow
}

The records are all in date order, so I have this to set the event time and watermarks:

final WatermarkStrategy<CityMetric> cityMetricWatermarkStrategy =
    WatermarkStrategy.<CityMetric>forMonotonousTimestamps()  // we know they are sorted by time
        .withTimestampAssigner((cityMetric, l) -> cityMetric.getTimestamp());

I have a StreamingFileSink on a Tuple4 to output the date range, the city and the average value:

final StreamingFileSink<Tuple4<LocalDate, LocalDate, String, Double>> fileSink =
    StreamingFileSink.forRowFormat(
            new Path("airquality"),
            new SimpleStringEncoder<Tuple4<LocalDate, LocalDate, String, Double>>("UTF-8"))
        .build();

And finally this is the dataflow I have:

source
    .map(s -> s.split(",")) // split the CSV row into its fields
    .filter(arr -> !arr[0].startsWith("Date")) // if it starts with Date it means it is the top header
    .map(CityMetric::fromArray) // create the object from the fields
    .assignTimestampsAndWatermarks(cityMetricWatermarkStrategy) // we use the date as the event time
    .filter(cm -> cm.getReading().equals("pm25")) // we want air quality of fine particulate matter pm2.5
    .keyBy(CityMetric::getCity) // partition by city name
    .window(TumblingEventTimeWindows.of(Time.days(7))) // windows of 7 days
    .aggregate(new CityAverageAggregate()) // average the values
    .name("cityair")
    .addSink(fileSink); // output each partition to a file

CityAverageAggregate just accumulates the sum and the count, and keeps track of the earliest and latest dates of the range it covers.

public class CityAverageAggregate
        implements AggregateFunction<
            CityMetric, CityAverageAggregate.AverageAccumulator, Tuple4<LocalDate, LocalDate, String, Double>> {

    @Override
    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    @Override
    public AverageAccumulator add(CityMetric cityMetric, AverageAccumulator averageAccumulator) {
        return averageAccumulator.add(
            cityMetric.getCity(), cityMetric.getLocalDate(), cityMetric.getMedian());
    }

    @Override
    public Tuple4<LocalDate, LocalDate, String, Double> getResult(AverageAccumulator averageAccumulator) {
        return Tuple4.of(
            averageAccumulator.getStart(),
            averageAccumulator.getEnd(),
            averageAccumulator.getCity(),
            averageAccumulator.average());
    }

    @Override
    public AverageAccumulator merge(AverageAccumulator acc1, AverageAccumulator acc2) {
        return acc1.merge(acc2);
    }

    public static class AverageAccumulator {
        private final String city;
        private final LocalDate start;
        private final LocalDate end;
        private final long count;
        private final double sum;

        public AverageAccumulator() {
            city = "";
            count = 0;
            sum = 0;
            start = null;
            end = null;
        }

        AverageAccumulator(String city, LocalDate start, LocalDate end, long count, double sum) {
            this.city = city;
            this.count = count;
            this.sum = sum;
            this.start = start;
            this.end = end;
        }

        public AverageAccumulator add(String city, LocalDate eventDate, double value) {
            // make sure our dataflow is correct and we are summing data from the same city
            if (!this.city.equals("") && !this.city.equals(city)) {
                throw new IllegalArgumentException(city + " does not match " + this.city);
            }
            return new AverageAccumulator(
                city,
                earliest(this.start, eventDate),
                latest(this.end, eventDate),
                this.count + 1,
                this.sum + value);
        }

        public AverageAccumulator merge(AverageAccumulator that) {
            LocalDate mergedStart = earliest(this.start, that.start);
            LocalDate mergedEnd = latest(this.end, that.end);
            return new AverageAccumulator(
                this.city, mergedStart, mergedEnd, this.count + that.count, this.sum + that.sum);
        }

        private LocalDate earliest(LocalDate d1, LocalDate d2) {
            if (d1 == null) {
                return d2;
            } else if (d2 == null) {
                return d1;
            } else {
                return d1.isBefore(d2) ? d1 : d2;
            }
        }

        private LocalDate latest(LocalDate d1, LocalDate d2) {
            if (d1 == null) {
                return d2;
            } else if (d2 == null) {
                return d1;
            } else {
                return d1.isAfter(d2) ? d1 : d2;
            }
        }

        public double average() {
            return sum / count;
        }

        public String getCity() {
            return city;
        }

        public LocalDate getStart() {
            return start;
        }

        public LocalDate getEnd() {
            return end;
        }
    }
}

The problem:

The problem I am facing is that sometimes I do not get all the windows I am expecting. This does not always happen, and sometimes consecutive runs produce different results, so I suspect there is some kind of race condition somewhere.

For example, this is what I sometimes get in one of the partition file outputs:

(2021-07-12,2021-07-14,Belgrade,56.666666666666664)
(2021-07-15,2021-07-21,Belgrade,56.0)
(2021-07-22,2021-07-28,Belgrade,57.285714285714285)
(2021-07-29,2021-08-04,Belgrade,43.57142857142857)
(2021-08-05,2021-08-11,Belgrade,35.42857142857143)
(2021-08-12,2021-08-18,Belgrade,43.42857142857143)
(2021-08-19,2021-08-25,Belgrade,36.857142857142854)
(2021-08-26,2021-09-01,Belgrade,50.285714285714285)
(2021-09-02,2021-09-08,Belgrade,46.285714285714285)
(2021-09-09,2021-09-15,Belgrade,54.857142857142854)
(2021-09-16,2021-09-22,Belgrade,56.714285714285715)
(2021-09-23,2021-09-29,Belgrade,59.285714285714285)
(2021-09-30,2021-10-03,Belgrade,61.5)

And sometimes I get the full set:

(2021-06-28,2021-06-30,Belgrade,48.666666666666664)
(2021-07-01,2021-07-07,Belgrade,41.142857142857146)
(2021-07-08,2021-07-14,Belgrade,52.857142857142854)
(2021-07-15,2021-07-21,Belgrade,56.0)
(2021-07-22,2021-07-28,Belgrade,57.285714285714285)
(2021-07-29,2021-08-04,Belgrade,43.57142857142857)
(2021-08-05,2021-08-11,Belgrade,35.42857142857143)
(2021-08-12,2021-08-18,Belgrade,43.42857142857143)
(2021-08-19,2021-08-25,Belgrade,36.857142857142854)
(2021-08-26,2021-09-01,Belgrade,50.285714285714285)
(2021-09-02,2021-09-08,Belgrade,46.285714285714285)
(2021-09-09,2021-09-15,Belgrade,54.857142857142854)
(2021-09-16,2021-09-22,Belgrade,56.714285714285715)
(2021-09-23,2021-09-29,Belgrade,59.285714285714285)
(2021-09-30,2021-10-03,Belgrade,61.5)

Is there an obvious mistake in my dataflow pipeline? I can't understand why this would happen. It doesn't always happen on the same city either.

What could be happening?

Update

So it seems that when I disable watermarks the problem no longer occurs. I changed the watermark strategy to the following:

final WatermarkStrategy<CityMetric> cityMetricWatermarkStrategy =
    WatermarkStrategy.<CityMetric>noWatermarks()
        .withTimestampAssigner((cityMetric, l) -> cityMetric.getTimestamp());

And so far I have been getting consistent results. When I looked at the documentation, it says:

static WatermarkStrategy noWatermarks()

Creates a watermark strategy that generates no watermarks at all. This may be useful in scenarios that do purely processing-time based stream processing.

But I am not doing processing-time based stream processing, I am doing event-time processing.

Why would forMonotonousTimestamps() have the strange behaviour I was seeing? Indeed my timestamps are monotonically increasing (if they weren't, the noWatermarks strategy wouldn't have worked either), but somehow changing this does not play well with my scenario.

Is there anything about the way Flink works that I am missing?

Flink doesn't support per-key watermarking. Each parallel task generates watermarks independently, based on observing all of the events flowing through that task.

So the reason the forMonotonousTimestamps watermark strategy isn't working is that the input is not actually in order by timestamp. It is sorted by time within each city, but not globally. This will result in some records being late, but unpredictably so, depending on exactly when the watermarks are generated. These late events are being ignored by the windows that should contain them.
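To make this concrete, here is a small plain-Java sketch of the simplified model behind forMonotonousTimestamps: the watermark tracks the maximum timestamp seen so far, so once one city's later records have advanced the watermark, the next city's earlier records arrive "late". The timestamps below are made up for illustration, not taken from the dataset:

```java
public class WatermarkLatenessDemo {

    // Counts events whose timestamp is at or below the current watermark,
    // using a simplified "max timestamp seen so far" watermark model.
    static int countLate(long[] timestamps) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (long ts : timestamps) {
            if (ts <= watermark) {
                late++; // the window that should contain ts may already have fired
            }
            watermark = Math.max(watermark, ts); // monotonous strategy: watermark follows the max
        }
        return late;
    }

    public static void main(String[] args) {
        // City A rows (100..300) followed by city B rows (50..250), the way a
        // file reader might interleave two blocks that are each sorted by time:
        long[] interleaved = {100, 200, 300, 50, 150, 250};
        System.out.println(countLate(interleaved)); // all three city-B rows are late
    }
}
```

In the real runtime the effect also depends on when watermarks are emitted and on parallelism, which is why the missing windows vary between runs.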

You can address this in several ways:

(1) Use a forBoundedOutOfOrderness watermark strategy, with a duration large enough to account for the actual out-of-orderness in the dataset. Given that the data looks like this:

03/10/2021,GR,Athens,pressure,60,1017.9,1040.6,1020.9,542.4
28/06/2021,US,Atlanta,co,24,1.4,7.3,2.2,19.05

that would require an out-of-orderness duration of about 100 days.
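As a sketch, this is a drop-in replacement for the strategy in the question; the 100-day Duration is an assumption sized to the roughly 100-day gap shown above:

```java
final WatermarkStrategy<CityMetric> cityMetricWatermarkStrategy =
    WatermarkStrategy.<CityMetric>forBoundedOutOfOrderness(Duration.ofDays(100))
        .withTimestampAssigner((cityMetric, l) -> cityMetric.getTimestamp());
```

Note that with a bound this large the watermark lags the data by the whole dataset, so in practice it behaves much like noWatermarks for this input.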

(2) Configure the windows with sufficient allowed lateness. This will result in some of the windows being triggered multiple times: once when the watermark indicates they can close, and again each time a late event is added to the window.
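Allowed lateness is set on the windowed stream right after the window assigner; the 100-day value is again an assumption sized to this dataset:

```java
.keyBy(CityMetric::getCity)
.window(TumblingEventTimeWindows.of(Time.days(7)))
.allowedLateness(Time.days(100)) // keep window state around so late events re-fire the window
.aggregate(new CityAverageAggregate())
```

Be aware that each late event produces an updated result for its window, so the sink will see repeated, refined outputs rather than one record per window.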

(3) Use the noWatermarks strategy. This will result in the job only producing results once it reaches the end of its input file(s). For a continuous streaming job this wouldn't be workable, but for finite (bounded) inputs it works.

(4) Run the job in RuntimeExecutionMode.BATCH mode. Then the job will only produce results at the end, after having consumed all of its input. This runs the job with a more optimized runtime designed for batch workloads, but the outcome should be the same as with (3).
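Batch execution mode is a one-line setting on the execution environment (available since Flink 1.12):

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH); // sort/consume all input first, emit results once at the end
```

It can also be set from the command line with `-Dexecution.runtime-mode=BATCH`, which keeps the choice out of the program code.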

(5) Change the input so that it isn't out of order.
