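Quantitative trading: a time-series forecasting video, using Tencent stock as an example

Time-series forecasting, part 1

Time-series forecasting, part 2

Reproduction

I spent two hours typing out the code and reproduced the same results as in the video, which was great. The author is from Hong Kong, holds a master's degree in physics, and works as an IT and AI architect.

Some insightful comments on the video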

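The high volatility of price movements and the incompleteness of information mean that the best real-world success rate of any existing AI model (not the test success rate) is only about 45%-55%, which amounts to guessing. Even when a prediction is right, the AI may know the direction but cannot know the size of the move. Without the size, it is hard to close a position at the right level, and if closing at the right level is hard, so is opening at the right level. So you can only build an AI model from scratch, based on your own understanding of the market.

A random-walk model predicts the price about as well, and a single-step model's error is never far off, which easily misleads viewers. It is good that you pointed this out. You could try some multi-step, multi-output prediction.

My graduation project was on exactly this. Predicting the closing price is basically meaningless: the model gravitates toward whatever minimizes the error, which means outputting a value close to yesterday's price. And even if it is accurate, how do you build a trading strategy on it? In our research, when the model (LSTM) predicted that the next day's close would be higher than today's close, the opening price was very likely to be higher than or equal to the closing price, so there were pitifully few trading opportunities. When an opportunity does appear, you have to buy at the open and sell at the close the same day, and the fees from frequent trading eat into your capital.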
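The point made in the last two comments can be checked with a few lines of NumPy. The sketch below is not from the video: it generates a synthetic random walk (the 1% daily volatility and the seed are assumptions) and scores the naive "tomorrow's close equals today's close" forecast. The function name `persistence_baseline` is hypothetical.

```python
import numpy as np

def persistence_baseline(close):
    """Forecast tomorrow's close as today's close and report the errors."""
    close = np.asarray(close, dtype=float)
    pred = close[:-1]    # forecast for day t+1 is the close of day t
    actual = close[1:]
    mae = np.mean(np.abs(actual - pred))
    mape = np.mean(np.abs(actual - pred) / actual)
    # Fraction of days that closed higher: the hit rate of always guessing "up"
    up_rate = np.mean(actual > pred)
    return mae, mape, up_rate

rng = np.random.default_rng(0)
# Synthetic log-price random walk with ~1% daily volatility (assumption, not real data)
log_returns = rng.normal(0.0, 0.01, size=2000)
close = 100.0 * np.exp(np.cumsum(log_returns))

mae, mape, up_rate = persistence_baseline(close)
print(f"MAE={mae:.3f}  MAPE={mape:.4f}  up-rate={up_rate:.3f}")
```

On data like this, the naive forecast already has a percentage error of about the daily volatility, so a small price error says little by itself about whether a model has learned the direction. A price-regression model is more convincing if it is compared against this baseline.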

Original code

This requires a TensorFlow environment. It is best to install Anaconda and create a virtual environment.

Building the model

import yfinance as yf
import numpy as np
import pandas as pd

SYMBOL = "0700.HK"
HISTORY = "10y"

all_day_k = yf.Ticker(SYMBOL).history(period=HISTORY, interval="1d")
# Drop columns that carry no useful signal here.
# Check against column names (not .values), and skip any column that is absent.
all_day_k = all_day_k.drop(
    columns=["Dividends", "Adj Close", "Stock Splits"], errors="ignore"
)

# Drop the last row because the current day's bar may be incomplete
all_day_k = all_day_k[:-1]

# Build the samples: 100-day windows labelled by the next day's direction
PAST_WIN_LEN = 100
CLASSES = ["Bull", "Bear"]
LABEL_BULL = CLASSES.index("Bull")
LABEL_BEAR = CLASSES.index("Bear")

x, y = [], []
for today_i in range(len(all_day_k)):
    day_k_past = all_day_k[: today_i + 1]
    day_k_forward = all_day_k[today_i + 1 :]
    if len(day_k_past) < PAST_WIN_LEN or len(day_k_forward) < 1:
        continue
    day_k_past_win = day_k_past[-PAST_WIN_LEN:]
    day_k_forward_win = day_k_forward[:1]

    # find label
    today_price = day_k_past_win.iloc[-1]["Close"]
    tomorrow_price = day_k_forward_win.iloc[0]["Close"]
    label = LABEL_BULL if tomorrow_price > today_price else LABEL_BEAR

    # store
    x.append(day_k_past_win.values)
    y.append(label)

x, y = np.array(x), np.array(y)

TRAIN_SPLIT, VAL_SPLIT, TEST_SPLIT = 0.7, 0.2, 0.1

# Take the last portion to be the test dataset
test_split_index = -round(len(x) * TEST_SPLIT)
x_other, x_test = np.split(x, [test_split_index])
y_other, y_test = np.split(y, [test_split_index])

# shuffle the remaining portion and split into training and validation datasets
train_split_index = round(len(x) * TRAIN_SPLIT)
indexes = np.arange(len(x_other))
np.random.shuffle(indexes)
train_indexes, val_indexes = np.split(indexes, [train_split_index])
x_train, x_val = x_other[train_indexes], x_other[val_indexes]
y_train, y_val = y_other[train_indexes], y_other[val_indexes]

# show label distribution
label_distribution = pd.DataFrame(
    [
        {
            "Dataset": "train",
            "Bull": np.count_nonzero(y_train==LABEL_BULL),
            "Bear": np.count_nonzero(y_train==LABEL_BEAR),
        },
        {
            "Dataset": "val",
            "Bull": np.count_nonzero(y_val==LABEL_BULL),
            "Bear": np.count_nonzero(y_val==LABEL_BEAR),
        },
        {
            "Dataset": "test",
            "Bull": np.count_nonzero(y_test==LABEL_BULL),
            "Bear": np.count_nonzero(y_test==LABEL_BEAR),
        },
    ]
)

# Balance labels of test dataset
# so that the test set contains the same number of Bull and Bear samples
x_test_bull = x_test[y_test == LABEL_BULL]
x_test_bear = x_test[y_test == LABEL_BEAR]

min_n_labels = min(len(x_test_bull), len(x_test_bear))

x_test_bull = x_test_bull[
    np.random.choice(len(x_test_bull), min_n_labels, replace=False), :
]
x_test_bear = x_test_bear[
    np.random.choice(len(x_test_bear), min_n_labels, replace=False), :
]
x_test = np.vstack([x_test_bull, x_test_bear])

y_test = np.array([LABEL_BULL] * min_n_labels + [LABEL_BEAR] * min_n_labels)

# Test dataset label distribution
pd.DataFrame(
    [
        {
            "Dataset": "test",
            "Bull": np.count_nonzero(y_test == LABEL_BULL),
            "Bear": np.count_nonzero(y_test == LABEL_BEAR),
        }
    ]
)

# Save the three datasets
np.savez(
    "datasets.npz",
    x_train=x_train,
    y_train=y_train,
    x_val=x_val,
    y_val=y_val,
    x_test=x_test,
    y_test=y_test,
)
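The windowing loop in the script above can be sanity-checked on toy data. The sketch below is a vectorized equivalent, not the video's code: it assumes NumPy 1.20 or later for `sliding_window_view`, and the helper name `make_windows` is hypothetical. Labels follow the same convention as the script (0 for Bull, 1 for Bear).

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def make_windows(features, close, win_len):
    """Return (windows, labels); label 0 (Bull) if the next close is higher, else 1 (Bear)."""
    features = np.asarray(features, dtype=float)
    close = np.asarray(close, dtype=float)
    # sliding_window_view puts the window axis last: (n_windows, n_features, win_len).
    # Transpose to (n_windows, win_len, n_features); drop the last window (it has no "tomorrow").
    windows = sliding_window_view(features, win_len, axis=0).transpose(0, 2, 1)[:-1]
    today = close[win_len - 1 : -1]   # last close inside each window
    tomorrow = close[win_len:]        # close on the day after each window
    labels = np.where(tomorrow > today, 0, 1)
    return windows, labels

# Toy series: 6 days, 2 features, window length 3
close = np.array([1.0, 2.0, 3.0, 2.0, 5.0, 4.0])
features = np.stack([close, close * 10], axis=1)
windows, labels = make_windows(features, close, win_len=3)
print(windows.shape, labels)
```

Each window ends on "today", and its label compares the following day's close with today's close, which is the same pairing the loop produces.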


Backtest

import numpy as np
import pandas as pd
import os
import keras
from keras.utils import to_categorical
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow import math

CLASSES = ["Bull", "Bear"]
LABEL_BULL = CLASSES.index("Bull")
LABEL_BEAR = CLASSES.index("Bear")



datasets = np.load("datasets.npz")
x_train, y_train = datasets["x_train"], datasets["y_train"]
x_val, y_val = datasets["x_val"], datasets["y_val"]
x_test, y_test = datasets["x_test"], datasets["y_test"]

# label distribution
label_distribution = pd.DataFrame(
    [
        {
            "Dataset": "train",
            "Bull": np.count_nonzero(y_train == LABEL_BULL),
            "Bear": np.count_nonzero(y_train == LABEL_BEAR),
        },
        {
            "Dataset": "val",
            "Bull": np.count_nonzero(y_val == LABEL_BULL),
            "Bear": np.count_nonzero(y_val == LABEL_BEAR),
        },
        {
            "Dataset": "test",
            "Bull": np.count_nonzero(y_test == LABEL_BULL),
            "Bear": np.count_nonzero(y_test == LABEL_BEAR),
        },
    ]
)



model = keras.models.load_model('best_model.keras')

model.evaluate(x_test,to_categorical(y_test))



y_pred_prob = model.predict(x_test)
y_pred = math.argmax(y_pred_prob,axis=-1)
cm = math.confusion_matrix(y_test,y_pred,num_classes = len(CLASSES)).numpy()


plt.clf()
plt.figure(figsize=(5,4))
sns.heatmap(cm,xticklabels=CLASSES,yticklabels=CLASSES,annot=True,fmt='g')
plt.xlabel("Prediction")
plt.ylabel("Label")
# plt.show()


n_bull_pred = cm[LABEL_BULL,LABEL_BULL] + cm[LABEL_BEAR,LABEL_BULL] 
n_bull_true_pos = cm[LABEL_BULL,LABEL_BULL]
bull_accuracy = n_bull_true_pos / n_bull_pred if n_bull_pred > 0 else 0

n_bear_pred = cm[LABEL_BULL,LABEL_BEAR] + cm[LABEL_BEAR,LABEL_BEAR] 
n_bear_true_pos = cm[LABEL_BEAR,LABEL_BEAR]
bear_accuracy = n_bear_true_pos / n_bear_pred if n_bear_pred > 0 else 0

n_total_pred =n_bull_pred + n_bear_pred
n_total_true_pos =n_bull_true_pos + n_bear_true_pos
total_accuracy = n_total_true_pos / n_total_pred if n_total_pred > 0 else 0

mypd = pd.DataFrame([{
    "Prediction":"Bull","Accuracy":bull_accuracy
},
{
    "Prediction":"Bear","Accuracy":bear_accuracy
},{
    "Prediction":"Total","Accuracy":total_accuracy
}
])
print(mypd)

CLASSES_EXT = CLASSES + ['Sideways']
LABEL_SIDEWAYS = CLASSES_EXT.index('Sideways')


y_pred_prob = model.predict(x_test)
# y_pred = math.argmax(y_pred_prob,axis=-1)
results = []

for prob_thres in np.arange(0.5,0.71,0.02):
    y_pred = []
    for row in y_pred_prob:
        if row[LABEL_BULL] > prob_thres:
            y_pred.append(LABEL_BULL)
        elif row[LABEL_BEAR] > prob_thres:
            y_pred.append(LABEL_BEAR)
        else:
            # neither class is confident enough: treat as Sideways (no trade)
            y_pred.append(LABEL_SIDEWAYS)
    cm = math.confusion_matrix(y_test,y_pred,num_classes = len(CLASSES_EXT)).numpy()

    n_bull_pred = cm[LABEL_BULL,LABEL_BULL] + cm[LABEL_BEAR,LABEL_BULL]
    n_bull_true_pos = cm[LABEL_BULL,LABEL_BULL]

    n_bear_pred = cm[LABEL_BULL,LABEL_BEAR] + cm[LABEL_BEAR,LABEL_BEAR]
    n_bear_true_pos = cm[LABEL_BEAR,LABEL_BEAR]

    n_total_pred =n_bull_pred + n_bear_pred
    n_total_true_pos =n_bull_true_pos + n_bear_true_pos

    results.append({"prob_thres":prob_thres,
                    "n_bull_pred":n_bull_pred,
                    "n_bear_pred":n_bear_pred,
                    "n_total_pred":n_total_pred,
                    "bull_acc":n_bull_true_pos / n_bull_pred if n_bull_pred > 0 else 0,
                    "bear_acc":n_bear_true_pos / n_bear_pred if n_bear_pred > 0 else 0,
                    "total_acc":n_total_true_pos / n_total_pred if n_total_pred > 0 else 0
                    })

results_df = pd.DataFrame(results)

plt.style.use('seaborn-v0_8')
fig, axes = plt.subplots(2, 1, figsize=(16, 7))

axes[0].set_title("Accuracy")
axes[0].plot(results_df['prob_thres'],results_df['bull_acc'],'-o', label="Bull")
axes[0].plot(results_df['prob_thres'],results_df['bear_acc'],'-o', label="Bear")
axes[0].plot(results_df['prob_thres'],results_df['total_acc'],'-o', label="Total")
axes[0].legend()

axes[1].set_title("Number of Cases")
axes[1].set_xlabel("Probability Threshold")
axes[1].plot(results_df["prob_thres"],results_df['n_bull_pred'],'-o', label="Bull")
axes[1].plot(results_df["prob_thres"],results_df['n_bear_pred'],'-o', label="Bear")
axes[1].plot(results_df["prob_thres"],results_df['n_total_pred'],'-o', label="Total")
axes[1].legend()

plt.show()
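The per-class "accuracy" in the backtest script is the share of correct calls among the samples the model actually predicted as Bull or Bear, which is usually called precision. The sketch below reproduces the threshold step and that ratio with plain NumPy on made-up probabilities. The function names and the sample numbers are illustrative assumptions, not output from the real model.

```python
import numpy as np

BULL, BEAR, SIDEWAYS = 0, 1, 2

def threshold_predict(prob, thres):
    """Map probability rows to BULL/BEAR, or SIDEWAYS when neither exceeds thres."""
    prob = np.asarray(prob, dtype=float)
    pred = np.full(len(prob), SIDEWAYS)
    pred[prob[:, BEAR] > thres] = BEAR
    pred[prob[:, BULL] > thres] = BULL  # assigned last, mirroring the if/elif order
    return pred

def precision_per_class(y_true, y_pred):
    """Return {name: (n_predicted, precision)} for the Bull and Bear classes."""
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    out = {}
    for name, label in (("bull", BULL), ("bear", BEAR)):
        predicted = y_pred == label
        n_pred = int(predicted.sum())
        n_hit = int((predicted & (y_true == label)).sum())
        out[name] = (n_pred, n_hit / n_pred if n_pred > 0 else 0.0)
    return out

# Made-up two-class probabilities and true labels, for illustration only
prob = np.array([[0.90, 0.10], [0.55, 0.45], [0.30, 0.70], [0.48, 0.52], [0.65, 0.35]])
y_true = np.array([BULL, BEAR, BEAR, BULL, BEAR])

pred = threshold_predict(prob, thres=0.6)
stats = precision_per_class(y_true, pred)
print(pred, stats)
```

Raising the threshold moves more samples into Sideways, so the precision curves in the plot are computed over fewer and fewer cases. That is why the script plots the number of cases alongside the accuracy.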

Code after optimization with DeepSeek

Building the model



import os
import sqlite3
import numpy as np
import pandas as pd

coin_short_name = 'doge'
time_bar='1d'
coin_name = f'{coin_short_name}_usdt_swap'

current_folder_path  = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
db_path = os.path.join(current_folder_path,'db','coin','1d',f'{coin_name}.db')
dataset_path = os.path.join(current_folder_path,'db',f'{coin_short_name}_{time_bar}_datasets.npz')
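# Connect to the database and query the candles in time order
conn = sqlite3.connect(db_path)
query = f"""
SELECT ts, open, close, heigh, low, vol
FROM {coin_name}
ORDER BY ts;
"""
shib_data = pd.read_sql_query(query, conn)
conn.close()  # the data is now in memory, so release the connection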

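# Preprocessing
# shib_data is a DataFrame that is already sorted by time
shib_data['Close'] = shib_data['close']  # copy the column as 'Close' to match the original algorithm

# Feature engineering and label generation
PAST_WIN_LEN = 100
CLASSES = ["Bull", "Bear"]
# Use the same mapping as the backtest script (Bull=0, Bear=1);
# hard-coding Bull=1, Bear=0 here would swap the two classes in the backtest.
LABEL_BULL = CLASSES.index("Bull")
LABEL_BEAR = CLASSES.index("Bear")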

x, y = [], []
for today_i in range(len(shib_data) - PAST_WIN_LEN):
    day_k_past_win = shib_data.iloc[today_i:today_i + PAST_WIN_LEN]
    today_price = day_k_past_win.iloc[-1]['Close']
    tomorrow_price = shib_data.iloc[today_i + PAST_WIN_LEN]['Close']
    label = LABEL_BULL if tomorrow_price > today_price else LABEL_BEAR

    x.append(day_k_past_win[['open', 'close', 'heigh', 'low', 'vol']].values)
    y.append(label)

# Convert x and y to NumPy arrays
x, y = np.array(x), np.array(y)

TRAIN_SPLIT, VAL_SPLIT, TEST_SPLIT = 0.7, 0.2, 0.1

# Take the last portion to be the test dataset
test_split_index = -round(len(x) * TEST_SPLIT)
x_other, x_test = np.split(x, [test_split_index])
y_other, y_test = np.split(y, [test_split_index])

# shuffle the remaining portion and split into training and validation datasets
train_split_index = round(len(x) * TRAIN_SPLIT)
indexes = np.arange(len(x_other))
np.random.shuffle(indexes)
train_indexes, val_indexes = np.split(indexes, [train_split_index])
x_train, x_val = x_other[train_indexes], x_other[val_indexes]
y_train, y_val = y_other[train_indexes], y_other[val_indexes]

# show label distribution
label_distribution = pd.DataFrame(
    [
        {
            "Dataset": "train",
            "Bull": np.count_nonzero(y_train==LABEL_BULL),
            "Bear": np.count_nonzero(y_train==LABEL_BEAR),
        },
        {
            "Dataset": "val",
            "Bull": np.count_nonzero(y_val==LABEL_BULL),
            "Bear": np.count_nonzero(y_val==LABEL_BEAR),
        },
        {
            "Dataset": "test",
            "Bull": np.count_nonzero(y_test==LABEL_BULL),
            "Bear": np.count_nonzero(y_test==LABEL_BEAR),
        },
    ]
)

# Balance labels of test dataset
# so that the test set contains the same number of Bull and Bear samples
x_test_bull = x_test[y_test == LABEL_BULL]
x_test_bear = x_test[y_test == LABEL_BEAR]

min_n_labels = min(len(x_test_bull), len(x_test_bear))

x_test_bull = x_test_bull[
    np.random.choice(len(x_test_bull), min_n_labels, replace=False), :
]
x_test_bear = x_test_bear[
    np.random.choice(len(x_test_bear), min_n_labels, replace=False), :
]
x_test = np.vstack([x_test_bull, x_test_bear])

y_test = np.array([LABEL_BULL] * min_n_labels + [LABEL_BEAR] * min_n_labels)

# Test dataset label distribution
pd.DataFrame(
    [
        {
            "Dataset": "test",
            "Bull": np.count_nonzero(y_test == LABEL_BULL),
            "Bear": np.count_nonzero(y_test == LABEL_BEAR),
        }
    ]
)

# Save the three datasets
print("Saving datasets...")

np.savez(
    dataset_path,
    x_train=x_train,
    y_train=y_train,
    x_val=x_val,
    y_val=y_val,
    x_test=x_test,
    y_test=y_test,
)
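Both dataset scripts use the same split arithmetic: the last 10% of samples (in time order) become the test set, and the remaining 90% are shuffled and cut at 70% of the total, which leaves 20% for validation. The sketch below isolates that logic so the sizes can be checked. The helper name `split_indexes` and the fixed seed are assumptions added for reproducibility; the scripts themselves shuffle without a seed.

```python
import numpy as np

def split_indexes(n_samples, train_frac=0.7, test_frac=0.1, seed=0):
    """Return (train_idx, val_idx, test_idx) following the scripts' 70/20/10 scheme."""
    n_test = round(n_samples * test_frac)
    n_train = round(n_samples * train_frac)  # a fraction of the total, not of the remainder
    # The most recent samples are held out as the test set
    test_idx = np.arange(n_samples - n_test, n_samples)
    # Everything earlier is shuffled, then cut into training and validation
    other = np.arange(n_samples - n_test)
    rng = np.random.default_rng(seed)  # seeded here only so the example is repeatable
    rng.shuffle(other)
    return other[:n_train], other[n_train:], test_idx

train_idx, val_idx, test_idx = split_indexes(1000)
print(len(train_idx), len(val_idx), len(test_idx))
```

One caveat: consecutive windows share 99 of their 100 rows, so a shuffled split puts near-duplicate samples in both the training and the validation set. Validation accuracy may therefore look better than accuracy on the held-out test period.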




Backtest

import numpy as np
import pandas as pd
import os
import keras
from keras.utils import to_categorical
import matplotlib.pyplot as plt
from tensorflow import math


coin_name = 'doge'
time_bar='1d'
current_folder_path  = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# db_path = os.path.join(current_folder_path,'db','coin','5m','shib_usdt_swap.db')
dataset_path = os.path.join(current_folder_path,'db',f'{coin_name}_{time_bar}_datasets.npz')
keras_path = os.path.join(current_folder_path,'db',f'{coin_name}_{time_bar}_best_model.keras')

CLASSES = ["Bull", "Bear"]
LABEL_BULL = CLASSES.index("Bull")
LABEL_BEAR = CLASSES.index("Bear")



datasets = np.load(dataset_path)
x_train, y_train = datasets["x_train"], datasets["y_train"]
x_val, y_val = datasets["x_val"], datasets["y_val"]
x_test, y_test = datasets["x_test"], datasets["y_test"]

# label distribution
label_distribution = pd.DataFrame(
    [
        {
            "Dataset": "train",
            "Bull": np.count_nonzero(y_train == LABEL_BULL),
            "Bear": np.count_nonzero(y_train == LABEL_BEAR),
        },
        {
            "Dataset": "val",
            "Bull": np.count_nonzero(y_val == LABEL_BULL),
            "Bear": np.count_nonzero(y_val == LABEL_BEAR),
        },
        {
            "Dataset": "test",
            "Bull": np.count_nonzero(y_test == LABEL_BULL),
            "Bear": np.count_nonzero(y_test == LABEL_BEAR),
        },
    ]
)

model = keras.models.load_model(keras_path)

model.evaluate(x_test,to_categorical(y_test))

# Extra class: predictions below the probability threshold count as Sideways
CLASSES_EXT = CLASSES + ['Sideways']
LABEL_SIDEWAYS = CLASSES_EXT.index("Sideways")

y_pred_prob = model.predict(x_test)

results = []

for prob_thres in np.arange(0.5,0.71,0.02):
    y_pred = []
    for row in y_pred_prob:
        if row[LABEL_BULL] > prob_thres:
            y_pred.append(LABEL_BULL)
        elif row[LABEL_BEAR] > prob_thres:
            y_pred.append(LABEL_BEAR)
        else:
            y_pred.append(LABEL_SIDEWAYS)
    cm = math.confusion_matrix(y_test,y_pred,num_classes = len(CLASSES_EXT)).numpy()
    n_bull_pred = cm[LABEL_BULL,LABEL_BULL] + cm[LABEL_BEAR,LABEL_BULL]
    n_bull_true_pos = cm[LABEL_BULL,LABEL_BULL]
    # bull_accuracy = n_bull_true_pos / n_bull_pred if n_bull_pred > 0 else 0
    n_bear_pred = cm[LABEL_BULL,LABEL_BEAR] + cm[LABEL_BEAR,LABEL_BEAR]
    n_bear_true_pos = cm[LABEL_BEAR,LABEL_BEAR]

    # bear_accuracy = n_bear_true_pos / n_bear_pred if n_bear_pred > 0 else 0
   
    n_total_true_pos =n_bull_true_pos + n_bear_true_pos
    n_total_pred =n_bull_pred + n_bear_pred
    # total_accuracy = n_total_true_pos / n_total_pred if n_total_pred > 0 else 0
    results.append({
        "prob_thres":prob_thres,
        "n_bull_pred":n_bull_pred,
        "n_bear_pred":n_bear_pred,
        "n_total_pred":n_total_pred,
        "bull_acc": n_bull_true_pos / n_bull_pred if n_bull_pred > 0 else 0,
        "bear_acc":n_bear_true_pos / n_bear_pred if n_bear_pred > 0 else 0,
        "total_acc":n_total_true_pos / n_total_pred if n_total_pred > 0 else 0,
    })

results = pd.DataFrame(results)

plt.style.use('seaborn-v0_8')
fig,axes = plt.subplots(2,1,figsize=(16,7))

axes[0].set_title("Accuracy")
axes[0].plot(results['prob_thres'],results['bull_acc'],"-o",label="Bull")
axes[0].plot(results['prob_thres'],results['bear_acc'],"-o",label="Bear")
axes[0].plot(results['prob_thres'],results['total_acc'],"-o",label="Total")
axes[0].legend()

axes[1].set_title("Number of Cases")
axes[1].set_xlabel("Probability Threshold")
axes[1].plot(results['prob_thres'],results['n_bull_pred'],"-o",label="Bull")
axes[1].plot(results['prob_thres'],results['n_bear_pred'],"-o",label="Bear")
axes[1].plot(results['prob_thres'],results['n_total_pred'],"-o",label="Total")
axes[1].legend()

plt.show()








Author: spike

Category: Quantitative trading

Created: 2024-03-14

Updated: 2024-12-09
