Source code for ergodicity.tools.evaluate

"""
evaluate Submodule

The `evaluate` submodule provides tools for assessing the statistical properties and characteristics of stochastic processes. This includes methods for testing ergodicity, self-similarity, fat-tailedness, and more. It offers a variety of functions for time series analysis, focusing on stochastic processes commonly used in finance, physics, and other fields requiring statistical modeling.

Key Features:

1. **Ergodicity Testing**:

   - `test_ergodicity_increments`: Tests ergodicity by comparing time and ensemble averages of a process's increments. It also checks for stationarity and normality of the increments.

2. **Self-Similarity Analysis**:

   - `detrended_fluctuation_analysis`: Estimates the Hurst exponent using Detrended Fluctuation Analysis (DFA), a key indicator of self-similarity and long-term memory.

   - `test_self_similarity`: Tests for self-similarity based on the DFA estimate of the Hurst exponent.

   - `test_self_similarity_wavelet`: Uses wavelet analysis to estimate the Hurst exponent, offering an alternative method for testing self-similarity.

3. **Fat-Tailedness Testing**:

   - `test_fat_tailedness`: Performs multiple tests to detect fat-tailedness in data, including kurtosis tests, Jarque-Bera tests, and tail index estimation using the Hill estimator. It also generates Q-Q plots for visual analysis.

   - `kurtosis_test`, `jarque_bera_test`, `tail_index_hill`: Individual functions to test for fat tails in distributions.

4. **Wavelet-Based Analysis**:

   - `wavelet_estimation`: Estimates the Hurst exponent using wavelet-based techniques. This is particularly useful for processes with fractal or self-similar properties.

   - `plot_wavelet_analysis`: Provides visual analysis of wavelet decomposition, allowing users to assess the variance of wavelet coefficients across scales.

5. **Visualization**:

   - The submodule includes multiple plotting functions such as `plot_ergodicity_results` and `qqplot_analysis`, which help visualize important statistical properties of time series data.

Applications:

This submodule is highly applicable in fields such as:

- **Financial Modeling**: Understanding stock prices, asset returns, and market dynamics through ergodicity and self-similarity testing.

- **Physics**: Analyzing systems governed by stochastic dynamics, such as diffusion processes.

- **Environmental Science**: Studying long-range dependence in environmental data (e.g., temperature, rainfall).

- **Machine Learning**: Utilizing statistical tools to evaluate the properties of learning algorithms that incorporate random processes.

The `evaluate` submodule serves as a powerful toolkit for researchers and practitioners who need to test key statistical properties of their models or real-world data. By providing a range of methods, it simplifies complex evaluations and enhances the understanding of stochastic processes.
"""

import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from statsmodels.tsa.stattools import adfuller
from scipy.stats import linregress
import pywt
from ergodicity.tools.helper import *
from ergodicity.tools.compute import relative_increments as ri

def test_ergodicity_increments(data, relative_increments=False):
    """
    Test the ergodicity of a stochastic process based on its increments.

    :param data: Array of shape (num_instances + 1, num_time_steps) where the first row contains the time steps and the subsequent rows are instances of the process
    :type data: numpy.ndarray
    :param relative_increments: If True, use relative (multiplicative) increments instead of arithmetic increments
    :type relative_increments: bool
    :return: Dictionary containing test results and metrics
    :rtype: dict
    """
    times, process_data = separate(data)
    if not relative_increments:
        increments = np.diff(process_data, axis=1)
    else:
        increments = ri(data)
        times, increments = separate(increments)
    num_instances, num_time_steps = process_data.shape
    # Time step size (time elapsed per step)
    dt = (times[-1] - times[0]) / (num_time_steps - 1)

    results = {}

    # 1. Test stationarity of increments (ADF test)
    adf_results = [adfuller(inc) for inc in increments]
    results['stationarity_p_values'] = [result[1] for result in adf_results]
    results['increments_stationary'] = np.mean(results['stationarity_p_values']) < 0.05

    # 2. Calculate time averages
    if relative_increments:
        process_data = np.log(process_data)
    time_averages = (process_data[:, -1] - process_data[:, 0]) / (num_time_steps - 1)
    time_averages_per_unit = time_averages / dt

    # 3. Calculate ensemble averages of increments
    ensemble_averages = np.mean(increments, axis=0)
    ensemble_average_per_step = np.mean(ensemble_averages)
    ensemble_average_per_unit = ensemble_average_per_step / dt

    results['time_average_per_unit'] = np.mean(time_averages_per_unit)
    results['ensemble_average_per_unit'] = ensemble_average_per_unit

    # 4. Compare time and ensemble averages
    results['average_relative_difference'] = np.abs(
        results['time_average_per_unit'] - results['ensemble_average_per_unit']) / results['ensemble_average_per_unit']

    # 5. Test normality of increments
    _, normality_p_value = stats.normaltest(increments.flatten())
    results['increments_normality_p_value'] = normality_p_value

    return results

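# The Brownian-motion example below covers the ergodic case, so a complementary sketch may be
# useful: applying the test to geometric Brownian motion (GBM), the canonical example of a
# process whose growth rate is non-ergodic. This is an illustrative sketch, not part of the
# library; the drift, volatility, and the expectations stated in the comments are assumptions.
if __name__ == "__main__":
    mu, sigma, dt = 0.05, 0.5, 0.01
    num_instances, num_steps = 200, 1000
    times = np.linspace(0, num_steps * dt, num_steps + 1)
    # Exact GBM update: X_{t+dt} = X_t * exp((mu - sigma^2 / 2) * dt + sigma * sqrt(dt) * Z)
    log_increments = (mu - 0.5 * sigma ** 2) * dt + sigma * np.sqrt(dt) * np.random.normal(
        size=(num_instances, num_steps))
    paths = np.exp(np.concatenate([np.zeros((num_instances, 1)),
                                   np.cumsum(log_increments, axis=1)], axis=1))
    gbm_data = np.vstack((times, paths))
    gbm_results = test_ergodicity_increments(gbm_data, relative_increments=True)
    # With these parameters the time average per unit should land near mu - sigma^2 / 2 = -0.075,
    # while (if relative increments are defined as (X_{t+dt} - X_t) / X_t) the ensemble average
    # per unit should land near mu = 0.05: the process grows on average across the ensemble but
    # decays along typical paths, which is the ergodicity breaking this test probes.
    print(f"Time average per unit:     {gbm_results['time_average_per_unit']:.4f}")
    print(f"Ensemble average per unit: {gbm_results['ensemble_average_per_unit']:.4f}")
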
def plot_ergodicity_results(data, results):
    """
    Plot visualizations to help assess ergodicity of increments.

    :param data: Array of shape (num_instances + 1, num_time_steps) where the first row contains the time steps and the subsequent rows are instances of the process
    :type data: numpy.ndarray
    :param results: Dictionary containing test results from the test_ergodicity_increments function
    :type results: dict
    :return: None
    :rtype: None
    """
    times = data[0]
    process_data = data[1:]
    increments = np.diff(process_data, axis=1)

    fig, axs = plt.subplots(2, 2, figsize=(15, 15))

    # Plot 1: Sample paths
    for i in range(min(5, process_data.shape[0])):
        axs[0, 0].plot(times, process_data[i], alpha=0.5)
    axs[0, 0].set_title("Sample Paths")
    axs[0, 0].set_xlabel("Time")
    axs[0, 0].set_ylabel("Value")

    # Plot 2: Increment distribution
    axs[0, 1].hist(increments.flatten(), bins=50, density=True, alpha=0.7)
    axs[0, 1].set_title(f"Increment Distribution\nNormality p-value: {results['increments_normality_p_value']:.3e}")
    axs[0, 1].set_xlabel("Increment Value")
    axs[0, 1].set_ylabel("Density")

    # Plot 3: Q-Q plot of increments
    stats.probplot(increments.flatten(), dist="norm", plot=axs[1, 0])
    axs[1, 0].set_title("Q-Q Plot of Increments")

    # Plot 4: Autocorrelation of increments
    lag_max = min(100, increments.shape[1] - 1)
    # In 'full' mode the zero-lag value sits at index len(inc) - 1
    acf = np.mean([np.correlate(inc, inc, mode='full')[len(inc) - 1:] for inc in increments], axis=0)
    acf /= acf[0]
    axs[1, 1].plot(acf[:lag_max])
    axs[1, 1].set_title("Mean Autocorrelation of Increments")
    axs[1, 1].set_xlabel("Lag")
    axs[1, 1].set_ylabel("Autocorrelation")

    plt.tight_layout()
    plt.show()

# Example usage
if __name__ == "__main__":
    # Generate some example data (replace this with your actual process data)
    num_instances = 100
    num_time_steps = 1001  # 1001 time points give 1000 increments
    dt = 0.01

    # Example: Brownian motion (which has stationary, ergodic increments)
    times = np.linspace(0, (num_time_steps - 1) * dt, num_time_steps)
    increments = np.random.normal(0, np.sqrt(dt), (num_instances, num_time_steps - 1))
    process_data = np.cumsum(increments, axis=1)
    process_data = np.insert(process_data, 0, 0, axis=1)  # Start at 0
    data = np.vstack((times, process_data))

    # Test ergodicity
    results = test_ergodicity_increments(data)

    # Print results
    print("Ergodicity Test Results:")
    for key, value in results.items():
        if isinstance(value, float):
            print(f"{key}: {value:.6f}")
        else:
            print(f"{key}: {value}")

    # Plot results
    plot_ergodicity_results(data, results)

def detrended_fluctuation_analysis(time_series):
    """
    Estimate the Hurst exponent using Detrended Fluctuation Analysis (DFA).

    :param time_series: The time series to analyze
    :type time_series: array_like
    :return: Estimated Hurst exponent
    :rtype: float
    """
    N = len(time_series)

    # Cumulative sum of deviations from the mean (the "profile")
    Y = np.cumsum(time_series - np.mean(time_series))

    # Define scales (window sizes), logarithmically spaced
    scales = np.floor(np.logspace(np.log10(10), np.log10(N // 4), num=20)).astype(int)
    scales = np.unique(scales)

    flucts = []
    for scale in scales:
        if scale < 2:
            continue
        # Number of non-overlapping windows
        n_windows = N // scale
        if n_windows < 2:
            continue
        F_nu = []
        for i in range(n_windows):
            segment = Y[i * scale:(i + 1) * scale]
            # Fit a polynomial of order 1 (linear detrending)
            coeffs = np.polyfit(range(scale), segment, 1)
            fit = np.polyval(coeffs, range(scale))
            # Calculate the fluctuation (root mean square) after detrending
            F_nu.append(np.sqrt(np.mean((segment - fit) ** 2)))
        # Average fluctuation over all segments of this scale
        F = np.mean(F_nu)
        flucts.append((scale, F))

    if len(flucts) < 2:
        return np.nan

    scales, Fs = zip(*flucts)
    log_scales = np.log(scales)
    log_Fs = np.log(Fs)
    slope, intercept, r_value, p_value, std_err = linregress(log_scales, log_Fs)

    # The input is treated as an already-integrated (fBm-like) path, so the DFA scaling
    # exponent is approximately H + 1 and we subtract 1 to recover the Hurst exponent.
    hurst = slope - 1
    return hurst

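# A quick sanity check of the slope-minus-one convention above (an illustrative sketch, not
# part of the library): the profile of an already-integrated path scales with exponent H + 1,
# so for an ordinary Brownian path the estimate should come out near 0.5.
if __name__ == "__main__":
    np.random.seed(42)
    brownian_path = np.cumsum(np.random.normal(size=10000))
    print(f"DFA Hurst estimate for a Brownian path: {detrended_fluctuation_analysis(brownian_path):.3f}")
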
def test_self_similarity(time_series):
    """
    Test the self-similarity of a given time series.

    :param time_series: The time series to analyze
    :type time_series: array_like
    :return: Dictionary containing the estimated Hurst exponents
    :rtype: dict
    """
    hurst_dfa = detrended_fluctuation_analysis(time_series)

    results = {
        "Hurst Exponent (DFA)": hurst_dfa,
    }

    return results

# Example usage
if __name__ == "__main__":
    # Generate a sample time series (intended as fractional Brownian motion with H=0.7).
    # Note: with dt = 1 this simple generator reduces to ordinary Brownian motion,
    # so it is only a rough placeholder for true fBm.
    from numpy.random import RandomState

    def fbm(n, H):
        rng = RandomState(0)
        t = np.arange(n)
        dt = 1
        X = np.zeros(n)
        X[1:] = np.cumsum(rng.standard_normal(n - 1) * dt ** H)
        return X

    n_points = 10000
    H_true = 0.7
    time_series = fbm(n_points, H_true)

    # Test self-similarity
    results = test_self_similarity(time_series)

    print("Self-similarity test results:")
    for key, value in results.items():
        print(f"{key}: {value:.4f}")

    print(f"\nTrue Hurst exponent: {H_true:.4f}")
    print(f"True alpha: {1 / H_true:.4f}")

    # Plot the time series
    plt.figure(figsize=(12, 6))
    plt.plot(time_series)
    plt.title(f"Sample Time Series (Fractional Brownian Motion, H={H_true})")
    plt.xlabel("Time")
    plt.ylabel("Value")
    plt.show()

def wavelet_estimation(time_series, wavelet='db4', max_level=None):
    """
    Estimate the Hurst exponent using a wavelet-based method.

    :param time_series: The time series to analyze
    :type time_series: array_like
    :param wavelet: The wavelet to use for the analysis (default is 'db4')
    :type wavelet: str, optional
    :param max_level: The maximum decomposition level (default is None, which means it will be automatically determined)
    :type max_level: int, optional
    :return: Estimated Hurst exponent
    :rtype: float
    """
    n = len(time_series)

    # Determine the maximum decomposition level if not provided
    if max_level is None:
        max_level = int(np.log2(n)) - 1

    # Perform the wavelet transform
    coeffs = pywt.wavedec(time_series, wavelet, level=max_level)

    # Compute the variance of the wavelet coefficients at each level
    variances = [np.var(coeff) for coeff in coeffs[1:]]  # Exclude the approximation coefficients

    # Compute the scales
    scales = [2 ** i for i in range(1, len(variances) + 1)]

    # Perform linear regression of log-variance against log-scale
    slope, intercept, r_value, p_value, std_err = linregress(np.log2(scales), np.log2(variances))

    # The Hurst exponent is related to the slope of the log-log regression; the sign reflects
    # that coeffs[1:] runs from the coarsest to the finest detail level while the scale axis
    # used here increases.
    H = -(slope + 1) / 2

    return H

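# The same kind of sanity check applies here (an illustrative sketch, not part of the library),
# under the same convention that the input is an integrated, fBm-like path: a plain Brownian
# path should yield a wavelet-based Hurst estimate near 0.5.
if __name__ == "__main__":
    np.random.seed(42)
    brownian_path = np.cumsum(np.random.normal(size=2 ** 14))
    print(f"Wavelet Hurst estimate for a Brownian path: {wavelet_estimation(brownian_path):.3f}")
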
def test_self_similarity_wavelet(time_series, wavelet='db4', max_level=None):
    """
    Test the self-similarity of a given time series using a wavelet-based method.

    :param time_series: The time series to analyze
    :type time_series: array_like
    :param wavelet: The wavelet to use for the analysis (default is 'db4')
    :type wavelet: str, optional
    :param max_level: The maximum decomposition level (default is None, which means it will be automatically determined)
    :type max_level: int, optional
    :return: Dictionary containing the estimated Hurst exponent and scaling parameter
    :rtype: dict
    """
    H = wavelet_estimation(time_series, wavelet, max_level)

    results = {
        "Hurst Exponent (Wavelet)": H,
        "Estimated Alpha (Wavelet)": 1 / H if H > 0 else np.nan
    }

    return results

def plot_wavelet_analysis(time_series, wavelet='db4', max_level=None):
    """
    Perform wavelet analysis and plot the results.

    :param time_series: The time series to analyze
    :type time_series: array_like
    :param wavelet: The wavelet to use for the analysis (default is 'db4')
    :type wavelet: str, optional
    :param max_level: The maximum decomposition level (default is None, which means it will be automatically determined)
    :type max_level: int, optional
    :return: None
    :rtype: None
    """
    n = len(time_series)
    if max_level is None:
        max_level = int(np.log2(n)) - 1

    coeffs = pywt.wavedec(time_series, wavelet, level=max_level)
    variances = [np.var(coeff) for coeff in coeffs[1:]]
    scales = [2 ** i for i in range(1, len(variances) + 1)]

    slope, intercept, r_value, p_value, std_err = linregress(np.log2(scales), np.log2(variances))
    H = -(slope + 1) / 2

    plt.figure(figsize=(12, 6))
    plt.loglog(scales, variances, 'bo-')
    plt.loglog(scales, [2 ** (intercept + slope * np.log2(scale)) for scale in scales], 'r--')
    plt.title(f"Wavelet Variance Plot (Estimated H = {H:.4f})")
    plt.xlabel("Scale")
    plt.ylabel("Variance of Wavelet Coefficients")
    plt.legend(['Data', 'Fitted Line'])
    plt.grid(True)
    plt.show()

# Example usage
if __name__ == "__main__":
    # Generate a sample time series (intended as fractional Brownian motion with H=0.7).
    # Note: with dt = 1 this simple generator reduces to ordinary Brownian motion,
    # so it is only a rough placeholder for true fBm.
    from numpy.random import RandomState

    def fbm(n, H):
        rng = RandomState(0)
        t = np.arange(n)
        dt = 1
        X = np.zeros(n)
        X[1:] = np.cumsum(rng.standard_normal(n - 1) * dt ** H)
        return X

    n_points = 2 ** 14  # Use a power of 2 for efficiency in the wavelet transform
    H_true = 0.7
    time_series = fbm(n_points, H_true)

    # Test self-similarity using the wavelet method
    results = test_self_similarity_wavelet(time_series)

    print("Wavelet-based self-similarity test results:")
    for key, value in results.items():
        print(f"{key}: {value:.4f}")

    print(f"\nTrue Hurst exponent: {H_true:.4f}")
    print(f"True alpha: {1 / H_true:.4f}")

    # Plot wavelet analysis
    plot_wavelet_analysis(time_series)

    # Plot the time series
    plt.figure(figsize=(12, 6))
    plt.plot(time_series)
    plt.title(f"Sample Time Series (Fractional Brownian Motion, H={H_true})")
    plt.xlabel("Time")
    plt.ylabel("Value")
    plt.show()

def kurtosis_test(data):
    """
    Test for fat-tailedness using excess kurtosis.

    :param data: The data to analyze
    :type data: array_like
    :return: Dictionary containing the kurtosis and its interpretation
    :rtype: dict
    """
    kurt = stats.kurtosis(data)
    result = {
        "Excess Kurtosis": kurt,
        "Interpretation": "Fat-tailed" if kurt > 0 else "Not fat-tailed"
    }
    return result

def jarque_bera_test(data):
    """
    Perform the Jarque-Bera test for normality.

    :param data: The data to analyze
    :type data: array_like
    :return: Dictionary containing the test statistic, p-value, and interpretation
    :rtype: dict
    """
    statistic, p_value = stats.jarque_bera(data)
    result = {
        "JB Statistic": statistic,
        "p-value": p_value,
        "Interpretation": "Likely fat-tailed" if p_value < 0.05 else "Not enough evidence for fat-tailedness"
    }
    return result

def tail_index_hill(data, tail_fraction=0.1):
    """
    Estimate the tail index using the Hill estimator.

    :param data: The data to analyze
    :type data: array_like
    :param tail_fraction: Fraction of the data to consider as the tail
    :type tail_fraction: float
    :return: Dictionary containing the tail index and its interpretation
    :rtype: dict
    """
    sorted_data = np.sort(np.abs(data))
    n = len(sorted_data)
    k = int(n * tail_fraction)
    # Hill estimator: reciprocal of the mean log-excess of the k largest observations
    # over the threshold order statistic
    tail_index = 1 / np.mean(np.log(sorted_data[-k:]) - np.log(sorted_data[-k]))
    result = {
        "Tail Index": tail_index,
        "Interpretation": "Fat-tailed" if tail_index < 3 else "Not fat-tailed"
    }
    return result

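# One way to see the Hill estimator at work (an illustrative sketch with assumed parameters,
# not part of the library) is to feed it Pareto data with a known tail index: scipy's pareto
# shape parameter b equals the tail index, so the estimate should land roughly at 2.5 here.
if __name__ == "__main__":
    np.random.seed(1)
    pareto_sample = stats.pareto.rvs(b=2.5, size=100000)
    print(tail_index_hill(pareto_sample))
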
def qqplot_analysis(data):
    """
    Perform Q-Q plot analysis against the normal distribution.

    :param data: The data to analyze
    :type data: array_like
    :return: None (displays the Q-Q plot)
    :rtype: None
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    stats.probplot(data, dist="norm", plot=ax)
    ax.set_title("Q-Q Plot against Normal Distribution")
    plt.show()

def test_fat_tailedness(data):
    """
    Perform a comprehensive test for fat-tailedness.

    :param data: The data to analyze
    :type data: array_like
    :return: Dictionary containing results from various tests
    :rtype: dict
    """
    results = {}

    results["Kurtosis Test"] = kurtosis_test(data)
    results["Jarque-Bera Test"] = jarque_bera_test(data)
    results["Tail Index (Hill Estimator)"] = tail_index_hill(data)

    print("Fat-tailedness Test Results:")
    for test_name, test_results in results.items():
        print(f"\n{test_name}:")
        for key, value in test_results.items():
            if isinstance(value, float):
                print(f"  {key}: {value:.4f}")
            else:
                print(f"  {key}: {value}")

    print("\nGenerating Q-Q Plot...")
    qqplot_analysis(data)

    return results

# Example usage
if __name__ == "__main__":
    # Generate sample data from a fat-tailed distribution (Student's t with 3 degrees of freedom)
    np.random.seed(0)
    fat_tailed_data = stats.t.rvs(df=3, size=10000)

    # Test for fat-tailedness
    results = test_fat_tailedness(fat_tailed_data)

    # Generate and test a normal distribution for comparison
    normal_data = np.random.normal(size=10000)
    print("\n\nFor comparison, results for normal distribution:")
    _ = test_fat_tailedness(normal_data)