复现
抄了2个小时,复现了和视频中一样的结果,非常棒。作者是香港的物理学硕士、资讯科技及人工智能架构师。
视频一些精彩的评论
价格走势的高波动性和信息不完全的特点决定了任何一款现有AI模型的最高实际成功率(可不是测试成功率哦)大概只有45%-55%之间,相当于瞎猜。当预测正确时,AI可以知道方向但是不可能知道涨跌幅度,如果不知道涨跌幅度,AI就很难在合适位置平仓,如果很难在合适位置平仓也同样说明很难在合适位置开仓。所以你只能靠自己对市场的理解从0开发AI模型。
预测股价时,随机游走模型的效果也差不多;而单步预测模型的误差本来就不会差很多,很容易误导观众——你有点出这一点。可以去做一些多步、多输出的预测。
我的毕业专题就是做这个。预测收盘价根本没有意义:模型会倾向于选择误差最小的值,也就是直接算出一个接近昨日价格的数值。再者,即便预测得准,买卖策略又该如何制定?在我们的研究中,如果模型(LSTM)预测隔天收盘价高于今天收盘价,开盘价有很高的几率会高于或等于收盘价,这样的情况下交易次数少得可怜;且若是出现交易机会,当日开盘买入、收盘就要卖出,频繁交易的手续费也会磨损你的资金。
原始代码
需要tensorflow的环境,最好安装Anaconda来建立虚拟环境
建立模型
import yfinance as yf
import numpy as np
import pandas as pd
# Ticker to download and how much history to request.
SYMBOL = "0700.HK"
HISTORY = "10y"

# Daily candles for SYMBOL over the requested period.
all_day_k = yf.Ticker(SYMBOL).history(period=HISTORY, interval="1d")

# Drop the columns that carry no signal for the model.
# The membership test has to look at the column labels: testing against
# `.values` searches the numeric cells for the string and never matches, so
# previously nothing was dropped. `errors="ignore"` skips absent columns.
# "Adj Close" is listed next to the original "Adj close" spelling because
# that is how yfinance capitalises the label.
# NOTE(review): this changes the number of features per window; any model
# trained on data that still contained these columns must be retrained.
all_day_k = all_day_k.drop(
    columns=["Dividends", "Adj close", "Adj Close", "Stock Splits"],
    errors="ignore",
)

# Drop the last row because the most recent candle may be incomplete.
all_day_k = all_day_k[:-1]
# Build the samples: each one is a window of the last PAST_WIN_LEN daily
# candles, labelled by whether the next day's close is above the close of the
# window's final day.
PAST_WIN_LEN = 100
CLASSES = ["Bull", "Bear"]
LABEL_BULL = CLASSES.index("Bull")
LABEL_BEAR = CLASSES.index("Bear")

x, y = [], []
# The first usable "today" is the row that completes a full window; the last
# one is the second-to-last row, because the label needs a following day.
for today_i in range(PAST_WIN_LEN - 1, len(all_day_k) - 1):
    window = all_day_k.iloc[today_i + 1 - PAST_WIN_LEN : today_i + 1]
    close_today = window["Close"].iloc[-1]
    close_tomorrow = all_day_k["Close"].iloc[today_i + 1]
    if close_tomorrow > close_today:
        label = LABEL_BULL
    else:
        label = LABEL_BEAR
    x.append(window.values)
    y.append(label)

x = np.array(x)
y = np.array(y)
# Fractions of the full sample set intended for each dataset.
TRAIN_SPLIT, VAL_SPLIT, TEST_SPLIT = 0.7, 0.2, 0.1

# The tail of the arrays (the most recent samples) becomes the test dataset.
test_split_index = -round(len(x) * TEST_SPLIT)
x_other, x_test = x[:test_split_index], x[test_split_index:]
y_other, y_test = y[:test_split_index], y[test_split_index:]

# Shuffle the positions of the remaining samples. The training size is
# computed from the full sample count, and validation receives whatever is
# left of the non-test samples.
train_split_index = round(len(x) * TRAIN_SPLIT)
indexes = np.arange(len(x_other))
np.random.shuffle(indexes)
train_indexes = indexes[:train_split_index]
val_indexes = indexes[train_split_index:]
x_train, y_train = x_other[train_indexes], y_other[train_indexes]
x_val, y_val = x_other[val_indexes], y_other[val_indexes]
# Count the Bull/Bear labels of each dataset, one row per dataset.
label_distribution = pd.DataFrame(
    [
        {
            "Dataset": dataset_name,
            "Bull": np.count_nonzero(labels == LABEL_BULL),
            "Bear": np.count_nonzero(labels == LABEL_BEAR),
        }
        for dataset_name, labels in (
            ("train", y_train),
            ("val", y_val),
            ("test", y_test),
        )
    ]
)
# Balance the test dataset so both classes have the same number of samples:
# the larger class is randomly down-sampled (without replacement).
x_test_bull = x_test[y_test == LABEL_BULL]
x_test_bear = x_test[y_test == LABEL_BEAR]
min_n_labels = min(len(x_test_bull), len(x_test_bear))

bull_pick = np.random.choice(len(x_test_bull), min_n_labels, replace=False)
x_test_bull = x_test_bull[bull_pick, :]
bear_pick = np.random.choice(len(x_test_bear), min_n_labels, replace=False)
x_test_bear = x_test_bear[bear_pick, :]

# Rebuild the test arrays: all Bull samples first, then all Bear samples.
x_test = np.vstack([x_test_bull, x_test_bear])
balanced_labels = [LABEL_BULL] * min_n_labels
balanced_labels += [LABEL_BEAR] * min_n_labels
y_test = np.array(balanced_labels)

# Label distribution of the balanced test dataset. As a bare expression the
# frame is only displayed in an interactive/notebook session.
balanced_counts = {
    "Dataset": "test",
    "Bull": np.count_nonzero(y_test == LABEL_BULL),
    "Bear": np.count_nonzero(y_test == LABEL_BEAR),
}
pd.DataFrame([balanced_counts])
# Persist the three datasets (features and labels) in one .npz archive.
arrays_to_save = {
    "x_train": x_train,
    "y_train": y_train,
    "x_val": x_val,
    "y_val": y_val,
    "x_test": x_test,
    "y_test": y_test,
}
np.savez("datasets.npz", **arrays_to_save)
回测
import numpy as np
import pandas as pd
import os
import keras
from keras.utils import to_categorical
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow import math
# Class names; a label is the position of its class in this list.
CLASSES = ["Bull", "Bear"]
LABEL_BULL, LABEL_BEAR = (CLASSES.index(name) for name in ("Bull", "Bear"))

# Load the arrays stored in the dataset archive.
datasets = np.load("datasets.npz")
x_train, x_val, x_test = (
    datasets[key] for key in ("x_train", "x_val", "x_test")
)
y_train, y_val, y_test = (
    datasets[key] for key in ("y_train", "y_val", "y_test")
)

# Count the Bull/Bear labels of each dataset, one row per dataset.
label_distribution = pd.DataFrame(
    [
        {
            "Dataset": dataset_name,
            "Bull": np.count_nonzero(labels == LABEL_BULL),
            "Bear": np.count_nonzero(labels == LABEL_BEAR),
        }
        for dataset_name, labels in (
            ("train", y_train),
            ("val", y_val),
            ("test", y_test),
        )
    ]
)
# Load the saved model and evaluate it on the test dataset, with the labels
# converted to one-hot vectors.
model = keras.models.load_model("best_model.keras")
one_hot_test_labels = to_categorical(y_test)
model.evaluate(x_test, one_hot_test_labels)

# Predicted class = index of the largest model output for each sample.
# `math` is tensorflow.math here (see the imports of this script).
y_pred_prob = model.predict(x_test)
y_pred = math.argmax(y_pred_prob, axis=-1)
n_classes = len(CLASSES)
cm = math.confusion_matrix(y_test, y_pred, num_classes=n_classes).numpy()

# Draw the confusion matrix as an annotated heatmap; rows are the true
# labels and columns the predictions, as the axis titles state.
plt.clf()
plt.figure(figsize=(5, 4))
sns.heatmap(
    cm,
    xticklabels=CLASSES,
    yticklabels=CLASSES,
    annot=True,
    fmt="g",
)
plt.xlabel("Prediction")
plt.ylabel("Label")
# plt.show()
# "Accuracy" of a class here means: of all samples predicted as that class
# (a column of the confusion matrix), the share whose true label matches.
n_bull_true_pos = cm[LABEL_BULL, LABEL_BULL]
n_bull_pred = n_bull_true_pos + cm[LABEL_BEAR, LABEL_BULL]
n_bear_true_pos = cm[LABEL_BEAR, LABEL_BEAR]
n_bear_pred = cm[LABEL_BULL, LABEL_BEAR] + n_bear_true_pos
n_total_pred = n_bull_pred + n_bear_pred
n_total_true_pos = n_bull_true_pos + n_bear_true_pos

# Each ratio falls back to 0 when nothing was predicted for that class.
bull_accuracy = 0
if n_bull_pred > 0:
    bull_accuracy = n_bull_true_pos / n_bull_pred
bear_accuracy = 0
if n_bear_pred > 0:
    bear_accuracy = n_bear_true_pos / n_bear_pred
total_accuracy = 0
if n_total_pred > 0:
    total_accuracy = n_total_true_pos / n_total_pred

mypd = pd.DataFrame(
    [
        {"Prediction": prediction_name, "Accuracy": accuracy}
        for prediction_name, accuracy in (
            ("Bull", bull_accuracy),
            ("Bear", bear_accuracy),
            ("Total", total_accuracy),
        )
    ]
)
print(mypd)
# Add a third outcome: a sample is "Sideways" when neither class probability
# exceeds the threshold being tested.
CLASSES_EXT = CLASSES + ['Sideways']
LABEL_SIDEWAYS = CLASSES_EXT.index('Sideways')

y_pred_prob = model.predict(x_test)

# Sweep the probability threshold and record, for each value, how many
# Bull/Bear calls were made and what share of them was correct.
results = []
for threshold in np.arange(0.5, 0.71, 0.02):
    bull_confident = y_pred_prob[:, LABEL_BULL] > threshold
    bear_confident = y_pred_prob[:, LABEL_BEAR] > threshold
    # Bull is checked first, then Bear, otherwise Sideways.
    y_pred = np.where(
        bull_confident,
        LABEL_BULL,
        np.where(bear_confident, LABEL_BEAR, LABEL_SIDEWAYS),
    )
    cm = math.confusion_matrix(
        y_test, y_pred, num_classes=len(CLASSES_EXT)
    ).numpy()

    n_bull_true_pos = cm[LABEL_BULL, LABEL_BULL]
    n_bull_pred = n_bull_true_pos + cm[LABEL_BEAR, LABEL_BULL]
    n_bear_true_pos = cm[LABEL_BEAR, LABEL_BEAR]
    n_bear_pred = cm[LABEL_BULL, LABEL_BEAR] + n_bear_true_pos
    n_total_pred = n_bull_pred + n_bear_pred
    n_total_true_pos = n_bull_true_pos + n_bear_true_pos

    # Ratios fall back to 0 when no prediction of that kind was made.
    row = {
        "prob_thres": threshold,
        "n_bull_pred": n_bull_pred,
        "n_bear_pred": n_bear_pred,
        "n_total_pred": n_total_pred,
        "bull_acc": n_bull_true_pos / n_bull_pred if n_bull_pred > 0 else 0,
        "bear_acc": n_bear_true_pos / n_bear_pred if n_bear_pred > 0 else 0,
        "total_acc": (
            n_total_true_pos / n_total_pred if n_total_pred > 0 else 0
        ),
    }
    results.append(row)
# Tabulate the sweep results, one row per probability threshold.
# (The frame was previously bound to the misspelled name `reuslts`.)
results_df = pd.DataFrame(results)

plt.style.use('seaborn-v0_8')
fig, axes = plt.subplots(2, 1, figsize=(16, 7))

# One panel per metric family: (axis, title, {legend label: column}).
panels = (
    (
        axes[0],
        "Accuracy",
        {"Bull": "bull_acc", "Bear": "bear_acc", "Total": "total_acc"},
    ),
    (
        axes[1],
        "Number of Cases",
        {"Bull": "n_bull_pred", "Bear": "n_bear_pred", "Total": "n_total_pred"},
    ),
)
for ax, title, columns in panels:
    ax.set_title(title)
    for legend_label, column in columns.items():
        ax.plot(
            results_df["prob_thres"], results_df[column], '-o', label=legend_label
        )
    ax.legend()

# Only the lower panel carries the shared x-axis caption.
axes[1].set_xlabel("Probability Threshold")
plt.show()
使用deepseek优化后的代码
建立模型
import os
import sqlite3
import numpy as np
import pandas as pd
# Instrument and bar size the datasets are built for.
coin_short_name = 'doge'
time_bar = '1d'
coin_name = f'{coin_short_name}_usdt_swap'

# Paths are resolved relative to the parent of this script's directory.
current_folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# The bar-size folder follows `time_bar` so it cannot drift from the dataset
# file name below (it was hard-coded to '1d').
db_path = os.path.join(current_folder_path, 'db', 'coin', time_bar, f'{coin_name}.db')
dataset_path = os.path.join(current_folder_path, 'db', f'{coin_short_name}_{time_bar}_datasets.npz')

# Read the candles ordered by timestamp. The table name is interpolated from
# the constant above; SQL parameters cannot stand in for identifiers.
# NOTE(review): `heigh` is presumably the column's spelling in the database
# schema - confirm before renaming.
query = f"""
SELECT ts, open, close, heigh, low, vol
FROM {coin_name}
ORDER BY ts;
"""
conn = sqlite3.connect(db_path)
try:
    shib_data = pd.read_sql_query(query, conn)
finally:
    # Release the database handle even when the query fails.
    conn.close()

# Pre-processing: add a 'Close' column (a copy of 'close', which is kept) so
# the labelling code can use the same column name as the original algorithm.
shib_data['Close'] = shib_data['close']
# Feature engineering and label generation: each sample is a window of
# PAST_WIN_LEN consecutive candles, labelled by whether the close that
# follows the window is above the window's last close.
PAST_WIN_LEN = 100
CLASSES = ["Bull", "Bear"]
# Labels are the positions in CLASSES, the same mapping the backtest script
# uses to read them back. The former hard-coded Bull=1 / Bear=0 was the
# reverse of that mapping and swapped Bull and Bear in the reported metrics.
# NOTE(review): datasets saved with the old mapping must be regenerated, and
# a model trained on them presumably retrained.
LABEL_BULL = CLASSES.index("Bull")
LABEL_BEAR = CLASSES.index("Bear")
FEATURE_COLUMNS = ['open', 'close', 'heigh', 'low', 'vol']

# Extract the arrays once instead of re-selecting columns for every window.
feature_values = shib_data[FEATURE_COLUMNS].values
close_prices = shib_data['Close'].values

x, y = [], []
for start_i in range(len(shib_data) - PAST_WIN_LEN):
    end_i = start_i + PAST_WIN_LEN  # index of the day right after the window
    today_price = close_prices[end_i - 1]
    tomorrow_price = close_prices[end_i]
    label = LABEL_BULL if tomorrow_price > today_price else LABEL_BEAR
    x.append(feature_values[start_i:end_i])
    y.append(label)

# Convert the collected samples and labels to numpy arrays.
x, y = np.array(x), np.array(y)
# Fractions of the full sample set intended for each dataset.
TRAIN_SPLIT, VAL_SPLIT, TEST_SPLIT = 0.7, 0.2, 0.1

# The tail of the arrays (the most recent samples) becomes the test dataset.
test_split_index = -round(len(x) * TEST_SPLIT)
x_other, x_test = x[:test_split_index], x[test_split_index:]
y_other, y_test = y[:test_split_index], y[test_split_index:]

# Shuffle the positions of the remaining samples. The training size is
# computed from the full sample count, and validation receives whatever is
# left of the non-test samples.
train_split_index = round(len(x) * TRAIN_SPLIT)
indexes = np.arange(len(x_other))
np.random.shuffle(indexes)
train_indexes = indexes[:train_split_index]
val_indexes = indexes[train_split_index:]
x_train, y_train = x_other[train_indexes], y_other[train_indexes]
x_val, y_val = x_other[val_indexes], y_other[val_indexes]
# Count the Bull/Bear labels of each dataset, one row per dataset.
label_distribution = pd.DataFrame(
    [
        {
            "Dataset": dataset_name,
            "Bull": np.count_nonzero(labels == LABEL_BULL),
            "Bear": np.count_nonzero(labels == LABEL_BEAR),
        }
        for dataset_name, labels in (
            ("train", y_train),
            ("val", y_val),
            ("test", y_test),
        )
    ]
)
# Balance the test dataset so both classes have the same number of samples:
# the larger class is randomly down-sampled (without replacement).
x_test_bull = x_test[y_test == LABEL_BULL]
x_test_bear = x_test[y_test == LABEL_BEAR]
min_n_labels = min(len(x_test_bull), len(x_test_bear))

bull_pick = np.random.choice(len(x_test_bull), min_n_labels, replace=False)
x_test_bull = x_test_bull[bull_pick, :]
bear_pick = np.random.choice(len(x_test_bear), min_n_labels, replace=False)
x_test_bear = x_test_bear[bear_pick, :]

# Rebuild the test arrays: all Bull samples first, then all Bear samples.
x_test = np.vstack([x_test_bull, x_test_bear])
balanced_labels = [LABEL_BULL] * min_n_labels
balanced_labels += [LABEL_BEAR] * min_n_labels
y_test = np.array(balanced_labels)

# Label distribution of the balanced test dataset. As a bare expression the
# frame is only displayed in an interactive/notebook session.
balanced_counts = {
    "Dataset": "test",
    "Bull": np.count_nonzero(y_test == LABEL_BULL),
    "Bear": np.count_nonzero(y_test == LABEL_BEAR),
}
pd.DataFrame([balanced_counts])
# Persist the three datasets (features and labels) in one .npz archive.
print("Saving datasets...")
arrays_to_save = {
    "x_train": x_train,
    "y_train": y_train,
    "x_val": x_val,
    "y_val": y_val,
    "x_test": x_test,
    "y_test": y_test,
}
np.savez(dataset_path, **arrays_to_save)
回测
import numpy as np
import pandas as pd
import os
import keras
from keras.utils import to_categorical
import matplotlib.pyplot as plt
from tensorflow import math
# Instrument and bar size whose dataset and model are evaluated.
coin_name = 'doge'
time_bar = '1d'

# Both files live in the `db` folder next to this script's parent directory.
current_folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
db_folder = os.path.join(current_folder_path, 'db')
dataset_path = os.path.join(db_folder, f'{coin_name}_{time_bar}_datasets.npz')
keras_path = os.path.join(db_folder, f'{coin_name}_{time_bar}_best_model.keras')

# Class names; a label is the position of its class in this list.
CLASSES = ["Bull", "Bear"]
LABEL_BULL, LABEL_BEAR = (CLASSES.index(name) for name in ("Bull", "Bear"))

# Load the arrays stored in the dataset archive.
datasets = np.load(dataset_path)
x_train, x_val, x_test = (
    datasets[key] for key in ("x_train", "x_val", "x_test")
)
y_train, y_val, y_test = (
    datasets[key] for key in ("y_train", "y_val", "y_test")
)

# Count the Bull/Bear labels of each dataset, one row per dataset.
label_distribution = pd.DataFrame(
    [
        {
            "Dataset": dataset_name,
            "Bull": np.count_nonzero(labels == LABEL_BULL),
            "Bear": np.count_nonzero(labels == LABEL_BEAR),
        }
        for dataset_name, labels in (
            ("train", y_train),
            ("val", y_val),
            ("test", y_test),
        )
    ]
)
def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0 when the denominator is not positive."""
    return numerator / denominator if denominator > 0 else 0


# Load the saved model and evaluate it on the test dataset, with the labels
# converted to one-hot vectors.
model = keras.models.load_model(keras_path)
model.evaluate(x_test, to_categorical(y_test))

# Extra rule: a sample is "Sideways" when neither class probability exceeds
# the threshold being tested.
CLASSES_EXT = CLASSES + ['Sideways']
LABEL_SIDEWAYS = CLASSES_EXT.index("Sideways")
y_pred_prob = model.predict(x_test)

# Sweep the probability threshold and record, for each value, how many
# Bull/Bear calls were made and what share of them was correct.
results = []
for prob_thres in np.arange(0.5, 0.71, 0.02):
    y_pred = []
    for row in y_pred_prob:
        # Bull is checked first, then Bear, otherwise Sideways.
        if row[LABEL_BULL] > prob_thres:
            y_pred.append(LABEL_BULL)
        elif row[LABEL_BEAR] > prob_thres:
            y_pred.append(LABEL_BEAR)
        else:
            y_pred.append(LABEL_SIDEWAYS)

    # `math` is tensorflow.math here (see the imports of this script).
    cm = math.confusion_matrix(y_test, y_pred, num_classes=len(CLASSES_EXT)).numpy()

    n_bull_pred = cm[LABEL_BULL, LABEL_BULL] + cm[LABEL_BEAR, LABEL_BULL]
    n_bull_true_pos = cm[LABEL_BULL, LABEL_BULL]
    n_bear_pred = cm[LABEL_BULL, LABEL_BEAR] + cm[LABEL_BEAR, LABEL_BEAR]
    n_bear_true_pos = cm[LABEL_BEAR, LABEL_BEAR]
    n_total_true_pos = n_bull_true_pos + n_bear_true_pos
    n_total_pred = n_bull_pred + n_bear_pred

    results.append({
        "prob_thres": prob_thres,
        "n_bull_pred": n_bull_pred,
        "n_bear_pred": n_bear_pred,
        "n_total_pred": n_total_pred,
        "bull_acc": _safe_ratio(n_bull_true_pos, n_bull_pred),
        "bear_acc": _safe_ratio(n_bear_true_pos, n_bear_pred),
        # Guard on the denominator like the other ratios; the previous code
        # tested the numerator (n_total_true_pos) instead.
        "total_acc": _safe_ratio(n_total_true_pos, n_total_pred),
    })
# Tabulate the sweep results, one row per probability threshold.
results = pd.DataFrame(results)

plt.style.use('seaborn-v0_8')
fig, axes = plt.subplots(2, 1, figsize=(16, 7))
accuracy_ax, count_ax = axes[0], axes[1]
thresholds = results['prob_thres']

# Upper panel: share of correct calls per class and overall.
accuracy_ax.set_title("Accuracy")
for legend_label, column in (
    ("Bull", 'bull_acc'),
    ("Bear", 'bear_acc'),
    ("Total", 'total_acc'),
):
    accuracy_ax.plot(thresholds, results[column], "-o", label=legend_label)
accuracy_ax.legend()

# Lower panel: number of Bull/Bear calls made at each threshold.
count_ax.set_title("Number of Cases")
count_ax.set_xlabel("Probability Threshold")
for legend_label, column in (
    ("Bull", 'n_bull_pred'),
    ("Bear", 'n_bear_pred'),
    ("Total", 'n_total_pred'),
):
    count_ax.plot(thresholds, results[column], "-o", label=legend_label)
count_ax.legend()

plt.show()