我需要将历史数据写入InfluxDB(我使用Python,在这种情况下这不是必须的,所以我可能愿意接受非Python解决方案)。我像这样设置写API
write_api = client.write_api(write_options=ASYNCHRONOUS)
数据来自一个DataFrame,以时间戳作为键,所以我像这样将它写入数据库
result = write_api.write(bucket=bucket, data_frame_measurement_name=field_key, record=a_data_frame)
这个调用不会抛出异常,即使InfluxDB服务器关闭。result
有一个受保护的属性_success
,在调试中是一个布尔值,但我不能从代码中访问它。
如何检查写入是否成功?
如果你使用后台批处理,你可以添加自定义的成功,错误和重试回调。
from influxdb_client import InfluxDBClient
def success_cb(details, data):
url, token, org = details
print(url, token, org)
data = data.decode('utf-8').split('n')
print('Total Rows Inserted:', len(data))
def error_cb(details, data, exception):
print(exc)
def retry_cb(details, data, exception):
print('Retrying because of an exception:', exc)
with InfluxDBClient(url, token, org) as client:
with client.write_api(success_callback=success_cb,
error_callback=error_cb,
retry_callback=retry_cb) as write_api:
write_api.write(...)
如果您急于测试所有回调,并且不想等到所有重试完成,您可以覆盖重试间隔和次数。
from influxdb_client import InfluxDBClient, WriteOptions
with InfluxDBClient(url, token, org) as client:
with client.write_api(success_callback=success_cb,
error_callback=error_cb,
retry_callback=retry_cb,
write_options=WriteOptions(retry_interval=60,
max_retries=2),
) as write_api:
...
如果您想立即将数据写入数据库,则使用同步版本的write_api
- https://github.com/influxdata/influxdb-client-python/blob/58343322678dd20c642fdf9d0a9b68bc2c09add9/examples/example.py#L12
异步写入应该是"触发"by call.get()
- https://github.com/influxdata/influxdb-client-python#asynchronous-client
对
write_api.write()
返回multiprocessing.pool.AsyncResult
或multiprocessing.pool.AsyncResult
(两者相同)。
使用这个返回对象,您可以用几种方式检查异步请求。详见:https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult
如果可以使用阻塞请求,则可以使用write_api = client.write_api(write_options=SYNCRONOUS)
。
from datetime import datetime
from influxdb_client import WritePrecision, InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:
p = Point("my_measurement")
.tag("location", "Prague")
.field("temperature", 25.3)
.time(datetime.utcnow(), WritePrecision.MS)
try:
client.write_api(write_options=SYNCHRONOUS).write(bucket="my-bucket", record=p)
reboot = False
except Exception as e:
reboot = True
print(f"Reboot? {reboot}")