kneeliverse.lmethod

The following module provides a knee detection method based on the L-method algorithm.

  1# coding: utf-8
  2
  3'''
  4The following module provides knee detection method
  5based on L-method algorithm.
  6'''
  7
  8__author__ = 'Mário Antunes'
  9__version__ = '1.0'
 10__email__ = 'mario.antunes@ua.pt'
 11__status__ = 'Development'
 12__license__ = 'MIT'
 13__copyright__ = '''
 14Copyright (c) 2021-2023 Stony Brook University
 15Copyright (c) 2021-2023 The Research Foundation of SUNY
 16'''
 17
 18import math
 19import logging
 20import numpy as np
 21import kneeliverse.linear_fit as lf 
 22import kneeliverse.multi_knee as mk
 23
 24
 25from enum import Enum
 26
 27
 28logger = logging.getLogger(__name__)
 29
 30
 31class Fit(Enum):
 32    """
 33    Enum that selects the type of linear fitting used by compute_error.
 34    """
 35    best_fit = 'bestfit'  # compute_error fits each segment with np.polyfit (degree 1)
 36    point_fit = 'pointfit'  # compute_error fits each segment with lf.linear_fit
 37
 38    def __str__(self):
 39        return self.value  # str(member) yields the raw string value, e.g. 'bestfit'
 40
 41
 42class Cost(Enum):
 43    """
 44    Enum that defines the cost used in the L-method.
 45    
 46    RMSE is the cost originally used by the authors of the L-method.
 47    RSS can be used to speed up the computation
 48    (it has similar results in most practical cases).
 49    """
 50    rss = 'rss'  # compute_error sums the weighted residuals without a square root
 51    rmse = 'rmse'  # compute_error applies math.sqrt to each weighted residual
 52
 53    def __str__(self):
 54        return self.value  # str(member) yields the raw string value, e.g. 'rmse'
 55
 56
 57class Refinement(Enum):
 58    """
 59    Enum that defines the iterative refinement used in the L-method.
 60
 61    none: no iterative refinement (knee performs a single get_knee pass)
 62    original: applies the iterative refinement from the paper
 63    adjusted: applies a variant that should be more stable on noisy data
 64    """
 65    none='none'
 66    original='original'  # knee sets the next cutoff to max(limit, min(2*knee, len(points)))
 67    adjusted='adjusted'  # knee sets the next cutoff to max(limit, midpoint of current and previous knee)
 68
 69    def __str__(self):
 70        return self.value  # str(member) yields the raw string value, e.g. 'adjusted'
 71
 72
 73def compute_error(x: np.ndarray, y: np.ndarray, index:int, length:int, fit=Fit.point_fit, cost=Cost.rmse):
 74    """
 75    Returns the fitting error minimized by the L-method, plus both line fits.
 76
 77    Args:
 78        x (np.ndarray): the value of the points in the x axis coordinates
 79        y (np.ndarray): the value of the points in the y axis coordinates
 80        index (int): index of the point shared by the left and right segments
 81        length (int): x-axis extent of the points considered (get_knee passes x[-1] - x[0])
 82        fit (Fit): select between point fit and best fit
 83        cost (Cost): use either RMSE (original work) or RSS (faster implementation)
 84    
 85    Returns:
 86        tuple: (fitting cost, left line coefficients, right line coefficients)
 87    """
 88    # x-axis extent of each segment; both segments include the point at index
 89    left_length = x[index] - x[0]
 90    right_length = x[-1] - x[index]
 91
 92    left_ratio = left_length/length
 93    right_ratio = right_length/length
 94
 95    if fit is Fit.best_fit:
 96        coef_left, r_left, *_  = np.polyfit(x[0:index+1], y[0:index+1], 1, full=True)
 97        coef_right, r_rigth, *_ = np.polyfit(x[index:], y[index:], 1, full=True)
 98        # with full=True the residuals come back as an array; keep its first entry
 99        r_left = r_left[0]
100        r_rigth = r_rigth[0]
101    else:
102        coef_left = lf.linear_fit(x[0:index+1], y[0:index+1])
103        coef_right = lf.linear_fit(x[index:], y[index:])
104        r_left = lf.linear_residuals(x[0:index+1], y[0:index+1], coef_left)
105        r_rigth = lf.linear_residuals(x[index:], y[index:], coef_right)
106    # each segment's residual is weighted by its share of the x-axis extent
107    if cost is Cost.rmse:
108        error = left_ratio*math.sqrt(r_left*left_ratio) + right_ratio*math.sqrt(right_ratio*r_rigth)
109    else:
110        error = r_left*left_ratio + r_rigth*right_ratio
111    
112    return error, coef_left, coef_right
113
114
115def get_knee(x: np.ndarray, y: np.ndarray, fit=Fit.point_fit, cost=Cost.rmse) -> int:
116    """
117    Returns the knee index and both fitted lines based on the L-method.
118
119    Single pass of the L-method, without the iterative refinement.
120
121    Args:
122        x (np.ndarray): the value of the points in the x axis coordinates
123        y (np.ndarray): the value of the points in the y axis coordinates
124        fit (Fit): select between point fit and best fit
125        cost (Cost): use either RMSE or RSS as the fitting cost
126
127    Returns:
128        tuple: (knee index, left line coefficients, right line coefficients)
129    """
130    # index 2 is the first split candidate; it seeds the running minimum
131    index = 2
132    length = x[-1] - x[0]
133    error, coef_left, coef_right = compute_error(x, y, index, length, fit, cost)
134    #logger.info("Error(%s) = %s", index, error)
135    # evaluate the remaining split candidates; the last one is len(x)-3
136    for i in range(index+1, len(x)-2):
137        current_error, i_coef_left, i_coef_right = compute_error(x, y, i, length, fit, cost)
138        #logger.info("Error(%s) = %s", i, error)
139        # strict '<' keeps the earliest index when several candidates tie
140        if current_error < error:
141            error = current_error
142            index = i
143            coef_left = i_coef_left
144            coef_right = i_coef_right
145    # NOTE(review): the '-> int' annotation is narrower than this tuple return
146    return (index, coef_left, coef_right)
147
148
149def knee(points:np.ndarray, fit:Fit=Fit.point_fit, it:Refinement=Refinement.adjusted, limit:int=10) -> int:
150    """
151    Returns the index of the knee point based on the L-method.
152
153    This method uses iterative refinement (a single pass for Refinement.none).
154
155    Args:
156        points (np.ndarray): numpy array with the points (x, y)
157        fit (Fit): select between point fit and best fit (default Fit.point_fit)
158        it (Refinement): defines the iterative refinement method (default Refinement.adjusted)
159        limit (int): lower bound applied to the cutoff index between iterations (default 10)
160
161    Returns:
162        int: the index of the knee point
163    """
164    # column 0 holds the x values, column 1 the y values
165    x = points[:,0]
166    y = points[:,1]
167    # start from the whole curve; stop once the knee index no longer changes
168    last_knee = -1
169    cutoff  = current_knee = len(points)
170    done = False
171    while current_knee != last_knee and not done:
172        last_knee = current_knee
173        current_knee, _, _ = get_knee(x[0:cutoff+1], y[0:cutoff+1], fit)  # cost is not forwarded: get_knee uses its default
174        if it is Refinement.adjusted:
175            cutoff = max(limit, int((current_knee + last_knee)/2.0))
176        elif it is Refinement.original:
177            cutoff = max(limit, min(current_knee*2, len(points)))
178        else:
179            done = True
180        
181    return current_knee
182
183
184def multi_knee(points: np.ndarray, t1: float = 0.001, t2: int = 4) -> np.ndarray:
185    """Recursive knee point detection based on the L-method.
186
187    It returns the knee points on the curve.
188
189    Args:
190        points (np.ndarray): numpy array with the points (x, y)
191        t1 (float): coefficient of determination threshold (default 0.001)
192        t2 (int): number of points threshold (default 4)
193
194    Returns:
195        np.ndarray: knee points on the curve
196    """
197    return mk.multi_knee(knee, points, t1, t2)  # this module's knee is passed as the detector
logger = <Logger kneeliverse.lmethod (WARNING)>
class Fit(enum.Enum):
32class Fit(Enum):
33    """
34    Enum that selects the type of linear fitting used by compute_error.
35    """
36    best_fit = 'bestfit'  # compute_error fits each segment with np.polyfit (degree 1)
37    point_fit = 'pointfit'  # compute_error fits each segment with lf.linear_fit
38
39    def __str__(self):
40        return self.value  # str(member) yields the raw string value, e.g. 'bestfit'

Enum that defines the types of linear fitting

best_fit = <Fit.best_fit: 'bestfit'>
point_fit = <Fit.point_fit: 'pointfit'>
Inherited Members
enum.Enum
name
value
class Cost(enum.Enum):
43class Cost(Enum):
44    """
45    Enum that defines the cost used in the L-method.
46    
47    RMSE is the cost originally used by the authors of the L-method.
48    RSS can be used to speed up the computation
49    (it has similar results in most practical cases).
50    """
51    rss = 'rss'  # compute_error sums the weighted residuals without a square root
52    rmse = 'rmse'  # compute_error applies math.sqrt to each weighted residual
53
54    def __str__(self):
55        return self.value  # str(member) yields the raw string value, e.g. 'rmse'

Enum that defines the cost used in L-method

RMSE is the cost originally used by the authors of the L-method. RSS can be used to speed up the computation (it has similar results in most practical cases).

rss = <Cost.rss: 'rss'>
rmse = <Cost.rmse: 'rmse'>
Inherited Members
enum.Enum
name
value
class Refinement(enum.Enum):
58class Refinement(Enum):
59    """
60    Enum that defines the iterative refinement used in the L-method.
61
62    none: no iterative refinement (knee performs a single get_knee pass)
63    original: applies the iterative refinement from the paper
64    adjusted: applies a variant that should be more stable on noisy data
65    """
66    none='none'
67    original='original'  # knee sets the next cutoff to max(limit, min(2*knee, len(points)))
68    adjusted='adjusted'  # knee sets the next cutoff to max(limit, midpoint of current and previous knee)
69
70    def __str__(self):
71        return self.value  # str(member) yields the raw string value, e.g. 'adjusted'

Enum that defines the iterative refinement used in L-method

none: as the name implies, it avoids the iterative refinement. original: applies the iterative refinement from the paper. adjusted: applies a variant that should be stable on noisy data.

none = <Refinement.none: 'none'>
original = <Refinement.original: 'original'>
adjusted = <Refinement.adjusted: 'adjusted'>
Inherited Members
enum.Enum
name
value
def compute_error( x: numpy.ndarray, y: numpy.ndarray, index: int, length: int, fit=<Fit.point_fit: 'pointfit'>, cost=<Cost.rmse: 'rmse'>):
 74def compute_error(x: np.ndarray, y: np.ndarray, index:int, length:int, fit=Fit.point_fit, cost=Cost.rmse):
 75    """
 76    Returns the fitting error minimized by the L-method, plus both line fits.
 77
 78    Args:
 79        x (np.ndarray): the value of the points in the x axis coordinates
 80        y (np.ndarray): the value of the points in the y axis coordinates
 81        index (int): index of the point shared by the left and right segments
 82        length (int): x-axis extent of the points considered (get_knee passes x[-1] - x[0])
 83        fit (Fit): select between point fit and best fit
 84        cost (Cost): use either RMSE (original work) or RSS (faster implementation)
 85    
 86    Returns:
 87        tuple: (fitting cost, left line coefficients, right line coefficients)
 88    """
 89    # x-axis extent of each segment; both segments include the point at index
 90    left_length = x[index] - x[0]
 91    right_length = x[-1] - x[index]
 92
 93    left_ratio = left_length/length
 94    right_ratio = right_length/length
 95
 96    if fit is Fit.best_fit:
 97        coef_left, r_left, *_  = np.polyfit(x[0:index+1], y[0:index+1], 1, full=True)
 98        coef_right, r_rigth, *_ = np.polyfit(x[index:], y[index:], 1, full=True)
 99        # with full=True the residuals come back as an array; keep its first entry
100        r_left = r_left[0]
101        r_rigth = r_rigth[0]
102    else:
103        coef_left = lf.linear_fit(x[0:index+1], y[0:index+1])
104        coef_right = lf.linear_fit(x[index:], y[index:])
105        r_left = lf.linear_residuals(x[0:index+1], y[0:index+1], coef_left)
106        r_rigth = lf.linear_residuals(x[index:], y[index:], coef_right)
107    # each segment's residual is weighted by its share of the x-axis extent
108    if cost is Cost.rmse:
109        error = left_ratio*math.sqrt(r_left*left_ratio) + right_ratio*math.sqrt(right_ratio*r_rigth)
110    else:
111        error = r_left*left_ratio + r_rigth*right_ratio
112    
113    return error, coef_left, coef_right

Returns the fitting error that is minimized by the L-method algorithm.

Arguments:
  • x (np.ndarray): the value of the points in the x axis coordinates
  • y (np.ndarray): the value of the points in the y axis coordinates
  • index (int): the index where the fitting line is divided
  • length (int): the x-axis extent of the points considered
  • fit (Fit): select between point fit and best fit
  • cost (Cost): use either RMSE (original work) or RSS (faster implementation)
Returns:

tuple: the fitting cost, the left line coefficients and the right line coefficients

def get_knee( x: numpy.ndarray, y: numpy.ndarray, fit=<Fit.point_fit: 'pointfit'>, cost=<Cost.rmse: 'rmse'>) -> int:
116def get_knee(x: np.ndarray, y: np.ndarray, fit=Fit.point_fit, cost=Cost.rmse) -> int:
117    """
118    Returns the knee index and both fitted lines based on the L-method.
119
120    Single pass of the L-method, without the iterative refinement.
121
122    Args:
123        x (np.ndarray): the value of the points in the x axis coordinates
124        y (np.ndarray): the value of the points in the y axis coordinates
125        fit (Fit): select between point fit and best fit
126        cost (Cost): use either RMSE or RSS as the fitting cost
127
128    Returns:
129        tuple: (knee index, left line coefficients, right line coefficients)
130    """
131    # index 2 is the first split candidate; it seeds the running minimum
132    index = 2
133    length = x[-1] - x[0]
134    error, coef_left, coef_right = compute_error(x, y, index, length, fit, cost)
135    #logger.info("Error(%s) = %s", index, error)
136    # evaluate the remaining split candidates; the last one is len(x)-3
137    for i in range(index+1, len(x)-2):
138        current_error, i_coef_left, i_coef_right = compute_error(x, y, i, length, fit, cost)
139        #logger.info("Error(%s) = %s", i, error)
140        # strict '<' keeps the earliest index when several candidates tie
141        if current_error < error:
142            error = current_error
143            index = i
144            coef_left = i_coef_left
145            coef_right = i_coef_right
146    # NOTE(review): the '-> int' annotation is narrower than this tuple return
147    return (index, coef_left, coef_right)

Returns the index of the knee point based on the L-method.

This method does not use the iterative refinement. It represents a single iteration of the refinement technique.

Arguments:
  • x (np.ndarray): the value of the points in the x axis coordinates
  • y (np.ndarray): the value of the points in the y axis coordinates
  • fit (Fit): select between point fit and best fit
Returns:

tuple: the index of the knee point, the left line coefficients and the right line coefficients

def knee( points: numpy.ndarray, fit: Fit = <Fit.point_fit: 'pointfit'>, it: Refinement = <Refinement.adjusted: 'adjusted'>, limit: int = 10) -> int:
150def knee(points:np.ndarray, fit:Fit=Fit.point_fit, it:Refinement=Refinement.adjusted, limit:int=10) -> int:
151    """
152    Returns the index of the knee point based on the L-method.
153
154    This method uses iterative refinement (a single pass for Refinement.none).
155
156    Args:
157        points (np.ndarray): numpy array with the points (x, y)
158        fit (Fit): select between point fit and best fit (default Fit.point_fit)
159        it (Refinement): defines the iterative refinement method (default Refinement.adjusted)
160        limit (int): lower bound applied to the cutoff index between iterations (default 10)
161
162    Returns:
163        int: the index of the knee point
164    """
165    # column 0 holds the x values, column 1 the y values
166    x = points[:,0]
167    y = points[:,1]
168    # start from the whole curve; stop once the knee index no longer changes
169    last_knee = -1
170    cutoff  = current_knee = len(points)
171    done = False
172    while current_knee != last_knee and not done:
173        last_knee = current_knee
174        current_knee, _, _ = get_knee(x[0:cutoff+1], y[0:cutoff+1], fit)  # cost is not forwarded: get_knee uses its default
175        if it is Refinement.adjusted:
176            cutoff = max(limit, int((current_knee + last_knee)/2.0))
177        elif it is Refinement.original:
178            cutoff = max(limit, min(current_knee*2, len(points)))
179        else:
180            done = True
181        
182    return current_knee

Returns the index of the knee point based on the L-method.

This method uses iterative refinement.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • fit (Fit): select between point fit and best fit (Fit.point_fit)
  • it (Refinement): defines the iterative refinement method (Refinement.adjusted)
  • limit(int): minimal number of points for the fitted lines (10)
Returns:

int: the index of the knee point

def multi_knee(points: numpy.ndarray, t1: float = 0.001, t2: int = 4) -> numpy.ndarray:
185def multi_knee(points: np.ndarray, t1: float = 0.001, t2: int = 4) -> np.ndarray:
186    """Recursive knee point detection based on the L-method.
187
188    It returns the knee points on the curve.
189
190    Args:
191        points (np.ndarray): numpy array with the points (x, y)
192        t1 (float): coefficient of determination threshold (default 0.001)
193        t2 (int): number of points threshold (default 4)
194
195    Returns:
196        np.ndarray: knee points on the curve
197    """
198    return mk.multi_knee(knee, points, t1, t2)  # this module's knee is passed as the detector

Recursive knee point detection based on the L-method.

It returns the knee points on the curve.

Arguments:
  • points (np.ndarray): numpy array with the points (x, y)
  • t1 (float): coefficient of determination threshold (default 0.001)
  • t2 (int): number of points threshold (default 4)
Returns:

np.ndarray: knee points on the curve