Python大批量写入数据(百万级别)的方法
短信预约 -IT技能 免费直播动态提醒
背景
现有一个百万行数据的csv格式文件,需要在两分钟之内存入数据库。
方案
方案一:多线程+协程+异步MySql方案二:多线程+MySql批量插入
代码
1,先通过pandas读取所有csv数据存入列表。
2,设置N个线程,将一百万数据均分为N份,以start,end传递给线程以切片的方法读取区间数据(建议为16个线程)
3,方案二 线程内以 executemany 方法批量插入所有数据。
4,方案一 线程内使用异步事件循环遍历所有数据异步插入。
5,方案一纯属没事找事型。
方案二
import threading
import pandas as pd
import asyncio
import time
import aiomysql
import pymysql
data=[]
error_data=[]
def run(start,end):
global data
global error_data
print("start"+threading.current_thread().name)
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
mysdb = getDb("*", *, "*", "*", "*")
cursor = mysdb.cursor()
sql = """insert into *_*_* values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
cursor.executemany(sql,data[start:end])
mysdb.commit()
mysdb.close()
print("end" + threading.current_thread().name)
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
def csv_file_read_use_pd(csvFile):
csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t')
csv_result = csv_result.fillna(value="None")
result = csv_result.values.tolist()
return result
class MyDataBase:
def __init__(self,host=None,port=None,username=None,password=None,database=None):
self.db = pymysql.connect(host=host,port=port,user=username,password=password,database=database)
def close(self):
self.db.close()
def getDb(host,port,username,password,database):
MyDb = MyDataBase(host, port, username, password,database)
return MyDb.db
def main(csvFile):
global data #获取全局对象 csv全量数据
#读取所有的数据 将所有数据均分成 thread_lens 份 分发给 thread_lens 个线程去执行
thread_lens=20
csv_result=csv_file_read_use_pd(csvFile)
day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
for item in csv_result:
item.insert(0,day)
data=csv_result
thread_exe_count_list=[] #线程需要执行的区间
csv_lens=len(csv_result)
avg = csv_lens // thread_lens
remainder=csv_lens % thread_lens
# 0,27517 27517,55,034
nowIndex=0
for i in range(thread_lens):
temp=[nowIndex,nowIndex+avg]
nowIndex=nowIndex+avg
thread_exe_count_list.append(temp)
thread_exe_count_list[-1:][0][1]+=remainder #余数分给最后一个线程
# print(thread_exe_count_list)
#th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])
for i in range(thread_lens):
sub_thread = threading.Thread(target=run,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))
sub_thread.start()
sub_thread.join()
time.sleep(3)
if __name__=="__main__":
#csv_file_read_use_pd("分公司箱型箱量.csv")
main("分公司箱型箱量.csv")
方案一
import threading
import pandas as pd
import asyncio
import time
import aiomysql
data=[]
error_data=[]
async def async_basic(loop,start,end):
global data
global error_data
print("start"+threading.current_thread().name)
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
conn = await aiomysql.connect(
host="*",
port=*,
user="*",
password="*",
db="*",
loop=loop
)
day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
sql = """insert into **** values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
async with conn.cursor() as cursor:
for item in data[start:end]:
params=[day]
params.extend(item)
try:
x=await cursor.execute(sql,params)
if x==0:
error_data.append(item)
print(threading.current_thread().name+" result "+str(x))
except Exception as e:
print(e)
error_data.append(item)
time.sleep(10)
pass
await conn.close()
#await conn.commit()
#关闭连接池
# pool.close()
# await pool.wait_closed()
print("end" + threading.current_thread().name)
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
def csv_file_read_use_pd(csvFile):
csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t')
csv_result = csv_result.fillna(value="None")
result = csv_result.values.tolist()
return result
def th(start,end):
loop = asyncio.new_event_loop()
loop.run_until_complete(async_basic(loop,start,end))
def main(csvFile):
global data #获取全局对象 csv全量数据
#读取所有的数据 将所有数据均分成 thread_lens 份 分发给 thread_lens 个线程去执行
thread_lens=20
csv_result=csv_file_read_use_pd(csvFile)
data=csv_result
thread_exe_count_list=[] #线程需要执行的区间
csv_lens=len(csv_result)
avg = csv_lens // thread_lens
remainder=csv_lens % thread_lens
# 0,27517 27517,55,034
nowIndex=0
for i in range(thread_lens):
temp=[nowIndex,nowIndex+avg]
nowIndex=nowIndex+avg
thread_exe_count_list.append(temp)
thread_exe_count_list[-1:][0][1]+=remainder #余数分给最后一个线程
print(thread_exe_count_list)
#th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])
for i in range(thread_lens):
sub_thread = threading.Thread(target=th,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))
sub_thread.start()
time.sleep(3)
if __name__=="__main__":
#csv_file_read_use_pd("分公司箱型箱量.csv")
main("分公司箱型箱量.csv")
总结
到此这篇关于Python大批量写入数据的文章就介绍到这了,更多相关Python大批量写入数据内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341