diff --git a/src-tauri/src/cloud/mod.rs b/src-tauri/src/cloud/mod.rs index 11d51ca..50efdac 100644 --- a/src-tauri/src/cloud/mod.rs +++ b/src-tauri/src/cloud/mod.rs @@ -31,6 +31,7 @@ pub struct CloudState { pub last_sync: Mutex>, // unix timestamp ms pub queue_size: Mutex, // number of pending items pub latest_snapshot: Mutex>, + pub latest_miners: Mutex>, } impl CloudState { @@ -44,6 +45,7 @@ impl CloudState { last_sync: Mutex::new(None), queue_size: Mutex::new(0), latest_snapshot: Mutex::new(None), + latest_miners: Mutex::new(None), } } } diff --git a/src-tauri/src/cloud/queue.rs b/src-tauri/src/cloud/queue.rs index 6310863..b5d64a8 100644 --- a/src-tauri/src/cloud/queue.rs +++ b/src-tauri/src/cloud/queue.rs @@ -24,11 +24,12 @@ pub fn open_queue() -> Result { conn.execute_batch("PRAGMA journal_mode=WAL;") .map_err(|e| format!("Failed to set WAL mode: {}", e))?; - // Create table if not exists + // Create table if not exists. The CHECK constraint allows the kinds the + // sync loop currently knows how to push. conn.execute_batch( "CREATE TABLE IF NOT EXISTS cloud_sync_queue ( id INTEGER PRIMARY KEY, - kind TEXT NOT NULL CHECK (kind IN ('snapshot', 'alert')), + kind TEXT NOT NULL CHECK (kind IN ('snapshot', 'alert', 'miners')), payload_json TEXT NOT NULL, created_at INTEGER NOT NULL, attempts INTEGER NOT NULL DEFAULT 0, @@ -39,6 +40,42 @@ pub fn open_queue() -> Result { ON cloud_sync_queue(created_at);" ).map_err(|e| format!("Failed to create queue table: {}", e))?; + // Migration: older installs created the table with a CHECK that allowed + // only ('snapshot', 'alert'). SQLite can't ALTER a CHECK constraint, so + // rebuild the table when the existing definition doesn't include 'miners'. + let existing_sql: String = conn + .query_row( + "SELECT sql FROM sqlite_master WHERE type='table' AND name='cloud_sync_queue'", + [], + |row| row.get(0), + ) + .unwrap_or_default(); + if !existing_sql.is_empty() && !existing_sql.contains("'miners'") { + log::info!("Cloud queue: migrating CHECK constraint to allow 'miners' kind"); + conn.execute_batch( + "BEGIN; + CREATE TABLE cloud_sync_queue_new ( + id INTEGER PRIMARY KEY, + kind TEXT NOT NULL CHECK (kind IN ('snapshot', 'alert', 'miners')), + payload_json TEXT NOT NULL, + created_at INTEGER NOT NULL, + attempts INTEGER NOT NULL DEFAULT 0, + last_attempt_at INTEGER, + last_error TEXT + ); + INSERT INTO cloud_sync_queue_new + (id, kind, payload_json, created_at, attempts, last_attempt_at, last_error) + SELECT id, kind, payload_json, created_at, attempts, last_attempt_at, last_error + FROM cloud_sync_queue; + DROP TABLE cloud_sync_queue; + ALTER TABLE cloud_sync_queue_new RENAME TO cloud_sync_queue; + CREATE INDEX IF NOT EXISTS cloud_sync_queue_created_idx + ON cloud_sync_queue(created_at); + COMMIT;", + ) + .map_err(|e| format!("Failed to migrate queue table: {}", e))?; + } + Ok(conn) } diff --git a/src-tauri/src/cloud/sync.rs b/src-tauri/src/cloud/sync.rs index c8d4935..fb53fc4 100644 --- a/src-tauri/src/cloud/sync.rs +++ b/src-tauri/src/cloud/sync.rs @@ -58,7 +58,33 @@ pub async fn start_sync_loop( } } - // --- 2. Drain offline queue --- + // --- 2. Push latest miners state if available --- + let miners = { + cloud_state.latest_miners.lock().unwrap().take() + }; + + if let Some(payload) = miners { + log::info!("Cloud sync: pushing miner state to cloud"); + match client::push_miners(&api_key, &payload).await { + Ok(()) => { + let now_ms = chrono::Utc::now().timestamp_millis(); + *cloud_state.last_sync.lock().unwrap() = Some(now_ms); + *cloud_state.status.lock().unwrap() = CloudSyncStatus::Connected; + log::info!("Cloud sync: miner state pushed successfully"); + } + Err(e) => { + log::warn!("Cloud sync: miner push failed, queueing — {}", e); + if let Err(qe) = queue::enqueue("miners", &payload) { + log::warn!("Cloud sync: failed to enqueue miners: {}", qe); + } + if e.contains("(401)") || e.contains("(403)") { + *cloud_state.status.lock().unwrap() = CloudSyncStatus::AuthRequired; + } + } + } + } + + // --- 3. Drain offline queue --- match queue::peek(10) { Ok(items) => { for item in items { @@ -73,6 +99,11 @@ pub async fn start_sync_loop( serde_json::from_str(&item.payload_json).unwrap_or_default(); client::push_alert(&api_key, &payload).await } + "miners" => { + let payload: serde_json::Value = + serde_json::from_str(&item.payload_json).unwrap_or_default(); + client::push_miners(&api_key, &payload).await + } other => { log::warn!("Cloud sync: unknown queue item kind '{}'", other); // Remove unknown items @@ -110,12 +141,12 @@ pub async fn start_sync_loop( } } - // --- 3. Update queue size --- + // --- 4. Update queue size --- if let Ok(count) = queue::count() { *cloud_state.queue_size.lock().unwrap() = count; } - // --- 4. Periodic prune (every ~100 cycles ≈ 100 minutes) --- + // --- 5. Periodic prune (every ~100 cycles ≈ 100 minutes) --- if cycle % 100 == 0 { if let Err(e) = queue::prune() { log::warn!("Cloud sync: prune failed: {}", e); diff --git a/src-tauri/src/poller.rs b/src-tauri/src/poller.rs index 840e5df..45a1fa5 100644 --- a/src-tauri/src/poller.rs +++ b/src-tauri/src/poller.rs @@ -52,9 +52,9 @@ pub fn spawn_all( let app = app.clone(); let cache = Arc::clone(&cache); let mobile_state = Arc::clone(&mobile_state); - let _popminer_state = Arc::clone(&popminer_state); + let popminer_state = Arc::clone(&popminer_state); tauri::async_runtime::spawn(async move { - snapshot_loop(app, cache, mobile_state).await; + snapshot_loop(app, cache, mobile_state, popminer_state).await; }); } @@ -318,6 +318,7 @@ async fn snapshot_loop( app: AppHandle, cache: Arc, mobile_state: Arc, + popminer_state: Arc, ) { log::info!( "Background poller: snapshot task starting (initial delay {}s)", @@ -329,7 +330,7 @@ async fn snapshot_loop( // tick resolves immediately, which would cause two snapshots in rapid // succession on startup before settling into the 60s cadence. loop { - build_and_persist_snapshot(&app, &cache, &mobile_state).await; + build_and_persist_snapshot(&app, &cache, &mobile_state, &popminer_state).await; tokio::time::sleep(Duration::from_secs(SNAPSHOT_INTERVAL_SECS)).await; } } @@ -338,6 +339,7 @@ async fn build_and_persist_snapshot( app: &AppHandle, cache: &Arc, mobile_state: &Arc, + popminer_state: &Arc, ) { let asic = cache.asic_miners.lock().unwrap().clone(); let saved_miners = crate::commands::storage::get_saved_miners().unwrap_or_default(); @@ -349,6 +351,11 @@ async fn build_and_persist_snapshot( map.values().cloned().collect() }; + let popminer_devices: Vec<_> = { + let map = popminer_state.saved.lock().unwrap(); + map.values().cloned().collect() + }; + // Aggregate by coin: ASIC hashrate in its native unit, mobile in H/s (kept // separate because the unit differs). The existing schema stores hashrate // as f64 in the unit reported by ASIC miners (typically GH/s); we keep @@ -418,6 +425,12 @@ async fn build_and_persist_snapshot( log::warn!("Snapshot persist failed: {}", e); } + // Stage per-miner state for the cloud sync loop. This mirrors the snapshot + // staging in history::add_farm_snapshot, but for the /ingest/miners + // endpoint which populates the cloud `miner_states` table (drives the web + // Dashboard's "Total Miners" tile). + stage_miners_for_cloud(app, &asic, &saved_miners, &mobile_miners, &popminer_devices); + { let mut slot = cache.farm_snapshot.lock().unwrap(); *slot = Some(snapshot); @@ -427,6 +440,117 @@ async fn build_and_persist_snapshot( let _ = app.emit("farm-state-updated", ()); } +/// Convert local ASIC / mobile / PoPMiner records into the shape the +/// `/api/v1/ingest/miners` endpoint expects, stage it on `CloudState`, and +/// enqueue a fallback copy so the next sync cycle picks it up. Mirrors the +/// snapshot wiring in `commands::history::add_farm_snapshot`. +fn stage_miners_for_cloud( + app: &AppHandle, + asic: &[MinerInfo], + saved_miners: &[SavedMiner], + mobile_miners: &[crate::commands::mobile_miner::MobileMiner], + popminer_devices: &[crate::popminer_device::PopMinerDevice], +) { + use tauri::Manager; + + let cloud_state = match app.try_state::>() { + Some(s) => s, + None => { + log::debug!("Cloud: miners staging skipped — CloudState not available"); + return; + } + }; + + if cloud_state.api_key.lock().unwrap().is_none() { + log::debug!("Cloud: miners staging skipped — no API key"); + return; + } + + let mut miners_json: Vec = Vec::new(); + + // ASIC miners — minerId keyed on IP (stable per device on a LAN). Label + // resolved the same way the Dashboard does it so cloud row labels match. + for m in asic { + let saved = saved_miners.iter().find(|s| s.ip == m.ip); + let label = saved + .map(|s| s.label.clone()) + .filter(|l| !l.is_empty()) + .or_else(|| (!m.hostname.is_empty()).then(|| m.hostname.clone())); + let coin = saved.map(|s| s.coin_id.clone()); + + miners_json.push(serde_json::json!({ + "minerType": "asic", + "minerId": m.ip, + "label": label, + "coin": coin, + "online": m.online, + "hashrate": m.rt_hashrate, + "state": { + "model": m.model, + "manufacturer": m.manufacturer, + "firmware": m.firmware, + "hashrateUnit": m.hashrate_unit, + "runtimeSecs": m.runtime_secs, + "hwErrors": m.hw_errors, + } + })); + } + + // Mobile miners — minerId keyed on the device_id assigned at registration. + for mm in mobile_miners { + miners_json.push(serde_json::json!({ + "minerType": "mobile", + "minerId": mm.device_id, + "label": (!mm.name.is_empty()).then(|| mm.name.clone()), + "coin": (!mm.coin.is_empty()).then(|| mm.coin.clone()), + "online": mm.is_online, + "hashrate": mm.hashrate_hs, + "state": { + "model": mm.model, + "manufacturer": mm.manufacturer, + "throttleState": mm.throttle_state, + "cpuTemp": mm.cpu_temp, + "batteryLevel": mm.battery_level, + "batteryCharging": mm.battery_charging, + "runtimeSeconds": mm.runtime_seconds, + } + })); + } + + // PoPMiner ESP32 devices — minerId keyed on MAC (stable, what discovery + // already uses as the map key). + for d in popminer_devices { + miners_json.push(serde_json::json!({ + "minerType": "popminer", + "minerId": d.mac, + "label": (!d.name.is_empty()).then(|| d.name.clone()), + "coin": serde_json::Value::Null, + "online": d.online, + "hashrate": d.hashrate, + "state": { + "model": d.model, + "firmware": d.fw, + "pool": d.pool, + "uptimeSecs": d.uptime_s, + "accepted": d.accepted, + "rejected": d.rejected, + "mining": d.mining, + } + })); + } + + if miners_json.is_empty() { + // Nothing to push this cycle (no miners configured). Don't overwrite + // the staged payload with an empty list — the API would happily store + // count=0, but there's no value in round-tripping that. + return; + } + + let payload = serde_json::json!({ "miners": miners_json }); + *cloud_state.latest_miners.lock().unwrap() = Some(payload.clone()); + log::info!("Cloud: miners staged for sync ({} entries)", miners_json.len()); +} + // ─── Helpers shared across tasks ──────────────────────────────────────────── /// Send desktop notifications and emails for any alerts that fired this cycle.