自从Java 8以来,使用Stream API已经变得非常普遍。然而,使用基于批处理的算法可以容易地解决一些问题,而使用基于流的解决方案则不容易解决。
例如,给定一张信用卡按时间顺序的交易流,我想在每个24小时内找到该卡的总交易金额,这样我就可以将其与阈值进行比较,以猜测该卡是否被盗。流数据可以像一样简单
transaction time amount
2019-01-23T10:12:31.484Z 100
2019-01-24T00:12:30.004Z 50
2019-01-24T09:00:00.000Z 23
2019-01-27T05:10:00.300Z 65
这可以看作是一个滑动窗口问题,需要检查元素之间的关系。基于批处理的解决方案不是很复杂。我可以使用队列来保持24小时内只发生事务。该算法可以大致描述为以下步骤:
创建一个队列,并将第一个事务放入队列;
将下一个事务与队列头的事务进行比较。
检查两个交易的交易时差
如果时差小于24小时,
将事务添加到队列中并返回到步骤2。
否则,如果超过24小时,则
5.1.计算队列中交易的总交易金额,因为这些交易发生在24小时内
5.2将结果放入结果列表中。
5.3轮询队列以删除最旧的事务,直到队列中有事务的新事务发生不到24小时。
5.4循环至步骤2。
但是,我发现使用JavaStream API很难实现上述算法,所以我想知道使用JavaStream来实现滑动窗口问题是个好主意吗?如果是的话,有人能用Java Stream来实现它吗?没有必要使用上述算法。任何基于流的算法都可以。
这是一个如何使用reduce
实现的示例。创建了用于收集数据的类:
class Row {
private LocalDateTime date;
private Integer value;
public Row(LocalDateTime date, Integer value) {
this.date = date;
this.value = value;
}
// getters and setters
首先将示例数据读入流中。
Stream<Row> readData = Stream.of(
"2019-01-23T10:12:31 100",
"2019-01-24T00:12:30 50",
"2019-01-24T09:00:00 23",
"2019-01-25T03:00:00 23",
"2019-01-27T05:10:00 65")
.map(s -> s.split("\s+"))
.map(a -> new Row(LocalDateTime.parse(a[0], DateTimeFormatter.ISO_LOCAL_DATE_TIME), Integer.valueOf(a[1])));
在reduce
方法中,每隔24小时收集一次行到分隔列表中,并将其存储在主列表中。
List<List<Row>> all = new ArrayList<>();
all.add(readData
.map(Arrays::asList)
.reduce(new ArrayList<>(), (a, v) -> {
if (a.isEmpty()) {
a.addAll(v);
} else {
LocalDateTime first = a.get(0).getDate().plusHours(24);
if (first.isAfter(v.get(0).getDate())) {
a.addAll(v);
} else {
all.add(a);
LocalDateTime last = v.get(0).getDate().minusHours(24);
a = new ArrayList<>(a.stream()
.filter(r -> last.isBefore(r.getDate()))
.collect(Collectors.toList()));
a.addAll(v);
}
}
return a;
}));
最后,您可以打印出所有列表或计算每个期间的交易价值。
all.forEach(System.out::println);
all.stream().map(l -> l.stream()
.map(Row::getValue)
.reduce(Integer::sum)
.get()
)
.forEach(System.out::println);
更新
CCD_ 3也可以用简单的CCD_。然后主要部分将是这样的,并且将不那么复杂:
LinkedList<List<Row>> all = new LinkedList<>(Arrays.asList(new ArrayList<>()));
readData
.forEach(v -> {
if (!all.getLast().isEmpty()) {
// check if in 24h boundary
LocalDateTime upperValue = all.getLast().get(0).getDate().plusHours(24);
if (!upperValue.isAfter(v.getDate())) {
// create copy with row older earlier than 24h
LocalDateTime lowerValue = v.getDate().minusHours(24);
all.add(new ArrayList<>(all.getLast().stream()
.filter(r -> lowerValue.isBefore(r.getDate()))
.collect(Collectors.toList())));
}
}
all.getLast().add(v);
});
通过Streams实现这一点的"问题"在于,它们被设计为易于并行。仅在设计上,它们是而不是顺序的,因此纯顺序算法(如基于队列的算法(不能很好地与Streams配合使用。这是从命令式编程到函数式编程的飞跃,有时你需要一种新的算法,新的方法。将命令式代码转换为函数式代码并不总是那么简单。
如果您的数据源可以轻松地多次拆分(我认为,这相当于能够高效地同时提供所需的窗口(,那么您可以使用Streams来实现这一点。例如,如果数据源是List
,则可以(重新(使用StreamEx的ofSublists()
方法。这是因为创建任何子列表都是高效的,并且独立于其他子列表,所以可以安全地同时调用它。
或者,您可以假设您的Stream永远不会并行运行,并使用专注于纯序列的东西,例如jOOL:Seq.sliding()
。这适用于任何Seq,因为实现可以使用迭代状态,而不必担心并行性。