我想用数据流api替换数据集api



我的flink版本是1.13。

dataSet.first(limit).print();

如何用数据流api替换它?

如果您不能使用Flink SQL,那么您可以编写自己的limit运算符,类似于(警告,未测试!(:

public class LimitFilter<T> extends RichFilterFunction<T> {
private int _limit;

private transient int _remainingRecords;

public LimitFilter(int limit) {
_limit = limit;
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);

// Calc number of records to return from this subtask
int mySubtask = getRuntimeContext().getIndexOfThisSubtask();
int remainingItems = _limit;
int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
for (int i = 0; i < parallelism; i++) {
int remainingGroups = parallelism - i;
int itemsInGroup = remainingItems / remainingGroups;
if (i == mySubtask) {
_remainingRecords = itemsInGroup;
break;
}

remainingItems -= itemsInGroup;
}
}

@Override
public boolean filter(T value) throws Exception {
if (_remainingRecords <= 0) {
return false;
}

_remainingRecords--;
return true;
}
}

这将仅在并行性>1,如果您在运算符之间有一些合理的随机数据分布,例如在.filter(new LimitFilter(limit))运算符之前使用rebalance()

在数据集的情况下,数据是有限的,而对于数据流,元素的数量可以是无限的。我想你已经知道了,所以当你读取连续到达的数据时,.first(n)元素的概念是不一样的,有时是无序的。

相关内容

  • 没有找到相关文章

最新更新