feat: migration scripts
This commit is contained in:
parent
668d5d2415
commit
4a4fd5492d
1 changed files with 302 additions and 0 deletions
302
kruhobot/cogs/_migrate.py
Normal file
302
kruhobot/cogs/_migrate.py
Normal file
|
|
@ -0,0 +1,302 @@
|
|||
|
||||
import asyncio
|
||||
import re
|
||||
from typing import Callable
|
||||
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
|
||||
|
||||
TOKEN = "MTQxOTQxNDM4OTIyODA0ODQyNA.GXr4xn.x5WWZHcRtjoY8MswVluzJp-e8ndC-tU1fyUJRw"
|
||||
AUTHORIZED_USERS = {
|
||||
251710204238888960,
|
||||
}
|
||||
AUTHORIZED_GUILDS = [
|
||||
1419405238951088210, # TEST
|
||||
1275128046906773601, # KRUHY
|
||||
]
|
||||
KRUH_ROLE_PATTERN = r"^Kruh [0-9]{2}$"
|
||||
MIGRATED_KRUH_ROLE_PATTERN = r"^Loňský Kruh [0-9]{2}$"
|
||||
|
||||
|
||||
intents = discord.Intents.default()
|
||||
intents.members = True
|
||||
intents.message_content = True
|
||||
bot = commands.Bot(command_prefix='!', intents=intents)
|
||||
|
||||
|
||||
def _build_category_mapping(guild: discord.Guild) -> dict[str, discord.Guild]:
|
||||
return {category.name: category for category in guild.categories}
|
||||
|
||||
|
||||
@bot.event
|
||||
async def on_ready():
|
||||
guild = bot.get_guild(1419405238951088210)
|
||||
if guild is None:
|
||||
print("Guild not found.")
|
||||
return
|
||||
|
||||
|
||||
@bot.slash_command(guild_ids=AUTHORIZED_GUILDS)
|
||||
async def migrate(ctx: discord.ApplicationContext,
|
||||
role: discord.Role,
|
||||
):
|
||||
user: discord.Member = ctx.user
|
||||
if not user.id in AUTHORIZED_USERS:
|
||||
await ctx.respond("Unauthorized!")
|
||||
return
|
||||
|
||||
if not re.match(KRUH_ROLE_PATTERN, role.name):
|
||||
await ctx.respond("Unsupported role!")
|
||||
return
|
||||
|
||||
category_mapping = _build_category_mapping(ctx.guild)
|
||||
if not role.name in category_mapping:
|
||||
await ctx.respond("Matching category not found!")
|
||||
return
|
||||
|
||||
original_name = role.name
|
||||
new_role = await _migrate_role(role, category_mapping[original_name], lambda name: f"Loňský {name}")
|
||||
await ctx.respond(f"Migrated [{original_name}] to [{new_role.name}]")
|
||||
|
||||
|
||||
@bot.slash_command(guild_ids=AUTHORIZED_GUILDS)
|
||||
async def migrate_all(ctx: discord.ApplicationContext):
|
||||
await ctx.defer()
|
||||
|
||||
user: discord.Member = ctx.user
|
||||
if not user.id in AUTHORIZED_USERS:
|
||||
await ctx.respond("Unauthorized!")
|
||||
return
|
||||
|
||||
guild: discord.Guild = ctx.guild
|
||||
kruh_roles = [role for role in guild.roles if re.match(KRUH_ROLE_PATTERN, role.name)]
|
||||
|
||||
guild_categories = {category.name: category for category in guild.categories}
|
||||
kruh_categories = {}
|
||||
for role in kruh_roles:
|
||||
if not role.name in guild_categories:
|
||||
await ctx.respond(f"Category for [{role.name}] not found!")
|
||||
return
|
||||
kruh_categories[role.name] = guild_categories[role.name]
|
||||
|
||||
for role in kruh_roles:
|
||||
original_name = role.name
|
||||
await _migrate_role(role, kruh_categories[original_name], lambda name: f"Loňský {name}")
|
||||
|
||||
await ctx.respond(f"Migrated {len(kruh_roles)} roles")
|
||||
|
||||
|
||||
@bot.slash_command(guild_ids=AUTHORIZED_GUILDS)
|
||||
async def undo_migrate(ctx: discord.ApplicationContext,
|
||||
role: discord.Role,
|
||||
):
|
||||
user: discord.Member = ctx.user
|
||||
if not user.id in AUTHORIZED_USERS:
|
||||
await ctx.respond("Unauthorized!")
|
||||
return
|
||||
|
||||
if not re.match(MIGRATED_KRUH_ROLE_PATTERN, role.name):
|
||||
await ctx.respond("Unsupported role!")
|
||||
return
|
||||
|
||||
category_mapping = _build_category_mapping(ctx.guild)
|
||||
if not role.name in category_mapping:
|
||||
await ctx.respond("Matching category not found!")
|
||||
return
|
||||
|
||||
original_name = role.name
|
||||
new_role = await _migrate_role(role, category_mapping[original_name], lambda name: name[7:])
|
||||
await ctx.respond(f"Migrated [{original_name}] to [{new_role.name}]")
|
||||
|
||||
|
||||
@bot.slash_command(guild_ids=AUTHORIZED_GUILDS)
|
||||
async def undo_migrate_all(ctx: discord.ApplicationContext):
|
||||
user: discord.Member = ctx.user
|
||||
if not user.id in AUTHORIZED_USERS:
|
||||
await ctx.respond("Unauthorized!")
|
||||
return
|
||||
|
||||
guild: discord.Guild = ctx.guild
|
||||
kruh_roles = [role for role in guild.roles if re.match(MIGRATED_KRUH_ROLE_PATTERN, role.name)]
|
||||
|
||||
guild_categories = {category.name: category for category in guild.categories}
|
||||
kruh_categories = {}
|
||||
for role in kruh_roles:
|
||||
if not role.name in guild_categories:
|
||||
await ctx.respond(f"Category for [{role.name}] not found!")
|
||||
return
|
||||
kruh_categories[role.name] = guild_categories[role.name]
|
||||
|
||||
for role in kruh_roles:
|
||||
original_name = role.name
|
||||
await _migrate_role(role, kruh_categories[original_name], lambda name: name[7:])
|
||||
|
||||
await ctx.respond(f"Migrated {len(kruh_roles)} roles")
|
||||
|
||||
|
||||
async def _migrate_role(role: discord.Role, category: discord.CategoryChannel,
|
||||
f_migrate: Callable[[str], str]):
|
||||
original_name = role.name
|
||||
new_name = f_migrate(original_name)
|
||||
new_role = await role.edit(name=new_name)
|
||||
await category.edit(name=new_name)
|
||||
return new_role
|
||||
|
||||
|
||||
@bot.slash_command(guild_ids=AUTHORIZED_GUILDS)
|
||||
async def migrate_check(ctx: discord.ApplicationContext):
|
||||
user: discord.Member = ctx.user
|
||||
if not user.id in AUTHORIZED_USERS:
|
||||
await ctx.respond("Unauthorized!")
|
||||
return
|
||||
|
||||
guild: discord.Guild = ctx.guild
|
||||
kruh_roles = [role for role in guild.roles if re.match(KRUH_ROLE_PATTERN, role.name)]
|
||||
|
||||
embed = discord.Embed(
|
||||
title="Roles to migrate",
|
||||
)
|
||||
|
||||
kruh_role_names = "".join(sorted(f"- {role.name}\n" for role in kruh_roles))
|
||||
embed.add_field(name="Roles", value=kruh_role_names, inline=True)
|
||||
|
||||
await ctx.respond("The following roles are ready for migration", embed=embed)
|
||||
|
||||
|
||||
@bot.slash_command(guild_ids=AUTHORIZED_GUILDS)
|
||||
async def create_kruhy(ctx: discord.ApplicationContext):
|
||||
await ctx.defer()
|
||||
|
||||
user: discord.Member = ctx.user
|
||||
if not user.id in AUTHORIZED_USERS:
|
||||
await ctx.respond("Unauthorized!")
|
||||
return
|
||||
|
||||
KRUHY_TO_CREATE = {
|
||||
"fyzika": [
|
||||
11, 12, 13, 14, 15, 16, 17, 18, # FP
|
||||
],
|
||||
"matematika": [
|
||||
19, 20, # MMOP
|
||||
51, 52, 53, 54, 55, 56, 57, 58, # MOMP+MITP
|
||||
61, 62, 63, 64, # MFMP
|
||||
],
|
||||
"informatika": [
|
||||
31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, # IPP
|
||||
],
|
||||
"ucitelstvi": [
|
||||
70, 71, 72, 73, 74, 75, # UC
|
||||
]
|
||||
}
|
||||
OBORY_COLORS = {
|
||||
"matematika": discord.Color.from_rgb(240, 139, 170),
|
||||
"fyzika": discord.Color.from_rgb(55, 196, 229),
|
||||
"informatika": discord.Color.from_rgb(138, 199, 90),
|
||||
"ucitelstvi": discord.Color.from_rgb(245, 191, 105),
|
||||
}
|
||||
|
||||
guild: discord.Guild = ctx.guild
|
||||
if any(re.match(KRUH_ROLE_PATTERN, role.name) for role in guild.roles):
|
||||
await ctx.respond("Kruhy not migrated!")
|
||||
return
|
||||
|
||||
role_everyone = next(iter(role for role in guild.roles if role.name == "@everyone"))
|
||||
|
||||
for obor in KRUHY_TO_CREATE:
|
||||
for kruh_id in KRUHY_TO_CREATE[obor]:
|
||||
name = f"Kruh {kruh_id}"
|
||||
role = await guild.create_role(name=name, color=OBORY_COLORS[obor], hoist=False, mentionable=True)
|
||||
category = await guild.create_category(
|
||||
name=name,
|
||||
overwrites={
|
||||
role_everyone: discord.PermissionOverwrite.from_pair(
|
||||
[],
|
||||
[("view_channel", True)]),
|
||||
role: discord.PermissionOverwrite.from_pair(
|
||||
[("view_channel", True), ("send_messages", True), ("connect", True)],
|
||||
[]),
|
||||
},
|
||||
)
|
||||
channel = await category.create_text_channel(f"obecné-{kruh_id}")
|
||||
|
||||
response = "\n".join(["Created categories and roles for Kruhy:"] + [f"- {obor.capitalize()}: {len(kruhy)}" for obor, kruhy in KRUHY_TO_CREATE.items()])
|
||||
await ctx.respond(response)
|
||||
|
||||
|
||||
@bot.slash_command(guild_ids=AUTHORIZED_GUILDS)
|
||||
async def delete_kruhy(ctx: discord.ApplicationContext):
|
||||
await ctx.defer()
|
||||
|
||||
user: discord.Member = ctx.user
|
||||
if not user.id in AUTHORIZED_USERS:
|
||||
await ctx.respond("Unauthorized!")
|
||||
return
|
||||
|
||||
guild: discord.Guild = ctx.guild
|
||||
kruhy_roles = [role for role in guild.roles if re.match(KRUH_ROLE_PATTERN, role.name)]
|
||||
kruhy_categories = [category for category in guild.categories if re.match(KRUH_ROLE_PATTERN, category.name)]
|
||||
|
||||
nums = (len(kruhy_roles), len(kruhy_categories))
|
||||
|
||||
delete_tasks = []
|
||||
for category in kruhy_categories:
|
||||
for channel in category.channels:
|
||||
delete_tasks.append(channel.delete())
|
||||
delete_tasks.append(category.delete())
|
||||
for role in kruhy_roles:
|
||||
delete_tasks.append(role.delete())
|
||||
|
||||
await asyncio.gather(*delete_tasks)
|
||||
|
||||
await ctx.respond(f"Deleted {nums[0]} categories and {nums[1]} roles")
|
||||
|
||||
|
||||
@bot.slash_command(guild_ids=AUTHORIZED_GUILDS)
|
||||
async def rename_migrated(ctx: discord.ApplicationContext):
|
||||
await ctx.defer()
|
||||
|
||||
user: discord.Member = ctx.user
|
||||
if not user.id in AUTHORIZED_USERS:
|
||||
await ctx.respond("Unauthorized!")
|
||||
return
|
||||
|
||||
guild: discord.Guild = ctx.guild
|
||||
migrated_roles = [role for role in guild.roles if re.match(MIGRATED_KRUH_ROLE_PATTERN, role.name)]
|
||||
migrated_categories = [category for category in guild.categories if re.match(MIGRATED_KRUH_ROLE_PATTERN, category.name)]
|
||||
|
||||
nums = (len(migrated_roles), len(migrated_categories))
|
||||
|
||||
rename_tasks = []
|
||||
for category in migrated_categories:
|
||||
old_name = category.name
|
||||
new_name = old_name[7:] + " (2024/25)"
|
||||
rename_tasks.append(category.edit(name=new_name))
|
||||
for role in migrated_roles:
|
||||
old_name = category.name
|
||||
new_name = old_name[7:] + " (2024/25)"
|
||||
rename_tasks.append(role.edit(name=new_name))
|
||||
|
||||
await asyncio.gather(*rename_tasks)
|
||||
|
||||
await ctx.respond(f"Renamed {nums[0]} categories and {nums[1]} roles")
|
||||
|
||||
|
||||
@bot.slash_command(guild_ids=AUTHORIZED_GUILDS)
|
||||
async def fetch_kruhy_ids(ctx: discord.ApplicationContext):
|
||||
user: discord.Member = ctx.user
|
||||
if not user.id in AUTHORIZED_USERS:
|
||||
await ctx.respond("Unauthorized!")
|
||||
return
|
||||
|
||||
guild: discord.Guild = ctx.guild
|
||||
|
||||
kruh_roles = [role for role in guild.roles if re.match(KRUH_ROLE_PATTERN, role.name)]
|
||||
for role in sorted(kruh_roles, key=lambda x: x.name):
|
||||
print(role.name, role.id)
|
||||
|
||||
await ctx.respond("Done")
|
||||
|
||||
|
||||
# Run the bot
|
||||
bot.run(TOKEN)
|
||||
Loading…
Reference in a new issue