-
Notifications
You must be signed in to change notification settings - Fork 220
Expand file tree
/
Copy pathVarOptItemsSketch.java
More file actions
1313 lines (1132 loc) · 46.7 KB
/
VarOptItemsSketch.java
File metadata and controls
1313 lines (1132 loc) · 46.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.datasketches.sampling;
import static java.lang.foreign.ValueLayout.JAVA_BYTE;
import static java.lang.foreign.ValueLayout.JAVA_DOUBLE_UNALIGNED;
import static org.apache.datasketches.common.Util.LS;
import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK;
import static org.apache.datasketches.sampling.PreambleUtil.GADGET_FLAG_MASK;
import static org.apache.datasketches.sampling.PreambleUtil.TOTAL_WEIGHT_R_DOUBLE;
import static org.apache.datasketches.sampling.PreambleUtil.VAROPT_SER_VER;
import static org.apache.datasketches.sampling.PreambleUtil.VO_PRELONGS_EMPTY;
import static org.apache.datasketches.sampling.PreambleUtil.VO_PRELONGS_FULL;
import static org.apache.datasketches.sampling.PreambleUtil.VO_PRELONGS_WARMUP;
import static org.apache.datasketches.sampling.PreambleUtil.extractFamilyID;
import static org.apache.datasketches.sampling.PreambleUtil.extractFlags;
import static org.apache.datasketches.sampling.PreambleUtil.extractHRegionItemCount;
import static org.apache.datasketches.sampling.PreambleUtil.extractK;
import static org.apache.datasketches.sampling.PreambleUtil.extractN;
import static org.apache.datasketches.sampling.PreambleUtil.extractRRegionItemCount;
import static org.apache.datasketches.sampling.PreambleUtil.extractResizeFactor;
import static org.apache.datasketches.sampling.PreambleUtil.extractSerVer;
import static org.apache.datasketches.sampling.PreambleUtil.extractTotalRWeight;
import static org.apache.datasketches.sampling.PreambleUtil.getAndCheckPreLongs;
import static org.apache.datasketches.sampling.SamplingUtil.pseudoHypergeometricLBonP;
import static org.apache.datasketches.sampling.SamplingUtil.pseudoHypergeometricUBonP;
import java.lang.foreign.MemorySegment;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import org.apache.datasketches.common.ArrayOfBooleansSerDe;
import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.Family;
import org.apache.datasketches.common.ResizeFactor;
import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.common.SketchesStateException;
import org.apache.datasketches.common.Util;
/**
* This sketch provides a variance optimal sample over an input stream of weighted items. The
* sketch can be used to compute subset sums over predicates, producing estimates with optimal
* variance for a given sketch size.
*
* <p>Using this sketch with uniformly constant item weights (e.g. 1.0) will produce a standard
* reservoir sample over the stream.</p>
*
* @param <T> The type of object held in the sketch.
*
* @author Jon Malkin
* @author Kevin Lang
*/
public final class VarOptItemsSketch<T> {
/**
 * Log-base-2 of the smallest sampling array allocated (2^4 = 16 items). Passed as the minimum
 * lg size when computing the initial allocation.
 */
private static final int MIN_LG_ARR_ITEMS = 4;
/**
 * Default sampling size multiple when reallocating storage: 8
 */
private static final ResizeFactor DEFAULT_RESIZE_FACTOR = ResizeFactor.X8;
// Shared serializer for the H-region marks written by toByteArray() when the sketch is a gadget
private static final ArrayOfBooleansSerDe MARK_SERDE = new ArrayOfBooleansSerDe();
private int k_; // max size of sketch, in items
private int currItemsAlloc_; // currently allocated array size
private final ResizeFactor rf_; // resize factor
private ArrayList<T> data_; // stored sampled items
private ArrayList<Double> weights_; // weights for sampled items
private long n_; // total number of items processed by the sketch
private int h_; // number of items in heap
private int m_; // number of items in middle region
private int r_; // number of items in reservoir-like area
private double totalWtR_; // total weight of items in reservoir-like area
// The next two fields are hidden from the user because they are part of the state of the
// unioning algorithm, NOT part of a varopt sketch, or even of a varopt "gadget" (our name for
// the potentially invalid sketch that is maintained by the unioning algorithm). It would make
// more sense logically for these fields to be declared in the unioning object (whose entire
// purpose is storing the state of the unioning algorithm) but for reasons of programming
// convenience we are currently declaring them here. However, that could change in the future.
// The following int is:
// 1. Zero (for a varopt sketch)
// 2. Count of marked items in H region, if part of a unioning algo's gadget
private int numMarksInH_;
// The following array is absent (null) in a varopt sketch, and notionally present in a gadget
// [although it really belongs in the unioning object]. If the array were to be made explicit,
// some additional coding would need to be done to ensure that all of the necessary data motion
// occurs and is properly tracked.
private ArrayList<Boolean> marks_;
// Simple holder used to return a shallow copy of the sketch's samples to a VarOptItemsSamples,
// as two parallel arrays: any null entry is stripped from the items and every R-region item is
// assigned the computed per-item R region weight. Populated by getSamplesAsArrays().
class Result {
// the sampled items, with null entries removed
T[] items;
// weights[i] is the weight reported for items[i]
double[] weights;
}
/**
 * Builds an empty sketch with the given capacity and resize factor.
 *
 * @param k maximum number of samples; must be in [1, Integer.MAX_VALUE - 1]
 * @param rf resize factor used to choose the initial allocation
 * @throws SketchesArgumentException if k is outside the allowed range
 */
private VarOptItemsSketch(final int k, final ResizeFactor rf) {
  // the bounds on k are required due to a theorem about lightness during merging
  final boolean kInRange = (k >= 1) && (k <= (Integer.MAX_VALUE - 1));
  if (!kInRange) {
    throw new SketchesArgumentException("k must be at least 1 and less than " + Integer.MAX_VALUE
        + ". Found: " + k);
  }

  // configuration
  k_ = k;
  rf_ = rf;

  // an empty sketch has seen nothing and holds nothing in any region
  n_ = 0;
  h_ = 0;
  m_ = 0;
  r_ = 0;
  totalWtR_ = 0;
  numMarksInH_ = 0;
  marks_ = null;

  // choose the starting allocation; never allocate exactly k slots
  final int lgCeilingK = Util.exactLog2OfInt(Util.ceilingPowerOf2(k_), "VarOptItemsSketch");
  final int lgStartSize =
      SamplingUtil.startingSubMultiple(lgCeilingK, rf_.lg(), MIN_LG_ARR_ITEMS);
  final int adjustedSize = SamplingUtil.getAdjustedSize(k_, 1 << lgStartSize);
  currItemsAlloc_ = (adjustedSize == k_) ? (adjustedSize + 1) : adjustedSize;

  data_ = new ArrayList<>(currItemsAlloc_);
  weights_ = new ArrayList<>(currItemsAlloc_);
}
/**
 * Builds a sketch directly from pre-populated item and weight lists. The lists are adopted as-is
 * (not copied). The resulting sketch has no marks array and an empty middle region.
 *
 * <p>Inputs are validated with assertions only. In exact mode ({@code rCount == 0}) the lists
 * must hold exactly {@code hCount} entries; in sampling mode ({@code rCount > 0}) they must hold
 * {@code k + 1} entries.</p>
 *
 * @param dataList the items, adopted as the sketch's item storage
 * @param weightList the weights, parallel to dataList
 * @param k maximum number of samples; at least 1, matching the public constructor and heapify()
 * @param n number of stream items seen
 * @param currItemsAlloc nominal allocated size; must be at least dataList.size()
 * @param rf resize factor
 * @param hCount number of items in the H region
 * @param rCount number of items in the R region
 * @param totalWtR total weight of the R region
 */
private VarOptItemsSketch(final ArrayList<T> dataList,
    final ArrayList<Double> weightList,
    final int k,
    final long n,
    final int currItemsAlloc,
    final ResizeFactor rf,
    final int hCount,
    final int rCount,
    final double totalWtR) {
  assert dataList != null;
  assert weightList != null;
  assert dataList.size() == weightList.size();
  assert currItemsAlloc >= dataList.size();
  // k == 1 is a legal sketch size everywhere else in this class, so accept it here as well
  assert k >= 1;
  assert n >= 0;
  assert hCount >= 0;
  assert rCount >= 0;
  assert ((rCount == 0) && (dataList.size() == hCount)) || ((rCount > 0) && (dataList.size() == (k + 1)));

  k_ = k;
  n_ = n;
  h_ = hCount;
  r_ = rCount;
  m_ = 0;
  totalWtR_ = totalWtR;
  currItemsAlloc_ = currItemsAlloc;
  rf_ = rf;
  data_ = dataList;
  weights_ = weightList;

  numMarksInH_ = 0;
  marks_ = null;
}
/**
 * Creates an empty varopt sampling sketch holding at most k samples, using the default resize
 * factor (8).
 *
 * @param k Maximum size of sampling. Allocated size may be smaller until sketch fills.
 *          Unlike many sketches in this package, this value does <em>not</em> need to be a
 *          power of 2.
 * @param <T> The type of object held in the sketch.
 * @return A VarOptItemsSketch initialized with maximum size k and the default resize factor.
 */
public static <T> VarOptItemsSketch<T> newInstance(final int k) {
  final VarOptItemsSketch<T> sketch = new VarOptItemsSketch<>(k, DEFAULT_RESIZE_FACTOR);
  return sketch;
}
/**
 * Creates an empty varopt sampling sketch holding at most k samples, using the caller-supplied
 * resize factor.
 *
 * @param k Maximum size of sampling. Allocated size may be smaller until sketch fills.
 *          Unlike many sketches in this package, this value does <em>not</em> need to be a
 *          power of 2. The maximum size is Integer.MAX_VALUE-1.
 * @param rf <a href="{@docRoot}/resources/dictionary.html#resizeFactor">See Resize Factor</a>
 * @param <T> The type of object held in the sketch.
 * @return A VarOptItemsSketch initialized with maximum size k and resize factor rf.
 */
public static <T> VarOptItemsSketch<T> newInstance(final int k, final ResizeFactor rf) {
  final VarOptItemsSketch<T> sketch = new VarOptItemsSketch<>(k, rf);
  return sketch;
}
/**
 * Creates an empty varopt sketch intended to serve as a unioning gadget: in addition to the
 * normal state, the list of marks is allocated, sized to the sketch's current allocation.
 *
 * @param k Maximum size of sampling. Allocated size may be smaller until sketch fills.
 *          Unlike many sketches in this package, this value does <em>not</em> need to be a
 *          power of 2.
 * @param <T> The type of object held in the sketch.
 * @return A VarOptItemsSketch initialized with maximum size k and a valid array of marks.
 */
static <T> VarOptItemsSketch<T> newInstanceAsGadget(final int k) {
  final VarOptItemsSketch<T> gadget = new VarOptItemsSketch<>(k, DEFAULT_RESIZE_FACTOR);
  final int capacity = gadget.currItemsAlloc_;
  gadget.marks_ = new ArrayList<>(capacity);
  return gadget;
}
/**
 * Wraps the output of a union's getResult() computation in a sketch. Because this method is
 * package-private, the input values are not checked beyond the constructor's assertions.
 *
 * <p>dataList.size() is taken to be the allocated size, without verification. The supplied
 * lists are adopted directly, the default resize factor is used, and convertToHeap() is invoked
 * on the new sketch before it is returned.</p>
 *
 * @param <T> The type of object held in the sketch.
 * @param dataList an ArrayList of data
 * @param weightList an ArrayList of weights
 * @param k Maximum size of sampling. Allocated size may be smaller until sketch fills.
 *          Unlike many sketches in this package, this value does <em>not</em> need to be a
 *          power of 2.
 * @param n The current count of items seen by the sketch
 * @param hCount the count of heavy items
 * @param rCount the reservoir count of (non-heavy) items
 * @param totalWtR the sum of the reservoir weights.
 * @return A VarOptItemsSketch with maximum size k built on the given lists; it carries no marks.
 */
static <T> VarOptItemsSketch<T> newInstanceFromUnionResult(final ArrayList<T> dataList,
    final ArrayList<Double> weightList,
    final int k,
    final long n,
    final int hCount,
    final int rCount,
    final double totalWtR) {
  final int allocatedSize = dataList.size();
  final VarOptItemsSketch<T> result = new VarOptItemsSketch<>(
      dataList, weightList, k, n, allocatedSize, DEFAULT_RESIZE_FACTOR, hCount, rCount, totalWtR);
  result.convertToHeap();
  return result;
}
/**
 * Returns a sketch instance of this class from the given srcSeg,
 * which must be a MemorySegment representation of this sketch class.
 *
 * <p>The preamble is validated before any item is deserialized: preLongs must match the
 * empty/non-empty state, the serial version and family ID must match, k must be at least 1,
 * n and the region counts must be non-negative, every H-region weight must be a positive
 * number (NaN is rejected), and in sampling mode (R region non-empty) the H and R counts must
 * sum to k.</p>
 *
 * @param <T> The type of item this sketch contains
 * @param srcSeg a MemorySegment representation of a sketch of this class.
 * @param serDe An instance of ArrayOfItemsSerDe
 * @return a sketch instance of this class
 * @throws SketchesArgumentException if the image fails any of the validation checks
 */
public static <T> VarOptItemsSketch<T> heapify(final MemorySegment srcSeg,
    final ArrayOfItemsSerDe<T> serDe) {
  final int numPreLongs = getAndCheckPreLongs(srcSeg);
  final ResizeFactor rf = ResizeFactor.getRF(extractResizeFactor(srcSeg));
  final int serVer = extractSerVer(srcSeg);
  final int familyId = extractFamilyID(srcSeg);
  final int flags = extractFlags(srcSeg);
  final boolean isEmpty = (flags & EMPTY_FLAG_MASK) != 0;
  final boolean isGadget = (flags & GADGET_FLAG_MASK) != 0;

  // Check values
  if (isEmpty) {
    if (numPreLongs != VO_PRELONGS_EMPTY) {
      throw new SketchesArgumentException("Possible corruption: Must be " + VO_PRELONGS_EMPTY
          + " for an empty sketch. Found: " + numPreLongs);
    }
  } else if ((numPreLongs != VO_PRELONGS_WARMUP)
      && (numPreLongs != VO_PRELONGS_FULL)) {
    throw new SketchesArgumentException("Possible corruption: Must be " + VO_PRELONGS_WARMUP
        + " or " + VO_PRELONGS_FULL + " for a non-empty sketch. Found: " + numPreLongs);
  }
  if (serVer != VAROPT_SER_VER) {
    throw new SketchesArgumentException(
        "Possible Corruption: Ser Ver must be " + VAROPT_SER_VER + ": " + serVer);
  }
  final int reqFamilyId = Family.VAROPT.getID();
  if (familyId != reqFamilyId) {
    throw new SketchesArgumentException(
        "Possible Corruption: FamilyID must be " + reqFamilyId + ": " + familyId);
  }

  final int k = extractK(srcSeg);
  if (k < 1) {
    throw new SketchesArgumentException("Possible Corruption: k must be at least 1: " + k);
  }

  if (isEmpty) {
    assert numPreLongs == Family.VAROPT.getMinPreLongs();
    return new VarOptItemsSketch<>(k, rf);
  }

  final long n = extractN(srcSeg);
  if (n < 0) {
    throw new SketchesArgumentException("Possible Corruption: n cannot be negative: " + n);
  }

  // get rest of preamble
  final int hCount = extractHRegionItemCount(srcSeg);
  final int rCount = extractRRegionItemCount(srcSeg);

  if (hCount < 0) {
    throw new SketchesArgumentException("Possible Corruption: H region count cannot be "
        + "negative: " + hCount);
  }
  if (rCount < 0) {
    throw new SketchesArgumentException("Possible Corruption: R region count cannot be "
        + "negative: " + rCount);
  }
  // In sampling mode the arrays hold exactly k + 1 slots (H items, one gap, R items), so the two
  // counts must sum to k. The private constructor only asserts this, which would let a corrupt
  // image through when assertions are disabled. Summed as long to avoid int overflow.
  if ((rCount > 0) && (((long) hCount + rCount) != k)) {
    throw new SketchesArgumentException("Possible Corruption: in sampling mode the H and R "
        + "region counts must sum to k. Found h = " + hCount + ", r = " + rCount
        + ", k = " + k);
  }

  double totalRWeight = 0.0;
  if (numPreLongs == Family.VAROPT.getMaxPreLongs()) {
    if (rCount > 0) {
      totalRWeight = extractTotalRWeight(srcSeg);
      if (Double.isNaN(totalRWeight) || (totalRWeight <= 0.0)) {
        throw new SketchesArgumentException("Possible Corruption: deserializing in full mode "
            + "but invalid R region weight. Found r = " + rCount
            + ", R region weight = " + totalRWeight);
      }
    } else {
      throw new SketchesArgumentException(
          "Possible Corruption: "
          + Family.VAROPT.getMaxPreLongs() + " preLongs but no items in R region");
    }
  }

  final int preLongBytes = numPreLongs << 3;

  final int totalItems = hCount + rCount;
  int allocatedItems = k + 1; // default to full

  if (rCount == 0) {
    // Not in sampling mode, so determine size to allocate, using ceilingLog2(hCount) as minimum
    final int ceilingLgK = Util.exactLog2OfInt(Util.ceilingPowerOf2(k), "heapify");
    final int minLgSize = Util.exactLog2OfInt(Util.ceilingPowerOf2(hCount), "heapify");
    final int initialLgSize = SamplingUtil.startingSubMultiple(ceilingLgK, rf.lg(),
        Math.max(minLgSize, MIN_LG_ARR_ITEMS));

    allocatedItems = SamplingUtil.getAdjustedSize(k, 1 << initialLgSize);
    if (allocatedItems == k) {
      ++allocatedItems;
    }
  }

  // allocate full-sized ArrayLists, but we store only hCount weights at any moment
  final long weightOffsetBytes = TOTAL_WEIGHT_R_DOUBLE + (rCount > 0 ? Double.BYTES : 0);
  final ArrayList<Double> weightList = new ArrayList<>(allocatedItems);
  final double[] wts = new double[allocatedItems];
  MemorySegment.copy(srcSeg, JAVA_DOUBLE_UNALIGNED, weightOffsetBytes, wts, 0, hCount);

  // can't use Arrays.asList(wts) since double[] rather than Double[]
  for (int i = 0; i < hCount; ++ i) {
    // NaN compares false against everything, so it needs its own test (as for the R weight above)
    if (Double.isNaN(wts[i]) || (wts[i] <= 0.0)) {
      throw new SketchesArgumentException("Possible Corruption: "
          + "Non-positive weight in heapify(): " + wts[i]);
    }
    weightList.add(wts[i]);
  }

  // marks, if we have a gadget
  long markBytes = 0;
  int markCount = 0;
  ArrayList<Boolean> markList = null;
  if (isGadget) {
    final long markOffsetBytes = preLongBytes + ((long) hCount * Double.BYTES);
    markBytes = ArrayOfBooleansSerDe.computeBytesNeeded(hCount);
    markList = new ArrayList<>(allocatedItems);

    final ArrayOfBooleansSerDe booleansSerDe = new ArrayOfBooleansSerDe();
    // NOTE(review): the slice length (hCount >>> 3) + 1 may be one byte longer than markBytes
    // when hCount is a multiple of 8 -- confirm this is intentional
    final Boolean[] markArray = booleansSerDe.deserializeFromMemorySegment(
        srcSeg.asSlice(markOffsetBytes, (hCount >>> 3) + 1), 0, hCount);

    for (final Boolean mark : markArray) {
      if (mark) { ++markCount; }
    }
    markList.addAll(Arrays.asList(markArray));
  }

  // items follow the preamble, the H weights and (for a gadget) the mark bytes
  final long offsetBytes = preLongBytes + ((long) hCount * Double.BYTES) + markBytes;
  final T[] data = serDe.deserializeFromMemorySegment(srcSeg.asSlice(offsetBytes), 0, totalItems);
  final List<T> wrappedData = Arrays.asList(data);
  final ArrayList<T> dataList = new ArrayList<>(allocatedItems);
  dataList.addAll(wrappedData.subList(0, hCount));

  // Load items in R as needed
  if (rCount > 0) {
    weightList.add(-1.0); // the gap
    if (isGadget) { markList.add(false); } // the gap
    for (int i = 0; i < rCount; ++i) {
      weightList.add(-1.0);
      if (isGadget) { markList.add(false); }
    }

    dataList.add(null); // the gap
    dataList.addAll(wrappedData.subList(hCount, totalItems));
  }

  final VarOptItemsSketch<T> sketch =
      new VarOptItemsSketch<>(dataList, weightList, k, n,
          allocatedItems, rf, hCount, rCount, totalRWeight);

  if (isGadget) {
    sketch.marks_ = markList;
    sketch.numMarksInH_ = markCount;
  }

  return sketch;
}
/**
 * Returns <i>k</i>, the configured upper limit on how many samples this sketch retains. Fewer
 * items may be held at any given moment.
 *
 * @return k, the maximum number of samples in the sketch
 */
public int getK() {
  return this.k_;
}
/**
 * Returns how many input stream items this sketch has processed.
 *
 * @return n, the number of stream items the sketch has seen
 */
public long getN() {
  return this.n_;
}
/**
 * Returns how many items the sketch currently holds: the combined size of the H and R regions,
 * capped at k. This may be smaller than the sketch capacity.
 *
 * @return the number of items currently in the sketch
 */
public int getNumSamples() {
  final int held = h_ + r_;
  return (k_ <= held) ? k_ : held;
}
/**
 * Creates a result object over this sketch's samples.
 *
 * @return An object with an iterator over the results
 */
public VarOptItemsSamples<T> getSketchSamples() {
  final VarOptItemsSamples<T> samples = new VarOptItemsSamples<>(this);
  return samples;
}
/**
 * Offers an item to the sketch, which randomly decides whether to keep it in the sample set.
 * The item is submitted unmarked.
 *
 * @param item an item of the set being sampled from
 * @param weight a strictly positive weight associated with the item
 */
public void update(final T item, final double weight) {
  final boolean unmarked = false;
  update(item, weight, unmarked);
}
/**
 * Returns this sketch to the empty state. The value of k and the resize factor are retained;
 * storage is reallocated at the initial size, and a marks list is re-created only if one was
 * present before the reset.
 */
public void reset() {
  // clear the counters first; none of them influence the allocation below
  n_ = 0;
  h_ = 0;
  m_ = 0;
  r_ = 0;
  numMarksInH_ = 0;
  totalWtR_ = 0.0;

  // recompute the starting allocation; never allocate exactly k slots
  final int lgCeilingK = Util.exactLog2OfInt(Util.ceilingPowerOf2(k_), "VarOptItemsSketch");
  final int lgStartSize =
      SamplingUtil.startingSubMultiple(lgCeilingK, rf_.lg(), MIN_LG_ARR_ITEMS);
  final int adjustedSize = SamplingUtil.getAdjustedSize(k_, 1 << lgStartSize);
  currItemsAlloc_ = (adjustedSize == k_) ? (adjustedSize + 1) : adjustedSize;

  data_ = new ArrayList<>(currItemsAlloc_);
  weights_ = new ArrayList<>(currItemsAlloc_);
  if (marks_ != null) {
    marks_ = new ArrayList<>(currItemsAlloc_);
  }
}
/**
 * Builds a human-readable summary of the sketch: k, the H and R region counts, the R region
 * weight, the current allocation and the resize factor.
 *
 * @return A string version of the sketch summary
 */
@Override
public String toString() {
  final String simpleName = this.getClass().getSimpleName();
  return new StringBuilder()
      .append(LS)
      .append("### ").append(simpleName).append(" SUMMARY: ").append(LS)
      .append(" k : ").append(k_).append(LS)
      .append(" h : ").append(h_).append(LS)
      .append(" r : ").append(r_).append(LS)
      .append(" weight_r : ").append(totalWtR_).append(LS)
      .append(" Current size : ").append(currItemsAlloc_).append(LS)
      .append(" Resize factor: ").append(rf_).append(LS)
      .append("### END SKETCH SUMMARY").append(LS)
      .toString();
}
/**
 * Renders the preamble of a serialized VarOptItemsSketch, supplied as a byte array, as
 * human-readable text.
 *
 * @param byteArr the given byte array
 * @return a human readable string of the preamble of a byte array image of a VarOptItemsSketch.
 */
public static String toString(final byte[] byteArr) {
  final String preambleText = PreambleUtil.preambleToString(byteArr);
  return preambleText;
}
/**
 * Renders the preamble of a serialized VarOptItemsSketch, supplied as a MemorySegment, as
 * human-readable text.
 *
 * @param seg the given MemorySegment
 * @return a human readable string of the preamble of a MemorySegment image of a VarOptItemsSketch.
 */
public static String toString(final MemorySegment seg) {
  final String preambleText = PreambleUtil.preambleToString(seg);
  return preambleText;
}
/**
 * Serializes this sketch to a byte array, taking the item class from the first stored item.
 * May fail for polymorphic item types.
 *
 * @param serDe An instance of ArrayOfItemsSerDe
 * @return a byte array representation of this sketch
 */
public byte[] toByteArray(final ArrayOfItemsSerDe<? super T> serDe) {
  final boolean isEmpty = (r_ == 0) && (h_ == 0);
  if (isEmpty) {
    // null class is ok since empty -- no need to call serDe
    return toByteArray(serDe, null);
  }

  // with no H items, slot 0 is not a usable sample, so take the class from slot 1 instead
  final int firstItemIdx = (h_ != 0) ? 0 : 1;
  final Class<?> itemClass = data_.get(firstItemIdx).getClass();
  return toByteArray(serDe, itemClass);
}
/**
 * Serializes this sketch to a byte array. The items are first copied into an array of the
 * specified class so that polymorphic item types can be serialized.
 *
 * <p>Layout written: the preamble, then (unless empty) the first h weights, then the first h
 * marks if this sketch is a gadget, then the serialized items.</p>
 *
 * @param serDe An instance of ArrayOfItemsSerDe
 * @param clazz The class represented by {@code <T>}
 * @return a byte array representation of this sketch
 */
public byte[] toByteArray(final ArrayOfItemsSerDe<? super T> serDe, final Class<?> clazz) {
  final boolean isEmpty = (r_ == 0) && (h_ == 0);
  final boolean isGadget = marks_ != null;
  int flagBits = isGadget ? GADGET_FLAG_MASK : 0;

  final int preLongs;
  final int totalBytes;
  final byte[] serializedItems; // null only when the sketch is empty

  if (isEmpty) {
    preLongs = Family.VAROPT.getMinPreLongs();
    totalBytes = Family.VAROPT.getMinPreLongs() << 3; // only contains the minimum header info
    flagBits |= EMPTY_FLAG_MASK;
    serializedItems = null;
  } else {
    preLongs = (r_ == 0) ? PreambleUtil.VO_PRELONGS_WARMUP : Family.VAROPT.getMaxPreLongs();
    serializedItems = serDe.serializeToByteArray(getDataSamples(clazz));
    final int markByteCount = isGadget ? ArrayOfBooleansSerDe.computeBytesNeeded(h_) : 0;
    totalBytes = (preLongs << 3) + (h_ * Double.BYTES) + markByteCount + serializedItems.length;
  }

  final byte[] image = new byte[totalBytes];
  final MemorySegment seg = MemorySegment.ofArray(image);

  // first preLong
  PreambleUtil.insertPreLongs(seg, preLongs);               // Byte 0
  PreambleUtil.insertLgResizeFactor(seg, rf_.lg());
  PreambleUtil.insertSerVer(seg, VAROPT_SER_VER);           // Byte 1
  PreambleUtil.insertFamilyID(seg, Family.VAROPT.getID());  // Byte 2
  PreambleUtil.insertFlags(seg, flagBits);                  // Byte 3
  PreambleUtil.insertK(seg, k_);                            // Bytes 4-7

  if (isEmpty) {
    return image;
  }

  PreambleUtil.insertN(seg, n_);                            // Bytes 8-15
  PreambleUtil.insertHRegionItemCount(seg, h_);             // Bytes 16-19
  PreambleUtil.insertRRegionItemCount(seg, r_);             // Bytes 20-23
  if (r_ > 0) {
    PreambleUtil.insertTotalRWeight(seg, totalWtR_);        // Bytes 24-31
  }

  // the first h_ weights go immediately after the preamble
  int pos = preLongs << 3;
  for (int i = 0; i < h_; ++i) {
    seg.set(JAVA_DOUBLE_UNALIGNED, pos, weights_.get(i));
    pos += Double.BYTES;
  }

  // the first h_ marks, iff we have a gadget
  if (isGadget) {
    final Boolean[] hMarks = marks_.subList(0, h_).toArray(new Boolean[0]);
    final byte[] markBytes = MARK_SERDE.serializeToByteArray(hMarks);
    MemorySegment.copy(markBytes, 0, seg, JAVA_BYTE, pos, markBytes.length);
    pos += markBytes.length;
  }

  // finally the sample items, continuing from the running position
  MemorySegment.copy(serializedItems, 0, seg, JAVA_BYTE, pos, serializedItems.length);
  return image;
}
/**
 * Computes an estimated subset sum from the entire stream for objects matching a given
 * predicate. Provides a lower bound, estimate, and upper bound using a target of 2 standard
 * deviations.
 *
 * <p>This is technically a heuristic method, and tries to err on the conservative side.</p>
 *
 * <p>When the sketch holds only H-region items (no R region) the answer is exact: the three
 * bounds all equal the matching weight, and the total sketch weight is the sum of all stored
 * weights, whether or not the items match the predicate.</p>
 *
 * @param predicate A predicate to use when identifying items.
 * @return A summary object containing the estimate, upper and lower bounds, and the total
 *         sketch weight.
 */
public SampleSubsetSummary estimateSubsetSum(final Predicate<T> predicate) {
  if (n_ == 0) {
    return new SampleSubsetSummary(0.0, 0.0, 0.0, 0.0);
  }

  // H region: each item carries its own exact weight
  double totalWtH = 0.0;
  double hTrueWeight = 0.0;
  int idx = 0;
  for (; idx < h_; ++idx) {
    final double wt = weights_.get(idx);
    totalWtH += wt;
    if (predicate.test(data_.get(idx))) {
      hTrueWeight += wt;
    }
  }

  // if only heavy items, we have an exact answer; the total weight is that of ALL stored items,
  // not just those matching the predicate
  if (r_ == 0) {
    return new SampleSubsetSummary(hTrueWeight, hTrueWeight, hTrueWeight, totalWtH);
  }

  final long numSampled = n_ - h_;
  assert numSampled > 0;
  final double effectiveSamplingRate = r_ / (double) numSampled;
  assert effectiveSamplingRate >= 0.0;
  assert effectiveSamplingRate <= 1.0;

  // R region: count matching items; they all share the same effective weight
  int rTrueCount = 0;
  ++idx; // skip the gap
  for (; idx < (k_ + 1); ++idx) {
    if (predicate.test(data_.get(idx))) {
      ++rTrueCount;
    }
  }

  final double lbTrueFraction = pseudoHypergeometricLBonP(r_, rTrueCount, effectiveSamplingRate);
  final double estimatedTrueFraction = (1.0 * rTrueCount) / r_;
  final double ubTrueFraction = pseudoHypergeometricUBonP(r_, rTrueCount, effectiveSamplingRate);

  return new SampleSubsetSummary(
      hTrueWeight + (totalWtR_ * lbTrueFraction),
      hTrueWeight + (totalWtR_ * estimatedTrueFraction),
      hTrueWeight + (totalWtR_ * ubTrueFraction),
      totalWtH + totalWtR_);
}
/**
 * Produces a VarOptItemsSketch.Result holding the items and weights as separate arrays, using
 * the runtime class of the first stored item as the array component type. The arrays may be
 * shorter than the total capacity.
 *
 * @return A Result object containing items and weights, or null if the sketch holds no items.
 */
Result getSamplesAsArrays() {
  final boolean noItems = (r_ + h_) == 0;
  if (noItems) {
    return null;
  }

  // with no H items, slot 0 is not a usable sample, so take the class from slot 1 instead
  final int firstItemIdx = (h_ != 0) ? 0 : 1;
  final T firstItem = data_.get(firstItemIdx);
  return getSamplesAsArrays(firstItem.getClass());
}
/**
 * Creates a copy of the sketch, optionally discarding any information about marks that would
 * indicate the class's use as a union gadget as opposed to a valid sketch.
 *
 * <p>The item, weight and (when retained) mark lists are duplicated, so later changes to
 * either sketch do not affect the other. The stored items themselves are not cloned.</p>
 *
 * @param asSketch If true, copies as a sketch; if false, copies as a union gadget
 * @param adjustedN Target value of n for the resulting sketch. Ignored if negative.
 * @return A copy of the sketch.
 */
VarOptItemsSketch<T> copyAndSetN(final boolean asSketch, final long adjustedN) {
  // duplicate the backing lists (presized to the current allocation) rather than sharing them
  final ArrayList<T> dataCopy = new ArrayList<>(currItemsAlloc_);
  dataCopy.addAll(data_);
  final ArrayList<Double> weightsCopy = new ArrayList<>(currItemsAlloc_);
  weightsCopy.addAll(weights_);

  final VarOptItemsSketch<T> sketch = new VarOptItemsSketch<>(dataCopy, weightsCopy, k_, n_,
      currItemsAlloc_, rf_, h_, r_, totalWtR_);

  if (!asSketch) {
    if (marks_ != null) {
      final ArrayList<Boolean> marksCopy = new ArrayList<>(currItemsAlloc_);
      marksCopy.addAll(marks_);
      sketch.marks_ = marksCopy;
    }
    sketch.numMarksInH_ = numMarksInH_;
  }

  if (adjustedN >= 0) {
    sketch.n_ = adjustedN;
  }

  return sketch;
}
/**
 * Removes the mark array from this object in place, so that what had been a gadget becomes
 * indistinguishable from a sketch. Avoids an extra copy.
 */
void stripMarks() {
  assert marks_ != null;
  marks_ = null;
  numMarksInH_ = 0;
}
/**
 * Produces a VarOptItemsSketch.Result holding the items and weights as separate arrays. The
 * arrays may be shorter than the total capacity.
 *
 * <p>The item array is allocated with component type <em>clazz</em>, which must either match
 * or be a parent of T. Use this method when the stored objects are all instances of T but are
 * not necessarily instances of the base class.</p>
 *
 * <p>Null entries are skipped. An entry whose stored weight is not positive is reported with
 * the per-item R region weight (total R weight divided by the R count).</p>
 *
 * @param clazz A class to which the items are cast before returning
 * @return A Result object containing items and weights, or null if the sketch holds no items.
 */
@SuppressWarnings("unchecked")
Result getSamplesAsArrays(final Class<?> clazz) {
  final boolean noItems = (r_ + h_) == 0;
  if (noItems) {
    return null;
  }

  final int sampleCount = getNumSamples();
  final T[] items = (T[]) Array.newInstance(clazz, sampleCount);
  final double[] itemWeights = new double[sampleCount];
  final double perItemRWeight = totalWtR_ / r_;

  int dst = 0;
  int src = 0;
  while (dst < sampleCount) {
    final T candidate = data_.get(src);
    if (candidate != null) {
      items[dst] = candidate;
      final double stored = weights_.get(src);
      itemWeights[dst] = (stored > 0) ? stored : perItemRWeight;
      ++dst;
    }
    ++src;
  }

  final Result result = new Result();
  result.items = items;
  result.weights = itemWeights;
  return result;
}
// package-private getters

// package-private: bounds checking is delegated to the ArrayList; the caller is expected to cope
// with a null coming from the middle of the list
T getItem(final int idx) {
  final T item = data_.get(idx);
  return item;
}
// package-private: bounds checking is delegated to the ArrayList; the caller is expected to cope
// with a negative value (whether from the null in the middle or an R-region item)
double getWeight(final int idx) {
  final Double stored = weights_.get(idx);
  return stored;
}
// package-private: bounds checking is delegated to the ArrayList; the caller is expected to know
// how to handle a null from the middle of the list.
boolean getMark(final int idx) {
  final Boolean stored = marks_.get(idx);
  return stored;
}
// number of items currently in the H region
int getHRegionCount() { return this.h_; }
// number of items currently in the R (reservoir-like) region
int getRRegionCount() {
  return this.r_;
}
// number of marked items in the H region
int getNumMarksInH() {
  return this.numMarksInH_;
}
// Needed by result object and for unioning: the per-item weight of the R region, or NaN when
// the R region is empty
double getTau() {
  if (r_ == 0) {
    return Double.NaN;
  }
  return totalWtR_ / r_;
}
// total weight of the R region
double getTotalWtR() { return this.totalWtR_; }
// package-private setter, used to resolve gadget into sketch during union; overwrites k without
// touching any other state
void forceSetK(final int k) {
  assert k > 0;
  this.k_ = k;
}
/**
 * Internal implementation of update() which requires the caller to know whether an item is
 * marked as coming from the reservoir region of a sketch. The marks are used only in merging.
 *
 * <p>A null item is ignored without any change to the sketch. Otherwise the item count is
 * incremented and the item is routed to the warmup, light, heavy (R == 1) or heavy (general)
 * handler.</p>
 *
 * @param item an item of the set being sampled from
 * @param weight a strictly positive, finite weight associated with the item
 * @param mark true if an item comes from a sketch's reservoir region
 * @throws SketchesArgumentException if the weight is not strictly positive, is NaN, or is
 *     infinite
 */
void update(final T item, final double weight, final boolean mark) {
  // null items are silently dropped
  if (item == null) {
    return;
  }

  final boolean usableWeight =
      (weight > 0.0) && !Double.isNaN(weight) && !Double.isInfinite(weight);
  if (!usableWeight) {
    throw new SketchesArgumentException("Item weights must be strictly positive and finite number: "
        + weight + ", for item " + item.toString());
  }

  ++n_;

  // exact mode: nothing in the reservoir yet
  if (r_ == 0) {
    updateWarmupPhase(item, weight, mark);
    return;
  }

  // estimation mode from here on, which is what makes this sanity check valid
  assert (h_ == 0) || (peekMin() >= getTau());

  // tau as it would be if the deletion candidates turn out to be R plus the new item;
  // the (r_ + 1) - 1 spelling is intentional
  final double tauWithNewItem = (weight + totalWtR_) / ((r_ + 1) - 1);

  // is it the new item's turn to be considered for the reservoir?
  final boolean nextInLine = (h_ == 0) || (weight <= peekMin());

  // is the new item light enough for the reservoir?
  final boolean lightEnough = weight < tauWithNewItem;

  if (nextInLine && lightEnough) {
    updateLight(item, weight, mark);
  } else if (r_ == 1) {
    updateHeavyREq1(item, weight, mark);
  } else {
    updateHeavyGeneral(item, weight, mark);
  }
}
/**
* Decreases sketch's value of k by 1, updating stored values as needed.
*
* <p>Subject to certain pre-conditions, decreasing k causes tau to increase. This fact is used by
* the unioning algorithm to force "marked" items out of H and into the reservoir region.</p>
*
* <p>The work done depends on which regions currently hold data:</p>
* <ul>
* <li>h_ == 0 and r_ == 0: only k_ is decremented.</li>
* <li>h_ &gt; 0 and r_ == 0: k_ is decremented and, if h_ then exceeds k_,
* transitionFromWarmup() is invoked.</li>
* <li>h_ &gt; 0 and r_ &gt; 0: the last H entry is pulled out, k_ is decremented, and the entry
* is fed back in through update().</li>
* <li>h_ == 0 and r_ &gt; 0: one randomly selected R entry is dropped and both k_ and r_ are
* decremented.</li>
* </ul>
*
* @throws SketchesStateException if k_ is already 1 or less
*/
void decreaseKBy1() {
// refuse to shrink below k == 1
if (k_ <= 1) {
throw new SketchesStateException("Cannot decrease k below 1 in union");
}
if ((h_ == 0) && (r_ == 0)) {
// exact mode, but no data yet; this reduction is somewhat gratuitous
--k_;
} else if ((h_ > 0) && (r_ == 0)) {
// exact mode, but we have some data
--k_;
// the stored items no longer fit under the smaller k, so leave warmup
if (h_ > k_) {
transitionFromWarmup();
}
} else if ((h_ > 0) && (r_ > 0)) {
// reservoir mode, but we have some exact samples.
// Our strategy will be to pull an item out of H (which we are allowed to do since it's
// still just data), reduce k, and then re-insert the item
// first, slide the R zone to the left by 1, temporarily filling the gap
final int oldGapIdx = h_;
final int oldFinalRIdx = (h_ + 1 + r_) - 1;
assert oldFinalRIdx == k_;
swapValues(oldFinalRIdx, oldGapIdx);
// now we pull an item out of H; any item is ok, but if we grab the rightmost and then
// reduce h_, the heap invariant will be preserved (and the gap will be restored), plus
// the push() of the item that will probably happen later will be cheap.
final int pulledIdx = h_ - 1;
final T pulledItem = data_.get(pulledIdx);
final double pulledWeight = weights_.get(pulledIdx);
final boolean pulledMark = marks_.get(pulledIdx);
// keep the marked-in-H counter in step with the entry leaving H
if (pulledMark) { --numMarksInH_; }
weights_.set(pulledIdx, -1.0); // to make bugs easier to spot
--h_;
--k_;
--n_; // will be re-incremented with the update
// re-insert the pulled entry through the normal path, now against the smaller k_
update(pulledItem, pulledWeight, pulledMark);
} else if ((h_ == 0) && (r_ > 0)) {
// pure reservoir mode, so can simply eject a randomly chosen sample from the reservoir
assert r_ >= 2;
final int rIdxToDelete = 1 + SamplingUtil.rand().nextInt(r_); // 1 for the gap
final int rightmostRIdx = (1 + r_) - 1;
// move the victim to the right end of R, then drop that slot by shrinking r_
swapValues(rIdxToDelete, rightmostRIdx);
weights_.set(rightmostRIdx, -1.0);
--k_;
--r_;
}
}
/**
 * Handles the "light" case, in which the new item has weight &lt;= old_tau and so would appear
 * to the right of the R items in a hypothetical reverse-sorted list. It is easy to prove that
 * such an item is light enough to be part of this round's downsampling.
 *
 * <p>The item is written into the gap slot, which becomes the M region, and the candidate set
 * is then grown starting from R plus this item.</p>
 *
 * @param item the item being added
 * @param weight the item's weight
 * @param mark the item's mark; stored only when marks are being tracked
 */
private void updateLight(final T item, final double weight, final boolean mark) {
  assert r_ >= 1;
  assert (r_ + h_) == k_;

  // the gap lives at index h_; filling it turns it into the M region
  final int gapIdx = h_;
  data_.set(gapIdx, item);
  weights_.set(gapIdx, weight);
  if (marks_ != null) {
    marks_.set(gapIdx, mark);
  }
  ++m_;

  // initial candidates: everything in R plus the new item
  final double candidateWt = totalWtR_ + weight;
  final int candidateCount = r_ + 1;
  growCandidateSet(candidateWt, candidateCount);
}
/**
 * Handles the general "heavy" case, in which the new item has weight &gt; old_tau. Such an item
 * would appear to the left of the items in R in a hypothetical reverse-sorted list, and might
 * or might not be light enough to be part of this round's downsampling.
 *
 * <p>With the R == 1 case split off and handled elsewhere, the code is greatly simplified by
 * putting the new item into the H heap whether it needs to be there or not. It might go into
 * the heap and then come right back out, but that should be okay because pseudo_heavy items
 * cannot predominate in long streams unless (max wt) / (min wt) &gt; o(exp(N)).</p>
 *
 * @param item the item being added
 * @param weight the item's weight
 * @param mark the item's mark
 */
private void updateHeavyGeneral(final T item, final double weight, final boolean mark) {
  assert m_ == 0;
  assert r_ >= 2;
  assert (r_ + h_) == k_;

  // goes into H unconditionally; growing the candidate set may take it right back out
  push(item, weight, mark);

  // initial candidates: exactly the current R region
  final double candidateWt = totalWtR_;
  final int candidateCount = r_;
  growCandidateSet(candidateWt, candidateCount);
}
/**
 * Handles the "heavy" case when R holds exactly one item. The analysis is similar to that of
 * the general heavy case. The one small technical difference is that, since R &lt; 2, an M item
 * must be grabbed to have a valid starting point for growing the candidate set.
 *
 * @param item the item being added
 * @param weight the item's weight
 * @param mark the item's mark
 */
private void updateHeavyREq1(final T item, final double weight, final boolean mark) {
  assert m_ == 0;
  assert r_ == 1;
  assert (r_ + h_) == k_;

  // new item goes into H, then the lightest H item is popped back out into M
  push(item, weight, mark);
  popMinToMRegion();

  // Any set of two items can be downsampled to one item, so the two lightest items form a
  // valid starting point. The array has k+1 slots with one of them in R, which puts M in the
  // slot just before it.
  final int mIdx = k_ - 1;
  final double mWeight = weights_.get(mIdx);
  growCandidateSet(mWeight + totalWtR_, 2);
}
/**
 * Handles an update while the sketch is still in exact mode (r_ == 0): the item is stored at
 * the end of H, and once H holds more than k_ items the sketch transitions out of warmup.
 *
 * @param item the item being added
 * @param wt the item's weight
 * @param mark the item's mark; stored only when marks are being tracked
 */
private void updateWarmupPhase(final T item, final double wt, final boolean mark) {
  assert r_ == 0;
  assert m_ == 0;
  assert h_ <= k_;

  // make room first if the current allocation is already used up
  if (h_ >= currItemsAlloc_) {
    growDataArrays();
  }

  // store items as they come in, until full
  final int slot = h_;
  data_.add(slot, item);
  weights_.add(slot, wt);
  if (marks_ != null) {
    marks_.add(slot, mark);
  }
  ++h_;
  if (mark) {
    ++numMarksInH_;
  }

  // one item more than k_ means warmup is over and the heap has to be built
  if (h_ > k_) {
    transitionFromWarmup();
  }
}