Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src-tauri/src/cloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct CloudState {
pub last_sync: Mutex<Option<i64>>, // unix timestamp ms
pub queue_size: Mutex<u64>, // number of pending items
pub latest_snapshot: Mutex<Option<serde_json::Value>>,
pub latest_miners: Mutex<Option<serde_json::Value>>,
}

impl CloudState {
Expand All @@ -44,6 +45,7 @@ impl CloudState {
last_sync: Mutex::new(None),
queue_size: Mutex::new(0),
latest_snapshot: Mutex::new(None),
latest_miners: Mutex::new(None),
}
}
}
Expand Down
41 changes: 39 additions & 2 deletions src-tauri/src/cloud/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ pub fn open_queue() -> Result<Connection, String> {
conn.execute_batch("PRAGMA journal_mode=WAL;")
.map_err(|e| format!("Failed to set WAL mode: {}", e))?;

// Create table if not exists
// Create table if not exists. The CHECK constraint allows the kinds the
// sync loop currently knows how to push.
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS cloud_sync_queue (
id INTEGER PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('snapshot', 'alert')),
kind TEXT NOT NULL CHECK (kind IN ('snapshot', 'alert', 'miners')),
payload_json TEXT NOT NULL,
created_at INTEGER NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
Expand All @@ -39,6 +40,42 @@ pub fn open_queue() -> Result<Connection, String> {
ON cloud_sync_queue(created_at);"
).map_err(|e| format!("Failed to create queue table: {}", e))?;

// Migration: older installs created the table with a CHECK that allowed
// only ('snapshot', 'alert'). SQLite can't ALTER a CHECK constraint, so
// rebuild the table when the existing definition doesn't include 'miners'.
let existing_sql: String = conn
.query_row(
"SELECT sql FROM sqlite_master WHERE type='table' AND name='cloud_sync_queue'",
[],
|row| row.get(0),
)
.unwrap_or_default();
if !existing_sql.is_empty() && !existing_sql.contains("'miners'") {
log::info!("Cloud queue: migrating CHECK constraint to allow 'miners' kind");
conn.execute_batch(
"BEGIN;
CREATE TABLE cloud_sync_queue_new (
id INTEGER PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('snapshot', 'alert', 'miners')),
payload_json TEXT NOT NULL,
created_at INTEGER NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
last_attempt_at INTEGER,
last_error TEXT
);
INSERT INTO cloud_sync_queue_new
(id, kind, payload_json, created_at, attempts, last_attempt_at, last_error)
SELECT id, kind, payload_json, created_at, attempts, last_attempt_at, last_error
FROM cloud_sync_queue;
DROP TABLE cloud_sync_queue;
ALTER TABLE cloud_sync_queue_new RENAME TO cloud_sync_queue;
CREATE INDEX IF NOT EXISTS cloud_sync_queue_created_idx
ON cloud_sync_queue(created_at);
COMMIT;",
)
.map_err(|e| format!("Failed to migrate queue table: {}", e))?;
}

Ok(conn)
}

Expand Down
37 changes: 34 additions & 3 deletions src-tauri/src/cloud/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,33 @@ pub async fn start_sync_loop(
}
}

// --- 2. Drain offline queue ---
// --- 2. Push latest miners state if available ---
let miners = {
cloud_state.latest_miners.lock().unwrap().take()
};

if let Some(payload) = miners {
log::info!("Cloud sync: pushing miner state to cloud");
match client::push_miners(&api_key, &payload).await {
Ok(()) => {
let now_ms = chrono::Utc::now().timestamp_millis();
*cloud_state.last_sync.lock().unwrap() = Some(now_ms);
*cloud_state.status.lock().unwrap() = CloudSyncStatus::Connected;
log::info!("Cloud sync: miner state pushed successfully");
}
Err(e) => {
log::warn!("Cloud sync: miner push failed, queueing — {}", e);
if let Err(qe) = queue::enqueue("miners", &payload) {
log::warn!("Cloud sync: failed to enqueue miners: {}", qe);
}
if e.contains("(401)") || e.contains("(403)") {
*cloud_state.status.lock().unwrap() = CloudSyncStatus::AuthRequired;
}
}
}
}

// --- 3. Drain offline queue ---
match queue::peek(10) {
Ok(items) => {
for item in items {
Expand All @@ -73,6 +99,11 @@ pub async fn start_sync_loop(
serde_json::from_str(&item.payload_json).unwrap_or_default();
client::push_alert(&api_key, &payload).await
}
"miners" => {
let payload: serde_json::Value =
serde_json::from_str(&item.payload_json).unwrap_or_default();
client::push_miners(&api_key, &payload).await
}
other => {
log::warn!("Cloud sync: unknown queue item kind '{}'", other);
// Remove unknown items
Expand Down Expand Up @@ -110,12 +141,12 @@ pub async fn start_sync_loop(
}
}

// --- 3. Update queue size ---
// --- 4. Update queue size ---
if let Ok(count) = queue::count() {
*cloud_state.queue_size.lock().unwrap() = count;
}

// --- 4. Periodic prune (every ~100 cycles ≈ 100 minutes) ---
// --- 5. Periodic prune (every ~100 cycles ≈ 100 minutes) ---
if cycle % 100 == 0 {
if let Err(e) = queue::prune() {
log::warn!("Cloud sync: prune failed: {}", e);
Expand Down
130 changes: 127 additions & 3 deletions src-tauri/src/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ pub fn spawn_all(
let app = app.clone();
let cache = Arc::clone(&cache);
let mobile_state = Arc::clone(&mobile_state);
let _popminer_state = Arc::clone(&popminer_state);
let popminer_state = Arc::clone(&popminer_state);
tauri::async_runtime::spawn(async move {
snapshot_loop(app, cache, mobile_state).await;
snapshot_loop(app, cache, mobile_state, popminer_state).await;
});
}

Expand Down Expand Up @@ -318,6 +318,7 @@ async fn snapshot_loop(
app: AppHandle,
cache: Arc<CachedFarmState>,
mobile_state: Arc<MobileMinersState>,
popminer_state: Arc<PopMinerDevicesState>,
) {
log::info!(
"Background poller: snapshot task starting (initial delay {}s)",
Expand All @@ -329,7 +330,7 @@ async fn snapshot_loop(
// tick resolves immediately, which would cause two snapshots in rapid
// succession on startup before settling into the 60s cadence.
loop {
build_and_persist_snapshot(&app, &cache, &mobile_state).await;
build_and_persist_snapshot(&app, &cache, &mobile_state, &popminer_state).await;
tokio::time::sleep(Duration::from_secs(SNAPSHOT_INTERVAL_SECS)).await;
}
}
Expand All @@ -338,6 +339,7 @@ async fn build_and_persist_snapshot(
app: &AppHandle,
cache: &Arc<CachedFarmState>,
mobile_state: &Arc<MobileMinersState>,
popminer_state: &Arc<PopMinerDevicesState>,
) {
let asic = cache.asic_miners.lock().unwrap().clone();
let saved_miners = crate::commands::storage::get_saved_miners().unwrap_or_default();
Expand All @@ -349,6 +351,11 @@ async fn build_and_persist_snapshot(
map.values().cloned().collect()
};

let popminer_devices: Vec<_> = {
let map = popminer_state.saved.lock().unwrap();
map.values().cloned().collect()
};

// Aggregate by coin: ASIC hashrate in its native unit, mobile in H/s (kept
// separate because the unit differs). The existing schema stores hashrate
// as f64 in the unit reported by ASIC miners (typically GH/s); we keep
Expand Down Expand Up @@ -418,6 +425,12 @@ async fn build_and_persist_snapshot(
log::warn!("Snapshot persist failed: {}", e);
}

// Stage per-miner state for the cloud sync loop. This mirrors the snapshot
// staging in history::add_farm_snapshot, but for the /ingest/miners
// endpoint which populates the cloud `miner_states` table (drives the web
// Dashboard's "Total Miners" tile).
stage_miners_for_cloud(app, &asic, &saved_miners, &mobile_miners, &popminer_devices);

{
let mut slot = cache.farm_snapshot.lock().unwrap();
*slot = Some(snapshot);
Expand All @@ -427,6 +440,117 @@ async fn build_and_persist_snapshot(
let _ = app.emit("farm-state-updated", ());
}

/// Convert local ASIC / mobile / PoPMiner records into the shape the
/// `/api/v1/ingest/miners` endpoint expects, stage it on `CloudState`, and
/// enqueue a fallback copy so the next sync cycle picks it up. Mirrors the
/// snapshot wiring in `commands::history::add_farm_snapshot`.
fn stage_miners_for_cloud(
app: &AppHandle,
asic: &[MinerInfo],
saved_miners: &[SavedMiner],
mobile_miners: &[crate::commands::mobile_miner::MobileMiner],
popminer_devices: &[crate::popminer_device::PopMinerDevice],
) {
use tauri::Manager;

let cloud_state = match app.try_state::<Arc<crate::cloud::CloudState>>() {
Some(s) => s,
None => {
log::debug!("Cloud: miners staging skipped — CloudState not available");
return;
}
};

if cloud_state.api_key.lock().unwrap().is_none() {
log::debug!("Cloud: miners staging skipped — no API key");
return;
}

let mut miners_json: Vec<serde_json::Value> = Vec::new();

// ASIC miners — minerId keyed on IP (stable per device on a LAN). Label
// resolved the same way the Dashboard does it so cloud row labels match.
for m in asic {
let saved = saved_miners.iter().find(|s| s.ip == m.ip);
let label = saved
.map(|s| s.label.clone())
.filter(|l| !l.is_empty())
.or_else(|| (!m.hostname.is_empty()).then(|| m.hostname.clone()));
let coin = saved.map(|s| s.coin_id.clone());

miners_json.push(serde_json::json!({
"minerType": "asic",
"minerId": m.ip,
"label": label,
"coin": coin,
"online": m.online,
"hashrate": m.rt_hashrate,
"state": {
"model": m.model,
"manufacturer": m.manufacturer,
"firmware": m.firmware,
"hashrateUnit": m.hashrate_unit,
"runtimeSecs": m.runtime_secs,
"hwErrors": m.hw_errors,
}
}));
}

// Mobile miners — minerId keyed on the device_id assigned at registration.
for mm in mobile_miners {
miners_json.push(serde_json::json!({
"minerType": "mobile",
"minerId": mm.device_id,
"label": (!mm.name.is_empty()).then(|| mm.name.clone()),
"coin": (!mm.coin.is_empty()).then(|| mm.coin.clone()),
"online": mm.is_online,
"hashrate": mm.hashrate_hs,
"state": {
"model": mm.model,
"manufacturer": mm.manufacturer,
"throttleState": mm.throttle_state,
"cpuTemp": mm.cpu_temp,
"batteryLevel": mm.battery_level,
"batteryCharging": mm.battery_charging,
"runtimeSeconds": mm.runtime_seconds,
}
}));
}

// PoPMiner ESP32 devices — minerId keyed on MAC (stable, what discovery
// already uses as the map key).
for d in popminer_devices {
miners_json.push(serde_json::json!({
"minerType": "popminer",
"minerId": d.mac,
"label": (!d.name.is_empty()).then(|| d.name.clone()),
"coin": serde_json::Value::Null,
"online": d.online,
"hashrate": d.hashrate,
"state": {
"model": d.model,
"firmware": d.fw,
"pool": d.pool,
"uptimeSecs": d.uptime_s,
"accepted": d.accepted,
"rejected": d.rejected,
"mining": d.mining,
}
}));
}

if miners_json.is_empty() {
// Nothing to push this cycle (no miners configured). Don't overwrite
// the staged payload with an empty list — the API would happily store
// count=0, but there's no value in round-tripping that.
return;
}

let payload = serde_json::json!({ "miners": miners_json });
*cloud_state.latest_miners.lock().unwrap() = Some(payload.clone());
log::info!("Cloud: miners staged for sync ({} entries)", miners_json.len());
}

// ─── Helpers shared across tasks ────────────────────────────────────────────

/// Send desktop notifications and emails for any alerts that fired this cycle.
Expand Down
Loading