研究机器学习之MLlib实践经验

jopen 10年前

本文主要讨论是用MLlib进行Classification工作。典型的应用场景就是AD CTR Prediction,也就是大部分互联网公司的利润来源。据业余了解,广告CTR预估使用最多的基础算法还是L1正则化的Logistic Regression。

机器学习任务主要分为两种:Supervised Machine Learning 和 Unsupervised Machine Learning其中Supervised Machine Learning主要包括Classification和Regression,Unsupervised Machine Learning主要包括Clustering。除了这些核心的算法以外,还有一些辅助处理的模块,例如Preprocessing, Dimensionality Reduction, Model Selection等。

目前最新的Spark 1.1.0版本中MLlib主要还是对核心算法的支持,辅助处理模块还很不完善。源代码包和其功能的对应关系如下:

Classification/Clustering/Regression/Tree

分类算法、回归算法、决策树、聚类算法

Optimization

核心算法的优化方法实现

STAT

基础统计

Feature

预处理

Evaluation

算法效果衡量

Linalg

基础线性代数运算支持

Recommendation

推荐算法

本文主要讨论是用MLlib进行 Classification工作。分类是机器学习最基础的工作,典型的应用场景就是AD CTR Prediction,也就是大部分互联网公司的利润来源。据业余了解,广告CTR预估使用最多的基础算法还是L1正则化的Logistic Regression。

下面一步一步来看看使用MLlib进行Classification机器学习。

1、分类算法原理与MLlib的实现

首先需要了解机器学习和MLlib的基础知识和原理,大家可以参考 http://spark.apache.org/docs/latest/mllib-Linear-Methods.html 。本文主要从工程实践的角度讨论如何使用和调优。

分类问题主要包括Binary Classification 和 Multiclass Classification。目前的MLlib只支持Linear Classification问题,这里讨论的也都是线性分类问题,不涉及到Kernel Method等。目前MLlib里面的Classification算法最常用的就是LR,SVM和Tree/RF。其中LR和SVM目前只支持 Binary Classification,Tree/RF支持Multiclass Classification。本文主要讨论使用LR和SVM进行线性Binary Classification问题的实践中遇到的一些问题。

抽象来看LR和SVM算法都是通过指定Loss Function和Gradient/SUB-Gradient,然后通过Optimization算法(SGD或LBFGS)求使得Loss Function最小的凸优化问题,最后得出的解是一个Weights向量。从代码中也可以看出,LR和SVM算法仅仅是指定的Loss Function和Gradient是不同的,其求解最小值的过程是通用的,所以求解最小值的过程抽象出了Optimization模块,目前主要有 SGD和LBFGS两种实现。

为了防止过拟合,需要在Loss Function后面加入一个正则化项一起求最小值。正则化项相当于对Weights向量的惩罚,期望求出一个更简单的模型。 MLlib目前支持两种正则化方法L1和L2。 L2正则化假设模型参数服从高斯分布,L2正则化函数比L1更光滑,所以更容易计算;L1假设模型参数服从拉普拉斯分布,L1正则化具备产生稀疏解的功能,从而具备Feature Selection的能力。

2、两种Optimization方法SGDLBFGS

有了上面的数学基础,现在就是求取一个函数的最小值问题了。MLlib里面目前提供两种方法SGD和LBFGS。关于这两种算法的原理,大家可以参考 http://spark.apache.org/docs/latest/mllib-Optimization.html 。这两种优化方法的核心都是RDD的aggregate操作,这个从Spark Job运行时的UI中可以看出,SGD/LBFGS每迭代一次,aggregate执行一次,Spark UI中出现一个stage。下面分别看看两种优化算法具体怎么实现的。

SGD:

核心实现在GradientDescent.runMiniBatchSGD函数中

    for (i <- 1 to numIterations) {              val bcWeights = data.context.broadcast(weights)              // Sample a subset (fraction miniBatchFraction) of the total data              // compute and sum up the subgradients on this subset (this is one map-reduce)              val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)                .treeAggregate((BDV.zeros[Double](n), 0.0))(                  seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>                    val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))                    (grad, loss + l)                  },                  combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>                    (grad1 += grad2, loss1 + loss2)                  })                       /**              * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration              * and regVal is the regularization value computed in the previous iteration as well.              */              stochasticLossHistory.append(lossSum / miniBatchSize + regVal)              val update = updater.compute(                weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)              weights = update._1              regVal = update._2            }  
</div> </div>

每次根据MiniBatchFraction指定的比例随机采样相应数量的样本,然后调用Gradient.Compute逐个计算Gradient和 Loss并累加在一起,得到这一轮迭代总的Gradient和Loss的变化。然后调用Updater.Compute更新Weights矩阵和 RegVal(正则化项)。也就是说Gradient.Compute只是利用当前Weights向量计算Gradient和Loss的变化,而 Updater.Compute则根据这个变化以及正则化项计算更新之后的Weights和RegVal。

SGD算法支持使用L1和L2正则化方法。

注意到这里Weights向量在下一轮计算的过程中每个参与计算的Executor都需要,所以使用了Broadcast变量把它分发到每个节点,提高了计算效率。

LBFGS:

LBFGS 优化方法的核心实现在LBFGS.runLBFGS函数里面。LBFGS的实现比SGD更加依赖Breeze库,它的迭代框架都是使用的Breeze的 LBFGS的实现,只是实现了自己的名为CostFun的DiffFunction。大家可以去LBGFS.CostFun函数中看看Loss Function和Gradient的计算方法与SGD算法如出一辙,也是利用了RDD的Aggregate操作。

和SGD不同,目前 LBFGS只支持L2正则化,不支持L1正则化。 其实在Breeze库里面有LBFGS + L1正则化的实现OWLQN(OWLQN算法默认自带L1正则化,所以在传入的参数DiffFunction中不需要显示定义正则化项,只需要定义 Loss Function即可)是可以把它引入MLlib里面完成LBFGS+L1的功能。 这个在社区也有讨论,DB Tsai等人正在做这方面的工作。等不及的同学也可以尝试下我们自己修改的版本,引入了LBFGS+L1的功能的代码。

SGD和LBFGS两种算法的比较:

网上的资料都告诉我们说LBFGS比SGD更容易收敛,效果更好,大家可以亲自尝试下。例如选择Logistic Regression算法,选取同一个数据集,在做Training和Test集合的分配的时候也要一致。然后把生成的Training和Test的 RDD分别丢到LogisticRegressionWithSGD和LogisticRegressionWithLBFGS两种具体实现算法里。其他参数要一致(例如都选择L2正则化,regParam=1.0)然后比较效果。

怎么比较效果的好坏呢?分类问题就是那些指标 Precision/Recall/F1-score/Area Under ROC等。这里面有个需要注意的问题,MLlib的实现里面,SGD优化的终止条件是通过指定NumIterations也就是迭代次数终止的;而 LBFGS优化的终止条件是通过指定ConvergenceTol(两次迭代Loss Function变化的容忍度)和MaxnumIterations(最大迭代次数)来终止的。

为了达到比较这两种优化方法的目的,需要定义一个统一的指标。由于我们优化的目标是让Loss Function+正则化项 最小,所以这个就是统一的指标。MLlib的日志里面会打印出迭代的最后10次的Loss大小:

//GradientDescent

    logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(                 stochasticLossHistory.takeRight(10).mkString(", ")))  
</div> </div> //LBFGS
    logInfo("LBFGS.runLBFGS finished. Last 10 losses %s".format(                 lossHistory.takeRight(10).mkString(", ")))  
</div> </div>

我在一个数据集上测试,其他参数都一致(L2正则化,RegParam=1.0,MiniBatchFraction=1.0)的情况下,LBFGS需要 14次就可以达到收敛;而对于SGD(实际上MiniBatchFraction=1.0的已经不是纯种SGD了)算法大约需要10000次循环才能达到相似的Loss大小。而且最后两种算法得到的Weights(权重矩阵)是一样的。

3,如何解读和分析训练出来的模型

训练出来的模型实际上就是一个Weights向量,现在MLlib的GeneralizedLinearModel的成员变量Weights和 Intercept都是Public的了。 在训练好GeneralizedLinearModel之后,可以直接把Weights和Intercept打印出来,或者进行一些计算找出权重最大的几个维度。

这里有个问题,如果我的Loss Function和Gradient是确定的,那么使用不同的优化方法求出的Weights是不是应该是一样的?例如对同一个数据集,使用 LogisticRegressionWithSGD和LogisticRegressionWithLBFGS分别训练出的模型是否应该是一致的?怎么衡量效果好坏?

对于这个问题,我想首先要看几点:

1)  确认两种方法使用的正则化和其他参数是一致的。

2)  通过日志查看两种优化方法的Loss Function+正则化项最后是否收敛?是否收敛到接近的值?

3)  如果有某种优化算法的Loss不收敛,说明那种方法的迭代次数不够(SGD)或者ConvergenceTol设置的太大(LBFGS),调整参数重试。如果收敛到相差较大的值,十有八九你的算法有问题。

4)  如果两种优化方法的Loss Function+正则化项收敛且到接近的值,那么就要看看传统指标Precision/Recall/Area Under ROC。如果Area Under ROC接近1.0,说明你的数据完全线性可分或者过拟合了,这个时候打印出两种方法得到的Weights应该是类似倍数关系的;如果Area Under ROC不是接近1.0,那么说明你的数据是真实的数据,这个时候两种方法得到的Weights应该是一致的。

5)  接下来就可以使用上述的思路去调节参数优化Precision/Recall等指标了。

4,预处理

其实上面的工作还只是停留在学习算法的阶段,拿过来一些公开的Dataset或者Benchmark来跑跑,看看效果。这些数据集往往都是经过了别人的一些加工和处理,在实际工作中这一部分也是需要我们来做的,这就是数据预处理。数据预处理特别繁琐,但是对机器学习模型效果的好坏却非常重要。

预处理主要包括Normalization,Scale,Outlier-Detection,正负样本均衡等。例如遇到一个数据正负样本比例9:1,这样的数据直接丢到模型里面显然会让模型更偏向正例做预测。解决这个问题的方法挺多的,最简单的例子就是正例采样、负例冗余,使两者达到接近平衡。

MLlib 现阶段主要精力还是在核心算法上,对预处理这部分做的不是很好,这也提高了使用门槛。在1.1.0版本开始增加了一个新的Package叫 Feature,里面大多是Preprocssing函数,包括Normalizer和StandardScaler等。例如 StandardScaler能够把Feature按照列转换成Mean=0,Standard deviation=1的正态分布。

数据输入支持普通文本文件和LIBSVM格式。在1.1.0版本之前,输入的Label可以是+1/-1会被自动映射成1.0/0.0,但是从1.1.0开始貌似只支持输入1.0/0.0的Label了,这个和我们以前常见的LIBSVM格式的数据集不一样,需要注意。这个改动应该主要考虑的是对以后多分类的支持吧。

5,MLlib中的Vector和线性代数运算

不知道大家有没有注意到一个问题,就是MLlib底层的矩阵运算使用了Breeze库,Breeze库提供了Vector/Matrix的实现以及相应计算的接口(Linalg)。但是在MLlib里面同时也提供了Vector和Linalg等的实现(目前只是对Breeze做了一层包装)。在所有的 MLlib的函数里面的参数传递都是使用Mllib自己的Vector,而且在函数内的矩阵计算又通过ToBreeze.ToDenseVector变成 Breeze的形式进行运算。这样做的目的一是保持自己函数接口的稳定性,不会因为Breeze的变化而变化;另外一个就是可以把Distributed Matrix作为一种Matrix的实现而被使用。

6,开发环境

Spark集群(Standalone、Yarn-Client、Yarn-Cluster、单机调试环境)。

我主要使用Scala开发,IDE为Intellij IDEA,安装Scala插件。

开发一个Project可以使用Maven或者SBT编译,都可以通过IDEA创建相应的工程。 Maven编译的话和Java的Maven工程没啥区别,主要是修改Pom.xml文件;使用SBT编译的话,主要是修改Build.sbt文件。

Build.sbt的格式网上有很多资料了,简单说下需要注意的问题:

1) 必须每隔一行写新的内容;

2)LibraryDependencies 后面%%和%的区别:ArtifactId后面带/不带版本号;

3)LibraryDependencies 后面可以使用”Provided”使其在Assembly打包的时候不被打入包中。

开发一个基于Spark和MLlib的机器学习Job,主要依赖的两个LibraryDependencies就是Spark-Core和Spark-Mllib。

其实使用Scala开发Spark程序最重要的一点就是要知道你写的代码中哪些是RDD的操作,哪些是在RDD内部的操作,哪些是 Transform,哪些是Actions,哪个地方会形成一个Stage。这些搞清楚之后就明白了哪些Code是在Driver上执行的,哪些是在 Executor上并行执行的。另外就是哪些资源相关的参数,像Executor-Memory和Num-Executors等。关于这方面的内容在后面的介绍Tree和Random Forest的博客中讨论。

原文链接:mllib实践经验    (责编/刘亚琼)