Demo entry 6671672

clustering_v4

   

Submitted by anonymous on Dec 01, 2017 at 02:03
Language: Python. Code size: 2.8 kB.

import csv
import sys
import time

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel  # imported but unused; K-means is implemented by hand below

# for a point p and an array of points, return the index in the array of the point closest to p
def closestPoint(p, points):
    bestIndex = 0
    closest = float("+inf")
    # for each point in the array, calculate the distance to the test point, then return
    # the index of the array point with the smallest distance
    for i in range(len(points)):
        dist = distanceSquared(p,points[i])
        if dist < closest:
            closest = dist
            bestIndex = i
    return bestIndex

# The squared distance between two points
def distanceSquared(p1, p2):
    return (p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2

# The sum of two points
def addPoints(p1, p2):
    return [p1[0] + p2[0], p1[1] + p2[1]]
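
# A quick worked example of the helpers above (values chosen purely for illustration):
#   distanceSquared((1.0, 1.0), (0.0, 0.0))             -> 2.0
#   addPoints((1.0, 1.0), (2.0, 3.0))                   -> [3.0, 4.0]
#   closestPoint((1.0, 1.0), [(0.0, 0.0), (4.0, 4.0)])  -> 0  (the first center is nearer)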

def main():
    sc = SparkContext()
    name = sys.argv[1]
    f = open('elapsed_time_new_' + name, 'wt')
    try:
        writer = csv.writer(f)
        t_tot = time.time()
        raw = sc.textFile(u"/user/user1/data/" + name)
        lines = raw.map(lambda x: x.split(','))
        points = lines.map(lambda x: (float(x[1]), float(x[2])))
        t_train = time.time()
        # K is the number of means (center points of clusters) to find
        K = 8
        # convergeDist -- the threshold "distance" between iterations at which we decide we are done
        convergeDist = .0005
        iteration = 0
        maxIterations = 10
        # start with K randomly selected points from the dataset
        kPoints = points.takeSample(False, K, 34)
        # loop until the total distance between one iteration's centers and the next is
        # less than the convergence distance, or until maxIterations is reached
        tempDist = float("+inf")
        while (tempDist > convergeDist) and (iteration < maxIterations):
            # for each point, find the index of the closest kPoint; map to (index, (point, 1))
            closest = points.map(lambda p: (closestPoint(p, kPoints), (p, 1)))
            # for each key (kPoint index), sum the coordinates and the number of points
            pointStats = closest.reduceByKey(
                lambda a, b: (addPoints(a[0], b[0]), a[1] + b[1]))
            # for each key (kPoint index), compute the new center as the average of its closest points;
            # kv is (index, (coordinateSum, count))
            newPoints = pointStats.map(
                lambda kv: (kv[0], [kv[1][0][0] / kv[1][1], kv[1][0][1] / kv[1][1]])).collect()
            # calculate the total distance between the current centers and the new centers
            tempDist = 0
            for (idx, point) in newPoints:
                tempDist += distanceSquared(kPoints[idx], point)
            # copy the new centers into kPoints for the next iteration
            for (idx, point) in newPoints:
                kPoints[idx] = point
            iteration += 1
        train_time = round((time.time() - t_train) * 1000, 4)
        total_time = round((time.time() - t_tot) * 1000, 4)
        writer.writerow((train_time, total_time))
    finally:
        f.close()
	
if __name__ == "__main__":
    main()
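
For comparison, the KMeans class imported at the top is never actually called; a minimal sketch of the equivalent library call, assuming the same points RDD and reusing the K, maxIterations, convergence threshold, and seed from the script, might look like this (the library's default k-means|| initialization differs from the plain random sample used above):

from pyspark.mllib.clustering import KMeans

# Library K-means over the same (x, y) points RDD; epsilon plays the role of convergeDist.
model = KMeans.train(points, k=8, maxIterations=10, epsilon=0.0005, seed=34)
centers = model.clusterCenters  # list of K cluster centers, comparable to kPoints

The hand-rolled version itself would be launched with something like spark-submit clustering_v4.py <filename>, where <filename> is resolved under /user/user1/data/ and its second and third comma-separated fields are read as the x and y coordinates.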
