我使用以下代码读取 100 万行 SQL 数据并替换数据中可能出现的任何控制字符,唯一的问题是速度很慢,而且绝对是"替换"正在减慢速度。有人对采用不同的方法或调整以使代码更快有任何建议吗?
d = {}
x = map(chr, list(range(0,9)) + list(range(11,13)) + list(range(14,32)) +
list(range(127,160)))
for item in list(x):
d.update({item:' '})
with open("out_cleaned.csv", "w", encoding='utf-8') as fh:
chunks = pd.read_sql_query(SQLCommand, connection, chunksize=10000)
c = next(chunks)
c.replace(d, regex=True, inplace=True)
c.to_csv(fh, index=False, header=False, sep='t', chunksize=10000)
for chunk in chunks:
chunk.replace(d, regex=True, inplace=True)
chunk.to_csv(fh, index=False, header=False, sep='t', chunksize=10000)
读取、清理和写出 100 万行(共 31 个字段)需要 16 分钟。
您不需要正则表达式 - 您只需在一对一替换中用空白替换"特殊"字符 - 但除此之外,您几乎不需要解析并将数据转换为数据帧。
您可以直接使用数据库连接并使用内置的csv
模块导出列,而无需冒险使用pandas
、SQLAlchemy
和类似的重量级数据,这些重量级内容会为您的用例增加不必要的开销。
因此,首先,您可以创建一个转换表并将其与str.translate()
一起使用以清理任何字符串,而不是正则表达式:
chr_ranges = (0x00, 0x09), (0x0B, 0x20), (0x7F, 0xA0) # 'special' character ranges
trans_table = {x: " " for r in chr_ranges for x in range(*r)} # 'special'->space trans. table
这使您可以快速轻松地将所有特殊字符(在chr_ranges
范围内定义)转换为任何字符串上的空格,例如:
print("Your string with >x05x06x1A< special characters!".translate(trans_table))
# Your string with > < special characters!
当我们使用它时,我们可以创建一个小函数来处理任何传递字段的翻译尝试,这样我们就不需要在迭代数据库数据时检查类型:
def trans_field(value):
try:
return value.translate(trans_table) # try to translate and return
except AttributeError: # if there's no translate method on the passed value...
return value # return the original value
现在我们所需要的只是连接到数据库,然后执行我们的查询,这取决于你正在使用的数据库 - 我将编写下一个示例,就好像你使用的是SQLite一样,但大多数数据库驱动程序使用Python数据库API,并且在很大程度上是可互换的,所以代码应该以最少的更改工作:
import sqlite3
connection = sqlite3.connect("your_db") # connect to the database
cursor = connection.cursor() # grab a database cursor
results = cursor.execute("select * from your_table") # execute the select query
header = [c[0] for c in cursor.description] # get the column names for our CSV header
最后,我们可以迭代结果,处理每个字段并将其全部存储到 CSV 中:
import csv
with open("output.csv", "w", newline="") as f: # open("output.csv", "wb") on Python 2.x
writer = csv.writer(f, delimiter="t") # create a CSV writer with t as a delimiter
writer.writerow(header) # write the header (column names)
for result in results: # iterate over the returned results
writer.writerow(map(trans_field, result)) # process result fields and write the row
这避免了所有不必要的转换,并且应该与 Python 和您的数据库一样快。从技术上讲,您可以通过检查cursor.description
并仅为结果集中的字符串创建替换映射(而不是尝试处理每个字段)来压缩更高的速度,但这可能不会增加太多整体速度。
所以,把它们放在一起:
import csv
import sqlite3
chr_ranges = (0x00, 0x09), (0x0B, 0x20), (0x7F, 0xA0) # 'special' character ranges
trans_table = {x: " " for r in chr_ranges for x in range(*r)} # 'special'->space trans. table
def trans_field(value):
try:
return value.translate(trans_table) # try to translate and return
except AttributeError: # if there's no translate method on the passed value...
return value # return the original value
connection = sqlite3.connect("your_db") # connect to the database
cursor = connection.cursor() # grab a database cursor
results = cursor.execute("select * from your_table") # execute the select query
header = [c[0] for c in cursor.description] # get the column names for our CSV header
with open("output.csv", "w", newline="") as f: # open("output.csv", "wb") on Python 2.x
writer = csv.writer(f, delimiter="t") # create a CSV writer with t as a delimiter
writer.writerow(header) # write the header (column names)
for result in results: # iterate over the returned results
writer.writerow(map(trans_field, result)) # process result fields and write the row
作为测试,我在SQLite中创建了一个31x1M
表,其中包含22个TEXT
字段(每个字段在0x00 - 0xA0
范围内填充10-50个随机字符),穿插INTEGER
和REAL
字段,在我的系统上,它在56秒内清理了数据并生成了output.csv
。当然是YMMV,但肯定不应该花16分钟。