python重试装饰器(Python function retry decorator)
短信预约 -IT技能 免费直播动态提醒
python重试装饰器(Python function retry decorator)
在用requests请求接口或者html的时候,很容易出现超时,限制等各种原因。
在对源代码不进行修改的情况下,可以用装饰器来进行重试
任何函数: 成功,返回-结果,失败,返回--False 都可以用这个装饰器进行重试
1.不需要传参的装饰器写法:
max_retry 为默认重试的次数
import requests
def retry(func):
def inner(*args, **kwargs):
ret = func(*args, **kwargs)
max_retry = 3
number = 0
if not ret:
while number < max_retry:
number += 1
print("尝试第:{}次".format(number))
result = func(*args, **kwargs)
if result:
break
return inner
@retry
def get_reponse():
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36'
}
url = 'https://www.baidu.com'
try:
r = requests.get(url=url, headers=headers, timeout=0.01)
if r.status_code == 200:
print(r.headers)
return r.headers
except requests.exceptions.ReadTimeout:
return False
except requests.exceptions.ConnectTimeout:
return False
except Exception:
return False
get_reponse()
执行结果:
尝试第:1次
尝试第:2次
尝试第:3次
2.需要传参的装饰器写法:
def retry(*args, **kwargs):
def warpp(func):
def inner():
ret = func()
max_retry = kwargs.get('max_retry')
# 不传默认重试3次
if not max_retry:
max_retry = 3
number = 0
if not ret:
while number < max_retry:
number += 1
print("尝试第:{}次".format(number))
result = func()
if result:
break
return inner
return warpp
@retry(max_retry=5) # 不传默认为3次 @retry() 括号需要带
def get_reponse():
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36'
}
url = 'https://www.baidu.com'
try:
r = requests.get(url=url, headers=headers, timeout=0.01)
if r.status_code == 200:
print(r.headers)
return r.headers
except requests.exceptions.ReadTimeout:
return False
except requests.exceptions.ConnectTimeout:
return False
except Exception:
return False
get_reponse()
执行结果:
尝试第:1次
尝试第:2次
尝试第:3次
尝试第:4次
尝试第:5次
3. 装饰器在类的用法
import requests
class Test:
def retry(*args, **kwargs):
def warpp(func):
def inner(self, *args, **kwargs):
ret = func(self, *args, **kwargs)
max_retry = kwargs.get('max_retry')
# 不传默认重试3次
if not max_retry:
max_retry = 3
number = 0
if not ret:
while number < max_retry:
number += 1
print("尝试第:{}次".format(number))
result = func(self, *args, **kwargs)
if result:
break
return inner
return warpp
@retry()
def get_reponse(self, url):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36'
}
try:
r = requests.get(url=url, headers=headers, timeout=0.01)
if r.status_code == 200:
print(r.headers)
return r.headers
except requests.exceptions.ReadTimeout:
return False
except requests.exceptions.ConnectTimeout:
return False
except Exception:
return False
if __name__ == '__main__':
st = Test()
url = 'https://www.baidu.com'
st.get_reponse(url)
运行结果:
尝试第:1次
尝试第:2次
尝试第:3次
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341