我的编程空间,编程开发者的网络收藏夹
学习永远不晚

使用Redisson订阅数问题怎么解决

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

使用Redisson订阅数问题怎么解决

本文小编为大家详细介绍“使用Redisson订阅数问题怎么解决”,内容详细,步骤清晰,细节处理妥当,希望这篇“使用Redisson订阅数问题怎么解决”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

    一、前提

    最近在使用分布式锁redisson时遇到一个线上问题:发现是subscriptionsPerConnection or subscriptionConnectionPoolSize 的大小不够,需要提高配置才能解决。

    二、源码分析

    下面对其源码进行分析,才能找到到底是什么逻辑导致问题所在:

    1、RedissonLock#lock() 方法

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {        long threadId = Thread.currentThread().getId();        // 尝试获取,如果ttl == null,则表示获取锁成功        Long ttl = tryAcquire(leaseTime, unit, threadId);        // lock acquired        if (ttl == null) {            return;        }        // 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题        RFuture<RedissonLockEntry> future = subscribe(threadId);        if (interruptibly) {            commandExecutor.syncSubscriptionInterrupted(future);        } else {            commandExecutor.syncSubscription(future);        }        // 后面代码忽略        try {            // 无限循环获取锁,直到获取锁成功            // ...        } finally {            // 取消订阅锁释放事件            unsubscribe(future, threadId);        }}

    总结下主要逻辑:

    • 获取当前线程的线程id;

    • tryAquire尝试获取锁,并返回ttl

    • 如果ttl为空,则结束流程;否则进入后续逻辑;

    • this.subscribe(threadId)订阅当前线程,返回一个RFuture;

    • 如果在指定时间没有监听到,则会产生如上异常。

    • 订阅成功后, 通过while(true)循环,一直尝试获取锁

    • fially代码块,会解除订阅

    所以上述这情况问题应该出现在subscribe()方法中

    2、详细看下subscribe()方法

    protected RFuture<RedissonLockEntry> subscribe(long threadId) {    // entryName 格式:“id:name”;    // channelName 格式:“redisson_lock__channel:name”;    return pubSub.subscribe(getEntryName(), getChannelName());}

    RedissonLock#pubSub 是在RedissonLock构造函数中初始化的:

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {    // ....    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}

    而subscribeService在MasterSlaveConnectionManager的实现中又是通过如下方式构造的

    public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {    this(config, id);    this.config = cfg;    // 初始化    initTimer(cfg);    initSingleEntry();}protected void initTimer(MasterSlaveServersConfig config) {    int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()};    Arrays.sort(timeouts);    int minTimeout = timeouts[0];    if (minTimeout % 100 != 0) {        minTimeout = (minTimeout % 100) / 2;    } else if (minTimeout == 100) {        minTimeout = 50;    } else {        minTimeout = 100;    }    timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);    connectionWatcher = new IdleConnectionWatcher(this, config);    // 初始化:其中this就是MasterSlaveConnectionManager实例,config则为MasterSlaveServersConfig实例:    subscribeService = new PublishSubscribeService(this, config);}

    PublishSubscribeService构造函数

    private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {    super();    this.connectionManager = connectionManager;    this.config = config;    for (int i = 0; i < locks.length; i++) {        // 这里初始化了一组信号量,每个信号量的初始值为1        locks[i] = new AsyncSemaphore(1);    }}

    3、回到subscribe()方法主要逻辑还是交给了 LockPubSub#subscribe()里面

    private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();public RFuture<E> subscribe(String entryName, String channelName) {      // 从PublishSubscribeService获取对应的信号量。 相同的channelName获取的是同一个信号量     // public AsyncSemaphore getSemaphore(ChannelName channelName) {    //    return locks[Math.abs(channelName.hashCode() % locks.length)];    // }    AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));    AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();        RPromise<E> newPromise = new RedissonPromise<E>() {        @Override        public boolean cancel(boolean mayInterruptIfRunning) {            return semaphore.remove(listenerHolder.get());        }    };    Runnable listener = new Runnable() {        @Override        public void run() {            //  如果存在RedissonLockEntry, 则直接利用已有的监听            E entry = entries.get(entryName);            if (entry != null) {                entry.acquire();                semaphore.release();                entry.getPromise().onComplete(new TransferListener<E>(newPromise));                return;            }            E value = createEntry(newPromise);            value.acquire();            E oldValue = entries.putIfAbsent(entryName, value);            if (oldValue != null) {                oldValue.acquire();                semaphore.release();                oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));                return;            }            // 创建监听,            RedisPubSubListener<Object> listener = createListener(channelName, value);            // 订阅监听            service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);        }    };    // 最终会执行listener.run方法    semaphore.acquire(listener);    listenerHolder.set(listener);    return newPromise;}

    AsyncSemaphore#acquire()方法

    public void acquire(Runnable listener) {    acquire(listener, 1);}public void acquire(Runnable listener, int permits) {    boolean run = false;    synchronized (this) {        // counter初始化值为1        if (counter < permits) {            // 如果不是第一次执行,则将listener加入到listeners集合中            listeners.add(new Entry(listener, permits));            return;        } else {            counter -= permits;            run = true;        }    }    // 第一次执行acquire, 才会执行listener.run()方法    if (run) {        listener.run();    }}

    梳理上述逻辑:

    从PublishSubscribeService获取对应的信号量, 相同的channelName获取的是同一个信号量
    2、如果是第一次请求,则会立马执行listener.run()方法, 否则需要等上个线程获取到该信号量执行完方能执行;
    3、如果已经存在RedissonLockEntry, 则利用已经订阅就行
    4、如果不存在RedissonLockEntry, 则会创建新的RedissonLockEntry,然后进行。

    从上面代码看,主要逻辑是交给了PublishSubscribeService#subscribe方法

    4、PublishSubscribeService#subscribe逻辑如下:

    private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>();public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {    RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();    // 主要逻辑入口, 这里要主要channelName每次都是新对象, 但内部覆写hashCode+equals。    subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);    return promise;}private void subscribe(Codec codec, ChannelName channelName,  RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {    PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);    if (connEntry != null) {        // 从已有Connection中取,如果存在直接把listeners加入到PubSubConnectionEntry中        addListeners(channelName, promise, type, lock, connEntry, listeners);        return;    }    // 没有时,才是最重要的逻辑    freePubSubLock.acquire(new Runnable() {        @Override        public void run() {            if (promise.isDone()) {                lock.release();                freePubSubLock.release();                return;            }            // 从队列中取头部元素            PubSubConnectionEntry freeEntry = freePubSubConnections.peek();            if (freeEntry == null) {                // 第一次肯定是没有的需要建立                connect(codec, channelName, promise, type, lock, listeners);                return;            }            // 如果存在则尝试获取,如果remainFreeAmount小于0则抛出异常终止了。            int remainFreeAmount = freeEntry.tryAcquire();            if (remainFreeAmount == -1) {                throw new IllegalStateException();            }            PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);            if (oldEntry != null) {                freeEntry.release();                freePubSubLock.release();                addListeners(channelName, promise, type, lock, oldEntry, listeners);                return;            }            // 如果remainFreeAmount=0, 则从队列中移除            if (remainFreeAmount == 0) {                freePubSubConnections.poll();            }            freePubSubLock.release();            // 增加监听            RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);            ChannelFuture future;            if (PubSubType.PSUBSCRIBE == type) {                future = freeEntry.psubscribe(codec, channelName);            } else {                future = freeEntry.subscribe(codec, channelName);            }            future.addListener(new ChannelFutureListener() {                @Override                public void operationComplete(ChannelFuture future) throws Exception {                    if (!future.isSuccess()) {                        if (!promise.isDone()) {                            subscribeFuture.cancel(false);                        }                        return;                    }                    connectionManager.newTimeout(new TimerTask() {                        @Override                        public void run(Timeout timeout) throws Exception {                            subscribeFuture.cancel(false);                        }                    }, config.getTimeout(), TimeUnit.MILLISECONDS);                }            });        }    });}private void connect(Codec codec, ChannelName channelName, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {    // 根据channelName计算出slot获取PubSubConnection    int slot = connectionManager.calcSlot(channelName.getName());    RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);    promise.onComplete((res, e) -> {        if (e != null) {            ((RPromise<RedisPubSubConnection>) connFuture).tryFailure(e);        }    });    connFuture.onComplete((conn, e) -> {        if (e != null) {            freePubSubLock.release();            lock.release();            promise.tryFailure(e);            return;        }        // 这里会从配置中读取subscriptionsPerConnection        PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());        // 每获取一次,subscriptionsPerConnection就会减直到为0        int remainFreeAmount = entry.tryAcquire();        // 如果旧的存在,则将现有的entry释放,然后将listeners加入到oldEntry中        PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);        if (oldEntry != null) {            releaseSubscribeConnection(slot, entry);            freePubSubLock.release();            addListeners(channelName, promise, type, lock, oldEntry, listeners);            return;        }        if (remainFreeAmount > 0) {            // 加入到队列中            freePubSubConnections.add(entry);        }        freePubSubLock.release();        RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);        // 这里真正的进行订阅(底层与redis交互)        ChannelFuture future;        if (PubSubType.PSUBSCRIBE == type) {            future = entry.psubscribe(codec, channelName);        } else {            future = entry.subscribe(codec, channelName);        }        future.addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                    if (!promise.isDone()) {                        subscribeFuture.cancel(false);                    }                    return;                }                connectionManager.newTimeout(new TimerTask() {                    @Override                    public void run(Timeout timeout) throws Exception {                        subscribeFuture.cancel(false);                    }                }, config.getTimeout(), TimeUnit.MILLISECONDS);            }        });    });}

    PubSubConnectionEntry#tryAcquire方法, subscriptionsPerConnection代表了每个连接的最大订阅数。当tryAcqcurie的时候会减少这个数量:

     public int tryAcquire() {    while (true) {        int value = subscribedChannelsAmount.get();        if (value == 0) {            return -1;        }        if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {            return value - 1;        }    }}

    梳理上述逻辑:

    还是进行重复判断, 根据channelName从name2PubSubConnection中获取,看是否存在已经订阅:PubSubConnectionEntry; 如果存在直接把新的listener加入到PubSubConnectionEntry。
    2、从队列freePubSubConnections中取公用的PubSubConnectionEntry, 如果没有就进入connect()方法

    1 会根据subscriptionsPerConnection创建PubSubConnectionEntry, 然后调用其tryAcquire()方法 - 每调用一次就会减1
    2.2 将新的PubSubConnectionEntry放入全局的name2PubSubConnection, 方便后续重复使用;
    2.3 同时也将PubSubConnectionEntry放入队列freePubSubConnections中。- remainFreeAmount > 0
    2.4 后面就是进行底层的subscribe和addListener

    如果已经存在PubSubConnectionEntry,则利用已有的PubSubConnectionEntry进行tryAcquire;
    4、如果remainFreeAmount < 0 会抛出IllegalStateException异常;如果remainFreeAmount=0,则会将其从队列中移除, 那么后续请求会重新获取一个可用的连接
    5、最后也是进行底层的subscribe和addListener;

    读到这里,这篇“使用Redisson订阅数问题怎么解决”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注编程网行业资讯频道。

    免责声明:

    ① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

    ② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

    使用Redisson订阅数问题怎么解决

    下载Word文档到电脑,方便收藏和打印~

    下载Word文档

    猜你喜欢

    使用Redisson订阅数问题怎么解决

    本文小编为大家详细介绍“使用Redisson订阅数问题怎么解决”,内容详细,步骤清晰,细节处理妥当,希望这篇“使用Redisson订阅数问题怎么解决”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。一、前提最近在使用
    2023-06-26

    Spring Cache怎么使用Redisson分布式锁解决缓存击穿问题

    本篇内容主要讲解“Spring Cache怎么使用Redisson分布式锁解决缓存击穿问题”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Spring Cache怎么使用Redisson分布式锁解
    2023-06-30

    java怎么解决订单状态扭转问题

    这篇文章主要讲解了“java怎么解决订单状态扭转问题”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java怎么解决订单状态扭转问题”吧!状态机机制状态机机制是一种常用的解决状态扭转问题的方法
    2023-07-05

    SAP采购订单报错问题怎么解决

    今天小编给大家分享一下SAP采购订单报错问题怎么解决的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。问题:SAP MM 明明已
    2023-06-05

    怎么使用github解决问题

    在当今软件开发行业中,Github已经成为了解决问题的一个重要工具。Github是一个面向开源及私有软件项目的托管平台,因为其丰富的特性得到了全球开发者的广泛喜爱。在利用Github解决问题时,需要注意以下几个方面。一、Github是什么G
    2023-10-22

    怎么使用Puppeteer解决SEO问题

    这篇文章主要讲解了“怎么使用Puppeteer解决SEO问题”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么使用Puppeteer解决SEO问题”吧!引言在前端开发中,我们经常会遇到SEO
    2023-07-05

    Vue使用swiper问题怎么解决

    本文小编为大家详细介绍“Vue使用swiper问题怎么解决”,内容详细,步骤清晰,细节处理妥当,希望这篇“Vue使用swiper问题怎么解决”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。一、下载指定版本swipe
    2023-07-06

    使用Java怎么解决跨域问题

    今天就跟大家聊聊有关使用Java怎么解决跨域问题,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。什么是跨域(CORS)跨域(CORS)是指不同域名之间相互访问。跨域,指的是浏览器不能执
    2023-06-06

    hibernate的orphanRemoval使用问题怎么解决

    在使用 Hibernate 的 orphanRemoval 属性时,可能会遇到一些问题。下面是一些常见问题的解决方法:1. 单向关联关系:如果你在单向关联关系中设置了 orphanRemoval=true,那么当父实体从关联关系中删除一个子
    2023-09-12

    React中useEffect使用问题怎么解决

    本篇内容介绍了“React中useEffect使用问题怎么解决”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!前言最近看了一下 ant-des
    2023-07-02

    使用jBuilder8出现问题怎么解决

    如果您在使用jBuilder8时遇到问题,可以尝试以下解决方案:确保您的操作系统和Java版本与jBuilder8的要求兼容。查看jBuilder8的官方文档或网站,了解最低要求和兼容性信息。检查您的jBuilder8安装是否正确。如果您是
    使用jBuilder8出现问题怎么解决
    2023-10-28

    Elasticsearch使用常见问题怎么解决

    这篇文章主要介绍“Elasticsearch使用常见问题怎么解决”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Elasticsearch使用常见问题怎么解决”文章能帮助大家解决问题。一、和redis
    2023-06-05

    php使用composer常见问题怎么解决

    这篇文章主要介绍php使用composer常见问题怎么解决,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!php有什么特点1、执行速度快。2、具有很好的开放性和可扩展性。3、PHP支持多种主流与非主流的数据库。4、面向
    2023-06-14

    Mysql使用on update current_timestamp问题怎么解决

    本文小编为大家详细介绍“Mysql使用on update current_timestamp问题怎么解决”,内容详细,步骤清晰,细节处理妥当,希望这篇“Mysql使用on update current_timestamp问题怎么解决”文章能
    2023-07-05

    c#中使用Environment.Exit的问题怎么解决

    在C#中,使用Environment.Exit方法可以立即终止应用程序的执行。如果您想解决使用Environment.Exit方法的问题,可以尝试以下几种方法:使用return语句:在需要终止程序的地方,可以使用return语句直接返回,这
    c#中使用Environment.Exit的问题怎么解决
    2024-02-29

    win10 gpu使用常见问题怎么解决

    这篇文章主要介绍了win10 gpu使用常见问题怎么解决的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇win10 gpu使用常见问题怎么解决文章都会有所收获,下面我们一起来看看吧。win10gpu常见问题解决方
    2023-07-01

    编程热搜

    • Python 学习之路 - Python
      一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
      Python 学习之路 - Python
    • chatgpt的中文全称是什么
      chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
      chatgpt的中文全称是什么
    • C/C++中extern函数使用详解
    • C/C++可变参数的使用
      可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
      C/C++可变参数的使用
    • css样式文件该放在哪里
    • php中数组下标必须是连续的吗
    • Python 3 教程
      Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
      Python 3 教程
    • Python pip包管理
      一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
      Python pip包管理
    • ubuntu如何重新编译内核
    • 改善Java代码之慎用java动态编译

    目录