使用python实现可重入的公平读写锁
在本项目中,读写锁主要应用于多线程服务器场景下的日志文件的读写,以及缓存的获取和更新。 多线程编程的准标准库posix pthread库拥有rwlock, 而python2.7自带的threading库没有读写锁,只有可重入锁RLock, 因此有必要自己实现一个读写锁以提升程序的并发性。
需要了解的概念
- 可重入锁。 可重入锁是指同一个锁可以多次被同一线程加锁而不会死锁。 实现可重入锁的目的是防止递归函数内的加锁行为,或者某些场景内无法获取锁A是否已经被加锁,这时如果不使用可重入锁就会对同一锁多次重复加锁,导致立即死锁。
- 读写锁。 读写锁与一般锁最大的区别是对同一共享资源多个线程的读取行为是并行的,同时保持该资源同一时刻只能由一个写进程独占,且写请求相对读请求有更高的优先级以防止writer starvation。( 一般锁同一时刻只能由一个线程独占,不论是读进程还是写进程, 即读写都是串行的,而读写锁读是并行的,写是串行的。)
读写锁的特点是:
2.1 当且仅当 锁没有被写进程占用且没有写请求时,可以获得读权限锁
2.2 当且仅当 锁没有被占用且没有读写请求时,可以获得写权限锁
读写锁的状态自动机可以参考下图
所有数据库都拥有读写锁,当必要时,会自动将读锁提升为写锁,称为lock promotion。
使用读写锁的注意事项
- 慎用promote ! 读写锁一般都有提权函数promote()用于将一个已经获取读锁的线程进一步提权获得写锁,这样做很容易导致程序死锁。例如,两个均已经获取读锁的线程A和B同时调用promote函数尝试获得写权限,线程A发现存在读线程B,需要等待B完成以获取写锁,线程B发现存在读线程A,需要等待线程A完成以获取写锁,循环等待发生,程序死锁。因此,当且仅当你能确定当前仅有一个读线程占有锁时才能调用promote函数。一个已经获取读锁的线程提权最好的办法是先释放读锁,然后重新申请写锁。
- 使用多个锁时保证加解锁顺序相反。 考虑以下错误代码:
A.lock();
B.lock();
Foo();
A.unlock();
Bar();
B.unlock();
如果在Bar函数中尝试重新获取锁A,那么获取B锁之前先要获取A锁的语义就被破坏了,因为你尝试在拥有锁B的情况下获取锁A,而不是意图实现的相反情况,并且Bar函数在A锁的关键区之外,该实现有可能导致死锁或其它未定义的情况。
正确的实现应该是按照C++中的RAII原则加解锁, 在python中使用with语法
lockA=threading.lock()
lockB=threading.lock()
with lockA:
with lockB:
Foo();
Bar()
读写锁目前的非官方实现
下列为目前发现的python rwlock的非官方实现
1. https://majid.info/blog/a-reader-writer-lock-for-python/
2. https://hdknr.github.io/docs/django/modules/django/utils/synch.html#RWLock
3. https://code.activestate.com/recipes/577803-reader-writer-lock-with-priority-for-writers/
4. https://github.com/azraelxyz/rwlock/blob/master/rwlock/rwlock.py
存在的问题
由于4个实现全部贴出代码内容较长,因此这里略去。推荐阅读[1]和[4]的实现。
1. [1]. 使用条件变量实现, [2]. 使用信号量实现,实际效果没有区别(信号量类有内部计数器,既可以当锁又可以当条件变量),但在当前需求下使用条件变量的版本更通俗易懂且[2]. 没有测试代码。 [3]. 中测试代码最全且使用了unittest,但自己实现的信号量_LightSwitch的auquire和release语义和python threading库正好相反,不推荐。 [4]. 的实现最规范也最复杂,已经提交给了issue8800, 与其它3个实现的主要区别是自己实现了可重入锁, 但是没有promote和demote接口也没有测试代码。
2. 除了[2]和[4],其它两个个版本的锁都是不可重入的。
通过分析4个版本的源码可以看出,4个版本[1]的实现最均衡,唯一实现了promote和demote函数,代码也最清晰易懂,但是4个版本均存在无法完全解决writer starvation的问题(没有队列保证公平性,随机唤醒写线程,如果写线程较多可能会出现某一阻塞等待的写线程永远无法被唤醒的情况 )。
改进版的读写锁实现
针对[1]的改进主要包括两点:
1. 增加了写请求队列(python中threading.Queue是线程安全的), 唤醒写线程时按照FIFO实现公平调度,避免大量写进程等待时可能发生的writer starvation
2. 将threading.lock改为可重入的threading.Rlock
3. 如果对同时并发读取的线程数有限制,则可以在RWLock的构造函数__init__中定义一个最大同时读取数max_reader_num,同时将acquire_read中的条件判断替换为:
while self.rwlock < 0 or self.rwlock == max_reader_num or self.writers_waiting:
即可实现限制并发读取的最大线程数。
改进的实现代码
import threading
from Queue import Queue
class RWLock:
"""
A simple reader-writer lock Several readers can hold the lock
simultaneously, XOR one writer. Write locks have priority over reads to
prevent write starvation. wake up writer accords to FIFO
"""
def __init__(self):
self.wait_writers_q=Queue()
self.rwlock = 0
self.writers_waiting = 0
self.monitor = threading.RLock()
self.readers_ok = threading.Condition(self.monitor)
def acquire_read(self):
"""Acquire a read lock. Several threads can hold this typeof lock.
It is exclusive with write locks."""
self.monitor.acquire()
while self.rwlock < 0 or self.writers_waiting:
self.readers_ok.wait()
self.rwlock += 1
self.monitor.release()
def acquire_write(self):
"""Acquire a write lock. Only one thread can hold this lock, and
only when no read locks are also held."""
self.monitor.acquire()
while self.rwlock != 0:
self.writers_waiting += 1
writers_ok= threading.Condition(self.monitor)
self.wait_writers_q.put(writers_ok)
writers_ok.wait()
self.writers_waiting -= 1
self.rwlock = -1
self.monitor.release()
def promote(self):
"""Promote an already-acquired read lock to a write lock
WARNING: it is very easy to deadlock with this method"""
self.monitor.acquire()
self.rwlock -= 1
while self.rwlock != 0:
self.writers_waiting += 1
writers_ok= threading.Condition(self.monitor)
self.wait_writers_q.put(writers_ok)
writers_ok.wait()
self.writers_waiting -= 1
self.rwlock = -1
self.monitor.release()
def demote(self):
"""Demote an already-acquired write lock to a read lock"""
self.monitor.acquire()
self.rwlock = 1
self.readers_ok.notifyAll()
self.monitor.release()
def release(self):
"""Release a lock, whether read or write."""
self.monitor.acquire()
if self.rwlock < 0:
self.rwlock = 0
else:
self.rwlock -= 1
wake_writers = self.writers_waiting and self.rwlock == 0
wake_readers = self.writers_waiting == 0
self.monitor.release()
if wake_writers:
# print "wake write..."
writers_ok=self.wait_writers_q.get_nowait()
writers_ok.acquire()
writers_ok.notify()
writers_ok.release()
elif wake_readers:
self.readers_ok.acquire()
self.readers_ok.notifyAll()
self.readers_ok.release()
测试代码
if __name__ == '__main__':
import time
rwl = RWLock()
class Reader(threading.Thread):
def run(self):
print self, 'start'
rwl.acquire_read()
print self, 'acquired'
time.sleep(5)
print self, 'stop'
rwl.release()
class Writer(threading.Thread):
def run(self):
print self, 'start'
rwl.acquire_write()
print self, 'acquired'
time.sleep(10)
print self, 'stop'
rwl.release()
rwl.release()
class ReaderWriter(threading.Thread):
def run(self):
print self, 'start'
rwl.acquire_read()
print self, 'acquired'
time.sleep(5)
rwl.promote()
print self, 'promoted'
time.sleep(5)
print self, 'stop'
rwl.release()
class WriterReader(threading.Thread):
def run(self):
print self, 'start'
rwl.acquire_write()
print self, 'acquired'
time.sleep(10)
print self, 'demoted'
rwl.demote()
time.sleep(10)
print self, 'stop'
rwl.release()
Reader().start()
time.sleep(1)
Reader().start()
time.sleep(1)
ReaderWriter().start()
time.sleep(1)
WriterReader().start()
time.sleep(1)
Reader().start()
for i in range(5):
time.sleep(1)
Writer().start()
测试输出:
[rstudio2@Rserver1 MgtvData]$ python rwlock_test.py
<Reader(Thread-1, started 140256753055488)> start
<Reader(Thread-1, started 140256753055488)> acquired
<Reader(Thread-2, started 140256742565632)> start
<Reader(Thread-2, started 140256742565632)> acquired
<ReaderWriter(Thread-3, started 140256732075776)> start
<ReaderWriter(Thread-3, started 140256732075776)> acquired
<WriterReader(Thread-4, started 140256519124736)> start
<Reader(Thread-5, started 140256508634880)> start
<Reader(Thread-1, started 140256753055488)> stop
<Writer(Thread-6, started 140256753055488)> start
<Reader(Thread-2, started 140256742565632)> stop
<Writer(Thread-7, started 140256742565632)> start
<ReaderWriter(Thread-3, started 140256732075776)> promoted
<Writer(Thread-8, started 140256498145024)> start
<Writer(Thread-9, started 140256487655168)> start
<Writer(Thread-10, started 140256477165312)> start
<ReaderWriter(Thread-3, started 140256732075776)> stop
<WriterReader(Thread-4, started 140256519124736)> acquired
<WriterReader(Thread-4, started 140256519124736)> demoted
<WriterReader(Thread-4, started 140256519124736)> stop
<Writer(Thread-6, started 140256753055488)> acquired
<Writer(Thread-6, started 140256753055488)> stop
<Writer(Thread-7, started 140256742565632)> acquired
<Writer(Thread-7, started 140256742565632)> stop
<Writer(Thread-8, started 140256498145024)> acquired
<Writer(Thread-8, started 140256498145024)> stop
<Writer(Thread-9, started 140256487655168)> acquired
<Writer(Thread-9, started 140256487655168)> stop
<Writer(Thread-10, started 140256477165312)> acquired
<Writer(Thread-10, started 140256477165312)> stop
<Reader(Thread-5, started 140256508634880)> acquired
<Reader(Thread-5, started 140256508634880)> stop
可以看到, 从thread6开始的写进程被依次按照请求的顺序唤醒。
扩展阅读
- spin-lock使用while循环的目的是解决spurious wakeup
- 使用信号量的目的是解决missed signal
- http://tutorials.jenkov.com/java-concurrency/read-write-locks.html
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341