I use PyFlink to run a Flink streaming job. It works when I run Flink in standalone mode, but when I run it in yarn-per-job mode it fails with `pyflink.util.exceptions.TableException: Failed to execute sql`.
The yarn-per-job command is: flink run -t yarn-per-job -Djobmanager.memory.process.size=1024mb -Dtaskmanager.memory.process.size=2048mb -ynm flink_cluster -Dtaskmanager.numberOfTaskSlots=2 -pyfs cluster.py …
The standalone command is: flink run -pyfs cluster.py …
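Expressed purely with -D properties (the -ynm shorthand is the legacy YARN-session flag; yarn.application.name should be its configuration equivalent), the yarn-per-job submission would look something like this sketch:

flink run -t yarn-per-job \
    -Djobmanager.memory.process.size=1024mb \
    -Dtaskmanager.memory.process.size=2048mb \
    -Dyarn.application.name=flink_cluster \
    -Dtaskmanager.numberOfTaskSlots=2 \
    -pyfs cluster.py …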
The Python environment is set up in cluster.py as follows:
import os

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.udf import udaf, udf
from pyflink.table.window import Tumble

env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
curr_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
# pipeline.jars expects a semicolon-separated list of URLs with no stray whitespace
jars = ";".join([
    f"file://{curr_path}/jars/flink-sql-connector-kafka_2.11-1.13.1.jar",
    f"file://{curr_path}/jars/force-shading-1.13.1.jar",
])
t_env.get_config().get_configuration().set_string("pipeline.jars", jars)
# Ship the bundled Python environment to the cluster and use its interpreter for UDFs
t_env.add_python_archive("%s/requirements/flink.zip" % curr_path)
t_env.get_config().set_python_executable("flink.zip/flink/bin/python")
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(2)
env.get_config().set_auto_watermark_interval(10000)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
parse_log = udaf(LogParser(parsing_params),
input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.TIMESTAMP(3)],
result_type=DataTypes.STRING(), func_type="pandas")
process_ad = udf(ADProcessor(ad_params), result_type=DataTypes.STRING())
t_env.create_temporary_function('log_parsing_process', parse_log)
t_env.create_temporary_function('ad_process', process_ad)
tumble_window = Tumble.over("5.minutes").on("time_ltz").alias("w")
t_env.execute_sql(f"""
CREATE TABLE source_table(
ip VARCHAR, -- ip address
raws VARCHAR, -- message
host VARCHAR, -- host
log_type VARCHAR, -- type
system_name VARCHAR, -- system
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '{source_topic}',
'properties.bootstrap.servers' = '{source_servers}',
'properties.group.id' = '{group_id}',
'scan.startup.mode' = '{auto_offset_reset}',
'format' = 'json'
)
""")
sink_sql = f"""
CREATE TABLE sink (
alert VARCHAR, -- alert
start_time timestamp(3), -- window start timestamp
end_time timestamp(3) -- window end timestamp
) with (
'connector' = 'kafka',
'topic' = '{sink_topic}',
'properties.bootstrap.servers' = '{sink_servers}',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'format' = 'json'
)"""
t_env.execute_sql(sink_sql)
t_env.get_config().set_null_check(False)
source_table = t_env.from_path('source_table')
sink_table = source_table.window(tumble_window) \
    .group_by("w, log_type") \
    .select("log_parsing_process(ip, raws, host, log_type, system_name, time_ltz) AS pattern, "
            "w.start AS start_time, "
            "w.end AS end_time") \
    .select("ad_process(pattern, start_time, end_time) AS alert, start_time, end_time")
sink_table.execute_insert("sink")
The error is:
File "/tmp/pyflink/xxxx/xxxx/workerbee/log_exception_detection_run_on_diff_mode.py ,line 148, in run_flink sink_table_execute_insert("test_sink")
File "/opt/flink/flink-1.13.1_scala_2.12/opt/python/pyflink.zip/pyflink/table/table.py, line 1056 in execute_insert
File "/opt/flink/flink-1.13.1_scala_2.12/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
File "/opt/flink/flink-1.13.1_scala_2.12/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 163, in deco
pyflink.util.exceptions.TableException: Failed to execute sql
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:777)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:742)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
The NodeManager log shows:
INFO org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: launchContainer: [bash, /opt/hadoop_data/tmp/nm-local-dir/usercache/root/appcache/application_1644370510310_0002/container_1644370510310_0002_03_000001/default_container_executor.sh]
WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1644370510310_0002_03_000001 is : 1
WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exception from container-launch with container ID: container_1644370510310_0002_03_000001 and exit code: 1
ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:1008)
at org.apache.hadoop.util.Shell.run(Shell.java:901)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1213)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:309)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.launchContainer(ContainerLaunch.java:585)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:373)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:103)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Exception from container-launch.
INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: container id: container_1644370510310_0002_03_000001
INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Exit code: 1
WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Container launch failed : Container exited with a non-zero exit code 1
INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl: Container container_1644370510310_0002_03_000001 transitioned from RUNNING to EXITED_WITH_FAILURE
It looks like a classloader-related problem. You can check the `classloader.check-leaked-classloader` configuration, documented at https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/config/
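For instance, the check can be disabled with one more -D option on the submit command (a sketch of the workaround only; it suppresses the check rather than fixing the leak itself):

flink run -t yarn-per-job -Dclassloader.check-leaked-classloader=false -pyfs cluster.py …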
Also, you can try the `add_jars` API instead of setting the `pipeline.jars` config directly:
def add_jars(self, *jars_path: str):
    """
    Adds a list of jar files that will be uploaded to the cluster and referenced by the job.

    :param jars_path: Path of jars.
    """
    add_jars_to_context_class_loader(jars_path)
    jvm = get_gateway().jvm
    jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
    env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
        .getEnvironmentConfig(self._j_stream_execution_environment)
    old_jar_paths = env_config.getString(jars_key, None)
    joined_jars_path = ';'.join(jars_path)
    if old_jar_paths and old_jar_paths.strip():
        joined_jars_path = ';'.join([old_jar_paths, joined_jars_path])
    env_config.setString(jars_key, joined_jars_path)
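In cluster.py this would replace the pipeline.jars setting; a minimal sketch reusing the curr_path and jar files from the question, called on the StreamExecutionEnvironment before executing the job:

env.add_jars(
    f"file://{curr_path}/jars/flink-sql-connector-kafka_2.11-1.13.1.jar",
    f"file://{curr_path}/jars/force-shading-1.13.1.jar",
)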
After debugging and checking, I finally found the problem: I was missing some Flink/Hadoop jars:
commons-cli-1.4.jar
flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar
hadoop-yarn-api-3.3.1.jar
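A minimal sketch of the fix, assuming the Flink distribution path shown in the traceback above: copying the missing jars into Flink's lib/ directory puts them on the classpath of the client and of every YARN container.

cp commons-cli-1.4.jar \
   flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar \
   hadoop-yarn-api-3.3.1.jar \
   /opt/flink/flink-1.13.1_scala_2.12/lib/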