我的编程空间,编程开发者的网络收藏夹
学习永远不晚

dubbo怎么实现consumer从多个group中调用指定group的provider

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

dubbo怎么实现consumer从多个group中调用指定group的provider

本篇内容主要讲解“dubbo怎么实现consumer从多个group中调用指定group的provider”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“dubbo怎么实现consumer从多个group中调用指定group的provider”吧!

    背景

    在工作中,遇到这样的场景:

    有个es索引构建服务,需要从各个业务服务获取索引的信息,从而构建索引,业务服务都实现同一个接口IndexInfoProvider,通过设置不同的group来达到区分的效果(group就是es索引名)。

    索引构建服务在内存维护了一个Map<String, IndexInfoProvider> providerMap,key是索引名&mdash;&mdash;也就是provider的group,value是IndexInfoProvider服务的consumer。

    为了图方便,索引构建服务还一次性初始化了所有consumer,假设总共有200个分组,那么Map就会缓存200个consumer,初始化的时候巨慢。如果不一次性初始化,而是按需的话,代码会变得复杂一些,可能需要像双重检验这样的措施。而且因为是缓存,必然也会遇到缓存一致性的问题,例如新增一个索引。

    基于这样的问题,就想着,dubbo的一个consumer,能不能按需调用指定分组的provider呢~

    过程

    为什么必须缓存那么多consumer

    为什么有200个分组,就要缓存200个consumer,而不是像我们平时那样,1个就可以呢?

    dubbo怎么实现consumer从多个group中调用指定group的provider

    业务服务在实现IndexInfoProvider接口的时候,都指定是分组,而且分组名,就是对应的索引名。

    相应的,消费者就必须指定消费的分组,200个索引,自然就需要200个consumer。

    consumer只能调用一个group的provider么

    在我们日常的使用中,consumer基本都是只能调用一个group的provider。但实际上,dubbo是支持调用多个group的provider的。

    dubbo怎么实现consumer从多个group中调用指定group的provider

    这样写是指定a分组和b分组,多个分组之间用英文逗号分隔

    dubbo怎么实现consumer从多个group中调用指定group的provider

    这样写,是所有的分组

    group="*"能代替Map<String, IndexInfoProvider>么

    如果将es索引构建服务的IndexInfoProvider接口的consumer分组设置为group="*",然后在调用接口的时候,根据需要,从一堆provider中筛选出指定group的provider,只调用这些provider,是不是就可以代替Map缓存了?

    初步方案:

    • 利用ThreadLocal来指定要调用的分组,在调用方法前设置group到ThreadLocal中。

    • 实现dubbo的负载均衡拓展,只选取指定分组的节点来调用。

    • 调用结束,清理ThreadLocal中的分组信息。

    似乎是可行的?

    理想很美好,现实很骨感

    通过源码和代码debug发现,consumer持有的Invoker,有点像一个责任链。

    如果是单分组&mdash;&mdash;!group.contains(",") && !group.equals("*"),那么会是这样:

    MockClusterInvoker->FailoverClusterInvoker

    如果是多分组,则会变成:

    MockClusterInvoker->MergeableClusterInvoker->FailoverClusterInvoker

    负载均衡的机制,是得到了FailoverClusterInvoker才会生效

    MockClusterInvoker只是一种降级机制,不是导致问题的原因

    问题是出在MergeableClusterInvoker,下面是导致问题的代码!!!

    dubbo怎么实现consumer从多个group中调用指定group的provider

    代码大概的意思是:消费者没有指定合并策略,那么就会调用第一个有效的服务提供者,如果都是无效,就直接调用第一个。

    我是没有指定合并策略的,所以会变成调用第一个有效的服务提供者。根本走不到负载均衡那里。

    不指定合并策略不行,那指定呢?指定合并策略会怎样呢?

    dubbo怎么实现consumer从多个group中调用指定group的provider

    dubbo怎么实现consumer从多个group中调用指定group的provider

    指定了合并策略,会调用所有invoker,然后用Merger合并结果。

    这种很适合这样种场景:

    一部分数据在服务A,一部分数据在服务B,需要分别调用A和B,然后把两者的结果集合并

    设想着这样的方案:

    • 指定了合并策略,那么所有分组的invoker都会被调用到,那么请求就到FailoverClusterInvoker了,负载均衡spi就生效了

    • 修改一下负载均衡spi,如果目前调用的invoker不是指定分组的,那么就直接返回null

    • 实现自己的合并spi,返回第一个不会null的结果

    虽然不是什么优雅的方式,但是,似乎是能做到的???

    实际上,行不通,达不到想要的效果。原因是MergeableClusterInvoker内部是用线程池并发调用的,ThreadLocal里的分组信息会丢失

    dubbo怎么实现consumer从多个group中调用指定group的provider

    还有机会么

    MergeableClusterInvoker类是没有留拓展的余地啦,还有其他机会么?MergeableClusterInvoker的Invokers列表从哪里来的?

    dubbo怎么实现consumer从多个group中调用指定group的provider

    Invokers是从directory来的,这里有没有拓展的余地呢?

    有的,在AbstractDirectory的list方法

    dubbo怎么实现consumer从多个group中调用指定group的provider

    看到这,发现用路由spi,还是有机会的,如果实现动态路由,每次只给MergeableClusterInvoker返回指定分组的Invoker,是不是就可以呢?怎么让routers包含我们自定义的路由spi呢?

    dubbo怎么实现consumer从多个group中调用指定group的provider

    需要url上携带了router参数,但是这里的url的参数是由谁决定的?@Reference 注解是没有没有的指定router参数的&hellip;

    最后debuig,兜兜转转,发现只能靠重写ReferenceConfig类的loadRegistries方法,往url上加上动态路由的参数。

    dubbo怎么实现consumer从多个group中调用指定group的provider

    成果

    AssignGroupRouterFactory

    public class AssignGroupRouterFactory implements RouterFactory {    public static final String NAME = "assignGroup";    @Override    public Router getRouter(URL url) {        return new AssignGroupRouter(url);    }}

    AssignGroupRouter

    public class AssignGroupRouter implements Router {    private final URL url;    public AssignGroupRouter(URL url) {        this.url = url;    }    @Override    public URL getUrl() {        return url;    }    @Override    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {        String assignGroup = IndexInfoProviderInvoker.getAssignGroup();        if (Objects.isNull(assignGroup)) {            return invokers;        }        return invokers.stream()            .filter(invoker -> {                URL invokerUrl = invoker.getUrl();                return Objects.equals(assignGroup, invokerUrl.getParameter(Constants.GROUP_KEY));            }).collect(Collectors.toList());    }    @Override    public int compareTo(Router o) {        return 1;    }}

    AssignGroupReferenceConfig

    public class AssignGroupReferenceConfig<T> extends ReferenceConfig<T> {    @Override    protected List<URL> loadRegistries(boolean provider) {        List<URL> urls = super.loadRegistries(provider);        if (CollectionUtils.isEmpty(urls)) {            return urls;        }        // 指定动态路由,路由方式为assignGroup        return urls.stream()            .map(url -> url.addParameter(Constants.ROUTER_KEY, "assignGroup").addParameter(Constants.RUNTIME_KEY,                "true"))            .collect(Collectors.toList());    }}

    IndexInfoProviderInvokerProxy

    @Componentpublic class IndexInfoProviderInvokerProxy {    public static final ThreadLocal<String> INDEX_TYPE_THREAD_LOCAL = new ThreadLocal<>();    private final IndexInfoProvider indexInfoProvider;    public IndexInfoProviderInvokerProxy(IndexInfoProvider indexInfoProvider) {        this.indexInfoProvider = indexInfoProvider;    }        public List<IndexDocument> getDocsByIdRange(String indexType, long start, long end) {        return invokeWitchIndexType(indexType, () -> indexInfoProvider.getDocsByIdRange(start, end));    }    private <T> T invokeWitchIndexType(String indexType, Supplier<T> supplier) {        INDEX_TYPE_THREAD_LOCAL.set(indexType);        T result = supplier.get();        INDEX_TYPE_THREAD_LOCAL.remove();        return result;    }}

    到此,相信大家对“dubbo怎么实现consumer从多个group中调用指定group的provider”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

    免责声明:

    ① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

    ② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

    dubbo怎么实现consumer从多个group中调用指定group的provider

    下载Word文档到电脑,方便收藏和打印~

    下载Word文档

    猜你喜欢

    dubbo怎么实现consumer从多个group中调用指定group的provider

    本篇内容主要讲解“dubbo怎么实现consumer从多个group中调用指定group的provider”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“dubbo怎么实现consumer从多个g
    2023-07-05

    dubbo如何实现consumer从多个group中调用指定group的provider

    这篇文章主要介绍了dubbo如何实现consumer从多个group中调用指定group的provider问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-21

    编程热搜

    • Python 学习之路 - Python
      一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
      Python 学习之路 - Python
    • chatgpt的中文全称是什么
      chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
      chatgpt的中文全称是什么
    • C/C++中extern函数使用详解
    • C/C++可变参数的使用
      可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
      C/C++可变参数的使用
    • css样式文件该放在哪里
    • php中数组下标必须是连续的吗
    • Python 3 教程
      Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
      Python 3 教程
    • Python pip包管理
      一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
      Python pip包管理
    • ubuntu如何重新编译内核
    • 改善Java代码之慎用java动态编译

    目录