🚀 Think you’ve got what it takes for a career in Data? Find out in just one minute!

The VAR model: From business approach to results plotting

-
8
 m de lecture
-
var

1. Introduction

Welcome to this explanatory article about the VAR model. We will delve into the “fundamentals” of this model but not the general aspects of time series (except for stationarity 🤯).

The dataset used is available here. And here is my GitHub: https://github.com/gambitl/Article_VAR_FR_Public

By the end of this article, you will know:

  • What the VAR model is and what its significance is 🧐
  • How and why to prepare your dataset to use the VAR model
  • How to choose the right parameter (lag count) for your dataset to minimize an error metric, not just a statistical metric
  • Why and how to test the relationship between variables and remove those that add noise to the model
  • Automate the two previous steps and converge towards an optimal result that combines the most effective parameter and the most suitable dataset 👍

At each stage, I’ll highlight the theoretical points first, then the “in-house” functions, and finally the code that calls the functions. Each function is commented and explained within the code itself.

At the end of the article, you’ll find all the non-functional code.

In this example, I’m using the VAR model to predict ONE particular variable.

2. VAR model definition: equation & interest

The Vector Autoregression (VAR) model is a time series model that predicts several variables over time. Each variable’s prediction contributes to the prediction of the others. So, if we were to write it in a linear format, we’d have this:

Let there be 2 variables, Y and X, and (t) a moment in time.

Y(t+1) = f(Y(t), X(t))

X(t+1) = f(X(t), Y(t))

Y(t+1) = α1 + β1Y(t) + β2X(t) + ε1

X(t+1) = α2 + β3X(t) + β4Y(t) + ε2

With α the constants, β the coefficients associated with the variables, ε the errors.

Based on historical data, the model calculates the constants and coefficients for each variable.

In addition, the model includes “lags” (λ), which allow all values of X, Y up to X(t- λ), Y(t- λ) to be included in the calculation of X(t+1), Y(t+1). In the previous system, the next value of the variable (X) is calculated as a function of the previous value of (X) and the previous value of (Y). The lag (λ) is therefore equal to 1.

It is these lags that constitute the parameter to be optimized in this model, i.e. how many days of history will make up our equations.

3. Import from booksellers, set-up

We need the following imports:

				
					import pandas as pd
import numpy as np
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from sklearn.preprocessing import MinMaxScaler
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib as mpl
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.stattools import grangercausalitytests, adfuller
from scipy.stats import boxcox, norm, probplot, normaltest
from scipy.special import inv_boxcox
				
			

Loading data :

				
					df = pd.read_csv("path.csv")
df['settlement_date'] = pd.to_datetime(df['settlement_date'])
df = df.set_index('settlement_date')
df = df.drop(['settlement_period', 'period_hour'], axis = 1)
exog = df[['is_holiday']].copy()
df = df.drop(['is_holiday'], axis =1)
				
			

Parameters :

				
					target_variable = "tsd" #the feature we ultimately want to predict
n_period_predict = 48 * 7 #the number of period we want to predict. I want to predict 7 days and there are 48 data points per day
				
			

Preparations/Cleaning :

				
					mask = (df.index > '2022-01-01') #I will use the data post 2022
df_pour_var = df.loc[mask]
exog_pour_var = exog.loc[mask]
df_pour_var = df_pour_var +1 #We erase the every value that are equals to 0 in order to facilitate the boxcox transformation
initial_testing_set = df_pour_var[-n_period_predict:] #there will be a lot of transformation so we want to keep a safe testing set
				
			

4. Stationarity: what it is and why it's importan

We define (weak) stationarity as follows:

Whatever the moment (t) in time, the expectation of our variable is always the same. There is no trend.
Whatever the moment (t) in time, the variance of our variable is always the same and not infinite.
Whatever the moment in time, the auto-correlation between two moments in time is a function only of the time lag (and position does not change the value of the auto-correlation).

Without going into detail, it has been shown that regression on a non-stationary variable even has invalid parameters (they follow a Brownian motion, i.e. very irregular, for simplicity’s sake). I recommend this feed on researchgate for more details.

The first step in creating a time series model is to stationarize the series. There are several methods for doing this, all of which involve transforming the data:

The various transformation methods for normalizing our data, such as MinMaxScaler()
The Boxcox transformation, widely used and common in the case of time series.
Differentiation, which consists in subtracting the previous value from a value in the series. In my experience, it’s rare for a series to be non-stationary after one or two differentiations. Note that in ARIMA/ARMA models, the library usually takes care of differentiating the series itself, but this is not the case here 😀toujours la même. There is no trend.
Whatever the moment (t) in time, the variance of our variable is always the same and not infinite.
Whatever the moment in time, the auto-correlation between two moments in time is a function only of the time lag (and position does not change the value of the auto-correlation).

Without going into detail, it has been shown that regression on a non-stationary variable even has invalid parameters (they follow a Brownian motion, i.e. very irregular, for simplicity’s sake). I recommend this feed on researchgate for more details.

Ideally, we’d like to transform a series using Boxcox, then differentiate it if required.

Unfortunately, not all series can be transformed using Boxcox (negative values). I therefore propose the following code, which will test the set:

				
					def box_cox_test(dataframe_to_test):
    """_summary_


    Args:
        dataframe_to_test (dataframe)


    Returns:
        list_box_cox : the list of values you can transform thanks to boxcox
        list_non_box_cox : the list of values you CAN'T transform thanks to boxcox
    """
    list_non_transformed = []
    list_box_cox = []
    for col in dataframe_to_test:
        data = dataframe_to_test[col]
        if (data < 0).values.any():
            list_non_transformed.append(col)
            continue
        fig = plt.figure()
        ax1 = fig.add_subplot(221)
        prob = probplot(data, dist=norm, plot=ax1)
        ax1.set_xlabel('')
        ax1.set_title('Probplot against normal distribution')


        ax21 = fig.add_subplot(222)
        ax21.hist(data)
        ax21.set_xlabel(col)
        ax21.set_title('distplot')


        # normality test
        stat, p = normaltest(data)
        print('Statistics=%.3f, p=%.3f' % (stat, p))
        # interpret
        alpha = 0.05
        if p > alpha:
            print('Sample looks Gaussian (fail to reject H0)')
        else:
            print('Sample does not look Gaussian (reject H0)')


        xt, _ = boxcox(data)
        if not np.isfinite(xt).any():
            list_non_transformed.append(col)
            continue
        list_box_cox.append(col)
       
        ax2 = fig.add_subplot(223)
        prob = probplot(xt, dist=norm, plot=ax2)
        ax2.set_title('Probplot after Box-Cox transformation')
        ax22 = fig.add_subplot(224)
        ax22.hist(xt)
        ax22.set_xlabel(col)
        ax22.set_title('distplot')


        stat, p = normaltest(xt)
        print('Statistics=%.3f, p=%.3f' % (stat, p))
        # interpret
        alpha = 0.05
        if p > alpha:
            print('Sample looks Gaussian (fail to reject H0)')
        else:
            print('Sample does not look Gaussian (reject H0)')
       
        plt.subplots_adjust(top = 0.99, bottom=0.01, hspace=1.2, wspace=0.4)
        plt.show()
    print('*'*50 +'\n' +'*'*50)
    print(f"Les données suivantes n'ont pas pu être scaled (négatives, infinites) : {list_non_transformed}. \n Voici leur plot non transformé")
    for col in list_non_transformed:
        data = dataframe_to_test[col]
        fig = plt.figure()
        ax1 = fig.add_subplot(211)
        prob = probplot(data, dist=norm, plot=ax1)
        ax1.set_xlabel('')
        ax1.set_title('Probplot against normal distribution')


        ax21 = fig.add_subplot(212)
        ax21.hist(data)
        ax21.set_xlabel(col)
        ax21.set_title('distplot')


    return list_box_cox, list_non_transformed
				
			

We use it to test our dataframe:

				
					list_features_to_box_cox, list_features_non_box_cox = box_cox_test(df_pour_var)
				
			

We now have a list of variables that can be transformed via boxcox, and a list that cannot be transformed via boxcox.

We now transform the variables that can be transformed via boxcox. I’ve coded a function to obtain a dictionary containing the “features – lambda” associations used for the transformation. This will come in handy for what’s to come!

				
					def transfo_boxcox(dataframe_to_transform, list_features_to_transform):
    """_summary_
    Args:
        dataframe_to_transform (dataframe)
        list_features_to_transform (list)
    Returns:
        transformed (dataframe): the dataframe after being transformed
        dict_lambda(dict): the dict containing in key the name of a feature and in value the lambda used for transforming it via boxcox
    """
    dict_lambda = {}
    transformed = dataframe_to_transform.copy()
    for ft in list_features_to_transform:
        transformed[ft], l = boxcox(dataframe_to_transform[ft])
        dict_lambda[ft] = l
    return transformed, dict_lambda
				
			
				
					df_var_transformed, dict_lambda_bc = transfo_boxcox(df_pour_var, list_features_to_box_cox)
				
			
				
					#We apply a minmaxscaler transform to the data we could not transform thanks to boxcox
minmaxscaler = MinMaxScaler()
minmaxscaler = minmaxscaler.fit(df_pour_var[list_features_non_box_cox])
df_var_transformed[list_features_non_box_cox] = minmaxscaler.transform(df_pour_var[list_features_non_box_cox])
				
			

It’s time to test our features and find out if they’re stationary. Here’s the test function:

				
					def dickey_fuller_test(data, seuil_signif=0.05, name='', verbose=False):
    """A function tests whether our features are stationary or not.


    Args:
        data (series): the data to test
        seuil_signif (float, optional): test level. Defaults to 0.05.
        name (str, optional): _description_. Defaults to ''.
        verbose (bool, optional): _description_. Defaults to False.
    """
    result = adfuller(data, autolag='AIC')
    output = {'Test statistic':round(result[0], 4), 'p_value':round(result[1], 4), 'n_lags':round(result[2], 4), 'n_obs':result[3]}
    p_value = output['p_value']


    # Print Summary
    print(f'    Augmented Dickey-Fuller test on : "{name}"', "\n   ", '~'*47)
    print(f' H0: the data shows a unit root, and then is not stationary.')
    print(f' P-Value equals      : {p_value}')
    print(f' Confidence level    = {seuil_signif}')
    print(f' Test statistic   = {output["Test statistic
"]}')
    print(f' Number of lags = {output["n_lags"]}')


    if p_value <= seuil_signif:
        print(f" => P-Value = {p_value} <= {seuil_signif}. We can reject H0 on the confidence level.")
        print(f" => The serie is stationary.")
    else:
        print(f" => P-Value = {p_value} > {seuil_signif}. We can not reject H0 on the confidence level.")
        print(f" => The serie is NOT stationary.")
				
			

And the test in question:

				
					for name, column in df_var_transformed.iteritems():
    dickey_fuller_test(column, name=column.name)
    print('\n')
				
			

Unfortunately, some series are not stationary. So we’ll differentiate the series and repeat the test:

				
					df_diff_1_transformed = df_var_transformed.diff().dropna()
exog_pour_var = exog_pour_var[1:] #we need to offset of 1 because we dropped the first value by differencing
for name, column in df_diff_1_transformed.iteritems():
    dickey_fuller_test(column, name=column.name)
    print('\n')
				
			

Now everything’s stationary! We can now move on to training the model.

5. The model and lag/variable convergence

a. Select variables

When we run a more “classic” classification or regression model, we always test the relevance of the input variables, i.e. the quality of the information provided by these variables to predict the target variable. In a time series exercise, it’s the same thing: a variable may not explain the target variable at all/too little. So you have to decide which ones to keep, which ones to delete. 🤗

In our case, there’s a test, the Granger test, that allows us to finish whether a variable helps explain the predictions of another variable. This is exactly what we’re looking for 😀

Here’s the test function:

				
					def grangers_test(data, lag, test='ssr_chi2test',verbose=False):    
    """_summary_
        The values inside the dataframe are the p-value of the test for the couple feature X/Y
    H0 hypothesis is the following :
        "Forecasts of X does not influence forecasts on Y"
    Therefor, p-value under 0.005 threshold allows us to reject H0 with a confidence level of 95% and pushs us to keep this couple of variables


Les arguments sont :
        data (dataframe): dataframe that holds the value of each feature
        lag (int): the lag to apply to the test. Must be equal to the lag used in the model
        test (str, optional): the test to apply. Defaults to 'ssr_chi2test'.
        verbose (bool, optional):  Defaults to False.


    Returns:
        the dataframe with p-value
    """
    df = pd.DataFrame(np.zeros((len(data.columns), len(data.columns))), columns=data.columns, index=data.columns)
    for col in df.columns:
        for row in df.index:
            test_result = grangercausalitytests(data[[row, col]], maxlag=[lag], verbose=False)
            p_values = round(test_result[lag][0][test][1],4) #on va avoir toutes les p-valeurs une part lag
            p_value = np.min(p_values) #On s'intéresse à la valeur minimale des p-valeur
            df.loc[row, col] = p_value
    df.columns = [var for var in data.columns]
    df.index = [var + '_Y' for var in data.columns]
    return df
				
			

In addition to the function that tests, we set up a function that will reduce our initial dataset, depending on the results of the test: we test the dataframe, delete the variable with the highest sum of p-values, and iterate until all variables pass the test.

Note that you can enter a “targeted feature” which allows you never to delete a particular variable you wish to target. As a return value, the function returns the dataframe reduced in dimensionality. All that remains are variables whose predictions help to predict each of the other variables:

				
					def reduce_dim_granger_test(df_to_reduce, lag_to_apply, targeted_feature = None):
    """We reduce the dataset thanks to the granger test
        We test that every variables in the dataframe are usefull to predict each other, and if not, we drop them
        After each iteration of "features optimization", we run again the model fit in order to verify that the best lag_order has not changed.
        If it as, hwe run the granger test optimization again
    Args:
        df_to_reduce (dataframe to reduce): the data frame we want to reduce
        lag_to_apply (int): the current lag order of the model we use in that precise iteration
        targeted_feature(string)


    Returns:
        _type_: we return the dataframe that has been reduced
    """
    df_granger = grangers_test(df_to_reduce, lag = lag_to_apply)
    np.fill_diagonal(df_granger.values, 0)
    #plt.figure(figsize = (16,5))
    #sns.heatmap(df_granger, annot = True)
    #plt.show()
    while (df_granger.values>= 0.05).any():
        list_feature_non_granger_causal = []
        list_value_non_granger_causal = []
        for col in df_granger.columns:
            if (df_granger[col].values >= 0.05).any() and col != targeted_feature:
                list_feature_non_granger_causal.append(col)
                list_value_non_granger_causal.append(sum(df_granger[col].values))
        df_feature_to_pop = pd.DataFrame(list_value_non_granger_causal, index = list_feature_non_granger_causal, columns=['p-value'])  
        df_to_reduce = df_to_reduce.drop(df_feature_to_pop['p-value'].idxmax(), axis = 1)
        df_granger = grangers_test(df_to_reduce, lag = 5)
        np.fill_diagonal(df_granger.values, 0)
        #plt.figure(figsize = (16,5))
        #sns.heatmap(df_granger, annot = True)


    return df_to_reduce
				
			

We can see that the function uses a parameter called “lag”: this is the parameter of our model we mentioned earlier. Depending on the lag selected, the test will not return the same values. So it’s very important to specify this lag as a function of the model lag… This is where things get a little complicated.

b. Optimum lag selection

We want to find the lambda value that minimizes the chosen error on the chosen variable. To do this, we can either use a method available in the statsmodels package, or use an in-house function when the library’s metrics don’t suit us.

In fact, statsmodels can only optimize a VAR model on statistical metrics. However, it’s not necessarily a good idea to optimize only statistical metrics (AIC, BIC). In our case, we’d rather optimize the MAE. To do this, we’ll need to calculate our target metric for each of the iteration’s lambda values:

For a given lag (lambda) level, we need to test the relevance of the variables using our granger test and, if necessary, eliminate unnecessary features.
Have a function that will de-transform our prediction (as a reminder, we currently work with variables that are transformed via boxcox, minmaxscaler, then differentiated!)
Have a predict function
Error measurement functions
Remember the best model to date for the chosen metric, in this case MAE.

In this example, my models are tested each time on the variable I’ve set as my target (tsd). So I’m not measuring (and optimizing) according to the algorithm’s ability to predict the set of variables well.

We’ll need a predict function:

				
					def predict_results(model_fitted, training_set, exog_for_predicting):
    """function to predict the forecasts


    Args:
        model_fitted (var model): the fitted model
        training_set (dataframe): the training set  
        exog_for_predicting (dataframe): exog on the index of predicting


    Returns:
        dataframe : returns three dataframe : the actual predictions, the upper confidence interval, the lower confidence interval
    """
    lag_order = model_fitted.k_ar
    input_data = training_set.values[-lag_order:]
    y_predicted = model_fitted.forecast(y = input_data, steps = len(exog_for_predicting), exog_future=exog_for_predicting)
    df_predicted_interval = model_fitted.forecast_interval(y = input_data, steps = len(exog_for_predicting), exog_future=exog_for_predicting)
    interval_upper = pd.DataFrame(df_predicted_interval[2],  index=exog_for_predicting.index, columns=training_set.columns)
    interval_lower = pd.DataFrame(df_predicted_interval[1], index=exog_for_predicting.index, columns=training_set.columns)


    df_predicted = pd.DataFrame(y_predicted, index=exog_for_predicting.index, columns=training_set.columns)
    return df_predicted, interval_upper, interval_lower
				
			

Then re-transform the variables, first by reversing the differentiation:

				
					def inv_diff(df_orig, df_forecast, second_diff = False):
    """invert the differentiation of a dataframe


    Args:
        df_orig (dataframe): the dataframe that existed JUST BEFORE differencing
        df_forecast (dataframe): the dataframe with the forecasted values
        second_diff (bool, optional): Did you apply two diff order ?


    Returns:
        dataframe with the diff reverted
    """
    columns = df_forecast.columns
    df_fc_inv = df_forecast.copy()
    n_jour_cible = len(df_forecast)
    for col in columns:
        if second_diff:
            df_fc_inv[str(col)+'_1d'] = (df_orig[col].iloc[-n_jour_cible-1]-df_orig[col].iloc[-n_jour_cible-2]) + df_fc_inv[str(col)].cumsum()
            df_fc_inv[str(col)] = df_orig[col].iloc[-n_jour_cible-1] + df_fc_inv[str(col)+"_1d"].cumsum()
        else:
            df_fc_inv[str(col)] = df_orig[col].iloc[-n_jour_cible-1] + df_fc_inv[str(col)].cumsum()
    return df_fc_inv
				
			

Then invert the boxcox & minmaxscaler transformations on the correct variables:

				
					def invert_transform(list_of_features_boxcox, list_of_features_normalized, scaler, predicted, initial_dataframe, dict_lambda_value):
    """invert the transformation boxcox and of a scaler of any kind


    Args:
        list_of_features_boxcox (list): list of features you transformed with Boxcox
        list_of_features_normalized (list): list of features you normalized
        scaler (scaler objet): the scaler you innitiated for normalizing
        predicted (dataframe): forecasted dataframe
        initial_dataframe (dataframe: the very first dataframe, not transformed
        dict_lambda_value (_type_): the dict  in we stored the lambdas values of each feature transformed via boxcox


    Returns:
        the predicted dataframe with true destransformed value
    """
    list_of_features_boxcox = list(set(list_of_features_boxcox) & set(predicted.columns))
    list_of_features_normalized = list(set(list_of_features_normalized) & set(predicted.columns))


    scaler = scaler.fit(initial_dataframe[list_of_features_boxcox])
    predicted[list_of_features_normalized] = scaler.inverse_transform(predicted[list_of_features_normalized])
    for col in list_of_features_boxcox:
        predicted[col]=inv_boxcox(predicted[col], dict_lambda_value[col]).values
   
    return predicted
				
			

And last but not least, a number of custom functions for measuring errors:

				
					def r2_ajusted(r2, df):
    longueur = len(df.index)
    nb_col = len(df.columns)
    score_ajusted = 1-((1-r2)*(longueur-1) / (longueur - nb_col -1))
    return score_ajusted
				
			
				
					def mape(y_test, pred):
    y_test, pred = np.array(y_test), np.array(pred)
    mape = np.mean(np.abs((y_test - pred) / y_test))
    return mape
				
			

And finally we have our optimization function, which records the metrics of each iteration:

				
					def convergence_lag_features(transformed_diff_dataframe, transformed_dataframe, initial_dataframe, list_of_features_boxcox, list_of_features_normalized, scaler, dict_lambda_value,
                             exog, maxlag_to_converge, n_period, targeted_feature):
    """We will optimize the features according to the chosen lag until the lag and features do not change


    Args:
        initial_transforemd_diff_dataframe (Dataframe): The initial dataframe you want to perform the convergence on. It is the dataframe you want the data
        to be trained on, so it must be transformed, differenciated if needed...
        transformed_dataframe (Dataframe): The initial dataframe that is transformed but not yet differenciated
        exog (pandas dataframe): the exogeneous features you give to your model. Must be known for train and test ! Good example is holidays,
        Fourier features...
        maxlag_to_converge (int): the maximum number of lags to include in your model. The code will test them all so the bigger the longer
        the training time ! Be careful as it is exponential UwU
        n_period (int): number of time periods (days, hours...) you want to predict
        targeted_feature: the main feature you want to predict


    Returns:
    best_model (model object) : the best fitted model among all p
    training_set (pandas dataframe) : the training set being granger tested and reduced
    training_set_exog (pandas dataframe) : the training set exog
    testing_set (pandas dataframe) : the test set being granger tested and reduced
    test_set_exog (pandas dataframe) : the testing set exog
    transformed_dataframe_updated (pandas dataframe) :
    training_metrics
    initial_dataframe_uptaded
    """
    mae_training_loop = []
    MAPE_training_loop = []
    aic_training_loop = []
    r2_training_loop= []
    normality_training_loop = []
    whiteness_training_loop = []
    best_model = None
    best_df_diff = None


    training_set_exog = exog[:-n_period]
    test_set_exog = exog[-n_period:]
    initial_testing_set = initial_dataframe[-n_period:]


    for i in range(1, maxlag_to_converge):
        df_diff = transformed_diff_dataframe.copy()
        df_diff= reduce_dim_granger_test(df_diff, i, targeted_feature)


        training_set=df_diff[:-n_period]
        testing_set = df_diff[-n_period:]


        model = VAR(endog = training_set, exog = training_set_exog)
        model = model.fit(i)


        predict, predict_upper, predict_lower = predict_results(model, training_set, test_set_exog)


        initial_testing_set_updated = initial_testing_set[predict.columns]
        initial_dataframe_uptaded = initial_dataframe[predict.columns]
       
        predict = inv_diff(transformed_dataframe, predict)
        predict = invert_transform(list_of_features_boxcox, list_of_features_normalized, scaler, predict, initial_dataframe, dict_lambda_value)


        mae = mean_absolute_error(initial_testing_set_updated[targeted_feature], predict[targeted_feature])
        if len(mae_training_loop) == 0 or min(mae_training_loop) > mae:
            best_model = model
            best_df_diff = df_diff
        mae_training_loop.append(mae)
        MAPE_training_loop.append(mape(initial_testing_set_updated[targeted_feature], predict[targeted_feature]))
        r2_training_loop.append(r2_ajusted(r2_score(initial_testing_set_updated[targeted_feature], predict[targeted_feature]), predict))
        aic_training_loop.append(model.aic)
        normality_training_loop.append(model.test_normality().pvalue)
        whiteness_training_loop.append(model.test_whiteness(round((len(training_set))/5)).pvalue)


   
    training_metrics = pd.DataFrame()
    training_metrics['mae'] = mae_training_loop
    training_metrics['mape'] = MAPE_training_loop
    training_metrics['r2'] = r2_training_loop
    training_metrics['aic'] = aic_training_loop
    training_metrics['normality'] = normality_training_loop  #H0: distribution is gaussian. if pvalue < 0.05 we reject and that is not good
    training_metrics['whiteness'] =  whiteness_training_loop   #H0: there is no presence of autocorrelation. if pvalue < 0.05 we reject and that is not good


    training_set=best_df_diff[:-n_period]
    testing_set = best_df_diff[-n_period:]
    initial_dataframe_uptaded = initial_dataframe[best_df_diff.columns]
    transformed_dataframe_updated = transformed_dataframe[best_df_diff.columns]
    return best_model, training_set, training_set_exog, testing_set, test_set_exog, transformed_dataframe_updated, training_metrics, initial_dataframe_uptaded
				
			

And finally we have our optimization function, which records the metrics of each iteration:

				
					model_var, train_var, train_var_exog, test_var, test_var_exog, df_var_transformed, df_optimizing_metrics, df_var = convergence_lag_features(df_diff_1_transformed, df_var_transformed, df_var, list_features_to_box_cox, list_features_non_box_cox,
                                                                                                                                      minmaxscaler, dict_lambda_bc, exog_pour_var, 52, n_period_predict, target_variable)
				
			

And finally we have our optimization function, which records the metrics of each iteration:

6. Prediction with the best model and display of results

Now that we have our best optimized model with the training/testing set also optimized in terms of choice of variables, with also the initial dataframe with the selected variables, we can predict, invert the transformations (I predict at the same time the confidence intervals, and I also de-transform them):

				
					df_predicted, df_interval_lower, df_interval_upper = predict_results(model_var, train_var, test_var_exog)


df_predicted = inv_diff(df_var_transformed, df_predicted)
df_interval_lower = inv_diff(df_var_transformed, df_interval_lower)
df_interval_upper = inv_diff(df_var_transformed,df_interval_upper)
df_predicted = invert_transform(list_features_to_box_cox, list_features_non_box_cox, minmaxscaler,
                                df_predicted, df_var, dict_lambda_bc)
df_interval_lower = invert_transform(list_features_to_box_cox, list_features_non_box_cox, minmaxscaler,
                                df_interval_lower, df_var, dict_lambda_bc)
df_interval_upper = invert_transform(list_features_to_box_cox, list_features_non_box_cox, minmaxscaler,
                                df_interval_upper, df_var, dict_lambda_bc)
				
			

Finally, I plot my true vs pred graphs:

				
					def append_axes(fig, as_cols=False):
    """Append new Axes to Figure."""
    n = len(fig.axes) + 1
    nrows, ncols = (1, n) if as_cols else (n, 1)
    gs = mpl.gridspec.GridSpec(nrows, ncols, figure=fig)
    for i,ax in enumerate(fig.axes):
        ax.set_subplotspec(mpl.gridspec.SubplotSpec(gs, i))
    return fig.add_subplot(nrows, ncols, n)


def plot_pred_vs_true(true_value, predicted, lower_int, upper_int, normality_of_residual = False):
    """_summary_
    The function will plot every true column of the dataframe against pred


    Args:
        true_value (dataframe): df containing the true value
        predicted (dataframe): df of the predictions
        lower_int (dataframe) : df with the lower interval values
        upper_int (dataframe) : df with the upper interval values
        normality_of_residuals : are the residuals following a gaussian ? if not, interval will be strange /EVIL LAUGHTER/
    """
    sns.set(style="ticks", context="talk")
    plt.style.use("dark_background")
    plt.rc('legend',**{'fontsize':25})
    fig = plt.figure(layout='constrained', figsize=(40,70))
    col = predicted.columns
    predicted.columns = predicted.columns+'_pred'
    lower_int.columns =lower_int.columns+'_low_int'
    upper_int.columns = upper_int.columns+'_up_int'
    predicted.plot(subplots=True, ax=append_axes(fig), color='r', fontsize=30, legend=True)


    if normality_of_residual:
        lower_int.plot(subplots=True, ax=fig.axes, color='r', fontsize=30, legend=True, linestyle ='-.')
        upper_int.plot(subplots=True, ax=fig.axes, color='r', fontsize=30, legend=True, linestyle ='-.')
    true_value[col].plot(subplots=True, ax=fig.axes, color='b', fontsize=30, legend=True)
    plt.show()
				
			
				
					plot_pred_vs_true(initial_testing_set, df_predicted, df_interval_lower, df_interval_lower, normality_of_residual=False)
				
			

7. Conclusion

In this exercise, we achieve a MAE of 3068, a MAPE of 9.4% on our target variable 🎉. Here’s the true/pred plot for the target variable ‘tsd’ :

If my residuals don’t pass the normality test (do the residuals follow a Gaussian?) then I risk ending up with completely false confidence intervals! So don’t count on it. There are methods for making residuals Gaussian, but this will be the subject of a future article 😀

We can also note that in terms of code elegance, we could do much better with object-oriented code and by creating a new class that would build on statsmodels’ VAR model.

To conclude, here’s the code executed from A to Z apart from the functions:

				
					df = pd.read_csv("path.csv")
df['settlement_date'] = pd.to_datetime(df['settlement_date'])
df = df.set_index('settlement_date')
df = df.drop(['settlement_period', 'period_hour'], axis = 1)
exog = df[['is_holiday']].copy()
df = df.drop(['is_holiday'], axis =1)
				
			
				
					target_variable = "tsd" #the feature we ultimately want to predict
n_period_predict = 48 * 7 #the number of period we want to predict. I want to predict 7 days and there are 48 data points per day
				
			
				
					mask = (df.index > '2022-01-01') #I will use the data post 2022
df_var = df.loc[mask]
exog_pour_var = exog.loc[mask]
df_var = df_var +1 #We erase the every value that are equals to 0 in order to facilitate the boxcox transformation
				
			
				
					initial_testing_set = df_var[-n_period_predict:] #there will be a lot of transformation so we want to keep a safe testing set
				
			
				
					list_features_to_box_cox, list_features_non_box_cox = box_cox_test(df_var)
				
			
				
					df_var_transformed, dict_lambda_bc = transfo_boxcox(df_var, list_features_to_box_cox)
				
			
				
					#We apply a minmaxscaler transform to the data we could not transform thanks to boxcox
minmaxscaler = MinMaxScaler()
minmaxscaler = minmaxscaler.fit(df_var[list_features_non_box_cox])
df_var_transformed[list_features_non_box_cox] = minmaxscaler.transform(df_var[list_features_non_box_cox])
				
			
				
					for name, column in df_var_transformed.iteritems():
    dickey_fuller_test(column, name=column.name)
    print('\n')
				
			
				
					df_diff_1_transformed = df_var_transformed.diff().dropna()
exog_pour_var = exog_pour_var[1:] #we need to offset of 1 because we dropped the first value by differencing
for name, column in df_diff_1_transformed.iteritems():
    dickey_fuller_test(column, name=column.name)
    print('\n')
				
			
				
					model_var, train_var, train_var_exog, test_var, test_var_exog, df_var_transformed, df_optimizing_metrics, df_var = convergence_lag_features(df_diff_1_transformed, df_var_transformed, df_var, list_features_to_box_cox, list_features_non_box_cox,
                                                                                                                                      minmaxscaler, dict_lambda_bc, exog_pour_var, 52, n_period_predict, target_variable)
				
			
				
					print(df_optimizing_metrics.describe())
				
			
				
					df_predicted, df_interval_lower, df_interval_upper = predict_results(model_var, train_var, test_var_exog)
				
			
				
					plot_pred_vs_true(initial_testing_set, df_predicted, df_interval_lower, df_interval_lower, normality_of_residual=False)
				
			
Facebook
Twitter
LinkedIn

DataScientest News

Sign up for our Newsletter to receive our guides, tutorials, events, and the latest news directly in your inbox.

You are not available?

Leave us your e-mail, so that we can send you your new articles when they are published!
icon newsletter

DataNews

Get monthly insider insights from experts directly in your mailbox