计算一个马尔可夫链值序列



我有一个Spark问题,所以对于每个实体k的输入,我有一个概率序列p_iv_i相关的值,例如数据可以看起来像这样

entity | Probability | value
A      | 0.8         | 10
A      | 0.6         | 15
A      | 0.3         | 20
B      | 0.8         | 10

然后,对于实体A,我期望平均值为0.8*10 + (1-0.8)*0.6*15 + (1-0.8)*(1-0.6)*0.3*20 + (1-0.8)*(1-0.6)*(1-0.3)*MAX_VALUE_DEFINED

我如何在Spark中使用DataFrame agg func实现这一点?考虑到groupBy实体的复杂性和计算结果序列,我发现这是具有挑战性的。

您可以使用UDF来执行这种自定义计算。这个想法是使用collect_listA的所有概率和值分组到一个地方,以便您可以遍历它。但是,collect_list不尊重记录的顺序,因此可能导致错误的计算。修复它的一种方法是使用monotonically_increasing_id

为每行生成ID
import pyspark.sql.functions as F
@F.pandas_udf('double')
def markov_udf(values):
def markov(lst):
# you can implement your markov logic here
s = 0
for i, prob, val in lst:
s += prob
return s
return values.apply(markov)

(df
.withColumn('id', F.monotonically_increasing_id())
.groupBy('entity')
.agg(F.array_sort(F.collect_list(F.array('id', 'probability', 'value'))).alias('values'))
.withColumn('markov', markov_udf('values'))
.show(10, False)
)
+------+------------------------------------------------------+------+
|entity|values                                                |markov|
+------+------------------------------------------------------+------+
|B     |[[3.0, 0.8, 10.0]]                                    |0.8   |
|A     |[[0.0, 0.8, 10.0], [1.0, 0.6, 15.0], [2.0, 0.3, 20.0]]|1.7   |
+------+------------------------------------------------------+------+

可能有更好的解决方案,但我认为这就是您需要的。

from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
[('A', 0.8, 10),
('A', 0.6, 15),
('A', 0.3, 20),
('B', 0.8, 10)],
['entity', 'Probability', 'value']
)
w_desc = W.partitionBy('entity').orderBy(F.desc('value'))
w_asc = W.partitionBy('entity').orderBy('value')
df = df.withColumn('_ent_max_val', F.max('value').over(w_desc))
df = df.withColumn('_prob2', 1 - F.col('Probability'))
df = df.withColumn('_cum_prob2', F.product('_prob2').over(w_asc) / F.col('_prob2'))
df = (df.groupBy('entity')
.agg(F.round((F.max('_ent_max_val') * F.product('_prob2')
+ F.sum(F.col('_cum_prob2') * F.col('Probability') * F.col('value'))
),2).alias('mean_value'))
)
df.show()
# +------+----------+
# |entity|mean_value|
# +------+----------+
# |     A|      11.4|
# |     B|      10.0|
# +------+----------+

最新更新