From f4aaa5c349073c884dcfc90ee2f98841c55f3440 Mon Sep 17 00:00:00 2001 From: Kate Zaprazna Date: Wed, 13 May 2026 23:07:10 +0200 Subject: [PATCH 1/3] feat: add `patchman.advisory.update` Kafka topic config --- base/utils/config.go | 2 ++ conf/common.env | 1 + deploy/clowdapp.yaml | 1 + dev/kafka/setup.sh | 1 + 4 files changed, 5 insertions(+) diff --git a/base/utils/config.go b/base/utils/config.go index 28ad117f7..447ddcca9 100644 --- a/base/utils/config.go +++ b/base/utils/config.go @@ -63,6 +63,7 @@ type coreConfig struct { EventsTopic string EvalTopic string CreatedSystemsTopic string + AdvisoryUpdateTopic string PayloadTrackerTopic string RemediationUpdateTopic string NotificationsTopic string @@ -171,6 +172,7 @@ func initTopicsFromEnv() { CoreCfg.EventsTopic = Getenv("EVENTS_TOPIC", "") CoreCfg.EvalTopic = Getenv("EVAL_TOPIC", "") CoreCfg.CreatedSystemsTopic = Getenv("CREATED_SYSTEMS_TOPIC", "") + CoreCfg.AdvisoryUpdateTopic = Getenv("ADVISORY_UPDATE_TOPIC", "") CoreCfg.PayloadTrackerTopic = Getenv("PAYLOAD_TRACKER_TOPIC", "") CoreCfg.RemediationUpdateTopic = Getenv("REMEDIATIONS_UPDATE_TOPIC", "") CoreCfg.NotificationsTopic = Getenv("NOTIFICATIONS_TOPIC", "") diff --git a/conf/common.env b/conf/common.env index bb9c8f3ec..9463f8c95 100644 --- a/conf/common.env +++ b/conf/common.env @@ -13,6 +13,7 @@ KAFKA_READY_ADDRESS=http://kafka:9099/ EVAL_TOPIC=patchman.evaluator.upload CREATED_SYSTEMS_TOPIC=patchman.evaluator.user-evaluation +ADVISORY_UPDATE_TOPIC=patchman.advisory.update EVENTS_TOPIC=platform.inventory.events NOTIFICATIONS_TOPIC=platform.notifications.ingress PAYLOAD_TRACKER_TOPIC=platform.payload-status diff --git a/deploy/clowdapp.yaml b/deploy/clowdapp.yaml index 5e202d5b5..ebc2e7a26 100644 --- a/deploy/clowdapp.yaml +++ b/deploy/clowdapp.yaml @@ -530,6 +530,7 @@ objects: - {replicas: 3, partitions: 10, topicName: platform.notifications.ingress} - {replicas: 3, partitions: 10, topicName: platform.content-sources.template} - {replicas: 3, partitions: 4, topicName: patchman.evaluator.user-evaluation} + - {replicas: 3, partitions: 10, topicName: patchman.advisory.update} dependencies: - host-inventory diff --git a/dev/kafka/setup.sh b/dev/kafka/setup.sh index ce3fe38b2..622321031 100755 --- a/dev/kafka/setup.sh +++ b/dev/kafka/setup.sh @@ -8,6 +8,7 @@ for topic in \ "patchman.evaluator.recalc" \ "patchman.evaluator.upload" \ "patchman.evaluator.user-evaluation" \ + "patchman.advisory.update" \ "platform.content-sources.template" \ "platform.inventory.events" \ "platform.inventory.host-apps" \ From 781567c450089023d697192b4d03a2719ada46ab Mon Sep 17 00:00:00 2001 From: Kate Zaprazna Date: Thu, 14 May 2026 22:46:11 +0200 Subject: [PATCH 2/3] chore: remove legacy kafkaTopics definition from ClowdApp Kafka topics are provisioned externally by the Platform team, Clowder does not need to create them. Our components reference topics via environment variables in their deployment specs, which are independent of this section. --- deploy/clowdapp.yaml | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/deploy/clowdapp.yaml b/deploy/clowdapp.yaml index ebc2e7a26..fd2f05017 100644 --- a/deploy/clowdapp.yaml +++ b/deploy/clowdapp.yaml @@ -520,18 +520,6 @@ objects: name: patchman version: 16 - kafkaTopics: - - {replicas: 3, partitions: 10, topicName: platform.inventory.events} - - {replicas: 3, partitions: 10, topicName: platform.inventory.host-apps} - - {replicas: 3, partitions: 10, topicName: patchman.evaluator.upload} - - {replicas: 3, partitions: 10, topicName: patchman.evaluator.recalc} - - {replicas: 3, partitions: 8, topicName: platform.payload-status} - - {replicas: 3, partitions: 10, topicName: platform.remediation-updates.patch} - - {replicas: 3, partitions: 10, topicName: platform.notifications.ingress} - - {replicas: 3, partitions: 10, topicName: platform.content-sources.template} - - {replicas: 3, partitions: 4, topicName: patchman.evaluator.user-evaluation} - - {replicas: 3, partitions: 10, topicName: patchman.advisory.update} - dependencies: - host-inventory - rbac From c62a8071b7fb7efd2afc14ae26df0bdac7649118 Mon Sep 17 00:00:00 2001 From: Kate Zaprazna Date: Thu, 14 May 2026 23:18:33 +0200 Subject: [PATCH 3/3] feat: add AdvisoryUpdateEvent with tests --- base/mqueue/advisory_update_event.go | 40 +++++++++++ base/mqueue/advisory_update_event_test.go | 84 +++++++++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100644 base/mqueue/advisory_update_event.go create mode 100644 base/mqueue/advisory_update_event_test.go diff --git a/base/mqueue/advisory_update_event.go b/base/mqueue/advisory_update_event.go new file mode 100644 index 000000000..73dd982a3 --- /dev/null +++ b/base/mqueue/advisory_update_event.go @@ -0,0 +1,40 @@ +package mqueue + +import ( + "app/base/types" + "context" + + "github.com/bytedance/sonic" + "github.com/google/uuid" + "github.com/pkg/errors" +) + +type AdvisoryUpdateEvent struct { + RhAccountID int `json:"rh_account_id"` + WorkspaceID uuid.UUID `json:"workspace_id"` + InventoryID uuid.UUID `json:"inventory_id"` + AdvisoryIDs []int64 `json:"advisory_ids"` + ProducedAt types.Rfc3339Timestamp `json:"produced_at"` +} + +type AdvisoryUpdateEvents []AdvisoryUpdateEvent + +func (event *AdvisoryUpdateEvent) createKafkaMessage() (KafkaMessage, error) { + data, err := sonic.Marshal(event) + if err != nil { + return KafkaMessage{}, errors.Wrap(err, "Serializing advisory update event") + } + return KafkaMessage{Value: data}, nil +} + +func (events AdvisoryUpdateEvents) WriteEvents(ctx context.Context, w Writer) error { + msgs := make([]KafkaMessage, 0, len(events)) + for i := range events { + msg, err := events[i].createKafkaMessage() + if err != nil { + return err + } + msgs = append(msgs, msg) + } + return w.WriteMessages(ctx, msgs...) +} diff --git a/base/mqueue/advisory_update_event_test.go b/base/mqueue/advisory_update_event_test.go new file mode 100644 index 000000000..74d76c3ce --- /dev/null +++ b/base/mqueue/advisory_update_event_test.go @@ -0,0 +1,84 @@ +package mqueue + +import ( + "app/base/types" + "context" + "testing" + "time" + + "github.com/bytedance/sonic" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +var ( + testWorkspaceID = uuid.New() + testNow = types.Rfc3339Timestamp(time.Now()) +) + +func TestAdvisoryUpdateEventMarshal(t *testing.T) { + event := AdvisoryUpdateEvent{ + RhAccountID: 1, + WorkspaceID: testWorkspaceID, + InventoryID: uuid.New(), + AdvisoryIDs: []int64{101, 202, 303}, + ProducedAt: testNow, + } + + data, err := sonic.Marshal(&event) + assert.NoError(t, err) + + var parsed AdvisoryUpdateEvent + err = sonic.Unmarshal(data, &parsed) + assert.NoError(t, err) + assert.Equal(t, event.RhAccountID, parsed.RhAccountID) + assert.Equal(t, event.WorkspaceID, parsed.WorkspaceID) + assert.Equal(t, event.InventoryID, parsed.InventoryID) + assert.Equal(t, event.AdvisoryIDs, parsed.AdvisoryIDs) + assert.NotNil(t, parsed.ProducedAt) +} + +func TestAdvisoryUpdateEventsWriteEvents(t *testing.T) { + var writer Writer = &MockKafkaWriter{} + + events := AdvisoryUpdateEvents{ + { + RhAccountID: 1, + WorkspaceID: testWorkspaceID, + InventoryID: uuid.New(), + AdvisoryIDs: []int64{100, 200}, + ProducedAt: testNow, + }, + { + RhAccountID: 2, + WorkspaceID: testWorkspaceID, + InventoryID: uuid.New(), + AdvisoryIDs: []int64{300}, + ProducedAt: testNow, + }, + } + + err := SendMessages(context.Background(), writer, &events) + assert.NoError(t, err) + + mockWriter := writer.(*MockKafkaWriter) + assert.Equal(t, 2, len(mockWriter.Messages)) + + var firstEvent AdvisoryUpdateEvent + err = sonic.Unmarshal(mockWriter.Messages[0].Value, &firstEvent) + assert.NoError(t, err) + assert.Equal(t, events[0].RhAccountID, firstEvent.RhAccountID) + assert.Equal(t, events[0].WorkspaceID, firstEvent.WorkspaceID) + assert.Equal(t, events[0].InventoryID, firstEvent.InventoryID) + assert.Equal(t, events[0].AdvisoryIDs, firstEvent.AdvisoryIDs) + assert.NotNil(t, firstEvent.ProducedAt) + + var secondEvent AdvisoryUpdateEvent + err = sonic.Unmarshal(mockWriter.Messages[1].Value, &secondEvent) + assert.NoError(t, err) + assert.Equal(t, events[1].RhAccountID, secondEvent.RhAccountID) + assert.Equal(t, events[1].WorkspaceID, secondEvent.WorkspaceID) + assert.Equal(t, events[1].InventoryID, secondEvent.InventoryID) + assert.Equal(t, events[1].AdvisoryIDs, secondEvent.AdvisoryIDs) + assert.NotNil(t, secondEvent.ProducedAt) +}