目标
我的目标是通过它们的公共列(基因名称(合并两个DataFrames,这样我就可以在每个基因行中获得每个基因得分的乘积。然后,我会对患者和细胞进行groupby
,并将每个细胞的所有得分相加。最终的数据帧应该是这样的:
patient cell
Pat_1 22RV1 12
DU145 15
LN18 9
Pat_2 22RV1 12
DU145 15
LN18 9
Pat_3 22RV1 12
DU145 15
LN18 9
最后一部分应该很好,但由于MemoryError
,我无法对基因名称进行第一次合并。以下是每个DataFrame的片段。
数据
cell_s=
Description Name level_2 0
0 LOC100009676 100009676_at LN18_CENTRAL_NERVOUS_SYSTEM 1
1 LOC100009676 100009676_at 22RV1_PROSTATE 2
2 LOC100009676 100009676_at DU145_PROSTATE 3
3 AKT3 10000_at LN18_CENTRAL_NERVOUS_SYSTEM 4
4 AKT3 10000_at 22RV1_PROSTATE 5
5 AKT3 10000_at DU145_PROSTATE 6
6 MED6 10001_at LN18_CENTRAL_NERVOUS_SYSTEM 7
7 MED6 10001_at 22RV1_PROSTATE 8
8 MED6 10001_at DU145_PROSTATE 9
cell_s大约是10000000行
patient_s=
id level_1 0
0 MED6 Pat_1 1
1 MED6 Pat_2 1
2 MED6 Pat_3 1
3 LOC100009676 Pat_1 2
4 LOC100009676 Pat_2 2
5 LOC100009676 Pat_3 2
6 ABCD Pat_1 3
7 ABCD Pat_2 3
8 ABCD Pat_3 3
....
patient_s大约是1200000行
代码
def get_score(cell, patient):
cell_s = cell.set_index(['Description', 'Name']).stack().reset_index()
cell_s.columns = ['Description', 'Name', 'cell', 's1']
patient_s = patient.set_index('id').stack().reset_index()
patient_s.columns = ['id', 'patient', 's2']
# fails here:
merged = cell_s.merge(patient_s, left_on='Description', right_on='id')
merged['score'] = merged.s1 * merged.s2
scores = merged.groupby(['patient','cell'])['score'].sum()
return scores
当最初read_csv
处理这些文件时,我收到了一个MemoryError,但后来指定数据类型解决了这个问题。确认我的python是64位也没有解决我的问题。我还没有达到熊猫的极限,是吗?
Python 3.4.3 | Anaconda 2.3.0(64位(| Pandas 0.16.2
考虑两种解决方案:
CSV By CHUNKS
显然,read_csv可能会遇到性能问题,因此必须以迭代块的形式加载大文件。
cellsfilepath = 'C:\PathToCellsCSVFile.csv'
tp = pd.io.parsers.read_csv(cellsfilepath, sep=',', iterator=True, chunksize=1000)
cell_s = pd.concat(tp, ignore_index=True)
patientsfilepath = 'C:\PathToPatientsCSVFile.csv'
tp = pd.io.parsers.read_csv(patientsfilepath, sep=',', iterator=True, chunksize=1000)
patient_s = pd.concat(tp, ignore_index=True)
CSV VIA SQL
作为一名数据库专家,我总是建议处理大量数据负载,并使用SQL关系引擎进行合并/连接,该引擎可以很好地扩展此类进程。我已经写了很多关于数据帧合并Q/a的评论,甚至在R中也是如此。你可以使用任何SQL数据库,包括文件服务器dbs(Access、SQLite(或客户端服务器dbs(MySQL、MSSQL或其他(,即使你的dfs是从哪里派生的。Python为SQLite维护了一个内置库(否则使用ODBC(;并且可以使用pandasto_sql:将数据帧作为表推送到数据库中
import sqlite3
dbfile = 'C:\PathToSQlitedb.sqlite'
cxn = sqlite3.connect(dbfile)
c = cxn.cursor()
cells_s.to_sql(name='cell_s', con = cxn, if_exists='replace')
patient_s.to_sql(name='patient_s', con = cxn, if_exists='replace')
strSQL = 'SELECT * FROM cell_s c INNER JOIN patient_s p ON c.Description = p.id;'
# MIGHT HAVE TO ADJUST ABOVE FOR CELL AND PATIENT PARAMS IN DEFINED FUNCTION
merged = pd.read_sql(strSQL, cxn)
您可能需要分块完成,或者查看火焰。http://blaze.pydata.org