-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbivariateICV.py
More file actions
437 lines (341 loc) · 14.8 KB
/
bivariateICV.py
File metadata and controls
437 lines (341 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
#Script: bivariateICV.py
#Author: Grant Shoffner
#Date: 2021-07-02
#Contact: gshoffner AT mbi DOT ucla DOT edu
#Description: This is a Python implementation of the
#Indirect Cross Validation (ICV) method of [1] for bandwidth
#selection in 2-dimensional kernel density estimation
#problems using a bivariate Gaussian kernel.
#
#References:
#
# [1] Savchuk, O. Y., Hart, J. D., & Sheather, S. J. (2010).
# Indirect cross-validation for density estimation.
# Journal of the American Statistical Association,
# 105(489), 415–423.
# https://doi.org/10.1198/jasa.2010.tm08532
#
# [2] Savchuk, O. Y., Hart, J. D., & Sheather, S. J. (2008).
# An Empirical Study of Indirect. 1, 1–22.
# http://arxiv.org/abs/0812.0052
import sys
import numpy as np
from scipy.stats import multivariate_normal
#Shorthand for the normal PDF since this gets called frequently
N = lambda *args, **kwargs : multivariate_normal.pdf(*args, **kwargs)
ICV_PARAM_SELECTION_METHODS = {'auto' , 'Savchuk2008'}
class BivariateICV:
"""Base class for bivariate indirect cross validation.
Don't use directly, use IndirectCrossValidation or
IndirectKFoldCrossValidation instead.
"""
def __init__(self, data, params = 'auto'):
#Verify data shape
try:
assert type(data) == np.ndarray
assert len(data.shape) == 2
assert data.shape[1] == 2
except AssertionError:
sys.exit(
"Error: data must be an ndarray with shape (n_samples, 2)."
)
self.data = data
#Verify params
self.set_params(params)
#Cache the RL value
self._RL = self.RL(self.params)
def set_params(self, params):
"""Verifies selection method and stores (alpha,sigma)
Can be used to reset (alpha, sigma) to custom values
after instantiation of an ICV class.
"""
param_error_msg = "Error: supplied params must be a 2-tuple (alpha, sigma)."
if type(params) == str:
if params not in ICV_PARAM_SELECTION_METHODS:
sys.exit("Error: unrecognized param selection sting %s given." \
% params)
elif params == 'auto':
self.params = \
(2.42, max(5.06, 0.149 * self.data.shape[0]**(3./8.)))
elif params == 'Savchuk2008':
self.params = self._Savchuk2008(self.data.shape[0])
elif type(params) == tuple:
if len(params) != 2:
sys.exit(param_error_msg)
else:
self.params = np.array(params)
elif type(params) == np.ndarray:
if params.flatten().shape != (2,):
sys.exit(param_error_msg)
else:
self.params = params.flatten()
else:
sys.exit("Error: `params` value not recognized. Either specify a method string or a 2-tuple (alpha, sigma)")
@staticmethod
def _Savchuk2008(N_samples):
"""Calculates the alpha,sigma params based on the
formulas giving in [Savchuk2008]."""
lNsamp = np.log10(N_samples)
alpha_mod = 10.0**(
3.39-1.093*lNsamp + 0.025*lNsamp**3 - 0.00004*lNsamp**6.0
)
sigma_mod = 10.0**(
-0.58 + 0.386*lNsamp - 0.012*lNsamp**2.0
)
return alpha_mod, sigma_mod
@staticmethod
def RL(params):
"""Operator `R(g)` from [Savchuk2010] applied to the L
kernel and evaluated at params = (alpha,sigma)."""
α, σ = params
value = (1.0/np.pi)*(((1.0+α)**2.0)/4.0 \
- α*(1.0+α)*σ/(1.0+σ**2.0)\
+ (α**2.0)/4.0)
return value
def _L_Kernel(self, points):
"""Evaluates the 2-dimensional L-kernel."""
α, σ = self.params
L = (1.0 + α) * N(points, mean = np.zeros(2))
L -= (α/σ) * N(points/σ, mean = np.zeros(2))
return L
def _LSCV_step(self, b, kde_pts, trial_pts):
"""Evaluates a single ICV step on the L-kernel bandwidth `b`.
This function implements the sumations shown in
equation (2) in [Savchuk2010] for the L-kernel. Note
that the first term in the equation (1/nh)R(L) is
left out so that iterative calls to _LSCV_step over
successive values of `j` (the index of the left out
data point) do not repeatedly add this first term.
This means _LSCV_step should be invoked as:
SUM = (1/nh)R(L)
for OUT_PT in PTS:
SUM += _LSCV_step(b, IN_PTS, OUT_PT)
The awkward outer-product call to substract below is
for compatibility with multiple left-out data, ie in
K-fold cross validation.
"""
n = float(self.data.shape[0])
#Not sure if using Unicode symbols in code is exactly
#kosher, but it greatly simplifies and clarifies the
#equations below.
α, σ = self.params
#Take the difference between the kernel means `kde_pts`
#the left-out points `trial_pts`
diff = np.concatenate(
[np.subtract.outer(kde_pts[:,i], trial_pts[:,i]).reshape(-1,1)\
for i in range(kde_pts.shape[1])],
axis = 1
)
#Rescale by the bandwidth `b`
diff /= b
I = np.eye(2)
mu = np.zeros(2)
#Second term in Equation (2)
SUM = (1.0/(b * n**2.0)) * np.sum(
((1.0+α)**2.0) * N(diff, mean=mu, cov = 2.0*I) \
- 2.0*σ*α*(1.0+α) * N(diff, mean=mu, cov = (1.0+σ**2.0)*I) \
+ (σ**2.0)*(α**2.0) * N(diff, mean=mu, cov = 2.0*(σ**2.0)*I)
)
#Third term in Equation (2)
SUM -= (2.0/(n*(n-1.0)*b)) * np.sum(self._L_Kernel(diff))
return SUM
def bUCV_to_hUCV_factor(self):
"""Calculates the multipicative factor required for
converting the L-kernel bandwidth bUCV to the
Gaussian kernel bandwidth hUCV."""
α, σ = self.params
C = (((4.0*(1.0 + α) - 2.0*α*(σ**3.0))**2.0)/(64.0*np.pi*self._RL))**0.2
return C
def evaluate(self, b):
"""Evaluates Full ICV score on the L-kernel bandwidth `b`."""
return self.__call__(b)
class IndirectCrossValidation(BivariateICV):
"""Evaluate 2-D KDE bandwidths by Indirect Cross Validation.
Performs the ICV protocol of [Savchuk2010] to score
a proposed bandwidth `b` using the L-kernel
over leave-one-out cross validation. Currently only set
up for 2-D bivariate data.
This scales as O(N_samples**2) so it will struggle on
large datasets.
Parameters
----------
data : np.ndarray
Array of shape (N_samples, 2)
params : {'auto' , 'Savchuk2008'} or tuple(alpha, sigma)
default = 'auto'
`params` is either a string specifying the automatic
selection method to use for setting the parameters
ALPHA and SIGMA, or a tuple (alpha, sigma) containing
user defined values for these parameters.
`auto` uses the reference rule from [Savchuk2010]
(alpha, sigma) = (2.42, max(5.06, 0.149*N_samples**(3/8)))
`Savchuk2008` uses the adaptive rule proposed in
[Savchuk2008] for (alpha_mod, sigma_mod).
Note that these two methods can produce very different
values for alpha, sigma.
Examples
--------
This example uses multiprocessing to speed up ICV over
a range of possible bandwidths.
>>> import numpy as np
>>> import multiprocessing as mp
>>> from bivariateICV import IndirectCrossValidation
>>> from scipy.stats import multivariate_normal as MVN
>>> from matplotlib import pyplot
>>> sample = MVN.rvs(mean = np.zeros(2), size = 300)
>>> ICV = IndirectCrossValidation(sample)
>>> pool = mp.Pool(None)
>>> b_grid = np.linspace(0.002,0.15,100)
>>> scores = pool.map(ICV, b_grid)
>>> b_hat_UCV = b_grid[np.argmin(scores)]
>>> print(f"Optimal L-kernel bandwidth b_hat_UCV = {b_hat_UCV}")
The value b_hat_UCV is the optimal LSCV bandwidth for
the L-kernel. We need to convert this to the optimal
bandwidth h_hat_UCV for the Gaussian kernel.
>>> C = ICV.bUCV_to_hUCV_factor()
>>> h_hat_UCV = C * b_hat_UCV
>>> print(f"Optimal Gaussian bandwidth h_hat_UCV = {h_hat_UCV}")
Here we plot the scores to visually check that we are in
the range of the smallest minimum. If the minimum occurs
in the tail of the plot, expand the `b_grid` bounds above
>>> pyplot.plot(b_grid, scores, '-')
>>> pyplot.suptitle("LSCV score vs L-kernel bandwidth")
>>> pyplot.title(
... f"Parameters: n={ICV.data.shape[0]}, α={ICV.params[0]:.3f}, σ={ICV.params[1]:.3f}")
>>> pyplot.xlabel("Bandwidth (b)")
>>> pyplot.ylabel("LSCV score")
>>> pyplot.plot(np.tile(b_hat_UCV, 10), np.linspace(min(scores),
... max(scores)-0.3, 10), 'k--', linewidth = 1)
>>> pyplot.annotate(f"b̂UCV = {b_hat_UCV:0.3f}\nĥUCV = {C:0.3f}⋅b̂UCV = {h_hat_UCV:0.3f}",
... (b_hat_UCV, max(scores)-0.3))
>>> pyplot.grid(linestyle = '-.', linewidth = 0.5)
>>> pyplot.show()
We can also plot the L-kernel for the current values
of (alpha, sigma). Since the 2-D kernel is symmetric
about the origin, we just plot a slice through the
y = 0 plane.
>>> support = np.linspace(-10,10,200)
>>> support2d = np.concatenate(
... (support.reshape(-1,1), np.zeros(200).reshape(-1,1)),
... axis = 1)
>>> kernel = ICV._L_Kernel(support2d)
>>> pyplot.plot(support, kernel, '-')
>>> pyplot.title(
... f"L(u, α={ICV.params[0]:.3f}, σ={ICV.params[1]:.3f})")
>>> pyplot.xlabel('u')
>>> pyplot.ylabel('L(u,α,σ)')
>>> pyplot.grid(linestyle = '-.', linewidth = 0.5)
>>> pyplot.tight_layout()
>>> pyplot.show()
"""
def __init__(self, data, params = 'auto'):
super().__init__(data, params)
def __call__(self, b):
"""Evaluates Full ICV score on the L-kernel bandwidth `b`."""
n = self.data.shape[0]
#Initiallize the LSCV score with the first term in
#equation (2) in [Savchuk2010]
LOO_CV_score = (1.0/(n*b)) * self._RL
for missing_idx in np.arange(n):
LOOarray = np.concatenate(
(self.data[:missing_idx],
self.data[missing_idx+1:]),
axis = 0,
)
trial_pt = self.data[missing_idx].reshape(1,2)
LOO_CV_score += self._LSCV_step(b, LOOarray, trial_pt)
return LOO_CV_score
class IndirectKFoldCrossValidation(BivariateICV):
"""Evaluate 2-D KDE bandwidths by K-fold Indirect CV.
Performs the ICV protocol of [Savchuk2010] to score
a proposed bandwidth `b` using the L-kernel over K-fold
cross validation. Currently only set up for 2-D
bivariate data.
K-fold CV scales as O(((K-1)/K)N**2) so it offers a
small performance advantage over Leave-One-Out CV for
large data sets when K is small.
Note that K-fold CV is not addressed in [Savchuk2010]
so its use here may not strictly follow their theory.
Testing on example data has shown that bandwidths
estimated by K-fold CV always overestimate the h_hat_UCV
obtained from LOO-CV by a relatively small margin. So
the error from K-fold estimation is on the conservative
side.
Parameters
----------
data : np.ndarray
Array of shape (N_samples, 2)
K : int, default = 5
Subdivide the data into K portions for K-fold CV.
If K is not a divisor of N_samples the final portion
will be truncated. For large data sets this will not
have a material impact on the result. For small data
use IndirectCrossValidation instead.
params : {'auto' , 'Savchuk2008'} or tuple(alpha, sigma)
default = 'auto'
`params` is either a string specifying the automatic
selection method to use for setting the parameters
ALPHA and SIGMA, or a tuple (alpha, sigma) containing
user defined values for these parameters.
`auto` uses the reference rule from [Savchuk2010]
(alpha, sigma) = (2.42, max(5.06, 0.149*N_samples**(3/8)))
`Savchuk2008` uses the adaptive rule proposed in
[Savchuk2008] for (alpha_mod, sigma_mod).
Note that these two methods can produce very different
values for alpha, sigma.
seed : int, default = 0
Seed for numpy.random.default_rng for use in data
shuffling prior to splitting. A default value is
given here so that multiple calls to
IndirectKFoldCrossValidation on the same data will
produce the same results. Set seed = None for no
seed in the rng.
Examples
--------
This example uses multiprocessing to speed up ICV over
a range of possible bandwidths. Results from LOO and
K-fold are compared.
>>> import numpy as np
>>> import multiprocessing as mp
>>> from bivariateICV import IndirectCrossValidation,\
>>> IndirectKFoldCrossValidation
>>> from scipy.stats import multivariate_normal as MVN
>>> from matplotlib import pyplot
>>> sample = MVN.rvs(mean = np.zeros(2), size = 300)
>>> ICV = IndirectCrossValidation(sample)
>>> KICV = IndirectKFoldCrossValidation(sample)
>>> pool = mp.Pool(None)
>>> b_grid = np.linspace(0.002,0.15,100)
>>> loo_scores = pool.map(ICV, b_grid)
>>> kfold_scores = pool.map(KICV, b_grid)
Plot the scores vs the L-kernel bandwidth
>>> pyplot.plot(b_grid, loo_scores, '-')
>>> pyplot.plot(b_grid, kfold_scores, '-')
>>> pyplot.show()
"""
def __init__(self, data, K = 5, params = 'auto', seed = 0):
super().__init__(data.copy(), params)
self.rng = np.random.default_rng(seed = seed)
self.split_data = None
self._generate_kfold_splits(K)
def _generate_kfold_splits(self, K):
#Shuffle the samples. Since we called data.copy()
#in __init__ this doesn't change the original data
self.rng.shuffle(self.data, axis = 0)
self.split_data = np.array_split(self.data, K, axis = 0)
def __call__(self, b):
K = len(self.split_data)
n = self.data.shape[0]
#Initiallize the LSCV score with the first term in
#equation (2) in [Savchuk2010]
KFold_CV_score = (1.0/(n*b)) * self._RL
for missing_idx in np.arange(K):
KFoldarray = np.concatenate(
(*self.split_data[:missing_idx],
*self.split_data[missing_idx+1:]),
axis = 0
)
KFoldarray = KFoldarray.reshape(-1,2)
trial_pts = self.split_data[missing_idx]
KFold_CV_score += self._LSCV_step(b, KFoldarray, trial_pts)
return KFold_CV_score