Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions base/mqueue/advisory_update_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package mqueue

import (
"app/base/types"
"context"

"github.com/bytedance/sonic"
"github.com/google/uuid"
"github.com/pkg/errors"
)

type AdvisoryUpdateEvent struct {
RhAccountID int `json:"rh_account_id"`
WorkspaceID uuid.UUID `json:"workspace_id"`
InventoryID uuid.UUID `json:"inventory_id"`
AdvisoryIDs []int64 `json:"advisory_ids"`
ProducedAt types.Rfc3339Timestamp `json:"produced_at"`
}

type AdvisoryUpdateEvents []AdvisoryUpdateEvent

func (event *AdvisoryUpdateEvent) createKafkaMessage() (KafkaMessage, error) {
data, err := sonic.Marshal(event)
if err != nil {
return KafkaMessage{}, errors.Wrap(err, "Serializing advisory update event")
}
return KafkaMessage{Value: data}, nil
}

func (events AdvisoryUpdateEvents) WriteEvents(ctx context.Context, w Writer) error {
msgs := make([]KafkaMessage, 0, len(events))
for i := range events {
msg, err := events[i].createKafkaMessage()
if err != nil {
return err
}
msgs = append(msgs, msg)
}
return w.WriteMessages(ctx, msgs...)
}
84 changes: 84 additions & 0 deletions base/mqueue/advisory_update_event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package mqueue

import (
"app/base/types"
"context"
"testing"
"time"

"github.com/bytedance/sonic"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)

var (
testWorkspaceID = uuid.New()
testNow = types.Rfc3339Timestamp(time.Now())
)

func TestAdvisoryUpdateEventMarshal(t *testing.T) {
event := AdvisoryUpdateEvent{
RhAccountID: 1,
WorkspaceID: testWorkspaceID,
InventoryID: uuid.New(),
AdvisoryIDs: []int64{101, 202, 303},
ProducedAt: testNow,
}

data, err := sonic.Marshal(&event)
assert.NoError(t, err)

var parsed AdvisoryUpdateEvent
err = sonic.Unmarshal(data, &parsed)
assert.NoError(t, err)
assert.Equal(t, event.RhAccountID, parsed.RhAccountID)
assert.Equal(t, event.WorkspaceID, parsed.WorkspaceID)
assert.Equal(t, event.InventoryID, parsed.InventoryID)
assert.Equal(t, event.AdvisoryIDs, parsed.AdvisoryIDs)
assert.NotNil(t, parsed.ProducedAt)
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
}

func TestAdvisoryUpdateEventsWriteEvents(t *testing.T) {
var writer Writer = &MockKafkaWriter{}

events := AdvisoryUpdateEvents{
{
RhAccountID: 1,
WorkspaceID: testWorkspaceID,
InventoryID: uuid.New(),
AdvisoryIDs: []int64{100, 200},
ProducedAt: testNow,
},
{
RhAccountID: 2,
WorkspaceID: testWorkspaceID,
InventoryID: uuid.New(),
AdvisoryIDs: []int64{300},
ProducedAt: testNow,
},
}

err := SendMessages(context.Background(), writer, &events)
assert.NoError(t, err)

mockWriter := writer.(*MockKafkaWriter)
assert.Equal(t, 2, len(mockWriter.Messages))

var firstEvent AdvisoryUpdateEvent
err = sonic.Unmarshal(mockWriter.Messages[0].Value, &firstEvent)
assert.NoError(t, err)
assert.Equal(t, events[0].RhAccountID, firstEvent.RhAccountID)
assert.Equal(t, events[0].WorkspaceID, firstEvent.WorkspaceID)
assert.Equal(t, events[0].InventoryID, firstEvent.InventoryID)
assert.Equal(t, events[0].AdvisoryIDs, firstEvent.AdvisoryIDs)
assert.NotNil(t, firstEvent.ProducedAt)

var secondEvent AdvisoryUpdateEvent
err = sonic.Unmarshal(mockWriter.Messages[1].Value, &secondEvent)
assert.NoError(t, err)
assert.Equal(t, events[1].RhAccountID, secondEvent.RhAccountID)
assert.Equal(t, events[1].WorkspaceID, secondEvent.WorkspaceID)
assert.Equal(t, events[1].InventoryID, secondEvent.InventoryID)
assert.Equal(t, events[1].AdvisoryIDs, secondEvent.AdvisoryIDs)
assert.NotNil(t, secondEvent.ProducedAt)
}
2 changes: 2 additions & 0 deletions base/utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type coreConfig struct {
EventsTopic string
EvalTopic string
CreatedSystemsTopic string
AdvisoryUpdateTopic string
PayloadTrackerTopic string
RemediationUpdateTopic string
NotificationsTopic string
Expand Down Expand Up @@ -171,6 +172,7 @@ func initTopicsFromEnv() {
CoreCfg.EventsTopic = Getenv("EVENTS_TOPIC", "")
CoreCfg.EvalTopic = Getenv("EVAL_TOPIC", "")
CoreCfg.CreatedSystemsTopic = Getenv("CREATED_SYSTEMS_TOPIC", "")
CoreCfg.AdvisoryUpdateTopic = Getenv("ADVISORY_UPDATE_TOPIC", "")
CoreCfg.PayloadTrackerTopic = Getenv("PAYLOAD_TRACKER_TOPIC", "")
CoreCfg.RemediationUpdateTopic = Getenv("REMEDIATIONS_UPDATE_TOPIC", "")
CoreCfg.NotificationsTopic = Getenv("NOTIFICATIONS_TOPIC", "")
Expand Down
1 change: 1 addition & 0 deletions conf/common.env
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ KAFKA_READY_ADDRESS=http://kafka:9099/

EVAL_TOPIC=patchman.evaluator.upload
CREATED_SYSTEMS_TOPIC=patchman.evaluator.user-evaluation
ADVISORY_UPDATE_TOPIC=patchman.advisory.update
EVENTS_TOPIC=platform.inventory.events
NOTIFICATIONS_TOPIC=platform.notifications.ingress
PAYLOAD_TRACKER_TOPIC=platform.payload-status
Expand Down
11 changes: 0 additions & 11 deletions deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -520,17 +520,6 @@ objects:
name: patchman
version: 16

kafkaTopics:
- {replicas: 3, partitions: 10, topicName: platform.inventory.events}
- {replicas: 3, partitions: 10, topicName: platform.inventory.host-apps}
- {replicas: 3, partitions: 10, topicName: patchman.evaluator.upload}
- {replicas: 3, partitions: 10, topicName: patchman.evaluator.recalc}
- {replicas: 3, partitions: 8, topicName: platform.payload-status}
- {replicas: 3, partitions: 10, topicName: platform.remediation-updates.patch}
- {replicas: 3, partitions: 10, topicName: platform.notifications.ingress}
- {replicas: 3, partitions: 10, topicName: platform.content-sources.template}
- {replicas: 3, partitions: 4, topicName: patchman.evaluator.user-evaluation}

dependencies:
- host-inventory
- rbac
Expand Down
1 change: 1 addition & 0 deletions dev/kafka/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ for topic in \
"patchman.evaluator.recalc" \
"patchman.evaluator.upload" \
"patchman.evaluator.user-evaluation" \
"patchman.advisory.update" \
"platform.content-sources.template" \
"platform.inventory.events" \
"platform.inventory.host-apps" \
Expand Down
Loading