Python插入数据到elasticse
短信预约 -IT技能 免费直播动态提醒
将一个文件中的内容逐条写入elasticsearch中,效率没有写hadoop高,跟kafka更没得比
import time
from elasticsearch import Elasticsearch
from collections import OrderedDict
start_time = time.time()
es = Elasticsearch(['localhost:9200'])
temp_list = []
with open('log.out','r',encoding='utf-8')as f:
data_list = f.readlines()
for data in data_list:
temp = OrderedDict()
temp['ServerIp'] = data.split('|')[0]
temp['SpiderType'] = data.split('|')[1]
temp['Level'] = data.split('|')[2]
temp['Date'] = data.split('|')[3]
temp['Type'] = data.split('|')[4]
temp['OffSet'] = data.split('|')[5]
temp['DockerId'] = data.split('|')[6]
temp['WebSiteId'] = data.split('|')[7]
temp['Url'] = data.split('|')[8]
temp['DateStamp'] = data.split('|')[9]
temp['NaviGationId'] = data.split('|')[10]
temp['ParentWebSiteId'] = data.split('|')[11]
temp['TargetUrlNum'] = data.split('|')[12]
temp['Timeconsume'] = data.split('|')[13]
temp['Success'] = data.split('|')[14]
temp['Msg'] = data.split('|')[15]
temp['Extend1'] = data.split('|')[16]
temp['Extend2'] = data.split('|')[17]
temp['Extend3'] = data.split('|')[18]
# temp_list.append(temp)
body = {'ServerIp': temp['ServerIp'],
'SpiderType': temp['SpiderType'],
'Level': temp['Level'],
'Date': temp['Date'],
'Type': temp['Type'],
'OffSet': temp['OffSet'],
'DockerId': temp['DockerId'],
'WebSiteId': temp['WebSiteId'],
'Url': temp['Url'],
'DateStamp': temp['DateStamp'],
'NaviGationId': temp['NaviGationId'],
'ParentWebSiteId': temp['ParentWebSiteId'],
'TargetUrlNum': temp['TargetUrlNum'],
'Timeconsume': temp['Timeconsume'],
'Success': temp['Success'],
'Msg': temp['Msg'],
'Extend1': temp['Extend1'],
'Extend2': temp['Extend2'],
'Extend3': temp['Extend3'],
}
es.index(index='shangjispider', doc_type='spider', body=body, id=None)
end_time = time.time()
t = end_time - start_time
print(t)
不得不说,这样搞,效率真的不高,插入287条用了30s,根本没法投入生产,在想别的办法
-----------------------------------------------------------------------------------------------------------------
又搞了半天,发现了一个新的方法,效率陡增啊,老铁,有木有!!!!
=》
==》
===》
====》
=====》
======》
-----------------------------------------------擦亮你的双眼---------------------------------------------------------
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
start_time = time.time()
es = Elasticsearch()
actions = []
f = open('log.out', 'r', encoding='utf-8')
data_list = f.readlines()
i = 0
for data in data_list:
line = data.split('|')
action = {
"_index": "haizhen",
"_type": "imagetable",
"_id": i,
"_source": {
'ServerIp': line[0],
'SpiderType': line[1],
'Level': line[2],
'Date': line[3],
'Type': line[4],
'OffSet': line[5],
'DockerId': line[6],
'WebSiteId': line[7],
'Url': line[8],
'DateStamp': line[9],
'NaviGationId': line[10],
'ParentWebSiteId': line[11],
'TargetUrlNum': line[12],
'Timeconsume': line[13],
'Success': line[14],
'Msg': line[15],
'Extend1': line[16],
'Extend2': line[17],
'Extend3': line[18],
}
}
i += 1
actions.append(action)
if len(action) == 1000:
helpers.bulk(es, actions)
del actions[0:len(action)]
if i > 0:
helpers.bulk(es, actions)
end_time = time.time()
t = end_time - start_time
print('本次共写入{}条数据,用时{}s'.format(i, t))
见证奇迹的时刻
这效率杠杠滴呀,反正是够我用了,先这样吧,需要提升了再想办法吧
老铁,记得给我点赞喏!!!!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341