草庐IT

Flink 源码之AsyncFunction

AlienPaul 2023-10-09 原文

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

简介

Flink的特点是高吞吐低延迟。但是Flink中的某环节的数据处理逻辑需要和外部系统交互,调用耗时不可控会显著降低集群性能,这时候怎么办?

为了解决这个问题,Flink引入了AsyncFunction系列接口。使用这些异步接口调用外部服务的时候,不用再同步等待结果返回,只需要将数据存入队列,外部服务接口返回时会更新队列数据状态。在调用外部服务后直接返回处理下一个异步调用,不需要同步等待结果。下游拉取数据的时候直接从队列获取即可。

使用方法

在讲解AsyncFunction使用方法之前,我们先“伪造”一个耗时的外部系统调用。调用pullData会立即返回一个CompletableFuture。耗时5秒后生成的数据通过CompletableFuture返回。

public class AsyncIODemo implements Serializable {

    private static final ExecutorService executorService = Executors.newFixedThreadPool(4);

    public CompletableFuture<String> pullData(final String source) {

        CompletableFuture<String> completableFuture = new CompletableFuture<>();

        executorService.submit(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            completableFuture.complete("Output value: " + source);
        });

        return completableFuture;
    }
}

接下来编写Flink作业:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val stream = env.fromElements("Alpha", "Beta", "Gamma", "Delta")

val asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction[String, String] {
    override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
        // 调用前面的外部系统调用,拉取数据
        val future = new AsyncIODemo().pullData(input)
        // 这个方法是非阻塞的,一旦数据获取成功,会立即调用resultFuture.complete方法
        future.whenCompleteAsync(new BiConsumer[String, Throwable] {
            override def accept(t: String, u: Throwable): Unit = {
                resultFuture.complete(Array(t))
            }
        })
    }
}, 10, TimeUnit.SECONDS)
// 上面设置最长异步调用超时时间为10秒

asyncStream.print()
env.execute()

执行Flink作业。我们发现虽然外部系统调用了4次,然而并没有等待20秒后才输出全部4个结果,实际上只等待了5秒左右。AsyncFunction的功能得到了验证。

注意:尽管AsyncFunction字面上为异步调用,实际上asynInvoke方法仍然是同步的。绝不能在该方法中阻塞等待调用结果,这样失去了它原本的作用。应该在此处编写异步回调方法,通过异步方式通知Flink数据已获取完毕。

AsyncFunction

从这里开始进入源码分析环节。AsyncFunction接口源码如下:

@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {

    /**
     * Trigger async operation for each stream input.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     * @exception Exception in case of a user code error. An exception will make the task fail and
     *     trigger fail-over process.
     */
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    /**
     * {@link AsyncFunction#asyncInvoke} timeout occurred. By default, the result future is
     * exceptionally completed with a timeout exception.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     */
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
                new TimeoutException("Async function call has timed out."));
    }
}

AsyncFunction接口有两个方法:

  • asyncInvoke:异步操作每一个数据流输入元素。方法的第一个参数input为数据流中的元素,第二个参数resultFuture用于收集异步处理的结果或者是错误信息。不要在此方法内同步等待数据处理逻辑,这样会阻塞线程,降低作业吞吐量。
  • timeout:定义数据超时处理逻辑。方法的参数和asyncInvoke相同。AsyncFunction已经提供了默认实现。如果需要自定义超时逻辑,可以覆盖这个方法。

ResultFuture

ResultFuture在异步操作的时候用于收集结果或错误。

@PublicEvolving
public interface ResultFuture<OUT> {
    /**
     * Completes the result future with a collection of result objects.
     *
     * <p>Note that it should be called for exactly one time in the user code. Calling this function
     * for multiple times will cause data lose.
     *
     * <p>Put all results in a {@link Collection} and then emit output.
     *
     * @param result A list of results.
     */
    void complete(Collection<OUT> result);

    /**
     * Completes the result future exceptionally with an exception.
     *
     * @param error A Throwable object.
     */
    void completeExceptionally(Throwable error);
}

它包含两个方法:

  • complete:如果异步逻辑顺利返回,调用complete方法转入结果数据的集合对象,将数据传递给下游。
  • completeExceptionally:如果异步逻辑需要错误,需要调用这个方法将错误传入。

AsyncDataStream

该类是创建异步算子的工具类。它有2种方法:

  • unorderedWait:不保证输出元素的顺序和读入元素顺序相同。
  • orderedWait:保证输出元素的顺序和读入元素顺序相同。

这两种方法每个还对应两个重载方法,但是参数含义是相同的。参数为:

  • DataStream<IN> in:需要添加异步处理逻辑的数据流。AsyncDataStream实际上是个工具类,并不是一种流的类型。
  • AsyncFunction<IN, OUT> func:用户定义的异步执行逻辑。
  • long timeout:异步任务超时时间。
  • TimeUnit timeUnit:超时时间单位。
  • int capacity:异步任务初始队列长度。只有部分重载方法有这个参数。默认值为100。

下面是orderedWait其中一个重载方法的代码。

public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
    DataStream<IN> in,
    AsyncFunction<IN, OUT> func,
    long timeout,
    TimeUnit timeUnit,
    int capacity) {
    return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
}

它调用了addOperator方法,为DataStream添加一个OneInputTransformation,其中包含了AsyncWaitOperator

其他几个unorderedWaitorderedWait重载方法调用的都是addOperator,不再赘述。

接下来轮到了addOperator方法:

private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
    DataStream<IN> in,
    AsyncFunction<IN, OUT> func,
    long timeout,
    int bufSize,
    OutputMode mode) {

    TypeInformation<OUT> outTypeInfo =
        TypeExtractor.getUnaryOperatorReturnType(
        func,
        AsyncFunction.class,
        0,
        1,
        new int[] {1, 0},
        in.getType(),
        Utils.getCallLocationName(),
        true);

    // create transform
    AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
        new AsyncWaitOperatorFactory<>(
        in.getExecutionEnvironment().clean(func), timeout, bufSize, mode);

    return in.transform("async wait operator", outTypeInfo, operatorFactory);
}

这个方法创建了一个AsyncWaitOperatorFactory,将其包装入transformation。factory在生成ExecutionGraph的时候将创建出AsyncWaitOperator。下一节我们一起分析下异步操作的核心AsyncWaitOperator

AsyncWaitOperator

我们从AsyncWaitOperator的构造方法开始。构造方法参数中最重要的是outputMode,它决定了异步处理任务队列的类型,从而决定用户数据异步处理后是否严格按照输入顺序输出。

public AsyncWaitOperator(
    @Nonnull AsyncFunction<IN, OUT> asyncFunction,
    long timeout,
    int capacity,
    @Nonnull AsyncDataStream.OutputMode outputMode,
    @Nonnull ProcessingTimeService processingTimeService,
    @Nonnull MailboxExecutor mailboxExecutor) {
    super(asyncFunction);

    // 设置可以和下游算子组成OperatorChain
    setChainingStrategy(ChainingStrategy.ALWAYS);

    Preconditions.checkArgument(
        capacity > 0, "The number of concurrent async operation should be greater than 0.");
    // 默认队列长度
    this.capacity = capacity;

    // 枚举值,决定用户数据异步处理后是否严格按照输入顺序输出
    this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

    // 异步处理超时时间
    this.timeout = timeout;

    // 时间服务,用于设置定时器,检测超时等
    this.processingTimeService = Preconditions.checkNotNull(processingTimeService);

    // 用户作业执行线程池
    this.mailboxExecutor = mailboxExecutor;
}

在operator创建出来后紧接着会执行setup方法,进行初始化操作。

@Override
public void setup(
    StreamTask<?, ?> containingTask,
    StreamConfig config,
    Output<StreamRecord<OUT>> output) {
    // 调用父类初始化逻辑
    super.setup(containingTask, config, output);

    // 创建元素序列化器
    this.inStreamElementSerializer =
        new StreamElementSerializer<>(
        getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

    switch (outputMode) {
        case ORDERED:
            // 如果需要保持输出数据有序
            // 创建的队列为OrderedStreamElementQueue
            queue = new OrderedStreamElementQueue<>(capacity);
            break;
        case UNORDERED:
            // 如果不需要保持输出有序
            // 创建的队列为UnorderedStreamElementQueue
            queue = new UnorderedStreamElementQueue<>(capacity);
            break;
        default:
            throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
    }

    this.timestampedCollector = new TimestampedCollector<>(output);
}

setup方法根据outputMode是否保证输出元素顺序,来决定创建的StreamElementQueue

接下来是处理元素的processElement方法。上游每个元素到来的时候,都会调用这个方法。

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // add element first to the queue
    // 将元素放入队列中
    // 返回队列的entry
    // 队列中的entry类型实现了ResultFuture接口,后面介绍
    final ResultFuture<OUT> entry = addToWorkQueue(element);

    // 创建ResultHandler,包装了超时定时器,输入数据和resultFuture
    // 用来操作resultFuture和超时定时器
    final ResultHandler resultHandler = new ResultHandler(element, entry);

    // register a timeout for the entry if timeout is configured
    // 如果配置了超时时间
    if (timeout > 0L) {
        // 计算超时时刻
        final long timeoutTimestamp =
            timeout + getProcessingTimeService().getCurrentProcessingTime();

        // 注册一个定时器,在超时的时刻调用AsyncFunction的timeout方法
        final ScheduledFuture<?> timeoutTimer =
            getProcessingTimeService()
            .registerTimer(
            timeoutTimestamp,
            timestamp ->
            userFunction.timeout(
                element.getValue(), resultHandler));

        // 设置定时器给resultHandler
        resultHandler.setTimeoutTimer(timeoutTimer);
    }

    // 调用AsyncFunction的asyncInvoke方法
    userFunction.asyncInvoke(element.getValue(), resultHandler);
}

继续查看addToWorkQueue方法,将元素放入任务队列中。

private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
    throws InterruptedException {

    Optional<ResultFuture<OUT>> queueEntry;
    
    // 如果元素添加队列失败,说明队列已满
    // 需要当前线程让出执行机会给mailboxExecutor,即执行用户自定义处理逻辑
    while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
        mailboxExecutor.yield();
    }

    // 添加队列成功,返回ResultFuture
    return queueEntry.get();
}

workQueue我们在后面讨论。接下来分析ResultHandler

ResultHandler

ResultHandlerResultFuture的实现类,为AsyncFunction中两个方法的参数,让用户使用。分别处理异步处理完成(complete)和异步处理异常(completeExceptionally)两种情况。

ResultHandler持有4个成员变量:

  • timeoutTimer:定时器,在数据计算完毕(调用了complete方法的时候),需要将timer清除,所以需要持有定时器。
  • inputRecord:数据流中的原始数据。
  • resultFuture:实际为元素队列中的entry。这个后面介绍。
  • completed:用来表示异步计算是否完成。

用户的自定义异步处理逻辑在AsyncFunction中,异步处理完成的时候需要调用ResultHandlercomplete方法。这个方法将completed变量标记为true。然后调用processInMainbox方法。

@Override
public void complete(Collection<OUT> results) {
    Preconditions.checkNotNull(
        results, "Results must not be null, use empty collection to emit nothing");

    // already completed (exceptionally or with previous complete call from ill-written
    // AsyncFunction), so
    // ignore additional result
    if (!completed.compareAndSet(false, true)) {
        return;
    }

    processInMailbox(results);
}

processInMainbox方法在MailboxExecutor线程池执行resultFuturecomplete方法,通知持有这些元素的队列,该元素已经处理完毕。然后清除掉超时时间timer。最后调用outputCompletedElement,输出已完成的元素到下游。对应的代码如下所示:

private void processInMailbox(Collection<OUT> results) {
    // move further processing into the mailbox thread
    mailboxExecutor.execute(
        () -> processResults(results),
        "Result in AsyncWaitOperator of input %s",
        results);
}

private void processResults(Collection<OUT> results) {
    // Cancel the timer once we've completed the stream record buffer entry. This will
    // remove the registered
    // timer task
    if (timeoutTimer != null) {
        // canceling in mailbox thread avoids
        // https://issues.apache.org/jira/browse/FLINK-13635
        timeoutTimer.cancel(true);
    }

    // update the queue entry with the result
    resultFuture.complete(results);
    // now output all elements from the queue that have been completed (in the correct
    // order)
    outputCompletedElement();
}

private void outputCompletedElement() {
    if (queue.hasCompletedElements()) {
        // emit only one element to not block the mailbox thread unnecessarily
        queue.emitCompletedElement(timestampedCollector);
        // if there are more completed elements, emit them with subsequent mails
        if (queue.hasCompletedElements()) {
            mailboxExecutor.execute(
                this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
        }
    }
}

StreamElementQueue

这一节我们分析异步处理的核心:StreamElementQueue。所有需要异步处理的数据都会在此队列中排队。

此队列需要支持是否保持输出元素顺序这两种情形,因此它具有两个实现类:

  • OrderedStreamElementQueue:元素输出的顺序严格和输入的顺序一致。
  • UnorderedStreamElementQueue:不保证元素输出的顺序和输入的一致。

该接口有如下方法:

@Internal
public interface StreamElementQueue<OUT> {

    // 尝试将元素放入队列,如果队列已满,返回Optional.EMPTY
    // 返回一个ResultFuture对象
    Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement);

    // 弹出队列头部一个已经完成异步处理的元素给outputCollector
    void emitCompletedElement(TimestampedCollector<OUT> output);

    // 检查队列头部元素是否已完成异步处理
    boolean hasCompletedElements();

    // 其余方法省略
    // ...
}

下面分别介绍这两种子类Queue。

OrderedStreamElementQueue

这个队列保证了输出元素顺序和输入元素顺序严格一致。它使用一个Queue<StreamElementQueueEntry<OUT>>类型队列保存输入数据。Queue使用的是ArrayDeque类型。

添加元素的tryPut方法如下。如果添加成功(未超出队列容量限制),返回ResultFuture<OUT>,否则返回Optional.EMPTY

@Override
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
    if (queue.size() < capacity) {
        // 只有队列有剩余空间的情况下才加入队列
        // 根据element的类型(数据还是watermark),构造对应的队列entry
        StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);

        // 将entry加入队列
        queue.add(queueEntry);

        LOG.debug(
            "Put element into ordered stream element queue. New filling degree "
            + "({}/{}).",
            queue.size(),
            capacity);

        return Optional.of(queueEntry);
    } else {
        LOG.debug(
            "Failed to put element into ordered stream element queue because it "
            + "was full ({}/{}).",
            queue.size(),
            capacity);

        // 如果超出队列容量,返回EMPTY
        return Optional.empty();
    }
}

createEntry方法根据element的类型,创建不同的队列entry(StreamElementQueueEntry)。如果元素是数据类型,创建StreamRecordQueueEntry,如果元素是watermark,则创建WatermarkQueueEntry

private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
    if (streamElement.isRecord()) {
        return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
    }
    if (streamElement.isWatermark()) {
        return new WatermarkQueueEntry<>((Watermark) streamElement);
    }
    throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
}

从队列中取出元素的方法为emitCompletedElementOrderedStreamElementQueue从队列的头部获取一个元素,发送给outputCollectorhasCompletedElements方法也是检测队列头部的元素是否已经完成异步处理。所以说OrderedStreamElementQueue能够保证输出数据和输入数据的顺序严格一致。但是带来的问题是处理延迟会受到异步处理时间的影响。

@Override
public boolean hasCompletedElements() {
    return !queue.isEmpty() && queue.peek().isDone();
}

@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
    if (hasCompletedElements()) {
        final StreamElementQueueEntry<OUT> head = queue.poll();
        head.emitResult(output);
    }
}

UnorderedStreamElementQueue

OrderedStreamElementQueue不同的是,UnorderedStreamElementQueue使用Deque<Segment<OUT>>类型双向队列来保存输入数据。队列的元素类型为Segment。需要注意的是,队列中元素的个数并不等于元素的个数,因为一个Segment可以包含多个元素。

Segment内部包含了两个集合incompleteElementscompletedElements,分别保存未完成处理的元素和已完成处理的元素。

/** Unfinished input elements. */
private final Set<StreamElementQueueEntry<OUT>> incompleteElements;

/** Undrained finished elements. */
private final Queue<StreamElementQueueEntry<OUT>> completedElements;

Segment(int initialCapacity) {
    incompleteElements = new HashSet<>(initialCapacity);
    completedElements = new ArrayDeque<>(initialCapacity);
}

添加元素的时候,需要判断队列entry是否已经异步处理完毕,将其加入相应的集合中。

void add(StreamElementQueueEntry<OUT> queueEntry) {
    if (queueEntry.isDone()) {
        completedElements.add(queueEntry);
    } else {
        incompleteElements.add(queueEntry);
    }
}

当entry中数据计算完毕的时候,需要调用complete方法,将这个entry移动到已完成计算的元素集合中。

void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
    // adding only to completed queue if not completed before
    // there may be a real result coming after a timeout result, which is updated in the
    // queue entry but
    // the entry is not re-added to the complete queue
    if (incompleteElements.remove(elementQueueEntry)) {
        completedElements.add(elementQueueEntry);
    }
}

在触发计算的时候,需要获取到已经完成计算的元素。获取方法为从completedElementspoll一个交给outputCollector

int emitCompleted(TimestampedCollector<OUT> output) {
    final StreamElementQueueEntry<OUT> completedEntry = completedElements.poll();
    if (completedEntry == null) {
        return 0;
    }
    completedEntry.emitResult(output);
    return 1;
}

分析到这里不难发现,Segment放弃了元素顺序保证,将已经完成计算的元素挑出来放置到completedElements集合中,因此下游在拉取数据的时候,不会因为队列中间有一个长时间未complete的元素而阻塞,从而降低了延迟,并且减少了延迟抖动。

那么问题来了,看似一个Segment皆可以解决问题,为何需要一个队列来存放SegmentSegment是什么时候创建的?如何决定元素加入哪个Segment?接下来我们讨论这些问题。

首先分析tryPut方法。

@Override
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
    // 检查是否超出队列长度
    if (size() < capacity) {
        StreamElementQueueEntry<OUT> queueEntry;
        // 根据不同的数据类型来生成不同的队列entry
        if (streamElement.isRecord()) {
            queueEntry = addRecord((StreamRecord<?>) streamElement);
        } else if (streamElement.isWatermark()) {
            queueEntry = addWatermark((Watermark) streamElement);
        } else {
            throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
        }

        numberOfEntries++;

        LOG.debug(
            "Put element into unordered stream element queue. New filling degree "
            + "({}/{}).",
            size(),
            capacity);

        return Optional.of(queueEntry);
    } else {
        LOG.debug(
            "Failed to put element into unordered stream element queue because it "
            + "was full ({}/{}).",
            size(),
            capacity);

        return Optional.empty();
    }
}

对比分析下addRecordaddWatermark方法,不难发现端倪。加入record的时候,如果队列中没有Segment则创建一个新的Segment,如果有,就在这个Segment中插入这个record。然而加入watermark这个方法则不同。它还会判断队列中最后一个Segment是否为空。如果为空,则创建一个新的Segment再把watermark放入。到这里我们就搞清楚了Segment是怎么创建和数据如何加入Segment这两个问题。数据流中每当遇到一个watermark,就会使用新的Segment

private StreamElementQueueEntry<OUT> addRecord(StreamRecord<?> record) {
    // ensure that there is at least one segment
    Segment<OUT> lastSegment;
    if (segments.isEmpty()) {
        lastSegment = addSegment(capacity);
    } else {
        lastSegment = segments.getLast();
    }

    // entry is bound to segment to notify it easily upon completion
    StreamElementQueueEntry<OUT> queueEntry =
        new SegmentedStreamRecordQueueEntry<>(record, lastSegment);
    lastSegment.add(queueEntry);
    return queueEntry;
}

private StreamElementQueueEntry<OUT> addWatermark(Watermark watermark) {
    Segment<OUT> watermarkSegment;
    if (!segments.isEmpty() && segments.getLast().isEmpty()) {
        // reuse already existing segment if possible (completely drained) or the new segment
        // added at the end of
        // this method for two succeeding watermarks
        watermarkSegment = segments.getLast();
    } else {
        watermarkSegment = addSegment(1);
    }

    StreamElementQueueEntry<OUT> watermarkEntry = new WatermarkQueueEntry<>(watermark);
    watermarkSegment.add(watermarkEntry);

    // add a new segment for actual elements
    addSegment(capacity);
    return watermarkEntry;
}

接下来我们看下发送已完成数据这个方法。和加入数据相反,这里获取队列中第一个Segment,从其中拿出一个已完成计算的元素。最后判断下这个Segment中是否保存的还有元素,如果没有的话,将这个Segment从队列中弹出被垃圾回收。但是至少要确保队列中有一个Segment

@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
    if (segments.isEmpty()) {
        return;
    }
    final Segment currentSegment = segments.getFirst();
    numberOfEntries -= currentSegment.emitCompleted(output);

    // remove any segment if there are further segments, if not leave it as an optimization even
    // if empty
    if (segments.size() > 1 && currentSegment.isEmpty()) {
        segments.pop();
    }
}

通过这种设计UnorderedStreamElementQueue能够将一连串数据,通过watermark分隔,放入不同的Segment中。从emitCompletedElement方法可以看出,只有队列头部的Segment中的数据全部弹出或超时之后,才有可能去读取下一个Segment中的数据。这种设计允许一定程度的输出结果乱序,但是乱序程度不可能跨越watermark。从而保证了watermark语义的正确,不会由于乱序的容忍而导致部分数据被意外认为“来迟”。

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

有关Flink 源码之AsyncFunction的更多相关文章

  1. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  2. elasticsearch源码关于TransportSearchAction【阶段三】 - 2

    1.回顾.TransportServicepublicclassTransportServiceextendsAbstractLifecycleComponentTransportService:方法:1publicfinalTextendsTransportResponse>voidsendRequest(finalTransport.Connectionconnection,finalStringaction,finalTransportRequestrequest,finalTransportRequestOptionsoptions,TransportResponseHandlerT>

  3. (附源码)vue3.0+.NET6实现聊天室(实时聊天SignalR) - 2

    参考文章搭建文章gitte源码在线体验可以注册两个号来测试演示图:一.整体介绍  介绍SignalR一种通讯模型Hub(中心模型,或者叫集线器模型),调用这个模型写好的方法,去发送消息。  内容有:    ①:Hub模型的方法介绍    ②:服务器端代码介绍    ③:前端vue3安装并调用后端方法    ④:聊天室样例整体流程:1、进入网站->调用连接SignalR的方法2、与好友发送消息->调用SignalR的自定义方法 前端通过,signalR内置方法.invoke()  去请求接口3、监听接受方法(渲染消息)通过new signalR.HubConnectionBuilder().on

  4. Cesium源码解析一(terrain文件的加载、解析与渲染全过程梳理) - 2

    快速导航(持续更新中…)Cesium源码解析一(terrain文件的加载、解析与渲染全过程梳理)Cesium源码解析二(metadataAvailability的含义)Cesium源码解析三(metadata元数据拓展中行列号的分块规则解析)Cesium源码解析四(Quantized-Mesh(.terrain)格式文件在CesiumJS和UE中加载情况的对比)目录1.前言2.本篇的由来3.terrain文件的加载3.1更新环境3.2更新和执行渲染命令3.3数据优化3.4结束当前帧4.总结1.前言  目前市场上三维比较火的实现方案主要有两种,b/s的方案主要是Cesium,c/s的方案主要是u

  5. Streampark集成Cloudera Flink、ldap、告警,以及部署常见问题 - 2

    集成背景我们当前集群使用的是ClouderaCDP,Flink版本为ClouderaVersion1.14,整体Flink安装目录以及配置文件结构与社区版本有较大出入。直接根据Streampark官方文档进行部署,将无法配置FlinkHome,以及后续整体Flink任务提交到集群中,因此需要进行针对化适配集成,在满足使用需求上,尽量提供完整的Streampark使用体验。集成步骤版本匹配问题解决首先解决无法识别Cloudera中的FlinkHome问题,根据报错主要明确到的事情是无法读取到Flink版本、lib下面的jar包名称无法匹配。修改对象:修改源码:(解决无法匹配clouderajar

  6. 停车系统源码-基于springboot+uniapp开源项目 - 2

    Iparking停车收费管理系统-可商用介绍Iparking是一款基于springBoot的停车收费管理系统,支持封闭车场和路边车场,支持微信支付宝多种支付渠道,支持多种硬件,涵盖了停车场管理系统的所有基础功能。技术栈Springboot,MybatisPlus,Beetl,Mysql,Redis,RabbitMQ,UniApp功能云端功能序号模块功能描述1系统管理菜单管理配置系统菜单2系统管理组织管理管理组织机构3系统管理角色管理配置系统角色,包含数据权限和功能权限配置4系统管理用户管理管理后台用户5系统管理租户管理多租户管理6系统管理公众号配置租户公众号配置7系统管理操作日志审计日志8系统

  7. 打通源码,高效定位代码问题|云效工程师指北 - 2

    大家好,我叫胡飞虎,花名虎仔,目前负责云效旗下产品Codeup代码托管的设计与开发。代码作为企业最核心的数据资产,除了被构建、部署之外还有更大的价值。为了帮助企业和团队挖掘更多源代码价值以赋能日常代码研发、运维等工作,云效代码团队在大数据和智能化方向进行了一系列的探索和实践(例如代码搜索与推荐),本文主要介绍我们如何通过直接打通源代码来提高研发与运维效率。随着微服务架构的流行,一个业务流程需要多个微服务共同完成。一旦出现问题,运维人员在面对数量多、调用链路复杂的情况下,很难快速锁定导致问题发生的罪魁祸首:代码。为了提高排查效率,目前常见的解决方案是:链路跟踪+日志分析工具相结合。即通过链路跟踪

  8. Android Studio开发之使用内容组件Content获取通讯信息讲解及实战(附源码 包括添加手机联系人和发短信) - 2

    运行有问题或需要源码请点赞关注收藏后评论区留言一、利用ContentResolver读写联系人在实际开发中,普通App很少会开放数据接口给其他应用访问。内容组件能够派上用场的情况往往是App想要访问系统应用的通讯数据,比如查看联系人,短信,通话记录等等,以及对这些通讯数据及逆行增删改查。首先要给AndroidMaifest.xml中添加响应的权限配置 下面是往手机通讯录添加联系人信息的例子效果如下分成三个步骤先查出联系人的基本信息,然后查询联系人号码,再查询联系人邮箱代码 ContactAddActivity类packagecom.example.chapter07;importandroid

  9. java 版本企业电子招投标采购系统源码之登录页面 - 2

    ​ 信息数智化招采系统服务框架:SpringCloud、SpringBoot2、Mybatis、OAuth2、Security前端架构:VUE、Uniapp、Layui、Bootstrap、H5、CSS3涉及技术:Eureka、Config、Zuul、OAuth2、Security、OSS、Turbine、Zipkin、Feign、Monitor、Stream、ElasticSearch等企业电子化采购系统企业电子化采购系统是明理公司在多家大、中、小型企业采购需求的分析与实际应用的基础上,结合企业采购流程优化再造理念开发的一体化电子招标采购平台,对于招标项目提供交易过程的全流程电子化、规范化管

  10. flink on yarn - 2

    文章目录使用flinksqlclientonyarnsession模式Per-JobCluster模式flinkrunflinkrunapplication-tyarn-application配置任务退出时保留Checkpoint从外部checkpoint恢复应用资料使用安装完hadoop3.3.4之后,启动hadoop、yarn将flink1.14.6上传到各个服务器节点,解压flinksqlclientonyarnhttps://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/overview/Appli

随机推荐