我正在处理https://github.com/uncleguanghui/pyflink_learn.
我的环境是flink 1.12.0和ubuntu,flink在后台运行。
wordcount示例相当简单。
import os
import shutil
from pyflink.table import BatchTableEnvironment, EnvironmentSettings
from pyflink.table import DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)
dir_word = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'word.csv')
t_env.execute_sql(f"""
CREATE TABLE source (
id BIGINT, -- ID
word STRING -- word
) WITH (
'connector' = 'filesystem',
'path' = 'file://{dir_word}',
'format' = 'csv'
)
""")
dir_result = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'result.csv')
if os.path.exists(dir_result):
if os.path.isfile(dir_result):
os.remove(dir_result)
else:
shutil.rmtree(dir_result, True)
t_env.execute_sql(f"""
CREATE TABLE sink (
word STRING, -- word
cnt BIGINT -- cnt
) WITH (
'connector' = 'filesystem',
'path' = 'file://{dir_result}',
'format' = 'csv'
)
""")
t_env.execute_sql("""
INSERT INTO sink
SELECT word
, count(1) AS cnt
FROM source
GROUP BY word
""")
使用命令运行代码
flink run -m localhost:8081 -py batch.py
然而,出现了一个问题
# flink run -m localhost:8081 -py batch.py
File "batch1.py", line 24
""")
^
SyntaxError: invalid syntax
org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:113)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
错误指向代码的这一部分
t_env.execute_sql(f"""
CREATE TABLE source (
id BIGINT, -- ID
word STRING -- word
) WITH (
'connector' = 'filesystem',
'path' = 'file://{dir_word}',
'format' = 'csv'
)
""") // issue to happen
有人能发现其中的解决方案是什么?
尝试使用他引用的API表版本。如果有效,则表示您的python版本较低。还要确保使用pip-installapache-flink安装pyflink。最后你可以直接问他问题,他回答得很快。我也在研究他的例子,学习pyflink的东西,api经常更新,确保也使用相同的版本:(