kneeliverse.evaluation
The following module provides a set of methods used for evaluation knee detection algorithms.
1# coding: utf-8 2 3''' 4The following module provides a set of methods 5used for evaluation knee detection algorithms. 6''' 7 8__author__ = 'Mário Antunes' 9__version__ = '1.0' 10__email__ = 'mario.antunes@ua.pt' 11__status__ = 'Development' 12__license__ = 'MIT' 13__copyright__ = ''' 14Copyright (c) 2021-2023 Stony Brook University 15Copyright (c) 2021-2023 The Research Foundation of SUNY 16''' 17 18import enum 19import math 20import logging 21import numpy as np 22import kneeliverse.linear_fit as lf 23import kneeliverse.metrics as metrics 24 25 26logger = logging.getLogger(__name__) 27 28 29class Strategy(enum.Enum): 30 """ 31 Enum data type that represents the strategy of MAE, MSE and RMSE 32 """ 33 knees = 'knees' 34 expected = 'expected' 35 best = 'best' 36 worst = 'worst' 37 38 def __str__(self): 39 return self.value 40 41 42def get_neighbourhood_points(points: np.ndarray, a: int, b: int, t: float) -> tuple: 43 """ 44 Get the neighbourhood (closest points) from a to b. 45 46 The neighbourhood is defined as the longest straitgh line (defined by $R^2$). 47 48 Args: 49 points (np.ndarray): numpy array with the points (x, y) 50 a (int): the initial point of the search 51 b (int): the left limit of the search 52 t (float): $R^2$ threshold 53 54 Returns: 55 tuple: (neighbourhood index, r2, slope) 56 """ 57 58 x = points[:, 0] 59 y = points[:, 1] 60 return get_neighbourhood(x, y, a, b, t) 61 62 63def get_neighbourhood_fast_points(points: np.ndarray, a: int, b: int, t: float) -> tuple: 64 """ 65 Get the neighbourhood (closest points) from a to b. 66 67 The neighbourhood is defined as the longest straitgh line (defined by $R^2$). 68 This version uses a inaccurate binary search to speedup the search. 69 70 Args: 71 points (np.ndarray): numpy array with the points (x, y) 72 a (int): the initial point of the search 73 b (int): the left limit of the search 74 t (float): $R^2$ threshold 75 76 Returns: 77 tuple: (neighbourhood index, r2, slope) 78 """ 79 80 x = points[:, 0] 81 y = points[:, 1] 82 return get_neighbourhood_fast(x, y, a, b, t) 83 84 85def get_neighbourhood_binary(x: np.ndarray, y: np.ndarray, a: int, b: int, t=0.9) -> int: 86 """ 87 Get the index of the point within the range $[b, a]$ where the $R^2$ is close to the threshold. 88 89 This version uses a inaccurate binary search to speedup the search. 90 91 Args: 92 x (np.ndarray): the value of the points in the x axis coordinates 93 y (np.ndarray): the value of the points in the y axis coordinates 94 a (int): the initial point of the search 95 b (int): the left limit of the search 96 t (float): $R^2$ threshold (default 0.9) 97 98 Returns: 99 int: index of the point 100 """ 101 102 i = b 103 right = a 104 105 while abs(i-right) > 1: 106 coef = lf.linear_fit(x[i:a+1], y[i:a+1]) 107 r2 = lf.linear_r2(x[i:a+1], y[i:a+1], coef) 108 109 if r2 < t: 110 i = int((i+right)/2.0) 111 else: 112 right = i 113 i = int((b+right)/2.0) 114 115 return i 116 117 118def get_neighbourhood_fast(x: np.ndarray, y: np.ndarray, a: int, b: int, t: float = 0.9) -> tuple: 119 """ 120 Get the neighbourhood (closest points) from a to b. 121 122 The neighbourhood is defined as the longest straitgh line (defined by $R^2$). 123 This version uses a inaccurate binary search to speedup the search. 124 125 Args: 126 x (np.ndarray): the value of the points in the x axis coordinates 127 y (np.ndarray): the value of the points in the y axis coordinates 128 a (int): the initial point of the search 129 b (int): the left limit of the search 130 t (float): $R^2$ threshold (default 0.9) 131 132 Returns: 133 tuple: (neighbourhood index, r2, slope) 134 """ 135 # speedup when the search using an inaccurate binary search 136 i = get_neighbourhood_binary(x, y, a, b, t) 137 b, slope = lf.linear_fit(x[i:a+1], y[i:a+1]) 138 r2 = lf.linear_r2(x[i:a+1], y[i:a+1], (b, slope)) 139 previous_res = (i, r2, slope) 140 141 # Linear search to improve accuracy 142 while r2 < t and i < a: 143 i += 1 144 coef = lf.linear_fit(x[i:a+1], y[i:a+1]) 145 r2 = lf.linear_r2(x[i:a+1], y[i:a+1], coef) 146 _, slope = coef 147 previous_res = (i, r2, slope) 148 149 return previous_res 150 151 152def get_neighbourhood(x: np.ndarray, y: np.ndarray, a: int, b: int, t: float = 0.9) -> tuple: 153 """ 154 Get the neighbourhood (closest points) from a to b. 155 156 The neighbourhood is defined as the longest straitgh line (defined by $R^2$). 157 158 Args: 159 x (np.ndarray): the value of the points in the x axis coordinates 160 y (np.ndarray): the value of the points in the y axis coordinates 161 a (int): the initial point of the search 162 b (int): the left limit of the search 163 t (float): $R^2$ threshold (default 0.9) 164 165 Returns: 166 tuple: (neighbourhood index, r2, slope) 167 """ 168 169 r2 = 1.0 170 i = a - 1 171 _, slope = lf.linear_fit(x[i:a+1], y[i:a+1]) 172 173 while r2 > t and i > b: 174 # print('.') 175 previous_res = (i, r2, slope) 176 i -= 1 177 coef = lf.linear_fit(x[i:a+1], y[i:a+1]) 178 r2 = lf.linear_r2(x[i:a+1], y[i:a+1], coef) 179 _, slope = coef 180 #print(f'{i} -> {r2}') 181 182 if r2 > t: 183 return i, r2, slope 184 else: 185 return previous_res 186 187 188def accuracy_knee(points: np.ndarray, knees: np.ndarray, t: float = 0.9) -> tuple: 189 """Compute the accuracy heuristic for a set of knees. 190 191 The heuristic is based on the average distance of X and Y axis, the slope and the $R^2$. 192 In this version it is used the left neighbourhood of the knee. 193 194 Args: 195 points (np.ndarray): numpy array with the points (x, y) 196 knees (np.ndarray): knees indexes 197 t (float): $R^2$ threshold (default 0.9) 198 199 Returns: 200 tuple: (average_x, average_y, average_slope, average_coeffients, cost) 201 """ 202 x = points[:, 0] 203 y = points[:, 1] 204 205 total_x = math.fabs(x[-1] - x[0]) 206 total_y = math.fabs(y[-1] - y[0]) 207 208 distances_x = [] 209 distances_y = [] 210 slopes = [] 211 coeffients = [] 212 213 previous_knee = 0 214 for i in range(len(knees)): 215 idx, r2, slope = get_neighbourhood_fast(x, y, knees[i], previous_knee) 216 217 delta_x = x[idx] - x[knees[i]] 218 delta_y = y[idx] - y[knees[i]] 219 220 distances_x.append(math.fabs(delta_x)) 221 distances_y.append(math.fabs(delta_y)) 222 slopes.append(math.fabs(slope)) 223 coeffients.append(r2) 224 225 previous_knee = knees[i] 226 227 slopes = np.array(slopes) 228 slopes = slopes/slopes.max() 229 230 coeffients = np.array(coeffients) 231 coeffients = coeffients/coeffients.max() 232 233 distances_x = np.array(distances_x)/total_x 234 distances_y = np.array(distances_y)/total_y 235 average_x = np.average(distances_x) 236 average_y = np.average(distances_y) 237 238 average_slope = np.average(slopes) 239 average_coeffients = np.average(coeffients) 240 241 #p = slopes * distances_y * coeffients 242 p = slopes * distances_y 243 #cost = (average_x * average_y) / (average_slope) 244 cost = average_x / np.average(p) 245 246 return average_x, average_y, average_slope, average_coeffients, cost 247 248 249def accuracy_trace(points: np.ndarray, knees: np.ndarray) -> tuple: 250 """Compute the accuracy heuristic for a set of knees. 251 252 The heuristic is based on the average distance of X and Y axis, the slope and the $R^2$. 253 In this version it is used the points from the current knee to the previous. 254 255 Args: 256 points (np.ndarray): numpy array with the points (x, y) 257 knees (np.ndarray): knees indexes 258 259 Returns: 260 tuple: (average_x, average_y, average_slope, average_coeffients, cost) 261 """ 262 x = points[:, 0] 263 y = points[:, 1] 264 265 distances_x = [] 266 distances_y = [] 267 slopes = [] 268 coeffients = [] 269 270 total_x = math.fabs(x[-1] - x[0]) 271 total_y = math.fabs(y[-1] - y[0]) 272 273 previous_knee_x = x[knees[0]] 274 previous_knee_y = y[knees[0]] 275 276 delta_x = x[0] - previous_knee_x 277 delta_y = y[0] - previous_knee_y 278 distances_x.append(math.fabs(delta_x)) 279 distances_y.append(math.fabs(delta_y)) 280 281 coef = lf.linear_fit(x[0:knees[0]+1], y[0:knees[0]+1]) 282 r2 = lf.linear_r2(x[0:knees[0]+1], y[0:knees[0]+1], coef) 283 coeffients.append(r2) 284 _, slope = coef 285 slopes.append(math.fabs(slope)) 286 287 for i in range(1, len(knees)): 288 knee_x = x[knees[i]] 289 knee_y = y[knees[i]] 290 291 delta_x = previous_knee_x - knee_x 292 delta_y = previous_knee_y - knee_y 293 294 coef = lf.linear_fit(x[knees[i-1]:knees[i]+1], 295 y[knees[i-1]:knees[i]+1]) 296 r2 = lf.linear_r2(x[knees[i-1]:knees[i]+1], 297 y[knees[i-1]:knees[i]+1], coef) 298 299 distances_x.append(math.fabs(delta_x)) 300 distances_y.append(math.fabs(delta_y)) 301 _, slope = coef 302 slopes.append(math.fabs(slope)) 303 coeffients.append(r2) 304 305 previous_knee_x = knee_x 306 previous_knee_y = knee_y 307 308 distances_x = np.array(distances_x)/total_x 309 distances_y = np.array(distances_y)/total_y 310 slopes = np.array(slopes) 311 slopes = slopes/slopes.max() 312 313 coeffients = np.array(coeffients) 314 coeffients = coeffients/coeffients.max() 315 316 coeffients[coeffients < 0] = 0.0 317 p = slopes * distances_y * coeffients 318 #p = slopes * distances_y 319 320 average_x = np.average(distances_x) 321 average_y = np.average(distances_y) 322 average_slope = np.average(slopes) 323 average_coeffients = np.average(coeffients) 324 325 cost = average_x / np.average(p) 326 327 return average_x, average_y, average_slope, average_coeffients, cost 328 329 330def mae(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected) -> float: 331 """ 332 Estimates the worst case Mean Absolute Error (MAE) for the given 333 knee and expected points. 334 335 Suppports different size arrays, and estimates the MAE based 336 on the worst case. 337 It uses the euclidean distance to find the closer points, 338 and computes the error based on the closest point. 339 340 Args: 341 points (np.ndarray): numpy array with the points (x, y) 342 knees (np.ndarray): knees indexes 343 expected (np.ndarray): numpy array with the expected knee points (x, y) 344 s (Strategy): enum that controls the point matching (default Strategy.expected) 345 346 Returns: 347 float: the worst case MAE 348 """ 349 # get the knee points 350 knee_points = points[knees] 351 352 error = 0.0 353 354 if s is Strategy.knees: 355 a = knee_points 356 b = expected 357 elif s is Strategy.expected: 358 a = expected 359 b = knee_points 360 elif s is Strategy.best: 361 if len(expected) <= len(knee_points): 362 a = expected 363 b = knee_points 364 else: 365 a = knee_points 366 b = expected 367 else: 368 if len(expected) >= len(knee_points): 369 a = expected 370 b = knee_points 371 else: 372 a = knee_points 373 b = expected 374 375 for p in a: 376 distances = np.linalg.norm(b-p, axis=1) 377 idx = np.argmin(distances) 378 error += np.sum(np.abs(p-b[idx])) 379 380 return error / (len(a)*2.0) 381 382 383def mse(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected) -> float: 384 """ 385 Estimates the worst case Mean Squared Error (MSE) for the given 386 knee and expected points. 387 388 Suppports different size arrays, and estimates the MSE based 389 on the worst case. 390 It uses the euclidean distance to find the closer points, 391 and computes the error based on the closest point. 392 393 Args: 394 points (np.ndarray): numpy array with the points (x, y) 395 knees (np.ndarray): knees indexes 396 expected (np.ndarray): numpy array with the expected knee points (x, y) 397 s (Strategy): enum that controls the point matching (default Strategy.expected) 398 399 Returns: 400 float: the worst case MSE 401 """ 402 # get the knee points 403 knee_points = points[knees] 404 405 error = 0.0 406 407 if s is Strategy.knees: 408 a = knee_points 409 b = expected 410 elif s is Strategy.expected: 411 a = expected 412 b = knee_points 413 elif s is Strategy.best: 414 if len(expected) <= len(knee_points): 415 a = expected 416 b = knee_points 417 else: 418 a = knee_points 419 b = expected 420 else: 421 if len(expected) >= len(knee_points): 422 a = expected 423 b = knee_points 424 else: 425 a = knee_points 426 b = expected 427 428 for p in a: 429 distances = np.linalg.norm(b-p, axis=1) 430 idx = np.argmin(distances) 431 error += np.sum(np.square(p-b[idx])) 432 433 return error / (len(a)*2.0) 434 435 436def rmse(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected) -> float: 437 """ 438 Estimates the worst case Root Mean Squared Error (RMSE) for the given 439 knee and expected points. 440 441 Suppports different size arrays, and estimates the RMSE based 442 on the worst case. 443 It uses the euclidean distance to find the closer points, 444 and computes the error based on the closest point. 445 446 Args: 447 points (np.ndarray): numpy array with the points (x, y) 448 knees (np.ndarray): knees indexes 449 expected (np.ndarray): numpy array with the expected knee points (x, y) 450 s (Strategy): enum that controls the point matching (default Strategy.expected) 451 452 Returns: 453 float: the worst case RMSE 454 """ 455 return math.sqrt(mse(points, knees, expected, s)) 456 457 458def rmspe(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected, eps: float = 1e-16) -> float: 459 """ 460 Estimates the worst case Root Mean Squared Percentage Error (RMSPE) for the given knee and expected points. 461 462 Suppports different size arrays, and estimates the RMSPE based on the worst case. 463 It uses the euclidean distance to find the closer points, and computes the error based on the closest point. 464 465 Args: 466 points (np.ndarray): numpy array with the points (x, y) 467 knees (np.ndarray): knees indexes 468 expected (np.ndarray): numpy array with the expected knee points (x, y) 469 s (Strategy): enum that controls the point matching (default Strategy.expected) 470 eps (float): eps value to prevent division by zero (default: 1E-16) 471 472 Returns: 473 float: the worst case RMSPE 474 """ 475 # get the knee points 476 knee_points = points[knees] 477 478 if s is Strategy.knees: 479 a = knee_points 480 b = expected 481 elif s is Strategy.expected: 482 a = expected 483 b = knee_points 484 elif s is Strategy.best: 485 if len(expected) <= len(knee_points): 486 a = expected 487 b = knee_points 488 else: 489 a = knee_points 490 b = expected 491 else: 492 if len(expected) >= len(knee_points): 493 a = expected 494 b = knee_points 495 else: 496 a = knee_points 497 b = expected 498 499 errors = [] 500 501 for p in a: 502 distances = np.linalg.norm(b-p, axis=1) 503 idx = np.argmin(distances) 504 e = (p - b[idx]) / (p + eps) 505 errors.extend(e) 506 errors = np.array(errors) 507 508 return np.sqrt(np.mean(np.square(errors))) 509 510 511def cm(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, t: float = 0.01) -> np.ndarray: 512 """ 513 Computes the Confusion Matrix based on the knees and expected points. 514 515 Args: 516 points (np.ndarray): numpy array with the points (x, y) 517 knees (np.ndarray): knees indexes 518 expected (np.ndarray): numpy array with the expected knee points (x, y) 519 t (float): the maximum allowed distance in percentage (default 0.01) 520 521 Returns: 522 np.ndarray: the confusion matrix 523 """ 524 525 #dx = math.fabs(points[-1][0] - points[0][0]) 526 max_x, _ = points.max(axis=0) 527 min_x, _ = points.min(axis=0) 528 dx = math.fabs(max_x - min_x) 529 used_knees = [] 530 knees_points_x = points[knees][:, 0] 531 tp = fn = fp = tn = 0 532 for px, _ in expected: 533 distances = np.fabs(knees_points_x - px)/dx 534 idx = np.argmin(distances) 535 #logger.info(f"{px} / {idx} / {distances[idx]} = {distances[idx] <= t}") 536 if distances[idx] <= t and idx not in used_knees: 537 tp += 1 538 used_knees.append(idx) 539 else: 540 fn += 1 541 fp = max(len(knees) - tp, 0) 542 tn = len(points) - (tp+fp+fn) 543 544 return np.array([[tp, fp], [fn, tn]]) 545 546 547def accuracy(cm: np.ndarray) -> float: 548 """ 549 Computes accuracy based on a Confusion Matrix. 550 551 Args: 552 cm (np.ndarray): the confusion matrix 553 554 Returns: 555 float: the accuracy 556 """ 557 558 tp, fp = cm[0] 559 fn, tn = cm[1] 560 561 return (tp+tn)/(tp+tn+fp+fn) 562 563 564def f1score(cm: np.ndarray) -> float: 565 """ 566 Computes F1-Score based on a Confusion Matrix. 567 568 Args: 569 cm (np.ndarray): the confusion matrix 570 571 Returns: 572 float: the F1-Score 573 """ 574 575 tp, fp = cm[0] 576 fn, _ = cm[1] 577 578 return (2.0*tp)/(2*tp+fp+fn) 579 580 581def mcc(cm: np.ndarray) -> float: 582 """ 583 Computes Matthews Correlation Coefficient (MCC) based on a Confusion Matrix. 584 585 Args: 586 cm (np.ndarray): the confusion matrix 587 588 Returns: 589 float: the mcc 590 """ 591 592 tp, fp = cm[0] 593 fn, tn = cm[1] 594 595 n = tp*tn - fp*fn 596 d = math.sqrt((tp+fp)*(tp+fn)*(tn+fp)*(tn+fn)) 597 598 return n/d 599 600 601def compute_global_rmse(points: np.ndarray, reduced: np.ndarray, cache:dict=None) -> float: 602 """ 603 Computes the global RMSE for a point reduction set. 604 605 For each segment within the point reduction set, computes a straight line. 606 For each line segment it computes the error with the original set. 607 608 Args: 609 points (np.ndarray): the original points 610 reduced (np.ndarray): the indexes of the reduced points 611 cache (dict): cache used to speedup the computation (default None) 612 613 Returns: 614 float: the global RMSE 615 """ 616 # Setup the cache 617 if cache is None: cache = {} 618 619 segment_errors = np.zeros(len(reduced)-1) 620 621 left = reduced[0] 622 for i in range(1, len(reduced)): 623 right = reduced[i] 624 625 # Get data from cache 626 if (left, right) not in cache: 627 pt = points[left:right+1] 628 coef = lf.linear_fit_points(pt) 629 y_hat = lf.linear_transform_points(pt, coef) 630 y = pt[:,1] 631 segment_error = np.sum(np.square((y-y_hat))) 632 cache[(left, right)] = segment_error 633 634 segment_errors[i-1] = cache[(left, right)] 635 left = right 636 637 # compute the cost function 638 return math.sqrt(np.sum(segment_errors)/len(points)) 639 640 641def mip(points: np.ndarray, reduced: np.ndarray) -> tuple: 642 """ 643 Computes the median improvement per point (MIP). 644 645 Improvement is defined as a relation between a final cost against a reference cost. 646 The final cost is the RSME with the complete reduced set. 647 The reference cost id different for each point. 648 It is computed as the RMSE of the reconstruction without that specific point. 649 650 Args: 651 points (np.ndarray): the original points 652 reduced (np.ndarray): the indexes of the reduced points 653 654 Returns: 655 tuple: the median improvement per point (MIP) and the MAD 656 """ 657 # Setup the cache 658 cache = {} 659 660 ip = np.zeros(len(reduced)-2) 661 662 # Compute the final RSME 663 cost_fin = compute_global_rmse(points, reduced, cache) 664 665 for i in range(1, len(reduced)-1): 666 # Compute the reference RMSE 667 cost_ref = compute_global_rmse(points, np.delete(reduced, i), cache) 668 ip[i-1] = cost_ref - cost_fin 669 670 mip = np.median(ip) 671 672 return mip, np.median(np.absolute(ip - mip)) 673 674 675def compute_cost(points: np.ndarray, segment_errors:np.ndarray, cost:metrics.Metrics, cache:dict) -> float: 676 """ 677 Compute the cost of multi-point fitting using the segment errors. 678 679 It uses a cache to speedup the compuation of the cost. 680 The cache contains the cost of previously computed segments. 681 682 Args: 683 points (np.ndarray): the original points 684 segment_errors (np.ndarray): errors from each of the segments 685 cost (metrics.Metrics): the metric used for the cost calculation 686 cache (dict): the cache used to store the segments costs 687 688 Returns: 689 float: the global cost of a multi-point fitting 690 """ 691 # methods = {metrics.Metrics.r2: metrics.r2,metrics.Metrics.rpd: metrics.rpd,metrics.Metrics.rmsle: metrics.rmsle,metrics.Metrics.rmspe: metrics.rmspe,metrics.Metrics.smape: metrics.smape} 692 # cost = methods[cost](np.array(y), np.array(y_hat)) 693 694 # We have to account for the head of each segment 695 total = len(points) + len(segment_errors) - 1 696 697 if cost is metrics.Metrics.r2: 698 # Get tss from the cache 699 if 'tss' not in cache: 700 # Compute TSS and add it to cache (single operation) 701 y = points[:,1] 702 y_mean = np.mean(y) 703 tss = np.sum(np.square(y-y_mean)) 704 cache['tss'] = tss 705 tss = cache['tss'] 706 rss = np.sum(segment_errors) 707 cost = 1.0 - rss if tss == 0 else 1.0 - (rss/tss) 708 #return np.sum(np.square(y-y_hat)) 709 elif cost is metrics.Metrics.rmsle: 710 #return np.sum(np.square((np.log(y+1) - np.log(y_hat+1)))) 711 cost = math.sqrt(np.sum(segment_errors)/total) 712 elif cost is metrics.Metrics.rmspe: 713 #return np.sum(np.square((y - y_hat) / (y+eps))) 714 cost = math.sqrt(np.sum(segment_errors)/total) 715 elif cost is metrics.Metrics.rpd: 716 #return np.sum(np.abs((y - y_hat) / (np.maximum(y, y_hat)+eps))) 717 cost = np.sum(segment_errors)/total 718 else: 719 #return np.sum(2.0 * np.abs(y_hat - y) / (np.abs(y) + np.abs(y_hat) + eps)) 720 cost = np.sum(segment_errors)/total 721 722 cost = 0 if cost < 0 else cost 723 724 return cost 725 726 727def compute_partial_cost(y:np.ndarray, y_hat:np.ndarray, cost: metrics.Metrics, eps: float = 1e-16) -> float: 728 """ 729 Compute the partial cost of a multi-point fitting. 730 731 Args: 732 y (np.ndarray): the y values from the points 733 y_hat (np.ndarray): the y values computed with the linear aproximation 734 cost (metrics.Metrics): the metric used for the cost calculation 735 eps (float): eps value to prevent division by zero (default: 1E-16) 736 737 Returns: 738 float: the global cost of a multi-point fitting 739 """ 740 if cost is metrics.Metrics.r2: 741 return np.sum(np.square(y-y_hat)) 742 elif cost is metrics.Metrics.rmsle: 743 return np.sum(np.square((np.log(y+1) - np.log(y_hat+1)))) 744 elif cost is metrics.Metrics.rmspe: 745 return np.sum(np.square((y - y_hat) / (y+eps))) 746 elif cost is metrics.Metrics.rpd: 747 return np.sum(np.abs((y - y_hat) / (np.maximum(y, y_hat)+eps))) 748 else: 749 return np.sum(2.0 * np.abs(y_hat - y) / (np.abs(y) + np.abs(y_hat) + eps)) 750 751 752def compute_global_cost(points: np.ndarray, reduced: np.ndarray, cost: metrics.Metrics = metrics.Metrics.rpd, cache:dict=None) -> float: 753 """ 754 Compute the cost of multi-point fitting using the segment errors. 755 756 It uses a cache to speedup the compuation of the cost. 757 The cache contains the cost of previously computed segments. 758 759 Args: 760 points (np.ndarray): the original points 761 reduced (np.ndarray): the reduced set of points used for the fitting 762 cost (metrics.Metrics): the metric used for the cost calculation (default: metrics.Metrics.rpd) 763 cache (dict): the cache used to store the segments costs (default: None) 764 765 Returns: 766 float: the global cost of a multi-point fitting 767 """ 768 # Setup the cache 769 if cache is None: cache = {} 770 771 segment_errors = np.zeros(len(reduced)-1) 772 773 left = reduced[0] 774 for i in range(1, len(reduced)): 775 right = reduced[i] 776 777 # Get data from cache 778 if (left, right) not in cache: 779 pt = points[left:right+1] 780 if len(pt) <= 2: 781 cache[(left, right)] = 0 782 else: 783 y_hat = lf.linear_fit_transform_points(pt) 784 y = pt[:,1] 785 segment_error = compute_partial_cost(y, y_hat, cost) 786 cache[(left, right)] = segment_error 787 788 segment_errors[i-1] = cache[(left, right)] 789 left = right 790 791 return compute_cost(points, segment_errors, cost, cache) 792 793 794def compute_global_segment_cost(points: np.ndarray, reduced: np.ndarray, cost: metrics.Metrics = metrics.Metrics.rpd) -> tuple: 795 """ 796 Compute the cost of multi-point fitting using the segment errors. 797 798 Legacy function that does not use a cache or combines the segments costs. 799 Used for unit testing. 800 801 Args: 802 points (np.ndarray): the original points 803 reduced (np.ndarray): the reduced set of points used for the fitting 804 cost (metrics.Metrics): the metric used for the cost calculation (default: metrics.Metrics.rpd) 805 806 Returns: 807 tuple: the global cost, and partial costs 808 """ 809 y, y_hat = [], [] 810 811 cost_segment = [] 812 813 left = reduced[0] 814 for i in range(1, len(reduced)): 815 right = reduced[i] 816 pt = points[left:right+1] 817 818 y_temp, y_hat_temp = lf.linear_fit_transform_points(pt) 819 820 y_hat.extend(y_hat_temp) 821 y.extend(y_temp) 822 823 # compute the cost function 824 #c = compute_cost(y_temp, y_hat_temp, cost) 825 c = lf.linear_hv_residuals_points(pt) 826 cost_segment.append(c) 827 828 left = right 829 830 # compute the cost function 831 return compute_cost(y, y_hat, cost), np.array(cost_segment)
30class Strategy(enum.Enum): 31 """ 32 Enum data type that represents the strategy of MAE, MSE and RMSE 33 """ 34 knees = 'knees' 35 expected = 'expected' 36 best = 'best' 37 worst = 'worst' 38 39 def __str__(self): 40 return self.value
Enum data type that represents the strategy of MAE, MSE and RMSE
Inherited Members
- enum.Enum
- name
- value
43def get_neighbourhood_points(points: np.ndarray, a: int, b: int, t: float) -> tuple: 44 """ 45 Get the neighbourhood (closest points) from a to b. 46 47 The neighbourhood is defined as the longest straitgh line (defined by $R^2$). 48 49 Args: 50 points (np.ndarray): numpy array with the points (x, y) 51 a (int): the initial point of the search 52 b (int): the left limit of the search 53 t (float): $R^2$ threshold 54 55 Returns: 56 tuple: (neighbourhood index, r2, slope) 57 """ 58 59 x = points[:, 0] 60 y = points[:, 1] 61 return get_neighbourhood(x, y, a, b, t)
Get the neighbourhood (closest points) from a to b.
The neighbourhood is defined as the longest straitgh line (defined by $R^2$).
Arguments:
- points (np.ndarray): numpy array with the points (x, y)
- a (int): the initial point of the search
- b (int): the left limit of the search
- t (float): $R^2$ threshold
Returns:
tuple: (neighbourhood index, r2, slope)
64def get_neighbourhood_fast_points(points: np.ndarray, a: int, b: int, t: float) -> tuple: 65 """ 66 Get the neighbourhood (closest points) from a to b. 67 68 The neighbourhood is defined as the longest straitgh line (defined by $R^2$). 69 This version uses a inaccurate binary search to speedup the search. 70 71 Args: 72 points (np.ndarray): numpy array with the points (x, y) 73 a (int): the initial point of the search 74 b (int): the left limit of the search 75 t (float): $R^2$ threshold 76 77 Returns: 78 tuple: (neighbourhood index, r2, slope) 79 """ 80 81 x = points[:, 0] 82 y = points[:, 1] 83 return get_neighbourhood_fast(x, y, a, b, t)
Get the neighbourhood (closest points) from a to b.
The neighbourhood is defined as the longest straitgh line (defined by $R^2$). This version uses a inaccurate binary search to speedup the search.
Arguments:
- points (np.ndarray): numpy array with the points (x, y)
- a (int): the initial point of the search
- b (int): the left limit of the search
- t (float): $R^2$ threshold
Returns:
tuple: (neighbourhood index, r2, slope)
86def get_neighbourhood_binary(x: np.ndarray, y: np.ndarray, a: int, b: int, t=0.9) -> int: 87 """ 88 Get the index of the point within the range $[b, a]$ where the $R^2$ is close to the threshold. 89 90 This version uses a inaccurate binary search to speedup the search. 91 92 Args: 93 x (np.ndarray): the value of the points in the x axis coordinates 94 y (np.ndarray): the value of the points in the y axis coordinates 95 a (int): the initial point of the search 96 b (int): the left limit of the search 97 t (float): $R^2$ threshold (default 0.9) 98 99 Returns: 100 int: index of the point 101 """ 102 103 i = b 104 right = a 105 106 while abs(i-right) > 1: 107 coef = lf.linear_fit(x[i:a+1], y[i:a+1]) 108 r2 = lf.linear_r2(x[i:a+1], y[i:a+1], coef) 109 110 if r2 < t: 111 i = int((i+right)/2.0) 112 else: 113 right = i 114 i = int((b+right)/2.0) 115 116 return i
Get the index of the point within the range $[b, a]$ where the $R^2$ is close to the threshold.
This version uses a inaccurate binary search to speedup the search.
Arguments:
- x (np.ndarray): the value of the points in the x axis coordinates
- y (np.ndarray): the value of the points in the y axis coordinates
- a (int): the initial point of the search
- b (int): the left limit of the search
- t (float): $R^2$ threshold (default 0.9)
Returns:
int: index of the point
119def get_neighbourhood_fast(x: np.ndarray, y: np.ndarray, a: int, b: int, t: float = 0.9) -> tuple: 120 """ 121 Get the neighbourhood (closest points) from a to b. 122 123 The neighbourhood is defined as the longest straitgh line (defined by $R^2$). 124 This version uses a inaccurate binary search to speedup the search. 125 126 Args: 127 x (np.ndarray): the value of the points in the x axis coordinates 128 y (np.ndarray): the value of the points in the y axis coordinates 129 a (int): the initial point of the search 130 b (int): the left limit of the search 131 t (float): $R^2$ threshold (default 0.9) 132 133 Returns: 134 tuple: (neighbourhood index, r2, slope) 135 """ 136 # speedup when the search using an inaccurate binary search 137 i = get_neighbourhood_binary(x, y, a, b, t) 138 b, slope = lf.linear_fit(x[i:a+1], y[i:a+1]) 139 r2 = lf.linear_r2(x[i:a+1], y[i:a+1], (b, slope)) 140 previous_res = (i, r2, slope) 141 142 # Linear search to improve accuracy 143 while r2 < t and i < a: 144 i += 1 145 coef = lf.linear_fit(x[i:a+1], y[i:a+1]) 146 r2 = lf.linear_r2(x[i:a+1], y[i:a+1], coef) 147 _, slope = coef 148 previous_res = (i, r2, slope) 149 150 return previous_res
Get the neighbourhood (closest points) from a to b.
The neighbourhood is defined as the longest straitgh line (defined by $R^2$). This version uses a inaccurate binary search to speedup the search.
Arguments:
- x (np.ndarray): the value of the points in the x axis coordinates
- y (np.ndarray): the value of the points in the y axis coordinates
- a (int): the initial point of the search
- b (int): the left limit of the search
- t (float): $R^2$ threshold (default 0.9)
Returns:
tuple: (neighbourhood index, r2, slope)
153def get_neighbourhood(x: np.ndarray, y: np.ndarray, a: int, b: int, t: float = 0.9) -> tuple: 154 """ 155 Get the neighbourhood (closest points) from a to b. 156 157 The neighbourhood is defined as the longest straitgh line (defined by $R^2$). 158 159 Args: 160 x (np.ndarray): the value of the points in the x axis coordinates 161 y (np.ndarray): the value of the points in the y axis coordinates 162 a (int): the initial point of the search 163 b (int): the left limit of the search 164 t (float): $R^2$ threshold (default 0.9) 165 166 Returns: 167 tuple: (neighbourhood index, r2, slope) 168 """ 169 170 r2 = 1.0 171 i = a - 1 172 _, slope = lf.linear_fit(x[i:a+1], y[i:a+1]) 173 174 while r2 > t and i > b: 175 # print('.') 176 previous_res = (i, r2, slope) 177 i -= 1 178 coef = lf.linear_fit(x[i:a+1], y[i:a+1]) 179 r2 = lf.linear_r2(x[i:a+1], y[i:a+1], coef) 180 _, slope = coef 181 #print(f'{i} -> {r2}') 182 183 if r2 > t: 184 return i, r2, slope 185 else: 186 return previous_res
Get the neighbourhood (closest points) from a to b.
The neighbourhood is defined as the longest straitgh line (defined by $R^2$).
Arguments:
- x (np.ndarray): the value of the points in the x axis coordinates
- y (np.ndarray): the value of the points in the y axis coordinates
- a (int): the initial point of the search
- b (int): the left limit of the search
- t (float): $R^2$ threshold (default 0.9)
Returns:
tuple: (neighbourhood index, r2, slope)
189def accuracy_knee(points: np.ndarray, knees: np.ndarray, t: float = 0.9) -> tuple: 190 """Compute the accuracy heuristic for a set of knees. 191 192 The heuristic is based on the average distance of X and Y axis, the slope and the $R^2$. 193 In this version it is used the left neighbourhood of the knee. 194 195 Args: 196 points (np.ndarray): numpy array with the points (x, y) 197 knees (np.ndarray): knees indexes 198 t (float): $R^2$ threshold (default 0.9) 199 200 Returns: 201 tuple: (average_x, average_y, average_slope, average_coeffients, cost) 202 """ 203 x = points[:, 0] 204 y = points[:, 1] 205 206 total_x = math.fabs(x[-1] - x[0]) 207 total_y = math.fabs(y[-1] - y[0]) 208 209 distances_x = [] 210 distances_y = [] 211 slopes = [] 212 coeffients = [] 213 214 previous_knee = 0 215 for i in range(len(knees)): 216 idx, r2, slope = get_neighbourhood_fast(x, y, knees[i], previous_knee) 217 218 delta_x = x[idx] - x[knees[i]] 219 delta_y = y[idx] - y[knees[i]] 220 221 distances_x.append(math.fabs(delta_x)) 222 distances_y.append(math.fabs(delta_y)) 223 slopes.append(math.fabs(slope)) 224 coeffients.append(r2) 225 226 previous_knee = knees[i] 227 228 slopes = np.array(slopes) 229 slopes = slopes/slopes.max() 230 231 coeffients = np.array(coeffients) 232 coeffients = coeffients/coeffients.max() 233 234 distances_x = np.array(distances_x)/total_x 235 distances_y = np.array(distances_y)/total_y 236 average_x = np.average(distances_x) 237 average_y = np.average(distances_y) 238 239 average_slope = np.average(slopes) 240 average_coeffients = np.average(coeffients) 241 242 #p = slopes * distances_y * coeffients 243 p = slopes * distances_y 244 #cost = (average_x * average_y) / (average_slope) 245 cost = average_x / np.average(p) 246 247 return average_x, average_y, average_slope, average_coeffients, cost
Compute the accuracy heuristic for a set of knees.
The heuristic is based on the average distance of X and Y axis, the slope and the $R^2$. In this version it is used the left neighbourhood of the knee.
Arguments:
- points (np.ndarray): numpy array with the points (x, y)
- knees (np.ndarray): knees indexes
- t (float): $R^2$ threshold (default 0.9)
Returns:
tuple: (average_x, average_y, average_slope, average_coeffients, cost)
250def accuracy_trace(points: np.ndarray, knees: np.ndarray) -> tuple: 251 """Compute the accuracy heuristic for a set of knees. 252 253 The heuristic is based on the average distance of X and Y axis, the slope and the $R^2$. 254 In this version it is used the points from the current knee to the previous. 255 256 Args: 257 points (np.ndarray): numpy array with the points (x, y) 258 knees (np.ndarray): knees indexes 259 260 Returns: 261 tuple: (average_x, average_y, average_slope, average_coeffients, cost) 262 """ 263 x = points[:, 0] 264 y = points[:, 1] 265 266 distances_x = [] 267 distances_y = [] 268 slopes = [] 269 coeffients = [] 270 271 total_x = math.fabs(x[-1] - x[0]) 272 total_y = math.fabs(y[-1] - y[0]) 273 274 previous_knee_x = x[knees[0]] 275 previous_knee_y = y[knees[0]] 276 277 delta_x = x[0] - previous_knee_x 278 delta_y = y[0] - previous_knee_y 279 distances_x.append(math.fabs(delta_x)) 280 distances_y.append(math.fabs(delta_y)) 281 282 coef = lf.linear_fit(x[0:knees[0]+1], y[0:knees[0]+1]) 283 r2 = lf.linear_r2(x[0:knees[0]+1], y[0:knees[0]+1], coef) 284 coeffients.append(r2) 285 _, slope = coef 286 slopes.append(math.fabs(slope)) 287 288 for i in range(1, len(knees)): 289 knee_x = x[knees[i]] 290 knee_y = y[knees[i]] 291 292 delta_x = previous_knee_x - knee_x 293 delta_y = previous_knee_y - knee_y 294 295 coef = lf.linear_fit(x[knees[i-1]:knees[i]+1], 296 y[knees[i-1]:knees[i]+1]) 297 r2 = lf.linear_r2(x[knees[i-1]:knees[i]+1], 298 y[knees[i-1]:knees[i]+1], coef) 299 300 distances_x.append(math.fabs(delta_x)) 301 distances_y.append(math.fabs(delta_y)) 302 _, slope = coef 303 slopes.append(math.fabs(slope)) 304 coeffients.append(r2) 305 306 previous_knee_x = knee_x 307 previous_knee_y = knee_y 308 309 distances_x = np.array(distances_x)/total_x 310 distances_y = np.array(distances_y)/total_y 311 slopes = np.array(slopes) 312 slopes = slopes/slopes.max() 313 314 coeffients = np.array(coeffients) 315 coeffients = coeffients/coeffients.max() 316 317 coeffients[coeffients < 0] = 0.0 318 p = slopes * distances_y * coeffients 319 #p = slopes * distances_y 320 321 average_x = np.average(distances_x) 322 average_y = np.average(distances_y) 323 average_slope = np.average(slopes) 324 average_coeffients = np.average(coeffients) 325 326 cost = average_x / np.average(p) 327 328 return average_x, average_y, average_slope, average_coeffients, cost
Compute the accuracy heuristic for a set of knees.
The heuristic is based on the average distance of X and Y axis, the slope and the $R^2$. In this version it is used the points from the current knee to the previous.
Arguments:
- points (np.ndarray): numpy array with the points (x, y)
- knees (np.ndarray): knees indexes
Returns:
tuple: (average_x, average_y, average_slope, average_coeffients, cost)
331def mae(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected) -> float: 332 """ 333 Estimates the worst case Mean Absolute Error (MAE) for the given 334 knee and expected points. 335 336 Suppports different size arrays, and estimates the MAE based 337 on the worst case. 338 It uses the euclidean distance to find the closer points, 339 and computes the error based on the closest point. 340 341 Args: 342 points (np.ndarray): numpy array with the points (x, y) 343 knees (np.ndarray): knees indexes 344 expected (np.ndarray): numpy array with the expected knee points (x, y) 345 s (Strategy): enum that controls the point matching (default Strategy.expected) 346 347 Returns: 348 float: the worst case MAE 349 """ 350 # get the knee points 351 knee_points = points[knees] 352 353 error = 0.0 354 355 if s is Strategy.knees: 356 a = knee_points 357 b = expected 358 elif s is Strategy.expected: 359 a = expected 360 b = knee_points 361 elif s is Strategy.best: 362 if len(expected) <= len(knee_points): 363 a = expected 364 b = knee_points 365 else: 366 a = knee_points 367 b = expected 368 else: 369 if len(expected) >= len(knee_points): 370 a = expected 371 b = knee_points 372 else: 373 a = knee_points 374 b = expected 375 376 for p in a: 377 distances = np.linalg.norm(b-p, axis=1) 378 idx = np.argmin(distances) 379 error += np.sum(np.abs(p-b[idx])) 380 381 return error / (len(a)*2.0)
Estimates the worst case Mean Absolute Error (MAE) for the given knee and expected points.
Suppports different size arrays, and estimates the MAE based on the worst case. It uses the euclidean distance to find the closer points, and computes the error based on the closest point.
Arguments:
- points (np.ndarray): numpy array with the points (x, y)
- knees (np.ndarray): knees indexes
- expected (np.ndarray): numpy array with the expected knee points (x, y)
- s (Strategy): enum that controls the point matching (default Strategy.expected)
Returns:
float: the worst case MAE
384def mse(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected) -> float: 385 """ 386 Estimates the worst case Mean Squared Error (MSE) for the given 387 knee and expected points. 388 389 Suppports different size arrays, and estimates the MSE based 390 on the worst case. 391 It uses the euclidean distance to find the closer points, 392 and computes the error based on the closest point. 393 394 Args: 395 points (np.ndarray): numpy array with the points (x, y) 396 knees (np.ndarray): knees indexes 397 expected (np.ndarray): numpy array with the expected knee points (x, y) 398 s (Strategy): enum that controls the point matching (default Strategy.expected) 399 400 Returns: 401 float: the worst case MSE 402 """ 403 # get the knee points 404 knee_points = points[knees] 405 406 error = 0.0 407 408 if s is Strategy.knees: 409 a = knee_points 410 b = expected 411 elif s is Strategy.expected: 412 a = expected 413 b = knee_points 414 elif s is Strategy.best: 415 if len(expected) <= len(knee_points): 416 a = expected 417 b = knee_points 418 else: 419 a = knee_points 420 b = expected 421 else: 422 if len(expected) >= len(knee_points): 423 a = expected 424 b = knee_points 425 else: 426 a = knee_points 427 b = expected 428 429 for p in a: 430 distances = np.linalg.norm(b-p, axis=1) 431 idx = np.argmin(distances) 432 error += np.sum(np.square(p-b[idx])) 433 434 return error / (len(a)*2.0)
Estimates the worst case Mean Squared Error (MSE) for the given knee and expected points.
Suppports different size arrays, and estimates the MSE based on the worst case. It uses the euclidean distance to find the closer points, and computes the error based on the closest point.
Arguments:
- points (np.ndarray): numpy array with the points (x, y)
- knees (np.ndarray): knees indexes
- expected (np.ndarray): numpy array with the expected knee points (x, y)
- s (Strategy): enum that controls the point matching (default Strategy.expected)
Returns:
float: the worst case MSE
437def rmse(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected) -> float: 438 """ 439 Estimates the worst case Root Mean Squared Error (RMSE) for the given 440 knee and expected points. 441 442 Suppports different size arrays, and estimates the RMSE based 443 on the worst case. 444 It uses the euclidean distance to find the closer points, 445 and computes the error based on the closest point. 446 447 Args: 448 points (np.ndarray): numpy array with the points (x, y) 449 knees (np.ndarray): knees indexes 450 expected (np.ndarray): numpy array with the expected knee points (x, y) 451 s (Strategy): enum that controls the point matching (default Strategy.expected) 452 453 Returns: 454 float: the worst case RMSE 455 """ 456 return math.sqrt(mse(points, knees, expected, s))
Estimates the worst case Root Mean Squared Error (RMSE) for the given knee and expected points.
Suppports different size arrays, and estimates the RMSE based on the worst case. It uses the euclidean distance to find the closer points, and computes the error based on the closest point.
Arguments:
- points (np.ndarray): numpy array with the points (x, y)
- knees (np.ndarray): knees indexes
- expected (np.ndarray): numpy array with the expected knee points (x, y)
- s (Strategy): enum that controls the point matching (default Strategy.expected)
Returns:
float: the worst case RMSE
459def rmspe(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, s: Strategy = Strategy.expected, eps: float = 1e-16) -> float: 460 """ 461 Estimates the worst case Root Mean Squared Percentage Error (RMSPE) for the given knee and expected points. 462 463 Suppports different size arrays, and estimates the RMSPE based on the worst case. 464 It uses the euclidean distance to find the closer points, and computes the error based on the closest point. 465 466 Args: 467 points (np.ndarray): numpy array with the points (x, y) 468 knees (np.ndarray): knees indexes 469 expected (np.ndarray): numpy array with the expected knee points (x, y) 470 s (Strategy): enum that controls the point matching (default Strategy.expected) 471 eps (float): eps value to prevent division by zero (default: 1E-16) 472 473 Returns: 474 float: the worst case RMSPE 475 """ 476 # get the knee points 477 knee_points = points[knees] 478 479 if s is Strategy.knees: 480 a = knee_points 481 b = expected 482 elif s is Strategy.expected: 483 a = expected 484 b = knee_points 485 elif s is Strategy.best: 486 if len(expected) <= len(knee_points): 487 a = expected 488 b = knee_points 489 else: 490 a = knee_points 491 b = expected 492 else: 493 if len(expected) >= len(knee_points): 494 a = expected 495 b = knee_points 496 else: 497 a = knee_points 498 b = expected 499 500 errors = [] 501 502 for p in a: 503 distances = np.linalg.norm(b-p, axis=1) 504 idx = np.argmin(distances) 505 e = (p - b[idx]) / (p + eps) 506 errors.extend(e) 507 errors = np.array(errors) 508 509 return np.sqrt(np.mean(np.square(errors)))
Estimates the worst case Root Mean Squared Percentage Error (RMSPE) for the given knee and expected points.
Suppports different size arrays, and estimates the RMSPE based on the worst case. It uses the euclidean distance to find the closer points, and computes the error based on the closest point.
Arguments:
- points (np.ndarray): numpy array with the points (x, y)
- knees (np.ndarray): knees indexes
- expected (np.ndarray): numpy array with the expected knee points (x, y)
- s (Strategy): enum that controls the point matching (default Strategy.expected)
- eps (float): eps value to prevent division by zero (default: 1E-16)
Returns:
float: the worst case RMSPE
512def cm(points: np.ndarray, knees: np.ndarray, expected: np.ndarray, t: float = 0.01) -> np.ndarray: 513 """ 514 Computes the Confusion Matrix based on the knees and expected points. 515 516 Args: 517 points (np.ndarray): numpy array with the points (x, y) 518 knees (np.ndarray): knees indexes 519 expected (np.ndarray): numpy array with the expected knee points (x, y) 520 t (float): the maximum allowed distance in percentage (default 0.01) 521 522 Returns: 523 np.ndarray: the confusion matrix 524 """ 525 526 #dx = math.fabs(points[-1][0] - points[0][0]) 527 max_x, _ = points.max(axis=0) 528 min_x, _ = points.min(axis=0) 529 dx = math.fabs(max_x - min_x) 530 used_knees = [] 531 knees_points_x = points[knees][:, 0] 532 tp = fn = fp = tn = 0 533 for px, _ in expected: 534 distances = np.fabs(knees_points_x - px)/dx 535 idx = np.argmin(distances) 536 #logger.info(f"{px} / {idx} / {distances[idx]} = {distances[idx] <= t}") 537 if distances[idx] <= t and idx not in used_knees: 538 tp += 1 539 used_knees.append(idx) 540 else: 541 fn += 1 542 fp = max(len(knees) - tp, 0) 543 tn = len(points) - (tp+fp+fn) 544 545 return np.array([[tp, fp], [fn, tn]])
Computes the Confusion Matrix based on the knees and expected points.
Arguments:
- points (np.ndarray): numpy array with the points (x, y)
- knees (np.ndarray): knees indexes
- expected (np.ndarray): numpy array with the expected knee points (x, y)
- t (float): the maximum allowed distance in percentage (default 0.01)
Returns:
np.ndarray: the confusion matrix
548def accuracy(cm: np.ndarray) -> float: 549 """ 550 Computes accuracy based on a Confusion Matrix. 551 552 Args: 553 cm (np.ndarray): the confusion matrix 554 555 Returns: 556 float: the accuracy 557 """ 558 559 tp, fp = cm[0] 560 fn, tn = cm[1] 561 562 return (tp+tn)/(tp+tn+fp+fn)
Computes accuracy based on a Confusion Matrix.
Arguments:
- cm (np.ndarray): the confusion matrix
Returns:
float: the accuracy
565def f1score(cm: np.ndarray) -> float: 566 """ 567 Computes F1-Score based on a Confusion Matrix. 568 569 Args: 570 cm (np.ndarray): the confusion matrix 571 572 Returns: 573 float: the F1-Score 574 """ 575 576 tp, fp = cm[0] 577 fn, _ = cm[1] 578 579 return (2.0*tp)/(2*tp+fp+fn)
Computes F1-Score based on a Confusion Matrix.
Arguments:
- cm (np.ndarray): the confusion matrix
Returns:
float: the F1-Score
582def mcc(cm: np.ndarray) -> float: 583 """ 584 Computes Matthews Correlation Coefficient (MCC) based on a Confusion Matrix. 585 586 Args: 587 cm (np.ndarray): the confusion matrix 588 589 Returns: 590 float: the mcc 591 """ 592 593 tp, fp = cm[0] 594 fn, tn = cm[1] 595 596 n = tp*tn - fp*fn 597 d = math.sqrt((tp+fp)*(tp+fn)*(tn+fp)*(tn+fn)) 598 599 return n/d
Computes Matthews Correlation Coefficient (MCC) based on a Confusion Matrix.
Arguments:
- cm (np.ndarray): the confusion matrix
Returns:
float: the mcc
602def compute_global_rmse(points: np.ndarray, reduced: np.ndarray, cache:dict=None) -> float: 603 """ 604 Computes the global RMSE for a point reduction set. 605 606 For each segment within the point reduction set, computes a straight line. 607 For each line segment it computes the error with the original set. 608 609 Args: 610 points (np.ndarray): the original points 611 reduced (np.ndarray): the indexes of the reduced points 612 cache (dict): cache used to speedup the computation (default None) 613 614 Returns: 615 float: the global RMSE 616 """ 617 # Setup the cache 618 if cache is None: cache = {} 619 620 segment_errors = np.zeros(len(reduced)-1) 621 622 left = reduced[0] 623 for i in range(1, len(reduced)): 624 right = reduced[i] 625 626 # Get data from cache 627 if (left, right) not in cache: 628 pt = points[left:right+1] 629 coef = lf.linear_fit_points(pt) 630 y_hat = lf.linear_transform_points(pt, coef) 631 y = pt[:,1] 632 segment_error = np.sum(np.square((y-y_hat))) 633 cache[(left, right)] = segment_error 634 635 segment_errors[i-1] = cache[(left, right)] 636 left = right 637 638 # compute the cost function 639 return math.sqrt(np.sum(segment_errors)/len(points))
Computes the global RMSE for a point reduction set.
For each segment within the point reduction set, computes a straight line. For each line segment it computes the error with the original set.
Arguments:
- points (np.ndarray): the original points
- reduced (np.ndarray): the indexes of the reduced points
- cache (dict): cache used to speedup the computation (default None)
Returns:
float: the global RMSE
642def mip(points: np.ndarray, reduced: np.ndarray) -> tuple: 643 """ 644 Computes the median improvement per point (MIP). 645 646 Improvement is defined as a relation between a final cost against a reference cost. 647 The final cost is the RSME with the complete reduced set. 648 The reference cost id different for each point. 649 It is computed as the RMSE of the reconstruction without that specific point. 650 651 Args: 652 points (np.ndarray): the original points 653 reduced (np.ndarray): the indexes of the reduced points 654 655 Returns: 656 tuple: the median improvement per point (MIP) and the MAD 657 """ 658 # Setup the cache 659 cache = {} 660 661 ip = np.zeros(len(reduced)-2) 662 663 # Compute the final RSME 664 cost_fin = compute_global_rmse(points, reduced, cache) 665 666 for i in range(1, len(reduced)-1): 667 # Compute the reference RMSE 668 cost_ref = compute_global_rmse(points, np.delete(reduced, i), cache) 669 ip[i-1] = cost_ref - cost_fin 670 671 mip = np.median(ip) 672 673 return mip, np.median(np.absolute(ip - mip))
Computes the median improvement per point (MIP).
Improvement is defined as a relation between a final cost against a reference cost. The final cost is the RSME with the complete reduced set. The reference cost id different for each point. It is computed as the RMSE of the reconstruction without that specific point.
Arguments:
- points (np.ndarray): the original points
- reduced (np.ndarray): the indexes of the reduced points
Returns:
tuple: the median improvement per point (MIP) and the MAD
676def compute_cost(points: np.ndarray, segment_errors:np.ndarray, cost:metrics.Metrics, cache:dict) -> float: 677 """ 678 Compute the cost of multi-point fitting using the segment errors. 679 680 It uses a cache to speedup the compuation of the cost. 681 The cache contains the cost of previously computed segments. 682 683 Args: 684 points (np.ndarray): the original points 685 segment_errors (np.ndarray): errors from each of the segments 686 cost (metrics.Metrics): the metric used for the cost calculation 687 cache (dict): the cache used to store the segments costs 688 689 Returns: 690 float: the global cost of a multi-point fitting 691 """ 692 # methods = {metrics.Metrics.r2: metrics.r2,metrics.Metrics.rpd: metrics.rpd,metrics.Metrics.rmsle: metrics.rmsle,metrics.Metrics.rmspe: metrics.rmspe,metrics.Metrics.smape: metrics.smape} 693 # cost = methods[cost](np.array(y), np.array(y_hat)) 694 695 # We have to account for the head of each segment 696 total = len(points) + len(segment_errors) - 1 697 698 if cost is metrics.Metrics.r2: 699 # Get tss from the cache 700 if 'tss' not in cache: 701 # Compute TSS and add it to cache (single operation) 702 y = points[:,1] 703 y_mean = np.mean(y) 704 tss = np.sum(np.square(y-y_mean)) 705 cache['tss'] = tss 706 tss = cache['tss'] 707 rss = np.sum(segment_errors) 708 cost = 1.0 - rss if tss == 0 else 1.0 - (rss/tss) 709 #return np.sum(np.square(y-y_hat)) 710 elif cost is metrics.Metrics.rmsle: 711 #return np.sum(np.square((np.log(y+1) - np.log(y_hat+1)))) 712 cost = math.sqrt(np.sum(segment_errors)/total) 713 elif cost is metrics.Metrics.rmspe: 714 #return np.sum(np.square((y - y_hat) / (y+eps))) 715 cost = math.sqrt(np.sum(segment_errors)/total) 716 elif cost is metrics.Metrics.rpd: 717 #return np.sum(np.abs((y - y_hat) / (np.maximum(y, y_hat)+eps))) 718 cost = np.sum(segment_errors)/total 719 else: 720 #return np.sum(2.0 * np.abs(y_hat - y) / (np.abs(y) + np.abs(y_hat) + eps)) 721 cost = np.sum(segment_errors)/total 722 723 cost = 0 if cost < 0 else cost 724 725 return cost
Compute the cost of multi-point fitting using the segment errors.
It uses a cache to speedup the compuation of the cost. The cache contains the cost of previously computed segments.
Arguments:
- points (np.ndarray): the original points
- segment_errors (np.ndarray): errors from each of the segments
- cost (metrics.Metrics): the metric used for the cost calculation
- cache (dict): the cache used to store the segments costs
Returns:
float: the global cost of a multi-point fitting
728def compute_partial_cost(y:np.ndarray, y_hat:np.ndarray, cost: metrics.Metrics, eps: float = 1e-16) -> float: 729 """ 730 Compute the partial cost of a multi-point fitting. 731 732 Args: 733 y (np.ndarray): the y values from the points 734 y_hat (np.ndarray): the y values computed with the linear aproximation 735 cost (metrics.Metrics): the metric used for the cost calculation 736 eps (float): eps value to prevent division by zero (default: 1E-16) 737 738 Returns: 739 float: the global cost of a multi-point fitting 740 """ 741 if cost is metrics.Metrics.r2: 742 return np.sum(np.square(y-y_hat)) 743 elif cost is metrics.Metrics.rmsle: 744 return np.sum(np.square((np.log(y+1) - np.log(y_hat+1)))) 745 elif cost is metrics.Metrics.rmspe: 746 return np.sum(np.square((y - y_hat) / (y+eps))) 747 elif cost is metrics.Metrics.rpd: 748 return np.sum(np.abs((y - y_hat) / (np.maximum(y, y_hat)+eps))) 749 else: 750 return np.sum(2.0 * np.abs(y_hat - y) / (np.abs(y) + np.abs(y_hat) + eps))
Compute the partial cost of a multi-point fitting.
Arguments:
- y (np.ndarray): the y values from the points
- y_hat (np.ndarray): the y values computed with the linear aproximation
- cost (metrics.Metrics): the metric used for the cost calculation
- eps (float): eps value to prevent division by zero (default: 1E-16)
Returns:
float: the global cost of a multi-point fitting
753def compute_global_cost(points: np.ndarray, reduced: np.ndarray, cost: metrics.Metrics = metrics.Metrics.rpd, cache:dict=None) -> float: 754 """ 755 Compute the cost of multi-point fitting using the segment errors. 756 757 It uses a cache to speedup the compuation of the cost. 758 The cache contains the cost of previously computed segments. 759 760 Args: 761 points (np.ndarray): the original points 762 reduced (np.ndarray): the reduced set of points used for the fitting 763 cost (metrics.Metrics): the metric used for the cost calculation (default: metrics.Metrics.rpd) 764 cache (dict): the cache used to store the segments costs (default: None) 765 766 Returns: 767 float: the global cost of a multi-point fitting 768 """ 769 # Setup the cache 770 if cache is None: cache = {} 771 772 segment_errors = np.zeros(len(reduced)-1) 773 774 left = reduced[0] 775 for i in range(1, len(reduced)): 776 right = reduced[i] 777 778 # Get data from cache 779 if (left, right) not in cache: 780 pt = points[left:right+1] 781 if len(pt) <= 2: 782 cache[(left, right)] = 0 783 else: 784 y_hat = lf.linear_fit_transform_points(pt) 785 y = pt[:,1] 786 segment_error = compute_partial_cost(y, y_hat, cost) 787 cache[(left, right)] = segment_error 788 789 segment_errors[i-1] = cache[(left, right)] 790 left = right 791 792 return compute_cost(points, segment_errors, cost, cache)
Compute the cost of multi-point fitting using the segment errors.
It uses a cache to speedup the compuation of the cost. The cache contains the cost of previously computed segments.
Arguments:
- points (np.ndarray): the original points
- reduced (np.ndarray): the reduced set of points used for the fitting
- cost (metrics.Metrics): the metric used for the cost calculation (default: metrics.Metrics.rpd)
- cache (dict): the cache used to store the segments costs (default: None)
Returns:
float: the global cost of a multi-point fitting
795def compute_global_segment_cost(points: np.ndarray, reduced: np.ndarray, cost: metrics.Metrics = metrics.Metrics.rpd) -> tuple: 796 """ 797 Compute the cost of multi-point fitting using the segment errors. 798 799 Legacy function that does not use a cache or combines the segments costs. 800 Used for unit testing. 801 802 Args: 803 points (np.ndarray): the original points 804 reduced (np.ndarray): the reduced set of points used for the fitting 805 cost (metrics.Metrics): the metric used for the cost calculation (default: metrics.Metrics.rpd) 806 807 Returns: 808 tuple: the global cost, and partial costs 809 """ 810 y, y_hat = [], [] 811 812 cost_segment = [] 813 814 left = reduced[0] 815 for i in range(1, len(reduced)): 816 right = reduced[i] 817 pt = points[left:right+1] 818 819 y_temp, y_hat_temp = lf.linear_fit_transform_points(pt) 820 821 y_hat.extend(y_hat_temp) 822 y.extend(y_temp) 823 824 # compute the cost function 825 #c = compute_cost(y_temp, y_hat_temp, cost) 826 c = lf.linear_hv_residuals_points(pt) 827 cost_segment.append(c) 828 829 left = right 830 831 # compute the cost function 832 return compute_cost(y, y_hat, cost), np.array(cost_segment)
Compute the cost of multi-point fitting using the segment errors.
Legacy function that does not use a cache or combines the segments costs. Used for unit testing.
Arguments:
- points (np.ndarray): the original points
- reduced (np.ndarray): the reduced set of points used for the fitting
- cost (metrics.Metrics): the metric used for the cost calculation (default: metrics.Metrics.rpd)
Returns:
tuple: the global cost, and partial costs