Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ require (
buf.build/go/protovalidate v1.0.0
connectrpc.com/connect v1.19.1
connectrpc.com/validate v0.6.0
github.com/aws/aws-sdk-go-v2 v1.41.3
github.com/aws/aws-sdk-go-v2/config v1.32.11
github.com/aws/aws-sdk-go-v2/service/s3 v1.96.4
github.com/aws/smithy-go v1.24.2
github.com/caarlos0/env/v11 v11.3.1
github.com/expr-lang/expr v1.17.6
github.com/getsentry/sentry-go v0.36.2
Expand Down Expand Up @@ -34,6 +38,21 @@ require (
cel.dev/expr v0.24.0 // indirect
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op // indirect
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.6 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.19.11 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.19 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.19 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.19 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.20 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.19 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.19 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.12 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.16 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.41.8 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
Expand Down
38 changes: 38 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,44 @@ github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfr
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
github.com/aws/aws-sdk-go-v2 v1.41.3 h1:4kQ/fa22KjDt13QCy1+bYADvdgcxpfH18f0zP542kZA=
github.com/aws/aws-sdk-go-v2 v1.41.3/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.6 h1:N4lRUXZpZ1KVEUn6hxtco/1d2lgYhNn1fHkkl8WhlyQ=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.6/go.mod h1:lyw7GFp3qENLh7kwzf7iMzAxDn+NzjXEAGjKS2UOKqI=
github.com/aws/aws-sdk-go-v2/config v1.32.11 h1:ftxI5sgz8jZkckuUHXfC/wMUc8u3fG1vQS0plr2F2Zs=
github.com/aws/aws-sdk-go-v2/config v1.32.11/go.mod h1:twF11+6ps9aNRKEDimksp923o44w/Thk9+8YIlzWMmo=
github.com/aws/aws-sdk-go-v2/credentials v1.19.11 h1:NdV8cwCcAXrCWyxArt58BrvZJ9pZ9Fhf9w6Uh5W3Uyc=
github.com/aws/aws-sdk-go-v2/credentials v1.19.11/go.mod h1:30yY2zqkMPdrvxBqzI9xQCM+WrlrZKSOpSJEsylVU+8=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.19 h1:INUvJxmhdEbVulJYHI061k4TVuS3jzzthNvjqvVvTKM=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.19/go.mod h1:FpZN2QISLdEBWkayloda+sZjVJL+e9Gl0k1SyTgcswU=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.19 h1:/sECfyq2JTifMI2JPyZ4bdRN77zJmr6SrS1eL3augIA=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.19/go.mod h1:dMf8A5oAqr9/oxOfLkC/c2LU/uMcALP0Rgn2BD5LWn0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.19 h1:AWeJMk33GTBf6J20XJe6qZoRSJo0WfUhsMdUKhoODXE=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.19/go.mod h1:+GWrYoaAsV7/4pNHpwh1kiNLXkKaSoppxQq9lbH8Ejw=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.5 h1:clHU5fm//kWS1C2HgtgWxfQbFbx4b6rx+5jzhgX9HrI=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.5/go.mod h1:O3h0IK87yXci+kg6flUKzJnWeziQUKciKrLjcatSNcY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.20 h1:qi3e/dmpdONhj1RyIZdi6DKKpDXS5Lb8ftr3p7cyHJc=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.20/go.mod h1:V1K+TeJVD5JOk3D9e5tsX2KUdL7BlB+FV6cBhdobN8c=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.6 h1:XAq62tBTJP/85lFD5oqOOe7YYgWxY9LvWq8plyDvDVg=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.6/go.mod h1:x0nZssQ3qZSnIcePWLvcoFisRXJzcTVvYpAAdYX8+GI=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.11 h1:BYf7XNsJMzl4mObARUBUib+j2tf0U//JAAtTnYqvqCw=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.11/go.mod h1:aEUS4WrNk/+FxkBZZa7tVgp4pGH+kFGW40Y8rCPqt5g=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.19 h1:X1Tow7suZk9UCJHE1Iw9GMZJJl0dAnKXXP1NaSDHwmw=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.19/go.mod h1:/rARO8psX+4sfjUQXp5LLifjUt8DuATZ31WptNJTyQA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.19 h1:JnQeStZvPHFHeyky/7LbMlyQjUa+jIBj36OlWm0pzIk=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.19/go.mod h1:HGyasyHvYdFQeJhvDHfH7HXkHh57htcJGKDZ+7z+I24=
github.com/aws/aws-sdk-go-v2/service/s3 v1.96.4 h1:4ExZyubQ6LQQVuF2Qp9OsfEvsTdAWh5Gfwf6PgIdLdk=
github.com/aws/aws-sdk-go-v2/service/s3 v1.96.4/go.mod h1:NF3JcMGOiARAss1ld3WGORCw71+4ExDD2cbbdKS5PpA=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.7 h1:Y2cAXlClHsXkkOvWZFXATr34b0hxxloeQu/pAZz2row=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.7/go.mod h1:idzZ7gmDeqeNrSPkdbtMp9qWMgcBwykA7P7Rzh5DXVU=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.12 h1:iSsvB9EtQ09YrsmIc44Heqlx5ByGErqhPK1ZQLppias=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.12/go.mod h1:fEWYKTRGoZNl8tZ77i61/ccwOMJdGxwOhWCkp6TXAr0=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.16 h1:EnUdUqRP1CNzt2DkV67tJx6XDN4xlfBFm+bzeNOQVb0=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.16/go.mod h1:Jic/xv0Rq/pFNCh3WwpH4BEqdbSAl+IyHro8LbibHD8=
github.com/aws/aws-sdk-go-v2/service/sts v1.41.8 h1:XQTQTF75vnug2TXS8m7CVJfC2nniYPZnO1D4Np761Oo=
github.com/aws/aws-sdk-go-v2/service/sts v1.41.8/go.mod h1:Xgx+PR1NUOjNmQY+tRMnouRp83JRM8pRMw/vCaVhPkI=
github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng=
github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
Expand Down
9 changes: 9 additions & 0 deletions pkg/cardinal/cardinal.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ func NewWorld(opts WorldOptions) (*World, error) {
return nil, eris.Wrap(err, "failed to create jetstream snapshot storage")
}
world.snapshotStorage = snapshotJS
case snapshot.StorageTypeS3:
snapshotS3, err := snapshot.NewS3Storage(snapshot.S3StorageOptions{
Logger: tel.GetLogger("snapshot"),
Address: world.address,
})
if err != nil {
return nil, eris.Wrap(err, "failed to create S3 snapshot storage")
}
world.snapshotStorage = snapshotS3
case snapshot.StorageTypeNop:
world.snapshotStorage = snapshot.NewNopStorage()
case snapshot.StorageTypeUndefined:
Expand Down
2 changes: 1 addition & 1 deletion pkg/cardinal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ type worldOptionsEnv struct {
// Unique ID of this world's instance.
ShardID string `env:"CARDINAL_SHARD_ID"`

// Snapshot storage type ("NOP" or "JETSTREAM").
// Snapshot storage type ("NOP", "JETSTREAM", or "S3").
SnapshotStorageTypeStr string `env:"CARDINAL_SNAPSHOT_STORAGE_TYPE" envDefault:"NOP"`

// Number of ticks per snapshot.
Expand Down
8 changes: 7 additions & 1 deletion pkg/cardinal/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ const (
StorageTypeUndefined StorageType = iota
StorageTypeNop
StorageTypeJetStream
StorageTypeS3
)

const (
nopStorageString = "NOP"
jetStreamStorageString = "JETSTREAM"
s3StorageString = "S3"
undefinedStorageString = "UNDEFINED"
)

Expand All @@ -57,13 +59,15 @@ func (s StorageType) String() string {
return nopStorageString
case StorageTypeJetStream:
return jetStreamStorageString
case StorageTypeS3:
return s3StorageString
default:
return undefinedStorageString
}
}

func (s StorageType) IsValid() bool {
return s == StorageTypeNop || s == StorageTypeJetStream
return s == StorageTypeNop || s == StorageTypeJetStream || s == StorageTypeS3
}

func ParseStorageType(s string) (StorageType, error) {
Expand All @@ -72,6 +76,8 @@ func ParseStorageType(s string) (StorageType, error) {
return StorageTypeNop, nil
case jetStreamStorageString:
return StorageTypeJetStream, nil
case s3StorageString:
return StorageTypeS3, nil
default:
return StorageTypeUndefined, eris.Errorf("invalid shard mode: %s", s)
}
Expand Down
210 changes: 210 additions & 0 deletions pkg/cardinal/snapshot/storage_s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package snapshot

import (
"bytes"
"context"
"fmt"
"io"

"buf.build/go/protovalidate"
"github.com/argus-labs/world-engine/pkg/micro"
cardinalv1 "github.com/argus-labs/world-engine/proto/gen/go/worldengine/cardinal/v1"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
"github.com/caarlos0/env/v11"
"github.com/rotisserie/eris"
"github.com/rs/zerolog"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

const defaultS3ObjectKey = "snapshot"

// S3Storage implements Storage using AWS S3 (or S3-compatible services like MinIO, R2).
//
// Required environment variables:
//
// CARDINAL_SNAPSHOT_STORAGE_TYPE=S3 # Selects S3 as the snapshot backend
// CARDINAL_S3_BUCKET=<bucket-name> # S3 bucket name (must be pre-provisioned)
// AWS_ACCESS_KEY_ID=<access-key> # AWS access key ID
// AWS_SECRET_ACCESS_KEY=<secret-key> # AWS secret access key
// AWS_REGION=<region> # AWS region (e.g. us-east-1)
//
// Optional environment variables:
//
// CARDINAL_S3_ENDPOINT=<url> # Custom endpoint for S3-compatible services
// AWS_SESSION_TOKEN=<token> # Session token for temporary credentials (STS/IRSA)
//
// Snapshots are stored at the key: {org}/{project}/{serviceId}/snapshot
// A single shared bucket can serve all orgs/projects; key prefixes prevent collisions.
//
// The bucket must already exist. The IAM principal needs s3:PutObject and s3:GetObject permissions.
// Enable S3 versioning on the bucket for automatic backup retention of previous snapshots.
type S3Storage struct {
client *s3.Client
bucket string
key string
logger zerolog.Logger
}

var _ Storage = (*S3Storage)(nil)

// NewS3Storage creates a new S3-based snapshot storage.
// It loads AWS credentials from the default credential chain (env vars, IRSA, instance roles).
func NewS3Storage(opts S3StorageOptions) (*S3Storage, error) {
if err := env.Parse(&opts); err != nil {
return nil, eris.Wrap(err, "failed to parse env")
}

if err := opts.Validate(); err != nil {
return nil, eris.Wrap(err, "invalid options passed")
}

// Build the S3 key. Region scoping is handled at the bucket level (one bucket per region),
// so the key only needs org/project/serviceId to be unique within a region.
objectKey := fmt.Sprintf("%s/%s/%s/%s",
Comment thread
rmrt1n marked this conversation as resolved.
opts.Address.GetOrganization(),
opts.Address.GetProject(),
opts.Address.GetServiceId(),
defaultS3ObjectKey,
)

// Load AWS config from environment (reads AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION).
ctx := context.Background()
cfg, err := awsconfig.LoadDefaultConfig(ctx)
if err != nil {
return nil, eris.Wrap(err, "failed to load AWS config")
}

// Build S3 client. When Endpoint is set, use it for S3-compatible services (Garage, R2, etc.).
// When Endpoint is empty, the SDK uses the default AWS S3 endpoint resolved from AWS_REGION
// (e.g., https://s3.us-east-1.amazonaws.com).
var s3Opts []func(*s3.Options)
if opts.Endpoint != "" {
Comment thread
rmrt1n marked this conversation as resolved.
s3Opts = append(s3Opts, func(o *s3.Options) {
o.BaseEndpoint = aws.String(opts.Endpoint)
o.UsePathStyle = true // Required for most S3-compatible services
})
}

client := s3.NewFromConfig(cfg, s3Opts...)

return &S3Storage{
client: client,
bucket: opts.Bucket,
key: objectKey,
logger: opts.Logger,
}, nil
}

func (s *S3Storage) Store(ctx context.Context, snapshot *Snapshot) error {
var worldState cardinalv1.WorldState
if err := proto.Unmarshal(snapshot.Data, &worldState); err != nil {
return eris.Wrap(err, "failed to unmarshal world state")
}
snapshotPb := &cardinalv1.Snapshot{
TickHeight: snapshot.TickHeight,
Timestamp: timestamppb.New(snapshot.Timestamp),
WorldState: &worldState,
Version: snapshot.Version,
}
data, err := proto.Marshal(snapshotPb)
if err != nil {
return eris.Wrap(err, "failed to marshal snapshot")
}

// Overwrite the existing snapshot if any.
if _, err = s.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.key),
Body: bytes.NewReader(data),
ContentType: aws.String("application/x-protobuf"),
}); err != nil {
return eris.Wrap(err, "failed to store snapshot in S3")
}

return nil
}

func (s *S3Storage) Load(ctx context.Context) (*Snapshot, error) {
result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.key),
})
if err != nil {
// Check for "not found" errors (NoSuchKey).
var noSuchKey *types.NoSuchKey
if eris.As(err, &noSuchKey) {
return nil, eris.Wrap(ErrSnapshotNotFound, "no snapshot exists")
}
// Fallback for S3-compatible services that may return a generic smithy error.
var apiErr smithy.APIError
if eris.As(err, &apiErr) && apiErr.ErrorCode() == "NoSuchKey" {
return nil, eris.Wrap(ErrSnapshotNotFound, "no snapshot exists")
}
return nil, eris.Wrap(err, "failed to get snapshot from S3")
}
defer func() {
_ = result.Body.Close()
}()

data, err := io.ReadAll(result.Body)
if err != nil {
return nil, eris.Wrap(err, "failed to read from S3 object")
}

snapshotPb := cardinalv1.Snapshot{}
if err = proto.Unmarshal(data, &snapshotPb); err != nil {
return nil, eris.Wrap(err, "failed to unmarshal snapshot")
}
if err = protovalidate.Validate(&snapshotPb); err != nil {
return nil, eris.Wrap(err, "failed to validate snapshot")
}

worldStateBytes, err := proto.Marshal(snapshotPb.GetWorldState())
if err != nil {
return nil, eris.Wrap(err, "failed to marshal world state")
}

return &Snapshot{
TickHeight: snapshotPb.GetTickHeight(),
Timestamp: snapshotPb.GetTimestamp().AsTime(),
Data: worldStateBytes,
Version: snapshotPb.GetVersion(),
}, nil
}

// -------------------------------------------------------------------------------------------------
// Options
// -------------------------------------------------------------------------------------------------

// S3StorageOptions configures the S3 snapshot storage.
// Bucket and Endpoint are loaded from environment variables via env tags.
// AWS credentials (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION) are loaded
// automatically by the AWS SDK default credential chain.
type S3StorageOptions struct {
Address *micro.ServiceAddress
Logger zerolog.Logger

// S3 bucket name for snapshot storage. Required.
// The bucket must be pre-provisioned; the application does not create it.
Bucket string `env:"CARDINAL_S3_BUCKET"`

// Custom endpoint URL for S3-compatible services. Optional.
// Set this to use MinIO (e.g. "http://localhost:9000"), Cloudflare R2, DigitalOcean Spaces, etc.
// When set, path-style addressing is enabled automatically.
Endpoint string `env:"CARDINAL_S3_ENDPOINT"`
}

func (opt *S3StorageOptions) Validate() error {
if opt.Address == nil {
return eris.New("service address cannot be nil")
}
if opt.Bucket == "" {
return eris.New("CARDINAL_S3_BUCKET environment variable is required")
}
return nil
}
Loading