Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,6 @@ examples/databricks/serverless dev \
-e DATABRICKS_ACCOUNT_ID=${DATABRICKS_ACCOUNT_ID} \
-e DATABRICKS_AWS_ACCOUNT_ID=${DATABRICKS_AWS_ACCOUNT_ID} \
--dry-run

pgrep -f "stackql srv"
kill $(pgrep -f "stackql srv")
11 changes: 6 additions & 5 deletions examples/databricks/serverless/resources/aws/s3/buckets.iql
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ FROM awscc.s3.buckets
WHERE
region = 'us-east-1' AND
Identifier = '{{ bucket_name }}' AND
BucketEncryption = '{{ bucket_encryption }}' AND
OwnershipControls = '{{ ownership_controls }}' AND
PublicAccessBlockConfiguration = '{{ public_access_block_configuration }}' AND
Tags = '{{ tags }}' AND
VersioningConfiguration = '{{ versioning_configuration }}'
JSON_EQUAL(bucket_encryption, '{{ bucket_encryption }}') AND
JSON_EQUAL(ownership_controls, '{{ ownership_controls }}') AND
JSON_EQUAL(public_access_block_configuration, '{{ public_access_block_configuration }}') AND
JSON_EQUAL(tags, '{{ tags }}') AND
JSON_EQUAL(versioning_configuration, '{{ versioning_configuration }}')
;


/*+ exports */
SELECT
bucket_name,
Expand Down
42 changes: 36 additions & 6 deletions src/utils/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::process::{Command as ProcessCommand, Stdio};
use std::thread;
use std::time::Duration;

use log::{error, info, warn};
use log::{debug, error, info, warn};

use crate::app::{DEFAULT_LOG_FILE, LOCAL_SERVER_ADDRESSES};
use crate::globals::{server_host, server_port};
Expand Down Expand Up @@ -73,9 +73,11 @@ pub struct RunningServer {

/// Check if the stackql server is running on a specific port
pub fn is_server_running(port: u16) -> bool {
find_all_running_servers()
.iter()
.any(|server| server.port == port)
let servers = find_all_running_servers();
debug!("is_server_running({}): found {} candidate server(s): {:?}", port, servers.len(), servers.iter().map(|s| format!("pid={} port={}", s.pid, s.port)).collect::<Vec<_>>());
let result = servers.iter().any(|server| server.port == port);
debug!("is_server_running({}) -> {}", port, result);
result
}

/// Find all stackql servers that are running and their ports
Expand All @@ -101,21 +103,32 @@ pub fn find_all_running_servers() -> Vec<RunningServer> {
} else {
let output = ProcessCommand::new("pgrep")
.arg("-f")
.arg("stackql")
.arg("stackql srv")
.output()
.unwrap_or_else(|_| panic!("Failed to execute pgrep"));

if !output.stdout.is_empty() {
let pids_str = String::from_utf8_lossy(&output.stdout).to_string();
let pids = pids_str.trim().split('\n').collect::<Vec<&str>>();
debug!("find_all_running_servers: pgrep found {} PID(s): {:?}", pids.len(), pids);

for pid_str in pids {
if let Ok(pid) = pid_str.trim().parse::<u32>() {
// Log the full command line of this PID before attempting port extraction
if let Ok(ps_out) = ProcessCommand::new("ps").arg("-p").arg(pid.to_string()).arg("-o").arg("args").output() {
let cmdline = String::from_utf8_lossy(&ps_out.stdout);
debug!("find_all_running_servers: PID {} cmdline: {}", pid, cmdline.trim());
}
if let Some(port) = extract_port_from_ps(pid_str) {
debug!("find_all_running_servers: PID {} -> port {}", pid, port);
running_servers.push(RunningServer { pid, port });
} else {
debug!("find_all_running_servers: PID {} -> no --pgsrv.port found, skipping", pid);
}
}
}
} else {
debug!("find_all_running_servers: pgrep returned no matching PIDs");
}
}

Expand Down Expand Up @@ -204,15 +217,22 @@ pub fn get_server_pid(port: u16) -> Option<u32> {

/// Start the stackql server with the given options
pub fn start_server(options: &StartServerOptions) -> Result<u32, String> {
debug!("start_server called: host={}, port={}", options.host, options.port);

let binary_path = match get_binary_path() {
Some(path) => path,
Some(path) => {
debug!("Using stackql binary at: {:?}", path);
path
}
_none => return Err("stackql binary not found".to_string()),
};

debug!("Checking if server is already running on port {}...", options.port);
if is_server_running(options.port) {
info!("Server is already running on port {}", options.port);
return Ok(get_server_pid(options.port).unwrap_or(0));
}
debug!("Server not running on port {}; proceeding to start.", options.port);

let mut cmd = ProcessCommand::new(&binary_path);
cmd.arg("srv");
Expand Down Expand Up @@ -247,6 +267,7 @@ pub fn start_server(options: &StartServerOptions) -> Result<u32, String> {
.open(log_path)
.map_err(|e| format!("Failed to open log file: {}", e))?;

debug!("Spawning stackql server process (log -> {:?})...", log_path);
let child = cmd
.stdout(Stdio::from(log_file.try_clone().unwrap()))
.stderr(Stdio::from(log_file))
Expand All @@ -255,8 +276,10 @@ pub fn start_server(options: &StartServerOptions) -> Result<u32, String> {

let pid = child.id();
info!("Starting stackql server with PID: {}", pid);
debug!("Waiting 5 seconds for server on port {} to become ready...", options.port);
thread::sleep(Duration::from_secs(5));

debug!("Re-checking if server is running on port {}...", options.port);
if is_server_running(options.port) {
info!("Server started successfully on port {}", options.port);
Ok(pid)
Expand Down Expand Up @@ -315,10 +338,14 @@ pub fn check_and_start_server() {
let host = server_host();
let port = server_port();

debug!("check_and_start_server: host={}, port={}", host, port);

if LOCAL_SERVER_ADDRESSES.contains(&host) {
debug!("Host '{}' is local; checking if server is running on port {}...", host, port);
if is_server_running(port) {
info!("Local server is already running on port {}.", port);
} else {
debug!("Server not detected on port {}; will attempt to start it.", port);
info!("Server not running. Starting server...");

let options = StartServerOptions {
Expand All @@ -327,12 +354,15 @@ pub fn check_and_start_server() {
..Default::default()
};

debug!("StartServerOptions: host={}, port={}", options.host, options.port);

if let Err(e) = start_server(&options) {
error!("Failed to start server: {}", e);
process::exit(1);
}
}
} else {
debug!("Host '{}' is remote; skipping local server start.", host);
info!("Using remote server {}:{}", host, port);
}
}
Loading