我正在实施客户管理解决方案,并从多个来源接收数据,例如游戏平台,CRM/营销工具等,并将其合并到SQL Server内置的标准化数据仓库中。它可以根据客户端、数据提供者或需求接收多种格式的数据——定期(每天/每小时/等)的平面文件,访问SNS/SQS流,手动出站API调用,以及通过Kafka(以及其他需要的来源)。
对于当前的实现,它将通过我们在AWS中设置的Kafka基础设施(使用AWS MSK)接收来自其他平台(B)的数据。我们的实现需要实时地将这些数据使用到我们的数据库staging层(数据消息格式的表,预转换到标准化数据平台),以便可以定期处理和转换。因此,我需要构建一个进程来"从外部Kafka读取(Kafka托管在AWS之外,因此不会从MSK获得lambda触发器),写入db"。
我定义了4种可能的方法-
- 使用Kafka消费者库
- 使用Kafka connect connector
- 使用托管Kafka流媒体服务,如confluent cloud或Amazon MSK
- 使用Amazon Kinesis
就我的情况而言,第二个选项是最好的,所以我尽量利用它。为了测试目的,我在我的AWS账户上创建了Kafka集群,并尝试在AWS ECS上配置Kafka连接服务。但是不清楚我应该使用哪个连接器以及如何配置它。
如果有人能想出一个关于这方面的快速指南,我将不胜感激。
选项3和4不会写入RDS。它们只启动一个Kafka集群,听起来你已经有了。
不清楚我应该使用哪个连接器以及如何配置这个
您应该使用JDBC Sink。你可以把它部署到Kafka Connect集群的任何地方。在AWS中,这包括MSK Connect(我认为它可以连接到自我管理的Kafka,但不确定)、EC2、ECS或EKS。它不在Lambda中运行。
遵循Confluent关于如何使用Kafka Connect的文档。
多种格式
该连接器只能使用预定义格式的一小部分,如JSONSchema、Avro或Protobuf。
引用
- https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
- https://rmoff.net/2021/03/12/kafka-connect-jdbc-sink-deep-dive-working-with-primary-keys/
您也可以使用Lambda,但这会导致数据库的高IO负载,因为每个事件都会打开一个新的数据库连接。