Si te gusta Skforecast , ayúdanos dándonos una estrella en GitHub! ⭐️
Más sobre forecasting en: cienciadedatos.net
Al tratar de anticipar valores futuros de una serie temporal, la mayoría de los modelos de forecasting intentan predecir cuál será el valor más probable, esto se llama point-forecasting. Aunque conocer de antemano el valor esperado de una serie temporal es útil en casi todos los casos de negocio, este tipo de predicción no proporciona información sobre la confianza del modelo ni sobre la incertidumbre de sus predicciones.
El forecasting probabilístico, a diferencia del point-forecasting, es una familia de técnicas que permiten predecir la distribución esperada de la variable respuesta en lugar de un único valor puntual. Este tipo de forecasting proporciona información muy valiosa ya que permite crear intervalos de predicción, es decir, el rango de valores donde es más probable que pueda estar el valor real. Más formalmente, un intervalo de predicción define el intervalo dentro del cual se espera encontrar el verdadero valor de la variable respuesta con una determinada probabilidad.
Existen múltiples formas de estimar intervalos de predicción en modelos de forecasting, la mayoría de las cuales requieren que los residuos (errores) del modelo sigan una distribución normal. Cuando no se puede asumir esta propiedad, dos alternativas comúnmente utilizadas son el bootstrapping y la regresión cuantílica. Para ilustrar cómo la librería skforecast permite estimar intervalos de predicción en modelos de forecasting multi-step se muestran dos ejemplo:
Intervalos de predicción basados en bootstrapping para un modelo de tipo recursive-multi-step forecaster.
Intervalos de predicción basados en regresión cuantílica para un modelo de tipo direct-multi-step forecaster.
⚠ Warning
Tal y como describe Rob J Hyndman en su blog, en los casos reales, casi todos los intervalos de predicción resultan ser demasiado estrechos. Por ejemplo, intervalos teóricos del 95% solo suelen conseguir una cobertura real de entre el 71% y el 87%. Este fenómeno surge debido a que estos intervalos no contemplan todas las fuentes de incertidumbre que, en el caso de modelos de *forecasting*, suelen ser de 4 tipos:✎ Note
Si es la primera vez que usa skforecast, visite Skforecast: forecasting de series temporales con Python y Scikit-learn para una introducción rápida.✎ Note
Conformal prediction es un método relativamente nuevo que permite estimar la incertidumbre aociada a las predicciones de modelos de machine learning. Este método está en la hoja de ruta de skforecast, pero aún no está disponible.El error en la predicción del siguiente valor de una serie (one-step-ahead forecast) se define como $e_t = y_t - \hat{y}_{t|t-1}$. Asumiendo que los errores futuros serán similares a los errores pasados, es posible simular diferentes predicciones tomando muestras de los errores vistos previamente en el pasado (es decir, los residuos) y agregándolos a las predicciones.
Al hacer esto repetidamente, se crea una colección de predicciones ligeramente diferentes (posibles caminos futuros), que representan la varianza esperada en el proceso de forecasting.
Finalmente, los intervalos de predicción se crean calculando los percentiles $\alpha/2$ y $1−\alpha/2$ de los datos simulados en cada horizonte de predicción.
La principal ventaja de esta estrategia es que solo requiere de un único modelo para estimar cualquier intervalo. El inconveniente es la necesidad de ejecutar cientos o miles de iteraciones de bootstrapping lo cual resulta muy costoso desde el punto de vista computacional y no siempre es posible.
# Preprocesado de datos
# ==============================================================================
import numpy as np
import pandas as pd
from skforecast.datasets import fetch_dataset
# Plots
# ==============================================================================
import matplotlib.pyplot as plt
from statsmodels.graphics.tsaplots import plot_acf
from statsmodels.graphics.tsaplots import plot_pacf
import plotly.graph_objects as go
import plotly.io as pio
import plotly.offline as poff
pio.templates.default = "seaborn"
pio.renderers.default = 'notebook'
poff.init_notebook_mode(connected=True)
plt.style.use('seaborn-v0_8-darkgrid')
from skforecast.plot import plot_residuals
from skforecast.plot import plot_prediction_distribution
from pprint import pprint
# Modelado y Forecasting
# ==============================================================================
import skforecast
import sklearn
import lightgbm
from lightgbm import LGBMRegressor
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from skforecast.ForecasterAutoreg import ForecasterAutoreg
from skforecast.ForecasterAutoregDirect import ForecasterAutoregDirect
from skforecast.model_selection import bayesian_search_forecaster
from skforecast.model_selection import backtesting_forecaster
from sklearn.metrics import mean_pinball_loss
from scipy.stats import norm
# Configuración
# ==============================================================================
import warnings
warnings.filterwarnings('once')
color = '\033[1m\033[38;5;208m'
print(f"{color}Versión skforecast: {skforecast.__version__}")
print(f"{color}Versión scikit-learn: {sklearn.__version__}")
print(f"{color}Version lightgbm: {lightgbm.__version__}")
print(f"{color}Versión pandas: {pd.__version__}")
print(f"{color}Versión numpy: {np.__version__}")
# Descarga de datos
# ==============================================================================
data = fetch_dataset(name='bike_sharing_extended_features')
data.head(2)
# One hot encoding de las variables categóricas
# ==============================================================================
encoder = ColumnTransformer(
[('one_hot_encoder', OneHotEncoder(sparse_output=False), ['weather'])],
remainder='passthrough',
verbose_feature_names_out=False
).set_output(transform="pandas")
data = encoder.fit_transform(data)
# Selección de las variables exógenas
# ==============================================================================
exog_features = [
'weather_clear', 'weather_mist', 'weather_rain', 'month_sin', 'month_cos',
'week_of_year_sin', 'week_of_year_cos', 'week_day_sin', 'week_day_cos',
'hour_day_sin', 'hour_day_cos', 'sunrise_hour_sin', 'sunrise_hour_cos',
'sunset_hour_sin', 'sunset_hour_cos', 'temp'
]
data = data[['users'] + exog_features]
Para facilitar el entrenamiento de los modelos, la búsqueda de hiperparámetros óptimos y la evaluación de su capacidad predictiva, los datos se dividen en tres conjuntos separados: entrenamiento, validación y prueba.
# Partición de datos en entrenamiento-validación-test
# ==============================================================================
data = data.loc['2011-05-30 23:59:00':, :]
end_train = '2012-08-30 23:59:00'
end_validation = '2012-11-15 23:59:00'
data_train = data.loc[: end_train, :]
data_val = data.loc[end_train:end_validation, :]
data_test = data.loc[end_validation:, :]
print(f"Dates train : {data_train.index.min()} --- {data_train.index.max()} (n={len(data_train)})")
print(f"Dates validacion : {data_val.index.min()} --- {data_val.index.max()} (n={len(data_val)})")
print(f"Dates test : {data_test.index.min()} --- {data_test.index.max()} (n={len(data_test)})")
La exploración gráfica de series temporales puede ser una forma eficaz de identificar tendencias, patrones y variaciones estacionales. Esto, a su vez, ayuda a orientar la selección del modelo de forecasting más adecuado.
# Gráfico de la serie temporal
# ==============================================================================
fig = go.Figure()
fig.add_trace(go.Scatter(x=data_train.index, y=data_train['users'], mode='lines', name='Train'))
fig.add_trace(go.Scatter(x=data_val.index, y=data_val['users'], mode='lines', name='Validation'))
fig.add_trace(go.Scatter(x=data_test.index, y=data_test['users'], mode='lines', name='Test'))
fig.update_layout(
title = 'Número de usuarios',
xaxis_title="Fecha",
yaxis_title="Usuarios",
legend_title="Partición:",
width=900,
height=400,
margin=dict(l=20, r=20, t=35, b=20),
legend=dict(
orientation="h",
yanchor="top",
y=1,
xanchor="left",
x=0.001
)
)
fig.show()
Los gráficos de autocorrelación muestran la correlación entre una serie temporal y sus valores pasados. Son una herramienta útil para identificar el orden de un modelo autorregresivo, es decir, los valores pasados (lags) que se deben incluir en el modelo.
La función de autocorrelación (ACF) mide la correlación entre una serie temporal y sus valores pasados. La función de autocorrelación parcial (PACF) mide la correlación entre una serie temporal y sus valores pasados, pero solo después de eliminar las variaciones explicadas por los valores pasados intermedios.
# Gráfico de autocorrelación
# ==============================================================================
fig, ax = plt.subplots(figsize=(6, 2))
plot_acf(data.users, ax=ax, lags=24*3)
plt.show()
# Gráfico autocorrelación parcial
# ==============================================================================
fig, ax = plt.subplots(figsize=(6, 2))
plot_pacf(data.users, ax=ax, lags=24*3)
plt.show()
Los resultados del estudio de autocorrelación indican una correlación significativa entre el número de usuarios en las horas anteriores, así como en los días previos. Esto significa que conocer del número de usuarios durante periodos específicos del pasado proporciona información útil para predecir el número de usuarios en el futuro.
Se entrena un recursive-multi-step forecaster y se optimiza sus hiperparámetros. Luego, se estiman los intervalos de predicción utilizando bootstrapping.
# Create forecaster and hyperparameters search
# ==============================================================================
# Forecaster
forecaster = ForecasterAutoreg(
regressor = LGBMRegressor(random_state=15926, verbose=-1),
lags = 7 # Place holder that will be replaced during the search
)
# Lags used as predictors
lags_grid = [24, 48, (1, 2, 3, 23, 24, 25, 47, 48, 49, 71, 72, 73, 364*24, 365*24)]
# Regressor hyperparameters search space
def search_space(trial):
search_space = {
'lags' : trial.suggest_categorical('lags', lags_grid),
'n_estimators' : trial.suggest_int('n_estimators', 200, 800, step=100),
'max_depth' : trial.suggest_int('max_depth', 3, 8, step=1),
'min_data_in_leaf': trial.suggest_int('min_data_in_leaf', 25, 500),
'learning_rate' : trial.suggest_float('learning_rate', 0.01, 0.5),
'feature_fraction': trial.suggest_float('feature_fraction', 0.5, 0.8, step=0.1),
'max_bin' : trial.suggest_int('max_bin', 50, 100, step=25),
'reg_alpha' : trial.suggest_float('reg_alpha', 0, 1, step=0.1),
'reg_lambda' : trial.suggest_float('reg_lambda', 0, 1, step=0.1)
}
return search_space
results_search, frozen_trial = bayesian_search_forecaster(
forecaster = forecaster,
y = data.loc[:end_validation, 'users'],
exog = data.loc[:end_validation, exog_features],
steps = 24,
metric = 'mean_absolute_error',
search_space = search_space,
initial_train_size = len(data[:end_train]),
refit = False,
n_trials = 20, # Increase for more exhaustive search
random_state = 123,
return_best = True,
n_jobs = 'auto',
verbose = False,
show_progress = True
)
best_params = results_search['params'].iloc[0]
best_lags = results_search['lags'].iloc[0]
Una vez encontrados los mejores hiperparámetros, se utiliza la función backtesting_forecaster()
para generar los intervalos de predicción de todo el conjunto de test.
El argumento interval
se emplea para especificar la probabilidad de cobertura deseada de los intervalos de predicción. En este caso, interval
se establece en [10, 90]
, lo que significa que los intervalos de predicción se calculan para los percentiles 10 y 90, lo que da como resultado una cobertura teórica del 80%.
El argumento n_boot
se utiliza para especificar el número de muestras bootstrap que se utilizarán para estimar los intervalos de predicción. Cuanto mayor sea el número de muestras, más precisos serán los intervalos de predicción, pero mayor el tiempo necesario.
Por defecto, los intervalos se calculan utilizando los residuos in-sample (residuos del conjunto de entrenamiento). Esto puede dar lugar a intervalos demasiado estrechos (demasiado optimistas).
# Backtesting con intervalos en los datos de test usando residuos in-sample
# ==============================================================================
metric, predictions = backtesting_forecaster(
forecaster = forecaster,
y = data['users'],
exog = data[exog_features],
steps = 24,
metric = 'mean_absolute_error',
initial_train_size = len(data.loc[:end_validation]),
refit = False,
interval = [10, 90], # 80% intervalos
n_boot = 250,
in_sample_residuals = True, # Usar residuos in-sample
binned_residuals = False,
n_jobs = 'auto',
verbose = False,
show_progress = True
)
display(metric)
predictions.head(5)
# Function to plot predicted intervals
# ======================================================================================
def plot_predicted_intervals(
predictions: pd.DataFrame,
y_true: pd.DataFrame,
target_variable: str,
initial_x_zoom: list=None,
title: str=None,
xaxis_title: str=None,
yaxis_title: str=None,
):
"""
Plot predicted intervals vs real values
Parameters
----------
predictions : pandas DataFrame
Predicted values and intervals.
y_true : pandas DataFrame
Real values of target variable.
target_variable : str
Name of target variable.
initial_x_zoom : list, default `None`
Initial zoom of x-axis, by default None.
title : str, default `None`
Title of the plot, by default None.
xaxis_title : str, default `None`
Title of x-axis, by default None.
yaxis_title : str, default `None`
Title of y-axis, by default None.
"""
fig = go.Figure([
go.Scatter(name='Prediction', x=predictions.index, y=predictions['pred'], mode='lines'),
go.Scatter(name='Real value', x=y_true.index, y=y_true[target_variable], mode='lines'),
go.Scatter(
name='Upper Bound', x=predictions.index, y=predictions['upper_bound'],
mode='lines', marker=dict(color="#444"), line=dict(width=0), showlegend=False
),
go.Scatter(
name='Lower Bound', x=predictions.index, y=predictions['lower_bound'],
marker=dict(color="#444"), line=dict(width=0), mode='lines',
fillcolor='rgba(68, 68, 68, 0.3)', fill='tonexty', showlegend=False
)
])
fig.update_layout(
title=title,
xaxis_title=xaxis_title,
yaxis_title=yaxis_title,
width=900,
height=400,
margin=dict(l=20, r=20, t=35, b=20),
hovermode="x",
xaxis=dict(range=initial_x_zoom),
legend=dict(
orientation="h",
yanchor="top",
y=1.1,
xanchor="left",
x=0.001
)
)
fig.show()
def empirical_coverage(y, lower_bound, upper_bound):
"""
Calculate coverage of a given interval
"""
return np.mean(np.logical_and(y >= lower_bound, y <= upper_bound))
# Gráfico intervalos
# ==============================================================================
plot_predicted_intervals(
predictions=predictions,
y_true=data_test,
target_variable="users",
initial_x_zoom = ['2012-12-01', '2012-12-25'],
title="Valor real vs predicciones en el conjunto de test",
xaxis_title="Date time",
yaxis_title="users",
)
# Cobertura del intervalo en los datos de test
# ==============================================================================
coverage = empirical_coverage(
y = data.loc[end_validation:, 'users'],
lower_bound = predictions["lower_bound"],
upper_bound = predictions["upper_bound"]
)
print(f"Cobertura del intervalo: {round(100*coverage, 2)} %")
# Area del intervalo
# ==============================================================================
area = (predictions["upper_bound"] - predictions["lower_bound"]).sum()
print(f"Area del intervalo: {round(area, 2)}")
Los intervalos de predicción presentan un exceso de confianza, ya que tienden a ser demasiado estrechos, lo que da lugar a una cobertura real inferior a la cobertura teóritca (80%). Esto se debe a la tendencia de los residuos de entrenamiento (in-sample) a sobrestimar la capacidad de predicción del modelo.
El método set_out_sample_residuals()
se utiliza para almacenar residuos fuera de muestra (out-sample) calculados con un conjunto de validación a través de backtesting. Una vez añadidos los nuevos residuos al foreacster, se tiene que indicar in_sample_residuals
en False
para que el forecaster los utilice.
# Backtesting con datos de validación para obtener residuos out-sample
# ==============================================================================
_, predictions_val = backtesting_forecaster(
forecaster = forecaster,
y = data.loc[:end_validation, 'users'],
exog = data.loc[:end_validation, exog_features],
steps = 24,
metric = 'mean_absolute_error',
initial_train_size = len(data.loc[:end_train]),
refit = False,
n_jobs = 'auto',
verbose = False,
show_progress = True
)
residuals = data.loc[predictions_val.index, 'users'] - predictions_val['pred']
residuals = residuals.dropna()
# Distribución residuos out-sample
# ==============================================================================
print(pd.Series(np.where(residuals < 0, 'negarivo', 'positivo')).value_counts())
plt.rcParams.update({'font.size': 8})
_ = plot_residuals(residuals=residuals, figsize=(7, 4))
# Almacenar residuos out-sample en el forecaster
# ==============================================================================
forecaster.set_out_sample_residuals(residuals=residuals)
# Backtesting con intervalos en los datos de test usando residuos out-sample
# ==============================================================================
metric, predictions = backtesting_forecaster(
forecaster = forecaster,
y = data['users'],
exog = data[exog_features],
steps = 24,
metric = 'mean_absolute_error',
initial_train_size = len(data.loc[:end_validation]),
refit = False,
interval = [10, 90], # 80% intervalos
n_boot = 250,
in_sample_residuals = False, # Usar residuos out-sample
binned_residuals = False,
n_jobs = 'auto',
verbose = False,
show_progress =