-
Notifications
You must be signed in to change notification settings - Fork 0
FEATURE: Add zk deploy command #42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,12 +1,20 @@ | ||
| package zk | ||
|
|
||
| import "github.com/spf13/cobra" | ||
| import ( | ||
| "github.com/jam2in/arcusctl/internal/zk" | ||
| "github.com/spf13/cobra" | ||
| ) | ||
|
|
||
| var deployCmd = &cobra.Command{ | ||
| Use: "deploy <version> <topology.yml>", | ||
| Short: "Deploy a new ZooKeeper ensemble", | ||
| Args: cobra.ExactArgs(2), | ||
| Run: func(cmd *cobra.Command, args []string) { | ||
| // TODO: deploy ꡬν | ||
| version := args[0] | ||
| topologyPath := args[1] | ||
|
|
||
| if err := zk.Deploy(version, topologyPath); err != nil { | ||
| panic(err) | ||
| } | ||
| }, | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| package zk | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "strings" | ||
|
|
||
| "github.com/jam2in/arcusctl/internal/topology" | ||
| ) | ||
|
|
||
| const ( | ||
| defaultTickTime = 2000 | ||
| defaultInitLimit = 10 | ||
| defaultSyncLimit = 5 | ||
| ) | ||
|
|
||
| func buildZKConfig(server topology.ZKServer, topo *topology.ZKTopology) string { | ||
| var sb strings.Builder | ||
|
|
||
| cfg := server.Config | ||
| dynamicConfigPath := fmt.Sprintf("%s/conf_myid_%d/zoo.cfg.dynamic", topo.Path, server.MyID) | ||
|
|
||
| fmt.Fprintf(&sb, "tickTime=%d\n", cfg.TickTime) | ||
| fmt.Fprintf(&sb, "initLimit=%d\n", cfg.InitLimit) | ||
| fmt.Fprintf(&sb, "syncLimit=%d\n", cfg.SyncLimit) | ||
| fmt.Fprintf(&sb, "dataDir=%s/zk%d\n", cfg.DataDir, server.MyID) | ||
| fmt.Fprintf(&sb, "dataLogDir=%s/zk%d\n", cfg.DataLogDir, server.MyID) | ||
| fmt.Fprintf(&sb, "dynamicConfigFile=%s\n", dynamicConfigPath) | ||
|
|
||
| sb.WriteString("standaloneEnabled=false\n") | ||
| sb.WriteString("reconfigEnabled=true\n") | ||
| sb.WriteString("4lw.commands.whitelist=*\n") | ||
|
|
||
| for k, v := range cfg.Properties { | ||
| fmt.Fprintf(&sb, "%s=%s\n", k, v) | ||
| } | ||
|
|
||
| return sb.String() | ||
| } | ||
|
|
||
| func buildZKDynamicConfig(topo *topology.ZKTopology) string { | ||
| var sb strings.Builder | ||
|
|
||
| for _, s := range topo.Servers { | ||
| host, clientPort, quorumPort, electionPort := s.ParseAddress() | ||
| fmt.Fprintf(&sb, "server.%d=%s:%s:%s;%s\n", | ||
| s.MyID, host, quorumPort, electionPort, clientPort) | ||
| } | ||
|
|
||
| return sb.String() | ||
| } | ||
|
|
||
| func mergeConfig(globalConfig topology.ZKConfig, nodeConfig *topology.ZKConfig) topology.ZKConfig { | ||
| merged := globalConfig | ||
| if nodeConfig != nil { | ||
| if nodeConfig.TickTime > 0 { | ||
| merged.TickTime = nodeConfig.TickTime | ||
| } | ||
| if nodeConfig.InitLimit > 0 { | ||
| merged.InitLimit = nodeConfig.InitLimit | ||
| } | ||
| if nodeConfig.SyncLimit > 0 { | ||
| merged.SyncLimit = nodeConfig.SyncLimit | ||
| } | ||
| if nodeConfig.DataDir != "" { | ||
| merged.DataDir = nodeConfig.DataDir | ||
| } | ||
| if nodeConfig.DataLogDir != "" { | ||
| merged.DataLogDir = nodeConfig.DataLogDir | ||
| } | ||
| if nodeConfig.Properties != nil { | ||
| if merged.Properties == nil { | ||
| merged.Properties = map[string]string{} | ||
| } | ||
| for k, v := range nodeConfig.Properties { | ||
| merged.Properties[k] = v | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if merged.TickTime == 0 { | ||
| merged.TickTime = defaultTickTime | ||
| } | ||
| if merged.InitLimit == 0 { | ||
| merged.InitLimit = defaultInitLimit | ||
| } | ||
| if merged.SyncLimit == 0 { | ||
| merged.SyncLimit = defaultSyncLimit | ||
| } | ||
| if merged.DataLogDir == "" { | ||
| merged.DataLogDir = merged.DataDir | ||
| } | ||
|
|
||
| return merged | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,177 @@ | ||
| package zk | ||
|
|
||
| import ( | ||
| "bufio" | ||
| "fmt" | ||
| "os" | ||
| "strings" | ||
| "text/tabwriter" | ||
| "time" | ||
|
|
||
| "github.com/go-zookeeper/zk" | ||
| "github.com/jam2in/arcusctl/internal/store" | ||
| "github.com/jam2in/arcusctl/internal/topology" | ||
| ) | ||
|
|
||
| func Deploy(version string, topologyPath string) error { | ||
| topo, rawData, err := prepare(topologyPath) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ν¨μμ λ°ν νμ μ λ³΄κ³ μ΄λ€ μμ μ νκ² λλμ§ μκΈ°κ° μ‘°κΈ μ΄λ €μ΄λ°, λ€μ΄λ°μ΄λ λ³μλͺ μ μ’λ μ½κΈ° μ½κ² λ§λ€ μ μμκΉμ? |
||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| printPlan(topo, version) | ||
| if !confirm() { | ||
| fmt.Println("Aborted.") | ||
| return nil | ||
| } | ||
|
namsic marked this conversation as resolved.
|
||
|
|
||
| localTarPath, err := ensureDownloaded(version) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| deployed, err := installAll(topo, version, localTarPath) | ||
| if err != nil { | ||
| printRecoveryGuide(deployed, topo) | ||
| return err | ||
| } | ||
|
|
||
| if err := store.SaveZK(topo.Name, version, rawData); err != nil { | ||
| printRecoveryGuide(deployed, topo) | ||
| return fmt.Errorf("save metadata: %w", err) | ||
| } | ||
|
|
||
| fmt.Printf("ZooKeeper ensemble %q deployed successfully.\n", topo.Name) | ||
| return nil | ||
| } | ||
|
|
||
| func prepare(topologyPath string) (*topology.ZKTopology, []byte, error) { | ||
| topo, rawData, err := topology.LoadZK(topologyPath) | ||
| if err != nil { | ||
| return nil, nil, err | ||
| } | ||
|
|
||
| for i := range topo.Servers { | ||
| merged := mergeConfig(topo.GlobalConfig, topo.Servers[i].Config) | ||
| topo.Servers[i].Config = &merged | ||
| } | ||
|
|
||
| if err := validate(topo); err != nil { | ||
|
namsic marked this conversation as resolved.
|
||
| return nil, nil, err | ||
| } | ||
|
|
||
| if store.ZKExists(topo.Name) { | ||
| return nil, nil, fmt.Errorf("ZooKeeper ensemble %q already exists", topo.Name) | ||
| } | ||
|
Comment on lines
+63
to
+65
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LoadZK μ§νμ μ΄λΆλΆμ κ°μ₯ λ¨Όμ μνν μ μλμ? |
||
|
|
||
| return topo, rawData, nil | ||
| } | ||
|
|
||
| func installAll(topo *topology.ZKTopology, version string, localTarPath string) ([]topology.ZKServer, error) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. μ΄ λ©μλλ install.goμ λμ§ μλ μ΄μ κ° λ¬΄μμΈκ°μ? |
||
| var deployed []topology.ZKServer | ||
| for _, server := range topo.Servers { | ||
| fmt.Printf("[%d/%d] %s: installing...\n", len(deployed)+1, len(topo.Servers), server.Host()) | ||
| if err := installNode(server, topo, version, localTarPath); err != nil { | ||
| return deployed, fmt.Errorf("deploy failed: %w", err) | ||
| } | ||
| deployed = append(deployed, server) | ||
| } | ||
|
|
||
| fmt.Println("Waiting for ZooKeeper cluster to form...") | ||
| return deployed, waitForQuorum(topo) | ||
| } | ||
|
|
||
| func waitForQuorum(topo *topology.ZKTopology) error { | ||
| addr := zkConnectString(topo) | ||
| const maxWait = 30 * time.Second | ||
| const interval = 2 * time.Second | ||
|
|
||
| deadline := time.Now().Add(maxWait) | ||
| for time.Now().Before(deadline) { | ||
| conn, _, err := zk.Connect([]string{addr}, 3*time.Second) | ||
| if err == nil { | ||
| _, _, err := conn.Get("/zookeeper/config") | ||
| conn.Close() | ||
| if err == nil { | ||
| return nil | ||
| } | ||
| } | ||
| time.Sleep(interval) | ||
| } | ||
|
|
||
| return fmt.Errorf("ZooKeeper cluster did not form within %s", maxWait) | ||
| } | ||
|
|
||
| func printPlan(topo *topology.ZKTopology, version string) { | ||
| fmt.Printf("ZooKeeper ensemble %q will be deployed (version: %s)\n\n", topo.Name, version) | ||
|
|
||
| w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) | ||
| fmt.Fprintln(w, "ROLE\tHOST\tPORTS\tDIRECTORIES") | ||
| fmt.Fprintln(w, "\t\t\t\t") | ||
| for _, s := range topo.Servers { | ||
| host, clientPort, quorumPort, electionPort := s.ParseAddress() | ||
| ports := fmt.Sprintf("%s/%s/%s", clientPort, quorumPort, electionPort) | ||
| fmt.Fprintf(w, "zookeeper\t%s\t%s\t%s\n", host, ports, topo.Path) | ||
| } | ||
| w.Flush() | ||
|
|
||
| fmt.Println() | ||
| fmt.Println("Attention:") | ||
| fmt.Println(" 1. If the topology is not what you expected, check your yaml file.") | ||
| fmt.Println(" 2. Please confirm there is no port/directory conflicts in same host.") | ||
| } | ||
|
|
||
| func printRecoveryGuide(deployed []topology.ZKServer, topo *topology.ZKTopology) { | ||
| if len(deployed) == 0 { | ||
| return | ||
| } | ||
|
|
||
| fmt.Println("\nDeployment failed. Manual recovery required.") | ||
| fmt.Println("The following servers have been installed and may still be running:") | ||
| for _, s := range deployed { | ||
| fmt.Printf(" - %s (myid=%d)\n", s.Host(), s.MyID) | ||
| } | ||
| fmt.Println("\nTo recover, manually run on each server:") | ||
| fmt.Printf(" %s/bin/zkServer.sh stop %s/conf_myid_<myid>/zoo.cfg\n", topo.Path, topo.Path) | ||
| fmt.Printf(" rm -rf %s/conf_myid_<myid>\n", topo.Path) | ||
| } | ||
|
|
||
| func confirm() bool { | ||
| // FIXME: internal.ReadStdin()μΌλ‘ λ³κ²½ νμ | ||
| fmt.Print("\nProceed? [y/N]: ") | ||
| reader := bufio.NewReader(os.Stdin) | ||
| input, err := reader.ReadString('\n') | ||
| if err != nil { | ||
| return false | ||
| } | ||
|
|
||
| input = strings.TrimSpace(strings.ToLower(input)) | ||
| return input == "y" || input == "yes" | ||
| } | ||
|
|
||
| func validate(topo *topology.ZKTopology) error { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. μ΄ λ©μλλ topology ν¨ν€μ§ λ΄μ μλκ² λ«μ§ μλμ? |
||
| if len(topo.Servers) == 0 { | ||
| return fmt.Errorf("no servers defined in topology") | ||
| } | ||
|
|
||
| seenMyID := map[int]bool{} | ||
| seenAddress := map[string]bool{} | ||
|
|
||
| for _, s := range topo.Servers { | ||
| if seenMyID[s.MyID] { | ||
| return fmt.Errorf("duplicate myid: %d", s.MyID) | ||
| } | ||
| seenMyID[s.MyID] = true | ||
|
|
||
| if seenAddress[s.Address] { | ||
| return fmt.Errorf("duplicate address: %s", s.Address) | ||
| } | ||
| seenAddress[s.Address] = true | ||
|
|
||
| if s.Config.DataDir == "" { | ||
| return fmt.Errorf("server myid=%d: data_dir is required", s.MyID) | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| package zk | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "os" | ||
| "os/exec" | ||
| "path/filepath" | ||
|
|
||
| "github.com/jam2in/arcusctl/internal" | ||
| ) | ||
|
|
||
| const zkDownloadURLTemplate = "https://archive.apache.org/dist/zookeeper/zookeeper-%s/apache-zookeeper-%s-bin.tar.gz" | ||
|
|
||
| func ensureDownloaded(version string) (string, error) { | ||
| dir := imageDir() | ||
|
|
||
| if err := os.MkdirAll(dir, 0755); err != nil { | ||
| return "", fmt.Errorf("create image dir: %w", err) | ||
| } | ||
|
|
||
| filename := fmt.Sprintf("apache-zookeeper-%s-bin.tar.gz", version) | ||
| localPath := filepath.Join(dir, filename) | ||
|
|
||
| if _, err := os.Stat(localPath); err == nil { | ||
| fmt.Printf("Using cached file: %s\n", localPath) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cached -> existing ννμ΄ λ«μ§ μμκΉμ? |
||
| return localPath, nil | ||
| } | ||
|
|
||
| url := fmt.Sprintf(zkDownloadURLTemplate, version, version) | ||
| fmt.Printf("Downloading %s...\n", url) | ||
|
|
||
| cmd := exec.Command("wget", "-q", "-O", localPath, url) | ||
| cmd.Stdout = os.Stdout | ||
| cmd.Stderr = os.Stderr | ||
| if err := cmd.Run(); err != nil { | ||
| _ = os.Remove(localPath) | ||
| return "", fmt.Errorf("download %q failed: %w", version, err) | ||
| } | ||
|
|
||
| return localPath, nil | ||
| } | ||
|
|
||
| func imageDir() string { | ||
| return filepath.Join(internal.Config.Home, "images", "zookeeper") | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.