运行BigQuery udf时通信通道错误



我最近发布了关于在单个表上运行多个udf时发生OOM错误的堆栈溢出(bigquery udf out of memory问题)。这个错误似乎已经部分修复,但是,当在10,000行表上运行udf时,我遇到了一个新的错误。下面是错误信息:

Error:与子进程通信时发生错误。消息:"通信通道错误期间4命令" sandbox_process_error {}

错误位置:用户自定义函数

作业ID: broad-cga-het:bquijob_32bc01d_1569f11b8a2

当我删除udf中的emit语句时不会发生错误,因此当udf试图回写到另一个表时一定会发生错误。

下面是udf本身的副本:

bigquery.defineFunction(
  'permute',
  ['obj_nums','num_obj_per_indiv','row_number'], // Names of input columns
  [{"name": "num_cooccurrences_list","type": "string","mode":"nullable"}], // Output schema
  permute
);
function permute(row, emit) {
  var obj_ids = row['obj_nums'].split(",").map(function (x) { 
      return parseInt(x, 10); 
  });
  var num_obj_per_indiv = row['num_obj_per_indiv'].split(",").map(function (x) { 
      return parseInt(x, 10); 
  });
  var row_number = row['row_number']
  // randomly shuffle objs using Durstenfeld shuffle algorithm
  obj_ids = shuffle_objs(obj_ids);
  // form dictionary of obj_pairs from obj_ids
  var perm_run_obj_set = new Set(obj_ids);
  var perm_run_obj_unique = Array.from(perm_run_obj_set);
  perm_run_obj_unique.sort();
  var perm_run_obj_pairs_dict = {};
  output = {}
  for (var i = 0; i < perm_run_obj_unique.length - 1; i++) {
    for (var j = i + 1; j < perm_run_obj_unique.length; j++) {
      var obj_pair = [perm_run_obj_unique[i],perm_run_obj_unique[j]].sort().join("_")
      perm_run_obj_pairs_dict[obj_pair] = 0
    }
  }
  // use fixed number of objs per indiv and draw from shuffled objs
  var perm_cooccur_dict = {};
        //num_obj_per_indiv = num_obj_per_indiv.slice(0,3);
        for(var index in num_obj_per_indiv) {
                var obj_count = num_obj_per_indiv[index]
                var perm_run_objs = [];
                for(var j = 0; j < obj_count; j++) {
                        perm_run_objs.push(obj_ids.pop());
                }
                perm_run_objs = new Set(perm_run_objs);
                perm_run_objs = Array.from(perm_run_objs)
                while(perm_run_objs.length > 1) {
                         current_obj = perm_run_objs.pop()
                         for(var pair_obj_ind in perm_run_objs) {
                                  var pair_obj = perm_run_objs[pair_obj_ind]
                                  var sorted_pair = [current_obj,pair_obj].sort().join("_")
                                  perm_run_obj_pairs_dict[sorted_pair] += 1
                                  // console.log({"obj_pair":[current_obj,pair_obj].sort().join("_"),"perm_run_id":row_number})
                                  // emit({"obj_pair":[current_obj,pair_obj].sort().join("_"),"perm_run_id":row_number});
                          }
                 }
        }
        // emit({"obj_pair":[current_obj,pair_obj].sort().join("_"),"perm_run_id":row_number});
  // form output dictionary
  num_cooccur_output = ""
  for (var obj_pair in perm_run_obj_pairs_dict) {
    //emit({"obj_pair":obj_pair,"num_cooccur":perm_run_obj_pairs_dict[obj_pair]});
      num_cooccur_output += String(perm_run_obj_pairs_dict[obj_pair])
      num_cooccur_output += ","
  }
  num_cooccur_output = num_cooccur_output.substring(0, num_cooccur_output.length - 1);
  emit({"num_cooccurrences_list":num_cooccur_output});
}
/**
 * Randomize array element order in-place.
 * Using Durstenfeld shuffle algorithm.
 */
function shuffle_objs(obj_array) {
  for (var i = obj_array.length - 1; i > 0; i--) {
    var j = Math.floor(Math.random() * (i + 1));
    var temp = obj_array[i];
    obj_array[i] = obj_array[j];
    obj_array[j] = temp;
  }
        return obj_array;
}

任何帮助都将非常感激!

谢谢你,丹尼尔

并不是说这直接回答了您最初的问题,但是我不认为您需要一个UDF来进行这种类型的转换。例如,使用标准SQL(取消勾选"Show Options"下的"Use Legacy SQL"),您可以执行如下数组转换:

WITH T AS (SELECT [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] AS arr)
SELECT
  x,
  new_x
FROM T,
  UNNEST(
    ARRAY(SELECT AS STRUCT
            x,
            arr[OFFSET(CAST(RAND() * (off + 1) AS INT64))] AS new_x
          FROM UNNEST(arr) AS x WITH OFFSET off));
+---+-------+
| x | new_x |
+---+-------+
| 0 | 1     |
| 1 | 2     |
| 2 | 0     |
| 3 | 2     |
| 4 | 4     |
| 5 | 0     |
| 6 | 5     |
| 7 | 4     |
| 8 | 8     |
| 9 | 3     |
+---+-------+

如果有帮助,我可以解释更多,但查询的要点是它使用上面UDF中的公式随机化arr中的元素。FROM T, UNNEST(...展开数组的元素,使它们更容易看到,但我也可以这样做:

WITH T AS (SELECT [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] AS arr)
SELECT
  ARRAY(SELECT AS STRUCT
          x,
          arr[OFFSET(CAST(RAND() * (off + 1) AS INT64))] AS new_x
        FROM UNNEST(arr) AS x WITH OFFSET off)
FROM T;

这给出了一个结构体数组作为输出,其中每个x都与new_x相关联。

相关内容

最新更新