我试图强制Apache Ignite 1.5.0.final使用单个节点上所有可用的CPU功率来并行处理本地缓存数据,但是我可以清楚地看到它没有使用所有可用的内核。
缓存的创建方式如下:
CacheConfiguration<Integer, MyObject> cfg = new CacheConfiguration<Integer, MyObject>();
cfg.setSwapEnabled(false);
cfg.setName(CACHE_NAME);
cfg.setCacheMode(CacheMode.PARTITIONED);
cfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED);
cfg.setBackups(0);
cfg.setCopyOnRead(false);
this.cache = ignite.getOrCreateCache(CACHE_NAME);
CPU 使用率看起来只有一个线程在执行工作。当我将实现切换到 ArrayList 时 - 不使用 Ignite,CPU 使用率达到 400%。
这段代码用于过滤缓存:
IgniteCache<Integer, MyObject> cache = ignite.getOrCreateCache(CACHE_NAME);
Spliterator<Entry<Integer, MyObject>> split = cache.localEntries().spliterator();
Stream<MyObject> stream = StreamSupport.stream(split, true).map( entry -> entry.getValue());
aggregate.setCount(stream.filter(new SomePredicate()).count());
在使用 Ignite 运行时做了一些分析,并注意到一次只有一个 Runnable 线程,而使用 ArrayList 时有 3 或 4 个线程完成工作。
非常感谢帮助,
巴特
> localEntries
返回内部 Ignite 数据结构的迭代器,因此我认为在没有额外支持的情况下无法使用 Java 流 API 对其进行拆分。但是,您可以使用允许循环访问特定分区的ScanQuery
并行化任务。获取本地分区号的列表,为每个分区创建单独的查询,并在单独的线程中执行查询。代码将如下所示:
// Get list of local partitions.
int[] partitions = ignite.affinity(CACHE_NAME).allPartitions(ignite.cluster().localNode());
// For each partition submit a task to a thread pool.
for (final int partition : partitions) {
executor.submit(new Runnable() {
@Override public void run() {
ScanQuery<Integer, MyObject> query = new ScanQuery<>();
query.setPartition(partition);
query.setLocal(true);
QueryCursor<Cache.Entry<Integer, MyObject>> cursor = cache.query(query);
for (Cache.Entry<Integer, MyObject> entry : cursor) {
// ...
}
}
});
}