Skip to content

Commit 48832ab

Browse files
TomKclaude
andcommitted
Add PubSub test coverage and fix gRPC error code handling
Add integration tests for GooglePubSubProvider against the PubSub emulator. Fix auto_create not working on gRPC transport - the code compared exception codes against HTTP status codes (404/409) but the Google Cloud PHP library passes gRPC codes (5/6) when using gRPC transport. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent f3a712f commit 48832ab

3 files changed

Lines changed: 272 additions & 3 deletions

File tree

.github/workflows/unit-test.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ jobs:
2121
image: rabbitmq:${{matrix.rabbit}}-management
2222
ports:
2323
- 5672:5672
24+
pubsub:
25+
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
26+
ports:
27+
- 8085:8085
28+
options: >-
29+
--entrypoint "gcloud beta emulators pubsub start --host-port=0.0.0.0:8085"
30+
env:
31+
CLOUDSDK_CORE_PROJECT: test-project
2432
steps:
2533
- uses: actions/checkout@v6
2634
- uses: shivammathur/setup-php@v2
@@ -30,4 +38,6 @@ jobs:
3038
tools: composer, phpunit
3139
- run: composer install -n --prefer-dist --no-security-blocking
3240
- run: php vendor/phpunit/phpunit/phpunit -c phpunit.xml --coverage-clover=coverage.xml
41+
env:
42+
PUBSUB_EMULATOR_HOST: localhost:8085
3343
- run: php vendor/bin/coverage-check coverage.xml 10

src/Provider/Google/GooglePubSubProvider.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ private function _createTopicAndSub()
157157
}
158158
catch(ConflictException $e)
159159
{
160-
if($e->getCode() != 409)
160+
if($e->getCode() != 409 && $e->getCode() != 6)
161161
{
162162
throw $e;
163163
}
@@ -169,7 +169,7 @@ private function _createTopicAndSub()
169169
}
170170
catch(ConflictException $e)
171171
{
172-
if($e->getCode() != 409)
172+
if($e->getCode() != 409 && $e->getCode() != 6)
173173
{
174174
throw $e;
175175
}
@@ -216,7 +216,7 @@ public function pushBatch(array $batch)
216216
}
217217
catch(NotFoundException $e)
218218
{
219-
if($this->_getAutoCreate() && ($e->getCode() == 404))
219+
if($this->_getAutoCreate() && ($e->getCode() == 404 || $e->getCode() == 5))
220220
{
221221
$this->_createTopicAndSub();
222222
return $topic->publishBatch($messages);
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
<?php
2+
namespace Packaged\Queue\Tests\Provider;
3+
4+
use Packaged\Config\Provider\ConfigSection;
5+
use Packaged\Queue\Provider\Google\GooglePubSubProvider;
6+
use PHPUnit\Framework\TestCase;
7+
8+
class GooglePubSubTest extends TestCase
9+
{
10+
protected function setUp(): void
11+
{
12+
parent::setUp();
13+
// Point the PubSub client at the local emulator
14+
putenv('PUBSUB_EMULATOR_HOST=localhost:8085');
15+
}
16+
17+
protected function _getProvider(string $topic, string $subscription = null): GooglePubSubProvider
18+
{
19+
$q = GooglePubSubProvider::create($topic, $subscription);
20+
$q->configure(new ConfigSection('', ['auto_create' => true]));
21+
return $q;
22+
}
23+
24+
protected function _uniqueName(string $base): string
25+
{
26+
return $base . '_' . uniqid();
27+
}
28+
29+
public function testPushAndConsume()
30+
{
31+
$name = $this->_uniqueName('test_push_consume');
32+
$q = $this->_getProvider($name);
33+
34+
$q->push('hello world');
35+
36+
$result = null;
37+
$q->consume(function ($data) use (&$result) {
38+
$result = $data;
39+
return true; // ack
40+
});
41+
42+
$this->assertEquals('hello world', $result);
43+
}
44+
45+
public function testPushBatch()
46+
{
47+
$name = $this->_uniqueName('test_push_batch');
48+
$q = $this->_getProvider($name);
49+
50+
$result = $q->pushBatch(['msg1', 'msg2', 'msg3']);
51+
$this->assertCount(3, $result['messageIds']);
52+
}
53+
54+
public function testPushBatchEmpty()
55+
{
56+
$name = $this->_uniqueName('test_push_batch_empty');
57+
$q = $this->_getProvider($name);
58+
59+
$result = $q->pushBatch([]);
60+
$this->assertNull($result);
61+
}
62+
63+
public function testConsumeNack()
64+
{
65+
$name = $this->_uniqueName('test_nack');
66+
$q = $this->_getProvider($name);
67+
68+
$q->push('nack me');
69+
70+
// Nack the message (return false)
71+
$q->consume(function ($data) {
72+
return false;
73+
});
74+
75+
// Message should be redelivered after nack
76+
$result = null;
77+
$q->consume(function ($data) use (&$result) {
78+
$result = $data;
79+
return true;
80+
});
81+
82+
$this->assertEquals('nack me', $result);
83+
}
84+
85+
public function testBatchConsume()
86+
{
87+
$name = $this->_uniqueName('test_batch_consume');
88+
$q = $this->_getProvider($name);
89+
90+
$q->pushBatch(['b1', 'b2', 'b3']);
91+
92+
$received = [];
93+
$hadMessages = $q->batchConsume(function ($messages) use ($q, &$received) {
94+
foreach($messages as $ackId => $data)
95+
{
96+
$received[] = $data;
97+
$q->ack($ackId);
98+
}
99+
}, 10);
100+
101+
$this->assertTrue($hadMessages);
102+
$this->assertCount(3, $received);
103+
}
104+
105+
public function testBatchAck()
106+
{
107+
$name = $this->_uniqueName('test_batch_ack');
108+
$q = $this->_getProvider($name);
109+
110+
$q->pushBatch(['a1', 'a2', 'a3']);
111+
112+
$received = [];
113+
$q->batchConsume(function ($messages) use ($q, &$received) {
114+
$results = [];
115+
foreach($messages as $ackId => $data)
116+
{
117+
$received[] = $data;
118+
$results[$ackId] = true;
119+
}
120+
$q->batchAck($results);
121+
}, 10);
122+
123+
$this->assertCount(3, $received);
124+
}
125+
126+
public function testBatchNack()
127+
{
128+
$name = $this->_uniqueName('test_batch_nack');
129+
$q = $this->_getProvider($name);
130+
131+
$q->push('nack_batch');
132+
133+
$q->batchConsume(function ($messages) use ($q) {
134+
$results = [];
135+
foreach($messages as $ackId => $data)
136+
{
137+
$results[$ackId] = false; // nack
138+
}
139+
$q->batchAck($results);
140+
}, 10);
141+
142+
// Nacked message should be redelivered
143+
$redelivered = null;
144+
$q->batchConsume(function ($messages) use ($q, &$redelivered) {
145+
foreach($messages as $ackId => $data)
146+
{
147+
$redelivered = $data;
148+
$q->ack($ackId);
149+
}
150+
}, 10);
151+
152+
$this->assertEquals('nack_batch', $redelivered);
153+
}
154+
155+
public function testSingleAckAndNack()
156+
{
157+
$name = $this->_uniqueName('test_single_ack_nack');
158+
$q = $this->_getProvider($name);
159+
160+
$q->pushBatch(['keep', 'reject']);
161+
162+
$nackId = null;
163+
$q->batchConsume(function ($messages) use ($q, &$nackId) {
164+
foreach($messages as $ackId => $data)
165+
{
166+
if($data === 'keep')
167+
{
168+
$q->ack($ackId);
169+
}
170+
else
171+
{
172+
$q->nack($ackId);
173+
$nackId = $ackId;
174+
}
175+
}
176+
}, 10);
177+
178+
// The nacked message should be redelivered
179+
$redelivered = null;
180+
$q->batchConsume(function ($messages) use ($q, &$redelivered) {
181+
foreach($messages as $ackId => $data)
182+
{
183+
$redelivered = $data;
184+
$q->ack($ackId);
185+
}
186+
}, 10);
187+
188+
$this->assertEquals('reject', $redelivered);
189+
}
190+
191+
public function testSeparateTopicAndSubscription()
192+
{
193+
$topic = $this->_uniqueName('test_topic');
194+
$sub = $this->_uniqueName('test_sub');
195+
$q = $this->_getProvider($topic, $sub);
196+
197+
$q->push('separate names');
198+
199+
$result = null;
200+
$q->consume(function ($data) use (&$result) {
201+
$result = $data;
202+
return true;
203+
});
204+
205+
$this->assertEquals('separate names', $result);
206+
}
207+
208+
public function testCreateDefaultsSubscriptionToTopic()
209+
{
210+
$name = $this->_uniqueName('test_default_sub');
211+
$q = GooglePubSubProvider::create($name);
212+
$q->configure(new ConfigSection('', ['auto_create' => true]));
213+
214+
$q->push('default sub');
215+
216+
$result = null;
217+
$q->consume(function ($data) use (&$result) {
218+
$result = $data;
219+
return true;
220+
});
221+
222+
$this->assertEquals('default sub', $result);
223+
}
224+
225+
public function testAckDeadlineConfig()
226+
{
227+
$name = $this->_uniqueName('test_ack_deadline');
228+
$q = GooglePubSubProvider::create($name);
229+
$q->configure(new ConfigSection('', ['auto_create' => true, 'ack_deadline' => 30]));
230+
231+
$q->push('with deadline');
232+
233+
$result = null;
234+
$q->consume(function ($data) use (&$result) {
235+
$result = $data;
236+
return true;
237+
});
238+
239+
$this->assertEquals('with deadline', $result);
240+
}
241+
242+
public function testMessageEncoding()
243+
{
244+
$name = $this->_uniqueName('test_encoding');
245+
$q = $this->_getProvider($name);
246+
247+
$complex = ['key' => 'value', 'nested' => ['a' => 1]];
248+
$q->push($complex);
249+
250+
$result = null;
251+
$q->consume(function ($data) use (&$result) {
252+
$result = $data;
253+
return true;
254+
});
255+
256+
$this->assertEquals('value', $result->key);
257+
$this->assertEquals(1, $result->nested->a);
258+
}
259+
}

0 commit comments

Comments
 (0)