草庐IT

elasticsearch源码关于TransportSearchAction【阶段三】

weixin_41871193 2025-07-18 原文

1.回顾.TransportService

public class TransportService extends AbstractLifecycleComponent
TransportService:方法:1

  public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
                                                                final TransportRequest request,
                                                                final TransportRequestOptions options,
                                                                TransportResponseHandler<T> handler) {

        asyncSender.sendRequest(connection, action, request, options, handler);
    }

TransportService有参构造触发
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);

TransportService:方法:2

 private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
                                                                   final TransportRequest request,
                                                                   final TransportRequestOptions options,
                                                TransportResponseHandler<T> handler) {
        // 发现节点                                                   
        DiscoveryNode node = connection.getNode();
        final long requestId = transport.newRequestId();
        final TimeoutHandler timeoutHandler;
        try {
            Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
            TransportResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
            clientHandlers.put(requestId, new RequestHolder<>(responseHandler, connection, action, timeoutHandler));
            // 发送内部请求
            connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
        } catch (final Exception e) {
          
        }
    }

TransportService:方法:3

  @Override
        public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
            throws IOException, TransportException {
            //内部的RPC请求通过TransportService#sendRequest发送
            //sendRequest会检查目的节点是否是本节点,如果是本节点,
            //则在sendLocalRequest方法中直接通过action 获取对应的Handler,调用对应的处理函数
            //TransportService#sendRequest
            sendLocalRequest(requestId, action, request, options);
        }

TransportService:方法:4

  //ShardSearchTransportRequest,请求本地分片索引
    private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {

        final DirectResponseChannel channel = new DirectResponseChannel(logger, localNode, action, requestId, adapter, threadPool);
        try {
            // sendLocalRequest方法中直接通过action 获取对应的Handler,调用对应的处理函
            final RequestHandlerRegistry reg = adapter.getRequestHandler(action);// 获取action对应的注册信息
            final String executor = reg.getExecutor();// 获取执行器
            if (ThreadPool.Names.SAME.equals(executor)) {
                //noinspection unchecked,#这个方法往下走
                reg.processMessageReceived(request, channel);// 处理task任务
            } 
        } catch (Exception e) {
        
        }
    }

2.RequestHandlerRegistry

public class RequestHandlerRegistry

 public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
        final Task task = taskManager.register(channel.getChannelType(), action, request);// 创建任务
        if (task == null) {
            handler.messageReceived(request, channel);// 启动会走这里
        } else {
            boolean success = false;
            try {
                handler.messageReceived(request, new TransportChannelWrapper(taskManager, task, channel), task);
                success = true;
            } finally {
                if (success == false) {
                    taskManager.unregister(task);
                }
            }
        }
    }

3.SearchTransportService

public class SearchTransportService extends AbstractComponent
new TaskAwareTransportRequestHandler

   transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
            new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
                @Override
                public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
                    searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
                        @Override
                        public void onResponse(SearchPhaseResult searchPhaseResult) {
                            try {
                                channel.sendResponse(searchPhaseResult);
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        }
                    });
                }
            });

4.SearchService

public class SearchService extends AbstractLifecycleComponent implements IndexEventListener

public void executeQueryPhase(ShardSearchRequest request, SearchTask task, ActionListener<SearchPhaseResult> listener) {
    rewriteShardRequest(request, new ActionListener<ShardSearchRequest>() {
        @Override
        public void onResponse(ShardSearchRequest request) {
            try {
                // 监听search之后的结果SearchService#SearchService");
                listener.onResponse(executeQueryPhase(request, task));
            } catch (Exception e) {
                onFailure(e);
            }
    });
}
  private void rewriteShardRequest(ShardSearchRequest request, ActionListener<ShardSearchRequest> listener) {
        // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
        // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not
        // adding a lot of overhead
        Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis),
            ActionListener.wrap(r ->
                threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() {
                    @Override
                    protected void doRun() throws Exception {
                        listener.onResponse(request);
                    }
                }), listener::onFailure));
    }

5.Rewriteable

public interface Rewriteable

/**
     * Rewrites the given rewriteable and fetches pending async tasks for each round before rewriting again.
     */
    static <T extends Rewriteable<T>> void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener<T> rewriteResponse) {
        rewriteAndFetch(original, context, rewriteResponse, 0);
    }
/**
 * Rewrites the given rewriteable and fetches pending async tasks for each round before rewriting again.
 */
static <T extends Rewriteable<T>> void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener<T>
    rewriteResponse, int iteration) {
    T builder = original;
    try {
        for (T rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder;
             rewrittenBuilder = builder.rewrite(context)) {
            builder = rewrittenBuilder;
            if (context.hasAsyncActions()) {
                T finalBuilder = builder;
                final int currentIterationNumber = iteration;
                context.executeAsyncActions(ActionListener.wrap(n -> rewriteAndFetch(finalBuilder, context, rewriteResponse,
                    currentIterationNumber), rewriteResponse::onFailure));
                return;
            }
        }
        rewriteResponse.onResponse(builder);
    } catch (IOException ex) {
        rewriteResponse.onFailure(ex);
    }
}

rewriteResponse.onResponse(builder);响应到,看第四阶段

有关elasticsearch源码关于TransportSearchAction【阶段三】的更多相关文章

  1. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  2. ruby-on-rails - 关于 Ruby 的一般问题 - 2

    我在我的rails应用程序中安装了来自github.com的acts_as_versioned插件,但有一段代码我不完全理解,我希望有人能帮我解决这个问题class_eval我知道block内的方法(或任何它是什么)被定义为类内的实例方法,但我在插件的任何地方都找不到定义为常量的CLASS_METHODS,而且我也不确定是什么here,并且有问题的代码从lib/acts_as_versioned.rb的第199行开始。如果有人愿意告诉我这里的内幕,我将不胜感激。谢谢-C 最佳答案 这是一个异端。http://en.wikipedia

  3. ruby - Rails Elasticsearch 聚合 - 2

    不知何故,我似乎无法获得包含我的聚合的响应...使用curl它按预期工作:HBZUMB01$curl-XPOST"http://localhost:9200/contents/_search"-d'{"size":0,"aggs":{"sport_count":{"value_count":{"field":"dwid"}}}}'我收到回复:{"took":4,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":90,"max_score":0.0,"hits":[]},"a

  4. ruby - 我怎样才能更好地了解/了解更多关于 Ruby 的知识? - 2

    按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭9年前。我最近开始学习Ruby,这是我的第一门编程语言。我对语法感到满意,并且我已经完成了许多只教授相同基础知识的教程。我已经写了一些小程序(包括我自己的数组排序方法,在有人告诉我谷歌“冒泡排序”之前我认为它非常聪明),但我觉得我需要尝试更大更难的东西来理解更多关于Ruby.关于如何执行此操作的任何想法?

  5. ruby - 关于 Ruby 中 Dir[] 和 File.join() 的混淆 - 2

    我在Ruby中遇到了一个关于Dir[]和File.join()的简单程序,blobs_dir='/path/to/dir'Dir[File.join(blobs_dir,"**","*")].eachdo|file|FileUtils.rm_rf(file)ifFile.symlink?(file)我有两个困惑:首先,File.join(@blobs_dir,"**","*")中的第二个和第三个参数是什么意思?其次,Dir[]在Ruby中有什么用?我只知道它等价于Dir.glob(),但是,我对Dir.glob()确实不是很清楚。 最佳答案

  6. elasticsearch源码关于TransportSearchAction【阶段三】 - 2

    1.回顾.TransportServicepublicclassTransportServiceextendsAbstractLifecycleComponentTransportService:方法:1publicfinalTextendsTransportResponse>voidsendRequest(finalTransport.Connectionconnection,finalStringaction,finalTransportRequestrequest,finalTransportRequestOptionsoptions,TransportResponseHandlerT>

  7. (附源码)vue3.0+.NET6实现聊天室(实时聊天SignalR) - 2

    参考文章搭建文章gitte源码在线体验可以注册两个号来测试演示图:一.整体介绍  介绍SignalR一种通讯模型Hub(中心模型,或者叫集线器模型),调用这个模型写好的方法,去发送消息。  内容有:    ①:Hub模型的方法介绍    ②:服务器端代码介绍    ③:前端vue3安装并调用后端方法    ④:聊天室样例整体流程:1、进入网站->调用连接SignalR的方法2、与好友发送消息->调用SignalR的自定义方法 前端通过,signalR内置方法.invoke()  去请求接口3、监听接受方法(渲染消息)通过new signalR.HubConnectionBuilder().on

  8. 关于Qt程序打包后运行库依赖的常见问题分析及解决方法 - 2

    目录一.大致如下常见问题:(1)找不到程序所依赖的Qt库version`Qt_5'notfound(requiredby(2)CouldnotLoadtheQtplatformplugin"xcb"in""eventhoughitwasfound(3)打包到在不同的linux系统下,或者打包到高版本的相同系统下,运行程序时,直接提示段错误即segmentationfault,或者Illegalinstruction(coredumped)非法指令(4)ldd应用程序或者库,查看运行所依赖的库时,直接报段错误二.问题逐个分析,得出解决方法:(1)找不到程序所依赖的Qt库version`Qt_5'

  9. ruby-on-rails - 使用 Rails (Tire) 和 ElasticSearch 进行模糊字符串匹配 - 2

    我有一个Rails应用程序,现在设置了ElasticSearch和Tiregem以在模型上进行搜索,我想知道我应该如何设置我的应用程序以对模型中的某些索引进行模糊字符串匹配。我将我的模型设置为索引标题、描述等内容,但我想对其中一些进行模糊字符串匹配,但我不确定在何处进行此操作。如果您想发表评论,我将在下面包含我的代码!谢谢!在Controller中:defsearch@resource=Resource.search(params[:q],:page=>(params[:page]||1),:per_page=>15,load:true)end在模型中:classResource'Us

  10. ruby - 关于 Ruby/ChefSpec 编码风格的反馈 - 2

    我是Ruby的新手,但过去两周我一直在对Chef测试进行大量研究。该测试使用ChefSpec和Fauxhai,但它看起来不是很“像ruby”,我希望社区能给我一些编码风格的建议。有没有更好的方法来编写这样的嵌套循环?Recipe/foo/recipes/default.rbpackage"foo"doaction:installendRecipe/foo/spec/default_spec.rbrequire'chefspec'describe'foo::default'doplatforms={"debian"=>['6.0.5'],"ubuntu"=>['12.04','10.04

随机推荐