如何在Flink sql客户端中连接到MinIO文件系统



我正在尝试构建以Flink和MinIO为存储构建的数据管道,目前我可以将数据下沉到MinIO bucket成功,但当我尝试创建一个表WITH(MinIO文件(时,它总是遇到Connection Refused错误:

Flink SQL> CREATE TABLE WordCountTable (
>   word STRING,
>   `count` INT
> )  WITH (
>   'connector' = 'filesystem',         
>   'path' = 's3://test/wordcount2', 
>   'format' = 'csv',     
>   'csv.field-delimiter'=' '
> );
[INFO] Execute statement succeed.
Flink SQL> select * from WordCountTable;
[ERROR] Could not execute SQL statement. Reason:
java.net.ConnectException: Connection refused

我试着用谷歌搜索它,唯一有用的帖子是——https://github.com/fhueske/flink-sql-demo,在爪牙部分,但它已经过时了。

这是docker组成文件:

version: '3'
services:
minio:
image: minio/minio
ports:
- "9000:9000"
- "9001:9001"
volumes:
- minio_storage:/data
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: minio123
command: server --console-address ":9001" /data
jobmanager:
image: flink:1.15.0-scala_2.12
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager        
state.backend: filesystem
state.checkpoints.dir: s3://state/checkpoint
s3.endpoint: http://minio:9000
s3.path.style.access: true
s3.access-key: minio
s3.secret-key: minio123
taskmanager:
image: flink:1.15.0-scala_2.12
links:
- jobmanager
depends_on:
- jobmanager
command: taskmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2        
state.backend: filesystem
state.checkpoints.dir: s3://state/checkpoint
s3.endpoint: http://minio:9000
s3.path.style.access: true
s3.access-key: minio
s3.secret-key: minio123
sql-client:
image: flink:1.15.0-scala_2.12
command: bin/sql-client.sh
links:
- jobmanager
depends_on:
- jobmanager
environment:
FLINK_JOBMANAGER_HOST: jobmanager
volumes:
minio_storage: { }

提前谢谢。

更新今天我尝试验证与pingnc的网络连接:似乎一切都很好:

root@0e452dd7385e:/usr/bin# ping jobmanager
PING jobmanager (192.168.128.3) 56(84) bytes of data.
64 bytes from flink-iceberg-minio-jobmanager-1.flink-iceberg-minio_default (192.168.128.3): icmp_seq=1 ttl=64 time=3.39 ms
64 bytes from flink-iceberg-minio-jobmanager-1.flink-iceberg-minio_default (192.168.128.3): icmp_seq=2 ttl=64 time=0.193 ms
64 bytes from flink-iceberg-minio-jobmanager-1.flink-iceberg-minio_default (192.168.128.3): icmp_seq=3 ttl=64 time=0.339 ms
64 bytes from flink-iceberg-minio-jobmanager-1.flink-iceberg-minio_default (192.168.128.3): icmp_seq=4 ttl=64 time=0.186 ms
root@0e452dd7385e:/usr/bin# nc -zv jobmanager 6123
Connection to jobmanager (192.168.128.3) 6123 port [tcp/*] succeeded!

但我在sql客户端日志中发现,有一个Connection refused: /0.0.0.0:8081错误:

2022-07-28 06:44:16,870 WARN  org.apache.flink.client.program.rest.RestClusterClient       [] - Attempt to submit job 'collect' (80b7f32d13c2e3f1deeee4db3df6b923) to 'http://0.0.0.0:8081' has failed.
java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /0.0.0.0:8081
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:476) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) [flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) [flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) [flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) [flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) [flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609) [flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) [flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321) [flink-dist-1.15.0.jar:1.15.0]

令人困惑的是,为什么Flink sql客户端尝试连接到0.0.0.0:8081为什么不jobmanager:8081

记得添加插件(Hadoop/Presto S3文件系统插件(

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/

最新更新