I'm trying to use a Kafka topic as a table in the Flink SQL CLI client.
Here is how I invoke sql-client.sh:
./bin/sql-client.sh embedded -l /Users/Behzad.Pirvali/Tools/Data/flink-1.7.2/lib -d ./conf/sql-client-config-1.yaml
I get the following exception:
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in the classpath.
Here is the full stack trace:
USHOLBPI1-ML:flink Behzad.Pirvali$ ./bin/sql-client.sh embedded -l /Users/Behzad.Pirvali/Tools/Data/flink-1.7.2/lib -d ./conf/sql-client-config-1.yaml
Reading default environment from: file:/Users/Behzad.Pirvali/Tools/Data/flink-1.7.2/./conf/sql-client-config-1.yaml
No session environment specified.
Validating current environment...
Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:488)
at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:316)
at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in the classpath.
Reason: No context matches.
The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=localhost:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=localhost:9092
connector.properties.2.key=group.id
connector.properties.2.value=group-scanned-tickets
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=scanned-tickets
connector.type=kafka
connector.version=2.2.0
format.property-version=1
format.schema=ROW(venueId LONG, eventName STRING, ticketId LONG, eventStartTime TIMESTAMP, eventTime TIMESTAMP)
format.type=json
schema.0.name=venueId
schema.0.type=LONG
schema.1.name=eventName
schema.1.type=STRING
schema.2.name=ticketId
schema.2.type=LONG
schema.3.name=eventStartTime
schema.3.type=TIMESTAMP
schema.4.name=scannedTime
schema.4.rowtime.timestamps.from=eventTime
schema.4.rowtime.timestamps.type=from-field
schema.4.rowtime.watermarks.delay=60000
schema.4.rowtime.watermarks.type=periodic-bounded
schema.4.type=TIMESTAMP
update-mode=append
The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:218)
at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:134)
at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:119)
at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:484)
... 4 more
Here is sql-config.yaml:
tables:
  - name: scanned_tickets
    type: source
    update-mode: append
    schema:
      - name: venueId
        type: LONG
      - name: eventName
        type: STRING
      - name: ticketId
        type: LONG
      - name: eventStartTime
        type: TIMESTAMP
      - name: scannedTime
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "eventTime"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"
    connector:
      property-version: 1
      type: kafka
      version: 2.2.0
      topic: scanned-tickets
      startup-mode: earliest-offset
      properties:
        - key: zookeeper.connect
          value: localhost:2181
        - key: bootstrap.servers
          value: localhost:9092
        - key: group.id
          value: group-scanned-tickets
    format:
      property-version: 1
      type: json
      schema: "ROW(venueId LONG, eventName STRING, ticketId LONG, eventStartTime TIMESTAMP, eventTime TIMESTAMP)"
#==============================================================================
# Execution properties
#==============================================================================
# Execution properties allow for changing the behavior of a table program.
execution:
  type: streaming # 'batch' or 'streaming' execution
  result-mode: table # 'changelog' or 'table' presentation of results
  parallelism: 1 # parallelism of the program
  max-parallelism: 128 # maximum parallelism
  min-idle-state-retention: 0 # minimum idle state retention in ms
  max-idle-state-retention: 0 # maximum idle state retention in ms
#==============================================================================
# Deployment properties
#==============================================================================
# Deployment properties allow for describing the cluster to which table
# programs are submitted to.
deployment:
  type: standalone # only the 'standalone' deployment is supported
  response-timeout: 5000 # general cluster communication timeout in ms
  gateway-address: "" # (optional) address from cluster to gateway
  gateway-port: 0 # (optional) port from cluster to gateway
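The "requested properties" in the exception are this table entry flattened into dotted keys, with list items indexed by position (connector.properties.0.key, ...). A minimal Python sketch of that mapping, applied only to the connector section above; the `flatten` helper is hypothetical and is a simplified model of the mapping, not the SQL client's actual code:

```python
def flatten(value, prefix=""):
    """Flatten nested dicts/lists into dotted keys; list items are indexed by position."""
    if isinstance(value, dict):
        items = value.items()
    elif isinstance(value, list):
        items = enumerate(value)
    else:
        # Leaf value: every property value ends up as a string.
        return {prefix: str(value)}
    flat = {}
    for key, child in items:
        child_prefix = f"{prefix}.{key}" if prefix else str(key)
        flat.update(flatten(child, child_prefix))
    return flat


# The connector section of the environment file, as a Python dict.
connector = {
    "property-version": 1,
    "type": "kafka",
    "version": "2.2.0",
    "topic": "scanned-tickets",
    "startup-mode": "earliest-offset",
    "properties": [
        {"key": "zookeeper.connect", "value": "localhost:2181"},
        {"key": "bootstrap.servers", "value": "localhost:9092"},
        {"key": "group.id", "value": "group-scanned-tickets"},
    ],
}

props = flatten({"connector": connector})
for key in sorted(props):
    print(f"{key}={props[key]}")
```

Sorted, this prints the same connector.* lines as the exception, which makes it easy to see that connector.version=2.2.0 comes straight from the YAML `version` field.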
The Flink version is 1.7.2, and here are the contents of the lib directory:
-rw-r--r-- 1 Behzad.Pirvali staff 28524 May 30 10:25 flink-json-1.7.2.jar
drwxr-xr-x@ 8 Behzad.Pirvali staff 256 May 30 10:25 .
-rw-r--r-- 1 Behzad.Pirvali staff 1739194 May 30 10:25 flink-connector-kafka-0.11_2.12-1.7.2-sql-jar.jar
drwxr-xr-x@ 12 Behzad.Pirvali staff 384 Feb 11 06:50 ..
-rw-r--r--@ 1 Behzad.Pirvali staff 84497196 Feb 11 06:50 flink-dist_2.12-1.7.2.jar
-rw-r--r--@ 1 Behzad.Pirvali staff 141942 Feb 11 06:49 flink-python_2.12-1.7.2.jar
-rw-r--r--@ 1 Behzad.Pirvali staff 489884 Feb 11 06:32 log4j-1.2.17.jar
-rw-r--r--@ 1 Behzad.Pirvali staff 9931 Feb 11 06:32 slf4j-log4j12-1.7.15.jar
So it looks like I'm missing a jar file, but I can't quite figure out which one.
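One way to narrow this down is to read the Kafka version off the connector jar names in lib, because that version is the one the table factory inside the jar supports. A minimal Python sketch; the `kafka_connector_versions` helper and the filename pattern are assumptions based on the jar name in the listing above, not a Flink tool, and the pattern only recognizes jars with an explicit Kafka version suffix such as `-0.11`:

```python
import re

# Assumed naming scheme, taken from the listing above:
# flink-connector-kafka-<kafka version>_<scala version>-<flink version>[-sql-jar].jar
KAFKA_JAR = re.compile(
    r"^flink-connector-kafka-(?P<kafka>[0-9.]+)_(?P<scala>[0-9.]+)-(?P<flink>[0-9.]+)"
)


def kafka_connector_versions(jar_names):
    """Return (kafka, scala, flink) version tuples for version-suffixed Kafka connector jars."""
    found = []
    for name in jar_names:
        match = KAFKA_JAR.match(name)
        if match:
            found.append((match.group("kafka"), match.group("scala"), match.group("flink")))
    return found


# Jar names from the lib directory listing (in practice, os.listdir(<lib dir>)).
lib = [
    "flink-json-1.7.2.jar",
    "flink-connector-kafka-0.11_2.12-1.7.2-sql-jar.jar",
    "flink-dist_2.12-1.7.2.jar",
    "flink-python_2.12-1.7.2.jar",
    "log4j-1.2.17.jar",
    "slf4j-log4j12-1.7.15.jar",
]
print(kafka_connector_versions(lib))  # [('0.11', '2.12', '1.7.2')]
```

For this lib directory the only Kafka connector is the 0.11 one, which lines up with Kafka011TableSourceSinkFactory being the only Kafka factory listed in the exception.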
I was able to find the root cause of the problem: a mismatch in the YAML configuration:
connector:
  property-version: 1
  type: kafka
  version: 2.2.0
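I was setting the Kafka broker version (2.2.0), but the version must be 0.11 to match the Flink Kafka connector on the classpath, flink-connector-kafka-0.11_2.12-1.7.2-sql-jar (the only Kafka factory the exception lists as considered is Kafka011TableSourceSinkFactory).
So changing the configuration to the following works: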
connector:
  property-version: 1
  type: kafka
  version: 0.11
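The "Reason: No context matches." line can be understood roughly like this: each table factory declares a required context of fixed key/value pairs, and a factory is kept only if every one of those pairs appears with the same value in the requested properties. A simplified Python sketch; the required context shown for Kafka011TableSourceSinkFactory is an assumption for illustration (not read from Flink), `matching_factories` is hypothetical, and the other factories in the list (CSV, JSON format) are not modelled:

```python
# Assumed required context of the Kafka 0.11 factory (illustrative only).
REQUIRED_CONTEXTS = {
    "Kafka011TableSourceSinkFactory": {
        "update-mode": "append",
        "connector.type": "kafka",
        "connector.version": "0.11",
        "connector.property-version": "1",
    },
}


def matching_factories(requested, required_contexts=REQUIRED_CONTEXTS):
    """Return the names of factories whose whole required context appears in `requested`."""
    return [
        name
        for name, context in required_contexts.items()
        if all(requested.get(key) == value for key, value in context.items())
    ]


# A subset of the requested properties from the exception.
requested = {
    "update-mode": "append",
    "connector.type": "kafka",
    "connector.version": "2.2.0",
    "connector.property-version": "1",
    "connector.topic": "scanned-tickets",
}

print(matching_factories(requested))  # []
print(matching_factories({**requested, "connector.version": "0.11"}))  # ['Kafka011TableSourceSinkFactory']
```

With version 2.2.0 nothing matches, which is the NoMatchingTableFactoryException; with 0.11 the Kafka 0.11 factory is selected, which is why the changed configuration works without adding any jar.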