import asyncio import json import re import subprocess from discord import Message from discord.ext.commands import Cog from rocketbot.cogs.basecog import BaseCog from rocketbot.cogsetting import CogSetting from rocketbot.utils import ( blockquote_markdown, format_bytes, suppress_markdown_url_previews, ) def filter_video_format(format: dict) -> bool: if format.get('resolution') == 'audio only': return False if format.get('format_note') == 'DASH audio': return False return True def rank_video_format(format: dict) -> tuple: content = 0 if format.get('resolution') == 'audio only': content = 1 elif format.get('format_note') == 'DASH audio': content = 1 elif format.get('format_note') == 'DASH video': content = 2 else: content = 3 # both I guess! multiplexed formats don't seem clearly labeled res = (format.get('width') or 0) + (format.get('height') or 0) return (content, res) class MessageLink: url: str spoiler: bool = False link_type: str = 'unknown' def __init__(self, url: str, link_type: str, spoiler: bool = False): self.url = url self.link_type = link_type self.spoiler = spoiler class VideoPreviewCog(BaseCog, name='Video Link Previews'): SETTING_ENABLED = CogSetting( 'enabled', bool, default_value=False, brief='Video link previews', description='Whether links to certain social media videos should show previews.', ) SETTING_INSTAGRAM = CogSetting( 'instagram', bool, default_value=False, brief='whether to show video previews for Instagram links', description='For both regular posts and reels', ) SETTING_FACEBOOK = CogSetting( 'facebook', bool, default_value=False, brief='whether to show video previews for Facebook links', ) SETTING_TWITTER = CogSetting( 'twitter', bool, default_value=False, brief='whether to show video previews for Twitter links', ) REGEX_INSTAGRAM_POST = r'(https?:\/\/(?:www\.)?)\w*(instagram\.com\/\w+/\w+\/?)' REGEX_FACEBOOK_POST = r'(https?:\/\/(?:www\.)?)\w*(facebook\.com(?:\/\w+)+\/\w+\/?)' REGEX_TWITTER_POST = r'(https?:\/\/(?:www\.)?)\w*((?:twitter|x)\.com\/\w+\/status\/[0-9]+)' REGEX_SPOILERS = '\|\|.+\|\|' # Best video and best audio, mp4 format with m4a audio FORMATS = 'bv*[ext=mp4]+ba[ext=m4a]/' \ 'b[ext=mp4]/' \ 'bv*[ext=mp4]+ba[ext=m4a]/' \ 'b[ext=mp4]/' \ 'bv*+ba/' \ 'b' def __init__(self, bot): super().__init__( bot, config_prefix='linkpreview', short_description='Manages video link preview behavior.', ) Self = VideoPreviewCog self.add_setting(Self.SETTING_ENABLED) self.add_setting(Self.SETTING_INSTAGRAM) self.add_setting(Self.SETTING_FACEBOOK) self.add_setting(Self.SETTING_TWITTER) @Cog.listener() async def on_message(self, message: Message): """Event listener""" if message.author is None or \ message.author.bot or \ message.guild is None or \ message.channel is None or \ message.content is None: return if not self.get_guild_setting(message.guild, self.SETTING_ENABLED): return links = self._get_previewable_links(message) if len(links) == 0: return await self._wait_for_preview(message, links) # TODO: Make this just link to the raw video file if possible (yt-dlp --get-url) def _get_previewable_links(self, message: Message) -> list[MessageLink]: Self = VideoPreviewCog links: list[MessageLink] = [] content: str = message.content has_spoilers = re.match(Self.REGEX_SPOILERS, content) is not None if self.get_guild_setting(message.guild, Self.SETTING_INSTAGRAM): for link in re.findall(Self.REGEX_INSTAGRAM_POST, content): url = link[0] + link[1] links.append(MessageLink(url, 'instagram', has_spoilers)) if self.get_guild_setting(message.guild, Self.SETTING_FACEBOOK): for link in re.findall(Self.REGEX_FACEBOOK_POST, content): url = link[0] + link[1] links.append(MessageLink(url, 'facebook', has_spoilers)) if self.get_guild_setting(message.guild, Self.SETTING_TWITTER): for link in re.findall(Self.REGEX_TWITTER_POST, content): url = link[0] + link[1] links.append(MessageLink(url, 'twitter', has_spoilers)) # TODO: Custom patterns return links async def _wait_for_preview(self, message: Message, links: list[MessageLink]): await asyncio.sleep(3) # Look for embeds already showing the video self.log(message.guild, "Checking message for embeds") for embed in message.embeds: if embed.video.url: # If there's any video, skip downloading any previews self.log(message.guild, "Message already has a video. Skipping this message.") return await self._fetch_previews(message, links) async def _fetch_previews(self, message: Message, links: list[MessageLink]): promises = [] for link in links: promises.append(self._fetch_preview(message, link)) await asyncio.gather(*promises) async def _fetch_preview(self, message: Message, link: MessageLink): result = subprocess.run( [ 'yt-dlp', '--skip-download', '--dump-single-json', link.url, ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True ) if result.returncode != 0: self.log(message.guild, "Fetching link info JSON failed. Skipping preview.") self.log(message.guild, result.stderr) return try: info: dict = json.loads(result.stdout) except Exception as e: self.log(message.guild, f"Error parsing info.json. Skipping preview. {e}") return description = info.get('description') or '' formats: list[dict] = info.get('formats') or [] self.log(message.guild, f"Found {len(formats)} formats") formats = list(filter(filter_video_format, formats)) self.log(message.guild, f"Filtered to {len(formats)} formats") sorted_formats: list[dict] = sorted(formats, key=rank_video_format, reverse=True) if len(sorted_formats) == 0: self.log(message.guild, f"No eligible formats for URL {link.url}") return best_format: dict = sorted_formats[0] self.log(message.guild, f"Best format is id {best_format.get('format_id')}") video_url: str = best_format.get('url') link_description: str = "video" if (best_format.get('width') or 0) > 0 and (best_format.get('height') or 0) > 0: link_description += f", {best_format.get('width')}×{best_format.get('height')}" if (best_format.get('filesize') or 0) > 0: link_description += f", {format_bytes(best_format.get('filesize'))}" elif (best_format.get('filesize_approx') or 0) > 0: link_description += f", {format_bytes(best_format.get('filesize_approx'))}" content = "Hmm, video preview didn't load. Let's try this." if len(description) > 0: content += "\n\n" + blockquote_markdown(suppress_markdown_url_previews(description)) + "\n" if link.spoiler: content += f"\n||[{link_description}]({video_url})||" else: content += f"\n[{link_description}]({video_url})" await message.reply( content, mention_author=False )