使用 Python 读取 Redis 数据并导出到 Elasticsearch
jopen
9年前
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Copy per-day Redis hashes into Elasticsearch.

Every Redis key matching ``<prefix>_*`` is read with HGETALL. The text after
``<prefix>_`` is used as the day, and the lower-cased key as the index name.
Each hash field (expected to look like ``EID_CID_VID``) becomes one document
whose VALUE is the field's value. Document ids are
``<day>_<EID>_<CID>_<VID>``, so they are deterministic across runs.

Usage:
    python cmd.py -e <elasticsearch host> -r <redis host> -p <redis list key prefix>
"""
from __future__ import print_function

import datetime
import getopt
import sys

import redis
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

REDIS_PORT = 6379
DOCTYPE = "watchvideocount"


def usage():
    """Print the command-line synopsis to stdout."""
    print('usage: python cmd.py -e <elasticsearch host> -r <redis host> -p <redis list key prefix>')


def _to_text(value):
    """Return *value* as a native ``str``.

    On Python 3, redis-py returns ``bytes`` unless ``decode_responses`` is
    set, so those values are decoded as UTF-8. On Python 2 it returns
    ``str``, which is passed through unchanged.
    """
    if isinstance(value, str):
        return value
    return value.decode('utf-8')


def _parse_args(argv):
    """Parse the ``-e``, ``-r`` and ``-p`` options from *argv*.

    Returns ``(elasticsearch_host, redis_host, redis_list_prefix)``.

    Prints the usage line and exits with status 1 in any of these cases:
    an option is unknown, an option lacks its argument, or any of the three
    values is missing or empty.
    """
    try:
        opts, _ = getopt.getopt(argv, "e:r:p:")
    except getopt.GetoptError as err:
        # Report bad options the same way as missing ones instead of
        # letting a traceback escape.
        print(err, file=sys.stderr)
        usage()
        sys.exit(1)

    # getopt only yields the three declared options, so every option is a key here.
    settings = {'-e': '', '-r': '', '-p': ''}
    for option, value in opts:
        settings[option] = value

    elasticsearch_host = settings['-e']
    redis_host = settings['-r']
    redis_list_prefix = settings['-p']
    if not (elasticsearch_host and redis_host and redis_list_prefix):
        usage()
        sys.exit(1)
    return elasticsearch_host, redis_host, redis_list_prefix


def _ensure_index(client, index_name):
    """Create *index_name* if it is missing, then apply the document mapping."""
    # Check explicitly rather than catching every exception from create().
    # A blanket catch would also report connection and permission failures
    # as "index exist".
    if client.indices.exists(index=index_name):
        print('index ', index_name, ' exist')
    else:
        client.indices.create(index=index_name)

    # NOTE(review): _build_actions stores the day under "DAY", but this
    # mapping declares "day". The date mapping therefore never applies, and
    # DAY is presumably mapped dynamically. Renaming the field here changes
    # the index schema and may make put_mapping conflict on indices that
    # already hold DAY. Confirm the day format before aligning the two.
    # NOTE(review): the "string" type and doc_type are old Elasticsearch
    # mapping syntax. Confirm the target server version.
    mapping = {
        DOCTYPE: {
            "properties": {
                "day": {"type": "date"},
                "EID": {"type": "string"},
                "CID": {"type": "string"},
                "VID": {"type": "string"},
                "VALUE": {"type": "integer"},
            }
        }
    }
    client.indices.put_mapping(index=index_name, doc_type=DOCTYPE, body=mapping)


def _build_actions(hash_fields, index_name, day):
    """Turn one Redis hash (as returned by HGETALL) into bulk-index actions.

    Field names are expected to look like ``EID_CID_VID``; parts after the
    third are ignored. A field with fewer than three ``_``-separated parts
    is skipped with a warning on stderr instead of aborting the export.
    """
    actions = []
    for raw_field, raw_value in hash_fields.items():
        field = _to_text(raw_field)
        parts = field.split('_')
        if len(parts) < 3:
            print('skip malformed field', field, 'in', index_name, file=sys.stderr)
            continue
        eid, cid, vid = parts[:3]
        actions.append({
            '_index': index_name,
            '_type': DOCTYPE,
            '_id': '_'.join((day, eid, cid, vid)),
            'DAY': day,
            'EID': eid,
            'CID': cid,
            'VID': vid,
            'VALUE': _to_text(raw_value),
        })
    return actions


def main(argv=None):
    """Export every ``<prefix>_*`` Redis hash into its own Elasticsearch index.

    *argv* defaults to ``sys.argv[1:]``.
    """
    if argv is None:
        argv = sys.argv[1:]
    elasticsearch_host, redis_host, redis_list_prefix = _parse_args(argv)

    client = Elasticsearch([{'host': elasticsearch_host}])
    r = redis.Redis(host=redis_host, port=REDIS_PORT)

    key_prefix = redis_list_prefix + '_'
    # SCAN walks the keyspace incrementally instead of blocking the server
    # the way KEYS does. SCAN may return a key more than once, hence the set.
    # Every matching key is exported, including the current day's key if it
    # exists; ids are deterministic, so a later run re-indexes those documents.
    redis_keys = sorted(set(r.scan_iter(match=key_prefix + '*')))
    for raw_key in redis_keys:
        k_index = _to_text(raw_key)
        if not k_index.startswith(key_prefix):
            # Only possible when the prefix contains Redis glob characters.
            continue
        # Slice off the whole prefix. Splitting at the first '_' would pick
        # the wrong day whenever the prefix itself contains an underscore.
        day = k_index[len(key_prefix):]
        index_name = k_index.lower()

        _ensure_index(client, index_name)
        actions = _build_actions(r.hgetall(raw_key), index_name, day)
        bulk(client, actions=actions, stats_only=True)
        print(k_index, 'done')


if __name__ == '__main__':
    main()