Python+Redis从零打造分布式锁实战示例
引言
在分布式系统中,多个节点可能需要访问同一共享资源,这就需要一种协调机制来保证在同一时刻只有一个节点进行操作,这就是分布式锁的作用。
1. 简单实现(基于SETNX命令)
使用setnx命令(set if not exists)尝试设置一个key-value对,如果key不存在,则设置成功并返回1,表示获取到了锁。
import Redis
import time
def acquire_lock(redis_client, lock_key):
identifier = str(time.time()) + str(id(threading.current_thread()))
acquired = redis_client.setnx(lock_key, identifier)
return acquired
def release_lock(redis_client, lock_key, identifier):
pipe = redis_client.pipeline(True)
while True:
try:
pipe.watch(lock_key)
if pipe.get(lock_key) == identifier:
pipe.multi()
pipe.delete(lock_key)
pipe.execute()
return True
pipe.unwatch()
break
except redis.exceptions.WatchError:
pass
return False
# 使用示例
r = redis.Redis(host='localhost', port=6379, db=0)
lock_key = 'mylock'
if acquire_lock(r, lock_key):
print("Lock acquired")
# 执行临界区代码...
release_lock(r, lock_key, identifier)
2. 改进:设置超时时间(expire命令)
为防止进程崩溃导致锁无法释放,可以给锁设置一个过期时间。
def acquire_lock_with_timeout(redis_client, lock_key, expire_time):
identifier = str(time.time()) + str(id(threading.current_thread()))
while True:
result = redis_client.set(lock_key, identifier, nx=True, ex=expire_time)
if result:
return identifier
time.sleep(0.1)
# 使用示例不变,但在acquire_lock_with_timeout函数中增加了expire_time参数
3. 进一步改进:使用Lua脚本保证原子性
在释放锁的时候,需要确保删除的是自己设置的锁,且这个操作是原子的。可以使用Lua脚本来实现。
RELEASE_LOCK_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
def release_lock LUA(redis_client, lock_key, identifier):
release_script = redis_client.register_script(RELEASE_LOCK_SCRIPT)
released = release_script(keys=[lock_key], args=[identifier])
return bool(released)
# 使用示例不变
4. 更完善的实现,Redlock算法
Redlock算法的实现相对复杂,以下是一个简化的python示例,展示了如何在多个Redis实例上尝试获取和释放锁。请注意,在实际应用中,需要确保Redis实例之间的时间同步,并且考虑网络延迟等因素。
import redis
import time
class RedLock:
def __init__(self, masters, ttl):
self.masters = [redis.Redis(host=host, port=port, db=db) for host, port, db in masters]
self.quorum = len(self.masters) // 2 + 1
self.ttl = ttl
def lock(self, resource_id):
expiration = time.time() + self.ttl
while time.time() < expiration:
acquired_count = 0
locked_instances = []
# 尝试在每个Redis实例上获取锁
for master in self.masters:
if master.set(resource_id, 'locked', nx=True, px=self.ttl):
acquired_count += 1
locked_instances.append(master)
# 如果获得超过半数以上的锁,则认为成功获取分布式锁
if acquired_count > self.quorum:
return locked_instances
# 清除已获取但未达到法定数量的锁
for master in locked_instances:
master.delete(resource_id)
# 等待一段时间后重试
time.sleep(0.1)
raise Exception("Could not acquire lock")
def unlock(self, resource_id, masters_with_lock):
for master in masters_with_lock:
# 使用Lua脚本保证原子性地删除锁(与前面简单实现中的释放锁方法类似)
RELEASE_LOCK_SCRIPT = """
if redis.call("get", KEYS[1]) == "locked" then
return redis.call("del", KEYS[1])
else
return 0
end
"""
release_script = master.register_script(RELEASE_LOCK_SCRIPT)
release_script(keys=[resource_id])
# 示例用法:
masters = [('localhost', 6379, 0), ('localhost', 6380, 0)] # 假设有两个Redis实例
redlock = RedLock(masters, 10000) # 设置锁的有效期为10秒
resource_id = 'my_resource'
try:
masters_with_lock = redlock.lock(resource_id)
print(f"Acquired Redlock on resource {resource_id}")
# 执行临界区代码...
finally:
redlock.unlock(resource_id, masters_with_lock)
此示例仅为了说明Redlock算法的核心思想,并未涵盖所有边界条件和异常处理,如Redis实例宕机、网络延迟导致时间不一致等情况,请在生产环境中使用时结合实际情况进行优化和完善。
总结
通过以上介绍和示例代码,我们了解了如何基于Python与Redis实现不同层次复杂度的分布式锁。从简单的SETNX命令获取锁,到设置锁的超时时间以防止进程崩溃导致死锁,再到利用Lua脚本保证释放锁操作的原子性,以及针对大规模分布式环境考虑采用Redlock算法提高系统的容错性和安全性。这些方法可以根据实际业务场景选择合适的方式来实现分布式锁,从而有效解决多节点环境下对共享资源的竞争问题。随着技术的发展和需求的变化,分布式锁的实现方式也将持续演进和完善,为构建更稳定、高效的分布式系统提供有力支撑。
以上就是Python+Redis从零打造分布式锁实战示例的详细内容,更多关于Python Redis分布式锁的资料请关注编程网(www.lsjlt.com)其它相关文章!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341