kneeliverse.evaluation

The following module provides a set of methods used for evaluation knee detection algorithms.

  1# coding: utf-8
  2
  3'''
  4The following module provides a set of methods
  5used for evaluation knee detection algorithms.
  6'''
  7
  8__author__ = 'Mário Antunes'
  9__version__ = '1.0'
 10__email__ = 'mario.antunes@ua.pt'
 11__status__ = 'Development'
 12__license__ = 'MIT'
 13__copyright__ = '''
 14Copyright (c) 2021-2023 Stony Brook University
 15Copyright (c) 2021-2023 The Research Foundation of SUNY
 16'''
 17
 18import enum
 19import math
 20import logging
 21import numpy as np
 22import kneeliverse.linear_fit as lf
 23import kneeliverse.metrics as metrics
 24
 25
 26logger = logging.getLogger(__name__)
 27
 28
 29class Strategy(enum.Enum):
 30    """
 31    Enum data type that represents the strategy of MAE, MSE and RMSE
 32    """
 33    knees = 'knees'
 34    expected = 'expected'
 35    best = 'best'
 36    worst = 'worst'
 37
 38    def __str__(self):
 39        return self.value
 40
 41
 42def get_neighbourhood_points(points: np.ndarray, a: int, b: int, t: float) -> tuple:
 43    """
 44    Get the neighbourhood (closest points) from a to b.
 45
 46    The neighbourhood is defined as the longest straitgh line (defined by $R^2$).
 47
 48    Args:
 49        points (np.ndarray): numpy array with the points (x, y)
 50        a (int): the initial point of the search
 51        b (int): the left limit of the search
 52        t (float): $R^2$ threshold
 53
 54    Returns:
 55        tuple: (neighbourhood index, r2, slope)
 56    """
 57
 58    x = points[:, 0]
 59    y = points[:, 1]
 60    return get_neighbourhood(x, y, a, b, t)
 61
 62
 63def get_neighbourhood_fast_points(points: np.ndarray, a: int, b: int, t: float) -> tuple:
 64    """
 65    Get the neighbourhood (closest points) from a to b.
 66
 67    The neighbourhood is defined as the longest straitgh line (defined by $R^2$).
 68    This version uses a inaccurate binary search to speedup the search.
 69
 70    Args:
 71        points (np.ndarray): numpy array with the points (x, y)
 72        a (int): the initial point of the search
 73        b (int): the left limit of the search
 74        t (float): $R^2$ threshold
 75
 76    Returns:
 77        tuple: (neighbourhood index, r2, slope)
 78    """
 79
 80    x = points[:, 0]
 81    y = points[:, 1]
 82    return get_neighbourhood_fast(x, y, a, b, t)
 83
 84
 85def get_neighbourhood_binary(x: np.ndarray, y: np.ndarray, a: int, b: int, t=0.9) -> int:
 86    """
 87    Get the index of the point within the range $[b, a]$ where the $R^2$ is close to the threshold.
 88
 89    This version uses a inaccurate binary search to speedup the search.
 90
 91    Args:
 92        x (np.ndarray): the value of the points in the x axis coordinates
 93        y (np.ndarray): the value of the points in the y axis coordinates
 94        a (int): the initial point of the search
 95        b (int): the left limit of the search
 96        t (float): $R^2$ threshold (default 0.9)
 97
 98    Returns:
 99        int: index of the point
100    """
101
102    i = b
103    right = a
104
105    while abs(i-right) > 1:
106        coef = lf.linear_fit(x[i:a+1], y[i:a+1])
107        r2 = lf.linear_r2(x[i:a+1], y[i:a+1], coef)
108
109        if r2 < t:
110            i = int((i+right)/2.0)
111        else:
112            right = i
113            i = int((b+right)/2.0)
114
115    return i
116
117
118def get_neighbourhood_fast(x: np.ndarray, y: np.ndarray, a: int, b: int, t: float = 0.9) -> tuple:
119    """
120    Get the neighbourhood (closest points) from a to b.
121
122    The neighbourhood is defined as the longest straitgh line (defined by $R^2$).
123    This version uses a inaccurate binary search to speedup the search.
124
125    Args:
126        x (np.ndarray): the value of the points in the x axis coordinates
127        y (np.ndarray): the value of the points in the y axis coordinates
128        a (int): the initial point of the search
129        b (int): the left limit of the search
130        t (float): $R^2$ threshold (default 0.9)
131
132    Returns:
133        tuple: (neighbourhood index, r2, slope)
134    """
135    # speedup when the search using an inaccurate binary search
136    i = get_neighbourhood_binary(x, y, a, b, t)
137    b, slope = lf.linear_fit(x[i:a+1], y[i:a+1])
138    r2 = lf.linear_r2(x[i:a+1], y[i:a+1], (b, slope))
139    previous_res = (i, r2, slope)
140
141    # Linear search to improve accuracy
142    while r2 < t and i < a:
143        i += 1
144        coef = lf.linear_fit(x[i:a+1], y[i:a+1])
145        r2 = lf.linear_r2(x[i:a+1], y[i:a+1], coef)
146        _, slope = coef
147        previous_res = (i, r2, slope)
148
149    return previous_res
150
151
152def get_neighbourhood(x: np.ndarray, y: np.ndarray, a: int, b: int, t: float = 0.9) -> tuple:
153    """
154    Get the neighbourhood (closest points) from a to b.
155
156    The neighbourhood is defined as the longest straitgh line (defined by $R^2$).
157
158    Args:
159        x (np.ndarray): the value of the points in the x axis coordinates
160        y (np.ndarray): the value of the points in the y axis coordinates
161        a (int): the initial point of the search
162        b (int): the left limit of the search
163        t (float): $R^2$ threshold (default 0.9)
164
165    Returns:
166        tuple: (neighbourhood index, r2, slope)
167    """
168
169    r2 = 1.0
170    i = a - 1
171    _, slope = lf.linear_fit(x[i:a+1], y[i:a+1])
172
173    while r2 > t and i > b:
174        # print('.')
175        previous_res = (i, r2, slope)
176        i -= 1
177        coef = lf.linear_fit(x[i:a+1], y[i:a+1])
178        r2 = lf.linear_r2(x[i:a+1], y[i:a+1], coef)
179        _, slope = coef
180        #print(f'{i} -> {r2}')
181
182    if r2 > t:
183        return i, r2, slope
184    else:
185        return previous_res
186
187
188def accuracy_knee(points: np.ndarray, knees: np.ndarray, t: float = 0.9) -> tuple:
189    """Compute the accuracy heuristic for a set of knees.
190
191    The heuristic is based on the average distance of X and Y axis, the slope and the $R^2$.
192    In this version it is used the left neighbourhood of the knee.
193
194    Args:
195        points (np.ndarray): numpy array with the points (x, y)
196        knees (np.ndarray): knees indexes
197        t (float): $R^2$ threshold (default 0.9)
198
199    Returns:
200        tuple: (average_x, average_y, average_slope, average_coeffients, cost)
201    """
202    x = points[:, 0]
203    y = points[:, 1]
204
205    total_x = math.fabs(x[-1] - x[0])
206    total_y = math.fabs(y[-1] - y[0])
207
208    distances_x = []
209    distances_y = []
210    slopes = []
211    coeffients = []
212
213    previous_knee = 0
214    for i in range(len(knees)):
215        idx, r2, slope = get_neighbourhood_fast(x, y, knees[i], previous_knee)
216
217        delta_x = x[idx] - x[knees[i]]
218        delta_y = y[idx] - y[knees[i]]
219
220        distances_x.append(math.fabs(delta_x))
221        distances_y.append(math.fabs(delta_y))
222        slopes.append(math.fabs(slope))
223        coeffients.append(r2)
224
225        previous_knee = knees[i]
226
227    slopes = np.array(slopes)
228    slopes = slopes/slopes.max()
229
230    coeffients = np.array(coeffients)
231    coeffients = coeffients/coeffients.max()
232
233    distances_x = np.array(distances_x)/total_x
234    distances_y = np.array(distances_y)/total_y
235    average_x = np.average(distances_x)
236    average_y = np.average(distances_y)
237
238    average_slope = np.average(slopes)
239    average_coeffients = np.average(coeffients)
240
241    #p = slopes * distances_y * coeffients
242    p = slopes * distances_y
243    #cost = (average_x * average_y) / (average_slope)
244    cost = average_x / np.average(p)
245
246    return average_x, average_y, average_slope, average_coeffients, cost
247
248
249def accuracy_trace(points: np.ndarray, knees: np.ndarray) -> tuple:
250    """Compute the accuracy heuristic for a set of knees.
251
252    The heuristic is based on the average distance of X and Y axis, the slope and the $R^2$.
253    In this version it is used the points from the current knee to the previous.
254
255    Args:
256        points (np.ndarray): numpy array with the points (x, y)
257        knees (np.ndarray): knees indexes
258
259    Returns:
260        tuple: (average_x, average_y, average_slope, average_coeffients, cost)
261    """
262    x = points[:, 0]
263    y = points[:, 1]
264
265    distances_x = []
266    distances_y = []
267    slopes = []
268    coeffients = []
269
270    total_x = math.fabs(x[-1] - x[0])
271    total_y = math.fabs(y[-1] - y[0])
272
273    previous_knee_x = x[knees[0]]
274    previous_knee_y = y[knees[0]]
275
276    delta_x = x[0] - previous_knee_x
277    delta_y = y[0] - previous_knee_y
278    distances_x.append(math.fabs(delta_x))
279    distances_y.append(math.fabs(delta_y))
280
281    coef = lf.linear_fit(x[0:knees[0]+1], y[0:knees[0]+1])
282    r2 = lf.linear_r2(x[0:knees[0]+1], y[0:knees[0]+1], coef)
283    coeffients.append(r2)
284    _, slope = coef
285    slopes.append(math.fabs(slope))
286
287    for i in range(1, len(knees)):
288        knee_x = x[knees[i]]
289        knee_y = y[knees[i]]
290
291        delta_x = previous_knee_x - knee_x
292        delta_y = previous_knee_y - knee_y
293
294        coef = lf.linear_fit(x[knees[i-1]:knees[i]+1],
295                             y[knees[i-1]:knees[i]+1])
296        r2 = lf.linear_r2(x[knees[i-1]:knees[i]+1],
297                          y[knees[i-1]:knees[i]+1], coef)
298
299        distances_x.append(math.fabs(delta_x))
300        distances_y.append(math.fabs(delta_y))
301        _, slope = coef
302        slopes.append(math.fabs(slope))
303        coeffients.append(r2)
304
305        previous_knee_x = knee_x
306        previous_knee_y = knee_y
307
308    distances_x = np.array(distances_x)/total_x
309    distances_y = np.array(distances_y)/total_y
310    slopes = np.array(slopes)
311    slopes = slopes/slopes.max()
312
313    coeffients = np.array(coeffients)
314    coeffients = coeffients/coeffients.max()
315
316    coeffients[coeffients < 0] = 0.0
317    p = slopes * distances_y * coeffients
318    #p = slopes * distances_y
319
320    average_x = np.average(distances_x)
321    average_y = np.average(distances_y)
322    average_slope = np.average(slopes)
323    average_coeffients = np.average(coeffients)
324
325    cost = average_x / np.average(p)
326
327    return average_x, average_y, average_slope, average_coeffients, cost
328
329
330def mae(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected) -> float:
331    """
332    Estimates the worst case Mean Absolute Error (MAE) for the given
333    knee and expected points.
334
335    Suppports different size arrays, and estimates the MAE based 
336    on the worst case.
337    It uses the euclidean distance to find the closer points,
338    and computes the error based on the closest point.
339
340    Args:
341        points (np.ndarray): numpy array with the points (x, y)
342        knees (np.ndarray): knees indexes
343        expected (np.ndarray): numpy array with the expected knee points (x, y)
344        s (Strategy): enum that controls the point matching (default Strategy.expected)
345
346    Returns:
347        float: the worst case MAE
348    """
349    # get the knee points
350    knee_points = points[knees]
351
352    error = 0.0
353
354    if s is Strategy.knees:
355        a = knee_points
356        b = expected
357    elif s is Strategy.expected:
358        a = expected
359        b = knee_points
360    elif s is Strategy.best:
361        if len(expected) <= len(knee_points):
362            a = expected
363            b = knee_points
364        else:
365            a = knee_points
366            b = expected
367    else:
368        if len(expected) >= len(knee_points):
369            a = expected
370            b = knee_points
371        else:
372            a = knee_points
373            b = expected
374
375    for p in a:
376        distances = np.linalg.norm(b-p, axis=1)
377        idx = np.argmin(distances)
378        error += np.sum(np.abs(p-b[idx]))
379
380    return error / (len(a)*2.0)
381
382
383def mse(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected) -> float:
384    """
385    Estimates the worst case Mean Squared Error (MSE) for the given
386    knee and expected points.
387
388    Suppports different size arrays, and estimates the MSE based 
389    on the worst case.
390    It uses the euclidean distance to find the closer points,
391    and computes the error based on the closest point.
392
393    Args:
394        points (np.ndarray): numpy array with the points (x, y)
395        knees (np.ndarray): knees indexes
396        expected (np.ndarray): numpy array with the expected knee points (x, y)
397        s (Strategy): enum that controls the point matching (default Strategy.expected)
398
399    Returns:
400        float: the worst case MSE
401    """
402    # get the knee points
403    knee_points = points[knees]
404
405    error = 0.0
406
407    if s is Strategy.knees:
408        a = knee_points
409        b = expected
410    elif s is Strategy.expected:
411        a = expected
412        b = knee_points
413    elif s is Strategy.best:
414        if len(expected) <= len(knee_points):
415            a = expected
416            b = knee_points
417        else:
418            a = knee_points
419            b = expected
420    else:
421        if len(expected) >= len(knee_points):
422            a = expected
423            b = knee_points
424        else:
425            a = knee_points
426            b = expected
427
428    for p in a:
429        distances = np.linalg.norm(b-p, axis=1)
430        idx = np.argmin(distances)
431        error += np.sum(np.square(p-b[idx]))
432
433    return error / (len(a)*2.0)
434
435
436def rmse(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected) -> float:
437    """
438    Estimates the worst case Root Mean Squared Error (RMSE) for the given
439    knee and expected points.
440
441    Suppports different size arrays, and estimates the RMSE based 
442    on the worst case.
443    It uses the euclidean distance to find the closer points,
444    and computes the error based on the closest point.
445
446    Args:
447        points (np.ndarray): numpy array with the points (x, y)
448        knees (np.ndarray): knees indexes
449        expected (np.ndarray): numpy array with the expected knee points (x, y)
450        s (Strategy): enum that controls the point matching (default Strategy.expected)
451
452    Returns:
453        float: the worst case RMSE
454    """
455    return math.sqrt(mse(points, knees, expected, s))
456
457
458def rmspe(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected, eps: float = 1e-16) -> float:
459    """
460    Estimates the worst case Root Mean Squared Percentage Error (RMSPE) for the given knee and expected points.
461
462    Suppports different size arrays, and estimates the RMSPE based on the worst case.
463    It uses the euclidean distance to find the closer points, and computes the error based on the closest point.
464
465    Args:
466        points (np.ndarray): numpy array with the points (x, y)
467        knees (np.ndarray): knees indexes
468        expected (np.ndarray): numpy array with the expected knee points (x, y)
469        s (Strategy): enum that controls the point matching (default Strategy.expected)
470        eps (float): eps value to prevent division by zero (default: 1E-16)
471
472    Returns:
473        float: the worst case RMSPE
474    """
475    # get the knee points
476    knee_points = points[knees]
477
478    if s is Strategy.knees:
479        a = knee_points
480        b = expected
481    elif s is Strategy.expected:
482        a = expected
483        b = knee_points
484    elif s is Strategy.best:
485        if len(expected) <= len(knee_points):
486            a = expected
487            b = knee_points
488        else:
489            a = knee_points
490            b = expected
491    else:
492        if len(expected) >= len(knee_points):
493            a = expected
494            b = knee_points
495        else:
496            a = knee_points
497            b = expected
498
499    errors = []
500
501    for p in a:
502        distances = np.linalg.norm(b-p, axis=1)
503        idx = np.argmin(distances)
504        e = (p - b[idx]) / (p + eps)
505        errors.extend(e)
506    errors = np.array(errors)
507
508    return np.sqrt(np.mean(np.square(errors)))
509
510
511def cm(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, t: float = 0.01) -> np.ndarray:
512    """
513    Computes the Confusion Matrix based on the knees and expected points.
514
515    Args:
516        points (np.ndarray): numpy array with the points (x, y)
517        knees (np.ndarray): knees indexes
518        expected (np.ndarray): numpy array with the expected knee points (x, y)
519        t (float): the maximum allowed distance in percentage (default 0.01)
520
521    Returns:
522        np.ndarray: the confusion matrix
523    """
524
525    #dx = math.fabs(points[-1][0] - points[0][0])
526    max_x, _ = points.max(axis=0)
527    min_x, _ = points.min(axis=0)
528    dx = math.fabs(max_x - min_x)
529    used_knees = []
530    knees_points_x = points[knees][:, 0]
531    tp = fn = fp = tn = 0
532    for px, _ in expected:
533        distances = np.fabs(knees_points_x - px)/dx
534        idx = np.argmin(distances)
535        #logger.info(f"{px} / {idx} / {distances[idx]} = {distances[idx] <= t}")
536        if distances[idx] <= t and idx not in used_knees:
537            tp += 1
538            used_knees.append(idx)
539        else:
540            fn += 1
541    fp = max(len(knees) - tp, 0)
542    tn = len(points) - (tp+fp+fn)
543
544    return np.array([[tp, fp], [fn, tn]])
545
546
547def accuracy(cm: np.ndarray) -> float:
548    """
549    Computes accuracy based on a Confusion Matrix.
550
551    Args:
552        cm (np.ndarray): the confusion matrix
553
554    Returns:
555        float: the accuracy
556    """
557
558    tp, fp = cm[0]
559    fn, tn = cm[1]
560
561    return (tp+tn)/(tp+tn+fp+fn)
562
563
564def f1score(cm: np.ndarray) -> float:
565    """
566    Computes F1-Score based on a Confusion Matrix.
567
568    Args:
569        cm (np.ndarray): the confusion matrix
570
571    Returns:
572        float: the F1-Score
573    """
574
575    tp, fp = cm[0]
576    fn, _ = cm[1]
577
578    return (2.0*tp)/(2*tp+fp+fn)
579
580
581def mcc(cm: np.ndarray) -> float:
582    """
583    Computes Matthews Correlation Coefficient (MCC) based on a Confusion Matrix.
584
585    Args:
586        cm (np.ndarray): the confusion matrix
587
588    Returns:
589        float: the mcc
590    """
591
592    tp, fp = cm[0]
593    fn, tn = cm[1]
594
595    n = tp*tn - fp*fn
596    d = math.sqrt((tp+fp)*(tp+fn)*(tn+fp)*(tn+fn))
597
598    return n/d
599
600
601def compute_global_rmse(points: np.ndarray, reduced: np.ndarray, cache:dict=None) -> float:
602    """
603    Computes the global RMSE for a point reduction set.
604
605    For each segment within the point reduction set, computes a straight line.
606    For each line segment it computes the error with the original set.
607 
608    Args:
609        points (np.ndarray): the original points
610        reduced (np.ndarray): the indexes of the reduced points
611        cache (dict): cache used to speedup the computation (default None)
612
613    Returns:
614        float: the global RMSE
615    """
616    # Setup the cache
617    if cache is None: cache = {}
618
619    segment_errors = np.zeros(len(reduced)-1)
620
621    left = reduced[0]
622    for i in range(1, len(reduced)):
623        right = reduced[i]
624
625        # Get data from cache
626        if (left, right) not in cache:
627            pt = points[left:right+1]
628            coef = lf.linear_fit_points(pt)
629            y_hat = lf.linear_transform_points(pt, coef)
630            y = pt[:,1]
631            segment_error = np.sum(np.square((y-y_hat)))
632            cache[(left, right)] = segment_error
633        
634        segment_errors[i-1] = cache[(left, right)]
635        left = right
636
637    # compute the cost function
638    return math.sqrt(np.sum(segment_errors)/len(points))
639
640
641def mip(points: np.ndarray, reduced: np.ndarray) -> tuple:
642    """
643    Computes the median improvement per point (MIP).
644
645    Improvement is defined as a relation between a final cost against a reference cost.
646    The final cost is the RSME with the complete reduced set.
647    The reference cost id different for each point.
648    It is computed as the RMSE of the reconstruction without that specific point. 
649
650    Args:
651        points (np.ndarray): the original points
652        reduced (np.ndarray): the indexes of the reduced points
653
654    Returns:
655        tuple: the median improvement per point (MIP) and the MAD
656    """
657    # Setup the cache
658    cache = {}
659
660    ip = np.zeros(len(reduced)-2)
661
662    # Compute the final RSME
663    cost_fin = compute_global_rmse(points, reduced, cache)
664
665    for i in range(1, len(reduced)-1):
666        # Compute the reference RMSE
667        cost_ref = compute_global_rmse(points, np.delete(reduced, i), cache)
668        ip[i-1] = cost_ref - cost_fin
669
670    mip = np.median(ip)
671
672    return mip, np.median(np.absolute(ip - mip))
673
674
675def compute_cost(points: np.ndarray, segment_errors:np.ndarray, cost:metrics.Metrics, cache:dict) -> float:
676    """
677    Compute the cost of multi-point fitting using the segment errors.
678    
679    It uses a cache to speedup the compuation of the cost.
680    The cache contains the cost of previously computed segments.
681
682    Args:
683        points (np.ndarray): the original points
684        segment_errors (np.ndarray): errors from each of the segments
685        cost (metrics.Metrics): the metric used for the cost calculation
686        cache (dict): the cache used to store the segments costs
687
688    Returns:
689        float: the global cost of a multi-point fitting
690    """
691    # methods = {metrics.Metrics.r2: metrics.r2,metrics.Metrics.rpd: metrics.rpd,metrics.Metrics.rmsle: metrics.rmsle,metrics.Metrics.rmspe: metrics.rmspe,metrics.Metrics.smape: metrics.smape}
692    # cost = methods[cost](np.array(y), np.array(y_hat))
693
694    # We have to account for the head of each segment
695    total = len(points) + len(segment_errors) - 1
696
697    if cost is metrics.Metrics.r2:
698        # Get tss from the cache
699        if 'tss' not in cache:
700            # Compute TSS and add it to cache (single operation)
701            y = points[:,1]
702            y_mean = np.mean(y)
703            tss = np.sum(np.square(y-y_mean))
704            cache['tss'] = tss
705        tss = cache['tss']
706        rss = np.sum(segment_errors)
707        cost = 1.0 - rss if tss == 0 else 1.0 - (rss/tss)
708        #return np.sum(np.square(y-y_hat))
709    elif cost is metrics.Metrics.rmsle:
710        #return np.sum(np.square((np.log(y+1) - np.log(y_hat+1))))
711        cost = math.sqrt(np.sum(segment_errors)/total)
712    elif cost is metrics.Metrics.rmspe:
713        #return np.sum(np.square((y - y_hat) / (y+eps)))
714        cost = math.sqrt(np.sum(segment_errors)/total)
715    elif cost is metrics.Metrics.rpd:
716        #return np.sum(np.abs((y - y_hat) / (np.maximum(y, y_hat)+eps)))
717        cost = np.sum(segment_errors)/total
718    else:
719        #return np.sum(2.0 * np.abs(y_hat - y) / (np.abs(y) + np.abs(y_hat) + eps))
720        cost = np.sum(segment_errors)/total
721
722    cost = 0 if cost < 0 else cost
723
724    return cost
725
726
727def compute_partial_cost(y:np.ndarray, y_hat:np.ndarray, cost: metrics.Metrics, eps: float = 1e-16) -> float:
728    """
729    Compute the partial cost of a multi-point fitting.
730
731    Args:
732        y (np.ndarray): the y values from the points
733        y_hat (np.ndarray): the y values computed with the linear aproximation
734        cost (metrics.Metrics): the metric used for the cost calculation
735        eps (float): eps value to prevent division by zero (default: 1E-16)
736
737    Returns:
738        float: the global cost of a multi-point fitting
739    """
740    if cost is metrics.Metrics.r2:
741        return np.sum(np.square(y-y_hat))
742    elif cost is metrics.Metrics.rmsle:
743        return np.sum(np.square((np.log(y+1) - np.log(y_hat+1))))
744    elif cost is metrics.Metrics.rmspe:
745        return np.sum(np.square((y - y_hat) / (y+eps)))
746    elif cost is metrics.Metrics.rpd:
747        return np.sum(np.abs((y - y_hat) / (np.maximum(y, y_hat)+eps)))
748    else:
749        return np.sum(2.0 * np.abs(y_hat - y) / (np.abs(y) + np.abs(y_hat) + eps))
750
751
752def compute_global_cost(points: np.ndarray, reduced: np.ndarray, cost: metrics.Metrics = metrics.Metrics.rpd, cache:dict=None) -> float:
753    """
754    Compute the cost of multi-point fitting using the segment errors.
755    
756    It uses a cache to speedup the compuation of the cost.
757    The cache contains the cost of previously computed segments.
758
759    Args:
760        points (np.ndarray): the original points
761        reduced (np.ndarray): the reduced set of points used for the fitting
762        cost (metrics.Metrics): the metric used for the cost calculation (default: metrics.Metrics.rpd)
763        cache (dict): the cache used to store the segments costs (default: None)
764
765    Returns:
766        float: the global cost of a multi-point fitting
767    """
768    # Setup the cache
769    if cache is None: cache = {}
770
771    segment_errors = np.zeros(len(reduced)-1)
772
773    left = reduced[0]
774    for i in range(1, len(reduced)):
775        right = reduced[i]
776
777        # Get data from cache
778        if (left, right) not in cache:
779            pt = points[left:right+1]
780            if len(pt) <= 2:
781                cache[(left, right)] = 0
782            else:
783                y_hat = lf.linear_fit_transform_points(pt)
784                y = pt[:,1]
785                segment_error = compute_partial_cost(y, y_hat, cost)
786                cache[(left, right)] = segment_error
787        
788        segment_errors[i-1] = cache[(left, right)]
789        left = right
790
791    return compute_cost(points, segment_errors, cost, cache)
792
793
794def compute_global_segment_cost(points: np.ndarray, reduced: np.ndarray, cost: metrics.Metrics = metrics.Metrics.rpd) -> tuple:
795    """
796    Compute the cost of multi-point fitting using the segment errors.
797    
798    Legacy function that does not use a cache or combines the segments costs.
799    Used for unit testing.
800
801    Args:
802        points (np.ndarray): the original points
803        reduced (np.ndarray): the reduced set of points used for the fitting
804        cost (metrics.Metrics): the metric used for the cost calculation (default: metrics.Metrics.rpd)
805
806    Returns:
807        tuple: the global cost, and partial costs
808    """
809    y, y_hat = [], []
810
811    cost_segment = []
812
813    left = reduced[0]
814    for i in range(1, len(reduced)):
815        right = reduced[i]
816        pt = points[left:right+1]
817        
818        y_temp, y_hat_temp = lf.linear_fit_transform_points(pt)
819        
820        y_hat.extend(y_hat_temp)
821        y.extend(y_temp)
822
823        # compute the cost function
824        #c = compute_cost(y_temp, y_hat_temp, cost)
825        c = lf.linear_hv_residuals_points(pt) 
826        cost_segment.append(c)
827
828        left = right
829
830    # compute the cost function
831    return compute_cost(y, y_hat, cost), np.array(cost_segment)
logger = <Logger kneeliverse.evaluation (WARNING)>
class Strategy(enum.Enum):
30class Strategy(enum.Enum):
31    """
32    Enum data type that represents the strategy of MAE, MSE and RMSE
33    """
34    knees = 'knees'
35    expected = 'expected'
36    best = 'best'
37    worst = 'worst'
38
39    def __str__(self):
40        return self.value

Enum data type that represents the strategy of MAE, MSE and RMSE

knees = <Strategy.knees: 'knees'>
expected = <Strategy.expected: 'expected'>
best = <Strategy.best: 'best'>
worst = <Strategy.worst: 'worst'>
Inherited Members
enum.Enum
name
value
def get_neighbourhood_points(points: numpy.ndarray, a: int, b: int, t: float) -> tuple:
43def get_neighbourhood_points(points: np.ndarray, a: int, b: int, t: float) -> tuple:
44    """
45    Get the neighbourhood (closest points) from a to b.
46
47    The neighbourhood is defined as the longest straitgh line (defined by $R^2$).
48
49    Args:
50        points (np.ndarray): numpy array with the points (x, y)
51        a (int): the initial point of the search
52        b (int): the left limit of the search
53        t (float): $R^2$ threshold
54
55    Returns:
56        tuple: (neighbourhood index, r2, slope)
57    """
58
59    x = points[:, 0]
60    y = points[:, 1]
61    return get_neighbourhood(x, y, a, b, t)

Get the neighbourhood (closest points) from a to b.

The neighbourhood is defined as the longest straitgh line (defined by $R^2$).

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • a (int): the initial point of the search
  • b (int): the left limit of the search
  • t (float): $R^2$ threshold
Returns:

tuple: (neighbourhood index, r2, slope)

def get_neighbourhood_fast_points(points: numpy.ndarray, a: int, b: int, t: float) -> tuple:
64def get_neighbourhood_fast_points(points: np.ndarray, a: int, b: int, t: float) -> tuple:
65    """
66    Get the neighbourhood (closest points) from a to b.
67
68    The neighbourhood is defined as the longest straitgh line (defined by $R^2$).
69    This version uses a inaccurate binary search to speedup the search.
70
71    Args:
72        points (np.ndarray): numpy array with the points (x, y)
73        a (int): the initial point of the search
74        b (int): the left limit of the search
75        t (float): $R^2$ threshold
76
77    Returns:
78        tuple: (neighbourhood index, r2, slope)
79    """
80
81    x = points[:, 0]
82    y = points[:, 1]
83    return get_neighbourhood_fast(x, y, a, b, t)

Get the neighbourhood (closest points) from a to b.

The neighbourhood is defined as the longest straitgh line (defined by $R^2$). This version uses a inaccurate binary search to speedup the search.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • a (int): the initial point of the search
  • b (int): the left limit of the search
  • t (float): $R^2$ threshold
Returns:

tuple: (neighbourhood index, r2, slope)

def get_neighbourhood_binary(x: numpy.ndarray, y: numpy.ndarray, a: int, b: int, t=0.9) -> int:
 86def get_neighbourhood_binary(x: np.ndarray, y: np.ndarray, a: int, b: int, t=0.9) -> int:
 87    """
 88    Get the index of the point within the range $[b, a]$ where the $R^2$ is close to the threshold.
 89
 90    This version uses a inaccurate binary search to speedup the search.
 91
 92    Args:
 93        x (np.ndarray): the value of the points in the x axis coordinates
 94        y (np.ndarray): the value of the points in the y axis coordinates
 95        a (int): the initial point of the search
 96        b (int): the left limit of the search
 97        t (float): $R^2$ threshold (default 0.9)
 98
 99    Returns:
100        int: index of the point
101    """
102
103    i = b
104    right = a
105
106    while abs(i-right) > 1:
107        coef = lf.linear_fit(x[i:a+1], y[i:a+1])
108        r2 = lf.linear_r2(x[i:a+1], y[i:a+1], coef)
109
110        if r2 < t:
111            i = int((i+right)/2.0)
112        else:
113            right = i
114            i = int((b+right)/2.0)
115
116    return i

Get the index of the point within the range $[b, a]$ where the $R^2$ is close to the threshold.

This version uses a inaccurate binary search to speedup the search.

Arguments:
  • x (np.ndarray): the value of the points in the x axis coordinates
  • y (np.ndarray): the value of the points in the y axis coordinates
  • a (int): the initial point of the search
  • b (int): the left limit of the search
  • t (float): $R^2$ threshold (default 0.9)
Returns:

int: index of the point

def get_neighbourhood_fast( x: numpy.ndarray, y: numpy.ndarray, a: int, b: int, t: float = 0.9) -> tuple:
119def get_neighbourhood_fast(x: np.ndarray, y: np.ndarray, a: int, b: int, t: float = 0.9) -> tuple:
120    """
121    Get the neighbourhood (closest points) from a to b.
122
123    The neighbourhood is defined as the longest straitgh line (defined by $R^2$).
124    This version uses a inaccurate binary search to speedup the search.
125
126    Args:
127        x (np.ndarray): the value of the points in the x axis coordinates
128        y (np.ndarray): the value of the points in the y axis coordinates
129        a (int): the initial point of the search
130        b (int): the left limit of the search
131        t (float): $R^2$ threshold (default 0.9)
132
133    Returns:
134        tuple: (neighbourhood index, r2, slope)
135    """
136    # speedup when the search using an inaccurate binary search
137    i = get_neighbourhood_binary(x, y, a, b, t)
138    b, slope = lf.linear_fit(x[i:a+1], y[i:a+1])
139    r2 = lf.linear_r2(x[i:a+1], y[i:a+1], (b, slope))
140    previous_res = (i, r2, slope)
141
142    # Linear search to improve accuracy
143    while r2 < t and i < a:
144        i += 1
145        coef = lf.linear_fit(x[i:a+1], y[i:a+1])
146        r2 = lf.linear_r2(x[i:a+1], y[i:a+1], coef)
147        _, slope = coef
148        previous_res = (i, r2, slope)
149
150    return previous_res

Get the neighbourhood (closest points) from a to b.

The neighbourhood is defined as the longest straitgh line (defined by $R^2$). This version uses a inaccurate binary search to speedup the search.

Arguments:
  • x (np.ndarray): the value of the points in the x axis coordinates
  • y (np.ndarray): the value of the points in the y axis coordinates
  • a (int): the initial point of the search
  • b (int): the left limit of the search
  • t (float): $R^2$ threshold (default 0.9)
Returns:

tuple: (neighbourhood index, r2, slope)

def get_neighbourhood( x: numpy.ndarray, y: numpy.ndarray, a: int, b: int, t: float = 0.9) -> tuple:
153def get_neighbourhood(x: np.ndarray, y: np.ndarray, a: int, b: int, t: float = 0.9) -> tuple:
154    """
155    Get the neighbourhood (closest points) from a to b.
156
157    The neighbourhood is defined as the longest straitgh line (defined by $R^2$).
158
159    Args:
160        x (np.ndarray): the value of the points in the x axis coordinates
161        y (np.ndarray): the value of the points in the y axis coordinates
162        a (int): the initial point of the search
163        b (int): the left limit of the search
164        t (float): $R^2$ threshold (default 0.9)
165
166    Returns:
167        tuple: (neighbourhood index, r2, slope)
168    """
169
170    r2 = 1.0
171    i = a - 1
172    _, slope = lf.linear_fit(x[i:a+1], y[i:a+1])
173
174    while r2 > t and i > b:
175        # print('.')
176        previous_res = (i, r2, slope)
177        i -= 1
178        coef = lf.linear_fit(x[i:a+1], y[i:a+1])
179        r2 = lf.linear_r2(x[i:a+1], y[i:a+1], coef)
180        _, slope = coef
181        #print(f'{i} -> {r2}')
182
183    if r2 > t:
184        return i, r2, slope
185    else:
186        return previous_res

Get the neighbourhood (closest points) from a to b.

The neighbourhood is defined as the longest straitgh line (defined by $R^2$).

Arguments:
  • x (np.ndarray): the value of the points in the x axis coordinates
  • y (np.ndarray): the value of the points in the y axis coordinates
  • a (int): the initial point of the search
  • b (int): the left limit of the search
  • t (float): $R^2$ threshold (default 0.9)
Returns:

tuple: (neighbourhood index, r2, slope)

def accuracy_knee(points: numpy.ndarray, knees: numpy.ndarray, t: float = 0.9) -> tuple:
189def accuracy_knee(points: np.ndarray, knees: np.ndarray, t: float = 0.9) -> tuple:
190    """Compute the accuracy heuristic for a set of knees.
191
192    The heuristic is based on the average distance of X and Y axis, the slope and the $R^2$.
193    In this version it is used the left neighbourhood of the knee.
194
195    Args:
196        points (np.ndarray): numpy array with the points (x, y)
197        knees (np.ndarray): knees indexes
198        t (float): $R^2$ threshold (default 0.9)
199
200    Returns:
201        tuple: (average_x, average_y, average_slope, average_coeffients, cost)
202    """
203    x = points[:, 0]
204    y = points[:, 1]
205
206    total_x = math.fabs(x[-1] - x[0])
207    total_y = math.fabs(y[-1] - y[0])
208
209    distances_x = []
210    distances_y = []
211    slopes = []
212    coeffients = []
213
214    previous_knee = 0
215    for i in range(len(knees)):
216        idx, r2, slope = get_neighbourhood_fast(x, y, knees[i], previous_knee)
217
218        delta_x = x[idx] - x[knees[i]]
219        delta_y = y[idx] - y[knees[i]]
220
221        distances_x.append(math.fabs(delta_x))
222        distances_y.append(math.fabs(delta_y))
223        slopes.append(math.fabs(slope))
224        coeffients.append(r2)
225
226        previous_knee = knees[i]
227
228    slopes = np.array(slopes)
229    slopes = slopes/slopes.max()
230
231    coeffients = np.array(coeffients)
232    coeffients = coeffients/coeffients.max()
233
234    distances_x = np.array(distances_x)/total_x
235    distances_y = np.array(distances_y)/total_y
236    average_x = np.average(distances_x)
237    average_y = np.average(distances_y)
238
239    average_slope = np.average(slopes)
240    average_coeffients = np.average(coeffients)
241
242    #p = slopes * distances_y * coeffients
243    p = slopes * distances_y
244    #cost = (average_x * average_y) / (average_slope)
245    cost = average_x / np.average(p)
246
247    return average_x, average_y, average_slope, average_coeffients, cost

Compute the accuracy heuristic for a set of knees.

The heuristic is based on the average distance of X and Y axis, the slope and the $R^2$. In this version it is used the left neighbourhood of the knee.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
  • t (float): $R^2$ threshold (default 0.9)
Returns:

tuple: (average_x, average_y, average_slope, average_coeffients, cost)

def accuracy_trace(points: numpy.ndarray, knees: numpy.ndarray) -> tuple:
250def accuracy_trace(points: np.ndarray, knees: np.ndarray) -> tuple:
251    """Compute the accuracy heuristic for a set of knees.
252
253    The heuristic is based on the average distance of X and Y axis, the slope and the $R^2$.
254    In this version it is used the points from the current knee to the previous.
255
256    Args:
257        points (np.ndarray): numpy array with the points (x, y)
258        knees (np.ndarray): knees indexes
259
260    Returns:
261        tuple: (average_x, average_y, average_slope, average_coeffients, cost)
262    """
263    x = points[:, 0]
264    y = points[:, 1]
265
266    distances_x = []
267    distances_y = []
268    slopes = []
269    coeffients = []
270
271    total_x = math.fabs(x[-1] - x[0])
272    total_y = math.fabs(y[-1] - y[0])
273
274    previous_knee_x = x[knees[0]]
275    previous_knee_y = y[knees[0]]
276
277    delta_x = x[0] - previous_knee_x
278    delta_y = y[0] - previous_knee_y
279    distances_x.append(math.fabs(delta_x))
280    distances_y.append(math.fabs(delta_y))
281
282    coef = lf.linear_fit(x[0:knees[0]+1], y[0:knees[0]+1])
283    r2 = lf.linear_r2(x[0:knees[0]+1], y[0:knees[0]+1], coef)
284    coeffients.append(r2)
285    _, slope = coef
286    slopes.append(math.fabs(slope))
287
288    for i in range(1, len(knees)):
289        knee_x = x[knees[i]]
290        knee_y = y[knees[i]]
291
292        delta_x = previous_knee_x - knee_x
293        delta_y = previous_knee_y - knee_y
294
295        coef = lf.linear_fit(x[knees[i-1]:knees[i]+1],
296                             y[knees[i-1]:knees[i]+1])
297        r2 = lf.linear_r2(x[knees[i-1]:knees[i]+1],
298                          y[knees[i-1]:knees[i]+1], coef)
299
300        distances_x.append(math.fabs(delta_x))
301        distances_y.append(math.fabs(delta_y))
302        _, slope = coef
303        slopes.append(math.fabs(slope))
304        coeffients.append(r2)
305
306        previous_knee_x = knee_x
307        previous_knee_y = knee_y
308
309    distances_x = np.array(distances_x)/total_x
310    distances_y = np.array(distances_y)/total_y
311    slopes = np.array(slopes)
312    slopes = slopes/slopes.max()
313
314    coeffients = np.array(coeffients)
315    coeffients = coeffients/coeffients.max()
316
317    coeffients[coeffients < 0] = 0.0
318    p = slopes * distances_y * coeffients
319    #p = slopes * distances_y
320
321    average_x = np.average(distances_x)
322    average_y = np.average(distances_y)
323    average_slope = np.average(slopes)
324    average_coeffients = np.average(coeffients)
325
326    cost = average_x / np.average(p)
327
328    return average_x, average_y, average_slope, average_coeffients, cost

Compute the accuracy heuristic for a set of knees.

The heuristic is based on the average distance of X and Y axis, the slope and the $R^2$. In this version it is used the points from the current knee to the previous.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
Returns:

tuple: (average_x, average_y, average_slope, average_coeffients, cost)

def mae( points: numpy.ndarray, knees: numpy.ndarray, expected: numpy.ndarray, s: Strategy = <Strategy.expected: 'expected'>) -> float:
331def mae(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected) -> float:
332    """
333    Estimates the worst case Mean Absolute Error (MAE) for the given
334    knee and expected points.
335
336    Suppports different size arrays, and estimates the MAE based 
337    on the worst case.
338    It uses the euclidean distance to find the closer points,
339    and computes the error based on the closest point.
340
341    Args:
342        points (np.ndarray): numpy array with the points (x, y)
343        knees (np.ndarray): knees indexes
344        expected (np.ndarray): numpy array with the expected knee points (x, y)
345        s (Strategy): enum that controls the point matching (default Strategy.expected)
346
347    Returns:
348        float: the worst case MAE
349    """
350    # get the knee points
351    knee_points = points[knees]
352
353    error = 0.0
354
355    if s is Strategy.knees:
356        a = knee_points
357        b = expected
358    elif s is Strategy.expected:
359        a = expected
360        b = knee_points
361    elif s is Strategy.best:
362        if len(expected) <= len(knee_points):
363            a = expected
364            b = knee_points
365        else:
366            a = knee_points
367            b = expected
368    else:
369        if len(expected) >= len(knee_points):
370            a = expected
371            b = knee_points
372        else:
373            a = knee_points
374            b = expected
375
376    for p in a:
377        distances = np.linalg.norm(b-p, axis=1)
378        idx = np.argmin(distances)
379        error += np.sum(np.abs(p-b[idx]))
380
381    return error / (len(a)*2.0)

Estimates the worst case Mean Absolute Error (MAE) for the given knee and expected points.

Suppports different size arrays, and estimates the MAE based on the worst case. It uses the euclidean distance to find the closer points, and computes the error based on the closest point.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
  • expected (np.ndarray): numpy array with the expected knee points (x, y)
  • s (Strategy): enum that controls the point matching (default Strategy.expected)
Returns:

float: the worst case MAE

def mse( points: numpy.ndarray, knees: numpy.ndarray, expected: numpy.ndarray, s: Strategy = <Strategy.expected: 'expected'>) -> float:
384def mse(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected) -> float:
385    """
386    Estimates the worst case Mean Squared Error (MSE) for the given
387    knee and expected points.
388
389    Suppports different size arrays, and estimates the MSE based 
390    on the worst case.
391    It uses the euclidean distance to find the closer points,
392    and computes the error based on the closest point.
393
394    Args:
395        points (np.ndarray): numpy array with the points (x, y)
396        knees (np.ndarray): knees indexes
397        expected (np.ndarray): numpy array with the expected knee points (x, y)
398        s (Strategy): enum that controls the point matching (default Strategy.expected)
399
400    Returns:
401        float: the worst case MSE
402    """
403    # get the knee points
404    knee_points = points[knees]
405
406    error = 0.0
407
408    if s is Strategy.knees:
409        a = knee_points
410        b = expected
411    elif s is Strategy.expected:
412        a = expected
413        b = knee_points
414    elif s is Strategy.best:
415        if len(expected) <= len(knee_points):
416            a = expected
417            b = knee_points
418        else:
419            a = knee_points
420            b = expected
421    else:
422        if len(expected) >= len(knee_points):
423            a = expected
424            b = knee_points
425        else:
426            a = knee_points
427            b = expected
428
429    for p in a:
430        distances = np.linalg.norm(b-p, axis=1)
431        idx = np.argmin(distances)
432        error += np.sum(np.square(p-b[idx]))
433
434    return error / (len(a)*2.0)

Estimates the worst case Mean Squared Error (MSE) for the given knee and expected points.

Suppports different size arrays, and estimates the MSE based on the worst case. It uses the euclidean distance to find the closer points, and computes the error based on the closest point.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
  • expected (np.ndarray): numpy array with the expected knee points (x, y)
  • s (Strategy): enum that controls the point matching (default Strategy.expected)
Returns:

float: the worst case MSE

def rmse( points: numpy.ndarray, knees: numpy.ndarray, expected: numpy.ndarray, s: Strategy = <Strategy.expected: 'expected'>) -> float:
437def rmse(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected) -> float:
438    """
439    Estimates the worst case Root Mean Squared Error (RMSE) for the given
440    knee and expected points.
441
442    Suppports different size arrays, and estimates the RMSE based 
443    on the worst case.
444    It uses the euclidean distance to find the closer points,
445    and computes the error based on the closest point.
446
447    Args:
448        points (np.ndarray): numpy array with the points (x, y)
449        knees (np.ndarray): knees indexes
450        expected (np.ndarray): numpy array with the expected knee points (x, y)
451        s (Strategy): enum that controls the point matching (default Strategy.expected)
452
453    Returns:
454        float: the worst case RMSE
455    """
456    return math.sqrt(mse(points, knees, expected, s))

Estimates the worst case Root Mean Squared Error (RMSE) for the given knee and expected points.

Suppports different size arrays, and estimates the RMSE based on the worst case. It uses the euclidean distance to find the closer points, and computes the error based on the closest point.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
  • expected (np.ndarray): numpy array with the expected knee points (x, y)
  • s (Strategy): enum that controls the point matching (default Strategy.expected)
Returns:

float: the worst case RMSE

def rmspe( points: numpy.ndarray, knees: numpy.ndarray, expected: numpy.ndarray, s: Strategy = <Strategy.expected: 'expected'>, eps: float = 1e-16) -> float:
459def rmspe(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected, eps: float = 1e-16) -> float:
460    """
461    Estimates the worst case Root Mean Squared Percentage Error (RMSPE) for the given knee and expected points.
462
463    Suppports different size arrays, and estimates the RMSPE based on the worst case.
464    It uses the euclidean distance to find the closer points, and computes the error based on the closest point.
465
466    Args:
467        points (np.ndarray): numpy array with the points (x, y)
468        knees (np.ndarray): knees indexes
469        expected (np.ndarray): numpy array with the expected knee points (x, y)
470        s (Strategy): enum that controls the point matching (default Strategy.expected)
471        eps (float): eps value to prevent division by zero (default: 1E-16)
472
473    Returns:
474        float: the worst case RMSPE
475    """
476    # get the knee points
477    knee_points = points[knees]
478
479    if s is Strategy.knees:
480        a = knee_points
481        b = expected
482    elif s is Strategy.expected:
483        a = expected
484        b = knee_points
485    elif s is Strategy.best:
486        if len(expected) <= len(knee_points):
487            a = expected
488            b = knee_points
489        else:
490            a = knee_points
491            b = expected
492    else:
493        if len(expected) >= len(knee_points):
494            a = expected
495            b = knee_points
496        else:
497            a = knee_points
498            b = expected
499
500    errors = []
501
502    for p in a:
503        distances = np.linalg.norm(b-p, axis=1)
504        idx = np.argmin(distances)
505        e = (p - b[idx]) / (p + eps)
506        errors.extend(e)
507    errors = np.array(errors)
508
509    return np.sqrt(np.mean(np.square(errors)))

Estimates the worst case Root Mean Squared Percentage Error (RMSPE) for the given knee and expected points.

Suppports different size arrays, and estimates the RMSPE based on the worst case. It uses the euclidean distance to find the closer points, and computes the error based on the closest point.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
  • expected (np.ndarray): numpy array with the expected knee points (x, y)
  • s (Strategy): enum that controls the point matching (default Strategy.expected)
  • eps (float): eps value to prevent division by zero (default: 1E-16)
Returns:

float: the worst case RMSPE

def cm( points: numpy.ndarray, knees: numpy.ndarray, expected: numpy.ndarray, t: float = 0.01) -> numpy.ndarray:
512def cm(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, t: float = 0.01) -> np.ndarray:
513    """
514    Computes the Confusion Matrix based on the knees and expected points.
515
516    Args:
517        points (np.ndarray): numpy array with the points (x, y)
518        knees (np.ndarray): knees indexes
519        expected (np.ndarray): numpy array with the expected knee points (x, y)
520        t (float): the maximum allowed distance in percentage (default 0.01)
521
522    Returns:
523        np.ndarray: the confusion matrix
524    """
525
526    #dx = math.fabs(points[-1][0] - points[0][0])
527    max_x, _ = points.max(axis=0)
528    min_x, _ = points.min(axis=0)
529    dx = math.fabs(max_x - min_x)
530    used_knees = []
531    knees_points_x = points[knees][:, 0]
532    tp = fn = fp = tn = 0
533    for px, _ in expected:
534        distances = np.fabs(knees_points_x - px)/dx
535        idx = np.argmin(distances)
536        #logger.info(f"{px} / {idx} / {distances[idx]} = {distances[idx] <= t}")
537        if distances[idx] <= t and idx not in used_knees:
538            tp += 1
539            used_knees.append(idx)
540        else:
541            fn += 1
542    fp = max(len(knees) - tp, 0)
543    tn = len(points) - (tp+fp+fn)
544
545    return np.array([[tp, fp], [fn, tn]])

Computes the Confusion Matrix based on the knees and expected points.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • knees (np.ndarray): knees indexes
  • expected (np.ndarray): numpy array with the expected knee points (x, y)
  • t (float): the maximum allowed distance in percentage (default 0.01)
Returns:

np.ndarray: the confusion matrix

def accuracy(cm: numpy.ndarray) -> float:
548def accuracy(cm: np.ndarray) -> float:
549    """
550    Computes accuracy based on a Confusion Matrix.
551
552    Args:
553        cm (np.ndarray): the confusion matrix
554
555    Returns:
556        float: the accuracy
557    """
558
559    tp, fp = cm[0]
560    fn, tn = cm[1]
561
562    return (tp+tn)/(tp+tn+fp+fn)

Computes accuracy based on a Confusion Matrix.

Arguments:
  • cm (np.ndarray): the confusion matrix
Returns:

float: the accuracy

def f1score(cm: numpy.ndarray) -> float:
565def f1score(cm: np.ndarray) -> float:
566    """
567    Computes F1-Score based on a Confusion Matrix.
568
569    Args:
570        cm (np.ndarray): the confusion matrix
571
572    Returns:
573        float: the F1-Score
574    """
575
576    tp, fp = cm[0]
577    fn, _ = cm[1]
578
579    return (2.0*tp)/(2*tp+fp+fn)

Computes F1-Score based on a Confusion Matrix.

Arguments:
  • cm (np.ndarray): the confusion matrix
Returns:

float: the F1-Score

def mcc(cm: numpy.ndarray) -> float:
582def mcc(cm: np.ndarray) -> float:
583    """
584    Computes Matthews Correlation Coefficient (MCC) based on a Confusion Matrix.
585
586    Args:
587        cm (np.ndarray): the confusion matrix
588
589    Returns:
590        float: the mcc
591    """
592
593    tp, fp = cm[0]
594    fn, tn = cm[1]
595
596    n = tp*tn - fp*fn
597    d = math.sqrt((tp+fp)*(tp+fn)*(tn+fp)*(tn+fn))
598
599    return n/d

Computes Matthews Correlation Coefficient (MCC) based on a Confusion Matrix.

Arguments:
  • cm (np.ndarray): the confusion matrix
Returns:

float: the mcc

def compute_global_rmse( points: numpy.ndarray, reduced: numpy.ndarray, cache: dict = None) -> float:
602def compute_global_rmse(points: np.ndarray, reduced: np.ndarray, cache:dict=None) -> float:
603    """
604    Computes the global RMSE for a point reduction set.
605
606    For each segment within the point reduction set, computes a straight line.
607    For each line segment it computes the error with the original set.
608 
609    Args:
610        points (np.ndarray): the original points
611        reduced (np.ndarray): the indexes of the reduced points
612        cache (dict): cache used to speedup the computation (default None)
613
614    Returns:
615        float: the global RMSE
616    """
617    # Setup the cache
618    if cache is None: cache = {}
619
620    segment_errors = np.zeros(len(reduced)-1)
621
622    left = reduced[0]
623    for i in range(1, len(reduced)):
624        right = reduced[i]
625
626        # Get data from cache
627        if (left, right) not in cache:
628            pt = points[left:right+1]
629            coef = lf.linear_fit_points(pt)
630            y_hat = lf.linear_transform_points(pt, coef)
631            y = pt[:,1]
632            segment_error = np.sum(np.square((y-y_hat)))
633            cache[(left, right)] = segment_error
634        
635        segment_errors[i-1] = cache[(left, right)]
636        left = right
637
638    # compute the cost function
639    return math.sqrt(np.sum(segment_errors)/len(points))

Computes the global RMSE for a point reduction set.

For each segment within the point reduction set, computes a straight line. For each line segment it computes the error with the original set.

Arguments:
  • points (np.ndarray): the original points
  • reduced (np.ndarray): the indexes of the reduced points
  • cache (dict): cache used to speedup the computation (default None)
Returns:

float: the global RMSE

def mip(points: numpy.ndarray, reduced: numpy.ndarray) -> tuple:
642def mip(points: np.ndarray, reduced: np.ndarray) -> tuple:
643    """
644    Computes the median improvement per point (MIP).
645
646    Improvement is defined as a relation between a final cost against a reference cost.
647    The final cost is the RSME with the complete reduced set.
648    The reference cost id different for each point.
649    It is computed as the RMSE of the reconstruction without that specific point. 
650
651    Args:
652        points (np.ndarray): the original points
653        reduced (np.ndarray): the indexes of the reduced points
654
655    Returns:
656        tuple: the median improvement per point (MIP) and the MAD
657    """
658    # Setup the cache
659    cache = {}
660
661    ip = np.zeros(len(reduced)-2)
662
663    # Compute the final RSME
664    cost_fin = compute_global_rmse(points, reduced, cache)
665
666    for i in range(1, len(reduced)-1):
667        # Compute the reference RMSE
668        cost_ref = compute_global_rmse(points, np.delete(reduced, i), cache)
669        ip[i-1] = cost_ref - cost_fin
670
671    mip = np.median(ip)
672
673    return mip, np.median(np.absolute(ip - mip))

Computes the median improvement per point (MIP).

Improvement is defined as a relation between a final cost against a reference cost. The final cost is the RSME with the complete reduced set. The reference cost id different for each point. It is computed as the RMSE of the reconstruction without that specific point.

Arguments:
  • points (np.ndarray): the original points
  • reduced (np.ndarray): the indexes of the reduced points
Returns:

tuple: the median improvement per point (MIP) and the MAD

def compute_cost( points: numpy.ndarray, segment_errors: numpy.ndarray, cost: kneeliverse.metrics.Metrics, cache: dict) -> float:
676def compute_cost(points: np.ndarray, segment_errors:np.ndarray, cost:metrics.Metrics, cache:dict) -> float:
677    """
678    Compute the cost of multi-point fitting using the segment errors.
679    
680    It uses a cache to speedup the compuation of the cost.
681    The cache contains the cost of previously computed segments.
682
683    Args:
684        points (np.ndarray): the original points
685        segment_errors (np.ndarray): errors from each of the segments
686        cost (metrics.Metrics): the metric used for the cost calculation
687        cache (dict): the cache used to store the segments costs
688
689    Returns:
690        float: the global cost of a multi-point fitting
691    """
692    # methods = {metrics.Metrics.r2: metrics.r2,metrics.Metrics.rpd: metrics.rpd,metrics.Metrics.rmsle: metrics.rmsle,metrics.Metrics.rmspe: metrics.rmspe,metrics.Metrics.smape: metrics.smape}
693    # cost = methods[cost](np.array(y), np.array(y_hat))
694
695    # We have to account for the head of each segment
696    total = len(points) + len(segment_errors) - 1
697
698    if cost is metrics.Metrics.r2:
699        # Get tss from the cache
700        if 'tss' not in cache:
701            # Compute TSS and add it to cache (single operation)
702            y = points[:,1]
703            y_mean = np.mean(y)
704            tss = np.sum(np.square(y-y_mean))
705            cache['tss'] = tss
706        tss = cache['tss']
707        rss = np.sum(segment_errors)
708        cost = 1.0 - rss if tss == 0 else 1.0 - (rss/tss)
709        #return np.sum(np.square(y-y_hat))
710    elif cost is metrics.Metrics.rmsle:
711        #return np.sum(np.square((np.log(y+1) - np.log(y_hat+1))))
712        cost = math.sqrt(np.sum(segment_errors)/total)
713    elif cost is metrics.Metrics.rmspe:
714        #return np.sum(np.square((y - y_hat) / (y+eps)))
715        cost = math.sqrt(np.sum(segment_errors)/total)
716    elif cost is metrics.Metrics.rpd:
717        #return np.sum(np.abs((y - y_hat) / (np.maximum(y, y_hat)+eps)))
718        cost = np.sum(segment_errors)/total
719    else:
720        #return np.sum(2.0 * np.abs(y_hat - y) / (np.abs(y) + np.abs(y_hat) + eps))
721        cost = np.sum(segment_errors)/total
722
723    cost = 0 if cost < 0 else cost
724
725    return cost

Compute the cost of multi-point fitting using the segment errors.

It uses a cache to speedup the compuation of the cost. The cache contains the cost of previously computed segments.

Arguments:
  • points (np.ndarray): the original points
  • segment_errors (np.ndarray): errors from each of the segments
  • cost (metrics.Metrics): the metric used for the cost calculation
  • cache (dict): the cache used to store the segments costs
Returns:

float: the global cost of a multi-point fitting

def compute_partial_cost( y: numpy.ndarray, y_hat: numpy.ndarray, cost: kneeliverse.metrics.Metrics, eps: float = 1e-16) -> float:
728def compute_partial_cost(y:np.ndarray, y_hat:np.ndarray, cost: metrics.Metrics, eps: float = 1e-16) -> float:
729    """
730    Compute the partial cost of a multi-point fitting.
731
732    Args:
733        y (np.ndarray): the y values from the points
734        y_hat (np.ndarray): the y values computed with the linear aproximation
735        cost (metrics.Metrics): the metric used for the cost calculation
736        eps (float): eps value to prevent division by zero (default: 1E-16)
737
738    Returns:
739        float: the global cost of a multi-point fitting
740    """
741    if cost is metrics.Metrics.r2:
742        return np.sum(np.square(y-y_hat))
743    elif cost is metrics.Metrics.rmsle:
744        return np.sum(np.square((np.log(y+1) - np.log(y_hat+1))))
745    elif cost is metrics.Metrics.rmspe:
746        return np.sum(np.square((y - y_hat) / (y+eps)))
747    elif cost is metrics.Metrics.rpd:
748        return np.sum(np.abs((y - y_hat) / (np.maximum(y, y_hat)+eps)))
749    else:
750        return np.sum(2.0 * np.abs(y_hat - y) / (np.abs(y) + np.abs(y_hat) + eps))

Compute the partial cost of a multi-point fitting.

Arguments:
  • y (np.ndarray): the y values from the points
  • y_hat (np.ndarray): the y values computed with the linear aproximation
  • cost (metrics.Metrics): the metric used for the cost calculation
  • eps (float): eps value to prevent division by zero (default: 1E-16)
Returns:

float: the global cost of a multi-point fitting

def compute_global_cost( points: numpy.ndarray, reduced: numpy.ndarray, cost: kneeliverse.metrics.Metrics = <Metrics.rpd: 'rpd'>, cache: dict = None) -> float:
753def compute_global_cost(points: np.ndarray, reduced: np.ndarray, cost: metrics.Metrics = metrics.Metrics.rpd, cache:dict=None) -> float:
754    """
755    Compute the cost of multi-point fitting using the segment errors.
756    
757    It uses a cache to speedup the compuation of the cost.
758    The cache contains the cost of previously computed segments.
759
760    Args:
761        points (np.ndarray): the original points
762        reduced (np.ndarray): the reduced set of points used for the fitting
763        cost (metrics.Metrics): the metric used for the cost calculation (default: metrics.Metrics.rpd)
764        cache (dict): the cache used to store the segments costs (default: None)
765
766    Returns:
767        float: the global cost of a multi-point fitting
768    """
769    # Setup the cache
770    if cache is None: cache = {}
771
772    segment_errors = np.zeros(len(reduced)-1)
773
774    left = reduced[0]
775    for i in range(1, len(reduced)):
776        right = reduced[i]
777
778        # Get data from cache
779        if (left, right) not in cache:
780            pt = points[left:right+1]
781            if len(pt) <= 2:
782                cache[(left, right)] = 0
783            else:
784                y_hat = lf.linear_fit_transform_points(pt)
785                y = pt[:,1]
786                segment_error = compute_partial_cost(y, y_hat, cost)
787                cache[(left, right)] = segment_error
788        
789        segment_errors[i-1] = cache[(left, right)]
790        left = right
791
792    return compute_cost(points, segment_errors, cost, cache)

Compute the cost of multi-point fitting using the segment errors.

It uses a cache to speedup the compuation of the cost. The cache contains the cost of previously computed segments.

Arguments:
  • points (np.ndarray): the original points
  • reduced (np.ndarray): the reduced set of points used for the fitting
  • cost (metrics.Metrics): the metric used for the cost calculation (default: metrics.Metrics.rpd)
  • cache (dict): the cache used to store the segments costs (default: None)
Returns:

float: the global cost of a multi-point fitting

def compute_global_segment_cost( points: numpy.ndarray, reduced: numpy.ndarray, cost: kneeliverse.metrics.Metrics = <Metrics.rpd: 'rpd'>) -> tuple:
795def compute_global_segment_cost(points: np.ndarray, reduced: np.ndarray, cost: metrics.Metrics = metrics.Metrics.rpd) -> tuple:
796    """
797    Compute the cost of multi-point fitting using the segment errors.
798    
799    Legacy function that does not use a cache or combines the segments costs.
800    Used for unit testing.
801
802    Args:
803        points (np.ndarray): the original points
804        reduced (np.ndarray): the reduced set of points used for the fitting
805        cost (metrics.Metrics): the metric used for the cost calculation (default: metrics.Metrics.rpd)
806
807    Returns:
808        tuple: the global cost, and partial costs
809    """
810    y, y_hat = [], []
811
812    cost_segment = []
813
814    left = reduced[0]
815    for i in range(1, len(reduced)):
816        right = reduced[i]
817        pt = points[left:right+1]
818        
819        y_temp, y_hat_temp = lf.linear_fit_transform_points(pt)
820        
821        y_hat.extend(y_hat_temp)
822        y.extend(y_temp)
823
824        # compute the cost function
825        #c = compute_cost(y_temp, y_hat_temp, cost)
826        c = lf.linear_hv_residuals_points(pt) 
827        cost_segment.append(c)
828
829        left = right
830
831    # compute the cost function
832    return compute_cost(y, y_hat, cost), np.array(cost_segment)

Compute the cost of multi-point fitting using the segment errors.

Legacy function that does not use a cache or combines the segments costs. Used for unit testing.

Arguments:
  • points (np.ndarray): the original points
  • reduced (np.ndarray): the reduced set of points used for the fitting
  • cost (metrics.Metrics): the metric used for the cost calculation (default: metrics.Metrics.rpd)
Returns:

tuple: the global cost, and partial costs