无法使用Anaconda多进程包生成进程



嗨,我正在运行Python Snowpark的一个用例,它专门用于并行运行大量数据加载。我有将近42个表是从SnowFlake.AccountUsage视图加载的,这些表中的数据相互独立,所以我尝试使用Snowpark python库使用多进程包库进行并行数据加载。然而,当我在笔记本电脑上运行这段代码时,代码运行成功,但在Snowflake机器上相同的代码失败了;看起来Snowflake已经将其仓库设计为不允许分叉/派生用于并行处理的流程。关于如何在不使用多进程包的情况下进行并行数据加载,您有什么想法吗。

这是架构级别的问题,我请求您在没有来自架构师的输入的情况下不要关闭此查询。

错误消息我得到它

"usr/lib/python\uudf/1439992e4e54a095348cc1d96f9448a9579f940638c334772ebfeb71ef5b03e0/lib/python3.8/site packages/multiprocess/popen_fork.py";,第70行,在_launch self.pid=os.fork((PermissionError中:[Erno 1]不允许在处理程序运行的函数TEstrongNOWPARK中执行操作

from multiprocess import Process
from snowflake.snowpark import Session
TransactionDataLoadSqlList=["""INSERT OVERWRITE INTO  DB.SCHEMA.T_STAGES  SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.STAGES" """]
# There are mutiple insert statements like this but for ex i have taken here only one
main()
processes=[]  
try:  

print(" Multi-processing started")


for TSqls in  TransactionDataLoadSqlList:
# print(TSqls)

p=Process(target=RunAtSnowflake,args=[Session,TSqls])

p.start()
processes.append(p) 

for process in  processes:
process.join()

print(" Multi-processing finished")
except BaseException as err:
print(f"Unexpected {err=}, {type(err)=}") 

raise

def RunAtSnowflake(Session,Query):
Session.sql(Query).collect()      
return "SUCCESS" 


if __name__ == '__main__':  
main(Session)

这是关于运行异步查询的。

在Snowpark之外,使用Snowflake Python连接器,您可以执行以下操作:

  • https://docs.snowflake.com/en/user-guide/python-connector-example.html?#performing-异步查询
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

在Snowpark内部,作为一个存储过程:目前不支持此功能,但请继续关注,因为这是eng团队想要实现的功能。

作为一种选择,我尝试了Scala与Snowpark:

// https://docs.snowflake.com/en/sql-reference/stored-procedures-scala.html
import com.snowflake.snowpark._
object Procedure {
def main(session: Session): String = {
var rows = session.sql("select 'Hello, Scala!'").async.collect();
return rows.getResult()(0).getString(0);
}
}

这个抛出了一个异常";当前上下文中不支持异步执行"-也就是说,我们需要等待在存储过程中获得异步查询。

最新更新