Package outboxutil is a lightweight utility library for the Transactional Outbox pattern in Go.
go get github.com/h-dav/outboxutilpackage main
import (
"context"
"log"
"os/signal"
"syscall"
"time"
"github.com/h-dav/outboxutil"
)
type Outbox struct{}
func (o *Outbox) Next(ctx context.Context) (*outboxutil.Message, error) {
// Your implementation...
return &outboxutil.Message{}, nil
}
func (o *Outbox) MarkProcessed(ctx context.Context, msg *outboxutil.Message) error {
// Your implementation...
return nil
}
func (o *Outbox) MarkFailed(ctx context.Context, msg *outboxutil.Message, err error) error {
// Your implementation...
return nil
}
func main() {
outbox := Outbox{}
handler := func(ctx context.Context, msg *outboxutil.Message) error {
// Publish an event... or do something else...
return nil
}
relay := outboxutil.New(&outbox, handler,
outboxutil.WithWorkers(5),
outboxutil.WithPollInterval(500*time.Millisecond),
outboxutil.WithBackoff(1*time.Second, 30*time.Second, 2.0),
)
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
if err := relay.Run(ctx); err != nil {
log.Fatal(err)
}
}Run blocks until the context is cancelled, then waits for all workers to exit cleanly.
outboxutil provides a Relay that drives the acquire -> handle -> mark life cycle for messages stored in a user-owned persistence layer. The library orchestrates; you own the schema, storage backend, and locking strategy.
Design principles:
- Toolkit, not framework — you own your deployment structure and architectural decisions
- Storage agnostic — implement the
Outboxinterface against any backend - Kubernetes-ready — stateless workers, no leader election required
| Option | Default | Description |
|---|---|---|
WithWorkers(n int) |
1 |
Number of concurrent worker goroutines |
WithPollInterval(d time.Duration) |
1s |
Sleep between polls when no backoff is configured |
WithBackoff(initial, max time.Duration, multiplier float64) |
disabled | Exponential backoff on idle and Next errors |
WithLogger(l *slog.Logger) |
none | Structured logger for internal errors |
WithHooks(h Hooks) |
none | Observability hooks |
type Handler func(ctx context.Context, msg *Message) errorReturn nil to mark a message processed; return an error to mark it failed.
The same Relay type serves both inbox and outbox patterns — the handler determines the behaviour.
You implement Outbox against your own database or queue. The library calls these three methods:
type Outbox interface {
// Next atomically claims the next available message.
// Returns nil, nil when no message is available.
Next(ctx context.Context) (*Message, error)
// MarkProcessed is called after the handler returns nil.
MarkProcessed(ctx context.Context, msg *Message) error
// MarkFailed is called after the handler returns an error.
MarkFailed(ctx context.Context, msg *Message, err error) error
}Locking is your responsibility inside Next. A typical PostgreSQL implementation uses SELECT FOR UPDATE SKIP LOCKED:
func (r *PostgresOutbox) Next(ctx context.Context) (*outboxutil.Message, error) {
row := r.db.QueryRowContext(ctx, `
UPDATE inbox_messages
SET status = 'processing', acquired_at = NOW()
WHERE id = (
SELECT id FROM inbox_messages
WHERE status = 'pending'
ORDER BY created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, payload, created_at
`)
var msg outboxutil.Message
if err := row.Scan(&msg.ID, &msg.Payload, &msg.CreatedAt); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, err
}
return &msg, nil
}- Not configured: fixed poll at
PollInterval. AnNexterror stops the worker and is returned fromRun. - Configured: exponential backoff applies on idle and on
Nexterrors, capped atmax. Resets when a message is successfully acquired.
Each goroutine runs independently:
- Call
Next - If no message -> fire
OnIdle, wait (backoff or poll interval), repeat - If error -> log, apply backoff or return error, repeat
- If message -> fire
OnAcquired, call handler - Handler returns nil ->
MarkProcessed, fireOnProcessed - Handler returns error ->
MarkFailed, fireOnFailed
Cancel the context. All workers stop at their next poll point. Run returns once all workers have exited.
All hook fields are optional. Use them to plug in Prometheus metrics, structured logs, or traces:
outboxutil.WithHooks(outboxutil.Hooks{
OnAcquired: func(msg *outboxutil.Message) {
// message claimed from the repository
},
OnProcessed: func(msg *outboxutil.Message, elapsed time.Duration) {
// handler returned nil; message marked processed
},
OnFailed: func(msg *outboxutil.Message, err error) {
// handler returned an error; message marked failed
},
OnIdle: func() {
// no messages available; worker is about to wait
},
})This library handles message orchestration only. It does not cover:
- Message serialization/deserialization
- Schema requirements
- Retry of failed messages (that is a handler concern)
- Infrastructure or deployment