Python读取Redis数据导出到Elasticsearch

jopen 9年前

#! usr/bin/python  # -*- coding:utf-8 -*-  import redis  import datetime  from elasticsearch import Elasticsearch  from elasticsearch.helpers import bulk  import sys, getopt    def usage():   print 'usage: python cmd.py -e <elasticsearch host> -r <redis host> -p <redis list key prefix>'    if __name__ == '__main__':      opts, args = getopt.getopt(sys.argv[1:], "e:r:p:")     elasticsearch_host = ''   redis_host = ''   redis_list_prefix= ''   for op, value in opts:    if op == "-e":     elasticsearch_host = value    elif op == "-r":     redis_host = value    elif op == "-p":     redis_list_prefix = value       if elasticsearch_host == '' or redis_host == '' or redis_list_prefix == '':    usage()    sys.exit(1)       client = Elasticsearch([{'host' : elasticsearch_host}])     r = redis.Redis(host = redis_host, port=6379)     doctype = "watchvideocount"     #get all keys ends with day description except today   videocount_keys = r.keys(redis_list_prefix + '_*')     #start query data   for k_index in videocount_keys:    #get all <k, v> data    kvalues = r.hgetall(k_index)        #get day    p = k_index.index('_')    day = k_index[p+1:]          #make index name     index_name = k_index.lower()      #create index    try:     client.indices.create(index=index_name)    except Exception:     print 'index ', index_name, ' exist'    namapping = {      doctype: {       "properties": {        "day":{"type": "date"},        "EID":{"type": "string"},        "CID":{"type": "string"},        "VID":{"type": "string"},        "VALUE":{"type": "integer"}       }      }     }    client.indices.put_mapping(index=index_name, doc_type=doctype, body=namapping)        #data cache    data_cache = []     for key in kvalues:     fields = key.split('_')     eid = fields[0]     cid = fields[1]     vid = fields[2]     value = kvalues[key]     #make primary key     ID = day + '_' + eid + '_' + cid + '_' + vid       #make JSON data     json_data = {'_index':index_name, '_type':doctype, "_id":ID, "DAY":day, "EID":eid, "CID":cid, "VID":vid, "VALUE":value}      #append data cache     data_cache.append(json_data)        #index into elasticsearch     bulk(client, actions=data_cache, stats_only=True)    print k_index, 'done'


来自: http://my.oschina.net/u/2242064/blog/553022