Flink 1.14 Python Job Submission Flow Analysis

2024-05-30 00:55 | Source: compiled from the web

Official overview of the job submission flow:

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/python/overview

Submitting jobs from the command line:

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/deployment/cli/#submitting-pyflink-jobs

This analysis takes the following basic command as its entry point:

./bin/flink run --python examples/python/table/word_count.py

Looking at flink-dist/src/main/bin/flink, the last line of this shell script is:

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

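The shell script launches a JVM process whose entry point is the CliFrontend class, and the arguments given on the command line are passed through to its main method.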

From the code, the main method of CliFrontend does two main things:

1. Initialize the global configuration (this is not the args passed in).
2. Construct a CliFrontend instance and call parseAndRun().

try {
    final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
    SecurityUtils.install(new SecurityConfiguration(cli.configuration));
    retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
    final Throwable strippedThrowable =
            ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
    LOG.error("Fatal error while running command line interface.", strippedThrowable);
    strippedThrowable.printStackTrace();
} finally {
    System.exit(retCode);
}

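The subsequent parseAndRun() validates the arguments and splits off the first one, which is treated as the command (action). A switch statement then runs the operation that corresponds to that action.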

// check for action
if (args.length < 1) {
    CliFrontendParser.printHelp(customCommandLines);
    System.out.println("Please specify an action.");
    return 1;
}

// get action
String action = args[0];

// remove action from parameters
// ./flink run --python /home/jrdw/peizhouyu/flink-python/app.py --jobmanager 11.95.88.158:6017
// strip the first argument
// params[] = --python /home/jrdw/peizhouyu/flink-python/app.py --jobmanager 11.95.88.158:6017
final String[] params = Arrays.copyOfRange(args, 1, args.length);

try {
    // do action
    switch (action) {
        case ACTION_RUN:
            run(params);
            return 0;
        case ACTION_RUN_APPLICATION:
            runApplication(params);
            return 0;
        case ACTION_LIST:
            list(params);
            return 0;
        case ACTION_INFO:
            info(params);
            return 0;
        case ACTION_CANCEL:
            cancel(params);
            return 0;
        case ACTION_STOP:
            stop(params);
            return 0;
        case ACTION_SAVEPOINT:
            savepoint(params);
            return 0;
        case "-h":
        case "--help":
            CliFrontendParser.printHelp(customCommandLines);
            return 0;
        case "-v":
        case "--version":
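The dispatch pattern above can be tried in isolation. The following is a minimal sketch, not Flink code: the class ActionDispatchSketch and its dispatch method are hypothetical, only two actions are modeled, and instead of running anything it returns a string describing what would be called.

```java
import java.util.Arrays;

// Hypothetical, simplified stand-in for the dispatch in CliFrontend.parseAndRun().
// It only reports which handler would be called and with which parameters.
class ActionDispatchSketch {

    static String dispatch(String[] args) {
        if (args.length < 1) {
            return "Please specify an action.";
        }
        // The first argument is the action; the rest go to the action's handler.
        String action = args[0];
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        switch (action) {
            case "run":
                return "run" + Arrays.toString(params);
            case "list":
                return "list" + Arrays.toString(params);
            default:
                return "unknown action: " + action;
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new String[] {"run", "--python", "word_count.py"}));
    }
}
```

Because the action is removed before the handler is called, run() only ever sees the options that follow it, which is why the later Python detection works on params rather than on the full args array.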

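The constant ACTION_RUN is the string run, so this command invokes the corresponding run() method. In run(), the part to focus on is the creation of ProgramOptions, which turns the command-line input into a ProgramOptions object.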

final ProgramOptions programOptions = ProgramOptions.create(commandLine);

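create() decides whether the job is a Java job or a Python job. The decision is based on whether the command-line arguments contain Python-related options such as py, python, pym, pyModule and so on; a job whose entry class is org.apache.flink.client.python.PythonGatewayServer is also treated as a Python job.

For a Python job, the ProgramOptions is created through createPythonProgramOptions(); for a Java job, it is created directly with new ProgramOptions.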

public static ProgramOptions create(CommandLine line) throws CliArgsException {
    if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
        return createPythonProgramOptions(line);
    } else {
        return new ProgramOptions(line);
    }
}
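To make the decision rule concrete, here is a rough sketch that applies the same idea to raw argument strings. It is an illustration under assumptions, not the Flink implementation: the real methods inspect a parsed commons-cli CommandLine, the class PythonJobDetectionSketch and its method are hypothetical, and the option spellings are taken from the Flink 1.14 CLI documentation.

```java
import java.util.Arrays;
import java.util.List;

// Simplified illustration only: the real ProgramOptions works on a parsed
// CommandLine object, not on raw strings.
class PythonJobDetectionSketch {

    // Option spellings assumed from the Flink 1.14 CLI documentation.
    static final List<String> ENTRY_POINT_OPTIONS =
            Arrays.asList("-py", "--python", "-pym", "--pyModule");
    static final List<String> DEPENDENCY_OPTIONS =
            Arrays.asList("-pyfs", "--pyFiles", "-pyreq", "--pyRequirements",
                    "-pyarch", "--pyArchives", "-pyexec", "--pyExecutable");
    static final String GATEWAY_SERVER =
            "org.apache.flink.client.python.PythonGatewayServer";

    static boolean isPythonJob(String[] params) {
        for (int i = 0; i < params.length; i++) {
            String p = params[i];
            if (ENTRY_POINT_OPTIONS.contains(p) || DEPENDENCY_OPTIONS.contains(p)) {
                return true;
            }
            // -c / --class names the entry class explicitly.
            boolean isClassOption = p.equals("-c") || p.equals("--class");
            if (isClassOption && i + 1 < params.length
                    && GATEWAY_SERVER.equals(params[i + 1])) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isPythonJob(
                new String[] {"--python", "examples/python/table/word_count.py"}));
    }
}
```

Note that a dependency option alone, without any Python entry point option, is enough for the job to be classified as a Python job in this sketch, which mirrors the `||` in create().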

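createPythonProgramOptions() first initializes a class loader that is used to load the engine's Python-related extension packages, and then creates a PythonProgramOptions instance via reflection. PythonProgramOptions extends ProgramOptions.

Note that when PythonProgramOptions is created reflectively, its constructor initializes the fields that correspond to several configuration options and also sets the entryPointClass property to org.apache.flink.client.python.PythonDriver.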

public static ProgramOptions createPythonProgramOptions(CommandLine line)
        throws CliArgsException {
    try {
        ClassLoader classLoader = getPythonClassLoader();
        Class<?> pythonProgramOptionsClazz =
                Class.forName(
                        "org.apache.flink.client.cli.PythonProgramOptions", false, classLoader);
        Constructor<?> constructor =
                pythonProgramOptionsClazz.getConstructor(CommandLine.class);
        return (ProgramOptions) constructor.newInstance(line);
    } catch (InstantiationException
            | InvocationTargetException
            | NoSuchMethodException
            | IllegalAccessException
            | ClassNotFoundException e) {
        throw new CliArgsException(
                "Python command line option detected but the flink-python module seems to be missing "
                        + "or not working as expected.",
                e);
    }
}
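The reflective construction can be reproduced without Flink on the classpath. The sketch below is only an illustration of the same shape: ReflectiveCreationSketch and create are hypothetical names, java.lang.StringBuilder stands in for PythonProgramOptions, and IllegalStateException stands in for CliArgsException.

```java
import java.lang.reflect.Constructor;

class ReflectiveCreationSketch {

    // Loads className through the given loader and calls its one-argument
    // constructor, mirroring the shape of createPythonProgramOptions().
    static Object create(ClassLoader loader, String className, Class<?> argType, Object arg) {
        try {
            // initialize=false: the class is loaded but not yet initialized.
            Class<?> clazz = Class.forName(className, false, loader);
            Constructor<?> constructor = clazz.getConstructor(argType);
            return constructor.newInstance(arg);
        } catch (ReflectiveOperationException e) {
            // A single wrapper exception, comparable in role to CliArgsException.
            throw new IllegalStateException("Could not create " + className, e);
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        // StringBuilder is only a stand-in with a public one-argument constructor.
        Object created = create(loader, "java.lang.StringBuilder", String.class, "word_count.py");
        System.out.println(created);
    }
}
```

Looking the class up by name through a dedicated class loader means the calling code has no compile-time dependency on it, which fits the error message in the original: the failure case is a missing or broken flink-python module.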

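Back in run(), after the ProgramOptions has been created, getJobJarAndDependencies() obtains the list of jars the job depends on. It also uses the value of entryPointClass to decide whether this is a Python job; if it is, the jar whose name starts with flink-python under FLINK_OPT_DIR is added as a dependency. The configuration is then merged and processed to produce effectiveConfiguration.

Once this preparation is done, a PackagedProgram is created from the ProgramOptions (for a Python job, its subclass PythonProgramOptions) and effectiveConfiguration, and executeProgram() is called to execute it.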

final ProgramOptions programOptions = ProgramOptions.create(commandLine);

final List<URL> jobJars = getJobJarAndDependencies(programOptions);

final Configuration effectiveConfiguration =
        getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
    executeProgram(effectiveConfiguration, program);
}
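The flink-python jar lookup described for getJobJarAndDependencies() can be pictured as a simple prefix filter. This is a sketch under assumptions rather than the Flink implementation: PythonJarLookupSketch and selectPythonJars are hypothetical names, and the only rule taken from the text is that the file name starts with flink-python and lives under FLINK_OPT_DIR.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PythonJarLookupSketch {

    // Keeps only the file names that start with "flink-python".
    static List<String> selectPythonJars(List<String> fileNames) {
        List<String> result = new ArrayList<String>();
        for (String name : fileNames) {
            if (name.startsWith("flink-python")) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String optDir = System.getenv("FLINK_OPT_DIR");
        if (optDir == null) {
            System.out.println("FLINK_OPT_DIR is not set");
            return;
        }
        // list() returns null when optDir is not a readable directory.
        String[] names = new File(optDir).list();
        List<String> all = names == null ? new ArrayList<String>() : Arrays.asList(names);
        System.out.println(selectPythonJars(all));
    }
}
```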

executeProgram() in turn delegates to ClientUtils.executeProgram():

protected void executeProgram(final Configuration configuration, final PackagedProgram program)
        throws ProgramInvocationException {
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), configuration, program, false, false);
}

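ClientUtils.executeProgram() sets the thread's context class loader to the user-code class loader created earlier, while keeping a reference to the current context class loader so that it can be switched back once submission has finished. It then calls the program's invokeInteractiveModeForExecution() method.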

public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout)
        throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);

        LOG.info(
                "Starting program (detached: {})",
                !configuration.getBoolean(DeploymentOptions.ATTACHED));

        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        try {
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
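The save, swap and restore of the context class loader is a general pattern that can be shown on its own. The following sketch is not taken from Flink: ContextClassLoaderSwapSketch and runWith are hypothetical names, and an empty URLClassLoader stands in for the user-code class loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

class ContextClassLoaderSwapSketch {

    // Runs action with the given loader as the thread's context class loader
    // and restores the previous loader afterwards, even if action throws.
    static <T> T runWith(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        try {
            current.setContextClassLoader(loader);
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // An empty URLClassLoader stands in for the user-code class loader.
        ClassLoader userLoader = new URLClassLoader(new URL[0]);
        ClassLoader seen =
                runWith(userLoader, () -> Thread.currentThread().getContextClassLoader());
        System.out.println("inside, user loader active: " + (seen == userLoader));
        System.out.println("after, user loader active: "
                + (Thread.currentThread().getContextClassLoader() == userLoader));
    }
}
```

Putting the restore in a finally block is what guarantees that a failing user program cannot leave the client thread with the wrong class loader.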

invokeInteractiveModeForExecution() in turn calls callMainMethod(mainClass, args):

public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
    FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
    try {
        callMainMethod(mainClass, args);
    } finally {
        FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
    }
}

callMainMethod() invokes the main method of the entry class via reflection. Leaving aside some reflection checks, the key line is:

mainMethod.invoke(null, (Object) args);

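For a Java job, the main method executed here is the main method of the user's jar program. For a Python job, it is the main method of org.apache.flink.client.python.PythonDriver, which was assigned as the default earlier (org.apache.flink.client.python.PythonGatewayServer comes into play later).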
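The reflective main call can be reproduced in a few lines. This is a minimal sketch, assuming a stand-in entry class: MainInvocationSketch, DemoEntryPoint and callMain are hypothetical names, and only the invoke line matches the source.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class MainInvocationSketch {

    // Stand-in entry class; in the article this role is played by the user's
    // main class or by PythonDriver.
    public static class DemoEntryPoint {
        public static String lastArgs = "";

        public static void main(String[] args) {
            lastArgs = String.join(" ", args);
        }
    }

    static void callMain(Class<?> entryClass, String[] args) {
        try {
            Method mainMethod = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalArgumentException("main must be static");
            }
            // The cast passes the array as ONE argument instead of spreading it
            // over varargs; the receiver is null because main is static.
            mainMethod.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke main", e);
        }
    }

    public static void main(String[] args) {
        callMain(DemoEntryPoint.class, new String[] {"--python", "word_count.py"});
        System.out.println(DemoEntryPoint.lastArgs);
    }
}
```

Without the (Object) cast, the String[] would be treated as the varargs array itself, so each element would become a separate argument and the call would fail with an argument count mismatch.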

 

 
