我想确保flink在设置方面有效,然后尝试使用法复杂化。在最简单的例子中,我试着做这些事情。输入包含
列a,列b1,2
输出存在。为了使用docker为我的应用程序下载pyflink版本1.10,我使用了以下代码片段:
jobmanager:
image: pyflink/playgrounds:1.10.0
volumes:
- ./examples:/opt/examples
hostname: "jobmanager"
expose:
- "6123"
ports:
- "8088:8088"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager:
image: pyflink/playgrounds:1.10.0
volumes:
- ./examples:/opt/examples
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- jobmanager:jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, BatchTableEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
import pandas as pd
from inspect import getmembers, isfunction
import os
# create a blink batch TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)
source_ddl = """
CREATE TABLE MyUserTable (
column_a INT PRIMARY KEY,
column_b INT,
) WITH (
'connector' = 'filesystem',
'path' = 'file:///Users//code/examples/input.csv ',
'format' = 'csv'
)"""
#connector for data output/sink
sink_ddl = """
CREATE TABLE table2 (
score INT)
WITH (
'connector' = 'filesystem',
'path' = 'file:///Users//code/examples/output.csv',
'format' = 'csv'
)"""
#make the table corresponding to the schema mentioned
source_table = table_env.execute_sql(source_ddl)
sink_table = table_env.execute_sql(sink_ddl)
#convert the sql table to table API
table_path = table_env.from_path("MyUserTable")
# execute SELECT statement
table_result2 = table_env.execute_sql("SELECT * FROM MyUserTable")
table_result2.print()
生成的错误如下
File ".flink1.py", line 41, in <module>
source_table = table_env.execute_sql(source_ddl)
File "C:UserslandrAppDataLocalProgramsPythonPython38libsite-packagespyflinktabletable_environment.py", line 804, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
File "C:UserslandrAppDataLocalProgramsPythonPython38libsite-packagespy4jjava_gateway.py", line 1285, in __call__
return_value = get_return_value(
File "C:UserslandrAppDataLocalProgramsPythonPython38libsite-packagespyflinkutilexceptions.py",
line 147, in deco
return f(*a, **kw)
File "C:UserslandrAppDataLocalProgramsPythonPython38libsite-packagespy4jprotocol.py", line 326,
in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered ")" at line 6, column 25.
Was expecting one of:
"CONSTRAINT" ...
"PRIMARY" ...
"UNIQUE" ...
"WATERMARK" ...
<BRACKET_QUOTED_IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<HYPHENATED_IDENTIFIER> ...
<IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:96)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:722) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 6, column 25.
Was expecting one of:
"CONSTRAINT" ...
"PRIMARY" ...
"UNIQUE" ...
"WATERMARK" ...
<BRACKET_QUOTED_IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<HYPHENATED_IDENTIFIER> ...
<IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:450)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:213)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:54)
... 13 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered ")" at line 6, column 25.
Was expecting one of:
"CONSTRAINT" ...
"PRIMARY" ...
"UNIQUE" ...
"WATERMARK" ...
<BRACKET_QUOTED_IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<HYPHENATED_IDENTIFIER> ...
<IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39782)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39593)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TableColumn(FlinkSqlParserImpl.java:4835)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5209)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6233)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:20934)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3415)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3918)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:261)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
... 15 more
删除"column_b INT,"
后面的逗号。
这应该行得通。