我有一个spark dataframe作为ORC存储为orc,其中约10000行,然后遵循架构:
>>> df.printSchema()
root
|-- contig: string (nullable = true)
|-- start: integer (nullable = true)
|-- ref: string (nullable = true)
|-- alt: string (nullable = true)
|-- gt: array (nullable = true)
| |-- element: integer (containsNull = true)
其中Arrayfield是200000个整数的列表。我想将其转换为具有平坦结构的数据框:
>>> from pyspark.sql.functions import posexplode
>>> flat = df.select('contig', 'start', 'ref', 'alt', posexplode(df.gt))
>>> flat.explain()
== Physical Plan ==
*Project [contig#0, start#1, ref#2, alt#3, pos#11, col#12]
+- Generate posexplode(gt#4), true, false, [pos#11, col#12]
+- *FileScan orc [contig#0,start#1,ref#2,alt#3,gt#4] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/path/to/data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<contig:string,start:int,ref:string,alt:string,gt:array<int>>
>>> flat.write.orc('/path/to/output/file')
将扁平的数据帧写入文件,在具有24 CPU内核和超过100GB内存的机器上需要超过五个小时。这只是posexplode函数的功能还是其他问题?
似乎Spark在这里对行进行了疯狂的事情。使用RDD,我能够获得更好的性能(每秒1/3行,每秒CPU核心与每秒CPU核心核心每秒1/40行(。这仍然不是很快。
df = sql_context.read.orc('/path/to/source/file')
rdd = df.rdd
def expand(row):
contig, start, ref, alt, gt = row
def getrow(index, genotype):
return contig, start, ref, alt, index, genotype
return [getrow(index, genotype) for index, genotype in enumerate(gt)]
rdd_flat = rdd.flatMap(expand)
schema = ('contig', 'start', 'ref', 'alt', 'index', 'genotype')
sqlc.createDataFrame(rdd_flat, schema=schema).write.orc('/path/to/output/file')
有趣的是,如果我重新定义了扩展功能为
def expand(row):
def getrow(index, genotype):
return Row(
contig=row.contig,
start=row.start,
ref=row.ref,
alt=row.alt,
index=index,
genotype=genotype
)
return [getrow(index, genotype) for index, genotype in enumerate(row.gt)]
它的运行速度慢约13倍(单个功能调用大约需要1.4秒(。
很清楚,行对象效率极低。
但是,还有更多解决问题。一个核心应能够每秒运行9次扩展功能,但实际性能为每3秒1行。
编辑:找到一个'解决方案':使用prestodb查询代替spark。这以每秒CPU核心每秒1行超过1行的运行 - 超过20倍以上的速度,比RDD快4倍:
create table flat (
contig varchar,
start int,
ref varchar,
alt varchar,
index bigint,
genotype tinyint
)
WITH (format = 'ORC');
insert into flat
select contig, start, ref, alt, index, genotype, partition_name
from nested cross join unnest(gt) with ordinality as g (genotype, index)
where partition_name='10-70329347';