From dc22fbf1d4251cc72ca7ba9ef60f52db725bfdc1 Mon Sep 17 00:00:00 2001
From: Saurabh16-s
Date: Mon, 4 May 2026 23:15:06 +0530
Subject: [PATCH 1/3] Fix outdated code references in message passing documentation

---
 docs/Message-passing-implementation.md | 40 ++++++++++++++------------
 1 file changed, 21 insertions(+), 19 deletions(-)

diff --git a/docs/Message-passing-implementation.md b/docs/Message-passing-implementation.md
index fd4cf2cbeea..fdc9d69a900 100644
--- a/docs/Message-passing-implementation.md
+++ b/docs/Message-passing-implementation.md
@@ -3,28 +3,30 @@ title: Message Passing Implementation
 layout: documentation
 documentation: true
 ---
-(Note: this walkthrough is out of date as of 0.8.0. 0.8.0 revamped the message passing infrastructure to be based on the Disruptor)
+
+(Note: this walkthrough has been updated for v2.6.2. As of 0.8.0, the message passing infrastructure has been based on the Disruptor)
 
 This page walks through how emitting and transferring tuples works in Storm.
 
 - Worker is responsible for message transfer
- - `refresh-connections` is called every "task.refresh.poll.secs" or whenever assignment in ZK changes. It manages connections to other workers and maintains a mapping from task -> worker [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/worker.clj#L123)
- - Provides a "transfer function" that is used by tasks to send tuples to other tasks. The transfer function takes in a task id and a tuple, and it serializes the tuple and puts it onto a "transfer queue". There is a single transfer queue for each worker. [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/worker.clj#L56)
- - The serializer is thread-safe [code](https://github.com/apache/storm/blob/0.7.1/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java#L26)
- - The worker has a single thread which drains the transfer queue and sends the messages to other workers [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/worker.clj#L185)
- - Message sending happens through this protocol: [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/messaging/protocol.clj)
- - The implementation for distributed mode uses ZeroMQ [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/messaging/zmq.clj)
- - The implementation for local mode uses in memory Java queues (so that it's easy to use Storm locally without needing to get ZeroMQ installed) [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/messaging/local.clj)
+ - Connection management is handled by the `WorkerState` class which manages connections to other workers and maintains a mapping from task -> worker. Connection refresh is triggered every "task.refresh.poll.secs" or whenever assignment in ZK changes. [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java)
+ - Provides a "transfer function" that is used by tasks to send tuples to other tasks. The transfer function takes in a task id and a tuple, and it serializes the tuple and puts it onto a "transfer queue". There is a single transfer queue for each worker. [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java)
+ - The serializer is thread-safe [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java)
+ - The worker drains the transfer queue and sends the messages to other workers [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java)
+ - Message sending happens through this interface: [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java)
+ - The implementation for distributed mode uses Netty [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/messaging/netty/)
+ - The implementation for local mode uses in memory Java queues (so that it's easy to use Storm locally without needing external messaging dependencies) [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/messaging/local/)
 - Receiving messages in tasks works differently in local mode and distributed mode
- - In local mode, the tuple is sent directly to an in-memory queue for the receiving task [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/messaging/local.clj#L21)
- - In distributed mode, each worker listens on a single TCP port for incoming messages and then routes those messages in-memory to tasks. The TCP port is called a "virtual port", because it receives [task id, message] and then routes it to the actual task. [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/worker.clj#L204)
- - The virtual port implementation is here: [code](https://github.com/apache/storm/blob/0.7.1/src/clj/zilch/virtual_port.clj)
- - Tasks listen on an in-memory ZeroMQ port for messages from the virtual port [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/task.clj#L201)
- - Bolts listen here: [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/task.clj#L489)
- - Spouts listen here: [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/task.clj#L382)
+ - In local mode, the tuple is sent directly to an in-memory queue for the receiving task [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/messaging/local/)
+ - In distributed mode, each worker listens on a single TCP port for incoming messages and then routes those messages in-memory to tasks. The TCP port receives [task id, message] and then routes it to the actual task. [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java)
+ - The message routing implementation is here: [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/messaging/netty/)
+ - Executors listen on an in-memory connection for messages [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/executor/Executor.java)
+ - Bolts listen here: [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java)
+ - Spouts listen here: [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java)
 - Tasks are responsible for message routing. A tuple is emitted either to a direct stream (where the task id is specified) or a regular stream. In direct streams, the message is only sent if that bolt subscribes to that direct stream. In regular streams, the stream grouping functions are used to determine the task ids to send the tuple to.
- - Tasks have a routing map from {stream id} -> {component id} -> {stream grouping function} [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/task.clj#L198)
- - The "tasks-fn" returns the task ids to send the tuples to for either regular stream emit or direct stream emit [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/task.clj#L207)
- - After getting the output task ids, bolts and spouts use the transfer-fn provided by the worker to actually transfer the tuples
- - Bolt transfer code here: [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/task.clj#L429)
- - Spout transfer code here: [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/task.clj#L329)
+ - Tasks have a routing map from {stream id} -> {component id} -> {stream grouping function} [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm//org/apache/storm/daemon/Task.java)
+ - Stream grouping functions determine the task ids to send the tuples to for either regular stream emit or direct stream emit [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/daemon/Task.java)
+ - After getting the output task ids, bolts and spouts use the transfer function provided by the worker to actually transfer the tuples
+ - Bolt transfer code here: [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java)
+ - Spout transfer code here: [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java)
+

From d32cc34aad6d9282d78c9698d6fc2a62cf0ea173 Mon Sep 17 00:00:00 2001
From: Saurabh16-s
Date: Tue, 5 May 2026 00:29:17 +0530
Subject: [PATCH 2/3] Update message passing documentation to reflect Storm v2.8.7

---
 docs/Message-passing-implementation.md | 36 +++++++++++++-------------
 1 file changed, 18 insertions(+), 18 deletions(-)

diff --git a/docs/Message-passing-implementation.md b/docs/Message-passing-implementation.md
index fdc9d69a900..8f410816760 100644
--- a/docs/Message-passing-implementation.md
+++ b/docs/Message-passing-implementation.md
@@ -4,29 +4,29 @@ layout: documentation
 documentation: true
 ---
 
-(Note: this walkthrough has been updated for v2.6.2. As of 0.8.0, the message passing infrastructure has been based on the Disruptor)
+(Note: this walkthrough has been updated for v2.8.7. As of 0.8.0, the message passing infrastructure has been based on the Disruptor)
 
 This page walks through how emitting and transferring tuples works in Storm.
 
 - Worker is responsible for message transfer
- - Connection management is handled by the `WorkerState` class which manages connections to other workers and maintains a mapping from task -> worker. Connection refresh is triggered every "task.refresh.poll.secs" or whenever assignment in ZK changes. [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java)
- - Provides a "transfer function" that is used by tasks to send tuples to other tasks. The transfer function takes in a task id and a tuple, and it serializes the tuple and puts it onto a "transfer queue". There is a single transfer queue for each worker. [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java)
- - The serializer is thread-safe [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java)
- - The worker drains the transfer queue and sends the messages to other workers [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java)
- - Message sending happens through this interface: [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java)
- - The implementation for distributed mode uses Netty [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/messaging/netty/)
- - The implementation for local mode uses in memory Java queues (so that it's easy to use Storm locally without needing external messaging dependencies) [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/messaging/local/)
+ - Connection management is handled by the `WorkerState` class which manages connections to other workers and maintains a mapping from task -> worker. Connection refresh is triggered every "task.refresh.poll.secs" or whenever assignment in ZK changes. [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java)
+ - Provides a "transfer function" that is used by tasks to send tuples to other tasks. The transfer function takes in a task id and a tuple, and it serializes the tuple and puts it onto a "transfer queue". There is a single transfer queue for each worker. [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java)
+ - The serializer is thread-safe [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java)
+ - The worker drains the transfer queue and sends the messages to other workers [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java)
+ - Message sending happens through this interface: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java)
+ - The implementation for distributed mode uses Netty [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/netty/)
+ - The implementation for local mode uses in memory Java queues (so that it's easy to use Storm locally without needing external messaging dependencies) [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/local/)
 - Receiving messages in tasks works differently in local mode and distributed mode
- - In local mode, the tuple is sent directly to an in-memory queue for the receiving task [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/messaging/local/)
- - In distributed mode, each worker listens on a single TCP port for incoming messages and then routes those messages in-memory to tasks. The TCP port receives [task id, message] and then routes it to the actual task. [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java)
- - The message routing implementation is here: [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/messaging/netty/)
- - Executors listen on an in-memory connection for messages [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/executor/Executor.java)
- - Bolts listen here: [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java)
- - Spouts listen here: [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java)
+ - In local mode, the tuple is sent directly to an in-memory queue for the receiving task [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/local/)
+ - In distributed mode, each worker listens on a single TCP port for incoming messages and then routes those messages in-memory to tasks. The TCP port receives [task id, message] and then routes it to the actual task. [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java)
+ - The message routing implementation is here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/netty/)
+ - Executors listen on an in-memory connection for messages [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/Executor.java)
+ - Bolts listen here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java)
+ - Spouts listen here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java)
 - Tasks are responsible for message routing. A tuple is emitted either to a direct stream (where the task id is specified) or a regular stream. In direct streams, the message is only sent if that bolt subscribes to that direct stream. In regular streams, the stream grouping functions are used to determine the task ids to send the tuple to.
- - Tasks have a routing map from {stream id} -> {component id} -> {stream grouping function} [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm//org/apache/storm/daemon/Task.java)
- - Stream grouping functions determine the task ids to send the tuples to for either regular stream emit or direct stream emit [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/daemon/Task.java)
+ - Tasks have a routing map from {stream id} -> {component id} -> {stream grouping function} [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/daemon/Task.java)
+ - Stream grouping functions determine the task ids to send the tuples to for either regular stream emit or direct stream emit [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/daemon/Task.java)
 - After getting the output task ids, bolts and spouts use the transfer function provided by the worker to actually transfer the tuples
- - Bolt transfer code here: [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java)
- - Spout transfer code here: [code](https://github.com/apache/storm/blob/v2.6.2/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java)
+ - Bolt transfer code here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java)
+ - Spout transfer code here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java)
 

From 1ea39bd16ece509a3747ad9106874a20dad8385b Mon Sep 17 00:00:00 2001
From: Saurabh16-s
Date: Tue, 5 May 2026 22:11:01 +0530
Subject: [PATCH 3/3] Update Message Passing Implementation doc to v2.8.7 with accurate links

---
 docs/Message-passing-implementation.md | 25 ++++++++++++-------------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/docs/Message-passing-implementation.md b/docs/Message-passing-implementation.md
index 8f410816760..0275925e119 100644
--- a/docs/Message-passing-implementation.md
+++ b/docs/Message-passing-implementation.md
@@ -4,29 +4,28 @@ layout: documentation
 documentation: true
 ---
 
-(Note: this walkthrough has been updated for v2.8.7. As of 0.8.0, the message passing infrastructure has been based on the Disruptor)
+(Note: this walkthrough has been updated for v2.8.7. The message passing infrastructure was rewritten in 0.8.0 (originally Disruptor-based) and later moved to JCQueue for better performance.)
 
 This page walks through how emitting and transferring tuples works in Storm.
 
 - Worker is responsible for message transfer
 - Connection management is handled by the `WorkerState` class which manages connections to other workers and maintains a mapping from task -> worker. Connection refresh is triggered every "task.refresh.poll.secs" or whenever assignment in ZK changes. [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java)
 - Provides a "transfer function" that is used by tasks to send tuples to other tasks. The transfer function takes in a task id and a tuple, and it serializes the tuple and puts it onto a "transfer queue". There is a single transfer queue for each worker. [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java)
- - The serializer is thread-safe [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java)
+ - Tuple serialization uses `KryoTupleSerializer`. [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java)
 - The worker drains the transfer queue and sends the messages to other workers [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java)
 - Message sending happens through this interface: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java)
- - The implementation for distributed mode uses Netty [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/netty/)
- - The implementation for local mode uses in memory Java queues (so that it's easy to use Storm locally without needing external messaging dependencies) [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/local/)
+ - The implementation for distributed mode uses Netty [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java)
+ - The implementation for local mode uses in memory Java queues (so that it's easy to use Storm locally without needing external messaging dependencies) [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java#L38)
 - Receiving messages in tasks works differently in local mode and distributed mode
- - In local mode, the tuple is sent directly to an in-memory queue for the receiving task [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/local/)
+ - In local mode, the tuple is sent directly to an in-memory queue for the receiving task [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java#L127)
 - In distributed mode, each worker listens on a single TCP port for incoming messages and then routes those messages in-memory to tasks. The TCP port receives [task id, message] and then routes it to the actual task. [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java)
- - The message routing implementation is here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/netty/)
- - Executors listen on an in-memory connection for messages [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/Executor.java)
- - Bolts listen here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java)
- - Spouts listen here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java)
+ - The message routing implementation is here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java)
+ - Executors consume from their receive queue (a `JCQueue`) [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/Executor.java)
+ - Bolts listen here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java#L154)
+ - Spouts listen here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L165)
 - Tasks are responsible for message routing. A tuple is emitted either to a direct stream (where the task id is specified) or a regular stream. In direct streams, the message is only sent if that bolt subscribes to that direct stream. In regular streams, the stream grouping functions are used to determine the task ids to send the tuple to.
- - Tasks have a routing map from {stream id} -> {component id} -> {stream grouping function} [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/daemon/Task.java)
- - Stream grouping functions determine the task ids to send the tuples to for either regular stream emit or direct stream emit [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/daemon/Task.java)
+ - Tasks have a routing map from {stream id} -> {component id} -> {stream grouping function}; the grouping functions determine the task ids to send the tuples to for either regular stream emit or direct stream emit. [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/daemon/Task.java)
 - After getting the output task ids, bolts and spouts use the transfer function provided by the worker to actually transfer the tuples
- - Bolt transfer code here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java)
- - Spout transfer code here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java)
+ - Bolt transfer code here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java#L127)
+ - Spout transfer code here: [code](https://github.com/apache/storm/blob/v2.8.7/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L133)
 
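
The routing flow described in the patched page (a routing map from {stream id} -> {component id} -> {stream grouping function}, followed by a hand-off to the worker's single transfer queue) can be sketched roughly as below. This is a minimal illustration for a regular-stream emit, not Storm's implementation: `RoutingSketch`, `allGrouping`, `hashGrouping`, `emit`, `drain` and `demo` are hypothetical names, tuples are plain strings, and serialization and network sending are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Illustrative sketch only: none of these names are Storm classes or methods.
final class RoutingSketch {
    // {stream id} -> {component id} -> grouping function (tuple -> target task ids)
    private final Map<String, Map<String, Function<String, List<Integer>>>> routes;
    // One transfer queue per worker; each entry is (target task id, tuple).
    private final BlockingQueue<Map.Entry<Integer, String>> transferQueue =
            new LinkedBlockingQueue<>();

    RoutingSketch(Map<String, Map<String, Function<String, List<Integer>>>> routes) {
        this.routes = routes;
    }

    // Hypothetical grouping: every listed task receives the tuple.
    static Function<String, List<Integer>> allGrouping(List<Integer> tasks) {
        return tuple -> tasks;
    }

    // Hypothetical hash-based grouping: equal tuples always map to the same task.
    static Function<String, List<Integer>> hashGrouping(List<Integer> tasks) {
        return tuple -> List.of(tasks.get(Math.floorMod(tuple.hashCode(), tasks.size())));
    }

    // Regular-stream emit: ask each subscriber's grouping function for task ids,
    // then put one (task id, tuple) pair per target onto the transfer queue.
    List<Integer> emit(String streamId, String tuple) {
        List<Integer> targets = new ArrayList<>();
        for (Function<String, List<Integer>> grouping
                : routes.getOrDefault(streamId, Map.of()).values()) {
            targets.addAll(grouping.apply(tuple));
        }
        for (int taskId : targets) {
            transferQueue.add(Map.entry(taskId, tuple));
        }
        return targets;
    }

    // Stand-in for the worker draining the transfer queue before sending.
    List<Map.Entry<Integer, String>> drain() {
        List<Map.Entry<Integer, String>> batch = new ArrayList<>();
        transferQueue.drainTo(batch);
        return batch;
    }

    // Sample topology: stream "default" has two subscribing components.
    static RoutingSketch demo() {
        Map<String, Function<String, List<Integer>>> subscribers = new LinkedHashMap<>();
        subscribers.put("counter", hashGrouping(List.of(3, 4)));
        subscribers.put("logger", allGrouping(List.of(7)));
        return new RoutingSketch(Map.of("default", subscribers));
    }

    public static void main(String[] args) {
        RoutingSketch worker = demo();
        System.out.println(worker.emit("default", "a"));
        System.out.println(worker.drain());
    }
}
```

The sketch keeps routing (choosing task ids) separate from transfer (queueing for the worker), which mirrors the split the page draws between tasks and the worker. The subscriber map is a `LinkedHashMap` so that the order of target task ids is deterministic.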