Viewing a Kafka topic as a Flink table in the Flink SQL CLI client



I am trying to expose a Kafka topic as a table in the Flink SQL CLI client.

This is how I invoke sql-client.sh:

./bin/sql-client.sh embedded -l /Users/Behzad.Pirvali/Tools/Data/flink-1.7.2/lib  -d ./conf/sql-client-config-1.yaml

I get the following exception:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.

Here is the full stack trace:

USHOLBPI1-ML:flink Behzad.Pirvali$ ./bin/sql-client.sh embedded -l /Users/Behzad.Pirvali/Tools/Data/flink-1.7.2/lib  -d ./conf/sql-client-config-1.yaml
Reading default environment from: file:/Users/Behzad.Pirvali/Tools/Data/flink-1.7.2/./conf/sql-client-config-1.yaml
No session environment specified.
Validating current environment...
Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:488)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:316)
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
    ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.
Reason: No context matches.
The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=localhost:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=localhost:9092
connector.properties.2.key=group.id
connector.properties.2.value=group-scanned-tickets
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=scanned-tickets
connector.type=kafka
connector.version=2.2.0
format.property-version=1
format.schema=ROW(venueId LONG, eventName STRING, ticketId LONG, eventStartTime TIMESTAMP, eventTime TIMESTAMP)
format.type=json
schema.0.name=venueId
schema.0.type=LONG
schema.1.name=eventName
schema.1.type=STRING
schema.2.name=ticketId
schema.2.type=LONG
schema.3.name=eventStartTime
schema.3.type=TIMESTAMP
schema.4.name=scannedTime
schema.4.rowtime.timestamps.from=eventTime
schema.4.rowtime.timestamps.type=from-field
schema.4.rowtime.watermarks.delay=60000
schema.4.rowtime.watermarks.type=periodic-bounded
schema.4.type=TIMESTAMP
update-mode=append
The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
    at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:218)
    at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:134)
    at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:119)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:484)
    ... 4 more

Below is the environment file passed with -d (conf/sql-client-config-1.yaml):

tables:
  - name: scanned_tickets
    type: source
    update-mode: append
    schema:
    - name: venueId
      type: LONG
    - name: eventName
      type: STRING
    - name: ticketId
      type: LONG
    - name: eventStartTime
      type: TIMESTAMP
    - name: scannedTime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: "from-field"
          from: "eventTime"
        watermarks:
          type: "periodic-bounded"
          delay: "60000"
    connector:
      property-version: 1
      type: kafka
      version: 2.2.0
      topic: scanned-tickets
      startup-mode: earliest-offset
      properties:
      - key: zookeeper.connect
        value: localhost:2181
      - key: bootstrap.servers
        value: localhost:9092
      - key: group.id
        value: group-scanned-tickets
    format:
      property-version: 1
      type: json
      schema: "ROW(venueId LONG, eventName STRING, ticketId LONG, eventStartTime TIMESTAMP, eventTime TIMESTAMP)"

#==============================================================================
# Execution properties
#==============================================================================
# Execution properties allow for changing the behavior of a table program.
execution:
  type: streaming              # 'batch' or 'streaming' execution
  result-mode: table           # 'changelog' or 'table' presentation of results
  parallelism: 1               # parallelism of the program
  max-parallelism: 128         # maximum parallelism
  min-idle-state-retention: 0  # minimum idle state retention in ms
  max-idle-state-retention: 0  # maximum idle state retention in ms
#==============================================================================
# Deployment properties
#==============================================================================
# Deployment properties allow for describing the cluster to which table
# programs are submitted to.
deployment:
  type: standalone             # only the 'standalone' deployment is supported
  response-timeout: 5000       # general cluster communication timeout in ms
  gateway-address: ""          # (optional) address from cluster to gateway
  gateway-port: 0              # (optional) port from cluster to gateway

The Flink version is 1.7.2, and these are the contents of the lib directory:

-rw-r--r--   1 Behzad.Pirvali  staff     28524 May 30 10:25 flink-json-1.7.2.jar
drwxr-xr-x@  8 Behzad.Pirvali  staff       256 May 30 10:25 .
-rw-r--r--   1 Behzad.Pirvali  staff   1739194 May 30 10:25 flink-connector-kafka-0.11_2.12-1.7.2-sql-jar.jar
drwxr-xr-x@ 12 Behzad.Pirvali  staff       384 Feb 11 06:50 ..
-rw-r--r--@  1 Behzad.Pirvali  staff  84497196 Feb 11 06:50 flink-dist_2.12-1.7.2.jar
-rw-r--r--@  1 Behzad.Pirvali  staff    141942 Feb 11 06:49 flink-python_2.12-1.7.2.jar
-rw-r--r--@  1 Behzad.Pirvali  staff    489884 Feb 11 06:32 log4j-1.2.17.jar
-rw-r--r--@  1 Behzad.Pirvali  staff      9931 Feb 11 06:32 slf4j-log4j12-1.7.15.jar

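So it looks like I am missing a jar file, but I cannot figure out which one.

I was able to find the root cause of the problem. The connector version in the YAML file did not match the Kafka connector jar on the classpath: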

connector:
  property-version: 1
  type: kafka
  version: 2.2.0

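I was setting the version of my Kafka broker, but the value has to be 0.11 to match the Flink Kafka connector jar in lib: flink-connector-kafka-0.11_2.12-1.7.2-sql-jar.jar. This is consistent with the stack trace, where the only Kafka factory listed under "The following factories have been considered" is Kafka011TableSourceSinkFactory.

So changing the configuration to the following worked: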

connector:
  property-version: 1
  type: kafka
  version: 0.11
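
For anyone hitting the same mismatch, here is a minimal sketch in Python (not part of Flink) of the check I ended up doing by hand: read the Kafka version out of the connector jar names in lib and compare it with connector.version. The file-name pattern is an assumption inferred from the single jar listed above, and the function names are hypothetical; jars that do not follow that pattern are simply ignored.

```python
import re
from typing import List, Optional

# Assumed naming pattern, inferred from the jar in the lib listing above:
#   flink-connector-kafka-<kafka version>_<scala version>-<flink version>...jar
_KAFKA_JAR = re.compile(r"^flink-connector-kafka-(\d+\.\d+)_\d+\.\d+-.*\.jar$")


def kafka_version_from_jar(jar_name: str) -> Optional[str]:
    """Return the Kafka version embedded in a connector jar name, or None."""
    match = _KAFKA_JAR.match(jar_name)
    return match.group(1) if match else None


def supported_kafka_versions(jar_names: List[str]) -> List[str]:
    """Collect the distinct Kafka versions found among the given jar names."""
    versions: List[str] = []
    for name in jar_names:
        version = kafka_version_from_jar(name)
        if version is not None and version not in versions:
            versions.append(version)
    return versions


def version_matches(configured: str, jar_names: List[str]) -> bool:
    """True if the configured connector.version has a matching connector jar."""
    return configured in supported_kafka_versions(jar_names)


if __name__ == "__main__":
    lib = [
        "flink-json-1.7.2.jar",
        "flink-connector-kafka-0.11_2.12-1.7.2-sql-jar.jar",
        "flink-dist_2.12-1.7.2.jar",
    ]
    print(supported_kafka_versions(lib))   # versions found in the jar names
    print(version_matches("2.2.0", lib))   # the value I had configured
    print(version_matches("0.11", lib))    # the value that worked
```

With the lib directory from the question, the only version found is 0.11, so the broker version 2.2.0 fails the check and 0.11 passes.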
