背景
优化内存使用
问题
- 如何使用缓冲区将avro数据插入clickhouse
- 我可以直接插入来自消耗脉冲星的avro数据而不进行解组和编组吗
代码
- 现在我接收来自脉冲星的信息
msg, err := pulsarConsumer.Receive(ctx)
并将消息发送到信道
dataWriteChan <- msg
- 另一个从频道接收消息并使用avro解组的功能
msg <- dataWriteChan
dataPayload := msg.Payload()
var avroData interface{}
err := avro.Unmarshal(avroCodec, dataPayload, &avroData)
然后将avroData发送到切片以缓存
dataCache = append(dataCache, avroData)
- 直到dataCache达到20M,程序开始封送并插入到clickhouse
tmpBuf := make([]byte, 0)
bf := bytes.NewBuffer(tmpBuf)
config := goavro.OCFConfig{
W: bf,
Codec: goavroCodec,
}
ocfWriter, _ := goavro.NewOCFWriter(config)
ocfWriter.Append(dataCache)
然后使用缓冲区bf生成sql
sql := fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro %v", Database, TableName, cols, bf)
exec-sql-
conn.Exec(ctx, sql)
- 以上步骤可以正常插入avro数据,我不想使用Sprinrf生成sql,因为它会占用新内存。所以我想使用缓冲区数据并更改为
sql := fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro ", Database, TableName, w.cols)
conn.Exec(ctx, sql + "%s", data.String())
我不知道这是否能节省内存;但更大的问题是它插入错误!
write to storage err: %!(NOVERB)%!(EXTRA string=code: 1001, message: avro::Exception: EOF reached)
内存消耗高的几个地方
1. avro.Unmarshal(avroCodec, dataPayload, &avroData)
2. ocfWriter.Append(dataCache)
3. fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro ", Database, TableName, w.cols)
谢谢
不管你能不能帮我解决,我也很感激你愿意花时间一起思考!这对我来说非常重要。
您似乎正在使用的驱动程序本身不支持avro格式。您需要将数据整理到一个结构中。然后,您可以利用驱动程序的appendStruct方法。