from disnake.ext import commands, tasks import disnake from main import StronkBot from loguru import logger from datetime import datetime, timedelta from embed_factory import vm_advert import httpx from thefuzz import process class Vm_tracker(commands.Cog): def __init__(self,bot:StronkBot): self.bot = bot self.update_vm_embeds.start() def cog_unload(self): self.update_vm_embeds.cancel() @commands.slash_command(description="Report someone going in VM") async def report_vm(self,inter:disnake.ApplicationCommandInteraction, in_game_name:str): logger.info(f"{inter.application_command.name} used by {inter.author} using {inter.filled_options}") inter.response.defer(ephemeral=True) async with httpx.AsyncClient() as client: response = await client.get(f'http://login.strongholdkingdoms.com/ajaxphp/username_search.php?term={in_game_name}') if in_game_name not in response.json(): return await inter.send(content=f"You submitted {in_game_name} this name does not seem valid. Make sure the name has the right capitalization. Or the player is banned") #See if this is a duplicate check = await self.bot.database.vm_entries.find_one({"in_game_name": in_game_name,"guild_id": inter.guild_id, "finished": {"$exists" : False}}) if check is not None: return await inter.send(content="Good news, this is already reported! :D") await self.bot.database.vm_entries.insert_one( {"guild_id": inter.guild_id, "in_game_name": in_game_name, "added": datetime.now()} ) advert = await self.bot.database.adverts.find_one({"advert_type":"vm_tracker", "guild_id": inter.guild_id}) if advert is not None: await self.update_adverts([advert]) await inter.send(content=f"Thank you! {in_game_name} has been registered") @commands.slash_command(description="Edit a VM entry on this server") async def edit_vm(self,inter:disnake.ApplicationCommandInteraction, in_game_name:str, action:str = commands.Param(choices=["Remove", "Adjust Start Time"], description="If you choose adjust time add the optional 'Value' argument"), value:int = commands.Param(default= 0 ,description="In hours how much do you want to adjust the start time. Negative numbers go forward in time")): logger.info(f"{inter.application_command.name} used by {inter.author} using {inter.filled_options}") # Get the list of players this server is allowed to edit. filter = { "added": { "$gte": datetime(datetime.now().year - 1, 12, 15), "$lte": datetime(datetime.now().year + 1, 1, 15) }, "guild_id": inter.guild_id, "finished": {"$exists" : False} } vm_entries = await self.bot.database.vm_entries.find(filter, {"_id": 0, "in_game_name": 1} ).to_list(length=None) eligible_names = [entry["in_game_name"] for entry in vm_entries] filter.update({"in_game_name": in_game_name}) if in_game_name not in eligible_names: return await inter.send(ephemeral=True, content=f"You submitted {in_game_name} this name does not seem valid. You can only edit entries submitted by your server") if action == "Remove": await self.bot.database.vm_entries.update_one(filter, {"$set": {"finished": datetime.now()}}) elif action == "Adjust Start Time": await self.bot.database.vm_entries.update_one(filter, [{"$set": {"added": { "$add": ["$added", -value*3600000]}}}]) advert = await self.bot.database.adverts.find_one({"advert_type":"vm_tracker", "guild_id": inter.guild_id}) if advert is not None: await self.update_adverts([advert]) await inter.response.send_message(ephemeral=True,content=f"Thank you! {in_game_name} has been edited") @edit_vm.autocomplete("in_game_name") async def autocomp_input(self, inter:disnake.ApplicationCommandInteraction,user_input:str): if len(user_input) == 0: return [] filter = { "added": { "$gte": datetime(datetime.now().year - 1, 12, 15), "$lte": datetime(datetime.now().year + 1, 1, 15) }, "guild_id": inter.guild_id, "finished": {"$exists" : False} } vm_entries = await self.bot.database.vm_entries.find(filter, {"_id": 0, "in_game_name": 1} ).to_list(length=None) eligible_names = [entry["in_game_name"] for entry in vm_entries] return [option[0] for option in process.extract(user_input,set(eligible_names),limit=4)] @tasks.loop(hours=1) async def update_vm_embeds(self) -> None: await self.bot.wait_until_ready() logger.info(f"---> Updating VM adverts") #Check if any entries have expired result = await self.bot.database.vm_entries.update_many({"added": {"$lt": datetime.now()-timedelta(days=15)}, "finished" : { "$exists" : False }},{"$set":{"finished": datetime.now()}}) adverts = await self.bot.database.adverts.find({"advert_type": "vm_tracker"}).to_list(length=None) #TODO Enable some way to switch what world house colors should be pulled from. As servers might have more than one world listed. await self.update_adverts(adverts) async def update_adverts(self,adverts:list): for advert in adverts: #Make sure the guild is still present target_guild = self.bot.get_guild(advert["guild_id"]) if not target_guild: logger.error(f"Tried updating VM advert in {advert['guild_id']} but its not present in the bot") continue #TODO Note infraction for later cleanup if this repeats. target_channel = target_guild.get_channel(advert["channel_id"]) if not target_channel: logger.error(f"Tried updating VM advert in {target_guild.name} but the channel is not present in the guild") continue #TODO Note infraction, warn the server owner. Repeated failures will result in advert removal #Collect what world(s) the guild is looking at current_server_document = await self.bot.database.servers.find_one({"_id": advert["guild_id"]}) world_list:dict = current_server_document.get("worlds", None) vm_entries = await self.collect_server_vms(guild_id=advert["guild_id"],all=True) vm_entries = sorted(vm_entries, key=lambda x: x["added"]) #Get the relevant house affiliations if applicable house_data = None house_relations = None if len(world_list) > 0: house_data = await self.bot.database.house_history.find({"timestamp":{"$gte":datetime.now()- timedelta(hours=2)}, "in_game_name": {"$in":[entry["in_game_name"] for entry in vm_entries]}, "world":next(iter(world_list))}).to_list(length=None) house_data = {doc["in_game_name"]: doc for doc in house_data} house_relations = await self.bot.database.relationships.find({"guild_id": advert["guild_id"], "type": "House", "world": {"$in": [next(iter(world_list)),None]}}).to_list(length=None) relations = {doc["target"]: doc["state"] for doc in house_relations} player_relations = await self.bot.database.relationships.find({"guild_id": advert["guild_id"], "type": "Player", "world": {"$in": [next(iter(world_list)),None]}}).to_list(length=None) relations.update({doc["target"]: doc["state"] for doc in player_relations}) embed = vm_advert.generate_embed(bot = self.bot, guild_id = advert["guild_id"],vm_entries=vm_entries,house_data=house_data,relations=relations) target_message_id = advert.get("message_id", None) if target_message_id is None: sent_message = await target_channel.send(embed=embed) #Add new message to advert for future updates await self.bot.database.adverts.update_one({"guild_id":advert["guild_id"],"channel_id":advert["channel_id"]},{"$set": {"message_id":sent_message.id}}) continue else: target_message = await target_channel.fetch_message(target_message_id) await target_message.edit(embed=embed,components=None) async def collect_server_vms(self,guild_id:int, all=False): from_date = datetime(datetime.now().year - 1, 12, 15) to_date = datetime(datetime.now().year + 1, 1, 15) # Build the filter for the query filter_query = { "added": { "$gte": from_date, "$lte": to_date }, "guild_id": guild_id, } if not all: filter_query.update({"finished": {"$exists" : False}}) return await self.bot.database.vm_entries.find(filter_query).to_list(length=None) @commands.Cog.listener() async def on_button_click(self, inter:disnake.MessageInteraction): if "vm_tracker" not in inter.component.custom_id: return logger.info(f"{inter.component.custom_id} used by {inter.author}") await inter.response.defer(with_message=False) await self.bot.database.vm await self.bot.database.adverts.update_one({"advert_type":"vm_tracker", "guild_id": inter.guild_id}, [ { "$set": {"private": {"$not": "$private" } } } ] ) advert = await self.bot.database.adverts.find_one({"advert_type":"vm_tracker", "guild_id": inter.guild_id}) if advert is not None: await self.update_adverts([advert]) def setup(bot): logger.info(f"{__name__} Loaded") bot.add_cog(Vm_tracker(bot))