Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 115 additions & 23 deletions client_lib/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@ use reqwest::blocking::{Body, Client, RequestBuilder};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::fs::File;
use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::io::{self, BufRead, BufReader, BufWriter, Write, Read};
use std::path::Path;
use std::time::Duration;

// Some of these constants are based on the ones in server/main.rs.
const MAX_MOTION_FILE_SIZE: u64 = 50 * 1024 * 1024; // 50 mebibytes
const MAX_LIVESTREAM_FILE_SIZE: u64 = 20 * 1024 * 1024; // 20 mebibytes
const MAX_COMMAND_FILE_SIZE: u64 = 100 * 1024; // 100 kibibytes
const MAX_CHECK_RESP_SIZE: u64 = 20 * 1024; // 20 kibibytes
const MAX_NOTIFICATION_TARGET_SIZE: u64 = 10 * 1024; // 10 kibibytes
const IOS_NOTIFICATION_RESP_MAX_SIZE: u64 = 10 * 1024; // 10 kibibytes

#[derive(Clone)]
pub struct HttpClient {
server_addr: String,
Expand Down Expand Up @@ -112,6 +120,8 @@ impl HttpClient {
}

pub fn fetch_notification_target(&self) -> io::Result<Option<NotificationTarget>> {
let max_size = MAX_NOTIFICATION_TARGET_SIZE;

let url = format!("{}/notification_target", self.server_addr);

let client = Client::builder()
Expand All @@ -135,10 +145,18 @@ impl HttpClient {
));
}

let text = response
.text()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let target = serde_json::from_str::<NotificationTarget>(&text)
let mut buf = Vec::new();
let mut limited = response.take(max_size);
limited.read_to_end(&mut buf)?;

if buf.len() >= max_size as usize {
return Err(io::Error::new(
io::ErrorKind::Other,
"Notification target response exceeded maximum allowed size",
));
}

let target = serde_json::from_slice::<NotificationTarget>(&buf)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
Ok(Some(target))
}
Expand Down Expand Up @@ -214,7 +232,25 @@ impl HttpClient {
.and_then(|value| value.to_str().ok())
.unwrap_or("<missing>")
.to_string();
let body = response.text().unwrap_or_default();

let max_size = IOS_NOTIFICATION_RESP_MAX_SIZE;

let mut buf = Vec::new();
let mut limited = response.take(max_size);
limited.read_to_end(&mut buf)?;

let body = if buf.len() >= max_size.try_into().unwrap() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"ios notification response exceeded maximum allowed size"
),
));
} else {
String::from_utf8_lossy(&buf).to_string()
};


return Err(io::Error::new(
io::ErrorKind::Other,
format!(
Expand Down Expand Up @@ -262,14 +298,19 @@ impl HttpClient {
}

/// Fetches an (encrypted) video file or thumbnail, persists it, and then deletes it from the server.
pub fn fetch_enc_file(&self, group_name: &str, enc_file_path: &Path) -> io::Result<()> {
pub fn fetch_enc_file(
&self, group_name: &str,
enc_file_path: &Path,
) -> io::Result<()> {
let max_size = MAX_MOTION_FILE_SIZE;

let enc_file_name = enc_file_path
.file_name()
.and_then(|name| name.to_str())
.unwrap()
.to_string();

self.fetch_enc_file_named(group_name, &enc_file_name, enc_file_path)
self.fetch_enc_file_named(group_name, &enc_file_name, enc_file_path, max_size)
}

/// Fetches an encrypted file whose server-side name and local temp filename differ.
Expand All @@ -278,6 +319,7 @@ impl HttpClient {
group_name: &str,
server_file_name: &str,
local_file_path: &Path,
max_size: u64,
) -> io::Result<()> {
let server_url = format!("{}/{}/{}", self.server_addr, group_name, server_file_name);

Expand All @@ -286,7 +328,7 @@ impl HttpClient {
.build()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;

let mut response = self.authorized_headers(client
let response = self.authorized_headers(client
.get(&server_url))
.send()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
Expand All @@ -302,10 +344,19 @@ impl HttpClient {

let mut file = BufWriter::new(File::create(local_file_path)?);

io::copy(&mut response, &mut file)?;
let mut limited = response.take(max_size);

let bytes_copied = io::copy(&mut limited, &mut file)?;
file.flush().unwrap();
file.into_inner()?.sync_all()?;

if bytes_copied >= max_size {
return Err(io::Error::new(
io::ErrorKind::Other,
"File download exceeded maximum allowed size",
));
}

let del_response = self.authorized_headers(client
.delete(&server_url))
.send()
Expand Down Expand Up @@ -388,6 +439,8 @@ impl HttpClient {

/// Checks to see if there's a livestream request.
pub fn livestream_check(&self, group_name: &str) -> io::Result<()> {
let max_size = MAX_CHECK_RESP_SIZE;

let server_url = format!("{}/livestream/{}", self.server_addr, group_name);

let client = Client::builder()
Expand All @@ -402,12 +455,21 @@ impl HttpClient {
.error_for_status()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;

let reader = BufReader::new(response);
let mut buf = Vec::new();
let mut limited = response.take(max_size);
limited.read_to_end(&mut buf)?;

if buf.len() >= max_size as usize {
return Err(io::Error::new(
io::ErrorKind::Other,
"Livestream check response exceeded maximum allowed size",
));
}
let reader = BufReader::new(&buf[..]);

for line in reader.lines() {
let line = line?;
if line.starts_with("data:") {
//println!("Received event data: {}", &line[5..]);
return Ok(());
}
}
Expand Down Expand Up @@ -462,7 +524,12 @@ impl HttpClient {
}

/// Retrieves and returns (encrypted) livestream data.
pub fn livestream_retrieve(&self, group_name: &str, chunk_number: u64) -> io::Result<Vec<u8>> {
pub fn livestream_retrieve(
&self, group_name: &str,
chunk_number: u64,
) -> io::Result<Vec<u8>> {
let max_size = MAX_LIVESTREAM_FILE_SIZE;

let server_url = format!(
"{}/livestream/{}/{}",
self.server_addr, group_name, chunk_number
Expand All @@ -488,10 +555,14 @@ impl HttpClient {
));
}

let response_vec = response
.bytes()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?
.to_vec();
let mut response_vec = Vec::new();
let mut limited = response.take(max_size);

limited.read_to_end(&mut response_vec)?;

if response_vec.len() >= max_size.try_into().unwrap() {
return Err(io::Error::new(io::ErrorKind::Other, "Livestream chunk download exceeded maximum allowed size"));
}

let del_response = self.authorized_headers(client
.delete(&server_del_url))
Expand Down Expand Up @@ -558,6 +629,8 @@ impl HttpClient {
/// The server sends the command encoded in Base64.
/// This function converts the command to Vec<u8> to returns it.
pub fn config_check(&self, group_name: &str) -> io::Result<Vec<u8>> {
let max_size = MAX_CHECK_RESP_SIZE;

let server_url = format!("{}/config/{}", self.server_addr, group_name);

let client = Client::builder()
Expand All @@ -572,7 +645,17 @@ impl HttpClient {
.error_for_status()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;

let reader = BufReader::new(response);
let mut buf = Vec::new();
let mut limited = response.take(max_size);
Comment thread
arrdalan marked this conversation as resolved.
limited.read_to_end(&mut buf)?;

if buf.len() >= max_size as usize {
return Err(io::Error::new(
io::ErrorKind::Other,
"Livestream check response exceeded maximum allowed size",
));
}
let reader = BufReader::new(&buf[..]);

for line in reader.lines() {
let line = line?;
Expand Down Expand Up @@ -614,7 +697,12 @@ impl HttpClient {
}

/// Checks and retrieve a config command response.
pub fn fetch_config_response(&self, group_name: &str) -> io::Result<Vec<u8>> {
pub fn fetch_config_response(
&self,
group_name: &str,
) -> io::Result<Vec<u8>> {
let max_size = MAX_COMMAND_FILE_SIZE;

let server_url = format!("{}/config_response/{}", self.server_addr, group_name);

let client = Client::builder()
Expand All @@ -636,10 +724,14 @@ impl HttpClient {
));
}

let response_vec = response
.bytes()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?
.to_vec();
let mut response_vec = Vec::new();
let mut limited = response.take(max_size);

limited.read_to_end(&mut response_vec)?;

if response_vec.len() >= max_size.try_into().unwrap() {
return Err(io::Error::new(io::ErrorKind::Other, "Config response download exceeded maximum allowed size"));
}

Ok(response_vec)
}
Expand Down
5 changes: 4 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use base64::engine::general_purpose::STANDARD as base64_engine;
use base64::Engine;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use rocket::data::{Data, ToByteUnit};
use rocket::data::{Data, ToByteUnit, Limits};
use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header;
use rocket::response::content::RawText;
Expand Down Expand Up @@ -99,6 +99,7 @@ const MAX_NUM_PENDING_MOTION_FILES: usize = 100;
const MAX_LIVESTREAM_FILE_SIZE: usize = 20; // in mebibytes
const MAX_NUM_PENDING_LIVESTREAM_FILES: usize = 50;
const MAX_COMMAND_FILE_SIZE: usize = 100; // in kibibytes
const MAX_JSON_SIZE: usize = 10; // in kibibytes

async fn get_num_files(path: &Path) -> io::Result<usize> {
let mut entries = fs::read_dir(path).await?;
Expand Down Expand Up @@ -1126,6 +1127,8 @@ pub fn build_rocket() -> rocket::Rocket<rocket::Build> {
let config = rocket::Config {
port: listen_port.unwrap_or(8000),
address: address.parse().unwrap(),
limits: Limits::default()
.limit("json", MAX_JSON_SIZE.kibibytes()),
..rocket::Config::default()
};

Expand Down
Loading