如何在气流中使用Scala操作符运行Scala代码



我刚刚写了一个恢复过程到Aerospike,它看起来很适合气流,我正在寻找一些气流操作符到Scala。

当前实现:

// Register UDF for LUT
aerospikeService.registerUDFs(
"""
|function getLUT(r)
|    return record.last_update_time(r)
|end
|""".stripMargin
)
// Pause Connectors
k8sService.pauseConnectors()
// Get Connectors, Current Offsets and LUTs
val connectors = k8sService.getConnectors()
val originalState = kafkaService.getCurrentState()
val startTime = aerospikeService.calculateCurrentLUTs()
// Delete Connectors
k8sService.deleteConnectors()
kafkaService.resetOffsets(originalState)
// Recreate Connectors
k8sService.createConnectors(connectors)
// Wait until Offset Reached
kafkaService.waitTillOriginalOffsetsReached(originalState)
// Truncate
aerospikeService.truncate(startTime, durableDelete)
// Cleanup
aerospikeService.cleanup()

没有' scalaoperator ';运行Scala代码。Python不是JVM语言,因此需要构建一个jar文件,该文件可以从另一个进程执行。例如,在气流中使用BashOperator:

scala_task = BashOperator(
task_id="scala_task",
dag=dag,
bash_command="java -jar myjar.jar",
)

另一个流行的解决方案是将代码构建到Docker容器中,并使用KubernetesPodOperator在Kubernetes集群上启动它。

请注意,BashOperator(1)要求JVM存在于气流工作节点上,(2)如果用BashOperator触发,该进程将在工作节点上运行,因此要确保有足够的资源来处理它。如果没有,"外包";在其他地方进行繁重的处理,例如K8S或Spark集群。

最新更新