致力于设置从我们的RDS Postgres 9.6到Redhift运行的Kafka。 使用 https://blog.insightdatascience.com/from-postgresql-to-redshift-with-kafka-connect-111c44954a6a 的指南,我们已经设置了所有基础设施,并且正在努力全面设置Confluent。 我收到ava.lang.IllegalArgumentException的错误:组数必须为正数。尝试设置东西时。 这是我的配置文件:
name=source-postgres
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=16
connection.url= ((correct url and information here))
mode=timestamp+incrementing
timestamp.column.name=updated_at
incrementing.column.name=id
topic.prefix=postgres_
完全错误:
/usr/local/confluent$/usr/local/confluent/bin/connect-standalone/usr/local/confluent/etc/schema-registry/connect-avro-standalone.properties/usr/local/confluent/etc/kafka-connect-jdbc/source-postgres.properties SLF4J:类路径包含多个 SLF4J 绑定。SLF4J:找到 绑定在 [jar:file:/usr/local/confluent/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J:发现结合 [jar:file:/usr/local/confluent/share/java/kafka-connect-elasticsearch/slf4j-simple-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J:发现结合 [jar:file:/usr/local/confluent/share/java/kafka-connect-hdfs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J:发现结合 [jar:file:/usr/local/confluent/share/java/kafka/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J:请参阅 http://www.slf4j.org/codes.html#multiple_bindings 以获取 解释。SLF4J:实际绑定类型为类型 [org.slf4j.impl.Log4jLoggerFactory] [2018-01-29 16:49:49,820]信息 独立配置值: 访问.控制.允许.方法 = 访问.控制.允许.来源 = 引导服务器 = [本地主机:9092] internal.key.converter = class org.apache.kafka.connect.json.JsonConverter internal.value.converter = class org.apache.kafka.connect.json.JsonConverter key.converter = class io.confluent.connect.avro.AvroConverter offset.flush.interval.ms = 60000 offset.flush.timeout.ms = 5000 offset.storage.file.filename =/tmp/connect.offsets rest.advertised.host.name = 空 rest.advertised.port = null rest.host.name = 空 rest.port = 8083 task.shutdown.graceful.timeout.ms = 5000 value.converter = class io.confluent.connect.avro.AvroConverter (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:180) [2018-01-29 16:49:49,942]信息日志记录初始化@549ms (org.eclipse.jetty.util.log:186)[2018-01-29 16:49:50,301]信息 卡夫卡 Connect start (org.apache.kafka.connect.runtime.Connect:52) [2018-01-29 16:49:50,302]信息 牧民开始 (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70) [2018-01-29 16:49:50,302]信息工作者启动 (org.apache.kafka.connect.runtime.worker:113)[2018-01-29 16:49:50,302] 信息 起始文件偏移支持与文件一起存储/tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:60) [2018-01-29 16:49:50,304]信息工作者已启动 (org.apache.kafka.connect.runtime.worker:118)[2018-01-29 16:49:50,305] 信息 牧羊人开始了 (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72) [2018-01-29 16:49:50,305]信息 启动 REST 服务器 (org.apache.kafka.connect.runtime.rest.RestServer:98)[2018-01-29 16:49:50,434] 信息码头-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)2018-1-29下午04:49:51 org.glassfish.jersey.internal.errors logErrors 警告:以下内容 检测到警告: 警告: (子)资源方法 列表连接器在 org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource 包含空路径批注。警告:(子)资源方法 创建连接器 org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource 包含空路径批注。警告:(子)资源方法 listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 包含空路径批注。警告:(子)资源方法 服务器信息在 org.apache.kafka.connect.runtime.rest.resources.RootResource 包含 空路径批注。[2018-01-29 16:49:51,385]信息已启动 o.e.j.s.ServletContextHandler@5aabbb29{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)[2018-01-29 16:49:51,409] 信息开始 ServerConnector@54dab9ac{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)[2018-01-29 16:49:51,409] 信息开始@2019ms (org.eclipse.jetty.server.Server:379)[2018-01-29 16:49:51,410]信息 REST 服务器侦听 http://127.0.0.1:8083/,广告 URL http://127.0.0.1:8083/(org.apache.kafka.connect.runtime.rest.RestServer:150)[2018-01-29 16:49:51,410] 信息 卡夫卡连接开始 (org.apache.kafka.connect.runtime.Connect:58)[2018-01-29 16:49:51,412] 信息连接器配置值: connector.class = io.confluent.connect.jdbc.JdbcSourceConnector key.converter = null 名称 = 源邮局 任务.max = 16value.converter = null (org.apache.kafka.connect.runtime.ConnectorConfig:180) [2018-01-29 16:49:51,413] 信息 创建连接器源帖子类型 io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.worker:159)[2018-01-29 16:49:51,416] 信息 带有版本的实例化连接器源帖子 类型类 io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:162) 的 3.1.2 [2018-01-29 16:49:51,419] 信息 Jdbc源连接器配置值: 批处理.max.行 = 100 连接网址 = incrementing.column.name = id 模式 = 时间戳 + 递增 poll.interval.ms = 5000 查询 = schema.pattern = null 表黑名单 = [] table.poll.interval.ms = 60000 表类型= [表] 表白名单 = [] timestamp.column.name = updated_at timestamp.delay.interval.ms = 0 主题前缀 = postgres_ validate.non.null = true (io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig:180) [2018-01-29 16:49:52,129]信息 已完成连接器的创建 source-postgres (org.apache.kafka.connect.runtime.worker:173) [2018-01-29 16:49:52,130]INFO SourceConnectorConfig 值: connector.class = io.confluent.connect.jdbc.JdbcSourceConnector key.converter = null 名称 = 源邮局 任务.max = 16 value.converter = null (org.apache.kafka.connect.runtime.SourceConnectorConfig:180) [2018-01-29 16:49:52,209]错误 连接器错误后停止 (org.apache.kafka.connect.cli.ConnectStandalone:102) java.lang.IllegalArgumentException:组数必须为正数。 at org.apache.kafka.connect.util.ConnectorUtils.groupPartitions(ConnectorUtils.java:45) at io.confluent.connect.jdbc.JdbcSourceConnector.taskConfigs(JdbcSourceConnector.java:123) at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:193) at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.recomputeTaskConfigs(StandaloneHerder.java:251) at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:281) at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:163) at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:96) [2018-01-29 16:49:52,210]信息 卡夫卡连接停止 (org.apache.kafka.connect.runtime.Connect:68)[2018-01-29 16:49:52,210] 信息 停止 REST 服务器 (org.apache.kafka.connect.runtime.rest.RestServer:154)[2018-01-29 16:49:52,213] 信息已停止 ServerConnector@54dab9ac{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)[2018-01-29 16:49:52,218] 信息停止 o.e.j.s.ServletContextHandler@5aabbb29{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)[2018-01-29 16:49:52,224] 信息休息服务器停止 (org.apache.kafka.connect.runtime.rest.RestServer:165)[2018-01-29 16:49:52,224] 信息 牧民停下脚步 (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:76) [2018-01-29 16:49:52,224]信息 正在停止连接器源邮局 (org.apache.kafka.connect.runtime.worker:218)[2018-01-29 16:49:52,225] 信息 停止表监视线程 (io.confluent.connect.jdbc.JdbcSourceConnector:137)[2018-01-29 16:49:52,225] 信息 停止的连接器源邮递 (org.apache.kafka.connect.runtime.worker:229)[2018-01-29 16:49:52,225] 信息 工人停止 (org.apache.kafka.connect.runtime.worker:122)[2018-01-29 16:49:52,225] 信息停止文件偏移支持存储 (org.apache.kafka.connect.storage.FileOffsetBackingStore:68) [2018-01-29 16:49:52,225]信息工作者已停止 (org.apache.kafka.connect.runtime.worker:142)[2018-01-29 16:49:57,334] 信息 反射扫描 6952 毫秒 263 个网址, 生成 12036 个密钥和 80097 个值 (组织反思:229)[2018-01-29 16:49:57,346]信息 牧民停了下来 (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:86) [2018-01-29 16:49:57,346]信息卡夫卡连接停止 (org.apache.kafka.connect.runtime.Connect:73)
我们在RDS Postgres(9.6)到Redshift之间使用DMS。 它一直在失败,而且很悲惨,而且几乎在这一点上几乎不奇怪地昂贵,因此我们正在将其作为可能的解决方案。我在这里有点碰壁,真的很想在这方面得到一些帮助。
我正在处理与此非常相似的问题,我发现如果连接器没有配置来告诉它要拉什么,它只会出错。尝试将以下内容添加到连接器配置中:
表白名单=
然后指定要抓取的表列表。
我在使用 JDBC 源连接器作业时遇到了此错误。问题是table.whitelist
设置区分大小写,即使底层数据库不是(RDBMS 是 MS Sql Server)。
所以我的桌子tableName
,我有"table.whitelist": "tablename",
。这失败了,我得到了上面的错误。将其更改为"table.whitelist": "tableName",
修复了错误。
尽管事实上SELECT * FROM tablename
和SELECT * FROM tableName
都在MS Sql Manager中工作。