如何修复缓存的模式注册表查找,从而导致性能可怕



编辑:我从几年前发现了另一个问题(如何在cachedschemaregistryclient中填充缓存而不打电话以注册新的模式?(。它提到,缓存的CacheMareGistriont需要将架构注册到实际注册表以使其被缓存,并且尚无解决方案来解决此问题。因此,在这里留下我的问题,但也希望让人知道。

我正在研究一个正在从kafka提取字节数组的程序,解密它(因此在kafka上是安全的(,将字节转换为字符串,json字符串到json对象,从架构注册表(利用缓存的ChemareGistryClient(,使用注册表元数据检索的模式将JSON字节转换为通用记录,然后将该通用记录序列化为Avro Bytes。

在进行一些测试后,似乎缓存的SchemareGistyClient是主要的性能流失。但是我能说的是获取模式元数据的最佳方法。我是否实施了很差的东西,或者有其他方法可以与我的用例一起使用?

这是解密后处理所有内容的代码:

package org.apache.flink;
import avro.fullNested.FinalMessage;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import serializers.AvroFinishedMessageSerializer;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
public class JsonToAvroBytesParser implements FlatMapFunction<String, byte[]> {
    private transient CachedSchemaRegistryClient schemaRegistryClient;
    private transient AvroFinishedMessageSerializer avroFinishedMessageSerializer;
    private String schemaUrl;
    private Integer identityMaxCount;
    public JsonToAvroBytesParser(String passedSchemaUrl, int passedImc){
        schemaUrl = passedSchemaUrl;
        identityMaxCount = passedImc;
    }
    private void ensureInitialized() {
        if (schemaUrl.equals("")) {
            schemaUrl = "https://myschemaurl.com/";
        }
        if(identityMaxCount == null){
            identityMaxCount = 5;
        }
        if(schemaRegistryClient == null){
            schemaRegistryClient = new CachedSchemaRegistryClient(schemaUrl, identityMaxCount);
        }
        if(avroFinalMessageSerializer == null){
            avroFinalMessageSerializer = new AvroFinalMessageSerializer(FinalMessage.class);
        }
    }
    @Override
    public void flatMap(String s, Collector<byte[]> collector) throws Exception {
        ensureInitialized();
        Object obj = new JSONParser().parse(s);
        JSONObject jsonObject = (JSONObject) obj;
        try {
            String headers = jsonObject.get("headers").toString();
            JSONObject body = (JSONObject) jsonObject.get("requestBody");
            if(headers != null && body != null){
                String kafkaTopicFromHeaders = "hard_coded_name-value";
                //NOTE: this schema lookup has serious performance issues.
                SchemaMetadata schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(kafkaTopicFromHeaders);
                //TODO: need to implement recovery method if schema cannot be reached.
                JsonAvroConverter converter = new JsonAvroConverter();
                GenericRecord specificRecord = converter.convertToGenericDataRecord(body.toJSONString().getBytes(), new Schema.Parser().parse(schemaMetadata.getSchema()));
                byte[] bytesToReturn = avroFinishedMessageSerializer.serializeWithSchemaId(schemaMetadata, specificRecord);
                collector.collect(bytesToReturn);
            }
            else {
                System.out.println("json is incorrect.");
            }
        } catch (Exception e){
            System.out.println("json conversion exception caught");
        }
    }
}

感谢提前的任何帮助!

看来GetLatestschememetadata方法不使用缓存。如果您希望使用缓存来提高性能,也许可以重新组织程序以使用使用缓存的其他方法之一,也许是通过ID查找模式或用名称登记为定义字符串的架构。

我在找到Java(或Python或C (的文档时遇到了困难,该文档确认这是SchemareGistry的工作方式(在此处尝试(。但是.NET文档至少在该客户端API中说GetLatest方法没有缓存。

相关内容

  • 没有找到相关文章

最新更新