如何使用python将kafka消费者数据映射到Mysql



我的主机配置如下:

卡 夫 卡 火花 Mysql , 在码头工人上

我的代码如下:

# To find out where the pyspark
import sys
from kafka import KafkaProducer,KafkaConsumer
import findspark
from boto3 import *
import boto3
import json
findspark.init()
# Creating Spark Context
from pyspark import SparkContext
from pyspark.sql import SparkSession
def get_connection(self):
spark = SparkSession.builder.master("local[*]").appName("SparkByExamples.com").getOrCreate()  
return spark   
def json_serializer(data):
return json.dumps(data).encode("utf-8")

def read_s3():
p1 = KafkaProducer(bootstrap_servers=['broker:29092'], value_serializer=json_serializer)

s3 = boto3.resource('s3')
bucket = s3.Bucket('kakfa')
for obj in bucket.objects.all():
key = obj.key
body = obj.get()['Body'].read().decode('utf-8')
p1.send("Uber_Eats",body)
p1.flush()
def read_from_topic(self,spark):
df = spark.readStream 
.format("kafka") 
.option("kafka.bootstrap.servers", "broker:29092") 
.option("subscribe", "Uber_Eats") 
.option("startingOffsets", "earliest") 
.load()
df2 = df 
.writeStream 
.format("console") 
.start()
print(df2.awaitTermination()  )  
def get_consumer(self):
consumer = KafkaConsumer("Uber_Eats", group_id='group1', bootstrap_servers=
"broker:29092",value_deserializer=lambda x: json.loads(x.decode('utf-8')))
return  consumer   
def print_details(self,c1):
for msg in c1:
print(msg.value)
print("Dom  dfe")            


class Foo:
def __init__(self):

spark = get_connection(self)
read_s3()
# System.setProperty("hadoop.home.dir", "$HADOOP_HOMEwinutils-masterhadoop-2.x.x")
c1 = get_consumer(spark)
print_details(self,c1)

f = Foo()  

我从上面的代码中输出的是 folows:

{

{
"Customer Number": "1",
"Customer Name": "Shyam",
"Restaurant Number": "2201",
"Restaurant NameOrdered": "Bawarchi",
"Number of Items": "3",
"price": "10",
"Operating Start hours": "9:00",
"Operating End hours": "23:00"
},
{
"Customer Number": "2",
"Customer Name": "Rohini",
"Restaurant Number": "2202",
"Restaurant NameOrdered": "Sarvana Bhavan",
"Number of Items": "4",
"price": "20",
"Operating Start hours": "8:00",
"Operating End hours": "20:00"
},
{
"Customer Number": "3",
"Customer Name": "Bhairav",
"Restaurant Number": "2203",
"Restaurant NameOrdered": "Taco Bell",
"Number of Items": "5",
"price": "30",
"Operating Start hours": "11:00",
"Operating End hours": "21:00"
}

}

如何将其读入 mysql 中的列? i)它是否像一个普通的json文件,读取和插入?

二)或者我们有什么特定于 kakfa 消费者"json"格式的?

iii) 我指定了 value_deserializer=lambda x: json.loads(x.decode('utf-8')))

在代码中获取 JSON 格式是将数据加载到 MySQL 所必需的吗

谢谢

阿迪

是不是像普通的json文件,读取和插入?

不知道你这是什么意思。Mysql 不接受 json 文件

Spark有自己的JSON文件阅读器,但你正在从Kafka读取,所以这无关紧要。

我们有什么特定于 kakfa 消费者"json"格式的吗?

是的。CAST(value as STRING)接踵而至的是各种get_json_object电话。我已经将您链接到此Databricks博客系列

我指定了value_deserializer=lambda x: json.loads(x.decode('utf-8')))

这不是Spark。我不知道你为什么还有这个。其次,def get_consumer(self)不接受或使用你传递的spark变量,并且你在那里没有类定义,所以不鼓励使用self作为命名的参数(换句话说,你所有的函数都应该在class Foo范围内,但你也根本不需要类)

重要细节- 您显示的文件不是有效的 JSON,因此无论如何,这些方法都不会立即起作用

tl;dr- 假设你真的想使用Spark

  1. 使用您编写的使用 Spark 消费者的函数

  2. 取代

df 
.writeStream 
.format("console") 

使用 JDBC 编写器,.writeStream.format("jdbc").save("jdbc:mysql//…")但仅在修改数据帧以匹配数据库模式之后


否则,如果你不再需要Spark,那么JSON或Kafka是一个实现细节 - 下载并配置Mysql python客户端,然后像往常一样插入数据 - 小心事务,回滚,错误处理,准备的查询等


或者,正如多次回答和评论的那样,以及更多的容错解决方案,忘记Python并使用Kafka Connect(脚本在您的Kafka bin目录中可用,不需要编码)

最新更新