Le modèle VAR : de l’approche métier au plotting des résultats

-
11
 m de lecture
-
var
  1. Introduction
  2. Définition du modèle VAR : équation & intérêt
  3. Import des libraires, set-up
  4. La stationnarité : kezako et pourquoi c’est important
  5. Le modèle et la convergence lag/variables
    a. Sélectionner des variables
    b. Sélection du lag optimal
  6. Prédiction avec le meilleur modèle et affichage des résultats
  7. Conclusion

1. Introduction

Bienvenue sur cet article explicatif du modèle VAR. Nous étudierons les “bases” concernant ce modèle mais pas celles générales à propos des séries temporelles (en dehors de la stationnarité 🤯). 

Le dataset utilisé est disponible ici. Et voici mon github : https://github.com/gambitl/Article_VAR_FR_Public

À la fin de cette article, vous saurez : 

  • Ce qu’est le modèle VAR et quel est son intérêt 🧐
  • Comment et pourquoi préparer son dataset pour utiliser le modèle VAR
  • Comment choisir le bon paramètre (nombre de lag) selon son dataset pour minimiser une métrique d’erreur et pas simplement une métrique statistique
  • Pourquoi et comment tester le lien entre les variables et enlever celles qui bruitent le modèle
  • Automatiser les deux étapes précédentes et converger vers un résultat optimal qui combine le paramètre le plus performant et le jeu de données le plus adapté 👍

À chaque étape, je mettrais d’abord en avant les points théoriques, puis les fonctions « maisons », et enfin le code qui appelle les fonctions. Chaque fonction est assez commentée et expliquée au sein même du code.

Vous trouverez à la fin de l’article l’ensemble du code hors fonction.

Dans le cas de cet exemple, j’utilise le modèle VAR pour prédire UNE variable en particulier.

2. Définition du modèle VAR : équation & intérêt

Le modèle VAR pour Vector Autoregression est un modèle de séries temporelles qui permet de prédire plusieurs variables dans le temps. Chacune des prédictions de chaque variable contribue à la prédiction des autres. Ainsi, si on l’écrivait sous un format linéaire, nous aurions ceci :

Soit 2 variables, Y et X, et (t) un moment dans le temps.

Y(t+1) = f(Y(t), X(t)) 

X(t+1) = f(X(t), Y(t))

Y(t+1) = α1 + β1Y(t) + β2X(t) + ε1

X(t+1) = α2 + β3X(t) + β4Y(t) + ε2

Avec α les constantes, β les coefficients associés aux variables, ε les erreurs.

Le modèle calcule sur la base de l’historique les constantes, coefficients pour chaque variable

De plus, le modèle inclut des « lags » (λ), qui permettent d’inclure dans le calcul de X(t+1), Y(t+1) toutes les valeurs de X, Y jusqu’à X(t- λ), Y(t- λ). Dans le système précédent, la valeur suivante de la variable (X) est calculée en fonction de la valeur précédente de (X) et de la valeur précédente de (Y). Le lag (λ) est donc égal à 1.

Ce sont ces lags qui constituent le paramètre à optimiser dans ce modèle, c’est-à-dire combien de jours d’historique vont composer nos équations.

3. Import des libraires, set-up

Nous avons besoin des imports suivants :

				
					import pandas as pd
import numpy as np
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from sklearn.preprocessing import MinMaxScaler
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib as mpl
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.stattools import grangercausalitytests, adfuller
from scipy.stats import boxcox, norm, probplot, normaltest
from scipy.special import inv_boxcox
				
			

Chargement de la donnée :

				
					df = pd.read_csv("path.csv")
df['settlement_date'] = pd.to_datetime(df['settlement_date'])
df = df.set_index('settlement_date')
df = df.drop(['settlement_period', 'period_hour'], axis = 1)
exog = df[['is_holiday']].copy()
df = df.drop(['is_holiday'], axis =1)
				
			

Paramètres :

				
					target_variable = "tsd" #the feature we ultimately want to predict
n_period_predict = 48 * 7 #the number of period we want to predict. I want to predict 7 days and there are 48 data points per day
				
			

Préparations/Cleaning :

				
					mask = (df.index > '2022-01-01') #I will use the data post 2022
df_pour_var = df.loc[mask]
exog_pour_var = exog.loc[mask]
df_pour_var = df_pour_var +1 #We erase the every value that are equals to 0 in order to facilitate the boxcox transformation
initial_testing_set = df_pour_var[-n_period_predict:] #there will be a lot of transformation so we want to keep a safe testing set
				
			

4. La stationnarité : kezako et pourquoi c’est important

On définit la stationnarité (faible) de la manière suivante :

  • Quelque soit le moment (t) dans le temps, l’espérance de notre variable est toujours la même. Il y a absence de tendance.
  • Quelque soit le moment (t) dans le temps, la variance de notre variable est toujours la même et non infinie.
  • Quelque soit le moment dans le temps, l’auto-corrélation entre deux moments dans le temps n’est fonction que du décalage dans le temps (et la position ne change pas la valeur de l’auto-corrélation).

Sans rentrer dans les détails, il a été montré que réaliser une régression sur une variable non stationnaire même à des paramètres invalides (ils suivent un mouvement brownien c’est-à-dire très irrégulier, pour simplifier). Je recommande ce feed sur researchgate pour plus de détails.

La première étape lorsqu’on réalise un modèle de séries temporelles est donc de stationnariser notre série. Pour cela, il existe plusieurs méthodes, qui consiste à transformer les données : 

  • Les différentes méthodes de transformation pour normaliser notre donnée, comme MinMaxScaler()
  • La transformation Boxcox, très utilisée et commune dans le cas des séries temporelles.
  • La différenciation, qui consiste à soustraire à une valeur de la série sa valeur précédente. C’est une méthode très efficace, il est rare (de mon expérience) qu’une série ne soit pas stationnaire après une, ou deux différenciation.  A noter que dans des modèles ARIMA/ARMA, en général, la librairie s’occupe elle même de différencier la série. mais ce n’est pas le cas ici 😀

Dans l’idéal, on souhaite transformer une série grâce à Boxcox, puis la différencier si besoin.

Malheureusement, toutes les séries ne se transforment pas via Boxcox (valeurs négatives). Je vous propose donc le code suivant qui va tester l’ensemble :

				
					def box_cox_test(dataframe_to_test):
    """_summary_


    Args:
        dataframe_to_test (dataframe)


    Returns:
        list_box_cox : the list of values you can transform thanks to boxcox
        list_non_box_cox : the list of values you CAN'T transform thanks to boxcox
    """
    list_non_transformed = []
    list_box_cox = []
    for col in dataframe_to_test:
        data = dataframe_to_test[col]
        if (data < 0).values.any():
            list_non_transformed.append(col)
            continue
        fig = plt.figure()
        ax1 = fig.add_subplot(221)
        prob = probplot(data, dist=norm, plot=ax1)
        ax1.set_xlabel('')
        ax1.set_title('Probplot against normal distribution')


        ax21 = fig.add_subplot(222)
        ax21.hist(data)
        ax21.set_xlabel(col)
        ax21.set_title('distplot')


        # normality test
        stat, p = normaltest(data)
        print('Statistics=%.3f, p=%.3f' % (stat, p))
        # interpret
        alpha = 0.05
        if p > alpha:
            print('Sample looks Gaussian (fail to reject H0)')
        else:
            print('Sample does not look Gaussian (reject H0)')


        xt, _ = boxcox(data)
        if not np.isfinite(xt).any():
            list_non_transformed.append(col)
            continue
        list_box_cox.append(col)
       
        ax2 = fig.add_subplot(223)
        prob = probplot(xt, dist=norm, plot=ax2)
        ax2.set_title('Probplot after Box-Cox transformation')
        ax22 = fig.add_subplot(224)
        ax22.hist(xt)
        ax22.set_xlabel(col)
        ax22.set_title('distplot')


        stat, p = normaltest(xt)
        print('Statistics=%.3f, p=%.3f' % (stat, p))
        # interpret
        alpha = 0.05
        if p > alpha:
            print('Sample looks Gaussian (fail to reject H0)')
        else:
            print('Sample does not look Gaussian (reject H0)')
       
        plt.subplots_adjust(top = 0.99, bottom=0.01, hspace=1.2, wspace=0.4)
        plt.show()
    print('*'*50 +'\n' +'*'*50)
    print(f"Les données suivantes n'ont pas pu être scaled (négatives, infinites) : {list_non_transformed}. \n Voici leur plot non transformé")
    for col in list_non_transformed:
        data = dataframe_to_test[col]
        fig = plt.figure()
        ax1 = fig.add_subplot(211)
        prob = probplot(data, dist=norm, plot=ax1)
        ax1.set_xlabel('')
        ax1.set_title('Probplot against normal distribution')


        ax21 = fig.add_subplot(212)
        ax21.hist(data)
        ax21.set_xlabel(col)
        ax21.set_title('distplot')


    return list_box_cox, list_non_transformed
				
			

On l’utilise pour tester notre dataframe :

				
					list_features_to_box_cox, list_features_non_box_cox = box_cox_test(df_pour_var)
				
			

Nous avons donc maintenant une liste de variables qui peuvent être transformées via boxcox, et une liste qui ne peut pas être transformées via boxcox.

UNE CARRIÈRE DANS LA DATA VOUS TEND LES BRAS !

Une reconversion dans le big data vous intéresse, mais vous ne savez pas par où commencer ? Découvrez nos formations en Data Science.

Participer à votre première formation data gratuitement !

Assistez aux cours dispensés en live par nos formateurs pour démarrer sur Python, SQL, Power BI…

UNE CARRIÈRE DANS LA DATA VOUS TEND LES BRAS !

Une reconversion dans le big data vous intéresse, mais vous ne savez pas par où commencer

Découvrez nos formations en Data Science.

Participer à votre première formation data gratuitement !

Assistez aux cours dispensés en live par nos formateurs pour démarrer sur Python, SQL, Power BI …

Nous transformons donc les variables qui peuvent l’être via box cox. J’ai codé une fonction pour obtenir un dictionnaire qui contient les associations « features – lambda » utilisées pour la transformation. Cela va nous servir pour la suite !

				
					def transfo_boxcox(dataframe_to_transform, list_features_to_transform):
    """_summary_
    Args:
        dataframe_to_transform (dataframe)
        list_features_to_transform (list)
    Returns:
        transformed (dataframe): the dataframe after being transformed
        dict_lambda(dict): the dict containing in key the name of a feature and in value the lambda used for transforming it via boxcox
    """
    dict_lambda = {}
    transformed = dataframe_to_transform.copy()
    for ft in list_features_to_transform:
        transformed[ft], l = boxcox(dataframe_to_transform[ft])
        dict_lambda[ft] = l
    return transformed, dict_lambda
				
			
				
					df_var_transformed, dict_lambda_bc = transfo_boxcox(df_pour_var, list_features_to_box_cox)
				
			
				
					#We apply a minmaxscaler transform to the data we could not transform thanks to boxcox
minmaxscaler = MinMaxScaler()
minmaxscaler = minmaxscaler.fit(df_pour_var[list_features_non_box_cox])
df_var_transformed[list_features_non_box_cox] = minmaxscaler.transform(df_pour_var[list_features_non_box_cox])
				
			

Il est temps de tester nos features et de savoir si elles sont stationnaires. Voici la fonction de test :

				
					def dickey_fuller_test(data, seuil_signif=0.05, name='', verbose=False):
    """A function tests whether our features are stationary or not.


    Args:
        data (series): the data to test
        seuil_signif (float, optional): test level. Defaults to 0.05.
        name (str, optional): _description_. Defaults to ''.
        verbose (bool, optional): _description_. Defaults to False.
    """
    result = adfuller(data, autolag='AIC')
    output = {'Test statistic':round(result[0], 4), 'p_value':round(result[1], 4), 'n_lags':round(result[2], 4), 'n_obs':result[3]}
    p_value = output['p_value']


    # Print Summary
    print(f'    Augmented Dickey-Fuller test on : "{name}"', "\n   ", '~'*47)
    print(f' H0: the data shows a unit root, and then is not stationary.')
    print(f' P-Value equals      : {p_value}')
    print(f' Confidence level    = {seuil_signif}')
    print(f' Test statistic   = {output["Test statistic
"]}')
    print(f' Number of lags = {output["n_lags"]}')


    if p_value <= seuil_signif:
        print(f" => P-Value = {p_value} <= {seuil_signif}. We can reject H0 on the confidence level.")
        print(f" => The serie is stationary.")
    else:
        print(f" => P-Value = {p_value} > {seuil_signif}. We can not reject H0 on the confidence level.")
        print(f" => The serie is NOT stationary.")
				
			

Et le test en question :

				
					for name, column in df_var_transformed.iteritems():
    dickey_fuller_test(column, name=column.name)
    print('\n')
				
			

Malheureusement, des séries ne sont pas stationnaires. On va donc différencier la série, et réitérer le test :

				
					df_diff_1_transformed = df_var_transformed.diff().dropna()
exog_pour_var = exog_pour_var[1:] #we need to offset of 1 because we dropped the first value by differencing
for name, column in df_diff_1_transformed.iteritems():
    dickey_fuller_test(column, name=column.name)
    print('\n')
				
			

Maintenant, tout est stationnaire ! On peut passer à l’entraînement du modèle.

5. Le modèle et la convergence lag/variables

a. Sélectionner des variables

Lorsque l’on réalise un modèle de classification ou de régression plus “classique”, on teste toujours la pertinence des variables en input, c’est-à-dire la qualité de l’information délivrée par ces variables pour prédire la variable cible. Dans un exercice de séries temporelles, c’est la même chose : il se peut qu’une variable n’explique pas/trop peu la variable cible. Il faut donc déterminer lesquelles garder, lesquelles supprimer. 🤗

Dans notre cas, il existe un test, le test de Granger, qui permet de terminer si une variable permet d’expliquer les prédictions d’une autre variable. C’est exactement ce que l’on cherche 😀 

Voici la fonction du test :

				
					def grangers_test(data, lag, test='ssr_chi2test',verbose=False):    
    """_summary_
        The values inside the dataframe are the p-value of the test for the couple feature X/Y
    H0 hypothesis is the following :
        "Forecasts of X does not influence forecasts on Y"
    Therefor, p-value under 0.005 threshold allows us to reject H0 with a confidence level of 95% and pushs us to keep this couple of variables


Les arguments sont :
        data (dataframe): dataframe that holds the value of each feature
        lag (int): the lag to apply to the test. Must be equal to the lag used in the model
        test (str, optional): the test to apply. Defaults to 'ssr_chi2test'.
        verbose (bool, optional):  Defaults to False.


    Returns:
        the dataframe with p-value
    """
    df = pd.DataFrame(np.zeros((len(data.columns), len(data.columns))), columns=data.columns, index=data.columns)
    for col in df.columns:
        for row in df.index:
            test_result = grangercausalitytests(data[[row, col]], maxlag=[lag], verbose=False)
            p_values = round(test_result[lag][0][test][1],4) #on va avoir toutes les p-valeurs une part lag
            p_value = np.min(p_values) #On s'intéresse à la valeur minimale des p-valeur
            df.loc[row, col] = p_value
    df.columns = [var for var in data.columns]
    df.index = [var + '_Y' for var in data.columns]
    return df
				
			

En plus de la fonction qui teste, on met en place une fonction qui va réduire notre dataset initial, en fonction des résultats du test : on teste le dataframe, on supprime la variable avec la somme des p-value la plus forte, et on itère jusqu’à ce que toutes les variables passent le test.

À noter que l’on peut rentrer une « targeted feature » qui nous permet de ne jamais supprimer une variable en particulier que l’on voudrait cibler. En valeur retour, la fonction renvoie le dataframe réduit en dimentionalité. Il ne reste que des variables dont les prédictions contribuent à prédire chacune des autre variables :

				
					def reduce_dim_granger_test(df_to_reduce, lag_to_apply, targeted_feature = None):
    """We reduce the dataset thanks to the granger test
        We test that every variables in the dataframe are usefull to predict each other, and if not, we drop them
        After each iteration of "features optimization", we run again the model fit in order to verify that the best lag_order has not changed.
        If it as, hwe run the granger test optimization again
    Args:
        df_to_reduce (dataframe to reduce): the data frame we want to reduce
        lag_to_apply (int): the current lag order of the model we use in that precise iteration
        targeted_feature(string)


    Returns:
        _type_: we return the dataframe that has been reduced
    """
    df_granger = grangers_test(df_to_reduce, lag = lag_to_apply)
    np.fill_diagonal(df_granger.values, 0)
    #plt.figure(figsize = (16,5))
    #sns.heatmap(df_granger, annot = True)
    #plt.show()
    while (df_granger.values>= 0.05).any():
        list_feature_non_granger_causal = []
        list_value_non_granger_causal = []
        for col in df_granger.columns:
            if (df_granger[col].values >= 0.05).any() and col != targeted_feature:
                list_feature_non_granger_causal.append(col)
                list_value_non_granger_causal.append(sum(df_granger[col].values))
        df_feature_to_pop = pd.DataFrame(list_value_non_granger_causal, index = list_feature_non_granger_causal, columns=['p-value'])  
        df_to_reduce = df_to_reduce.drop(df_feature_to_pop['p-value'].idxmax(), axis = 1)
        df_granger = grangers_test(df_to_reduce, lag = 5)
        np.fill_diagonal(df_granger.values, 0)
        #plt.figure(figsize = (16,5))
        #sns.heatmap(df_granger, annot = True)


    return df_to_reduce
				
			

On peut voir que la fonction utilise un paramètre qui est le “lag” : il s’agit du paramètre de notre modèle dont nous parlions plus tôt. En fonction du lag retenu, le test ne renverra pas les mêmes valeurs. Il est donc très important de spécifier ce lag en fonction du lag du modèle… C’est là que les choses se compliquent un peu.

b. Sélection du lag optimal

On va vouloir trouver la valeur lambda qui minimise l’erreur choisie sur la variable choisie. Pour cela, on peut soit passer par une méthode disponible dans le package statsmodels, soit utiliser une fonction maison lorsque les métriques de la librairie ne nous conviennent pas.

En effet, statsmodels ne permet d’optimiser un modèle VAR que sur des métriques statistiques. Or, il n’est pas forcément bon d’optimiser seulement les métriques statistiques (AIC, BIC). Dans notre cas, cherchons plutôt à optimiser la MAE. Pour cela, nous allons devoir, pour chacune des valeurs (lambda) de l’itération, calculer notre métrique cible, ce qui veut donc dire : 

  • Pour un niveau de lag (lambda) choisi, on doit tester la pertinences des variables grâce à notre test de granger et le cas échéant, éliminer les features inutiles
  • Avoir une fonction qui va dé-transformer notre prédiction (pour rappel, on fonctionne actuellement avec des variables qui sont transformées via boxcox, minmaxscaler, puis différenciées !)
  • Avoir une fonction de predict
  • Avoir des fonctions de mesure de l’erreur
  • Garder en mémoire le meilleur modèle à date pour la métrique choisie, ici la MAE.

Dans le cadre de cet exemple, mes modèles sont testés à chaque fois sur la variable que j’ai terminée comme étant ma cible (tsd). Je ne mesure donc pas (et je n’optimise pas) en fonction de la capacité qu’a l’algorithme à bien prédire l’ensemble des variables.

On va avoir besoin d’une fonction predict :

				
					def predict_results(model_fitted, training_set, exog_for_predicting):
    """function to predict the forecasts


    Args:
        model_fitted (var model): the fitted model
        training_set (dataframe): the training set  
        exog_for_predicting (dataframe): exog on the index of predicting


    Returns:
        dataframe : returns three dataframe : the actual predictions, the upper confidence interval, the lower confidence interval
    """
    lag_order = model_fitted.k_ar
    input_data = training_set.values[-lag_order:]
    y_predicted = model_fitted.forecast(y = input_data, steps = len(exog_for_predicting), exog_future=exog_for_predicting)
    df_predicted_interval = model_fitted.forecast_interval(y = input_data, steps = len(exog_for_predicting), exog_future=exog_for_predicting)
    interval_upper = pd.DataFrame(df_predicted_interval[2],  index=exog_for_predicting.index, columns=training_set.columns)
    interval_lower = pd.DataFrame(df_predicted_interval[1], index=exog_for_predicting.index, columns=training_set.columns)


    df_predicted = pd.DataFrame(y_predicted, index=exog_for_predicting.index, columns=training_set.columns)
    return df_predicted, interval_upper, interval_lower
				
			

Puis de re-transformer les variables, d’abord en inversant la différenciation :

				
					def inv_diff(df_orig, df_forecast, second_diff = False):
    """invert the differentiation of a dataframe


    Args:
        df_orig (dataframe): the dataframe that existed JUST BEFORE differencing
        df_forecast (dataframe): the dataframe with the forecasted values
        second_diff (bool, optional): Did you apply two diff order ?


    Returns:
        dataframe with the diff reverted
    """
    columns = df_forecast.columns
    df_fc_inv = df_forecast.copy()
    n_jour_cible = len(df_forecast)
    for col in columns:
        if second_diff:
            df_fc_inv[str(col)+'_1d'] = (df_orig[col].iloc[-n_jour_cible-1]-df_orig[col].iloc[-n_jour_cible-2]) + df_fc_inv[str(col)].cumsum()
            df_fc_inv[str(col)] = df_orig[col].iloc[-n_jour_cible-1] + df_fc_inv[str(col)+"_1d"].cumsum()
        else:
            df_fc_inv[str(col)] = df_orig[col].iloc[-n_jour_cible-1] + df_fc_inv[str(col)].cumsum()
    return df_fc_inv
				
			

Puis en inversant les transformations de boxcox & minmaxscaler sur les bonnes variables :

				
					def invert_transform(list_of_features_boxcox, list_of_features_normalized, scaler, predicted, initial_dataframe, dict_lambda_value):
    """invert the transformation boxcox and of a scaler of any kind


    Args:
        list_of_features_boxcox (list): list of features you transformed with Boxcox
        list_of_features_normalized (list): list of features you normalized
        scaler (scaler objet): the scaler you innitiated for normalizing
        predicted (dataframe): forecasted dataframe
        initial_dataframe (dataframe: the very first dataframe, not transformed
        dict_lambda_value (_type_): the dict  in we stored the lambdas values of each feature transformed via boxcox


    Returns:
        the predicted dataframe with true destransformed value
    """
    list_of_features_boxcox = list(set(list_of_features_boxcox) & set(predicted.columns))
    list_of_features_normalized = list(set(list_of_features_normalized) & set(predicted.columns))


    scaler = scaler.fit(initial_dataframe[list_of_features_boxcox])
    predicted[list_of_features_normalized] = scaler.inverse_transform(predicted[list_of_features_normalized])
    for col in list_of_features_boxcox:
        predicted[col]=inv_boxcox(predicted[col], dict_lambda_value[col]).values
   
    return predicted
				
			

Et enfin, différentes fonctions perso qui permettent de mesurer les erreurs :

				
					def r2_ajusted(r2, df):
    longueur = len(df.index)
    nb_col = len(df.columns)
    score_ajusted = 1-((1-r2)*(longueur-1) / (longueur - nb_col -1))
    return score_ajusted
				
			
				
					def mape(y_test, pred):
    y_test, pred = np.array(y_test), np.array(pred)
    mape = np.mean(np.abs((y_test - pred) / y_test))
    return mape
				
			

Et on a finalement notre fonction d’optimisation, qui enregistre les metrics de chacune des itérations :

				
					def convergence_lag_features(transformed_diff_dataframe, transformed_dataframe, initial_dataframe, list_of_features_boxcox, list_of_features_normalized, scaler, dict_lambda_value,
                             exog, maxlag_to_converge, n_period, targeted_feature):
    """We will optimize the features according to the chosen lag until the lag and features do not change


    Args:
        initial_transforemd_diff_dataframe (Dataframe): The initial dataframe you want to perform the convergence on. It is the dataframe you want the data
        to be trained on, so it must be transformed, differenciated if needed...
        transformed_dataframe (Dataframe): The initial dataframe that is transformed but not yet differenciated
        exog (pandas dataframe): the exogeneous features you give to your model. Must be known for train and test ! Good example is holidays,
        Fourier features...
        maxlag_to_converge (int): the maximum number of lags to include in your model. The code will test them all so the bigger the longer
        the training time ! Be careful as it is exponential UwU
        n_period (int): number of time periods (days, hours...) you want to predict
        targeted_feature: the main feature you want to predict


    Returns:
    best_model (model object) : the best fitted model among all p
    training_set (pandas dataframe) : the training set being granger tested and reduced
    training_set_exog (pandas dataframe) : the training set exog
    testing_set (pandas dataframe) : the test set being granger tested and reduced
    test_set_exog (pandas dataframe) : the testing set exog
    transformed_dataframe_updated (pandas dataframe) :
    training_metrics
    initial_dataframe_uptaded
    """
    mae_training_loop = []
    MAPE_training_loop = []
    aic_training_loop = []
    r2_training_loop= []
    normality_training_loop = []
    whiteness_training_loop = []
    best_model = None
    best_df_diff = None


    training_set_exog = exog[:-n_period]
    test_set_exog = exog[-n_period:]
    initial_testing_set = initial_dataframe[-n_period:]


    for i in range(1, maxlag_to_converge):
        df_diff = transformed_diff_dataframe.copy()
        df_diff= reduce_dim_granger_test(df_diff, i, targeted_feature)


        training_set=df_diff[:-n_period]
        testing_set = df_diff[-n_period:]


        model = VAR(endog = training_set, exog = training_set_exog)
        model = model.fit(i)


        predict, predict_upper, predict_lower = predict_results(model, training_set, test_set_exog)


        initial_testing_set_updated = initial_testing_set[predict.columns]
        initial_dataframe_uptaded = initial_dataframe[predict.columns]
       
        predict = inv_diff(transformed_dataframe, predict)
        predict = invert_transform(list_of_features_boxcox, list_of_features_normalized, scaler, predict, initial_dataframe, dict_lambda_value)


        mae = mean_absolute_error(initial_testing_set_updated[targeted_feature], predict[targeted_feature])
        if len(mae_training_loop) == 0 or min(mae_training_loop) > mae:
            best_model = model
            best_df_diff = df_diff
        mae_training_loop.append(mae)
        MAPE_training_loop.append(mape(initial_testing_set_updated[targeted_feature], predict[targeted_feature]))
        r2_training_loop.append(r2_ajusted(r2_score(initial_testing_set_updated[targeted_feature], predict[targeted_feature]), predict))
        aic_training_loop.append(model.aic)
        normality_training_loop.append(model.test_normality().pvalue)
        whiteness_training_loop.append(model.test_whiteness(round((len(training_set))/5)).pvalue)


   
    training_metrics = pd.DataFrame()
    training_metrics['mae'] = mae_training_loop
    training_metrics['mape'] = MAPE_training_loop
    training_metrics['r2'] = r2_training_loop
    training_metrics['aic'] = aic_training_loop
    training_metrics['normality'] = normality_training_loop  #H0: distribution is gaussian. if pvalue < 0.05 we reject and that is not good
    training_metrics['whiteness'] =  whiteness_training_loop   #H0: there is no presence of autocorrelation. if pvalue < 0.05 we reject and that is not good


    training_set=best_df_diff[:-n_period]
    testing_set = best_df_diff[-n_period:]
    initial_dataframe_uptaded = initial_dataframe[best_df_diff.columns]
    transformed_dataframe_updated = transformed_dataframe[best_df_diff.columns]
    return best_model, training_set, training_set_exog, testing_set, test_set_exog, transformed_dataframe_updated, training_metrics, initial_dataframe_uptaded
				
			

On appelle le code en choisissant notre lag maximum à tester, ici 52 :

				
					model_var, train_var, train_var_exog, test_var, test_var_exog, df_var_transformed, df_optimizing_metrics, df_var = convergence_lag_features(df_diff_1_transformed, df_var_transformed, df_var, list_features_to_box_cox, list_features_non_box_cox,
                                                                                                                                      minmaxscaler, dict_lambda_bc, exog_pour_var, 52, n_period_predict, target_variable)
				
			

On trouve un compte rendu de toutes les itérations dans le dataframe « df_optimizing_metrics ».

6. Prédiction avec le meilleur modèle et affichage des résultats

Maintenant que l’on a notre meilleur modèle optimisé avec les training/testing set aussi optimisés en termes de choix de variables, avec aussi le dataframe initial avec les variables sélectionnés, on peut prédire, inverser les transformations (je prédis en même temps les intervals de confiance, et je les dé-transforme aussi) :

				
					df_predicted, df_interval_lower, df_interval_upper = predict_results(model_var, train_var, test_var_exog)


df_predicted = inv_diff(df_var_transformed, df_predicted)
df_interval_lower = inv_diff(df_var_transformed, df_interval_lower)
df_interval_upper = inv_diff(df_var_transformed,df_interval_upper)
df_predicted = invert_transform(list_features_to_box_cox, list_features_non_box_cox, minmaxscaler,
                                df_predicted, df_var, dict_lambda_bc)
df_interval_lower = invert_transform(list_features_to_box_cox, list_features_non_box_cox, minmaxscaler,
                                df_interval_lower, df_var, dict_lambda_bc)
df_interval_upper = invert_transform(list_features_to_box_cox, list_features_non_box_cox, minmaxscaler,
                                df_interval_upper, df_var, dict_lambda_bc)
				
			

Enfin, je trace mes graphiques true vs pred :

				
					def append_axes(fig, as_cols=False):
    """Append new Axes to Figure."""
    n = len(fig.axes) + 1
    nrows, ncols = (1, n) if as_cols else (n, 1)
    gs = mpl.gridspec.GridSpec(nrows, ncols, figure=fig)
    for i,ax in enumerate(fig.axes):
        ax.set_subplotspec(mpl.gridspec.SubplotSpec(gs, i))
    return fig.add_subplot(nrows, ncols, n)


def plot_pred_vs_true(true_value, predicted, lower_int, upper_int, normality_of_residual = False):
    """_summary_
    The function will plot every true column of the dataframe against pred


    Args:
        true_value (dataframe): df containing the true value
        predicted (dataframe): df of the predictions
        lower_int (dataframe) : df with the lower interval values
        upper_int (dataframe) : df with the upper interval values
        normality_of_residuals : are the residuals following a gaussian ? if not, interval will be strange /EVIL LAUGHTER/
    """
    sns.set(style="ticks", context="talk")
    plt.style.use("dark_background")
    plt.rc('legend',**{'fontsize':25})
    fig = plt.figure(layout='constrained', figsize=(40,70))
    col = predicted.columns
    predicted.columns = predicted.columns+'_pred'
    lower_int.columns =lower_int.columns+'_low_int'
    upper_int.columns = upper_int.columns+'_up_int'
    predicted.plot(subplots=True, ax=append_axes(fig), color='r', fontsize=30, legend=True)


    if normality_of_residual:
        lower_int.plot(subplots=True, ax=fig.axes, color='r', fontsize=30, legend=True, linestyle ='-.')
        upper_int.plot(subplots=True, ax=fig.axes, color='r', fontsize=30, legend=True, linestyle ='-.')
    true_value[col].plot(subplots=True, ax=fig.axes, color='b', fontsize=30, legend=True)
    plt.show()
				
			
				
					plot_pred_vs_true(initial_testing_set, df_predicted, df_interval_lower, df_interval_lower, normality_of_residual=False)
				
			

7. Conclusion

Dans cet exercice, on atteint une MAE de 3068, une MAPE de 9,4% sur notre variable cible 🎉. Voici le graphique true/pred de la variable cible ‘tsd’ : 

Si mes résidus ne passent pas le test de normalité (les résidus suivent-ils une gaussienne?) alors je risque de me retrouver avec des intervalles de confiance complètement faux ! Il ne faut donc surtout pas compter dessus. Il existe des méthodes pour rendre les résidus gaussiens, mais cela sera le sujet d’un prochain article 😀

On peut aussi noter qu’en termes d’élégance de code, on pourrait faire bien mieux avec du code orienté objet et en créant une nouvelle classe qui s’appuierait sur le modèle VAR de statsmodels.

Pour conclure, voici le code exécuté de A à Z en dehors des fonctions :

				
					df = pd.read_csv("path.csv")
df['settlement_date'] = pd.to_datetime(df['settlement_date'])
df = df.set_index('settlement_date')
df = df.drop(['settlement_period', 'period_hour'], axis = 1)
exog = df[['is_holiday']].copy()
df = df.drop(['is_holiday'], axis =1)
				
			
				
					target_variable = "tsd" #the feature we ultimately want to predict
n_period_predict = 48 * 7 #the number of period we want to predict. I want to predict 7 days and there are 48 data points per day
				
			
				
					mask = (df.index > '2022-01-01') #I will use the data post 2022
df_var = df.loc[mask]
exog_pour_var = exog.loc[mask]
df_var = df_var +1 #We erase the every value that are equals to 0 in order to facilitate the boxcox transformation
				
			
				
					initial_testing_set = df_var[-n_period_predict:] #there will be a lot of transformation so we want to keep a safe testing set
				
			
				
					list_features_to_box_cox, list_features_non_box_cox = box_cox_test(df_var)
				
			
				
					df_var_transformed, dict_lambda_bc = transfo_boxcox(df_var, list_features_to_box_cox)
				
			
				
					#We apply a minmaxscaler transform to the data we could not transform thanks to boxcox
minmaxscaler = MinMaxScaler()
minmaxscaler = minmaxscaler.fit(df_var[list_features_non_box_cox])
df_var_transformed[list_features_non_box_cox] = minmaxscaler.transform(df_var[list_features_non_box_cox])
				
			
				
					for name, column in df_var_transformed.iteritems():
    dickey_fuller_test(column, name=column.name)
    print('\n')
				
			
				
					df_diff_1_transformed = df_var_transformed.diff().dropna()
exog_pour_var = exog_pour_var[1:] #we need to offset of 1 because we dropped the first value by differencing
for name, column in df_diff_1_transformed.iteritems():
    dickey_fuller_test(column, name=column.name)
    print('\n')
				
			
				
					model_var, train_var, train_var_exog, test_var, test_var_exog, df_var_transformed, df_optimizing_metrics, df_var = convergence_lag_features(df_diff_1_transformed, df_var_transformed, df_var, list_features_to_box_cox, list_features_non_box_cox,
                                                                                                                                      minmaxscaler, dict_lambda_bc, exog_pour_var, 52, n_period_predict, target_variable)
				
			
				
					print(df_optimizing_metrics.describe())
				
			
				
					df_predicted, df_interval_lower, df_interval_upper = predict_results(model_var, train_var, test_var_exog)
				
			
				
					plot_pred_vs_true(initial_testing_set, df_predicted, df_interval_lower, df_interval_lower, normality_of_residual=False)
				
			
Facebook
Twitter
LinkedIn

DataScientest News

Inscrivez-vous à notre Newsletter pour recevoir nos guides, tutoriels, et les dernières actualités data directement dans votre boîte mail.

Vous souhaitez être alerté des nouveaux contenus en data science et intelligence artificielle ?

Laissez-nous votre e-mail, pour que nous puissions vous envoyer vos nouveaux articles au moment de leur publication !

Newsletter icone
icon newsletter

DataNews

Vous souhaitez recevoir notre
newsletter Data hebdomadaire ?