Scio Apache Beam - 如何正确分离管道代码?



我有一个带有一组 PTransforms 的管道,我的方法变得很长。

我想将我的 DoFns 和复合变换写在一个单独的包中,并在我的主方法中使用它们。使用python非常简单,如何使用Scio实现它?我没有看到任何这样做的例子。:(

withFixedWindows(
FIXED_WINDOW_DURATION,
options = WindowOptions(
trigger = groupedWithinTrigger,
timestampCombiner = TimestampCombiner.END_OF_WINDOW,
accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
allowedLateness = Duration.ZERO
)
)
.sumByKey
// How to write this in an another file and use it here?
.transform("Format Output") {
_
.withWindow[IntervalWindow]
.withTimestamp
}

如果我正确理解了您的问题,您希望将map, groupBy, ...转换捆绑在一个单独的包中,并在主管道中使用它们。

一种方法是使用applyTransform,但最终你会使用 PTransforms,它对 scala 不友好。

您可以简单地编写一个函数来接收 SCollection 并返回转换后的函数,例如:

def myTransform(input: SCollection[InputType]): Scollection[OutputType] = ???

但是,如果您打算编写自己的Source/Sink,请查看ScioIO类

您可以使用map函数来映射元素示例。

您可以从另一个类传递方法引用,而不是传递 lambda 示例.map(MyClass.MyFunction)

我认为解决此问题的一种方法是在另一个包中定义一个对象,然后在该对象中创建一个具有转换所需逻辑的方法。例如:

def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)
val defaulTopic = "tweets"
val input = args.getOrElse("inputTopic", defaulTopic)
val output = args("outputTopic")
val inputStream: SCollection[Tweet] = sc.withName("read from pub sub").pubsubTopic(input)
.withName("map to tweet class").map(x => {parse(x).extract[Tweet]})
inputStream
.flatMap(sentiment.predict) // object sentiment with method predict
}
object sentiment  {
def predict(tweet: Tweet): Option[List[TweetSentiment]] = {
val data = tweet.text
val emptyCase = Some("")
Some(data) match {
case `emptyCase` => None
case Some(v) => Some(entitySentimentFile(data)) // I used another method, //not defined
}
}

也请此链接以获取 Scio 示例中给出的示例

最新更新