在Rust WASM函数返回时创建线程



我在一个温暖的环境中使用polar。

我注意到与LazyFrame的不一致。收集操作,当处理某些数据集时,有时会创建线程。

下面是与 问题相关的代码
#[wasm_bindgen]
pub fn start(buff: &[u8],
item_id:&str, 
order_id:&str,
item_name:&str) -> JsValue{
let cursor = Cursor::new(buff);
let lf = CsvReader::new(cursor).with_ignore_parser_errors(true).finish().unwrap().lazy();

let df = lf.groupby([col(order_id)]);
let df = df.agg([col(item_id),col(item_name)]);

// Error occurs here
let df = df.collect().unwrap();
} 

处理一个特定的数据集给我提供了错误:

panicked at 'failed to spawn thread: Error { kind: Unsupported, message: "operation not supported on this platform" }'

,因为它试图在WASM环境中生成线程。

然而,对于其他数据集,这个过程将完美地执行。它不会尝试创建线程。问题似乎不是文件大小,因为测试了不同的数据集。

我想知道Lazyframe的哪个部分。收集操作产生这种不一致,以及如何避免它。

working.csv

Order ID,Product ID,Product Name
InvoiceNo0,Product ID0,Product Name0
InvoiceNo0,Product ID1,Product Name1
InvoiceNo0,Product ID2,Product Name2
InvoiceNo0,Product ID3,Product Name3
InvoiceNo0,Product ID4,Product Name4
InvoiceNo0,Product ID5,Product Name5

notworking.csv

Order ID,Product ID,Product Name
B0000001,P0001,Product - 0001
B0000001,P0002,Product - 0002
B0000001,P0003,Product - 0003
B0000001,P0004,Product - 0004
B0000001,P0005,Product - 0005
B0000002,P0006,Product - 0006

允许wasm的polar分支由https://github.com/universalmind303/polars/tree/wasm

你可以在这里看到完整的项目,以及两个CSV文件:https://github.com/KivalM/lazyframe-min-test

编辑:descripbe_plan()的输出

工作数据集

[col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Order ID", "Product ID", "Product Name"];
project */3 columns |   details: None;
selection: "None"

不工作数据集

[col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Order ID", "Product ID", "Product Name"];
project */3 columns |   details: None;
selection: "None"

schema()输出

工作数据集

name: Order ID, data type: Utf8
name: Product ID, data type: Utf8
name: Product Name, data type: Utf8

不工作数据集

name: Order ID, data type: Utf8
name: Product ID, data type: Utf8
name: Product Name, data type: Utf8

输出describe_optimized_plan ():

[col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Product ID", "Product Name", "Order ID"];
project 3/3 columns |   details: Some([col("Product ID"), col("Product Name"), col("Order ID")]);
selection: "None"

编辑:在仔细查看源代码之后。这个问题似乎不是直接来自任何极地代码。我已经将问题追踪到polars-lazy/src/physical_plan/executors/groupby.rs函数

impl Executor for GroupByExec {
fn execute

返回

的值
groupby_helper(df,keys,&self.aggs,self.apply.as_ref(),state,self.maintain_order,self.slice,)

但是,groupby_helper函数运行到完成,并且成功创建了数据框。数据帧从groupby_helper返回到fn execute时出现错误。奇怪的是,只有当此函数返回时才尝试创建线程。在RUST WASM中是否存在可能导致这种行为的东西?

所以看起来好像有一个std::thread操作发生在我创建分支时错过的groupby。

impl Drop for GroupsIdx {
fn drop(&mut self) {
let v = std::mem::take(&mut self.all);
// ~65k took approximately 1ms on local machine, so from that point we drop on other thread
// to stop query from being blocked
if v.len() > 1 << 16 {
std::thread::spawn(move || drop(v));
} else {
drop(v);
}
}
}

数据集的大小决定了线程的生成。

任何大于1 << 16(~65k)将生成一个线程。

标记impl只在非wasm目标上编译的特性应该可以解决您的问题。

最新更新