读取多个 CSV 文件并使用 python 多处理将数据插入表中,而无需使用 pandas



我在一个文件夹中有 3 个特定于区域的 CSV 文件,即data_cityA.csvdata_cityB.csvdata_cityC.csv。我必须阅读和识别特定于区域的文件;将其插入到表中,并添加一个额外的列,该列将包含有关特定区域的信息。

list_of_file=glob.glob('./*csv')
for file_name in list_of_files:
count = 0
total = 0   
with open(file_name,'r')as csvfile:
read=csv.reader(csvfile)
next(read)
if "cityA" in file_name:
reg="cityA"
elif "cityB" in file_name:
reg="cityB"
elif "cityC" in file_name:
reg="cityC"
with open(file_name, 'r')as csv_file:
reader=csv.reader(csv_file)
data=list(reader)
total=len(data)     
temp_data=[]
for row in read:
row.append(reg) #concatenating region name 
temp_data.append(tuple(row))
count+=1
total=-1
if count>999 or total==1:
insert_query="INSERT INTO table_name(A,B,C,D,E) values (1,2,3,4,5)"
curser.executemoany(insert_query,temp_data)
conn.commit()
count=0
insert_query=" "
temp_data=[]
cursor.callproc('any_proc')
conn.close()

处理大约需要 4-5 个小时(数据大小为 <=500MB(。我尝试使用 python 多处理实现它,但未能成功做到这一点。"我不能用熊猫"。数据库sybase。有什么想法吗?有没有比多处理更好的方法?

你在这里遇到的一个大问题是:data=list(reader).这将立即将整个文件读取到内存中。如果文件为 500MB,则 500MB 将立即加载到内存中。您的另一个选择是使用阅读器作为迭代器。这样做的缺点是事先不知道记录总数,因此在退出循环后,必须执行剩余行的插入。

第二件可能极大地影响您的表现的事情是插入。您可以使用多处理并行化它(下面是使用 Pool 的建议(,但由于这是一个新进程,您将不得不再次处理连接到数据库(并在之后关闭它(。

from multiprocessing.pool import Pool
list_of_files = glob.glob('./*csv')
pool = Pool()
pool.map(process_file, list_of_files)
pool.close()
pool.join()
cursor.callproc('any_proc')
conn.close()

def process_file(file_name):
# Make a new connection
# conn = ...
cursor = conn.cursor()
temp_data = []
def do_insert():
insert_query = "INSERT INTO table_name(A,B,C,D,E) values (1,2,3,4,5)"
cursor.executemany(insert_query, temp_data)
conn.commit()
with open(file_name, 'r')as csvfile:
read = csv.reader(csvfile)
next(read)
if "cityA" in file_name:
reg = "cityA"
elif "cityB" in file_name:
reg = "cityB"
elif "cityC" in file_name:
reg = "cityC"
for row in read:
row.append(reg)  # concatenating region name
temp_data.append(tuple(row))
if len(temp_data) > 999:
do_insert()
temp_data = []
if temp_data:
do_insert()
conn.close()

数据库往返正在减慢您的速度。

您基本上是每行往返 1 次。 500MB 听起来像很多行...所以这是很多往返。检查 sybase 中是否有一种方法可以提供 csv 并将其加载到表中。具有大量行的呼叫更少(甚至可能是 1 个(,而不是每次调用 1 行。

也许你可以考虑在Python之外这样做。

请考虑下表...

create table t1 ( 
k int not null, 
v varchar(255) null, 
city varchar(255) null)
go

。和文件"文件.txt">

1,Line 1
2,Line 2
3,Line 3
4,Line 4
5,Line 5

注意不要在文件末尾出现空行。

使用"流编辑者"添加额外的列,在本例中为"CityA">

cat file.txt | sed s/$/,CityA/g > file_2.txt
cat file_2.txt
1,Line 1,CityA
2,Line 2,CityA
3,Line 3,CityA
4,Line 4,CityA
5,Line 5,CityA

确保数据库配置为大容量复制,您的 DBA 可以为此提供帮助。

use master
go
sp_dboption 'db_name', 'select', true
go

然后使用 Sybase 的 bcp 实用程序加载文件:

bcp database.owner.table in file_2.txt -U login -S server -c -t, -Y -b 1000

参数如下:

  • 数据库名称
  • 对象所有者
  • 表名
  • 方向在
  • 文件名
  • -U 用户名
  • -S 服务器名称(Sybase 实例名称而不是物理主机名(
  • -c = 使用字符数据
  • -t, = 字段终止符为 ,
  • -Y 客户端字符集转换 - 可能不需要
  • -b 1000
  • = 一次提交 1000 行。如果您要加载500MB,则可能需要这样做,以免击中LOG_SUSPEND。

相关内容

  • 没有找到相关文章

最新更新