如何使用clickhouse go有效地将avro数据插入clickhouse



背景

优化内存使用

问题

  1. 如何使用缓冲区将avro数据插入clickhouse
  2. 我可以直接插入来自消耗脉冲星的avro数据而不进行解组和编组吗

代码

  1. 现在我接收来自脉冲星的信息
msg, err := pulsarConsumer.Receive(ctx)

并将消息发送到信道

dataWriteChan <- msg
  1. 另一个从频道接收消息并使用avro解组的功能
msg <- dataWriteChan 
dataPayload := msg.Payload()
var avroData interface{}
err := avro.Unmarshal(avroCodec, dataPayload, &avroData)

然后将avroData发送到切片以缓存

dataCache = append(dataCache, avroData)
  1. 直到dataCache达到20M,程序开始封送并插入到clickhouse
tmpBuf := make([]byte, 0)
bf := bytes.NewBuffer(tmpBuf)
config := goavro.OCFConfig{
W:     bf,
Codec: goavroCodec,
}
ocfWriter, _ := goavro.NewOCFWriter(config)
ocfWriter.Append(dataCache)

然后使用缓冲区bf生成sql

sql := fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro %v", Database, TableName, cols, bf)

exec-sql-

conn.Exec(ctx, sql)
  1. 以上步骤可以正常插入avro数据,我不想使用Sprinrf生成sql,因为它会占用新内存。所以我想使用缓冲区数据并更改为
sql := fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro ", Database, TableName, w.cols)
conn.Exec(ctx, sql + "%s", data.String())

我不知道这是否能节省内存;但更大的问题是它插入错误!

write to storage err: %!(NOVERB)%!(EXTRA string=code: 1001, message: avro::Exception: EOF reached)

内存消耗高的几个地方

1. avro.Unmarshal(avroCodec, dataPayload, &avroData)
2. ocfWriter.Append(dataCache)
3. fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro ", Database, TableName, w.cols)

谢谢

不管你能不能帮我解决,我也很感激你愿意花时间一起思考!这对我来说非常重要。

您似乎正在使用的驱动程序本身不支持avro格式。您需要将数据整理到一个结构中。然后,您可以利用驱动程序的appendStruct方法。

相关内容

  • 没有找到相关文章

最新更新