diff --git a/DESIGN.md b/DESIGN.md index 3f4c4a1..599e438 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -1,6 +1,6 @@ This file describes the high-level design of "foiled" by Gumnut. -There are three major layers to the system, plus surface UI code (e.g., the text editor, which may eventually move from this repo). +There are four major layers to the system, plus surface UI code (e.g., the text editor, which may eventually move from this repo). # Layers @@ -21,12 +21,13 @@ However, there are special inode formats that have particular meanings. ### Operations -There are four basic operations: +There are five basic operations: 1. Grow at position, e.g. insert 3 items at offset 2 (would insert after 123 in the example above) 2. Delete under range, e.g. delete between 2-4 (would delete `false, NaN` above) -3. Set at position, e.g. set position 3 to `true` (important: position is _after_ the target, 1-indexed). -4. Mutate at position - this is as set, but modifies data in-place in some way. +3. Flip data around range, e.g., flip data 2-4-8, moving data 2-4 to after the former 4-8. +4. Set at position, e.g. set position 3 to `true` (important: position is _after_ the target, 1-indexed). +5. Mutate at position - this is as set, but modifies data in-place in some way. All of these operations are OT-like - they can transform over each other and are always safe to apply. They may transform to zero - e.g., if a set operation is transformed over a range that deletes its target, the delta will be nil. @@ -45,7 +46,7 @@ If this condition can't be met, then the server rejects the ops. Note however, that all ops are always valid. The client will be called again to retry or reconfigure the transaction. -End-users will never see this, but these transactions might help create dictionaries or other atomic operations that we can't represent. +End-users will never see this, but these transactions might help create dictionaries or other atomic operations that we can't represent with pure OT. 
### Client API @@ -91,19 +92,36 @@ The higher layers will provide this. This layer will also support a couple of special low-level types. -1. My Client ID: this is data that will be automatically zero-ed out (possibly with a "zero Client ID" type) when that user disconnects. +1. My Handle ID: this is data that will be automatically zero-ed out (possibly with a "zero Handle ID" type) when that user disconnects. This can be used to claim locks, or key session data. 2. Slice reference: this can reference an offset within another ino. It lets us externally reference _some other_ offset which will transform over time. +3. Other inode reference: this is just a string, but is tagged specially, and may be manipulated on the way in/out of this layer. -It will also support more 'boring' types which do not map to JS: +### Strings -1. Use another `ino` as an array -2. Use a pair of `ino` (maybe implied inode and its +1 peer) as a map +We support "immutable strings" as a type, as well as arrays of numbers. +Foiled does not distinguish between numbers and characters/runes; the user of an inode decides how to interpret it (i.e., is this a jumble of numbers, or is this actually a string). -### Strings +This works because JS strings are just runs of `uint16`, and JSON can represent every possible JS string value in its UTF-8 encoding. + +## 2. Special Inodes + +The first section describes basic operations on inodes, the data within, etc. + +However, the server will provide a few ways for clients to orchestrate inodes. +We have a few broad goals here: + +1. Allow the server to allocate unique inodes for us - if the client uses an inode of a special form during its session, the server will allocate a totally unique (and uncreatable by others) inode for it +2. Allow us to set the "other inode reference" to a special new inode _only once_ + - This is basically for creating at-most-once string data; strings in JS are not objects, so should not be attached more than once +3. 
Allow a client to create data unique to them for a session (e.g. via a known prefix that is deleted when the session ends). + +In general, users should not be creating arbitrary inodes and this may be disallowed - they are typically only creating data via the above approaches. + +### Implementation -We support "immutable strings" as a type, as well as _string data_ (i.e., of `uint16`) within the array-like. +-- TODO -- ### Special Inode Formats @@ -120,7 +138,7 @@ This will be mapped for _other_ clients to an ID like ":blah". Any ID prefixed with a ":" has this 1:1 mapping to the other inode. The important part here is that clients _cannot_ set ":blah" directly - they must always create it with "-:". -## 2. Object Model +## 3. Object Model This will use the above layer to provide a high-level object model _which is not JS or JSON_. @@ -143,7 +161,7 @@ This will surface in JS as something like `GumnutArray` or `GumnutMap`. We can relatively trivially map these to JS objects. For our first Gemini demo, we will do something minimal here. -## 3. Gumnut Product Layer +## 4. Gumnut Product Layer Finally, we use the above layer to create a layer which allows for importing and mirroring data from an end-user's database (the goal of Gumnut as a product). 
diff --git a/server/pkg/model/doc/doc.go b/server/pkg/model/doc/doc.go index 27b4029..8087f40 100644 --- a/server/pkg/model/doc/doc.go +++ b/server/pkg/model/doc/doc.go @@ -38,6 +38,12 @@ func (d *docImpl) RevVersion() (newVersion int) { return d.version } +func (d *docImpl) ServerApply(update map[string]node.Patch) (out *Work, err error) { + out = &Work{TargetVersion: d.version, Update: update} + err = d.Apply(out) + return +} + func (d *docImpl) Apply(work *Work) (err error) { // ensure not too far ahead if work.TargetVersion > d.version { diff --git a/server/pkg/model/doc/types.go b/server/pkg/model/doc/types.go index 4aad668..8a85a41 100644 --- a/server/pkg/model/doc/types.go +++ b/server/pkg/model/doc/types.go @@ -15,6 +15,9 @@ type Doc interface { // This does not care about barrier ops, the caller must fail before this based on its own version checks. Apply(work *Work) (err error) + // ServerApply is as Apply, but is server-initiated. + ServerApply(update map[string]node.Patch) (out *Work, err error) + // RevVersion bumps the version without doing any work. // This is unlike Apply, which will not rev the version if nothing occurs. RevVersion() (newVersion int) @@ -23,6 +26,8 @@ type Doc interface { Version() (version int) // Read reads the current status of this Doc. + // This also includes the current Version. + // // Internally, this may apply ops forward: we don't always keep a "live" copy around. 
Read() (dw *Work) diff --git a/server/pkg/model/raw/const.go b/server/pkg/model/raw/const.go index 0cb76f5..1a60d05 100644 --- a/server/pkg/model/raw/const.go +++ b/server/pkg/model/raw/const.go @@ -10,3 +10,9 @@ var ( ErrForwardSet = errors.New("got -ve set in normal forward") ErrNormalizeSet = errors.New("couldn't normalize user set") ) + +var ( + GumnutDataTrue = GumnutData{Type: DataTypeKnown, Data: 1, String: "true"} + GumnutDataFalse = GumnutData{Type: DataTypeKnown, Data: 1, String: "false"} + GumnutDataNull = GumnutData{Type: DataTypeKnown, Data: 1, String: "null"} +) diff --git a/server/pkg/model/raw/set.go b/server/pkg/model/raw/set.go index caf0be1..93a78ae 100644 --- a/server/pkg/model/raw/set.go +++ b/server/pkg/model/raw/set.go @@ -2,6 +2,7 @@ package raw import ( "encoding/json" + "iter" "math" "slices" @@ -16,6 +17,28 @@ type SetPart struct { // SetOp is a sequence of set operations which will be applied one after another. type SetOp []SetPart +func (s SetOp) IterData() (i iter.Seq[*GumnutData]) { + return func(yield func(*GumnutData) (more bool)) { + for i := range s { + for j := range s[i].Data { + if !yield(&s[i].Data[j]) { + return + } + } + } + } +} + +// Length returns the +ve length here. +// If this targets -ve ops, these are ignored. 
+func (arr SetOp) Length() (length int) { + for _, each := range arr { + length += each.Skip + length += len(each.Data) + } + return +} + func (arr SetOp) MarshalJSONArray() (out []json.RawMessage, err error) { for i, each := range arr { if len(each.Data) == 0 { diff --git a/server/pkg/server/const.go b/server/pkg/safedoc/const.go similarity index 51% rename from server/pkg/server/const.go rename to server/pkg/safedoc/const.go index c0a867b..5f45d6b 100644 --- a/server/pkg/server/const.go +++ b/server/pkg/safedoc/const.go @@ -1,4 +1,4 @@ -package server +package safedoc import ( "errors" @@ -11,5 +11,7 @@ const ( ) var ( - ErrSeq = errors.New("invalid sequence number") + ErrSeq = errors.New("invalid sequence number") + ErrPerform = errors.New("bad inode target") + ErrBarrier = errors.New("barrier failed") ) diff --git a/server/pkg/safedoc/inode.go b/server/pkg/safedoc/inode.go new file mode 100644 index 0000000..7b829ce --- /dev/null +++ b/server/pkg/safedoc/inode.go @@ -0,0 +1,65 @@ +package safedoc + +import ( + "fmt" + "maps" + "slices" + "strings" + + "github.com/gumnutdev/foiled/server/pkg/model/node" + "github.com/gumnutdev/foiled/server/pkg/model/raw" +) + +func inodeHandleActive(handleID int) (s string) { + return fmt.Sprintf("handle-active/%x:m", handleID) +} + +// mapInodeIn updates inodes requested to be changed by a session. +// It returns false if there are invalid inode targets here. +func (ds *docSession) mapInodeIn(update map[string]node.Patch) (ok bool) { + handleActive := inodeHandleActive(ds.handleID) + + for id, patch := range update { + if id == "" { + return false + } else if strings.HasPrefix(id, "handle-active/") { + return false + } + + // rewrite "self" target to be the actual ID + for gd := range patch.Set.IterData() { + if gd.Type == raw.DataTypeInode && gd.String == "handle-active/self:m" { + gd.String = handleActive + } + } + } + + return true +} + +// mapInodeOut modifies what gets broadcast to this session. 
+// The passed map is already local and can be modified inline. +func (ds *docSession) mapInodeOut(update map[string]node.Patch) { + handleActive := inodeHandleActive(ds.handleID) + + for _, patch := range update { + // rewrite "self" target to be the actual ID + for gd := range patch.Set.IterData() { + // FIXME: we need a better way to say - we're modifying this, copy all the way down :\ + if gd.Type == raw.DataTypeInode && gd.String == handleActive { + gd.String = "handle-active/self:m" + } + } + } + + keys := slices.Collect(maps.Keys(update)) + + for _, id := range keys { + if id == handleActive { + // broadcast self ownership under unique ID + update["handle-active/self:m"] = update[id] + delete(update, id) + continue + } + } +} diff --git a/server/pkg/safedoc/safedoc.go b/server/pkg/safedoc/safedoc.go new file mode 100644 index 0000000..98fd37d --- /dev/null +++ b/server/pkg/safedoc/safedoc.go @@ -0,0 +1,228 @@ +package safedoc + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/gumnutdev/foiled/server/pkg/alloc" + "github.com/gumnutdev/foiled/server/pkg/model/doc" + "github.com/gumnutdev/foiled/server/pkg/model/node" + "github.com/gumnutdev/foiled/server/pkg/model/raw" + "github.com/samthor/thorgo/queue" +) + +type opResult struct { + clientID int // clientID (from higher level) + handleID int // sourcer of ops + seq int // sequence number + dw doc.Work // work done here +} + +type safeDoc struct { + lock sync.Mutex + doc doc.Doc + q queue.Queue[opResult] +} + +// serverApply must be called under lock. +func (sd *safeDoc) serverApply(update map[string]node.Patch) { + work, err := sd.doc.ServerApply(update) + if err != nil { + panic(err) // TODO: should/can we crash safeDoc? + } + + if len(work.Update) != 0 { + sd.q.Push(opResult{dw: *work}) + } +} + +// New creates a new Doc. 
+func New() (d Doc) { + return &safeDoc{ + doc: doc.New(), + q: queue.New[opResult](), + } +} + +func (sd *safeDoc) Join(ctx context.Context) (ds Session) { + handleID := alloc.JSInt(ctx) + + sd.lock.Lock() + defer sd.lock.Unlock() + + // setup first packet / join under lock + data := sd.doc.Read() + impl := &docSession{ + sd: sd, + handleID: handleID, + first: &OutPacket{Version: data.TargetVersion, Update: data.Update}, + l: sd.q.Join(ctx), + } + impl.join() + + context.AfterFunc(ctx, func() { + sd.lock.Lock() + defer sd.lock.Unlock() + impl.part() + }) + + return impl +} + +type docSession struct { + sd *safeDoc + + handleID int // unique generated here + first *OutPacket + l queue.Listener[opResult] + + lastSeq int +} + +// join is called when the context has joined this valid doc. +func (ds *docSession) join() { + handleActive := inodeHandleActive(ds.handleID) + + ds.sd.serverApply(map[string]node.Patch{ + handleActive: { + Set: raw.SetOp{ + {Data: []raw.GumnutData{ + {Type: raw.DataTypeString, String: "active"}, + raw.GumnutDataTrue, + }}, + }, + }, + }) +} + +// part is called when the context passed in Join is done. 
+func (ds *docSession) part() { + ds.sd.doc.ClearHandle(ds.handleID) + + handleActive := inodeHandleActive(ds.handleID) + curr := ds.sd.doc.Read() + length := curr.Update[handleActive].Set.Length() + + if length == 0 { + return + } + + ds.sd.serverApply(map[string]node.Patch{ + handleActive: { + Run: raw.OpRun{ + {Range: []int{0, length}}, + }, + }, + }) +} + +func (ds *docSession) Handle() (handle int) { + return ds.handleID +} + +func (ds *docSession) Perform(in InPacket, clientID int) (err error) { + if in.Seq != ds.lastSeq+1 { + return ErrSeq + } + + if !ds.mapInodeIn(in.Update) { + return ErrPerform + } + + ds.sd.lock.Lock() + defer ds.sd.lock.Unlock() + + if !ds.sd.doc.Barrier(ds.handleID, in.TargetVersion, in.Barrier) { + // not an error - failed to pass barrier + return ErrBarrier + } + + // do work normally + work := doc.Work{ + Handle: ds.handleID, + TargetVersion: in.TargetVersion, + Update: in.Update, + } + err = ds.sd.doc.Apply(&work) + if err != nil { + return + } + + // even with err, there might be valid ops - broadcast out + ds.sd.q.Push(opResult{ + handleID: ds.handleID, + clientID: clientID, + dw: work, + seq: in.Seq, + }) + + ds.lastSeq = in.Seq + return nil +} + +func (ds *docSession) Next() (clientID int, p OutPacket, ok bool) { + if ds.first != nil { + p = *ds.first + ds.first = nil + return 0, p, true + } + +retry: + next, ok := ds.l.Next() + if !ok { + return // ctx shutdown + } + + time.Sleep(DelayUpdate) + + switch { + // ack + case next.handleID == ds.handleID: + cand := next + for { + // consume more for conn if we can + peek, _ := ds.l.Peek() + if peek.handleID != ds.handleID { + p.Seq = cand.seq + p.Version = cand.dw.TargetVersion + break + } + ds.l.Next() // consume peek + cand = peek + } + return 0, p, true + + case len(next.dw.Update) == 0: + goto retry // zero ack for someone else + + default: + // this is actions performed by someone else + update := copyUpdate(next.dw.Update) + ds.mapInodeOut(update) + if len(update) == 0 { + 
goto retry // we got filtered out + } + + p.Version = next.dw.TargetVersion + p.Update = update + clientID = next.clientID + + // TODO: we could merge Patch that are not acks here (merge => append) + return clientID, p, true + } +} + +func copyUpdate(update map[string]node.Patch) (out map[string]node.Patch) { + // FIXME: gross, but since we might modify this inline, we need to copy the whole thing :| + b, err := json.Marshal(update) + if err != nil { + panic(err) + } + err = json.Unmarshal(b, &out) + if err != nil { + panic(err) + } + return out +} diff --git a/server/pkg/safedoc/types.go b/server/pkg/safedoc/types.go new file mode 100644 index 0000000..0cb9fc6 --- /dev/null +++ b/server/pkg/safedoc/types.go @@ -0,0 +1,15 @@ +package safedoc + +import ( + "context" +) + +type Doc interface { + Join(ctx context.Context) (ds Session) +} + +type Session interface { + Next() (clientID int, p OutPacket, ok bool) + Perform(in InPacket, clientID int) (err error) + Handle() (handle int) +} diff --git a/server/pkg/server/wire.go b/server/pkg/safedoc/wire.go similarity index 54% rename from server/pkg/server/wire.go rename to server/pkg/safedoc/wire.go index 13f50b6..2b2eaab 100644 --- a/server/pkg/server/wire.go +++ b/server/pkg/safedoc/wire.go @@ -1,4 +1,4 @@ -package server +package safedoc import ( "github.com/gumnutdev/foiled/server/pkg/model/node" @@ -11,12 +11,8 @@ type InPacket struct { Seq int `json:"seq"` // incrementing seq no# } -type AckPacket struct { - Version int `json:"v"` // what version this brings us to - Seq int `json:"seq"` // sequence no# we are acking -} - -type AnnouncePacket struct { - Version int `json:"v"` // what version this brings us to - Update map[string]node.Patch `json:"p"` // ops for client to apply +type OutPacket struct { + Version int `json:"v"` // what version this brings us to + Update map[string]node.Patch `json:"p,omitzero"` // ops for client to apply + Seq int `json:"seq,omitzero"` // sequence no# we are acking } diff --git 
a/server/pkg/server/server.go b/server/pkg/server/server.go index 1f07c97..e8f31e4 100644 --- a/server/pkg/server/server.go +++ b/server/pkg/server/server.go @@ -5,85 +5,34 @@ import ( "encoding/json" "log" "sync" - "time" - "github.com/gumnutdev/foiled/server/pkg/alloc" "github.com/gumnutdev/foiled/server/pkg/host" - "github.com/gumnutdev/foiled/server/pkg/model/doc" - "github.com/samthor/thorgo/queue" + "github.com/gumnutdev/foiled/server/pkg/safedoc" "github.com/samthor/thorgo/transport" "golang.org/x/sync/errgroup" ) -type opResult struct { - clientID int // clientID (from higher level) - handleID int // sourcer of ops - seq int // sequence number - dw doc.Work // work done here -} - -type safeDoc struct { - lock sync.Mutex - doc doc.Doc - q queue.Queue[opResult] -} - -func (sd *safeDoc) Perform(in InPacket, handleID, clientID int) (ok bool, err error) { - sd.lock.Lock() - defer sd.lock.Unlock() - - if !sd.doc.Barrier(handleID, in.TargetVersion, in.Barrier) { - // not an error - failed to pass barrier - // TODO: we don't explicitly announce this - expect client to know - return - } - - // do work normally - work := doc.Work{ - Handle: handleID, - TargetVersion: in.TargetVersion, - Update: in.Update, - } - err = sd.doc.Apply(&work) - if err != nil { - return - } - - // even with err, there might be valid ops - broadcast out - sd.q.Push(opResult{ - handleID: handleID, - clientID: clientID, - dw: work, - seq: in.Seq, - }) - - return true, nil -} - // Server manages collaboration sessions and documents. type Server struct { docsLock sync.Mutex - docs map[string]*safeDoc + docs map[string]safedoc.Doc } // New creates a new Server instance. 
func New() (s *Server) { return &Server{ - docs: make(map[string]*safeDoc), + docs: make(map[string]safedoc.Doc), } } -func (s *Server) getDoc(id string) (sd *safeDoc) { +func (s *Server) getDoc(id string) (sd safedoc.Doc) { s.docsLock.Lock() defer s.docsLock.Unlock() if d, ok := s.docs[id]; ok { return d } - d := &safeDoc{ - q: queue.New[opResult](), - doc: doc.New(), - } + d := safedoc.New() s.docs[id] = d return d } @@ -94,104 +43,46 @@ func (s *Server) Handle(arg host.DocArg, tr transport.TypeTransport[host.Session // TODO: track minimum known version of ALL clients and purge old ops before that version doc := s.getDoc(arg.Doc) - handleID := alloc.JSInt(tr.Context()) - defer func() { - doc.lock.Lock() - defer doc.lock.Unlock() - doc.doc.ClearHandle(handleID) - }() - log.Printf("new handle=%d for doc=%+v", handleID, arg) + ds := doc.Join(tr.Context()) + handle := ds.Handle() + + log.Printf("new handle=%d for doc=%+v", handle, arg) defer func() { - log.Printf("closing handle=%d for doc=%+v err=%v", handleID, arg, err) + log.Printf("closing handle=%d for doc=%+v err=%v", handle, arg, err) }() eg, groupCtx := errgroup.WithContext(tr.Context()) - // send initial packet and join queue under lock - doc.lock.Lock() - - data := doc.doc.Read() - first := AnnouncePacket{Version: data.TargetVersion, Update: data.Update} - l := doc.q.Join(tr.Context()) - - doc.lock.Unlock() - - err = writeTo(tr, first, 0) - if err != nil { - return err - } - // inbound handler eg.Go(func() (err error) { - var lastSeq int - for { sp, err := tr.Read() if err != nil { return err } - in, err := host.As[InPacket](sp) + in, err := host.As[safedoc.InPacket](sp) if err != nil { return err } - if in.Seq != lastSeq+1 { - return ErrSeq - } - - ok, err := doc.Perform(in, handleID, sp.Client) - if err != nil { + err = ds.Perform(in, sp.Client) + if err == safedoc.ErrBarrier { + // do nothing + } else if err != nil { return err } - if ok { - lastSeq = in.Seq - } } }) // outbound handler eg.Go(func() (err 
error) { - // TODO: this method is a bit gross, we intersperse / merge matching data types - better abstraction? "Consume" helper? - for { - next, ok := l.Next() + clientID, p, ok := ds.Next() if !ok { return nil // ctx shutdown } - time.Sleep(DelayUpdate) - - var clientID int - var resp any - if next.handleID == handleID { - p := AckPacket{} - resp = &p - - cand := next - for { - // consume more for conn if we can - peek, _ := l.Peek() - if peek.handleID != handleID { - p.Seq = cand.seq - p.Version = cand.dw.TargetVersion - break - } - l.Next() - cand = peek - } - - } else if len(next.dw.Update) == 0 { - continue // zero ack for someone else - - } else { - p := AnnouncePacket{Version: next.dw.TargetVersion, Update: next.dw.Update} - resp = &p - clientID = next.clientID - - // TODO: we could merge Patch that are not acks here (merge => append) - } - - err = writeTo(tr, resp, clientID) + err = writeTo(tr, p, clientID) if err != nil { return err }