从流入线路协议到QuestDB的写入问题



我的问题是,我无法使用Influx Line Protocol(ILP(将数据写入Quest DB,因为我已经创建了一个带有架构的表。如果我有一个空表(即没有模式(,那么我可以执行模式,并且模式是自动创建的。

我的问题是:

  • 为什么会发生这种情况
  • 我怎样才能使它工作(如果可能的话(

下面我描述了我所做的(在QuestDB server 5.0.6上,从docker容器开始(:

  1. 创建一个表
CREATE TABLE my_table(
    location SYMBOL,
    car_brand SYMBOL,
    ts TIMESTAMP,
    kmph FLOAT,
    age INT
) timestamp(ts)  PARTITION BY MONTH;
  1. 在Python中,我尝试写入表
import time
import socket
HOST = 'localhost'
PORT = 9009
# For UDP, change socket.SOCK_STREAM to socket.SOCK_DGRAM
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    sock.connect((HOST, PORT))
    sock.send('my_table,location=london,car_brand=vw kmph=281.14000000,age=2 1420701827750051000n'.encode())
except socket.error as e:
    print("Got error: %s" % (e))
sock.close()

这就产生了两个问题:

  • 它不向表中写入任何内容(该表之后没有行(,也不会引发任何错误
  • 如果它没能写出来,我预计会出错

然后我想这可能与指定的时间戳/分区有关,所以我创建了一个新表:

CREATE TABLE my_table_v2(
    location SYMBOL,
    car_brand SYMBOL,
    ts TIMESTAMP,
    kmph FLOAT,
    age INT
);

写出来的结果是一样的——什么都没写。

最后我创建了一个空表(没有模式(我试过这样(但没用(

CREATE TABLE my_empty_table();

所以我做了一个类似的变通办法;

CREATE TABLE my_empty_table(smth INT);

然后移除柱:

ALTER TABLE my_empty_table
DROP COLUMN smth;

然后,当我将数据写入空表时,一切都如预期…

理想情况下,我想定义模式,然后写入表,理想情况下如果它无法写入,我想以某种方式捕获它。

第一个python示例在QuestDB日志中抛出错误:

E i.q.c.l.t.LineTcpMeasurementScheduler mismatched column and value types [table=so_table, column=kmph, columnType=FLOAT, valueType=DOUBLE]

如果表具有kmphDOUBLE列类型。age也会抛出类似的解析错误。您要创建的模式是

CREATE TABLE new_ilp_table(
    location SYMBOL,
    timestamp TIMESTAMP,
    car_brand SYMBOL,
    kmph double,
    age long
) timestamp(timestamp)  PARTITION BY MONTH;

因此,需要更改以下类型:

  • FLOAT->DOUBLE
  • INT->LONG

有关数据类型的更多信息,请参阅ILP数据类型文档

根据上面的错误,您可以在QuestDB日志中查找LineTcpMeasurementScheduler

编辑:

CCD_ 10和CCD_。数值的默认类型是longdouble,但如果手动创建具有任何等效类型的较低分辨率的表,则会按预期进行处理。

这适用于测试版版本的6.0:

docker pull questdb/questdb:6.0.0-beta-linux-amd64
docker run -p 9000:9000 -p 8812:8812 -p 9009:9009  
questdb/questdb:6.0.0-beta-linux-amd64

SQL创建表

CREATE TABLE ilp_table(
    location SYMBOL,
    car_brand SYMBOL,
    kmph FLOAT,
    age INT,
    ts TIMESTAMP
) timestamp(ts)  PARTITION BY MONTH;

Python示例写入此表

import time
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
  sock.connect(('localhost', 9009))
  sock.send(('master_ilpf_table,location=london,car_brand=vw kmph=1.1,age=2i %dn' %(time.time_ns())).encode())
except socket.error as e:
  print("Got error: %s" % (e))
sock.close()

最新更新