diff --git a/cmd/zk/deploy.go b/cmd/zk/deploy.go
index 9d9b520..30dba27 100644
--- a/cmd/zk/deploy.go
+++ b/cmd/zk/deploy.go
@@ -1,12 +1,19 @@
 package zk
 
-import "github.com/spf13/cobra"
+import (
+	"github.com/jam2in/arcusctl/internal/zk"
+	"github.com/spf13/cobra"
+)
 
 var deployCmd = &cobra.Command{
 	Use:   "deploy ",
 	Short: "Deploy a new ZooKeeper ensemble",
 	Args:  cobra.ExactArgs(2),
 	Run: func(cmd *cobra.Command, args []string) {
-		// TODO: deploy 구현
+		version, topologyPath := args[0], args[1]
+
+		// Report a failed deploy as a normal CLI error (message on stderr,
+		// exit status 1) instead of crashing with a panic stack trace.
+		cobra.CheckErr(zk.Deploy(version, topologyPath))
 	},
 }
diff --git a/internal/zk/config.go b/internal/zk/config.go
new file mode 100644
index 0000000..d535a7e
--- /dev/null
+++ b/internal/zk/config.go
@@ -0,0 +1,124 @@
+package zk
+
+import (
+	"fmt"
+	"sort"
+	"strings"
+
+	"github.com/jam2in/arcusctl/internal/topology"
+)
+
+// Fallback values applied by mergeConfig when neither the global nor the
+// per-node configuration sets the corresponding field.
+const (
+	defaultTickTime  = 2000
+	defaultInitLimit = 10
+	defaultSyncLimit = 5
+)
+
+// buildZKConfig renders the static zoo.cfg content for a single server.
+// server.Config must be non-nil; prepare guarantees this via mergeConfig.
+func buildZKConfig(server topology.ZKServer, topo *topology.ZKTopology) string {
+	var sb strings.Builder
+
+	cfg := server.Config
+	dynamicConfigPath := fmt.Sprintf("%s/conf_myid_%d/zoo.cfg.dynamic", topo.Path, server.MyID)
+
+	fmt.Fprintf(&sb, "tickTime=%d\n", cfg.TickTime)
+	fmt.Fprintf(&sb, "initLimit=%d\n", cfg.InitLimit)
+	fmt.Fprintf(&sb, "syncLimit=%d\n", cfg.SyncLimit)
+	fmt.Fprintf(&sb, "dataDir=%s/zk%d\n", cfg.DataDir, server.MyID)
+	fmt.Fprintf(&sb, "dataLogDir=%s/zk%d\n", cfg.DataLogDir, server.MyID)
+	fmt.Fprintf(&sb, "dynamicConfigFile=%s\n", dynamicConfigPath)
+
+	sb.WriteString("standaloneEnabled=false\n")
+	sb.WriteString("reconfigEnabled=true\n")
+	sb.WriteString("4lw.commands.whitelist=*\n")
+
+	// Map iteration order is random; emit the extra properties sorted by key
+	// so the generated file is identical on every run.
+	keys := make([]string, 0, len(cfg.Properties))
+	for k := range cfg.Properties {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	for _, k := range keys {
+		fmt.Fprintf(&sb, "%s=%s\n", k, cfg.Properties[k])
+	}
+
+	return sb.String()
+}
+
+// buildZKDynamicConfig renders zoo.cfg.dynamic: one line per server of the
+// form "server.MYID=HOST:QUORUMPORT:ELECTIONPORT;CLIENTPORT", in topology
+// order.
+func buildZKDynamicConfig(topo *topology.ZKTopology) string {
+	var sb strings.Builder
+
+	for _, s := range topo.Servers {
+		host, clientPort, quorumPort, electionPort := s.ParseAddress()
+		fmt.Fprintf(&sb, "server.%d=%s:%s:%s;%s\n",
+			s.MyID, host, quorumPort, electionPort, clientPort)
+	}
+
+	return sb.String()
+}
+
+// mergeConfig returns the effective configuration for one node: globalConfig
+// overridden by the positive numeric and non-empty string fields of
+// nodeConfig (which may be nil), then completed with the built-in defaults.
+// DataLogDir falls back to DataDir. Neither input is modified.
+func mergeConfig(globalConfig topology.ZKConfig, nodeConfig *topology.ZKConfig) topology.ZKConfig {
+	merged := globalConfig
+
+	// The struct copy above still shares the Properties map with
+	// globalConfig. Clone it so per-node overrides written below cannot leak
+	// into the global config and, through it, into every other node.
+	if globalConfig.Properties != nil {
+		merged.Properties = make(map[string]string, len(globalConfig.Properties))
+		for k, v := range globalConfig.Properties {
+			merged.Properties[k] = v
+		}
+	}
+
+	if nodeConfig != nil {
+		if nodeConfig.TickTime > 0 {
+			merged.TickTime = nodeConfig.TickTime
+		}
+		if nodeConfig.InitLimit > 0 {
+			merged.InitLimit = nodeConfig.InitLimit
+		}
+		if nodeConfig.SyncLimit > 0 {
+			merged.SyncLimit = nodeConfig.SyncLimit
+		}
+		if nodeConfig.DataDir != "" {
+			merged.DataDir = nodeConfig.DataDir
+		}
+		if nodeConfig.DataLogDir != "" {
+			merged.DataLogDir = nodeConfig.DataLogDir
+		}
+		if nodeConfig.Properties != nil {
+			if merged.Properties == nil {
+				merged.Properties = map[string]string{}
+			}
+			for k, v := range nodeConfig.Properties {
+				merged.Properties[k] = v
+			}
+		}
+	}
+
+	if merged.TickTime == 0 {
+		merged.TickTime = defaultTickTime
+	}
+	if merged.InitLimit == 0 {
+		merged.InitLimit = defaultInitLimit
+	}
+	if merged.SyncLimit == 0 {
+		merged.SyncLimit = defaultSyncLimit
+	}
+	if merged.DataLogDir == "" {
+		merged.DataLogDir = merged.DataDir
+	}
+
+	return merged
+}
diff --git a/internal/zk/deploy.go b/internal/zk/deploy.go
new file mode 100644
index 0000000..46f2949
--- /dev/null
+++ b/internal/zk/deploy.go
@@ -0,0 +1,215 @@
+package zk
+
+import (
+	"bufio"
+	"fmt"
+	"os"
+	"strings"
+	"text/tabwriter"
+	"time"
+
+	"github.com/go-zookeeper/zk"
+	"github.com/jam2in/arcusctl/internal/store"
+	"github.com/jam2in/arcusctl/internal/topology"
+)
+
+// Deploy installs and starts a new ZooKeeper ensemble of the given version as
+// described by the topology file at topologyPath, then records it in the
+// local store. The plan is printed and must be confirmed interactively;
+// declining is not an error. If a step fails after servers were installed, a
+// manual recovery guide is printed before the error is returned.
+func Deploy(version string, topologyPath string) error {
+	topo, rawData, err := prepare(topologyPath)
+	if err != nil {
+		return err
+	}
+
+	printPlan(topo, version)
+	if !confirm() {
+		fmt.Println("Aborted.")
+		return nil
+	}
+
+	localTarPath, err := ensureDownloaded(version)
+	if err != nil {
+		return err
+	}
+
+	deployed, err := installAll(topo, version, localTarPath)
+	if err != nil {
+		printRecoveryGuide(deployed, topo)
+		return err
+	}
+
+	if err := store.SaveZK(topo.Name, version, rawData); err != nil {
+		printRecoveryGuide(deployed, topo)
+		return fmt.Errorf("save metadata: %w", err)
+	}
+
+	fmt.Printf("ZooKeeper ensemble %q deployed successfully.\n", topo.Name)
+	return nil
+}
+
+// prepare loads the topology file, resolves every server's effective config,
+// validates the result and rejects a name that already exists in the store.
+func prepare(topologyPath string) (*topology.ZKTopology, []byte, error) {
+	topo, rawData, err := topology.LoadZK(topologyPath)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	for i := range topo.Servers {
+		merged := mergeConfig(topo.GlobalConfig, topo.Servers[i].Config)
+		topo.Servers[i].Config = &merged
+	}
+
+	if err := validate(topo); err != nil {
+		return nil, nil, err
+	}
+
+	if store.ZKExists(topo.Name) {
+		return nil, nil, fmt.Errorf("ZooKeeper ensemble %q already exists", topo.Name)
+	}
+
+	return topo, rawData, nil
+}
+
+// installAll installs the servers one at a time and then waits for the
+// ensemble to form. The servers installed so far are returned even on error
+// so that the caller can print recovery instructions for them.
+func installAll(topo *topology.ZKTopology, version string, localTarPath string) ([]topology.ZKServer, error) {
+	var deployed []topology.ZKServer
+	for _, server := range topo.Servers {
+		fmt.Printf("[%d/%d] %s: installing...\n", len(deployed)+1, len(topo.Servers), server.Host())
+		if err := installNode(server, topo, version, localTarPath); err != nil {
+			return deployed, fmt.Errorf("deploy failed: %w", err)
+		}
+		deployed = append(deployed, server)
+	}
+
+	fmt.Println("Waiting for ZooKeeper cluster to form...")
+	return deployed, waitForQuorum(topo)
+}
+
+// waitForQuorum polls the ensemble every 2s for up to 30s until
+// /zookeeper/config can be read through a client connection.
+func waitForQuorum(topo *topology.ZKTopology) error {
+	// zk.Connect takes one "host:port" entry per server, not a single
+	// comma-separated connect string, so split it back into a slice.
+	servers := strings.Split(zkConnectString(topo), ",")
+	const maxWait = 30 * time.Second
+	const interval = 2 * time.Second
+
+	deadline := time.Now().Add(maxWait)
+	for time.Now().Before(deadline) {
+		conn, _, err := zk.Connect(servers, 3*time.Second)
+		if err == nil {
+			_, _, err := conn.Get("/zookeeper/config")
+			conn.Close()
+			if err == nil {
+				return nil
+			}
+		}
+		time.Sleep(interval)
+	}
+
+	return fmt.Errorf("ZooKeeper cluster did not form within %s", maxWait)
+}
+
+// printPlan prints the servers that are about to be deployed as a table,
+// followed by a short checklist for the operator.
+func printPlan(topo *topology.ZKTopology, version string) {
+	fmt.Printf("ZooKeeper ensemble %q will be deployed (version: %s)\n\n", topo.Name, version)
+
+	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
+	fmt.Fprintln(w, "ROLE\tHOST\tPORTS\tDIRECTORIES")
+	fmt.Fprintln(w, "\t\t\t\t")
+	for _, s := range topo.Servers {
+		host, clientPort, quorumPort, electionPort := s.ParseAddress()
+		ports := fmt.Sprintf("%s/%s/%s", clientPort, quorumPort, electionPort)
+		fmt.Fprintf(w, "zookeeper\t%s\t%s\t%s\n", host, ports, topo.Path)
+	}
+	w.Flush()
+
+	fmt.Println()
+	fmt.Println("Attention:")
+	fmt.Println(" 1. If the topology is not what you expected, check your yaml file.")
+	fmt.Println(" 2. Please confirm there is no port/directory conflicts in same host.")
+}
+
+// printRecoveryGuide lists the servers that were already installed and the
+// exact stop/cleanup commands to run on each of them. It prints nothing when
+// no server was installed.
+func printRecoveryGuide(deployed []topology.ZKServer, topo *topology.ZKTopology) {
+	if len(deployed) == 0 {
+		return
+	}
+
+	fmt.Println("\nDeployment failed. Manual recovery required.")
+	fmt.Println("The following servers have been installed and may still be running:")
+	for _, s := range deployed {
+		fmt.Printf(" - %s (myid=%d)\n", s.Host(), s.MyID)
+	}
+	fmt.Println("\nTo recover, manually run on each server:")
+	for _, s := range deployed {
+		// Each server has its own conf_myid_N directory, so spell out the
+		// concrete paths instead of a placeholder the operator must fill in.
+		confDir := fmt.Sprintf("%s/conf_myid_%d", topo.Path, s.MyID)
+		fmt.Printf(" %s:\n", s.Host())
+		fmt.Printf("   %s/bin/zkServer.sh stop %s/zoo.cfg\n", topo.Path, confDir)
+		fmt.Printf("   rm -rf %s\n", confDir)
+	}
+}
+
+// confirm asks the operator to approve the plan on stdin. Only "y" or "yes"
+// (case-insensitive) counts as approval; a read error counts as "no".
+func confirm() bool {
+	// FIXME: switch to internal.ReadStdin()
+	fmt.Print("\nProceed? [y/N]: ")
+	reader := bufio.NewReader(os.Stdin)
+	input, err := reader.ReadString('\n')
+	if err != nil {
+		return false
+	}
+
+	input = strings.TrimSpace(strings.ToLower(input))
+	return input == "y" || input == "yes"
+}
+
+// validate checks the merged topology: name and path must be set, at least
+// one server must be defined, myids and addresses must be unique, and every
+// server needs a data dir. Server configs must already be merged (non-nil).
+func validate(topo *topology.ZKTopology) error {
+	if topo.Name == "" {
+		return fmt.Errorf("name is required")
+	}
+	// An empty path would turn the remote commands into e.g. "mkdir -p " and
+	// "rm -rf /conf_myid_1", so refuse it up front.
+	if topo.Path == "" {
+		return fmt.Errorf("path is required")
+	}
+	if len(topo.Servers) == 0 {
+		return fmt.Errorf("no servers defined in topology")
+	}
+
+	seenMyID := map[int]bool{}
+	seenAddress := map[string]bool{}
+
+	for _, s := range topo.Servers {
+		if seenMyID[s.MyID] {
+			return fmt.Errorf("duplicate myid: %d", s.MyID)
+		}
+		seenMyID[s.MyID] = true
+
+		if seenAddress[s.Address] {
+			return fmt.Errorf("duplicate address: %s", s.Address)
+		}
+		seenAddress[s.Address] = true
+
+		if s.Config.DataDir == "" {
+			return fmt.Errorf("server myid=%d: data_dir is required", s.MyID)
+		}
+	}
+
+	return nil
+}
diff --git a/internal/zk/download.go b/internal/zk/download.go
new file mode 100644
index 0000000..b940b2c
--- /dev/null
+++ b/internal/zk/download.go
@@ -0,0 +1,63 @@
+package zk
+
+import (
+	"fmt"
+	"os"
+	"os/exec"
+	"path/filepath"
+
+	"github.com/jam2in/arcusctl/internal"
+)
+
+// zkDownloadURLTemplate is the Apache archive URL of a binary release; both
+// verbs take the version string.
+const zkDownloadURLTemplate = "https://archive.apache.org/dist/zookeeper/zookeeper-%s/apache-zookeeper-%s-bin.tar.gz"
+
+// ensureDownloaded returns the local path of the ZooKeeper binary tarball for
+// version, downloading it with wget into imageDir() unless a cached copy is
+// already there.
+func ensureDownloaded(version string) (string, error) {
+	dir := imageDir()
+
+	if err := os.MkdirAll(dir, 0755); err != nil {
+		return "", fmt.Errorf("create image dir: %w", err)
+	}
+
+	filename := fmt.Sprintf("apache-zookeeper-%s-bin.tar.gz", version)
+	localPath := filepath.Join(dir, filename)
+
+	_, err := os.Stat(localPath)
+	if err == nil {
+		fmt.Printf("Using cached file: %s\n", localPath)
+		return localPath, nil
+	}
+	if !os.IsNotExist(err) {
+		return "", fmt.Errorf("check cached file: %w", err)
+	}
+
+	url := fmt.Sprintf(zkDownloadURLTemplate, version, version)
+	fmt.Printf("Downloading %s...\n", url)
+
+	// Download to a temporary name and rename only on success, so that an
+	// interrupted or failed download is never mistaken for a cached tarball.
+	partPath := localPath + ".part"
+	cmd := exec.Command("wget", "-q", "-O", partPath, url)
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	if err := cmd.Run(); err != nil {
+		_ = os.Remove(partPath) // best effort; the download error is what matters
+		return "", fmt.Errorf("download %q failed: %w", version, err)
+	}
+	if err := os.Rename(partPath, localPath); err != nil {
+		_ = os.Remove(partPath) // best effort cleanup
+		return "", fmt.Errorf("move download into place: %w", err)
+	}
+
+	return localPath, nil
+}
+
+// imageDir returns the directory under the arcusctl home where downloaded
+// ZooKeeper tarballs are cached.
+func imageDir() string {
+	return filepath.Join(internal.Config.Home, "images", "zookeeper")
+}
diff --git a/internal/zk/install.go b/internal/zk/install.go
new file mode 100644
index 0000000..8b73ab3
--- /dev/null
+++ b/internal/zk/install.go
@@ -0,0 +1,83 @@
+package zk
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+
+	"github.com/jam2in/arcusctl/internal/ssh"
+	"github.com/jam2in/arcusctl/internal/topology"
+)
+
+// installNode provisions one server over SSH: it creates the base path,
+// copies and extracts the tarball, creates the conf and data directories,
+// writes myid, uploads zoo.cfg and zoo.cfg.dynamic, and starts ZooKeeper.
+// It stops at the first failing step; nothing is rolled back.
+func installNode(server topology.ZKServer, topo *topology.ZKTopology, version string, localTarPath string) error {
+	host := server.Host()
+	confDir := fmt.Sprintf("%s/conf_myid_%d", topo.Path, server.MyID)
+	dataDir := server.Config.DataDir
+	dataDirPath := fmt.Sprintf("%s/zk%d", dataDir, server.MyID)
+
+	if err := ssh.Run(host, fmt.Sprintf("mkdir -p %s", topo.Path)); err != nil {
+		return fmt.Errorf("mkdir base path on %s: %w", host, err)
+	}
+
+	remoteTarPath := fmt.Sprintf("%s/zookeeper-%s.tar.gz", topo.Path, version)
+	if err := ssh.Copy(localTarPath, host, remoteTarPath); err != nil {
+		return fmt.Errorf("copy file to %s: %w", host, err)
+	}
+
+	extractCmd := fmt.Sprintf("tar -xzf %s -C %s --strip-components=1", remoteTarPath, topo.Path)
+	if err := ssh.Run(host, extractCmd); err != nil {
+		return fmt.Errorf("extract file on %s: %w", host, err)
+	}
+
+	mkdirCmd := fmt.Sprintf("mkdir -p %s %s", confDir, dataDirPath)
+	if err := ssh.Run(host, mkdirCmd); err != nil {
+		return fmt.Errorf("mkdir on %s: %w", host, err)
+	}
+
+	myidCmd := fmt.Sprintf("echo %d > %s/myid", server.MyID, dataDirPath)
+	if err := ssh.Run(host, myidCmd); err != nil {
+		return fmt.Errorf("write myid on %s: %w", host, err)
+	}
+
+	cfgContent := buildZKConfig(server, topo)
+	if err := uploadFile(host, cfgContent, filepath.Join(confDir, "zoo.cfg")); err != nil {
+		return fmt.Errorf("upload zoo.cfg to %s: %w", host, err)
+	}
+
+	dynContent := buildZKDynamicConfig(topo)
+	if err := uploadFile(host, dynContent, filepath.Join(confDir, "zoo.cfg.dynamic")); err != nil {
+		return fmt.Errorf("upload zoo.cfg.dynamic to %s: %w", host, err)
+	}
+
+	startCmd := fmt.Sprintf("%s/bin/zkServer.sh start %s/zoo.cfg", topo.Path, confDir)
+	if err := ssh.Run(host, startCmd); err != nil {
+		return fmt.Errorf("start ZK on %s: %w", host, err)
+	}
+
+	return nil
+}
+
+// uploadFile writes content to a local temporary file and copies it to
+// remotePath on host. The temporary file is removed before returning.
+func uploadFile(host string, content string, remotePath string) error {
+	tmp, err := os.CreateTemp("", "arcusctl-*")
+	if err != nil {
+		return err
+	}
+	defer os.Remove(tmp.Name())
+
+	if _, err := tmp.WriteString(content); err != nil {
+		_ = tmp.Close() // don't leak the descriptor; the write error is reported
+		return err
+	}
+
+	if err := tmp.Close(); err != nil {
+		return err
+	}
+
+	return ssh.Copy(tmp.Name(), host, remotePath)
+}
diff --git a/internal/zk/znode.go b/internal/zk/znode.go
new file mode 100644
index 0000000..c56eff0
--- /dev/null
+++ b/internal/zk/znode.go
@@ -0,0 +1,19 @@
+package zk
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/jam2in/arcusctl/internal/topology"
+)
+
+// zkConnectString returns the client endpoints of all servers in the topology
+// as a comma-separated "host:clientPort" list.
+func zkConnectString(topo *topology.ZKTopology) string {
+	addrs := make([]string, len(topo.Servers))
+	for i, s := range topo.Servers {
+		host, clientPort, _, _ := s.ParseAddress()
+		addrs[i] = fmt.Sprintf("%s:%s", host, clientPort)
+	}
+	return strings.Join(addrs, ",")
+}
diff --git a/zk-sample-topology.yml b/zk-sample-topology.yml
new file mode 100644
index 0000000..49c220a
--- /dev/null
+++ b/zk-sample-topology.yml
@@ -0,0 +1,39 @@
+# For more configuration options, see:
+# https://zookeeper.apache.org/doc/r3.5.9/zookeeperAdmin.html#sc_configuration
+
+name: my-ensemble # required
+path: /home/arcus/zookeeper # required
+
+servers:
+  - myid: 1 # required
+    address: zk1:2181:2888:3888 # required (host:clientPort:quorumPort:electionPort)
+    config: # optional - per-node override
+      data_log_dir: /data/zk1-txlog
+
+  - myid: 2
+    address: zk2:2181:2888:3888
+
+  - myid: 3
+    address: zk3:2181:2888:3888
+
+global_config:
+  # optional (default: 2000)
+  tick_time: 2000
+
+  # optional (default: 10) - ticks for follower to sync to leader
+  init_limit: 10
+
+  # optional (default: 5) - ticks between request and ack
+  sync_limit: 5
+
+  # required
+  data_dir: /var/lib/zk/data
+
+  # optional (default: data_dir) - dedicated disk recommended
+  data_log_dir: /var/lib/zk/datalog
+
+  # optional - other ZK options (string values only)
+  properties:
+    maxClientCnxns: "60"
+    autopurge.snapRetainCount: "10"
+    autopurge.purgeInterval: "24"
\ No newline at end of file