mirror of
https://github.com/nichkara/InfinitumBotty.git
synced 2026-06-11 06:36:24 +02:00
Initalize repo
This commit is contained in:
@@ -0,0 +1,165 @@
|
||||
import _thread
|
||||
import queue
|
||||
import socket
|
||||
import time
|
||||
from threading import Condition
|
||||
|
||||
from FaustBot.Communication.JoinObservable import JoinObservable
|
||||
from FaustBot.Communication.KickObservable import KickObservable
|
||||
from FaustBot.Communication.LeaveObservable import LeaveObservable
|
||||
from FaustBot.Communication.MagicNumberObservable import MagicNumberObservable
|
||||
from FaustBot.Communication.NickChangeObservable import NickChangeObservable
|
||||
from FaustBot.Communication.NoticeObservable import NoticeObservable
|
||||
from FaustBot.Communication.PingObservable import PingObservable
|
||||
from FaustBot.Communication.PrivmsgObservable import PrivmsgObservable
|
||||
from FaustBot.Model.ConnectionDetails import ConnectionDetails
|
||||
from FaustBot.StringBuffer import StringBuffer
|
||||
|
||||
|
||||
class Connection(object):
|
||||
send_queue = queue.Queue()
|
||||
details = None
|
||||
irc = None
|
||||
|
||||
def sender(self):
|
||||
while True:
|
||||
msg = self.send_queue.get()
|
||||
if msg[-1] != b'\n':
|
||||
msg = msg + b'\n'
|
||||
self.irc.send(msg)
|
||||
time.sleep(1)
|
||||
|
||||
def send_channel(self, text):
|
||||
"""
|
||||
Send to channel
|
||||
:return:
|
||||
"""
|
||||
self.raw_send("PRIVMSG " + self.details.get_channel() + " :" + text[0:])
|
||||
|
||||
def send_to_user(self, user, text):
|
||||
"""
|
||||
Send to user
|
||||
:return:
|
||||
"""
|
||||
self.raw_send('PRIVMSG ' + user + ' :' + text)
|
||||
|
||||
def send_back(self, text, data):
|
||||
"""
|
||||
Send message to the channel the command got received in
|
||||
:param message:
|
||||
:param data: needed because of concurrency, there can't be a global variable holding where messages came from
|
||||
:return:
|
||||
"""
|
||||
if data['channel'] == self.details.get_nick():
|
||||
self.send_to_user(data['nick'], text)
|
||||
else:
|
||||
self.send_channel(text)
|
||||
|
||||
def raw_send(self, message):
|
||||
self.send_queue.put(message.encode() + '\r\n'.encode())
|
||||
|
||||
def receive(self):
|
||||
"""
|
||||
receive from Network
|
||||
"""
|
||||
try:
|
||||
data = self.irc.recv(4096)
|
||||
if len(data) == 0:
|
||||
return False
|
||||
except socket.timeout:
|
||||
return False
|
||||
data = data.decode('UTF-8', errors='replace')
|
||||
#print('received: \n' + data)
|
||||
data_lines = self._receiver_buffer.append(data)
|
||||
if data is None:
|
||||
return False
|
||||
# print('splited: ')
|
||||
for data in data_lines:
|
||||
# print(data)
|
||||
data = data.rstrip()
|
||||
self.data = data
|
||||
|
||||
splited = data.split(' ')
|
||||
if not len(splited) >= 2:
|
||||
continue
|
||||
command = splited[1]
|
||||
# print(command)
|
||||
if data.split(' ')[0] == 'PING':
|
||||
self.ping_observable.input(data, self)
|
||||
elif command == 'JOIN':
|
||||
self.join_observable.input(data, self)
|
||||
elif command == 'PART' or command == 'QUIT':
|
||||
self.leave_observable.input(data, self)
|
||||
elif command == 'KICK':
|
||||
self.kick_observable.input(data, self)
|
||||
elif command == 'NICK':
|
||||
self.nick_change_observable.input(data, self)
|
||||
elif command == 'NOTICE':
|
||||
self.notice_observable.input(data, self)
|
||||
elif command == 'PRIVMSG':
|
||||
self.priv_msg_observable.input(data, self)
|
||||
else:
|
||||
try:
|
||||
int(command)
|
||||
self.magic_number_observable.input(data, self)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
def is_idented(self, user: str):
|
||||
self.send_to_user('NickServ', 'ACC ' + user)
|
||||
with self.condition_lock:
|
||||
while user not in self.idented_look_up:
|
||||
self.condition_lock.wait()
|
||||
is_idented = self.idented_look_up[user]
|
||||
del self.idented_look_up[user]
|
||||
return is_idented
|
||||
|
||||
def is_op(self, user):
|
||||
"""
|
||||
Checks wether the given user is an op in this connections' channel or not.
|
||||
:param user: the user to check
|
||||
:return: return true if the user is an op, else false
|
||||
"""
|
||||
# add call to raw send with WHO
|
||||
# manualy receive data until answer received
|
||||
# then evaluate and return
|
||||
# this way we'll block the bot until is_op is finished
|
||||
return False
|
||||
|
||||
def last_data(self):
|
||||
return self.data
|
||||
|
||||
def establish(self):
|
||||
"""
|
||||
establish the connection
|
||||
"""
|
||||
self.irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
||||
self.irc.connect((self.details.get_server(), self.details.get_port()))
|
||||
#print(self.irc.recv(512))
|
||||
self.irc.send("NICK ".encode() + self.details.get_nick().encode() + "\r\n".encode())
|
||||
self.irc.send("USER botty botty botty :Botty \n".encode())
|
||||
self.irc.send("JOIN ".encode() + self.details.get_channel().encode() + '\r\n'.encode())
|
||||
self.irc.send("WHO ".encode() + self.details.get_channel().encode() + '\r\n'.encode())
|
||||
self.irc.send("MODE ".encode()+self.details.get_nick().encode()+" -R".encode()+'\r\n'.encode())
|
||||
if (self.details.get_pwd() != ''):
|
||||
self.send_to_user("NICKSERV","identify "+self.details.get_nick()+" " +self.details.get_pwd()+' ')
|
||||
|
||||
_thread.start_new_thread(self.sender, ())
|
||||
|
||||
def __init__(self, set_details: ConnectionDetails):
|
||||
self.details = set_details
|
||||
self.ping_observable = PingObservable()
|
||||
self.priv_msg_observable = PrivmsgObservable()
|
||||
self.join_observable = JoinObservable()
|
||||
self.leave_observable = LeaveObservable()
|
||||
self.kick_observable = KickObservable()
|
||||
self.nick_change_observable = NickChangeObservable()
|
||||
self.notice_observable = NoticeObservable()
|
||||
self.magic_number_observable = MagicNumberObservable()
|
||||
self.condition_lock = Condition()
|
||||
self.idented_look_up = {}
|
||||
self.data = None
|
||||
self._receiver_buffer = StringBuffer()
|
||||
@@ -0,0 +1,9 @@
|
||||
__author__ = 'Daniela'
|
||||
|
||||
|
||||
class DebugPrint(object):
|
||||
def print(self, message):
|
||||
"""
|
||||
:param message: What to print to debug output
|
||||
:return:
|
||||
"""
|
||||
@@ -0,0 +1,24 @@
|
||||
import _thread
|
||||
|
||||
from FaustBot.Communication.Observable import Observable
|
||||
|
||||
|
||||
class JoinObservable(Observable):
|
||||
def input(self, raw_data, connection):
|
||||
# ":nick!user@host" "JOIN" "#channel"
|
||||
# additional ignored arguments are put into "ign". This could be used
|
||||
# for http://ircv3.net/specs/extensions/extended-join-3.1.html in the
|
||||
# future.
|
||||
prefix, cmd, channel, *ign = raw_data.split(' ')
|
||||
hostmask = prefix.lstrip(':')
|
||||
nick, userhost = hostmask.split('!')
|
||||
user, host = userhost.split('@')
|
||||
|
||||
data = {'raw': raw_data, 'nick': nick, 'user': user, 'host': host,
|
||||
'channel': channel, 'raw_nick': hostmask}
|
||||
self.notify_observers(data, connection)
|
||||
|
||||
|
||||
def notify_observers(self, data, connection):
|
||||
for observer in self._observers:
|
||||
_thread.start_new_thread(observer.__class__.update_on_join, (observer, data, connection))
|
||||
@@ -0,0 +1,20 @@
|
||||
import _thread
|
||||
|
||||
from FaustBot.Communication.Observable import Observable
|
||||
|
||||
|
||||
class KickObservable(Observable):
|
||||
def input(self, raw_data, connection):
|
||||
data = {}
|
||||
print(raw_data)
|
||||
data['raw'] = raw_data
|
||||
data['op'] = raw_data.split('!')[0][1:]
|
||||
data['channel'] = raw_data.split('KICK ')[1].split(' :')[0].split(' ')[0]
|
||||
data['nick'] = raw_data.split('KICK ')[1].split(' :')[0].split(' ')[1]
|
||||
data['raw_op'] = raw_data.split(' KICK')[0][1:]
|
||||
data['reason'] = raw_data.split('KICK ')[1].split(' :')[1]
|
||||
self.notify_observers(data, connection)
|
||||
|
||||
def notify_observers(self, data, connection):
|
||||
for observer in self._observers:
|
||||
_thread.start_new_thread(observer.__class__.update_on_kick, (observer, data, connection))
|
||||
@@ -0,0 +1,18 @@
|
||||
import _thread
|
||||
|
||||
from FaustBot.Communication.Observable import Observable
|
||||
|
||||
|
||||
class LeaveObservable(Observable):
|
||||
def input(self, raw_data, connection):
|
||||
data = {}
|
||||
leave_or_part = "PART" if raw_data.find('PART') != -1 else "QUIT"
|
||||
data['raw'] = raw_data
|
||||
data['nick'] = raw_data.split('!')[0][1:]
|
||||
data['channel'] = raw_data.split(leave_or_part + ' ')[1].split(' :')[0]
|
||||
data['raw_nick'] = raw_data.split(' ' + leave_or_part)[0][1:]
|
||||
self.notify_observers(data, connection)
|
||||
|
||||
def notify_observers(self, data, connection):
|
||||
for observer in self._observers:
|
||||
_thread.start_new_thread(observer.__class__.update_on_leave, (observer, data, connection))
|
||||
@@ -0,0 +1,21 @@
|
||||
import _thread
|
||||
|
||||
from FaustBot.Communication.Observable import Observable
|
||||
|
||||
|
||||
class MagicNumberObservable(Observable):
|
||||
def input(self, raw_data, connection):
|
||||
data = {}
|
||||
data['raw'] = raw_data
|
||||
prefix, numeric, rest = data['raw'].split(' ',2)
|
||||
data['number'] = numeric
|
||||
data['arguments'] = rest
|
||||
self.notify_observers(data, connection)
|
||||
|
||||
def notify_observers(self, data, connection):
|
||||
for observer in self._observers:
|
||||
try:
|
||||
_thread.start_new_thread(observer.__class__.update_on_magic_number, (observer, data, connection))
|
||||
except Exception:
|
||||
import traceback
|
||||
print (traceback.format_exc())
|
||||
@@ -0,0 +1,14 @@
|
||||
import _thread
|
||||
|
||||
from FaustBot.Communication.Observable import Observable
|
||||
|
||||
|
||||
class NickChangeObservable(Observable):
|
||||
def input(self, raw_data, connection):
|
||||
data = {'raw': raw_data, 'old_nick': raw_data.split('!')[0][1:],
|
||||
'new_nick': raw_data.split('NICK ')[1].split(':')[1], 'raw_nick': raw_data.split(' NICK')[0][1:]}
|
||||
self.notify_observers(data, connection)
|
||||
|
||||
def notify_observers(self, data, connection):
|
||||
for observer in self._observers:
|
||||
_thread.start_new_thread(observer.__class__.update_on_nick_change, (observer, data, connection))
|
||||
@@ -0,0 +1,14 @@
|
||||
import _thread
|
||||
|
||||
from FaustBot.Communication.Observable import Observable
|
||||
|
||||
|
||||
class NoticeObservable(Observable):
|
||||
def notify_observers(self, data, connection):
|
||||
for observer in self._observers:
|
||||
_thread.start_new_thread(observer.__class__.update_on_notice, (observer, data, connection))
|
||||
|
||||
def input(self, raw_data, connection):
|
||||
data = {'raw_data': raw_data, 'nick': raw_data.split('!')[0][1:], 'raw_nick': raw_data.split(' NOTICE ')[0][1:],
|
||||
'message': raw_data.split(':')[2]}
|
||||
self.notify_observers(data, connection)
|
||||
@@ -0,0 +1,23 @@
|
||||
|
||||
class Observable(object):
|
||||
def __init__(self):
|
||||
self._observers = []
|
||||
|
||||
def add_observer(self, observer):
|
||||
self._observers.append(observer)
|
||||
print("appended(" + str(observer.__class__) + ")")
|
||||
|
||||
def get_observer(self):
|
||||
return self._observers
|
||||
|
||||
# data has to be a dictionary matching the structure of the query
|
||||
def notify_observers(self, data, connection):
|
||||
# here implement some data handling. Fill self._data with the data received
|
||||
raise NotImplementedError("Some Observable doesn't know what to do with its input data")
|
||||
|
||||
def input(self, raw_data, connection):
|
||||
# here implement some data handling. Fill self._data with the data received
|
||||
raise NotImplementedError("Some Observable doesn't know what to do with its input data")
|
||||
|
||||
def rm_observer(self, observer):
|
||||
self._observers.remove(observer)
|
||||
@@ -0,0 +1,20 @@
|
||||
import _thread
|
||||
|
||||
from FaustBot.Communication.Observable import Observable
|
||||
|
||||
|
||||
class PingObservable(Observable):
|
||||
def input(self, raw_data, connection):
|
||||
data = {'raw': raw_data, 'server': ''}
|
||||
if raw_data.find('PING') == 0:
|
||||
data['server'] = raw_data.split('PING ')[1]
|
||||
else:
|
||||
return
|
||||
# hier kann noch gecheckt werden, ob data wirklich ein server ist, der ping haben will, oder sonstwas
|
||||
# finde heraus, wer zurückgepingt werden muss, und ob das überhaupt ein ping-request ist oder ein user sich
|
||||
# einen spass erlaubt hat
|
||||
self.notify_observers(data, connection)
|
||||
|
||||
def notify_observers(self, data, connection):
|
||||
for observer in self._observers:
|
||||
_thread.start_new_thread(observer.__class__.update_on_ping, (observer, data, connection))
|
||||
@@ -0,0 +1,37 @@
|
||||
import _thread
|
||||
|
||||
from FaustBot.Communication.Observable import Observable
|
||||
from FaustBot import Modules
|
||||
from FaustBot.Model.BlockedUsers import BlockProvider
|
||||
class PrivmsgObservable(Observable):
|
||||
def __init__(self):
|
||||
Observable.__init__(self)
|
||||
self.user_list = None
|
||||
def define_user_list(self, user_list):
|
||||
self.user_list = user_list
|
||||
|
||||
def input(self, raw_data, connection):
|
||||
data = {'raw': raw_data, 'nick': raw_data.split('!')[0][1:],
|
||||
'channel': raw_data.split('PRIVMSG ')[1].split(' :')[0],
|
||||
'raw_nick': raw_data.split(' PRIVMSG')[0][1:]}
|
||||
# 12 = :<raw_nick> PRIVMSG <channel> :<message>
|
||||
data['message'] = raw_data[data['raw_nick'].__len__() + data['channel'].__len__() + 12:]
|
||||
data['command'] = 'irgendwas, das mit . oder .. anfängt oder so... oder das sollen module checken?'
|
||||
if self.user_list is None:
|
||||
return
|
||||
if data['nick'] not in self.user_list.userList.keys():
|
||||
return
|
||||
blocklist = BlockProvider()
|
||||
if blocklist.is_blocked(data['nick']):
|
||||
self.notify_whitelisted_observers(data, connection)
|
||||
return
|
||||
self.notify_observers(data, connection)
|
||||
|
||||
def notify_observers(self, data, connection):
|
||||
for observer in self._observers:
|
||||
_thread.start_new_thread(observer.__class__.update_on_priv_msg, (observer, data, connection))
|
||||
|
||||
def notify_whitelisted_observers(self,data,connection):
|
||||
for observer in self._observers:
|
||||
if observer.__class__.__name__ in ['ActivityObserver']:
|
||||
_thread.start_new_thread(observer.__class__.update_on_priv_msg, (observer, data, connection))
|
||||
@@ -0,0 +1 @@
|
||||
__author__ = 'Pups'
|
||||
@@ -0,0 +1,101 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Model.Config import Config
|
||||
from FaustBot.Model.ConnectionDetails import ConnectionDetails
|
||||
from FaustBot.Modules import ActivityObserver, IdentNickServObserver, GiveCookieObserver, LoveAndPeaceObserver, \
|
||||
FreeHugsObserver, WhoObserver, Kicker, ModulePrototype, PingAnswerObserver, SeenObserver, TitleObserver, \
|
||||
UserList, WikiObserver, GiveDrinkObserver, GiveFoodObserver, ComicObserver, HelpObserver, \
|
||||
IntroductionObserver, HangmanObserver, DuckObserver, AllSeenObserver, JokeObserver,TellObserver, WordRunObserver,\
|
||||
GiveIceObserver, GiveDrinkToObserver, Greeter, MathRunObserver, PartyObserver, PrideObserver, SnacksObserver, \
|
||||
BlockObserver
|
||||
from FaustBot.Modules.CustomUserModules import GlossaryModule, ICDObserver, ModmailObserver
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
|
||||
|
||||
class FaustBot(object):
|
||||
def __init__(self, config_path: str):
|
||||
self._config = Config(config_path)
|
||||
connection_details = ConnectionDetails(self.config)
|
||||
self._connection = Connection(connection_details)
|
||||
|
||||
@property
|
||||
def config(self):
|
||||
return self._config
|
||||
|
||||
def _setup(self):
|
||||
self._connection.establish()
|
||||
user_list = UserList.UserList()
|
||||
self._connection.priv_msg_observable.define_user_list(user_list)
|
||||
self.add_module(user_list)
|
||||
self.add_module(ActivityObserver.ActivityObserver())
|
||||
self.add_module(WhoObserver.WhoObserver(user_list))
|
||||
self.add_module(AllSeenObserver.AllSeenObserver(user_list))
|
||||
self.add_module(PingAnswerObserver.ModulePing())
|
||||
self.add_module(Kicker.Kicker(user_list, self._config.idle_time))
|
||||
self.add_module(SeenObserver.SeenObserver())
|
||||
self.add_module(TitleObserver.TitleObserver())
|
||||
self.add_module(WikiObserver.WikiObserver())
|
||||
self.add_module(ModmailObserver.ModmailObserver())
|
||||
self.add_module(ICDObserver.ICDObserver())
|
||||
self.add_module(GlossaryModule.GlossaryModule(self._config))
|
||||
self.add_module(IdentNickServObserver.IdentNickServObserver())
|
||||
self.add_module(GiveDrinkObserver.GiveDrinkObserver())
|
||||
self.add_module(GiveCookieObserver.GiveCookieObserver())
|
||||
self.add_module(LoveAndPeaceObserver.LoveAndPeaceObserver())
|
||||
self.add_module(FreeHugsObserver.FreeHugsObserver())
|
||||
self.add_module(GiveFoodObserver.GiveFoodObserver())
|
||||
self.add_module(ComicObserver.ComicObserver())
|
||||
self.add_module(HangmanObserver.HangmanObserver())
|
||||
self.add_module(HelpObserver.HelpObserver())
|
||||
self.add_module(IntroductionObserver.IntroductionObserver(user_list))
|
||||
self.add_module(DuckObserver.DuckObserver())
|
||||
self.add_module(JokeObserver.JokeObserver())
|
||||
self.add_module(TellObserver.TellObserver())
|
||||
self.add_module(WordRunObserver.WordRunObserver())
|
||||
self.add_module(GiveIceObserver.GiveIceObserver())
|
||||
self.add_module(GiveDrinkToObserver.GiveDrinkToObserver())
|
||||
self.add_module(Greeter.Greeter())
|
||||
self.add_module(MathRunObserver.MathRunObserver())
|
||||
self.add_module(PartyObserver.PartyObserver())
|
||||
self.add_module(PrideObserver.PrideObserver())
|
||||
self.add_module(SnacksObserver.SnacksObserver())
|
||||
self.add_module(BlockObserver.BlockObserver())
|
||||
def run(self):
|
||||
self._setup()
|
||||
running = True
|
||||
while running:
|
||||
if not self._connection.receive():
|
||||
return
|
||||
|
||||
def add_module(self, module: ModulePrototype):
|
||||
if module.__class__.__name__ in self._config.blacklist:
|
||||
print(module.__class__.__name__+ " not loaded because of blacklisting")
|
||||
return
|
||||
for module_type in module.get_module_types():
|
||||
observable = self._get_observable_by_module_type(module_type)
|
||||
observable.add_observer(module)
|
||||
module.config = self._config
|
||||
|
||||
def _get_observable_by_module_type(self, module_type: str):
|
||||
if module_type == ModuleType.ON_JOIN:
|
||||
return self._connection.join_observable
|
||||
|
||||
if module_type == ModuleType.ON_LEAVE:
|
||||
return self._connection.leave_observable
|
||||
|
||||
if module_type == ModuleType.ON_KICK:
|
||||
return self._connection.kick_observable
|
||||
|
||||
if module_type == ModuleType.ON_MSG:
|
||||
return self._connection.priv_msg_observable
|
||||
|
||||
if module_type == ModuleType.ON_NICK_CHANGE:
|
||||
return self._connection.nick_change_observable
|
||||
|
||||
if module_type == ModuleType.ON_PING:
|
||||
return self._connection.ping_observable
|
||||
|
||||
if module_type == ModuleType.ON_NOTICE:
|
||||
return self._connection.notice_observable
|
||||
|
||||
if module_type == ModuleType.ON_MAGIC_NUMBER:
|
||||
return self._connection.magic_number_observable
|
||||
@@ -0,0 +1,38 @@
|
||||
import sqlite3
|
||||
|
||||
|
||||
class BlockProvider(object):
|
||||
_CREATE_TABLE = 'CREATE TABLE IF NOT EXISTS blockedusers (id INTEGER PRIMARY KEY, \
|
||||
user TEXT)'
|
||||
_IS_BLOCKED = 'SELECT user FROM blockedusers'
|
||||
_BLOCK = 'INSERT INTO blockedusers (id, user) VALUES (?, ?)'
|
||||
_DELETE_BLOCK = 'DELETE FROM blockedusers WHERE user = ?'
|
||||
|
||||
def __init__(self):
|
||||
self._database_connection = sqlite3.connect('faust_bot.db')
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(BlockProvider._CREATE_TABLE)
|
||||
self._database_connection.commit()
|
||||
|
||||
def is_blocked(self, user: str):
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(BlockProvider._IS_BLOCKED)
|
||||
answer = cursor.fetchall()
|
||||
for ans in answer:
|
||||
if user.lower().find(ans[0])!=-1:
|
||||
return True
|
||||
return False
|
||||
|
||||
def block(self, user: str):
|
||||
data = (None, user.lower())
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(BlockProvider._BLOCK, data)
|
||||
self._database_connection.commit()
|
||||
|
||||
def delete_block(self, user: str):
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(BlockProvider._DELETE_BLOCK, (user.lower(),))
|
||||
self._database_connection.commit()
|
||||
|
||||
def __exit__(self):
|
||||
self._database_connection.close()
|
||||
@@ -0,0 +1,78 @@
|
||||
class Config(object):
|
||||
CONFIG_PATH = 'config_path'
|
||||
|
||||
def __init__(self, path):
|
||||
"""
|
||||
|
||||
:param path:
|
||||
"""
|
||||
self._config_dict = {}
|
||||
if path:
|
||||
self._config_dict[Config.CONFIG_PATH] = path
|
||||
self.read_config(path)
|
||||
|
||||
def __getitem__(self, item: str):
|
||||
if item in self._config_dict:
|
||||
return self._config_dict[item]
|
||||
else:
|
||||
return None
|
||||
|
||||
def __setitem__(self, key: str, value: str):
|
||||
print (key +' '+ value+'\n\r')
|
||||
self._config_dict[key] = value
|
||||
|
||||
def read_config(self, path: str, append=True):
|
||||
f = open(path, 'r')
|
||||
if not append:
|
||||
self._config_dict = {}
|
||||
for l in f.readlines():
|
||||
kv_pair = l.split(':')
|
||||
if len(kv_pair) == 2:
|
||||
self._config_dict[kv_pair[0].strip()] = kv_pair[1][:-1].strip()
|
||||
mods = self._config_dict['mods'].split(',')
|
||||
self._config_dict['mods'] = []
|
||||
for mod in mods:
|
||||
self._config_dict['mods'].append(mod.strip())
|
||||
# If no idle_time value is given, we set it to five hours ( == 18000 seconds )
|
||||
if 'idle_time' not in self._config_dict:
|
||||
self._config_dict['idle_time'] = 18000
|
||||
self._config_dict['idle_time'] = int(self._config_dict['idle_time'])
|
||||
if 'blacklist' not in self._config_dict:
|
||||
self._config_dict['blacklist'] = []
|
||||
else:
|
||||
blacklist=self._config_dict['blacklist'].split(',')
|
||||
self._config_dict['blacklist'] = []
|
||||
for module in blacklist:
|
||||
self._config_dict['blacklist'].append(module.strip())
|
||||
|
||||
@property
|
||||
def lang(self):
|
||||
return self._config_dict["lang"]
|
||||
|
||||
@lang.setter
|
||||
def lang(self, value):
|
||||
self._config_dict["lang"] = value
|
||||
|
||||
@property
|
||||
def mods(self):
|
||||
return self._config_dict["mods"]
|
||||
|
||||
@mods.setter
|
||||
def mods(self, value):
|
||||
self._config_dict["mods"] = value
|
||||
|
||||
@property
|
||||
def idle_time(self):
|
||||
return self._config_dict["idle_time"]
|
||||
|
||||
@idle_time.setter
|
||||
def idle_time(self, value: int):
|
||||
self._config_dict["idle_time"] = value
|
||||
|
||||
@property
|
||||
def blacklist(self):
|
||||
return self._config_dict['blacklist']
|
||||
|
||||
@property
|
||||
def pwd(self):
|
||||
return self._config_dict['pwd']
|
||||
@@ -0,0 +1,38 @@
|
||||
class ConnectionDetails(object):
|
||||
def get_server(self):
|
||||
"""
|
||||
:return: the server to connect to
|
||||
"""
|
||||
return self._data['server']
|
||||
|
||||
def get_nick(self):
|
||||
"""
|
||||
:return: own nick
|
||||
"""
|
||||
return self._data['nick']
|
||||
|
||||
def get_channel(self):
|
||||
"""
|
||||
:return: the channel connected into
|
||||
"""
|
||||
return self._data['channel']
|
||||
|
||||
def get_port(self):
|
||||
return int(self._data['port'])
|
||||
|
||||
def get_lang(self):
|
||||
return self._data['lang']
|
||||
|
||||
def change_lang(self, lang):
|
||||
self._data['lang'] = lang
|
||||
|
||||
def get_mods(self):
|
||||
return self._data['mods']
|
||||
|
||||
def get_pwd(self):
|
||||
if self._data['pwd'] is None:
|
||||
return ''
|
||||
return self._data['pwd']
|
||||
|
||||
def __init__(self, config):
|
||||
self._data = config
|
||||
@@ -0,0 +1,47 @@
|
||||
import sqlite3
|
||||
|
||||
|
||||
class GlossaryProvider(object):
|
||||
_CREATE_GLOSSARY_TABLE = 'CREATE TABLE IF NOT EXISTS glossary (id INTEGER PRIMARY KEY, \
|
||||
abbreviation TEXT, explanation TEXT)'
|
||||
_GET_EXPLANATION = 'SELECT id, explanation FROM glossary WHERE abbreviation = ?'
|
||||
_SAVE_OR_OVERWRITE = 'REPLACE INTO glossary (id, abbreviation, explanation) VALUES (?, ?, ?)'
|
||||
_DELETE_EXPLANATION = 'DELETE FROM glossary WHERE abbreviation = ?'
|
||||
|
||||
def __init__(self):
|
||||
self._database_connection = sqlite3.connect('faust_bot.db')
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(GlossaryProvider._CREATE_GLOSSARY_TABLE)
|
||||
self._database_connection.commit()
|
||||
|
||||
def get_explanation(self, abbreviation: str):
|
||||
"""
|
||||
|
||||
:param abbreviation:
|
||||
:return:
|
||||
"""
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(GlossaryProvider._GET_EXPLANATION, (abbreviation.lower(),))
|
||||
return cursor.fetchone()
|
||||
|
||||
def save_or_replace(self, abbreviation: str, explanation: str):
|
||||
"""
|
||||
|
||||
:param abbreviation:
|
||||
:param explanation:
|
||||
:return:
|
||||
"""
|
||||
existing = self.get_explanation(abbreviation)
|
||||
_id = existing[0] if existing is not None else None
|
||||
data = (_id, abbreviation.lower(), explanation)
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(GlossaryProvider._SAVE_OR_OVERWRITE, data)
|
||||
self._database_connection.commit()
|
||||
|
||||
def delete_explanation(self, abbreviation: str):
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(GlossaryProvider._DELETE_EXPLANATION, (abbreviation.strip(),))
|
||||
self._database_connection.commit()
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self._database_connection.close()
|
||||
@@ -0,0 +1,57 @@
|
||||
import sqlite3
|
||||
|
||||
|
||||
class HanDatabaseProvider(object):
|
||||
_CREATE_HANDB_TABLE = 'CREATE TABLE IF NOT EXISTS handb (id INTEGER PRIMARY KEY, \
|
||||
hanword TEXT)'
|
||||
_GET_RANDOM_WORD = 'SELECT hanword FROM handb ORDER BY RANDOM() LIMIT 1'
|
||||
_INSERT_WORD = 'REPLACE INTO handb(id, hanword) VALUES (?,?)'
|
||||
_DELETE_WORD = 'DELETE FROM handb WHERE hanword = ?'
|
||||
_GET_WORD = 'SELECT id, hanword FROM handb WHERE hanword = ?'
|
||||
def __init__(self):
|
||||
self._database_connection = sqlite3.connect('faust_bot.db')
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(HanDatabaseProvider._CREATE_HANDB_TABLE)
|
||||
self._database_connection.commit()
|
||||
|
||||
def get_random_word(self):
|
||||
"""
|
||||
|
||||
:param abbreviation:
|
||||
:return:
|
||||
"""
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(HanDatabaseProvider._GET_RANDOM_WORD)
|
||||
return cursor.fetchone()
|
||||
|
||||
def get_hanWord(self, HanWord):
|
||||
"""
|
||||
|
||||
:param abbreviation:
|
||||
:return:
|
||||
"""
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(HanDatabaseProvider._GET_WORD, (HanWord.upper(),))
|
||||
return cursor.fetchone()
|
||||
|
||||
def addWord(self, HanWord):
|
||||
"""
|
||||
|
||||
:param abbreviation:
|
||||
:param explanation:
|
||||
:return:
|
||||
"""
|
||||
existing = self.get_hanWord(HanWord)
|
||||
_id = existing[0] if existing is not None else None
|
||||
data = (_id, HanWord)
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(HanDatabaseProvider._INSERT_WORD, data)
|
||||
self._database_connection.commit()
|
||||
|
||||
def delete_hanWord(self, HanWord):
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(HanDatabaseProvider._DELETE_WORD, (HanWord.strip(),))
|
||||
self._database_connection.commit()
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self._database_connection.close()
|
||||
@@ -0,0 +1,36 @@
|
||||
import sqlite3
|
||||
|
||||
|
||||
class IntroductionProvider(object):
|
||||
_CREATE_TABLE = 'CREATE TABLE IF NOT EXISTS introduction (id INTEGER PRIMARY KEY, \
|
||||
user TEXT, intro TEXT)'
|
||||
_GET_INTRO = 'SELECT id, intro FROM introduction WHERE user = ?'
|
||||
_SAVE_OR_OVERWRITE = 'REPLACE INTO introduction (id, user, intro) VALUES (?, ?, ?)'
|
||||
_DELETE_INTRO = 'DELETE FROM introduction WHERE user = ?'
|
||||
|
||||
def __init__(self):
|
||||
self._database_connection = sqlite3.connect('faust_bot.db')
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(IntroductionProvider._CREATE_TABLE)
|
||||
self._database_connection.commit()
|
||||
|
||||
def get_intro(self, user: str):
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(IntroductionProvider._GET_INTRO, (user.lower(),))
|
||||
return cursor.fetchone()
|
||||
|
||||
def save_or_replace(self, user: str, intro: str):
|
||||
existing = self.get_intro(user)
|
||||
_id = existing[0] if existing is not None else None
|
||||
data = (_id, user.lower(), intro)
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(IntroductionProvider._SAVE_OR_OVERWRITE, data)
|
||||
self._database_connection.commit()
|
||||
|
||||
def delete_intro(self, user: str):
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(IntroductionProvider._DELETE_INTRO, (user.lower(),))
|
||||
self._database_connection.commit()
|
||||
|
||||
def __exit__(self):
|
||||
self._database_connection.close()
|
||||
@@ -0,0 +1,9 @@
|
||||
class RemoteUser(object):
|
||||
"""
|
||||
Holds information about another user on IRC (nick!user@host)
|
||||
"""
|
||||
|
||||
def __init__(self, nick, user, host):
|
||||
self.nick = nick
|
||||
self.user = user
|
||||
self.host = host
|
||||
@@ -0,0 +1,36 @@
|
||||
import sqlite3
|
||||
|
||||
|
||||
class ScoreProvider(object):
|
||||
_CREATE_TABLE = 'CREATE TABLE IF NOT EXISTS score (id INTEGER PRIMARY KEY, \
|
||||
user TEXT, score INTEGER)'
|
||||
_GET_SCORE = 'SELECT id, score FROM score WHERE user = ?'
|
||||
_SAVE_OR_OVERWRITE = 'REPLACE INTO score (id, user, score) VALUES (?, ?, ?)'
|
||||
_DELETE_SCORE = 'DELETE FROM score WHERE user = ?'
|
||||
|
||||
def __init__(self):
|
||||
self._database_connection = sqlite3.connect('faust_bot.db')
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(ScoreProvider._CREATE_TABLE)
|
||||
self._database_connection.commit()
|
||||
|
||||
def get_score(self, user: str):
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(ScoreProvider._GET_SCORE, (user.lower(),))
|
||||
return cursor.fetchone()
|
||||
|
||||
def save_or_replace(self, user: str, score: int):
|
||||
existing = self.get_score(user)
|
||||
_id = existing[0] if existing is not None else None
|
||||
data = (_id, user.lower(), score)
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(ScoreProvider._SAVE_OR_OVERWRITE, data)
|
||||
self._database_connection.commit()
|
||||
|
||||
def delete_score(self, user: str):
|
||||
cursor = self._database_connection.cursor()
|
||||
cursor.execute(ScoreProvider._DELETE_SCORE, (user.lower(),))
|
||||
self._database_connection.commit()
|
||||
|
||||
def __exit__(self):
|
||||
self._database_connection.close()
|
||||
@@ -0,0 +1,18 @@
|
||||
class TextProvider(object):
|
||||
"""
|
||||
Provides different Texts
|
||||
"""
|
||||
|
||||
def get_random_fortune(self):
|
||||
"""
|
||||
:return: a random sentence
|
||||
"""
|
||||
return "Das macht Spa�"
|
||||
|
||||
def get_definiton(self, name):
|
||||
"""
|
||||
|
||||
:param name: name of definition to get
|
||||
:return: the definition
|
||||
"""
|
||||
return "Konnte Definition nicht finden"
|
||||
@@ -0,0 +1,107 @@
|
||||
import sqlite3
|
||||
import time
|
||||
|
||||
|
||||
class UserProvider(object):
|
||||
"""
|
||||
Provides information about the users
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.database_connection = sqlite3.connect('faust_bot.db')
|
||||
cursor = self.database_connection.cursor()
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS user (id INTEGER PRIMARY KEY , name TEXT)''')
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS user_stats(id INTEGER PRIMARY KEY, characters INT)''')
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS last_seen (id INTEGER PRIMARY KEY, last_seen REAL)''')
|
||||
self.database_connection.commit()
|
||||
|
||||
def get_characters(self, name):
|
||||
"""
|
||||
|
||||
:param name: name of user whom characters are to get
|
||||
:return: total number of characters written
|
||||
"""
|
||||
cursor = self.database_connection.cursor()
|
||||
id = self._get_id(name)
|
||||
if id is None:
|
||||
return 0
|
||||
for characters in cursor.execute("SELECT characters FROM user_stats WHERE id = ?", (id,)):
|
||||
return characters[0]
|
||||
return 0
|
||||
|
||||
def get_activity(self, name):
|
||||
"""
|
||||
|
||||
:param name: name of user whom activity to get
|
||||
:return: last activity by user
|
||||
"""
|
||||
cursor = self.database_connection.cursor()
|
||||
id = self._get_id(name)
|
||||
if id is None:
|
||||
return 0
|
||||
for time in cursor.execute("SELECT last_seen FROM last_seen WHERE id = ?", (id,)):
|
||||
return time[0]
|
||||
return 0
|
||||
|
||||
def add_characters(self, name, number):
|
||||
"""
|
||||
|
||||
:param name: User to Add Characters to
|
||||
:param number: Number of Characters to add
|
||||
:return: nothing
|
||||
"""
|
||||
cursor = self.database_connection.cursor()
|
||||
id = self._get_id(name)
|
||||
if id is None:
|
||||
self._create_user(name)
|
||||
id = self._get_id(name)
|
||||
for chars in cursor.execute("SELECT characters FROM user_stats WHERE id = ?", (id,)):
|
||||
chars = chars[0]
|
||||
chars += number
|
||||
cursor.execute("UPDATE user_stats SET characters = ? WHERE id = ?", (chars, id,))
|
||||
self.database_connection.commit()
|
||||
return None
|
||||
|
||||
def set_active(self, name):
|
||||
"""
|
||||
|
||||
:param name: set this user active at the moment
|
||||
:return: Nothing
|
||||
"""
|
||||
cursor = self.database_connection.cursor()
|
||||
id = self._get_id(name)
|
||||
ntime = time.time()
|
||||
if id is None:
|
||||
self._create_user(name)
|
||||
id = self._get_id(name)
|
||||
cursor.execute("UPDATE last_seen SET last_seen = ? WHERE id = ?", (ntime, id,))
|
||||
self.database_connection.commit()
|
||||
|
||||
def permission(self, user, percent):
|
||||
"""
|
||||
|
||||
:param user: user to ask permission for
|
||||
:param percent: percent needed for permission
|
||||
:return: True or False
|
||||
http://stackoverflow.com/questions/1682920/determine-if-a-user-is-idented-on-irc
|
||||
"""
|
||||
return True
|
||||
|
||||
def _get_id(self, name):
|
||||
cursor = self.database_connection.cursor()
|
||||
try:
|
||||
for id in cursor.execute("SELECT id FROM user WHERE name = ?", (name,)):
|
||||
return id[0]
|
||||
except:
|
||||
return None
|
||||
|
||||
def _create_user(self, name):
|
||||
cursor = self.database_connection.cursor()
|
||||
cursor.execute("INSERT INTO user(name) VALUES (?)", (name,))
|
||||
id = self._get_id(name)
|
||||
cursor.execute("INSERT INTO user_stats(id, characters) VALUES (?, 0)", (id,))
|
||||
cursor.execute("INSERT INTO last_seen (id, last_seen) VALUES (?, 0)", (id,))
|
||||
self.database_connection.commit()
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.database_connection.close()
|
||||
@@ -0,0 +1 @@
|
||||
__author__ = 'Pups'
|
||||
@@ -0,0 +1,23 @@
|
||||
import sqlite3
|
||||
|
||||
|
||||
class i18n(object):
|
||||
def get_text(self, name, replacements=None, lang='de-DE'):
|
||||
"""
|
||||
|
||||
:param replacements:
|
||||
:param name: name of text
|
||||
:param lang: language to get text in
|
||||
:return: the text
|
||||
"""
|
||||
if replacements is None:
|
||||
replacements = {}
|
||||
database_connection = sqlite3.connect('faust_bot.db')
|
||||
cursor = database_connection.cursor()
|
||||
ltext = ""
|
||||
print(replacements)
|
||||
for longText in cursor.execute("SELECT longText FROM i18n WHERE lang = ? AND ident = ?", (lang, name,)):
|
||||
ltext = longText[0]
|
||||
for (key, value) in replacements.items():
|
||||
ltext = ltext.replace('$' + key, value)
|
||||
return ltext
|
||||
@@ -0,0 +1,40 @@
|
||||
# from ..FaustBot import ModuleType
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Model.UserProvider import UserProvider
|
||||
from FaustBot.Modules.JoinObserverPrototype import JoinObserverPrototype
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
from ..Modules.NickChangeObserverPrototype import NickChangeObserverPrototype
|
||||
from ..Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class ActivityObserver(PrivMsgObserverPrototype, JoinObserverPrototype, NickChangeObserverPrototype):
|
||||
"""
|
||||
A Class only reacting to pings
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return None
|
||||
|
||||
def update_on_join(self, data, connection: Connection):
|
||||
users = UserProvider()
|
||||
if data['channel'] == connection.details.get_channel():
|
||||
users.set_active(data['nick'])
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
users = UserProvider()
|
||||
if data['channel'] == connection.details.get_channel():
|
||||
users.set_active(data['nick'])
|
||||
users.add_characters(data['nick'], len(data['message']))
|
||||
|
||||
def update_on_nick_change(self, data, connection: Connection):
|
||||
users = UserProvider()
|
||||
users.set_active(data['new_nick'])
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_MSG, ModuleType.ON_JOIN, ModuleType.ON_NICK_CHANGE]
|
||||
@@ -0,0 +1,41 @@
|
||||
import datetime
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Model.UserProvider import UserProvider
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from ..Model.i18n import i18n
|
||||
from FaustBot.Modules.UserList import UserList
|
||||
|
||||
class AllSeenObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".seen"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".seen <nick> - um abzufragen wann <nick> zuletzt hier war"
|
||||
|
||||
def __init__(self, user_list: UserList):
|
||||
super().__init__()
|
||||
self.user_list = user_list
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
if data['message'].find('.allseen') == -1:
|
||||
return
|
||||
if not self._is_idented_mod(data, connection):
|
||||
return
|
||||
User_afk = defaultdict(int)
|
||||
for who in self.user_list.userList.keys():
|
||||
user_provider = UserProvider()
|
||||
activity = user_provider.get_activity(who)
|
||||
delta = time.time() - activity
|
||||
User_afk[who] = delta
|
||||
print(who)
|
||||
print(delta)
|
||||
for w in sorted(User_afk, key=User_afk.get):
|
||||
output = (w+":\t"+str(datetime.timedelta(seconds=User_afk[w])))
|
||||
connection.send_back(output, data)
|
||||
|
||||
def _is_idented_mod(self, data: dict, connection: Connection):
|
||||
return data['nick'] in self._config.mods and connection.is_idented(data['nick'])
|
||||
@@ -0,0 +1,130 @@
|
||||
from FaustBot.Communication.Communication import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from FaustBot.Modules.JoinObserverPrototype import JoinObserverPrototype
|
||||
from FaustBot.Model.Config import Config
|
||||
from enum import Enum
|
||||
from datetime import datetime
|
||||
|
||||
"""
|
||||
This Module contains multiple classes to handle spam.
|
||||
Goal of this module is to provide an easy to use AntiSpam-Module, which can be activated if needed.
|
||||
It should support multiple modes regarding the aggressivity of the anti-spam-handling.
|
||||
"""
|
||||
|
||||
|
||||
class AntiSpamLevel(Enum):
|
||||
"""
|
||||
Which action to be done if spam is detected.
|
||||
"""
|
||||
OFF = 0 # No action is taken if spam is detected.
|
||||
WARN = 1 # Warns the user by messaging him/her without any further steps.
|
||||
WARN_KICK = 2 # Warns the user first, then kicks him/her.
|
||||
KICK = 3 # Kicks the user without any further warning.
|
||||
WARN_KICK_BAN = 4 # Like WARN_KICK but also bans the user directly.
|
||||
KICK_BAN = 5 # Like KICK, but also bans the user directly.
|
||||
|
||||
|
||||
class AntiSpamAggressivity(Enum):
|
||||
"""
|
||||
Settings to detect spam.
|
||||
a: Amount of seconds between two similiar messages to detect them as spam
|
||||
b: Amount of (non similiar) messages to be received with time-distance c, to detect them as spam
|
||||
c: Time between messages of b:
|
||||
d: Trustfactor
|
||||
"""
|
||||
LOW = (3, 7, 0.5, 15) # (a, b, c, d)
|
||||
MEDIUM = (5, 5, 0.7, 10)
|
||||
HIGH = (7, 3, 1.0, 5)
|
||||
ULTRA = (10, 3, 1.0, 2)
|
||||
|
||||
|
||||
class AntiSpamEntry(object):
|
||||
"""
|
||||
Entry collecting information about possible spammers.
|
||||
"""
|
||||
|
||||
def __init__():
|
||||
super().__init__()
|
||||
self.user = ""
|
||||
self.warn_count = 0
|
||||
self.msg = ""
|
||||
self.timestamp = datetime.now()
|
||||
|
||||
@property
|
||||
def user(self):
|
||||
return self.user
|
||||
|
||||
@user.setter
|
||||
def user(self, user)
|
||||
self.user = user
|
||||
|
||||
@property
|
||||
def warn_count(self):
|
||||
return self.warn_count
|
||||
|
||||
@warn_count.setter
|
||||
def warn_count(self, warn_count)
|
||||
self.warn_count = warn_count
|
||||
|
||||
def inc_warn_count(self)
|
||||
self.warn_count += 1
|
||||
|
||||
@property
|
||||
def msg(self):
|
||||
return self.msg
|
||||
|
||||
@msg.setter
|
||||
def msg(self, msg):
|
||||
self.msg = msg
|
||||
|
||||
@property
|
||||
def timestamp(self):
|
||||
return self.timestamp
|
||||
|
||||
@timestamp.setter
|
||||
def timestamp(self, timestamp):
|
||||
self.timestamp = timestamp
|
||||
|
||||
|
||||
class AntiSpamObserver(PrivMsgObserverPrototype, JoinObserverPrototype):
|
||||
|
||||
@staticmethod
|
||||
def cmd():
|
||||
raise NotImplementedError("TBD!")
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
raise NotImplementedError("TBD!")
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_JOIN,
|
||||
ModuleType.ON_PRIVMSG]
|
||||
|
||||
def __init__(self, config : Config):
|
||||
super().__init__()
|
||||
self._msg_map = dict()
|
||||
self._anti_spam_level = AntiSpamLevel.OFF
|
||||
self._anti_spam_aggressivity = AntiSpamAggressivity.LOW
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
if _bot_name in data['channel']: # TBD! _bot_name should be fetched from the config!
|
||||
self._handle_command(data, connection)
|
||||
|
||||
if self._anti_spam_level == AntiSpamLevel.OFF:
|
||||
return
|
||||
|
||||
def update_on_join(self, data: dict, connection: Connection):
|
||||
raise NotImplementedError("TBD!")
|
||||
|
||||
def _is_spam(self, user: str, msg: str)
|
||||
pass
|
||||
|
||||
def _handle_command(self, data: dict, connection: Connection)
|
||||
pass
|
||||
|
||||
def _is_idented_mod(self, data: dict, connection: Connection):
|
||||
"""
|
||||
Check wether the issuer of a module control command is a moderator or not
|
||||
"""
|
||||
return data['nick'] in self._config.mods and connection.is_idented(data['nick']
|
||||
@@ -0,0 +1,50 @@
|
||||
import datetime
|
||||
import time
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Model.UserProvider import UserProvider
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from ..Model.i18n import i18n
|
||||
from FaustBot.Model.BlockedUsers import BlockProvider
|
||||
|
||||
class BlockObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [""]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ""
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
if not self._is_idented_mod(data, connection):
|
||||
return
|
||||
if data['message'].find('.block ') != -1:
|
||||
self.block(data, connection)
|
||||
if data['message'].find ('.unblock')!=-1:
|
||||
self.unblock(data, connection)
|
||||
if data['message'].find('.isblocked') != -1:
|
||||
self.isBlocked(data, connection)
|
||||
|
||||
def block(self,data,connection):
|
||||
blocklist = BlockProvider()
|
||||
blocklist.block(self.isolateTarget(data))
|
||||
connection.send_back("blocked: "+ self.isolateTarget(data), data)
|
||||
|
||||
def unblock(self,data,connection):
|
||||
blocklist = BlockProvider()
|
||||
blocklist.delete_block(self.isolateTarget(data))
|
||||
connection.send_back("unblocked: "+ self.isolateTarget(data), data)
|
||||
|
||||
def isBlocked(self,data,connection):
|
||||
blocklist= BlockProvider()
|
||||
answ = blocklist.is_blocked(self.isolateTarget(data))
|
||||
if answ:
|
||||
connection.send_back(self.isolateTarget(data) + " ist geblocked", data)
|
||||
return
|
||||
connection.send_back(self.isolateTarget(data)+" ist nicht geblocked", data)
|
||||
def isolateTarget(self,data):
|
||||
return data['message'].split(' ')[1]
|
||||
|
||||
def _is_idented_mod(self, data: dict, connection: Connection):
|
||||
return data['nick'] in self._config.mods and connection.is_idented(data['nick'])
|
||||
@@ -0,0 +1,43 @@
|
||||
import random
|
||||
import urllib
|
||||
import requests
|
||||
import html
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from FaustBot.Modules.TitleObserver import TitleObserver
|
||||
from FaustBot.Modules.ComicScraper import ComicScraper
|
||||
|
||||
from comics import *
|
||||
|
||||
|
||||
class ComicObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return ['.comic']
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return '.comic liefer einen Link zu einem zufälligen Comic.'
|
||||
|
||||
def update_on_priv_msg(self, data: dict, connection: Connection):
|
||||
if data['message'].find('.comic') == -1:
|
||||
return
|
||||
|
||||
#Join list of comics that have a web based random functionality and those that need a scraper
|
||||
all_comics=comics+scraper_comics
|
||||
|
||||
#Choose from the joined list
|
||||
comic = random.choice(all_comics)
|
||||
|
||||
#Check which type of comic it is: If it's one that doesn't need a scraper, get the url and return it.
|
||||
#If it needs a scraper, use ComicScraper to scrape the comic.
|
||||
#If you want to add custom comic scrapers: Look at ComicScraper.py and insert your functionality.
|
||||
if not comic in scraper_comics:
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'}
|
||||
req = urllib.request.Request(comic, None, headers)
|
||||
resource = urllib.request.urlopen(req)
|
||||
title = TitleObserver.getTitle(TitleObserver(), resource)
|
||||
connection.send_back(resource.geturl() + " " + title, data)
|
||||
else:
|
||||
connection.send_back(ComicScraper.getRandomComic(comic),data);
|
||||
@@ -0,0 +1,41 @@
|
||||
import random
|
||||
import urllib
|
||||
import requests
|
||||
import html
|
||||
|
||||
#Comic scraper scrapes comics from urls that have no website based random functionality. Comic URLs have to be in comics.py
|
||||
class ComicScraper():
|
||||
|
||||
#Scrapers for specific websites follow here:
|
||||
|
||||
#scraper for Betamonkeys
|
||||
def scrapeBetamonkeys(url):
|
||||
#get latest comic id from the website, then generate a random number within the range of 1 and the latest comic.
|
||||
#Finally generate a new comic url from that. I know this is dirty - But it works for a comic i guess ;)
|
||||
r = requests.get(url)
|
||||
comic_id_latest=r.content.decode("utf-8").split("http://betamonkeys.co.uk/wp-content/stripshow_comics/betamonkeys")[1].split(".png")[0]
|
||||
random_comic_number=str(random.randint(1,int(comic_id_latest)))
|
||||
random_comic_url="http://betamonkeys.co.uk/wp-content/stripshow_comics/betamonkeys"+random_comic_number+".png"
|
||||
return random_comic_url+ " Betamonkeys "+ random_comic_number + " | Betamonkeys"
|
||||
|
||||
#scraper for Nichtlustig
|
||||
def scrapeNichtlustig(url):
|
||||
#TODO: Write a scraper for Nichtlustig!
|
||||
return "Bisher kein Scraper für Nichtlustig."
|
||||
|
||||
#your custom scraper here
|
||||
#def scrapeYourCustomComic(url):
|
||||
#return "Your custom scraped URL"
|
||||
|
||||
|
||||
|
||||
#Main scraping function. Takes url, decides scraping method to use. If no scraping method is found: return "No parser found"
|
||||
def getRandomComic(url):
|
||||
if "betamonkeys.co.uk" in url:
|
||||
return ComicScraper.scrapeBetamonkeys(url)
|
||||
|
||||
if "nichtlustig.de" in url:
|
||||
return ComicScraper.scrapeNichtlustig(url)
|
||||
|
||||
else:
|
||||
return "No parser found for comic URL: "+url
|
||||
@@ -0,0 +1,94 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Model.Config import Config
|
||||
from FaustBot.Model.GlossaryProvider import GlossaryProvider
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from FaustBot.Modules.WikiObserver import WikiObserver
|
||||
|
||||
class GlossaryModule(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [GlossaryModule._ADD_EXPLANATION,
|
||||
GlossaryModule._REMOVE_EXPLANATION,
|
||||
GlossaryModule._QUERY_EXPLANATION]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return None
|
||||
|
||||
_QUERY_EXPLANATION = '.?'
|
||||
_REMOVE_EXPLANATION = '.?-'
|
||||
_ADD_EXPLANATION = '.?+'
|
||||
|
||||
def __init__(self, config: Config):
|
||||
super().__init__()
|
||||
self._config = config
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
msg = data['message']
|
||||
if not -1 == msg.find(GlossaryModule._REMOVE_EXPLANATION):
|
||||
self._remove_query(data, connection)
|
||||
elif not -1 == msg.find(GlossaryModule._ADD_EXPLANATION):
|
||||
self._add_query(data, connection)
|
||||
elif not -1 == msg.find(GlossaryModule._QUERY_EXPLANATION):
|
||||
self._answer_query(data, connection)
|
||||
|
||||
def _answer_query(self, data, connection: Connection):
|
||||
"""
|
||||
:param data:
|
||||
:param connection:
|
||||
:return:
|
||||
"""
|
||||
glossary_provider = GlossaryProvider()
|
||||
split = data['message'].split(GlossaryModule._QUERY_EXPLANATION)
|
||||
if not len(split) == 2:
|
||||
return
|
||||
answer = glossary_provider.get_explanation(split[1].strip())
|
||||
if answer is None or answer[1] is None or answer[1].strip() == '':
|
||||
if split[1].strip() == '':
|
||||
return
|
||||
# connection.send_back("Tut mir leid, " + data['nick'] + ". Für " + split[1].strip() +
|
||||
# " habe ich noch keinen Eintrag. Aber Wikipedia sagt dazu:", data)
|
||||
wikiObserver = WikiObserver()
|
||||
wikiObserver.config = self.config
|
||||
data2 = data.copy()
|
||||
data2['message'] = '.w '+split[1]+" \r\n"
|
||||
wikiObserver.update_on_priv_msg(data2, connection)
|
||||
else:
|
||||
connection.send_back(data['nick'] + ": " + split[1] + " - " + answer[1], data)
|
||||
|
||||
def _remove_query(self, data, connection: Connection):
|
||||
"""
|
||||
|
||||
:param data:
|
||||
:param connection:
|
||||
:return:
|
||||
"""
|
||||
if not self._is_idented_mod(data, connection):
|
||||
connection.send_back("Dir fehlen die Berechtigungen zum Löschen von Einträgen, " + data['nick'] + ".", data)
|
||||
return
|
||||
glossary_provider = GlossaryProvider()
|
||||
split = data['message'].split(GlossaryModule._REMOVE_EXPLANATION)
|
||||
if not len(split) == 2:
|
||||
return
|
||||
glossary_provider.delete_explanation(split[1])
|
||||
connection.send_back("Der Eintrag zu " + split[1] + " wurde gelöscht, " + data['nick'] + ".", data)
|
||||
|
||||
def _add_query(self, data, connection: Connection):
|
||||
"""
|
||||
|
||||
:param data:
|
||||
:param connection:
|
||||
:return:
|
||||
"""
|
||||
if not self._is_idented_mod(data, connection):
|
||||
connection.send_back("Dir fehlen leider die Rechte zum Hinzufügen von Einträgen, " + data['nick'] + ".",
|
||||
data)
|
||||
return
|
||||
msg = data['message'].split(GlossaryModule._ADD_EXPLANATION)[1].strip()
|
||||
split = msg.split(' ', 1)
|
||||
glossary_provider = GlossaryProvider()
|
||||
glossary_provider.save_or_replace(split[0], split[1])
|
||||
connection.send_back(data['nick'] + ": der Eintrag zu " + split[0] + " wurde gespeichert.", data)
|
||||
|
||||
def _is_idented_mod(self, data: dict, connection: Connection):
|
||||
return data['nick'] in self._config.mods and connection.is_idented(data['nick'])
|
||||
@@ -0,0 +1,42 @@
|
||||
import csv
|
||||
import re
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class ICDObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return None
|
||||
|
||||
def get_icd(self, code):
|
||||
if code == "C64" or code == "P20":
|
||||
return ""
|
||||
icd10_codes = open('care_icd10_de.csv', 'r',encoding='utf8')
|
||||
icd10 = csv.reader(icd10_codes, delimiter=';', quotechar='"')
|
||||
for row in icd10:
|
||||
if row[0] == code:
|
||||
return code +' - ' + row[1]
|
||||
return 0
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
if data['channel'] != connection.details.get_channel():
|
||||
return
|
||||
regex = r'\b(\w\d{2}\.?\d?\d?)\b'
|
||||
codes = re.findall(regex, data['message'])
|
||||
for code in codes:
|
||||
code = code.capitalize()
|
||||
text = self.get_icd(code)
|
||||
if text == 0:
|
||||
if code.find('.') != -1:
|
||||
code += '-'
|
||||
else:
|
||||
code += '.-'
|
||||
text = self.get_icd(code)
|
||||
if text != 0:
|
||||
connection.send_back(text, data)
|
||||
@@ -0,0 +1,21 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class ModmailObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".modmail"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".modmail <msg> - Sendet allen Moderatoren <msg> per PN"
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
if data['message'].find('.modmail') == -1:
|
||||
return
|
||||
mods = connection.details.get_mods()
|
||||
print(mods)
|
||||
message = data['message'].split('.modmail ')[1]
|
||||
for mod in mods:
|
||||
connection.send_to_user(mod, data['nick'] + ' meldet: ' + message)
|
||||
@@ -0,0 +1,88 @@
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from FaustBot.Modules.PingObserverPrototype import PingObserverPrototype
|
||||
from random import randint
|
||||
from collections import defaultdict
|
||||
|
||||
class DuckObserver(PrivMsgObserverPrototype, PingObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return ['.freunde', '.schiessen', '.starthunt','.stophunt','.ducks']
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return 'duck game'
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_MSG, ModuleType.ON_PING]
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.active = 0
|
||||
self.duck_alive = 0
|
||||
self.ducks_hunt = defaultdict(int)
|
||||
self.ducks_befriend = defaultdict(int)
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
if data['message'].find('.starthunt') != -1:
|
||||
if not self._is_idented_mod(data, connection):
|
||||
connection.send_back("Dir fehlen leider die Rechte zum Starten der Jagd, " + data['nick'] + ".",data)
|
||||
return
|
||||
self.active = 1
|
||||
connection.send_channel("Jagd eröffnet")
|
||||
return
|
||||
if data['message'].find('.stophunt') != -1:
|
||||
if not self._is_idented_mod(data, connection):
|
||||
connection.send_back("Dir fehlen leider die Rechte zum Stoppen der Jagd, " + data['nick'] + ".",
|
||||
data)
|
||||
return
|
||||
self.active = 0
|
||||
self.duck_alive = 0
|
||||
connection.send_channel("Jagd beended")
|
||||
return
|
||||
if data['message'].find('.ducks') != -1:
|
||||
connection.send_channel(data['nick'] + " hat schon " + str(self.ducks_befriend[data['nick']]) + " befreundete Enten und " + str(self.ducks_hunt[data['nick']]) + " getötete Enten.")
|
||||
if data['message'].find('.freunde') != -1:
|
||||
self.befriend(data, connection)
|
||||
if data['message'].find('.schiessen') != -1:
|
||||
self.shoot(data, connection)
|
||||
|
||||
def befriend(self, data, connection):
|
||||
if self.duck_alive == 1:
|
||||
if randint(1, 100) > 97:
|
||||
connection.send_channel(data['nick'] + " probiert eine Ente zu befreunden aber sie will nicht.")
|
||||
else:
|
||||
self.duck_alive = 0
|
||||
self.ducks_befriend[data['nick']] += 1
|
||||
connection.send_channel(data['nick'] + " hat schon " + str(self.ducks_befriend[data['nick']]) + " befreundete Enten und " + str(self.ducks_hunt[data['nick']]) + " getötete Enten.")
|
||||
return
|
||||
if (self.duck_alive == 0 and self.active == 1):
|
||||
connection.send_channel(data['nick']+ " probiert eine nicht existente Ente zu befreunden")
|
||||
if self.active == 0:
|
||||
connection.send_channel("Es läuft derzeit keine Entenjagd.")
|
||||
def shoot(self, data, connection):
|
||||
if self.duck_alive == 1:
|
||||
if randint(1,100) >97:
|
||||
connection.send_channel(data['nick'] + " trifft daneben")
|
||||
else:
|
||||
self.duck_alive = 0
|
||||
self.ducks_hunt[data['nick']] += 1
|
||||
connection.send_channel(data['nick'] + " hat schon " + str(self.ducks_befriend[data['nick']]) + " befreundete Enten und " + str(self.ducks_hunt[data['nick']]) + " getötete Enten.")
|
||||
return
|
||||
if (self.duck_alive == 0 and self.active == 1):
|
||||
connection.send_channel(data['nick']+ " schiesst ins Nichts")
|
||||
if self.active == 0:
|
||||
connection.send_channel("Es läuft derzeit keine Entenjagd.")
|
||||
|
||||
def update_on_ping(self, data, connection: Connection):
|
||||
if self.active == 0:
|
||||
return
|
||||
if 1 == randint(1,11):
|
||||
if self.duck_alive == 0:
|
||||
connection.send_channel("*. *. *. * <<w°)> *. *. * Quack!")
|
||||
self.duck_alive = 1
|
||||
|
||||
def _is_idented_mod(self, data: dict, connection: Connection):
|
||||
return data['nick'] in self._config.mods and connection.is_idented(data['nick'])
|
||||
@@ -0,0 +1,17 @@
|
||||
from FaustBot.Communication import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class FreeHugsObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".hug"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".hug - verteilt Umarmungen"
|
||||
|
||||
def update_on_priv_msg(self, data: dict, connection: Connection):
|
||||
if data['message'].find('.hug') == -1:
|
||||
return
|
||||
connection.send_back('\001ACTION knuddelt ' + data['nick'] + '.\001', data)
|
||||
@@ -0,0 +1,23 @@
|
||||
import random
|
||||
|
||||
from FaustBot.Communication import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
kekse = ['einen Schokoladenkeks', 'einen Vanillekeks', 'einen Doppelkeks', 'keinen Keks',
|
||||
'einen Keks', 'einen Erdbeerkeks', 'einen Schokoladen-Cheesecake-Keks',
|
||||
'einen Glückskeks', 'einen Scherzkeks', 'einen Unglückskeks']
|
||||
|
||||
|
||||
class GiveCookieObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".cookie"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".cookie - verteilt kekse; oder auch nicht"
|
||||
|
||||
def update_on_priv_msg(self, data: dict, connection: Connection):
|
||||
if data['message'].find('.cookie') == -1:
|
||||
return
|
||||
connection.send_back('\001ACTION schenkt ' + data['nick'] + ' ' + random.choice(kekse) + '.\001', data)
|
||||
@@ -0,0 +1,20 @@
|
||||
import random
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from getraenke import getraenke
|
||||
|
||||
|
||||
class GiveDrinkObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".drink"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".drink - schenkt Getränke aus"
|
||||
|
||||
def update_on_priv_msg(self, data: dict, connection: Connection):
|
||||
if data['message'].find('.drink') == -1:
|
||||
return
|
||||
connection.send_back('\001ACTION schenkt ' + data['nick'] + ' ' + random.choice(getraenke) + ' ein.\001', data)
|
||||
@@ -0,0 +1,98 @@
|
||||
import random
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from getraenkeOnlyGoodOnes import getraenkegoodones
|
||||
from getraenke import getraenke
|
||||
from essen import essen
|
||||
from icecreamlist import icecream
|
||||
from extras import giveextras
|
||||
from snacks import snacks
|
||||
kekse = ['einen Schokoladenkeks', 'einen Vanillekeks', 'einen Doppelkeks',
|
||||
'einen Keks', 'einen Erdbeerkeks', 'einen Schokoladen-Cheesecake-Keks',
|
||||
'einen Glückskeks', 'einen Scherzkeks', 'einen Unglückskeks']
|
||||
|
||||
class GiveDrinkToObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".givedrink"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".givedrink NUTZER - schenkt jemand anders ein Getränke aus"
|
||||
|
||||
def update_on_priv_msg(self, data: dict, connection: Connection):
|
||||
if data['message'].find('.give') == -1:
|
||||
return
|
||||
receiver = data['message'].split()[1]
|
||||
if receiver == data['nick']:
|
||||
type = data['message'].split()[2]
|
||||
if type is not None:
|
||||
if type.lower() == "kaffee":
|
||||
connection.send_back('Fehler 418 Ich bin eine Teekanne', data)
|
||||
return
|
||||
connection.send_back('Bitte nutze .drink um dir selbst ein Getränk zu besorgen', data)
|
||||
return
|
||||
if len(data['message'].split()) < 3:
|
||||
connection.send_back(
|
||||
'\001ACTION serviert ' + receiver + ' ' + random.choice(getraenkegoodones) + '. Schöne Grüße von ' + data[
|
||||
'nick'] + '\001', data)
|
||||
return
|
||||
type = data['message'].split()[2]
|
||||
if type is not None:
|
||||
matchingDrinks = []
|
||||
for drink in getraenkegoodones:
|
||||
if type.lower() in drink.lower():
|
||||
matchingDrinks.append(drink)
|
||||
if matchingDrinks:
|
||||
connection.send_back(
|
||||
'\001ACTION serviert ' + receiver + ' ' + random.choice(matchingDrinks) + '. Schöne Grüße von ' + data[
|
||||
'nick'] + '\001', data)
|
||||
return
|
||||
if type.lower() == "drink":
|
||||
connection.send_back(
|
||||
'\001ACTION serviert ' + receiver + ' ' + random.choice(getraenke) + '. Schöne Grüße von ' +
|
||||
data[
|
||||
'nick'] + '\001', data)
|
||||
return
|
||||
|
||||
if type.lower() == "food":
|
||||
connection.send_back(
|
||||
'\001ACTION serviert ' + receiver + ' ' + random.choice(essen) + '. Schöne Grüße von ' +
|
||||
data[
|
||||
'nick'] + '\001', data)
|
||||
return
|
||||
|
||||
if type.lower() == "cookie":
|
||||
connection.send_back(
|
||||
'\001ACTION serviert ' + receiver + ' ' + random.choice(kekse) + '. Schöne Grüße von ' +
|
||||
data[
|
||||
'nick'] + '\001', data)
|
||||
return
|
||||
if type.lower() == "snack":
|
||||
connection.send_back(
|
||||
'\001ACTION serviert ' + receiver + ' ' + random.choice(snacks) + '. Schöne Grüße von ' +
|
||||
data[
|
||||
'nick'] + '\001', data)
|
||||
return
|
||||
if type.lower() == "massage":
|
||||
connection.send_back(
|
||||
'\001ACTION knetet ' + receiver + ' feste den Rücken durch. ' +
|
||||
data[
|
||||
'nick'] + ' meinte ich solle dir was gutes tun. \001', data)
|
||||
return
|
||||
for drink in getraenke+essen+icecream+giveextras+snacks:
|
||||
if type.lower() in drink.lower():
|
||||
matchingDrinks.append(drink)
|
||||
if matchingDrinks:
|
||||
connection.send_back(
|
||||
'\001ACTION serviert ' + receiver + ' ' + random.choice(matchingDrinks) + '. Schöne Grüße von ' +
|
||||
data[
|
||||
'nick'] + '\001', data)
|
||||
return
|
||||
else:
|
||||
connection.send_back(
|
||||
'Tut mir leid ' + data['nick'] + ', '+ type+' haben wir nicht auf der Karte!', data)
|
||||
return
|
||||
connection.send_back('\001ACTION serviert ' + receiver + ' ' + random.choice(getraenkegoodones) + '. Schöne Grüße von '+data['nick']+'\001', data)
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
import random
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from essen import essen
|
||||
|
||||
|
||||
class GiveFoodObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".food"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".food - gibt etwas zu essen aus"
|
||||
|
||||
def update_on_priv_msg(self, data: dict, connection: Connection):
|
||||
if data['message'].find('.food') == -1:
|
||||
return
|
||||
connection.send_back('\001ACTION tischt ' + data['nick'] + ' ' + random.choice(essen) + ' auf.\001', data)
|
||||
@@ -0,0 +1,20 @@
|
||||
import random
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from icecreamlist import icecream
|
||||
|
||||
|
||||
class GiveIceObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".ice"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".ice - schenkt Eis"
|
||||
|
||||
def update_on_priv_msg(self, data: dict, connection: Connection):
|
||||
if data['message'].find('.ice') == -1:
|
||||
return
|
||||
connection.send_back('\001ACTION serviert ' + data['nick'] + ' ' + random.choice(icecream) + '.\001', data)
|
||||
@@ -0,0 +1,36 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Model.i18n import i18n
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class GoogleObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return None
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
if data['message'].find('.g') == -1:
|
||||
return
|
||||
i18n_server = i18n()
|
||||
lang = i18n_server.get_text('google_lang')
|
||||
t = i18n_server.get_text('google_tld')
|
||||
q = data['message'].split(' ')
|
||||
query = ''
|
||||
for word in q:
|
||||
if word.strip() != '.g':
|
||||
query += word + ' '
|
||||
# g = google.search(query, tld=t, lang=lang, num=1, start=0, stop=0, pause=2.0)
|
||||
# s = next(g)
|
||||
# print(s)
|
||||
|
||||
# Connection.singleton().send_channel(g)
|
||||
# if g has nonzero results:
|
||||
# Connection.singleton().send_channel(data['nick'] + ', ' + i18n_server.get_text('google_fail'))
|
||||
# return
|
||||
# Connection.singleton().send_channel(data['nick'] + ' ' + gefundenes erstes result)
|
||||
# Connection.singleton().send_channel(title von dem link)
|
||||
pass
|
||||
@@ -0,0 +1,31 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.JoinObserverPrototype import JoinObserverPrototype
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
class Greeter(JoinObserverPrototype):
|
||||
"""
|
||||
A Class only reacting to pings
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return None
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.names = defaultdict(int)
|
||||
|
||||
def update_on_join(self, data, connection: Connection):
|
||||
if data['channel'] == connection.details.get_channel():
|
||||
if int(time.time()) - self.names[data['nick']] > 28800:
|
||||
if data['nick'].find("Neuling") != -1:
|
||||
connection.send_back("Herzlich Willkommen bei uns "+data['nick'],data)
|
||||
self.names[data['nick']] = int(time.time())
|
||||
return
|
||||
connection.send_back("Hallo " + data['nick'], data)
|
||||
self.names[data['nick']] = int(time.time())
|
||||
@@ -0,0 +1,294 @@
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from FaustBot.Model.ScoreProvider import ScoreProvider
|
||||
from FaustBot.Model.HanDatabaseProvider import HanDatabaseProvider
|
||||
from collections import defaultdict
|
||||
from threading import Lock
|
||||
import csv
|
||||
import random
|
||||
import time
|
||||
import datetime
|
||||
|
||||
class HangmanObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return ['.guess', '.word', '.stop', '.hint', '.score', '.spielregeln']
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return 'hangman game'
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
HangmanObserver.lock = Lock()
|
||||
self.word = ''
|
||||
self.guesses = ['-', '/', ' ', '_','.']
|
||||
self.tries_left = 0
|
||||
self.wrong_guessed = []
|
||||
self.worder = ''
|
||||
self.wrongly_guessedWords = []
|
||||
self.time = time.time()
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
if data['message'].find('.guess ') != -1:
|
||||
self.guess(data, connection)
|
||||
return
|
||||
if data['message'].find('.word ') != -1:
|
||||
self.take_word(data, connection)
|
||||
if data['message'].find('.han') != -1 and not data['message'].find('.handelete')!= -1 and not data['message'].find('hanadd'
|
||||
) != -1:
|
||||
self.start_solo_game(data, connection)
|
||||
if data['message'].find('hanadd') != -1:
|
||||
self.han_user_add(data, connection)
|
||||
if data['message'].find('.stop') != -1 and not data['message'].find('.stophunt') != -1 \
|
||||
and not data['message'].find('.stopMath') != -1:
|
||||
connection.send_channel("Spiel gestoppt. Das Wort war: " + self.word + " in: "+self.timeRelapsedString())
|
||||
self.word = ''
|
||||
self.guesses = []
|
||||
self.tries_left = 0
|
||||
self.wrong_guessed = []
|
||||
self.worder = ''
|
||||
self.wrongly_guessedWords = []
|
||||
self.worder = ''
|
||||
if data['message'].find('.hint') != -1:
|
||||
self.hint(data, connection)
|
||||
if data['message'].find('.score') != -1:
|
||||
self.print_score(data, connection)
|
||||
if data['message'].find('.spielregeln') != -1:
|
||||
self.rules(data, connection)
|
||||
if data['message'].find('.look') != -1:
|
||||
self.look(data, connection)
|
||||
if data['message'].find('.resetscore') != -1:
|
||||
self.reset(data,connection)
|
||||
if data['message'].find('.handelete') != -1:
|
||||
self.delete_HanWord(data, connection)
|
||||
|
||||
def delete_HanWord(self,data,connection):
|
||||
if not self._is_idented_mod(data, connection):
|
||||
connection.send_back(
|
||||
"Du hast keine Berechtigung Wörter zu löschen " + data['nick'], data)
|
||||
return
|
||||
if data['message'].split(' ')[1] is not None:
|
||||
self.deleteHanWord(data['message'].split(' ')[1].upper())
|
||||
connection.send_back("Das Wort "+data['message'].split(' ')[1].upper()+" wurde gelöscht, " + data['nick'], data)
|
||||
|
||||
def reset(self,data,connection):
|
||||
score_provider = ScoreProvider()
|
||||
score_provider.delete_score(data['nick'])
|
||||
connection.send_back("Dein Score wurde gelöscht "+data['nick'], data)
|
||||
|
||||
def look(self,data, connection):
|
||||
if self.worder != '':
|
||||
connection.send_channel("Das Wort kommt von: "+self.worder )
|
||||
connection.send_channel(self.prepare_word(data))
|
||||
self.hint(data,connection)
|
||||
|
||||
def print_score(self, data, connection):
|
||||
punkte = self.getScore(data['nick'])
|
||||
connection.send_back(data['nick']+" hat einen Hangman-Score von: " + str(punkte), data)
|
||||
|
||||
def hint(self, data, connection):
|
||||
wrongGuessesString = ""
|
||||
if len(self.wrong_guessed) == 0 and len(self.wrongly_guessedWords) == 0:
|
||||
wrongGuessesString = "Noch keine falschen Buchstaben."
|
||||
if len(self.wrong_guessed) > 0:
|
||||
wrongGuessesString += "Falsch geratene Buchstaben bis jetzt: "
|
||||
for w in self.wrong_guessed:
|
||||
if w == self.wrong_guessed[0]:
|
||||
wrongGuessesString += w
|
||||
else:
|
||||
wrongGuessesString += ", " + w
|
||||
|
||||
# Append wrongly guessed words
|
||||
for w in self.wrongly_guessedWords:
|
||||
if w == self.wrongly_guessedWords[0]:
|
||||
if len(self.wrong_guessed) > 0:
|
||||
wrongGuessesString += " | "
|
||||
wrongGuessesString += "Falsche Wörter: " + w
|
||||
else:
|
||||
wrongGuessesString += ", " + w
|
||||
if self.worder == "":
|
||||
wrongGuessesString = ""
|
||||
else:
|
||||
connection.send_back(wrongGuessesString, data)
|
||||
|
||||
def start_solo_game(self, data, connection):
|
||||
if self.word == '':
|
||||
self.time = time.time()
|
||||
self.word = self.getRandomHanWord()
|
||||
self.guesses = ['-', '/', ' ', '_','.']
|
||||
self.wrong_guessed = []
|
||||
self.tries_left = 11
|
||||
self.wrongly_guessedWords = []
|
||||
connection.send_channel("Automatisch gewähltes Wort")
|
||||
self.worder = "Botty"
|
||||
connection.send_channel(self.prepare_word(data))
|
||||
else:
|
||||
connection.send_back("Sorry es läuft bereits ein Wort", data)
|
||||
|
||||
def guess(self, data, connection):
|
||||
if data['channel'] != connection.details.get_channel():
|
||||
connection.send_back("Sorry kein raten im Query", data)
|
||||
return
|
||||
guess = data['message'].split(' ')[1].upper()
|
||||
if self.tries_left < 1:
|
||||
connection.send_channel("Flüstere mir ein neues Wort mit .word WORT")
|
||||
return
|
||||
word_unique_chars = len(set(self.word))
|
||||
if guess == self.word:
|
||||
score = word_unique_chars * self.count_missing_unique()
|
||||
self.addToScore(data['nick'], int(score))
|
||||
self.word = ''
|
||||
self.worder = ''
|
||||
connection.send_channel("Das ist korrekt: " + guess + " gelöst hat: "+data["nick"]+ " in: "+self.timeRelapsedString())
|
||||
self.giveExtraPointsInTime(data["nick"])
|
||||
return
|
||||
if guess in self.word:
|
||||
if guess not in self.guesses:
|
||||
score = word_unique_chars / 2
|
||||
self.addToScore(data['nick'], int(score))
|
||||
self.guesses.append(guess)
|
||||
else:
|
||||
self.tries_left -= 1
|
||||
punishment_factor = 1
|
||||
if guess in self.guesses:
|
||||
punishment_factor = 2
|
||||
self.addToScore(data['nick'], -1)
|
||||
#(int((word_unique_chars / 20) * punishment_factor * 10))
|
||||
|
||||
# append thread safe wrongly guessed characters and words
|
||||
HangmanObserver.lock.acquire()
|
||||
try:
|
||||
if guess not in self.wrong_guessed:
|
||||
if len(guess) == 1:
|
||||
self.wrong_guessed.append(guess)
|
||||
else:
|
||||
self.wrongly_guessedWords.append(guess)
|
||||
finally:
|
||||
HangmanObserver.lock.release()
|
||||
|
||||
connection.send_channel(self.prepare_word(data))
|
||||
|
||||
def take_word(self, data, connection):
|
||||
if self.word == '':
|
||||
self.time =time.time()
|
||||
if data['message'].split(' ')[1] is not None:
|
||||
self.addHanWord(data['message'].split(' ')[1].upper())
|
||||
log = open('HangmanLog', 'a')
|
||||
log.write(data['nick'] + ' ; ' + data['message'].split(' ')[1].upper() + '\n')
|
||||
log.close()
|
||||
self.word = data['message'].split(' ')[1].upper()
|
||||
self.guesses = ['-', '/', ' ', '_','.']
|
||||
self.wrong_guessed = []
|
||||
self.tries_left = 11
|
||||
self.wrongly_guessedWords = []
|
||||
connection.send_back("Danke für das Wort, es ist nun im Spiel!", data)
|
||||
connection.send_channel("Das Wort ist von: "+data['nick'])
|
||||
self.worder = data['nick']
|
||||
connection.send_channel(self.prepare_word(data))
|
||||
else:
|
||||
connection.send_back("Sorry es läuft bereits ein Wort", data)
|
||||
def han_user_add(self, data, connection):
|
||||
if data['message'].split(' ')[1] is not None:
|
||||
self.addHanWord(data['message'].split(' ')[1].upper())
|
||||
connection.send_channel("Das Wort "+data['message'].split(' ')[1].upper() +" wurde von "+ data['nick']+ " hinzugefügt")
|
||||
def prepare_word(self, data):
|
||||
outWord = ""
|
||||
failedChars = 0
|
||||
for char in self.word:
|
||||
if char in self.guesses:
|
||||
outWord += char + " "
|
||||
else:
|
||||
outWord += "_ "
|
||||
failedChars += 1
|
||||
if failedChars == 0:
|
||||
if len(self.word) > 0:
|
||||
outWord = "Das ist korrekt: " + self.word + " gelöst hat: "+data["nick"]+ " in : "+self.timeRelapsedString()
|
||||
self.giveExtraPointsInTime(data["nick"])
|
||||
self.addToScore(data['nick'], 5)
|
||||
self.word = ''
|
||||
self.worder = ''
|
||||
return outWord
|
||||
else:
|
||||
outWord = "Bitte gib ein neues Wort mit .word im Query an."
|
||||
return outWord
|
||||
if self.tries_left == 0:
|
||||
self.addToScore(self.worder,11)
|
||||
outWord = "Das richtige Wort wäre gewesen: " + self.word + " in: "+self.timeRelapsedString()
|
||||
self.word = ''
|
||||
self.worder = ''
|
||||
return outWord
|
||||
outWord += "Verbleibende Rateversuche: "+str(self.tries_left)
|
||||
return outWord
|
||||
|
||||
def count_missing(self):
|
||||
missing_chars = 0
|
||||
for char in self.word:
|
||||
if char not in self.guesses:
|
||||
missing_chars += 1
|
||||
return missing_chars
|
||||
|
||||
def count_missing_unique(self):
|
||||
return len(set(self.word) - set(self.guesses))
|
||||
|
||||
def rules(self, data, connection):
|
||||
if data['channel'] == connection.details.get_channel():
|
||||
connection.send_back("Spielregeln bitte im Query abfragen",data)
|
||||
return
|
||||
connection.send_back("""Wort starten mit ".word Wort" im Query mit dem Bot""", data)
|
||||
connection.send_back("""Raten mit ".guess Buchstabe" im Channel""", data)
|
||||
connection.send_back("""Geraten werden können einzelne Buchstaben oder das ganze Wort.""", data)
|
||||
connection.send_back("""Alle dürfen durcheinander raten. Es gibt keine Reihenfolge.""", data)
|
||||
connection.send_back("""".hint" gibt alle bereits falsch geratenen Buchstaben aus.""", data)
|
||||
connection.send_back("""Bei 2 verbleibenden Versuchen darf nach einem Tipp vom Steller des Wortes gefragt
|
||||
werden.""", data)
|
||||
connection.send_back("""Wer ein Wort errät, darf das nächste stellen.""", data)
|
||||
connection.send_back("""Wird ein Wort nicht gelöst, darf derjenige, der es gestellt hat, nochmal.""", data)
|
||||
connection.send_back("""Zulässig sind alle Wörter, die deutsch oder im deutschen Sprachraum geläufig sind.""", data)
|
||||
|
||||
def getScore(self, nick:str):
|
||||
score_provider = ScoreProvider()
|
||||
score = score_provider.get_score(nick)
|
||||
if score is not None:
|
||||
return score[1]
|
||||
else:
|
||||
return 0
|
||||
|
||||
def writeScore(self, nick:str, score:int):
|
||||
score_provider = ScoreProvider()
|
||||
score_provider.save_or_replace(nick, score)
|
||||
|
||||
def addToScore(self, nick:str, add_score: int):
|
||||
score = self.getScore(nick)
|
||||
self.writeScore(nick, score + add_score)
|
||||
|
||||
def addHanWord(self, hanWord:str):
|
||||
hanDB = HanDatabaseProvider()
|
||||
hanDB.addWord(hanWord.strip().upper())
|
||||
|
||||
def getRandomHanWord(self):
|
||||
hanDB = HanDatabaseProvider()
|
||||
word = hanDB.get_random_word()
|
||||
if word is not None:
|
||||
return word[0].upper()
|
||||
else:
|
||||
return "dummywort".upper()
|
||||
|
||||
def deleteHanWord(self, hanWord:str):
|
||||
hanDB = HanDatabaseProvider()
|
||||
hanDB.delete_hanWord(hanWord.strip().upper())
|
||||
|
||||
def _is_idented_mod(self, data: dict, connection: Connection):
|
||||
return data['nick'] in self._config.mods and connection.is_idented(data['nick'])
|
||||
|
||||
def timeRelapsedString(self):
|
||||
delta = time.time()-self.time
|
||||
return str(datetime.timedelta(seconds= delta))
|
||||
|
||||
def giveExtraPointsInTime(self, nick):
|
||||
delta = time.time()-self.time
|
||||
if delta <30:
|
||||
self.addToScore(nick, 5)
|
||||
if delta <60:
|
||||
self.addToScore(nick, 5)
|
||||
@@ -0,0 +1,32 @@
|
||||
from FaustBot.Communication import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class HelpObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".help"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".help - zeigt Hilftexte aller Module an"
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
msg = data["message"]
|
||||
if not msg.startswith(".help"):
|
||||
return
|
||||
|
||||
if data["channel"] == connection.details.get_channel():
|
||||
all_cmd = []
|
||||
for observer in connection.priv_msg_observable.get_observer():
|
||||
cmds = observer.cmd()
|
||||
if cmds is not None:
|
||||
all_cmd.extend(cmds)
|
||||
msg = ", ".join(all_cmd)
|
||||
msg = "Bekannte Befehle: " + msg + ". Für Details per Query .help ."
|
||||
connection.send_back(msg, data)
|
||||
else:
|
||||
all_help = [m.help() for m in connection.priv_msg_observable.get_observer()]
|
||||
for help_msg in all_help:
|
||||
if help_msg is not None:
|
||||
connection.send_back(help_msg, data)
|
||||
@@ -0,0 +1,27 @@
|
||||
import re
|
||||
|
||||
from FaustBot.Communication import Connection
|
||||
from FaustBot.Modules.NoticeObserverPrototype import NoticeObserverPrototype
|
||||
|
||||
|
||||
class IdentNickServObserver(NoticeObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return None
|
||||
|
||||
def update_on_notice(self, data, connection: Connection):
|
||||
# b':NickServ!NickServ@services. NOTICE FaustBotDev :corvidae ACC 3 \r\n'
|
||||
if not data['nick'].lower() == 'nickserv':
|
||||
return
|
||||
with connection.condition_lock:
|
||||
if re.match(r'.*? ACC [0-3].*', data['message']):
|
||||
msg_parts = data['message'].split(' ')
|
||||
if msg_parts[2] == '3':
|
||||
connection.idented_look_up[msg_parts[0]] = True
|
||||
else:
|
||||
connection.idented_look_up[msg_parts[0]] = False
|
||||
connection.condition_lock.notify_all()
|
||||
@@ -0,0 +1,54 @@
|
||||
from FaustBot.Communication import Connection
|
||||
from FaustBot.Model.Introduction import IntroductionProvider
|
||||
from FaustBot.Modules import UserList
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class IntroductionObserver(PrivMsgObserverPrototype):
|
||||
def __init__(self, user_list: UserList):
|
||||
super().__init__()
|
||||
self.userList = user_list
|
||||
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".me"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".me - kann von registrierten Nutzern verwendet werden um eine Vorstellung zu speichern"
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
msg = data["message"]
|
||||
nick = data["nick"]
|
||||
if not msg.startswith(".me") and not msg.startswith(".me-"):
|
||||
return
|
||||
if not self.authenticated(nick, connection):
|
||||
connection.send_back("Für die Nutzung von .me ist es zwingend erforderlich, einen registrierten Nick zu "
|
||||
"haben sowie eingeloggt zu sein. Wie dies geht, erfährst du unter "
|
||||
"https://freenode.net/kb/answer/registration", data)
|
||||
return
|
||||
intro_provider = IntroductionProvider()
|
||||
msg = msg.split('.me')[1].strip()
|
||||
if len(msg) == 0:
|
||||
intro = intro_provider.get_intro(nick)
|
||||
text = ""
|
||||
if intro is not None:
|
||||
text = nick + " ist " + intro[1]
|
||||
else:
|
||||
text = nick + " für dich gibt es noch keinen Eintrag, vielleicht magst du ja mittels .me <intro> noch " \
|
||||
"einen hinzufügen? "
|
||||
connection.send_back(text, data)
|
||||
elif len(msg) == 1 and '-' in msg:
|
||||
intro_provider.delete_intro(nick)
|
||||
connection.send_back(nick + " dein Intro wurde gelöscht!", data)
|
||||
else:
|
||||
intro = msg.strip()
|
||||
intro_provider.save_or_replace(nick, intro)
|
||||
connection.send_back(
|
||||
nick + ": Dein Intro wurde gespeichert! Mittels .me- kannst du deinen Eintrag wieder löschen.", data)
|
||||
text = nick + " ist " + intro_provider.get_intro(nick)[1]
|
||||
connection.send_back(text, data)
|
||||
|
||||
def authenticated(self, nick: str, connection: Connection):
|
||||
return nick in self.userList.userList and \
|
||||
connection.is_idented(nick)
|
||||
@@ -0,0 +1,27 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.ModulePrototype import ModulePrototype
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
|
||||
|
||||
class JoinObserverPrototype(ModulePrototype):
|
||||
"""
|
||||
The Prototype of a Class who can react to every action
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def cmd():
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
raise NotImplementedError("Need sto be implemented by subclasses!")
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_JOIN]
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def update_on_join(self, data, connection: Connection):
|
||||
raise NotImplementedError("Some module doesn't do anything")
|
||||
@@ -0,0 +1,209 @@
|
||||
import random
|
||||
import time
|
||||
from FaustBot.Communication import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
jokes = [['Was ist orange und geht über die Berge?'
|
||||
,'Eine Wanderine.']
|
||||
,['Was ist orange und schaut durchs Schlüsselloch?'
|
||||
,'Eine Spannderine.']
|
||||
,['Was ist violett und sitzt in der Kirche ganz vorne?'
|
||||
,'Eine Frommbeere.']
|
||||
,['Was ist grün und liegt im Sarg?'
|
||||
,'Ein Sterbschen.']
|
||||
,['Was ist bunt und läuft über den Tisch davon?'
|
||||
,'Ein Fluchtsalat.']
|
||||
,['Was ist braun und schwimmt im Wasser?'
|
||||
,'Ein U-Brot.']
|
||||
,['Was ist schwarz/weiß und hüpft von Eisscholle zu Eisscholle?'
|
||||
,'Ein Springuin.']
|
||||
,['Was ist rot und sitzt auf dem WC?'
|
||||
,'Eine Klomate!']
|
||||
,['Was ist braun und fährt einen verschneiten Hang hinunter?'
|
||||
,'Ein Snowbrot.']
|
||||
,['Was ist braun und späht durchs Schlafzimmerfenster?'
|
||||
,'Ein Spannzapfen.']
|
||||
,['Was ist weiß und springt im Wald umher?'
|
||||
,'Ein Jumpignon.']
|
||||
,['Was ist braun, süß und rennt durch den Wald?'
|
||||
,'Eine Joggolade.']
|
||||
,['Was ist braun und sitzt hinter Gittern?'
|
||||
,'Eine Knastanie.']
|
||||
,['Was ist rot, rund und hat ein Maschinengewehr?'
|
||||
,'Ein Rambodischen.']
|
||||
,['Was ist braun, knusprig und läuft mit dem Korb durch den Wald?'
|
||||
,'Brotkäppchen.']
|
||||
,['Was ist braun, klebrig und läuft in der Wüste umher?'
|
||||
,'Ein Karamel.']
|
||||
,['Was ist rot, sitzt in einer Konservendose und spielt Musik?'
|
||||
,'Ein Radioli.']
|
||||
,['Was ist grün und radelt durch die Gegend?'
|
||||
,'Eine Velone.']
|
||||
,['Was ist orange, tiefergelegt und hat einen Spoiler?'
|
||||
,'Ein Mantarinchen']
|
||||
,['Was ist gelb, krumm und schwimmt auf dem Wasser?'
|
||||
,'Eine Schwanane']
|
||||
,['Was ist orange und steckt traurig in der Erde?'
|
||||
,'Ein Trübchen.']
|
||||
,['Was ist orange, sauer und kann keine Minute ruhig sitzen?'
|
||||
,'Eine Zappelsine.']
|
||||
,['Was ist haarig und wird in der Pfanne frittiert?'
|
||||
,'Bartkartoffeln.']
|
||||
,['Was ist gesund und kräftig und spielt den Beleidigten?'
|
||||
,'Ein Schmollkornbrot.']
|
||||
,['Was steht im Schlafzimmer des Metzgers neben dem Bett?'
|
||||
,'Ein Schlachttischlämpchen.']
|
||||
,['Was ist grün, sauer und versteckt sich vor der Polizei?'
|
||||
,'Ein Essig-Schurke.']
|
||||
,['Was ist orange, rund und versteckt sich vor der Polizei?'
|
||||
,'Ein Vandalinchen.']
|
||||
,['Was ist grün und schaut durchs Schlüsselloch?'
|
||||
,'Ein Spionat']
|
||||
,['Was ist groß, grau und telefoniert aus Afrika?'
|
||||
,'Ein Telefant.']
|
||||
,['Was ist gelb und flattert im Wind?'
|
||||
,'Eine Fahnane.']
|
||||
,['Was ist grün und klopft an die Tür?'
|
||||
,'Ein Klopfsalat.']
|
||||
,['Was ist braun, sehr zäh und fliegt umher?'
|
||||
,'Eine Ledermaus.']
|
||||
,['Was macht "Muh" und hilft beim Anziehen?'
|
||||
,'Ein Kuhlöffel.']
|
||||
,['Was ist viereckig, hat Noppen und einen Sprachfehler?'
|
||||
,'Ein Legosteniker.']
|
||||
,['Was ist gelb und immer bekifft?'
|
||||
,'Ein Bong-Frites.']
|
||||
,['Was ist grün, glücklich und hüpft von Grashalm zu Grashalm?'
|
||||
,'Eine Freuschrecke.']
|
||||
,['Was ist ist braun, hat einen Beutel und hängt am Baum?'
|
||||
,'Ein Hänguruh.']
|
||||
,['Was ist orange-rot und riskiert alles?'
|
||||
,'Eine Mutorange']
|
||||
,['Was ist gelb, ölig und und sitzt in der Kirche in der ersten Reihe?'
|
||||
,'Eine Frommfrites']
|
||||
,['Was ist grün und irrt durch Istanbul?'
|
||||
,'Ein Gürke']
|
||||
,['Was ist hellbraun und hangelt sich von Tortenstück zu Tortenstück?'
|
||||
,'Ein Tarzipan.']
|
||||
,['Was ist braun und klebt an der Wand?'
|
||||
,'Ein Klebkuchen']
|
||||
,['Was ist rot und läuft die Straße auf und ab?'
|
||||
,'Eine Hagenutte.']
|
||||
,['Was ist weiss und läuft die Straße auf und ab?'
|
||||
,'Schneeflittchen.']
|
||||
,['Was ist grün und läuft die Straße auf und ab?'
|
||||
,'Eine Frosch-tituierte.']
|
||||
,['Was ist braun und trägt Strapse?'
|
||||
,'Ein Haselnüttchen.']
|
||||
,['Was ist gelb und steht frankiert und abgestempelt am Strassenrand?'
|
||||
,'Eine Postituierte.']
|
||||
,['Was leuchtet und geht fremd?'
|
||||
,'Ein Schlampion.']
|
||||
,['Was ist gelb und rutscht den Hang hinunter?'
|
||||
,'Ein Cremeschlitten.']
|
||||
,['Was ist weiss und tanzt ums Feuer?'
|
||||
,'Rumpelpilzchen.']
|
||||
,['Was ist weiss und liegt schnarchend auf der Wiese?'
|
||||
,'Ein Schlaf.']
|
||||
,['Was ist gelb, saftig und sitzt bei jedem Fussballspiel vor dem Fernseher?'
|
||||
,'Eine Fananas.']
|
||||
,['Was ist rosa und schwimmt im Wasser?'
|
||||
,'Eine Meerjungsau.']
|
||||
,['Was ist durchsichtig, stinkt und es ist ihm alles egal?'
|
||||
,'Ein Schnurz.']
|
||||
,['Was ist unordentlich und gibt Licht?'
|
||||
,'Eine Schlampe.']
|
||||
,['Was ist blöd, süß und bunt?'
|
||||
,'Ein Dummibärchen.']
|
||||
,['Was trägt einen Frack und hilft im Haushalt?'
|
||||
,'Ein Diener Schnitzel.']
|
||||
,['Was ist silbrig, sticht und hat Spass daran?'
|
||||
,'Eine Sadistel.']
|
||||
,['Was ist gelb und kann schießen?'
|
||||
,'Eine Banone']
|
||||
,['Was kommt nach Elch?'
|
||||
,'Zwölch']
|
||||
,['Was liegt am Strand und spricht undeutlich?'
|
||||
,'Eine Nuschel']
|
||||
,['Was hüpft über die Wiese und raucht?'
|
||||
,'Ein Kaminchen']
|
||||
,['Was ist knusprig und liegt unterm Baum?'
|
||||
,'Schattenplätzle']
|
||||
,['Kleines Schwein das nach Hilfe schreit?'
|
||||
,'Ein Notrufsäule']
|
||||
,['Was liegt am Strand und hat Schnupfen?'
|
||||
,'Eine Niesmuschel']
|
||||
,['Was ist ein Cowboy ohne Pferd?'
|
||||
,'Ein Sattelschlepper']
|
||||
,['Was ist grün und trägt Kopftuch?'
|
||||
,'Eine Gürkin']
|
||||
,['Was ist rot und sitzt unterm Tisch?'
|
||||
,'Ne Paprikantin']
|
||||
,['Was ist schwarz-weiß und kommt nicht vom Fleck?'
|
||||
,'Ein Klebra']
|
||||
,['Was ist rosa, quiekt und wird zum Hausbau verwendet?'
|
||||
,'Ein Ziegelschwein']
|
||||
,['Wer ist bei jeder Wanderung betrunken?'
|
||||
,'Der Schlucksack']
|
||||
,['Was ist rot und wiehert?'
|
||||
,'Die Pferdbeere']
|
||||
,['Was ist weiß, blau, grün und steht auf der Wiese?'
|
||||
,'Eine Schlumpfdotterblume']
|
||||
,['Was kaut und hat immer Verspätung?'
|
||||
,'Die Essbahn']
|
||||
,['Was fährt unter der Erde und macht Muh?'
|
||||
,'Die Kuhbahn']
|
||||
,['Was wühlt den Himmel auf?'
|
||||
,'Ein Pflugzeug']
|
||||
,['Welche Frucht wächst im Gerichtssaal?'
|
||||
,'Advokado']
|
||||
,['Wie nennt man einen “scharfen” Mann mit Kilt?'
|
||||
,'Chilischotte']
|
||||
,['Was lebt im Meer und kann gut rechnen?'
|
||||
,'Der Octoplus']
|
||||
,['Was ist tiefergelegt und schwimmt unter wasser?'
|
||||
,'Der Tunefisch']
|
||||
,['Was ist unter der Erde und stinkt?'
|
||||
,'Eine Furzel']
|
||||
,['Von was wird man nachts beobachtet?'
|
||||
,'Vom Spannbettlaken']
|
||||
,['Wo wohnen die meisten Katzen?'
|
||||
,'Im Miezhaus']
|
||||
,['Warum ging der Luftballon kaputt?'
|
||||
,'Aus Platzgründen']
|
||||
,['Wie nennt man einen ausgehungerten Frosch?'
|
||||
,'Magerquak']
|
||||
,['Was macht ein Dieb im Zirkus?'
|
||||
,'Clown']
|
||||
,['Was macht ein Clown im Büro?'
|
||||
,'Faxen']
|
||||
,['Wie nennt man eine Zauberin in der Wüste?'
|
||||
,'Sand Witch']
|
||||
,['Wo betrinkt sich eine Mücke?'
|
||||
,'In Sekt']
|
||||
,['Warum können Seeräuber keine Kreise berechnen?'
|
||||
,'Weil sie pi raten']
|
||||
,['Was sitzt in der Savanne und wäscht sich?'
|
||||
,'Die Hygiäne']
|
||||
,['Was sitzt im Dschungel und spielt unfair?'
|
||||
,'Mogli']
|
||||
,['Wie nennt man den Paarungsruf von Leutstofflampen?'
|
||||
,'Neonröhren']]
|
||||
|
||||
|
||||
class JokeObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".joke"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".joke erzählt einen Flachwitz"
|
||||
|
||||
def update_on_priv_msg(self, data: dict, connection: Connection):
|
||||
if data['message'].find('.joke') == -1:
|
||||
return
|
||||
joke = random.choice(jokes)
|
||||
connection.send_back(joke[0], data)
|
||||
time.sleep(30)
|
||||
connection.send_back(joke[1], data)
|
||||
@@ -0,0 +1,27 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.ModulePrototype import ModulePrototype
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
|
||||
|
||||
class KickObserverPrototype(ModulePrototype):
|
||||
"""
|
||||
The Prototype of a Class who can react to every action
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def cmd():
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
raise NotImplementedError("Need sto be implemented by subclasses!")
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_KICK]
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def update_on_kick(self, data, connection: Connection):
|
||||
raise NotImplementedError("Some module doesn't do anything")
|
||||
@@ -0,0 +1,53 @@
|
||||
import random
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Model.UserProvider import UserProvider
|
||||
from FaustBot.Modules.UserList import UserList
|
||||
from getraenke import getraenke
|
||||
from essen import essen
|
||||
from icecreamlist import icecream
|
||||
|
||||
from ..Modules.PingObserverPrototype import PingObserverPrototype
|
||||
|
||||
|
||||
class Kicker(PingObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return None
|
||||
|
||||
def __init__(self, user_list: UserList, idle_time: int):
|
||||
super().__init__()
|
||||
self.idle_time = idle_time
|
||||
self.user_list = user_list
|
||||
self.warned_users = defaultdict(int)
|
||||
|
||||
def update_on_ping(self, data, connection: Connection):
|
||||
for user in self.user_list.userList.keys():
|
||||
offline_time = Kicker.get_offline_time(user)
|
||||
if offline_time < self.idle_time:
|
||||
self.warned_users[user] = 0
|
||||
host = self.user_list.userList.get(user).host
|
||||
if offline_time > self.idle_time \
|
||||
and not user == connection.details.get_nick() \
|
||||
and 'freenode/staff' not in host and 'freenode/utility-bot' not in host:
|
||||
if self.warned_users[user] % 30 == 0:
|
||||
connection.send_channel(
|
||||
'\001ACTION serviert ' + user + ' ' + random.choice(getraenke+essen+icecream) + '.\001')
|
||||
self.warned_users[user] += 1
|
||||
if self.warned_users[user] % 29 == 0:
|
||||
connection.raw_send("KICK " + connection.details.get_channel() + " " + user +
|
||||
" :Zu lang geidlet, komm gerne wieder!")
|
||||
|
||||
@staticmethod
|
||||
def get_offline_time(nick):
|
||||
who = nick
|
||||
user_provider = UserProvider()
|
||||
activity = user_provider.get_activity(who)
|
||||
delta = time.time() - activity
|
||||
return delta
|
||||
@@ -0,0 +1,27 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.ModulePrototype import ModulePrototype
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
|
||||
|
||||
class LeaveObserverPrototype(ModulePrototype):
|
||||
"""
|
||||
The Prototype of a Class who can react to every action
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def cmd():
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
raise NotImplementedError("Need sto be implemented by subclasses!")
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_LEAVE]
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def update_on_leave(self, data, connection: Connection):
|
||||
raise NotImplementedError("Some module doesn't do anything")
|
||||
@@ -0,0 +1,18 @@
|
||||
from FaustBot.Communication import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class LoveAndPeaceObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".peace"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".peace - sorgt für Frieden"
|
||||
|
||||
def update_on_priv_msg(self, data: dict, connection: Connection):
|
||||
if data['message'].find('.peace') == -1:
|
||||
return
|
||||
connection.send_back('\001ACTION hüpft durch den Raum, schmeißt Blumen um sich und singt: \"Love and '
|
||||
'Peace, wir haben uns alle lieb..!\".\001', data)
|
||||
@@ -0,0 +1,27 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.ModulePrototype import ModulePrototype
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
|
||||
|
||||
class MagicNumberObserverPrototype(ModulePrototype):
|
||||
"""
|
||||
The Prototype of a Class who can react to server actions
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def cmd():
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
raise NotImplementedError("Need sto be implemented by subclasses!")
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_MAGIC_NUMBER]
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def update_on_magic_number(self, data, connection: Connection):
|
||||
raise NotImplementedError("Some module doesn't do anything")
|
||||
@@ -0,0 +1,76 @@
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from random import randrange
|
||||
from time import sleep
|
||||
class MathRunObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return ['.s', '.startMath', '.stopMath']
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return 'startMath startet eine Reihe von Aufgaben. StopMath beendet sie.'
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.players = {}
|
||||
self.solutionForGame = 0
|
||||
self.type = 0
|
||||
self.running = False
|
||||
self.oldSolution = 0
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
if data['message'].find('.s ') != -1 :
|
||||
self.solution(data, connection)
|
||||
if data['message'].find('.startMath') != -1:
|
||||
self.start_math(data, connection)
|
||||
|
||||
def solution(self, data, connection):
|
||||
nick = data["nick"]
|
||||
solutionByPlayer = data['message'].split()[1]
|
||||
if solutionByPlayer is None:
|
||||
connection.send_back("Sorry du hast keine Lösung angegeben " + nick, data)
|
||||
return
|
||||
if solutionByPlayer == str(self.solutionForGame):
|
||||
connection.send_channel("Korrekte Lösung " + nick)
|
||||
if nick not in self.players:
|
||||
self.players[nick] = 0
|
||||
self.players[nick] += 1
|
||||
self.oldSolution = self.solutionForGame
|
||||
self.start_math(data, connection)
|
||||
return
|
||||
if solutionByPlayer == str(self.oldSolution):
|
||||
connection.send_channel("Korrekte Lösung für das Problem davor " + nick)
|
||||
if nick not in self.players:
|
||||
self.players[nick] = 0
|
||||
self.players[nick] += 1
|
||||
return
|
||||
connection.send_channel("Sorry die Lösung ist falsch "+nick )
|
||||
|
||||
def start_math(self, data, connection):
|
||||
summand1 = randrange(1,100)
|
||||
summand2 = randrange(1,11)
|
||||
operation = randrange(1,3)
|
||||
if operation == 1:
|
||||
self.solutionForGame = summand1 - summand2
|
||||
connection.send_channel(str(summand1) +" - "+str(summand2) +" = ?")
|
||||
if operation == 2:
|
||||
self.solutionForGame = summand1 + summand2
|
||||
connection.send_channel(str(summand1) +" + "+str(summand2) +" = ?")
|
||||
if not self.running:
|
||||
self.running = True
|
||||
self.stop_Timer(data, connection)
|
||||
|
||||
def stop_math(self, data, connection):
|
||||
for player in self.players.keys():
|
||||
connection.send_channel(player + " hat\t" + str(self.players[player]) + "\t Punkte")
|
||||
connection.send_channel("Spiel beendet")
|
||||
self.players = {}
|
||||
self.solutionForGame = 0
|
||||
self.type = 0
|
||||
self.running = False
|
||||
|
||||
def stop_Timer(self, data, connection):
|
||||
sleep(120)
|
||||
self.stop_math(data, connection)
|
||||
@@ -0,0 +1,23 @@
|
||||
class ModulePrototype(object):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
raise NotImplementedError("This method needs to be implemented by a subclass!")
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
raise NotImplementedError("Needs to be implemented by subclasses")
|
||||
|
||||
def __init__(self):
|
||||
self._config = None
|
||||
|
||||
@property
|
||||
def config(self):
|
||||
return self._config
|
||||
|
||||
@config.setter
|
||||
def config(self, value):
|
||||
self._config = value
|
||||
@@ -0,0 +1,12 @@
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class ModuleType(Enum):
|
||||
ON_JOIN = 'ON_JOIN'
|
||||
ON_KICK = 'ON_KICK'
|
||||
ON_LEAVE = 'ON_LEAVE'
|
||||
ON_PING = 'ON_PING'
|
||||
ON_NICK_CHANGE = 'ON_NICK_CHANGE'
|
||||
ON_MSG = 'ON_MSG'
|
||||
ON_NOTICE = 'ON_NOTICE'
|
||||
ON_MAGIC_NUMBER = 'ON_MAGIC_NUMBER'
|
||||
@@ -0,0 +1,27 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.ModulePrototype import ModulePrototype
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
|
||||
|
||||
class NickChangeObserverPrototype(ModulePrototype):
|
||||
"""
|
||||
The Prototype of a Class who can react to every action
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def cmd():
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
raise NotImplementedError("Need sto be implemented by subclasses!")
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_NICK_CHANGE]
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def update_on_nick_change(self, data, connection: Connection):
|
||||
raise NotImplementedError("Some module doesn't do anything")
|
||||
@@ -0,0 +1,23 @@
|
||||
from FaustBot.Communication import Connection
|
||||
from FaustBot.Modules.ModulePrototype import ModulePrototype
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
|
||||
|
||||
class NoticeObserverPrototype(ModulePrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
raise NotImplementedError("Need sto be implemented by subclasses!")
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_NOTICE]
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def update_on_notice(self, data, connection: Connection):
|
||||
raise NotImplementedError('Needs to be implemented by csubclasses!')
|
||||
@@ -0,0 +1,17 @@
|
||||
from FaustBot.Communication import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class PartyObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".party"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".party - sorgt für Konfetti"
|
||||
|
||||
def update_on_priv_msg(self, data: dict, connection: Connection):
|
||||
if data['message'].find('.party') == -1:
|
||||
return
|
||||
connection.send_back('\001ACTION schmeißt mit Konfetti aus buntem Esspapier um sich.\001', data)
|
||||
@@ -0,0 +1,21 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PingObserverPrototype import PingObserverPrototype
|
||||
|
||||
|
||||
class ModulePing(PingObserverPrototype):
|
||||
"""
|
||||
A Class only reacting to pings
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return None
|
||||
|
||||
def update_on_ping(self, data, connection: Connection):
|
||||
# print('Module Ping')
|
||||
msg = 'PONG ' + data['server']
|
||||
connection.raw_send(msg)
|
||||
@@ -0,0 +1,27 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.ModulePrototype import ModulePrototype
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
|
||||
|
||||
class PingObserverPrototype(ModulePrototype):
|
||||
"""
|
||||
The Prototype of a Class who can react to every action
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def cmd():
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
raise NotImplementedError("Need sto be implemented by subclasses!")
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_PING]
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def update_on_ping(self, data, connection: Connection):
|
||||
raise NotImplementedError("Some module doesn't do anything")
|
||||
@@ -0,0 +1,17 @@
|
||||
from FaustBot.Communication import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class PrideObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".pride"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".party - sorgt für sehr viele Pride Flags"
|
||||
|
||||
def update_on_priv_msg(self, data: dict, connection: Connection):
|
||||
if data['message'].find('.pride') == -1:
|
||||
return
|
||||
connection.send_back('\001ACTION schmückt den Channel mit ganz vielen großen Pride Flaggen.\001', data)
|
||||
@@ -0,0 +1,27 @@
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.ModulePrototype import ModulePrototype
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
|
||||
|
||||
class PrivMsgObserverPrototype(ModulePrototype):
|
||||
"""
|
||||
The Prototype of a Class who can react to every action
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def cmd():
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
raise NotImplementedError("Need sto be implemented by subclasses!")
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_MSG]
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
raise NotImplementedError("Some module doesn't do anything")
|
||||
@@ -0,0 +1,34 @@
|
||||
import datetime
|
||||
import time
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Model.UserProvider import UserProvider
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from ..Model.i18n import i18n
|
||||
|
||||
|
||||
class SeenObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".seen"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".seen <nick> - um abzufragen wann <nick> zuletzt hier war"
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
if data['message'].find('.seen ') == -1:
|
||||
return
|
||||
if not self._is_idented_mod(data, connection):
|
||||
return
|
||||
who = data['message'].split(' ')[1]
|
||||
user_provider = UserProvider()
|
||||
activity = user_provider.get_activity(who)
|
||||
output = data['nick']+": Ich habe "+who+" zuletzt am "+str(datetime.datetime.fromtimestamp(activity).strftime("%d.%m.%Y um %H:%M:%S"))+ ' Uhr gesehen'
|
||||
if not self._is_idented_mod(data, connection):
|
||||
connection.send_channel(output)
|
||||
return
|
||||
connection.send_back(output, data)
|
||||
|
||||
def _is_idented_mod(self, data: dict, connection: Connection):
|
||||
return data['nick'] in self._config.mods and connection.is_idented(data['nick'])
|
||||
@@ -0,0 +1,20 @@
|
||||
import random
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from snacks import snacks
|
||||
|
||||
|
||||
class SnacksObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".snack"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".snack - teilt Snacks aus"
|
||||
|
||||
def update_on_priv_msg(self, data: dict, connection: Connection):
|
||||
if data['message'].find('.snack') == -1:
|
||||
return
|
||||
connection.send_back('\001ACTION serviert ' + data['nick'] + ' ' + random.choice(snacks) + '.\001', data)
|
||||
@@ -0,0 +1,20 @@
|
||||
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class TellObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return None
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
if data['message'].find('.tell') != -1 and self._is_idented_mod(data, connection):
|
||||
connection.send_channel(data['message'][6:])
|
||||
def _is_idented_mod(self, data: dict, connection: Connection):
|
||||
return data['nick'] in self._config.mods and connection.is_idented(data['nick'])
|
||||
@@ -0,0 +1,53 @@
|
||||
import html
|
||||
import re
|
||||
import urllib
|
||||
from urllib import request
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class TitleObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return None
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
regex = "(?P<url>https?://[^\s]+)"
|
||||
url = re.search(regex, data['message'])
|
||||
if url is not None:
|
||||
url = url.group()
|
||||
print(url)
|
||||
try:
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'}
|
||||
url = url
|
||||
req = urllib.request.Request(url, None, headers)
|
||||
resource = urllib.request.urlopen(req)
|
||||
title = self.getTitle(resource)
|
||||
print(title)
|
||||
title = title[:350]
|
||||
connection.send_back(title, data)
|
||||
except Exception as exc:
|
||||
print(exc)
|
||||
pass
|
||||
|
||||
def getTitle(self, resource):
|
||||
encoding = resource.headers.get_content_charset()
|
||||
# der erste Fall kann raus, wenn ein anderer Channel benutzt wird
|
||||
if resource.geturl().find('rehakids.de') != -1:
|
||||
encoding = 'windows-1252'
|
||||
if not encoding:
|
||||
encoding = 'utf-8'
|
||||
content = resource.read().decode(encoding, errors='replace')
|
||||
title_re = re.compile("<title>(.+?)</title>")
|
||||
title = title_re.search(content).group(1)
|
||||
title = html.unescape(title)
|
||||
title = title.replace('\n', ' ').replace('\r', '')
|
||||
title = title.replace("<", "<")
|
||||
title = title.replace(">", ">")
|
||||
title = title.replace("&", "&")
|
||||
return title
|
||||
@@ -0,0 +1,56 @@
|
||||
from FaustBot.Model.RemoteUser import RemoteUser
|
||||
from FaustBot.Modules.JoinObserverPrototype import JoinObserverPrototype
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
from ..Modules.KickObserverPrototype import KickObserverPrototype
|
||||
from ..Modules.LeaveObserverPrototype import LeaveObserverPrototype
|
||||
from ..Modules.NickChangeObserverPrototype import NickChangeObserverPrototype
|
||||
|
||||
|
||||
class UserList(JoinObserverPrototype, KickObserverPrototype, LeaveObserverPrototype, NickChangeObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return None
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.userList = {}
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_JOIN, ModuleType.ON_KICK, ModuleType.ON_LEAVE, ModuleType.ON_NICK_CHANGE]
|
||||
|
||||
def update_on_kick(self, data, connection):
|
||||
if data['nick'] in self.userList:
|
||||
del self.userList[data['nick']]
|
||||
# print(self.userList)
|
||||
|
||||
def update_on_leave(self, data, connection):
|
||||
if data['nick'] in self.userList:
|
||||
del self.userList[data['nick']]
|
||||
# print(self.userList)
|
||||
|
||||
def update_on_join(self, data, connection):
|
||||
self.userList[data['nick']] = RemoteUser(data['nick'], data['user'], data['host'])
|
||||
# print(self.userList)
|
||||
|
||||
def update_on_nick_change(self, data, connection):
|
||||
if data['old_nick'] in self.userList:
|
||||
remuser = self.userList[data['old_nick']]
|
||||
del self.userList[data['old_nick']]
|
||||
else:
|
||||
# shouldn't happen but let's be safe.
|
||||
remuser = RemoteUser('UN.KNOWN', 'UN.KNOWN', 'UN.KNOWN')
|
||||
|
||||
remuser.nick = data['new_nick']
|
||||
self.userList[data['new_nick']] = remuser
|
||||
# print(self.userList)
|
||||
|
||||
def clear_list(self):
|
||||
self.userList = {}
|
||||
|
||||
def add_user(self, remuser):
|
||||
self.userList[remuser.nick] = remuser
|
||||
@@ -0,0 +1,51 @@
|
||||
from FaustBot.Communication import Connection
|
||||
from FaustBot.Model.RemoteUser import RemoteUser
|
||||
from FaustBot.Modules.MagicNumberObserverPrototype import MagicNumberObserverPrototype
|
||||
from FaustBot.Modules.ModuleType import ModuleType
|
||||
from FaustBot.Modules.PingObserverPrototype import PingObserverPrototype
|
||||
from FaustBot.Modules.UserList import UserList
|
||||
import time
|
||||
|
||||
|
||||
class WhoObserver(MagicNumberObserverPrototype, PingObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return None
|
||||
|
||||
def __init__(self, user_list: UserList):
|
||||
super().__init__()
|
||||
self.user_list = user_list
|
||||
self.pings_seen = 1
|
||||
self.pending_whos = []
|
||||
|
||||
@staticmethod
|
||||
def get_module_types():
|
||||
return [ModuleType.ON_MAGIC_NUMBER, ModuleType.ON_PING]
|
||||
|
||||
def update_on_magic_number(self, data, connection):
|
||||
if data['number'] == '352': # RPL_WHOREPLY
|
||||
self.input_who(data, connection)
|
||||
elif data['number'] == '315': # RPL_ENDOFWHO
|
||||
#make sure other thread runs to its end.
|
||||
time.sleep(10)
|
||||
self.end_who()
|
||||
|
||||
def input_who(self, data, connection: Connection):
|
||||
# target #channel user host server nick status :0 gecos
|
||||
target, channel, user, host, server, nick, *ign = data['arguments'].split(' ')
|
||||
self.pending_whos.append(RemoteUser(nick, user, host))
|
||||
|
||||
def end_who(self):
|
||||
self.user_list.clear_list()
|
||||
for remuser in self.pending_whos:
|
||||
self.user_list.add_user(remuser)
|
||||
self.pending_whos = []
|
||||
|
||||
def update_on_ping(self, data, connection: Connection):
|
||||
if self.pings_seen % 90 == 0: # 90 * 2 min = 3 Stunden
|
||||
connection.raw_send('WHO ' + connection.details.get_channel())
|
||||
self.pings_seen += 1
|
||||
@@ -0,0 +1,45 @@
|
||||
from wikipedia import wikipedia
|
||||
|
||||
from FaustBot.Model.i18n import i18n
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
|
||||
|
||||
class WikiObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return [".w"]
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return ".w <term> - fragt Wikipediaartikel zu <term> ab"
|
||||
|
||||
def update_on_priv_msg(self, data, connection):
|
||||
|
||||
if data['message'].find('.w ') == -1:
|
||||
return
|
||||
i18n_server = i18n()
|
||||
w = wikipedia.set_lang(i18n_server.get_text('wiki_lang', lang=self.config.lang))
|
||||
q = data['message'].split(' ')
|
||||
query = ''
|
||||
for word in q:
|
||||
if word.strip() != '.w':
|
||||
query += word + ' '
|
||||
w = wikipedia.search(query)
|
||||
if w.__len__() == 0: # TODO BUG BELOW, ERROR MESSAGE NOT SHOWN!
|
||||
connection.send_back(data['nick'] + ', ' +
|
||||
i18n_server.get_text('wiki_fail',
|
||||
lang=self.config.lang),
|
||||
data)
|
||||
return
|
||||
try:
|
||||
page = wikipedia.WikipediaPage(w.pop(0))
|
||||
except wikipedia.DisambiguationError as error:
|
||||
print('disambiguation page')
|
||||
page = wikipedia.WikipediaPage(error.args[1][0])
|
||||
connection.send_back(data['nick'] + ' ' + page.url, data)
|
||||
index = 51 + page.summary[50:350].rfind('. ')
|
||||
if index == 50 or index > 230:
|
||||
index = page.summary[0:350].rfind(' ')
|
||||
connection.send_back(page.summary[0:index], data)
|
||||
else:
|
||||
connection.send_back(page.summary[0:index], data)
|
||||
@@ -0,0 +1,97 @@
|
||||
|
||||
from FaustBot.Communication.Connection import Connection
|
||||
from FaustBot.Modules.PrivMsgObserverPrototype import PrivMsgObserverPrototype
|
||||
from collections import defaultdict
|
||||
from time import sleep
|
||||
|
||||
|
||||
class WordRunObserver(PrivMsgObserverPrototype):
|
||||
@staticmethod
|
||||
def cmd():
|
||||
return ['.a', '.add', '.begin', '.end']
|
||||
|
||||
@staticmethod
|
||||
def help():
|
||||
return 'hangman game'
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.player = {}
|
||||
self.gamestatus = 0
|
||||
"""
|
||||
0 = Kein Spiel
|
||||
1 = Spiel mit Silbe am Anfang
|
||||
2 = Spiel mit Silbe am Ende
|
||||
"""
|
||||
self.syllable = ""
|
||||
|
||||
def update_on_priv_msg(self, data, connection: Connection):
|
||||
if data['message'].find('.a ') != -1 or data['message'].find('.add ') != -1:
|
||||
self.add(data, connection)
|
||||
if data['message'].find('.begin ') != -1:
|
||||
self.begin_word(data, connection)
|
||||
if data['message'].find('.end ') != -1:
|
||||
self.end_word(data, connection)
|
||||
|
||||
def add(self, data, connection):
|
||||
if self.gamestatus == 0:
|
||||
connection.send_channel("Es läuft derzeit kein Wordrun, bitte einen neuen mit .begin <Silbe> oder .end <silbe> erstellen!")
|
||||
return
|
||||
if self.gamestatus == 1 or self.gamestatus == 2:
|
||||
self.check_word(data["nick"], data['message'], connection)
|
||||
|
||||
def begin_word(self, data, connection):
|
||||
if self.gamestatus != 0:
|
||||
connection.send_channel("Es läuft bereits ein Spiel")
|
||||
return
|
||||
self.gamestatus = 1
|
||||
self.handle_game(data,connection)
|
||||
|
||||
def end_word(self, data, connection):
|
||||
if self.gamestatus != 0:
|
||||
connection.send_channel("Es läuft bereits ein Spiel")
|
||||
return
|
||||
self.gamestatus = 2
|
||||
self.handle_game(data,connection)
|
||||
|
||||
def check_word(self,player , message, connection):
|
||||
for word in message.split():
|
||||
if word == '.a':
|
||||
continue
|
||||
if self.gamestatus == 1:
|
||||
if word.upper().startswith(self.syllable.upper()):
|
||||
self.add_to_dict(player, word, connection)
|
||||
if self.gamestatus == 2:
|
||||
if word.upper().endswith(self.syllable.upper()):
|
||||
self.add_to_dict(player, word, connection)
|
||||
|
||||
def add_to_dict(self, nick, word, connection):
|
||||
for p in self.player.keys():
|
||||
for w in self.player[p]:
|
||||
if w.upper() == word.upper():
|
||||
connection.send_channel("Das Wort "+word+" wurde bereits von "+p+ " genannt")
|
||||
return
|
||||
if nick not in self.player:
|
||||
self.player[nick] = []
|
||||
self.player[nick].append(word)
|
||||
|
||||
def handle_game(self, data, connection):
|
||||
self.syllable = data["message"].split()[1]
|
||||
if self.gamestatus == 1:
|
||||
connection.send_channel("Das Wort muss mit " + self.syllable + "- beginnen")
|
||||
if self.gamestatus == 2:
|
||||
connection.send_channel("Das Wort muss mit -" + self.syllable + " enden")
|
||||
sleep(150)
|
||||
connection.send_channel("Noch 30 Sekunden")
|
||||
sleep(30)
|
||||
player_score = defaultdict(int)
|
||||
s = "Folgende Ergebnisse: "
|
||||
for p in self.player.keys():
|
||||
for w in self.player[p]:
|
||||
print(p+" "+w)
|
||||
player_score[p] += 1
|
||||
for p in self.player.keys():
|
||||
s = s + p + " : "+str(player_score[p])+ "; "
|
||||
connection.send_channel(s)
|
||||
self.gamestatus = 0
|
||||
self.player = {}
|
||||
@@ -0,0 +1 @@
|
||||
__author__ = 'Pups'
|
||||
@@ -0,0 +1,28 @@
|
||||
class StringBuffer:
|
||||
def __init__(self):
|
||||
self._buffer = str()
|
||||
|
||||
def append(self, to_append):
|
||||
self._buffer = self._buffer + to_append
|
||||
return self.get()
|
||||
|
||||
def get(self):
|
||||
ready = list()
|
||||
# Python do-while-loop
|
||||
idx = self._buffer.find('\n')
|
||||
while idx is not -1:
|
||||
data = self._buffer[0:idx] #
|
||||
data = data.strip()
|
||||
if len(data) >= 1:
|
||||
ready.append(data)
|
||||
self._buffer = self._buffer[idx + 1:]
|
||||
idx = self._buffer.find('\n')
|
||||
return ready
|
||||
|
||||
@property
|
||||
def buffer(self):
|
||||
return self._buffer
|
||||
|
||||
@buffer.setter
|
||||
def buffer(self, new_value):
|
||||
self._buffer = new_value
|
||||
Reference in New Issue
Block a user