我正在编写一个pyspark面向生物的应用程序,在其中一个步骤中,我有一个提取dna序列的spark数据框架。对于那些出现在负链中的,我想要反向补码。
我能够使用udf执行任务,但我理解这限制了spark的效率(特别是因为这是pyspark)。这也会导致OOM问题。
反转字符串很容易,因为它是内置功能,但我找不到一种方法来补充dna碱基(a ->T, G->C, N->N,…)。
是否有一个spark sql的方式来做到这一点?如果没有,在java中实现它并将其注册为python中的udf是否有帮助?
我正在运行EMR 6.20,所以它是基于spark 3
编辑:请求的示例数据。假设我有一个包含以下数据的数据框:
+------------+
| sequence|
+------------+
|ATTGCCATGCCA|
|GTTCGTTA |
|ATNNGGRRG |
+------------+
期望的输出应该是:
+------------+
| sequence|
+------------+
|TAACGGTACGGT|
|CAAGCAAT |
|TANNCCYYC |
+------------+
该映射基于DNA的IUPAC表示法,补体是指DNA配对的互补碱基(A<->T, G<->C)。
编辑(解决方案):谢谢@mck的解决方案。反向补码调用的一个版本,假设使用大写序列(否则只添加小写选项)
from pyspark.sql import functions as F
df2 = df.withColumn(
'stranded_sequence',
F.translate(
F.reverse(F.col('sequence')),
'ACGTRYSWKMBDHVN',
'TGCAYRSWMKVHDBN'
)
)
如果你像我一样在df中有一个strand
列,你甚至可以这样切换大小写:
df2 = df.withColumn(
'stranded_sequence',
F.when(
F.col('strand') == '-',
F.translate(
F.reverse(F.col('sequence')),
'ACGTRYSWKMBDHVN',
'TGCAYRSWMKVHDBN'
)
).otherwise(F.col('sequence'))
)
试试translate
:
import pyspark.sql.functions as F
df2 = df.withColumn('sequence', F.translate('sequence', 'ATCGRY', 'TAGCYR'))
df2.show()
+------------+
| sequence|
+------------+
|TAACGGTACGGT|
| CAAGCAAT|
| TANNCCYYC|
+------------+
要考虑到所有可能的碱基,您可以将字符串扩展为像
这样的内容ATCGRYSWKM...
TAGCYRWSMK...