Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions spq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package super_test

import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path/filepath"
"regexp"
"runtime/debug"
"slices"
"strconv"
"strings"
"testing"
Expand All @@ -16,11 +18,14 @@ import (
"github.com/brimdata/super/compiler"
"github.com/brimdata/super/compiler/parser"
"github.com/brimdata/super/runtime"
"github.com/brimdata/super/runtime/exec"
"github.com/brimdata/super/sbuf"
"github.com/brimdata/super/sio"
"github.com/brimdata/super/sio/anyio"
"github.com/brimdata/super/sio/arrowio"
"github.com/brimdata/super/sio/bsupio"
"github.com/brimdata/super/sio/supio"
"github.com/brimdata/super/vector"
"github.com/brimdata/super/vector/vio"
"github.com/brimdata/super/ztest"
"github.com/stretchr/testify/assert"
Expand All @@ -45,6 +50,7 @@ func TestSPQ(t *testing.T) {
runAllBoomerangs(t, "csup", data)
runAllBoomerangs(t, "parquet", data)
runAllBoomerangs(t, "sup", data)
runAllFusionBoomerangs(t, data)
})

for d := range dirs {
Expand Down Expand Up @@ -198,6 +204,140 @@ func runOneBoomerang(t *testing.T, format, data string) {
require.Equal(t, baseline.String(), boomerang.String(), "baseline and boomerang differ")
}

func runAllFusionBoomerangs(t *testing.T, data map[string]string) {
t.Run("fusion", func(t *testing.T) {
t.Parallel()
for name, data := range data {
t.Run(name, func(t *testing.T) {
t.Parallel()
runOneFusionBoomerang(t, data)
})
}
})
}

func runOneFusionBoomerang(t *testing.T, data string) {
// Create an auto-detecting reader for data.
dataSctx := super.NewContext()
dataReader, err := anyio.NewReader(dataSctx, strings.NewReader(data), anyio.ReaderOpts{})
require.NoError(t, err)
defer dataReader.Close()

// Serialize non-fusion values from dataReader to baseline as SUP.
r := &fusionRemovingReader{dataReader, hasFusion{}}
puller := sbuf.NewDematerializer(dataSctx, sbuf.NewPuller(r))
baseline, err := serialize(puller, "sup")
require.NoError(t, err)
if baseline == "" {
t.Skip("skipping because data contains no non-fusion values")
}

samBoomerang, err := fuseDefuse(t.Context(), baseline, exec.RuntimeSAM)
if assert.NoError(t, err) {
assert.Equal(t, baseline, samBoomerang, "baseline and boomerang differ for sam")
}

vamBoomerang, err := fuseDefuse(t.Context(), baseline, exec.RuntimeVAM)
if assert.NoError(t, err) {
assert.Equal(t, baseline, vamBoomerang, "baseline and boomerang differ for vam")
}
}

func fuseDefuse(ctx context.Context, s string, runtime exec.Runtime) (string, error) {
sctx := super.NewContext()
r := supio.NewReader(sctx, strings.NewReader(s))
q, err := newQuery(ctx, sctx, runtime, "fuse | defuse(this)", r)
if err != nil {
return "", err
}
defer q.Pull(true)
return serialize(q, "sup")
}

func newQuery(ctx context.Context, sctx *super.Context, rt exec.Runtime, spq string, r sio.Reader) (vio.Puller, error) {
ast, err := parser.ParseText(spq)
if err != nil {
return nil, err
}
e := exec.NewEnvironment(nil, nil)
e.Runtime = rt
rctx := runtime.NewContext(ctx, sctx)
q, err := compiler.NewCompilerWithEnv(e).NewQuery(rctx, ast, []sio.Reader{r}, 0)
if err != nil {
return nil, err
}
return &unlabeler{q}, nil
}

type unlabeler struct {
vio.Puller
}

func (u *unlabeler) Pull(done bool) (vector.Any, error) {
vec, err := u.Puller.Pull(done)
vec, _ = vector.Unlabel(vec)
return vec, err
}

func serialize(p vio.Puller, outputFormat string) (string, error) {
var b strings.Builder
w, err := anyio.NewWriter(sio.NopCloser(&b), anyio.WriterOpts{Format: outputFormat})
if err != nil {
return "", err
}
err = vio.Copy(w, p)
err2 := w.Close()
return b.String(), errors.Join(err, err2)
}

type fusionRemovingReader struct {
r sio.Reader
h hasFusion
}

func (f *fusionRemovingReader) Read() (*super.Value, error) {
for {
val, err := f.r.Read()
if val != nil && err == nil {
if val.IsQuiet() || f.h.hasFusion(val.Type()) {
continue
}
}
return val, err
}
}

type hasFusion map[super.Type]bool

func (h hasFusion) hasFusion(typ super.Type) bool {
if has, ok := h[typ]; ok {
return has
}
var has bool
switch typ := typ.(type) {
case *super.TypeRecord:
has = slices.ContainsFunc(typ.Fields, func(f super.Field) bool { return h.hasFusion(f.Type) })
case *super.TypeArray:
has = h.hasFusion(typ.Type)
case *super.TypeSet:
has = h.hasFusion(typ.Type)
case *super.TypeMap:
has = h.hasFusion(typ.KeyType) || h.hasFusion(typ.ValType)
case *super.TypeUnion:
has = slices.ContainsFunc(typ.Types, h.hasFusion)
case *super.TypeError:
has = h.hasFusion(typ.Type)
case *super.TypeFusion:
has = true
case *super.TypeNamed:
// Store false to prevent infinite recursion.
h[typ] = false
has = h.hasFusion(typ.Type)
}
h[typ] = has
return has
}

// If there's a problem with panics in the boomerangs, the Skip() can be commented
// out and this test run to have each input loaded in a test to see where the problem lies.
func TestLoad(t *testing.T) {
Expand Down
Loading