Logstash+ElasticSearch处理mysql慢查询日志
yeyou
9年前
<p>遇到一个需求, 需要查询某些业务的慢查询日志. 结果DBA平台那边提供的慢查询日志不能解决实际的业务场景(上报的字段补全), 无奈, 自己挽起袖子上</p> <p>参考了 <a href="/misc/goto?guid=4959673684841117832" rel="nofollow,noindex">这篇文章</a> , 不过自己根据需求做了较多的变更</p> <p>开始吧</p> <h2>1. 找到日志的位置</h2> <p>先确认是否开启了, 然后找到日志文件的位置</p> <pre> > show variables like '%slow%'; +---------------------+-------------------------------------+ | Variable_name | Value | +---------------------+-------------------------------------+ | log_slow_queries | ON | | slow_launch_time | 2 | | slow_query_log | ON | | slow_query_log_file | /data/mysqllog/20000/slow-query.log | +---------------------+-------------------------------------+</pre> <h2>2. 慢查询日志</h2> <p>格式基本是如下, 当然, 格式如果有差异, 需要根据具体格式进行小的修改</p> <pre> # Time: 160524 5:12:29 # User@Host: user_a[xxxx] @ [10.166.140.109] # Query_time: 1.711086 Lock_time: 0.000040 Rows_sent: 385489 Rows_examined: 385489 use dbname; SET timestamp=1464037949; SELECT 1 from dbname;</pre> <h2>3. 使用 logstash 采集</h2> <p>采集, 无非是用 multiline 进行多行解析</p> <p>但是, 需要处理的</p> <p>第一个是, 去除掉没用的信息</p> <p>第二个, 慢查询sql, 是会反复出现的, 所以, 执行次数成了一个很重要的指标. 我们要做的, 就是 降噪 (将参数去掉, 涉及带引号的内容+数字), 将参数类信息过滤掉, 留下核心的sql, 然后计算出一个hash, 这样就可以在查询, 根据这个字段进行聚合. 这里用到了 <a href="/misc/goto?guid=4959673684923410287" rel="nofollow,noindex">mutate</a> 以及 <a href="/misc/goto?guid=4959673685011459713" rel="nofollow,noindex">checksum</a></p> <pre> # calculate unique hash mutate { add_field => {"sql_for_hash" => "%{sql}"} } mutate { gsub => [ "sql_for_hash", "'.+?'", "", "sql_for_hash", "-?\d*\.{0,1}\d+", "" ] } checksum { algorithm => "md5" keys => ["sql_for_hash"] }</pre> <p>最后算出来的md5, 放入了 logstash_checksum</p> <p>完整的logstash配置文件(具体使用可能需要根据自身日志格式做些小调整) 注意, 里面的pattern ALLWORD [\s\S]*</p> <pre> input { file { path => ["/data/mysqllog/20000/slow-query.log"] sincedb_path => "/data/LogNew/logstash/sincedb/mysql.sincedb" type => "mysql-slow-log" add_field => ["env", "PRODUCT"] codec => multiline { pattern => "^# User@Host:" negate => true what => previous } } } filter { grok { # User@Host: logstash[logstash] @ localhost [127.0.0.1] # User@Host: logstash[logstash] @ [127.0.0.1] match => [ "message", "^# User@Host: %{ALLWORD:user}\[%{ALLWAORD}\] @ %{ALLWORD:dbhost}? \[%{IP:ip}\]" ] } grok { # Query_time: 102.413328 Lock_time: 0.000167 Rows_sent: 0 Rows_examined: 1970 match => [ "message", "^# Query_time: %{NUMBER:duration:float}%{SPACE}Lock_time: %{NUMBER:lock_wait:float}%{SPACE}Rows_sent: %{NUMBER:results:int}%{SPACE}Rows_examined:%{SPACE}%{NUMBER:scanned:int}%{ALLWORD:sql}"] } // remove useless data mutate { gsub => [ "sql", "\nSET timestamp=\d+?;\n", "", "sql", "\nuse [a-zA-Z0-9\-\_]+?;", "", "sql", "\n# Time: \d+\s+\d+:\d+:\d+", "", "sql", "\n/usr/local/mysql/bin/mysqld.+$", "", "sql", "\nTcp port:.+$", "", "sql", "\nTime .+$", "" ] } # Capture the time the query happened grok { match => [ "message", "^SET timestamp=%{NUMBER:timestamp};" ] } date { match => [ "timestamp", "UNIX" ] } # calculate unique hash mutate { add_field => {"sql_for_hash" => "%{sql}"} } mutate { gsub => [ "sql_for_hash", "'.+?'", "", "sql_for_hash", "-?\d*\.{0,1}\d+", "" ] } checksum { algorithm => "md5" keys => ["sql_for_hash"] } # Drop the captured timestamp field since it has been moved to the time of the event mutate { # TODO: remove the message field remove_field => ["timestamp", "message", "sql_for_hash"] } } output { #stdout{ # codec => rubydebug #} #if ("_grokparsefailure" not in [tags]) { # stdout{ # codec => rubydebug # } #} if ("_grokparsefailure" not in [tags]) { elasticsearch { hosts => ["192.168.1.1:9200"] index => "logstash-slowlog" } } }</pre> <p>采集进去的内容</p> <pre> { "@timestamp" => "2016-05-23T21:12:59.000Z", "@version" => "1", "tags" => [ [0] "multiline" ], "path" => "/Users/ken/tx/elk/logstash/data/slow_sql.log", "host" => "Luna-mac-2.local", "type" => "mysql-slow", "env" => "PRODUCT", "user" => "dba_bak_all_sel", "ip" => "10.166.140.109", "duration" => 28.812601, "lock_wait" => 0.000132, "results" => 749414, "scanned" => 749414, "sql" => "SELECT /*!40001 SQL_NO_CACHE */ * FROM `xxxxx`;", "logstash_checksum" => "3e3ccb89ee792de882a57e2bef6c5371" }</pre> <h2>4. 写查询</h2> <p>查询, 我们需要按 logstash_checksum 进行聚合, 然后按照次数由多到少降序展示, 同时, 每个 logstash_checksum 需要有一条具体的sql进行展示</p> <p>通过 es 的 <a href="/misc/goto?guid=4959673685091387237" rel="nofollow,noindex">Top hits Aggregation</a> 可以完美地解决这个查询需求</p> <p>查询的query</p> <pre> body = { "from": 0, "size": 0, "query": { "filtered": { "query": { "match": { "user": "test" } }, "filter": { "range": { "@timestamp": { "gte": "now-1d", "lte": "now" } } } } }, "aggs": { "top_errors": { "terms": { "field": "logstash_checksum", "size": 20 }, "aggs": { "top_error_hits": { "top_hits": { "sort": [ { "@timestamp":{ "order": "desc" } } ], "_source": { "include": [ "user" , "sql", "logstash_checksum", "@timestamp", "duration", "lock_wait", "results", "scanned" ] }, "size" : 1 } } } } } }</pre> <p>跟这个写法相关的几个参考链接: <a href="/misc/goto?guid=4959673685172226558" rel="nofollow,noindex">Terms Aggregation</a> / <a href="/misc/goto?guid=4959673685249542871" rel="nofollow,noindex">Elasticsearch filter document group by field</a></p> <h2>5. 渲染页面</h2> <p>python的后台, 使用 sqlparse 包, 将sql进行格式化(换行/缩进/大小写), 再往前端传. <a href="/misc/goto?guid=4959673685334648149" rel="nofollow,noindex">sqlparse</a></p> <pre> >>> sql = 'select * from foo where id in (select id from bar);' >>> print sqlparse.format(sql, reindent=True, keyword_case='upper') SELECT * FROM foo WHERE id IN (SELECT id FROM bar);</pre> <p>然后在页面上, 使用js进行语法高亮 <a href="/misc/goto?guid=4959673685416650046" rel="nofollow,noindex">code-prettify</a></p> <p> </p> <p>来自: <a href="/misc/goto?guid=4959673685494300937" rel="nofollow">http://www.wklken.me/posts/2016/05/24/elk-mysql-slolog.html</a></p> <p> </p>