Error writing dask dataframe to mssql via turbodbc



I have a dask dataframe with 220 partitions and 7 columns. I imported it from a bcp file and did some wrangling in dask. Now I want to write the whole thing to mssql using turbodbc. I connect to the database as follows:

mydb = 'TEST'
from turbodbc import connect, make_options
connection = connect(driver="ODBC Driver 17 for SQL Server",
                     server="TEST SERVER",
                     port="1433",
                     database=mydb,
                     uid="sa",
                     pwd="5pITfir3")
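
To confirm the connection itself works before bulk loading, a trivial round trip can be run through it (a minimal sanity check, not part of the original workflow):

# optional sanity check: run a trivial query over the new connection
with connection.cursor() as cursor:
    cursor.execute("SELECT 1")
    print(cursor.fetchall())  # expect a single row containing 1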

Then I write it to a test table in the DB using a function I found in a Medium article:

import time

def turbo_write(mydb, df, table):
    """Use turbodbc to insert data into sql."""
    start = time.time()
    # preparing columns
    columns = '('
    columns += ', '.join(df.columns)
    columns += ')'
    # preparing value place holders
    val_place_holder = ['?' for col in df.columns]
    sql_val = '('
    sql_val += ', '.join(val_place_holder)
    sql_val += ')'
    # writing sql query for turbodbc
    sql = f"""
    INSERT INTO {mydb}.dbo.{table} {columns}
    VALUES {sql_val}
    """
    print(sql)
    print(sql_val)
    # writing array of values for turbodbc
    values_df = [df[col].values for col in df.columns]
    print(values_df)
    # cleans the previous head insert
    with connection.cursor() as cursor:
        cursor.execute(f"delete from {mydb}.dbo.{table}")
        connection.commit()
    # inserts data, for real
    with connection.cursor() as cursor:
        # try:
        cursor.executemanycolumns(sql, values_df)
        connection.commit()
        # except Exception:
        #     connection.rollback()
        #     print('something went wrong')
    stop = time.time() - start
    return print(f'finished in {stop} seconds')
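
For a hypothetical table called my_table and a dataframe with columns a, b and c (placeholder names, not my real schema), the print(sql) call above prints an INSERT statement of roughly this shape:

INSERT INTO TEST.dbo.my_table (a, b, c)
VALUES (?, ?, ?)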

This works when I upload a small number of rows, like this:

turbo_write(mydb, df_train.head(1000), table)

When I try more rows, it fails:

turbo_write(mydb, df_train.head(10000), table)

I get the error:

RuntimeError: Unable to cast Python instance to C++ type (compile in debug mode for details)

How do I write the entire dask dataframe to mssql without getting any errors?

I needed to convert to a masked array, changing

# writing array of values for turbodbc
values_df = [df[col].values for col in df.columns]

to

values_df = [np.ma.MaskedArray(df[col].values, pd.isnull(df[col].values)) for col in df.columns]
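
My understanding is that the mask flags every null position, so turbodbc sends those cells as missing values instead of trying to cast NaN, which is presumably what triggered the cast error above. A tiny standalone illustration (toy data, only to show what the mask does):

import numpy as np
import pandas as pd

# toy column with a missing value
col = pd.Series([1.0, None, 3.0])
masked = np.ma.MaskedArray(col.values, pd.isnull(col.values))
print(masked)  # [1.0 -- 3.0] -> the missing entry is masked out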

Then I can write all the data using:

for i in range(df_train.npartitions):
    partition = df_train.get_partition(i)
    turbo_write(mydb, partition, table)
    i += 1

This still takes much longer to write than saving the file and loading it into the DB with bcp. If anyone has suggestions for a more efficient approach, I would be happy to see them.
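
One thing I have not tried yet is tuning turbodbc itself; make_options is already imported at the top but never used. A rough, untested sketch (the option values are guesses, not benchmarked):

from turbodbc import connect, make_options

# untested: turn on asynchronous I/O and buffer more parameter sets per round trip
options = make_options(use_async_io=True, parameter_sets_to_buffer=100000)
connection = connect(driver="ODBC Driver 17 for SQL Server",
                     server="TEST SERVER",
                     port="1433",
                     database=mydb,
                     uid="sa",
                     pwd="5pITfir3",
                     turbodbc_options=options)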

Latest update