diff --git a/spq_test.go b/spq_test.go
index cf3377824..7690beb04 100644
--- a/spq_test.go
+++ b/spq_test.go
@@ -2,12 +2,14 @@ package super_test
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"fmt"
 	"os"
 	"path/filepath"
 	"regexp"
 	"runtime/debug"
+	"slices"
 	"strconv"
 	"strings"
 	"testing"
@@ -16,11 +18,14 @@ import (
 	"github.com/brimdata/super/compiler"
 	"github.com/brimdata/super/compiler/parser"
 	"github.com/brimdata/super/runtime"
+	"github.com/brimdata/super/runtime/exec"
 	"github.com/brimdata/super/sbuf"
 	"github.com/brimdata/super/sio"
 	"github.com/brimdata/super/sio/anyio"
 	"github.com/brimdata/super/sio/arrowio"
 	"github.com/brimdata/super/sio/bsupio"
+	"github.com/brimdata/super/sio/supio"
+	"github.com/brimdata/super/vector"
 	"github.com/brimdata/super/vector/vio"
 	"github.com/brimdata/super/ztest"
 	"github.com/stretchr/testify/assert"
@@ -45,6 +50,7 @@ func TestSPQ(t *testing.T) {
 		runAllBoomerangs(t, "csup", data)
 		runAllBoomerangs(t, "parquet", data)
 		runAllBoomerangs(t, "sup", data)
+		runAllFusionBoomerangs(t, data)
 	})
 
 	for d := range dirs {
@@ -198,6 +204,194 @@ func runOneBoomerang(t *testing.T, format, data string) {
 	require.Equal(t, baseline.String(), boomerang.String(), "baseline and boomerang differ")
 }
 
+// runAllFusionBoomerangs runs runOneFusionBoomerang as a parallel subtest for
+// each entry in data, grouped under a parent subtest named "fusion".
+func runAllFusionBoomerangs(t *testing.T, data map[string]string) {
+	t.Run("fusion", func(t *testing.T) {
+		t.Parallel()
+		for name, input := range data {
+			t.Run(name, func(t *testing.T) {
+				t.Parallel()
+				runOneFusionBoomerang(t, input)
+			})
+		}
+	})
+}
+
+// runOneFusionBoomerang checks that "fuse | defuse(this)" reproduces its input
+// under both the SAM and VAM runtimes. The baseline is data re-serialized as
+// SUP after dropping values that are quiet or whose type contains a fusion
+// type; the test is skipped when nothing remains.
+func runOneFusionBoomerang(t *testing.T, data string) {
+	// Create an auto-detecting reader for data.
+	dataSctx := super.NewContext()
+	dataReader, err := anyio.NewReader(dataSctx, strings.NewReader(data), anyio.ReaderOpts{})
+	require.NoError(t, err)
+	defer dataReader.Close()
+
+	// Serialize non-fusion values from dataReader to baseline as SUP.
+	r := &fusionRemovingReader{dataReader, hasFusion{}}
+	puller := sbuf.NewDematerializer(dataSctx, sbuf.NewPuller(r))
+	baseline, err := serialize(puller, "sup")
+	require.NoError(t, err)
+	if baseline == "" {
+		t.Skip("skipping because data contains no non-fusion values")
+	}
+
+	// Use assert rather than require so that a failure in one runtime
+	// does not prevent the other from being checked.
+	samBoomerang, err := fuseDefuse(t.Context(), baseline, exec.RuntimeSAM)
+	if assert.NoError(t, err) {
+		assert.Equal(t, baseline, samBoomerang, "baseline and boomerang differ for sam")
+	}
+
+	vamBoomerang, err := fuseDefuse(t.Context(), baseline, exec.RuntimeVAM)
+	if assert.NoError(t, err) {
+		assert.Equal(t, baseline, vamBoomerang, "baseline and boomerang differ for vam")
+	}
+}
+
+// fuseDefuse reads s as SUP, runs it through "fuse | defuse(this)" on runtime
+// rt, and returns the query output serialized as SUP.
+func fuseDefuse(ctx context.Context, s string, rt exec.Runtime) (string, error) {
+	sctx := super.NewContext()
+	r := supio.NewReader(sctx, strings.NewReader(s))
+	q, err := newQuery(ctx, sctx, rt, "fuse | defuse(this)", r)
+	if err != nil {
+		return "", err
+	}
+	// Tell the query we are done on the way out. This is best-effort cleanup,
+	// so its result is ignored.
+	defer q.Pull(true)
+	return serialize(q, "sup")
+}
+
+// newQuery parses spq, compiles it for runtime rt with r as its only input,
+// and returns the resulting puller wrapped in an unlabeler.
+func newQuery(ctx context.Context, sctx *super.Context, rt exec.Runtime, spq string, r sio.Reader) (vio.Puller, error) {
+	ast, err := parser.ParseText(spq)
+	if err != nil {
+		return nil, err
+	}
+	e := exec.NewEnvironment(nil, nil)
+	e.Runtime = rt
+	rctx := runtime.NewContext(ctx, sctx)
+	q, err := compiler.NewCompilerWithEnv(e).NewQuery(rctx, ast, []sio.Reader{r}, 0)
+	if err != nil {
+		return nil, err
+	}
+	return &unlabeler{q}, nil
+}
+
+// unlabeler wraps a vio.Puller and passes every pulled vector through
+// vector.Unlabel.
+type unlabeler struct {
+	vio.Puller
+}
+
+// Pull pulls from the wrapped puller and returns the first result of
+// vector.Unlabel applied to the pulled vector, along with the original error.
+func (u *unlabeler) Pull(done bool) (vector.Any, error) {
+	vec, err := u.Puller.Pull(done)
+	// NOTE(review): Unlabel is also applied when vec is nil or err is
+	// non-nil; presumably it tolerates that -- confirm.
+	vec, _ = vector.Unlabel(vec)
+	return vec, err
+}
+
+// serialize copies everything from p to a writer for outputFormat and returns
+// the bytes written as a string. Copy and close errors are joined, and the
+// output produced so far is returned alongside them.
+func serialize(p vio.Puller, outputFormat string) (string, error) {
+	var b strings.Builder
+	w, err := anyio.NewWriter(sio.NopCloser(&b), anyio.WriterOpts{Format: outputFormat})
+	if err != nil {
+		return "", err
+	}
+	copyErr := vio.Copy(w, p)
+	closeErr := w.Close()
+	return b.String(), errors.Join(copyErr, closeErr)
+}
+
+// fusionRemovingReader wraps r, dropping values that are quiet or whose type
+// contains a fusion type according to h.
+type fusionRemovingReader struct {
+	r sio.Reader
+	h hasFusion
+}
+
+// Read returns the next value from the wrapped reader that is neither quiet
+// nor of a type containing fusion. A nil value or a non-nil error from the
+// wrapped reader is returned as is.
+func (f *fusionRemovingReader) Read() (*super.Value, error) {
+	for {
+		val, err := f.r.Read()
+		if val == nil || err != nil {
+			return val, err
+		}
+		if val.IsQuiet() || f.h.hasFusion(val.Type()) {
+			continue
+		}
+		return val, err
+	}
+}
+
+// hasFusion caches, per type, whether that type is or contains a
+// *super.TypeFusion.
+type hasFusion map[super.Type]bool
+
+// hasFusion reports whether typ is a fusion type or contains one anywhere in
+// its structure, caching the answer for typ in h.
+//
+// Answers for nested types are deliberately not cached. While a named type is
+// being expanded, a reference back to it is answered with false, so a nested
+// type's answer at that point can be a false negative; caching it would make
+// a later direct lookup of that nested type wrong.
+func (h hasFusion) hasFusion(typ super.Type) bool {
+	if has, ok := h[typ]; ok {
+		return has
+	}
+	has := h.walk(typ, make(map[*super.TypeNamed]struct{}))
+	h[typ] = has
+	return has
+}
+
+// walk is the uncached traversal behind hasFusion. It consults h for answers
+// already cached but never writes to it. active holds the named types
+// currently being expanded so that a self-referential named type terminates.
+func (h hasFusion) walk(typ super.Type, active map[*super.TypeNamed]struct{}) bool {
+	if has, ok := h[typ]; ok {
+		return has
+	}
+	visit := func(t super.Type) bool { return h.walk(t, active) }
+	switch typ := typ.(type) {
+	case *super.TypeRecord:
+		return slices.ContainsFunc(typ.Fields, func(f super.Field) bool { return visit(f.Type) })
+	case *super.TypeArray:
+		return visit(typ.Type)
+	case *super.TypeSet:
+		return visit(typ.Type)
+	case *super.TypeMap:
+		return visit(typ.KeyType) || visit(typ.ValType)
+	case *super.TypeUnion:
+		return slices.ContainsFunc(typ.Types, visit)
+	case *super.TypeError:
+		return visit(typ.Type)
+	case *super.TypeFusion:
+		return true
+	case *super.TypeNamed:
+		if _, ok := active[typ]; ok {
+			// Already expanding this type further up the stack; the
+			// cycle cannot contribute anything new.
+			return false
+		}
+		active[typ] = struct{}{}
+		defer delete(active, typ)
+		return visit(typ.Type)
+	}
+	return false
+}
+
 // If there's a problem with panics in the boomerangs, the Skip() can be commented
 // out and this test run to have each input loaded in a test to see where the problem lies.
 func TestLoad(t *testing.T) {