Cygnus: Unable to deliver event. close() called when transaction is OPEN - you must either commit or rollback first



I installed Fiware Orion (v1.13.0), Fiware Cygnus (2.8.0) and Kafka in order to send data from Orion to Kafka through Cygnus.

I used the conf file suggested in the Cygnus documentation:

cygnus-ngsi.sources =http-source
cygnus-ngsi.sinks =kafka-sink
cygnus-ngsi.channels =kafka-channel
cygnus-ngsi.sources.http-source.channels = kafka-channel
cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnus-ngsi.sources.http-source.port = 5050
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
cygnus-ngsi.sources.http-source.handler.default_service = def_serv
cygnus-ngsi.sources.http-source.handler.default_service_path = /def_servpath
cygnus-ngsi.sources.http-source.handler.events_ttl = 2
cygnus-ngsi.sources.http-source.interceptors = ts gi
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = /opt/apache-flume/conf/grouping_rules.conf

cygnus-ngsi.channels.kafka-channel.type = memory
cygnus-ngsi.channels.kafka-channel.capacity = 1000
cygnus-ngsi.channels.kafka-channel.trasactionCapacity = 100
cygnus-ngsi.sinks.kafka-sink.type = com.telefonica.iot.cygnus.sinks.NGSIKafkaSink
cygnus-ngsi.sinks.kafka-sink.channel = kafka-channel
cygnus-ngsi.sinks.kafka-sink.enable_grouping = false
cygnus-ngsi.sinks.kafka-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.kafka-sink.broker_list = 192.168.1.142:9092
cygnus-ngsi.sinks.kafka-sink.zookeeper_endpoint = 192.168.1.142:2181
cygnus-ngsi.sinks.kafka-sink.batch_size = 1
cygnus-ngsi.sinks.kafka-sink.batch_timeout = 10
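
As a sanity check on an agent file like the one above, the wiring between sources, channels and sinks can be verified with a few lines of Python. This is only a sketch: the function names are made up for illustration, the set of memory channel keys is taken from the Flume user guide as I remember it, and the assumption that Flume simply does not read keys outside that set should be verified against your Flume version.

```python
# Sketch: sanity-check a Flume/Cygnus agent configuration.
# MEMORY_CHANNEL_KEYS follows the Flume user guide (assumption: complete for
# the memory channel); the function names and checks are illustrative only.

MEMORY_CHANNEL_KEYS = {
    "type", "capacity", "transactionCapacity", "keep-alive",
    "byteCapacityBufferPercentage", "byteCapacity",
}


def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_agent(props, agent):
    """Return a list of human-readable problems found for the given agent."""
    problems = []
    channels = props.get(f"{agent}.channels", "").split()

    # Every channel referenced by a source must be declared.
    for source in props.get(f"{agent}.sources", "").split():
        for ch in props.get(f"{agent}.sources.{source}.channels", "").split():
            if ch not in channels:
                problems.append(f"source {source}: unknown channel {ch}")

    # Every sink must point at one declared channel.
    for sink in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch not in channels:
            problems.append(f"sink {sink}: unknown channel {ch}")

    # For memory channels, flag keys outside the documented set.
    for ch in channels:
        prefix = f"{agent}.channels.{ch}."
        if props.get(prefix + "type") != "memory":
            continue
        for key in props:
            suffix = key[len(prefix):]
            if key.startswith(prefix) and suffix not in MEMORY_CHANNEL_KEYS:
                problems.append(f"channel {ch}: unrecognized key {suffix}")
    return problems
```

Called as `check_agent(parse_properties(open("agent.conf").read()), "cygnus-ngsi")`, this should report only the misspelled `trasactionCapacity` key. Flume's documented default for `transactionCapacity` is 100, so the effective value is probably unchanged and the typo is unlikely to explain the exception on its own.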

Everything is up and running with Docker Swarm:

ID             NAME              MODE         REPLICAS   IMAGE                       PORTS
c195zwpxparu   cygnus_cygnus     replicated   1/1        fiware/cygnus-ngsi:latest   *:5050->5050/tcp, *:5055->5055/tcp, *:5080->5080/tcp
pzjvud4q4ibc   kafka_kafka       replicated   1/1        bitnami/kafka:2             *:9092->9092/tcp
bpyaz4jphuhh   kafka_zookeeper   replicated   1/1        bitnami/zookeeper:3         *:2181->2181/tcp
jk4po3ofs3bm   orion_mongodb     replicated   1/1        mongo:3.6.5
mlknk6j7y5kd   orion_orion       replicated   1/1        fiware/orion:1.13.0         *:1026->1026/tcp

This is my docker-compose.yml for Cygnus:

version: "3"
services:
  cygnus:
    image: fiware/cygnus-ngsi
    ports:
      - "5050:5050"
      - "5055:5055"
      - "5080:5080"
    volumes:
      - ./conf/agent.conf:/opt/apache-flume/conf/agent.conf
    environment:
      - CYGNUS_SKIP_CONF_GENERATION=true
      - CYGNUS_MULTIAGENT=false
    deploy:
      replicas: 1
      restart_policy:
        condition: any
        delay: 5s
        max_attempts: 3
        window: 120s

networks:
  default:
    driver: overlay
    driver_opts:
      com.docker.network.driver.mtu: 1400

When a notification from Orion arrives at localhost:5050/notify, this error is shown on the Cygnus console:


cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    | time=2021-04-16T10:12:13.774Z | lvl=ERROR | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-67ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=run | msg=org.apache.flume.SinkRunner$PollingRunner[158] : Unable to deliver event. Exception follows.
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    | java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at com.telefonica.iot.cygnus.sinks.NGSISink.processNewBatches(NGSISink.java:646)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:373)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at java.lang.Thread.run(Thread.java:748)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    | time=2021-04-16T10:12:18.778Z | lvl=INFO | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-67ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[643] : Rollback transaction by Exception  (begin() called when transaction is OPEN!)

Any suggestions?

The current version of Cygnus is indeed 2.8.0, but Orion 1.13.0 is several years old and is unlikely to be aligned with the latest Cygnus release.

The current version of Orion is 3.0.0. Try using that instead.
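
To confirm which versions are actually running before and after the upgrade, both components can be queried over HTTP. The sketch below assumes Orion answers `GET /version` on port 1026 with a body of the form `{"orion": {"version": ...}}` and that the Cygnus management interface answers `GET /v1/version` on port 5080 with a `version` field. The host name, the ports (taken from the published ports in the question) and all function names are assumptions for illustration.

```python
import json
import re
import urllib.request


def fetch_json(url, timeout=5):
    """GET a URL and decode the JSON response body."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def orion_version(body):
    """Extract the version string from Orion's /version response (assumed shape)."""
    return body["orion"]["version"]


def cygnus_version(body):
    """Extract the version string from Cygnus's /v1/version response (assumed shape)."""
    return body["version"]


def parse_version(version):
    """Turn '1.13.0' (optionally followed by a suffix) into the tuple (1, 13, 0)."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unexpected version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def report(orion_url="http://localhost:1026/version",
           cygnus_url="http://localhost:5080/v1/version"):
    """Print both versions and note whether Orion is older than 3.0.0."""
    orion = orion_version(fetch_json(orion_url))
    cygnus = cygnus_version(fetch_json(cygnus_url))
    print(f"Orion {orion}, Cygnus {cygnus}")
    if parse_version(orion) < (3, 0, 0):
        print("Orion is older than 3.0.0; consider upgrading before debugging further.")
```

Calling `report()` against the stack in the question should show the old Orion next to the recent Cygnus; running it again after the upgrade confirms that the new image was actually deployed.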

A basic working example using the latest Orion and the latest Cygnus can be found in the NGSI-v2 step-by-step tutorials.
