Skip to content

Commit 44b0243

Browse files
committed
feature(systemd): added the ability to start a service with the predefined config
1 parent 62790ee commit 44b0243

14 files changed

Lines changed: 683 additions & 29 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
# Added by goreleaser init:
22
dist/
3+
cmd/ad-runtime-utils/ad-runtime-utils

README.MD

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,15 @@ Default runtimes:
9191
Service <service-name>:
9292
<runtime>: <path or "error: <message>">
9393
94-
```
94+
```
95+
96+
### 4. Starting a Service
97+
98+
When the `--start` and/or `--supervise` flags are provided, `--service` is a required argument.
99+
100+
- It starts the executable `services.<service-name>.executable` with the specified arguments from `services.<service-name>.executable_args`.
101+
102+
- If the `--supervise` flag is provided, the health checks defined in `services.<service-name>.health_checks` are run on service start to make sure the service is operational. If any of the checks fails, the service is stopped. `Type=notify` should be used in the systemd unit (see the example in the `examples/systemd` directory).
103+
104+
- When using `--supervise` with health checks, make sure the systemd service has a large enough `TimeoutStartSec`; ideally it should be at least the combined timeout of all health checks.
105+

cmd/ad-runtime-utils/app.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ import (
44
"flag"
55
"fmt"
66
"io"
7+
"os"
78
"strings"
89

910
"github.com/arenadata/ad-runtime-utils/internal/config"
1011
"github.com/arenadata/ad-runtime-utils/internal/detect"
12+
"github.com/arenadata/ad-runtime-utils/internal/exec"
13+
"github.com/coreos/go-systemd/v22/daemon"
1114
)
1215

1316
// exit codes.
@@ -27,6 +30,8 @@ func Run(args []string, stdout, stderr io.Writer) int {
2730
listAll := fs.Bool("list", false, "List all detected runtimes (default + services)")
2831
fs.BoolVar(listAll, "l", false, "shorthand for --list")
2932
printCACerts := fs.Bool("print-cacerts", false, "When used with --runtime=java, prints the cacerts path and exits")
33+
start := fs.Bool("start", false, "Start the service. Use with simple/exec services")
34+
supervise := fs.Bool("supervise", false, "Supervise the service. Use with norify services")
3035

3136
if err := fs.Parse(args); err != nil {
3237
return exitParseError
@@ -76,6 +81,15 @@ func Run(args []string, stdout, stderr io.Writer) int {
7681
}
7782

7883
envName := detectEnvName(cfg, *service, *runtime)
84+
85+
if *start {
86+
if err = startService(*service, envName, path, *cfg, *supervise); err != nil {
87+
fmt.Fprintf(stderr, "start service failed: %v\n", err)
88+
return exitUserError
89+
}
90+
return exitOK
91+
}
92+
7993
fmt.Fprintf(stdout, "export %s=%s\n", envName, path)
8094
return exitOK
8195
}
@@ -124,3 +138,52 @@ func runList(cfg *config.Config, stdout, stderr io.Writer) int {
124138
}
125139
return exitOK
126140
}
141+
142+
func startService(service string, envName string, envPath string, cfg config.Config, supervise bool) error {
143+
srvConfig, ok := cfg.Services[service]
144+
if !ok {
145+
return fmt.Errorf("service %s not found in config", service)
146+
}
147+
// Append the env for the runtime (eg. JAVA_HOME)
148+
if srvConfig.EnvVars == nil {
149+
srvConfig.EnvVars = make(map[string]string)
150+
}
151+
srvConfig.EnvVars[envName] = envPath
152+
if !supervise {
153+
return exec.RunExecutable(srvConfig.Executable, srvConfig.ExecutableArgs, srvConfig.EnvVars)
154+
}
155+
process, err := exec.RunExecutableAsync(srvConfig.Executable, srvConfig.ExecutableArgs, srvConfig.EnvVars)
156+
if err != nil {
157+
return err
158+
}
159+
// Run the health checks
160+
for _, checkCfg := range srvConfig.HealthChecks {
161+
switch checkCfg.Type {
162+
case exec.PortHealthCheckType:
163+
portheck := exec.PortHealthCheck{
164+
PID: process.Process.Pid,
165+
Config: checkCfg,
166+
}
167+
if err = portheck.Check(); err != nil {
168+
if err = process.Process.Signal(os.Interrupt); err != nil {
169+
return fmt.Errorf("failed to send interrupt signal to process: %w", err)
170+
}
171+
return fmt.Errorf("health check failed: %w", err)
172+
}
173+
default:
174+
if err = process.Process.Signal(os.Interrupt); err != nil {
175+
return fmt.Errorf("failed to send interrupt signal to process: %w", err)
176+
}
177+
return fmt.Errorf("unknown health check type: %s", checkCfg.Type)
178+
}
179+
}
180+
// Notify systemd daemon that service has started
181+
if _, err = daemon.SdNotify(false, daemon.SdNotifyReady); err != nil {
182+
fmt.Fprintf(os.Stderr, "systemd notification failed: %v\n", err)
183+
}
184+
// TODO: Replace this with an actual supervisor loop
185+
if err = process.Wait(); err != nil {
186+
return err
187+
}
188+
return nil
189+
}

configs/config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,4 @@ services:
9191
FLINK2:
9292
path: /etc/flink2/conf/flink2-java.yaml
9393
OZONE:
94-
path: /etc/ozone/conf/ozone-java.yaml
94+
path: /etc/ozone/conf/ozone-java.yaml

examples/config/kafka.yaml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
executable: bin/kafka-server-start.sh
2+
executable_args:
3+
- config/server.properties
4+
runtimes:
5+
java:
6+
version: "21"
7+
health_checks:
8+
- type: port
9+
params:
10+
port: 9092
11+
timeout: 20
12+
protocol: tcp
13+
- type: port
14+
params:
15+
port: 9093
16+
timeout: 20
17+
protocol: tcp

examples/systemd/kafka.service

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
[Unit]
2+
Description=Test Sleep
3+
4+
[Service]
5+
Type=notify
6+
ExecStart=bin/ad-runtime-utils --config configs/config.yaml --service kafka --runtime java --start --supervise
7+
TimeoutStartSec=120
8+
9+
[Install]
10+
WantedBy=multi-user.target

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,7 @@ module github.com/arenadata/ad-runtime-utils
22

33
go 1.24.4
44

5-
require github.com/goccy/go-yaml v1.18.0
5+
require (
6+
github.com/coreos/go-systemd/v22 v22.6.0
7+
github.com/goccy/go-yaml v1.18.0
8+
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
1+
github.com/coreos/go-systemd/v22 v22.6.0 h1:aGVa/v8B7hpb0TKl0MWoAavPDmHvobFe5R5zn0bCJWo=
2+
github.com/coreos/go-systemd/v22 v22.6.0/go.mod h1:iG+pp635Fo7ZmV/j14KUcmEyWF+0X7Lua8rrTWzYgWU=
13
github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
24
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=

internal/config/config.go

Lines changed: 122 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package config
22

33
import (
4+
"bufio"
45
"fmt"
56
"os"
7+
"strings"
68

79
"github.com/goccy/go-yaml"
810
)
@@ -14,9 +16,19 @@ type RuntimeSetting struct {
1416
Paths []string `yaml:"paths,omitempty"`
1517
}
1618

19+
// HealthCheckConfig is one entry of a service's health_checks list.
type HealthCheckConfig struct {
	// Type names the kind of check (YAML key "type").
	Type string `yaml:"type"`
	// Params carries the check's free-form parameters (YAML key "params").
	// Use ParamToInt / ParamToString to read individual values.
	Params map[string]any `yaml:"params,omitempty"`
}
23+
1724
type ServiceConfig struct {
18-
Runtimes map[string]RuntimeSetting `yaml:"runtimes,omitempty"`
19-
Path string `yaml:"path,omitempty"`
25+
Runtimes map[string]RuntimeSetting `yaml:"runtimes,omitempty"`
26+
Path string `yaml:"path,omitempty"`
27+
Executable string `yaml:"executable,omitempty"`
28+
ExecutableArgs []string `yaml:"executable_args,omitempty"`
29+
EnvVars map[string]string `yaml:"env_vars,omitempty"`
30+
EnvVarsFile string `yaml:"env_vars_file,omitempty"`
31+
HealthChecks []HealthCheckConfig `yaml:"health_checks,omitempty"`
2032
}
2133

2234
type Config struct {
@@ -31,6 +43,30 @@ type Config struct {
3143
Services map[string]ServiceConfig `yaml:"services"`
3244
}
3345

46+
func (h *HealthCheckConfig) ParamToInt(name string) (int, error) {
47+
val, found := h.Params[name]
48+
if !found {
49+
return 0, fmt.Errorf("health check param %q not found", name)
50+
}
51+
intVal, ok := val.(int)
52+
if !ok {
53+
return 0, fmt.Errorf("health check param %q is not an integer", name)
54+
}
55+
return intVal, nil
56+
}
57+
58+
func (h *HealthCheckConfig) ParamToString(name string) (string, error) {
59+
val, found := h.Params[name]
60+
if !found {
61+
return "", fmt.Errorf("health check param %q not found", name)
62+
}
63+
strVal, ok := val.(string)
64+
if !ok {
65+
return "", fmt.Errorf("health check param %q is not a string", name)
66+
}
67+
return strVal, nil
68+
}
69+
3470
func Load(path string) (*Config, error) {
3571
data, readErr := os.ReadFile(path)
3672
if readErr != nil {
@@ -44,37 +80,97 @@ func Load(path string) (*Config, error) {
4480
}
4581

4682
for name, svc := range cfg.Services {
47-
if svc.Path == "" {
48-
continue
49-
}
50-
51-
extData, readExtErr := os.ReadFile(svc.Path)
52-
if readExtErr != nil {
53-
if os.IsNotExist(readExtErr) {
54-
continue
83+
// Load Service config from file if specified
84+
if svc.Path != "" {
85+
if extCfg, fullCfgErr := parseExternalConfig(svc.Path); fullCfgErr == nil {
86+
if extSvc, found := extCfg.Services[name]; found {
87+
cfg.Services[name] = extSvc
88+
continue
89+
}
5590
}
56-
return nil, fmt.Errorf("read service config %q: %w", svc.Path, readExtErr)
57-
}
5891

59-
var extCfg Config
60-
fullCfgErr := yaml.UnmarshalWithOptions(extData, &extCfg, yaml.Strict())
61-
if fullCfgErr == nil {
62-
if extSvc, found := extCfg.Services[name]; found && len(extSvc.Runtimes) > 0 {
63-
svc.Runtimes = extSvc.Runtimes
64-
cfg.Services[name] = svc
65-
continue
92+
// Load Service from external file if specified
93+
ext, err := parseExternalServiceConfig(svc.Path)
94+
if err != nil {
95+
return nil, fmt.Errorf("parse external service config %q: %w", svc.Path, err)
6696
}
97+
// Replace ServiceConfig with the loaded one
98+
svc = ext
99+
// Keep the path to the Service config file for future references
100+
svc.Path = path
67101
}
68102

69-
var ext ServiceConfig
70-
fallbackErr := yaml.UnmarshalWithOptions(extData, &ext, yaml.Strict())
71-
if fallbackErr != nil {
72-
return nil, fmt.Errorf("parse service config %q: %w", svc.Path, fallbackErr)
73-
}
74-
75-
svc.Runtimes = ext.Runtimes
76103
cfg.Services[name] = svc
77104
}
78105

79106
return &cfg, nil
80107
}
108+
109+
func parseExternalConfig(path string) (Config, error) {
110+
var cfg Config
111+
112+
extData, err := os.ReadFile(path)
113+
if err != nil {
114+
if os.IsNotExist(err) {
115+
return cfg, nil
116+
}
117+
return cfg, fmt.Errorf("read service config %q: %w", path, err)
118+
}
119+
120+
if err = yaml.UnmarshalWithOptions(extData, &cfg, yaml.Strict()); err == nil {
121+
return cfg, nil
122+
}
123+
return cfg, nil
124+
}
125+
126+
func parseExternalServiceConfig(path string) (ServiceConfig, error) {
127+
var cfg ServiceConfig
128+
extData, err := os.ReadFile(path)
129+
if err != nil {
130+
if os.IsNotExist(err) {
131+
return cfg, nil
132+
}
133+
return cfg, fmt.Errorf("read service config %q: %w", path, err)
134+
}
135+
if err = yaml.UnmarshalWithOptions(extData, &cfg, yaml.Strict()); err != nil {
136+
return cfg, fmt.Errorf("parse service config %q: %w", path, err)
137+
}
138+
// Load EnvVars from file if specified in the external config
139+
if cfg.EnvVarsFile != "" {
140+
err = loadEnvVarsFile(&cfg)
141+
if err != nil {
142+
return cfg, fmt.Errorf("load env vars from file %q: %w", cfg.EnvVarsFile, err)
143+
}
144+
}
145+
return cfg, nil
146+
}
147+
148+
func loadEnvVarsFile(svc *ServiceConfig) error {
149+
envVarsFile, err := os.Open(svc.EnvVarsFile)
150+
if err != nil {
151+
return err
152+
}
153+
defer envVarsFile.Close()
154+
if svc.EnvVars == nil {
155+
svc.EnvVars = make(map[string]string)
156+
}
157+
scanner := bufio.NewScanner(envVarsFile)
158+
for scanner.Scan() {
159+
line := scanner.Text()
160+
// Skip comments and empty lines
161+
if strings.HasPrefix(line, "#") || line == "" {
162+
continue
163+
}
164+
// Split on the first '=' to get the key-value pair
165+
splitSubstrings := 2
166+
parts := strings.SplitN(line, "=", splitSubstrings)
167+
if len(parts) != splitSubstrings {
168+
continue
169+
}
170+
// Add new env var only if it doesn't already exist
171+
if _, found := svc.EnvVars[parts[0]]; !found {
172+
svc.EnvVars[parts[0]] = strings.Trim(parts[1], "\"")
173+
}
174+
}
175+
return nil
176+
}

internal/config/config_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,3 +158,42 @@ services:
158158
t.Fatal("expected error parsing external service config")
159159
}
160160
}
161+
// TestLoad_ServiceEnvVarsFile checks that Load follows a service's `path` to an
// external service config, and from there the `env_vars_file` entry, so that
// variables from that file end up in the service's EnvVars.
func TestLoad_ServiceEnvVarsFile(t *testing.T) {
162+
tmpDir := t.TempDir()
163+
envFilePath := filepath.Join(tmpDir, "env.sh")
164+
// External service config: contains only a reference to the env file.
svcContent := []byte(`
165+
env_vars_file: "` + envFilePath + `"
166+
`)
167+
svcCfgFile := filepath.Join(tmpDir, "trino.yaml")
168+
if err := os.WriteFile(svcCfgFile, svcContent, 0o644); err != nil {
169+
t.Fatalf("write external service config: %v", err)
170+
}
171+
// Env file fixture: a blank line, a comment, one quoted assignment and a
// trailing blank line.
envContent := []byte(`
172+
# some comment
173+
ENV_TEST="test_value"
174+
175+
`)
176+
if err := os.WriteFile(envFilePath, envContent, 0o644); err != nil {
177+
t.Fatalf("write environment variables file: %v", err)
178+
}
179+
// Main config: service svc1 points at the external service config.
mainContent := []byte(`
180+
services:
181+
svc1:
182+
path: "` + svcCfgFile + `"
183+
`)
184+
mainFilePath := filepath.Join(tmpDir, "config.yaml")
185+
if err := os.WriteFile(mainFilePath, mainContent, 0o644); err != nil {
186+
t.Fatalf("write main config: %v", err)
187+
}
188+
cfg, err := Load(mainFilePath)
189+
if err != nil {
190+
t.Fatalf("Load() failed: %v", err)
191+
}
192+
// The variable must be present and its surrounding double quotes removed.
envValue, found := cfg.Services["svc1"].EnvVars["ENV_TEST"]
193+
if !found {
194+
t.Fatalf("env_vars_file not loaded correctly. config: %q", cfg)
195+
}
196+
if envValue != "test_value" {
197+
t.Fatalf("env_vars_file loaded incorrectly, got '%s', expected 'test_value'", envValue)
198+
}
199+
}

0 commit comments

Comments
 (0)