我最近发布了关于在单个表上运行多个udf时发生OOM错误的堆栈溢出(bigquery udf out of memory问题)。这个错误似乎已经部分修复,但是,当在10,000行表上运行udf时,我遇到了一个新的错误。下面是错误信息:
Error:与子进程通信时发生错误。消息:"通信通道错误期间4命令" sandbox_process_error {}
错误位置:用户自定义函数
作业ID: broad-cga-het:bquijob_32bc01d_1569f11b8a2
当我删除udf中的emit语句时不会发生错误,因此当udf试图回写到另一个表时一定会发生错误。
下面是udf本身的副本:
bigquery.defineFunction(
'permute',
['obj_nums','num_obj_per_indiv','row_number'], // Names of input columns
[{"name": "num_cooccurrences_list","type": "string","mode":"nullable"}], // Output schema
permute
);
function permute(row, emit) {
var obj_ids = row['obj_nums'].split(",").map(function (x) {
return parseInt(x, 10);
});
var num_obj_per_indiv = row['num_obj_per_indiv'].split(",").map(function (x) {
return parseInt(x, 10);
});
var row_number = row['row_number']
// randomly shuffle objs using Durstenfeld shuffle algorithm
obj_ids = shuffle_objs(obj_ids);
// form dictionary of obj_pairs from obj_ids
var perm_run_obj_set = new Set(obj_ids);
var perm_run_obj_unique = Array.from(perm_run_obj_set);
perm_run_obj_unique.sort();
var perm_run_obj_pairs_dict = {};
output = {}
for (var i = 0; i < perm_run_obj_unique.length - 1; i++) {
for (var j = i + 1; j < perm_run_obj_unique.length; j++) {
var obj_pair = [perm_run_obj_unique[i],perm_run_obj_unique[j]].sort().join("_")
perm_run_obj_pairs_dict[obj_pair] = 0
}
}
// use fixed number of objs per indiv and draw from shuffled objs
var perm_cooccur_dict = {};
//num_obj_per_indiv = num_obj_per_indiv.slice(0,3);
for(var index in num_obj_per_indiv) {
var obj_count = num_obj_per_indiv[index]
var perm_run_objs = [];
for(var j = 0; j < obj_count; j++) {
perm_run_objs.push(obj_ids.pop());
}
perm_run_objs = new Set(perm_run_objs);
perm_run_objs = Array.from(perm_run_objs)
while(perm_run_objs.length > 1) {
current_obj = perm_run_objs.pop()
for(var pair_obj_ind in perm_run_objs) {
var pair_obj = perm_run_objs[pair_obj_ind]
var sorted_pair = [current_obj,pair_obj].sort().join("_")
perm_run_obj_pairs_dict[sorted_pair] += 1
// console.log({"obj_pair":[current_obj,pair_obj].sort().join("_"),"perm_run_id":row_number})
// emit({"obj_pair":[current_obj,pair_obj].sort().join("_"),"perm_run_id":row_number});
}
}
}
// emit({"obj_pair":[current_obj,pair_obj].sort().join("_"),"perm_run_id":row_number});
// form output dictionary
num_cooccur_output = ""
for (var obj_pair in perm_run_obj_pairs_dict) {
//emit({"obj_pair":obj_pair,"num_cooccur":perm_run_obj_pairs_dict[obj_pair]});
num_cooccur_output += String(perm_run_obj_pairs_dict[obj_pair])
num_cooccur_output += ","
}
num_cooccur_output = num_cooccur_output.substring(0, num_cooccur_output.length - 1);
emit({"num_cooccurrences_list":num_cooccur_output});
}
/**
* Randomize array element order in-place.
* Using Durstenfeld shuffle algorithm.
*/
function shuffle_objs(obj_array) {
for (var i = obj_array.length - 1; i > 0; i--) {
var j = Math.floor(Math.random() * (i + 1));
var temp = obj_array[i];
obj_array[i] = obj_array[j];
obj_array[j] = temp;
}
return obj_array;
}
任何帮助都将非常感激!
谢谢你,丹尼尔
并不是说这直接回答了您最初的问题,但是我不认为您需要一个UDF来进行这种类型的转换。例如,使用标准SQL(取消勾选"Show Options"下的"Use Legacy SQL"),您可以执行如下数组转换:
WITH T AS (SELECT [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] AS arr)
SELECT
x,
new_x
FROM T,
UNNEST(
ARRAY(SELECT AS STRUCT
x,
arr[OFFSET(CAST(RAND() * (off + 1) AS INT64))] AS new_x
FROM UNNEST(arr) AS x WITH OFFSET off));
+---+-------+
| x | new_x |
+---+-------+
| 0 | 1 |
| 1 | 2 |
| 2 | 0 |
| 3 | 2 |
| 4 | 4 |
| 5 | 0 |
| 6 | 5 |
| 7 | 4 |
| 8 | 8 |
| 9 | 3 |
+---+-------+
如果有帮助,我可以解释更多,但查询的要点是它使用上面UDF中的公式随机化arr
中的元素。FROM T, UNNEST(...
展开数组的元素,使它们更容易看到,但我也可以这样做:
WITH T AS (SELECT [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] AS arr)
SELECT
ARRAY(SELECT AS STRUCT
x,
arr[OFFSET(CAST(RAND() * (off + 1) AS INT64))] AS new_x
FROM UNNEST(arr) AS x WITH OFFSET off)
FROM T;
这给出了一个结构体数组作为输出,其中每个x
都与new_x
相关联。