我正在尝试使用Hazelcast的map-reduce特性来执行聚合操作,该操作需要访问共定位的条目。使用数据关联控制共定位。
想象一下Hazelcast文档中关于数据关联的经典customer/order模型。在我的示例中,我想返回一个客户摘要,其中包含客户及其所有订单的总和,例如,给定此数据集:
customer_id | name
------------------
1 | Dave
2 | Kate
order_id | customer_id | value
------------------------------
1 | 1 | 5
2 | 1 | 10
3 | 2 | 12
我想返回:
customer_id | name | value
--------------------------
1 | Dave | 15
2 | Kate | 12
这很简单,但是使用数据关联的原因是能够在保存数据的各个分区内执行求和逻辑,只需获取该分区内的所有顺序,从而避免任何跨JVM通信。
所以我的问题是,从Mapper或类似的内部,你如何在另一个缓存中获得共同定位的条目?
编辑:在@noctarius的回答和评论之后,这里有一些代码(我已经试图使它尽可能简短),突出了我只想要当前分区的顺序。
order key类如下所示:
public class OrderKey implements PartitionAware<CustomerIdentity>
{
...
@Override
public CustomerIdentity getPartitionKey()
{
return this.customerIdentity;
}
...
}
和Mapper
这样:
public class OrderSumMapper implements Mapper<CustomerKey, Customer, CustomerKey, CustomerOrderTotal>, HazelcastInstanceAware
{
...
@Override
public void map(CustomerKey customerKey, Customer customer, Context<CustomerKey, CustomerOrderTotal> context)
{
Predicate ordersForCustomer = new OrdersForCustomerPredicate(customerKey);
int totalValue = 0;
//******************************************************************
//
// Given orders are co-located with the customer, how do you ensure
// this call to get the orders only runs in the current partition?
//
//******************************************************************
for (Order order : hazelcastInstance.getMap("orders").values(ordersForCustomer))
{
totalValue += order.getValue();
}
context.emit(customerKey, new CustomerOrderTotal(customer, total));
}
...
}
突出显示的调用hazelcastInstance.getMap("orders").values(ordersForCustomer)
通常会访问集群中的所有节点,但由于数据位于同一位置,这是不必要的开销。
那么回到我最初的问题,你如何得到只返回当前分区中的顺序?
只需将当前节点的HazelcastInstance注入到Mapper中,并检索第二个数据结构来读取数据。
看一个基本的例子:https://github.com/noctarius/hazelcast-mapreduce-presentation/blob/master/src/main/java/com/hazelcast/examples/tutorials/impl/SalaryMapper.java
我已经算出来了,希望这对其他人有用(所以我无耻地回答并接受了我自己的问题)。
经过一些实验,可以从在该分区中运行的Mapper
获取分区内另一个映射中保存的对象。
第一件事是让Mapper
实现NodeAware
,这导致Hazelcast注入对Mapper
正在运行的Node
的引用。
一旦你有了Node
,你就可以像这样写一个方法来访问一个给定分区内的其他映射中的数据,像这样:
private Collection<Order> getCustomerOrders(CustomerKey customerKey)
{
List<Order> orders = new ArrayList<>();
MapService mapService = node.getClusterService().getNodeEngine().getService(MapService.SERVICE_NAME);
RecordStore recordStore = mapService.getRecordStore(node.getPartitionService().getPartitionId(customerKey), "orders");
for (Data key : recordStore.keySet())
{
OrderKey orderKey = mapService.getSerializationService().toObject(key);
if (customerKey.equals(orderKey.getCustomerKey()))
{
orders.add(mapService.getSerializationService().toObject(recordStore.get(key)));
}
}
return orders;
}
有一点反序列化开销,但使用Predicate
和以这种方式工作的情况下,所有由Mapper
在包含被映射数据的JVM中执行的处理,因此避免了任何昂贵的进程/网络跳转-基本上它应该更快,并且肯定会减少由节点间通信引起的网络流量。