基于Python 3.9版本演示
一、写在前面
上一节,我们学了EMD&LSTM-ARIMA组合策略去做预测。
从这一节开始,我们做一些魔改。
二、EMD&RF-ARIMA组合策略
该组合策略主要是将传统的经验模态分解(EMD)方法和现代的机器学习技术(RF 和 ARIMA 模型)相结合,用于增强时序数据的预测能力。下面是这个策略的具体描述:
(1)经验模态分解 (EMD):
1)首先,使用 EMD 方法处理原始时序数据,将其分解为多个内模函数(IMF)和一个剩余信号。这一步骤的目的是提取数据中的不同频率成分,每个 IMF 代表原始信号的不同频率层次,而剩余信号包含了趋势信息。
2)EMD 是一种自适应方法,适用于非线性和非平稳时间序列数据分析,可以揭示隐藏在复杂数据集中的简单结构和成分。
(2)RF 和 ARIMA 模型的应用:
将不同的 IMF 成分分配给不同的预测模型:选定的IMF由 RF 模型处理,通常选择那些更具高频和复杂动态的成分;而趋势性较强的成分(包括剩余信号)则交由 ARIMA 模型进行分析。
三、EMD&RF-ARIMA组合策略代码Pyhton实现
下面,我使用的是之前分享过的肺结核的数据做演示:
Pyhon代码:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import CubicSpline
from statsmodels.tsa.arima.model import ARIMA
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error# 读取数据
file_path = 'pone.0277314.s006.xlsx'
data = pd.read_excel(file_path)# 提取时间和PTB病例数
time_series = data['Time']
ptb_cases = data['PTB cases']# 将时间转换为数值形式
time_numeric = np.arange(len(time_series))def get_envelope_mean(signal):"""计算信号的上包络线和下包络线的均值"""maxima = np.where(np.r_[True, signal[1:] > signal[:-1]] & np.r_[signal[:-1] > signal[1:], True])[0]minima = np.where(np.r_[True, signal[1:] < signal[:-1]] & np.r_[signal[:-1] < signal[1:], True])[0]if len(maxima) < 2 or len(minima) < 2:return np.zeros_like(signal)upper_env = CubicSpline(maxima, signal[maxima])(time_numeric)lower_env = CubicSpline(minima, signal[minima])(time_numeric)return (upper_env + lower_env) / 2def sift(signal, max_iter=1000, tol=1e-6):"""对信号进行sifting操作,提取IMF"""h = signalfor _ in range(max_iter):m = get_envelope_mean(h)h1 = h - mif np.mean(np.abs(h - h1)) < tol:breakh = h1return hdef emd(signal, max_imfs=6):"""进行EMD分解"""residual = signalimfs = []for _ in range(max_imfs):imf = sift(residual)imfs.append(imf)residual = residual - imfif np.all(np.abs(residual) < 1e-6):breakreturn np.array(imfs), residual# 执行EMD分解
imfs, residual = emd(ptb_cases.values)# 随机森林训练函数
def train_rf_model(train_data, n_steps):X = []y = []for i in range(n_steps, len(train_data)):X.append(train_data[i-n_steps:i])y.append(train_data[i])model = RandomForestRegressor(n_estimators=100)model.fit(X, y)return model# ARIMA模型训练函数
def arima_model(train_data, order):model = ARIMA(train_data, order=order)model_fit = model.fit()return model_fitn_steps = 10
rf_indices = [0, 1, 2] # 使用RF模型的IMFs索引
arima_indices = [i for i in range(len(imfs)) if i not in rf_indices] # 使用ARIMA模型的IMFs索引predictions = np.zeros(len(time_numeric))# RF预测
for idx in rf_indices:print(f'Training RF for IMF {idx+1}')train_data = imfs[idx].flatten()model = train_rf_model(train_data, n_steps)predictions_rf = model.predict(np.array([train_data[i-n_steps:i] for i in range(n_steps, len(train_data))]))predictions[n_steps:len(train_data)] += predictions_rfprint(f'RF predictions for IMF {idx+1} completed')# ARIMA预测
for idx in arima_indices:print(f'Training ARIMA for IMF {idx+1}')train_data = imfs[idx]model_fit = arima_model(train_data, order=(5, 1, 0))arima_predictions = model_fit.predict(start=0, end=len(train_data) - 1)predictions[:len(arima_predictions)] += arima_predictionsprint(f'ARIMA predictions for IMF {idx+1} completed')# 计算误差
mae = mean_absolute_error(ptb_cases, predictions)
mse = mean_squared_error(ptb_cases, predictions)
rmse = np.sqrt(mse)
mape = np.mean(np.abs((ptb_cases - predictions) / ptb_cases)) * 100# 打印误差
print(f'MAE: {mae}')
print(f'MSE: {mse}')
print(f'RMSE: {rmse}')
print(f'MAPE: {mape}')# 绘制预测结果
plt.figure(figsize=(12, 6))
plt.plot(time_numeric, ptb_cases, label='Original Data')
plt.plot(time_numeric, predictions, label='Predicted Data')
plt.legend()
plt.show()
输出:
跟原图对比:
跟上次的一样,还是整体向下偏移了一波。让GPT帮忙优化一下算法:
添加校准层:有时模型的输出需要进行校准才能与实际数据对齐。可以考虑在模型输出后添加一个线性层或非线性校准过程,以修正系统性的偏差。
五、优化
采用一种简单的线性回归方法,以调整预测结果,减少偏差,使用LinearRegression模型来校准预测结果:
代码为:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import CubicSpline
from statsmodels.tsa.arima.model import ARIMA
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error# 读取数据
file_path = 'pone.0277314.s006.xlsx'
data = pd.read_excel(file_path)# 提取时间和PTB病例数
time_series = data['Time']
ptb_cases = data['PTB cases']# 将时间转换为数值形式
time_numeric = np.arange(len(time_series))def get_envelope_mean(signal):"""计算信号的上包络线和下包络线的均值"""maxima = np.where(np.r_[True, signal[1:] > signal[:-1]] & np.r_[signal[:-1] > signal[1:], True])[0]minima = np.where(np.r_[True, signal[1:] < signal[:-1]] & np.r_[signal[:-1] < signal[1:], True])[0]if len(maxima) < 2 or len(minima) < 2:return np.zeros_like(signal)upper_env = CubicSpline(maxima, signal[maxima])(time_numeric)lower_env = CubicSpline(minima, signal[minima])(time_numeric)return (upper_env + lower_env) / 2def sift(signal, max_iter=1000, tol=1e-6):"""对信号进行sifting操作,提取IMF"""h = signalfor _ in range(max_iter):m = get_envelope_mean(h)h1 = h - mif np.mean(np.abs(h - h1)) < tol:breakh = h1return hdef emd(signal, max_imfs=6):"""进行EMD分解"""residual = signalimfs = []for _ in range(max_imfs):imf = sift(residual)imfs.append(imf)residual = residual - imfif np.all(np.abs(residual) < 1e-6):breakreturn np.array(imfs), residual# 执行EMD分解
imfs, residual = emd(ptb_cases.values)# 训练和预测函数
def train_predict_models(imfs, ptb_cases, n_steps):rf_indices = [0, 1, 2]arima_indices = [i for i in range(len(imfs)) if i not in rf_indices]predictions = np.zeros(len(ptb_cases))for idx in rf_indices:train_data = imfs[idx].flatten()X_rf = [train_data[i-n_steps:i] for i in range(n_steps, len(train_data))]y_rf = train_data[n_steps:]model_rf = RandomForestRegressor(n_estimators=100)model_rf.fit(X_rf, y_rf)predictions_rf = model_rf.predict(X_rf)predictions[n_steps:len(predictions_rf) + n_steps] += predictions_rffor idx in arima_indices:train_data = imfs[idx]model_arima = ARIMA(train_data, order=(5, 1, 0))model_fit = model_arima.fit()predictions_arima = model_fit.predict(start=0, end=len(train_data) - 1)predictions[:len(predictions_arima)] += predictions_arimareturn predictions# 初始预测
initial_predictions = train_predict_models(imfs, ptb_cases.values, n_steps=10)# 校准
calibrator = LinearRegression()
calibrator.fit(initial_predictions.reshape(-1, 1), ptb_cases.values.reshape(-1, 1))
calibrated_predictions = calibrator.predict(initial_predictions.reshape(-1, 1)).flatten()# 计算误差
mae = mean_absolute_error(ptb_cases, calibrated_predictions)
mse = mean_squared_error(ptb_cases, calibrated_predictions)
rmse = np.sqrt(mse)
mape = np.mean(np.abs((ptb_cases - calibrated_predictions) / ptb_cases)) * 100# 打印误差
print(f'MAE: {mae}')
print(f'MSE: {mse}')
print(f'RMSE: {rmse}')
print(f'MAPE: {mape}')# 绘制预测结果
plt.figure(figsize=(12, 6))
plt.plot(time_numeric, ptb_cases, label='Original Data')
plt.plot(time_numeric, calibrated_predictions, label='Calibrated Predicted Data')
plt.legend()
plt.show()
看看结果:
似乎还不错呢。
六、最后
下一期,我们来测试一下其他矫正方法。