Skip to content

Commit cb31472

Browse files
feat: robust qstat <jobID> filtering and process tree cleanup (v0.9.2)
1 parent 11f9e36 commit cb31472

9 files changed

Lines changed: 304 additions & 42 deletions

File tree

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
[package]
22
name = "fbqueue"
3-
version = "0.9.1"
3+
version = "0.9.2"
44
edition = "2021"
55

6-
[dependencies]
6+
[dependencies]
7+
8+
[target.'cfg(unix)'.dependencies]
9+
libc = "0.2"

MANUAL.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,9 @@ Jobs are executed via their respective shells without modifying the original scr
144144

145145
| Command | Option | Description |
146146
| :--- | :--- | :--- |
147+
| `stat` | `[jobID]` | Filter by a specific job ID (supports `.master` suffix) |
147148
| `stat` | `--style pbs` | Use PBS-compatible tabular output |
149+
| `stat` | `-u <user>` | Filter jobs by a specific username |
148150
| `stat` | `-H`, `--history [N]` | Show recent job history (last N jobs) |
149151
| `del` | `<job_id>` | Cancel a pending or running job |
150152

PBS_COMPATIBILITY.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ ln -s fbqueue qdel
2121
```
2222

2323
Once linked, FBQueue automatically detects how it was invoked and adjusts its behavior:
24-
- `qsub` behaves like `fbqueue sub`
25-
- `qstat` behaves like `fbqueue stat --style pbs`
24+
- `qsub` behaves like `fbqueue sub` (Outputs only `<id>.master` upon success)
25+
- `qstat` behaves like `fbqueue stat --style pbs` (Supports `qstat <jobID>`, `-u <user>`, and `-H`)
2626
- `qdel` behaves like `fbqueue del`
2727

2828
---

README.md

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ FBQueue addresses the gap between manual script execution and heavy enterprise s
3030
Download the pre-built static binary and place it in your `$PATH`:
3131

3232
```bash
33-
# 1. Download and extract
34-
wget https://github.com/ForblazeProject/fbqueue/releases/download/v0.9.1/fbqueue-linux-x64.tar.gz
35-
tar -xzvf fbqueue-linux-x64.tar.gz
33+
# 1. Download
34+
wget https://github.com/ForblazeProject/fbqueue/releases/download/v0.9.2/fbqueue-linux-x64.tar.gz
3635

37-
# 2. Move to your bin directory
36+
# 2. Extract and move
37+
tar -xzvf fbqueue-linux-x64.tar.gz
3838
mkdir -p ~/bin
39-
mv fbqueue ~/bin/
39+
mv fbqueue/fbqueue ~/bin/
4040

4141
# 3. (Optional) Create PBS-style symbolic links
4242
ln -s ~/bin/fbqueue ~/bin/qsub
@@ -81,6 +81,11 @@ fbqueue sub ./calc.sh
8181

8282
## 📜 Change Log
8383

84+
### v0.9.2
85+
- **Fixed Process Leak**: Improved job cancellation logic using process groups (`setsid`) on Unix and tree termination (`/T`) on Windows. Subprocesses spawned by job scripts are now correctly terminated.
86+
- **Robust Job Filtering**: `qstat <jobID>` (and `stat <jobID>`) now works reliably even if the job has already finished and moved to history. It also correctly handles the `.master` suffix.
87+
- **Dependency Update**: Added `libc` dependency for Unix systems to support advanced process management.
88+
8489
### v0.9.1
8590
- **Enhanced PBS Compatibility**: Added support for `qstat -u <user>` filtering.
8691
- **Improved History Display**: `qstat -H` now correctly shows job history in PBS-style format.
@@ -98,4 +103,4 @@ fbqueue sub ./calc.sh
98103
Website: [https://forblaze-works.com/en/](https://forblaze-works.com/en/)
99104

100105
### License
101-
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
106+
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

src/daemon.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ pub fn run_daemon() {
7070
let id = entry.file_name().to_str().unwrap().trim_end_matches(".job").to_string();
7171
if let Some(pos) = running_jobs.iter().position(|(rid, _, _, _, _, _)| rid == &id) {
7272
let (_, mut child, _, _, _, _) = running_jobs.remove(pos);
73-
let _ = child.kill();
74-
let _ = child.wait();
73+
kill_process_tree(&mut child);
7574
if let Ok(mut f) = fs::OpenOptions::new().append(true).open(fbq_dir.join("queue/running").join(entry.file_name())) {
7675
let _ = writeln!(f, "status: CANCELLED");
7776
}
@@ -86,8 +85,7 @@ pub fn run_daemon() {
8685
let (_id, ref mut child, _, _, start_at, walltime) = &mut running_jobs[i];
8786
if let Some(wt) = *walltime {
8887
if now - *start_at > wt {
89-
let _ = child.kill();
90-
let _ = child.wait();
88+
kill_process_tree(child);
9189
let (job_id, _, _, _, _, _) = running_jobs.remove(i);
9290
let fname = format!("{}.job", job_id);
9391
let job_path = fbq_dir.join("queue/running").join(&fname);
@@ -182,6 +180,12 @@ pub fn run_daemon() {
182180
process::Command::new(&j.cmd)
183181
};
184182

183+
#[cfg(unix)]
184+
{
185+
use std::os::unix::process::CommandExt;
186+
unsafe { child_cmd.pre_exec(|| { setsid(); Ok(()) }); }
187+
}
188+
185189
#[cfg(windows)]
186190
let mut child_cmd = if is_file {
187191
let cmd_lower = j.cmd.to_lowercase();
@@ -227,6 +231,30 @@ pub fn run_daemon() {
227231
}
228232
}
229233

234+
fn kill_process_tree(child: &mut process::Child) {
235+
let pid = child.id();
236+
#[cfg(unix)]
237+
{
238+
// Kill the entire process group (PGID is same as PID because of setsid)
239+
let pgid = pid as i32;
240+
unsafe {
241+
libc::kill(-pgid, libc::SIGKILL);
242+
}
243+
}
244+
#[cfg(windows)]
245+
{
246+
// /T kills child processes as well
247+
let _ = process::Command::new("taskkill")
248+
.arg("/F")
249+
.arg("/T")
250+
.arg("/PID")
251+
.arg(pid.to_string())
252+
.status();
253+
}
254+
let _ = child.kill();
255+
let _ = child.wait();
256+
}
257+
230258
fn prune_history(fbq_dir: &Path, limit: usize) {
231259
let mut entries = Vec::new();
232260
for dir in &["queue/done", "queue/failed"] {

src/job.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,10 @@ pub fn submit_job(cmd_tmpl: &str, args_tmpl: &[String], cwd: &Path, val: Option<
174174
content.push_str(&format!("cwd: {}\n", cwd.display()));
175175
for (key, val) in env::vars() { content.push_str(&format!("env: {}={}\n", key, val)); }
176176

177-
fs::write(&job_file_path, content).expect("Failed to write job file");
178-
println!("Job submitted: {} ({}) [queue: {}, cost: {}]", job_id, final_name, final_queue, final_cost);
179-
}
177+
fs::write(&job_file_path, content).expect("Failed to write job file");
178+
179+
println!("{}.master", job_id);
180+
181+
}
182+
183+

src/stat.rs

Lines changed: 82 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,42 @@ pub fn handle_stat(args: &[String], default_style: &str) {
1212

1313
let mut style = default_style;
1414
let mut show_history = None;
15+
let mut filter_user = None;
16+
let mut filter_job_id = None;
17+
1518
let mut i = 0;
1619
while i < args.len() {
17-
if args[i] == "--style" && i + 1 < args.len() {
20+
let arg = &args[i];
21+
if arg == "stat" || arg.ends_with("fbqueue") || arg.ends_with("qstat") {
22+
i += 1;
23+
continue;
24+
}
25+
if arg == "--style" && i + 1 < args.len() {
1826
style = &args[i+1];
1927
i += 2;
20-
} else if args[i] == "-H" || args[i] == "--history" {
28+
} else if arg == "-H" || arg == "--history" {
2129
if i + 1 < args.len() && args[i+1].parse::<usize>().is_ok() {
2230
show_history = Some(args[i+1].parse::<usize>().unwrap_or(conf.history_limit));
2331
i += 2;
2432
} else {
2533
show_history = Some(conf.history_limit);
2634
i += 1;
2735
}
36+
} else if arg == "-u" && i + 1 < args.len() {
37+
filter_user = Some(args[i+1].clone());
38+
i += 2;
39+
} else if !arg.starts_with('-') {
40+
// Positional argument: Job ID (handle both "123" and "123.master")
41+
let id_part = arg.split('.').next().unwrap_or(arg);
42+
if id_part.chars().next().map_or(false, |c| c.is_ascii_digit()) {
43+
filter_job_id = Some(id_part.to_string());
44+
}
45+
i += 1;
2846
} else {
2947
i += 1;
3048
}
3149
}
50+
// eprintln!("DEBUG: filter_job_id={:?}, filter_user={:?}, style={}", filter_job_id, filter_user, style);
3251

3352
let new_entries: Vec<_> = fs::read_dir(fbq_dir.join("queue/new")).map(|d| d.filter_map(|e| e.ok()).collect()).unwrap_or_default();
3453
let running_entries: Vec<_> = fs::read_dir(fbq_dir.join("queue/running")).map(|d| d.filter_map(|e| e.ok()).collect()).unwrap_or_default();
@@ -40,35 +59,62 @@ pub fn handle_stat(args: &[String], default_style: &str) {
4059

4160
for entry in &running_entries {
4261
if let Ok(j) = job::parse_job_file(&entry.path()) {
62+
if let Some(ref fid) = filter_job_id {
63+
let jid_norm = j.id.split('.').next().unwrap_or(&j.id);
64+
if jid_norm != fid { continue; }
65+
}
66+
if let Some(ref u) = filter_user { if &j.user != u { continue; } }
4367
*used_caps.entry(j.queue.clone()).or_insert(0) += j.cost;
4468
total_used += j.cost;
4569
running_jobs.push(j);
4670
}
4771
}
72+
4873
let mut sorted_new = new_entries;
4974
sorted_new.sort_by_key(|e| e.file_name().to_str().unwrap_or("0").trim_end_matches(".job").parse::<usize>().unwrap_or(0));
5075
for entry in sorted_new {
51-
if let Ok(j) = job::parse_job_file(&entry.path()) { pending_jobs.push(j); }
76+
if let Ok(j) = job::parse_job_file(&entry.path()) {
77+
if let Some(ref fid) = filter_job_id {
78+
let jid_norm = j.id.split('.').next().unwrap_or(&j.id);
79+
if jid_norm != fid { continue; }
80+
}
81+
if let Some(ref u) = filter_user { if &j.user != u { continue; } }
82+
pending_jobs.push(j);
83+
}
84+
}
85+
86+
let mut history_jobs = Vec::new();
87+
if show_history.is_some() || filter_job_id.is_some() {
88+
let limit = show_history.unwrap_or(conf.history_limit);
89+
for dir in &["queue/done", "queue/failed"] {
90+
if let Ok(entries) = fs::read_dir(fbq_dir.join(dir)) {
91+
for entry in entries.filter_map(|e| e.ok()) {
92+
if let Ok(j) = job::parse_job_file(&entry.path()) {
93+
if let Some(ref fid) = filter_job_id {
94+
let jid_norm = j.id.split('.').next().unwrap_or(&j.id);
95+
if jid_norm != fid { continue; }
96+
}
97+
if let Some(ref u) = filter_user { if &j.user != u { continue; } }
98+
history_jobs.push(j);
99+
}
100+
}
101+
}
102+
}
103+
history_jobs.sort_by(|a, b| b.id.parse::<usize>().unwrap_or(0).cmp(&a.id.parse::<usize>().unwrap_or(0)));
104+
if show_history.is_some() {
105+
history_jobs.truncate(limit);
106+
}
52107
}
53108

54109
let has_pending = !pending_jobs.is_empty();
55110
let has_running = !running_entries.is_empty();
56111

57112
if style == "pbs" {
58-
print_pbs_style(running_jobs, pending_jobs);
113+
print_pbs_style(running_jobs, pending_jobs, history_jobs, show_history.is_some() || filter_job_id.is_some());
59114
} else {
60115
if let Some(limit) = show_history {
61116
println!("Recent Job History (Last {}):", limit);
62-
let mut history = Vec::new();
63-
for dir in &["queue/done", "queue/failed"] {
64-
if let Ok(entries) = fs::read_dir(fbq_dir.join(dir)) {
65-
for entry in entries.filter_map(|e| e.ok()) {
66-
if let Ok(j) = job::parse_job_file(&entry.path()) { history.push(j); }
67-
}
68-
}
69-
}
70-
history.sort_by(|a, b| b.id.parse::<usize>().unwrap_or(0).cmp(&a.id.parse::<usize>().unwrap_or(0)));
71-
for j in history.iter().take(limit) {
117+
for j in history_jobs {
72118
let status = j.status.as_deref().unwrap_or("DONE");
73119
println!(" ID: {:>4} | NAME: {:<15} | USER: {:<10} | QUEUE: {:<10} | STATUS: {}", j.id, j.name, j.user, j.queue, status);
74120
}
@@ -108,21 +154,32 @@ pub fn handle_stat(args: &[String], default_style: &str) {
108154
}
109155
}
110156

111-
fn print_pbs_style(mut running: Vec<job::Job>, mut pending: Vec<job::Job>) {
157+
fn print_pbs_style(mut running: Vec<job::Job>, mut pending: Vec<job::Job>, history: Vec<job::Job>, is_history_mode: bool) {
112158
println!("{:<16} {:<16} {:<16} {:<8} S {:<5}", "Job id", "Name", "User", "Time Use", "Queue");
113159
println!("{:-<16} {:-<16} {:-<16} {:-<8} - {:-<5}", "", "", "", "", "");
114160

115161
let now = utils::get_now();
116-
running.sort_by_key(|j| j.id.parse::<usize>().unwrap_or(0));
117-
pending.sort_by_key(|j| j.id.parse::<usize>().unwrap_or(0));
118-
119-
for j in running {
120-
let elapsed = if let Some(start) = j.start_at { now - start } else { 0 };
121-
let time_use = format!("{:02}:{:02}:{:02}", elapsed / 3600, (elapsed % 3600) / 60, elapsed % 60);
122-
println!("{:<16} {:<16} {:<16} {:<8} R {:<5}", format!("{}.master", j.id), truncate(&j.name, 16), truncate(&j.user, 16), time_use, j.queue);
123-
}
124-
for j in pending {
125-
println!("{:<16} {:<16} {:<16} {:<8} Q {:<5}", format!("{}.master", j.id), truncate(&j.name, 16), truncate(&j.user, 16), "0", j.queue);
162+
163+
if !is_history_mode {
164+
running.sort_by_key(|j| j.id.parse::<usize>().unwrap_or(0));
165+
for j in running {
166+
let elapsed = if let Some(start) = j.start_at { now - start } else { 0 };
167+
let time_use = format!("{:02}:{:02}:{:02}", elapsed / 3600, (elapsed % 3600) / 60, elapsed % 60);
168+
println!("{:<16} {:<16} {:<16} {:<8} R {:<5}", format!("{}.master", j.id), truncate(&j.name, 16), truncate(&j.user, 16), time_use, j.queue);
169+
}
170+
pending.sort_by_key(|j| j.id.parse::<usize>().unwrap_or(0));
171+
for j in pending {
172+
println!("{:<16} {:<16} {:<16} {:<8} Q {:<5}", format!("{}.master", j.id), truncate(&j.name, 16), truncate(&j.user, 16), "0", j.queue);
173+
}
174+
} else {
175+
for j in history {
176+
let status_char = match j.status.as_deref() {
177+
Some("DONE") => "F",
178+
Some("FAILED") | Some("CANCELLED") | Some("TIMEOUT") => "E",
179+
_ => "F",
180+
};
181+
println!("{:<16} {:<16} {:<16} {:<8} {} {:<5}", format!("{}.master", j.id), truncate(&j.name, 16), truncate(&j.user, 16), "0", status_char, j.queue);
182+
}
126183
}
127184
}
128185

0 commit comments

Comments
 (0)