From bb6f53f6f38a2c9b5eae164920ff31d99133f46d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 20 Apr 2026 12:58:55 +0000 Subject: [PATCH 1/4] Initial plan From 84670719c8de266e26ba549df2be47a99dcba242 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 20 Apr 2026 13:01:42 +0000 Subject: [PATCH 2/4] fix: add producer.flush() to kafka tests to prevent race conditions The producer.send() method in kafka-python is asynchronous - it queues messages to be sent by a background I/O thread. Without calling producer.flush(), messages may not be delivered to the Kafka broker before the consumer's consumer_timeout_ms=500 expires. This creates a race condition where the consumer loop exits with no items consumed, resulting in no spans being created. Added producer.flush() after producer.send() calls in all delayed_send functions to ensure messages are actually delivered to Kafka. Agent-Logs-Url: https://github.com/elastic/apm-agent-python/sessions/e5ad59a6-a44a-4dd2-978e-fe4844b5c198 Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com> --- tests/instrumentation/kafka_tests.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/instrumentation/kafka_tests.py b/tests/instrumentation/kafka_tests.py index 54be2ee8e..6bcd50593 100644 --- a/tests/instrumentation/kafka_tests.py +++ b/tests/instrumentation/kafka_tests.py @@ -114,6 +114,7 @@ def delayed_send(): producer.send("test", key=b"foo", value=b"bar") producer.send("test", key=b"baz", value=b"bazzinga") elasticapm_client.end_transaction("foo") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -140,6 +141,7 @@ def delayed_send(): producer.send("test", key=b"foo", value=b"bar") producer.send("test", key=b"baz", value=b"bazzinga") elasticapm_client.end_transaction("foo") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -166,6 +168,7 @@ def delayed_send(): producer.send(topic="foo", key=b"foo", value=b"bar") producer.send("bar", key=b"foo", value=b"bar") producer.send("test", key=b"foo", value=b"bar") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -186,6 +189,7 @@ def delayed_send(): producer.send(topic="foo", key=b"foo", value=b"bar") producer.send("bar", key=b"foo", value=b"bar") producer.send("test", key=b"foo", value=b"bar") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -205,6 +209,7 @@ def delayed_send(): time.sleep(0.2) producer.send("test", key=b"foo", value=b"bar") producer.send("test", key=b"baz", value=b"bazzinga") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -249,6 +254,7 @@ def test_kafka_consumer_unsampled_transaction_handles_stop_iteration( def delayed_send(): time.sleep(0.2) producer.send("test", key=b"foo", value=b"bar") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start() From 059906fce22fddb3c331f47e3dfd1086447db510 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 21 Apr 2026 16:00:19 +0000 Subject: [PATCH 3/4] fix: wait for kafka send futures in flaky consumer tests Agent-Logs-Url: https://github.com/elastic/apm-agent-python/sessions/c8da0228-9a66-4391-a204-fe0b99b1b85b Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com> --- tests/instrumentation/kafka_tests.py | 32 +++++++++++----------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/tests/instrumentation/kafka_tests.py b/tests/instrumentation/kafka_tests.py index 6bcd50593..f5d81eefe 100644 --- a/tests/instrumentation/kafka_tests.py +++ b/tests/instrumentation/kafka_tests.py @@ -111,10 +111,9 @@ def test_kafka_consume(instrument, elasticapm_client, producer, consumer, topics def delayed_send(): time.sleep(0.2) elasticapm_client.begin_transaction("foo") - producer.send("test", key=b"foo", value=b"bar") - producer.send("test", key=b"baz", value=b"bazzinga") + producer.send("test", key=b"foo", value=b"bar").get(timeout=5) + producer.send("test", key=b"baz", value=b"bazzinga").get(timeout=5) elasticapm_client.end_transaction("foo") - producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -138,10 +137,9 @@ def test_kafka_consume_ongoing_transaction(instrument, elasticapm_client, produc def delayed_send(): time.sleep(0.2) elasticapm_client.begin_transaction("foo") - producer.send("test", key=b"foo", value=b"bar") - producer.send("test", key=b"baz", value=b"bazzinga") + producer.send("test", key=b"foo", value=b"bar").get(timeout=5) + producer.send("test", key=b"baz", value=b"bazzinga").get(timeout=5) elasticapm_client.end_transaction("foo") - producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -165,10 +163,9 @@ def test_kafka_consumer_ignore_topic(instrument, elasticapm_client, producer, co def delayed_send(): time.sleep(0.2) - producer.send(topic="foo", key=b"foo", value=b"bar") - producer.send("bar", key=b"foo", value=b"bar") - producer.send("test", key=b"foo", value=b"bar") - producer.flush() + producer.send(topic="foo", key=b"foo", value=b"bar").get(timeout=5) + producer.send("bar", key=b"foo", value=b"bar").get(timeout=5) + producer.send("test", key=b"foo", value=b"bar").get(timeout=5) thread = threading.Thread(target=delayed_send) thread.start() @@ -186,10 +183,9 @@ def test_kafka_consumer_ignore_topic_ongoing_transaction(instrument, elasticapm_ def delayed_send(): time.sleep(0.2) - producer.send(topic="foo", key=b"foo", value=b"bar") - producer.send("bar", key=b"foo", value=b"bar") - producer.send("test", key=b"foo", value=b"bar") - producer.flush() + producer.send(topic="foo", key=b"foo", value=b"bar").get(timeout=5) + producer.send("bar", key=b"foo", value=b"bar").get(timeout=5) + producer.send("test", key=b"foo", value=b"bar").get(timeout=5) thread = threading.Thread(target=delayed_send) thread.start() @@ -207,9 +203,8 @@ def delayed_send(): def test_kafka_poll_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics): def delayed_send(): time.sleep(0.2) - producer.send("test", key=b"foo", value=b"bar") - producer.send("test", key=b"baz", value=b"bazzinga") - producer.flush() + producer.send("test", key=b"foo", value=b"bar").get(timeout=5) + producer.send("test", key=b"baz", value=b"bazzinga").get(timeout=5) thread = threading.Thread(target=delayed_send) thread.start() @@ -253,8 +248,7 @@ def test_kafka_consumer_unsampled_transaction_handles_stop_iteration( ): def delayed_send(): time.sleep(0.2) - producer.send("test", key=b"foo", value=b"bar") - producer.flush() + producer.send("test", key=b"foo", value=b"bar").get(timeout=5) thread = threading.Thread(target=delayed_send) thread.start() From d2bad117366e161f500742548a6fcd5abbc664c8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 23 Apr 2026 10:24:25 +0000 Subject: [PATCH 4/4] fix: flush kafka producer before ending sender transaction Agent-Logs-Url: https://github.com/elastic/apm-agent-python/sessions/610642ed-3a8d-4371-bccb-485eaee1d69d Co-authored-by: xrmx <12932+xrmx@users.noreply.github.com> --- tests/instrumentation/kafka_tests.py | 32 +++++++++++++++++----------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/tests/instrumentation/kafka_tests.py b/tests/instrumentation/kafka_tests.py index f5d81eefe..8339ceefb 100644 --- a/tests/instrumentation/kafka_tests.py +++ b/tests/instrumentation/kafka_tests.py @@ -111,8 +111,9 @@ def test_kafka_consume(instrument, elasticapm_client, producer, consumer, topics def delayed_send(): time.sleep(0.2) elasticapm_client.begin_transaction("foo") - producer.send("test", key=b"foo", value=b"bar").get(timeout=5) - producer.send("test", key=b"baz", value=b"bazzinga").get(timeout=5) + producer.send("test", key=b"foo", value=b"bar") + producer.send("test", key=b"baz", value=b"bazzinga") + producer.flush() elasticapm_client.end_transaction("foo") thread = threading.Thread(target=delayed_send) @@ -137,8 +138,9 @@ def test_kafka_consume_ongoing_transaction(instrument, elasticapm_client, produc def delayed_send(): time.sleep(0.2) elasticapm_client.begin_transaction("foo") - producer.send("test", key=b"foo", value=b"bar").get(timeout=5) - producer.send("test", key=b"baz", value=b"bazzinga").get(timeout=5) + producer.send("test", key=b"foo", value=b"bar") + producer.send("test", key=b"baz", value=b"bazzinga") + producer.flush() elasticapm_client.end_transaction("foo") thread = threading.Thread(target=delayed_send) @@ -163,9 +165,10 @@ def test_kafka_consumer_ignore_topic(instrument, elasticapm_client, producer, co def delayed_send(): time.sleep(0.2) - producer.send(topic="foo", key=b"foo", value=b"bar").get(timeout=5) - producer.send("bar", key=b"foo", value=b"bar").get(timeout=5) - producer.send("test", key=b"foo", value=b"bar").get(timeout=5) + producer.send(topic="foo", key=b"foo", value=b"bar") + producer.send("bar", key=b"foo", value=b"bar") + producer.send("test", key=b"foo", value=b"bar") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -183,9 +186,10 @@ def test_kafka_consumer_ignore_topic_ongoing_transaction(instrument, elasticapm_ def delayed_send(): time.sleep(0.2) - producer.send(topic="foo", key=b"foo", value=b"bar").get(timeout=5) - producer.send("bar", key=b"foo", value=b"bar").get(timeout=5) - producer.send("test", key=b"foo", value=b"bar").get(timeout=5) + producer.send(topic="foo", key=b"foo", value=b"bar") + producer.send("bar", key=b"foo", value=b"bar") + producer.send("test", key=b"foo", value=b"bar") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -203,8 +207,9 @@ def delayed_send(): def test_kafka_poll_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics): def delayed_send(): time.sleep(0.2) - producer.send("test", key=b"foo", value=b"bar").get(timeout=5) - producer.send("test", key=b"baz", value=b"bazzinga").get(timeout=5) + producer.send("test", key=b"foo", value=b"bar") + producer.send("test", key=b"baz", value=b"bazzinga") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start() @@ -248,7 +253,8 @@ def test_kafka_consumer_unsampled_transaction_handles_stop_iteration( ): def delayed_send(): time.sleep(0.2) - producer.send("test", key=b"foo", value=b"bar").get(timeout=5) + producer.send("test", key=b"foo", value=b"bar") + producer.flush() thread = threading.Thread(target=delayed_send) thread.start()