I want to stream data from a Flink job to AWS S3. Below is a link to a simple test application that streams data to S3.
https://github.com/dmiljkovic/test-flink-bucketingsink-s3
The code works when the JAR is submitted to the Flink "cluster" on my machine, but the job only succeeds once. If the job is submitted a second time, it produces the stack trace shown after the sketch below. If I restart the cluster, the job runs again, but again only the first time.
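For context, the sink in the test application is wired roughly like this. This is only a sketch: the bucket path, bucketer format, batch size, and class name are placeholders of mine, not taken from the linked repository.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class S3BucketingSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Small in-memory source, just enough to exercise the sink.
        DataStream<String> input = env.fromElements("aaa", "bbb", "ccc");

        // BucketingSink writing through the Hadoop s3a filesystem (S3A credentials
        // and the hadoop-aws dependency must be configured separately).
        BucketingSink<String> sink = new BucketingSink<>("s3a://my-test-bucket/flink-out");
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
        sink.setWriter(new StringWriter<>());
        sink.setBatchSize(1024 * 1024); // roll part files at ~1 MB

        input.addSink(sink).name("S3BucketingSink_write__bbb");

        env.execute("test-flink-bucketingsink-s3");
    }
}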
org.apache.commons.logging.LogConfigurationException: java.lang.IllegalAccessError: tried to access class org.apache.commons.logging.impl.LogFactoryImpl$3 from class org.apache.commons.logging.impl.LogFactoryImpl (Caused by java.lang.IllegalAccessError: tried to access class org.apache.commons.logging.impl.LogFactoryImpl$3 from class org.apache.commons.logging.impl.LogFactoryImpl)
at org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:637)
at org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:336)
at org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:310)
at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:685)
at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:76)
at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:102)
at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:88)
at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:96)
at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26)
at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:158)
at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:389)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:371)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1206)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalAccessError: tried to access class org.apache.commons.logging.impl.LogFactoryImpl$3 from class org.apache.commons.logging.impl.LogFactoryImpl
at org.apache.commons.logging.impl.LogFactoryImpl.getParentClassLoader(LogFactoryImpl.java:700)
at org.apache.commons.logging.impl.LogFactoryImpl.createLogFromClass(LogFactoryImpl.java:1187)
at org.apache.commons.logging.impl.LogFactoryImpl.discoverLogImplementation(LogFactoryImpl.java:914)
at org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:604)
... 26 more
Source: Collection Source -> S3BucketingSink_write__bbb_UID_1 -> (Sink: S3BucketingSink_write__bbb_UID_2, Sink: S3BucketingSink_write__bbb_UID_3) (1/1)
This is caused by the fact that commons-logging does not work correctly with inverted (child-first) classloading.
Two immediate workarounds are:
- Switch to parent-first classloading (see the configuration sketch after this list)
- Remove all Hadoop and commons-logging classes from the application JAR file
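For the first workaround, the resolve order of the user-code classloader is set in flink-conf.yaml. A minimal example, assuming Flink 1.4+ where this configuration key exists (restart the cluster after changing it):

classloader.resolve-order: parent-first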
For Flink 1.4.2 and Flink 1.5 we are making sure that commons-logging is always loaded parent-first. Special handling of such commons-* libraries seems to be common practice in other projects as well (JBoss, Tomcat, etc.).
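Until a release containing that fix is available, a similar effect can be approximated by putting the commons-logging package on the parent-first pattern list in flink-conf.yaml. Treat the line below as a sketch: the key name and the default portion of the list are taken from the Flink 1.4 configuration docs and may differ in your version (in 1.5 the key is split into .default and .additional variants).

classloader.parent-first-patterns: java.;scala.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback;org.apache.commons.logging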