我的编程空间,编程开发者的网络收藏夹
学习永远不晚

使用python实现可重入的公平读写锁

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

使用python实现可重入的公平读写锁

在本项目中,读写锁主要应用于多线程服务器场景下的日志文件的读写,以及缓存的获取和更新。 多线程编程的准标准库posix pthread库拥有rwlock, 而python2.7自带的threading库没有读写锁,只有可重入锁RLock, 因此有必要自己实现一个读写锁以提升程序的并发性。

需要了解的概念

  1. 可重入锁。 可重入锁是指同一个锁可以多次被同一线程加锁而不会死锁。 实现可重入锁的目的是防止递归函数内的加锁行为,或者某些场景内无法获取锁A是否已经被加锁,这时如果不使用可重入锁就会对同一锁多次重复加锁,导致立即死锁。
  2. 读写锁。 读写锁与一般锁最大的区别是对同一共享资源多个线程的读取行为是并行的,同时保持该资源同一时刻只能由一个写进程独占,且写请求相对读请求有更高的优先级以防止writer starvation。( 一般锁同一时刻只能由一个线程独占,不论是读进程还是写进程, 即读写都是串行的,而读写锁读是并行的,写是串行的。)
    读写锁的特点是:
    2.1 当且仅当 锁没有被写进程占用且没有写请求时,可以获得读权限锁
    2.2 当且仅当 锁没有被占用且没有读写请求时,可以获得写权限锁
    读写锁的状态自动机可以参考下图
    rwlock_dfa
    所有数据库都拥有读写锁,当必要时,会自动将读锁提升为写锁,称为lock promotion。

使用读写锁的注意事项

  1. 慎用promote ! 读写锁一般都有提权函数promote()用于将一个已经获取读锁的线程进一步提权获得写锁,这样做很容易导致程序死锁。例如,两个均已经获取读锁的线程A和B同时调用promote函数尝试获得写权限,线程A发现存在读线程B,需要等待B完成以获取写锁,线程B发现存在读线程A,需要等待线程A完成以获取写锁,循环等待发生,程序死锁。因此,当且仅当你能确定当前仅有一个读线程占有锁时才能调用promote函数。一个已经获取读锁的线程提权最好的办法是先释放读锁,然后重新申请写锁。
  2. 使用多个锁时保证加解锁顺序相反。 考虑以下错误代码:
A.lock();
B.lock();
Foo();
A.unlock();
Bar();
B.unlock();

如果在Bar函数中尝试重新获取锁A,那么获取B锁之前先要获取A锁的语义就被破坏了,因为你尝试在拥有锁B的情况下获取锁A,而不是意图实现的相反情况,并且Bar函数在A锁的关键区之外,该实现有可能导致死锁或其它未定义的情况。
正确的实现应该是按照C++中的RAII原则加解锁, 在python中使用with语法

lockA=threading.lock()
lockB=threading.lock()
with lockA:
  with lockB:
    Foo();
  Bar()

读写锁目前的非官方实现

下列为目前发现的python rwlock的非官方实现
1. https://majid.info/blog/a-reader-writer-lock-for-python/
2. https://hdknr.github.io/docs/django/modules/django/utils/synch.html#RWLock
3. https://code.activestate.com/recipes/577803-reader-writer-lock-with-priority-for-writers/
4. https://github.com/azraelxyz/rwlock/blob/master/rwlock/rwlock.py

存在的问题

由于4个实现全部贴出代码内容较长,因此这里略去。推荐阅读[1]和[4]的实现。
1. [1]. 使用条件变量实现, [2]. 使用信号量实现,实际效果没有区别(信号量类有内部计数器,既可以当锁又可以当条件变量),但在当前需求下使用条件变量的版本更通俗易懂且[2]. 没有测试代码。 [3]. 中测试代码最全且使用了unittest,但自己实现的信号量_LightSwitch的auquire和release语义和python threading库正好相反,不推荐。 [4]. 的实现最规范也最复杂,已经提交给了issue8800, 与其它3个实现的主要区别是自己实现了可重入锁, 但是没有promote和demote接口也没有测试代码。
2. 除了[2]和[4],其它两个个版本的锁都是不可重入的。
通过分析4个版本的源码可以看出,4个版本[1]的实现最均衡,唯一实现了promote和demote函数,代码也最清晰易懂,但是4个版本均存在无法完全解决writer starvation的问题(没有队列保证公平性,随机唤醒写线程,如果写线程较多可能会出现某一阻塞等待的写线程永远无法被唤醒的情况 )。

改进版的读写锁实现

针对[1]的改进主要包括两点:
1. 增加了写请求队列(python中threading.Queue是线程安全的), 唤醒写线程时按照FIFO实现公平调度,避免大量写进程等待时可能发生的writer starvation
2. 将threading.lock改为可重入的threading.Rlock
3. 如果对同时并发读取的线程数有限制,则可以在RWLock的构造函数__init__中定义一个最大同时读取数max_reader_num,同时将acquire_read中的条件判断替换为:

while self.rwlock < 0 or self.rwlock == max_reader_num or self.writers_waiting:

即可实现限制并发读取的最大线程数。

改进的实现代码

import threading

from Queue import Queue

class RWLock:
        """
        A simple reader-writer lock Several readers can hold the lock
        simultaneously, XOR one writer. Write locks have priority over reads to
        prevent write starvation. wake up writer accords to FIFO
        """
          def __init__(self):
            self.wait_writers_q=Queue()
            self.rwlock = 0
            self.writers_waiting = 0
            self.monitor = threading.RLock()
            self.readers_ok = threading.Condition(self.monitor)

          def acquire_read(self):
            """Acquire a read lock. Several threads can hold this typeof lock.
        It is exclusive with write locks."""
            self.monitor.acquire()
            while self.rwlock < 0 or self.writers_waiting:
              self.readers_ok.wait()
            self.rwlock += 1
            self.monitor.release()

          def acquire_write(self):
            """Acquire a write lock. Only one thread can hold this lock, and
        only when no read locks are also held."""
            self.monitor.acquire()
            while self.rwlock != 0:
              self.writers_waiting += 1
              writers_ok= threading.Condition(self.monitor)
              self.wait_writers_q.put(writers_ok)
              writers_ok.wait()
              self.writers_waiting -= 1
            self.rwlock = -1
            self.monitor.release()

          def promote(self):
            """Promote an already-acquired read lock to a write lock
            WARNING: it is very easy to deadlock with this method"""
            self.monitor.acquire()
            self.rwlock -= 1
            while self.rwlock != 0:
              self.writers_waiting += 1
              writers_ok= threading.Condition(self.monitor)
              self.wait_writers_q.put(writers_ok)
              writers_ok.wait()
              self.writers_waiting -= 1
            self.rwlock = -1
            self.monitor.release()

          def demote(self):
            """Demote an already-acquired write lock to a read lock"""
            self.monitor.acquire()
            self.rwlock = 1
            self.readers_ok.notifyAll()
            self.monitor.release()

          def release(self):
            """Release a lock, whether read or write."""
            self.monitor.acquire()
            if self.rwlock < 0:
              self.rwlock = 0
            else:
              self.rwlock -= 1
            wake_writers = self.writers_waiting and self.rwlock == 0
            wake_readers = self.writers_waiting == 0
            self.monitor.release()
            if wake_writers:
              # print "wake write..."
              writers_ok=self.wait_writers_q.get_nowait()
              writers_ok.acquire()
              writers_ok.notify()
              writers_ok.release()
            elif wake_readers:
              self.readers_ok.acquire()
              self.readers_ok.notifyAll()
              self.readers_ok.release()

测试代码

if __name__ == '__main__':
        import time
        rwl = RWLock()
        class Reader(threading.Thread):
          def run(self):
            print self, 'start'
            rwl.acquire_read()
            print self, 'acquired'
            time.sleep(5)
            print self, 'stop'
            rwl.release()

        class Writer(threading.Thread):
          def run(self):
            print self, 'start'
            rwl.acquire_write()
            print self, 'acquired'
            time.sleep(10)
            print self, 'stop'
            rwl.release()
            rwl.release()

        class ReaderWriter(threading.Thread):
          def run(self):
            print self, 'start'
            rwl.acquire_read()
            print self, 'acquired'
            time.sleep(5)
            rwl.promote()
            print self, 'promoted'
            time.sleep(5)
            print self, 'stop'
            rwl.release()

        class WriterReader(threading.Thread):
          def run(self):
            print self, 'start'
            rwl.acquire_write()
            print self, 'acquired'
            time.sleep(10)
            print self, 'demoted'
            rwl.demote()
            time.sleep(10)
            print self, 'stop'
            rwl.release()

        Reader().start()
        time.sleep(1)
        Reader().start()
        time.sleep(1)
        ReaderWriter().start()
        time.sleep(1)
        WriterReader().start()
        time.sleep(1)
        Reader().start()
        for i in range(5):
                time.sleep(1)
                Writer().start()

测试输出:

[rstudio2@Rserver1 MgtvData]$ python rwlock_test.py
<Reader(Thread-1, started 140256753055488)> start
<Reader(Thread-1, started 140256753055488)> acquired
<Reader(Thread-2, started 140256742565632)> start
<Reader(Thread-2, started 140256742565632)> acquired
<ReaderWriter(Thread-3, started 140256732075776)> start
<ReaderWriter(Thread-3, started 140256732075776)> acquired
<WriterReader(Thread-4, started 140256519124736)> start
<Reader(Thread-5, started 140256508634880)> start
<Reader(Thread-1, started 140256753055488)> stop
<Writer(Thread-6, started 140256753055488)> start
<Reader(Thread-2, started 140256742565632)> stop
<Writer(Thread-7, started 140256742565632)> start
<ReaderWriter(Thread-3, started 140256732075776)> promoted
<Writer(Thread-8, started 140256498145024)> start
<Writer(Thread-9, started 140256487655168)> start
<Writer(Thread-10, started 140256477165312)> start
<ReaderWriter(Thread-3, started 140256732075776)> stop
<WriterReader(Thread-4, started 140256519124736)> acquired
<WriterReader(Thread-4, started 140256519124736)> demoted
<WriterReader(Thread-4, started 140256519124736)> stop
<Writer(Thread-6, started 140256753055488)> acquired
<Writer(Thread-6, started 140256753055488)> stop
<Writer(Thread-7, started 140256742565632)> acquired
<Writer(Thread-7, started 140256742565632)> stop
<Writer(Thread-8, started 140256498145024)> acquired
<Writer(Thread-8, started 140256498145024)> stop
<Writer(Thread-9, started 140256487655168)> acquired
<Writer(Thread-9, started 140256487655168)> stop
<Writer(Thread-10, started 140256477165312)> acquired
<Writer(Thread-10, started 140256477165312)> stop
<Reader(Thread-5, started 140256508634880)> acquired
<Reader(Thread-5, started 140256508634880)> stop

可以看到, 从thread6开始的写进程被依次按照请求的顺序唤醒。

扩展阅读

  1. spin-lock使用while循环的目的是解决spurious wakeup
  2. 使用信号量的目的是解决missed signal
  1. http://tutorials.jenkov.com/java-concurrency/read-write-locks.html

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

使用python实现可重入的公平读写锁

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

使用python实现可重入的公平读写锁

在本项目中,读写锁主要应用于多线程服务器场景下的日志文件的读写,以及缓存的获取和更新。 多线程编程的准标准库posix pthread库拥有rwlock, 而python2.7自带的threading库没有读写锁,只有可重入锁RLock,
2023-01-31

Curator实现分布式锁(可重入 不可重入 读写 联锁 信号量 栅栏 计数器)

文章目录 前言代码实践1. 配置2. 可重入锁InterProcessMutex3. 不可重入锁InterProcessSemaphoreMutex4. 可重入读写锁InterProcessReadWriteLock5. 联锁Int
2023-08-19

使用python svm实现直接可用的手写数字识别

目录python svm实现手写数字识别——直接可用1、训练1.1、训练数据集下载——已转化成csv文件1.2 、训练源码2、预测单张图片2.1、待预测图像2.2、预测源码2.3、预测结果python svm实现手写数字识别——直接可用 最
2022-06-02

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录