Logstash+ElasticSearch处理mysql慢查询日志

yeyou 9年前
   <p>遇到一个需求, 需要查询某些业务的慢查询日志. 结果DBA平台那边提供的慢查询日志不能解决实际的业务场景(上报的字段补全), 无奈, 自己挽起袖子上</p>    <p>参考了 <a href="/misc/goto?guid=4959673684841117832" rel="nofollow,noindex">这篇文章</a> , 不过自己根据需求做了较多的变更</p>    <p>开始吧</p>    <h2>1. 找到日志的位置</h2>    <p>先确认是否开启了, 然后找到日志文件的位置</p>    <pre>  > show variables like '%slow%';  +---------------------+-------------------------------------+  | Variable_name       | Value                               |  +---------------------+-------------------------------------+  | log_slow_queries    | ON                                  |  | slow_launch_time    | 2                                   |  | slow_query_log      | ON                                  |  | slow_query_log_file | /data/mysqllog/20000/slow-query.log |  +---------------------+-------------------------------------+</pre>    <h2>2. 慢查询日志</h2>    <p>格式基本是如下, 当然, 格式如果有差异, 需要根据具体格式进行小的修改</p>    <pre>  # Time: 160524  5:12:29  # User@Host: user_a[xxxx] @  [10.166.140.109]  # Query_time: 1.711086  Lock_time: 0.000040 Rows_sent: 385489  Rows_examined: 385489  use dbname;  SET timestamp=1464037949;  SELECT 1 from dbname;</pre>    <h2>3. 使用 logstash 采集</h2>    <p>采集, 无非是用 multiline 进行多行解析</p>    <p>但是, 需要处理的</p>    <p>第一个是, 去除掉没用的信息</p>    <p>第二个, 慢查询sql, 是会反复出现的, 所以, 执行次数成了一个很重要的指标. 我们要做的, 就是 降噪 (将参数去掉, 涉及带引号的内容+数字), 将参数类信息过滤掉, 留下核心的sql, 然后计算出一个hash, 这样就可以在查询, 根据这个字段进行聚合. 这里用到了 <a href="/misc/goto?guid=4959673684923410287" rel="nofollow,noindex">mutate</a> 以及 <a href="/misc/goto?guid=4959673685011459713" rel="nofollow,noindex">checksum</a></p>    <pre>  # calculate unique hash    mutate {      add_field => {"sql_for_hash" => "%{sql}"}    }    mutate {      gsub => [          "sql_for_hash", "'.+?'", "",          "sql_for_hash", "-?\d*\.{0,1}\d+", ""      ]    }    checksum {      algorithm => "md5"      keys => ["sql_for_hash"]    }</pre>    <p>最后算出来的md5, 放入了 logstash_checksum</p>    <p>完整的logstash配置文件(具体使用可能需要根据自身日志格式做些小调整) 注意, 里面的pattern ALLWORD [\s\S]*</p>    <pre>  input {    file {      path => ["/data/mysqllog/20000/slow-query.log"]      sincedb_path => "/data/LogNew/logstash/sincedb/mysql.sincedb"      type => "mysql-slow-log"      add_field => ["env", "PRODUCT"]      codec => multiline {        pattern => "^# User@Host:"        negate => true        what => previous      }    }  }  filter {    grok {      # User@Host: logstash[logstash] @ localhost [127.0.0.1]      # User@Host: logstash[logstash] @  [127.0.0.1]      match => [ "message", "^# User@Host: %{ALLWORD:user}\[%{ALLWAORD}\] @ %{ALLWORD:dbhost}? \[%{IP:ip}\]" ]    }    grok {      # Query_time: 102.413328  Lock_time: 0.000167 Rows_sent: 0  Rows_examined: 1970      match => [ "message", "^# Query_time: %{NUMBER:duration:float}%{SPACE}Lock_time: %{NUMBER:lock_wait:float}%{SPACE}Rows_sent: %{NUMBER:results:int}%{SPACE}Rows_examined:%{SPACE}%{NUMBER:scanned:int}%{ALLWORD:sql}"]    }      // remove useless data    mutate {      gsub => [          "sql", "\nSET timestamp=\d+?;\n", "",          "sql", "\nuse [a-zA-Z0-9\-\_]+?;", "",          "sql", "\n# Time: \d+\s+\d+:\d+:\d+", "",          "sql", "\n/usr/local/mysql/bin/mysqld.+$", "",          "sql", "\nTcp port:.+$", "",          "sql", "\nTime .+$", ""      ]    }      # Capture the time the query happened    grok {      match => [ "message", "^SET timestamp=%{NUMBER:timestamp};" ]    }    date {      match => [ "timestamp", "UNIX" ]    }        # calculate unique hash    mutate {      add_field => {"sql_for_hash" => "%{sql}"}    }    mutate {      gsub => [          "sql_for_hash", "'.+?'", "",          "sql_for_hash", "-?\d*\.{0,1}\d+", ""      ]    }    checksum {      algorithm => "md5"      keys => ["sql_for_hash"]    }      # Drop the captured timestamp field since it has been moved to the time of the event    mutate {      # TODO: remove the message field      remove_field => ["timestamp", "message", "sql_for_hash"]    }  }  output {      #stdout{      #    codec => rubydebug      #}      #if ("_grokparsefailure" not in [tags]) {      #    stdout{      #        codec => rubydebug      #    }      #}      if ("_grokparsefailure" not in [tags]) {          elasticsearch {            hosts => ["192.168.1.1:9200"]            index => "logstash-slowlog"          }      }  }</pre>    <p>采集进去的内容</p>    <pre>  {             "@timestamp" => "2016-05-23T21:12:59.000Z",               "@version" => "1",                   "tags" => [          [0] "multiline"      ],                   "path" => "/Users/ken/tx/elk/logstash/data/slow_sql.log",                   "host" => "Luna-mac-2.local",                   "type" => "mysql-slow",                    "env" => "PRODUCT",                   "user" => "dba_bak_all_sel",                     "ip" => "10.166.140.109",               "duration" => 28.812601,              "lock_wait" => 0.000132,                "results" => 749414,                "scanned" => 749414,                    "sql" => "SELECT /*!40001 SQL_NO_CACHE */ * FROM `xxxxx`;",      "logstash_checksum" => "3e3ccb89ee792de882a57e2bef6c5371"  }</pre>    <h2>4. 写查询</h2>    <p>查询, 我们需要按 logstash_checksum 进行聚合, 然后按照次数由多到少降序展示, 同时, 每个 logstash_checksum 需要有一条具体的sql进行展示</p>    <p>通过 es 的 <a href="/misc/goto?guid=4959673685091387237" rel="nofollow,noindex">Top hits Aggregation</a> 可以完美地解决这个查询需求</p>    <p>查询的query</p>    <pre>  body = {      "from": 0,      "size": 0,      "query": {          "filtered": {              "query": {                  "match": {                      "user": "test"                  }              },              "filter": {                  "range": {                      "@timestamp": {                          "gte": "now-1d",                          "lte": "now"                      }                  }              }          }      },      "aggs": {          "top_errors": {              "terms": {                  "field": "logstash_checksum",                  "size": 20              },              "aggs": {                  "top_error_hits": {                      "top_hits": {                          "sort": [                              {                                  "@timestamp":{                                      "order": "desc"                                  }                              }                          ],                          "_source": {                              "include": [                                 "user" , "sql", "logstash_checksum", "@timestamp", "duration", "lock_wait", "results", "scanned"                              ]                          },                          "size" : 1                      }                  }              }          }      }  }</pre>    <p>跟这个写法相关的几个参考链接: <a href="/misc/goto?guid=4959673685172226558" rel="nofollow,noindex">Terms Aggregation</a> / <a href="/misc/goto?guid=4959673685249542871" rel="nofollow,noindex">Elasticsearch filter document group by field</a></p>    <h2>5. 渲染页面</h2>    <p>python的后台, 使用 sqlparse 包, 将sql进行格式化(换行/缩进/大小写), 再往前端传. <a href="/misc/goto?guid=4959673685334648149" rel="nofollow,noindex">sqlparse</a></p>    <pre>  >>> sql = 'select * from foo where id in (select id from bar);'  >>> print sqlparse.format(sql, reindent=True, keyword_case='upper')  SELECT *  FROM foo  WHERE id IN    (SELECT id     FROM bar);</pre>    <p>然后在页面上, 使用js进行语法高亮 <a href="/misc/goto?guid=4959673685416650046" rel="nofollow,noindex">code-prettify</a></p>    <p> </p>    <p>来自: <a href="/misc/goto?guid=4959673685494300937" rel="nofollow">http://www.wklken.me/posts/2016/05/24/elk-mysql-slolog.html</a></p>    <p> </p>