# Demo entry 6671672

**clustering_v4**

Submitted by **anonymous**
on Dec 01, 2017 at 02:03

Language: Python. Code size: 2.8 kB.

"""K-means clustering on Spark with hand-rolled Lloyd iterations, timed.

Usage: spark-submit this_script.py <dataset-name>

Reads CSV rows from /user/user1/data/<dataset-name> (columns: id, x, y),
clusters the (x, y) points into K=8 clusters, and appends the training and
total elapsed times (milliseconds) to elapsed_time_new_<dataset-name>.

Note: the original module-level `from pyspark.mllib.clustering import
KMeans, KMeansModel` was unused and has been removed; SparkContext is now
imported inside main() so the pure helpers stay importable without Spark.
"""
import csv
import sys
import time


def closestPoint(p, points):
    """Return the index in `points` of the point closest to `p`.

    Returns 0 for an empty `points` list (degenerate case).
    """
    bestIndex = 0
    closest = float("+inf")
    # Linear scan: keep the index with the smallest squared distance.
    for i in range(len(points)):
        dist = distanceSquared(p, points[i])
        if dist < closest:
            closest = dist
            bestIndex = i
    return bestIndex


def distanceSquared(p1, p2):
    """Squared Euclidean distance between two 2-D points."""
    return (p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2


def addPoints(p1, p2):
    """Component-wise sum of two 2-D points."""
    return [p1[0] + p2[0], p1[1] + p2[1]]


def main():
    # Imported here so the pure helpers above are importable without Spark.
    from pyspark import SparkContext

    sc = SparkContext()
    name = sys.argv[1]

    # `with` replaces the original open/try/finally and guarantees close.
    with open('elapsed_time_new_' + str(name), 'wt') as f:
        writer = csv.writer(f)
        t_tot = time.time()

        raw = sc.textFile(u"/user/user1/data/" + str(name))
        line = raw.map(lambda x: x.split(','))
        # Columns look like [id, x, y] -> keep (x, y) as floats.
        points = line.map(lambda x: (float(x[1]), float(x[2])))

        t_train = time.time()

        # K: number of cluster centers (means) to find.
        K = 8
        # Stop when the total squared movement of the centers between two
        # iterations drops below this threshold.
        convergeDist = .0005
        maxIterations = 10
        iteration = 0

        # Seed the centers with K points sampled without replacement
        # (fixed seed 34, as in the original, for reproducibility).
        kPoints = points.takeSample(False, K, 34)

        tempDist = float("+inf")
        while (tempDist > convergeDist) and (iteration < maxIterations):
            # Each point -> (index of closest center, (point, 1)).
            closest = points.map(lambda p: (closestPoint(p, kPoints), (p, 1)))
            # Per center: sum the coordinates and count the members.
            # (Rewritten without Python-2-only tuple-unpacking lambda
            # parameters, which are a SyntaxError in Python 3.)
            pointStats = closest.reduceByKey(
                lambda a, b: (addPoints(a[0], b[0]), a[1] + b[1]))
            # Per center: new center = mean of its assigned points.
            newPoints = pointStats.map(
                lambda kv: (kv[0], [kv[1][0][0] / kv[1][1],
                                    kv[1][0][1] / kv[1][1]])).collect()

            # Total squared movement of the centers this iteration.
            tempDist = 0
            for (idx, point) in newPoints:
                tempDist += distanceSquared(kPoints[idx], point)

            # Install the new centers for the next iteration.
            # BUG FIX: the original reused `i` as both the while-loop
            # counter and the for-loop target, so `i += 1` incremented the
            # last cluster index instead of the iteration count, breaking
            # the maxIterations bound. Distinct names fix it.
            for (idx, point) in newPoints:
                kPoints[idx] = point

            iteration += 1

        train_time = round((time.time() - t_train) * 1000, 4)
        total_time = round((time.time() - t_tot) * 1000, 4)
        writer.writerow((train_time, total_time))


if __name__ == "__main__":
    main()

This snippet took 0.01 seconds to highlight.

Back to the Entry List or Home.