对于第一次尝试,我想从文件中读取JSON数据并将其传递给Flink。我定义了一个源(逐行读取JSON字符串)和一个占位符过滤器。请参阅代码:
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FilterFunction
import json
import sys
class Json_reader(SourceFunction):
def readjason(self, ctx):
sys.stdin = open('capture.json', 'r')
for line in sys.stdin:
ctx.collect(json.loads(line))
class Dummy_Filter(FilterFunction):
def filter(self, value):
return True
#
# The pipeline definition.
#
def main(factory):
env = factory.get_execution_environment()
env.create_python_source(Json_reader())
.filter(Dummy_Filter())
.output()
env.execute()
当我构建工作并将其移至我的"开始flink-cluster"时,我会收到以下错误消息:
virtualbox:/media/sf_python $ ./flink-1.7.2/bin/pyflink-stream.sh ./json_parser_flink.py程序的启动执行无法运行 计划:null Trackback(最近的最新通话):文件",行 1,在文件中 "/tmp/flink_streaming_plan_fbe13c4c-6918-46d4-a4bc-36908a2bea24/json_parser_flink.py", 第25行,主要在 org.apache.flink.client.program.rest.restclusterclient.submitjob(RestClusterClient.java:268) 在 org.apache.flink.client.program.clusterclient.run(clusterclient.java:487) 在 org.apache.flink.streaming.api.environment.StreamContextenVironment.execute(streamContextenVironment.java:66) 在 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(streamExecutionEnvironment.java:1510) 在 org.apache.flink.streaming.python.api.environment.pythermenexecutionenvironment.execute(pythonstrymeamexecutionenvironment.java:245) 在sun.reflect.nativemethodaccessorimpl.invoke0(本机方法) sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl.java:62) 在 sun.reflect.delegatingmethodaccessorimpl.invoke(授权methodaccessorimpl.java:43) 在java.lang.reflect.method.invoke(method.java:498) org.apache.flink.client.program.programinvocation exception: org.apache.flink.client.program.programinvocation exception:job 失败的。(Jobid:31615948194C951BE03D46576929AA23)
该程序不包含Flink作业。也许你忘了打电话 在执行环境上执行()。
我没有忘记致电execute()。
我发现了问题。快速期望在源函数中run()函数。