kneeliverse.postprocessing

The following module provides a set of methods used for post-processing knees. These filters were designed to improve the quality of the knee candidates.

  1# coding: utf-8
  2
  3'''
  4The following module provides a set of methods
  5used for post-processing knees. These filter were
  6designed to improve the quality of the knee candidates.
  7'''
  8
  9__author__ = 'Mário Antunes'
 10__version__ = '1.0'
 11__email__ = 'mario.antunes@ua.pt'
 12__status__ = 'Development'
 13__license__ = 'MIT'
 14__copyright__ = '''
 15Copyright (c) 2021-2023 Stony Brook University
 16Copyright (c) 2021-2023 The Research Foundation of SUNY
 17'''
 18
 19import math
 20import logging
 21import numpy as np
 22import kneeliverse.rdp as rdp
 23import kneeliverse.linear_fit as lf
 24import kneeliverse.convex_hull as ch
 25import kneeliverse.knee_ranking as kr
 26
 27
 28logger = logging.getLogger(__name__)
 29
 30
 31def filter_corner_knees(points: np.ndarray, knees: np.ndarray, t:float = .33) -> np.ndarray:
 32    """
 33    Filter the left upper corner knees points.
 34
 35    A left upper knee corner does not provide a significant improvement to be considered.
 36    The detection method relies on a three point rectangle fitting and overlap.
 37
 38    Args:
 39        points (np.ndarray): numpy array with the points (x, y)
 40        knees (np.ndarray): knees indexes
 41        t (float): overlap treshold (default 0.33)
 42
 43    Returns:
 44        np.ndarray: the filtered knees
 45    """
 46
 47    filtered_knees = []
 48
 49    for i in range(len(knees)):
 50        idx = knees[i]
 51        if idx-1 >=0 and idx+1 < len(points):
 52            p0, p1 ,p2 = points[idx-1:idx+2]
 53            corner0 = np.array([p0[0], p2[1]])
 54            amin, amax = kr.rect(corner0, p1)
 55            bmin, bmax = kr.rect(p0, p2)
 56            p = kr.rect_overlap(amin, amax, bmin, bmax)
 57            
 58            if p < t:
 59                filtered_knees.append(idx)
 60        else:
 61            filtered_knees.append(idx)
 62
 63    return np.array(filtered_knees)
 64
 65
 66def select_corner_knees(points: np.ndarray, knees: np.ndarray, t:float = .33) -> np.ndarray:
 67    """
 68    Detect and keep the left upper corner knees points.
 69
 70    The detection method relies on a three point rectangle fitting and overlap.
 71
 72    Args:
 73        points (np.ndarray): numpy array with the points (x, y)
 74        knees (np.ndarray): knees indexes
 75        t (float): overlap treshold (default 0.33)
 76
 77    Returns:
 78        np.ndarray: the filtered knees
 79    """
 80
 81    filtered_knees = []
 82
 83    for i in range(len(knees)):
 84        idx = knees[i]
 85        if idx-1 >=0 and idx+1 < len(points):
 86            p0, p1 ,p2 = points[idx-1:idx+2]
 87            corner0 = np.array([p0[0], p2[1]])
 88            amin, amax = kr.rect(corner0, p1)
 89            bmin, bmax = kr.rect(p0, p2)
 90            p = kr.rect_overlap(amin, amax, bmin, bmax)
 91            
 92            if p >= t:
 93                filtered_knees.append(idx)
 94
 95    return np.array(filtered_knees)
 96
 97
 98def filter_worst_knees(points: np.ndarray, knees: np.ndarray) -> np.ndarray:
 99    """
100    Filter the worst knees points.
101
102    A worst knee is a knee that is higher (at y axis) than a previous knee.
103
104    Args:
105        points (np.ndarray): numpy array with the points (x, y)
106        knees (np.ndarray): knees indexes
107
108    Returns:
109        np.ndarray: the filtered knees
110    """
111    
112    if len(knees) <= 1:
113        return knees
114    else:
115        filtered_knees = []
116
117        filtered_knees.append(knees[0])
118        h_min = points[knees[0]][1]
119
120        for i in range(1, len(knees)):
121            h = points[knees[i]][1]
122
123            if h <= h_min:
124                filtered_knees.append(knees[i])
125                h_min = h
126
127        return np.array(filtered_knees)
128
129
def _hull_cluster_rankings(points: np.ndarray, cluster: np.ndarray, hull: np.ndarray):
    """
    Rank the knees of a single cluster using the lower convex hull.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        cluster (np.ndarray): knees indexes that belong to the cluster
            (first and last entries are used as the cluster bounds)
        hull (np.ndarray): indexes of the points that form the hull

    Returns:
        np.ndarray: one ranking per knee of the cluster, or None when
        no hull point lies within the cluster bounds
    """
    # select the hull points that exist within the cluster
    a, b = cluster[[0, -1]]
    hull_within_cluster = hull[(hull >= a) & (hull <= b)]

    # only consider clusters with at least a single hull point
    if len(hull_within_cluster) == 0:
        return None

    rankings = np.zeros(len(cluster))
    on_hull = np.isin(cluster, hull_within_cluster)

    if len(hull_within_cluster) == 1:
        # a single hull point: it wins when it is one of the knees
        rankings[on_hull] = 1.0
        return rankings

    x = points[:, 0]
    # The segment under analysis is extended by one point on each side.
    # Clamp it to the trace: a-1 would otherwise wrap around to the last
    # point when a == 0, and b+1 would overrun when b is the last point.
    lo = max(a - 1, 0)
    hi = min(b + 1, len(points) - 1)
    length = x[hi] - x[lo]

    for pos, j in enumerate(cluster):
        if not on_hull[pos]:
            # marker, replaced below by the maximum distance
            rankings[pos] = -1.0
            continue

        # split the segment at the candidate knee; each half is scored by
        # the sum of distances to the chord that joins its end points,
        # weighted by the fraction of the x range that the half covers
        length_l = (x[j] - x[lo])/length
        length_r = (x[hi] - x[j])/length
        left = points[lo:j+1]
        right = points[j:hi+1]

        r_l = np.sum(lf.shortest_distance_points(left, left[0], left[-1]))
        r_r = np.sum(lf.shortest_distance_points(right, right[0], right[-1]))

        rankings[pos] = r_l * length_l + r_r * length_r

    # replace all -1 with maximum distance
    rankings[rankings < 0] = np.amax(rankings)
    return kr.distance_to_similarity(rankings)


def filter_clusters(points: np.ndarray, knees: np.ndarray,
                    clustering: callable, t: float = 0.01,
                    method: kr.ClusterRanking = kr.ClusterRanking.linear) -> np.ndarray:
    """
    Filter the knee points based on clustering.

    The knees are grouped by the `clustering` function and, for each
    cluster, a single knee is selected based on a ranking.
    With `kr.ClusterRanking.hull` the ranking relies on the lower convex
    hull of the points: knees that are not on the hull are never selected
    and clusters without hull points are discarded. For any other method
    the ranking is delegated to `kr.smooth_ranking`.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        knees (np.ndarray): knees indexes
        clustering (callable): the clustering function; called as
            clustering(knee_points, t) and expected to return one integer
            cluster label per knee
        t (float): the threshold for merging (in percentage, default 0.01)
        method (ranking.ClusterRanking): represents the direction of the ranking within a cluster (default ranking.ClusterRanking.linear)

    Returns:
        np.ndarray: the filtered knees (the input object itself when it
        holds at most one knee)
    """
    # zero or one knee: nothing to cluster
    if len(knees) <= 1:
        return knees

    use_hull = method is kr.ClusterRanking.hull
    if use_hull:
        hull = ch.graham_scan_lower(points)
        logger.info('hull %d', len(hull))

    clusters = clustering(points[knees], t)

    filtered_knees = []
    for i in range(clusters.max() + 1):
        current_cluster = knees[clusters == i]

        # labels are not guaranteed to be contiguous: skip unused ones
        if len(current_cluster) == 0:
            continue

        if len(current_cluster) == 1:
            knee = current_cluster[0]
            # with the hull method a lone knee is kept only if it is on the hull
            if not use_hull or knee in hull:
                filtered_knees.append(knee)
            continue

        if use_hull:
            rankings = _hull_cluster_rankings(points, current_cluster, hull)
        else:
            rankings = kr.smooth_ranking(points, current_cluster, method)

        # Compute relative ranking and keep the best knee of the cluster
        if rankings is not None:
            rankings = kr.rank(rankings)
            filtered_knees.append(current_cluster[np.argmax(rankings)])

    # integer dtype keeps an empty result usable as an index array
    return np.array(filtered_knees, dtype=int)
248
249
def filter_clusters_corners(points: np.ndarray, knees: np.ndarray, clustering: callable, t: float = 0.01) -> np.ndarray:
    """
    Keep, for each cluster of knees, the knee with the highest corner rank.

    The knees are grouped by the `clustering` function; within each
    cluster the knees are ranked with `rank_corners_triangle` and only
    the best ranked one is kept.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        knees (np.ndarray): knees indexes
        clustering (callable): the clustering function; called as
            clustering(knee_points, t) and expected to return one integer
            cluster label per knee
        t (float): the threshold for merging (in percentage, default 0.01)
    
    Returns:
        np.ndarray: the filtered knees (integer dtype, also when empty)

    """
    # without knees there is nothing to cluster (clusters.max() would fail)
    if len(knees) == 0:
        return np.array([], dtype=int)

    clusters = clustering(points[knees], t)

    filtered_knees = []
    for i in range(clusters.max() + 1):
        current_cluster = knees[clusters == i]

        # labels are not guaranteed to be contiguous: skip unused ones
        if len(current_cluster) == 0:
            continue

        # Compute the rank for each corner point and keep the best one
        ranks = rank_corners_triangle(points, current_cluster)
        filtered_knees.append(current_cluster[np.argmax(ranks)])

    return np.array(filtered_knees, dtype=int)
277
278
279
def add_points_even(points: np.ndarray, reduced: np.ndarray, knees: np.ndarray, removed:np.ndarray, tx:float=0.05, ty:float=0.05, extremes:bool=False) -> np.ndarray:
    """
    Add evenly spaced points along the long segments of the reduced curve.

    Every pair of consecutive reduced (RDP) points whose separation,
    relative to the full extent of the trace, is greater than 2*tx on the
    X-axis and greater than ty on the Y-axis is split with evenly spaced
    indexes. Both the segment bounds and the knees are given in RDP space
    and are mapped into the space of the complete set of points with
    `rdp.mapping`. The merged indexes are finally passed through
    `filter_worst_knees`.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        reduced (np.ndarray): numpy array with the index of the reduced points (x, y) (simplified by RDP)
        knees (np.ndarray): knees indexes
        removed (np.ndarray): the points that were removed
        tx (float): the threshold (X-axis) for adding points (in percentage, default 0.05)
        ty (float): the threshold (Y-axis) for adding points (in percentage, default 0.05)
        extremes (bool): if True adds the extreme points (first and last) (default False)

    Returns:
        np.ndarray: the resulting knees (mapped into the complete set of points)
    """

    reduced_points = points[reduced]

    # extent of the complete trace on each axis
    upper = points.max(axis=0)
    lower = points.min(axis=0)
    dx, dy = (math.fabs(hi - lo) for hi, lo in zip(upper, lower))

    # consecutive reduced points that are far apart on both axes;
    # stored flat as [left0, right0, left1, right1, ...]
    segment_bounds = []
    for right in range(1, len(reduced_points)):
        left = right - 1
        pdx = math.fabs(reduced_points[right][0] - reduced_points[left][0])/dx
        pdy = math.fabs(reduced_points[right][1] - reduced_points[left][1])/dy
        if pdx > (2.0*tx) and pdy > ty:
            segment_bounds.extend((left, right))

    # Map the segment bounds into the complete set of points
    segment_bounds = rdp.mapping(np.array(segment_bounds), reduced, removed)

    # walk the bounds two by two and spread the new indexes evenly
    new_knees = []
    for k in range(0, len(segment_bounds), 2):
        left, right = segment_bounds[k], segment_bounds[k+1]
        pdx = math.fabs(points[right][0] - points[left][0])/dx
        number_points = int(math.ceil(pdx/(2.0*tx)))
        inc = int((right-left)/number_points)
        new_knees.extend(left + inc*step for step in range(1, number_points+1))

    # Map knees into the complete set of points
    mapped_knees = rdp.mapping(knees, reduced, removed)

    # Optionally add the extreme points to the output
    parts = [mapped_knees, new_knees]
    if extremes:
        parts.append([0, len(points)-1])

    # np.concatenate generates float array when one is empty (see https://github.com/numpy/numpy/issues/8878)
    # np.unique already returns the indexes sorted
    knees_idx = np.unique(np.concatenate(parts).astype(int))

    # the added points may be worse than a previous knee: filter them out
    return filter_worst_knees(points, knees_idx)
364
def add_points_even_knees(points: np.ndarray, knees: np.ndarray, tx:float=0.05, ty:float=0.05, extremes:bool=False) -> np.ndarray:
    """
    Add evenly spaced points between knees points (using knee as markers).

    The trace is split into segments delimited by the first point, every
    knee and the last point. Whenever the bounds of a segment are further
    apart than 2*tx (on the X-axis) and ty (on the Y axis), both relative
    to the full extent of the trace, evenly spaced indexes are added to
    the result. The merged indexes are finally passed through
    `filter_worst_knees`.
    The knees have to be mapped in the complete set of points.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        knees (np.ndarray): knees indexes (may be empty)
        tx (float): the threshold (X-axis) for adding points (in percentage, default 0.05)
        ty (float): the threshold (Y-axis) for adding points (in percentage, default 0.05)
        extremes (bool): if True adds the extreme points (first and last) (default False)

    Returns:
        np.ndarray: the resulting knees
    """
    last = len(points) - 1

    # compute the delta x and y for the complete trace
    max_x, max_y = points.max(axis=0)
    min_x, min_y = points.min(axis=0)
    dx = math.fabs(max_x - min_x)
    dy = math.fabs(max_y - min_y)

    new_knees = []

    # A trace that is flat on one axis has no relative distance on it,
    # so no segment can exceed the thresholds (and dividing would fail).
    if dx > 0 and dy > 0:
        # first point, every knee, last point: consecutive pairs cover the
        # top part, the space between knees and the last part of the trace
        boundaries = [0, *knees, last]
        for left, right in zip(boundaries, boundaries[1:]):
            pdx = math.fabs(points[right][0] - points[left][0])/dx
            pdy = math.fabs(points[right][1] - points[left][1])/dy
            if pdx > (2.0*tx) and pdy > ty:
                number_points = int(math.ceil(pdx/(2.0*tx)))
                inc = int((right-left)/number_points)
                new_knees.extend(left + inc*step for step in range(1, number_points+1))

    # Add extremes points to the output; the last valid index is
    # len(points)-1 (len(points) itself is out of bounds)
    parts = [knees, new_knees]
    if extremes:
        parts.append([0, last])

    # np.concatenate generates float array when one is empty (see https://github.com/numpy/numpy/issues/8878)
    # np.unique already returns the indexes sorted
    knees_idx = np.unique(np.concatenate(parts).astype(int))

    # filter worst knees that may be added in this function
    return filter_worst_knees(points, knees_idx)
446
447
def triangle_area(p: np.ndarray) -> float:
    """
    Compute the signed area of the triangle defined by 3 points.

    The value is positive when the points are given in counter-clockwise
    order, negative when clockwise and zero when they are collinear.

    Args:
        p (np.ndarray): numpy array with 3 2D points (x, y)
    
    Returns:
        float: signed triangle area
    """
    a, b, c = p[0], p[1], p[2]
    # determinant form of the area, evaluated vertex by vertex
    twice_area = a[0]*(b[1]-c[1]) + b[0]*(c[1]-a[1]) + c[0]*(a[1]-b[1])
    return 0.5 * twice_area
460
461
def rank_corners_triangle(points: np.ndarray, knees: np.ndarray) -> np.ndarray:
    """
    Rank each knee with a fast triangle heuristic.

    For a knee with previous point p0, knee point p1 and next point p2,
    the rank is 0.5 * (p1.x - p0.x) * (p1.y - p2.y), i.e. half the product
    of the horizontal step into the knee and the vertical drop out of it.

    Note: the neighbours are read at knee-1 and knee+1 without bound
    checks, so a knee at index 0 reads the last point as its predecessor
    and a knee on the last point raises an IndexError.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        knees (np.ndarray): knees indexes
    
    Returns:
        np.ndarray: the ranks (one per knee)
    """
    def corner_rank(knee):
        before, at, after = points[[knee-1, knee, knee+1]]
        return 0.5*((at[0]-before[0])*(at[1]-after[1]))

    return np.array([corner_rank(knee) for knee in knees])
483
484
def rank_corners(points: np.ndarray, knees: np.ndarray) -> np.ndarray:
    """
    Ranks knees based on their corners.

    The rank of a knee is its distance on the X-axis to the previous
    knee; the first knee is measured against the first point.
    
    Args:
        points (np.ndarray): numpy array with the points (x, y)
        knees (np.ndarray): knees indexes (may be empty)
    
    Returns:
        np.ndarray: the ranks (one per knee, empty when there are no knees)
    """
    points = np.asarray(points)
    knee_idx = np.asarray(knees, dtype=int)

    # no knees, no ranks (indexing knees[0] would fail)
    if knee_idx.size == 0:
        return np.array([])

    # x of every knee; prepending the x of the first point makes the
    # first difference the distance between the first knee and the start
    knee_x = points[knee_idx, 0]
    return np.diff(knee_x, prepend=points[0][0])
logger = <Logger kneeliverse.postprocessing (WARNING)>
def filter_corner_knees( points: numpy.ndarray, knees: numpy.ndarray, t: float = 0.33) -> numpy.ndarray:
def filter_corner_knees(points: np.ndarray, knees: np.ndarray, t:float = .33) -> np.ndarray:
    """
    Filter the left upper corner knees points.

    A left upper knee corner does not provide a significant improvement to be considered.
    The detection method relies on a three point rectangle fitting and overlap:
    for a knee p1 with neighbours p0 (previous point) and p2 (next point),
    the rectangle spanned by (p0.x, p2.y) and p1 is compared, through
    `kr.rect_overlap`, with the rectangle spanned by p0 and p2.
    Knees whose overlap value is below `t` are kept.

    Knees located on the first or last point have no two neighbours;
    they cannot be tested and are always kept.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        knees (np.ndarray): knees indexes
        t (float): overlap threshold (default 0.33)

    Returns:
        np.ndarray: the filtered knees (integer dtype, also when empty)
    """

    last = len(points) - 1
    filtered_knees = []

    for idx in knees:
        # border knees lack a neighbour on one side: keep them untouched
        if idx < 1 or idx >= last:
            filtered_knees.append(idx)
            continue

        p0, p1, p2 = points[idx-1:idx+2]
        corner0 = np.array([p0[0], p2[1]])
        amin, amax = kr.rect(corner0, p1)
        bmin, bmax = kr.rect(p0, p2)
        overlap = kr.rect_overlap(amin, amax, bmin, bmax)

        if overlap < t:
            filtered_knees.append(idx)

    # np.array([]) defaults to float64, which cannot be used to index points;
    # force an integer dtype so that an empty result is still a valid index array
    return np.array(filtered_knees, dtype=int)

Filter the left upper corner knees points.

A left upper knee corner does not provide a significant improvement to be considered. The detection method relies on a three point rectangle fitting and overlap.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
  • t (float): overlap threshold (default 0.33)
Returns:

np.ndarray: the filtered knees

def select_corner_knees( points: numpy.ndarray, knees: numpy.ndarray, t: float = 0.33) -> numpy.ndarray:
def select_corner_knees(points: np.ndarray, knees: np.ndarray, t:float = .33) -> np.ndarray:
    """
    Detect and keep the left upper corner knees points.

    The detection method relies on a three point rectangle fitting and overlap:
    for a knee p1 with neighbours p0 (previous point) and p2 (next point),
    the rectangle spanned by (p0.x, p2.y) and p1 is compared, through
    `kr.rect_overlap`, with the rectangle spanned by p0 and p2.
    Knees whose overlap value is at least `t` are kept.

    Knees located on the first or last point have no two neighbours;
    they cannot be tested and are never selected.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        knees (np.ndarray): knees indexes
        t (float): overlap threshold (default 0.33)

    Returns:
        np.ndarray: the selected corner knees (integer dtype, also when empty)
    """

    last = len(points) - 1
    selected_knees = []

    for idx in knees:
        # border knees lack a neighbour on one side: the test cannot be applied
        if idx < 1 or idx >= last:
            continue

        p0, p1, p2 = points[idx-1:idx+2]
        corner0 = np.array([p0[0], p2[1]])
        amin, amax = kr.rect(corner0, p1)
        bmin, bmax = kr.rect(p0, p2)
        overlap = kr.rect_overlap(amin, amax, bmin, bmax)

        if overlap >= t:
            selected_knees.append(idx)

    # np.array([]) defaults to float64, which cannot be used to index points;
    # force an integer dtype so that an empty result is still a valid index array
    return np.array(selected_knees, dtype=int)

Detect and keep the left upper corner knees points.

The detection method relies on a three point rectangle fitting and overlap.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
  • t (float): overlap threshold (default 0.33)
Returns:

np.ndarray: the filtered knees

def filter_worst_knees(points: numpy.ndarray, knees: numpy.ndarray) -> numpy.ndarray:
 99def filter_worst_knees(points: np.ndarray, knees: np.ndarray) -> np.ndarray:
100    """
101    Filter the worst knees points.
102
103    A worst knee is a knee that is higher (at y axis) than a previous knee.
104
105    Args:
106        points (np.ndarray): numpy array with the points (x, y)
107        knees (np.ndarray): knees indexes
108
109    Returns:
110        np.ndarray: the filtered knees
111    """
112    
113    if len(knees) <= 1:
114        return knees
115    else:
116        filtered_knees = []
117
118        filtered_knees.append(knees[0])
119        h_min = points[knees[0]][1]
120
121        for i in range(1, len(knees)):
122            h = points[knees[i]][1]
123
124            if h <= h_min:
125                filtered_knees.append(knees[i])
126                h_min = h
127
128        return np.array(filtered_knees)

Filter the worst knees points.

A worst knee is a knee that is higher (at y axis) than a previous knee.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
Returns:

np.ndarray: the filtered knees

def filter_clusters( points: numpy.ndarray, knees: numpy.ndarray, clustering: <built-in function callable>, t: float = 0.01, method: kneeliverse.knee_ranking.ClusterRanking = <ClusterRanking.linear: 'linear'>) -> numpy.ndarray:
def _hull_cluster_rankings(points: np.ndarray, cluster: np.ndarray, hull: np.ndarray):
    """
    Rank the knees of a single cluster using the lower convex hull.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        cluster (np.ndarray): knees indexes that belong to the cluster
            (first and last entries are used as the cluster bounds)
        hull (np.ndarray): indexes of the points that form the hull

    Returns:
        np.ndarray: one ranking per knee of the cluster, or None when
        no hull point lies within the cluster bounds
    """
    # select the hull points that exist within the cluster
    a, b = cluster[[0, -1]]
    hull_within_cluster = hull[(hull >= a) & (hull <= b)]

    # only consider clusters with at least a single hull point
    if len(hull_within_cluster) == 0:
        return None

    rankings = np.zeros(len(cluster))
    on_hull = np.isin(cluster, hull_within_cluster)

    if len(hull_within_cluster) == 1:
        # a single hull point: it wins when it is one of the knees
        rankings[on_hull] = 1.0
        return rankings

    x = points[:, 0]
    # The segment under analysis is extended by one point on each side.
    # Clamp it to the trace: a-1 would otherwise wrap around to the last
    # point when a == 0, and b+1 would overrun when b is the last point.
    lo = max(a - 1, 0)
    hi = min(b + 1, len(points) - 1)
    length = x[hi] - x[lo]

    for pos, j in enumerate(cluster):
        if not on_hull[pos]:
            # marker, replaced below by the maximum distance
            rankings[pos] = -1.0
            continue

        # split the segment at the candidate knee; each half is scored by
        # the sum of distances to the chord that joins its end points,
        # weighted by the fraction of the x range that the half covers
        length_l = (x[j] - x[lo])/length
        length_r = (x[hi] - x[j])/length
        left = points[lo:j+1]
        right = points[j:hi+1]

        r_l = np.sum(lf.shortest_distance_points(left, left[0], left[-1]))
        r_r = np.sum(lf.shortest_distance_points(right, right[0], right[-1]))

        rankings[pos] = r_l * length_l + r_r * length_r

    # replace all -1 with maximum distance
    rankings[rankings < 0] = np.amax(rankings)
    return kr.distance_to_similarity(rankings)


def filter_clusters(points: np.ndarray, knees: np.ndarray,
                    clustering: callable, t: float = 0.01,
                    method: kr.ClusterRanking = kr.ClusterRanking.linear) -> np.ndarray:
    """
    Filter the knee points based on clustering.

    The knees are grouped by the `clustering` function and, for each
    cluster, a single knee is selected based on a ranking.
    With `kr.ClusterRanking.hull` the ranking relies on the lower convex
    hull of the points: knees that are not on the hull are never selected
    and clusters without hull points are discarded. For any other method
    the ranking is delegated to `kr.smooth_ranking`.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        knees (np.ndarray): knees indexes
        clustering (callable): the clustering function; called as
            clustering(knee_points, t) and expected to return one integer
            cluster label per knee
        t (float): the threshold for merging (in percentage, default 0.01)
        method (ranking.ClusterRanking): represents the direction of the ranking within a cluster (default ranking.ClusterRanking.linear)

    Returns:
        np.ndarray: the filtered knees (the input object itself when it
        holds at most one knee)
    """
    # zero or one knee: nothing to cluster
    if len(knees) <= 1:
        return knees

    use_hull = method is kr.ClusterRanking.hull
    if use_hull:
        hull = ch.graham_scan_lower(points)
        logger.info('hull %d', len(hull))

    clusters = clustering(points[knees], t)

    filtered_knees = []
    for i in range(clusters.max() + 1):
        current_cluster = knees[clusters == i]

        # labels are not guaranteed to be contiguous: skip unused ones
        if len(current_cluster) == 0:
            continue

        if len(current_cluster) == 1:
            knee = current_cluster[0]
            # with the hull method a lone knee is kept only if it is on the hull
            if not use_hull or knee in hull:
                filtered_knees.append(knee)
            continue

        if use_hull:
            rankings = _hull_cluster_rankings(points, current_cluster, hull)
        else:
            rankings = kr.smooth_ranking(points, current_cluster, method)

        # Compute relative ranking and keep the best knee of the cluster
        if rankings is not None:
            rankings = kr.rank(rankings)
            filtered_knees.append(current_cluster[np.argmax(rankings)])

    # integer dtype keeps an empty result usable as an index array
    return np.array(filtered_knees, dtype=int)

Filter the knee points based on clustering.

For each cluster a single point is selected based on the ranking. The ranking is computed based on the slope and the improvement (on the y axis).

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
  • clustering (callable): the clustering function
  • t (float): the threshold for merging (in percentage, default 0.01)
  • method (ranking.ClusterRanking): represents the direction of the ranking within a cluster (default ranking.ClusterRanking.linear)
Returns:

np.ndarray: the filtered knees

def filter_clusters_corners( points: numpy.ndarray, knees: numpy.ndarray, clustering: <built-in function callable>, t: float = 0.01) -> numpy.ndarray:
def filter_clusters_corners(points: np.ndarray, knees: np.ndarray, clustering: callable, t: float = 0.01) -> np.ndarray:
    """
    Keep, for each cluster of knees, the knee with the highest corner rank.

    The knees are grouped by the `clustering` function; within each
    cluster the knees are ranked with `rank_corners_triangle` and only
    the best ranked one is kept.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        knees (np.ndarray): knees indexes
        clustering (callable): the clustering function; called as
            clustering(knee_points, t) and expected to return one integer
            cluster label per knee
        t (float): the threshold for merging (in percentage, default 0.01)
    
    Returns:
        np.ndarray: the filtered knees (integer dtype, also when empty)

    """
    # without knees there is nothing to cluster (clusters.max() would fail)
    if len(knees) == 0:
        return np.array([], dtype=int)

    clusters = clustering(points[knees], t)

    filtered_knees = []
    for i in range(clusters.max() + 1):
        current_cluster = knees[clusters == i]

        # labels are not guaranteed to be contiguous: skip unused ones
        if len(current_cluster) == 0:
            continue

        # Compute the rank for each corner point and keep the best one
        ranks = rank_corners_triangle(points, current_cluster)
        filtered_knees.append(current_cluster[np.argmax(ranks)])

    return np.array(filtered_knees, dtype=int)

This method finds and removes corner points.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
  • clustering (callable): the clustering function
  • t (float): the threshold for merging (in percentage, default 0.01)
Returns:

np.ndarray: the filtered knees

def add_points_even( points: numpy.ndarray, reduced: numpy.ndarray, knees: numpy.ndarray, removed: numpy.ndarray, tx: float = 0.05, ty: float = 0.05, extremes: bool = False) -> numpy.ndarray:
def add_points_even(points: np.ndarray, reduced: np.ndarray, knees: np.ndarray, removed:np.ndarray, tx:float=0.05, ty:float=0.05, extremes:bool=False) -> np.ndarray:
    """
    Add evenly spaced points between knee points.

    Whenever two consecutive reduced points are further apart than
    2*tx (on the X-axis) and ty (on the Y-axis), both measured as a
    fraction of the span of the complete trace, evenly spaced points
    are added to the result.
    This function will map the knees (in RDP space) into
    the space of the complete set of points.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        reduced (np.ndarray): numpy array with the index of the reduced points (x, y) (simplified by RDP)
        knees (np.ndarray): knees indexes
        removed (np.ndarray): the points that were removed
        tx (float): the threshold (X-axis) for adding points (in percentage, default 0.05)
        ty (float): the threshold (Y-axis) for adding points (in percentage, default 0.05)
        extremes (bool): if True adds the extreme points (first and last) (default False)

    Returns:
        np.ndarray: the resulting knees (mapped into the complete set of points)
    """

    points_reduced = points[reduced]

    # compute the delta x and y for the complete trace
    max_x, max_y = points.max(axis=0)
    min_x, min_y = points.min(axis=0)
    dx = math.fabs(max_x - min_x)
    dy = math.fabs(max_y - min_y)

    # Candidate segments, stored flat as [left0, right0, left1, right1, ...]
    # (indexes into the reduced set of points).
    candidates = []

    # A trace with no extent on one axis cannot be normalised (division by
    # zero); treat it as having no segment worth subdividing.
    if dx > 0 and dy > 0:
        for right in range(1, len(points_reduced)):
            left = right - 1
            pdx = math.fabs(points_reduced[right][0] - points_reduced[left][0])/dx
            pdy = math.fabs(points_reduced[right][1] - points_reduced[left][1])/dy
            if pdx > (2.0*tx) and pdy > ty:
                candidates.extend((left, right))

    # Map candidates into the complete set of points.
    # dtype=int keeps an empty candidate list usable as an index array
    # (np.array([]) would otherwise default to float).
    candidates = rdp.mapping(np.array(candidates, dtype=int), reduced, removed)

    # new knees
    new_knees = []

    # Process candidates as (left, right) pairs
    for left, right in zip(candidates[0::2], candidates[1::2]):
        pdx = math.fabs(points[right][0] - points[left][0])/dx
        number_points = int(math.ceil(pdx/(2.0*tx)))
        # Index step between added points; it truncates towards zero, so the
        # last added index may fall short of `right`.
        inc = int((right-left)/number_points)
        new_knees.extend(left + inc*step for step in range(1, number_points+1))

    # Map knees into the complete set of points
    knees = rdp.mapping(knees, reduced, removed)

    # Add extremes points to the output
    if extremes:
        extremes_idx = [0, len(points)-1]
        knees_idx = np.concatenate((knees, new_knees, extremes_idx))
    else:
        knees_idx = np.concatenate((knees, new_knees))

    # np.concatenate generates float array when one is empty (see https://github.com/numpy/numpy/issues/8878)
    knees_idx = knees_idx.astype(int)
    # np.unique already returns the indexes sorted
    knees_idx = np.unique(knees_idx)

    # filter worst knees that may have been added by this function
    return filter_worst_knees(points, knees_idx)

Add evenly spaced points between knees points.

Whenever the two knee points delimiting a smooth segment are further apart than tx (on the X-axis) and ty (on the Y-axis), evenly spaced points are added to the result. This function will map the knees (in RDP space) into the space of the complete set of points.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • reduced (np.ndarray): numpy array with the index of the reduced points (x, y) (simplified by RDP)
  • knees (np.ndarray): knees indexes
  • removed (np.ndarray): the points that were removed
  • tx (float): the threshold (X-axis) for adding points (in percentage, default 0.05)
  • ty (float): the threshold (Y-axis) for adding points (in percentage, default 0.05)
  • extremes (bool): if True adds the extreme points (first and last) (default False)
Returns:

np.ndarray: the resulting knees (mapped into the complete set of points)

def add_points_even_knees( points: numpy.ndarray, knees: numpy.ndarray, tx: float = 0.05, ty: float = 0.05, extremes: bool = False) -> numpy.ndarray:
def add_points_even_knees(points: np.ndarray, knees: np.ndarray, tx:float=0.05, ty:float=0.05, extremes:bool=False) -> np.ndarray:
    """
    Add evenly spaced points between knee points (using knees as markers).

    The trace is split into segments delimited by the first point, the
    knees and the last point. Whenever the two ends of a segment are
    further apart than 2*tx (on the X-axis) and ty (on the Y-axis), both
    measured as a fraction of the span of the complete trace, evenly
    spaced points are added to the result.
    The knees have to be mapped in the complete set of points.
    When no knees are given, the whole trace is treated as one segment.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        knees (np.ndarray): knees indexes
        tx (float): the threshold (X-axis) for adding points (in percentage, default 0.05)
        ty (float): the threshold (Y-axis) for adding points (in percentage, default 0.05)
        extremes (bool): if True adds the extreme points (first and last) (default False)

    Returns:
        np.ndarray: the resulting knees
    """
    last = len(points) - 1

    # compute the delta x and y for the complete trace
    max_x, max_y = points.max(axis=0)
    min_x, min_y = points.min(axis=0)
    dx = math.fabs(max_x - min_x)
    dy = math.fabs(max_y - min_y)

    # new knees
    new_knees = []

    # A trace with no extent on one axis cannot be normalised (division by
    # zero); treat it as having no segment worth subdividing.
    if dx > 0 and dy > 0:
        # Segment boundaries: top part, between knees, last part.
        boundaries = [0, *knees, last]
        for left, right in zip(boundaries[:-1], boundaries[1:]):
            pdx = math.fabs(points[right][0] - points[left][0])/dx
            pdy = math.fabs(points[right][1] - points[left][1])/dy
            if pdx > (2.0*tx) and pdy > ty:
                number_points = int(math.ceil(pdx/(2.0*tx)))
                # Index step between added points; it truncates towards
                # zero, so the last added index may fall short of `right`.
                inc = int((right-left)/number_points)
                new_knees.extend(left + inc*step for step in range(1, number_points+1))

    # Add extremes points to the output
    if extremes:
        # The last valid index is len(points)-1; len(points) is out of range.
        extremes_idx = [0, last]
        knees_idx = np.concatenate((knees, new_knees, extremes_idx))
    else:
        knees_idx = np.concatenate((knees, new_knees))

    # np.concatenate generates float array when one is empty (see https://github.com/numpy/numpy/issues/8878)
    knees_idx = knees_idx.astype(int)
    # np.unique already returns the indexes sorted
    knees_idx = np.unique(knees_idx)
    # filter worst knees that may have been added by this function
    return filter_worst_knees(points, knees_idx)

Add evenly spaced points between knees points (using knee as markers).

Whenever the distance between two consecutive knees is greater than tx (on the X-axis) and ty (on the Y-axis), evenly spaced points are added to the result. The knees have to be mapped in the complete set of points.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
  • tx (float): the threshold (X-axis) for adding points (in percentage, default 0.05)
  • ty (float): the threshold (Y-axis) for adding points (in percentage, default 0.05)
  • extremes (bool): if True adds the extreme points (first and last) (default False)
Returns:

np.ndarray: the resulting knees

def triangle_area(p: numpy.ndarray) -> float:
def triangle_area(p: np.ndarray) -> float:
    """
    Compute the area of the triangle defined by 3 points.

    The value is the signed area given by the shoelace formula: its sign
    depends on the order in which the three points are listed, and it is
    zero when the points are collinear.

    Args:
        p (np.ndarray): numpy array with 3 2D points (x, y)

    Returns:
        float: triangle area (signed)
    """
    x0, y0 = p[0][0], p[0][1]
    x1, y1 = p[1][0], p[1][1]
    x2, y2 = p[2][0], p[2][1]

    # Shoelace formula: this sum is twice the signed area.
    twice_area = x0*(y1-y2) + x1*(y2-y0) + x2*(y0-y1)
    return 0.5 * twice_area

Given 3 points computes the triangle area.

Arguments:
  • p (np.ndarray): numpy array with 3 2D points (x, y)
Returns:

float: triangle area

def rank_corners_triangle(points: numpy.ndarray, knees: numpy.ndarray) -> numpy.ndarray:
def rank_corners_triangle(points: np.ndarray, knees: np.ndarray) -> np.ndarray:
    """
    Ranks knees based on the triangle fast heuristic.

    For each knee the rank is half the product of the x distance to the
    previous point and the y distance to the next point:
    0.5 * (x[k] - x[k-1]) * (y[k] - y[k+1]).

    Neighbour indexes are clamped to the valid range, so a knee on the
    first or last point gets a rank of 0 instead of wrapping around to
    the other end of the trace or raising an IndexError.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        knees (np.ndarray): knees indexes

    Returns:
        np.ndarray: the ranks (one per knee, empty when there are no knees)
    """
    points = np.asarray(points)
    knees = np.asarray(knees, dtype=int)

    if knees.size == 0:
        return np.array([])

    last = len(points) - 1
    # Clamp the neighbours so that boundary knees reuse the knee itself.
    previous_idx = np.clip(knees - 1, 0, last)
    next_idx = np.clip(knees + 1, 0, last)

    #TODO: evaluate replacing this heuristic with triangle_area
    width = points[knees, 0] - points[previous_idx, 0]
    height = points[knees, 1] - points[next_idx, 1]
    return 0.5 * (width * height)

Ranks knees based on the triangle fast heuristic.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
Returns:

np.ndarray: the ranks

def rank_corners(points: numpy.ndarray, knees: numpy.ndarray) -> numpy.ndarray:
def rank_corners(points: np.ndarray, knees: np.ndarray) -> np.ndarray:
    """
    Ranks knees based on their corners.

    The rank of a knee is the x distance to the previous knee; for the
    first knee it is the x distance to the first point of the trace.

    Args:
        points (np.ndarray): numpy array with the points (x, y)
        knees (np.ndarray): knees indexes

    Returns:
        np.ndarray: the ranks (one per knee, empty when there are no knees)
    """
    points = np.asarray(points)
    knees = np.asarray(knees, dtype=int)

    # No knees, no ranks (knees[0] would otherwise raise an IndexError).
    if knees.size == 0:
        return np.array([])

    knees_x = points[knees, 0]
    # Prepending the x of the first point makes the first difference the
    # distance between the first knee and the start of the trace.
    return np.diff(knees_x, prepend=points[0, 0])

Ranks knees based on their corners.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
Returns:

np.ndarray: the ranks