我的编程空间,编程开发者的网络收藏夹
学习永远不晚

elasticsearch源码分析index action实现方式

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

elasticsearch源码分析index action实现方式

action的作用

上一篇从结构上分析了action的,本篇将以index action为例仔分析一下action的实现方式。

再概括一下action的作用:对于每种功能(如index)action都会包括两个基本的类*action(IndexAction)和Transport*action(TransportIndexAction),前者类中会有一个实例(IndexAction INSTANCE = new IndexAction())这个实例用于client绑定对应的TransportAction(registerAction(IndexAction.INSTANCE, TransportIndexAction.class)),绑定过程发送在ActionModuel中。

另外在Action类中还会定义一个action的名字(String NAME = "indices:data/write/index")这个名字用于TransportService绑定对于的handle,用于处理NettyTransport接收到的信息。TransportAction的是最终的逻辑处理者,当接收到请求时,会首先判断本节点能否处理,如果能够处理则调用相关的方法处理得到结果返回,否则将通过NettyTransport转发该请求到对应的node进行处理。所有的Transport的结构都是这种类型。

TransportAction的类图

首先看一下TransportAction的类图,所的Transport*action都继承自于它。

它主要由两个方法execute和doExecute,execute方法有两种实现,第一种实现需要自行添加actionListener。最终的逻辑都在doExecute方法中,这个方法在各个功能模块中实现。以下是TransportIndexAction的继承关系:

实现上由于功能划分的原因,TransportIndexAction直接继承自TranspShardReplicationOperationAction,这个抽象类中的方法是所有需要操作shard副本的功能action的父,因此它的实现还包括delete,bulk等功能action。它实现了多个内部类,这些内部类用来辅助完成相关的功能。这里主要说一下OperationTransportHandler,ReplicaOperationTransportHandler及AsyncShardOperationAction三个子类。

OperationTransportHandler的代码

如下所示:

class OperationTransportHandler extends BaseTransportRequestHandler<Request> {
//继承自BaseTransportRequestHanlder
………………
        @Override
        public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
            // no need to have a threaded listener since we just send back a response
            request.listenerThreaded(false);
            // if we have a local operation, execute it on a thread since we don't spawn
            request.operationThreaded(true);
      //调用Transport的execute方法,通过channel返回结果
            execute(request, new ActionListener<Response>() {
                @Override
                public void onResponse(Response result) {
                    try {
                        channel.sendResponse(result);
                    } catch (Throwable e) {
                        onFailure(e);
                    }
                }
                @Override
                public void onFailure(Throwable e) {
                    try {
                        channel.sendResponse(e);
                    } catch (Throwable e1) {
                        logger.warn("Failed to send response for " + actionName, e1);
                    }
                }
            });
        }

看过NettyTransport请求发送和处理的同学一定对这个代码不陌生,这就是elasticsearch节点间处理信息的典型模式。当请求通过NettyTransport发送到本节点时会根据请求的action名称找到对应的handler,使用对应的handler来处理该请求。这个handler就对应着“indices:data/write/index”,可以看到它调用execute方法来处理。它的注册时在TransportShardReplicationOperationAction构造函数中完成的。

知道了OperationTransportHandler,ReplicaOperationTransportHandler就好理解了它的实现方式跟前者完全一样,对应的action名称加了一个“[r]”,它的作用是处理需要在副本上进行的操作,代码如下所示:

class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
……………………
        @Override
        public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
            try {
                shardOperationOnReplica(request);
            } catch (Throwable t) {
                failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t);
                throw t;
            }
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }

可以看到代码结构非常像,只是调用了副本操作的方法shardOperationOnReplica,这个方法在这TransportShardReplicationOperationAction中是抽象的,它的实现在各个子类中,例如deleteaction中实现了对于delete请求如何在副本上处理。

分析完这两个handle是不是对于action的处理过程有了一定的眉目了呢?但是这才是冰山一角,这两个Handler是用来接收来自其它节点的请求,如果请求的正好是本节点该如何处理呢?这些逻辑都在AsyncShardOperationAction类中。首先看一下它的内部结构:

因为TransportShardReplicationOperationAction的所有子类都是对索引的修改,会引起数据不一致,因此它的操作流程都是现在primaryShard上操作然后是Replicashard上操作。代码如下所示:

protected void doStart() throws ElasticsearchException {
            try {
          //检查是否有阻塞
                ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
                if (blockException != null) {
                    if (blockException.retryable()) {
                        logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
                        retry(blockException);
                        return;
                    } else {
                        throw blockException;
                    }
                }
          //检测是否是创建索引
                if (resolveIndex()) {
                    internalRequest.concreteIndex(observer.observedState().metaData().concreteSingleIndex(internalRequest.request().index(), internalRequest.request().indicesOptions()));
                } else {
                    internalRequest.concreteIndex(internalRequest.request().index());
                }
                // check if we need to execute, and if not, return
                if (!resolveRequest(observer.observedState(), internalRequest, listener)) {
                    return;
                }
          //再次检测是否有阻塞
                blockException = checkRequestBlock(observer.observedState(), internalRequest);
                if (blockException != null) {
                    if (blockException.retryable()) {
                        logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
                        retry(blockException);
                        return;
                    } else {
                        throw blockException;
                    }
                }
                shardIt = shards(observer.observedState(), internalRequest);
            } catch (Throwable e) {
                listener.onFailure(e);
                return;
            }
        //查找primaryShard
            boolean foundPrimary = false;
            ShardRouting shardX;
            while ((shardX = shardIt.nextOrNull()) != null) {
                final ShardRouting shard = shardX;
                // we only deal with primary shardIt here...
                if (!shard.primary()) {
                    continue;
                }
                if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) {
                    logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId());
                    retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
                    return;
                }
                if (!primaryOperationStarted.compareAndSet(false, true)) {
                    return;
                }
                foundPrimary = true;
          //primaryShard就在本地,直接进行相关操作
                if (shard.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
                    try {
                        if (internalRequest.request().operationThreaded()) {
                            internalRequest.request().beforeLocalFork();
                            threadPool.executor(executor).execute(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        performOnPrimary(shard.id(), shard);
                                    } catch (Throwable t) {
                                        listener.onFailure(t);
                                    }
                                }
                            });
                        } else {
                            performOnPrimary(shard.id(), shard);
                        }
                    } catch (Throwable t) {
                        listener.onFailure(t);
                    }
                } else {//primaryShard在其它节点上,将请求通过truansport发送到对应的节点。
                    DiscoveryNode node = observer.observedState().nodes().get(shard.currentNodeId());
                    transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {
                        @Override
                        public Response newInstance() {
                            return newResponseInstance();
                        }
                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
                        @Override
                        public void handleResponse(Response response) {
                            listener.onResponse(response);
                        }
                        @Override
                        public void handleException(TransportException exp) {
                            // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
                            if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
                                    retryPrimaryException(exp)) {
                                primaryOperationStarted.set(false);
                                internalRequest.request().setCanHaveDuplicates();
                                // we already marked it as started when we executed it (removed the listener) so pass false
                                // to re-add to the cluster listener
                                logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
                                retry(exp);
                            } else {
                                listener.onFailure(exp);
                            }
                        }
                    });
                }
                break;
            }
            ………………
        }

这就是对应请求的处理过程。

primary操作的方法

void performOnPrimary(int primaryShardId, final ShardRouting shard) {
           ……
                PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()));
                performReplicas(response);
            …………
        }

以上就是performOnPrimary方法的部分代码,首先调用外部类的shardOperationOnPrimary方法,该方法实现在各个子类中,在TransportIndexAction中的实现如下所示:

@Override
    protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
        final IndexRequest request = shardRequest.request;
        // 查看是否需要routing
        IndexMetaData indexMetaData = clusterState.metaData().index(shardRequest.shardId.getIndex());
        MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
        if (mappingMd != null && mappingMd.routing().required()) {
            if (request.routing() == null) {
                throw new RoutingMissingException(shardRequest.shardId.getIndex(), request.type(), request.id());
            }
        }
      //调用indexserice执行对应的index操作
        IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
        IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
        SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
                .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
        long version;
        boolean created;
        try {
            Engine.IndexingOperation op;
            if (request.opType() == IndexRequest.OpType.INDEX) {
                Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
                if (index.parsedDoc().mappingsModified()) {
                    mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
                }
                indexShard.index(index);
                version = index.version();
                op = index;
                created = index.created();
            } else {
                Engine.Create create = indexShard.prepareCreate(sourceToParse,
                        request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
                if (create.parsedDoc().mappingsModified()) {
                    mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
                }
                indexShard.create(create);
                version = create.version();
                op = create;
                created = true;
            }
            if (request.refresh()) {
                try {
                    indexShard.refresh("refresh_flag_index");
                } catch (Throwable e) {
                    // ignore
                }
            }
            // update the version on the request, so it will be used for the replicas
            request.version(version);
            request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
            assert request.versionType().validateVersionForWrites(request.version());
            IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);
            return new PrimaryResponse<>(shardRequest.request, response, op);
        } catch (WriteFailureException e) {
            if (e.getMappingTypeToUpdate() != null) {
                DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());
                if (docMapper != null) {
                    mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
                }
            }
            throw e.getCause();
        }
    }

上面的代码就是index的执行过程,这一过程涉及到index的底层操作,这里就不展开,只是说明它在action中是如何实现的,后面会有详细说明。接下来看在副本上的操作。副本可能有多个,因此首先调用了performReplicas方法,在这个方法中首先开始监听集群的状态,然后便利所有的副本进行处理,如果是异步则加入一个listener,否则同步执行返回结果。最后调用performReplica,在该方法中调用外部类的抽象方法shardOperationOnReplica。 这一过程比较简单,这里就不再贴代码,有兴趣可以参考相关源码。

总结

这里以TransportIndexAction为例分析了tansportaction的结构层次。它在TransportAction直接还有一层那就是TransportShardReplicationOperationAction,这个类是actionsupport包中的一个,这个包把所有的子操作方法做了进一步的抽象,抽象出几个大类放到了这里,所有其它子功能很多都继承自这。这个包会在后面有详细分析。 

以上就是elasticsearch源码分析index action实现方式的详细内容,更多关于elasticsearch源码分析index action的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

elasticsearch源码分析index action实现方式

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

elasticsearch源码index action实现方式是什么

本篇内容主要讲解“elasticsearch源码index action实现方式是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“elasticsearch源码index action实现方式
2023-06-30

源码 | 解析 Redo Log 实现方式

柯煜昌 顾问软件工程师目前从事 RadonDB 容器化研发,华中科技大学研究生毕业,有多年的数据库内核开发经验。| 前言提及 Redo Log(重做日志)与 LSN(log sequece number)时,经常被问及以下问题:MySQL 的 InnoDB 为
源码 | 解析 Redo Log 实现方式
2017-03-28

Vue3源码分析reactivity实现方法示例

这篇文章主要为大家介绍了Vue3源码分析reactivity实现方法原理示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-01-28

await错误捕获实现方式源码解析

这篇文章主要为大家介绍了await错误捕获实现方式源码示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2022-12-25

vue3源码分析reactivity实现原理

这篇文章主要为大家介绍了vue3源码分析reactivity实现原理详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-01-28

Elasticsearch分布式搜索引擎在日志分析中的应用(如何利用Elasticsearch实现分布式日志分析?)

Elasticsearch分布式搜索引擎在日志分析中具有广泛应用,可高效处理海量日志数据。其分布式架构支持横向扩展,并采用反向索引结构,实现快速搜索。Logstash和Filebeat数据收集器将日志从不同来源收集到Elasticsearch集群中。Elasticsearch提供了强大的查询语言和强大的可视化功能,可以通过Kibana创建仪表盘和图表,以便分析和可视化日志数据。Elasticsearch还支持机器学习算法,用于检测异常和识别模式。在安全威胁检测、性能监控、故障排除和合规审计等方面有着广泛的
Elasticsearch分布式搜索引擎在日志分析中的应用(如何利用Elasticsearch实现分布式日志分析?)
2024-04-02

Elasticsearch分布式搜索的近实时搜索特性分析(Elasticsearch如何实现近实时搜索?)

Elasticsearch作为分布式搜索引擎,能实现近实时搜索,让用户立即搜索新索引数据。其机制包括:实时索引、刷新机制、搜索刷新、版本控制、快照。新数据写入后,Elasticsearch会将数据刷新到磁盘,并通过搜索刷新使最新更改可被搜索,从而保证快速响应和数据一致性。但近实时搜索也存在资源消耗、数据延迟和一致性权衡等局限性,需要通过调整刷新间隔、使用搜索刷新和监控集群等最佳实践来优化性能。
Elasticsearch分布式搜索的近实时搜索特性分析(Elasticsearch如何实现近实时搜索?)
2024-04-02

OpenMP task construct实现原理源码分析

本篇内容主要讲解“OpenMP task construct实现原理源码分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“OpenMP task construct实现原理源码分析”吧!从编译器
2023-07-05

深入分析GolangServer源码实现过程

这篇文章深入介绍了GolangServer源码实现过程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
2023-02-02

源码分析C++是如何实现string的

我们平时使用C++开发过程中或多或少都会使用std::string,但您了解string具体是如何实现的吗,本文小编就带大家从源码角度分析一下
2023-05-14

Netty分布式flush方法刷新buffer队列源码分析

本文小编为大家详细介绍“Netty分布式flush方法刷新buffer队列源码分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“Netty分布式flush方法刷新buffer队列源码分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入
2023-06-29

React实现合成事件的源码分析

React 中的事件,是对原生事件的封装,叫做合成事件。抽象出一层合成事件,是为了做兼容,抹平不同浏览器之间的差异。本文将从事件绑定和事件触发角度,带大家解读下源码,感兴趣的可以了解一下
2022-12-08

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录