diff --git a/CHANGELOG.md b/CHANGELOG.md index f34b2541..2a9669af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,11 +11,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Preliminary River driver for SQLite (`riverdriver/riversqlite`). This driver seems to produce good results as judged by the test suite, but so far has minimal real world vetting. Try it and let us know how it works out. [PR #870](https://github.com/riverqueue/river/pull/870). - CLI `river migrate-get` now takes a `--schema` option to inject a custom schema into dumped migrations and schema comments are hidden if `--schema` option isn't provided. [PR #903](https://github.com/riverqueue/river/pull/903). +- Added `riverlog.NewMiddlewareCustomContext` that makes the use of `riverlog` job-persisted logging possible with non-slog loggers. [PR #919](https://github.com/riverqueue/river/pull/919). ### Changed - Optimized the job completer's query `JobSetStateIfRunningMany`, resulting in an approximately 15% reduction in its duration when completing 2000 jobs, and around a 15-20% increase in `riverbench` throughput. [PR #904](https://github.com/riverqueue/river/pull/904). - `TimeStub` has been removed from the `rivertest` package. Its original inclusion was entirely accidentally and it should be considered entirely an internal API. [PR #912](https://github.com/riverqueue/river/pull/912). +- When storing job-persisted logging with `riverlog`, if a work run's logging was completely empty no metadata value is stored at all (previously, an empty value was stored). [PR #919](https://github.com/riverqueue/river/pull/919). ### Fixed diff --git a/riverlog/example_new_middleware_custom_context_test.go b/riverlog/example_new_middleware_custom_context_test.go new file mode 100644 index 00000000..eaacabd3 --- /dev/null +++ b/riverlog/example_new_middleware_custom_context_test.go @@ -0,0 +1,108 @@ +package riverlog_test + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "log/slog" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdbtest" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/riverlog" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/util/slogutil" + "github.com/riverqueue/river/rivershared/util/testutil" + "github.com/riverqueue/river/rivertype" +) + +// Callers should define their own context key to extract their a logger back +// out of work context. +type customContextKey struct{} + +type CustomContextLoggingArgs struct{} + +func (CustomContextLoggingArgs) Kind() string { return "logging" } + +type CustomContextLoggingWorker struct { + river.WorkerDefaults[CustomContextLoggingArgs] +} + +func (w *CustomContextLoggingWorker) Work(ctx context.Context, job *river.Job[CustomContextLoggingArgs]) error { + logger := ctx.Value(customContextKey{}).(*log.Logger) //nolint:forcetypeassert + logger.Printf("Raw log from worker") + return nil +} + +// ExampleNewMiddlewareCustomContext demonstrates the use of riverlog middleware +// with an arbitrary new context function that can be used to inject any sort of +// logger into context. +func ExampleNewMiddlewareCustomContext() { + ctx := context.Background() + + dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL()) + if err != nil { + panic(err) + } + defer dbPool.Close() + + workers := river.NewWorkers() + river.AddWorker(workers, &CustomContextLoggingWorker{}) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 100}, + }, + Middleware: []rivertype.Middleware{ + riverlog.NewMiddlewareCustomContext(func(ctx context.Context, w io.Writer) context.Context { + // For demonstration purposes we show the use of a built-in + // non-slog logger, but this could be anything like Logrus or + // Zap. Even the raw writer could be stored if so desired. + logger := log.New(w, "", 0) + return context.WithValue(ctx, customContextKey{}, logger) + }, nil), + }, + Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test + TestOnly: true, // suitable only for use in tests; remove for live environments + Workers: workers, + }) + if err != nil { + panic(err) + } + + // Out of example scope, but used to wait until a job is worked. + subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) + defer subscribeCancel() + + if err := riverClient.Start(ctx); err != nil { + panic(err) + } + + _, err = riverClient.Insert(ctx, CustomContextLoggingArgs{}, nil) + if err != nil { + panic(err) + } + + // Wait for job to complete, extract log data out of metadata, and print it. + for _, event := range riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) { + var metadataWithLog metadataWithLog + if err := json.Unmarshal(event.Job.Metadata, &metadataWithLog); err != nil { + panic(err) + } + for _, logAttempt := range metadataWithLog.RiverLog { + fmt.Print(logAttempt.Log) + } + } + + if err := riverClient.Stop(ctx); err != nil { + panic(err) + } + + // Output: + // Raw log from worker +} diff --git a/riverlog/example_middleware_test.go b/riverlog/example_new_middleware_test.go similarity index 93% rename from riverlog/example_middleware_test.go rename to riverlog/example_new_middleware_test.go index da76f6b4..45ce5fa3 100644 --- a/riverlog/example_middleware_test.go +++ b/riverlog/example_new_middleware_test.go @@ -33,9 +33,9 @@ func (w *LoggingWorker) Work(ctx context.Context, job *river.Job[LoggingArgs]) e return nil } -// Example_middleware demonstrates the use of riverlog middleware to inject a +// ExampleNewMiddleware demonstrates the use of riverlog middleware to inject a // logger into context that'll persist its output onto the job record. -func Example_middleware() { +func ExampleNewMiddleware() { ctx := context.Background() dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL()) @@ -83,12 +83,6 @@ func Example_middleware() { panic(err) } - type metadataWithLog struct { - RiverLog []struct { - Log string `json:"log"` - } `json:"river:log"` - } - // Wait for job to complete, extract log data out of metadata, and print it. for _, event := range riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) { var metadataWithLog metadataWithLog @@ -108,3 +102,9 @@ func Example_middleware() { // Logged from worker // Another line logged from worker } + +type metadataWithLog struct { + RiverLog []struct { + Log string `json:"log"` + } `json:"river:log"` +} diff --git a/riverlog/river_log.go b/riverlog/river_log.go index 505740a6..c8b9a892 100644 --- a/riverlog/river_log.go +++ b/riverlog/river_log.go @@ -42,8 +42,9 @@ func Logger(ctx context.Context) *slog.Logger { type Middleware struct { baseservice.BaseService rivertype.Middleware - config *MiddlewareConfig - newHandler func(w io.Writer) slog.Handler + config *MiddlewareConfig + newCustomContext func(ctx context.Context, w io.Writer) context.Context + newSlogHandler func(w io.Writer) slog.Handler } // MiddlewareConfig is configuration for Middleware. @@ -64,8 +65,8 @@ type MiddlewareConfig struct { MaxSizeBytes int } -// NewMiddleware initializes a new Middleware with the given handler function -// and configuration. +// NewMiddleware initializes a new Middleware with the given slog handler +// initialization function and configuration. // // newHandler is a function which is invoked on every Work execution to generate // a new slog.Handler for a work-specific slog.Logger. It should take an @@ -77,20 +78,46 @@ type MiddlewareConfig struct { // riverlog.NewMiddleware(func(w io.Writer) slog.Handler { // return slog.NewJSONHandler(w, nil) // }, nil) -func NewMiddleware(newHandler func(w io.Writer) slog.Handler, config *MiddlewareConfig) *Middleware { - if config == nil { - config = &MiddlewareConfig{} +func NewMiddleware(newSlogHandler func(w io.Writer) slog.Handler, config *MiddlewareConfig) *Middleware { + return &Middleware{ + config: defaultConfig(config), + newSlogHandler: newSlogHandler, } +} - // Assign defaults. - config = &MiddlewareConfig{ - MaxSizeBytes: cmp.Or(config.MaxSizeBytes, maxSizeBytes), +// NewMiddlewareCustomContext initializes a new Middleware with the given arbitrary +// context initialization function and configuration. +// +// newContext is a function which is invoked on every Work execution to generate +// a new context for the worker. It's generally used to initialize a logger with +// the given writer and put it in context under a user-defined context key for +// later use. +// +// This variant is meant to provide callers with a version of the middleware +// that's not tied to slog. A non-slog standard library logger, Logrus, or Zap +// logger could all be placed in context according to preferred convention. +// +// For example: +// +// riverlog.NewMiddlewareCustomContext(func(ctx context.Context, w io.Writer) context.Context { +// logger := log.New(w, "", 0) +// return context.WithValue(ctx, ArbitraryContextKey{}, logger) +// }, nil), +func NewMiddlewareCustomContext(newCustomContext func(ctx context.Context, w io.Writer) context.Context, config *MiddlewareConfig) *Middleware { + return &Middleware{ + config: defaultConfig(config), + newCustomContext: newCustomContext, } +} - return &Middleware{ - config: config, - newHandler: newHandler, +func defaultConfig(config *MiddlewareConfig) *MiddlewareConfig { + if config == nil { + config = &MiddlewareConfig{} } + + config.MaxSizeBytes = cmp.Or(config.MaxSizeBytes, maxSizeBytes) + + return config } type logAttempt struct { @@ -106,9 +133,18 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu var ( existingLogData metadataWithLog logBuf bytes.Buffer - logger = slog.New(m.newHandler(&logBuf)) ) + switch { + case m.newCustomContext != nil: + ctx = m.newCustomContext(ctx, &logBuf) + case m.newSlogHandler != nil: + logger := slog.New(m.newSlogHandler(&logBuf)) + ctx = context.WithValue(ctx, contextKey{}, logger) + default: + return errors.New("expected either newContextLogger or newSlogHandler to be set") + } + if err := json.Unmarshal(job.Metadata, &existingLogData); err != nil { return err } @@ -122,6 +158,11 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu defer func() { logData := logBuf.String() + // Return early if nothing ended up getting logged. + if len(logData) < 1 { + return + } + // Postgres JSONB is limited to 255MB, but it would be a bad idea to get // anywhere close to that limit here. if len(logData) > m.config.MaxSizeBytes { @@ -145,5 +186,5 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu metadataUpdates[metadataKey] = json.RawMessage(allLogDataBytes) }() - return doInner(context.WithValue(ctx, contextKey{}, logger)) + return doInner(ctx) } diff --git a/riverlog/river_log_test.go b/riverlog/river_log_test.go index 6dc9e6b6..465e2ce0 100644 --- a/riverlog/river_log_test.go +++ b/riverlog/river_log_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "io" + "log" "log/slog" "testing" @@ -35,7 +36,9 @@ type loggingWorker struct { } func (w *loggingWorker) Work(ctx context.Context, job *river.Job[loggingArgs]) error { - Logger(ctx).InfoContext(ctx, job.Args.Message) + if len(job.Args.Message) > 0 { + Logger(ctx).InfoContext(ctx, job.Args.Message) + } if job.Args.DoError { return errors.New("error from worker") @@ -54,8 +57,10 @@ func TestMiddleware(t *testing.T) { ctx := context.Background() type testBundle struct { - driver *riverpgxv5.Driver - tx pgx.Tx + clientConfig *river.Config + driver *riverpgxv5.Driver + middleware *Middleware + tx pgx.Tx } setup := func(t *testing.T, config *MiddlewareConfig) (*rivertest.Worker[loggingArgs, pgx.Tx], *testBundle) { @@ -74,8 +79,10 @@ func TestMiddleware(t *testing.T) { ) return rivertest.NewWorker(t, driver, clientConfig, worker), &testBundle{ - driver: driver, - tx: tx, + clientConfig: clientConfig, + driver: driver, + middleware: middleware, + tx: tx, } } @@ -183,6 +190,17 @@ func TestMiddleware(t *testing.T) { ) }) + t.Run("EmptyLogsStoreNoMetadata", func(t *testing.T) { + t.Parallel() + + testWorker, bundle := setup(t, nil) + + workRes, err := testWorker.Work(ctx, t, bundle.tx, loggingArgs{Message: ""}, nil) + require.NoError(t, err) + + require.JSONEq(t, "{}", string(workRes.Job.Metadata)) + }) + t.Run("TruncatedAtMaxSizeBytes", func(t *testing.T) { t.Parallel() @@ -205,4 +223,39 @@ func TestMiddleware(t *testing.T) { metadataWithLog.RiverLog, ) }) + + t.Run("RawMiddleware", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t, nil) + + type writerContextKey struct{} + + bundle.middleware.newCustomContext = func(ctx context.Context, w io.Writer) context.Context { + logger := log.New(w, "", 0) + return context.WithValue(ctx, writerContextKey{}, logger) + } + bundle.middleware.newSlogHandler = nil + + testWorker := rivertest.NewWorker(t, bundle.driver, bundle.clientConfig, river.WorkFunc(func(ctx context.Context, job *river.Job[loggingArgs]) error { + logger := ctx.Value(writerContextKey{}).(*log.Logger) //nolint:forcetypeassert + logger.Printf(job.Args.Message) + return nil + })) + + workRes, err := testWorker.Work(ctx, t, bundle.tx, loggingArgs{Message: "Raw log from worker"}, nil) + require.NoError(t, err) + + var metadataWithLog metadataWithLog + require.NoError(t, json.Unmarshal(workRes.Job.Metadata, &metadataWithLog)) + + require.Equal(t, []logAttempt{ + { + Attempt: 1, + Log: "Raw log from worker\n", + }, + }, + metadataWithLog.RiverLog, + ) + }) }