123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- # -*- encoding:utf-8 -*-
- import numpy as np
- from keras.models import load_model
- import joblib
- import random
def read_data(path):
    """Load samples from a text file holding one Python-literal row per line.

    Every non-blank line is evaluated as a Python expression and must yield
    an indexable row. All rows are sliced using the length of the FIRST row
    (``size``):

    * features are ``row[:size - 2]``;
    * the label is ``row[size - 1]``;
    * ``row[size - 2]`` is used for neither.

    Args:
        path: Path of the file to read.

    Returns:
        A tuple ``(features, labels, rows)``. ``features`` and ``labels``
        are ``float32`` NumPy arrays; ``rows`` is the list of evaluated rows
        in file order.

    Raises:
        IndexError: If the file contains no non-blank line.
    """
    rows = []
    with open(path) as f:
        # Iterate the file object directly; there is no need to build (and
        # then copy) the full list returned by readlines().
        for raw in f:
            text = raw.strip()
            if not text:
                # A blank line (typically a trailing newline at the end of
                # the file) holds no sample, and eval('') raises SyntaxError.
                continue
            # SECURITY: eval() executes arbitrary code found in the file.
            # Only feed this function files you produced yourself.
            # NOTE(review): if rows are always plain literals,
            # ast.literal_eval would be a safe replacement - confirm.
            rows.append(eval(text))

    size = len(rows[0])
    features = [row[:size - 2] for row in rows]
    labels = [row[size - 1] for row in rows]
    return (
        np.array(features, dtype=np.float32),
        np.array(labels, dtype=np.float32),
        rows,
    )
- def _score(fact,):
- up_right = 0
- up_error = 0
- if fact[0] == 1:
- up_right = up_right + 1.1
- elif fact[1] == 1:
- up_error = up_error + 0.4
- up_right = up_right + 1.03
- elif fact[2] == 1:
- up_error = up_error + 0.7
- up_right = up_right + 0.96
- else:
- up_error = up_error + 1
- up_right = up_right + 0.9
- return up_right,up_error
def random_predict(file_path=''):
    """Print baseline metrics for a uniformly random four-class guesser.

    For every label row loaded from ``file_path`` one value is drawn with
    ``random.uniform(0, 4)`` and mapped to a guessed class: above 3 -> 0,
    above 2 -> 1, above 1 -> 2, above 0 -> 3. A guess counts as a hit when
    the label row holds 1 at the guessed position. Rows guessed as class 0
    are additionally scored with ``_score``.

    Prints three values: the hit rate over all rows, then the mean "right"
    and mean "error" score over the class-0 guesses (a divisor of 1 is used
    when there was no class-0 guess).

    Args:
        file_path: Path passed to ``read_data``.

    Returns:
        None.
    """
    _, labels, _ = read_data(file_path)

    hits = 0
    picked = 0
    right_total = 0
    error_total = 0
    for label in labels:
        draw = random.uniform(0, 4)
        # First threshold the draw exceeds decides the guess; a draw of
        # exactly 0.0 exceeds none of them and yields no guess.
        guess = next(
            (cls for cls, floor in enumerate((3, 2, 1, 0)) if draw > floor),
            None,
        )
        if guess is not None and label[guess] == 1:
            hits += 1
        if guess == 0:
            # NOTE(review): assumes every class-0 guess is scored, not only
            # the correct ones - confirm against the intended metric.
            gained, lost = _score(label)
            picked += 1
            right_total = right_total + gained
            error_total = lost + error_total

    divisor = picked if picked != 0 else 1
    # Every row belongs to exactly one class, so the row count is the total.
    print(hits / len(labels), right_total / divisor, error_total / divisor)
def _score_down(fact):
    """Return the ``(right, error)`` pair tallied for one "down" prediction.

    Positions 0, 1 and 2 of ``fact`` are checked in that order; the first
    one equal to 1 selects its pair, otherwise ``(0.9, 0)`` is returned.
    """
    if fact[0] == 1:
        return 1.1, 1
    if fact[1] == 1:
        return 1.04, 0.7
    if fact[2] == 1:
        return 0.94, 0.3
    return 0.9, 0


def predict(file_path='', model_path='15min_dnn_seq.h5', idx=-1, row=18, col=20):
    """Evaluate a saved Keras model on a data file and print score tallies.

    The first ``row * col`` feature columns of every sample are reshaped to
    ``(row, col, 1)``; the remaining columns are passed through unchanged.
    The model is fed ``[remaining_columns, reshaped_grid]`` in that order,
    first through ``evaluate`` (result printed) and then through
    ``predict``.

    Each predicted row ``r`` is then tallied against its label:

    * ``r[1] > 0.6`` adds the ``_score`` pair to the "up" tallies;
    * otherwise ``r[2] > 0.5 or r[3] > 0.5`` adds the ``_score_down`` pair
      to the "down" tallies.

    Args:
        file_path: Path passed to ``read_data``.
        model_path: Path of the model file passed to ``load_model``.
        idx: When equal to ``-2`` no row is tallied; any other value
            enables the tallying described above.
        row: Number of rows of the reshaped feature grid.
        col: Number of columns of the reshaped feature grid.

    Returns:
        A tuple ``(win_dnn, up_mean, down_mean)``. ``win_dnn`` is always an
        empty list; ``up_mean`` / ``down_mean`` are the mean "right" scores
        of the "up" / "down" tallies (a divisor of 1 is used when a tally
        is empty).
    """
    test_x, test_y, _ = read_data(file_path)

    grid_width = row * col
    test_x_a = test_x[:, :grid_width].reshape(test_x.shape[0], row, col, 1)
    test_x_c = test_x[:, grid_width:]
    inputs = [test_x_c, test_x_a]

    model = load_model(model_path)
    score = model.evaluate(inputs, test_y)
    print('MIX', score)

    up_num = 0
    up_error = 0
    up_right = 0
    down_num = 0
    down_error = 0
    down_right = 0
    result = model.predict(inputs)
    win_dnn = []  # never filled; kept so the return shape is unchanged

    # NOTE(review): assumes idx == -2 is meant to switch the tallying off
    # entirely (its branch performs no work) - confirm.
    if idx != -2:
        for probs, fact in zip(result, test_y):
            # NOTE(review): the "up" test reads probs[1], not probs[0];
            # kept as found - confirm it is intentional.
            if probs[1] > 0.6:
                gained, lost = _score(fact)
                up_right = gained + up_right
                up_error = lost + up_error
                up_num += 1
            elif probs[2] > 0.5 or probs[3] > 0.5:
                gained, lost = _score_down(fact)
                down_right = down_right + gained
                down_error = down_error + lost
                down_num += 1

    # Avoid dividing by zero when a tally received no prediction.
    if up_num == 0:
        up_num = 1
    if down_num == 0:
        down_num = 1
    print('MIX', up_right, up_num, up_right / up_num, up_error / up_num,
          down_right / down_num, down_error / down_num)
    return win_dnn, up_right / up_num, down_right / down_num
if __name__ == '__main__':
    # row and col give the grid shape that predict() reshapes the leading
    # feature columns into, so they have to fit the chosen data file/model.
    # Alternative runs kept for reference:
    # predict(file_path='D:\\data\\quantization\\stock513_28d_train1.log', model_path='513_28d_mix_3D_ma5_s_seq.h5', row=28, col=16)
    predict(file_path='D:\\data\\quantization\\stock538_28d_train1.log', model_path='539_28d_mix_5D_ma5_s_seq.h5', row=28, col=17)
    # random_predict(file_path='D:\\data\\quantization\\week101_18d_test.log')
    # random_predict(file_path='D:\\data\\quantization\\stock537_28d_train1.log')
|