-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbotctl
More file actions
executable file
·102 lines (80 loc) · 2.87 KB
/
botctl
File metadata and controls
executable file
·102 lines (80 loc) · 2.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/env python3
import argparse
import os
import socket
import sys
BOTCTL_VERSION = "1.0.0"
# ------------------------------------------------------------
# DEFAULT SOCKET PATH (env override → default)
# ------------------------------------------------------------
DEFAULT_SOCKET = os.path.expanduser(
os.environ.get(
"BOT_CONTROL_SOCKET", "~/runbot/plugins/LocalControl/.localcontrol.sock"
)
)
# ------------------------------------------------------------
# Error class
# ------------------------------------------------------------
class BotCtlError(Exception):
pass
# ------------------------------------------------------------
# Core send/receive function
# ------------------------------------------------------------
def send_command(socket_path: str, line: str) -> str:
if not os.path.exists(socket_path):
raise BotCtlError(f"Socket not found: {socket_path}")
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.connect(socket_path)
s.sendall((line + "\n").encode("utf-8"))
data = s.recv(65536)
except ConnectionRefusedError:
raise BotCtlError("Connection refused. Is the bot running?")
except Exception as e:
raise BotCtlError(f"Socket error: {e}")
return data.decode("utf-8").strip()
# ------------------------------------------------------------
# Command handlers
# ------------------------------------------------------------
def handle_exec(args):
return send_command(args.socket, args.command)
def handle_bot(args):
return send_command(args.socket, " ".join(args.subcommand))
# ------------------------------------------------------------
# Main CLI
# ------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="LocalControl CLI for Limnoria bots")
# Global socket override
parser.add_argument(
"--socket",
default=DEFAULT_SOCKET,
help=f"Path to LocalControl socket (default: {DEFAULT_SOCKET})",
)
parser.add_argument(
"--version",
action="version",
version=f"botctl {BOTCTL_VERSION}",
)
subparsers = parser.add_subparsers(dest="action", required=True)
# exec
p_exec = subparsers.add_parser("exec", help="Send raw command")
p_exec.add_argument("command", help="Command to send")
p_exec.set_defaults(func=handle_exec)
# bot
p_bot = subparsers.add_parser("bot", help="Bot-level commands")
p_bot.add_argument(
"subcommand",
nargs="+",
help="Bot command tokens (e.g., sysinfo or say #channel hello)",
)
p_bot.set_defaults(func=handle_bot)
args = parser.parse_args()
try:
output = args.func(args)
print(output)
except BotCtlError as e:
print(f"botctl: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()