Skip to content

Commit 23e8b9d

Browse files
committed
refactor(plugins): replace config polling with channel-based configuration updates across cloud integration plugins (AWS, Azure, GCP, O365, Sophos)
1 parent dffce38 commit 23e8b9d

File tree

10 files changed

+402
-90
lines changed

10 files changed

+402
-90
lines changed

plugins/aws/config/config.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,18 @@ const (
2323
)
2424

2525
var (
26-
cnf *ConfigurationSection
27-
mu sync.Mutex
26+
cnf *ConfigurationSection
27+
mu sync.Mutex
28+
configUpdateChan chan *ConfigurationSection
2829

2930
internalKey string
3031
modulesConfigHost string
3132
)
3233

34+
func init() {
35+
configUpdateChan = make(chan *ConfigurationSection, 1)
36+
}
37+
3338
func GetConfig() *ConfigurationSection {
3439
mu.Lock()
3540
defer mu.Unlock()
@@ -39,6 +44,10 @@ func GetConfig() *ConfigurationSection {
3944
return cnf
4045
}
4146

47+
func GetConfigUpdateChannel() <-chan *ConfigurationSection {
48+
return configUpdateChan
49+
}
50+
4251
func StartConfigurationSystem() {
4352
for {
4453
pluginConfig := plugins.PluginCfg("com.utmstack")
@@ -135,7 +144,18 @@ func StartConfigurationSystem() {
135144
catcher.Info("Received configuration update", map[string]any{
136145
"process": "plugin_com.utmstack.aws",
137146
})
147+
148+
mu.Lock()
138149
cnf = message.Config
150+
mu.Unlock()
151+
152+
select {
153+
case configUpdateChan <- message.Config:
154+
default:
155+
catcher.Info("Configuration update channel full, skipping notification", map[string]any{
156+
"process": "plugin_com.utmstack.aws",
157+
})
158+
}
139159
}
140160
}
141161
}

plugins/aws/main.go

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@ import (
2222
)
2323

2424
const (
25-
defaultTenant = "ce66672c-e36d-4761-a8c8-90058fee1a24"
26-
urlCheckConnection = "https://sts.amazonaws.com"
27-
wait = 1 * time.Second
28-
configCheckInterval = 10 * time.Second
25+
defaultTenant = "ce66672c-e36d-4761-a8c8-90058fee1a24"
26+
urlCheckConnection = "https://sts.amazonaws.com"
27+
wait = 1 * time.Second
2928
)
3029

3130
type activeGroupStream struct {
@@ -63,47 +62,53 @@ func main() {
6362
}
6463

6564
func watchConfigChanges() {
66-
ticker := time.NewTicker(configCheckInterval)
67-
defer ticker.Stop()
65+
time.Sleep(3 * time.Second)
6866

69-
for range ticker.C {
70-
moduleConfig := config.GetConfig()
67+
initialConfig := config.GetConfig()
68+
if initialConfig != nil && initialConfig.ModuleActive {
69+
syncStreams(initialConfig)
70+
}
7171

72-
if moduleConfig == nil || !moduleConfig.ModuleActive {
72+
for newConfig := range config.GetConfigUpdateChannel() {
73+
if newConfig == nil || !newConfig.ModuleActive {
7374
stopAllStreams()
7475
continue
7576
}
7677

77-
currentGroupIDs := make(map[int32]bool)
78-
for _, group := range moduleConfig.ModuleGroups {
79-
currentConfig := getAWSProcessor(group)
80-
groupID := group.Id
81-
currentGroupIDs[groupID] = true
78+
syncStreams(newConfig)
79+
}
80+
}
8281

83-
existing := activeStreams[groupID]
82+
func syncStreams(moduleConfig *config.ConfigurationSection) {
83+
currentGroupIDs := make(map[int32]bool)
84+
for _, group := range moduleConfig.ModuleGroups {
85+
currentConfig := getAWSProcessor(group)
86+
groupID := group.Id
87+
currentGroupIDs[groupID] = true
8488

85-
if existing == nil {
86-
startGroupStream(groupID, group)
87-
} else if existing.config != currentConfig {
88-
catcher.Info("Configuration changed for group, restarting", map[string]any{
89-
"group": group.GroupName,
90-
"process": "plugin_com.utmstack.aws",
91-
})
92-
existing.cancel()
93-
delete(activeStreams, groupID)
94-
startGroupStream(groupID, group)
95-
}
89+
existing := activeStreams[groupID]
90+
91+
if existing == nil {
92+
startGroupStream(groupID, group)
93+
} else if existing.config != currentConfig {
94+
catcher.Info("Configuration changed for group, restarting", map[string]any{
95+
"group": group.GroupName,
96+
"process": "plugin_com.utmstack.aws",
97+
})
98+
existing.cancel()
99+
delete(activeStreams, groupID)
100+
startGroupStream(groupID, group)
96101
}
102+
}
97103

98-
for groupID, stream := range activeStreams {
99-
if !currentGroupIDs[groupID] {
100-
catcher.Info("Group removed, stopping stream", map[string]any{
101-
"groupId": groupID,
102-
"process": "plugin_com.utmstack.aws",
103-
})
104-
stream.cancel()
105-
delete(activeStreams, groupID)
106-
}
104+
for groupID, stream := range activeStreams {
105+
if !currentGroupIDs[groupID] {
106+
catcher.Info("Group removed, stopping stream", map[string]any{
107+
"groupId": groupID,
108+
"process": "plugin_com.utmstack.aws",
109+
})
110+
stream.cancel()
111+
delete(activeStreams, groupID)
107112
}
108113
}
109114
}

plugins/azure/config/config.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,18 @@ const (
2323
)
2424

2525
var (
26-
cnf *ConfigurationSection
27-
mu sync.Mutex
26+
cnf *ConfigurationSection
27+
mu sync.Mutex
28+
configUpdateChan chan *ConfigurationSection
2829

2930
internalKey string
3031
modulesConfigHost string
3132
)
3233

34+
func init() {
35+
configUpdateChan = make(chan *ConfigurationSection, 1)
36+
}
37+
3338
func GetConfig() *ConfigurationSection {
3439
mu.Lock()
3540
defer mu.Unlock()
@@ -39,6 +44,10 @@ func GetConfig() *ConfigurationSection {
3944
return cnf
4045
}
4146

47+
func GetConfigUpdateChannel() <-chan *ConfigurationSection {
48+
return configUpdateChan
49+
}
50+
4251
func StartConfigurationSystem() {
4352
for {
4453
pluginConfig := plugins.PluginCfg("com.utmstack")
@@ -133,7 +142,18 @@ func StartConfigurationSystem() {
133142
switch message := in.Payload.(type) {
134143
case *BiDirectionalMessage_Config:
135144
catcher.Info("Received configuration update", map[string]any{"process": "plugin_com.utmstack.azure"})
145+
146+
mu.Lock()
136147
cnf = message.Config
148+
mu.Unlock()
149+
150+
select {
151+
case configUpdateChan <- message.Config:
152+
default:
153+
catcher.Info("Configuration update channel full, skipping notification", map[string]any{
154+
"process": "plugin_com.utmstack.azure",
155+
})
156+
}
137157
}
138158
}
139159
}

plugins/azure/main.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,20 @@ func main() {
9898
"process": "plugin_com.utmstack.azure",
9999
})
100100

101-
processorManager.syncProcessors()
101+
processorManager.watchConfigAndSync()
102+
}
103+
104+
func (pm *ProcessorManager) watchConfigAndSync() {
105+
time.Sleep(3 * time.Second)
102106

103-
delay := 5 * time.Minute
104-
ticker := time.NewTicker(delay)
105-
defer ticker.Stop()
107+
pm.syncProcessors()
106108

107-
for range ticker.C {
108-
processorManager.syncProcessors()
109+
for newConfig := range config.GetConfigUpdateChannel() {
110+
catcher.Info("Received config update, syncing processors", map[string]any{
111+
"moduleActive": newConfig != nil && newConfig.ModuleActive,
112+
"process": "plugin_com.utmstack.azure",
113+
})
114+
pm.syncProcessors()
109115
}
110116
}
111117

@@ -120,9 +126,8 @@ func (pm *ProcessorManager) syncProcessors() {
120126
for cloudName, loginAuthority := range cloudsInUse {
121127
if err := connectionChecker(loginAuthority); err != nil {
122128
catcher.Info("airgap or limited connectivity detected", map[string]any{
123-
"cloud": cloudName,
124-
"loginAuthority": loginAuthority,
125-
"process": "plugin_com.utmstack.azure",
129+
"cloud": cloudName,
130+
"process": "plugin_com.utmstack.azure",
126131
})
127132
}
128133
}

plugins/gcp/config/config.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,18 @@ const (
2323
)
2424

2525
var (
26-
cnf *ConfigurationSection
27-
mu sync.Mutex
26+
cnf *ConfigurationSection
27+
mu sync.Mutex
28+
configUpdateChan chan *ConfigurationSection
2829

2930
internalKey string
3031
modulesConfigHost string
3132
)
3233

34+
func init() {
35+
configUpdateChan = make(chan *ConfigurationSection, 1)
36+
}
37+
3338
func GetConfig() *ConfigurationSection {
3439
mu.Lock()
3540
defer mu.Unlock()
@@ -39,6 +44,10 @@ func GetConfig() *ConfigurationSection {
3944
return cnf
4045
}
4146

47+
func GetConfigUpdateChannel() <-chan *ConfigurationSection {
48+
return configUpdateChan
49+
}
50+
4251
func StartConfigurationSystem() {
4352
for {
4453
pluginConfig := plugins.PluginCfg("com.utmstack")
@@ -133,7 +142,18 @@ func StartConfigurationSystem() {
133142
switch message := in.Payload.(type) {
134143
case *BiDirectionalMessage_Config:
135144
catcher.Info("Received configuration update", map[string]any{"process": "plugin_com.utmstack.gcp"})
145+
146+
mu.Lock()
136147
cnf = message.Config
148+
mu.Unlock()
149+
150+
select {
151+
case configUpdateChan <- message.Config:
152+
default:
153+
catcher.Info("Configuration update channel full, skipping notification", map[string]any{
154+
"process": "plugin_com.utmstack.gcp",
155+
})
156+
}
137157
}
138158
}
139159
}

plugins/gcp/main.go

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
)
1818

1919
const defaultTenant string = "ce66672c-e36d-4761-a8c8-90058fee1a24"
20-
const delayCheckConfig = 30 * time.Second
2120

2221
type GroupModule struct {
2322
GroupName string
@@ -140,25 +139,73 @@ func startGroupModuleManager() {
140139
}
141140

142141
func (m *GroupModuleManager) SyncConfigs() {
143-
ticker := time.NewTicker(delayCheckConfig)
144-
defer ticker.Stop()
142+
time.Sleep(3 * time.Second)
145143

146-
for range ticker.C {
147-
if err := ConnectionChecker(CHECKCON); err != nil {
148-
_ = catcher.Error("External connection failure detected", err, map[string]any{"process": "plugin_com.utmstack.gcp"})
144+
m.handleConfigUpdate(config.GetConfig())
145+
146+
for newConfig := range config.GetConfigUpdateChannel() {
147+
catcher.Info("Received config update", map[string]any{
148+
"moduleActive": newConfig != nil && newConfig.ModuleActive,
149+
"process": "plugin_com.utmstack.gcp",
150+
})
151+
m.handleConfigUpdate(newConfig)
152+
}
153+
}
154+
155+
func (m *GroupModuleManager) handleConfigUpdate(moduleConfig *config.ConfigurationSection) {
156+
if err := ConnectionChecker(CHECKCON); err != nil {
157+
_ = catcher.Error("External connection failure detected", err, map[string]any{"process": "plugin_com.utmstack.gcp"})
158+
}
159+
160+
if moduleConfig == nil || !moduleConfig.ModuleActive {
161+
for groupID, group := range m.Groups {
162+
catcher.Info("Cancelling group", map[string]any{
163+
"process": "plugin_com.utmstack.gcp",
164+
})
165+
group.Cancel()
166+
delete(m.Groups, groupID)
149167
}
168+
return
169+
}
150170

151-
moduleConfig := config.GetConfig()
152-
if moduleConfig != nil && moduleConfig.ModuleActive {
153-
for _, conf := range moduleConfig.ModuleGroups {
154-
m.Groups[conf.Id] = getModuleConfig(conf)
155-
group := m.Groups[conf.Id]
156-
go group.PullLogs()
171+
currentGroupIDs := make(map[int32]bool)
172+
for _, conf := range moduleConfig.ModuleGroups {
173+
currentGroupIDs[conf.Id] = true
174+
175+
if existing, ok := m.Groups[conf.Id]; ok {
176+
newModule := getModuleConfig(conf)
177+
if configChanged(existing, newModule) {
178+
catcher.Info("Configuration changed for group, restarting", map[string]any{
179+
"process": "plugin_com.utmstack.gcp",
180+
})
181+
existing.Cancel()
182+
delete(m.Groups, conf.Id)
183+
m.Groups[conf.Id] = newModule
184+
go newModule.PullLogs()
157185
}
158186
} else {
159-
for _, cnf := range m.Groups {
160-
cnf.Cancel()
161-
}
187+
catcher.Info("Starting new group", map[string]any{
188+
"process": "plugin_com.utmstack.gcp",
189+
})
190+
m.Groups[conf.Id] = getModuleConfig(conf)
191+
group := m.Groups[conf.Id]
192+
go group.PullLogs()
162193
}
163194
}
195+
196+
for groupID, group := range m.Groups {
197+
if !currentGroupIDs[groupID] {
198+
catcher.Info("Group removed, stopping", map[string]any{
199+
"process": "plugin_com.utmstack.gcp",
200+
})
201+
group.Cancel()
202+
delete(m.Groups, groupID)
203+
}
204+
}
205+
}
206+
207+
func configChanged(old, new GroupModule) bool {
208+
return old.JsonKey != new.JsonKey ||
209+
old.ProjectID != new.ProjectID ||
210+
old.SubscriptionID != new.SubscriptionID
164211
}

0 commit comments

Comments
 (0)