I am using Spark v1.6. I have the following dataframe:
primary_key |dim_id
pk1 |1
pk2 |2
pk3 |3
Whenever new records come in, I want to create a new dataframe with fresh sequence numbers. Say I get 2 new records from the source with primary keys pk4 and pk5; I want to assign them the new dim_id values 4 and 5:
primary_key |dim_id
pk1 |1
pk2 |2
pk3 |3
pk4 |4
pk5 |5
How do I generate a running sequence number for new records in a Spark DataFrame in v1.6?
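For reference, on a single batch the desired output can be produced by offsetting zipWithIndex with the current maximum dim_id (a rough sketch, with hypothetical existingDf / newRecordsDf names and dim_id assumed to be a long):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Continue numbering from the current maximum dim_id.
val maxId = existingDf.agg(max("dim_id")).first().getLong(0)
// zipWithIndex gives each new row a 0-based index within the batch.
val indexed = newRecordsDf.rdd.zipWithIndex().map {
  case (row, idx) => Row.fromSeq(row.toSeq :+ (maxId + idx + 1))
}
val schema = StructType(newRecordsDf.schema.fields :+ StructField("dim_id", LongType))
val withIds = sqlContext.createDataFrame(indexed, schema)

This stays consistent only if batches are loaded one at a time; the answer below uses a database sequence instead, which avoids that constraint.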
If you have a database somewhere, you can create a sequence in it and use it from a user-defined function (like you, I stumbled upon this problem...).
Reserve a bucket of sequence numbers and use them up (the incrementBy parameter must match the INCREMENT BY value used when creating the sequence; e.g. a sequence created with INCREMENT BY 500 should be consumed with incrementBy = 500). Since it is an object, SequenceID is a singleton on each worker node, and an AtomicLong steps through the reserved bucket.
This is far from perfect (possible connection leak, reliance on a DB, lock on a static class, and so on), so comments are welcome.
import java.sql.Connection
import java.sql.DriverManager
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.functions.udf

object SequenceID {
  var current: AtomicLong = new AtomicLong
  var max: Long = 0
  var connection: Connection = null
  val connectionLock = new ReentrantLock
  val seqLock = new ReentrantLock

  // Lazily open a single JDBC connection per JVM (double-checked locking).
  def getConnection(): Connection = {
    if (connection != null) {
      return connection
    }
    connectionLock.lock()
    try {
      if (connection == null) {
        // create your jdbc connection here, e.g. for DB2 (matching the query below):
        // connection = DriverManager.getConnection("jdbc:db2://host:50000/db", user, password)
      }
    } finally {
      connectionLock.unlock()
    }
    connection
  }

  def next(sequence: String, incrementBy: Long): Long = {
    if (current.get == max) {
      // sequence bucket exhausted, reserve a new one from the database
      seqLock.lock()
      try {
        if (current.get == max) {
          val rs = getConnection().createStatement()
            .executeQuery(s"SELECT NEXT VALUE FOR ${sequence} FROM sysibm.sysdummy1")
          rs.next()
          current.set(rs.getLong(1))
          max = current.get + incrementBy
        }
      } finally {
        seqLock.unlock()
      }
    }
    current.getAndIncrement
  }
}

// Serializable wrapper so the generator can be shipped inside the UDF closure;
// it delegates to the per-JVM singleton above.
class SequenceID() extends Serializable {
  def next(sequence: String, incrementBy: Long): Long =
    SequenceID.next(sequence, incrementBy)
}

val sequenceGenerator = new SequenceID()

def sequenceUDF(seq: SequenceID) = udf[Long](() => {
  seq.next("PK_SEQUENCE", 500L)
})

val seq = sequenceUDF(sequenceGenerator)
myDataframe.select(myDataframe("foo"), seq())
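Applied to the question's scenario, a minimal usage sketch (hypothetical existingDim / newRecords frame names; the actual dim_id values depend on where the database sequence currently stands):

// Tag each incoming row with a fresh dim_id and append it to the existing
// dimension table (the Spark 1.6 DataFrame API uses unionAll).
val withIds = newRecords.select(newRecords("primary_key"), seq().as("dim_id"))
val updatedDim = existingDim.unionAll(withIds)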