-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathbowdiff.go
More file actions
73 lines (65 loc) · 2.03 KB
/
Copy pathbowdiff.go
File metadata and controls
73 lines (65 loc) · 2.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package bow
import (
"fmt"
"sync"
)
// Diff calculates the first discrete difference of each row compared with the
// previous row, for every selected column.
//
// colIndices is resolved through selectCols. Columns that are not selected are
// carried over via NewSeriesFromCol and are not type-checked. Selected columns
// must be of type Int64, Float64 or Boolean, otherwise an error is returned
// before any computation starts.
//
// If any of the current or the previous row is nil, the result will be nil.
// The first row has no predecessor, so it is never written to the result.
// For boolean columns, XOR operation is used.
//
// Each selected column is processed in its own goroutine; Diff waits for all
// of them before assembling the resulting Bow with the original metadata.
// TODO: directly mutate bow && only read currVal at each iteration for performance improvement
func (b *bow) Diff(colIndices ...int) (Bow, error) {
	selectedCols, err := selectCols(b, colIndices)
	if err != nil {
		return nil, err
	}

	fields := b.Schema().Fields()

	// Only columns that will actually be diffed need a supported type;
	// the others are copied as-is below.
	for colIndex, col := range fields {
		if !selectedCols[colIndex] {
			continue
		}
		switch colType := b.ColumnType(colIndex); colType {
		case Int64, Float64, Boolean:
		default:
			return nil, fmt.Errorf(
				"column '%s' is of unsupported type '%s'",
				col.Name, colType)
		}
	}

	numRows := b.NumRows()

	var wg sync.WaitGroup
	// Every goroutine writes to its own index of calcSeries, so no extra
	// synchronization is needed beyond the WaitGroup.
	calcSeries := make([]Series, b.NumCols())
	for colIndex, col := range fields {
		if !selectedCols[colIndex] {
			calcSeries[colIndex] = b.NewSeriesFromCol(colIndex)
			continue
		}

		wg.Add(1)
		go func(colIndex int, colName string) {
			defer wg.Done()

			colType := b.ColumnType(colIndex)
			// Looked up once per column instead of twice per row.
			column := b.Column(colIndex)
			colBuf := b.NewBufferFromCol(colIndex)
			calcBuf := NewBuffer(numRows, colType)

			// Start at 1: row 0 has no previous row to compare with.
			for rowIndex := 1; rowIndex < numRows; rowIndex++ {
				if !column.IsValid(rowIndex) || !column.IsValid(rowIndex-1) {
					continue
				}

				switch colType {
				case Int64:
					currVal := colBuf.GetValue(rowIndex).(int64)
					prevVal := colBuf.GetValue(rowIndex - 1).(int64)
					calcBuf.SetOrDrop(rowIndex, currVal-prevVal)
				case Float64:
					currVal := colBuf.GetValue(rowIndex).(float64)
					prevVal := colBuf.GetValue(rowIndex - 1).(float64)
					calcBuf.SetOrDrop(rowIndex, currVal-prevVal)
				case Boolean:
					currVal := colBuf.GetValue(rowIndex).(bool)
					prevVal := colBuf.GetValue(rowIndex - 1).(bool)
					// currVal != prevVal is the boolean XOR.
					calcBuf.SetOrDrop(rowIndex, currVal != prevVal)
				}
			}

			calcSeries[colIndex] = NewSeriesFromBuffer(colName, calcBuf)
		}(colIndex, col.Name)
	}
	wg.Wait()

	return NewBowWithMetadata(b.Metadata(), calcSeries...)
}