我的flink版本是1.13。
dataSet.first(limit).print();
如何用数据流api替换它?
如果您不能使用Flink SQL,那么您可以编写自己的limit
运算符,类似于(警告,未测试!(:
public class LimitFilter<T> extends RichFilterFunction<T> {
private int _limit;
private transient int _remainingRecords;
public LimitFilter(int limit) {
_limit = limit;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Calc number of records to return from this subtask
int mySubtask = getRuntimeContext().getIndexOfThisSubtask();
int remainingItems = _limit;
int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
for (int i = 0; i < parallelism; i++) {
int remainingGroups = parallelism - i;
int itemsInGroup = remainingItems / remainingGroups;
if (i == mySubtask) {
_remainingRecords = itemsInGroup;
break;
}
remainingItems -= itemsInGroup;
}
}
@Override
public boolean filter(T value) throws Exception {
if (_remainingRecords <= 0) {
return false;
}
_remainingRecords--;
return true;
}
}
这将仅在并行性>1,如果您在运算符之间有一些合理的随机数据分布,例如在.filter(new LimitFilter(limit))
运算符之前使用rebalance()
。
在数据集的情况下,数据是有限的,而对于数据流,元素的数量可以是无限的。我想你已经知道了,所以当你读取连续到达的数据时,.first(n)
元素的概念是不一样的,有时是无序的。