Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions cmd/zk/deploy.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package zk

import "github.com/spf13/cobra"
import (
"github.com/jam2in/arcusctl/internal/zk"
"github.com/spf13/cobra"
)

var deployCmd = &cobra.Command{
Use: "deploy <version> <topology.yml>",
Short: "Deploy a new ZooKeeper ensemble",
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
// TODO: deploy κ΅¬ν˜„
version := args[0]
topologyPath := args[1]

if err := zk.Deploy(version, topologyPath); err != nil {
panic(err)
}
},
}
94 changes: 94 additions & 0 deletions internal/zk/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package zk

import (
"fmt"
"strings"

"github.com/jam2in/arcusctl/internal/topology"
)

const (
defaultTickTime = 2000
defaultInitLimit = 10
defaultSyncLimit = 5
)

func buildZKConfig(server topology.ZKServer, topo *topology.ZKTopology) string {
var sb strings.Builder

cfg := server.Config
dynamicConfigPath := fmt.Sprintf("%s/conf_myid_%d/zoo.cfg.dynamic", topo.Path, server.MyID)

fmt.Fprintf(&sb, "tickTime=%d\n", cfg.TickTime)
fmt.Fprintf(&sb, "initLimit=%d\n", cfg.InitLimit)
fmt.Fprintf(&sb, "syncLimit=%d\n", cfg.SyncLimit)
fmt.Fprintf(&sb, "dataDir=%s/zk%d\n", cfg.DataDir, server.MyID)
fmt.Fprintf(&sb, "dataLogDir=%s/zk%d\n", cfg.DataLogDir, server.MyID)
fmt.Fprintf(&sb, "dynamicConfigFile=%s\n", dynamicConfigPath)

sb.WriteString("standaloneEnabled=false\n")
sb.WriteString("reconfigEnabled=true\n")
sb.WriteString("4lw.commands.whitelist=*\n")

for k, v := range cfg.Properties {
fmt.Fprintf(&sb, "%s=%s\n", k, v)
}

return sb.String()
}

func buildZKDynamicConfig(topo *topology.ZKTopology) string {
var sb strings.Builder

for _, s := range topo.Servers {
host, clientPort, quorumPort, electionPort := s.ParseAddress()
fmt.Fprintf(&sb, "server.%d=%s:%s:%s;%s\n",
s.MyID, host, quorumPort, electionPort, clientPort)
}

return sb.String()
}

func mergeConfig(globalConfig topology.ZKConfig, nodeConfig *topology.ZKConfig) topology.ZKConfig {
Comment thread
namsic marked this conversation as resolved.
merged := globalConfig
if nodeConfig != nil {
if nodeConfig.TickTime > 0 {
merged.TickTime = nodeConfig.TickTime
}
if nodeConfig.InitLimit > 0 {
merged.InitLimit = nodeConfig.InitLimit
}
if nodeConfig.SyncLimit > 0 {
merged.SyncLimit = nodeConfig.SyncLimit
}
if nodeConfig.DataDir != "" {
merged.DataDir = nodeConfig.DataDir
}
if nodeConfig.DataLogDir != "" {
merged.DataLogDir = nodeConfig.DataLogDir
}
if nodeConfig.Properties != nil {
if merged.Properties == nil {
merged.Properties = map[string]string{}
}
for k, v := range nodeConfig.Properties {
merged.Properties[k] = v
}
}
}

if merged.TickTime == 0 {
merged.TickTime = defaultTickTime
}
if merged.InitLimit == 0 {
merged.InitLimit = defaultInitLimit
}
if merged.SyncLimit == 0 {
merged.SyncLimit = defaultSyncLimit
}
if merged.DataLogDir == "" {
merged.DataLogDir = merged.DataDir
}

return merged
}
177 changes: 177 additions & 0 deletions internal/zk/deploy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package zk

import (
"bufio"
"fmt"
"os"
"strings"
"text/tabwriter"
"time"

"github.com/go-zookeeper/zk"
"github.com/jam2in/arcusctl/internal/store"
"github.com/jam2in/arcusctl/internal/topology"
)

func Deploy(version string, topologyPath string) error {
topo, rawData, err := prepare(topologyPath)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ν•¨μˆ˜μ™€ λ°˜ν™˜ νƒ€μž…μ„ 보고 μ–΄λ–€ μž‘μ—…μ„ ν•˜κ²Œ λ˜λŠ”μ§€ μ•ŒκΈ°κ°€ 쑰금 μ–΄λ €μš΄λ°, λ„€μ΄λ°μ΄λ‚˜ λ³€μˆ˜λͺ…을 쒀더 읽기 μ‰½κ²Œ λ§Œλ“€ 수 μžˆμ„κΉŒμš”?

if err != nil {
return err
}

printPlan(topo, version)
if !confirm() {
fmt.Println("Aborted.")
return nil
}
Comment thread
namsic marked this conversation as resolved.

localTarPath, err := ensureDownloaded(version)
if err != nil {
return err
}

deployed, err := installAll(topo, version, localTarPath)
if err != nil {
printRecoveryGuide(deployed, topo)
return err
}

if err := store.SaveZK(topo.Name, version, rawData); err != nil {
printRecoveryGuide(deployed, topo)
return fmt.Errorf("save metadata: %w", err)
}

fmt.Printf("ZooKeeper ensemble %q deployed successfully.\n", topo.Name)
return nil
}

func prepare(topologyPath string) (*topology.ZKTopology, []byte, error) {
topo, rawData, err := topology.LoadZK(topologyPath)
if err != nil {
return nil, nil, err
}

for i := range topo.Servers {
merged := mergeConfig(topo.GlobalConfig, topo.Servers[i].Config)
topo.Servers[i].Config = &merged
}

if err := validate(topo); err != nil {
Comment thread
namsic marked this conversation as resolved.
return nil, nil, err
}

if store.ZKExists(topo.Name) {
return nil, nil, fmt.Errorf("ZooKeeper ensemble %q already exists", topo.Name)
}
Comment on lines +63 to +65
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LoadZK 직후에 이뢀뢄을 κ°€μž₯ λ¨Όμ € μˆ˜ν–‰ν•  수 μžˆλ‚˜μš”?


return topo, rawData, nil
}

func installAll(topo *topology.ZKTopology, version string, localTarPath string) ([]topology.ZKServer, error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이 λ©”μ„œλ“œλŠ” install.go에 두지 μ•ŠλŠ” μ΄μœ κ°€ λ¬΄μ—‡μΈκ°€μš”?

var deployed []topology.ZKServer
for _, server := range topo.Servers {
fmt.Printf("[%d/%d] %s: installing...\n", len(deployed)+1, len(topo.Servers), server.Host())
if err := installNode(server, topo, version, localTarPath); err != nil {
return deployed, fmt.Errorf("deploy failed: %w", err)
}
deployed = append(deployed, server)
}

fmt.Println("Waiting for ZooKeeper cluster to form...")
return deployed, waitForQuorum(topo)
}

func waitForQuorum(topo *topology.ZKTopology) error {
addr := zkConnectString(topo)
const maxWait = 30 * time.Second
const interval = 2 * time.Second

deadline := time.Now().Add(maxWait)
for time.Now().Before(deadline) {
conn, _, err := zk.Connect([]string{addr}, 3*time.Second)
if err == nil {
_, _, err := conn.Get("/zookeeper/config")
conn.Close()
if err == nil {
return nil
}
}
time.Sleep(interval)
}

return fmt.Errorf("ZooKeeper cluster did not form within %s", maxWait)
}

func printPlan(topo *topology.ZKTopology, version string) {
fmt.Printf("ZooKeeper ensemble %q will be deployed (version: %s)\n\n", topo.Name, version)

w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
fmt.Fprintln(w, "ROLE\tHOST\tPORTS\tDIRECTORIES")
fmt.Fprintln(w, "\t\t\t\t")
for _, s := range topo.Servers {
host, clientPort, quorumPort, electionPort := s.ParseAddress()
ports := fmt.Sprintf("%s/%s/%s", clientPort, quorumPort, electionPort)
fmt.Fprintf(w, "zookeeper\t%s\t%s\t%s\n", host, ports, topo.Path)
}
w.Flush()

fmt.Println()
fmt.Println("Attention:")
fmt.Println(" 1. If the topology is not what you expected, check your yaml file.")
fmt.Println(" 2. Please confirm there is no port/directory conflicts in same host.")
}

func printRecoveryGuide(deployed []topology.ZKServer, topo *topology.ZKTopology) {
if len(deployed) == 0 {
return
}

fmt.Println("\nDeployment failed. Manual recovery required.")
fmt.Println("The following servers have been installed and may still be running:")
for _, s := range deployed {
fmt.Printf(" - %s (myid=%d)\n", s.Host(), s.MyID)
}
fmt.Println("\nTo recover, manually run on each server:")
fmt.Printf(" %s/bin/zkServer.sh stop %s/conf_myid_<myid>/zoo.cfg\n", topo.Path, topo.Path)
fmt.Printf(" rm -rf %s/conf_myid_<myid>\n", topo.Path)
}

func confirm() bool {
// FIXME: internal.ReadStdin()으둜 λ³€κ²½ ν•„μš”
fmt.Print("\nProceed? [y/N]: ")
reader := bufio.NewReader(os.Stdin)
input, err := reader.ReadString('\n')
if err != nil {
return false
}

input = strings.TrimSpace(strings.ToLower(input))
return input == "y" || input == "yes"
}

func validate(topo *topology.ZKTopology) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이 λ©”μ„œλ“œλŠ” topology νŒ¨ν‚€μ§€ 내에 μžˆλŠ”κ²Œ λ‚«μ§€ μ•Šλ‚˜μš”?

if len(topo.Servers) == 0 {
return fmt.Errorf("no servers defined in topology")
}

seenMyID := map[int]bool{}
seenAddress := map[string]bool{}

for _, s := range topo.Servers {
if seenMyID[s.MyID] {
return fmt.Errorf("duplicate myid: %d", s.MyID)
}
seenMyID[s.MyID] = true

if seenAddress[s.Address] {
return fmt.Errorf("duplicate address: %s", s.Address)
}
seenAddress[s.Address] = true

if s.Config.DataDir == "" {
return fmt.Errorf("server myid=%d: data_dir is required", s.MyID)
}
}

return nil
}
45 changes: 45 additions & 0 deletions internal/zk/download.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package zk

import (
"fmt"
"os"
"os/exec"
"path/filepath"

"github.com/jam2in/arcusctl/internal"
)

const zkDownloadURLTemplate = "https://archive.apache.org/dist/zookeeper/zookeeper-%s/apache-zookeeper-%s-bin.tar.gz"

func ensureDownloaded(version string) (string, error) {
dir := imageDir()

if err := os.MkdirAll(dir, 0755); err != nil {
return "", fmt.Errorf("create image dir: %w", err)
}

filename := fmt.Sprintf("apache-zookeeper-%s-bin.tar.gz", version)
localPath := filepath.Join(dir, filename)

if _, err := os.Stat(localPath); err == nil {
fmt.Printf("Using cached file: %s\n", localPath)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cached -> existing ν‘œν˜„μ΄ λ‚«μ§€ μ•Šμ„κΉŒμš”?

return localPath, nil
}

url := fmt.Sprintf(zkDownloadURLTemplate, version, version)
fmt.Printf("Downloading %s...\n", url)

cmd := exec.Command("wget", "-q", "-O", localPath, url)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
_ = os.Remove(localPath)
return "", fmt.Errorf("download %q failed: %w", version, err)
}

return localPath, nil
}

func imageDir() string {
return filepath.Join(internal.Config.Home, "images", "zookeeper")
}
Loading
Loading