使用 Spark 进行流式处理时查询数据库是否很好



我正在尝试用Spark(以及Flink(制作一个简单的流媒体应用程序。输入是一个包含一些产品信息的 Kafka 主题,在将产品信息写入其他地方之前,我想获取产品的类别信息并在流式传输期间添加到我的模型中。我计划将我的产品类别映射数据存储在PostgreSQL表或Couchbase存储桶中。因此,我需要为每个流数据查询其中一个。我想学习的是:

  • 它适用于Spark/Flink吗?

  • 这是流式处理应用程序的好做法吗?如果没有,我如何通过大数据方式做到这一点?我应该将映射数据存储在其他地方,还是以其他方式将该映射数据与流数据联接起来?

这至少对于 Spark 来说是很常见的做法,但它可能更多地取决于数据库的选择。 就像 Cassandra 在通过全主键进行查找时非常快(如果您没有大量数据,启用行缓存也可能有所帮助(,我认为 Couchbase 也可能是一个不错的选择(我很长时间没有使用它(。 代码可能如下所示(完整代码在此齐柏林飞艇笔记本中。此代码还需要 Spark Cassandra Connector 2.5.0,它支持"直接加入 Cassandra" - 请参阅该版本的博客文章(:

val streamingInputDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "10.101.36.9:9092")
.option("subscribe", "tweets2")
.load()
val tweetDF = streamingInputDF.selectExpr("CAST(value AS STRING)")
.select(from_json($"value", schema).as("tweet"))
.select($"tweet.payload.created_at".as("created_at").cast(TimestampType),
$"tweet.payload.lang".as("language"))
val streamingCountsDF =  tweetDF
.where(col("language").isNotNull)
.groupBy($"language", window($"created_at", "1 minutes"))
.count()
.select($"language", $"window.start".as("ts"), $"count")
import org.apache.spark.sql.cassandra._
val lang_details = spark.read.cassandraFormat("languages", "zep").load()
val joined = streamingCountsDF.join(lang_details, 
lang_details("id") === streamingCountsDF("language"), "left_outer")
.select($"language", $"native_name".as("lang_name"), $"ts", $"count")
...

此外,您还需要考虑其他要求 - 您获得更新的频率、这些更新需要传播到流作业的速度等。 例如,对于 Spark,您可能为存储在数据库中的数据设置一个单独的数据帧,并且您缓存此数据以加快联接速度,但每 N 分钟刷新一次数据帧,以便从数据库获取最新更新。 (您可以在 Stream Processing with Apache Spark 一书中找到源代码(

这是 Flink 应用程序中的常见模式,有几种方法需要考虑:

(1( 可以使用外部数据库进行查找联接。Table API 内置了对 JDBC 数据库(包括 PostgreSQL(执行此操作的支持。例如,下面是 Kafka 流与 MySQL 中查找表的扩充联接,其中 MySQL 表是通过 Hive 目录访问的:

SELECT
l_proctime AS `querytime`,
l_orderkey AS `order`,
l_linenumber AS `linenumber`,
l_currency AS `currency`,
rs_rate AS `cur_rate`, 
(l_extendedprice * (1 - l_discount) * (1 + l_tax)) / rs_rate AS `open_in_euro`
FROM prod_lineitem
JOIN hive.`default`.prod_rates FOR SYSTEM_TIME AS OF l_proctime ON rs_symbol = l_currency
WHERE
l_linestatus = 'O';

文档。

(2( 对于其他(非 JDBC(数据源,您可以使用异步函数通过外部服务/数据库实现自己的扩充。

(3( Flink1.11 增加了对摄取 Debezium CDC(变更数据捕获(流的支持,使得维护在 Flink 状态下具体化的外部数据库的同步视图变得更加容易。支持MySQL,PostgreSQL,Oracle,Microsoft SQL Server和许多其他数据库。通过将外部数据源镜像到 Flink 中,您将获得更高的吞吐量和更低的延迟,并减少外部数据库的负载。

相关内容

  • 没有找到相关文章

最新更新