Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,811 changes: 951 additions & 860 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ sqlx = { version = "0.8", features = [
"chrono",
"tls-rustls",
] }
jacquard-common = { version = "0.5", features = ["reqwest-client"] }
jacquard-derive = { version = "0.5" }
serde = { version = "1.0", features = ["derive"] }
anyhow = "1.0"
serde_json = "1.0"
Expand All @@ -43,11 +45,11 @@ tokio-tungstenite = { version = "*", default-features = false, features = [
"connect",
"handshake",
] }
atrium-api = "0.25"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
types = { path = "services/types" }
types = { path = "services/types", features = ["app_bsky", "com_atproto", "fm_teal"] }
rocketman = "0.2.3"
thiserror = "1.0"

# CAR and IPLD dependencies
iroh-car = "0.5"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ And that's it! You should have the full teal.fm stack running locally. Now if yo

### Lexicon Management

We use AT Protocol lexicons with dual TypeScript/Rust codegen (lex-cli + esquema). Use the unified lexicon CLI for managing schema changes:
We use AT Protocol lexicons with dual TypeScript/Rust codegen (lex-cli + jacquard-lexicon). Use the unified lexicon CLI for managing schema changes:

```bash
# Generate all types from lexicons
Expand Down
2 changes: 1 addition & 1 deletion apps/aqua/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ sys-info = "0.9.1"
axum = { workspace = true, features = ["multipart"] }
tower-http = { workspace = true, features = ["cors"] }
time.workspace = true
atrium-api.workspace = true
async-trait.workspace = true
anyhow.workspace = true
serde = { workspace = true, features = ["derive"] }
Expand All @@ -21,6 +20,7 @@ sqlx = { workspace = true, features = ["time"] }
dotenvy.workspace = true
types.workspace = true
chrono.workspace = true
jacquard-common.workspace = true

# CAR import functionality
iroh-car.workspace = true
Expand Down
18 changes: 9 additions & 9 deletions apps/aqua/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,15 +443,15 @@ async fn resolve_did_to_pds(did: &str) -> Result<String> {
// Find the PDS service endpoint
if let Some(services) = doc["service"].as_array() {
for service in services {
if service["id"].as_str() == Some("#atproto_pds") {
if let Some(endpoint) = service["serviceEndpoint"].as_str() {
// Extract hostname from URL
let url = url::Url::parse(endpoint)?;
let host = url.host_str().ok_or_else(|| {
anyhow::anyhow!("Invalid PDS endpoint URL: {}", endpoint)
})?;
return Ok(host.to_string());
}
if service["id"].as_str() == Some("#atproto_pds")
&& let Some(endpoint) = service["serviceEndpoint"].as_str()
{
// Extract hostname from URL
let url = url::Url::parse(endpoint)?;
let host = url
.host_str()
.ok_or_else(|| anyhow::anyhow!("Invalid PDS endpoint URL: {}", endpoint))?;
return Ok(host.to_string());
}
}
}
Expand Down
35 changes: 22 additions & 13 deletions apps/aqua/src/repos/actor_profile.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
use std::collections::BTreeMap;

use async_trait::async_trait;
use jacquard_common::from_json_value;
use serde_json::Value;
use types::fm::teal::alpha::actor::defs::ProfileViewData;
use types::{
app_bsky::richtext::facet::Facet,
fm_teal::alpha::actor::{ProfileView, StatusView},
};

use super::{pg::PgDataSource, utc_to_atrium_datetime};

#[async_trait]
pub trait ActorProfileRepo {
async fn get_actor_profile(&self, identity: &str) -> anyhow::Result<Option<ProfileViewData>>;
async fn get_actor_profile(&self, identity: &str) -> anyhow::Result<Option<ProfileView>>;
async fn get_multiple_actor_profiles(
&self,
identities: &[String],
) -> anyhow::Result<Vec<ProfileViewData>>;
) -> anyhow::Result<Vec<ProfileView>>;
}

pub struct PgProfileRepoRows {
Expand All @@ -24,38 +30,41 @@ pub struct PgProfileRepoRows {
pub status: Option<Value>,
}

impl From<PgProfileRepoRows> for ProfileViewData {
impl From<PgProfileRepoRows> for ProfileView<'static> {
fn from(row: PgProfileRepoRows) -> Self {
Self {
avatar: row.avatar,
banner: row.banner,
avatar: row.avatar.map(Into::into),
banner: row.banner.map(Into::into),
// chrono -> atrium time
created_at: row
.created_at
.map(|dt| utc_to_atrium_datetime(crate::repos::time_to_chrono_utc(dt))),
description: row.description,
description: row.description.map(Into::into),
description_facets: row
.description_facets
.and_then(|v| serde_json::from_value(v).ok()),
did: row.did,
display_name: row.display_name,
.and_then(|v| from_json_value::<Vec<Facet<'_>>>(v).ok()),
did: row.did.map(Into::into),
display_name: row.display_name.map(Into::into),
featured_item: None,
status: row.status.and_then(|v| serde_json::from_value(v).ok()),
status: row
.status
.and_then(|v| from_json_value::<StatusView<'_>>(v).ok()),
extra_data: BTreeMap::new(),
}
}
}

#[async_trait]
impl ActorProfileRepo for PgDataSource {
async fn get_actor_profile(&self, identity: &str) -> anyhow::Result<Option<ProfileViewData>> {
async fn get_actor_profile(&self, identity: &str) -> anyhow::Result<Option<ProfileView>> {
self.get_multiple_actor_profiles(&[identity.to_string()])
.await
.map(|p| p.first().cloned())
}
async fn get_multiple_actor_profiles(
&self,
identities: &[String],
) -> anyhow::Result<Vec<ProfileViewData>> {
) -> anyhow::Result<Vec<ProfileView>> {
// split identities into dids (prefixed with "did:") and handles (not prefixed) in one iteration
let mut dids = Vec::new();
let mut handles = Vec::new();
Expand Down
161 changes: 82 additions & 79 deletions apps/aqua/src/repos/feed_play.rs
Original file line number Diff line number Diff line change
@@ -1,126 +1,129 @@
use async_trait::async_trait;
use types::fm::teal::alpha::feed::defs::{Artist, PlayViewData};
use jacquard_common::from_json_value;
use types::fm_teal::alpha::feed::{Artist, PlayView};

use super::{pg::PgDataSource, utc_to_atrium_datetime};

#[async_trait]
pub trait FeedPlayRepo: Send + Sync {
async fn get_feed_play(&self, identity: &str) -> anyhow::Result<Option<PlayViewData>>;
async fn get_feed_play(&self, identity: &str) -> anyhow::Result<Option<PlayView>>;
async fn get_feed_plays_for_profile(
&self,
identities: &[String],
) -> anyhow::Result<Vec<PlayViewData>>;
) -> anyhow::Result<Vec<PlayView>>;
}

#[async_trait]
impl FeedPlayRepo for PgDataSource {
async fn get_feed_play(&self, uri: &str) -> anyhow::Result<Option<PlayViewData>> {
async fn get_feed_play(&self, uri: &str) -> anyhow::Result<Option<PlayView>> {
let row = sqlx::query!(
r#"
SELECT
uri, did, rkey, cid, isrc, duration, track_name, played_time, processed_time,
release_mbid, release_name, recording_mbid, submission_client_agent,
music_service_base_domain, origin_url,
COALESCE(
json_agg(
json_build_object(
'artist_mbid', pta.artist_mbid,
'artist_name', pta.artist_name
)
) FILTER (WHERE pta.artist_name IS NOT NULL),
'[]'
) AS artists
FROM plays
LEFT JOIN play_to_artists as pta ON uri = pta.play_uri
WHERE uri = $1
GROUP BY uri, did, rkey, cid, isrc, duration, track_name, played_time, processed_time,
release_mbid, release_name, recording_mbid, submission_client_agent,
music_service_base_domain, origin_url
ORDER BY processed_time desc
"#,
&uri.to_string()
)
.fetch_one(&self.db)
.await?;
r#"
SELECT
uri, did, rkey, cid, isrc, duration, track_name, played_time, processed_time,
release_mbid, release_name, recording_mbid, submission_client_agent,
music_service_base_domain, origin_url,
COALESCE(
json_agg(
json_build_object(
'artist_mbid', pta.artist_mbid,
'artist_name', pta.artist_name
)
) FILTER (WHERE pta.artist_name IS NOT NULL),
'[]'
) AS artists
FROM plays
LEFT JOIN play_to_artists as pta ON uri = pta.play_uri
WHERE uri = $1
GROUP BY uri, did, rkey, cid, isrc, duration, track_name, played_time, processed_time,
release_mbid, release_name, recording_mbid, submission_client_agent,
music_service_base_domain, origin_url
ORDER BY processed_time desc
"#,
&uri.to_string()
)
.fetch_one(&self.db)
.await?;

let artists: Vec<Artist> = match row.artists {
Some(value) => serde_json::from_value(value).unwrap_or_default(),
Some(value) => from_json_value::<Vec<Artist>>(value).unwrap_or_default(),
None => vec![],
};

Ok(Some(PlayViewData {
track_name: row.track_name.clone(),
track_mb_id: row.recording_mbid.map(|u| u.to_string()),
recording_mb_id: row.recording_mbid.map(|u| u.to_string()),
Ok(Some(PlayView {
track_name: row.track_name.clone().into(),
track_mb_id: row.recording_mbid.map(|u| u.to_string().into()),
recording_mb_id: row.recording_mbid.map(|u| u.to_string().into()),
duration: row.duration.map(|d| d as i64),
artists,
release_name: row.release_name.clone(),
release_mb_id: row.release_mbid.map(|u| u.to_string()),
isrc: row.isrc,
origin_url: row.origin_url,
music_service_base_domain: row.music_service_base_domain,
submission_client_agent: row.submission_client_agent,
release_name: row.release_name.clone().map(|s| s.into()),
release_mb_id: row.release_mbid.map(|u| u.to_string().into()),
isrc: row.isrc.map(|s| s.into()),
origin_url: row.origin_url.map(|s| s.into()),
music_service_base_domain: row.music_service_base_domain.map(|s| s.into()),
submission_client_agent: row.submission_client_agent.map(|s| s.into()),
played_time: row
.played_time
.map(|dt| utc_to_atrium_datetime(crate::repos::time_to_chrono_utc(dt))),
extra_data: Default::default(),
}))
}

async fn get_feed_plays_for_profile(
&self,
identities: &[String],
) -> anyhow::Result<Vec<PlayViewData>> {
) -> anyhow::Result<Vec<PlayView>> {
let rows = sqlx::query!(
r#"
SELECT
uri, did, rkey, cid, isrc, duration, track_name, played_time, processed_time,
release_mbid, release_name, recording_mbid, submission_client_agent,
music_service_base_domain, origin_url,
COALESCE(
json_agg(
json_build_object(
'artist_mbid', pta.artist_mbid,
'artist_name', pta.artist_name
)
) FILTER (WHERE pta.artist_name IS NOT NULL),
'[]'
) AS artists
FROM plays
LEFT JOIN play_to_artists as pta ON uri = pta.play_uri
WHERE did = ANY($1)
GROUP BY uri, did, rkey, cid, isrc, duration, track_name, played_time, processed_time,
release_mbid, release_name, recording_mbid, submission_client_agent,
music_service_base_domain, origin_url
ORDER BY processed_time desc
"#,
identities
)
.fetch_all(&self.db)
.await?;
r#"
SELECT
uri, did, rkey, cid, isrc, duration, track_name, played_time, processed_time,
release_mbid, release_name, recording_mbid, submission_client_agent,
music_service_base_domain, origin_url,
COALESCE(
json_agg(
json_build_object(
'artist_mbid', pta.artist_mbid,
'artist_name', pta.artist_name
)
) FILTER (WHERE pta.artist_name IS NOT NULL),
'[]'
) AS artists
FROM plays
LEFT JOIN play_to_artists as pta ON uri = pta.play_uri
WHERE did = ANY($1)
GROUP BY uri, did, rkey, cid, isrc, duration, track_name, played_time, processed_time,
release_mbid, release_name, recording_mbid, submission_client_agent,
music_service_base_domain, origin_url
ORDER BY processed_time desc
"#,
identities
)
.fetch_all(&self.db)
.await?;

let mut result = Vec::with_capacity(rows.len());
for row in rows {
// Deserialize artists JSON array into Vec<Artist>
let artists: Vec<Artist> = match row.artists {
Some(value) => serde_json::from_value(value).unwrap_or_default(),
Some(value) => from_json_value::<Vec<Artist>>(value).unwrap_or_default(),
None => vec![],
};

result.push(PlayViewData {
track_name: row.track_name.clone(),
track_mb_id: row.recording_mbid.map(|u| u.to_string()),
recording_mb_id: row.recording_mbid.map(|u| u.to_string()),
result.push(PlayView {
track_name: row.track_name.clone().into(),
track_mb_id: row.recording_mbid.map(|u| u.to_string().into()),
recording_mb_id: row.recording_mbid.map(|u| u.to_string().into()),
duration: row.duration.map(|d| d as i64),
artists,
release_name: row.release_name.clone(),
release_mb_id: row.release_mbid.map(|u| u.to_string()),
isrc: row.isrc,
origin_url: row.origin_url,
music_service_base_domain: row.music_service_base_domain,
submission_client_agent: row.submission_client_agent,
release_name: row.release_name.clone().map(|s| s.into()),
release_mb_id: row.release_mbid.map(|u| u.to_string().into()),
isrc: row.isrc.map(|s| s.into()),
origin_url: row.origin_url.map(|s| s.into()),
music_service_base_domain: row.music_service_base_domain.map(|s| s.into()),
submission_client_agent: row.submission_client_agent.map(|s| s.into()),
played_time: row
.played_time
.map(|dt| utc_to_atrium_datetime(crate::repos::time_to_chrono_utc(dt))),
extra_data: Default::default(),
});
}

Expand Down
4 changes: 2 additions & 2 deletions apps/aqua/src/repos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub trait DataSource: ActorProfileRepo + FeedPlayRepo + StatsRepo + Send + Sync

pub fn utc_to_atrium_datetime(
dt: chrono::DateTime<chrono::Utc>,
) -> atrium_api::types::string::Datetime {
atrium_api::types::string::Datetime::new(
) -> jacquard_common::types::string::Datetime {
jacquard_common::types::string::Datetime::new(
dt.with_timezone(&chrono::FixedOffset::west_opt(0).expect("0 is not negative")),
)
}
Expand Down
Loading
Loading