import asyncio
import json
import re
import subprocess
from datetime import timedelta

from discord import Message
from discord.ext.commands import Cog

from rocketbot.cogs.basecog import BaseCog
from rocketbot.cogsetting import CogSetting
from rocketbot.utils import (
    blockquote_markdown,
    format_bytes,
    suppress_markdown_url_previews,
)


def filter_video_format(format: dict) -> bool:
    """Return whether a yt-dlp format entry should be kept as a video candidate.

    Entries whose ``resolution`` is ``'audio only'`` or whose ``format_note``
    is ``'DASH audio'`` are rejected; everything else is accepted.
    """
    if format.get('resolution') == 'audio only':
        return False
    if format.get('format_note') == 'DASH audio':
        return False
    return True


def rank_video_format(format: dict) -> tuple:
    """Return a sort key ``(content, res)`` where larger means more preferable.

    ``content`` is 1 for audio-only entries, 2 for ``'DASH video'`` entries and
    3 for anything else. ``res`` is width + height, with missing values
    counted as 0.
    """
    if format.get('resolution') == 'audio only':
        content = 1
    elif format.get('format_note') == 'DASH audio':
        content = 1
    elif format.get('format_note') == 'DASH video':
        content = 2
    else:
        content = 3  # both I guess! multiplexed formats don't seem clearly labeled
    res = (format.get('width') or 0) + (format.get('height') or 0)
    return (content, res)


class MessageLink:
    """A previewable link found in a message."""

    # Normalized URL rebuilt from the regex capture groups.
    url: str
    # Whether the reply should wrap the video link in spoiler markup.
    spoiler: bool = False
    # Which site pattern matched: 'instagram', 'facebook' or 'twitter'.
    link_type: str = 'unknown'

    def __init__(self, url: str, link_type: str, spoiler: bool = False):
        self.url = url
        self.link_type = link_type
        self.spoiler = spoiler


class VideoPreviewCog(BaseCog, name='Video Link Previews'):
    """Replies with a direct video link when a social media link has no video embed."""

    SETTING_ENABLED = CogSetting(
        'enabled',
        bool,
        default_value=False,
        brief='Video link previews',
        description='Whether links to certain social media videos should show previews.',
    )
    # NOTE(review): default_value is the int 3 while the declared type is
    # timedelta; presumably CogSetting converts it -- confirm.
    SETTING_DELAY = CogSetting(
        'delay',
        timedelta,
        default_value=3,
        brief='delay before attempting to fetch a preview',
        description='How long to wait after a message is posted to see if Discord successfully loads a video preview',
    )
    SETTING_INSTAGRAM = CogSetting(
        'instagram',
        bool,
        default_value=False,
        brief='whether to show video previews for Instagram links',
        description='For both regular posts and reels',
    )
    SETTING_FACEBOOK = CogSetting(
        'facebook',
        bool,
        default_value=False,
        brief='whether to show video previews for Facebook links',
    )
    SETTING_TWITTER = CogSetting(
        'twitter',
        bool,
        default_value=False,
        brief='whether to show video previews for Twitter links',
    )

    # Each post pattern has two capture groups: the scheme (plus optional
    # "www.") and the canonical host + path. The uncaptured ``\w*`` between
    # them swallows any prefix glued onto the domain, so group 1 + group 2
    # rebuilds the URL without that prefix.
    REGEX_INSTAGRAM_POST = r'(https?:\/\/(?:www\.)?)\w*(instagram\.com\/\w+/\w+\/?)'
    REGEX_FACEBOOK_POST = r'(https?:\/\/(?:www\.)?)\w*(facebook\.com(?:\/\w+)+\/\w+\/?)'
    REGEX_TWITTER_POST = r'(https?:\/\/(?:www\.)?)\w*((?:twitter|x)\.com\/\w+\/status\/[0-9]+)'
    # Raw string: same pattern as before, without invalid-escape warnings.
    REGEX_SPOILERS = r'\|\|.+\|\|'

    # Best video and best audio, mp4 format with m4a audio
    FORMATS = 'bv*[ext=mp4]+ba[ext=m4a]/' \
        'b[ext=mp4]/' \
        'bv*[ext=mp4]+ba[ext=m4a]/' \
        'b[ext=mp4]/' \
        'bv*+ba/' \
        'b'

    def __init__(self, bot):
        super().__init__(
            bot,
            config_prefix='linkpreview',
            short_description='Manages video link preview behavior.',
        )
        self.add_setting(self.SETTING_ENABLED)
        self.add_setting(self.SETTING_DELAY)
        self.add_setting(self.SETTING_INSTAGRAM)
        self.add_setting(self.SETTING_FACEBOOK)
        self.add_setting(self.SETTING_TWITTER)

    @Cog.listener()
    async def on_message(self, message: Message):
        """Event listener"""
        if message.author is None or \
                message.author.bot or \
                message.guild is None or \
                message.channel is None or \
                message.content is None:
            return
        if not self.get_guild_setting(message.guild, self.SETTING_ENABLED):
            return
        links = self._get_previewable_links(message)
        if not links:
            return
        await self._wait_for_preview(message, links)

    # TODO: Make this just link to the raw video file if possible (yt-dlp --get-url)

    def _get_previewable_links(self, message: Message) -> list[MessageLink]:
        """Collect links in the message that match an enabled site pattern.

        Results are ordered by site (Instagram, Facebook, Twitter), then by
        position in the message. Every link is flagged as a spoiler if the
        message contains spoiler markup anywhere.
        """
        content: str = message.content
        # re.search, not re.match: spoiler markup may start anywhere in the
        # message, not only at its first character.
        has_spoilers = re.search(self.REGEX_SPOILERS, content) is not None

        site_patterns = (
            (self.SETTING_INSTAGRAM, self.REGEX_INSTAGRAM_POST, 'instagram'),
            (self.SETTING_FACEBOOK, self.REGEX_FACEBOOK_POST, 'facebook'),
            (self.SETTING_TWITTER, self.REGEX_TWITTER_POST, 'twitter'),
        )
        links: list[MessageLink] = []
        for setting, pattern, link_type in site_patterns:
            if not self.get_guild_setting(message.guild, setting):
                continue
            for scheme, rest in re.findall(pattern, content):
                links.append(MessageLink(scheme + rest, link_type, has_spoilers))
        # TODO: Custom patterns
        return links

    async def _wait_for_preview(self, message: Message, links: list[MessageLink]):
        """Wait the configured delay, then fetch previews unless a video embed exists."""
        delay: timedelta = self.get_guild_setting(message.guild, self.SETTING_DELAY)
        # total_seconds is a method; it must be called to get the float
        # that asyncio.sleep expects.
        await asyncio.sleep(delay.total_seconds())

        # Look for embeds already showing the video
        self.log(message.guild, "Checking message for embeds")
        for embed in message.embeds:
            if embed.video.url:
                # If there's any video, skip downloading any previews
                self.log(message.guild, "Message already has a video. Skipping this message.")
                return

        await self._fetch_previews(message, links)

    async def _fetch_previews(self, message: Message, links: list[MessageLink]):
        """Fetch and post previews for all links concurrently."""
        await asyncio.gather(*(self._fetch_preview(message, link) for link in links))

    @staticmethod
    def _describe_format(best_format: dict) -> str:
        """Build the link text: "video", plus dimensions and size when known."""
        width = best_format.get('width') or 0
        height = best_format.get('height') or 0
        link_description = "video"
        if width > 0 and height > 0:
            link_description += f", {best_format.get('width')}×{best_format.get('height')}"
        if (best_format.get('filesize') or 0) > 0:
            link_description += f", {format_bytes(best_format.get('filesize'))}"
        elif (best_format.get('filesize_approx') or 0) > 0:
            link_description += f", {format_bytes(best_format.get('filesize_approx'))}"
        return link_description

    async def _fetch_preview(self, message: Message, link: MessageLink):
        """Query yt-dlp for the link and reply with a direct link to its best video format.

        Any failure (yt-dlp missing or failing, unparseable output, no usable
        format) is logged and the preview is skipped.
        """
        try:
            # subprocess.run blocks until yt-dlp exits; run it in a worker
            # thread so the event loop (and sibling previews) keep running.
            result = await asyncio.to_thread(
                subprocess.run,
                [
                    'yt-dlp',
                    '--skip-download',
                    '--dump-single-json',
                    link.url,
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
        except OSError as e:
            # e.g. the yt-dlp executable is not installed or not runnable
            self.log(message.guild, f"Could not run yt-dlp. Skipping preview. {e}")
            return
        if result.returncode != 0:
            self.log(message.guild, "Fetching link info JSON failed. Skipping preview.")
            self.log(message.guild, result.stderr)
            return

        try:
            info = json.loads(result.stdout)
        except ValueError as e:  # json.JSONDecodeError is a ValueError
            self.log(message.guild, f"Error parsing info.json. Skipping preview. {e}")
            return
        if not isinstance(info, dict):
            self.log(message.guild, "Error parsing info.json. Skipping preview. Not a JSON object.")
            return

        description = info.get('description') or ''
        formats: list[dict] = info.get('formats') or []
        self.log(message.guild, f"Found {len(formats)} formats")
        formats = [fmt for fmt in formats if filter_video_format(fmt)]
        self.log(message.guild, f"Filtered to {len(formats)} formats")
        if not formats:
            self.log(message.guild, f"No eligible formats for URL {link.url}")
            return

        # max() returns the first highest-ranked entry, the same one a stable
        # descending sort would put first.
        best_format: dict = max(formats, key=rank_video_format)
        self.log(message.guild, f"Best format is id {best_format.get('format_id')}")
        video_url = best_format.get('url')
        if not video_url:
            # Without a URL the reply would link to the literal text "None".
            self.log(message.guild, f"Best format has no URL for {link.url}. Skipping preview.")
            return

        link_description = self._describe_format(best_format)
        content = "Hmm, video preview didn't load. Let's try this."
        if description:
            content += "\n\n" + blockquote_markdown(suppress_markdown_url_previews(description)) + "\n"
        if link.spoiler:
            content += f"\n||[{link_description}]({video_url})||"
        else:
            content += f"\n[{link_description}]({video_url})"
        await message.reply(
            content,
            mention_author=False
        )