-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_client.py
More file actions
148 lines (125 loc) · 5.18 KB
/
Copy pathtest_client.py
File metadata and controls
148 lines (125 loc) · 5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
"""
OpenWinBot integration test client.
What it does:
1. Subscribes to win-observer (tcp://localhost:5555) and waits for one frame.
2. Prints every element found in the target window.
3. Searches for a button whose name matches a CLI-supplied string (default "9").
4. Sends a click command to win-actuator (tcp://localhost:5556) using
the centre coordinates of that button.
5. Prints the actuator's reply.
Usage:
pip install pyzmq
python test_client.py # clicks the "9" button
python test_client.py --click "Equals" # clicks the "=" button
python test_client.py --type "42" # types "42" into the window
python test_client.py --dump # just print the element tree
"""
import argparse
import json
import sys
import time
import zmq
# Default endpoint recv_state() connects its SUB socket to (win-observer).
OBS_ADDR = "tcp://localhost:5555"
# Default endpoint send_action() connects its REQ socket to (win-actuator).
ACT_ADDR = "tcp://localhost:5556"
# Receive timeout in milliseconds; used both when waiting for a state frame
# from the observer and when waiting for the actuator's reply.
TIMEOUT_MS = 5000 # how long to wait for a state frame
def recv_state(timeout_ms: int = TIMEOUT_MS, addr: str = OBS_ADDR) -> dict:
    """Receive a single JSON state frame from the observer.

    Opens a SUB socket subscribed to every topic, connects it to *addr* and
    blocks until one message arrives or *timeout_ms* elapses.

    Args:
        timeout_ms: Maximum time to wait for a frame, in milliseconds.
        addr: ZeroMQ endpoint of the observer's publisher socket.

    Returns:
        The frame decoded with ``json.loads``.

    Raises:
        SystemExit: With status 1 if no frame arrives before the timeout.
    """
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    # Zero linger guarantees that ctx.term() in the cleanup path can never
    # block, whatever state the socket is in.
    sub.setsockopt(zmq.LINGER, 0)
    sub.setsockopt(zmq.RCVTIMEO, timeout_ms)
    sub.setsockopt_string(zmq.SUBSCRIBE, "")  # subscribe to all topics
    try:
        # connect() is inside the try so a malformed address still runs the
        # socket/context cleanup below before the error propagates.
        sub.connect(addr)
        raw = sub.recv()
        return json.loads(raw)
    except zmq.Again:
        print(f"[test] Timed out waiting for a frame from {addr}", file=sys.stderr)
        print("[test] Is win-observer running?", file=sys.stderr)
        sys.exit(1)
    finally:
        sub.close()
        ctx.term()


def send_action(action: dict, addr: str = ACT_ADDR) -> dict:
    """Send one action to the actuator and return its decoded reply.

    Args:
        action: JSON-serialisable command, e.g.
            ``{"action": "click", "x": 10, "y": 20}``.
        addr: ZeroMQ endpoint of the actuator's reply socket.

    Returns:
        The actuator's reply decoded with ``json.loads``.

    Raises:
        SystemExit: With status 1 if no reply arrives within ``TIMEOUT_MS``.
    """
    ctx = zmq.Context()
    req = ctx.socket(zmq.REQ)
    # Without this, a request that was never delivered (actuator not running)
    # stays queued on the socket and ctx.term() blocks forever waiting for it
    # to be sent, so the timeout path below would hang instead of exiting.
    req.setsockopt(zmq.LINGER, 0)
    req.setsockopt(zmq.RCVTIMEO, TIMEOUT_MS)
    try:
        req.connect(addr)
        req.send_string(json.dumps(action))
        raw = req.recv()
        return json.loads(raw)
    except zmq.Again:
        print(f"[test] Timed out waiting for reply from {addr}", file=sys.stderr)
        print("[test] Is win-actuator running?", file=sys.stderr)
        sys.exit(1)
    finally:
        req.close()
        ctx.term()


def _element_name(el: dict) -> str:
    """Return the element's name as a string ("" when missing or null)."""
    name = el.get("name")
    return "" if name is None else str(name)


def _print_elements(elements: list) -> None:
    """Print the element list as an aligned ID / TYPE / NAME / RECT table."""
    col_id = 18
    col_type = 14
    col_name = 30
    header = f"{'ID':<{col_id}} {'TYPE':<{col_type}} {'NAME':<{col_name}} RECT"
    print(header)
    print("-" * len(header))
    for el in elements:
        r = el.get("rect") or {}
        rect_str = f"({r.get('x')}, {r.get('y')}, {r.get('w')}×{r.get('h')})"
        # str() everything: a width spec applied to None (a JSON null) raises
        # TypeError, and a missing key should not abort a diagnostic dump.
        el_id = str(el.get("id", ""))
        el_type = str(el.get("type", ""))
        print(f"{el_id:<{col_id}} {el_type:<{col_type}} "
              f"{_element_name(el):<{col_name}} {rect_str}")
    print()


def _find_target(elements: list, target_name: str):
    """Return the element to click for *target_name*, or None if absent.

    Tries an exact (whitespace-stripped) name match restricted to elements of
    type "Button" first, then falls back to a case-insensitive substring match
    against the name of any element.
    """
    exact = next(
        (e for e in elements
         if e.get("type") == "Button"
         and _element_name(e).strip() == target_name),
        None,
    )
    if exact is not None:
        return exact
    wanted = target_name.lower()
    return next(
        (e for e in elements if wanted in _element_name(e).lower()),
        None,
    )


def _click_action(elements: list, target_name: str) -> dict:
    """Build a click action aimed at the centre of the matching element.

    Exits with status 1 if no element matches or the match has no usable rect.
    """
    match = _find_target(elements, target_name)
    if match is None:
        print(f"[test] Button {target_name!r} not found in element tree.",
              file=sys.stderr)
        sys.exit(1)
    rect = match.get("rect") or {}
    try:
        cx = rect["x"] + rect["w"] // 2
        cy = rect["y"] + rect["h"] // 2
    except (KeyError, TypeError):
        # An element without geometry cannot be clicked; report it instead of
        # dying with a traceback.
        print(f"[test] Element {_element_name(match)!r} has no usable rect; "
              "cannot click.", file=sys.stderr)
        sys.exit(1)
    print(f"[test] Found {_element_name(match)!r} at id={match.get('id')} "
          f"→ clicking centre ({cx}, {cy})")
    return {"action": "click", "x": cx, "y": cy}


def main():
    """Run the test client: fetch one frame, list elements, send one action.

    ``--dump`` stops after printing the element table. ``--type`` takes
    precedence over ``--click``; with neither, the button named "9" is
    clicked. Exits with status 1 when the observer or actuator times out, the
    target cannot be found, or the actuator's reply status is not "ok".
    """
    parser = argparse.ArgumentParser(description="OpenWinBot test client")
    parser.add_argument("--obs", default=OBS_ADDR, help="Observer PUB address")
    parser.add_argument("--act", default=ACT_ADDR, help="Actuator REP address")
    parser.add_argument("--click", metavar="NAME", help="Click button by name")
    parser.add_argument("--type", metavar="TEXT", help="Type text into window")
    parser.add_argument("--dump", action="store_true", help="Dump element tree and exit")
    args = parser.parse_args()

    # Step 1: receive one state frame from the address the user asked for.
    print(f"[test] Connecting to observer at {args.obs} …")
    state = recv_state(addr=args.obs)
    window = state.get("window", "?")
    elements = state.get("elements") or []
    ts = state.get("timestamp", 0)
    print(f"[test] Frame received window={window!r} elements={len(elements)}"
          f" ts={ts}")
    print()

    # Step 2: print the element table.
    _print_elements(elements)
    if args.dump:
        return

    # Step 3: build the action. `is not None` so that an explicit empty
    # --type "" is still treated as a type request rather than a click.
    if args.type is not None:
        action = {"action": "type", "text": args.type}
    else:
        action = _click_action(elements, args.click or "9")

    # Step 4: send the action and report the reply.
    print(f"[test] Sending: {json.dumps(action)}")
    reply = send_action(action, addr=args.act)
    print(f"[test] Reply : {json.dumps(reply)}")
    if reply.get("status") != "ok":
        sys.exit(1)
# Script entry point: run the client only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()