我有此格式的数据框架
Date | Return
01/01/2015 0.0
02/02/2015 -0.02
03/02/2015 0.05
04/02/2015 0.07
我想进行复合并添加一个将返回复利返回的列。复合回报计算为:
1对于第一行。
(1 返回(i))*复合(i-1))
所以我的DF终于是
Date | Return | Compounded
01/01/2015 0.0 1.0
02/02/2015 -0.02 1.0*(1-0.2)=0.8
03/02/2015 0.05 0.8*(1+0.05)=0.84
04/02/2015 0.07 0.84*(1+0.07)=0.8988
Java中的答案将不胜感激。
您还可以创建一个自定义的聚合函数并在窗口函数中使用。
类似的东西(写自由形式,所以可能会有一些错误):
package com.myuadfs
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
class MyUDAF() extends UserDefinedAggregateFunction {
def inputSchema: StructType = StructType(Array(StructField("Return", DoubleType)))
def bufferSchema = StructType(StructField("compounded", DoubleType))
def dataType: DataType = DoubleType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = 1.0 // set compounded to 1
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
buffer(0) = buffer.getDouble(0) * ( input.getDouble(0) + 1)
}
// this generally merges two aggregated buffers. This means this
// would not have worked properly had you been working with a regular
// aggregate but since you are planning to use this inside a window
// only this should not be called at all.
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
}
def evaluate(buffer: Row) = {
buffer.getDouble(0)
}
}
现在,您可以在窗口函数中使用此信息。这样的东西:
import org.apache.spark.sql.Window
val windowSpec = Window.orderBy("date")
val newDF = df.withColumn("compounded", df("Return").over(windowSpec)
请注意,这具有一个限制,整个计算应适合单个分区,因此,如果您的数据太大,则会有问题。也就是说,名义上的这种操作是在通过键进行分区后执行的(例如,在窗口中添加一个分区),然后单个元素应成为键的一部分。
首先,我们定义一个函数 f(line)
(建议一个更好的名称,请!)处理行。
def f(line):
global firstLine
global last_compounded
if line[0] == 'Date':
firstLine = True
return (line[0], line[1], 'Compounded')
else:
firstLine = False
if firstLine:
last_compounded = 1
firstLine = False
else:
last_compounded = (1+float(line[1]))*last_compounded
return (line[0], line[1], last_compounded)
使用两个全局变量(可以改进吗?),我们保留复合(i-1)值,如果我们正在处理第一行。
使用您的数据 some_file ,解决方案可能是:
rdd = sc.textFile('some_file').map(lambda l: l.split())
r1 = rdd.map(lambda l: f(l))
rdd.collect()
[[u'date',u'Return'],[u'01/01/2015',u'0.0'],[u'02/02/02/2015',u'-0.02'],[u'03/02/2015',u'0.05'],[u'04/02/2015',u'0.07']]
r1.collect()
[(u'date',u'Return','复合'),(U'01/01/2015',u'0.0',1.0),(u'02/02/02/2015',u'0.0.02',0.98),(U'03/02/2015',U'0.05',1.05),(U'04/02/2015',U'0.07',1.12350000000000000002)