Create a pyFlink DataStream consumer from a tweets Kafka producer in Python



I want to create a streaming Kafka consumer in pyFlink that reads tweet data after deserializing it from JSON. I have pyFlink version 1.14.4 (the latest version).

Could I have an example of a Kafka producer and some simple code for a Flink streaming consumer in Python?

Below is an example taken from the PyFlink examples that shows how to read JSON data with a Kafka consumer in the PyFlink DataStream API; two sketches adapted to the tweet use case follow it.

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
################################################################################
import logging
import sys
from pyflink.common import Types
from pyflink.common.serialization import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer

# Make sure that the Kafka cluster is started and the topic 'test_json_topic' is
# created before executing this job.
def write_to_kafka(env):
    type_info = Types.ROW([Types.INT(), Types.STRING()])
    ds = env.from_collection(
        [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],
        type_info=type_info)

    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(type_info) \
        .build()
    kafka_producer = FlinkKafkaProducer(
        topic='test_json_topic',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
    )

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()

def read_from_kafka(env):
    deserialization_schema = JsonRowDeserializationSchema.Builder() \
        .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
        .build()
    kafka_consumer = FlinkKafkaConsumer(
        topics='test_json_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
    )
    kafka_consumer.set_start_from_earliest()

    env.add_source(kafka_consumer).print()
    env.execute()

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    # the connector jar must match your Flink version; with PyFlink 1.14.4 use the
    # 1.14.4 artifact (e.g. flink-sql-connector-kafka_2.11-1.14.4.jar) instead
    env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")

    print("start writing data to kafka")
    write_to_kafka(env)

    print("start reading data from kafka")
    read_from_kafka(env)
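If you just want something feeding tweet-shaped JSON into the topic while you test the Flink job, any plain Python Kafka client will do. The sketch below uses the kafka-python package with made-up records; the library choice, the 'tweets' topic name, and the id/text/lang fields are all assumptions, not part of the example above.

import json
from kafka import KafkaProducer

# serialize each dict to a UTF-8 encoded JSON string before sending
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# toy tweet-shaped records; the field names are assumptions about your schema
tweets = [
    {'id': 1, 'text': 'hello flink', 'lang': 'en'},
    {'id': 2, 'text': 'bonjour kafka', 'lang': 'fr'},
]
for tweet in tweets:
    producer.send('tweets', tweet)  # 'tweets' is an assumed topic name
producer.flush()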

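To consume those tweets instead of the (INT, STRING) rows above, only the type info handed to JsonRowDeserializationSchema has to change so that it mirrors the JSON fields. A minimal sketch under the same assumptions (topic 'tweets', fields id/text/lang; adjust both to your real tweet payload):

from pyflink.common import Types
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

def read_tweets():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")

    # named row type mirroring the assumed tweet JSON {"id": ..., "text": ..., "lang": ...}
    tweet_type = Types.ROW_NAMED(
        ['id', 'text', 'lang'],
        [Types.LONG(), Types.STRING(), Types.STRING()])

    deserialization_schema = JsonRowDeserializationSchema.Builder() \
        .type_info(tweet_type) \
        .build()

    kafka_consumer = FlinkKafkaConsumer(
        topics='tweets',  # assumed topic name
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'tweet_reader'})
    kafka_consumer.set_start_from_earliest()

    env.add_source(kafka_consumer).print()
    env.execute()

if __name__ == '__main__':
    read_tweets()

By default the schema does not fail on missing fields (they come back as null in the Row) and ignores JSON fields you did not declare, so you can start with the handful of tweet attributes you actually need.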