草庐IT

Hudi源码|Insert源码分析总结(一)(整体流程)

董可伦 2023-05-14 原文

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun

前言

Apache Hudi insert源码分析总结,以Java Client为例,不了解Hudi Java Client的可以参考:Hudi Java Client总结|读取Hive写Hudi代码示例

以Java Client为例的原因:1、自己生产上用的Java Client,相比于Spark客户端更熟悉一点。
2、Java Client和Spark、Flink客户端核心逻辑是一样的。不同的是比如Spark的入口是DF和SQL,多了一层API封装。
3、Java Client更贴近源码,可以直接分析核心逻辑。不用剖析Spark、Flink源码。对Sprk、Flink源码不熟悉的更容易上手。
4、等分析完Java Client源码后,有时间的话我会再总结一下Spark客户端的源码,这样大家会更容易理解。

版本

Hudi 0.9.0

备注:其实每个版本核心代码都差不多,之所以使用0.9.0,一个是因为对于Java Client,我用0.9.0用的比较多,相比于使用最新版可以节省不少时间,另一个原因是,之前总结的Java Client的源码也是基于0.9.0。比如Hudi Clean Policy 清理策略实现分析Hudi Clean 清理文件实现分析

initTable

首先是通过initTable初始化Hudi表,可以看出来主要就是根据我们配置的一些参数,创建.hoodie元数据目录,然后将这些参数持久化到hoodier.properties文件中,具体的细节可以自己研究。

    public HoodieTableMetaClient initTable(Configuration configuration, String basePath)
        throws IOException {
      return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build());
    }

  /**
   * Helper method to initialize a given path as a hoodie table with configs passed in as Properties.
   *
   * @return Instance of HoodieTableMetaClient
   */
  public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hadoopConf, String basePath,
      Properties props) throws IOException {
    LOG.info("Initializing " + basePath + " as hoodie table " + basePath);
    Path basePathDir = new Path(basePath);
    final FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
    if (!fs.exists(basePathDir)) {
      fs.mkdirs(basePathDir);
    }
    Path metaPathDir = new Path(basePath, METAFOLDER_NAME);
    if (!fs.exists(metaPathDir)) {
      fs.mkdirs(metaPathDir);
    }

    // if anything other than default archive log folder is specified, create that too
    String archiveLogPropVal = new HoodieConfig(props).getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER);
    if (!StringUtils.isNullOrEmpty(archiveLogPropVal)) {
      Path archiveLogDir = new Path(metaPathDir, archiveLogPropVal);
      if (!fs.exists(archiveLogDir)) {
        fs.mkdirs(archiveLogDir);
      }
    }

    // Always create temporaryFolder which is needed for finalizeWrite for Hoodie tables
    final Path temporaryFolder = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
    if (!fs.exists(temporaryFolder)) {
      fs.mkdirs(temporaryFolder);
    }

    // Always create auxiliary folder which is needed to track compaction workloads (stats and any metadata in future)
    final Path auxiliaryFolder = new Path(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
    if (!fs.exists(auxiliaryFolder)) {
      fs.mkdirs(auxiliaryFolder);
    }

    initializeBootstrapDirsIfNotExists(hadoopConf, basePath, fs);
    HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
    // We should not use fs.getConf as this might be different from the original configuration
    // used to create the fs in unit tests
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
    LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
    return metaClient;
  }    

HoodieWriteConfig

这里的配置是写数据时使用的配置,上面initTable的配置是持久化文件的配置,当然这俩配置要保持一致(实际上Spark客户端就是保持一致的)。可以看到有Schema、表名、payload、索引、clean、文件大小等一些参数。熟悉这些参数后就可以进行调优了。

            Properties indexProperties = new Properties();
            indexProperties.put(BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.key(), 150000); // 1000万总体时间提升1分钟
            HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
                    .withSchema(writeSchema.toString())
                    .withParallelism(parallelism, parallelism).withDeleteParallelism(parallelism)
                    .forTable(tableName)
                    .withWritePayLoad(payloadClassName)
                    .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build())
                    .withIndexConfig(HoodieIndexConfig.newBuilder()
                            .withIndexType(HoodieIndex.IndexType.BLOOM)
//                            .bloomIndexPruneByRanges(false) // 1000万总体时间提升1分钟
                            .bloomFilterFPP(0.000001)   // 1000万总体时间提升3分钟
                            .fromProperties(indexProperties)
                            .build())
                    .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(150, 200)
                            .compactionSmallFileSize(Long.parseLong(smallFileLimit))
                            .approxRecordSize(Integer.parseInt(recordSizeEstimate))
                            .retainCommits(100).build())
                    .withStorageConfig(HoodieStorageConfig.newBuilder()
                            .parquetMaxFileSize(Long.parseLong(maxFileSize)).build())
                    .build();

HoodieJavaWriteClient

创建writeClient

HoodieJavaWriteClient writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg)

startCommit

String newCommitTime = writeClient.startCommit();

具体的实现在其父类AbstractHoodieWriteClient中。首先调用rollbackFailedWrites执行rollback操作,关于rollback分析本文先不讲。然后通过HoodieActiveTimeline.createNewInstantTime()创建一个新的instantTime。最后创建metaClient,通过metaClient.getActiveTimeline().createNewInstant生成.commit.request文件

  public String startCommit() {
    // 首先调用rollbackFailedWrites执行rollback操作
    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
        HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
    // 生成新的instantTime
    String instantTime = HoodieActiveTimeline.createNewInstantTime();
    // 创建metaClient
    HoodieTableMetaClient metaClient = createMetaClient(true);
    startCommit(instantTime, metaClient.getCommitActionType(), metaClient);
    return instantTime;
  }
  private void startCommit(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
    LOG.info("Generate a new instant time: " + instantTime + " action: " + actionType);
    // if there are pending compactions, their instantTime must not be greater than that of this instant time
    metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->
        ValidationUtils.checkArgument(
            HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
        "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
            + latestPending + ",  Ingesting at " + instantTime));
    if (config.getFailedWritesCleanPolicy().isLazy()) {
      this.heartbeatClient.start(instantTime);
    }
    // 创建.commit.request
    metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
        instantTime));
  }  

generateRecord

主要是构造writeClient写数据所需的数据结构writeRecords:List<HoodieRecord>,具体的可以参考我之前分享的文章。

client.insert(writeRecords, newCommitTime)

首先获取table,这里返回HoodieJavaCopyOnWriteTable,接着验证一下schema和历史数据的兼容性。然后通过preWrite执行写之前的一些步骤,比如设置操作类型,接着调用table.insert执行完整的写数据操作,返回result。最后调用postWrite执行archive、clean等操作返回WriteStatuses。

  public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
    // 首先获取table,这里的table为HoodieJavaCopyOnWriteTable
    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
        getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
    // 验证schema
    table.validateUpsertSchema();
    // 写之前的一些步骤,比如设置操作类型
    preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
    // 调用table.insert执行写数据操作,返回result
    HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
    if (result.getIndexLookupDuration().isPresent()) {
      metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
    }
    // 调用postWrite返回WriteStatuses
    return postWrite(result, instantTime, table);
  }

postWrite

我们先看一下postWrite的逻辑,首先判断是否已经commit生成了.commit文件,如果是的话,则执行archive、clean,也就是archive、clean等操作是在写操作完成、生成.commit文件之后进行的。

  /**
   * 判断是否已经commit生成了.commit文件,如果是的话,则执行archive、clean
   */
  protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
                                        String instantTime,
                                        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
    if (result.getIndexLookupDuration().isPresent()) {
      metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
    }
    // commit是否已经提交,这里主要考虑是否设置了自动提交,hoodie.auto.commit默认true
    // 如果不是自动提交的话,那么我们需要手动执行clean等操作,然后手动commit
    // 所以这里默认为true
    // isCommitted代表着已经生成了.commit文件,也就是写操作成功了,也就是通过table.insert已经完成了整个的写操作
    if (result.isCommitted()) {
      // Perform post commit operations.
      if (result.getFinalizeDuration().isPresent()) {
        metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
            result.getWriteStats().get().size());
      }
      // postCommit主要是执行archive、clean等操作。也就是archive、clean等操作是在写操作完成,生成.commit文件之后进行的。
      postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());

      emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
    }
    return result.getWriteStatuses();
  }

table.insert

先调用JavaInsertCommitActionExecutor.execute接着调用JavaWriteHelper.newInstance().write

  public HoodieWriteMetadata<List<WriteStatus>> insert(HoodieEngineContext context,
                                                       String instantTime,
                                                       List<HoodieRecord<T>> records) {
    return new JavaInsertCommitActionExecutor<>(context, config,
        this, instantTime, records).execute();
  }

  public HoodieWriteMetadata<List<WriteStatus>> execute() {
    return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
  }

JavaWriteHelper.write

它的write方法是在其父类AbstractWriteHelper中实现的,首先首先判断是否需要去重(通过配置项hoodie.combine.before.insert配置是否需要去重),insert默认不需要去重(upsert/delete默认需要)。如果需要去重的话调用方法combineOnCondition先进行去重。
然后判断是否需要tag, tag的作用主要是利用文件中保存的索引信息(默认布隆索引),判断records中的数据哪些是新增数据,哪些是更新数据,对于更新的数据,还要添加上对应的文件位置信息,方便后面更新时查找对应的parquet文件。由于这里为insert所以不需要tag,这也是insert和upsert一个比较大的区别。
我们后面分析upsert源码时,会专门分析tag怎么实现的,本文先略过。然后通过调用executor.execute执行写操作,返回result,这里的executor为JavaInsertCommitActionExecutor。

  public HoodieWriteMetadata<O> write(String instantTime,
                                      I inputRecords,
                                      HoodieEngineContext context,
                                      HoodieTable<T, I, K, O> table,
                                      boolean shouldCombine,
                                      int shuffleParallelism,
                                      BaseCommitActionExecutor<T, I, K, O, R> executor,
                                      boolean performTagging) {
    try {
      // De-dupe/merge if needed
      // 如果开启了去重,则先去重,insert默认不去重
      // 配置项hoodie.combine.before.insert
      I dedupedRecords =
          combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);

      Instant lookupBegin = Instant.now();
      I taggedRecords = dedupedRecords;
      if (performTagging) { // 是否需要tag,insert为false
        // perform index loop up to get existing location of records
        // tag的作用主要是利用文件中保存的索引信息(默认布隆索引),判断records中的数据哪些是新增数据,哪些是更新数据
        // 对于更新的数据,还要添加上对应的文件位置信息,方便后面更新时查找对应的parquet文件
        taggedRecords = tag(dedupedRecords, context, table);
      }
      Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());

      // 通过调用executor.execute执行写操作,返回result。这里的executor为JavaInsertCommitActionExecutor
      HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
      result.setIndexLookupDuration(indexLookupDuration);
      return result;
    } catch (Throwable e) {
      if (e instanceof HoodieUpsertException) {
        throw (HoodieUpsertException) e;
      }
      throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
    }
  }

  public boolean shouldCombineBeforeInsert() {
    return getBoolean(COMBINE_BEFORE_INSERT);
  }

  public static final ConfigProperty<String> COMBINE_BEFORE_INSERT = ConfigProperty
      .key("hoodie.combine.before.insert")
      .defaultValue("false")
      .withDocumentation("When inserted records share same key, controls whether they should be first combined (i.e de-duplicated) before"
          + " writing to storage.");

JavaInsertCommitActionExecutor.execute

JavaInsertCommitActionExecutor.execute实际上调用的父类BaseJavaCommitActionExecutorexecute
首先通过buildProfile构建WorkloadProfile,构建WorkloadProfile的目的主要是为给getPartitioner使用。WorkloadProfile包含了分区路径对应的insert/upsert数量以及upsert数据对应的文件位置信息。数量信息是为了分桶,或者说是为了分几个文件,这里涉及了小文件合并、文件大小等原理,位置信息是为了获取要更新的文件,也就是对应的fileId。对于upsert数据,我们复用原来的fileId。对于insert数据,我们生成新的fileId,如果record数比较多,则分多个文件写。然后将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight。这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息。然后通过getPartitioner根据WorkloadProfile获取partitioner,接着调用partition方法返回partitionedRecords(<桶号,对应的HoodieRecord>),一个桶对应一个文件 fileId。最后再遍历partitionedRecords,也就是每个桶执行一次写操作handleInsertPartition/handleUpsertPartition,最后调用BoundedInMemoryExecutor.execute,利用生产者消费者模式写数据,关于如何通过生产者消费者模式写数据,我已经在Hudi源码|bootstrap源码分析总结(写Hudi)分析过bootstrap的源码了,原理一样,不同的是实现类不一样,感兴趣的可以看看。

关于tag(索引相关)、WorkloadProfile、getPartitioner、handleInsertPartition/handleUpsertPartition本文讲个大概,可能有不准确的地方,大家可以先结合17张图带你彻底理解Hudi Upsert原理进行学习,至于具体的源码分析,限于篇幅及个人精力,本文先不涉及,会放在后面的文章单独讲解,对于本文可能不准确的地方,也会在后面的文章中更新。

  public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
    HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();

    WorkloadProfile profile = null;
    if (isWorkloadProfileNeeded()) { // 始终为true
      // 构建WorkloadProfile,构建WorkloadProfile的目的主要是为给getPartitioner使用
      // WorkloadProfile包含了分区路径对应的insert/upsert数量以及upsert数据对应的文件位置信息
      // 数量信息是为了分桶,或者说是为了分几个文件,这里涉及了小文件合并、文件大小等原理
      // 位置信息是为了获取要更新的文件
      // 对于upsert数据,我们复用原来的fileId
      // 对于insert数据,我们生成新的fileId,如果record数比较多,则分多个文件写
      profile = new WorkloadProfile(buildProfile(inputRecords));
      LOG.info("Workload profile :" + profile);
      try {
        // 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.
        // 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息
        saveWorkloadProfileMetadataToInflight(profile, instantTime);
      } catch (Exception e) {
        HoodieTableMetaClient metaClient = table.getMetaClient();
        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
        try {
          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
          }
        } catch (IOException ex) {
          LOG.error("Check file exists failed");
          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
        }
      }
    }


    // 根据WorkloadProfile获取partitioner
    final Partitioner partitioner = getPartitioner(profile);
    // <桶号,对应的HoodieRecord>,一个桶对应一个文件 fileId
    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);

    List<WriteStatus> writeStatuses = new LinkedList<>();
    // forEach,每个桶执行一次写操作handleInsertPartition/handleUpsertPartition
    // 最终通过BoundedInMemoryExecutor.execute 生产者消费者模式写数据
    partitionedRecords.forEach((partition, records) -> {
      // 是否更新、删除
      if (WriteOperationType.isChangingRecords(operationType)) {
        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
      } else {
        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
      }
    });
    updateIndex(writeStatuses, result);
    // commit生成.commit文件,.commit文件的生成标记着写数据的完成
    updateIndexAndCommitIfNeeded(writeStatuses, result);
    return result;
  }

commit

上面通过handleInsertPartition/handleUpsertPartition实际上已经完成了数据的写入。但是最后还要生成.commit元数据文件,代表一次commit的完成,否则如果没有生成commit的话,比如只有.commit.request或者commit.inflight,这样在查询时候不会查到本地写数据生成的文件,而且下次写数据时会触发rollback来处理。

这里索引相关的先不看,commit调用链updateIndexAndCommitIfNeeded->commitOnAutoCommit->autoCommit->commit->commit,最终在BaseJavaCommitActionExecutor的commit方法中通过activeTimeline.saveAsComplete生成.commit文件。
前面讲过了,在生成.commit文件后,会调用postWrite方法触发archive、clean等操作。实际上archive、clean等操作的失败,不影响本次写数据的成功。比如clean失败了,可以下次再clean就可以了。所以当commit完成后,如果clean失败了,这样对于有失败机制的集成工具,比如我们使用的Apache NIFI,是不能将本批次数据放进失败队列的。PS:当本次commit不成功时,我们需要放进失败队列,目的是防止数据丢失。其实我们可以自己写代码继承JavaClient类,将postWrite方法和table.insert分开。这样便于判断是写数据失败还是clean失败,以后我会分享相关代码实现。

  public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, HoodieWriteMetadata result) {
    Instant indexStartTime = Instant.now();
    // Update the index back
    List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
    result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
    result.setWriteStatuses(statuses);
    result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
    commitOnAutoCommit(result);
  }

  protected void commitOnAutoCommit(HoodieWriteMetadata result) {
    // validate commit action before committing result
    runPrecommitValidators(result);
    // validate commit action before committing result
    if (config.shouldAutoCommit()) {
      LOG.info("Auto commit enabled: Committing " + instantTime);
      autoCommit(extraMetadata, result);
    } else {
      LOG.info("Auto commit disabled for " + instantTime);
    }
  }
  
  protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
    this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime)),
        lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
    try {
      TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
          result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner());
      commit(extraMetadata, result);
    } finally {
      this.txnManager.endTransaction();
    }
  }

  // BaseJavaCommitActionExecutor
  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result) {
    commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
  }

  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
    String actionType = getCommitActionType();
    LOG.info("Committing " + instantTime + ", action Type " + actionType);
    result.setCommitted(true);
    result.setWriteStats(writeStats);
    // Finalize write
    finalizeWrite(instantTime, writeStats, result);
    try {
      LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType());
      HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
      HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
          extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());

      // 通过activeTimeline.saveAsComplete生成.commit文件
      activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
      LOG.info("Committed " + instantTime);
      result.setCommitMetadata(Option.of(metadata));
    } catch (IOException e) {
      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
          e);
    }
  } 

handleInsertPartition

handleInsertPartition到生产者消费者模式调用链,handleInsertPartition->handleUpsertPartition->handleInsert->JavaLazyInsertIterable.computeNext->BoundedInMemoryExecutor.execute

  protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
                                                              Partitioner partitioner) {
    return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
  }

  protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
                                                              Partitioner partitioner) {
    JavaUpsertPartitioner javaUpsertPartitioner = (JavaUpsertPartitioner) partitioner;
    BucketInfo binfo = javaUpsertPartitioner.getBucketInfo(partition);
    BucketType btype = binfo.bucketType;
    try {
      if (btype.equals(BucketType.INSERT)) {
        return handleInsert(binfo.fileIdPrefix, recordItr);
      } else if (btype.equals(BucketType.UPDATE)) {
        return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
      } else {
        throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
      }
    } catch (Throwable t) {
      String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
      LOG.error(msg, t);
      throw new HoodieUpsertException(msg, t);
    }
  }

  public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
    // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
    if (!recordItr.hasNext()) {
      LOG.info("Empty partition");
      return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
    }
    return new JavaLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
        taskContextSupplier, new CreateHandleFactory<>());
  }

  // JavaLazyInsertIterable
  protected List<WriteStatus> computeNext() {
    // Executor service used for launching writer thread.
    BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
        null;
    try {
      final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
      bufferedIteratorExecutor =
          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
      // bufferedIteratorExecutor.execute通过生产者消费者模型实现写数据    
      final List<WriteStatus> result = bufferedIteratorExecutor.execute();
      assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
      return result;
    } catch (Exception e) {
      throw new HoodieException(e);
    } finally {
      if (null != bufferedIteratorExecutor) {
        bufferedIteratorExecutor.shutdownNow();
      }
    }
  }  

总结

本文以Java Client为例,对Apache Hudi insert源码进行了整体逻辑的分析总结,希望能对大家有所帮助。由于精力有限,对于文中提到的WorkloadProfile、getPartitioner、handleInsertPartition/handleUpsertPartition等,我会在后面的文章再进行总结。并且等insert相关源码分析完后,会再进行upsert的源码分析。可能有些地方不够准确,还请大家多多指正,让我们共同进步。

注释代码

github: https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
gitee: https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments

相关阅读

有关Hudi源码|Insert源码分析总结(一)(整体流程)的更多相关文章

  1. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  2. SPI接收数据异常问题总结 - 2

    SPI接收数据左移一位问题目录SPI接收数据左移一位问题一、问题描述二、问题分析三、探究原理四、经验总结最近在工作在学习调试SPI的过程中遇到一个问题——接收数据整体向左移了一位(1bit)。SPI数据收发是数据交换,因此接收数据时从第二个字节开始才是有效数据,也就是数据整体向右移一个字节(1byte)。请教前辈之后也没有得到解决,通过在网上查阅前人经验终于解决问题,所以写一个避坑经验总结。实际背景:MCU与一款芯片使用spi通信,MCU作为主机,芯片作为从机。这款芯片采用的是它规定的六线SPI,多了两根线:RDY和INT,这样从机就可以主动请求主机给主机发送数据了。一、问题描述根据从机芯片手

  3. 建模分析 | 平面2R机器人(二连杆)运动学与动力学建模(附Matlab仿真) - 2

    目录0专栏介绍1平面2R机器人概述2运动学建模2.1正运动学模型2.2逆运动学模型2.3机器人运动学仿真3动力学建模3.1计算动能3.2势能计算与动力学方程3.3动力学仿真0专栏介绍?附C++/Python/Matlab全套代码?课程设计、毕业设计、创新竞赛必备!详细介绍全局规划(图搜索、采样法、智能算法等);局部规划(DWA、APF等);曲线优化(贝塞尔曲线、B样条曲线等)。?详情:图解自动驾驶中的运动规划(MotionPlanning),附几十种规划算法1平面2R机器人概述如图1所示为本文的研究本体——平面2R机器人。对参数进行如下定义:机器人广义坐标

  4. 网站日志分析软件--让网站日志分析工作变得更简单 - 2

    网站的日志分析,是seo优化不可忽视的一门功课,但网站越大,每天产生的日志就越大,大站一天都可以产生几个G的网站日志,如果光靠肉眼去分析,那可能看到猴年马月都看不完,因此借助网站日志分析工具去分析网站日志,那将会使网站日志分析工作变得更简单。下面推荐两款网站日志分析软件。第一款:逆火网站日志分析器逆火网站日志分析器是一款功能全面的网站服务器日志分析软件。通过分析网站的日志文件,不仅能够精准的知道网站的访问量、网站的访问来源,网站的广告点击,访客的地区统计,搜索引擎关键字查询等,还能够一次性分析多个网站的日志文件,让你轻松管理网站。逆火网站日志分析器下载地址:https://pan.baidu.

  5. elasticsearch源码关于TransportSearchAction【阶段三】 - 2

    1.回顾.TransportServicepublicclassTransportServiceextendsAbstractLifecycleComponentTransportService:方法:1publicfinalTextendsTransportResponse>voidsendRequest(finalTransport.Connectionconnection,finalStringaction,finalTransportRequestrequest,finalTransportRequestOptionsoptions,TransportResponseHandlerT>

  6. (附源码)vue3.0+.NET6实现聊天室(实时聊天SignalR) - 2

    参考文章搭建文章gitte源码在线体验可以注册两个号来测试演示图:一.整体介绍  介绍SignalR一种通讯模型Hub(中心模型,或者叫集线器模型),调用这个模型写好的方法,去发送消息。  内容有:    ①:Hub模型的方法介绍    ②:服务器端代码介绍    ③:前端vue3安装并调用后端方法    ④:聊天室样例整体流程:1、进入网站->调用连接SignalR的方法2、与好友发送消息->调用SignalR的自定义方法 前端通过,signalR内置方法.invoke()  去请求接口3、监听接受方法(渲染消息)通过new signalR.HubConnectionBuilder().on

  7. ABB-IRB-1200运动学分析MATLAB RVC工具分析+Simulink-Adams联合仿真 - 2

    一、机器人介绍        此处是基于MATLABRVC工具箱,对ABB-IRB-1200型号的微型机械臂进行正逆向运动学分析,并利Simulink工具实现对机械臂进行具有动力学参数的末端轨迹规划仿真,最后根据机械模型设计Simulink-Adams联合仿真。 图1.ABBIRB 1200尺寸参数示意图ABBIRB 1200提供的两种型号广泛适用于各作业,且两者间零部件通用,两种型号的工作范围分别为700 mm 和 900 mm,大有效负载分别为 7 kg 和5 kg。 IRB 1200 能够在狭小空间内能发挥其工作范围与性能优势,具有全新的设计、小型化的体积、高效的性能、易于集成、便捷的接

  8. Simulink方法总结和避坑指南(一)——Simulink入门与基本调试方法 - 2

    文章目录一、项目场景二、基本模块原理与调试方法分析——信源部分:三、信号处理部分和显示部分:四、基本的通信链路搭建:四、特殊模块:interpretedMATLABfunction:五、总结和坑点提醒一、项目场景  最近一个任务是使用simulink搭建一个MIMO串扰消除的链路,并用实际收到的数据进行测试,在搭建的过程中也遇到了不少的问题(当然这比vivado里面的debug好不知道多少倍)。准备趁着这个机会,先以一个很基本的通信链路对simulink基础和相关的debug方法进行总结。  在本篇中,主要记录simulink的基本原理和基本的SISO通信传输链路(QPSK方式),计划在下篇记

  9. 关于Qt程序打包后运行库依赖的常见问题分析及解决方法 - 2

    目录一.大致如下常见问题:(1)找不到程序所依赖的Qt库version`Qt_5'notfound(requiredby(2)CouldnotLoadtheQtplatformplugin"xcb"in""eventhoughitwasfound(3)打包到在不同的linux系统下,或者打包到高版本的相同系统下,运行程序时,直接提示段错误即segmentationfault,或者Illegalinstruction(coredumped)非法指令(4)ldd应用程序或者库,查看运行所依赖的库时,直接报段错误二.问题逐个分析,得出解决方法:(1)找不到程序所依赖的Qt库version`Qt_5'

  10. ruby-on-rails - 如何使用 ruby​​-prof 和 JMeter 分析 Rails - 2

    我想使用ruby​​-prof和JMeter分析Rails应用程序。我对分析特定Controller/操作/或模型方法的建议方法不感兴趣,我想分析完整堆栈,从上到下。所以我运行这样的东西:RAILS_ENV=productionruby-prof-fprof.outscript/server>/dev/null然后我在上面运行我的JMeter测试计划。然而,问题是使用CTRL+C或SIGKILL中断它也会在ruby​​-prof可以写入任何输出之前杀死它。如何在不中断ruby​​-prof的情况下停止mongrel服务器? 最佳答案

随机推荐