diff --git a/base/mqueue/advisory_update_event.go b/base/mqueue/advisory_update_event.go new file mode 100644 index 000000000..73dd982a3 --- /dev/null +++ b/base/mqueue/advisory_update_event.go @@ -0,0 +1,40 @@ +package mqueue + +import ( + "app/base/types" + "context" + + "github.com/bytedance/sonic" + "github.com/google/uuid" + "github.com/pkg/errors" +) + +type AdvisoryUpdateEvent struct { + RhAccountID int `json:"rh_account_id"` + WorkspaceID uuid.UUID `json:"workspace_id"` + InventoryID uuid.UUID `json:"inventory_id"` + AdvisoryIDs []int64 `json:"advisory_ids"` + ProducedAt types.Rfc3339Timestamp `json:"produced_at"` +} + +type AdvisoryUpdateEvents []AdvisoryUpdateEvent + +func (event *AdvisoryUpdateEvent) createKafkaMessage() (KafkaMessage, error) { + data, err := sonic.Marshal(event) + if err != nil { + return KafkaMessage{}, errors.Wrap(err, "Serializing advisory update event") + } + return KafkaMessage{Value: data}, nil +} + +func (events AdvisoryUpdateEvents) WriteEvents(ctx context.Context, w Writer) error { + msgs := make([]KafkaMessage, 0, len(events)) + for i := range events { + msg, err := events[i].createKafkaMessage() + if err != nil { + return err + } + msgs = append(msgs, msg) + } + return w.WriteMessages(ctx, msgs...) +} diff --git a/base/mqueue/advisory_update_event_test.go b/base/mqueue/advisory_update_event_test.go new file mode 100644 index 000000000..74d76c3ce --- /dev/null +++ b/base/mqueue/advisory_update_event_test.go @@ -0,0 +1,84 @@ +package mqueue + +import ( + "app/base/types" + "context" + "testing" + "time" + + "github.com/bytedance/sonic" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +var ( + testWorkspaceID = uuid.New() + testNow = types.Rfc3339Timestamp(time.Now()) +) + +func TestAdvisoryUpdateEventMarshal(t *testing.T) { + event := AdvisoryUpdateEvent{ + RhAccountID: 1, + WorkspaceID: testWorkspaceID, + InventoryID: uuid.New(), + AdvisoryIDs: []int64{101, 202, 303}, + ProducedAt: testNow, + } + + data, err := sonic.Marshal(&event) + assert.NoError(t, err) + + var parsed AdvisoryUpdateEvent + err = sonic.Unmarshal(data, &parsed) + assert.NoError(t, err) + assert.Equal(t, event.RhAccountID, parsed.RhAccountID) + assert.Equal(t, event.WorkspaceID, parsed.WorkspaceID) + assert.Equal(t, event.InventoryID, parsed.InventoryID) + assert.Equal(t, event.AdvisoryIDs, parsed.AdvisoryIDs) + assert.NotNil(t, parsed.ProducedAt) +} + +func TestAdvisoryUpdateEventsWriteEvents(t *testing.T) { + var writer Writer = &MockKafkaWriter{} + + events := AdvisoryUpdateEvents{ + { + RhAccountID: 1, + WorkspaceID: testWorkspaceID, + InventoryID: uuid.New(), + AdvisoryIDs: []int64{100, 200}, + ProducedAt: testNow, + }, + { + RhAccountID: 2, + WorkspaceID: testWorkspaceID, + InventoryID: uuid.New(), + AdvisoryIDs: []int64{300}, + ProducedAt: testNow, + }, + } + + err := SendMessages(context.Background(), writer, &events) + assert.NoError(t, err) + + mockWriter := writer.(*MockKafkaWriter) + assert.Equal(t, 2, len(mockWriter.Messages)) + + var firstEvent AdvisoryUpdateEvent + err = sonic.Unmarshal(mockWriter.Messages[0].Value, &firstEvent) + assert.NoError(t, err) + assert.Equal(t, events[0].RhAccountID, firstEvent.RhAccountID) + assert.Equal(t, events[0].WorkspaceID, firstEvent.WorkspaceID) + assert.Equal(t, events[0].InventoryID, firstEvent.InventoryID) + assert.Equal(t, events[0].AdvisoryIDs, firstEvent.AdvisoryIDs) + assert.NotNil(t, firstEvent.ProducedAt) + + var secondEvent AdvisoryUpdateEvent + err = sonic.Unmarshal(mockWriter.Messages[1].Value, &secondEvent) + assert.NoError(t, err) + assert.Equal(t, events[1].RhAccountID, secondEvent.RhAccountID) + assert.Equal(t, events[1].WorkspaceID, secondEvent.WorkspaceID) + assert.Equal(t, events[1].InventoryID, secondEvent.InventoryID) + assert.Equal(t, events[1].AdvisoryIDs, secondEvent.AdvisoryIDs) + assert.NotNil(t, secondEvent.ProducedAt) +} diff --git a/base/utils/config.go b/base/utils/config.go index 28ad117f7..447ddcca9 100644 --- a/base/utils/config.go +++ b/base/utils/config.go @@ -63,6 +63,7 @@ type coreConfig struct { EventsTopic string EvalTopic string CreatedSystemsTopic string + AdvisoryUpdateTopic string PayloadTrackerTopic string RemediationUpdateTopic string NotificationsTopic string @@ -171,6 +172,7 @@ func initTopicsFromEnv() { CoreCfg.EventsTopic = Getenv("EVENTS_TOPIC", "") CoreCfg.EvalTopic = Getenv("EVAL_TOPIC", "") CoreCfg.CreatedSystemsTopic = Getenv("CREATED_SYSTEMS_TOPIC", "") + CoreCfg.AdvisoryUpdateTopic = Getenv("ADVISORY_UPDATE_TOPIC", "") CoreCfg.PayloadTrackerTopic = Getenv("PAYLOAD_TRACKER_TOPIC", "") CoreCfg.RemediationUpdateTopic = Getenv("REMEDIATIONS_UPDATE_TOPIC", "") CoreCfg.NotificationsTopic = Getenv("NOTIFICATIONS_TOPIC", "") diff --git a/conf/common.env b/conf/common.env index bb9c8f3ec..9463f8c95 100644 --- a/conf/common.env +++ b/conf/common.env @@ -13,6 +13,7 @@ KAFKA_READY_ADDRESS=http://kafka:9099/ EVAL_TOPIC=patchman.evaluator.upload CREATED_SYSTEMS_TOPIC=patchman.evaluator.user-evaluation +ADVISORY_UPDATE_TOPIC=patchman.advisory.update EVENTS_TOPIC=platform.inventory.events NOTIFICATIONS_TOPIC=platform.notifications.ingress PAYLOAD_TRACKER_TOPIC=platform.payload-status diff --git a/deploy/clowdapp.yaml b/deploy/clowdapp.yaml index 5e202d5b5..fd2f05017 100644 --- a/deploy/clowdapp.yaml +++ b/deploy/clowdapp.yaml @@ -520,17 +520,6 @@ objects: name: patchman version: 16 - kafkaTopics: - - {replicas: 3, partitions: 10, topicName: platform.inventory.events} - - {replicas: 3, partitions: 10, topicName: platform.inventory.host-apps} - - {replicas: 3, partitions: 10, topicName: patchman.evaluator.upload} - - {replicas: 3, partitions: 10, topicName: patchman.evaluator.recalc} - - {replicas: 3, partitions: 8, topicName: platform.payload-status} - - {replicas: 3, partitions: 10, topicName: platform.remediation-updates.patch} - - {replicas: 3, partitions: 10, topicName: platform.notifications.ingress} - - {replicas: 3, partitions: 10, topicName: platform.content-sources.template} - - {replicas: 3, partitions: 4, topicName: patchman.evaluator.user-evaluation} - dependencies: - host-inventory - rbac diff --git a/dev/kafka/setup.sh b/dev/kafka/setup.sh index ce3fe38b2..622321031 100755 --- a/dev/kafka/setup.sh +++ b/dev/kafka/setup.sh @@ -8,6 +8,7 @@ for topic in \ "patchman.evaluator.recalc" \ "patchman.evaluator.upload" \ "patchman.evaluator.user-evaluation" \ + "patchman.advisory.update" \ "platform.content-sources.template" \ "platform.inventory.events" \ "platform.inventory.host-apps" \