我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Java高并发编程基础三大利器之Semaphore

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Java高并发编程基础三大利器之Semaphore

引言

最近可以进行个税申报了,还没有申报的同学可以赶紧去试试哦。不过我反正是从上午到下午(3月1日)一直都没有成功的进行申报,一进行申报 就返回“当前访问人数过多,请稍后再试”。为什么有些人就能够申报成功,有些人就直接返回失败。这很明显申报处理资源是有限的, 只能等别人处理完了在来处理你的,你如果运气好可能重试几次就轮到你了,如果运气不好可能重试一天也可能轮不到你。我反正已经是放弃了,等到夜深人静的时候再来试试。作为一个程序员我们肯定知道这是个税申请app的限流操作,如果还有不懂什么 是限流操作的可以参考下这个文章《高并发系统三大利器之限流》。比如个税申报系统每台机器只最多分别最多只能处理1000个请求,再多的请求就会把机器打挂。如果是多余的请求就把这些请求拒绝掉。直接给你返回一句温馨提示:“当前访问人数过多,请稍后再试”,如果要实现这个功能大家想想可以通过哪些方法算法来实现。

共享锁、独占锁

学习semaphore之前我们必须要先了解下什么是共享锁。

共享锁:它是允许多个线程同时获取锁,并发的访问共享资源

独占锁:也有人把它叫做“独享锁”,它是是独占的,排他的,只能被一个线程可持有, 当独占锁已经被某个线程持有时,其他线程只能等待它被释放后,才能去争锁,并且同一时刻只有一个线程能争锁成功。

什么是Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。”

  • Semaphore机制是提供给线程抢占式获取许可,所以他可以实现公平或者非公平,类似于ReentrantLock。说了这么多我们来个实际的例子看一看,比如我们去停车场停车,停车场总共只有5个车位,但是现在有8辆汽车来停车,剩下的3辆汽车要么等其他汽车开走后进行停车,或者去找别的停车位?
  1.  
  2. public class SemaphoreTest { 
  3.     public static void main(String[] args) throws InterruptedException { 
  4.          // 初始化五个车位 
  5.         Semaphore semaphore = new Semaphore(5); 
  6.         // 等所有车子 
  7.         final CountDownLatch latch = new CountDownLatch(8); 
  8.         for (int i = 0; i < 8; i++) { 
  9.             int finalI = i; 
  10.             if (i == 5) { 
  11.                 Thread.sleep(1000); 
  12.                 new Thread(() -> { 
  13.                     stopCarNotWait(semaphore, finalI); 
  14.                     latch.countDown(); 
  15.                 }).start(); 
  16.                 continue
  17.             } 
  18.             new Thread(() -> { 
  19.                 stopCarWait(semaphore, finalI); 
  20.                 latch.countDown(); 
  21.             }).start(); 
  22.         } 
  23.         latch.await(); 
  24.         log("总共还剩:" + semaphore.availablePermits() + "个车位"); 
  25.     } 
  26.  
  27.     private static void stopCarWait(Semaphore semaphore, int finalI) { 
  28.         String format = String.format("车牌号%d", finalI); 
  29.         try { 
  30.             semaphore.acquire(1); 
  31.             log(format + "找到车位了,去停车了"); 
  32.             Thread.sleep(10000); 
  33.         } catch (Exception e) { 
  34.             e.printStackTrace(); 
  35.         } finally { 
  36.             semaphore.release(1); 
  37.             log(format + "开走了"); 
  38.         } 
  39.     } 
  40.  
  41.     private static void stopCarNotWait(Semaphore semaphore, int finalI) { 
  42.          String format = String.format("车牌号%d", finalI); 
  43.         try { 
  44.             if (semaphore.tryAcquire()) { 
  45.                 log(format + "找到车位了,去停车了"); 
  46.                 Thread.sleep(10000); 
  47.                 log(format + "开走了"); 
  48.                 semaphore.release(); 
  49.             } else { 
  50.                 log(format + "没有停车位了,不在这里等了去其他地方停车去了"); 
  51.             } 
  52.         } catch (Exception e) { 
  53.             e.printStackTrace(); 
  54.         } 
  55.  
  56.     } 
  57.  
  58.     public static void log(String content) { 
  59.         // 格式化 
  60.         DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); 
  61.         // 当前时间 
  62.         LocalDateTime now = LocalDateTime.now(); 
  63.         System.out.println(now.format(fmTime) + "  "+content); 
  64.     } 
  1. 2021-03-01 18:54:57  车牌号0找到车位了,去停车了 
  2. 2021-03-01 18:54:57  车牌号3找到车位了,去停车了 
  3. 2021-03-01 18:54:57  车牌号2找到车位了,去停车了 
  4. 2021-03-01 18:54:57  车牌号1找到车位了,去停车了 
  5. 2021-03-01 18:54:57  车牌号4找到车位了,去停车了 
  6. 2021-03-01 18:54:58  车牌号5没有停车位了,不在这里等了去其他地方停车去了 
  7. 2021-03-01 18:55:07  车牌号7找到车位了,去停车了 
  8. 2021-03-01 18:55:07  车牌号6找到车位了,去停车了 
  9. 2021-03-01 18:55:07  车牌号2开走了 
  10. 2021-03-01 18:55:07  车牌号0开走了 
  11. 2021-03-01 18:55:07  车牌号3开走了 
  12. 2021-03-01 18:55:07  车牌号4开走了 
  13. 2021-03-01 18:55:07  车牌号1开走了 
  14. 2021-03-01 18:55:17  车牌号7开走了 
  15. 2021-03-01 18:55:17  车牌号6开走了 
  16. 2021-03-01 18:55:17  总共还剩:5个车位 

从输出结果我们可以看到车牌号5这辆车看见没有车位了,就不在这个地方傻傻的等了,而是去其他地方了,但是车牌号6和车牌号7分别需要等到车库开出两辆车空出两个车位后才停进去。这就体现了Semaphore 的acquire 方法如果没有获取到凭证它就会阻塞,而tryAcquire方法如果没有获取到凭证不会阻塞的。

semaphore在dubbo中的应用

在Dubbo中可以给Provider配置线程池大小来控制系统提供服务的最大并行度,默认是200。

  1. "200"/> 

比如我现在这个订单系统有三个接口,分别为创单、取消订单、修改订单。这三个接口加起来的并发是200但是创单接口是核心接口,我想让它多分点线程来执行 让它可以有最大150个线程,取消订单和修改订单分别最大25个线程执行就可以了。dubbo提供了executes这一属性来实现这个功能

  1. "cn.javajr.service.CreateOrderService" executes="150"/> 
  2. "cn.javajr.service.CancelOrderService" executes="25"/> 
  3. "cn.javajr.service.EditOrderService" executes="25"/> 

我们可以看看dubbo内部是如何来executes的,具体实现是在ExecuteLimitFilter这个类我们可以

  1.  public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { 
  2.         URL url = invoker.getUrl(); 
  3.         String methodName = invocation.getMethodName(); 
  4.         Semaphore executesLimit = null
  5.         boolean acquireResult = false
  6.         int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); 
  7.         if (max > 0) { 
  8.             RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); 
  9.             // 如果当前使用的线程数量已经大于等于设置的阈值,那么直接抛出异常 
  10. //            if (count.getActive() >= max) { 
  11. // throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service // using threads greater than  + max + "\" /> limited."); 
  12.              
  13.               
  14.             executesLimit = count.getSemaphore(max); 
  15.             if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { 
  16.                 throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than  + max + "\" /> limited."); 
  17.             } 
  18.         } 
  19.         long begin = System.currentTimeMillis(); 
  20.         boolean isSuccess = true
  21.         // 计数器+1 
  22.         RpcStatus.beginCount(url, methodName); 
  23.         try { 
  24.             Result result = invoker.invoke(invocation); 
  25.             return result; 
  26.         } catch (Throwable t) { 
  27.             isSuccess = false
  28.             if (t instanceof RuntimeException) { 
  29.                 throw (RuntimeException) t; 
  30.             } else { 
  31.                 throw new RpcException("unexpected exception when ExecuteLimitFilter", t); 
  32.             } 
  33.         } finally { 
  34.            // 计数器-1 
  35.             RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); 
  36.             if(acquireResult) { 
  37.                 executesLimit.release(); 
  38.             } 
  39.         } 
  40.     } 

从上述代码我们也可以看出早期这个是没有采用Semaphore来实现的,而是直接采用被注释的 if (count.getActive() >= max) 这个来来实现的,由于这个count.getActive() >= max 和这个计数加1不是原子性的,所以会有问题,具体bug号可以看https://github.com/apache/dubbo/pull/582后面才采用上述代码用Semaphore来修复非原子性问题。具体更详细的分析可以参见代码的链接。不过现在最新版本(2.7.9)我看是采用采用自旋加上和CAS来实现的。

Semaphore

上面就是对Semaphore一个简单的使用以及dubbo中用到的例子,说句实话Semaphore在工作中用的还是比较少的,不过面试又有可能会被问到,所以还是有必要来一起学习一下它。我们前面《Java高并发编程基础之AQS》通过ReentrantLock 一起学习了下AQS,其实Semaphore同样也是通过AQS来是实现的,我们可以一起来对照下独占锁的方法,基本上都是有方法一一相对应的。图片这里有两点稍微需要注意的地方:

  • 在独占锁模式中,我们只有在获取了独占锁的节点释放锁时,才会唤醒后继节点,因为独占锁只能被一个线程持有,如果它还没有被释放,就没有必要去唤醒它的后继节点。
  • 在共享锁模式下,当一个节点获取到了共享锁,我们在获取成功后就可以唤醒后继节点了,而不需要等到该节点释放锁的时候,这是因为共享锁可以被多个线程同时持有,一个锁获取到了,则后继的节点都可以直接来获取。因此,在共享锁模式下,在获取锁和释放锁结束时,都会唤醒后继节点。

获取凭证

我们同样还是通过非公平锁的模式来获取凭证 我们可以看下acquire的核心方法

  1. public final void acquireSharedInterruptibly(int arg) 
  2.           throws InterruptedException { 
  3.       if (Thread.interrupted()) 
  4.           throw new InterruptedException(); 
  5.       if (tryAcquireShared(arg) < 0) 
  6.           doAcquireSharedInterruptibly(arg); 
  7.   } 
  8.    protected int tryAcquireShared(int acquires) { 
  9.            return nonfairTryAcquireShared(acquires); 
  10.   } 
  11.  
  12. // 主要看下这个方法,这个方法返回的值也就是tryAcquireShared返回的值,因为tryAcquireShared->nonfairTryAcquireShared 
  13.    final int nonfairTryAcquireShared(int acquires) { 
  14.          //自旋 
  15.    for (;;) { 
  16.         //Semaphore用AQS的state变量的值代表可用许可数 
  17.         int available = getState(); 
  18.         //可用许可数减去本次需要获取的许可数即为剩余许可数 
  19.         int remaining = available - acquires; 
  20.         //如果剩余许可数小于0或者CAS将当前可用许可数设置为剩余许可数成功,则返回成功许可数 
  21.         if (remaining < 0 || 
  22.             compareAndSetState(available, remaining)) 
  23.             return remaining; 
  24.     } 
  • 当tryAcquireShared 获取返回许可书小于0时说明获取许可失败需要进入doAcquireSharedInterruptibly这个方法去休眠。
  • 当tryAcquireShared 获取返回许可书小于0时说明获取许可成功直接结束。

doAcquireSharedInterruptibly

  1. private void doAcquireSharedInterruptibly(int arg) 
  2.        throws InterruptedException { 
  3.        // 独占锁的acquireQueued调用的是addWaiter(Node.EXCLUSIVE), 
  4.        // 而共享锁调用的是addWaiter(Node.SHARED),表明了该节点处于共享模式 
  5.        final Node node = addWaiter(Node.SHARED); 
  6.        boolean failed = true
  7.        try { 
  8.            for (;;) { 
  9.                final Node p = node.predecessor(); 
  10.                if (p == head) { 
  11.                    int r = tryAcquireShared(arg); 
  12.                    if (r >= 0) { 
  13.                        setHeadAndPropagate(node, r); 
  14.                        p.next = null; // help GC 
  15.                        failed = false
  16.                        return
  17.                    } 
  18.                } 
  19.                if (shouldParkAfterFailedAcquire(p, node) && 
  20.                    parkAndCheckInterrupt()) 
  21.                    throw new InterruptedException(); 
  22.            } 
  23.        } finally { 
  24.            if (failed) 
  25.                cancelAcquire(node); 
  26.        } 
  27.    } 

这个方法是不是跟我们上篇文章讲的AQS的独占锁的acquireQueued很像,不过独占锁它是直接调用了用了setHead(node)方法,而共享锁调用的是setHeadAndPropagate(node, r)这个方法除了调用setHead 里面还调用了doReleaseShared(唤醒后继节点)

  1. private void setHeadAndPropagate(Node node, int propagate) { 
  2.       Node h = head; // Record old head for check below 
  3.       setHead(node); 
  4.       if (propagate > 0 || h == null || h.waitStatus < 0 || 
  5.           (h = head) == null || h.waitStatus < 0) { 
  6.           Node s = node.next
  7.           if (s == null || s.isShared()) 
  8.               doReleaseShared(); 
  9.       } 
  10.   } 

其他的方法基本上是和ReentrantLock来实现的独占锁差不多,我相信大家对源码分析感兴趣的应该也不多,其他更多细节问题还是需要自己亲自动手去看源码的。

总结

当信号量Semaphore初始化设置许可证为1 时,它也可以当作互斥锁使用。其中0、1就相当于它的状态,当=1时表示其他线程可以获取,当=0时,排他,即其他线程必须要等待。

Semaphore是JUC包中的一个很简单的工具类,用来实现多线程下对于资源的同一时刻的访问线程数限制

Semaphore中存在一个【许可】的概念,即访问资源之前,先要获得许可,如果当前许可数量为0,那么线程阻塞,直到获得许可

Semaphore内部使用AQS实现,由抽象内部类Sync继承了AQS。因为Semaphore天生就是共享的场景,所以其内部实际上类似于共享锁的实现

共享锁的调用框架和独占锁很相似,它们最大的不同在于获取锁的逻辑——共享锁可以被多个线程同时持有,而独占锁同一时刻只能被一个线程持有。

由于共享锁同一时刻可以被多个线程持有,因此当头节点获取到共享锁时,可以立即唤醒后继节点来争锁,而不必等到释放锁的时候。因此,共享锁触发唤醒后继节点的行为可能有两处,一处在当前节点成功获得共享锁后,一处在当前节点释放共享锁后。

采用semaphore来进行限流的话会产生突刺现象。

★指在一定时间内的一小段时间内就用完了所有资源,后大部分时间中无资源可用。比如在限流方法中的计算器算法,设置1s内的最大请求数为100,在前100ms已经有了100个请求,则后面900ms将无法处理请求,这就是突刺现象。

本文转载自微信公众号「java金融」,可以通过以下二维码关注。转载本文请联系java金融公众号。

 

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Java高并发编程基础三大利器之Semaphore

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Java高并发编程基础三大利器之Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

Java高并发编程基础三大利器之CyclicBarrier

前面一篇文章我们《Java高并发编程基础三大利器之CountDownLatch》它有一个缺点,就是它的计数器只能够使用一次,也就是说当计数器(state)减到为 0的时候,如果 再有线程调用去 await() 方法,该线程会直接通过,不会再

Java高并发编程基础三大利器之CountDownLatch

CountDownLatch是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就减1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上(调用await方法的线程)等待的线程就可以恢复工作了。

Java高并发编程基础之AQS

大多数人应该都可以说出 CountDownLatch、CyclicBarrier、Sempahore多线程并发三大利器。这三大利器都是通过AbstractQueuedSynchronizer抽象类(下面简写AQS)来实现的,所以学习三大利器

如何使用Java高并发编程之Semaphore

本篇内容主要讲解“如何使用Java高并发编程之Semaphore”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何使用Java高并发编程之Semaphore”吧!共享锁、独占锁学习semapho
2023-06-15

Java高并发编程基础之如何使用AQS

本篇内容主要讲解“Java高并发编程基础之如何使用AQS”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Java高并发编程基础之如何使用AQS”吧! 引言曾经有一道比较比较经典的面试题“你能够说说
2023-06-15

Java并发编程之介绍线程安全基础的示例

这篇文章主要介绍了Java并发编程之介绍线程安全基础的示例,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。线程安全基础1.线程安全问题2.账户取款案例3.同步代码块synchr
2023-06-06

Java 并发集合:揭秘高效并行编程的利器

Java 并发集合为多线程编程提供了强大而丰富的工具集合,通过合理使用这些集合,可以极大地提升代码的并发性能和可扩展性。
Java 并发集合:揭秘高效并行编程的利器
2024-02-07

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录