Add abstractions for tiered storage of stream data #196

Open
the-mikedavis wants to merge 6 commits into rabbitmq:main from amazon-mq:tiered-storage-abstractions

Conversation

@the-mikedavis
Collaborator

This is an evolution of #194 with a different approach to adding the necessary hooks for #184. It's quite different, so I wanted to make a new PR to preserve the old history.

The main difference is that the "log reader" part sits at a higher level. Instead of file-like callbacks such as open/1, pread/3 and close/1, the callbacks now resemble what osiris_log already provides: init_offset_reader/2, send_file/3, chunk_iterator/3 and iterator_next/1. @kjnilsson suggested something higher level when we last discussed the reader, and after rebasing on #192 it is much more natural than wrapping file operations. Implementors have more responsibilities, but the changes to osiris_log are much less invasive and callers need few changes: just search-and-replace osiris_log with osiris_log_reader (see here). It also makes the manifest behaviour less complex and invasive. The manifest mainly provides hooks for events such as a chunk being written or a segment file rolling over. It also has hooks for writer and acceptor start-up and closing.

Use of these abstractions for S3 tiered storage can be found in amazon-mq/rabbitmq-stream-s3#2 - not everything works well yet but it shows a good sketch of using the reader and manifest behaviors.

Comment thread src/osiris.erl Outdated
Comment on lines +69 to +73
+ -type retention_fun() :: fun((IdxFile :: file:filename_all()) -> boolean()).
  -type retention_spec() ::
-     {max_bytes, non_neg_integer()} | {max_age, milliseconds()}.
+     {max_bytes, non_neg_integer()} |
+     {max_age, milliseconds()} |
+     {'fun', retention_fun()}.
Collaborator Author

The retention spec part of this PR could probably be ported out on its own. It could also be used to provide a feature unrelated to tiered storage, such as rabbitmq/rabbitmq-server#14413 (retention up to an offset).

Comment thread src/osiris_log.erl
@the-mikedavis the-mikedavis force-pushed the tiered-storage-abstractions branch from 5f5ed04 to 6ce7e82 Compare November 18, 2025 17:16
@the-mikedavis the-mikedavis force-pushed the tiered-storage-abstractions branch from 6ce7e82 to d579d06 Compare December 1, 2025 20:33
@the-mikedavis the-mikedavis force-pushed the tiered-storage-abstractions branch 2 times, most recently from 02ca62b to 09e7e09 Compare January 12, 2026 18:47
Comment thread src/osiris.erl
Collaborator

@lukebakken lukebakken left a comment

Thanks for the detailed PR description and the cleanup since #194. A few observations:

overview/1 callback is never dispatched

osiris_log_manifest declares an overview/1 callback, and rabbitmq_stream_s3_log_manifest implements it. However, osiris_writer:overview/1 calls osiris_log:overview(Dir) directly rather than dispatching through (manifest_module()):overview(Dir), so the callback is currently dead code. Either the dispatch is missing from osiris_writer, or the callback should be removed from the behaviour.

send_file_callback() not exported

send_file_callback() is used in the -callback spec for send_file/3 but is not included in -export_type. Implementors who want to reference it in their own -spec declarations won't be able to, and Dialyzer will complain.

ct:pal left in evaluate_retention_fun test

Small nit — there's a ct:pal debug print left in the test body that should be removed before merge.

@the-mikedavis the-mikedavis force-pushed the tiered-storage-abstractions branch 2 times, most recently from 6ac7054 to 6092efe Compare April 10, 2026 14:46
@the-mikedavis
Collaborator Author

the-mikedavis commented Apr 10, 2026

I have rebased and resolved those: 1/ delete overview/1, not needed anymore, 2/ export the callback, 3/ remove the ct:pal. I also squashed in the merge with your change.

@the-mikedavis
Collaborator Author

the-mikedavis commented Apr 10, 2026

I have added a change to the osiris_log_reader behavior and changed some of the reading functions of osiris_log in the latest commit. When retention overtakes a reader, the reader skips ahead to the first available chunk ID. We need a way to 'hook in' to that so that we convert the lagging reader to a remote reader (if the data is present in the remote tier) instead of letting it skip over data.

This is the change in the S3 plugin which uses it: amazon-mq/rabbitmq-stream-s3#114

@the-mikedavis the-mikedavis force-pushed the tiered-storage-abstractions branch from d8dd337 to 05f280f Compare May 19, 2026 14:18
@the-mikedavis
Collaborator Author

I've been working on a redesign for the upload path part of these abstractions (amazon-mq/rabbitmq-stream-s3#148), replacing the osiris_log_manifest behaviour (e9f030d) with a thin set of hooks for on-init and on-retention-change (05f280f). osiris_log_manifest seemed pretty hacky to me; I think this is much cleaner. The process in charge of copying data to the remote tier now mirrors osiris_replica_reader, so it doesn't interfere with the writer at all. I think it's a smoother design, and it shrinks the diff here nicely as well.

the-mikedavis and others added 4 commits May 29, 2026 22:12
This change introduces a behaviour `osiris_log_reader` which can be
implemented externally to read from a stream at a given offset spec.
This closes over the high-level reading operations `send_file/3` and
`chunk_iterator/3`. `osiris:init_reader/4` selects the reader module
based on application env, and then callers use `osiris_log_reader` to
interact with the reader.

By default all of these functions delegate to `osiris_log`. `osiris_log`
doesn't need any meaningful changes this way. The only change is to
expose the `header_map()` type.
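
The module selection described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the module name `reader_dispatch_sketch` and the env key `log_reader` are assumptions, since the commit message only says the reader module is chosen "based on application env".

```erlang
-module(reader_dispatch_sketch).
-export([reader_module/0, init_offset_reader/2]).

%% Assumption: the application env key is called `log_reader`.
%% When nothing is configured, fall back to the built-in `osiris_log`.
reader_module() ->
    application:get_env(osiris, log_reader, osiris_log).

%% Callers use a single entry point; the selected module does the work.
%% The callback name and arity follow init_offset_reader/2 from the PR text.
init_offset_reader(OffsetSpec, Config) ->
    Mod = reader_module(),
    Mod:init_offset_reader(OffsetSpec, Config).
```

With no configuration, `reader_module/0` returns `osiris_log`, which matches the "by default all of these functions delegate to `osiris_log`" behaviour described in the commit message.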

The `{'fun', retention_fun()}` retention spec can be used flexibly to evaluate retention depending on the name or contents of index files. You pass in a function which returns a tuple with the index files split into two lists: the files to delete and the files to keep.
This could be used as a way to truncate everything up to an offset or to
guarantee that an offset (for example an uncommitted one) won't be
truncated. Since these files are sorted, some retention functions could
operate just on the names (deriving the offset of the segment with
`erlang:binary_to_integer/1`).
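
As a rough sketch of the "truncate everything up to an offset" case, a retention function might look like the following. Several details are assumptions rather than facts from this PR: the module name `retention_sketch`, the `{ToDelete, ToKeep}` ordering of the returned tuple, and the idea that each index file is named after the zero-padded first offset of its segment with an `.index` extension.

```erlang
-module(retention_sketch).
-export([up_to_offset/1, offset_of/1]).

%% Assumption: an index file is named "<first offset>.index", so the
%% segment's first offset can be derived from the file name alone.
offset_of(IdxFile) ->
    Base = filename:basename(IdxFile, ".index"),
    binary_to_integer(iolist_to_binary(Base)).

%% Returns a fun that splits the sorted index files into
%% {ToDelete, ToKeep} (assumed ordering of the tuple).
up_to_offset(Offset) ->
    fun(IdxFiles) -> split(IdxFiles, Offset, []) end.

%% A segment is deleted only if the next segment starts at or before
%% Offset, i.e. every offset it holds is below the truncation point.
split([File, Next | Rest], Offset, Del) ->
    case offset_of(Next) =< Offset of
        true -> split([Next | Rest], Offset, [File | Del]);
        false -> {lists:reverse(Del), [File, Next | Rest]}
    end;
%% The last (or only) segment is always kept.
split(Files, _Offset, Del) ->
    {lists:reverse(Del), Files}.
```

Because the decision for each file depends only on the name of its successor, this variant never needs to open an index file.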

This change refactors `parse_header/2` to take the chunk header binary
and the position at which it was read and return a `header_map()`. This
is useful for other readers - so that they do not need to duplicate the
binary match code and `next_position` calculation.

When local retention on the writer node deletes a segment while the
`osiris_replica_reader` is reading it, `read_header_with_ra` skips
ahead to the next available segment using `first_chunk_id`. If the
replica has not yet received data up to that offset, `accept_chunk`
exits with `{accept_chunk_out_of_order, Received, Expected}`.

The replica recovers correctly either way — the stream coordinator
restarts it and `init_acceptor` resyncs to the writer's current
position. However, the unhandled exit floods the log with thousands
of `[error]` lines per event.

Catch the exit in `handle_incoming_data`, log a single `[warning]`,
and stop with `normal`. Recovery behavior is unchanged.
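
The catch-and-stop pattern described above could look roughly like this. It is a self-contained sketch, not the actual `handle_incoming_data` clause: `accept_chunk/2` here is a hypothetical stand-in that only reproduces the exit reason named in the commit message, and the state is reduced to the expected chunk ID.

```erlang
-module(out_of_order_sketch).
-export([handle_incoming/2]).

%% Hypothetical stand-in for the acceptor: exits when the received
%% chunk is not the one the replica expects next.
accept_chunk(Received, Expected) when Received =/= Expected ->
    exit({accept_chunk_out_of_order, Received, Expected});
accept_chunk(Received, _Expected) ->
    Received + 1.

%% Catch only the out-of-order exit, log one warning, and stop with
%% reason `normal`; any other exit still propagates.
handle_incoming(Received, Expected) ->
    try accept_chunk(Received, Expected) of
        Next -> {noreply, Next}
    catch
        exit:{accept_chunk_out_of_order, R, E} ->
            logger:warning("chunk ~b received out of order, expected ~b; stopping",
                           [R, E]),
            {stop, normal, Expected}
    end.
```

Stopping with `normal` keeps the recovery path the same (the process still terminates and is restarted), while avoiding the crash report that an unhandled exit would produce.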

This adds hooks in the log which are executed for writers and acceptors
that can be used in plugins. The plugin intercepts the config map in
init and can modify values. Same for retention: a plugin can modify the
retention specs when it is updated.
@lukebakken lukebakken force-pushed the tiered-storage-abstractions branch from 05f280f to 8c7ea35 Compare May 29, 2026 22:12
After `trigger_retention_eval` sets counters from local segment state,
call the hook to let plugins override values. The tiered storage plugin
uses this to correct `?C_FIRST_OFFSET` when the remote tier holds older
data than the local tier.


Development

Successfully merging this pull request may close these issues.

Tiered Storage Support for RabbitMQ Streams

2 participants