# Demo entry 6671672

clustering_v4

Submitted by anonymous on Dec 01, 2017 at 02:03
Language: Python. Code size: 2.8 kB.

```python
from pyspark.mllib.clustering import KMeans, KMeansModel
import sys
import time
from pyspark import SparkContext
import csv

def closestPoint(p, points):
    """Return the index in ``points`` of the point closest to ``p``.

    Closeness is measured with ``distanceSquared``.  When several points are
    equally close, the lowest index wins (the comparison is strict).  When
    ``points`` is empty, 0 is returned.
    """
    bestIndex = 0
    closest = float("+inf")
    # Track the smallest distance seen so far together with its index.
    for i, candidate in enumerate(points):
        dist = distanceSquared(p, candidate)
        if dist < closest:
            closest = dist
            bestIndex = i
    return bestIndex

# The squared distance between two points
def distanceSquared(p1, p2):
    """Return the squared Euclidean distance between points ``p1`` and ``p2``.

    Both points are sequences of coordinates; the differences are taken
    coordinate by coordinate, squared, and summed.
    """
    return sum((a - b) ** 2 for a, b in zip(p1, p2))


# The sum of two points
def addPoints(p1, p2):
    """Return the coordinate-wise sum of points ``p1`` and ``p2`` as a list."""
    return [a + b for a, b in zip(p1, p2)]

def main():
    """Run a hand-rolled 2-D k-means with Spark and record how long it took.

    The input file name is taken from the first command-line argument and is
    read from ``/user/user1/data/<name>``.  Each line is split on commas and
    its first two fields are parsed as the point's coordinates.  A single CSV
    row ``(train_time, total_time)``, both in milliseconds, is written to
    ``elapsed_time_new_<name>`` in the current directory.

    Raises:
        SystemExit: if no input file name was given on the command line.
    """
    if len(sys.argv) < 2:
        raise SystemExit("usage: <script> <input-file-name>")
    name = sys.argv[1]

    # K is the number of means (center points of clusters) to find
    K = 8
    # convergeDist -- the threshold "distance" between iterations at which we decide we are done
    convergeDist = .0005
    maxIterations = 10

    def mergeStats(left, right):
        # Combine two (coordinate-sum, count) pairs for the same cluster.
        return (addPoints(left[0], right[0]), left[1] + right[1])

    def centroid(entry):
        # Turn (cluster index, (coordinate-sum, count)) into (index, mean point).
        index, (total, count) = entry
        return (index, [total[0] / count, total[1] / count])

    sc = SparkContext()
    try:
        with open('elapsed_time_new_' + str(name), 'wt') as f:
            writer = csv.writer(f)
            t_tot = time.time()
            raw = sc.textFile(u"/user/user1/data/" + str(name))
            fields = raw.map(lambda text: text.split(','))
            points = fields.map(lambda x: (float(x[0]), float(x[1])))
            t_train = time.time()

            # Seed the centres with a fixed-seed sample so runs are repeatable.
            kPoints = points.takeSample(False, K, 34)

            # Loop until the total distance between one iteration's points and
            # the next is less than the convergence distance, or the iteration
            # budget is used up.  The counter has its own name so the
            # cluster-index loop below cannot overwrite it.
            iteration = 0
            tempDist = float("+inf")
            while tempDist > convergeDist and iteration < maxIterations:
                # for each point, find the index of the closest k-point: (index, (point, 1))
                closest = points.map(lambda p: (closestPoint(p, kPoints), (p, 1)))
                # for each k-point index, add up the coordinates and the number of points
                pointStats = closest.reduceByKey(mergeStats)
                # for each k-point index, the new centre is the average of its points
                newPoints = pointStats.map(centroid).collect()

                # Total movement of the centres, then adopt the new centres.
                # Clusters that received no points are absent from newPoints
                # and keep their previous centre.
                tempDist = 0
                for index, point in newPoints:
                    tempDist += distanceSquared(kPoints[index], point)
                    kPoints[index] = point
                iteration += 1

            train_time = round((time.time() - t_train) * 1000, 4)
            total_time = round((time.time() - t_tot) * 1000, 4)
            writer.writerow((train_time, total_time))
    finally:
        # Release the Spark resources even when the job fails.
        sc.stop()

# Run the job only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
```

This snippet took 0.01 seconds to highlight.

Back to the Entry List or Home.