如何在Spark DataFrame V1.6中生成运行序列号



我使用Spark V1.6。我有以下数据框。

primary_key |dim_id

pk1 |1

pk2 |2

PK3 |3

每当有新记录进来时,我想创建一个带有新序列#的新数据框。可以说,我从源中获得了2个新记录,其中带有值pk4&PK5,我想创建具有值4和5的新DIM_ID。

primary_key |dim_id

pk1 |1

pk2 |2

PK3 |3

PK4 |4

PK5 |5

如何在Spark DataFrame v1.6中为新记录生成运行序列号?

如果您在某个地方有数据库,则可以在其中创建一个序列,并与用户定义的函数一起使用(当您时,我偶然发现了这个问题...)。

保留一个序列编号的桶,并使用它(增量参数必须与用于创建序列的参数相同)。由于它是一个对象,因此sequenceId将是每个工作节点上的单例,您可以使用原子元来迭代序列的桶。

这远非完美(可能的连接泄漏,依靠DB,锁定静态类,确实如此),欢迎评论。

import java.sql.Connection
import java.sql.DriverManager
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.functions.udf
object SequenceID {
  var current: AtomicLong = new AtomicLong
  var max: Long = 0
  var connection: Connection = null
  var connectionLock = new ReentrantLock
  var seqLock = new ReentrantLock
  def getConnection(): Connection = {
    if (connection != null) {
      return connection
    }
    connectionLock.lock()
    if (connection == null) {
     // create your jdbc connection here
    }
    connectionLock.unlock()
    connection
  }
  def next(sequence: String, incrementBy: Long): Long = {
    if (current.get == max) {
      // sequence bucket exhausted, get a new one
      seqLock.lock()
      if (current.get == max) {
        val rs = getConnection().createStatement().executeQuery(s"SELECT NEXT VALUE FOR ${sequence} FROM sysibm.sysdummy1")
        rs.next()
        current.set(rs.getLong(1))
        max = current.get + incrementBy
      }
      seqLock.unlock()
    }
    return current.getAndIncrement
  }
}
class SequenceID() extends Serializable {
  def next(sequence: String, incrementBy: Long): Long = {
    return SequenceID.next(sequence, incrementBy)
  }  
}

val sequenceGenerator = new SequenceID(properties)
def sequenceUDF(seq: SequenceID) = udf[Long](() => {
  seq.next("PK_SEQUENCE", 500L)
})
val seq = sequenceUDF(sequenceGenerator)
myDataframe.select(myDataframe("foo"), seq())

相关内容

  • 没有找到相关文章

最新更新