Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
831 changes: 831 additions & 0 deletions pkg/cardinal/bench_internal_test.go

Large diffs are not rendered by default.

18 changes: 4 additions & 14 deletions pkg/cardinal/cardinal.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,7 @@ func (w *World) run(ctx context.Context) error {
case <-w.debug.resumeChan():
w.debug.setPaused(false)
case replyCh := <-w.debug.stepChan():
if err := w.Tick(ctx, time.Now()); err != nil {
replyCh <- 0
return eris.Wrap(err, "failed to run tick during step")
}
w.Tick(ctx, time.Now())
replyCh <- w.currentTick.height
case replyCh := <-w.debug.resetChan():
w.reset()
Expand All @@ -187,9 +184,7 @@ func (w *World) run(ctx context.Context) error {

select {
case <-ticker.C:
if err := w.Tick(ctx, time.Now()); err != nil {
return eris.Wrap(err, "failed to run tick")
}
w.Tick(ctx, time.Now())
case replyCh := <-w.debug.pauseChan():
w.debug.setPaused(true)
replyCh <- w.currentTick.height
Expand All @@ -199,18 +194,15 @@ func (w *World) run(ctx context.Context) error {
}
}

func (w *World) Tick(ctx context.Context, timestamp time.Time) error {
func (w *World) Tick(ctx context.Context, timestamp time.Time) {
// TODO: commands returned to be used for debug epoch log.
_ = w.commands.Drain()

w.currentTick.timestamp = timestamp
w.debug.startPerfTick()

// Tick ECS world.
err := w.world.Tick()
if err != nil {
return eris.Wrap(err, "one or more systems failed")
}
w.world.Tick()

w.debug.recordTick(w.currentTick.height, timestamp)

Expand All @@ -228,8 +220,6 @@ func (w *World) Tick(ctx context.Context, timestamp time.Time) error {

// Increment tick height.
w.currentTick.height++

return nil
}

// snapshot persists the world state as a best-effort operation. Snapshots are best effort only, and
Expand Down
2 changes: 1 addition & 1 deletion pkg/cardinal/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func RunDST(t *testing.T, setup DSTSetupFunc) {
switch {
case op == opTick:
timestamp := time.Unix(int64(tick), 0)
require.NoError(t, fix.world.Tick(context.Background(), timestamp))
fix.world.Tick(context.Background(), timestamp)

// Assert structural ECS invariants after every tick.
ecs.CheckWorld(t, fix.world.world)
Expand Down
129 changes: 64 additions & 65 deletions pkg/cardinal/internal/command/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"math/rand/v2"
"sync"
"testing"
"testing/synctest"
"time"

"github.com/argus-labs/world-engine/pkg/cardinal/internal/command"
"github.com/argus-labs/world-engine/pkg/cardinal/internal/schema"
Expand Down Expand Up @@ -279,9 +277,8 @@ func TestCommand_RegisterModelFuzz(t *testing.T) {
// Concurrent enqueue test
// -------------------------------------------------------------------------------------------------
// This test verifies that concurrent enqueues are thread-safe and all commands are properly stored.
// Run with -race to detect data races (go test -race). We use synctest.Test with time.Sleep to
// force goroutine interleaving. Without it, WaitGroup goroutines tend to run sequentially, which
// defeats the purpose of a concurrency test.
// Run with -race to detect data races (go test -race). We coordinate goroutine start with an
// explicit barrier to maximize contention in the Enqueue path.
// -------------------------------------------------------------------------------------------------

func TestCommand_ConcurrentEnqueue(t *testing.T) {
Expand All @@ -292,69 +289,71 @@ func TestCommand_ConcurrentEnqueue(t *testing.T) {
commandsPerRoutine = 1000
)

synctest.Test(t, func(t *testing.T) {
impl := command.NewManager()
impl := command.NewManager()

_, err := impl.Register(testutils.CommandA{}.Name(), command.NewQueue[testutils.CommandA]())
require.NoError(t, err)
_, err = impl.Register(testutils.CommandB{}.Name(), command.NewQueue[testutils.CommandB]())
require.NoError(t, err)
_, err := impl.Register(testutils.CommandA{}.Name(), command.NewQueue[testutils.CommandA]())
require.NoError(t, err)
_, err = impl.Register(testutils.CommandB{}.Name(), command.NewQueue[testutils.CommandB]())
require.NoError(t, err)

var wg sync.WaitGroup
var mu sync.Mutex
expected := make([]command.Command, 0, numGoroutines*commandsPerRoutine)

for range numGoroutines {
wg.Go(func() {
// Initialize prng in each goroutine separately because rand/v2.Rand isn't concurrent-safe.
prng := testutils.NewRand(t)

for i := range commandsPerRoutine {
var payload command.Payload
if testutils.RandBool(prng) {
payload = testutils.CommandA{X: float64(i), Y: prng.Float64(), Z: 0}
} else {
payload = testutils.CommandB{ID: uint64(i), Label: "test", Enabled: true}
}

pbPayload, err := schema.Serialize(payload)
if err != nil {
t.Errorf("Serialize failed: %v", err)
return
}

cmdpb := &iscv1.Command{
Name: payload.Name(),
Address: &microv1.ServiceAddress{},
Persona: &iscv1.Persona{Id: "test-persona"},
Payload: pbPayload,
}

if err := impl.Enqueue(cmdpb); err != nil {
t.Errorf("Enqueue failed: %v", err)
return
}

mu.Lock()
expected = append(expected, command.Command{
Name: payload.Name(),
Address: &microv1.ServiceAddress{},
Persona: "test-persona",
Payload: payload,
})
mu.Unlock()
time.Sleep(2 * time.Millisecond)
var wg sync.WaitGroup
var mu sync.Mutex
start := make(chan struct{})
expected := make([]command.Command, 0, numGoroutines*commandsPerRoutine)

for range numGoroutines {
wg.Go(func() {
// Initialize prng in each goroutine separately because rand/v2.Rand isn't concurrent-safe.
prng := testutils.NewRand(t)
<-start

for i := range commandsPerRoutine {
var payload command.Payload
if testutils.RandBool(prng) {
payload = testutils.CommandA{X: float64(i), Y: prng.Float64(), Z: 0}
} else {
payload = testutils.CommandB{ID: uint64(i), Label: "test", Enabled: true}
}
})
}

// Wait for all goroutines to complete their work.
wg.Wait()
pbPayload, err := schema.Serialize(payload)
if err != nil {
t.Errorf("Serialize failed: %v", err)
return
}

cmdpb := &iscv1.Command{
Name: payload.Name(),
Address: &microv1.ServiceAddress{},
Persona: &iscv1.Persona{Id: "test-persona"},
Payload: pbPayload,
}

if err := impl.Enqueue(cmdpb); err != nil {
t.Errorf("Enqueue failed: %v", err)
return
}

mu.Lock()
expected = append(expected, command.Command{
Name: payload.Name(),
Address: &microv1.ServiceAddress{},
Persona: "test-persona",
Payload: payload,
})
mu.Unlock()
}
})
}

// Release all workers simultaneously to maximize overlapping Enqueue calls.
close(start)

// Wait for all goroutines to complete their work.
wg.Wait()

// Drain all commands and verify count and content.
all := impl.Drain()
expectedTotal := numGoroutines * commandsPerRoutine
assert.Len(t, all, expectedTotal, "total command count mismatch")
assert.ElementsMatch(t, expected, all, "command content mismatch")
})
// Drain all commands and verify count and content.
all := impl.Drain()
expectedTotal := numGoroutines * commandsPerRoutine
assert.Len(t, all, expectedTotal, "total command count mismatch")
assert.ElementsMatch(t, expected, all, "command content mismatch")
}
10 changes: 10 additions & 0 deletions pkg/cardinal/internal/ecs/archetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ func (a *archetype) contains(components bitmap.Bitmap) bool {
return intersect.Count() == components.Count()
}

func (a *archetype) reset() {
a.rows.clear()
a.entities = a.entities[:0]
for _, column := range a.columns {
for column.len() > 0 {
column.remove(column.len() - 1)
}
}
}

// -------------------------------------------------------------------------------------------------
// Entity operations
// -------------------------------------------------------------------------------------------------
Expand Down
Loading
Loading