Trying to collect Twitter data with Flume and send it to Kafka, but getting an error about batch size



I believe this is related to my Flume configuration file.

flume_project.conf:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'TwitterAgent'
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = kafkasink
#TwitterAgent.sinks = filesink
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
#TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.keywords = AWS Marketplace, GCP Marketplace, Azure Marketplace
#filesink
# TwitterAgent.sinks.filesink.type = file_roll
# TwitterAgent.sinks.filesink.channel = MemChannel
# TwitterAgent.sinks.filesink.sink.directory = /docker_share/twitter_sink
# TwitterAgent.sinks.filesink.sink.pathManager.extension = out
# TwitterAgent.sinks.filesink.sink.pathManager.prefix = project
# TwitterAgent.sinks.filesink.sink.rollInterval = 3600
#kafkasink
TwitterAgent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
TwitterAgent.sinks.kafkasink.topic = fp_mc
TwitterAgent.sinks.kafkasink.brokerList = fp_mc-kafka-1:9092
TwitterAgent.sinks.kafkasink.channel = MemChannel
TwitterAgent.sinks.kafkasink.batchSize = 1
# add header and text to sink
# TwitterAgent.sinks.kafkasink.sink.serializer = header_and_text
# TwitterAgent.sinks.kafkasink.sink.serializer.appendNewline = true
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
TwitterAgent.sources.Twitter.consumerKey = xxxx
TwitterAgent.sources.Twitter.consumerSecret = xxxx
TwitterAgent.sources.Twitter.accessToken = xxxx
TwitterAgent.sources.Twitter.accessTokenSecret = xxxx
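The Kafka sink section above uses brokerList, topic and batchSize. The Flume user guide for the Kafka sink lists these as deprecated in favor of kafka.bootstrap.servers, kafka.topic and flumeBatchSize. A sketch of the same sink written with the current property names, keeping the broker, topic and batch size from the configuration above (the new property names come from the Flume user guide, not from the original post):

```properties
# Kafka sink using the non-deprecated property names (per the Flume user guide)
TwitterAgent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
TwitterAgent.sinks.kafkasink.channel = MemChannel
TwitterAgent.sinks.kafkasink.kafka.bootstrap.servers = fp_mc-kafka-1:9092
TwitterAgent.sinks.kafkasink.kafka.topic = fp_mc
# Events taken from the channel per transaction; must not exceed
# TwitterAgent.channels.MemChannel.transactionCapacity
TwitterAgent.sinks.kafkasink.flumeBatchSize = 1
```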

Here is the error:

Info: Including Hive libraries found via () for Hive access
+ exec /usr/java/openjdk-18/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/docker_share/conf:/docker_share/apache-flume-1.11.0-bin/lib/*:/docker_share/apache-flume-1.11.0-bin/lib:/lib/*' -Djava.library.path= org.apache.flume.node.Application -f /docker_share/conf/flume_project.conf -n TwitterAgent
23:53:22.599 [main] ERROR org.apache.flume.node.AbstractConfigurationProvider - Source Twitter has been removed due to an error during configuration
java.lang.InstantiationException: Incompatible source and channel settings defined. source's batch size is greater than the channels transaction capacity. Source: Twitter, batch size = 1000, channel MemChannel, transaction capacity = 100
at org.apache.flume.node.AbstractConfigurationProvider.checkSourceChannelCompatibility(AbstractConfigurationProvider.java:389) ~[flume-ng-node-1.11.0.jar:1.11.0]
at org.apache.flume.node.AbstractConfigurationProvider.getSourceChannels(AbstractConfigurationProvider.java:370) ~[flume-ng-node-1.11.0.jar:1.11.0]
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:332) ~[flume-ng-node-1.11.0.jar:1.11.0]
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:108) ~[flume-ng-node-1.11.0.jar:1.11.0]
at org.apache.flume.node.Application.main(Application.java:491) ~[flume-ng-node-1.11.0.jar:1.11.0]
23:53:22.632 [main] ERROR org.apache.flume.node.AbstractConfigurationProvider - Sink kafkasink has been removed due to an error during configuration
java.lang.InstantiationException: Incompatible sink and channel settings defined. sink's batch size is greater than the channels transaction capacity. Sink: kafkasink, batch size = 1000, channel MemChannel, transaction capacity = 100
at org.apache.flume.node.AbstractConfigurationProvider.checkSinkChannelCompatibility(AbstractConfigurationProvider.java:406) ~[flume-ng-node-1.11.0.jar:1.11.0]
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:465) ~[flume-ng-node-1.11.0.jar:1.11.0]
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:109) ~[flume-ng-node-1.11.0.jar:1.11.0]
at org.apache.flume.node.Application.main(Application.java:491) ~[flume-ng-node-1.11.0.jar:1.11.0]
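Both errors state the same rule: the batch size of a source or sink must not exceed the transactionCapacity of the channel it uses. The configuration never sets a batch size for the Twitter source, so the 1000 in the first error is presumably the default of the source's maxBatchSize property (1000 according to the Flume user guide). A minimal sketch of the two ways to satisfy the rule, assuming the rest of flume_project.conf stays unchanged: raise the channel's transactionCapacity to cover the largest batch, or shrink the batches to fit the existing capacity. The maxBatchSize and flumeBatchSize names in the second option are taken from the Flume user guide, not from the original configuration.

```properties
# Option 1: let each channel transaction hold up to 1000 events.
# capacity (total events buffered in memory) must stay >= transactionCapacity.
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 1000

# Option 2 (use instead of option 1): keep transactionCapacity = 100
# and cap both batches at 100. Property names per the Flume user guide (assumption).
# TwitterAgent.channels.MemChannel.transactionCapacity = 100
# TwitterAgent.sources.Twitter.maxBatchSize = 100
# TwitterAgent.sinks.kafkasink.flumeBatchSize = 100
```

Option 1 buffers larger transactions in memory; option 2 keeps transactions small at the cost of more commits per event.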

Am I missing something?

I'm trying to use Flume to collect Twitter data, send it to Kafka, and then analyze the data with Elasticsearch.

I already tried changing TwitterAgent.channels.MemChannel.transactionCapacity from 100 to 1000, but then got the following error instead:

Info: Including Hive libraries found via () for Hive access
+ exec /usr/java/openjdk-18/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/docker_share/conf:/docker_share/apache-flume-1.11.0-bin/lib/*:/docker_share/apache-flume-1.11.0-bin/lib:/lib/*' -Djava.library.path= org.apache.flume.node.Application -f /docker_share/conf/flume_project.conf -n TwitterAgent
00:08:00.136 [Twitter Stream consumer /  [1][Establishing connection]] ERROR org.apache.flume.source.twitter.TwitterSource - Exception while streaming tweets
twitter4j.TwitterException: 403:The request is understood, but it has been refused. An accompanying error message will explain why. This code is used when requests are being denied due to update limits (https://support.twitter.com/articles/15364-about-twitter-limits-update-api-dm-and-following).
message - You currently have Essential access which includes access to Twitter API v2 endpoints only. If you need access to this endpoint, you’ll need to apply for Elevated access via the Developer Portal. You can learn more here: https://developer.twitter.com/en/docs/twitter-api/getting-started/about-twitter-api#v2-access-leve
code - 453
at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:170) ~[twitter4j-core-4.0.7.jar:4.0.7]
at twitter4j.HttpClientBase.request(HttpClientBase.java:57) ~[twitter4j-core-4.0.7.jar:4.0.7]
at twitter4j.HttpClientBase.get(HttpClientBase.java:75) ~[twitter4j-core-4.0.7.jar:4.0.7]
at twitter4j.TwitterStreamImpl.getSampleStream(TwitterStreamImpl.java:201) ~[twitter4j-stream-4.0.7.jar:4.0.7]
at twitter4j.TwitterStreamImpl$4.getStream(TwitterStreamImpl.java:170) ~[twitter4j-stream-4.0.7.jar:4.0.7]
at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:570) ~[twitter4j-stream-4.0.7.jar:4.0.7]

I was approved for upgraded (Elevated) Twitter API access, and the error went away.

Latest update