Compounding in Spark



I have a dataframe in this format

Date        |   Return
01/01/2015       0.0
02/02/2015      -0.02
03/02/2015       0.05
04/02/2015       0.07

I want to compound the returns and add a column holding the compounded return. The compounded return is calculated as:

  • 1 for the first row.

  • (1 + Return(i)) * Compounded(i-1) for every following row.

So my DF would finally be

Date          |   Return  | Compounded
01/01/2015         0.0         1.0
02/02/2015        -0.02        1.0*(1-0.02)=0.98
03/02/2015         0.05        0.98*(1+0.05)=1.029
04/02/2015         0.07        1.029*(1+0.07)=1.10103

An answer in Java would be appreciated.

You can also create a custom aggregation function and use it in a window function.

Something like this (written freeform, so there may be some errors):

package com.myuadfs

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyUDAF() extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(Array(StructField("Return", DoubleType)))
  def bufferSchema: StructType = StructType(Array(StructField("compounded", DoubleType)))
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 1.0 // set compounded to 1
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getDouble(0) * (input.getDouble(0) + 1)
  }

  // This merges two aggregation buffers. Since the compounded value is a
  // plain product, multiplying the two partial products is the correct
  // merge; when used inside a window only, as planned here, this should
  // not be called at all anyway.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0)
  }

  def evaluate(buffer: Row): Double = {
    buffer.getDouble(0)
  }
}

Now you can use this inside a window function. Something like this:

import org.apache.spark.sql.expressions.Window

val compound = new MyUDAF()
val windowSpec = Window.orderBy("Date")
val newDF = df.withColumn("Compounded", compound(df("Return")).over(windowSpec))

Note that this has the limitation that the entire calculation has to fit in a single partition, so if your data is too large you would have a problem. That said, nominally this kind of operation is performed after some partitioning by key (e.g. by adding a partitionBy to the window), and then a single partition only has to hold the rows that share a key.
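For illustration, a minimal sketch of such a partitioned window (shown in PySpark; the Scala Window API is analogous, and the key column is hypothetical):

from pyspark.sql import Window

# Hypothetical "key" column: rows are compounded independently per key,
# so only one key's rows, not the whole dataset, must fit in a partition.
windowSpec = Window.partitionBy("key").orderBy("Date")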

First, we define a function f(line) (suggest a better name, please!) to process a line.

def f(line):
    global firstLine
    global last_compounded
    if line[0] == 'Date':      # header line: just name the new column
        firstLine = True
        return (line[0], line[1], 'Compounded')
    if firstLine:              # first data line: the compounded return is 1
        last_compounded = 1
        firstLine = False
    else:                      # other lines: compound the previous value
        last_compounded = (1 + float(line[1])) * last_compounded
    return (line[0], line[1], last_compounded)

Using two global variables (can this be improved? one globals-free variant is sketched after the example below), we keep the Compounded(i-1) value and track whether we are processing the first line.

With your data in some_file , the solution could be:

rdd = sc.textFile('some_file').map(lambda l: l.split())
r1 = rdd.map(lambda l: f(l))

rdd.collect()
[[u'Date', u'Return'], [u'01/01/2015', u'0.0'], [u'02/02/2015', u'-0.02'], [u'03/02/2015', u'0.05'], [u'04/02/2015', u'0.07']]
r1.collect()
[(u'Date', u'Return', 'Compounded'), (u'01/01/2015', u'0.0', 1), (u'02/02/2015', u'-0.02', 0.98), (u'03/02/2015', u'0.05', 1.029), (u'04/02/2015', u'0.07', 1.10103)]
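As for the two globals, here is a sketch of one way to drop them, keeping the running product in a local variable inside mapPartitions (the name compound_partition is made up, rdd is reused from above, and the same single-partition assumption applies):

def compound_partition(lines):
    # Keep the running product in a local variable instead of globals.
    last_compounded = None
    for line in lines:
        if line[0] == 'Date':              # header line passes through
            yield (line[0], line[1], 'Compounded')
        elif last_compounded is None:      # first data line: compounded is 1
            last_compounded = 1
            yield (line[0], line[1], last_compounded)
        else:
            last_compounded = (1 + float(line[1])) * last_compounded
            yield (line[0], line[1], last_compounded)

# coalesce(1) puts every line in one partition so the product stays sequential.
r1 = rdd.coalesce(1).mapPartitions(compound_partition)

This mirrors the single-partition caveat from the window-function answer: the sequential dependency between rows is what forces everything through one worker.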
