Another weapon from the MSL Armory
Plumber gives console apps, Lambdas, queue consumers, and other host-free .NET projects the same middleware-pipeline shape that ASP.NET Core gives web apps. You define a request type, a response type, and a chain of middleware components. Plumber wires up DI, configuration, logging, scoping, timeouts, and cancellation; you focus on the steps in your pipeline.
Upgrading from v2.x? Many APIs changed in v3: interfaces removed, configuration no longer auto-loaded, builder reshaped. See Migration v2.x → v3.x at the bottom. v3 is also a modernization and bug-fix pass: faster middleware dispatch (expression-tree-compiled), monotonic Elapsed, distinguishable timeout exceptions, and a host-mode factory for reusing an existing DI container.
Plumber is a good fit for:
- Console apps and CLI tools that need ordered, composable steps with DI and config
- AWS Lambda functions (API Gateway requests, SQS/SNS events, DynamoDB Streams, EventBridge)
- Queue consumers (RabbitMQ, Kafka, Azure Service Bus)
- File and ETL processors
- Any pipeline you'd reach for ASP.NET Core middleware in, but without the web host
If your project already has a host (ASP.NET Core, generic host, etc.), you usually don't need Plumber — but you can still use it inside an existing DI container; see Hosting inside an existing DI container.
The pipeline shape is borrowed from Steve Gordon's walkthrough of the ASP.NET Core middleware pipeline: How is the ASP.NET Core Middleware Pipeline Built. If you're new to middleware, Microsoft also has a primer in their docs.
dotnet add package MSL.Plumber.Pipeline
Plumber targets .NET 10. Older targets are not supported in v3.
The smallest working pipeline:
using Plumber;
using var handler = RequestHandlerBuilder
.Create<string, string>()
.Build();
handler.Use((context, next) =>
{
context.Response = $"Hello, {context.Request}!";
return next(context);
});
var greeting = await handler.InvokeAsync("World");
Console.WriteLine(greeting); // Hello, World!
That's the whole shape: a builder, a built handler, one or more middleware, and an InvokeAsync call. Each invocation gets its own DI scope and cancellation token.
RequestHandler<TRequest, TResponse> is IDisposable — always wrap it in using so the service provider it built gets cleaned up.
Middleware in Plumber forms an onion: code before await next(context) runs in registration order, code after runs in reverse. A request travels inward; the response travels outward.
sequenceDiagram
participant Caller
participant MW1 as Middleware 1
participant MW2 as Middleware 2
participant MW3 as Middleware 3
Caller->>+MW1: request
Note over MW1: pre-processing
MW1->>+MW2: next(context)
Note over MW2: pre-processing
MW2->>+MW3: next(context)
Note over MW3: pre-processing
MW3-->>-MW2: return
Note over MW2: post-processing
MW2-->>-MW1: return
Note over MW1: post-processing
MW1-->>-Caller: response
Three rules:
- Middleware runs in the order you register it.
- Anything before await next(context) runs going in. Anything after runs coming back.
- Don't call next and the pipeline short-circuits, which is useful for validation, caching, and authorization.
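The three rules can be seen in a small sketch. It uses only the inline Use overload and InvokeAsync from the quick start; the order list and the outer/inner labels are illustrative names, not part of Plumber.

```csharp
using System;
using System.Collections.Generic;
using Plumber;

// Records the order in which each stage runs.
var order = new List<string>();

using var handler = RequestHandlerBuilder
    .Create<string, string>()
    .Build();

// Registered first, so it is the outermost layer of the onion.
handler.Use(async (context, next) =>
{
    order.Add("outer: in");
    await next(context);
    order.Add("outer: out");
});

// Registered second, so it runs inside the first.
handler.Use(async (context, next) =>
{
    order.Add("inner: in");
    context.Response = context.Request.ToUpperInvariant();
    await next(context);
    order.Add("inner: out");
});

await handler.InvokeAsync("ping");

Console.WriteLine(string.Join(" -> ", order));
```

Following the rules above, the list should read outer: in, inner: in, inner: out, outer: out.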
A typical Plumber pipeline has two halves:
- Builder configuration — registers configuration sources, services, and logging.
- Pipeline configuration — adds middleware to the built handler.
Splitting these into two methods makes the pipeline trivial to test (see Testing your pipeline).
internal static class Pipeline
{
public static RequestHandlerBuilder<MyRequest, MyResponse> CreateBuilder(string[] args) =>
RequestHandlerBuilder.Create<MyRequest, MyResponse>(args)
.AddJsonFile("appsettings.json", optional: true)
.ConfigureLogging(logging => logging.AddConsole())
.ConfigureServices((services, configuration) =>
{
services.AddSingleton<IMyService, MyService>();
});
public static RequestHandler<MyRequest, MyResponse> Configure(
RequestHandler<MyRequest, MyResponse> handler) =>
handler
.Use<ValidationMiddleware>()
.Use<ProcessingMiddleware>();
public static RequestHandler<MyRequest, MyResponse> Build(string[] args) =>
Configure(CreateBuilder(args).Build());
}
In Program.cs:
using var handler = Pipeline.Build(args);
var response = await handler.InvokeAsync(request);
This is the convention Sample.Cli uses. You're welcome to inline everything, but you'll regret it the first time you write a test.
v3 configuration is opt-in. Nothing is loaded automatically — except command-line args, which are appended last so they always win. Pick the sources you want:
RequestHandlerBuilder.Create<TReq, TRes>(args)
.AddJsonFile("appsettings.json", optional: true)
.AddJsonFile($"appsettings.{env}.json", optional: true, reloadOnChange: true)
.AddEnvironmentVariables("MYAPP_")
.AddInMemoryCollection([
new("Feature:Enabled", "true"),
]);
For ad-hoc cases, the existing extension methods on IConfigurationBuilder are available via a callback:
builder.ConfigureConfiguration((config, args) =>
{
config.AddCustomProvider();
});
If you want the conventional set (appsettings.json, appsettings.{env}.json, DOTNET_* env vars, all env vars), call:
builder.AddDefaultConfigurationSources();
User secrets are intentionally excluded; call AddUserSecrets<T>() explicitly with a type from your assembly when you want them.
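One way to opt into user secrets is through the ConfigureConfiguration callback, sketched below. It assumes the callback hands you a standard IConfigurationBuilder (as the ad-hoc example suggests) and that the project references Microsoft.Extensions.Configuration.UserSecrets and has a UserSecretsId. MyRequest and MyResponse are placeholder types; Plumber may also expose a more direct route that this sketch does not rely on.

```csharp
using Microsoft.Extensions.Configuration;
using Plumber;

var builder = RequestHandlerBuilder
    .Create<MyRequest, MyResponse>(args)
    .AddDefaultConfigurationSources();

builder.ConfigureConfiguration((config, _) =>
{
    // Standard IConfigurationBuilder extension; Program is the implicit
    // top-level class, used here only to locate the assembly's UserSecretsId.
    config.AddUserSecrets<Program>(optional: true);
});

// Placeholder request/response types for the sketch.
internal sealed record MyRequest(string Text);
internal sealed record MyResponse(string Text);
```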
Service registration runs at Build() time and gets the built IConfiguration so you can bind options or pick implementations:
builder.ConfigureServices((services, configuration) =>
{
var options = configuration.GetSection("Tokenizer").Get<TokenizerOptions>()
?? TokenizerOptions.Defaults;
services
.AddSingleton(options)
.AddSingleton<ITokenizer, WhitespaceTokenizer>();
});
A TimeProvider is registered automatically (defaulting to TimeProvider.System); register your own if you want to control timer firing in tests. See Custom TimeProvider for tests.
Logging is opt-in. If you don't call ConfigureLogging, no logging infrastructure is registered.
builder.ConfigureLogging(logging =>
{
logging.SetMinimumLevel(LogLevel.Information);
logging.AddSimpleConsole(o => o.SingleLine = true);
});
Middleware is a piece of work that runs against a RequestContext<TRequest, TResponse>. It chooses whether to call next(context) (continue) or short-circuit by setting context.Response and returning.
For one-off transformations, register an inline delegate:
handler.Use(async (context, next) =>
{
context.ThrowIfCanceled();
var stopwatch = Stopwatch.StartNew();
await next(context);
stopwatch.Stop();
Console.WriteLine($"{context.Id} took {stopwatch.ElapsedMilliseconds}ms");
});
For middleware with dependencies, write a class. Plumber recognizes it by convention: a constructor whose first parameter is RequestMiddleware<TRequest, TResponse> next, and a public Task InvokeAsync method whose first parameter is RequestContext<TRequest, TResponse>.
internal sealed class NormalizeMiddleware(RequestMiddleware<string, TextReport> next)
{
public Task InvokeAsync(RequestContext<string, TextReport> context)
{
context.ThrowIfCanceled();
context.Data["normalized"] = context.Request.ToLowerInvariant();
return next(context);
}
}
Register with handler.Use<NormalizeMiddleware>().
The terminal middleware at the end of the pipeline already checks cancellation before invoking, so the explicit ThrowIfCanceled calls above are defense-in-depth — useful in long-running middleware that does work before deferring to next, but not strictly required for short ones. If you'd rather short-circuit without throwing, check context.IsCanceled and set context.Response yourself.
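The non-throwing variant might look like the sketch below. GracefulCancelMiddleware and its fallback message are hypothetical; the only Plumber members used are context.IsCanceled, context.Response, and the class-middleware convention described above.

```csharp
using System.Threading.Tasks;
using Plumber;

// Hypothetical middleware: sets a fallback response instead of throwing
// when the request has already been canceled.
internal sealed class GracefulCancelMiddleware(RequestMiddleware<string, string> next)
{
    public Task InvokeAsync(RequestContext<string, string> context)
    {
        if (context.IsCanceled)
        {
            // Short-circuit: set a response and skip the rest of the pipeline.
            context.Response = "canceled before processing";
            return Task.CompletedTask;
        }

        // Not canceled: continue as usual.
        return next(context);
    }
}
```

Register it with handler.Use<GracefulCancelMiddleware>() ahead of any expensive middleware so the check happens before the work starts.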
You can declare additional InvokeAsync parameters. Plumber resolves them from the per-request scope on every invocation — this is the safe place for DbContext, HttpClient, and other scoped or transient services.
internal sealed class TokenizeMiddleware(RequestMiddleware<string, TextReport> next)
{
public Task InvokeAsync(
RequestContext<string, TextReport> context, // first param must be the context
ITokenizer tokenizer) // resolved from context.Services on every request
{
context.ThrowIfCanceled();
context.Data["tokens"] = tokenizer.Tokenize(context.Request);
return next(context);
}
}
The dispatch is compiled to an expression tree once per registration, so there's no per-invocation reflection cost.
Constructor parameters after next are resolved from the root IServiceProvider, not the per-request scope. Plumber constructs the middleware once at registration and reuses that instance for every request — effectively a singleton, regardless of how the dependency is registered.
Don't inject scoped or transient services via the constructor. The captured instance is shared across all requests; you'll get stale data, thread-safety violations, or ObjectDisposedException from disposed dependencies. Use method injection on InvokeAsync instead.
Constructor injection is appropriate when the dependency is genuinely a singleton — ILogger<T>, TimeProvider, an options instance bound from configuration:
internal sealed class LoggingMiddleware(
RequestMiddleware<string, TextReport> next,
ILogger<LoggingMiddleware> logger)
{
public async Task InvokeAsync(RequestContext<string, TextReport> context)
{
logger.LogInformation("processing {Id}", context.Id);
await next(context);
logger.LogInformation(
"completed {Id} in {Elapsed}ms",
context.Id,
context.Elapsed.TotalMilliseconds);
}
}
You can also pass extra constructor arguments at registration. Declare the constructor with next first, your extra parameters next, then any DI-resolved dependencies. ActivatorUtilities matches the supplied arguments by type before satisfying the rest from the root provider:
handler.Use<RetryMiddleware>(3, TimeSpan.FromMilliseconds(200));
The RequestContext.Data dictionary lets middleware pass values down the chain without modifying the request or response:
handler.Use((context, next) =>
{
context.Data["user.id"] = AuthenticateAndExtractUserId(context.Request);
return next(context);
});
handler.Use((context, next) =>
{
if (context.TryGetValue<string>("user.id", out var userId))
{
// ...
}
return next(context);
});
TryGetValue<T> returns false for missing keys, null values, and type mismatches; you only get true when there's a non-null T at the key. Note that zero/default values for value types still return true: the check is value is T, not value != default. If you stored 0 for an int key, the call returns true with 0.
The dictionary is allocated lazily on first access, so pipelines that don't share data pay no allocation cost.
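The TryGetValue<T> cases described above can be exercised in one small pipeline. This is a sketch that assumes Data accepts null values (the text implies it does); the key names are made up for illustration.

```csharp
using System;
using Plumber;

using var handler = RequestHandlerBuilder
    .Create<string, string>()
    .Build();

handler.Use((context, next) =>
{
    context.Data["retries"] = 0;       // default(int), but still a real int
    context.Data["note"] = null;       // null value
    context.Data["user.id"] = "u-42";  // a string
    return next(context);
});

handler.Use((context, next) =>
{
    var hasRetries = context.TryGetValue<int>("retries", out var retries); // stored 0
    var hasNote = context.TryGetValue<string>("note", out _);              // null value
    var hasMissing = context.TryGetValue<string>("missing", out _);        // no such key
    var wrongType = context.TryGetValue<int>("user.id", out _);            // stored a string
    context.Response = $"{hasRetries} {retries} {hasNote} {hasMissing} {wrongType}";
    return next(context);
});

Console.WriteLine(await handler.InvokeAsync("ignored"));
```

Given the semantics described above, the expected line is True 0 False False False.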
Don't call next and the rest of the pipeline doesn't run. This is the canonical pattern for validation, caching, and authorization:
internal sealed class ValidationMiddleware(RequestMiddleware<string, TextReport> next)
{
public Task InvokeAsync(RequestContext<string, TextReport> context)
{
context.ThrowIfCanceled();
if (string.IsNullOrWhiteSpace(context.Request))
{
context.Response = new TextReport(
Original: context.Request ?? string.Empty,
Normalized: string.Empty,
Tokens: [],
WordCount: 0,
Elapsed: TimeSpan.Zero,
ErrorMessage: "input must be non-empty");
return Task.CompletedTask; // short-circuit: no next() call
}
return next(context);
}
}
Middleware registered earlier than this still observes the short-circuit on the way out: code after their own await next(context) runs normally with context.Response already populated.
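To make the "on the way out" behavior concrete, here is a sketch with an outer middleware wrapped around a validating one. It uses string for both request and response to stay self-contained; the "[audited]" prefix is an arbitrary marker.

```csharp
using System;
using System.Threading.Tasks;
using Plumber;

using var handler = RequestHandlerBuilder
    .Create<string, string>()
    .Build();

// Outer middleware: its post-processing runs even if an inner one short-circuits.
handler.Use(async (context, next) =>
{
    await next(context);
    context.Response = $"[audited] {context.Response}";
});

// Validation: rejects empty input without calling next.
handler.Use((context, next) =>
{
    if (string.IsNullOrWhiteSpace(context.Request))
    {
        context.Response = "input must be non-empty";
        return Task.CompletedTask;
    }

    return next(context);
});

// Only reached when validation passes.
handler.Use((context, next) =>
{
    context.Response = $"processed {context.Request}";
    return next(context);
});

var rejected = await handler.InvokeAsync(" ");
var accepted = await handler.InvokeAsync("data");
Console.WriteLine(rejected);
Console.WriteLine(accepted);
```

Both responses should carry the "[audited]" prefix, since the outer middleware sees whatever response the inner layers left behind.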
Some pipelines exist purely to do work — event handlers, queue consumers, notifications. Unit is Plumber's name for "no meaningful response":
public readonly record struct Unit;
Use it as TResponse:
using var handler = RequestHandlerBuilder
.Create<MessageBatch, Unit>()
.Build()
.Use<ValidateMiddleware>()
.Use<ProcessMiddleware>();
await handler.InvokeAsync(batch);
Unit is borrowed from F# (unit) and Haskell (()). It's more expressive than object? and keeps every handler typed as RequestHandler<TRequest, TResponse>, no separate void shape needed.
Two timeout layers: the handler has a built-in timeout configured at Build(), and callers can layer a deadline of their own with a CancellationToken.
Handler-wide:
using var handler = builder.Build(TimeSpan.FromSeconds(30));
Caller-supplied:
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var response = await handler.InvokeAsync(request, cts.Token);
When the handler timeout elapses, InvokeAsync throws TimeoutException. When the caller's token cancels, it throws OperationCanceledException. If both fire, the caller wins. The parameterless InvokeAsync(request) overload skips the caller layer entirely, so the handler timeout is the only cancellation signal in flight. The timer is driven by the registered TimeProvider, so FakeTimeProvider works in tests.
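Putting both layers together, a caller can tell the two outcomes apart by exception type. This is a minimal sketch with an arbitrary uppercase middleware; the durations are examples only.

```csharp
using System;
using System.Threading;
using Plumber;

using var handler = RequestHandlerBuilder
    .Create<string, string>()
    .Build(TimeSpan.FromSeconds(30)); // handler-wide timeout

handler.Use((context, next) =>
{
    context.Response = context.Request.ToUpperInvariant();
    return next(context);
});

// Caller-supplied deadline, shorter than the handler timeout.
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

try
{
    var response = await handler.InvokeAsync("work", cts.Token);
    Console.WriteLine(response);
}
catch (TimeoutException)
{
    // The handler's own timeout elapsed.
    Console.Error.WriteLine("handler timeout elapsed");
}
catch (OperationCanceledException)
{
    // The caller's token was canceled or its deadline passed.
    Console.Error.WriteLine("caller canceled the request");
}
```

TimeoutException does not derive from OperationCanceledException, so the order of the two catch clauses does not matter.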
Exceptions propagate through the pipeline by default. Wrap a try/catch at the outer edge if you want to convert or log them:
internal sealed class ErrorBoundary<TReq, TRes>(
RequestMiddleware<TReq, TRes> next,
ILogger<ErrorBoundary<TReq, TRes>> logger)
where TReq : notnull
{
public async Task InvokeAsync(RequestContext<TReq, TRes> context)
{
try
{
await next(context);
}
catch (OperationCanceledException)
{
logger.LogWarning("request {Id} was cancelled", context.Id);
throw;
}
catch (TimeoutException)
{
logger.LogWarning("request {Id} timed out", context.Id);
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "request {Id} failed", context.Id);
throw;
}
}
}
Register the boundary first if you want it to see every exception in the pipeline. The class is generic, so spell out the closed generic when you register it:
handler.Use<ErrorBoundary<MyRequest, MyResponse>>();
Preview: Plumber.Testing is in the source tree but is not yet published to NuGet. Once published, this becomes the recommended way to test pipelines; until then, take a project reference to Plumber.Testing directly.
Plumber.Testing ships a PlumberApplicationFactory<TRequest, TResponse> modeled on ASP.NET Core's WebApplicationFactory<TEntryPoint>. It builds your real pipeline once per test, lets you swap services or configuration, and disposes everything when the test ends.
using Plumber.Testing;
public sealed class PipelineTests
{
[Fact]
public async Task ValidInputProducesReportAsync()
{
using var factory = new PlumberApplicationFactory<string, TextReport>(
Pipeline.CreateBuilder,
Pipeline.Configure);
var report = await factory.InvokeAsync("Hello, World!");
Assert.NotNull(report);
Assert.Equal("hello, world!", report.Normalized);
}
[Fact]
public async Task StubTokenizerAsync()
{
using var factory = new PlumberApplicationFactory<string, TextReport>(
Pipeline.CreateBuilder,
Pipeline.Configure)
.WithServices(services =>
services.AddSingleton<ITokenizer>(new StubTokenizer(["a", "b", "c"])));
var report = await factory.InvokeAsync("anything");
Assert.Equal(3, report!.WordCount);
}
}
Customization hooks:
| Method | What it does |
|---|---|
| `WithBuilder(Action<RequestHandlerBuilder<TReq,TRes>>)` | Escape hatch: full access to the builder |
| `WithServices(Action<IServiceCollection>)` | Swap or add services |
| `WithServices(Action<IServiceCollection, IConfiguration>)` | Same, with IConfiguration available |
| `WithLogging(Action<ILoggingBuilder>)` | Adjust logging |
| `WithConfiguration(Action<IConfigurationBuilder>)` | Add config sources |
| `WithInMemorySettings(IEnumerable<KeyValuePair<string, string?>>)` | Seed config keys |
CreateHandler() is idempotent — call it as many times as you like; the same handler comes back. Once it's been called, builder hooks are frozen; trying to add more throws.
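The idempotence and freeze behavior could be pinned down in a test like the sketch below. It reuses the Pipeline and TextReport types from the earlier examples and assumes the hooks chain as in the tests above. The README does not name the exception thrown after the freeze, so the sketch only asserts that something is thrown; the Tokenizer:Separator key is made up.

```csharp
using System;
using Plumber.Testing;
using Xunit;

public sealed class FactoryLifecycleTests
{
    [Fact]
    public void CreateHandlerIsIdempotentAndFreezesHooks()
    {
        using var factory = new PlumberApplicationFactory<string, TextReport>(
                Pipeline.CreateBuilder,
                Pipeline.Configure)
            // Hypothetical key, seeded before the handler is created.
            .WithInMemorySettings([new("Tokenizer:Separator", " ")]);

        var first = factory.CreateHandler();
        var second = factory.CreateHandler();

        // Same handler instance on every call.
        Assert.Same(first, second);

        // Hooks are frozen once the handler exists; the exact exception
        // type is an open question here, so catch broadly.
        Assert.ThrowsAny<Exception>(() =>
        {
            factory.WithServices(_ => { });
        });
    }
}
```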
Sample.Cli is a complete, working version of the same shape. It's a small CLI that reads stdin (or argv), runs it through validation → normalization → tokenization → reporting, and prints the result. The earlier README snippets are simplified for teaching — the sample's middleware add logging and use shared DataKeys constants for the context.Data keys. It demonstrates:
- The CreateBuilder + Configure split
- Configuration via ConfigureConfiguration and bound configuration POCOs
- DI-registered services (ITokenizer)
- Method injection on class middleware
- Structured logging via ConfigureLogging
- A timing wrapper that uses record with to enrich the response
Sample.Cli.Tests shows both direct testing of the built pipeline and the PlumberApplicationFactory pattern.
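The timing wrapper from the list above might look roughly like this. It is a sketch, not the sample's actual code: TimingMiddleware is a hypothetical name, and the TextReport shape is inferred from the validation example, so the real type may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Plumber;

// Assumed shape, inferred from the validation example earlier in this README.
internal sealed record TextReport(
    string Original,
    string Normalized,
    IReadOnlyList<string> Tokens,
    int WordCount,
    TimeSpan Elapsed,
    string? ErrorMessage = null);

// Hypothetical wrapper: lets the pipeline run, then stamps the elapsed time
// onto whatever report the inner middleware produced.
internal sealed class TimingMiddleware(RequestMiddleware<string, TextReport> next)
{
    public async Task InvokeAsync(RequestContext<string, TextReport> context)
    {
        await next(context);

        if (context.Response is { } report)
        {
            // Records are immutable; "with" copies the report with one field changed.
            context.Response = report with { Elapsed = context.Elapsed };
        }
    }
}
```

Registering the wrapper first makes it the outermost layer, so the elapsed time covers every middleware inside it.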
If your application already has a built IServiceProvider — an ASP.NET Core host, a generic host, or any other container — you can build a Plumber handler that reuses that provider instead of creating its own:
using var handler = RequestHandler
.Create<MyRequest, MyResponse>(serviceProvider)
.Use<MyMiddleware1>()
.Use<MyMiddleware2>();
var response = await handler.InvokeAsync(request);
The handler does not take ownership: when it's disposed, your provider is left untouched. The provider must support IServiceScopeFactory (any provider built from ServiceCollection.BuildServiceProvider or a host already does), because Plumber needs it to create the per-request scope.
If a TimeProvider is registered in the provider, it's used for Elapsed and timeouts; otherwise TimeProvider.System is used.
This is the path to take when you want a Plumber pipeline inside an ASP.NET Core minimal API, an existing console app with IHostBuilder, or any other context that already owns a DI root.
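For the minimal API case, one possible wiring is sketched below. MyRequest, MyResponse, and the /process route are placeholders, and the sketch assumes a single handler can serve concurrent requests, which follows from each invocation getting its own scope but is not stated outright here.

```csharp
using System.Threading;
using Microsoft.AspNetCore.Builder;
using Plumber;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

// Build once from the host's root provider; the host keeps ownership of it.
using var handler = RequestHandler.Create<MyRequest, MyResponse>(app.Services);

handler.Use((context, next) =>
{
    context.Response = new MyResponse(context.Request.Text.ToUpperInvariant());
    return next(context);
});

// Minimal APIs bind the request body to MyRequest and the request-aborted
// token to the CancellationToken parameter.
app.MapPost("/process", async (MyRequest request, CancellationToken ct) =>
    await handler.InvokeAsync(request, ct));

app.Run();

// Placeholder request/response types for the sketch.
internal sealed record MyRequest(string Text);
internal sealed record MyResponse(string Text);
```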
A builder is a recipe; each Build() produces an independent handler with its own service provider and configuration root. Use this to spin up a fresh handler per test, or to vary the timeout per build:
var builder = Pipeline.CreateBuilder(args);
using var fast = builder.Build(TimeSpan.FromSeconds(1));
using var slow = builder.Build(TimeSpan.FromSeconds(60));
Both handlers share the same recipe but are independent at runtime.
The handler resolves TimeProvider from the service collection. Register your own to control elapsed time and timer firing in tests:
builder.ConfigureServices((services, _) =>
services.AddSingleton<TimeProvider>(new FakeTimeProvider()));
FakeTimeProvider ships in the Microsoft.Extensions.TimeProvider.Testing package (namespace Microsoft.Extensions.Time.Testing).
How does Plumber compare to ASP.NET Core middleware?
Same shape, different host. Plumber's RequestContext<TRequest, TResponse> is the typed analogue of HttpContext; the Use overloads, the onion execution model, and the per-request DI scope all behave the same way.
Can I use Plumber alongside ASP.NET Core?
Yes. See Hosting inside an existing DI container. It's useful when you have a non-HTTP pipeline (a background worker, a queue handler) that should share the host's services.
My class middleware doesn't run — what's wrong?
Common causes: an earlier middleware short-circuited (didn't call next), an exception was thrown earlier in the pipeline, or your class signature doesn't match the convention. Plumber expects RequestMiddleware<TReq, TRes> next as the first constructor parameter (it's passed positionally first) and requires RequestContext<TReq, TRes> as the first InvokeAsync parameter; the InvokeAsync method must be public and return a Task.
Why isn't my appsettings.json loaded?
v3 doesn't auto-load configuration. Call AddJsonFile("appsettings.json", optional: true) (or AddDefaultConfigurationSources() for the conventional set) explicitly. See Configuration sources.
Can I add middleware after the pipeline has been invoked?
No. The first call to InvokeAsync builds the pipeline; further Use calls throw InvalidOperationException. Configure all your middleware before your first invocation.
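A short sketch of that failure mode, using only the calls shown earlier; the response text is arbitrary:

```csharp
using System;
using Plumber;

using var handler = RequestHandlerBuilder
    .Create<string, string>()
    .Build();

handler.Use((context, next) =>
{
    context.Response = "first";
    return next(context);
});

// The first invocation builds the pipeline and locks registration.
await handler.InvokeAsync("go");

var rejected = false;
try
{
    // Too late: the pipeline is already built.
    handler.Use((context, next) => next(context));
}
catch (InvalidOperationException ex)
{
    rejected = true;
    Console.WriteLine($"cannot add middleware now: {ex.Message}");
}
```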
v3 reshapes the public API around concrete types and explicit configuration. The migrations below cover the common cases.
Both IRequestHandlerBuilder<TRequest, TResponse> and IRequestHandler<TRequest, TResponse> are gone. Type your variables and parameters with the concrete classes instead.
// v2
IRequestHandlerBuilder<MyReq, MyRes> builder = RequestHandlerBuilder.Create<MyReq, MyRes>();
IRequestHandler<MyReq, MyRes> handler = builder.Build();
// v3
RequestHandlerBuilder<MyReq, MyRes> builder = RequestHandlerBuilder.Create<MyReq, MyRes>();
RequestHandler<MyReq, MyRes> handler = builder.Build();
The no-response type was renamed:
// v2
RequestHandlerBuilder.Create<SqsEvent, Void>();
// v3
RequestHandlerBuilder.Create<SqsEvent, Unit>();
v2 implicitly added appsettings.json, environment variables, and user secrets. v3 doesn't:
// v2 — implicit
var builder = RequestHandlerBuilder.Create<TReq, TRes>(args);
// v3: explicit; either pick sources individually
var builder = RequestHandlerBuilder.Create<TReq, TRes>(args)
.AddJsonFile("appsettings.json", optional: true)
.AddEnvironmentVariables();
// or opt back into the conventional set
var builder = RequestHandlerBuilder.Create<TReq, TRes>(args)
.AddDefaultConfigurationSources();
AddDefaultConfigurationSources() does not include user secrets; call AddUserSecrets<T>() explicitly.
The builder no longer exposes mutable Services and Configuration properties. Use the Configure* callbacks; they run at Build() time, with the built IConfiguration available where appropriate.
// v2
var builder = RequestHandlerBuilder.Create<TReq, TRes>();
builder.Services.AddSingleton<IMyService, MyService>();
builder.Configuration.AddInMemoryCollection(...);
// v3
var builder = RequestHandlerBuilder.Create<TReq, TRes>()
.AddInMemoryCollection(...)
.ConfigureServices((services, configuration) =>
{
services.AddSingleton<IMyService, MyService>();
});
v2 let you inject anything into a middleware constructor. v3 still does, but the constructor parameters are resolved from the root provider, and the middleware itself is constructed once at registration time. Constructor injection of scoped or transient services is now a footgun; use method injection on InvokeAsync instead.
// v2 — works, but the DbContext is captured in the singleton middleware
internal sealed class SaveMiddleware(
RequestMiddleware<TReq, TRes> next,
AppDbContext db)
{
public async Task InvokeAsync(RequestContext<TReq, TRes> context)
{
await db.SaveAsync(context.Request);
await next(context);
}
}
// v3: DbContext is resolved fresh from the per-request scope
internal sealed class SaveMiddleware(RequestMiddleware<TReq, TRes> next)
{
public async Task InvokeAsync(
RequestContext<TReq, TRes> context,
AppDbContext db)
{
await db.SaveAsync(context.Request);
await next(context);
}
}
v2 surfaced both handler timeouts and caller cancellation as OperationCanceledException. v3 throws TimeoutException for handler timeouts and OperationCanceledException for caller cancellation. Update any catch clauses that distinguished them by message:
// v2
catch (OperationCanceledException ex)
{
if (ex.Message.Contains("timeout")) { /* ... */ }
}
// v3
catch (TimeoutException) { /* handler timeout */ }
catch (OperationCanceledException) { /* caller cancellation */ }
Always wrap the handler in using. The handler owns the service provider it built; leaking it leaks the provider, any file watchers the configuration registered (e.g. AddJsonFile(..., reloadOnChange: true)), and any IDisposable services.
// v2
var handler = builder.Build();
var response = await handler.InvokeAsync(request);
// v3
using var handler = builder.Build();
var response = await handler.InvokeAsync(request);
The exception is host-mode handlers built via RequestHandler.Create(IServiceProvider): those don't own the provider and don't dispose it; the wrapping using only marks the handler itself disposed.

