import logging
import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error
from scipy import signal
from statsmodels.nonparametric.smoothers_lowess import lowess
from ..adapters import sktime_interface
from tqdm import tqdm
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
[docs]
class cbr_fox:
"""
Core class to perform calculations and analysis at technique-level depth.
This class is used to preprocess the provided input data for performing correlation and find the best cases. Its}
functionality follows classic AI library guidelines and standards such as scikit-learn and keras.
Parameters
-------
metric : str or callable, optional
The metric to use for correlation (default is "dtw").
smoothness_factor : float, optional
The smoothness factor for preprocessing (default is 0.2).
kwargs : dict, optional
Additional keyword arguments for customization.
Methods
-------
__init__(self, metric, smoothness_factor, kwargs)
Initializes the cbr_fox class with specified parameters.
"""
[docs]
def __init__(self, metric: str or callable = "dtw", smoothness_factor: float = .2, kwargs: dict = {}):
"""
Initializes the cbr_fox class with specified parameters.
Parameters
----------
metric
smoothness_factor
kwargs
"""
# Variables for setting
self.metric = metric
self.smoothness_factor = smoothness_factor
self.kwargs = kwargs
# Variables for results
self.smoothed_correlation = None
self.analysis_report = None
self.analysis_report_combined = None
self.best_windows_index = list()
self.worst_windows_index = list()
self.best_mae = list()
self.worst_mae = list()
# Private variables for easy access by private methods
self.correlation_per_window = None
self.input_data_dictionary = None
self.records_array = None
self.records_array_combined = None
self.dtype = [('index', 'i4'),
('window', 'O'),
('target_window', 'O'),
('correlation', 'f8'),
('MAE', 'f8')]
# FIRST PRIVATE LAYER
def _smoothe_correlation(self):
"""
Smooth the correlation using the lowess method from scipy, applying the specified smoothness factor.
This method applies the Lowess (Locally Weighted Scatterplot Smoothing) technique to smooth the
correlation values for further analysis, with the smoothness factor used to control the degree
of smoothing.
Returns
-------
numpy.ndarray
A numpy array representing the smoothed correlation values, which can be used for further analysis.
"""
return lowess(self.__correlation_per_window, np.arange(len(self.__correlation_per_window)),
self.smoothness_factor)[:, 1]
def _identify_valleys_peaks_indexes(self):
"""
Identify the indices of valleys and peaks in the smoothed correlation array.
This method uses SciPy's `argrelextrema` function to locate local minima (valleys) and
local maxima (peaks) for further analysis.
Returns
-------
tuple of numpy.ndarray
A tuple containing two numpy arrays:
- The first array represents the indices of valleys (local minima).
- The second array represents the indices of peaks (local maxima).
"""
return signal.argrelextrema(self.smoothed_correlation, np.less)[0], \
signal.argrelextrema(self.smoothed_correlation, np.greater)[0]
def _retreive_concave_convex_segments(self, windows_len):
"""
Extract concave and convex segments from the smoothed correlation array.
This method splits the smoothed correlation data into concave and convex segments based on
the identified valley and peak indices, storing the results in private attributes.
Parameters
----------
windows_len : int
The length of the windows, corresponding to the number of data points in the correlation array.
Returns
-------
None
This method does not return a value but stores the calculated concave and convex segments
in the private attributes `self.concaveSegments` and `self.convexSegments`, respectively.
"""
self.concave_segments = np.split(
np.transpose(np.array((np.arange(windows_len), self.smoothed_correlation))),
self.valley_index)
self.convex_segments = np.split(
np.transpose(np.array((np.arange(windows_len), self.smoothed_correlation))),
self.peak_index)
def _retreive_original_indexes(self):
"""
Retrieve original data point indexes from concave and convex segments.
This method processes the concave and convex segments to extract the original indexes
of data points from the correlation array. The indexes are stored in the private attributes
`self.best_windows_index` (for concave segments) and `self.worst_windows_index` (for convex segments).
Returns
-------
None
The extracted indexes are stored in private attributes for further analysis.
"""
for split in tqdm(self.concave_segments, desc="Segmentos cóncavos"):
self.best_windows_index.append(int(split[np.where(split == max(split[:, 1]))[0][0], 0]))
for split in tqdm(self.convex_segments, desc="Segmentos convexos"):
self.worst_windows_index.append(int(split[np.where(split == min(split[:, 1]))[0][0], 0]))
[docs]
def calculate_analysis(self, indexes, input_data_dictionary):
"""
Compute analysis results and store them in a record array for reporting and visualization.
This method processes data for the specified indices, extracting relevant information such as
training windows, target windows, correlation values, and Mean Absolute Error (MAE). The results
are stored in a structured record array.
Parameters
----------
indexes : list of int
A list of indices corresponding to valleys and peaks in the non-smoothed correlation array.
input_data_dictionary : dict
A dictionary containing preprocessed input data, including training and target windows.
Returns
-------
numpy.recarray
A structured record array containing the following fields:
- Index: The index of the data point.
- Training window: The array representing the training window.
- Target window: The array representing the target window.
- Correlation value: The correlation value from the non-smoothed correlation array.
- MAE: The Mean Absolute Error (MAE) value comparing the target window and the prediction.
"""
return np.array([(index,
input_data_dictionary["training_windows"][index],
input_data_dictionary["target_training_windows"][index],
self.correlation_per_window[index],
mean_absolute_error(input_data_dictionary["target_training_windows"][index],
input_data_dictionary["prediction"].reshape(-1, 1)))
for index in indexes], dtype=self.dtype)
[docs]
def calculate_analysis_combined(self, input_data_dictionary, mode):
# SECOND PRIVATE LAYER
def weighted_average(values, weights):
"""
Calculate the weighted average of a set of values based on the provided weights.
Parameters
----------
values : numpy.ndarray
A 3D array of values with shape (n_windows, window_len, components_len),
where `n_windows` is the number of windows, `window_len` is the length of each window,
and `components_len` is the number of features per timestep.
weights : numpy.ndarray or list
A 1D array or list of weights corresponding to the values. The length of `weights` must
match the first dimension of `values` (i.e., `n_windows`).
Returns
-------
numpy.ndarray
A 2D array of shape (window_len, components_len) representing the weighted average
of the input values.
"""
weights = np.array(weights)[:, np.newaxis, np.newaxis]
return np.sum(values * weights, axis=0) / np.sum(weights)
results = []
for index, indices in enumerate([self.best_windows_index, self.worst_windows_index]):
selected_cases = indices[:input_data_dictionary["num_cases"]]
# Promedio simple
if mode == "weighted":
# Promedio ponderado
average = weighted_average(input_data_dictionary["training_windows"][selected_cases],
self.correlation_per_window[selected_cases])
elif mode == "simple":
average = np.mean(input_data_dictionary["training_windows"][selected_cases], axis=0)
else:
raise ValueError(f'Mode "{mode}" is not supported. Try: "simple" or "weighted".')
target_average = np.mean(input_data_dictionary["target_training_windows"][selected_cases], axis=0)
correlation_mean = np.mean(self.correlation_per_window[selected_cases])
# se sustituye np.mean(input_data_dictionary["target_training_windows"][selected_cases], axis=0)
mae = mean_absolute_error(target_average, input_data_dictionary["prediction"].reshape(-1, 1))
results.append((-index, average, target_average, correlation_mean, mae))
return np.array(results, dtype=self.dtype)
def _preprocess_input_data(self, training_windows, target_training_windows, forecasted_window):
"""
Gather basic data information from the input variables.
This method processes the input data used for model training and forecasting,
creating a dictionary of relevant metadata.
Parameters
----------
training_windows : numpy.ndarray
A 3D array of training windows with shape (n_windows, window_len, components_len),
where `n_windows` is the number of windows, `window_len` is the length of each window,
and `components_len` is the number of features per timestep.
target_training_windows : list
A list of target windows corresponding to each training window.
forecasted_window : list
The window used for forecasting and making predictions.
Returns
-------
dict
A dictionary containing processed input data, including metadata such as
`components_len`, `window_len`, and `windows_len`.
"""
input_data_dictionary = dict()
input_data_dictionary['training_windows'] = training_windows
input_data_dictionary['target_training_windows'] = target_training_windows
input_data_dictionary['forecasted_window'] = forecasted_window
input_data_dictionary['components_len'] = training_windows.shape[2]
input_data_dictionary['window_len'] = training_windows.shape[1]
input_data_dictionary['windows_len'] = len(training_windows)
return input_data_dictionary
def _compute_correlation(self, input_data_dictionary):
"""
Compute the correlation between training windows and target windows.
This method uses the sktime interface or custom metrics to calculate
correlations for further analysis and case retrieval.
Parameters
----------
input_data_dictionary : dict
A dictionary containing processed input data, including training and
target windows.
Returns
-------
numpy.ndarray
An array containing the correlation values for each window, normalized
between 0 and 1.
"""
# Implementing interface architecture to reduce tight coupling.
correlation_per_window = sktime_interface.compute_distance_interface(input_data_dictionary, self.metric,
self.kwargs)
correlation_per_window = np.sum(correlation_per_window, axis=1)
correlation_per_window = ((correlation_per_window - min(correlation_per_window)) /
(max(correlation_per_window) - min(correlation_per_window)))
self.correlation_per_window = correlation_per_window
return correlation_per_window
def _compute_cbr_analysis(self, input_data_dictionary):
"""
Compute the analysis to smoothe, extract, retrieve indexes from the non-smoothed correlation
This method smooths the window correlation, identifies peaks and valleys,
extracts concave and convex segments, and retrieves the original indexes
from the non-smoothed correlation array.
Parameters
----------
input_data_dictionary : dict
A dictionary containing processed input data, which includes information
about the window lengths.
Returns
-------
None
This method modifies internal attributes and does not return a value.
"""
logging.info("Suavizando Correlación")
self.smoothed_correlation = self._smoothe_correlation()
logging.info("Extrayendo crestas y valles")
self.valley_index, self.peak_index = self._identify_valleys_peaks_indexes()
logging.info("Recuperando segmentos convexos y cóncavos")
self._retreive_concave_convex_segments(input_data_dictionary['windows_len'])
logging.info("Recuperando índices originales de correlación")
self._retreive_original_indexes()
def _compute_statistics(self, input_data_dictionary, mode):
"""
Calculate statistics based on identified cases.
This method performs calculations for both non-combined and combined results,
selecting the appropriate case based on the number passed by the user.
It generates a report for the user based on the computed statistics.
Parameters
----------
input_data_dictionary : dict
A dictionary containing processed input data, including the number of cases to select.
mode : str
A string passed when the instance is created, used to set the strategy for combining cases.
Returns
-------
None
This method performs operations to generate and set the results, including reports for the user.
"""
self.records_array = self.calculate_analysis(self.best_windows_index + self.worst_windows_index,
input_data_dictionary)
self.records_array = np.sort(self.records_array, order="correlation")[::-1]
# Selecting just the number of elements according to num_cases variable
# The conditional is to avoid duplicity in case records_arrays's shape is not greater than the selected num_cases
if (self.records_array.shape[0] > (input_data_dictionary["num_cases"] * 2)):
self.records_array = np.concatenate(
(self.records_array[:input_data_dictionary["num_cases"]], self.records_array[
-input_data_dictionary[
"num_cases"]:]))
self.records_array_combined = self.calculate_analysis_combined(input_data_dictionary, mode)
logging.info("Generando reporte de análisis")
self.analysis_report = pd.DataFrame(data=pd.DataFrame.from_records(self.records_array))
self.analysis_report_combined = pd.DataFrame(data=pd.DataFrame.from_records(self.records_array_combined))
# PUBLIC METHODS. ALL THESE METHODS ARE PROVIDED FOR THE USER. Public layer
[docs]
def fit(self, training_windows: np.ndarray, target_training_windows: np.ndarray, forecasted_window: np.ndarray):
"""
Perform correlation analysis and identify cases based on the provided data.
This method analyzes the provided training windows, calculates the correlation using
the selected metric, and performs CBR analysis to identify cases based on the data.
Parameters
----------
training_windows : numpy.ndarray
A 3D array of training windows with shape (n_windows, window_len, components_len),
where `n_windows` is the number of windows, `window_len` is the length of each window,
and `components_len` is the number of features per timestep.
target_training_windows : list
A list of target windows corresponding to each training window.
forecasted_window : list
A window used for forecasting and making predictions.
Returns
-------
None
This method performs calculations for correlation analysis and case-based reasoning.
"""
logging.info("Analizando conjunto de datos")
self.input_data_dictionary = self._preprocess_input_data(training_windows, target_training_windows,
forecasted_window)
logging.info("Calculando Correlación")
self.__correlation_per_window = self._compute_correlation(self.input_data_dictionary)
logging.info("Computando análisis de CBR")
self._compute_cbr_analysis(self.input_data_dictionary)
logging.info("Análisis finalizado")
[docs]
def predict(self, prediction, num_cases: int, mode):
"""
Perform analysis to identify the best cases based on the provided prediction.
This method computes the statistics to identify the best cases compared to the provided
prediction and returns results that can be accessed through corresponding methods.
Parameters
----------
prediction : numpy.ndarray
The prediction generated by the AI model.
num_cases : int
The maximum number of cases to identify.
mode : str, optional
A string to specify the mode for the combined case option, which can be either 'simple' or 'combined'.
The default is "simple".
Returns
-------
None
This method performs the analysis, and results can be accessed via the corresponding methods.
"""
self.input_data_dictionary['prediction'] = prediction
self.input_data_dictionary['num_cases'] = num_cases
self._compute_statistics(self.input_data_dictionary, mode)
[docs]
def get_analysis_report(self):
"""
Access the analysis report containing the best cases based on the analysis.
Returns
-------
pandas.DataFrame
A DataFrame containing the best cases and their respective information based on the analysis.
"""
return self.analysis_report
[docs]
def get_analysis_report_combined(self):
"""
Access the combined analysis report containing the best cases based on the combined analysis.
Returns
-------
pandas.DataFrame
A DataFrame containing the combined best cases and their respective information based on the analysis.
"""
return self.analysis_report_combined