-
Notifications
You must be signed in to change notification settings - Fork 285
/
Copy pathDBSCAN.py
127 lines (100 loc) · 4.26 KB
/
DBSCAN.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# ================================================================================================================
# ----------------------------------------------------------------------------------------------------------------
# DBSCAN
# ----------------------------------------------------------------------------------------------------------------
# ================================================================================================================
import numpy as np
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
from itertools import cycle, islice
import matplotlib.pyplot as plt
import queue
import pandas as pd
class CustomDBSCAN():
def __init__(self):
self.core = -1
self.border = -2
# Find all neighbour points at epsilon distance
def neighbour_points(self, data, pointId, epsilon):
points = []
for i in range(len(data)):
# Euclidian distance
if np.linalg.norm([a_i - b_i for a_i, b_i in zip(data[i], data[pointId])]) <= epsilon:
points.append(i)
return points
# Fit the data into the DBSCAN model
def fit(self, data, Eps, MinPt):
# initialize all points as outliers
point_label = [0] * len(data)
point_count = []
# initilize list for core/border points
core = []
border = []
# Find the neighbours of each individual point
for i in range(len(data)):
point_count.append(self.neighbour_points(data, i, Eps))
# Find all the core points, border points and outliers
for i in range(len(point_count)):
if (len(point_count[i]) >= MinPt):
point_label[i] = self.core
core.append(i)
else:
border.append(i)
for i in border:
for j in point_count[i]:
if j in core:
point_label[i] = self.border
break
# Assign points to a cluster
cluster = 1
# Here we use a queue to find all the neighbourhood points of a core point and find the indirectly reachable points
# We are essentially performing Breadth First search of all points which are within Epsilon distance for each other
for i in range(len(point_label)):
q = queue.Queue()
if (point_label[i] == self.core):
point_label[i] = cluster
for x in point_count[i]:
if(point_label[x] == self.core):
q.put(x)
point_label[x] = cluster
elif(point_label[x] == self.border):
point_label[x] = cluster
while not q.empty():
neighbors = point_count[q.get()]
for y in neighbors:
if (point_label[y] == self.core):
point_label[y] = cluster
q.put(y)
if (point_label[y] == self.border):
point_label[y] = cluster
cluster += 1 # Move on to the next cluster
return point_label, cluster
# Visualize the clusters
def visualize(self, data, cluster, numberOfClusters):
N = len(data)
colors = np.array(list(islice(cycle(['#FE4A49', '#2AB7CA']), 3)))
for i in range(numberOfClusters):
if (i == 0):
# Plot all outliers point as black
color = '#000000'
else:
color = colors[i % len(colors)]
x, y = [], []
for j in range(N):
if cluster[j] == i:
x.append(data[j, 0])
y.append(data[j, 1])
plt.scatter(x, y, c=color, alpha=1, marker='.')
plt.show()
def main():
# Reading from the data file
df = pd.read_csv("./data/concentric_circles.csv")
dataset = df.astype(float).values.tolist()
# normalize dataset
X = StandardScaler().fit_transform(dataset)
custom_DBSCAN = CustomDBSCAN()
point_labels, clusters = custom_DBSCAN.fit(X, 0.25, 4)
print(point_labels, clusters)
custom_DBSCAN.visualize(X, point_labels, clusters)
if __name__ == "__main__":
main()