-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathUtil.py
More file actions
105 lines (86 loc) · 2.9 KB
/
Util.py
File metadata and controls
105 lines (86 loc) · 2.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import pandas as pd
from scipy.stats import norm
import pywt
from nolitsa import surrogates
import numpy as np
from statsmodels.tsa.stattools import adfuller
def flatten_list(collection):
    """Flatten one level of nesting: a collection of iterables becomes one flat list."""
    flat = []
    for inner in collection:
        flat.extend(inner)
    return flat
def generate_synthetic_stock_data(num_bars, x, delta=0.25, dt=0.1):
    """Simulate a price series as a Gaussian random walk.

    num_bars: number of prices to generate.
    x: starting price (the first element of the returned list).
    delta, dt: volatility parameter and time step; each increment is a
        normal draw with scale delta ** 2 * dt.
    Returns a list of num_bars simulated prices.
    """
    step_scale = delta ** 2 * dt
    current = x
    prices = []
    for _ in range(num_bars):
        prices.append(current)
        current = current + norm.rvs(scale=step_scale)
    return prices
# TODO need to lookup a way to generate synthetic volume data
def generate_synthetic_ohlcv_data(num_bars, x, delta=0.25, dt=0.1):
    """Simulate an OHLCV bar series as a Gaussian random walk.

    Each bar draws four normal increments around the current price: the
    first is the open offset, the last is the close offset, and the bar's
    high/low are the max/min of all four. The next bar starts from the
    previous close. Volume is a constant 0 placeholder (see TODO above).

    num_bars: number of bars to generate.
    x: starting price.
    delta, dt: volatility parameter and time step; draws use
        scale = delta ** 2 * dt.
    Returns a DataFrame with columns open/high/low/close/volume.
    """
    # Renamed lists so the builtin `open` is not shadowed.
    opens = []
    highs = []
    lows = []
    closes = []
    volumes = []
    current_price = x
    scale = delta ** 2 * dt
    for _ in range(num_bars):
        rands = norm.rvs(size=4, scale=scale)
        opens.append(current_price + rands[0])
        # high/low take the extreme of all four draws, so high >= open/close >= low
        highs.append(current_price + max(rands))
        lows.append(current_price + min(rands))
        closes.append(current_price + rands[3])
        volumes.append(0)  # placeholder until a volume model exists
        current_price = closes[-1]
    result = pd.DataFrame({
        'open': opens,
        'high': highs,
        'low': lows,
        'close': closes,
        'volume': volumes
    })
    return result
def calculate_class_weights(train_Y):
    """Map each distinct label in train_Y to the inverse of its count.

    train_Y is flattened before counting, so any array shape is accepted.
    Returns a dict {label: 1.0 / occurrences}.
    """
    labels, counts = np.unique(train_Y.reshape(-1), return_counts=True)
    return {label: 1.0 / count for label, count in zip(labels, counts)}
def adf_test(values):
    """Run an Augmented Dickey-Fuller stationarity test on `values` and
    print the test statistic and p-value.

    Side effect only: prints a short report; returns None.
    """
    result = adfuller(values)
    print('-' * 10)
    print('Adfuller Test')
    # Fixed: original mixed an f-string prefix with %-formatting
    # (f'...%s' % x); plain f-strings produce the identical output.
    print(f'adf: \t{result[0]}')
    print(f'p-value\t{result[1]}')
def fractional_difference(series, order, cutoff=1e-4):
    """Fractionally difference a pandas Series with differencing order `order`.

    The binomial weights w_k = -w_{k-1} * (order - k + 1) / k are generated
    until the newest weight's magnitude drops to `cutoff` or below; the
    series is then combined with its lagged copies using those weights.
    The first `lag_count` rows (where the lags are zero-padded) are dropped
    from the result.
    """
    def _weights(d, num_lags):
        # Iteratively build the fractional-differencing weight column vector.
        w = [1]
        for k in range(1, num_lags):
            w.append(-w[-1] * (d - k + 1) / k)
        return np.array(w).reshape(-1, 1)

    def _lag_count(d, threshold, start):
        # Grow the lag window until the last weight's magnitude is <= threshold.
        lags = start
        newest = np.inf
        while abs(newest) > threshold:
            w = _weights(d, lags)
            newest = w[-1]
            lags += 1
        return lags

    lag_count = _lag_count(order, cutoff, 1)
    weights = _weights(order, lag_count)
    combined = 0
    for lag in range(lag_count):
        combined += weights[lag] * series.shift(lag).fillna(0)
    return combined[lag_count:]
def dwt_denoise(data, level=5):
    """Denoise a 1-D signal with a 'db4' discrete wavelet transform.

    Each detail band is soft-limited with a garrote threshold of
    (max - mean) / std computed from that band; the approximation band is
    left untouched, and the signal is reconstructed from the modified
    coefficients.

    NOTE: the original author used len(data) == 512 so that
    len(data) % 2**level == 0 (level was originally 5).
    """
    # wavedec returns [cA_level, cD_level, cD_level-1, ..., cD_1]
    coeffs = pywt.wavedec(data, 'db4', level=level)
    for idx, detail in enumerate(coeffs[1:], start=1):
        # per-band threshold value
        kappa = (detail.max() - detail.mean()) / detail.std()
        coeffs[idx] = pywt.threshold(detail, kappa, mode='garrote')
    return pywt.waverec(coeffs, 'db4')
# Make sure there is no na
def aaft(returns):
    """Build an amplitude-adjusted Fourier transform (AAFT) surrogate of
    `returns` and integrate it back into a cumulative (price-like) path."""
    surrogate_returns = surrogates.aaft(returns)
    return surrogate_returns.cumsum()