GridGain MapReduce 函数 G.grid().reduce() 中所需的清晰度



我使用了这个例子,它存在于大多数网站中。

int letterCnt = G.grid().reduce(
GridClosureCallMode.SPREAD,
F.yield("Counting Letters In This Phrase".split(" "),
    new C1<String, Integer>() {
        @Override public Integer apply(String word) {
            return word.length();
        }
    }
),
F.sumIntReducer()
);

第一个参数表示工作负载的平等分配(我假设它更像是循环基础)第二个参数包含将在所有发现的节点中执行的代码第三个从在不同节点和进程中执行的 apply() 接收所有结果数据。

我想知道是否有可能用我们自己的函数替换第三个参数 F.sumIntReducer()。如果是,我想看看一个例子。假设创建具有与 F.sumIntReducer() 相同功能的相同函数(即对不同节点找到的所有长度求和)。

是的,您可以定义自己的自定义化简器。示例在这里。

reduce 方法具有全局输入和输出数据类型。其示意图结构如下:

1   resultValue = grid.reduce(
2   job distribution mode,
3   executable logic function,
4   split function,
5   collect function);
  1. 任务的结果值直接分配给归约函数的返回值。网格是作业发送到的相应网格。可以在此处定义多个网格并并行存在。
  2. 此处定义了作业分配设置。作业的参数分发提供三个选项:-GridClosureCallMode.BALANCE – 平衡(循环?)作业分配-GridClosureCallMode.BROADCAST – 所有节点处理所有作业-GridClosureCallMode.SPREAD – 所有作业都是随机分配的
  3. 原子逻辑函数在这里定义。这部分称为工作;它被发送到一个节点,处理并包含一部分全局结果。它定义了输入和输出数据类型,它们是支持。GridGain还支持分配所有必要的此函数的库开箱即用。这意味着主节点不限于使用库,所有节点在本地都有库可用,因为所有必需的库都随作业一起提供。这当然会产生更多的数据流量。
  4. 输入数据需要分发到节点。每个函数与拆分函数中的数据集之一一起提供。这数据的细分存储在数组列表中,其中对应的数据类型。仅支持低级数据类型,由于实现结果(根据网格增益,也很高级别数据应可转移)。要传输更复杂的数据,例如PDF和图像,必须完成对字节数组的封装。
  5. 主节点使用此函数接收生成的零件并将它们重新组装成最终结果。

简单的例子:(不充分利用网格,因为只有内存操作而不是CPU密集型,所以不要指望对单次执行的改进)

private static int countLettersReducer(String phrase) throws GridException {
        // final GridLogger log = grid.log();
        int letterCount = 0;
        // Execute Hello World task.
        try {
            @SuppressWarnings("unchecked")
            int letterCnt =
            grid.reduce(GridClosureCallMode.BALANCE,
            // input: string
            // output: integer
                    new GridClosure<String, Integer>() {
                        private static final long serialVersionUID = 1L;
                        // Create executable logic block for a job part
                        @Override
                        public Integer apply(String word) {
                            // Print out a given word, just so we can
                            // see which node is doing what.
                            // System.out.println(">>> Calculating for word: " + word);
                            // Return the length of a given word, i.e. number of
                            // letters.
                            return word.length();
                        }
                    },
                    // split tasks for single jobs according to this function
                    // split at linebreaks
                    Arrays.asList(phrase.split("n")),
                    // Collection of words.
                    // Collect the results from each job of the nodes
                    //input and output is integer
                    new GridReducer<Integer, Integer>() {
                        /**
                         * 
                         */
                        private static final long serialVersionUID = 1L;
                        private int sum;
                        @Override
                        public boolean collect(Integer res) {
                            sum += res;
                            return true; // True means continue collecting until
                                            // last result.
                        }
                        // return the collected results
                        @Override
                        public Integer apply() {
                            return sum;
                        }
                    });
            letterCount = letterCnt;
        } catch (Exception e) {
        }
        return letterCount;
    }

相关内容

  • 没有找到相关文章

最新更新