Experimental Discord bot written in Python
Du kannst nicht mehr als 25 Themen auswählen Themen müssen mit entweder einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

storage.py 6.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. """
  2. Handles storage of persisted and non-persisted data for the bot.
  3. """
  4. import json
  5. from datetime import datetime, timezone, timedelta
  6. from os.path import exists
  7. from typing import Any, Optional
  8. from discord import Guild
  9. from config import CONFIG
  10. from rocketbot.collections import AgeBoundDict
  11. class ConfigKey:
  12. """
  13. Common keys in persisted guild storage.
  14. """
  15. WARNING_CHANNEL_ID = 'warning_channel_id'
  16. WARNING_MENTION = 'warning_mention'
  17. class Storage:
  18. """
  19. Static class for managing persisted bot configuration and transient state
  20. on a per-guild basis.
  21. """
  22. # -- Transient state management -----------------------------------------
  23. __guild_id_to_state: dict[int, dict[str, Any]] = {}
  24. @classmethod
  25. def get_state(cls, guild: Guild) -> dict[str, Any]:
  26. """
  27. Returns transient state for the given guild. This state is not preserved
  28. if the bot is restarted.
  29. """
  30. state: dict[str, Any] = cls.__guild_id_to_state.get(guild.id)
  31. if state is None:
  32. state = {}
  33. cls.__guild_id_to_state[guild.id] = state
  34. return state
  35. @classmethod
  36. def get_state_value(cls, guild: Guild, key: str) -> Optional[Any]:
  37. """
  38. Returns a state value for the given guild and key, or `None` if not set.
  39. """
  40. return cls.get_state(guild).get(key)
  41. @classmethod
  42. def set_state_value(cls, guild: Guild, key: str, value: Optional[Any]) -> None:
  43. """
  44. Updates a transient value associated with the given guild and key name.
  45. A value of `None` removes any previous value for that key.
  46. """
  47. cls.set_state_values(guild, { key: value })
  48. @classmethod
  49. def set_state_values(cls, guild: Guild, values: Optional[dict[str, Optional[Any]]]) -> None:
  50. """
  51. Merges in a set of key-value pairs into the transient state for the
  52. given guild. Any pairs with a value of `None` will be removed from the
  53. transient state.
  54. """
  55. if values is None or len(values) == 0:
  56. return
  57. state: dict[str, Any] = cls.get_state(guild)
  58. for key, value in values.items():
  59. if value is None:
  60. del state[key]
  61. else:
  62. state[key] = value
  63. # XXX: Superstitious. Should update by ref already but saw weirdness once.
  64. cls.__guild_id_to_state[guild.id] = state
  65. # -- Persisted configuration management ---------------------------------
  66. # discord.Guild.id -> dict
  67. __guild_id_to_config: dict[int, dict[str, Any]] = {}
  68. @classmethod
  69. def get_config(cls, guild: Guild) -> dict[str, Any]:
  70. """
  71. Returns all persisted configuration for the given guild.
  72. """
  73. config: dict[str, Any] = cls.__guild_id_to_config.get(guild.id)
  74. if config is not None:
  75. # Already in memory
  76. return config
  77. # Load from disk if possible
  78. cls.__trace(f'No loaded config for guild {guild.id}. Attempting to ' +
  79. 'load from disk.')
  80. config = cls.__read_guild_config(guild)
  81. if config is None:
  82. config = {}
  83. cls.__guild_id_to_config[guild.id] = config
  84. return config
  85. @classmethod
  86. def get_config_value(cls, guild: Guild, key: str) -> Optional[Any]:
  87. """
  88. Returns a persisted guild config value stored under the given key.
  89. Returns `None` if not present.
  90. """
  91. return cls.get_config(guild).get(key)
  92. @classmethod
  93. def set_config_value(cls, guild: Guild, key: str, value: Optional[Any]) -> None:
  94. """
  95. Adds/updates the given key-value pair to the persisted config for the
  96. given Guild. If `value` is `None` the key will be removed from persisted
  97. config.
  98. """
  99. cls.set_config_values(guild, { key: value })
  100. @classmethod
  101. def set_config_values(cls, guild: Guild, values: Optional[dict[str, Optional[Any]]]) -> None:
  102. """
  103. Merges the given `values` dict with the saved config for the given guild
  104. and writes it to disk. `values` must be JSON-encodable or a `ValueError`
  105. will be raised. Keys with associated values of `None` will be removed
  106. from the persisted config.
  107. """
  108. if values is None or len(values) == 0:
  109. return
  110. config: dict[str, Any] = cls.get_config(guild)
  111. try:
  112. json.dumps(values)
  113. except Exception as e:
  114. raise ValueError(f'values not JSON encodable - {values}') from e
  115. for key, value in values.items():
  116. if value is None:
  117. del config[key]
  118. else:
  119. config[key] = value
  120. cls.__write_guild_config(guild, config)
  121. @classmethod
  122. def get_bot_messages(cls, guild: Guild) -> AgeBoundDict[int, Any, datetime, timedelta]:
  123. """Returns all the bot messages for a guild."""
  124. bm = cls.get_state_value(guild, 'bot_messages')
  125. if bm is None:
  126. far_future = datetime.now(timezone.utc) + timedelta(days=1000)
  127. bm = AgeBoundDict(timedelta(seconds=600),
  128. lambda k, v : v.message_sent_at() or far_future)
  129. Storage.set_state_value(guild, 'bot_messages', bm)
  130. return bm
  131. @classmethod
  132. def __write_guild_config(cls, guild: Guild, config: dict[str, Any]) -> None:
  133. """
  134. Saves config for a guild to a JSON file on disk.
  135. """
  136. path: str = cls.__guild_config_path(guild)
  137. cls.__trace(f'Saving config for guild {guild.id} to {path}')
  138. cls.__trace(f'config = {config}')
  139. config['_guild_name'] = guild.name # Just for making JSON files easier to identify
  140. with open(path, 'w', encoding='utf8') as file:
  141. # Pretty printing to make more legible for debugging
  142. # Sorting keys to help with diffs
  143. json.dump(config, file, indent='\t', sort_keys=True)
  144. cls.__trace('State saved')
  145. @classmethod
  146. def __read_guild_config(cls, guild: Guild) -> Optional[dict[str, Any]]:
  147. """
  148. Loads config for a guild from a JSON file on disk, or `None` if not
  149. found.
  150. """
  151. path: str = cls.__guild_config_path(guild)
  152. if not exists(path):
  153. cls.__trace(f'No config on disk for guild {guild.id}. Returning None.')
  154. return None
  155. cls.__trace(f'Loading config from disk for guild {guild.id}')
  156. with open(path, 'r', encoding='utf8') as file:
  157. config = json.load(file)
  158. cls.__trace('State loaded')
  159. return config
  160. @classmethod
  161. def __guild_config_path(cls, guild: Guild) -> str:
  162. """
  163. Returns the JSON file path where guild config should be written.
  164. """
  165. config_value: str = CONFIG['config_path']
  166. path: str = config_value if config_value.endswith('/') else f'{config_value}/'
  167. return f'{path}guild_{guild.id}.json'
  168. @classmethod
  169. def __trace(cls, message: Any) -> None:
  170. # print(f'{cls.__name__}: {str(message)}')
  171. pass