Parallel OPTICS - 密度聚类算法Spark

2015年01月25日 作者: Yong FU

应用背景

我们瀚思安信在分析安全大数据时,常见的分析第一步就是把用户或者行为聚类,找出其中孤立的或者小类,然后作为潜在有问题的用户或者行为进行进一步分析。

聚类算法是最常见的机器学习算法种类,比如大家都熟悉的K-Means,原理或者实现都很简单。但是在安全大数据分析时,我们期望聚类算法满足额外的一些特性:

  • 支持自定义的距离函数:比如衡量用户行为相识度。很多聚类算法都不满足这点,像标准K-Means就只能用欧式距离(很多初学者都没注意到这点)。

  • 能并行化运行在Spark平台上。瀚思安全大数据分析即使是一个小系统,每天也输入接近TB级别的数据,能并发执行是基础的必需需求。而目前Spark Mllib里面只有K-Means一种并行聚类实现。

  • 无参数,或者至少对参数不敏感。这点是因为我们数据分析大部分是无监督学习,不需要用户掌握复杂的调参数知识。前者可以用无参数贝叶斯法,后者就是下面要说的密度聚类算法。

密度聚类算法

聚类算法大致可以分为划分法(Partitioning Methods)、 层次法(Hierarchical Methods)、基于密度的方法(density-based methods)、 基于网格的方法(grid-based methods)、基于模型的方法(Model-Based Methods)。 而 OPTICS 就是一种基于密度的聚类算法,这种方法的思想就是当区域内点的密度大于某个阀值, 就把这些点归于一类,因此这种基于密度的聚类算法天生就有很强的寻找离群噪音点的能力。 一般来讲聚类算法最终得出的都是固定参数下的具体分类结果,而 OPTICS 则不然, OPTICS 最终得出的是一个在一定参数区间——最小领域半径(ε−neighborhood) 下包含所有分类可能的点的序列,这个序列里的每个点都记录了它在此特定参数区间下的2个属性——核心距离 (core distance)以及可达距离(reachability distance)。通过这个序列, 我们可以很方便的得出在参数 ε' 下(当 ε' ≤ ε−neighborhood 时)数据点的分类结果。 综上所述,OPTICS具有2个很重要的特点:

1.抗离群噪音干扰的能力(寻找离群噪音点的能力) 2.对初始参数不敏感

OPTICS

OPTICS 需要的参数有2个,第一个就是前面提到的最小邻域半径(ε−neighborhood), 这个参数的意义是对于每个点 P 而言,在半径为 ε 的范围内如果有其他点,那么这些点都算P的邻点, 也就是说和P是在最小领域半径为 ε 的情况下是属于同一类的。

如上图所示,点1、2、3为点P在 ε 范围内的邻点,而点4则不是。所以点 P 有3个邻点, 这样我们就会自然的想到 OPTICS 应该具有的第二个参数——最小邻点数(minimum number of points : minpts)——对于点 P 而言,至少需要有多少个邻点才能证明自己是一个核心点(core point), 也就是说自己是一个类别中最普通的一员,不是边界点,更不是一个离群的孤立点。 minpts 在这里也代表了密度聚类算法中的密度阀值,当 ε 范围内点的密度大于阀值 minpts, 则说明 ε 范围内的点属于同一类别。

前面提到过,OPTICS 输出的序列中,每个点都有核心距离和可达距离两个属性。在最小领域半径为 ε , 最小邻点数为3的情况下(后面的例子中也默认为这样),如上图所示,点 P 具有4个邻点, 而到点1的距离是点 P 刚好满足至少有3个邻点的半径,所以取点 P 到点1的距离为点 P 的核心距离。 对于点4和点1而言,它们的可达距离就是其到点 P 的距离。而对于点2和点3而言, 由于它们到点 P 的距离小于点 P 的核心距离,因此,设置它们的可达距离等于点 P 的核心距离。

也就是说对于每个点 P 而言,假设让其刚好满足最小邻点数的点为 P' ,则P的核心距离等于其到点 P' 的距离,其他情况为正无穷。

对于核心点 P 的邻点 P1、P2、…Pn 而言,如果其到点 P 的距离大于点 P 的核心距离, 则其可达距离为该点到点 P 的距离,如果小于等于点 P 的核心距离,则其可达距离等于点 P 的核心距离。

OPTICS的具体步骤是:

1: procedure runOPTICS(X, ε, minpts)
2: while point x ∈ X unprocessed and unchecked do
3:    dis ← distance(x, X)
4:    if x is corePoint then
5:        mark x as processed
6:        N ← getNeighbors(x, ε, dis)
7:        setCoreDistance(x, N, ε, minpts)
8:        O ← x; Q ← N
9:        update(x, N, Q)
10:       while Q ≠ empty do
11:           y ← extractMinRD(Q)
12:           mark y asprocessed
13:           O ← y
14:           dis ← distance(y, X)
15:           if y is corePoint then
16:               N' ← getNeighbors(y, ε, dis)
17:               setCoreDistance(y, N', ε, minpts)
18:               Q ← N'
19:               update(y, N', Q)

其中 X 是原始的点的数据集合,O 作为最后输出结果的点集的有序序列,Q 是一个无重复元素的队列, 保存了当前已找到但是还没处理过的属于同一类别的点的集合。算法首先遍历输入的点集 X ,找到一个核心点,如果找不到,则直接结束算法,如果找到一个点 x 为核心点, 则设置 x 的核心距离以及可达距离并将其加入输出序列 O,将其所有邻点加入队列 Q。 之后从队列 Q 中找出可达距离最小的点 y,将其加入输出序列,如果 y 也是核心点, 则将 y 的所有邻点加入到并且更新队列 Q。重复以上步骤直到处理完所有的点。 其中的 update 函数的功能是更新队列 Q 中点的可达距离,具体步骤为:

1: procedure update(x, N, Q)
2: foreach unprocessed point x' ∈ N do
3:    newD ← max(CD[x], distance(x, x'))
4:    if RD[x'] = NULL then
5:        RD[x'] ← newD
6:    else if newD < RD[x'] then
7:        RD[x’] ← newD

在得到最后的有序序列O后,可以通过选取参数 ε' ≤ ε 来获得邻域半径为 ε' 时的分类结果,其步骤如下:

1: procedure orderToClusters(O, ε')
2: id ← 0
3: for each point x ∈ O do
4:    if RD[x] > ε' then
5:        if CD[x] ≤ ε' then
6:            id ← id +1
7:            CID[x] ← id
8:        else
9:            CID[x] ← NOISE
10:   else
11:       CID[x] ← id

Parallel OPTICS

在 OPTICS 中,可以进行并化化操作的步骤有2个, 第一个就是 runOPTICS 函数中第3行和第14行中计算点与数据集中其他点之间距离的 distance 函数。 我们知道要计算一个数据集各个元素之间的两两距离的时间复杂度是 O(n2),对该操作采取并行化处理, 可以将时间压缩到 O(n)。

第二个可以并行化处理的就是第9行以及第19行的 update 函数,对于 Q 中需要更新的点而言, 其操作是相互独立的,因此可以将需要 O(n) 的过程并行到 O(1) 中完成。

因此 Parallel OPTICS 可以在 OPTICS 的基础上通过修改 runOPTICS 以及 update 函数得到。 其具体步骤如下:

1: procedure runPOPTICS(X, ε, minpts)
2: O ← X.map{new Point(coreDis, reachDis, id)}
3: while point x ∈ X unprocessed and unchecked do
4:    dis ← X.map{x' => distance(x, x')}
5:    if x is corePoint then
6:        mark x as processed; O' ← x.id
7:        N ← getNeighbors(x, ε, dis)
8:        Q ← N
9:        O ← O.map{p => update(x, dis, ε, minpts, p)}
10:       while Q ≠ empty do
11:           y ← extractMinRD(Q)
12:           mark y as processed; O' ← y.id
13:           dis ← X.map{x' => distance(y, x')}
14:           if y is corePoint then
15:               N' ← getNeighbors(y, ε, dis)
16:               Q ← N'
17:               O ← O.map{p => update(y, dis, ε, minpts, p)}
18: sort O by O'

在第2行中 Parallel OPTICS 预先通过并行运算同步生成了一个和 X 长度一致并且由 Point 类的实例组成的输出序列 O,并且 O 中的每个实例的成员变量 coreDis 以及 reachDis 的初始值都设定为 ∞。在第6行(12行)标记 x(y)为已处理的同时,将其输出的顺序记录在 O' 中, 并且最终通过 O' 对最后输出的序列 O 进行排序。在第4行(13行),通过 map 函数将 X 的元素映射为相互独立的 x',然后同步计算点 x(y)与各个 x' 的距离。在第9行(17行), 同样通过 map 函数同步更新输出序列 O 中的各个元素,其中的 updata 函数如下。

1: procedure update(x, dis, ε, minpts, p)
2: if x is corePoint then
3:    if p is x then
4:        p.coredis ← getCoreDistane(dis, ε, minpts)
5:    if p is neighbor of x then
6:        if distance(p, x) < x.coreDis then
7:            p.reachDis ← min(p.reachDis, x.coreDis)
8:        else
9:            p.reachDis ← min(p.reachDis, distance(p, x))

算法已经开源在GitHub上,欢迎大家Fork。