Spark Posexplode函数运行速度非常慢



我有一个spark dataframe作为ORC存储为orc,其中约10000行,然后遵循架构:

>>> df.printSchema()
root
 |-- contig: string (nullable = true)
 |-- start: integer (nullable = true)
 |-- ref: string (nullable = true)
 |-- alt: string (nullable = true)
 |-- gt: array (nullable = true)
 |    |-- element: integer (containsNull = true)

其中Arrayfield是200000个整数的列表。我想将其转换为具有平坦结构的数据框:

>>> from pyspark.sql.functions import posexplode
>>> flat = df.select('contig', 'start', 'ref', 'alt', posexplode(df.gt))
>>> flat.explain()
== Physical Plan ==
*Project [contig#0, start#1, ref#2, alt#3, pos#11, col#12]
+- Generate posexplode(gt#4), true, false, [pos#11, col#12]
   +- *FileScan orc [contig#0,start#1,ref#2,alt#3,gt#4] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/path/to/data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<contig:string,start:int,ref:string,alt:string,gt:array<int>>
>>> flat.write.orc('/path/to/output/file')

将扁平的数据帧写入文件,在具有24 CPU内核和超过100GB内存的机器上需要超过五个小时。这只是posexplode函数的功能还是其他问题?

似乎Spark在这里对行进行了疯狂的事情。使用RDD,我能够获得更好的性能(每秒1/3行,每秒CPU核心与每秒CPU核心核心每秒1/40行(。这仍然不是很快。

df = sql_context.read.orc('/path/to/source/file')
rdd = df.rdd
def expand(row):
    contig, start, ref, alt, gt = row
    def getrow(index, genotype):
        return contig, start, ref, alt, index, genotype
    return [getrow(index, genotype) for index, genotype in enumerate(gt)]
rdd_flat = rdd.flatMap(expand)
schema = ('contig', 'start', 'ref', 'alt', 'index', 'genotype')
sqlc.createDataFrame(rdd_flat, schema=schema).write.orc('/path/to/output/file')

有趣的是,如果我重新定义了扩展功能为

def expand(row):
    def getrow(index, genotype):
        return Row(
            contig=row.contig,
            start=row.start,
            ref=row.ref,
            alt=row.alt,
            index=index,
            genotype=genotype
        )
    return [getrow(index, genotype) for index, genotype in enumerate(row.gt)]

它的运行速度慢约13倍(单个功能调用大约需要1.4秒(。

很清楚,行对象效率极低。

但是,还有更多解决问题。一个核心应能够每秒运行9次扩展功能,但实际性能为每3秒1行。

编辑:找到一个'解决方案':使用prestodb查询代替spark。这以每秒CPU核心每秒1行超过1行的运行 - 超过20倍以上的速度,比RDD快4倍:

create table flat (
  contig varchar,
  start int,
  ref varchar,
  alt varchar,
  index bigint,
  genotype tinyint
) 
WITH (format = 'ORC');
insert into flat
select contig, start, ref, alt, index, genotype, partition_name
from nested cross join unnest(gt) with ordinality as g (genotype, index)
where partition_name='10-70329347';

最新更新