Skip to content

Commit 0f4fc8f

Browse files
committed
feature(systemd): added the ability to start a service with the predefined config
1 parent 62790ee commit 0f4fc8f

13 files changed

Lines changed: 613 additions & 29 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
# Added by goreleaser init:
22
dist/
3+
cmd/ad-runtime-utils/ad-runtime-utils

README.MD

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,15 @@ Default runtimes:
9191
Service <service-name>:
9292
<runtime>: <path or "error: <message>">
9393
94-
```
94+
```
95+
96+
### 4. Starting a Service
97+
98+
When --start and/or --supervise flags are provided --service is a required argument.
99+
100+
- It starts the executable `services.<service-name>.executable` with the specified arguments from `services.<service-name>.executable_args`.
101+
102+
- If the `--supervise` flag is provided, it will run the health checks, defined in `services.<service-name>.health_checks` on service start to make sure the service is operational. If any of the checks fail the service will be stopped. `Type=notify` should be used in the systemd unit(see example in `examples/systemd` directory).
103+
104+
- While using `--supervise` and health checks, make sure that systemd service has enough `TimeoutStartSec`. ideally should be a combined timeout of all health checks.
105+

cmd/ad-runtime-utils/app.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ import (
44
"flag"
55
"fmt"
66
"io"
7+
"os"
78
"strings"
89

910
"github.com/arenadata/ad-runtime-utils/internal/config"
1011
"github.com/arenadata/ad-runtime-utils/internal/detect"
12+
"github.com/arenadata/ad-runtime-utils/internal/exec"
13+
"github.com/coreos/go-systemd/v22/daemon"
1114
)
1215

1316
// exit codes.
@@ -27,6 +30,8 @@ func Run(args []string, stdout, stderr io.Writer) int {
2730
listAll := fs.Bool("list", false, "List all detected runtimes (default + services)")
2831
fs.BoolVar(listAll, "l", false, "shorthand for --list")
2932
printCACerts := fs.Bool("print-cacerts", false, "When used with --runtime=java, prints the cacerts path and exits")
33+
start := fs.Bool("start", false, "Start the service. Use with simple/exec services")
34+
supervise := fs.Bool("supervise", false, "Supervise the service. Use with notify systemd services")
3035

3136
if err := fs.Parse(args); err != nil {
3237
return exitParseError
@@ -76,6 +81,15 @@ func Run(args []string, stdout, stderr io.Writer) int {
7681
}
7782

7883
envName := detectEnvName(cfg, *service, *runtime)
84+
85+
if *start {
86+
if err = startService(*service, envName, path, *cfg, *supervise); err != nil {
87+
fmt.Fprintf(stderr, "start service failed: %v\n", err)
88+
return exitUserError
89+
}
90+
return exitOK
91+
}
92+
7993
fmt.Fprintf(stdout, "export %s=%s\n", envName, path)
8094
return exitOK
8195
}
@@ -124,3 +138,52 @@ func runList(cfg *config.Config, stdout, stderr io.Writer) int {
124138
}
125139
return exitOK
126140
}
141+
142+
func startService(service string, envName string, envPath string, cfg config.Config, supervise bool) error {
143+
srvConfig, ok := cfg.Services[service]
144+
if !ok {
145+
return fmt.Errorf("service %s not found in config", service)
146+
}
147+
// Append the env for the runtime (eg. JAVA_HOME)
148+
if srvConfig.EnvVars == nil {
149+
srvConfig.EnvVars = make(map[string]string)
150+
}
151+
srvConfig.EnvVars[envName] = envPath
152+
if !supervise {
153+
return exec.RunExecutable(srvConfig.Executable, srvConfig.ExecutableArgs, srvConfig.EnvVars)
154+
}
155+
process, err := exec.RunExecutableAsync(srvConfig.Executable, srvConfig.ExecutableArgs, srvConfig.EnvVars)
156+
if err != nil {
157+
return err
158+
}
159+
// Run the health checks
160+
for _, checkCfg := range srvConfig.HealthChecks {
161+
switch checkCfg.Type {
162+
case exec.PortHealthCheckType:
163+
portheck := exec.PortHealthCheck{
164+
PID: process.Process.Pid,
165+
Config: checkCfg,
166+
}
167+
if err = portheck.Check(); err != nil {
168+
if err = process.Process.Signal(os.Interrupt); err != nil {
169+
return fmt.Errorf("failed to send interrupt signal to process: %w", err)
170+
}
171+
return fmt.Errorf("health check failed: %w", err)
172+
}
173+
default:
174+
if err = process.Process.Signal(os.Interrupt); err != nil {
175+
return fmt.Errorf("failed to send interrupt signal to process: %w", err)
176+
}
177+
return fmt.Errorf("unknown health check type: %s", checkCfg.Type)
178+
}
179+
}
180+
// Notify systemd daemon that service has started
181+
if _, err = daemon.SdNotify(false, daemon.SdNotifyReady); err != nil {
182+
fmt.Fprintf(os.Stderr, "systemd notification failed: %v\n", err)
183+
}
184+
// TODO: Replace this with an actual supervisor loop
185+
if err = process.Wait(); err != nil {
186+
return err
187+
}
188+
return nil
189+
}

configs/config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,4 @@ services:
9191
FLINK2:
9292
path: /etc/flink2/conf/flink2-java.yaml
9393
OZONE:
94-
path: /etc/ozone/conf/ozone-java.yaml
94+
path: /etc/ozone/conf/ozone-java.yaml

examples/config/kafka.yaml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
executable: bin/kafka-server-start.sh
2+
executable_args:
3+
- config/server.properties
4+
runtimes:
5+
java:
6+
version: "21"
7+
health_checks:
8+
- type: port
9+
params:
10+
port: 9092
11+
timeout: 20
12+
protocol: tcp
13+
- type: port
14+
params:
15+
port: 9093
16+
timeout: 20
17+
protocol: tcp

examples/systemd/kafka.service

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
[Unit]
2+
Description=Test Sleep
3+
4+
[Service]
5+
Type=notify
6+
ExecStart=bin/ad-runtime-utils --config configs/config.yaml --service kafka --runtime java --start --supervise
7+
TimeoutStartSec=120
8+
9+
[Install]
10+
WantedBy=multi-user.target

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,7 @@ module github.com/arenadata/ad-runtime-utils
22

33
go 1.24.4
44

5-
require github.com/goccy/go-yaml v1.18.0
5+
require (
6+
github.com/coreos/go-systemd/v22 v22.6.0
7+
github.com/goccy/go-yaml v1.18.0
8+
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
1+
github.com/coreos/go-systemd/v22 v22.6.0 h1:aGVa/v8B7hpb0TKl0MWoAavPDmHvobFe5R5zn0bCJWo=
2+
github.com/coreos/go-systemd/v22 v22.6.0/go.mod h1:iG+pp635Fo7ZmV/j14KUcmEyWF+0X7Lua8rrTWzYgWU=
13
github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
24
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=

internal/config/config.go

Lines changed: 94 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package config
33
import (
44
"fmt"
55
"os"
6+
"strings"
67

78
"github.com/goccy/go-yaml"
89
)
@@ -14,9 +15,19 @@ type RuntimeSetting struct {
1415
Paths []string `yaml:"paths,omitempty"`
1516
}
1617

18+
type HealthCheckConfig struct {
19+
Type string `yaml:"type"`
20+
Params map[string]any `yaml:"params,omitempty"`
21+
}
22+
1723
type ServiceConfig struct {
18-
Runtimes map[string]RuntimeSetting `yaml:"runtimes,omitempty"`
19-
Path string `yaml:"path,omitempty"`
24+
Runtimes map[string]RuntimeSetting `yaml:"runtimes,omitempty"`
25+
Path string `yaml:"path,omitempty"`
26+
Executable string `yaml:"executable,omitempty"`
27+
ExecutableArgs []string `yaml:"executable_args,omitempty"`
28+
EnvVars map[string]string `yaml:"env_vars,omitempty"`
29+
EnvVarsFile string `yaml:"env_vars_file,omitempty"`
30+
HealthChecks []HealthCheckConfig `yaml:"health_checks,omitempty"`
2031
}
2132

2233
type Config struct {
@@ -31,6 +42,30 @@ type Config struct {
3142
Services map[string]ServiceConfig `yaml:"services"`
3243
}
3344

45+
func (h *HealthCheckConfig) ParamToInt(name string) (int, error) {
46+
val, found := h.Params[name]
47+
if !found {
48+
return 0, fmt.Errorf("health check param %q not found", name)
49+
}
50+
intVal, ok := val.(int)
51+
if !ok {
52+
return 0, fmt.Errorf("health check param %q is not an integer", name)
53+
}
54+
return intVal, nil
55+
}
56+
57+
func (h *HealthCheckConfig) ParamToString(name string) (string, error) {
58+
val, found := h.Params[name]
59+
if !found {
60+
return "", fmt.Errorf("health check param %q not found", name)
61+
}
62+
strVal, ok := val.(string)
63+
if !ok {
64+
return "", fmt.Errorf("health check param %q is not a string", name)
65+
}
66+
return strVal, nil
67+
}
68+
3469
func Load(path string) (*Config, error) {
3570
data, readErr := os.ReadFile(path)
3671
if readErr != nil {
@@ -44,37 +79,70 @@ func Load(path string) (*Config, error) {
4479
}
4580

4681
for name, svc := range cfg.Services {
47-
if svc.Path == "" {
48-
continue
49-
}
50-
51-
extData, readExtErr := os.ReadFile(svc.Path)
52-
if readExtErr != nil {
53-
if os.IsNotExist(readExtErr) {
54-
continue
82+
// Load Service config from file if specified
83+
if svc.Path != "" {
84+
if extCfg, fullCfgErr := parseExternalConfig(svc.Path); fullCfgErr == nil {
85+
if extSvc, found := extCfg.Services[name]; found {
86+
cfg.Services[name] = extSvc
87+
continue
88+
}
5589
}
56-
return nil, fmt.Errorf("read service config %q: %w", svc.Path, readExtErr)
57-
}
5890

59-
var extCfg Config
60-
fullCfgErr := yaml.UnmarshalWithOptions(extData, &extCfg, yaml.Strict())
61-
if fullCfgErr == nil {
62-
if extSvc, found := extCfg.Services[name]; found && len(extSvc.Runtimes) > 0 {
63-
svc.Runtimes = extSvc.Runtimes
64-
cfg.Services[name] = svc
65-
continue
91+
// Load Service from external file if specified
92+
ext, err := parseExternalServiceConfig(svc.Path)
93+
if err != nil {
94+
return nil, fmt.Errorf("parse external service config %q: %w", svc.Path, err)
6695
}
96+
// Replace ServiceConfig with the loaded one
97+
svc = ext
98+
// Keep the path to the Service config file for future references
99+
svc.Path = path
67100
}
68101

69-
var ext ServiceConfig
70-
fallbackErr := yaml.UnmarshalWithOptions(extData, &ext, yaml.Strict())
71-
if fallbackErr != nil {
72-
return nil, fmt.Errorf("parse service config %q: %w", svc.Path, fallbackErr)
73-
}
74-
75-
svc.Runtimes = ext.Runtimes
76102
cfg.Services[name] = svc
77103
}
78104

79105
return &cfg, nil
80106
}
107+
108+
func parseExternalConfig(path string) (Config, error) {
109+
var cfg Config
110+
111+
extData, err := os.ReadFile(path)
112+
if err != nil {
113+
if os.IsNotExist(err) {
114+
return cfg, nil
115+
}
116+
return cfg, fmt.Errorf("read service config %q: %w", path, err)
117+
}
118+
119+
if err = yaml.UnmarshalWithOptions(extData, &cfg, yaml.Strict()); err == nil {
120+
return cfg, nil
121+
}
122+
return cfg, nil
123+
}
124+
125+
func parseExternalServiceConfig(path string) (ServiceConfig, error) {
126+
var cfg ServiceConfig
127+
extData, err := os.ReadFile(path)
128+
if err != nil {
129+
if os.IsNotExist(err) {
130+
return cfg, nil
131+
}
132+
return cfg, fmt.Errorf("read service config %q: %w", path, err)
133+
}
134+
if err = yaml.UnmarshalWithOptions(extData, &cfg, yaml.Strict()); err != nil {
135+
return cfg, fmt.Errorf("parse service config %q: %w", path, err)
136+
}
137+
// Abbomination to support env files sourcing
138+
// It changes exec to bash and adds source command and the original executable as the args to bash
139+
if cfg.EnvVarsFile != "" {
140+
argsString := strings.Join(cfg.ExecutableArgs, " ")
141+
cfg.ExecutableArgs = []string{
142+
"-c",
143+
fmt.Sprintf("source %s; %s %s", cfg.EnvVarsFile, cfg.Executable, argsString),
144+
}
145+
cfg.Executable = "bash"
146+
}
147+
return cfg, nil
148+
}

0 commit comments

Comments
 (0)