代码优化
我有一个flink应用程序,它从url/端口读取数据并对其进行处理并返回JSON。然后,我将JSON转换为字符串,并将其转换为Kafka。
当前性能&注意事项
如果我只是执行处理->我可以通过该函数运行大约30000个字符串,但当我添加该函数将其转换为STring,然后转换为kafka时,我的吞吐量降至每秒17000个字符串。
在转换为Kafka之前,我需要将JSON转换为String吗?如果不是,我如何将json ObjectNode下沉到kafka?
还有其他什么解决方案。我认为瓶颈是字符串函数
我尝试使用几种方法(.toString函数、StringBuilder到String)将JSON转换为String。
// Read from Source
val in_stream = env.socketTextStream(url, port, socket_stream_deliminator, socket_connection_retries).setParallelism(1)
// Perform Process
.map(x=>{Process(x)}).setParallelism(1)
// Convert to STring
.map(x => ObjectNodeToString({
val json_string_builder = StringBuilder.newBuilder
json_string_builder.append(x)
return json_string_builder.toString()
})).setParallelism(1)
// sink data
.addSink(new FlinkKafkaProducer[String](broker_hosts, global_topic, new SimpleStringSchema()))
我想坚持每秒处理30000根琴弦。这是我从convert到string函数中得到的。我可以将ObjectNode直接下沉到kafka吗?
您可以。Sink正在将给定的对象序列化为字节数组,然后再将其发送到kafka。请确保您的sink函数提供了能够将ObjectNode转换为字节数组的序列化程序。
还要确保使用者已准备好接收ObjectNode对象,而不是Strings。