I wrote a small Scala program that uses the Apache Flink Streaming API to read tweets from Twitter.
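import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap
import org.apache.flink.streaming.connectors.twitter.TwitterSource
import org.apache.flink.util.Collector

object TwitterWordCount {
  // Path to the file holding the Twitter API credentials
  private val properties = "/home/twitter-login.properties"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val twitterStream = env.addSource(new TwitterSource(properties))
    // Keep only the text of tweets whose author's language is English
    val tweets = twitterStream
      .flatMap(new JSONParseFlatMap[String, String] {
        override def flatMap(in: String, out: Collector[String]): Unit = {
          if (getString(in, "user.lang") == "en") {
            out.collect(getString(in, "text"))
          }
        }
      })
    tweets.print()
    env.execute("tweets")
  }
}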
When I run it, I get the following output:
14:35:48,353 INFO com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Establishing a connection
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection request: [route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20]
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection leased: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 1 of 2; total allocated: 1 of 20]
14:35:48,354 DEBUG org.apache.http.impl.conn.DefaultClientConnectionOperator - Connecting to stream.twitter.com:80
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Received message SendHeartbeat at akka://flink/user/taskmanager_1 from Actor[akka://flink/deadLetters].
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
14:35:49,487 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Handled message SendHeartbeat in 1 ms from Actor[akka://flink/deadLetters].
14:35:49,487 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[B@4daaaf5f) at akka://flink/user/jobmanager from Actor[akka://flink/user/taskmanager_1#-64418449].
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received hearbeat message from cb51cdb1bd08879df10bd2198b8e043a.
14:35:49,488 DEBUG org.apache.flink.runtime.instance.InstanceManager - Received heartbeat from TaskManager cb51cdb1bd08879df10bd2198b8e043a @ localhost - 8 slots - URL: akka://flink/user/taskmanager_1
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[B@4daaaf5f) in 0 ms from Actor[akka://flink/user/taskmanager_1#-64418449].
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d closed
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d shut down
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection [id: 4][route: {}->http://stream.twitter.com] can be kept alive for 9223372036854775807 MILLISECONDS
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d closed
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection released: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20]
14:35:52,359 WARN com.twitter.hbc.httpclient.ClientBase - twitterSourceClient IOException caught when establishing connection to https://stream.twitter.com/1.1/statuses/filter.json?delimited=length
14:35:53,613 WARN com.twitter.hbc.httpclient.ClientBase - twitterSourceClient failed to establish connection properly
14:35:53,613 INFO com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Done processing, preparing to close connection
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager is shutting down
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager shut down
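The program then tries to re-establish the connection, so these 4 log messages keep repeating.
The strange thing is that when I run the example shipped with the Apache Flink project, everything works fine (I pulled the latest master from GitHub). I even use the same properties file. If I copy that example class into my own project, the problem described above occurs there as well.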
I used the Flink archetype to create my own project. I tried versions 0.9.1 and 0.10-SNAPSHOT. The flink-scala, flink-streaming-scala, flink-clients and flink-connector-twitter dependencies are used in the corresponding version.
Has anyone run into a similar problem and can put me on the right track?
Debugging com.twitter.hbc.httpclient.ClientBase led me to the following exception: org.apache.http.conn.ConnectTimeoutException: Connect to stream.twitter.com:80 timed out
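According to a thread on the Twitter developer forum, this happens because of a bug in Apache HttpClient 4.2. Indeed, resolving the dependency tree of my project shows that flink-runtime depends on com.amazonaws:aws-java-sdk:1.81, which in turn depends on org.apache.httpcomponents:httpclient:4.2.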
Adding HttpClient 4.2.6 to my project's dependencies solved the problem for now.
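For reference, a sketch of what such an explicit dependency could look like in a Maven pom.xml. The coordinates are the ones from the dependency tree above with the version bumped to 4.2.6; putting it directly into the project's own dependencies section is an assumption about the setup.

```xml
<!-- Assumed placement: inside <dependencies> of the project's pom.xml.
     Declaring it directly makes Maven prefer 4.2.6 over the transitive 4.2. -->
<dependency>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
  <version>4.2.6</version>
</dependency>
```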
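Thanks @peedeeX21, your solution helped me! Adding the explicit dependency to pom.xml fixes the problem when running from Eclipse, but when using a Flink cluster and submitting the program with flink run, the version packaged in the Flink distribution still wins.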
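I worked around that by downloading httpclient-4.2.6.jar into flink/lib and renaming it with a leading "a" (ahttpclient-4.2.6.jar), so that it is added first to the Flink runtime classpath (which is built by bin/config.sh). Hope this helps.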
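To check which copy of HttpClient actually wins on a given classpath (IDE run or cluster), one option is to print the location a class was loaded from. This is only a diagnostic sketch: the object name WhichJar and the helper locationOf are made up for illustration and are not part of Flink or HttpClient.

```scala
import scala.util.Try

object WhichJar {
  // Returns the jar or directory a class was loaded from, if it can be determined.
  // Yields None when the class is missing or has no code source
  // (for example, classes loaded by the bootstrap class loader).
  def locationOf(className: String): Option[String] =
    for {
      cls    <- Try(Class.forName(className)).toOption
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString

  def main(args: Array[String]): Unit = {
    val name = "org.apache.http.client.HttpClient"
    println(locationOf(name).getOrElse(s"$name: not found or no code source"))
  }
}
```

If the printed path ends in httpclient-4.2.jar rather than the 4.2.6 jar, the older version is still being picked up first.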