outboxutil

Package outboxutil is a lightweight utility library for the Transactional Outbox pattern in Go.

Installation

go get github.com/h-dav/outboxutil

Quick Start

package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"
	"time"

	"github.com/h-dav/outboxutil"
)

type Outbox struct{}

func (o *Outbox) Next(ctx context.Context) (*outboxutil.Message, error) {
	// Your implementation...
	return &outboxutil.Message{}, nil
}

func (o *Outbox) MarkProcessed(ctx context.Context, msg *outboxutil.Message) error {
	// Your implementation...
	return nil
}

func (o *Outbox) MarkFailed(ctx context.Context, msg *outboxutil.Message, err error) error {
	// Your implementation...
	return nil
}

func main() {
	outbox := Outbox{}

	handler := func(ctx context.Context, msg *outboxutil.Message) error {
		// Publish an event... or do something else...
		return nil
	}

	relay := outboxutil.New(&outbox, handler,
		outboxutil.WithWorkers(5),
		outboxutil.WithPollInterval(500*time.Millisecond),
		outboxutil.WithBackoff(1*time.Second, 30*time.Second, 2.0),
	)

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	if err := relay.Run(ctx); err != nil {
		log.Fatal(err)
	}
}

Run blocks until the context is cancelled, then waits for all workers to exit cleanly.

Features

outboxutil provides a Relay that drives the acquire -> handle -> mark lifecycle for messages stored in a user-owned persistence layer. The library only orchestrates; you own the schema, storage backend, and locking strategy.

Design principles:

  • Toolkit, not framework — you own your deployment structure and architectural decisions
  • Storage agnostic — implement the Outbox interface against any backend
  • Kubernetes-ready — stateless workers, no leader election required

Options

Option                                                       Default    Description
WithWorkers(n int)                                           1          Number of concurrent worker goroutines
WithPollInterval(d time.Duration)                            1s         Sleep between polls when no backoff is configured
WithBackoff(initial, max time.Duration, multiplier float64)  disabled   Exponential backoff on idle and Next errors
WithLogger(l *slog.Logger)                                   none       Structured logger for internal errors
WithHooks(h Hooks)                                           none       Observability hooks

Handler

type Handler func(ctx context.Context, msg *Message) error

Return nil to mark a message processed; return an error to mark it failed.

The same Relay type serves both inbox and outbox patterns — the handler determines the behaviour.
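As an illustration of the nil/error contract, here is a minimal sketch of a handler that forwards a message to a broker. The Message and Handler types are local stand-ins so the example compiles on its own; the Publisher interface, the "orders" topic, and the assumption that Payload is a []byte holding JSON are hypothetical and not part of outboxutil.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Message and Handler are local stand-ins for the outboxutil types.
// The field types are assumptions made for this sketch.
type Message struct {
	ID      string
	Payload []byte
}

type Handler func(ctx context.Context, msg *Message) error

// Publisher is a hypothetical abstraction over a broker client.
type Publisher interface {
	Publish(ctx context.Context, topic string, body []byte) error
}

// NewPublishHandler returns a Handler that rejects malformed payloads and
// forwards valid ones. Any returned error causes the message to be marked failed.
func NewPublishHandler(p Publisher, topic string) Handler {
	return func(ctx context.Context, msg *Message) error {
		if !json.Valid(msg.Payload) {
			return fmt.Errorf("message %s: payload is not valid JSON", msg.ID)
		}
		if err := p.Publish(ctx, topic, msg.Payload); err != nil {
			return fmt.Errorf("message %s: publish: %w", msg.ID, err)
		}
		return nil
	}
}

// recordingPublisher keeps published bodies in memory for the demo.
type recordingPublisher struct{ bodies [][]byte }

func (r *recordingPublisher) Publish(_ context.Context, _ string, body []byte) error {
	r.bodies = append(r.bodies, body)
	return nil
}

// demo runs the handler on one valid and one invalid payload.
func demo() (okErr, badErr error, published int) {
	p := &recordingPublisher{}
	h := NewPublishHandler(p, "orders")
	ctx := context.Background()
	okErr = h(ctx, &Message{ID: "1", Payload: []byte(`{"order_id":42}`)})
	badErr = h(ctx, &Message{ID: "2", Payload: []byte(`not json`)})
	return okErr, badErr, len(p.bodies)
}

func main() {
	okErr, badErr, published := demo()
	fmt.Println(okErr, badErr, published)
}
```

Wrapping errors with the message ID keeps the cause available to MarkFailed and to any OnFailed hook.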

Implementing the Outbox Interface

You implement Outbox against your own database or queue. The library calls these three methods:

type Outbox interface {
    // Next atomically claims the next available message.
    // Returns nil, nil when no message is available.
    Next(ctx context.Context) (*Message, error)

    // MarkProcessed is called after the handler returns nil.
    MarkProcessed(ctx context.Context, msg *Message) error

    // MarkFailed is called after the handler returns an error.
    MarkFailed(ctx context.Context, msg *Message, err error) error
}

Locking is your responsibility inside Next. A typical PostgreSQL implementation uses SELECT ... FOR UPDATE SKIP LOCKED, so that concurrent workers never claim the same row:

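// Requires the "context", "database/sql", and "errors" imports.
func (r *PostgresOutbox) Next(ctx context.Context) (*outboxutil.Message, error) {
    // Claim the oldest pending row. SKIP LOCKED makes concurrent workers
    // pass over rows that another transaction has already locked.
    row := r.db.QueryRowContext(ctx, `
        UPDATE outbox_messages
        SET status = 'processing', acquired_at = NOW()
        WHERE id = (
            SELECT id FROM outbox_messages
            WHERE status = 'pending'
            ORDER BY created_at ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING id, payload, created_at
    `)

    var msg outboxutil.Message
    if err := row.Scan(&msg.ID, &msg.Payload, &msg.CreatedAt); err != nil {
        // No pending row: report "nothing available" rather than an error.
        if errors.Is(err, sql.ErrNoRows) {
            return nil, nil
        }
        return nil, err
    }
    return &msg, nil
}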
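For unit tests or local experiments, the same claim semantics can be sketched without a database. The following in-memory implementation is only an illustration: MemoryOutbox, its status map, and the Message field types are assumptions for this example, with a mutex playing the role that row locking plays in PostgreSQL.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message is a local stand-in for outboxutil.Message (field types assumed).
type Message struct {
	ID      string
	Payload []byte
}

// MemoryOutbox is a hypothetical in-memory store with the three documented methods.
type MemoryOutbox struct {
	mu      sync.Mutex
	pending []*Message
	status  map[string]string // message ID -> processing, processed, or failed
}

func NewMemoryOutbox(msgs ...*Message) *MemoryOutbox {
	return &MemoryOutbox{pending: msgs, status: make(map[string]string)}
}

// Next claims the oldest pending message under the lock, or returns nil, nil.
func (o *MemoryOutbox) Next(_ context.Context) (*Message, error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if len(o.pending) == 0 {
		return nil, nil
	}
	msg := o.pending[0]
	o.pending = o.pending[1:]
	o.status[msg.ID] = "processing"
	return msg, nil
}

func (o *MemoryOutbox) MarkProcessed(_ context.Context, msg *Message) error {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.status[msg.ID] = "processed"
	return nil
}

func (o *MemoryOutbox) MarkFailed(_ context.Context, msg *Message, err error) error {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.status[msg.ID] = "failed: " + err.Error()
	return nil
}

// claimConcurrently drains the outbox from several goroutines and reports how
// many distinct messages were claimed and how many were claimed more than once.
func claimConcurrently(total, workers int) (claimed int, duplicates int) {
	msgs := make([]*Message, total)
	for i := range msgs {
		msgs[i] = &Message{ID: fmt.Sprintf("msg-%d", i)}
	}
	o := NewMemoryOutbox(msgs...)

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		seen = make(map[string]int)
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				msg, _ := o.Next(context.Background())
				if msg == nil {
					return
				}
				mu.Lock()
				seen[msg.ID]++
				mu.Unlock()
				_ = o.MarkProcessed(context.Background(), msg)
			}
		}()
	}
	wg.Wait()
	for _, n := range seen {
		claimed++
		if n > 1 {
			duplicates++
		}
	}
	return claimed, duplicates
}

func main() {
	fmt.Println(claimConcurrently(100, 8)) // prints 100 0
}
```

Because the claim and the removal from the pending list happen under one lock, no two workers can receive the same message, which is the property Next is expected to provide.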

Backoff Behaviour

  • Not configured: fixed poll at PollInterval. A Next error stops the worker and is returned from Run.
  • Configured: exponential backoff applies on idle and on Next errors, capped at max. Resets when a message is successfully acquired.
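To make the capped growth concrete, here is a small sketch of one common way to compute such a schedule: multiply the current delay by the multiplier and clamp it at max. The helper names are hypothetical, and the library's exact formula (for example, whether it adds jitter) is not documented here, so treat this as an illustration rather than the internal implementation.

```go
package main

import (
	"fmt"
	"time"
)

// nextDelay returns the wait that follows cur: cur * multiplier, capped at max.
// Hypothetical helper, not part of outboxutil.
func nextDelay(cur, max time.Duration, multiplier float64) time.Duration {
	next := time.Duration(float64(cur) * multiplier)
	if next > max {
		return max
	}
	return next
}

// schedule lists the first n waits of an uninterrupted idle period,
// starting from initial.
func schedule(initial, max time.Duration, multiplier float64, n int) []time.Duration {
	delays := make([]time.Duration, 0, n)
	d := initial
	for i := 0; i < n; i++ {
		delays = append(delays, d)
		d = nextDelay(d, max, multiplier)
	}
	return delays
}

func main() {
	// Same values as WithBackoff(1*time.Second, 30*time.Second, 2.0) in the Quick Start.
	fmt.Println(schedule(time.Second, 30*time.Second, 2.0, 7))
	// prints [1s 2s 4s 8s 16s 30s 30s]
}
```

Under this scheme, acquiring a message would reset the delay to the initial value.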

Worker Loop

Each goroutine runs independently:

  1. Call Next
  2. If no message -> fire OnIdle, wait (backoff or poll interval), repeat
  3. If error -> log; with backoff configured, wait and repeat; otherwise stop the worker and return the error from Run
  4. If message -> fire OnAcquired, call handler
  5. Handler returns nil -> MarkProcessed, fire OnProcessed
  6. Handler returns error -> MarkFailed, fire OnFailed
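The steps above can be sketched as a single loop iteration. This is a simplified illustration, not the library's source: the types are local stand-ins that mirror the documented interface, hooks and waiting are left to the caller, and the step function and memOutbox type are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins that mirror the documented shapes (Message.ID type assumed).
type Message struct{ ID string }

type Outbox interface {
	Next(ctx context.Context) (*Message, error)
	MarkProcessed(ctx context.Context, msg *Message) error
	MarkFailed(ctx context.Context, msg *Message, err error) error
}

type Handler func(ctx context.Context, msg *Message) error

// step runs one iteration of the loop and reports the outcome.
func step(ctx context.Context, o Outbox, h Handler) (string, error) {
	msg, err := o.Next(ctx)
	switch {
	case err != nil:
		return "error", err // caller backs off and retries, or stops the worker
	case msg == nil:
		return "idle", nil // caller fires OnIdle and waits
	}
	if herr := h(ctx, msg); herr != nil {
		return "failed", o.MarkFailed(ctx, msg, herr)
	}
	return "processed", o.MarkProcessed(ctx, msg)
}

// memOutbox is a toy, single-goroutine Outbox used only to exercise step.
type memOutbox struct {
	pending   []*Message
	processed []string
	failed    []string
}

func (m *memOutbox) Next(_ context.Context) (*Message, error) {
	if len(m.pending) == 0 {
		return nil, nil
	}
	msg := m.pending[0]
	m.pending = m.pending[1:]
	return msg, nil
}

func (m *memOutbox) MarkProcessed(_ context.Context, msg *Message) error {
	m.processed = append(m.processed, msg.ID)
	return nil
}

func (m *memOutbox) MarkFailed(_ context.Context, msg *Message, _ error) error {
	m.failed = append(m.failed, msg.ID)
	return nil
}

// runDemo processes two messages (the second one fails) and then goes idle.
func runDemo() []string {
	o := &memOutbox{pending: []*Message{{ID: "a"}, {ID: "b"}}}
	h := func(_ context.Context, msg *Message) error {
		if msg.ID == "b" {
			return errors.New("publish failed")
		}
		return nil
	}
	var outcomes []string
	for i := 0; i < 3; i++ {
		outcome, _ := step(context.Background(), o, h)
		outcomes = append(outcomes, outcome)
	}
	return outcomes
}

func main() {
	fmt.Println(runDemo()) // prints [processed failed idle]
}
```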

Shutdown

Cancel the context. All workers stop at their next poll point. Run returns once all workers have exited.
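A minimal sketch of how a cancellable poll wait can behave, assuming each worker waits on either a timer or the context. The function names are hypothetical and the Relay's internals may differ; the point is only that a worker blocked in its wait wakes up as soon as the context is cancelled, and that the caller returns after every worker has exited.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// sleep waits for d or until ctx is cancelled.
// It reports whether the full wait elapsed.
func sleep(ctx context.Context, d time.Duration) bool {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-t.C:
		return true
	}
}

// runWorkers starts n polling workers, blocks until all of them have exited,
// and returns how many exited.
func runWorkers(ctx context.Context, n int, poll time.Duration) int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		exited int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for sleep(ctx, poll) {
				// A real worker would call Next and handle a message here.
			}
			mu.Lock()
			exited++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return exited
}

// demoShutdown cancels the context after 50ms and waits for n workers to stop.
func demoShutdown(n int) int {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return runWorkers(ctx, n, 10*time.Millisecond)
}

func main() {
	fmt.Println(demoShutdown(3)) // prints 3
}
```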

Observability Hooks

All hook fields are optional. Use them to plug in Prometheus metrics, structured logs, or traces:

outboxutil.WithHooks(outboxutil.Hooks{
    OnAcquired: func(msg *outboxutil.Message) {
        // message claimed from the repository
    },
    OnProcessed: func(msg *outboxutil.Message, elapsed time.Duration) {
        // handler returned nil; message marked processed
    },
    OnFailed: func(msg *outboxutil.Message, err error) {
        // handler returned an error; message marked failed
    },
    OnIdle: func() {
        // no messages available; worker is about to wait
    },
})
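As a sketch of a metrics integration, the hooks can feed plain counters. The Hooks struct below is a local stand-in mirroring the fields shown above so the example runs on its own; Counters and its Hooks method are hypothetical helpers. Atomic counters are used on the assumption that hooks may be invoked from several worker goroutines at once when WithWorkers is greater than 1.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// Message and Hooks are local stand-ins mirroring the documented shapes.
type Message struct{ ID string }

type Hooks struct {
	OnAcquired  func(msg *Message)
	OnProcessed func(msg *Message, elapsed time.Duration)
	OnFailed    func(msg *Message, err error)
	OnIdle      func()
}

// Counters is a hypothetical set of goroutine-safe event counters.
type Counters struct {
	Acquired  atomic.Int64
	Processed atomic.Int64
	Failed    atomic.Int64
	Idle      atomic.Int64
}

// Hooks returns hook functions that increment the matching counter.
func (c *Counters) Hooks() Hooks {
	return Hooks{
		OnAcquired:  func(*Message) { c.Acquired.Add(1) },
		OnProcessed: func(*Message, time.Duration) { c.Processed.Add(1) },
		OnFailed:    func(*Message, error) { c.Failed.Add(1) },
		OnIdle:      func() { c.Idle.Add(1) },
	}
}

// demoCounts simulates one success, one failure, and one idle poll,
// returning the counts as {acquired, processed, failed, idle}.
func demoCounts() [4]int64 {
	c := &Counters{}
	h := c.Hooks()
	msg := &Message{ID: "1"}

	h.OnAcquired(msg)
	h.OnProcessed(msg, 3*time.Millisecond)
	h.OnAcquired(msg)
	h.OnFailed(msg, errors.New("boom"))
	h.OnIdle()

	return [4]int64{c.Acquired.Load(), c.Processed.Load(), c.Failed.Load(), c.Idle.Load()}
}

func main() {
	fmt.Println(demoCounts()) // prints [2 1 1 1]
}
```

The same closures could update Prometheus counters or emit trace events instead of incrementing in-process values.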

Scope

This library handles message orchestration only. It does not cover:

  • Message serialization/deserialization
  • Schema requirements
  • Retry of failed messages (that is a handler concern)
  • Infrastructure or deployment
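Since retries are left to the handler, one option is to wrap a handler in a retrying decorator. The sketch below is an assumption about how that might look, not something the library ships: WithRetry is a hypothetical name, Message and Handler are local stand-ins, and the fixed delay between attempts is chosen only for simplicity.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Message and Handler are local stand-ins for the outboxutil types.
type Message struct{ ID string }

type Handler func(ctx context.Context, msg *Message) error

// WithRetry wraps h so that it is tried up to attempts times, waiting delay
// between tries. The wait is abandoned if ctx is cancelled.
func WithRetry(h Handler, attempts int, delay time.Duration) Handler {
	if attempts < 1 {
		attempts = 1
	}
	return func(ctx context.Context, msg *Message) error {
		var err error
		for i := 0; i < attempts; i++ {
			if err = h(ctx, msg); err == nil {
				return nil
			}
			if i == attempts-1 {
				break // no wait after the final attempt
			}
			select {
			case <-ctx.Done():
				return fmt.Errorf("message %s: retry interrupted: %w", msg.ID, ctx.Err())
			case <-time.After(delay):
			}
		}
		return fmt.Errorf("message %s: giving up after %d attempts: %w", msg.ID, attempts, err)
	}
}

// demoRetry runs a handler that fails the first `failures` calls and reports
// how many calls were made and the final error.
func demoRetry(failures, attempts int) (calls int, err error) {
	h := func(_ context.Context, _ *Message) error {
		calls++
		if calls <= failures {
			return errors.New("transient")
		}
		return nil
	}
	err = WithRetry(h, attempts, time.Millisecond)(context.Background(), &Message{ID: "1"})
	return calls, err
}

func main() {
	fmt.Println(demoRetry(2, 3)) // succeeds on the third call
	fmt.Println(demoRetry(5, 3)) // gives up after three calls
}
```

Only when the wrapped handler finally returns an error would the message be marked failed.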
