From 663729e5d4666f0857f5316a313f94e52834cd65 Mon Sep 17 00:00:00 2001 From: Andrew France Date: Mon, 15 Mar 2021 15:55:02 +0000 Subject: [PATCH 1/2] Test Producer with sync_start option --- test/producer_test.exs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/test/producer_test.exs b/test/producer_test.exs index aebbf2b..ac97f02 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -21,6 +21,26 @@ defmodule Rabbit.ProducerTest do end end + defmodule TroublesomeTestProducer do + use Rabbit.Producer + + @impl Rabbit.Producer + def init(_type, opts) do + {:ok, opts} + end + + @impl Rabbit.Producer + def handle_setup(state) do + attempt = Agent.get_and_update(state.setup_opts[:counter], fn n -> {n, n + 1} end) + + if attempt == 0 do + {:error, :something_went_wrong} + else + :ok + end + end + end + describe "start_link/3" do test "starts producer" do assert {:ok, connection} = Connection.start_link(TestConnection) @@ -50,6 +70,22 @@ defmodule Rabbit.ProducerTest do end end + describe "start_link/3 with :sync_start" do + test "starts producer with multiple attempts" do + {:ok, counter} = Agent.start(fn -> 0 end) + assert {:ok, connection} = Connection.start_link(TestConnection) + + assert {:ok, _producer} = + Producer.start_link(TroublesomeTestProducer, + connection: connection, + sync_start: true, + setup_opts: [counter: counter] + ) + + assert Agent.get(counter, & &1) == 2 + end + end + describe "stop/1" do test "stops producer" do assert {:ok, connection} = Connection.start_link(TestConnection) From 008b1da5df8477885315d6496dd83daecf19801d Mon Sep 17 00:00:00 2001 From: Andrew France Date: Mon, 15 Mar 2021 15:55:38 +0000 Subject: [PATCH 2/2] Add sync_start option to Consumer Allows calling processes to wait until the Consumer is ready to consume. --- lib/rabbit/consumer.ex | 9 +++++ lib/rabbit/consumer/server.ex | 35 ++++++++++++++++- test/consumer_test.exs | 72 ++++++++++++++++++++++++++++++++--- 3 files changed, 109 insertions(+), 7 deletions(-) diff --git a/lib/rabbit/consumer.ex b/lib/rabbit/consumer.ex index 7130904..b0f02ac 100644 --- a/lib/rabbit/consumer.ex +++ b/lib/rabbit/consumer.ex @@ -100,6 +100,9 @@ defmodule Rabbit.Consumer do | {:arguments, Keyword.t()} | {:custom_meta, map()} | {:setup_opts, setup_options()} + | {:sync_start, boolean()} + | {:sync_start_delay, non_neg_integer()} + | {:sync_start_max, non_neg_integer()} @type options :: [option()] @type delivery_tag :: non_neg_integer() @type action_options :: [{:multiple, boolean()} | {:requeue, boolean()}] @@ -217,6 +220,12 @@ defmodule Rabbit.Consumer do * `:custom_meta` - A map of custom data that will be included in each `Rabbit.Message` handled by the consumer. * `:setup_opts` - A keyword list of custom options for use in `c:handle_setup/1`. + * `:sync_start` - Boolean representing whether to establish the connection, + channel, and setup synchronously - defaults to `false`. + * `:sync_start_delay` - The amount of time in milliseconds to sleep between + sync start attempts - defaults to `50`. + * `:sync_start_max` - The max amount of sync start attempts that will occur + before proceeding with async start - defaults to `100`. ## Server Options diff --git a/lib/rabbit/consumer/server.ex b/lib/rabbit/consumer/server.ex index 4ce405e..94055b0 100644 --- a/lib/rabbit/consumer/server.ex +++ b/lib/rabbit/consumer/server.ex @@ -20,7 +20,10 @@ defmodule Rabbit.Consumer.Server do arguments: [type: :list, default: []], timeout: [type: [:integer, :atom], required: false], custom_meta: [type: :map, default: %{}], - setup_opts: [type: :list, default: [], required: false] + setup_opts: [type: :list, default: [], required: false], + sync_start: [type: :boolean, required: true, default: false], + sync_start_delay: [type: :integer, required: true, default: 50], + sync_start_max: [type: :integer, required: true, default: 100] } @qos_opts [ @@ -61,6 +64,7 @@ defmodule Rabbit.Consumer.Server do with {:ok, opts} <- module.init(:consumer, opts), {:ok, opts} <- validate_opts(opts, @opts_schema) do state = init_state(module, opts) + state = sync_start(state) {:ok, state, {:continue, :connection}} end end @@ -171,10 +175,37 @@ defmodule Rabbit.Consumer.Server do consume_opts: Keyword.take(opts, @consume_opts), worker_opts: Keyword.take(opts, @worker_opts), custom_meta: Keyword.get(opts, :custom_meta), - setup_opts: Keyword.get(opts, :setup_opts) + setup_opts: Keyword.get(opts, :setup_opts), + sync_start: Keyword.get(opts, :sync_start), + sync_start_delay: Keyword.get(opts, :sync_start_delay), + sync_start_max: Keyword.get(opts, :sync_start_max), + started_mode: :async } end + defp sync_start(state, attempt \\ 1) + + defp sync_start(%{sync_start: false} = state, _attempt), do: state + + defp sync_start(%{sync_start_max: max} = state, attempt) when attempt >= max do + log_error(state, {:error, :sync_start_failed}) + state + end + + defp sync_start(state, attempt) do + with {:ok, state} <- connection(state), + {:ok, connection} <- Rabbit.Connection.fetch(state.connection), + {:ok, state} <- channel(state, connection), + {:ok, state} <- handle_setup(state), + {:ok, state} <- consume(state) do + %{state | started_mode: :sync} + else + _ -> + :timer.sleep(state.sync_start_delay) + sync_start(state, attempt + 1) + end + end + defp connection(%{connection_subscribed: true} = state), do: {:ok, state} defp connection(state) do diff --git a/test/consumer_test.exs b/test/consumer_test.exs index 234a0fa..68c6270 100644 --- a/test/consumer_test.exs +++ b/test/consumer_test.exs @@ -101,6 +101,36 @@ defmodule Rabbit.ConsumerTest do end end + defmodule TroublesomeTestConsumer do + use Rabbit.Consumer + + @impl Rabbit.Consumer + def init(:consumer, opts) do + {:ok, opts} + end + + @impl Rabbit.Consumer + def handle_setup(state) do + attempt = Agent.get_and_update(state.setup_opts[:counter], fn n -> {n, n + 1} end) + + if attempt == 0 do + {:error, :something_went_wrong} + else + AMQP.Queue.declare(state.channel, state.queue, auto_delete: true) + :ok + end + end + + @impl Rabbit.Consumer + def handle_message(_msg) do + end + + @impl Rabbit.Consumer + def handle_error(_) do + :ok + end + end + setup do {:ok, connection} = Connection.start_link(TestConnection) {:ok, producer} = Producer.start_link(TestProducer, connection: connection) @@ -118,6 +148,34 @@ defmodule Rabbit.ConsumerTest do end end + describe "start_link/3 with :sync_start" do + test "starts consumer", meta do + assert {:ok, consumer} = + Consumer.start_link(TestConsumer, + connection: meta.connection, + queue: "consumer", + sync_start: true + ) + + assert %{started_mode: :sync, consuming: true} = get_state(consumer) + end + + test "starts consumer with multiple attempts", meta do + {:ok, counter} = Agent.start(fn -> 0 end) + + assert {:ok, consumer} = + Consumer.start_link(TroublesomeTestConsumer, + connection: meta.connection, + queue: "consumer", + sync_start: true, + setup_opts: [counter: counter] + ) + + assert Agent.get(counter, & &1) == 2 + assert %{started_mode: :sync, consuming: true} = get_state(consumer) + end + end + describe "stop/1" do test "stops consumer", meta do assert {:ok, consumer, _queue} = start_consumer(meta) @@ -128,7 +186,7 @@ defmodule Rabbit.ConsumerTest do test "disconnects the amqp channel", meta do assert {:ok, consumer, _queue} = start_consumer(meta) - state = GenServer.call(consumer, :state) + state = get_state(consumer) assert Process.alive?(state.channel.pid) assert :ok = Consumer.stop(consumer) @@ -143,10 +201,10 @@ defmodule Rabbit.ConsumerTest do assert {:ok, consumer, _queue} = start_consumer(meta) connection_state = connection_state(meta.connection) - consumer_state1 = GenServer.call(consumer, :state) + consumer_state1 = get_state(consumer) AMQP.Connection.close(connection_state.connection) await_consuming(consumer) - consumer_state2 = GenServer.call(consumer, :state) + consumer_state2 = get_state(consumer) assert consumer_state1.channel.pid != consumer_state2.channel.pid end @@ -256,7 +314,7 @@ defmodule Rabbit.ConsumerTest do end defp await_consuming(consumer) do - state = GenServer.call(consumer, :state) + state = get_state(consumer) if state.consuming do :ok @@ -270,7 +328,11 @@ defmodule Rabbit.ConsumerTest do :crypto.strong_rand_bytes(8) |> Base.encode64() end + defp get_state(consumer) do + GenServer.call(consumer, :state) + end + defp connection_state(connection) do - Connection.transaction(connection, &GenServer.call(&1, :state)) + Connection.transaction(connection, &get_state/1) end end