这几天来看了一些hadoop相关的书,发现这个叫做mahout的库非常的有用,在这里mark一下
并找了一些相关的文章,这里是其中一篇,写的挺不错的。
转自:http://www.cnblogs.com/vivounicorn/archive/2011/10/08/2201986.html
——————–正文开始——————–
K-Means这个词第一次使用是在1967,但是它的思想可以追溯到1957年,它是一种非常简单地基于距离的聚类算法,认为每个Cluster由相似的点组成而这种相似性由距离来衡量,不同Cluster间的点应该尽量不相似,每个Cluster都会有一个“重心”;另外它也是一种排他的算法,即任意点必然属于某一Cluster且只属于该Cluster。当然它的缺点也比较明显,例如:对于孤立点敏感、产生最终聚类之间规模的差距不大。
一、基本思想
1、数学描述
给定d维实数向量(),后面就将这个实数向量称作点吧,短!K-Means算法会根据事先制定的参数k,将这些点划分出k个Cluster(k ≤ n),而划分的标准是最小化点与Cluster重心(均值)的距离平方和,假设这些Cluster为:,则数学描述如下:
![arg_Cmin sum limit_{i=1}^{k} sum limit_{x_j in C_i}{ | x_j-mu_i | }^2 ](http://chart.apis.google.com/chart?cht=tx&chl=arg_Cmin+%5csum+%5climit_%7bi%3d1%7d%5e%7bk%7d+%5csum+%5climit_%7bx_j+%5cin+C_i%7d%7b%7c%7cx_j-%5cmu_i%7c%7c%7d%5e2%0a),其中为第i个Cluster的“重心”(Cluster中所有点的平均值)。 |
聚类的效果类似下图:
具体可见:http://en.wikipedia.org/wiki/K-means_clustering
2、算法步骤
典型的算法如下,它是一种迭代的算法:
(1)、根据事先给定的k值建立初始划分,得到k个Cluster,比如,可以随机选择k个点作为k个Cluster的重心,又或者用Canopy Clustering得到的Cluster作为初始重心(当然这个时候k的值由Canopy Clustering得结果决定);
(2)、计算每个点到各个Cluster重心的距离,将它加入到最近的那个Cluster;
(3)、重新计算每个Cluster的重心;
(4)、重复过程2~3,直到各个Cluster重心在某个精度范围内不变化或者达到最大迭代次数。
别看算法简单,很多复杂算法的实际效果或许都不如它,而且它的局部性较好,容易并行化,对大规模数据集很有意义;算法时间复杂度是:O(nkt),其中:n 是聚类点个数,k 是Cluster个数,t 是迭代次数。
三、并行化策略
K-Means较好地局部性使它能很好的被并行化。第一阶段,生成Cluster的过程可以并行化,各个Slaves读取存在本地的数据集,用上述算法生成Cluster集合,最后用若干Cluster集合生成第一次迭代的全局Cluster集合,然后重复这个过程直到满足结束条件,第二阶段,用之前得到的Cluster进行聚类操作。
用map-reduce描述是:datanode在map阶段读出位于本地的数据集,输出每个点及其对应的Cluster;combiner操作对位于本地包含在相同Cluster中的点进行reduce操作并输出,reduce操作得到全局Cluster集合并写入HDFS。
四、Mahout K-Means Clustering说明
mahout实现了标准K-Means Clustering,思想与前面相同,一共使用了2个map操作、1个combine操作和1个reduce操作,每次迭代都用1个map、1个combine和一个reduce操作得到并保存全局Cluster集合,迭代结束后,用一个map进行聚类操作。可以在mahout-core下的src/main/java中的package:org.apache.mahout.clustering.kmeans中找到相关代码:
1、数据模型
可以参见上一篇相同标题。
2、目录结构
从目录结构上说,需要两个输入目录:一个用于保存数据点集——input,一个用来保存点的初始划分——clusters;在形成Cluster的阶段,每次迭代会生成一个目录,上一次迭代的输出目录会作为下一次迭代的输入目录,这种目录的命名为:Clusters-“迭代次数”;最终聚类点的结果会放在clusteredPoints文件夹中而Cluster信息放在Clusters-“最后一次迭代次数”文件夹中。
3、如何抽象Cluster
可以从Cluster.java及其父类,对于Cluster,mahout实现了一个抽象类AbstractCluster封装Cluster,具体说明可以参考上一篇文章,这里做个简单说明:
(1)、private int id; #每个K-Means算法产生的Cluster的id
(2)、private long numPoints; #Cluster中包含点的个数,这里的点都是Vector
(3)、private Vector center; #Cluster的重心,这里就是平均值,由s0和s1计算而来。
(4)、private Vector Radius; #Cluster的半径,这个半径是各个点的标准差,反映组内个体间的离散程度,由s0、s1和s2计算而来。
(5)、private double s0; #**表示Cluster**包含点的权重之和,
(6)、private Vector s1; #**表示Cluster**包含点的加权和,
(7)、private Vector s2; #**表示Cluster**包含点平方的加权和,
(8)、public void computeParameters(); #根据s0、s1、s2计算numPoints、center和Radius:
这几个操作很重要,最后三步很必要,在后面会做说明。
(9)、public void observe(VectorWritable x, double weight); #每当有一个新的点加入当前Cluster时都需要更新s0、s1、s2的值
(10)、public ClusterObservation getObservations(); #这个操作在combine操作时会经常被用到,它会返回由s0、s1、s2初始化的ClusterObservation对象,表示当前Cluster中包含的所有被观察过的点
五、K-Means Clustering的Map-Reduce实现
K-Means Clustering的实现同样包含单机版和MR两个版本,单机版就不说了,MR版用了两个map操作、一个combine操作和一个reduce操作,是通过两个不同的job触发,用Dirver来组织的,map和reduce阶段执行顺序是:
图1
1、初始划分的形成
K-Means算法需要一个对数据点的初始划分,mahout里用了两种方法(以Iris dataset前3个feature为例):
(1)、使用RandomSeedGenerator类
在指定clusters目录生成k个初始划分并以Sequence File形式存储,其选择方法希望能尽可能不让孤立点作为Cluster重心,大概流程如下:
图2
(2)、使用Canopy Clustering
Canopy Clustering常常用来对初始数据做一个粗略的划分,它的结果可以为之后代价较高聚类提供帮助,个人觉得Canopy Clustering用在数据预处理上要比单纯拿来聚类更有用,比如对K-Means来说提供k值,另外还能很好的处理孤立点,当然,需要人工指定的参数由k变成了T1、T2,T1和T2所起的作用是缺一不可的,T1决定了每个Cluster包含点的数目,这直接影响了Cluster的“重心”和“半径”,而T2则决定了Cluster的数目,T2太大会导致只有一个Cluster,而太小则会出现过多的Cluster。通过实验,T1和T2取值会严重影响到算法的效果,如何确定T1和T2,似乎可以用AIC、BIC或者交叉验证去做,但是我没有找到具体做法,希望各位高人给指条明路:)。
以Iris为例,canopy方法选择T1=3、T2=1.5、cd=0.5、x=10,两种方法结果的前三个维度比较如下:
从图中不同Cluster的交界情况可以看出这两种方法的不同效果。
2、配置Cluster信息
K-Means算法的MR实现,第一次迭代需要将随机方法或者Canopy Clustering方法结果目录作为kmeans第一次迭代的输入目录,接下来的每次迭代都需要将上次迭代的输出目录作为本次迭代的输入目录,这就需要能在每次kmeans map和kmeans reduce操作前从该目录得到Cluster的信息,这个功能由KMeansUtil.configureWithClusterInfo实现,它依据指定的HDFS目录将Canopy Cluster或者上次迭代Cluster的信息存储到一个Collection中,这个方法在之后的每个map和reduce操作中都需要。
3、KMeansMapper
对照上一篇第三节关于MR的那幅图,map操作之前的InputFormat、Split、RR与上一篇完全相同。
(1)、KMeansMapper接收的是(WritableComparable<?>, VectorWritable) Pair,setup方法利用KMeansUtil.configureWithClusterInfo得到上一次迭代的Clustering结果,map操作需要依据这个结果聚类。
(2)、每个slave机器会分布式的处理存在硬盘上的数据,依据之前得到得Cluster信息,用emitPointToNearestCluster方法将每个点加入到与其距离最近的Cluster,输出结果为(与当前点距离最近Cluster的ID, 由当前点包装而成的ClusterObservations) Pair。
4、KMeansCombiner
combiner操作是一个本地的reduce操作,发生在map之后,reduce之前:
5、KMeansReducer
很直白的的操作,只是在setup的时候稍复杂。
(1)、setup操作的目的是读取初始划分或者上次迭代的结果,构建Cluster信息,同时做了Map<Cluster的ID,Cluster>映射,方便从ID找Cluster。
(2)、reduce操作非常直白,将从combiner传来的<Cluster ID,ClusterObservations>进行汇总;
computeConvergence用来判断当前Cluster是否收敛,即新的“重心”与老的“重心”距离是否满足之前传入的精度要求;
Last but not least,注意到有个cluster.computeParameters()操作,这个操作非常重要,它保证了本次迭代的结果不会影响到下次迭代,也就是保证了能够“重新计算每个Cluster的重心”这一步骤。
前三个操作得到新的Cluster信息;
后三个步骤清空S0、S1、S2信息,保证下次迭代所需的Cluster信息是“干净”的。
之后,reduce将(Cluster ID, Cluster) Pair写入到HDFS中以”clusters-迭代次数“命名的文件夹中。
6、KMeansClusterMapper
之前的MR操作用于构建Cluster信息,KMeansClusterMapper则用构造好的Cluster信息来聚类。
(1)、setup依然是从指定目录读取并构建Cluster信息;
(2)、map操作通过计算每个点到各Cluster“重心”的距离完成聚类操作,可以看到map操作结束,所有点就都被放在唯一一个与之距离最近的Cluster中了,因此之后并不需要reduce操作。
7、KMeansDriver
如果把前面的KMeansMapper、KMeansCombiner、KMeansReducer、KMeansClusterMapper看做是砖的话,KMeansDriver就是盖房子的人,它用来组织整个kmeans算法流程(包括单机版和MR版)。示意图如下:
8、Example
在源码的/mahout-examples目录下的package org.apache.mahout.clustering.syntheticcontrol.kmeans中有个Job.java文件提供了对K-Means Clustering的一个版本。核心内容是两个run函数:第一个是用随机选择的方法确定初始划分;第二个是用Canopy 方法确定初始划分,需要注意的是,我们不需要Canopy 方法来做聚类操作,所以CanopyDriver.run的倒数第二个参数(runClustering)要设定为false。
example的使用方法如下:
● 启动master和slave机器;
● 终端输入:hadoop namenode –format
● 终端输入:start-all.sh
● 查看hadoop相关进程是否启动,终端输入:jps
● 在HDFS创建testdata目录,终端输入:hadoop dfs -mkdir testdata
● 将http://archive.ics.uci.edu/ml/中Iris数据集下载,更改一下数据格式,将数据集中“,”替换为“ ”(空格),并上传到HDFS的testdata文件夹中,进入Iris目录,终端输入:hadoop dfs -put iris.txt testdata/
● 用Canopy 方法确定初始划分,参数取值为:t1=3,t2=1.5,convergenceDelta=0.5,maxIter=10,进入mahout目录,确认终端执行ls后可以看到mahout-examples-0.5-job.jar这个文件,终端执行:hadoop jar mahout-examples-0.5-job.jar org.apache.mahout.clustering.syntheticcontrol.kmeans.Job -i testdata -o output -t1 3 -t2 1.5 -cd 0.5 -x 10
● 在http://localhost:50030/jobtracker.jsp 查看作业执行情况:
从HDFS上可以看到以下几个目录:
图6
job_201110082236_0001是由InputDriver对原始数据集的一个预处理,输入目录是:testdata,输出目录是:output/data
job_201110082236_0002是由CanopyDriver发起的对data的初始划分,输入目录是:output/data,输出目录是:output/clusters-0
job_201110082236_0003是由KmeansDriver发起的构建Cluster的第一次迭代,输入目录是:output/clusters-0,输出目录是:output/clusters-1
job_201110082236_0004是由KmeansDriver发起的构建Cluster的第二次迭代,输入目录是:output/clusters-1,输出目录是:output/clusters-2
job_201110082236_0005是由KmeansDriver发起的Clustering,输入目录是:output/data,输出目录是:output/clusteredPoints
可见,要查看最终结果需要两个信息:Cluster信息和Clustering data后点的信息,它们分别存储在HDFS的最后一次迭代目录output/clusters-2和聚类点目录output/clusteredPoints。
● 查看结果,将结果获取到本地,终端输入:bin/mahout clusterdump –seqFileDir /user/leozhang/output/clusters-2 –pointsDir /user/leozhang/output/clusteredPoints –output result.txt,终端输入:ls可以看到result.txt这个文件,文件内容类似:
● 为了能比较直观的查看数据,可以利用R(可以使用强大的RStudio,需要安装rgl),我将数据整理到以下链接:http://files.cnblogs.com/vivounicorn/data%26result.rar ,分组显示数据前三维,不同Cluster用不同颜色表示。效果类似于图3,代码类似于:
执行pca <- read.table(ttt,header=TRUE)可能会报错:Error in read.table(ttt, header = TRUE) : more columns than column names,没关系,再重新执行一下这句就行了。
六、总结
K-Means Clustering是基于距离的排他的划分方法,初始划分对它很重要,mahout里实现了两种方法:随机方法和Canopy方法,前者比较简单,但孤立点会对其Clustering效果造成较大影响,后者对孤立点的处理能力很强但是需要确定的参数多了一个且如何合理取值需要探讨。
七、参考资料
1、http://mahout.apache.org/ 2、https://cwiki.apache.org/MAHOUT/k-means-clustering.html 3、http://developer.yahoo.com/hadoop/tutorial/ 4、http://www.ibm.com/developerworks/cn/web/1103_zhaoct_recommstudy3/ 5、http://grepcode.com/file/repo1.maven.org/maven2/org.apache.mahout/mahout-utils/0.5/org/apache/mahout/clustering/conversion/InputDriver.java#InputDriver
6、http://faculty.chass.ncsu.edu/garson/PA765/cluster.htm