Skip to content
Open

2.1 #44

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions Thunder/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from Thunder.utils.commands import set_commands
from Thunder.utils.database import db
from Thunder.utils.keepalive import ping_server
from Thunder.utils.canonical_files import drain_background_touch_tasks
from Thunder.utils.logger import logger
from Thunder.utils.messages import MSG_ADMIN_RESTART_DONE
from Thunder.utils.rate_limiter import rate_limiter, request_executor
Expand Down Expand Up @@ -49,6 +50,25 @@ def print_banner():
print(banner)


def schedule_index_ensure() -> None:
task = asyncio.create_task(
db.ensure_indexes(raise_on_error=False),
name="ensure_database_indexes"
)

def _log_index_failure(done_task: asyncio.Task) -> None:
try:
created_indexes = done_task.result()
if created_indexes:
print(" ✓ Database indexes ensured.")
else:
print(" ▶ Database indexes could not be ensured during startup.")
except Exception as e:
logger.error(f"Background database index ensure failed: {e}", exc_info=True)

task.add_done_callback(_log_index_failure)


async def import_plugins():
print("╠════════════════════ IMPORTING PLUGINS ════════════════════╣")
plugins = glob.glob(PLUGIN_PATH)
Expand Down Expand Up @@ -119,6 +139,7 @@ async def start_services():

await set_commands()
print(" ✓ Bot commands set successfully.")
schedule_index_ensure()

restart_message_data = await db.get_restart_message()
if restart_message_data:
Expand Down Expand Up @@ -212,6 +233,10 @@ async def start_services():
await rate_limiter.shutdown()
except Exception:
pass
try:
await drain_background_touch_tasks()
except Exception as e:
logger.error(f"Error during canonical touch task cleanup: {e}", exc_info=True)
return

elapsed_time = (datetime.now() - start_time).total_seconds()
Expand Down Expand Up @@ -252,6 +277,11 @@ async def start_services():
except Exception as e:
logger.error(f"Error during client cleanup: {e}")

try:
await drain_background_touch_tasks()
except Exception as e:
logger.error(f"Error during canonical touch task cleanup: {e}", exc_info=True)

if 'app_runner' in locals() and app_runner is not None:
try:
await app_runner.cleanup()
Expand Down
245 changes: 155 additions & 90 deletions Thunder/bot/plugins/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@

from pyrogram import Client, enums, filters
from pyrogram.errors import FloodWait, MessageNotModified, MessageDeleteForbidden, MessageIdInvalid
from pyrogram.types import (InlineKeyboardButton, InlineKeyboardMarkup,
Message)

from Thunder.bot import StreamBot
from Thunder.utils.bot_utils import (gen_links, is_admin, log_newusr, notify_own,
reply_user_err)
from Thunder.utils.database import db
from Thunder.utils.decorators import (check_banned, get_shortener_status,
require_token)
from pyrogram.types import (InlineKeyboardButton, InlineKeyboardMarkup,
Message)

from Thunder.bot import StreamBot
from Thunder.utils.bot_utils import (gen_canonical_links, gen_links, is_admin,
log_newusr, notify_own, reply_user_err)
from Thunder.utils.canonical_files import get_or_create_canonical_file
from Thunder.utils.database import db
from Thunder.utils.decorators import (check_banned, get_shortener_status,
require_token)
from Thunder.utils.force_channel import force_channel_check
from Thunder.utils.logger import logger
from Thunder.utils.messages import (
Expand Down Expand Up @@ -71,30 +72,55 @@ async def validate_request_common(client: Client, message: Message) -> Optional[
return await get_shortener_status(client, message)


async def send_channel_links(target_msg: Message, links: Dict[str, Any], source_info: str, source_id: int):
try:
await target_msg.reply_text(
MSG_NEW_FILE_REQUEST.format(
source_info=source_info,
id_=source_id,
online_link=links['online_link'],
stream_link=links['stream_link']
),
disable_web_page_preview=True,
quote=True
)
except FloodWait as e:
await asyncio.sleep(e.value)
await target_msg.reply_text(
MSG_NEW_FILE_REQUEST.format(
source_info=source_info,
id_=source_id,
online_link=links['online_link'],
stream_link=links['stream_link']
),
disable_web_page_preview=True,
quote=True
)
async def send_channel_links(
links: Dict[str, Any],
source_info: str,
source_id: int,
*,
target_msg: Optional[Message] = None,
reply_to_message_id: Optional[int] = None
):
try:
text = MSG_NEW_FILE_REQUEST.format(
source_info=source_info,
id_=source_id,
online_link=links['online_link'],
stream_link=links['stream_link']
)
if target_msg:
await target_msg.reply_text(
text,
disable_web_page_preview=True,
quote=True
)
else:
await StreamBot.send_message(
chat_id=Var.BIN_CHANNEL,
text=text,
disable_web_page_preview=True,
reply_to_message_id=reply_to_message_id
)
except FloodWait as e:
await asyncio.sleep(e.value)
text = MSG_NEW_FILE_REQUEST.format(
source_info=source_info,
id_=source_id,
online_link=links['online_link'],
stream_link=links['stream_link']
)
if target_msg:
await target_msg.reply_text(
text,
disable_web_page_preview=True,
quote=True
)
else:
await StreamBot.send_message(
chat_id=Var.BIN_CHANNEL,
text=text,
disable_web_page_preview=True,
reply_to_message_id=reply_to_message_id
)


async def safe_edit_message(message: Message, text: str, **kwargs):
Expand Down Expand Up @@ -313,18 +339,36 @@ async def _actual_channel_receive_handler(client: Client, message: Message, **ha
f"({message.chat.title or 'Unknown'}). Ignoring message.")
return

try:
stored_msg = await fwd_media(message)
if not stored_msg:
logger.error(
f"Failed to forward media from channel {message.chat.id}. Ignoring.")
return
shortener_val = await get_shortener_status(client, message)
links = await gen_links(stored_msg, shortener=shortener_val)
source_info = message.chat.title or "Unknown Channel"

if notification_msg:
try:
try:
shortener_val = await get_shortener_status(client, message)
canonical_record, stored_msg, reused_existing = await get_or_create_canonical_file(message, fwd_media)
if reused_existing and stored_msg:
await safe_delete_message(stored_msg)
stored_msg = None
if canonical_record:
links = await gen_canonical_links(
file_name=canonical_record["file_name"],
file_size=int(canonical_record.get("file_size", 0) or 0),
public_hash=canonical_record["public_hash"],
shortener=shortener_val
)
reply_to_message_id = int(canonical_record["canonical_message_id"])
else:
if not stored_msg:
stored_msg = await fwd_media(message)
if not stored_msg:
logger.error(
f"Failed to forward media from channel {message.chat.id}. Ignoring.")
return
links = await gen_links(stored_msg, shortener=shortener_val)
reply_to_message_id = stored_msg.id
source_info = message.chat.title or "Unknown Channel"
# When we reused an existing canonical BIN copy, stored_msg is intentionally
# None so send_channel_links falls back to StreamBot.send_message(...,
# reply_to_message_id=...) and keeps the log threaded to the canonical message.

if notification_msg:
try:
try:
await notification_msg.edit_text(
MSG_NEW_FILE_REQUEST.format(
Expand All @@ -346,11 +390,23 @@ async def _actual_channel_receive_handler(client: Client, message: Message, **ha
),
disable_web_page_preview=True
)
except Exception as e:
logger.error(f"Error editing notification message with links: {e}", exc_info=True)
await send_channel_links(stored_msg, links, source_info, message.chat.id)
else:
await send_channel_links(stored_msg, links, source_info, message.chat.id)
except Exception as e:
logger.error(f"Error editing notification message with links: {e}", exc_info=True)
await send_channel_links(
links,
source_info,
message.chat.id,
target_msg=stored_msg,
reply_to_message_id=reply_to_message_id
)
else:
await send_channel_links(
links,
source_info,
message.chat.id,
target_msg=stored_msg,
reply_to_message_id=reply_to_message_id
)

try:
try:
Expand Down Expand Up @@ -388,17 +444,32 @@ async def process_single(
status_msg: Message,
shortener_val: bool,
original_request_msg: Optional[Message] = None,
notification_msg: Optional[Message] = None
):
try:
stored_msg = await fwd_media(file_msg)
if not stored_msg:
logger.error(f"Failed to forward media for message {file_msg.id}. Skipping.")
return None
links = await gen_links(stored_msg, shortener=shortener_val)
if notification_msg:
await safe_edit_message(
notification_msg,
notification_msg: Optional[Message] = None
):
try:
canonical_record, stored_msg, reused_existing = await get_or_create_canonical_file(file_msg, fwd_media)
if reused_existing and stored_msg:
await safe_delete_message(stored_msg)
stored_msg = None
if canonical_record:
links = await gen_canonical_links(
file_name=canonical_record["file_name"],
file_size=int(canonical_record.get("file_size", 0) or 0),
public_hash=canonical_record["public_hash"],
shortener=shortener_val
)
canonical_reply_id = int(canonical_record["canonical_message_id"])
else:
if not stored_msg:
stored_msg = await fwd_media(file_msg)
if not stored_msg:
logger.error(f"Failed to forward media for message {file_msg.id}. Skipping.")
return None
links = await gen_links(stored_msg, shortener=shortener_val)
canonical_reply_id = stored_msg.id
if notification_msg:
await safe_edit_message(
notification_msg,
MSG_LINKS.format(
file_name=links['media_name'],
file_size=links['media_size'],
Expand All @@ -421,35 +492,29 @@ async def process_single(
if not source_info:
source_info = f"@{source_msg.from_user.username}" if source_msg.from_user.username else "Unknown User"
source_id = source_msg.from_user.id
elif source_msg.chat.type == enums.ChatType.CHANNEL:
source_info = source_msg.chat.title or "Unknown Channel"
source_id = source_msg.chat.id
if source_info and source_id:
try:
await stored_msg.reply_text(
MSG_NEW_FILE_REQUEST.format(
source_info=source_info,
id_=source_id,
online_link=links['online_link'],
stream_link=links['stream_link']
),
disable_web_page_preview=True,
quote=True
)
except FloodWait as e:
await asyncio.sleep(e.value)
await stored_msg.reply_text(
MSG_NEW_FILE_REQUEST.format(
source_info=source_info,
id_=source_id,
online_link=links['online_link'],
stream_link=links['stream_link']
),
disable_web_page_preview=True,
quote=True
)
if status_msg:
await safe_delete_message(status_msg)
elif source_msg.chat.type == enums.ChatType.CHANNEL:
source_info = source_msg.chat.title or "Unknown Channel"
source_id = source_msg.chat.id
if source_info and source_id:
try:
await send_channel_links(
links,
source_info,
source_id,
target_msg=stored_msg,
reply_to_message_id=canonical_reply_id
)
except FloodWait as e:
await asyncio.sleep(e.value)
await send_channel_links(
links,
source_info,
source_id,
target_msg=stored_msg,
reply_to_message_id=canonical_reply_id
)
if status_msg:
await safe_delete_message(status_msg)
return links
except Exception as e:
logger.error(f"Error processing single file for message {file_msg.id}: {e}", exc_info=True)
Expand Down
Loading
Loading