kneeliverse.rdp

The following module provides a set of methods used for curve simplification. It offers several versions of the RDP algorhtm.

  1# coding: utf-8
  2
  3'''
  4The following module provides a set of methods
  5used for curve simplification. It offers several
  6versions of the RDP algorhtm.
  7'''
  8
  9__author__ = 'Mário Antunes'
 10__version__ = '1.0'
 11__email__ = 'mario.antunes@ua.pt'
 12__status__ = 'Development'
 13__license__ = 'MIT'
 14__copyright__ = '''
 15Copyright (c) 2021-2023 Stony Brook University
 16Copyright (c) 2021-2023 The Research Foundation of SUNY
 17'''
 18
 19#import math
 20import enum
 21import logging
 22import numpy as np
 23import kneeliverse.linear_fit as lf
 24import kneeliverse.metrics as metrics
 25import kneeliverse.evaluation as evaluation
 26
 27
 28#import matplotlib.pyplot as plt
 29
 30logger = logging.getLogger(__name__)
 31
 32
 33class Distance(enum.Enum):
 34    """
 35    Enum that defines the distance method for the RDP 
 36    """
 37    perpendicular = 'perpendicular'
 38    shortest = 'shortest'
 39
 40    def __str__(self):
 41        return self.value
 42
 43
 44class Order(enum.Enum):
 45    """
 46    Enum that defines the distance method for the RDP 
 47    """
 48    triangle = 'triangle'
 49    area = 'area'
 50    segment = 'segment'
 51
 52    def __str__(self):
 53        return self.value
 54
 55
 56def mapping(indexes: np.ndarray, reduced: np.ndarray, removed: np.ndarray, sorted: bool = True) -> np.ndarray:
 57    """
 58    Computes the reverse of the RDP method.
 59
 60    It maps the indexes on a simplified curve (using the rdp algorithm) into
 61    the indexes of the original points.
 62    This method assumes the indexes are sorted in ascending order.
 63
 64    Args:
 65        indexes (np.ndarray): the indexes in the reduced space
 66        reduced (np.ndarray): the points that form the reduced space
 67        removed (np.ndarray): the points that were removed
 68        sorted (bool): True if the removed array is already sorted
 69
 70    Returns:
 71        np.ndarray: the indexes in the original space
 72    """
 73
 74    if not sorted:
 75        sorted_removed = removed[np.argsort(removed[:, 0])]
 76    else:
 77        sorted_removed = removed
 78
 79    rv = []
 80    j = 0
 81    count = 0
 82
 83    for i in indexes:
 84        value = reduced[i]
 85        while j < len(sorted_removed) and sorted_removed[j][0] < value:
 86            count += sorted_removed[j][1]
 87            j += 1
 88        idx = i + count
 89        rv.append(int(idx))
 90
 91    return np.array(rv)
 92
 93
 94def compute_cost_coef(pt: np.ndarray, coef, cost: metrics.Metrics=metrics.Metrics.smape) -> float:
 95    """
 96    Computes the cost of fitting a linear function (with a given coefficient)
 97    in the points array.
 98
 99    Args:
100        points (np.ndarray): numpy array with the points (x, y)
101        coef (tuple): the coefficients from the linear fit
102        cost (lf.Linear_Metrics): the cost method used to evaluate a point set (default: metrics.Metrics.smape)
103
104    Returns:
105        float: the cost of fitting the linear function
106    """
107    methods = {metrics.Metrics.r2: lf.linear_r2_points,
108    metrics.Metrics.rmspe: lf.rmspe_points,
109    metrics.Metrics.rmsle: lf.rmsle_points,
110    metrics.Metrics.smape: lf.smape_points,
111    metrics.Metrics.rpd: lf.rpd_points}
112
113    return methods[cost](pt, coef)
114
115
116def rdp(points: np.ndarray, t: float = 0.01, distance: Distance = Distance.shortest, cost: metrics.Metrics = metrics.Metrics.smape) -> tuple:
117    """
118    Ramer–Douglas–Peucker (RDP) algorithm.
119
120    Is an algorithm that decimates a curve composed of line segments to a similar curve with fewer points.
121    This version uses different cost functions to decided whenever to keep or remove a line segment.
122
123    Args:
124        points (np.ndarray): numpy array with the points (x, y)
125        t (float): the coefficient of determination threshold (default 0.01)
126        distance (Distance): the distance metric used to decide the split point (default: Distance.shortest)
127        cost (metrics.Metrics): the cost method used to evaluate a point set (default: metrics.Metrics.smape)
128
129    Returns:
130        tuple: the index of the reduced space, the points that were removed
131    """
132    stack = [(0, len(points))]
133
134    reduced = []
135    removed = []
136
137    # select the distance metric to be used
138    distance_points = None
139    if distance is Distance.shortest:
140        distance_points = lf.shortest_distance_points
141    elif distance is Distance.perpendicular:
142        distance_points = lf.perpendicular_distance_points
143    else:
144        distance_points = lf.shortest_distance_points
145
146    while stack:
147        left, right = stack.pop()
148        pt = points[left:right]
149
150        if len(pt) <= 2:
151            if cost is metrics.Metrics.r2:
152                r = 1.0
153            else:
154                r = 0.0
155        else:
156            coef = lf.linear_fit_points(pt)
157            r = compute_cost_coef(pt, coef, cost)
158
159        curved = r < t if cost is metrics.Metrics.r2 else r >= t
160
161        if curved:
162            d = distance_points(pt, pt[0], pt[-1])
163            index = np.argmax(d)
164            stack.append((left+index, left+len(pt)))
165            stack.append((left, left+index+1))
166        else:
167            reduced.append(left)
168            removed.append([left, len(pt) - 2.0])
169
170    reduced.append(len(points)-1)
171    return np.array(reduced), np.array(removed)
172
173
174def compute_removed_points(points: np.ndarray, reduced: np.ndarray) -> np.ndarray:
175    """
176    Given an array of points and the reduced set it computes how many
177    points were removed per segment.
178
179    Args:
180        points (np.ndarray): numpy array with the points (x, y)
181        reduced (np.ndarray): numpy array with the reduced set of points
182    
183    Returns:
184        np.ndarray: the points that were removed
185    """
186    removed = []
187
188    left = reduced[0]
189    for i in range(1, len(reduced)):
190        right = reduced[i]
191        pt = points[left:right+1]
192        removed.append([left,len(pt)-2])
193        left = right
194
195    return np.array(removed)
196
197
198def order_triangle(pt: np.ndarray, index:int, distance_points: callable) -> tuple:
199    """
200    Computes the triangle area of the left and right segments.
201    The triangle area is a fast heuristic to estimate the how curve
202    the left and right segment are.
203
204    Args:
205        pt (np.ndarray): numpy array with the points (x, y)
206        index (int): the index that separates the left from the right segment
207        distance_points (callable): methods that computes the distance between points
208
209    Returns:
210        tuple: the left and right triangle area
211    """
212    # compute the area of the triangles made from the farthest point
213    base_left = np.linalg.norm(pt[0]-pt[index])
214    pt_left = pt[0:index+1]
215    height_left = distance_points(pt_left, pt_left[0], pt_left[-1]).max()
216    left_tri_area = 0.5*base_left*height_left
217
218    base_right = np.linalg.norm(pt[index]-pt[-1])
219    pt_right = pt[index:]
220    height_right = distance_points(pt_right, pt_right[0], pt_right[-1]).max()
221    right_tri_area = 0.5*base_right*height_right
222
223    return left_tri_area, right_tri_area
224
225
226def order_area(pt: np.ndarray, index:int, distance_points:callable) -> tuple:
227    """
228    Computes the area of the left and right segments.
229    The area is an heuristic to estimate the how curve
230    the left and right segment are.
231
232    Args:
233        pt (np.ndarray): numpy array with the points (x, y)
234        index (int): the index that separates the left from the right segment
235        distance_points (callable): methods that computes the distance between points
236
237    Returns:
238        tuple: the left and right area
239    """
240    # compute the area using the distance function
241    #pt_left = points[left:left+index+1]
242    pt_left = pt[0:index+1]
243    left_distance = distance_points(pt_left, pt_left[0], pt_left[-1])
244    
245    #pt_right = points[left+index:left+len(pt)]
246    pt_right = pt[index:]
247    right_distance = distance_points(pt_right, pt_right[0], pt_right[-1])
248
249    left_area = np.sum(left_distance)
250    right_area = np.sum(right_distance)
251    
252    return  left_area, right_area
253
254
255def order_segment(pt: np.ndarray, index:int) -> tuple:
256    """
257    Computes the fitting error for the left and right segments.
258
259    Args:
260        pt (np.ndarray): numpy array with the points (x, y)
261        index (int): the index that separates the left from the right segment
262        
263    Returns:
264        tuple: the left and right fitting error
265    """
266    pt_left = pt[0:index+1]
267    left_cost = lf.linear_fit_residuals_points(pt_left)
268    
269    pt_right = pt[index:]
270    right_cost = lf.linear_fit_residuals_points(pt_right)
271    
272    return left_cost, right_cost
273
274
275def _rdp_fixed(points: np.ndarray, length:int, distance_points:callable, 
276order:Order, stack:list, reduced:list) -> list:
277    """
278    Main loop of the RDP fixed version.
279
280    Not intended to be used as a single method.
281    It is used internally on rdp_fixed and mp_grdp.
282
283    Args:
284        points (np.ndarray): numpy array with the points (x, y)
285        length (int): the fixed length of reduced points 
286        distance_points (callable): the distance function
287        order (Order): the metric used to sort the segments
288        stack (list): stack used to explore the points space
289        reduced (list): set of reduced points
290
291    Returns:
292        list: the index of the reduced space
293    """
294
295    while length > 0 and stack:
296        _, left, right = stack.pop()
297
298        pt = points[left:right]
299        d = distance_points(pt, pt[0], pt[-1])
300
301        # fix issue when all distances are 0 (perfect fit)
302        # changed to EPS for improve robustness
303        #if d.sum() < np.finfo(float).eps:
304        if np.all((d < np.finfo(float).eps)):
305            index = int((len(d))/2)
306        else:
307            index = np.argmax(d)
308
309        # add the relevant point to the reduced set
310        reduced.append(left+index)
311
312        if order is Order.triangle:
313            left_cost, right_cost = order_triangle(pt, index, distance_points)
314        elif order is Order.area:
315            left_cost, right_cost = order_area(pt, index, distance_points)
316        else:
317            left_cost, right_cost = order_segment(pt, index)
318
319        # Prevent the insertion of single point segments
320        if (left+index+1) - (left) > 2:
321            stack.append((left_cost, left, left+index+1))
322        
323        if (left+len(pt)) - (left+index) > 2:
324            stack.append((right_cost, left+index, left+len(pt)))
325        
326        # Sort the stack based on the cost
327        stack.sort(key=lambda t: t[0], reverse=False)
328        length -= 1
329    # sort the reduced set
330    reduced.sort()
331    return reduced
332
333
334def rdp_fixed(points: np.ndarray, length:int=10, distance: Distance = Distance.shortest, order:Order=Order.segment) -> tuple:
335    """
336    Ramer–Douglas–Peucker (RDP) algorithm.
337
338    Is an algorithm that decimates a curve composed of line segments to a similar curve with fewer points.
339    This version reduces the number of points to a fixed length.
340
341    Args:
342        points (np.ndarray): numpy array with the points (x, y)
343        length (int): the fixed length of reduced points (default: 10)
344        distance (Distance): the distance metric used to decide the split point (default: Distance.shortest)
345        order (Order): the metric used to sort the segments (default: Order.segment)
346
347    Returns:
348        tuple: the index of the reduced space, the points that were removed
349    """
350    stack = [(0, 0, len(points))]
351    reduced = [0, len(points)-1]
352
353    length -= 2
354
355    # select the distance metric to be used
356    distance_points = None
357    if distance is Distance.shortest:
358        distance_points = lf.shortest_distance_points
359    elif distance is Distance.perpendicular:
360        distance_points = lf.perpendicular_distance_points
361    else:
362        distance_points = lf.shortest_distance_points
363
364    reduced = _rdp_fixed(points, length, distance_points, order, stack, reduced)
365    reduced = np.array(reduced)
366    return reduced, compute_removed_points(points, reduced)
367
368
369def plot_frame(points, reduced, t):
370    fig = plt.figure(figsize=(6, 6))
371    x = points[:, 0]
372    y = points[:, 1]
373    plt.plot(x, y)
374    points_reduced = points[reduced]
375    x = points_reduced[:, 0]
376    y = points_reduced[:, 1]
377    plt.plot(x, y, marker='o', markersize=3)
378    plt.savefig(f'./img/img_{t}.png', transparent = False,  facecolor = 'white')
379    print(f'{t}')
380    plt.close()
381
382
383def _grdp(points:np.ndarray, t:float, cost:metrics.Metrics, order:Order, 
384distance_points:callable, stack:list, reduced:list) -> tuple:
385    """
386    Main loop of the gRDP version.
387
388    Not intended to be used as a single method.
389    It is used internally on grdp and mp_grdp.
390
391    Args:
392        points (np.ndarray): numpy array with the points (x, y)
393        t (float): the coefficient of determination threshold
394        cost (metrics.Metrics): the cost method used to evaluate a point set 
395        order (Order): the metric used to sort the segments
396        distance_points (callable): the distance function
397        stack (list): stack used to explore the points space
398        reduced (list): set of reduced points
399
400    Returns:
401        tuple: the index of the reduced space, stack
402    """
403    # Setup cache that is used to speedup the global cost computation
404    cache = {}
405
406    global_cost = evaluation.compute_global_cost(points, reduced, cost, cache)
407    curved = global_cost < t if cost is metrics.Metrics.r2 else global_cost >= t
408
409    ti = 0
410
411    while curved and stack:
412        _, left, right = stack.pop()
413        pt = points[left:right]
414        d = distance_points(pt, pt[0], pt[-1])
415
416        # fix issue when all distances are 0 (perfect fit)
417        # changed to EPS for improve robustness
418        #if d.sum() < np.finfo(float).eps:
419        if np.all((d < np.finfo(float).eps)):
420            index = int((len(d))/2)
421        else:
422            index = np.argmax(d)
423        
424        # add the relevant point to the reduced set and sort
425        reduced.append(left+index)
426        reduced.sort()
427
428        # compute the cost of the current solution
429        global_cost = evaluation.compute_global_cost(points, reduced, cost, cache)
430        curved = global_cost < t if cost is metrics.Metrics.r2 else global_cost >= t
431        
432        if order is Order.triangle:
433            left_cost, right_cost = order_triangle(pt, index, distance_points)
434        elif order is Order.area:
435            left_cost, right_cost = order_area(pt, index, distance_points)
436        else:
437            left_cost, right_cost = order_segment(pt, index)
438        
439        # Prevent the insertion of single point segments
440        if (left+index+1) - (left) > 2:
441            stack.append((left_cost, left, left+index+1))
442        
443        if (left+len(pt)) - (left+index) > 2:
444            stack.append((right_cost, left+index, left+len(pt)))
445        
446        # Sort the stack based on the cost
447        stack.sort(key=lambda t: t[0], reverse=False)
448
449        ## TO DELETE ##
450        #plot_frame(points, reduced, ti)
451        #ti = ti + 1 
452        
453
454    return reduced, stack
455
456
457def grdp(points: np.ndarray, t: float = 0.01, distance: Distance = Distance.shortest, 
458cost: metrics.Metrics = metrics.Metrics.smape, order:Order=Order.segment) -> tuple:
459    """
460    Global Ramer–Douglas–Peucker (RDP) algorithm.
461
462    Is an algorithm that decimates a curve composed of line segments to a similar curve with fewer points.
463    This version computes the global cost of reconstruction (instead of the cost of the current segment).
464    It uses a cache to keep the cost of the segments that are not being explored right now.
465    The exploration is based on the segment that has an overall higher reconstruction error.
466
467    Args:
468        points (np.ndarray): numpy array with the points (x, y)
469        t (float): the coefficient of determination threshold (default 0.01)
470        distance (Distance): the distance metric used to decide the split point (default: Distance.shortest)
471        cost (metrics.Metrics): the cost method used to evaluate a point set (default: metrics.Metrics.smape)
472        order (Order): the metric used to sort the segments (default: Order.segment)
473       
474    Returns:
475        tuple: the index of the reduced space, the points that were removed
476    """
477
478    stack = [(0, 0, len(points))]
479    reduced = [0, len(points)-1]
480    
481    # select the distance metric to be used
482    distance_points = None
483    if distance is Distance.shortest:
484        distance_points = lf.shortest_distance_points
485    elif distance is Distance.perpendicular:
486        distance_points = lf.perpendicular_distance_points
487    else:
488        distance_points = lf.shortest_distance_points
489
490    reduced, _ = _grdp(points, t, cost, order, distance_points, stack, reduced)
491    reduced = np.array(reduced)
492    return reduced, compute_removed_points(points, reduced)
493
494
495def mp_grdp(points: np.ndarray, t: float = 0.01, min_points:int = 10, distance: Distance = Distance.shortest,
496cost: metrics.Metrics = metrics.Metrics.smape, order:Order=Order.segment) -> tuple:
497    """
498    MP gRDP (Min Points Global RDP)
499
500    This version computes the gRDP with the given threshold.
501    At the end, if the minimum number of points constraint is not satisfied
502    it executes the rdp_fixed version.
503
504    This method stores the reduced set and stack of the gRDP, meaning that
505    the rdp_fixed method continues from the previous point onwards.
506
507    Args:
508        points (np.ndarray): numpy array with the points (x, y)
509        t (float): the coefficient of determination threshold (default 0.01)
510        min_points (int): the minimal amount of points (default 10)
511        distance (Distance): the distance metric used to decide the split point (default: Distance.shortest)
512        cost (metrics.Metrics): the cost method used to evaluate a point set (default: metrics.Metrics.smape)
513        order (Order): the metric used to sort the segments (default: Order.segment)
514    
515    Returns:
516        tuple: the index of the reduced space, the points that were removed
517    """
518
519    stack = [(0, 0, len(points))]
520    reduced = [0, len(points)-1]
521    
522    # select the distance metric to be used
523    distance_points = None
524    if distance is Distance.shortest:
525        distance_points = lf.shortest_distance_points
526    elif distance is Distance.perpendicular:
527        distance_points = lf.perpendicular_distance_points
528    else:
529        distance_points = lf.shortest_distance_points
530
531    reduced, stack = _grdp(points, t, cost, order, distance_points, stack, reduced)
532
533    if len(reduced) >= min_points:
534        reduced = np.array(reduced)
535        return reduced, compute_removed_points(points, reduced)
536    else:
537        length = min_points - len(reduced)
538        reduced = _rdp_fixed(points, length, distance_points, order, stack, reduced)
539        reduced = np.array(reduced)
540        return reduced, compute_removed_points(points, reduced)
541    
542
543def min_point_rdp(points: np.ndarray, t:list=[0.01, 0.001, 0.0001], min_points:int=10) -> tuple:
544    """
545    Minimal points RDP.
546
547    Given a minimal amount of points, this version runs Global RDP with different threshold 
548    values. The thresholds are sorted in decreasing order.
549    The method returns as soon as a threshold value returns a minimal amount of points.
550    If necessary the method will execute the fixed version of RDP to get the exact amount of points.
551
552    Args:
553        points (np.ndarray): numpy array with the points (x, y)
554        t (float): a list of coefficient of determination thresholds (default [0.01, 0.001, 0.0001])
555        min_points (int): the minimal amount of points (default 10)
556
557    Returns:
558        tuple: the index of the reduced space, the points that were removed
559    """    
560    
561    # sort the threshold in 
562    t.sort(reverse=True)
563
564    for current_t in t:
565        reduced, removed = grdp(points, t=current_t)
566        if len(reduced) >= min_points:
567            return reduced, removed
568    
569    return rdp_fixed(points, min_points)
logger = <Logger kneeliverse.rdp (WARNING)>
class Distance(enum.Enum):
34class Distance(enum.Enum):
35    """
36    Enum that defines the distance method for the RDP 
37    """
38    perpendicular = 'perpendicular'
39    shortest = 'shortest'
40
41    def __str__(self):
42        return self.value

Enum that defines the distance method for the RDP

perpendicular = <Distance.perpendicular: 'perpendicular'>
shortest = <Distance.shortest: 'shortest'>
Inherited Members
enum.Enum
name
value
class Order(enum.Enum):
45class Order(enum.Enum):
46    """
47    Enum that defines the distance method for the RDP 
48    """
49    triangle = 'triangle'
50    area = 'area'
51    segment = 'segment'
52
53    def __str__(self):
54        return self.value

Enum that defines the distance method for the RDP

triangle = <Order.triangle: 'triangle'>
area = <Order.area: 'area'>
segment = <Order.segment: 'segment'>
Inherited Members
enum.Enum
name
value
def mapping( indexes: numpy.ndarray, reduced: numpy.ndarray, removed: numpy.ndarray, sorted: bool = True) -> numpy.ndarray:
57def mapping(indexes: np.ndarray, reduced: np.ndarray, removed: np.ndarray, sorted: bool = True) -> np.ndarray:
58    """
59    Computes the reverse of the RDP method.
60
61    It maps the indexes on a simplified curve (using the rdp algorithm) into
62    the indexes of the original points.
63    This method assumes the indexes are sorted in ascending order.
64
65    Args:
66        indexes (np.ndarray): the indexes in the reduced space
67        reduced (np.ndarray): the points that form the reduced space
68        removed (np.ndarray): the points that were removed
69        sorted (bool): True if the removed array is already sorted
70
71    Returns:
72        np.ndarray: the indexes in the original space
73    """
74
75    if not sorted:
76        sorted_removed = removed[np.argsort(removed[:, 0])]
77    else:
78        sorted_removed = removed
79
80    rv = []
81    j = 0
82    count = 0
83
84    for i in indexes:
85        value = reduced[i]
86        while j < len(sorted_removed) and sorted_removed[j][0] < value:
87            count += sorted_removed[j][1]
88            j += 1
89        idx = i + count
90        rv.append(int(idx))
91
92    return np.array(rv)

Computes the reverse of the RDP method.

It maps the indexes on a simplified curve (using the rdp algorithm) into the indexes of the original points. This method assumes the indexes are sorted in ascending order.

Arguments:
  • indexes (np.ndarray): the indexes in the reduced space
  • reduced (np.ndarray): the points that form the reduced space
  • removed (np.ndarray): the points that were removed
  • sorted (bool): True if the removed array is already sorted
Returns:

np.ndarray: the indexes in the original space

def compute_cost_coef( pt: numpy.ndarray, coef, cost: kneeliverse.metrics.Metrics = <Metrics.smape: 'smape'>) -> float:
 95def compute_cost_coef(pt: np.ndarray, coef, cost: metrics.Metrics=metrics.Metrics.smape) -> float:
 96    """
 97    Computes the cost of fitting a linear function (with a given coefficient)
 98    in the points array.
 99
100    Args:
101        points (np.ndarray): numpy array with the points (x, y)
102        coef (tuple): the coefficients from the linear fit
103        cost (lf.Linear_Metrics): the cost method used to evaluate a point set (default: metrics.Metrics.smape)
104
105    Returns:
106        float: the cost of fitting the linear function
107    """
108    methods = {metrics.Metrics.r2: lf.linear_r2_points,
109    metrics.Metrics.rmspe: lf.rmspe_points,
110    metrics.Metrics.rmsle: lf.rmsle_points,
111    metrics.Metrics.smape: lf.smape_points,
112    metrics.Metrics.rpd: lf.rpd_points}
113
114    return methods[cost](pt, coef)

Computes the cost of fitting a linear function (with a given coefficient) in the points array.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • coef (tuple): the coefficients from the linear fit
  • cost (lf.Linear_Metrics): the cost method used to evaluate a point set (default: metrics.Metrics.smape)
Returns:

float: the cost of fitting the linear function

def rdp( points: numpy.ndarray, t: float = 0.01, distance: Distance = <Distance.shortest: 'shortest'>, cost: kneeliverse.metrics.Metrics = <Metrics.smape: 'smape'>) -> tuple:
117def rdp(points: np.ndarray, t: float = 0.01, distance: Distance = Distance.shortest, cost: metrics.Metrics = metrics.Metrics.smape) -> tuple:
118    """
119    Ramer–Douglas–Peucker (RDP) algorithm.
120
121    Is an algorithm that decimates a curve composed of line segments to a similar curve with fewer points.
122    This version uses different cost functions to decided whenever to keep or remove a line segment.
123
124    Args:
125        points (np.ndarray): numpy array with the points (x, y)
126        t (float): the coefficient of determination threshold (default 0.01)
127        distance (Distance): the distance metric used to decide the split point (default: Distance.shortest)
128        cost (metrics.Metrics): the cost method used to evaluate a point set (default: metrics.Metrics.smape)
129
130    Returns:
131        tuple: the index of the reduced space, the points that were removed
132    """
133    stack = [(0, len(points))]
134
135    reduced = []
136    removed = []
137
138    # select the distance metric to be used
139    distance_points = None
140    if distance is Distance.shortest:
141        distance_points = lf.shortest_distance_points
142    elif distance is Distance.perpendicular:
143        distance_points = lf.perpendicular_distance_points
144    else:
145        distance_points = lf.shortest_distance_points
146
147    while stack:
148        left, right = stack.pop()
149        pt = points[left:right]
150
151        if len(pt) <= 2:
152            if cost is metrics.Metrics.r2:
153                r = 1.0
154            else:
155                r = 0.0
156        else:
157            coef = lf.linear_fit_points(pt)
158            r = compute_cost_coef(pt, coef, cost)
159
160        curved = r < t if cost is metrics.Metrics.r2 else r >= t
161
162        if curved:
163            d = distance_points(pt, pt[0], pt[-1])
164            index = np.argmax(d)
165            stack.append((left+index, left+len(pt)))
166            stack.append((left, left+index+1))
167        else:
168            reduced.append(left)
169            removed.append([left, len(pt) - 2.0])
170
171    reduced.append(len(points)-1)
172    return np.array(reduced), np.array(removed)

Ramer–Douglas–Peucker (RDP) algorithm.

Is an algorithm that decimates a curve composed of line segments to a similar curve with fewer points. This version uses different cost functions to decided whenever to keep or remove a line segment.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • t (float): the coefficient of determination threshold (default 0.01)
  • distance (Distance): the distance metric used to decide the split point (default: Distance.shortest)
  • cost (metrics.Metrics): the cost method used to evaluate a point set (default: metrics.Metrics.smape)
Returns:

tuple: the index of the reduced space, the points that were removed

def compute_removed_points(points: numpy.ndarray, reduced: numpy.ndarray) -> numpy.ndarray:
175def compute_removed_points(points: np.ndarray, reduced: np.ndarray) -> np.ndarray:
176    """
177    Given an array of points and the reduced set it computes how many
178    points were removed per segment.
179
180    Args:
181        points (np.ndarray): numpy array with the points (x, y)
182        reduced (np.ndarray): numpy array with the reduced set of points
183    
184    Returns:
185        np.ndarray: the points that were removed
186    """
187    removed = []
188
189    left = reduced[0]
190    for i in range(1, len(reduced)):
191        right = reduced[i]
192        pt = points[left:right+1]
193        removed.append([left,len(pt)-2])
194        left = right
195
196    return np.array(removed)

Given an array of points and the reduced set it computes how many points were removed per segment.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • reduced (np.ndarray): numpy array with the reduced set of points
Returns:

np.ndarray: the points that were removed

def order_triangle( pt: numpy.ndarray, index: int, distance_points: <built-in function callable>) -> tuple:
199def order_triangle(pt: np.ndarray, index:int, distance_points: callable) -> tuple:
200    """
201    Computes the triangle area of the left and right segments.
202    The triangle area is a fast heuristic to estimate the how curve
203    the left and right segment are.
204
205    Args:
206        pt (np.ndarray): numpy array with the points (x, y)
207        index (int): the index that separates the left from the right segment
208        distance_points (callable): methods that computes the distance between points
209
210    Returns:
211        tuple: the left and right triangle area
212    """
213    # compute the area of the triangles made from the farthest point
214    base_left = np.linalg.norm(pt[0]-pt[index])
215    pt_left = pt[0:index+1]
216    height_left = distance_points(pt_left, pt_left[0], pt_left[-1]).max()
217    left_tri_area = 0.5*base_left*height_left
218
219    base_right = np.linalg.norm(pt[index]-pt[-1])
220    pt_right = pt[index:]
221    height_right = distance_points(pt_right, pt_right[0], pt_right[-1]).max()
222    right_tri_area = 0.5*base_right*height_right
223
224    return left_tri_area, right_tri_area

Computes the triangle area of the left and right segments. The triangle area is a fast heuristic to estimate the how curve the left and right segment are.

Arguments:
  • pt (np.ndarray): numpy array with the points (x, y)
  • index (int): the index that separates the left from the right segment
  • distance_points (callable): methods that computes the distance between points
Returns:

tuple: the left and right triangle area

def order_area( pt: numpy.ndarray, index: int, distance_points: <built-in function callable>) -> tuple:
227def order_area(pt: np.ndarray, index:int, distance_points:callable) -> tuple:
228    """
229    Computes the area of the left and right segments.
230    The area is an heuristic to estimate the how curve
231    the left and right segment are.
232
233    Args:
234        pt (np.ndarray): numpy array with the points (x, y)
235        index (int): the index that separates the left from the right segment
236        distance_points (callable): methods that computes the distance between points
237
238    Returns:
239        tuple: the left and right area
240    """
241    # compute the area using the distance function
242    #pt_left = points[left:left+index+1]
243    pt_left = pt[0:index+1]
244    left_distance = distance_points(pt_left, pt_left[0], pt_left[-1])
245    
246    #pt_right = points[left+index:left+len(pt)]
247    pt_right = pt[index:]
248    right_distance = distance_points(pt_right, pt_right[0], pt_right[-1])
249
250    left_area = np.sum(left_distance)
251    right_area = np.sum(right_distance)
252    
253    return  left_area, right_area

Computes the area of the left and right segments. The area is an heuristic to estimate the how curve the left and right segment are.

Arguments:
  • pt (np.ndarray): numpy array with the points (x, y)
  • index (int): the index that separates the left from the right segment
  • distance_points (callable): methods that computes the distance between points
Returns:

tuple: the left and right area

def order_segment(pt: numpy.ndarray, index: int) -> tuple:
256def order_segment(pt: np.ndarray, index:int) -> tuple:
257    """
258    Computes the fitting error for the left and right segments.
259
260    Args:
261        pt (np.ndarray): numpy array with the points (x, y)
262        index (int): the index that separates the left from the right segment
263        
264    Returns:
265        tuple: the left and right fitting error
266    """
267    pt_left = pt[0:index+1]
268    left_cost = lf.linear_fit_residuals_points(pt_left)
269    
270    pt_right = pt[index:]
271    right_cost = lf.linear_fit_residuals_points(pt_right)
272    
273    return left_cost, right_cost

Computes the fitting error for the left and right segments.

Arguments:
  • pt (np.ndarray): numpy array with the points (x, y)
  • index (int): the index that separates the left from the right segment
Returns:

tuple: the left and right fitting error

def rdp_fixed( points: numpy.ndarray, length: int = 10, distance: Distance = <Distance.shortest: 'shortest'>, order: Order = <Order.segment: 'segment'>) -> tuple:
335def rdp_fixed(points: np.ndarray, length:int=10, distance: Distance = Distance.shortest, order:Order=Order.segment) -> tuple:
336    """
337    Ramer–Douglas–Peucker (RDP) algorithm.
338
339    Is an algorithm that decimates a curve composed of line segments to a similar curve with fewer points.
340    This version reduces the number of points to a fixed length.
341
342    Args:
343        points (np.ndarray): numpy array with the points (x, y)
344        length (int): the fixed length of reduced points (default: 10)
345        distance (Distance): the distance metric used to decide the split point (default: Distance.shortest)
346        order (Order): the metric used to sort the segments (default: Order.segment)
347
348    Returns:
349        tuple: the index of the reduced space, the points that were removed
350    """
351    stack = [(0, 0, len(points))]
352    reduced = [0, len(points)-1]
353
354    length -= 2
355
356    # select the distance metric to be used
357    distance_points = None
358    if distance is Distance.shortest:
359        distance_points = lf.shortest_distance_points
360    elif distance is Distance.perpendicular:
361        distance_points = lf.perpendicular_distance_points
362    else:
363        distance_points = lf.shortest_distance_points
364
365    reduced = _rdp_fixed(points, length, distance_points, order, stack, reduced)
366    reduced = np.array(reduced)
367    return reduced, compute_removed_points(points, reduced)

Ramer–Douglas–Peucker (RDP) algorithm.

Is an algorithm that decimates a curve composed of line segments to a similar curve with fewer points. This version reduces the number of points to a fixed length.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • length (int): the fixed length of reduced points (default: 10)
  • distance (Distance): the distance metric used to decide the split point (default: Distance.shortest)
  • order (Order): the metric used to sort the segments (default: Order.segment)
Returns:

tuple: the index of the reduced space, the points that were removed

def plot_frame(points, reduced, t):
370def plot_frame(points, reduced, t):
371    fig = plt.figure(figsize=(6, 6))
372    x = points[:, 0]
373    y = points[:, 1]
374    plt.plot(x, y)
375    points_reduced = points[reduced]
376    x = points_reduced[:, 0]
377    y = points_reduced[:, 1]
378    plt.plot(x, y, marker='o', markersize=3)
379    plt.savefig(f'./img/img_{t}.png', transparent = False,  facecolor = 'white')
380    print(f'{t}')
381    plt.close()
def grdp( points: numpy.ndarray, t: float = 0.01, distance: Distance = <Distance.shortest: 'shortest'>, cost: kneeliverse.metrics.Metrics = <Metrics.smape: 'smape'>, order: Order = <Order.segment: 'segment'>) -> tuple:
458def grdp(points: np.ndarray, t: float = 0.01, distance: Distance = Distance.shortest, 
459cost: metrics.Metrics = metrics.Metrics.smape, order:Order=Order.segment) -> tuple:
460    """
461    Global Ramer–Douglas–Peucker (RDP) algorithm.
462
463    Is an algorithm that decimates a curve composed of line segments to a similar curve with fewer points.
464    This version computes the global cost of reconstruction (instead of the cost of the current segment).
465    It uses a cache to keep the cost of the segments that are not being explored right now.
466    The exploration is based on the segment that has an overall higher reconstruction error.
467
468    Args:
469        points (np.ndarray): numpy array with the points (x, y)
470        t (float): the coefficient of determination threshold (default 0.01)
471        distance (Distance): the distance metric used to decide the split point (default: Distance.shortest)
472        cost (metrics.Metrics): the cost method used to evaluate a point set (default: metrics.Metrics.smape)
473        order (Order): the metric used to sort the segments (default: Order.segment)
474       
475    Returns:
476        tuple: the index of the reduced space, the points that were removed
477    """
478
479    stack = [(0, 0, len(points))]
480    reduced = [0, len(points)-1]
481    
482    # select the distance metric to be used
483    distance_points = None
484    if distance is Distance.shortest:
485        distance_points = lf.shortest_distance_points
486    elif distance is Distance.perpendicular:
487        distance_points = lf.perpendicular_distance_points
488    else:
489        distance_points = lf.shortest_distance_points
490
491    reduced, _ = _grdp(points, t, cost, order, distance_points, stack, reduced)
492    reduced = np.array(reduced)
493    return reduced, compute_removed_points(points, reduced)

Global Ramer–Douglas–Peucker (RDP) algorithm.

Is an algorithm that decimates a curve composed of line segments to a similar curve with fewer points. This version computes the global cost of reconstruction (instead of the cost of the current segment). It uses a cache to keep the cost of the segments that are not being explored right now. The exploration is based on the segment that has an overall higher reconstruction error.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • t (float): the coefficient of determination threshold (default 0.01)
  • distance (Distance): the distance metric used to decide the split point (default: Distance.shortest)
  • cost (metrics.Metrics): the cost method used to evaluate a point set (default: metrics.Metrics.smape)
  • order (Order): the metric used to sort the segments (default: Order.segment)
Returns:

tuple: the index of the reduced space, the points that were removed

def mp_grdp( points: numpy.ndarray, t: float = 0.01, min_points: int = 10, distance: Distance = <Distance.shortest: 'shortest'>, cost: kneeliverse.metrics.Metrics = <Metrics.smape: 'smape'>, order: Order = <Order.segment: 'segment'>) -> tuple:
496def mp_grdp(points: np.ndarray, t: float = 0.01, min_points:int = 10, distance: Distance = Distance.shortest,
497cost: metrics.Metrics = metrics.Metrics.smape, order:Order=Order.segment) -> tuple:
498    """
499    MP gRDP (Min Points Global RDP)
500
501    This version computes the gRDP with the given threshold.
502    At the end, if the minimum number of points constraint is not satisfied
503    it executes the rdp_fixed version.
504
505    This method stores the reduced set and stack of the gRDP, meaning that
506    the rdp_fixed method continues from the previous point onwards.
507
508    Args:
509        points (np.ndarray): numpy array with the points (x, y)
510        t (float): the coefficient of determination threshold (default 0.01)
511        min_points (int): the minimal amount of points (default 10)
512        distance (Distance): the distance metric used to decide the split point (default: Distance.shortest)
513        cost (metrics.Metrics): the cost method used to evaluate a point set (default: metrics.Metrics.smape)
514        order (Order): the metric used to sort the segments (default: Order.segment)
515    
516    Returns:
517        tuple: the index of the reduced space, the points that were removed
518    """
519
520    stack = [(0, 0, len(points))]
521    reduced = [0, len(points)-1]
522    
523    # select the distance metric to be used
524    distance_points = None
525    if distance is Distance.shortest:
526        distance_points = lf.shortest_distance_points
527    elif distance is Distance.perpendicular:
528        distance_points = lf.perpendicular_distance_points
529    else:
530        distance_points = lf.shortest_distance_points
531
532    reduced, stack = _grdp(points, t, cost, order, distance_points, stack, reduced)
533
534    if len(reduced) >= min_points:
535        reduced = np.array(reduced)
536        return reduced, compute_removed_points(points, reduced)
537    else:
538        length = min_points - len(reduced)
539        reduced = _rdp_fixed(points, length, distance_points, order, stack, reduced)
540        reduced = np.array(reduced)
541        return reduced, compute_removed_points(points, reduced)

MP gRDP (Min Points Global RDP)

This version computes the gRDP with the given threshold. At the end, if the minimum number of points constraint is not satisfied it executes the rdp_fixed version.

This method stores the reduced set and stack of the gRDP, meaning that the rdp_fixed method continues from the previous point onwards.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • t (float): the coefficient of determination threshold (default 0.01)
  • min_points (int): the minimal amount of points (default 10)
  • distance (Distance): the distance metric used to decide the split point (default: Distance.shortest)
  • cost (metrics.Metrics): the cost method used to evaluate a point set (default: metrics.Metrics.smape)
  • order (Order): the metric used to sort the segments (default: Order.segment)
Returns:

tuple: the index of the reduced space, the points that were removed

def min_point_rdp( points: numpy.ndarray, t: list = [0.01, 0.001, 0.0001], min_points: int = 10) -> tuple:
544def min_point_rdp(points: np.ndarray, t:list=[0.01, 0.001, 0.0001], min_points:int=10) -> tuple:
545    """
546    Minimal points RDP.
547
548    Given a minimal amount of points, this version runs Global RDP with different threshold 
549    values. The thresholds are sorted in decreasing order.
550    The method returns as soon as a threshold value returns a minimal amount of points.
551    If necessary the method will execute the fixed version of RDP to get the exact amount of points.
552
553    Args:
554        points (np.ndarray): numpy array with the points (x, y)
555        t (float): a list of coefficient of determination thresholds (default [0.01, 0.001, 0.0001])
556        min_points (int): the minimal amount of points (default 10)
557
558    Returns:
559        tuple: the index of the reduced space, the points that were removed
560    """    
561    
562    # sort the threshold in 
563    t.sort(reverse=True)
564
565    for current_t in t:
566        reduced, removed = grdp(points, t=current_t)
567        if len(reduced) >= min_points:
568            return reduced, removed
569    
570    return rdp_fixed(points, min_points)

Minimal points RDP.

Given a minimal amount of points, this version runs Global RDP with different threshold values. The thresholds are sorted in decreasing order. The method returns as soon as a threshold value returns a minimal amount of points. If necessary the method will execute the fixed version of RDP to get the exact amount of points.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • t (float): a list of coefficient of determination thresholds (default [0.01, 0.001, 0.0001])
  • min_points (int): the minimal amount of points (default 10)
Returns:

tuple: the index of the reduced space, the points that were removed