我的编程空间,编程开发者的网络收藏夹
学习永远不晚

使用spark分析mysql慢日志

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

使用spark分析mysql慢日志

熟悉oracle的童鞋都知道,在oracle中,有很多视图记录着sql执行的各项指标,我们可以根据自己的需求编写相应脚本,从oracle中获取sql的性能开销。作为开源数据库,mysql不比oracle,分析慢sql只能通过slow.log。slow.log看起来不够直观,而且同一条慢sql执行多次的话就会在slow.log中被记录多次,可阅读性较差。
最近,部门开发的数据库审计平台上线mysql审计模块,需要为客户提供一键化提取slow.log中慢sql的功能。由于本人之前研究过spark,在分析慢日志的文本结构后,使用scala语言,利用spark core相关技术,编写了能够去重slow.log中重复sql,并将按执行时间排序的top sql输入到hive表中的小程序。
话不多说,上菜!

开发环境:
1、CentOS 6.5
2、JDK 1.7
3、Hadoop 2.4.1
4、Hive 0.13
5、Spark 1.5.1
6、scala 2.11.4
hadoop及spark集群环境的搭建方法就不多说了哈,网上资料很多,对大数据感兴趣的童鞋可以尝试搭建。

step 1 使用scala ide for eclipse编写应用程序
analyzeSlowLog.scala:

package cn.spark.study.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.util.matching.Regex
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext

object SlowLogAnalyze {
  def main(args: Array[String]): Unit = {
    //创建SparkConf,SparkContext和HiveContext
    val conf=new SparkConf()
      .setAppName("SlowLogAnalyze");
    val sc=new SparkContext(conf)
    val hiveContext=new HiveContext(sc)

    //读取hdfs文件,获取logRDD
    val logRDD=sc.textFile("hdfs://spark1:9000/files/slow.log", 5)

    //创建正则表达式,用来过滤slow.log中的无效信息
    val pattern1="# Time:".r
    val pattern2="# User@Host:".r
    val pattern3="SET timestamp=".r 

    //对logRDD进行filter,过滤无效信息
    val filteredLogRDD=logRDD.filter { str => 
          //正则返回的是option类型,只有Some和None两种类型
          if(pattern1.findFirstIn(str)!=None){
            false
          }else if(pattern2.findFirstIn(str)!=None){
            false
          }else if(pattern3.findFirstIn(str)!=None){
            false
          }else{
            true
          }
         }
    

    //将filteredLogRDD转换为数组
    val logArray=filteredLogRDD.toArray()

    //定义正则表达式pattern,用于识别Query_time
    val pattern="# Query_time:".r 

    //定义数组KV_Array,用于存放循环映射后的tuple,tuple为(query_time所在行,sql_text)
    val KV_Array=ArrayBuffer[(String,String)]()
          for (i<-0 until logArray.length){
             if(pattern.findFirstIn(logArray(i))!=None){
               val key=logArray(i)
               var flag=true 
               var value=""
               if(i<logArray.length-1){
                 for(k<-i+1 until logArray.length if flag ){
                   if(pattern.findFirstIn(logArray(k))!=None){
                     flag=false
                   }else{
                     value=value+logArray(k)
                   }
                 } 
               }
               KV_Array+=((key,value))
             }
           }

     //并行化集合获取KV_RDD
     val KV_RDD=sc.parallelize(KV_Array, 1)

     //执行map,将KV_RDD映射为(execute_time,sql_text)的tuple类RDD time_sql_RDD
     val sql_time_RDD=KV_RDD
         .map{tuple=>
             val timeSplit=tuple._1.split(" ")
             //注意这里是toDouble,不是toInt!!!!因为日志中的时间是Double类型!!!!
             (tuple._2,timeSplit(2).toDouble)
         }

     

     val groupBySqlRDD=sql_time_RDD.groupByKey()
         .map{tuple=>
             val timeArray=tuple._2.toArray
             var totalTime=0.0
             for(i<-0 until timeArray.length){
               totalTime=totalTime + timeArray(i)
             }
             val avgTime=totalTime/timeArray.length
             (tuple._1,avgTime)
         }

     val sortedRowRDD=groupBySqlRDD
         .map{tuple=>(tuple._2,tuple._1)}
         .sortByKey(false, 1)
         .map{tuple=>Row(tuple._2,tuple._1)}
     val top10Array=sortedRowRDD.take(10)
     val top10RDD=sc.parallelize(top10Array, 1)
     //将sortedRDD转换为dataframe 
     val structType=new StructType(Array(
           StructField("sql_text",StringType,true),
           StructField("executed_time",DoubleType,true)
           )
         )
     val top10DF=hiveContext.createDataFrame(top10RDD, structType) 
     hiveContext.sql("drop table if exists sql_top10")
     top10DF.saveAsTable("sql_top10")
  }
}

将代码打成jar包并上传至linux。
step 2 编写执行脚本
analyzeSlowLog.sh:

/var/software/spark-1.5.1-bin-hadoop2.4/bin/spark-submit \
--class cn.spark.study.sql.SlowLogAnalyze \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
--files /var/software/hive/conf/hive-site.xml \
--driver-class-path /var/software/hive/lib/mysql-connector-java-5.1.17.jar \
/var/software/spark_study/scala/SlowLogAnalyze.jar

step 3 执行analyzeSlowLog.sh,并进入hive查看分析结果:
hive> show tables;
OK
daily_top3_keywords_uvs
good_students
sql_top10 -- 这张表就是scala程序中定义的表名,程序运行时会在hive中创建
student_infos
student_scores
Time taken: 0.042 seconds, Fetched: 5 row(s)

查看sql_top10中的内容:
这里由于长度限制,截断了sql文本,所以看起来部分sql是一样的,实际是两条不同的sql(where 条件不同)。
hive> select substr(sql_text,1,50),executed_time from sql_top10;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
...
Execution completed successfully
MapredLocal task succeeded
OK
select 'true' as QUERYID, ID_GARAG 0.0252804
select count() from pms_garage_vitri_info 0.0048902
select count(
) from information_schema.PROCESSLIS 3.626E-4
select 'true' as QUERYID, e_survey 2.39E-4
select 'true' as QUERYID, e_survey 2.34E-4
SELECT account_code AS um 2.2360000000000001E-4
select 'true' as QUERYID, e_survey 2.19E-4
select 'true' as QUERYID, e_survey 2.18E-4
select 'true' as QUERYID, e_survey 2.15E-4
SELECT account_code AS um 2.1419999999999998E-4
Time taken: 8.501 seconds, Fetched: 10 row(s)

至此,对mysql slow.log的提取完毕!

关于在mysql中创建相关视图的思考:
hadoop和spark一般用于处理大数据,这里用来处理mysql的慢日志实在是大材小用。不过,要想在mysql中提供查看数据库top sql的v$Topsql视图,对slow.log的实时分析是必须的,此时,spark streaming便可派上用场。
思路如下:
1.编写crontab定时任务以定时拷贝slow.log至hdfs
2.编写crontab定时任务以调用spark streaming程序分析hdfs上的最新slow.log ->通过jdbc将将top sql输出到对应mysql数据库中的v$Topsql视图中,并覆盖之前的数据。
ps:在分析slow.log时,可在程序中executor,timestamp等字段(本文中并未提取这两个字段),以提供更详细的信息。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

使用spark分析mysql慢日志

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

MySQL慢查询日志分析

MySQL慢查询日志是MySQL服务器用来记录慢查询操作的一种日志打开慢查询日志功能:在MySQL配置文件(例如:my.cnf或my.ini)中,添加以下配置选项:slow_query_log = "1"slow_query_log_
MySQL慢查询日志分析
2024-10-20

Linux下MySQL的慢查询日志分析

MySQL的慢查询日志是记录MySQL执行时间超过指定阈值的查询语句的日志,在Linux下可以通过以下步骤来进行慢查询日志的分析:打开MySQL配置文件my.cnf,找到slow_query_log参数,将其设置为ON,并设置slow_qu
Linux下MySQL的慢查询日志分析
2024-08-16

MySQL SQL性能分析之慢查询日志、explain使用详解

目录SQL执行频率慢查询日志profileexplainSQL执行频率mysql 客户端连接成功后,通过 show [session|global] status 命令可以提供服务器状态信息。通过如下指令,可以查看当前数据库的insert
2023-04-14

MySQL SQL性能分析之慢查询日志、explain使用详解

这篇文章主要介绍了MySQL SQL性能分析 慢查询日志、explain使用,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
2023-05-16

MySQL慢日志查询分析方法与工具

MySQL中的日志包括:错误日志、二进制日志、通用查询日志、慢查询日志等等。这里主要介绍下比较常用的两个功能:通用查询日志和慢查询日志。 1)通用查询日志:记录建立的客户端连接和执行的语句。 2)慢查询日志:记录所有执行时间超过long_query_time秒
MySQL慢日志查询分析方法与工具
2017-05-29

MySQL优化之慢查询日志实例分析

本篇内容主要讲解“MySQL优化之慢查询日志实例分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MySQL优化之慢查询日志实例分析”吧!一、慢查询日志概念对于SQL和索引的优化问题,我们会使用
2023-07-02

编程热搜

目录