Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions book/src/super-sql/operators/fuse.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Because all values of the input must be read to compute the fused type,
---

_Fuse two records_
```mdtest-spq
```mdtest-spq fusion
# spq
fuse
# input
Expand All @@ -43,7 +43,7 @@ fusion({a?:_::int64,b?:2},<{b:int64}>)
---

_Fuse records with type variation_
```mdtest-spq
```mdtest-spq fusion
# spq
fuse
# input
Expand All @@ -57,7 +57,7 @@ fuse
---

_Fuse records with complex type variation_
```mdtest-spq {data-layout="stacked"}
```mdtest-spq fusion {data-layout="stacked"}
# spq
fuse
# input
Expand Down
1 change: 1 addition & 0 deletions cli/outputflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (f *Flags) SetFormatFlags(fs *flag.FlagSet) {
fs.BoolVar(&f.forceBinary, "B", false, "allow Super Binary to be sent to a terminal output")
fs.BoolVar(&f.jsonPretty, "J", false, "use formatted JSON output independent of -f option")
fs.BoolVar(&f.jsonShortcut, "j", false, "use line-oriented JSON output independent of -f option")
fs.BoolVar(&f.SUP.Fusion, "fusion", false, "display fusion values (fusion values are otherwise auto-defused)")
fs.BoolVar(&f.supPretty, "S", false, "use formatted Super JSON output independent of -f option")
fs.BoolVar(&f.supShortcut, "s", false, "use line-oriented Super JSON output independent of -f option")
}
Expand Down
2 changes: 1 addition & 1 deletion db/ztests/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ script: |
super db load -q -use poolB b.sup
super db -S -c 'from :pools | drop id | sort name | drop ts'
echo ===
super db -S -c 'from poolA@main:objects | {nameof:nameof(this),...this} | drop id'
super db -fusion -S -c 'from poolA@main:objects | {nameof:nameof(this),...this} | drop id'
super db -S -c 'from poolA:log | cut nameof(this)'
inputs:
Expand Down
4 changes: 4 additions & 0 deletions mdtest/mdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,14 @@ func parseMarkdown(source []byte) (map[string]string, []*Test, error) {
})
case "mdtest-spq":
var fails bool
var fusion bool
var runtime string
for _, word := range fcbInfoWords(fcb, source)[1:] {
switch {
case word == "fails":
fails = true
case word == "fusion":
fusion = true
case strings.HasPrefix(word, "runtime="):
runtime = strings.TrimPrefix(word, "runtime=")
if runtime != "vam" && runtime != "sam" {
Expand All @@ -263,6 +266,7 @@ func parseMarkdown(source []byte) (map[string]string, []*Test, error) {
Input: sections[2],
SPQ: sections[1],
Runtime: runtime,
Fusion: fusion,
})
}
return ast.WalkContinue, nil
Expand Down
8 changes: 6 additions & 2 deletions mdtest/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ type Test struct {
Runtime string // "sam", "vam", or "" for both

// For SPQ tests
Input string
SPQ string
Input string
SPQ string
Fusion bool // If true do not auto-defuse output
}

// Run runs the test, returning nil on success.
Expand Down Expand Up @@ -52,6 +53,9 @@ func (t *Test) run(runtime string) error {
var c *exec.Cmd
if t.SPQ != "" {
c = exec.Command("super", "-s", "-c", t.SPQ)
if t.Fusion {
c.Args = append(c.Args, "-fusion")
}
if s := t.Input; strings.TrimSpace(s) != "" {
c.Args = append(c.Args, "-")
c.Stdin = strings.NewReader(s)
Expand Down
16 changes: 4 additions & 12 deletions runtime/sam/expr/function/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import (
"fmt"
"net/url"
"strconv"
"strings"

"github.com/brimdata/super"
"github.com/brimdata/super/sio/supio"
"github.com/brimdata/super/pkg/byteconv"
"github.com/brimdata/super/sup"
)

Expand Down Expand Up @@ -86,13 +85,10 @@ func (p *ParseURI) Call(args []super.Value) super.Value {

type ParseSUP struct {
sctx *super.Context
sr *strings.Reader
zr *supio.Reader
}

func newParseSUP(sctx *super.Context) *ParseSUP {
var sr strings.Reader
return &ParseSUP{sctx, &sr, supio.NewReader(sctx, &sr)}
return &ParseSUP{sctx}
}

func (p *ParseSUP) Call(args []super.Value) super.Value {
Expand All @@ -103,13 +99,9 @@ func (p *ParseSUP) Call(args []super.Value) super.Value {
if !in.IsString() {
return p.sctx.WrapError("parse_sup: string arg required", args[0])
}
p.sr.Reset(super.DecodeString(in.Bytes()))
val, err := p.zr.Read()
val, err := sup.ParseValue(p.sctx, byteconv.UnsafeString(in.Bytes()))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change creates a lot of extra work for the garbage collector on each parse_sup() call. I wouldn't cry about that if there were no other way to avoid the import cycle, but how about doing it in anyio instead of supio?

--- a/sio/anyio/writer.go
+++ b/sio/anyio/writer.go
@@ -4,7 +4,11 @@ import (
        "fmt"
        "io"

+       "github.com/brimdata/super"
        "github.com/brimdata/super/csup"
+       "github.com/brimdata/super/runtime/sam/expr"
+       "github.com/brimdata/super/runtime/sam/expr/function"
+       "github.com/brimdata/super/sbuf"
        "github.com/brimdata/super/sio/arrowio"
        "github.com/brimdata/super/sio/bsupio"
        "github.com/brimdata/super/sio/csvio"
@@ -20,12 +24,13 @@ import (
 )

 type WriterOpts struct {
-       Format string
-       BSUP   *bsupio.WriterOpts // Nil means use defaults via bsupio.NewWriter.
-       CSV    csvio.WriterOpts
-       DB     dbio.WriterOpts
-       JSON   jsonio.WriterOpts
-       SUP    supio.WriterOpts
+       Format    string
+       SUPFusion bool
+       BSUP      *bsupio.WriterOpts // Nil means use defaults via bsupio.NewWriter.
+       CSV       csvio.WriterOpts
+       DB        dbio.WriterOpts
+       JSON      jsonio.WriterOpts
+       SUP       supio.WriterOpts
 }

 func NewWriter(w io.WriteCloser, opts WriterOpts) (vio.PushCloser, error) {
@@ -52,7 +57,11 @@ func NewWriter(w io.WriteCloser, opts WriterOpts) (vio.PushCloser, error) {
        case "parquet":
                return parquetio.NewWriter(w), nil
        case "sup", "":
-               return supio.NewWriter(w, opts.SUP), nil
+               w := supio.NewWriter(w, opts.SUP)
+               if !opts.SUPFusion {
+                       return &supDefuser{w, function.NewDefuse(super.NewContext())}, nil
+               }
+               return w, nil
        case "table":
                return tableio.NewWriter(w), nil
        case "tsv":
@@ -65,6 +74,21 @@ func NewWriter(w io.WriteCloser, opts WriterOpts) (vio.PushCloser, error) {
        }
 }

+type supDefuser struct {
+       *supio.Writer
+       defuse expr.Function
+}
+
+func (d *supDefuser) Push(vec vector.Any) error {
+       for _, val := range sbuf.Materialize(vec).Values() {
+               val = d.defuse.Call([]super.Value{val})
+               if err := d.Writer.Write(val); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh I really don't have like having surprising behavior like this.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd honestly rather just duplicate the code in supio.Reader in sam/expr/function.ParseSUP.

if err != nil {
return p.sctx.WrapError("parse_sup: "+err.Error(), args[0])
}
if val == nil {
return super.Null
}
return *val
return val
}
2 changes: 2 additions & 0 deletions runtime/ztests/expr/function/upcast.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ input: |
type n101=int64
[1::n101,<fusion(n101|string)>]

output-flags: -fusion

output: |
error({message:"upcast: value not a subtype of [int8|string]",on:[1,"a"]})
[1::int8,"a"]
Expand Down
2 changes: 2 additions & 0 deletions runtime/ztests/expr/fusion-all.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ input: |
{x:1}
<string>
output-flags: -fusion

output: |
fusion(0x02::all,<int64>)
fusion(0x666f6f::all,<string>)
Expand Down
2 changes: 1 addition & 1 deletion runtime/ztests/expr/search-glob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ output: |
{a:"foox",b:"there"}
{a:"hello",b:"foox"}
{a:"",b:"foo"}
fusion({a?:_::string,b?:"fool"},<{b:string}>)
{b:"fool"}
2 changes: 1 addition & 1 deletion runtime/ztests/expr/search-nested-field-regexp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ input: |
output: |
{a:[{bar:"foo"}]}
{a:[{car:"foo"}]}
fusion({car:"foo"}::(null|{car:string}),<{car:string}>)
{car:"foo"}
{a:[]::[{bar:null}]}
4 changes: 2 additions & 2 deletions runtime/ztests/expr/search-nested-field.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ input: |
output: |
{a:[{b:"foo"}]}
{a:[{c:"foo"},{b:1}]}
fusion({a:1,b?:_::string},<{a:int64}>)
fusion({a:2,b?:"foo"},<{a:int64,b:string}>)
{a:1}
{a:2,b:"foo"}
{a:[]::[{b:null}]}
2 changes: 1 addition & 1 deletion runtime/ztests/expr/search-primitives.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ input: |
output: |
"foo"
"foo"
fusion("foo"::(int64|string),<string>)
"foo"
16 changes: 16 additions & 0 deletions runtime/ztests/op/fuse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ input: |
{a:"goodnight",b:123::int32}
{a:null,b:null,c:null}

output-flags: -fusion

output: |
fusion({a:fusion("hello"::(string|null|none),<string>),b:fusion("world"::(int32|string|null),<string>),c:fusion(_::(string|null|none),<none>)},<{a:string,b:string}>)
fusion({a:fusion(_::(string|null|none),<none>),b:fusion("goodnight"::(int32|string|null),<string>),c:fusion("gracie"::(string|null|none),<string>)},<{b:string,c:string}>)
Expand All @@ -24,6 +26,8 @@ input: |
[{a:3,b:3}]
[{a:null,b:null}]

output-flags: -fusion

output: |
fusion([fusion({a:fusion(1::(int64|null|none),<int64>),b:fusion(_::(int64|null|none),<none>)},<{a:int64}>)],<[{a:int64}]>)
fusion([fusion({a:fusion(_::(int64|null|none),<none>),b:fusion(2::(int64|null|none),<int64>)},<{b:int64}>)],<[{b:int64}]>)
Expand All @@ -44,6 +48,8 @@ input: |
[1]
["s"]

output-flags: -fusion

output: |
fusion({a:fusion(1::(int64|string),<int64>)}::(int64|string|{a:fusion(int64|string)}|[fusion(int64|string)]),<{a:int64}>)
fusion({a:fusion("s"::(int64|string),<string>)}::(int64|string|{a:fusion(int64|string)}|[fusion(int64|string)]),<{a:string}>)
Expand All @@ -62,6 +68,8 @@ input: |
{r:{y:4::int32,z:5::int32},s:"world",r2:{x:6::int32}}
{a:null,r:{x:null,y:null,z:null},s:null,r2:null}

output-flags: -fusion

output: |
fusion({a:fusion("hello"::(string|null|none),<string>),r:fusion({x:fusion(1::int32::(int32|null|none),<int32>),y:fusion(2::int32::(int32|null),<int32>),z:fusion(_::(int32|null|none),<none>)},<{x:int32,y:int32}>),s:fusion(_::(string|null|none),<none>),r2:fusion(_::(null|none|{x:int32}),<none>)},<{a:string,r:{x:int32,y:int32}}>)
fusion({a:fusion(_::(string|null|none),<none>),r:fusion({x:fusion(_::(int32|null|none),<none>),y:fusion(4::int32::(int32|null),<int32>),z:fusion(5::int32::(int32|null|none),<int32>)},<{y:int32,z:int32}>),s:fusion("world"::(string|null|none),<string>),r2:fusion({x:6::int32}::(null|none|{x:int32}),<{x:int32}>)},<{r:{y:int32,z:int32},s:string,r2:{x:int32}}>)
Expand All @@ -75,6 +83,8 @@ input: |
error(1)
error("s")

output-flags: -fusion

output: |
fusion(error(fusion(1::(int64|string),<int64>)),<error(int64)>)
fusion(error(fusion("s"::(int64|string),<string>)),<error(string)>)
Expand All @@ -88,6 +98,8 @@ input: |
["foo"]
[null]

output-flags: -fusion

output: |
fusion([fusion(1::(int64|string|null),<int64>),fusion(2::(int64|string|null),<int64>)],<[int64]>)
fusion([fusion("foo"::(int64|string|null),<string>)],<[string]>)
Expand All @@ -102,6 +114,8 @@ input: |
{a:["foo"]}
{a:[null]}

output-flags: -fusion

output: |
{a:fusion([fusion(1::(int64|string|null),<int64>),fusion(2::(int64|string|null),<int64>)],<[int64]>)}
{a:fusion([fusion("foo"::(int64|string|null),<string>)],<[string]>)}
Expand Down Expand Up @@ -137,6 +151,8 @@ input: |
type er2=error(er1)
error(1)::er2

output-flags: -fusion

output: |
type a1=int64
type a2=[a1]
Expand Down
12 changes: 12 additions & 0 deletions sio/supio/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"io"

"github.com/brimdata/super"
"github.com/brimdata/super/runtime/sam/expr"
"github.com/brimdata/super/runtime/sam/expr/function"
"github.com/brimdata/super/sbuf"
"github.com/brimdata/super/sup"
"github.com/brimdata/super/vector"
Expand All @@ -12,17 +14,24 @@ import (
type Writer struct {
writer io.WriteCloser
formatter *sup.StreamFormatter
defuse expr.Function
}

type WriterOpts struct {
ColorDisabled bool
Fusion bool
Pretty int
}

func NewWriter(w io.WriteCloser, opts WriterOpts) *Writer {
var defuse expr.Function
if !opts.Fusion {
defuse = function.NewDefuse(super.NewContext())
}
return &Writer{
formatter: sup.NewStreamFormatter(opts.Pretty, opts.ColorDisabled),
writer: w,
defuse: defuse,
}
}

Expand All @@ -35,6 +44,9 @@ func (w *Writer) Close() error {
}

func (w *Writer) Write(val super.Value) error {
if w.defuse != nil {
val = w.defuse.Call([]super.Value{val})
}
if _, err := io.WriteString(w.writer, w.formatter.FormatValue(val)); err != nil {
return err
}
Expand Down
Loading