Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public function execute(VoiceStateUpdate $state, ?VoiceStateUpdate $oldState): v
'external_identity_id' => $identity->id,
'channel_name' => $event['channel_id'],
'state' => $event['state'],
'occurred_at' => now(),
'occurred_at' => now()->utc(),
'obtained_experience' => 0,
]);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
<?php

declare(strict_types=1);

namespace He4rt\IntegrationDiscord\ETL\Console;

use Carbon\CarbonImmutable;
use He4rt\Activity\Voice\Models\Voice;
use He4rt\Identity\Tenant\Models\Tenant;
use He4rt\IntegrationDiscord\ETL\Actions\ImportDiscordVoiceLogAction;
use He4rt\IntegrationDiscord\ETL\DTOs\DiscordVoiceLogDTO;
use He4rt\IntegrationDiscord\Transport\DiscordConnector;
use He4rt\IntegrationDiscord\Transport\Requests\Channels\ListGuildChannels;
use He4rt\IntegrationDiscord\Transport\Requests\Messages\ListChannelMessages;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Sleep;

use function Laravel\Prompts\error;
use function Laravel\Prompts\intro;
use function Laravel\Prompts\note;
use function Laravel\Prompts\outro;
use function Laravel\Prompts\table;
use function Laravel\Prompts\task;
use function Laravel\Prompts\warning;

final class BackfillVoiceLogsCommand extends Command
{
protected $signature = 'discord:backfill-voice
{channel_id : Discord channel ID where the bot posts voice join/left logs}
{--since= : Start date (Y-m-d). Defaults to 2026-03-01}
{--until= : End date (Y-m-d). Defaults to now}
{--bot-id=621538099545112596 : Discord bot user ID that posts voice logs (Dyno)}
{--tenant=he4rt : Tenant slug}
{--dry-run : Preview without saving}';

protected $description = 'Backfill voice logs by paginating a Discord channel where a bot logs voice join/left events';

private int $voiceCount = 0;

private int $joinCount = 0;

private int $leftCount = 0;

private int $skippedCount = 0;

private int $rateLimitHits = 0;

private int $alreadyExistsCount = 0;

public function handle(
DiscordConnector $connector,
ImportDiscordVoiceLogAction $voiceAction,
): int {
DB::disableQueryLog();

$tenantSlug = (string) $this->option('tenant');
$tenant = Tenant::query()->where('slug', $tenantSlug)->first();

if ($tenant === null) {
error(sprintf('Tenant "%s" not found.', $tenantSlug));

return self::FAILURE;
}

$channelId = (string) $this->argument('channel_id');
$botId = (string) $this->option('bot-id');
$isDryRun = (bool) $this->option('dry-run');
$since = CarbonImmutable::parse($this->option('since') ?? '2026-03-01');
$until = CarbonImmutable::parse($this->option('until') ?? 'now');
$tenantId = $tenant->getKey();

intro(sprintf('Discord Voice Backfill%s', $isDryRun ? ' [DRY RUN]' : ''));

$existingCount = Voice::query()
->where('tenant_id', $tenantId)
->whereBetween('occurred_at', [$since, $until])
->count();

table(
headers: ['Setting', 'Value'],
rows: [
['Channel', $channelId],
['Bot filter', $botId],
['Period', sprintf('%s → %s', $since->toDateString(), $until->toDateString())],
['Tenant', sprintf('%s (ID: %d)', $tenant->name, $tenantId)],
['Existing voice events in period', number_format($existingCount)],
],
);

$channelMap = $this->buildChannelMap($connector);
note(sprintf('Channel map loaded: %d channels resolved', count($channelMap)));

$before = null;
$pages = 0;
$fetched = 0;

task(
label: 'Fetching voice logs [starting...]',
callback: function ($logger) use (
$connector, $voiceAction, $channelId, $botId,
$isDryRun, $since, $until, $tenantId, $channelMap,
&$before, &$pages, &$fetched,
): void {
$reachedSince = false;

while (!$reachedSince) {
$response = $connector->send(new ListChannelMessages(
channelId: $channelId,
limit: 100,
before: $before,
));

if ($response->status() === 429) {
$retryAfter = (int) ($response->json('retry_after') ?? 5);
$this->rateLimitHits++;
$logger->warning(sprintf(
'Rate limited — pausing %ds (hit #%d)',
$retryAfter,
$this->rateLimitHits,
));
Sleep::sleep($retryAfter + 1);

continue;
}

if ($response->failed()) {
$logger->warning(sprintf('HTTP %d — aborting', $response->status()));

break;
}

/** @var list<array<string, mixed>> $messages */
$messages = $response->json();

if ($messages === []) {
break;
}

$pages++;
$fetched += count($messages);
$oldestTimestamp = '';

foreach ($messages as $message) {
$timestamp = CarbonImmutable::parse($message['timestamp']);
$oldestTimestamp = $timestamp->format('Y-m-d H:i');

if ($timestamp->isBefore($since)) {
$reachedSince = true;

break;
}

if ($timestamp->isAfter($until)) {
continue;
}

$authorId = $message['author']['id'] ?? null;

if ($authorId !== $botId) {
$this->skippedCount++;

continue;
}

$voiceDto = DiscordVoiceLogDTO::fromDump($message);

if (!$voiceDto instanceof DiscordVoiceLogDTO) {
$this->skippedCount++;

continue;
}

$channelName = $channelMap[$voiceDto->voiceChannelId] ?? $voiceDto->voiceChannelId;
$exists = Voice::query()
->where('tenant_id', $tenantId)
->where('provider_message_id', (string) $message['id'])
->exists();

if ($exists) {
$this->alreadyExistsCount++;
$logger->line(sprintf(
'%s <@%s> %s #%s [EXISTS]',
$timestamp->format('m/d H:i'),
$voiceDto->userDiscordId,
$voiceDto->action,
$channelName,
));
} else {
if (!$isDryRun) {
$voiceAction->handle($voiceDto, $tenantId, $channelMap);
}

$this->voiceCount++;

if ($voiceDto->action === 'joined') {
$this->joinCount++;
} else {
$this->leftCount++;
}

$logger->line(sprintf(
'%s <@%s> %s #%s [NEW]',
$timestamp->format('m/d H:i'),
$voiceDto->userDiscordId,
$voiceDto->action,
$channelName,
));
}
}

$before = end($messages)['id'] ?? null;

if ($before === null) {
break;
}

$logger->label(sprintf(
'Page %d | oldest: %s | New: %d (↗%d ↘%d) | Exists: %d | Skip: %d | 429s: %d',
$pages,
$oldestTimestamp,
$this->voiceCount,
$this->joinCount,
$this->leftCount,
$this->alreadyExistsCount,
$this->skippedCount,
$this->rateLimitHits,
));

Sleep::usleep(500_000);
}
},
limit: 15,
);

$this->newLine();

if ($this->rateLimitHits > 0) {
warning(sprintf('Hit rate limit %d time(s) during backfill.', $this->rateLimitHits));
}

table(
headers: ['Metric', 'Value'],
rows: [
['Period', sprintf('%s → %s', $since->toDateString(), $until->toDateString())],
['Pages fetched', number_format($pages)],
['Messages scanned', number_format($fetched)],
['New voice events', number_format($this->voiceCount)],
[' ↗ Joins', number_format($this->joinCount)],
[' ↘ Leaves', number_format($this->leftCount)],
['Already in DB', number_format($this->alreadyExistsCount)],
['Messages skipped', number_format($this->skippedCount)],
['Rate limit hits', (string) $this->rateLimitHits],
['Mode', $isDryRun ? 'DRY RUN' : 'LIVE'],
],
);

outro(sprintf(
'Backfill %s — %s voice events %s',
$isDryRun ? 'preview done' : 'complete',
number_format($this->voiceCount),
$isDryRun ? 'would be imported' : 'imported',
));

return self::SUCCESS;
}

/**
* @return array<string, string>
*/
private function buildChannelMap(DiscordConnector $connector): array
{
$guildId = config('he4rt.discord.guild_id');

if ($guildId === null) {
return [];
}

$response = $connector->send(new ListGuildChannels((string) $guildId));

/** @var list<array<string, mixed>> $channels */
$channels = $response->json();

$map = [];

foreach ($channels as $channel) {
if (isset($channel['id'], $channel['name'])) {
$map[(string) $channel['id']] = $channel['name'];
}
}

return $map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace He4rt\IntegrationDiscord;

use He4rt\IntegrationDiscord\ETL\Console\BackfillVoiceLogsCommand;
use He4rt\IntegrationDiscord\ETL\Console\ImportDiscordMessagesCommand;
use He4rt\IntegrationDiscord\ETL\Console\ImportDiscordProfilesCommand;
use He4rt\IntegrationDiscord\ETL\Console\MergeDuplicateDiscordProfilesCommand;
Expand Down Expand Up @@ -39,6 +40,7 @@ public function boot(): void
ImportDiscordMessagesCommand::class,
MergeDuplicateDiscordProfilesCommand::class,
SyncDiscordGuildCommand::class,
BackfillVoiceLogsCommand::class,
]);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

declare(strict_types=1);

namespace He4rt\IntegrationDiscord\Transport\Requests\Messages;

use Saloon\Enums\Method;
use Saloon\Http\Request;

final class ListChannelMessages extends Request
{
protected Method $method = Method::GET;

public function __construct(
private readonly string $channelId,
private readonly int $limit = 100,
private readonly ?string $before = null,
private readonly ?string $after = null,
) {}

public function resolveEndpoint(): string
{
return sprintf('/channels/%s/messages', $this->channelId);
}

/**
* @return array<string, string|int>
*/
protected function defaultQuery(): array
{
$query = ['limit' => $this->limit];

if ($this->before !== null) {
$query['before'] = $this->before;
}

if ($this->after !== null) {
$query['after'] = $this->after;
}

return $query;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ public function get(): array
$start = Date::now('America/Sao_Paulo')->subDays($this->rangeDays)->startOfDay()->utc();

return DB::table('voice_messages')
->selectRaw("EXTRACT(DOW FROM occurred_at AT TIME ZONE 'UTC' AT TIME ZONE 'America/Sao_Paulo')::int AS dow")
->selectRaw("EXTRACT(HOUR FROM occurred_at AT TIME ZONE 'UTC' AT TIME ZONE 'America/Sao_Paulo')::int AS hour")
->selectRaw("EXTRACT(DOW FROM occurred_at AT TIME ZONE 'America/Sao_Paulo')::int AS dow")
->selectRaw("EXTRACT(HOUR FROM occurred_at AT TIME ZONE 'America/Sao_Paulo')::int AS hour")
->selectRaw('COUNT(*) AS total')
->where('occurred_at', '>=', $start)
->whereNotNull('occurred_at')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public function get(): Collection
$start = Date::now('America/Sao_Paulo')->subDays($this->rangeDays)->startOfDay()->utc();

return DB::table('voice_messages')
->selectRaw("(occurred_at AT TIME ZONE 'UTC' AT TIME ZONE 'America/Sao_Paulo')::date AS day")
->selectRaw("(occurred_at AT TIME ZONE 'America/Sao_Paulo')::date AS day")
->selectRaw('COUNT(*) AS total_joins')
->where('occurred_at', '>=', $start)
->whereNotNull('occurred_at')
Expand Down
Loading