Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ As of June 2017 we have a first working version and we are trying to get the con
Petze is configured through a set of yaml configuration files. The path to folder containing the configuration is passed to petze as the first argument on the commandline.

```bash
$ petze path/to/petzconf
$ petze path/to/confdir
```

Take a look at a simple example config: https://github.com/foomo/petze/tree/master/exampleConfig
Expand All @@ -35,6 +35,18 @@ Take a look at a simple example config: https://github.com/foomo/petze/tree/mast
The configuration file for petze is called: **petze.yml**.
It provides information for the petze service, as well the configuration for your notifications.

The hierarchy of the config folder should look as follows:
```
exampleConfigs/
petze.yml
services/
spiegel.yml
google.yml
bestbytes.yml
hosts/
example.yml
```

## Main config file petze.yml

```yaml
Expand Down Expand Up @@ -76,7 +88,7 @@ sms:

## Service configuration files

Any other files with a .yml suffix will be treated as service configurations.
Files in the services subdirectory with a .yml suffix will be treated as service configuration files.
It is strongly encouraged to organize them in folder structures.
These will be reflected in the service ids.

Expand Down Expand Up @@ -134,6 +146,27 @@ session:
- matchReply: "asdf"
```


## Host Configuration files

Files in the hosts subdirectory with a .yml suffix will be treated as host configuration files.

```yaml
# host name to check
hostname: example.hostname.com

# interval of the ICMP echo request
interval: 10s

# ICMP echo request timeout
timeout: 5s

# services that are running on this host
services:
- spiegel
- google
```

## SMTP Integration

You can now get notifications by Mail, all you need to provide is an SMTP server!
Expand Down
237 changes: 183 additions & 54 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,40 @@ import (
log "github.com/sirupsen/logrus"
)

type ResultListener func(watch.Result)
type ServiceResultListener func(watch.ServiceResult)
type HostResultListener func(watch.HostResult)

// Collector collects stats on services
// Collector collects stats on hosts & services
type Collector struct {
servicesConfigDir string
chanServices chan map[string]*config.Service
chanGetResults chan map[string][]watch.Result
watchers map[string]*watch.Watcher
resultListeners []ResultListener
services map[string]*config.Service
servicesConfigDir string
hostsConfigDir string
chanServices chan map[string]*config.Service
chanHosts chan map[string]*config.Host
chanGetServiceResults chan map[string][]watch.ServiceResult
chanGetHostResults chan map[string][]watch.HostResult
serviceWatchers map[string]*watch.ServiceWatcher
hostWatchers map[string]*watch.HostWatcher
serviceResultListeners []ServiceResultListener
hostResultListeners []HostResultListener
services map[string]*config.Service
hosts map[string]*config.Host
}

// NewCollector construct a collector - it will watch its config files for changes
func NewCollector(servicesConfigDir string) (c *Collector, err error) {
func NewCollector(servicesConfigDir string, hostsConfigDir string) (c *Collector, err error) {
c = &Collector{
servicesConfigDir: servicesConfigDir,
services: make(map[string]*config.Service),
chanServices: make(chan map[string]*config.Service),
chanGetResults: make(chan map[string][]watch.Result),
watchers: make(map[string]*watch.Watcher),
resultListeners: make([]ResultListener, 0),
servicesConfigDir: servicesConfigDir,
hostsConfigDir: hostsConfigDir,
services: make(map[string]*config.Service),
hosts: make(map[string]*config.Host),
chanServices: make(chan map[string]*config.Service),
chanHosts: make(chan map[string]*config.Host),
chanGetServiceResults: make(chan map[string][]watch.ServiceResult),
chanGetHostResults: make(chan map[string][]watch.HostResult),
serviceWatchers: make(map[string]*watch.ServiceWatcher),
hostWatchers: make(map[string]*watch.HostWatcher),
serviceResultListeners: make([]ServiceResultListener, 0),
hostResultListeners: make([]HostResultListener, 0),
}

return c, nil
Expand All @@ -42,95 +55,168 @@ func (c *Collector) Start() {
go c.configWatch()
}

const maxResults = 1000
const maxServiceResults = 1000
const maxHostResults = 1000

func (c *Collector) RegisterListener(listener ResultListener) {
c.resultListeners = append(c.resultListeners, listener)
func (c *Collector) RegisterServiceListener(listener ServiceResultListener) {
c.serviceResultListeners = append(c.serviceResultListeners, listener)
}

func (c *Collector) NotifyListeners(result watch.Result) {
for _, listener := range c.resultListeners {
func (c *Collector) RegisterHostListener(listener HostResultListener) {
c.hostResultListeners = append(c.hostResultListeners, listener)
}

func (c *Collector) NotifyServiceListeners(result watch.ServiceResult) {
for _, listener := range c.serviceResultListeners {
listener(result)
}
}

func (c *Collector) NotifyHostListeners(result watch.HostResult) {
for _, listener := range c.hostResultListeners {
listener(result)
}
}

func (c *Collector) collect() {

chanResult := make(chan watch.Result)
results := map[string][]watch.Result{}
chanServiceResult := make(chan watch.ServiceResult)
chanHostResult := make(chan watch.HostResult)
serviceResults := map[string][]watch.ServiceResult{}
hostResults := map[string][]watch.HostResult{}

for {
select {
case <-c.chanGetResults:
resultsCopy := map[string][]watch.Result{}
for name, results := range results {
resultsCopy[name] = results
case <-c.chanGetServiceResults:
serviceResultsCopy := map[string][]watch.ServiceResult{}
for name, serviceResults := range serviceResults {
serviceResultsCopy[name] = serviceResults
}
c.chanGetServiceResults <- serviceResultsCopy

case <-c.chanGetHostResults:
hostResultsCopy := map[string][]watch.HostResult{}
for name, hostResults := range hostResults {
hostResultsCopy[name] = hostResults
}
c.chanGetResults <- resultsCopy
c.chanGetHostResults <- hostResultsCopy
case newServices := <-c.chanServices:
c.services = newServices

var lastErrors = make(map[string][]watch.Error)

// stop old watchers
for oldWatcherID, oldWatcher := range c.watchers {
oldWatcher.Stop()
for oldWatcherID, oldWatcher := range c.serviceWatchers {
oldWatcher.Watcher.Stop()

// if the service had errors before updating the config
// store them in a map so we can transfer them to the updated watchers
if len(oldWatcher.LastErrors()) > 0 {
lastErrors[oldWatcherID] = oldWatcher.LastErrors()
if len(oldWatcher.Watcher.LastErrors()) > 0 {
lastErrors[oldWatcherID] = oldWatcher.Watcher.LastErrors()
}

delete(c.watchers, oldWatcherID)
delete(c.serviceWatchers, oldWatcherID)
}

// setup new watchers
for serviceID, service := range c.services {
// check if the service had errors before being updated
lastErrs, ok := lastErrors[serviceID]
if ok {
// transfer errors to new watcher
newWatcher := watch.Watch(service, chanResult)
newWatcher := watch.WatchService(service, chanServiceResult, chanHostResult, c.hosts)
newWatcher.SetLastErrors(lastErrs)
c.serviceWatchers[serviceID] = newWatcher
} else {
// no errors - init a new watcher
// TODO: when starting up c.hosts is empty...
c.serviceWatchers[serviceID] = watch.WatchService(service, chanServiceResult, chanHostResult, c.hosts)
}
// reset stored results
_, ok = serviceResults[serviceID]
if !ok {
serviceResults[serviceID] = []watch.ServiceResult{}
}
}

case newHosts := <-c.chanHosts:
c.hosts = newHosts

var lastErrors = make(map[string][]watch.Error)

// stop old watchers
for oldWatcherID, oldWatcher := range c.hostWatchers {
oldWatcher.Watcher.Stop()

// if the host had errors before updating the config
// store them in a map so we can transfer them to the updated watchers
if len(oldWatcher.Watcher.LastErrors()) > 0 {
lastErrors[oldWatcherID] = oldWatcher.Watcher.LastErrors()
}

delete(c.hostWatchers, oldWatcherID)
}

// setup new watchers
for hostID, host := range c.hosts {
// check if the host had errors before being updated
lastErrs, ok := lastErrors[hostID]
if ok {
// transfer errors to new watcher
newWatcher := watch.WatchHost(host, chanHostResult)
newWatcher.SetLastErrors(lastErrs)
c.watchers[serviceID] = newWatcher
c.hostWatchers[hostID] = newWatcher
} else {
// no errors - init a new watcher
c.watchers[serviceID] = watch.Watch(service, chanResult)
c.hostWatchers[hostID] = watch.WatchHost(host, chanHostResult)
}
// reset stored results
_, ok = results[serviceID]
_, ok = hostResults[hostID]
if !ok {
results[serviceID] = []watch.Result{}
hostResults[hostID] = []watch.HostResult{}
}
}
// clean up results
for possiblyUnknownServiceID := range results {
_, ok := c.watchers[possiblyUnknownServiceID]
for possiblyUnknownHostID := range hostResults {
_, ok := c.hostWatchers[possiblyUnknownHostID]
if !ok {
// clean up results
delete(results, possiblyUnknownServiceID)
delete(hostResults, possiblyUnknownHostID)
}
}
case result := <-chanResult:
serviceResults, ok := results[result.ID]
case serviceResult := <-chanServiceResult:
serviceResultsFromChan, ok := serviceResults[serviceResult.ID]
if ok {
serviceResults = append(serviceResults, result)
if len(serviceResults) > maxResults {
serviceResults = serviceResults[len(serviceResults)-maxResults:]
serviceResultsFromChan = append(serviceResultsFromChan, serviceResult)
if len(serviceResultsFromChan) > maxServiceResults {
serviceResultsFromChan = serviceResultsFromChan[len(serviceResultsFromChan)-maxServiceResults:]
}
results[result.ID] = serviceResults
serviceResults[serviceResult.ID] = serviceResultsFromChan

c.NotifyListeners(result)
c.NotifyServiceListeners(serviceResult)
}
case hostResult := <-chanHostResult:
hostResultsFromChan, ok := hostResults[hostResult.ID]
if ok {
hostResultsFromChan = append(hostResultsFromChan, hostResult)
if len(hostResultsFromChan) > maxHostResults {
hostResultsFromChan = hostResultsFromChan[len(hostResultsFromChan)-maxHostResults:]
}
hostResults[hostResult.ID] = hostResultsFromChan

c.NotifyHostListeners(hostResult)
}
}
}
}

// GetResults get current results
func (c *Collector) GetResults() map[string][]watch.Result {
c.chanGetResults <- nil
return <-c.chanGetResults
func (c *Collector) GetServiceResults() map[string][]watch.ServiceResult {
c.chanGetServiceResults <- nil
return <-c.chanGetServiceResults
}

func (c *Collector) GetHostResults() map[string][]watch.HostResult {
c.chanGetHostResults <- nil
return <-c.chanGetHostResults
}

func hashServiceConfig(config map[string]*config.Service) (hash string) {
Expand All @@ -142,20 +228,53 @@ func hashServiceConfig(config map[string]*config.Service) (hash string) {
return hash
}

func hashHostConfig(config map[string]*config.Host) (hash string) {
hash = "invalid config"
jsonBytes, errJSON := json.Marshal(config)
if errJSON == nil {
hash = string(jsonBytes)
}
return hash
}

func (c *Collector) configWatch() {
for {
services, errServices := config.LoadServices(c.servicesConfigDir)
if errServices != nil {
log.Error("could not read configuration:", errServices)
log.Error("could not read configuration: ", errServices)
}
if errServices == nil {
newHash := hashServiceConfig(services)
oldHash := hashServiceConfig(c.services)
if newHash != oldHash {
log.Info("configuration update successful")
log.Info("service configuration update successful")
c.updateServices()
}
}

hosts, errHosts := config.LoadHosts(c.hostsConfigDir)

if errHosts != nil {
log.Error("could not read configuration: ", errHosts)
}
if errHosts == nil {
newHash := hashHostConfig(hosts)
oldHash := hashHostConfig(c.hosts)
if newHash != oldHash {
log.Info("host configuration update successful")
c.updateHosts()
}
}

// check if service defined in host config exists
for _, hostConfig := range hosts {
// validate services
for _, service := range hostConfig.Services {
if _, ok := services[service]; !ok {
log.Fatal("host " + hostConfig.Hostname + " has service " + service + " in its list of services but the service doesn't exist in the service config dir")
}
}
}
time.Sleep(10 * time.Second)
}
}
Expand All @@ -169,3 +288,13 @@ func (c *Collector) updateServices() error {
}
return err
}

func (c *Collector) updateHosts() error {
hosts, err := config.LoadHosts(c.hostsConfigDir)
if err == nil {
c.chanHosts <- hosts
} else {
log.Warn("could not update hosts:", err)
}
return err
}
Loading