我的编程空间,编程开发者的网络收藏夹
学习永远不晚

elasticsearch节点的transport请求发送怎么处理

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

elasticsearch节点的transport请求发送怎么处理

这篇文章主要介绍“elasticsearch节点的transport请求发送怎么处理”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“elasticsearch节点的transport请求发送怎么处理”文章能帮助大家解决问题。

transport请求的发送和处理过程

前一篇分析对nettytransport的启动及连接,本篇主要分析transport请求的发送和处理过程。

cluster中各个节点之间需要相互发送很多信息,如master检测其它节点是否存在,node节点定期检测master节点是否存储,cluster状态的发布及搜索数据请求等等。为了保证信息传输,elasticsearch定义了一个19字节长度的信息头HEADER_SIZE = 2 + 4 + 8 + 1 + 4,以'E','S'开头,接着是4字节int信息长度,然后是8字节long型信息id,接着是一个字节的status,最后是4字节int型version。

所有的节点间的信息都是以这19个字节开始。同时elasticsearch对于节点间的所有action都定义 了名字,如对master的周期检测action,internal:discovery/zen/fd/master_ping,每个action对应着相应的messagehandler。接下来会进行详分析。

request的发送过程

代码在nettytransport中如下所示:

public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {        //参数说明:node发送的目的节点,requestId请求id,action action名称,request请求,options包括以下几种操作 RECOVERY,BULK,REG,STATE,PING;     Channel targetChannel = nodeChannel(node, options);//获取对应节点的channel,channel在连接节点时初始化完成(请参考上一篇)        if (compress) {            options.withCompress(true);        }        byte status = 0;     //设置status 包括以下几种STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2;    status = TransportStatus.setRequest(status);      ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);//初始写出流        boolean addedReleaseListener = false;        try {            bStream.skip(NettyHeader.HEADER_SIZE);//留出message header的位置            StreamOutput stream = bStream;            // only compress if asked, and, the request is not bytes, since then only            // the header part is compressed, and the "body" can't be extracted as compressed            if (options.compress() && (!(request instanceof BytesTransportRequest))) {                status = TransportStatus.setCompress(status);                stream = CompressorFactory.defaultCompressor().streamOutput(stream);            }            stream = new HandlesStreamOutput(stream);            // we pick the smallest of the 2, to support both backward and forward compatibility            // note, this is the only place we need to do this, since from here on, we use the serialized version            // as the version to use also when the node receiving this request will send the response with            Version version = Version.smallest(this.version, node.version());            stream.setVersion(version);            stream.writeString(transportServiceAdapter.action(action, version));            ReleasableBytesReference bytes;            ChannelBuffer buffer;            // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output            // that create paged channel buffers, but its tricky to know when to do it (where this option is            // more explicit).            if (request instanceof BytesTransportRequest) {                BytesTransportRequest bRequest = (BytesTransportRequest) request;                assert node.version().equals(bRequest.version());                bRequest.writeThin(stream);                stream.close();                bytes = bStream.bytes();                ChannelBuffer headerBuffer = bytes.toChannelBuffer();                ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();                buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);            } else {                request.writeTo(stream);                stream.close();                bytes = bStream.bytes();                buffer = bytes.toChannelBuffer();            }            NettyHeader.writeHeader(buffer, requestId, status, version);//写信息头            ChannelFuture future = targetChannel.write(buffer);//写buffer同时获取future,发送信息发生在这里            ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);            future.addListener(listener);//添加listener            addedReleaseListener = true;            transportServiceAdapter.onRequestSent(node, requestId, action, request, options);        } finally {            if (!addedReleaseListener) {                Releasables.close(bStream.bytes());            }        }    }

以上就是request的发送过程,获取目标node的channel封装请求写入信息头,然后发送并使用listener监听,这里transportRequest是一个抽象类,它继承了TransportMessage同时实现了streamable接口。cluster中对它的实现非常多,各个功能都有相应的request,这里就不一一列举,后面的代码分析中会时常涉及。

request的接受过程

request发送只是transport的一部分功能,有发送就要有接收,这样transport的功能才完整。接下来就是对接收过程的分析。上一篇中简单介绍过netty的使用,message的处理是通过MessageHandler处理,因此nettyTransport的信息处理逻辑都在MessageChannelHandler的messageReceived()方法中,代码如下所示:

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {        Transports.assertTransportThread();        Object m = e.getMessage();        if (!(m instanceof ChannelBuffer)) {//非buffer之间返回            ctx.sendUpstream(e);            return;        }     //解析message头        ChannelBuffer buffer = (ChannelBuffer) m;        int size = buffer.getInt(buffer.readerIndex() - 4);        transportServiceAdapter.received(size + 6);        // we have additional bytes to read, outside of the header        boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0;        int markedReaderIndex = buffer.readerIndex();        int expectedIndexReader = markedReaderIndex + size;        // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh        // buffer, or in the cumlation buffer, which is cleaned each time        StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);      //读取信息头中的几个重要元数据        long requestId = buffer.readLong();        byte status = buffer.readByte();        Version version = Version.fromId(buffer.readInt());        StreamInput wrappedStream;      …………        if (TransportStatus.isRequest(status)) {//处理请求            String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);            if (buffer.readerIndex() != expectedIndexReader) {                if (buffer.readerIndex() < expectedIndexReader) {                    logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);                } else {                    logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);                }                buffer.readerIndex(expectedIndexReader);            }        } else {//处理响应            TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);            // ignore if its null, the adapter logs it            if (handler != null) {                if (TransportStatus.isError(status)) {                    handlerResponseError(wrappedStream, handler);                } else {                    handleResponse(ctx.getChannel(), wrappedStream, handler);                }            } else {                // if its null, skip those bytes                buffer.readerIndex(markedReaderIndex + size);            }          …………        wrappedStream.close();    }

以上就是信息处理逻辑,这个方法基础自netty的SimpleChannelUpstreamHandler类。作为MessageHandler会在client和server启动时加入到handler链中,在信息到达后netty会自动调用handler链依次处理。这是netty的内容,就不详细说明,请参考netty文档。

request和response是如何被处理

request的处理

代码如下所示:

protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {        final String action = buffer.readString();//读出action的名字        transportServiceAdapter.onRequestReceived(requestId, action);        final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);        try {            final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);//获取处理该信息的handler            if (handler == null) {                throw new ActionNotFoundTransportException(action);            }            final TransportRequest request = handler.newInstance();            request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));            request.readFrom(buffer);            if (handler.executor() == ThreadPool.Names.SAME) {                //noinspection unchecked                handler.messageReceived(request, transportChannel);//使用该handler处理信息。            } else {                threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));            }        } catch (Throwable e) {            try {                transportChannel.sendResponse(e);            } catch (IOException e1) {                logger.warn("Failed to send error message back to client for action [" + action + "]", e);                logger.warn("Actual Exception", e1);            }        }        return action;    }

几个关键部分在代码中进行了标注。这里仍旧不能看到请求是如何处理的。因为cluster中的请求各种各样,如ping,discovery,index等等,因此不可能使用同一种处理方式。因此request最终又被提交给handler处理。每个功能请求都实现了自己的handler,当请求被提交给handler时会做对应的处理。这里再说一下transportServiceAdapter,消息的处理都是通过它适配转发完成。request的完整处理流程是:messageReceived()方法收到信息判断是request会将其转发到transportServiceAdapter的handler方法,handler方法查找对应的requesthandler,使用将信息转发给该handler进行处理。这里就不举例说明,在后面的discover分析中我们会看到发现,ping等请求的处理过程。

response的处理过程

response通过handleResponse方法进行处理,代码如下:

protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {        final TransportResponse response = handler.newInstance();        response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));        response.remoteAddress();        try {            response.readFrom(buffer);        } catch (Throwable e) {            handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));            return;        }        try {            if (handler.executor() == ThreadPool.Names.SAME) {                //noinspection unchecked                handler.handleResponse(response);//转发给对应的handler            } else {                threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));            }        } catch (Throwable e) {            handleException(handler, new ResponseHandlerFailureTransportException(e));        }    }

response的处理过程跟request很类似。每个request都会对应一个handler和一个response的处理handler,会在时候的时候注册到transportService中。请求到达时根据action名称获取到handler处理request,根据requestId获取对应的response handler进行响应。

关于“elasticsearch节点的transport请求发送怎么处理”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注编程网行业资讯频道,小编每天都会为大家更新不同的知识点。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

elasticsearch节点的transport请求发送怎么处理

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

elasticsearch节点的transport请求发送怎么处理

这篇文章主要介绍“elasticsearch节点的transport请求发送怎么处理”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“elasticsearch节点的transport请求发送怎么处理”
2023-06-30

URLConnection发送HTTP请求的方法_动力节点Java学院整理

如何通过Java发送HTTP请求,通俗点讲,如何通过Java(模拟浏览器)发送HTTP请求。Java有原生的API可用于发送HTTP请求,即java.net.URL、java.net.URLConnection,这些API很好用、很常用,但
2023-05-31

怎么在Chrome development tool里查看C4C前台发送的请求细节

本篇内容介绍了“怎么在Chrome development tool里查看C4C前台发送的请求细节”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所
2023-06-04

Apache Tomcat怎么高并发处理请求

这篇文章给大家分享的是有关Apache Tomcat怎么高并发处理请求的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。介绍作为常用的http协议服务器,tomcat应用非常广泛。tomcat也是遵循Servelt协
2023-06-29

RestTemplate怎么实现发送带headers的GET请求

这篇文章主要讲解了“RestTemplate怎么实现发送带headers的GET请求”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“RestTemplate怎么实现发送带headers的GET
2023-06-25

Couchbase中怎么处理并发读写请求

在Couchbase中处理并发读写请求通常涉及以下几个方面:使用CAS(Compare-And-Swap)操作:CAS操作是一种乐观锁机制,可以保证在写入数据时不会被其他客户端同时修改而导致数据不一致的问题。在Couchbase中,可以使用
Couchbase中怎么处理并发读写请求
2024-04-09

怎么用VBS实现发送带Cookie的HTTP请求

这篇文章主要介绍怎么用VBS实现发送带Cookie的HTTP请求,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!为了方便测试,先写一个回显Cookie的简单的PHP程序: 代码如下:
2023-06-08

linux shell中curl 发送post请求json格式问题的处理方法

今天在linux中使用curl发送一个post请求时,带有json的数据,在发送时发现json中的变量没有解析出来 如下curl -i -X POST -H 'Content-type':'
2022-06-04

JAVA中的HTTP请求怎么利用HttpClient实现发送

JAVA中的HTTP请求怎么利用HttpClient实现发送?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。HttpClient介绍HttpClient 不是一个
2023-05-31

java中的json怎么利用post请求进行发送

这篇文章给大家介绍java中的json怎么利用post请求进行发送,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。方法一:package main.utils;import java.io.*;import java.n
2023-05-31

Cassandra中怎么处理大量的并发读写请求

在Cassandra中处理大量的并发读写请求通常需要考虑以下几点:数据模型设计:在设计数据模型时,需要根据实际业务需求和访问模式来选择合适的分区键、主键和列族结构,以避免热点数据和数据分布不均匀导致的性能问题。分布式架构:Cassandra
Cassandra中怎么处理大量的并发读写请求
2024-03-11

jMeter中怎么发送两个逻辑相关的HTTP请求

这期内容当中小编将会给大家带来有关jMeter中怎么发送两个逻辑相关的HTTP请求,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。现在我有另一个需求场景:假设我开发了一个创建Service Request的
2023-06-03

java处理高并发请求的方法是什么

Java处理高并发请求的方法有很多种,以下是一些常用的方法:使用线程池:可以使用Java中的线程池技术来管理并发请求。通过创建固定大小的线程池,可以控制同时处理的请求数量,避免系统资源被过多的请求耗尽。使用消息队列:可以使用消息队列来缓冲请
2023-10-25

怎么设置Fiddler来拦截Java代码发送的HTTP请求

这篇文章主要介绍“怎么设置Fiddler来拦截Java代码发送的HTTP请求”,在日常操作中,相信很多人在怎么设置Fiddler来拦截Java代码发送的HTTP请求问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答
2023-06-04

axios发送post请求上传文件到后端的问题怎么解决

这篇“axios发送post请求上传文件到后端的问题怎么解决”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“axios发送po
2023-06-30

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录