我们只需4/30即可调整Futureestack注册。条款和条件适用。 现在注册

在这里,在新的遗物中,我们每分钟收集137亿数据点。我们收集,分析和显示客户的大量数据存储为时间序列。在努力建立应用程序和其他实体之间的关系,如服务器和容器,为新的,智能产品,如亚博最新版直播新的遗物雷达,我们不断探索更快,更有效地进行分组时间序列数据。鉴于我们收集的数据量,更快的聚类时间是至关重要的。

加快K-Means集群

一种流行的数据分组方法是K.- 梅尔集群。基本原则K.- 模糊涉及确定每个数据点之间的距离,并将它们分组为有意义的群集。通常使用平面上的二维数据进行该过程。它肯定可以在两个以上的维度中簇聚类,但我们可以快速可视化数据的能力更复杂。例如,以下图示显示了融合K.-表示在几个迭代中以两个任意维度聚类:

K-means_convergence

GIF通过呼唤 - 自己的工作,GFDL

不幸的是,这种方法不能很好地映射到时间序列,因为它们通常是一维的。但是,我们仍然可以使用一些不同的函数来计算两个时间序列之间的距离因子。在这些例子中,我们使用均方误差探索不同K.- 模拟。在测试这些实施方面,我们注意到许多人遭受了严重的性能问题,但我们仍然能够展示它如何加速K.-表示聚类,在某些情况下以数量级排列。

对于这个练习,我们将使用Python的NumPy包。如果您自己按照此步骤操作,则可以将代码复制并粘贴到jupyter笔记本。让我们首先导入我们沿途使用的软件包:

导入时间导入numpy作为np导入matplotlib.pyplot作为plt%matplotlib内联

在接下来的测试中,我们首先生成10000个随机时间序列,500个样本长。然后我们在随机长度的正弦波中加入噪声。虽然这类数据不是很理想K.- 群集聚类,它应该足以应对未优化的实现。

n = 10000 ts_len = 500阶段= np.array(np.random.randint(0,50,[n,2])pure = np.sin([np.linspace(-np.pi * x [0],-np.pi * x [1],ts_len)用于阶段的x阶段])噪声= np.array([np.random.normal(0,1,ts_len)范围(n)])信号= pure *噪音#归一化0到1个信号+ = np.abs(np.min(信号))信号/ = np.max(信号)plt.plot(信号[0])

第一个实施

让我们从遇到的最基本、最简单的实现开始。euclid_dist.实现了一个简单的MSE估计距离函数,和k_means.实现基本的k均值算法。我们选择了num_clust从我们的初始数据集中随机时间序列作为质心(表示每个群集的中间)。为了num_iter.迭代,我们不断地移动质心,同时最大限度地减少它们之间的距离和其他时间序列。

Def euclid_dist(t1, t2): return np.sqrt(((t1-t2)**2).sum()) Def k_means(data, num_clust, num_iter): centroids = signals[np.random];randint(0,信号。形状[0],num_clust) n的范围(num_iter):作业={},印第安纳州,我列举(数据):min_dist =浮动(“正”)closest_clust =没有c_ind, j在列举(重心):dist = euclid_dist (i, j)如果dist < min_dist: min_dist = dist closest_clust = c_ind如果closest_clust作业:作业[closest_clust] .append(印第安纳州)其他:[key] =[m / len(Assignments [key]) for m in clust_sum return centroids . [key] =[m / len(Assignments [key]) for m in clust_sum return centroids . [key] =[m / len(Assignments [key]) for m in clust_sum return centroids .
t1 = time.time()质心= k_means(信号,100,100)t2 = time.time()打印(“拍摄{}秒”.format(t2  -  t1))
花了1138.8745470046997秒

收集数据花了将近20分钟。这并不可怕,但也不太好。为了加快下一个实现的速度,我们决定尽可能减少for循环。

矢量化实现

使用numpy的令人敬畏的好处是矢量化的操作。(如果您不熟悉矢量操作,请退房这个良好的评论。)

K-Means算法调用每个质心和数据点之间的成对比较。这意味着,在我们之前的迭代中,我们将100个质心与10,000次迭代进行了比较了每次迭代一百万比较。请记住,每个比较都涉及两组500个样本。由于我们迭代100次,这给了我们共有1亿比较 - 一个CPU的一点工作。虽然Python是一种合理有效的语言,但它很难击败在C中写的操作。由于这个原因,大多数Numpy的核心操作都是用C编写的,并使矢量变化以最小化循环产生的开销。

让我们研究一下如何对代码进行向量化,以便尽可能多地消除这些代码为了循环尽可能。

首先,我们将代码划分为功能块。这让我们对每个部分负责什么有了更好的了解。接下来,我们改变calc_centroids步骤只迭代质心(而不是每次序列)。这样,我们通过所有的时间序列和一个质心传递给euclid_dist.。我们也分配了经销而不是像对待字典一样对待它并随着时间的推移而扩展它。NumPy的argmin在一次拍摄中比较每个矢量对。

move_centroids.,我们使用Vector Operation将另一个用于循环崩溃,我们仅在独特的质心集中迭代。如果我们丢失了质心,我们通过从我们的时间序列数据集随机选择时添加了适当的数字(这很少发生在野外)。

最后,我们增加了早期停止检查k_means.如果没有质心性更新,请停止我们的迭代。

这是一下代码:

def euclid_dist(t1,t2):返回np.sqrt(((t1-t2)** 2).sum(轴= 1))def calc_centroid(数据,质心):dist = np.zeros([data.shape [0.], centroids.shape[0]]) for idx, centroid in enumerate(centroids): dist[:, idx] = euclid_dist(centroid, data) return np.array(dist) def closest_centroids(data, centroids): dist = calc_centroids(data, centroids) return np.argmin(dist, axis = 1) def move_centroids(data, closest, centroids): k = centroids.shape[0] new_centroids = np.array([data[closest == c].mean(axis = 0) for c in np.unique(closest)]) if k - new_centroids.shape[0] > 0: print("adding {} centroid(s)".format(k - new_centroids.shape[0])) additional_centroids = data[np.random.randint(0, data.shape[0], k - new_centroids.shape[0])] new_centroids = np.append(new_centroids, additional_centroids, axis = 0) return new_centroids def k_means(data, num_clust, num_iter): centroids = signals[np.random.randint(0, signals.shape[0], num_clust)] last_centroids = centroids for n in range(num_iter): closest = closest_centroids(data, centroids) centroids = move_centroids(data, closest, centroids) if not np.any(last_centroids != centroids): print("early finish!") break last_centroids = centroids return centroids
t1 = time.time()质心= k_means(信号,100,100)t2 = time.time()打印(“拍摄{}秒”.format(t2  -  t1))
添加1个质心早期结束!拿到206.72993397712708秒

3。5分钟多一点。不坏!但我们想更快地完成。

K-Means ++实施

对于下一次迭代,我们应用了K.——+ +算法。该算法旨在选择更好的初始质心。让我们看看这个优化是否有帮助......

def init_centroids(数据,num_clust):centroids = np.zeros([num_clust,data.shape [1]])质心[0 ,:] = data [np.random.randint(0,data.shape [0],1)]对于I范围(1,num_clust):d2 = np.min([np.linalg.norm(data-c,轴= 1)** 2 for c for centroids [0:i,:]],轴= 0)probs = d2 / d2.sum()cumprobs = probs.cumsum()ind = np.where(cumprobs> = np.random.random.random.randoom())[0] [0]质心[i,:] = np。expand_dims(data[ind], axis = 0) return centroids def k_means(data, num_clust, num_iter): centroids = init_centroids(data, num_clust) last_centroids = centroids for n in range(num_iter): closest = closest_centroids(data, centroids) centroids = move_centroids(data, closest, centroids) if not np.any(last_centroids != centroids): print("Early finish!") break last_centroids = centroids return centroids
t1 = time.time()质心= k_means(信号,100,100)t2 = time.time()打印(“拍摄{}秒”.format(t2  -  t1))
早期完成!占180.91435194015503秒

加入K.与我们之前的迭代相比,equss ++算法略有更好的性能。但是,一旦我们并联化,这种优化就会开始偿还。

并行实施

直到这一点,我们的所有实施都是单线程的,所以我们决定探索并行化部分K.-means ++算法。由于我们正在使用Jupyter笔记本,我们选择使用并行管理IpyPallel用于并行计算。通过IpyParelial,我们不必担心刻字整个服务器,但我们需要一些特质我们需要解决。例如,我们必须通知我们的工作节点加载numpy。

将IPPAlliallAlliant导入IPP C = IPP.Client()v = c [:] v.use_cloudpickle()使用v.sync_imports():导入numpy作为np

(有关启动工人的详细信息,请参阅PYPALLEALLEST开始指南)。

在这种迭代中,我们专注于两个平行化区域。第一的,calc_centroids有一个循环遍历每个质心并将其与我们的时间序列数据进行比较。我们使用map_sync.将每个迭代发送到我们的工作者。

接下来,我们并行化类似的循环K.——+ +质心搜索。注意对v.push:因为我们的lambda被引用数据,我们需要确保它在工人节点上提供。我们通过调用IpyPallel的完成这一点方法将该变量复制到工作者的全局作用域。

看一看:

def calc_centroids(数据,质心):返回np.array(v.map_sync(lambda x:np.sqrt(((x  -  data)** 2).sum(轴= 1)),质心)def closest_centroids(点,质心):dist = calc_centroids(点,质心)返回np.argmin(dist,axis = 0)def init_centroid(数据,num_clust):v.push(dict(data = data))centroids = np.zeros([num_clust,data.shape [1]])质心[0 ,:] = data [np.random.randint(0,data.shape [0],1)]在范围内(1,num_clust):d2 = np。min(v.map_sync(lambda c:np.linalg.norm(data  -  c,轴= 1)** 2,质心[0:i,]),轴= 0)probs = d2 / d2.sum()cumprobs = probs.cumsum()ind = np.where(cumprobs> = np.random.random.random.random.random.random.randroids [i,:] = np.expand_dims(数据[IND],轴= 0)返回质心
t1 = time.time()质心= k_means(信号,100,100)t2 = time.time()打印(“拍摄{}秒”.format(t2  -  t1))
添加2个质心早期完成!占143.49819207191467秒

几于两分钟以上,这种迭代却取得了最快的时间!

下一步:去更快!

在这些测试中,我们依赖于中央处理单元(CPU)内核。cpu提供了方便的并行化,但是我们怀疑,如果再付出一点努力,我们就可以使用图形处理单元(gpu)更快地实现集群。我们也许可以用纹orflow.,用于数值计算和机器学习的开源软件。事实上,tensorflow已经包括一个K.——实现,但我们几乎肯定需要调整它来支持时间序列聚类。无论如何,我们永远不会停止寻找更高效、更快的聚类算法来帮助管理用户数据。