草庐IT

springboot-集成flink最佳实践和打包部署

ssehs 2023-11-08 原文

引入flink依赖

//stream api和table api
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.14.2</version>
    <!-- provided不会打包到jar -->
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.14.2</version>
    <scope>provided</scope>
</dependency>

编写入口

目录结构

  • com.example.demo
    • auto
      • ChildApplication
    • task
      • Task
      • AbstractTask
      • TaskManager
    • time
      • TimeSource
      • TimeTask
    • Demo2Application

子容器初始化类

@EnableAutoConfiguration
public class ChildApplication {
}

任务接口

public interface Task {
    void run(String... args) throws Exception;
}

抽象任务类

@Slf4j
public abstract class AbstractTask implements Task {

    @Override
    public void run(String... args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //解析spring参数
        DefaultApplicationArguments arguments = new DefaultApplicationArguments(args);
        //解析flink参数
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        //合并两种参数
        Configuration configuration = new Configuration();
        Map<String, String> map = parameterTool.toMap();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (Objects.equals(entry.getValue(), "__NO_VALUE_KEY")) {
                continue;
            }
            configuration.setString(entry.getKey(), entry.getValue());
        }

        Set<String> optionNames = arguments.getOptionNames();
        for (String optionName : optionNames) {
            List<String> optionValues = arguments.getOptionValues(optionName);
            if (CollectionUtils.isEmpty(optionValues)) {
                continue;
            }
            configuration.setString(optionName, String.join(",", optionValues));
        }
        //设置全局参数
        env.getConfig().setGlobalJobParameters(configuration);
        //配置任务
        configTask(env, parameterTool);
        //提交任务
        JobClient jobClient = env.executeAsync(getClass().getName());
        if (jobClient instanceof WebSubmissionJobClient) {
            return;
        }
        jobClient.getJobExecutionResult()
                .whenComplete(new BiConsumer<JobExecutionResult, Throwable>() {
                    @Override
                    public void accept(JobExecutionResult jobExecutionResult, Throwable throwable) {
                        log.error("time {}", jobExecutionResult.getNetRuntime(TimeUnit.SECONDS));
                    }
                });
    }

    public abstract void configTask(StreamExecutionEnvironment env, ParameterTool tool);

}

任务管理器

@Slf4j
@Service
public class TaskManager implements CommandLineRunner {

    @Resource
    List<Task> taskList;

    @Override
    public void run(String... args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        log.info("程序参数 {}", parameterTool);
        String runTaskName = parameterTool.get("task");
        if (CollectionUtils.isEmpty(taskList) || StringUtils.isBlank(runTaskName)) {
            return;
        }
        for (Task task : taskList) {
            if (Objects.equals(runTaskName, task.getClass().getName())) {
                task.run(args);
            }
        }
    }

}

一个计时任务数据源

@Slf4j
@Service
public class TimeSource extends RichSourceFunction<Date> {

    volatile boolean running = true;

    private JdbcTemplate jdbcTemplate;

    @Override
    public void open(Configuration parameters) throws Exception {
        //创建一个容器,并拿到需要的bean
        List<String> args = new LinkedList<>();
        args.add(String.format("--spring.application.admin.jmx-name=org.springframework.boot:type=Admin,name=%s", cls.getName() + UUID.randomUUID()));
        args.add(String.format("--spring.jmx.default-domain=%s", cls.getName() + UUID.randomUUID()));
        Configuration globalJobParameters = (Configuration) runtimeContext.getExecutionConfig().getGlobalJobParameters();
        String activeKey = "spring.profiles.active";
        String active = globalJobParameters.getString(ConfigOptions.key(activeKey).stringType().noDefaultValue());
        if (StringUtils.isNotEmpty(active)) {
            args.add(String.format("--%s=%s", activeKey, active));
        }
        ConfigurableApplicationContext applicationContext = SpringApplication.run(ChildApplication.class, args.toArray(new String[0]));
        jdbcTemplate = applicationContext.getBean(JdbcTemplate.class);
    }

    @Override
    public void run(SourceContext<Date> ctx) throws Exception {
        while (running) {
            Date date = DataAccessUtils.uniqueResult(jdbcTemplate.queryForList("select now()", Date.class));
            ctx.collect(date);
            TimeUnit.SECONDS.sleep(1);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

}

写这个数据源类花了很长时间,期间报了很多错,一直不符合预期:

  • xxx is not serializable:flink的算子可能会在不同的机器上运行,所以类信息会序列化之后传输。所以算子不能有任何不能序列化的字段(字段为null除外)
  • 有些需要的字段没有实现Serializable,但是又确实要用到,比如JdbcTemplate,如果是mybatis的话,就是各种mapper;像这些字段,只能在open方法里面初始化。有两种方法做这个初始化:一是,通过全局参数把一些连接信息传到算子,然后在open方法中初始化JdbcTemplate;二是,在open方法中重新创建一个容器,然后从容器中拿到JdbcTemplate。第一种方法,比较容易实现,但是要手动装配JdbcTemplate;第二种方法,要重新创建一个容器,装配的任务全都交给容器;想法是很nice,但在一个容器中创建另一个容器,比想象中的要复杂一些。
  • 在一个容器中初始化另一个容器:
    • 需要一个容器初始化类:因为毕竟不需要注入所有对象,所以不能用主程序启动类Demo2Application;但是又要autoconfigure里面的很多对象,所以考虑加@EnableAutoConfiguration注解,同时放入单独的auto包,避免扫到不需要的bean定义;如果需要mybatis的mapper,考虑加@MapperScan注解
    • 定义好容器初始化类之后,启动报错:Error creating bean with name ‘springApplicationAdminRegistrar’ defined in class path resource [org/springframework/boot/autoconfigure/admin/SpringApplicationAdminJmxAutoConfiguration.class]: Invocation of init method failed; nested exception is javax.management.InstanceAlreadyExistsException: org.springframework.boot:type=Admin,name=SpringApplication。看错误信息是实例重复了,这个有两种解决办法:
      • 容器初始化类直接排除掉SpringApplicationAdminJmxAutoConfiguration.class:@EnableAutoConfiguration(exclude = {SpringApplicationAdminJmxAutoConfiguration.class})
      • 子容器启动时修改spring.application.admin.jmx-name:–spring.application.admin.jmx-name=org.springframework.boot:type=Admin,name=%s
    • 再启动,还是报错:Unable to register MBean [HikariDataSource (HikariPool-2)] with key ‘dataSource’; nested exception is javax.management.InstanceAlreadyExistsException: com.zaxxer.hikari:name=dataSource,type=HikariDataSource。又是个实例重复的问题,这个问题百度了下,需要给–spring.jmx.default-domain配置个新的值:–spring.jmx.default-domain=%s
    • 再启动,子容器正常创建,程序运行发现ok
    • 打包上传flink web,提交运行,正常!

一个计时任务

@Slf4j
@Service
public class TimeTask extends AbstractTask {

    @Resource
    private TimeSource timeSource;

    @Override
    public void configTask(StreamExecutionEnvironment env, ParameterTool tool) {
        env.getConfig().setAutoWatermarkInterval(0);
        env.addSource(timeSource)
                .setParallelism(1)
                .print()
                .setParallelism(1);
    }

}

主程序启动类

@SpringBootApplication
public class Demo2Application {

    public static void main(String[] args) {
        SpringApplication.run(Demo2Application.class, args);
    }

}

打包程序

设置parent

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

直接使用spring-boot-maven-plugin?

<plugin>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-maven-plugin</artifactId>
</plugin>

因为spring-boot-maven-plugin打包区分了main-class和start-class,打包之后main-class是org.springframework.boot.loader.JarLauncher引导类,上传到flink web执行报错。

考虑使用maven-shade-plugin

参考SpringBoot超详细讲解集成Flink的部署与打包方法的方法二写了一版:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.3.0</version>
    <executions>
      <execution>
        <phase>package</phase>
        <goals>
          <goal>shade</goal>
        </goals>
        <configuration>
          <createDependencyReducedPom>false</createDependencyReducedPom>
          <artifactSet>
            <excludes>
              <exclude>com.google.code.findbugs:jsr305</exclude>
              <exclude>org.slf4j:*</exclude>
              <exclude>log4j:*</exclude>
            </excludes>
          </artifactSet>
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>module-info.class</exclude>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
          <transformers>
            <transformer
              implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
              <resource>META-INF/spring.handlers</resource>
              <resource>reference.conf</resource>
            </transformer>
            <transformer
              implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
              <resource>META-INF/spring.factories</resource>
            </transformer>
            <transformer
              implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
              <resource>META-INF/spring.schemas</resource>
            </transformer>
            <transformer
              implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer
              implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>${start-class}</mainClass>
            </transformer>
          </transformers>
        </configuration>
      </execution>
    </executions>
</plugin>

结果报错:

Cannot find ‘resource’ in class org.apache.maven.plugins.shade.resource.ServicesResourceTransformer

纠结了半天,也没找到原因

再试试maven-assembly-plugin

  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.3.0</version>
    <configuration>
      <archive>
        <manifest>
          <mainClass>${start-class}</mainClass>
        </manifest>
      </archive>
      <!-- 打包依赖 -->
      <descriptorRefs>
        <descriptorRef>jar-with-dependencies</descriptorRef>
      </descriptorRefs>
    </configuration>
    <executions>
      <execution>
        <id>make-assembly</id>
        <phase>package</phase>
        <goals>
          <goal>single</goal>
        </goals>
      </execution>
    </executions>
  </plugin>

可以正常打包,本地也能运行,但是上传到flink web报错

LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/opt/flink/lib/log4j-slf4j-impl-2.16.0.jar)

很明显,日志相关的jar冲突了。那么问题就是怎么配置maven-assembly-plugin,打包的时候移出org.apache.logging.log4j或ch.qos.logback?这个也比较困难,需要自定义assembly.xml文件,相对来说成本比较大。

重回maven-shade-plugin

找到很多资料,包括flink官方的maven打包方式也是用maven-shade-plugin,所以决定还是使用maven-shade-plugin。

那怎么解决Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ServicesResourceTransformer的问题呢?

恰好最近在看maven pom文件的相关知识,不小心打开了spring-boot-starter-parentpluginManagement,发现里面定义很多插件,其中就包括maven-shade-plugin

按照pom依赖的逻辑,只要在build->plugins声明maven-shade-plugin就行:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
</plugin>
mvn clean package

打包成功了!

仔细翻看spring-boot-starter-parent声明的maven-shade-plugin,发现executions->execution->configuration->transformers的内容在spring-boot的不同版本是不同的。难怪找不到resource。

后续打包上传到flink web,也是报日志相关的jar冲突,不过maven-shade-plugin打包排除依赖比maven-assembly-plugin简单多了。由于flink运行时包含/opt/flink/lib/log4j-slf4j-impl-2.16.0.jar,所以果断排除logback,完整plugin配置如下:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <configuration>
        <artifactSet>
            <excludes>
                <!-- 参考flink官网也把这个排除 -->
                <exclude>com.google.code.findbugs:jsr305</exclude>
                <exclude>ch.qos.logback:*</exclude>
            </excludes>
        </artifactSet>
    </configuration>
</plugin>

有关springboot-集成flink最佳实践和打包部署的更多相关文章

  1. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby-on-rails - 如何使辅助方法在 Rails 集成测试中可用? - 2

    我在app/helpers/sessions_helper.rb中有一个帮助程序文件,其中包含一个方法my_preference,它返回当前登录用户的首选项。我想在集成测试中访问该方法。例如,这样我就可以在测试中使用getuser_path(my_preference)。在其他帖子中,我读到这可以通过在测试文件中包含requiresessions_helper来实现,但我仍然收到错误NameError:undefinedlocalvariableormethod'my_preference'.我做错了什么?require'test_helper'require'sessions_hel

  4. ruby-on-rails - 每次我尝试部署时,我都会得到 - (gcloud.preview.app.deploy) 错误响应 : [4] DEADLINE_EXCEEDED - 2

    我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie

  5. ruby-on-rails - 我如何将 Hoptoad 与 DelayedJob 和 DaemonSpawn 集成? - 2

    我一直很高兴地使用DelayedJob习惯用法:foo.send_later(:bar)这会调用DelayedJob进程中对象foo的方法bar。我一直在使用DaemonSpawn在我的服务器上启动DelayedJob进程。但是...如果foo抛出异常,Hoptoad不会捕获它。这是任何这些包中的错误...还是我需要更改某些配置...或者我是否需要在DS或DJ中插入一些异常处理来调用Hoptoad通知程序?回应下面的第一条评论。classDelayedJobWorker 最佳答案 尝试monkeypatchingDelayed::W

  6. ruby-on-rails - Ruby on Rails 可以部署在 Azure 网站上吗? - 2

    我可以在Azure网站上部署RubyonRails吗? 最佳答案 还没有。目前仅支持.NET和PHP。 关于ruby-on-rails-RubyonRails可以部署在Azure网站上吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/12964010/

  7. 叮咚买菜基于 Apache Doris 统一 OLAP 引擎的应用实践 - 2

    导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵

  8. jenkins部署1--jenkins+gitee持续集成 - 2

    前置步骤我们都操作完了,这篇开始介绍jenkins的集成。话不多说,看操作1、登录进入jenkins后会让你选择安装插件,选择第一个默认的就行。安装完成后设置账号密码,重新登录。2、配置JDK和Git都需要执行路径,所以需要先把执行路径找到,先进入服务器的docker容器,2.1JDK的路径root@69eef9ee86cf:/usr/bin#echo$JAVA_HOME/usr/local/openjdk-82.2Git的路径root@69eef9ee86cf:/#whichgit/usr/bin/git3、先配置JDK和Git。点击:ManageJenkins>>GlobalToolCon

  9. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

  10. ruby-on-rails - Rails 中同一个类的多个关联的最佳实践? - 2

    我认为我的问题最好用一个例子来描述。假设我有一个名为“Thing”的简单模型,它有一些简单数据类型的属性。像...Thing-foo:string-goo:string-bar:int这并不难。数据库表将包含具有这三个属性的三列,我可以使用@thing.foo或@thing.bar之类的东西访问它们。但我要解决的问题是当“foo”或“goo”不再包含在简单数据类型中时会发生什么?假设foo和goo代表相同类型的对象。也就是说,它们都是“Whazit”的实例,只是数据不同。所以现在事情可能看起来像这样......Thing-bar:int但是现在有一个新的模型叫做“Whazit”,看起来

随机推荐