| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- import asyncio
- import json
- import re
- import subprocess
-
- from discord import Message
- from discord.ext.commands import Cog
-
- from rocketbot.cogs.basecog import BaseCog
- from rocketbot.cogsetting import CogSetting
- from rocketbot.utils import (
- blockquote_markdown,
- format_bytes,
- suppress_markdown_url_previews,
- )
-
-
def filter_video_format(format: dict) -> bool:
    """Return True when a yt-dlp format entry is not marked as audio-only.

    An entry is rejected when its ``resolution`` is ``'audio only'`` or its
    ``format_note`` is ``'DASH audio'``; every other entry (including one
    missing both keys) is accepted.
    """
    is_audio_resolution = format.get('resolution') == 'audio only'
    is_dash_audio = format.get('format_note') == 'DASH audio'
    return not (is_audio_resolution or is_dash_audio)
-
def rank_video_format(format: dict) -> tuple:
    """Build a sort key for a yt-dlp format entry; larger keys sort as better.

    The key is ``(tier, size)`` where ``tier`` is 1 for audio-only entries,
    2 for entries noted as ``'DASH video'`` and 3 for everything else, and
    ``size`` is width plus height (missing or null dimensions count as 0).
    """
    note = format.get('format_note')
    if format.get('resolution') == 'audio only' or note == 'DASH audio':
        tier = 1
    elif note == 'DASH video':
        tier = 2
    else:
        # Treated as carrying both audio and video: multiplexed formats
        # don't seem to be clearly labeled, so this is the fallback tier.
        tier = 3
    size = sum(format.get(dimension) or 0 for dimension in ('width', 'height'))
    return (tier, size)
-
class MessageLink:
    """A link found in a message, tagged with how it should be previewed.

    Attributes:
        url: The matched link text.
        spoiler: Whether a preview for this link should be spoiler-wrapped.
        link_type: Short label for the kind of link; ``'unknown'`` when the
            caller does not supply one.
    """
    url: str
    spoiler: bool = False
    link_type: str = 'unknown'

    def __init__(self, url: str, link_type: str = 'unknown', spoiler: bool = False):
        """Store the link and its metadata.

        ``link_type`` defaults to ``'unknown'`` so the constructor agrees
        with the class-level default declared above.
        """
        self.url = url
        self.link_type = link_type
        self.spoiler = spoiler

    def __repr__(self) -> str:
        """Return a constructor-like representation for logs and debugging."""
        return (
            f"{type(self).__name__}(url={self.url!r}, "
            f"link_type={self.link_type!r}, spoiler={self.spoiler!r})"
        )
-
class VideoPreviewCog(BaseCog, name='Video Link Previews'):
    """Replies with a direct video link for supported social media posts.

    When enabled for a guild, each non-bot guild message is scanned for
    Instagram, Facebook and Twitter/X post links (each site has its own
    toggle). After a short wait, if the message has no embed carrying a
    video, yt-dlp is asked for the post's metadata and the bot replies with
    the post description and a markdown link to the best-ranked format.
    """

    SETTING_ENABLED = CogSetting(
        'enabled',
        bool,
        default_value=False,
        brief='Video link previews',
        description='Whether links to certain social media videos should show previews.',
    )
    SETTING_INSTAGRAM = CogSetting(
        'instagram',
        bool,
        default_value=False,
        brief='whether to show video previews for Instagram links',
        description='For both regular posts and reels',
    )
    SETTING_FACEBOOK = CogSetting(
        'facebook',
        bool,
        default_value=False,
        brief='whether to show video previews for Facebook links',
    )
    SETTING_TWITTER = CogSetting(
        'twitter',
        bool,
        default_value=False,
        brief='whether to show video previews for Twitter links',
    )

    REGEX_INSTAGRAM_POST = r'https?:\/\/(?:www\.)?instagram\.com\/(?:p|reel)\/[a-zA-Z0-9_-]+\/?'
    REGEX_FACEBOOK_POST = r'https?:\/\/(?:www\.)?facebook\.com\/share\/[rv]\/[a-zA-Z0-9_-]+\/?'
    REGEX_TWITTER_POST = r'https?:\/\/(?:twitter|x)\.com\/[a-zA-Z0-9_-]+/status/[0-9]+'

    # Raw string: `\|` is not a valid escape in a normal string literal.
    REGEX_SPOILERS = r'\|\|.+\|\|'

    # Best video and best audio, mp4 format with m4a audio
    # NOTE(review): not referenced anywhere in this class, and the first two
    # alternatives are listed twice - confirm whether it is still needed.
    FORMATS = 'bv*[ext=mp4]+ba[ext=m4a]/' \
        'b[ext=mp4]/' \
        'bv*[ext=mp4]+ba[ext=m4a]/' \
        'b[ext=mp4]/' \
        'bv*+ba/' \
        'b'

    def __init__(self, bot):
        """Register the cog and its per-guild settings."""
        super().__init__(
            bot,
            config_prefix='linkpreview',
            short_description='Manages video link preview behavior.',
        )
        for setting in (
            self.SETTING_ENABLED,
            self.SETTING_INSTAGRAM,
            self.SETTING_FACEBOOK,
            self.SETTING_TWITTER,
        ):
            self.add_setting(setting)

    @Cog.listener()
    async def on_message(self, message: Message):
        """Event listener: look for previewable links in new guild messages."""
        if message.author is None or \
                message.author.bot or \
                message.guild is None or \
                message.channel is None or \
                message.content is None:
            return
        if not self.get_guild_setting(message.guild, self.SETTING_ENABLED):
            return
        links = self._get_previewable_links(message)
        if not links:
            return
        await self._wait_for_preview(message, links)

    # TODO: Make this just link to the raw video file if possible (yt-dlp --get-url)

    def _get_previewable_links(self, message: Message) -> list[MessageLink]:
        """Collect links in the message that match an enabled site pattern.

        Links are returned grouped by site in the order Instagram, Facebook,
        Twitter. Every link is flagged as a spoiler when the message contains
        spoiler markup anywhere, not only around that particular link.
        """
        content: str = message.content
        # `search` (not `match`) so spoilers that do not start the message are
        # found; DOTALL so a spoiler spanning several lines is found too.
        has_spoilers = re.search(self.REGEX_SPOILERS, content, re.DOTALL) is not None
        sources = (
            (self.SETTING_INSTAGRAM, self.REGEX_INSTAGRAM_POST, 'instagram'),
            (self.SETTING_FACEBOOK, self.REGEX_FACEBOOK_POST, 'facebook'),
            (self.SETTING_TWITTER, self.REGEX_TWITTER_POST, 'twitter'),
        )
        links: list[MessageLink] = []
        for setting, pattern, link_type in sources:
            if not self.get_guild_setting(message.guild, setting):
                continue
            links.extend(
                MessageLink(url, link_type, has_spoilers)
                for url in re.findall(pattern, content)
            )
        # TODO: Custom patterns
        return links

    async def _wait_for_preview(self, message: Message, links: list[MessageLink]):
        """Wait briefly, then fetch previews unless the message has a video embed."""
        # Presumably this delay gives Discord time to attach its own embeds.
        await asyncio.sleep(3)
        # Look for embeds already showing the video
        self.log(message.guild, "Checking message for embeds")
        for embed in message.embeds:
            if embed.video.url:
                # If there's any video, skip downloading any previews
                self.log(message.guild, "Message already has a video. Skipping this message.")
                return
        await self._fetch_previews(message, links)

    async def _fetch_previews(self, message: Message, links: list[MessageLink]):
        """Fetch and post previews for all links concurrently."""
        await asyncio.gather(
            *(self._fetch_preview(message, link) for link in links)
        )

    async def _fetch_preview(self, message: Message, link: MessageLink):
        """Look up one link with yt-dlp and reply with a link to its video.

        Any failure (yt-dlp missing or failing, unparsable output, no usable
        format, no URL on the chosen format) is logged and the preview is
        skipped; nothing is posted in those cases.
        """
        command = [
            'yt-dlp',
            '--skip-download',
            '--dump-single-json',
            link.url,
        ]
        try:
            # subprocess.run blocks until yt-dlp exits, so run it in a worker
            # thread to keep the event loop (and sibling previews) moving.
            result = await asyncio.to_thread(
                subprocess.run,
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
        except OSError as e:
            # Raised when the yt-dlp executable cannot be started at all.
            self.log(message.guild, f"Could not run yt-dlp. Skipping preview. {e}")
            return
        if result.returncode != 0:
            self.log(message.guild, "Fetching link info JSON failed. Skipping preview.")
            self.log(message.guild, result.stderr)
            return
        try:
            info = json.loads(result.stdout)
        except ValueError as e:
            self.log(message.guild, f"Error parsing info.json. Skipping preview. {e}")
            return
        if not isinstance(info, dict):
            self.log(message.guild, "Error parsing info.json. Skipping preview. Not a JSON object.")
            return

        description = info.get('description') or ''
        formats: list[dict] = info.get('formats') or []
        self.log(message.guild, f"Found {len(formats)} formats")
        formats = [fmt for fmt in formats if filter_video_format(fmt)]
        self.log(message.guild, f"Filtered to {len(formats)} formats")
        if not formats:
            self.log(message.guild, f"No eligible formats for URL {link.url}")
            return
        # max() returns the first entry with the highest rank, the same entry
        # a stable descending sort would put first.
        best_format: dict = max(formats, key=rank_video_format)
        self.log(message.guild, f"Best format is id {best_format.get('format_id')}")
        video_url = best_format.get('url')
        if not video_url:
            # Without a URL the reply would contain a dead "(None)" link.
            self.log(message.guild, f"Best format has no URL for {link.url}. Skipping preview.")
            return

        link_description: str = "video"
        width = best_format.get('width') or 0
        height = best_format.get('height') or 0
        if width > 0 and height > 0:
            link_description += f", {width}×{height}"
        filesize = best_format.get('filesize') or 0
        filesize_approx = best_format.get('filesize_approx') or 0
        if filesize > 0:
            link_description += f", {format_bytes(filesize)}"
        elif filesize_approx > 0:
            link_description += f", {format_bytes(filesize_approx)}"

        # NOTE(review): a long post description plus a long video URL may
        # exceed Discord's message length limit - confirm how reply() failures
        # should be handled.
        content = "Hmm, video preview didn't load. Let's try this."
        if description:
            content += "\n\n" + blockquote_markdown(suppress_markdown_url_previews(description)) + "\n"
        if link.spoiler:
            content += f"\n||[{link_description}]({video_url})||"
        else:
            content += f"\n[{link_description}]({video_url})"
        await message.reply(
            content,
            mention_author=False
        )
|