# NOTE(review): the first six imports look like accidental IDE auto-imports
# (nothing in the visible code uses them); they are kept because other parts
# of the file may not be visible here.
from ast import Param
from email.policy import default
from pickle import TRUE
from re import L
from turtle import update
from urllib import response
import disnake
from disnake.ext import commands, tasks
import json
import modals
import helpers
import time
import embed_factory
import sqlite3
from sqlite3 import Cursor, Error
import asyncio
import botOptions
import bingo
import random
import math
import os
from datetime import datetime,timezone
# NOTE: this rebinds the name `datetime` to the module, which is what the
# `datetime.date.today()` calls below rely on.
import datetime
import sbclasses
import requests
from PIL import Image
from io import BytesIO

intents = disnake.Intents().all()
# Fixed keyword typo: was `command_preifx`, so the prefix was never passed
# under the name commands.Bot expects.
bot = commands.Bot(
    command_prefix='%',
    sync_commands_debug=False,
    intents=intents
)


def store_basic_user(inter:disnake.MessageInteraction):
    """Insert the interacting user into the `user` table.

    An existing row for the same dID is left untouched (ON CONFLICT DO NOTHING).
    """
    helpers.sql_set("INSERT INTO user(dID,d_name,gID) VALUES(?,?,?) ON CONFLICT(dID) DO NOTHING",(inter.author.id,inter.author.display_name,inter.guild_id))


#---------------------------
# BOT EVENTS
#---------------------------
@bot.event
async def on_ready():
    """Announce login and start the periodic background tasks."""
    print('Logged In')
    #update_timed_attacks.start()
    getHouseData.start()
    update_vm_embed.start()


@bot.event
async def on_guild_available(guild):
    """Make sure a `guild` row exists for every guild the bot can see."""
    # NOTE(review): in the whitespace-collapsed source this statement sits
    # directly behind a '#'. A function body cannot consist of a comment only,
    # and other handlers index [0][0] into the guild row, so it is restored as
    # live code - confirm against the original file.
    helpers.sql_set('INSERT INTO guild(gID,name) VALUES(?,?) ON CONFLICT DO NOTHING',(guild.id,guild.name))


@bot.event
async def on_message_interaction(inter:disnake.MessageInteraction):
    """Dispatch button presses on bot messages by the component's custom_id.

    Handles the liege-lord request buttons, the vacation-mode add/remove
    buttons, the castle gallery pager and the timed-attack "Add" button.
    Unknown custom_ids fall through without a response.
    """
    store_basic_user(inter)
    custom_id = inter.component.custom_id
    print("{cTime} - {user} used {command}".format(cTime=datetime.date.today(),user = inter.author.display_name,command = custom_id))

    if custom_id == 'llcheck' and not inter.response.is_done():
        #await inter.response.defer(ephemeral=True)
        inter.component.disabled = True
        ll_data_dict = json.loads(helpers.sql_get("SELECT advert_data FROM guild WHERE gID = {gid}".format(gid=inter.guild.id))[0][0])
        ll_channel = inter.guild.get_channel_or_thread(ll_data_dict['ll'][0])
        ll_thread_message = await ll_channel.fetch_message(ll_data_dict['ll'][1])
        # Mark the request as fulfilled, refresh the advert, drop the request message.
        helpers.sql_set("UPDATE llrequests SET fulfilled = ? WHERE messageID = ?",(time.time(),inter.message.id))
        button_r_ll = disnake.ui.Button(emoji='<:LiegeLord:1003401484295213066>',custom_id='request_ll',label="Click here to request a Liege Lord")
        view = disnake.ui.View()
        view.add_item(button_r_ll)
        await ll_thread_message.edit(embed= embed_factory.lfll(ll_data_dict['ll'][1]),view=view)
        await inter.message.delete()

    if custom_id == 'request_ll':
        playerCP = helpers.sql_get("SELECT cp,prefPikes,prefArchers FROM user WHERE dID = {dID}".format(dID=inter.author.id))
        print(playerCP)
        # Defaults used when the user has no stored preferences yet.
        cp = 0
        prefArcher = 300
        prefPikes = 200
        if len(playerCP):
            # A stored NULL cp would make int() raise; fall back to the default 0.
            cp = int(playerCP[0][0] or 0)
            if playerCP[0][1] is not None:
                prefArcher = int(playerCP[0][2])
                prefPikes = int(playerCP[0][1])
        await inter.response.send_modal(modal=modals.LLModal(cp=cp,pike=prefPikes,archer=prefArcher))

    if custom_id == 'add_to_attack':
        #get data from db
        result = helpers.sql_get("SELECT * FROM times WHERE mID = {mID}".format(mID=inter.message.id),True)
        print(result)

    ####---> Vacation Mode buttons #################################################################
    if custom_id == 'add_vm':
        await inter.response.send_modal(modals.add_vm())

    if custom_id == 'rem_vm':
        #Get list of candidates
        query_response = helpers.sql_get("SELECT name FROM vm_entries WHERE finished IS NULL")
        if len(query_response) == 0:
            await inter.response.send_message(ephemeral=True,content="Nothing to remove")
            return
        names = [row[0] for row in query_response]
        # The modal takes two lists: the first 24 names and everything after them.
        await inter.response.send_modal(modals.rem_vm(names[:24],names[24:]))

    ####---> Castle Gallery #################################################################
    if custom_id == "view_castles_back":
        await inter.response.defer()
        user = inter.message.embeds[0].title.partition(' >|<')[0]
        embed = embed_factory.check_player(user,inter.guild_id)
        embed_view = disnake.ui.View()
        castle_design_btn = disnake.ui.Button(label='Castles',emoji='<:Charter:947634559078498374>',custom_id='view_castles')
        # Double any single quote so a name containing ' cannot break out of the SQL literal.
        safe_user = user.replace("'", "''")
        query_response=helpers.sql_get("SELECT castle_designs FROM players WHERE gID = {gID} AND LOWER(name) = LOWER('{user}')".format(gID = inter.guild_id,user=safe_user))
        if len(query_response) > 0:
            designs = query_response[0][0]
            # A NULL column or an empty JSON object "{}" (2 chars) both mean "no designs".
            if designs and len(designs) > 2:
                embed_view.add_item(castle_design_btn)
        await inter.message.edit(view=embed_view,embed=embed,attachments=None)
        return

    if 'view_castles' in custom_id:
        await inter.response.defer()
        title = inter.message.embeds[0].title
        if custom_id == 'view_castles' :
            # First page: the player name is wrapped in << >> in the embed title.
            name=title.partition('<<')[2].partition('>>')[0]
            page = 1
        else:
            # Pager buttons: the title is "<name> >|< Page <n>".
            #print('ping')
            name=title.partition(' >|< Page ')[0]
            #print(inter.message.embeds[0].title.partition(' >|< Page '))
            #print(name)
            page = int(title.partition(' >|< Page ')[2])
        safe_name = name.replace("'", "''")
        filepaths = list(json.loads(helpers.sql_get(f"SELECT castle_designs FROM players WHERE LOWER(name) = LOWER('{safe_name}') and gID = {inter.guild_id}")[0][0]).keys())
        if 'next' in custom_id:
            page += 1
            if page > len(filepaths):
                return
        if 'prev' in custom_id:
            page -= 1
            if page < 1:
                return
        embed_view = disnake.ui.View()
        prev_castle_btn = disnake.ui.Button(emoji='⬅️',custom_id='view_castles_prev')
        back_castle_btn = disnake.ui.Button(emoji='⏏️',custom_id='view_castles_back')
        next_castle_btn = disnake.ui.Button(emoji='➡️',custom_id='view_castles_next')
        # Only offer navigation in directions that have a page.
        if page != 1:
            embed_view.add_item(prev_castle_btn)
        embed_view.add_item(back_castle_btn)
        if page != len(filepaths):
            embed_view.add_item(next_castle_btn)
        await inter.message.edit(view=embed_view,embed=embed_factory.show_castle_designs(gID = inter.guild_id,name=name,page=page))

    ####---> Timed attack buttons #################################################################
    if custom_id == 'timed_add':
        print("Pressed Add")
        await inter.response.send_modal(modals.timed_add())


#---------------------------
# TASKS
#---------------------------
@tasks.loop(hours=2)
async def getHouseData():
    """Poll the house-activity API for houses 1-20 and upsert each member.

    For every user returned, the per-user `house_history` JSON (house number ->
    unix time last seen there) is updated and written back to the `house` table.
    """
    await bot.wait_until_ready()
    for house in range(1,21):
        # NOTE(review): the API key and subscription key are embedded in source;
        # consider moving them to configuration.
        url = 'https://shk.azure-api.net/shkinfo/v1/HouseActivity?world=World%202&house={house}&Key=5E78CFC8-1FFA-4036-8427-D94ED6E1A45B&subscription-key=ff2e578e119348ea8b48a2acd2f5a48d'.format(house=house)
        try:
            # requests is blocking; run it in a worker thread so the event loop
            # (and with it the whole bot) is not stalled during the call.
            reply = await asyncio.to_thread(requests.get, url, timeout=30)
            houseActivity = reply.json()
        except (requests.RequestException, ValueError) as err:
            # One failed house must not kill the 2-hourly task; skip and move on.
            print("House {house} lookup failed: {err}".format(house=house,err=err))
            await asyncio.sleep(1)
            continue
        await asyncio.sleep(1)
        for user in houseActivity:
            # Double single quotes so usernames containing ' cannot break the literal.
            safe_name = user['Username'].replace("'", "''")
            result = helpers.sql_get("SELECT house_history FROM house WHERE username = '{user}'".format(user = safe_name))
            # Reuse the row just fetched instead of running the same query twice.
            if len(result) > 0:
                user_house_json = json.loads(result[0][0])
            else:
                user_house_json = {}
            user_house_json[str(house)] = int(time.time())
            helpers.sql_set('INSERT INTO house(username,house,date,house_history) VALUES(?,?,?,?) ON CONFLICT DO UPDATE SET house = excluded.house, date = excluded.date, house_history = excluded.house_history',(user['Username'],house,time.time(),json.dumps(user_house_json)))
    print("Updated House Data")


@tasks.loop(minutes=30.0)
async def update_vm_embed():
    """Refresh the vacation-mode advert message in every guild that has one."""
    await bot.wait_until_ready()
    print("Updated vm advert")
    for guild in bot.guilds:
        advert_result = json.loads(helpers.sql_get("SELECT advert_data FROM guild WHERE gID = {gID}".format(gID = guild.id))[0][0])
        if 'vm' in advert_result:
            # advert_data['vm'] holds (channel id, message id) as stored by create_advert.
            channel = guild.get_channel_or_thread(advert_result['vm'][0])
            message = await channel.fetch_message(advert_result['vm'][1])
            button_add_vm = disnake.ui.Button(emoji='➕',custom_id='add_vm',label="Report VM")
            button_rem_vm = disnake.ui.Button(emoji="➖",custom_id="rem_vm",label="Remove VM")
            embed_view = disnake.ui.View()
            embed_view.add_item(button_add_vm)
            embed_view.add_item(button_rem_vm)
            await message.edit(view=embed_view,embed=embed_factory.vm_advert(guild.id))


@tasks.loop(seconds=2)
async def update_timed_attacks():
    """Re-render the embeds of timed attacks that are due for an update.

    Stops itself once no attack has been created within the last 12 hours.
    Rows whose message can no longer be fetched, or whose `duration` timestamp
    has passed, are removed from `attack_targets`.
    """
    await bot.wait_until_ready()
    #Get attacks created in the last 12 hours
    total_attacks = helpers.sql_get("SELECT * FROM attack_targets WHERE created_at > unixepoch('now','-12 hours')",True)
    if len(total_attacks) == 0:
        print("No more attacks stopping task")
        update_timed_attacks.stop()
        return
    current_time = int(time.time())
    attacks = helpers.sql_get("SELECT * FROM attack_targets WHERE update_time <= {cur_time} AND update_time > 0;".format(cur_time = current_time),True)
    if len(attacks) == 0:
        #waiting for attacks to get updated
        return
    print("Updating {n} attacks".format(n=len(attacks)))
    for line in attacks:
        # Try the message cache first, then fall back to fetching via the API.
        message = bot.get_message(int(line['mID']))
        if message is None:
            try:
                guild:disnake.Guild = await bot.fetch_guild(line['gID'])
                channel:disnake.TextChannel = await guild.fetch_channel(line['cID'])
                message = await channel.fetch_message(line['mID'])
            except disnake.DiscordException:
                # Was a bare `except:`, which also swallowed task cancellation.
                message = None
        if message is None:
            print("Attack update failed! Message probably deleted?")
            helpers.sql_del("DELETE FROM attack_targets WHERE mID = {mID}".format(mID=line['mID']))
            # Was `return`: one missing message aborted the rest of this batch.
            continue
        embed = embed_factory.timed_attack(line['mID'])
        await message.edit(embed=embed,content='')
        if line['duration'] is not None:
            if int(line['duration']) < current_time:
                #attack has landed, stop updating it.
                # The placeholder was never filled in, so this logged a literal "{mID}".
                print("{mID} - Landed removing from update queue".format(mID=line['mID']))
                helpers.sql_del("DELETE FROM attack_targets WHERE mID = {mID}".format(mID=line['mID']))


#---------------------------
# LIEGE LORD REQUESTS
#---------------------------
@bot.slash_command(description='List some villages in need of LL')
async def ll(inter:disnake.ApplicationCommandInteraction):
    """Open the liege-lord request modal, pre-filled from the user's stored values."""
    playerCP = helpers.sql_get("SELECT cp,prefPikes,prefArchers FROM user WHERE dID = {dID}".format(dID=inter.author.id))
    print("{cTime} - {user} used {command}".format(cTime=datetime.date.today(),user = inter.author.display_name,command = "Request LL"))
    # Defaults used when the user has no stored preferences yet.
    cp = 0
    prefArcher = 300
    prefPikes = 200
    if len(playerCP):
        # A stored NULL cp would make int() raise; fall back to the default 0.
        cp = int(playerCP[0][0] or 0)
        if playerCP[0][1] is not None:
            prefArcher = int(playerCP[0][2])
            prefPikes = int(playerCP[0][1])
    await inter.response.send_modal(modal=modals.LLModal(cp=cp,pike=prefPikes,archer=prefArcher))


#---------------------------
# CREATE ADVERT
#---------------------------
typeList = ['ll','vm']
@bot.slash_command(description='set LL advert channel',default_member_permissions=disnake.Permissions(manage_guild=True))
async def create_advert(inter:disnake.ApplicationCommandInteraction,advert_type:str = commands.Param(description='What type=',choices=typeList), target: disnake.abc.GuildChannel = commands.Param(description='Channel override, optional')):
    """Post an advert message of the chosen type in `target` and remember it.

    The (channel id, message id) pair is stored under `advert_type` in the
    guild's `advert_data` JSON. If an advert of that type is already recorded,
    the user is told so and nothing is posted.
    """
    embed = disnake.Embed()
    result = json.loads(helpers.sql_get("SELECT advert_data FROM guild WHERE gID = {gID}".format(gID = inter.guild.id))[0][0])
    if advert_type in typeList and advert_type in result:
        # An interaction can only be responded to once; the original fell
        # through to the final "Ok" reply after sending this one.
        await inter.response.send_message(content="Already set",ephemeral=True)
        return
    if advert_type == 'll':
        embed.title = 'Looking for Liege Lords'
        button_r_ll = disnake.ui.Button(emoji='<:LiegeLord:1003401484295213066>',custom_id='request_ll',label="Click here to request a Liege Lord")
        embed_view = disnake.ui.View()
        embed_view.add_item(button_r_ll)
        message = await target.send(embed=embed,view=embed_view)
        await message.create_thread(name="Requests")
        result[advert_type] = (message.channel.id,message.id)
        helpers.sql_set("INSERT INTO guild(gID,advert_data) VALUES(?,?) ON CONFLICT DO UPDATE SET advert_data = excluded.advert_data",(message.guild.id,json.dumps(result)))
    if advert_type == 'vm':
        # Pass the guild id like update_vm_embed does, so the first render
        # matches later refreshes.
        # NOTE(review): the original called vm_advert() with no argument - confirm
        # the helper treats an explicit guild id the same way here.
        embed = embed_factory.vm_advert(inter.guild.id)
        button_add_vm = disnake.ui.Button(emoji='➕',custom_id='add_vm',label="Report VM")
        button_rem_vm = disnake.ui.Button(emoji="➖",custom_id="rem_vm",label="Remove VM")
        embed_view = disnake.ui.View()
        embed_view.add_item(button_add_vm)
        embed_view.add_item(button_rem_vm)
        message = await target.send(embed=embed,view=embed_view)
        result[advert_type] = (message.channel.id,message.id)
        helpers.sql_set("INSERT INTO guild(gID,advert_data) VALUES(?,?) ON CONFLICT DO UPDATE SET advert_data = excluded.advert_data",(message.guild.id,json.dumps(result)))
    await inter.response.send_message(content="Ok",ephemeral=True)


#---------------------------
# CHECK PLAYER
#---------------------------
@bot.slash_command(description="List some info about target")
async def check_player(inter:disnake.ApplicationCommandInteraction,user:str = commands.Param(description='Player Name')):
    """Show the info embed for `user`, plus a "Castles" button when designs are stored."""
    print("{cTime} - {user} used {command}".format(cTime=datetime.date.today(),user = inter.author.display_name,command = "Check Player"))
    await inter.response.defer()
    store_basic_user(inter)
    embed = embed_factory.check_player(user,inter.guild_id)
    embed_view = disnake.ui.View()
    castle_design_btn = disnake.ui.Button(label='Castles',emoji='<:Charter:947634559078498374>',custom_id='view_castles')
    # `user` is free text typed by the caller: double single quotes so it
    # cannot terminate the SQL string literal.
    safe_user = user.replace("'", "''")
    query_response=helpers.sql_get("SELECT castle_designs FROM players WHERE gID = {gID} AND LOWER(name) = LOWER('{user}')".format(gID = inter.guild_id,user=safe_user))
    if len(query_response) > 0:
        designs = query_response[0][0]
        # A NULL column or an empty JSON object "{}" (2 chars) both mean "no designs".
        if designs and len(designs) > 2:
            embed_view.add_item(castle_design_btn)
    await inter.followup.send(embed=embed,view=embed_view)


#---------------------------
# TIME CALCULATOR
#---------------------------
@bot.slash_command(description="Convert times to carded times run without arguments to get an input form")
async def calculate_times(inter:disnake.ApplicationCommandInteraction,time:str = commands.Param(default = None, description='Format: name|hh:mm:ss or hh:mm:ss can add as many as you want'),multi:int = commands.Param(default=None,description='OPTIONAL: Card multiplier to apply. defaults to 4 or your previous input if omitted')):
    """Convert travel times to carded times, or open the input modal.

    Args:
        inter: The slash-command interaction.
        time: Times to convert; when omitted a modal asks for them instead.
            (The name shadows the `time` module inside this function.)
        multi: Card multiplier. When omitted, the caller's stored `defaultCard`
            is used, falling back to 4; when given, it is stored as the
            caller's new default.
    """
    print("{cTime} - {user} used {command}".format(cTime=datetime.date.today(),user = inter.author.display_name,command = "Calculate Time"))
    if multi is None:
        stored = helpers.sql_get('SELECT defaultCard FROM user WHERE dID = {id}'.format(id=inter.author.id))
        # Previously a user without a row left `multi` bound to the empty
        # result list; fall back to the documented default of 4 instead.
        multi = (stored[0][0] or 4) if len(stored) > 0 else 4
    else:
        helpers.sql_set('INSERT INTO user(dID,gID,d_name,defaultCard) VALUES(?,?,?,?) ON CONFLICT DO UPDATE SET defaultCard = excluded.defaultCard',(inter.author.id,inter.guild.id,inter.author.display_name,multi))
    if time is None:
        await inter.response.send_modal(modal=modals.timesModal(multi))
    else:
        response = helpers.calc_times(times=time,multi=multi)
        view = disnake.ui.View()
        add_to_attack = disnake.ui.Button(emoji='<:Breaker:947543175025819709>',custom_id='add_to_attack',label='Copy to timed attack')
        view.add_item(add_to_attack)
        await inter.response.send_message(view = view,content=response[0],delete_after=540)
        message = await inter.original_message()
        # response[1] rows are indexed 0..3 and stored as name, seconds, multi, modifier.
        # Loop variable renamed so it no longer re-binds the `time` parameter.
        for entry in response[1]:
            helpers.sql_set("INSERT INTO times(dID,mID,gID,seconds,name,multi,modifier) VALUES(?,?,?,?,?,?,?)",(inter.author.id,message.id,inter.guild_id,entry[1],entry[0],entry[2],entry[3]))
################################################
# Remember to mirror changes in the modal!!
# ################################################ #--------------------------- # SAVE CASTLE IMAGE #--------------------------- @bot.message_command() async def save_castle(inter:disnake.ApplicationCommandInteraction): text_content = inter.target.content for attach in inter.target.attachments: ext = attach.filename.split('.')[-1] filename = str(attach.id) +'.' + ext await attach.save(fp=f'images/{str(attach.id)}.{ext}') await inter.response.send_modal(modals.capture_castle(text_content,filename=filename)) await inter.target.delete() #--------------------------- # TEST COMMAND #--------------------------- @bot.message_command() async def sbtest(inter:disnake.ApplicationCommandInteraction): print(inter.target.embeds[0].title.partition('<<')[2].partition('>>')[0]) #--------------------------- # EDIT PLAYER #--------------------------- @bot.slash_command(description="Set status / peace for player.",default_member_permissions=disnake.Permissions(manage_guild=True)) async def edit_player(inter:disnake.ApplicationCommandInteraction,player:str = commands.Param(description="in-game name of player"),status:str = commands.Param(default='',description="Keep it short"),peace:str = commands.Param(default= '',description="Has peace",choices=['True','False'])): print("{cTime} - {user} used {command}".format(cTime=datetime.date.today(),user = inter.author.display_name,command = "Edit Player")) helpers.sql_set("INSERT INTO players(gID,name,status,peace) VALUES(?,?,?,?) 
ON CONFLICT DO UPDATE SET status = excluded.status,peace = excluded.peace",(inter.guild_id,player.lower(),status,peace)) await inter.response.send_message(content='OK👌',ephemeral=True) #--------------------------- # EDIT HOUSE #--------------------------- @bot.slash_command(description="Set House relations",default_member_permissions=disnake.Permissions(manage_guild=True)) async def edit_house(inter:disnake.ApplicationCommandInteraction,house:str = commands.Param(description="in-game name of player"),status:str = commands.Param(default= 'Neutral',description="House status",choices=['Allied','Enemy','Neutral'])): print("{cTime} - {user} used {command}".format(cTime=datetime.date.today(),user = inter.author.display_name,command = "Edit House")) result = json.loads(helpers.sql_get("SELECT relationships FROM guild WHERE gID = {gID}".format(gID = inter.guild.id))[0][0]) result[house] = status helpers.sql_set("INSERT INTO guild(gID,relationships) VALUES(?,?) ON CONFLICT DO UPDATE SET relationships = excluded.relationships",(inter.guild.id,json.dumps(result))) await inter.response.send_message(content='OK👌',ephemeral=True) #--------------------------- # INITIATE ATTACK #--------------------------- @bot.slash_command(description="WIP:Start a timed attack on target") async def timed_attack(inter:disnake.ApplicationCommandInteraction,target:str = commands.Param(description="ID of target")): print("{cTime} - {user} used {command}".format(cTime=datetime.date.today(),user = inter.author.display_name,command = "Create time attack")) button_start = disnake.ui.Button(emoji="<:timer_green:947634559225327707>" ,custom_id='timed_start',label="Start") button_add = disnake.ui.Button(emoji="<:Breaker:947543175025819709>",custom_id="timed_add",label="Add") button_remove = disnake.ui.Button(emoji="<:Army_red:947634559044947979>",custom_id="timed_remove",label="Remove") view = disnake.ui.View() view.add_item(button_start) view.add_item(button_add) view.add_item(button_remove) await 
inter.response.send_message(view=view,content="Generating attack, hold on") message = await inter.original_message() helpers.sql_set("INSERT INTO attack_targets(mID,gID,cID,target,update_time,created_at) VALUES(?,?,?,?,?,?)",(message.id,inter.guild_id,inter.channel_id,target,int(time.time()),int(time.time()))) if not update_timed_attacks.is_running(): update_timed_attacks.start() #--------------------------- # CREATE PLAYER EMOJI #--------------------------- @bot.slash_command(description="Make an emoji from the player shield",default_member_permissions=disnake.Permissions(manage_guild=True)) async def make_shield_emoji(inter:disnake.ApplicationCommandInteraction,user:str = commands.Param(description='Ingame player Name')): await inter.response.defer(ephemeral=True) shield_url = requests.get('https://login.strongholdkingdoms.com/ajaxphp/get_shield_url.php?username={user}&transparent=1'.format(user=user)).json()['url'] res = requests.get(shield_url) img = Image.open(BytesIO(res.content)) img = img.resize((128,128)) byteIO = BytesIO() img.save(byteIO, format='PNG') byteArr = byteIO.getvalue() emoji = await inter.guild.create_custom_emoji(name=user,image=byteArr) print("done",emoji) await inter.followup.send(emoji) #Stuff to be updated :S #------------------------------------------------------------ timedAttacks = {} timesMessages = {} def sql_connection(): try: con = sqlite3.connect('sbingo2.sqlite') return con except Error: print(Error) def sql_setSeed(con,entry): cursorObj = con.cursor() cursorObj.execute('INSERT INTO overrides(dID,seed) VALUES(?,?) ON CONFLICT(dID) DO UPDATE SET seed = excluded.seed',entry) cursorObj.close() con.commit() def sql_setCard(con,entry): cursorObj = con.cursor() cursorObj.execute('INSERT INTO defaultCard(dID,speed) VALUES(?,?) 
ON CONFLICT(dID) DO UPDATE SET speed = excluded.speed',entry) cursorObj.close() con.commit() def sql_getCard(con,id): cursorObj = con.cursor() query = 'SELECT speed FROM defaultCard WHERE dID = '+str(id) cursorObj.execute(query) result = cursorObj.fetchall() cursorObj.close() if len(result) == 1: return result[0][0] return result def sql_addVM(con,name,dateTime): ok = True data = (name.lower(),dateTime) cursorObj = con.cursor() try: cursorObj.execute('INSERT INTO vecations(name,date) VALUES(?,?)',data) except Error: ok = False cursorObj.close() con.commit() return ok def sql_getVM(con): cursorObj = con.cursor() cursorObj.execute('SELECT * FROM vecations where date > unixepoch("now","-14 days");') result = cursorObj.fetchall() cursorObj.close() return result def sql_getSeed(con,id): cursorObj = con.cursor() query = 'SELECT dID, seed FROM overrides WHERE dID = '+str(id) cursorObj.execute(query) result = cursorObj.fetchall() cursorObj.close() return result def generateBingoCard(display_name, id): bingoimg = bingo.generate_bingo(display_name,id) bingoimg.save(display_name+'.png') @bot.event async def on_message(message): if message.author == bot.user: return if message.content.startswith(botOptions.prefix): #Potential command command = message.content.lower().split()[0][1:] or None args = message.content.split()[1:] or [None] match command: case 'spin': random.seed(time.time()) response = bingo.generate_gif_test(str(message.id),1) await message.channel.send(content='Spinning the wheel for: '+message.author.display_name,file=disnake.File(str(message.id)+".gif")) if response == 'T2': await asyncio.sleep(4) response = bingo.generate_gif_test(str(message.id),2) await message.channel.send(content='LUCKY! You won another spin!',file=disnake.File(str(message.id)+".gif")) if response == 'T3': await asyncio.sleep(4) response = bingo.generate_gif_test(str(message.id),3) await message.channel.send(content='SPINTASTIC!!! 
Max level spin achieved',file=disnake.File(str(message.id)+".gif")) await asyncio.sleep(4) await message.channel.send('Congratulations! You won: '+response) os.remove(str(message.id)+".gif") case 'bingo': seed = sql_getSeed(con,message.author.id) if seed: generateBingoCard(message.author.display_name,seed[0][1]) else: generateBingoCard(message.author.display_name,message.author.id) await message.channel.send(content='Here is your bingo card, if you find something impossible ask for an override!',file=disnake.File(message.author.display_name+".png")) os.remove(message.author.display_name+".png") case 'reroll': if message.author.id in botOptions.verifiedUsers: if len(args) != 1: await message.channel.send('Expecting an user ID') elif not args[0].isnumeric(): await message.channel.send('The argument needs to be the user ID') else: target = message.guild.get_member(int(args[0])) print(args) print(message.guild) print(target) if target == None: await message.channel.send("Can't find the user in this disnake") await message.delete() return newRandom = math.floor(random.random()*100000000000) sql_setSeed(con,(target.id,newRandom)) generateBingoCard(target.display_name,newRandom) await message.channel.send('New seed set!',file=disnake.File(target.display_name+".png")) os.remove(target.display_name+".png") case 'vm': if len(args) < 1 or args[0] == 'list': vms = sql_getVM(con) print(datetime.fromtimestamp(vms[0][1],timezone.utc)) vmList = 'Players potentially still on VM:' if len(vms) == 0: vmList += '\n-None' else: for player in vms: vmList += '\n'+player[0]+' - ' await message.channel.send(vmList) if len(args) == 1: if sql_addVM(con,args[0],message.created_at.replace(tzinfo=timezone.utc).timestamp()): await message.channel.send(args[0]+' Added to VM list!') else: await message.channel.send(args[0]+' Is already on the list') case 'test': editAttack = timedAttacks[args[0]] editAttack.setMark(time.time()) case ('timed'|'attack'|'countdown'): #validate args if len(args) <= 1: 
if args[0] == 'help' or args[0] == None: embed = disnake.Embed(color=0x5eb4ee,title="<:SB:947837030782615613> Sheriff Timed Attacks for dummies!"\ ,description="The SB Sheriff can keep track of your times and participating villas!\nThis can be used to keep track of targets and work out what to set the timer to,\n but it can also show a countdown for each attack \ Granted this can be a bit slow, and inconvenient for some, but on large timed operations or keeping track of multiple attacks it can be quite useful!\n\n \ The only command needed is: **{prefix}timed