Experimental Discord bot written in Python
Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

videopreviewcog.py 7.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import asyncio
  2. import json
  3. import re
  4. import subprocess
  5. from datetime import timedelta
  6. from discord import Message
  7. from discord.ext.commands import Cog
  8. from rocketbot.cogs.basecog import BaseCog
  9. from rocketbot.cogsetting import CogSetting
  10. from rocketbot.utils import (
  11. blockquote_markdown,
  12. format_bytes,
  13. suppress_markdown_url_previews,
  14. )
  15. def filter_video_format(format: dict) -> bool:
  16. if format.get('resolution') == 'audio only':
  17. return False
  18. if format.get('format_note') == 'DASH audio':
  19. return False
  20. return True
  21. def rank_video_format(format: dict) -> tuple:
  22. content = 0
  23. if format.get('resolution') == 'audio only':
  24. content = 1
  25. elif format.get('format_note') == 'DASH audio':
  26. content = 1
  27. elif format.get('format_note') == 'DASH video':
  28. content = 2
  29. else:
  30. content = 3 # both I guess! multiplexed formats don't seem clearly labeled
  31. res = (format.get('width') or 0) + (format.get('height') or 0)
  32. return (content, res)
  33. class MessageLink:
  34. url: str
  35. spoiler: bool = False
  36. link_type: str = 'unknown'
  37. def __init__(self, url: str, link_type: str, spoiler: bool = False):
  38. self.url = url
  39. self.link_type = link_type
  40. self.spoiler = spoiler
  41. class VideoPreviewCog(BaseCog, name='Video Link Previews'):
  42. SETTING_ENABLED = CogSetting(
  43. 'enabled',
  44. bool,
  45. default_value=False,
  46. brief='Video link previews',
  47. description='Whether links to certain social media videos should show previews.',
  48. )
  49. SETTING_DELAY = CogSetting(
  50. 'delay',
  51. timedelta,
  52. default_value=timedelta(seconds=3),
  53. brief='delay before attempting to fetch a preview',
  54. description='How long to wait after a message is posted to see if Discord successfully loads a video preview',
  55. min_value=timedelta(seconds=0),
  56. max_value=timedelta(seconds=60)
  57. )
  58. SETTING_INSTAGRAM = CogSetting(
  59. 'instagram',
  60. bool,
  61. default_value=False,
  62. brief='whether to show video previews for Instagram links',
  63. description='For both regular posts and reels',
  64. )
  65. SETTING_FACEBOOK = CogSetting(
  66. 'facebook',
  67. bool,
  68. default_value=False,
  69. brief='whether to show video previews for Facebook links',
  70. )
  71. SETTING_TWITTER = CogSetting(
  72. 'twitter',
  73. bool,
  74. default_value=False,
  75. brief='whether to show video previews for Twitter links',
  76. )
  77. REGEX_INSTAGRAM_POST = r'(https?:\/\/(?:www\.)?)\w*(instagram\.com\/\w+/\w+\/?)'
  78. REGEX_FACEBOOK_POST = r'(https?:\/\/(?:www\.)?)\w*(facebook\.com(?:\/\w+)+\/\w+\/?)'
  79. REGEX_TWITTER_POST = r'(https?:\/\/(?:www\.)?)\w*((?:twitter|x)\.com\/\w+\/status\/[0-9]+)'
  80. REGEX_SPOILERS = '\|\|.+\|\|'
  81. # Best video and best audio, mp4 format with m4a audio
  82. FORMATS = 'bv*[ext=mp4]+ba[ext=m4a]/' \
  83. 'b[ext=mp4]/' \
  84. 'bv*[ext=mp4]+ba[ext=m4a]/' \
  85. 'b[ext=mp4]/' \
  86. 'bv*+ba/' \
  87. 'b'
  88. def __init__(self, bot):
  89. super().__init__(
  90. bot,
  91. config_prefix='linkpreview',
  92. short_description='Manages video link preview behavior.',
  93. )
  94. Self = VideoPreviewCog
  95. self.add_setting(Self.SETTING_ENABLED)
  96. self.add_setting(Self.SETTING_DELAY)
  97. self.add_setting(Self.SETTING_INSTAGRAM)
  98. self.add_setting(Self.SETTING_FACEBOOK)
  99. self.add_setting(Self.SETTING_TWITTER)
  100. @Cog.listener()
  101. async def on_message(self, message: Message):
  102. """Event listener"""
  103. if message.author is None or \
  104. message.author.bot or \
  105. message.guild is None or \
  106. message.channel is None or \
  107. message.content is None:
  108. return
  109. if not self.get_guild_setting(message.guild, self.SETTING_ENABLED):
  110. return
  111. links = self._get_previewable_links(message)
  112. if len(links) == 0:
  113. return
  114. await self._wait_for_preview(message, links)
  115. # TODO: Make this just link to the raw video file if possible (yt-dlp --get-url)
  116. def _get_previewable_links(self, message: Message) -> list[MessageLink]:
  117. Self = VideoPreviewCog
  118. links: list[MessageLink] = []
  119. content: str = message.content
  120. has_spoilers = re.match(Self.REGEX_SPOILERS, content) is not None
  121. if self.get_guild_setting(message.guild, Self.SETTING_INSTAGRAM):
  122. for link in re.findall(Self.REGEX_INSTAGRAM_POST, content):
  123. url = link[0] + link[1]
  124. links.append(MessageLink(url, 'instagram', has_spoilers))
  125. if self.get_guild_setting(message.guild, Self.SETTING_FACEBOOK):
  126. for link in re.findall(Self.REGEX_FACEBOOK_POST, content):
  127. url = link[0] + link[1]
  128. links.append(MessageLink(url, 'facebook', has_spoilers))
  129. if self.get_guild_setting(message.guild, Self.SETTING_TWITTER):
  130. for link in re.findall(Self.REGEX_TWITTER_POST, content):
  131. url = link[0] + link[1]
  132. links.append(MessageLink(url, 'twitter', has_spoilers))
  133. # TODO: Custom patterns
  134. return links
  135. async def _wait_for_preview(self, message: Message, links: list[MessageLink]):
  136. Self = VideoPreviewCog
  137. await asyncio.sleep(self.get_guild_setting(message.guild, Self.SETTING_DELAY))
  138. # Look for embeds already showing the video
  139. self.log(message.guild, "Checking message for embeds")
  140. for embed in message.embeds:
  141. if embed.video.url:
  142. # If there's any video, skip downloading any previews
  143. self.log(message.guild, "Message already has a video. Skipping this message.")
  144. return
  145. await self._fetch_previews(message, links)
  146. async def _fetch_previews(self, message: Message, links: list[MessageLink]):
  147. promises = []
  148. for link in links:
  149. promises.append(self._fetch_preview(message, link))
  150. await asyncio.gather(*promises)
  151. async def _fetch_preview(self, message: Message, link: MessageLink):
  152. result = subprocess.run(
  153. [
  154. 'yt-dlp',
  155. '--skip-download',
  156. '--dump-single-json',
  157. link.url,
  158. ],
  159. stdout=subprocess.PIPE,
  160. stderr=subprocess.PIPE,
  161. universal_newlines=True
  162. )
  163. if result.returncode != 0:
  164. self.log(message.guild, "Fetching link info JSON failed. Skipping preview.")
  165. self.log(message.guild, result.stderr)
  166. return
  167. try:
  168. info: dict = json.loads(result.stdout)
  169. except Exception as e:
  170. self.log(message.guild, f"Error parsing info.json. Skipping preview. {e}")
  171. return
  172. description = info.get('description') or ''
  173. formats: list[dict] = info.get('formats') or []
  174. self.log(message.guild, f"Found {len(formats)} formats")
  175. formats = list(filter(filter_video_format, formats))
  176. self.log(message.guild, f"Filtered to {len(formats)} formats")
  177. sorted_formats: list[dict] = sorted(formats, key=rank_video_format, reverse=True)
  178. if len(sorted_formats) == 0:
  179. self.log(message.guild, f"No eligible formats for URL {link.url}")
  180. return
  181. best_format: dict = sorted_formats[0]
  182. self.log(message.guild, f"Best format is id {best_format.get('format_id')}")
  183. video_url: str = best_format.get('url')
  184. link_description: str = "video"
  185. if (best_format.get('width') or 0) > 0 and (best_format.get('height') or 0) > 0:
  186. link_description += f", {best_format.get('width')}×{best_format.get('height')}"
  187. if (best_format.get('filesize') or 0) > 0:
  188. link_description += f", {format_bytes(best_format.get('filesize'))}"
  189. elif (best_format.get('filesize_approx') or 0) > 0:
  190. link_description += f", {format_bytes(best_format.get('filesize_approx'))}"
  191. content = "Hmm, video preview didn't load. Let's try this."
  192. if len(description) > 0:
  193. content += "\n\n" + blockquote_markdown(suppress_markdown_url_previews(description)) + "\n"
  194. if link.spoiler:
  195. content += f"\n||[{link_description}]({video_url})||"
  196. else:
  197. content += f"\n[{link_description}]({video_url})"
  198. await message.reply(
  199. content,
  200. mention_author=False
  201. )