如何使用JavaStream API来解决滑动窗口问题



自从Java 8以来,使用Stream API已经变得非常普遍。然而,使用基于批处理的算法可以容易地解决一些问题,而使用基于流的解决方案则不容易解决。

例如,给定一张信用卡按时间顺序的交易流,我想在每个24小时内找到该卡的总交易金额,这样我就可以将其与阈值进行比较,以猜测该卡是否被盗。流数据可以像一样简单

transaction time            amount
2019-01-23T10:12:31.484Z    100
2019-01-24T00:12:30.004Z    50
2019-01-24T09:00:00.000Z    23
2019-01-27T05:10:00.300Z    65

这可以看作是一个滑动窗口问题,需要检查元素之间的关系。基于批处理的解决方案不是很复杂。我可以使用队列来保持24小时内只发生事务。该算法可以大致描述为以下步骤:

  1. 创建一个队列,并将第一个事务放入队列;

  2. 将下一个事务与队列头的事务进行比较。

  3. 检查两个交易的交易时差

    如果时差小于24小时,

  4. 将事务添加到队列中并返回到步骤2。

    否则,如果超过24小时,则

    5.1.计算队列中交易的总交易金额,因为这些交易发生在24小时内

    5.2将结果放入结果列表中。

    5.3轮询队列以删除最旧的事务,直到队列中有事务的新事务发生不到24小时。

    5.4循环至步骤2。

但是,我发现使用JavaStream API很难实现上述算法,所以我想知道使用JavaStream来实现滑动窗口问题是个好主意吗?如果是的话,有人能用Java Stream来实现它吗?没有必要使用上述算法。任何基于流的算法都可以。

这是一个如何使用reduce实现的示例。创建了用于收集数据的类:

class Row {
private LocalDateTime date;
private Integer value;
public Row(LocalDateTime date, Integer value) {
this.date = date;
this.value = value;
}
// getters and setters

首先将示例数据读入流中。

Stream<Row> readData = Stream.of(
"2019-01-23T10:12:31    100",
"2019-01-24T00:12:30    50",
"2019-01-24T09:00:00    23",
"2019-01-25T03:00:00    23",
"2019-01-27T05:10:00    65")
.map(s -> s.split("\s+"))
.map(a -> new Row(LocalDateTime.parse(a[0], DateTimeFormatter.ISO_LOCAL_DATE_TIME), Integer.valueOf(a[1])));

reduce方法中,每隔24小时收集一次行到分隔列表中,并将其存储在主列表中。

List<List<Row>> all = new ArrayList<>();
all.add(readData
.map(Arrays::asList)
.reduce(new ArrayList<>(), (a, v) -> {
if (a.isEmpty()) {
a.addAll(v);
} else {
LocalDateTime first = a.get(0).getDate().plusHours(24);
if (first.isAfter(v.get(0).getDate())) {
a.addAll(v);
} else {
all.add(a);
LocalDateTime last = v.get(0).getDate().minusHours(24);
a = new ArrayList<>(a.stream()
.filter(r -> last.isBefore(r.getDate()))
.collect(Collectors.toList()));
a.addAll(v);
}
}
return a;
}));

最后,您可以打印出所有列表或计算每个期间的交易价值。

all.forEach(System.out::println);
all.stream().map(l -> l.stream()
.map(Row::getValue)
.reduce(Integer::sum)
.get()
)
.forEach(System.out::println);

更新

CCD_ 3也可以用简单的CCD_。然后主要部分将是这样的,并且将不那么复杂:

LinkedList<List<Row>> all = new LinkedList<>(Arrays.asList(new ArrayList<>()));
readData
.forEach(v -> {
if (!all.getLast().isEmpty()) {
// check if in 24h boundary
LocalDateTime upperValue = all.getLast().get(0).getDate().plusHours(24);
if (!upperValue.isAfter(v.getDate())) {
// create copy with row older earlier than 24h
LocalDateTime lowerValue = v.getDate().minusHours(24);
all.add(new ArrayList<>(all.getLast().stream()
.filter(r -> lowerValue.isBefore(r.getDate()))
.collect(Collectors.toList())));
}
}
all.getLast().add(v);
});

通过Streams实现这一点的"问题"在于,它们被设计为易于并行。仅在设计上,它们是而不是顺序的,因此纯顺序算法(如基于队列的算法(不能很好地与Streams配合使用。这是从命令式编程到函数式编程的飞跃,有时你需要一种新的算法,新的方法。将命令式代码转换为函数式代码并不总是那么简单。

如果您的数据源可以轻松地多次拆分(我认为,这相当于能够高效地同时提供所需的窗口(,那么您可以使用Streams来实现这一点。例如,如果数据源是List,则可以(重新(使用StreamEx的ofSublists()方法。这是因为创建任何子列表都是高效的,并且独立于其他子列表,所以可以安全地同时调用它。

或者,您可以假设您的Stream永远不会并行运行,并使用专注于纯序列的东西,例如jOOL:Seq.sliding()。这适用于任何Seq,因为实现可以使用迭代状态,而不必担心并行性。

最新更新