From cf78acc6b3a6274dc9b86b6b0e01511477fea828 Mon Sep 17 00:00:00 2001 From: Damien Krotkine Date: Tue, 12 Dec 2017 12:22:44 +0100 Subject: [PATCH 1/2] add idea dir to gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index a2318a8..980ecbd 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,6 @@ erl_crash.dump /priv .tool-versions + +.idea/ + From ca5d856e3b1ead2952258cb0cf3c03661c84d966 Mon Sep 17 00:00:00 2001 From: Damien Krotkine Date: Tue, 12 Dec 2017 17:19:55 +0100 Subject: [PATCH 2/2] implement initializing a state and passing it around to group consumer --- README.md | 15 ++++++++++----- lib/kaffe/group_member/worker/worker.ex | 14 ++++++++------ test/kaffe/group_member/worker/worker_test.exs | 5 +++-- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index bb6f408..c966fb9 100644 --- a/README.md +++ b/README.md @@ -120,20 +120,25 @@ Batch message consumers receive a list of messages and work as part of the `:bro ### Kaffe GroupMember - Batch Message Consumer -1. Define a `handle_messages/1` function in the provided module. +1. Define a `init_handler/0` and a `handle_messages/2` function in the provided module. - `handle_messages/1` This function (note the pluralization) will be called with a *list of messages*, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata. + `init_handler/0` This function will be called upon consumer initialization and should return `{:ok, state}`. This state will be passed to the `handle_messages/2`callback - The module's `handle_messages/1` function _must_ return `:ok` or Kaffe will throw an error. The Kaffe consumer will block until your `handle_messages/1` function returns `:ok`. + `handle_messages/2` This function (note the pluralization) will be called with a *list of messages* and the state, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata. + + The module's `handle_messages/2` function _must_ return `{:ok, state}` or Kaffe will throw an error. The Kaffe consumer will block until your `handle_messages/2` function returns `{:ok, state}`. ```elixir defmodule MessageProcessor - def handle_messages(messages) do + def init_handler() do + {:ok, 0} # initial state, number of messages received + end + def handle_messages(messages, state) do for %{key: key, value: value} = message <- messages do IO.inspect message IO.puts "#{key}: #{value}" end - :ok # Important! + {:ok, state + Enum.count(messages)} # Important! end end ``` diff --git a/lib/kaffe/group_member/worker/worker.ex b/lib/kaffe/group_member/worker/worker.ex index 5f030ba..b3b410d 100644 --- a/lib/kaffe/group_member/worker/worker.ex +++ b/lib/kaffe/group_member/worker/worker.ex @@ -5,8 +5,9 @@ defmodule Kaffe.Worker do Processing the message set is delegated to the configured message handler. It is responsible for any error handling. The message handler - must define a `handle_messages` function (*note* the pluralization!) - to accept a list of messages. + must define a `init_handler/0` function that should return `{:ok, state}`, and + a `handle_messages` function (*note* the pluralization!) + to accept a list of messages and a state, and returns `{:ok, state}`. The result of `handle_messages` is sent back to the subscriber. """ @@ -20,7 +21,8 @@ defmodule Kaffe.Worker do def init([message_handler, worker_name]) do Logger.info "event#starting=#{__MODULE__} name=#{worker_name}" - {:ok, %{message_handler: message_handler, worker_name: worker_name}} + {:ok, handler_state } = apply(message_handler, :init_handler, []) + {:ok, %{message_handler: message_handler, worker_name: worker_name, handler_state: handler_state}} end def process_messages(pid, subscriber_pid, topic, partition, generation_id, messages) do @@ -28,13 +30,13 @@ defmodule Kaffe.Worker do end def handle_cast({:process_messages, subscriber_pid, topic, partition, generation_id, messages}, - %{message_handler: message_handler} = state) do + %{message_handler: message_handler, handler_state: handler_state} = state) do - :ok = apply(message_handler, :handle_messages, [messages]) + {:ok, new_handler_state} = apply(message_handler, :handle_messages, [messages, handler_state]) offset = Enum.reduce(messages, 0, &max(&1.offset, &2)) subscriber().ack_messages(subscriber_pid, topic, partition, generation_id, offset) - {:noreply, state} + {:noreply, %{state| handler_state: new_handler_state}} end def terminate(reason, _state) do diff --git a/test/kaffe/group_member/worker/worker_test.exs b/test/kaffe/group_member/worker/worker_test.exs index bea32af..78f1498 100644 --- a/test/kaffe/group_member/worker/worker_test.exs +++ b/test/kaffe/group_member/worker/worker_test.exs @@ -11,9 +11,10 @@ defmodule Kaffe.WorkerTest do end defmodule TestHandler do - def handle_messages(messages) do + def init_handler(), do: {:ok, :some_state} + def handle_messages(messages, state) do send :test_case, {:handle_messages, messages} - :ok + {:ok, state} end end