394 lines
14 KiB
Python
394 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""quotebot — a simple quotes IRC bot for Libera.Chat.
|
|
|
|
Standard library only. Connects over TLS, joins a channel, and serves
|
|
quotes stored in a local SQLite database. Quotes are added and recalled
|
|
with in-channel `!` commands.
|
|
|
|
Configuration is read from environment variables (see config.example.env);
|
|
sensible defaults connect to Libera and join #r.trees.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import socket
|
|
import sqlite3
|
|
import ssl
|
|
import sys
|
|
import time
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Configuration (env-overridable; defaults target Libera + #r.trees)
|
|
# --------------------------------------------------------------------------
|
|
|
|
SERVER = os.environ.get("IRC_SERVER", "irc.libera.chat")
|
|
PORT = int(os.environ.get("IRC_PORT", "6697"))
|
|
USE_TLS = os.environ.get("IRC_TLS", "1") != "0"
|
|
NICK = os.environ.get("IRC_NICK", "treesquotes")
|
|
USERNAME = os.environ.get("IRC_USER", NICK)
|
|
REALNAME = os.environ.get("IRC_REALNAME", "quotebot")
|
|
CHANNEL = os.environ.get("IRC_CHANNEL", "#r.trees")
|
|
# Optional NickServ password — keep it in the environment, never in git.
|
|
NICKSERV_PASS = os.environ.get("IRC_NICKSERV_PASS", "")
|
|
DB_PATH = os.environ.get("QUOTEBOT_DB", os.path.join(os.path.dirname(__file__), "quotes.db"))
|
|
CMD_PREFIX = os.environ.get("QUOTEBOT_PREFIX", "!")
|
|
|
|
# Seconds between outbound messages, to stay clear of flood limits.
|
|
SEND_DELAY = float(os.environ.get("QUOTEBOT_SEND_DELAY", "1.0"))
|
|
# Max characters of a quote echoed back (IRC line length is ~512 with overhead).
|
|
MAX_QUOTE_LEN = 400
|
|
|
|
logging.basicConfig(
|
|
level=os.environ.get("QUOTEBOT_LOGLEVEL", "INFO"),
|
|
format="%(asctime)s %(levelname)s %(message)s",
|
|
)
|
|
log = logging.getLogger("quotebot")
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Quote storage
|
|
# --------------------------------------------------------------------------
|
|
|
|
|
|
class QuoteStore:
|
|
"""Thin SQLite wrapper for the quotes table."""
|
|
|
|
def __init__(self, path: str):
|
|
self.db = sqlite3.connect(path)
|
|
self.db.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS quotes (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
text TEXT NOT NULL,
|
|
added_by TEXT NOT NULL,
|
|
added_at INTEGER NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
self.db.commit()
|
|
|
|
def add(self, text: str, added_by: str) -> int:
|
|
cur = self.db.execute(
|
|
"INSERT INTO quotes (text, added_by, added_at) VALUES (?, ?, ?)",
|
|
(text, added_by, int(time.time())),
|
|
)
|
|
self.db.commit()
|
|
return cur.lastrowid
|
|
|
|
def get(self, quote_id: int):
|
|
row = self.db.execute(
|
|
"SELECT id, text FROM quotes WHERE id = ?", (quote_id,)
|
|
).fetchone()
|
|
return row
|
|
|
|
def random(self):
|
|
return self.db.execute(
|
|
"SELECT id, text FROM quotes ORDER BY RANDOM() LIMIT 1"
|
|
).fetchone()
|
|
|
|
def search(self, term: str):
|
|
return self.db.execute(
|
|
"SELECT id, text FROM quotes WHERE text LIKE ? ORDER BY id LIMIT 1",
|
|
(f"%{term}%",),
|
|
).fetchone()
|
|
|
|
def delete(self, quote_id: int) -> bool:
|
|
cur = self.db.execute("DELETE FROM quotes WHERE id = ?", (quote_id,))
|
|
self.db.commit()
|
|
return cur.rowcount > 0
|
|
|
|
def count(self) -> int:
|
|
return self.db.execute("SELECT COUNT(*) FROM quotes").fetchone()[0]
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# IRC client
|
|
# --------------------------------------------------------------------------
|
|
|
|
|
|
class IRCBot:
|
|
def __init__(self, store: QuoteStore):
|
|
self.store = store
|
|
self.sock: socket.socket | None = None
|
|
self._recv_buf = b""
|
|
# Last non-command channel line per user, for !grabquote.
|
|
# Keyed by lowercased nick -> (original_nick, text).
|
|
self.last_lines: dict[str, tuple[str, str]] = {}
|
|
self.last_speaker: str | None = None
|
|
|
|
# --- low-level I/O ----------------------------------------------------
|
|
|
|
def connect(self):
|
|
log.info("Connecting to %s:%s (tls=%s)", SERVER, PORT, USE_TLS)
|
|
raw = socket.create_connection((SERVER, PORT), timeout=300)
|
|
if USE_TLS:
|
|
ctx = ssl.create_default_context()
|
|
self.sock = ctx.wrap_socket(raw, server_hostname=SERVER)
|
|
else:
|
|
self.sock = raw
|
|
|
|
self._send_raw(f"NICK {NICK}")
|
|
self._send_raw(f"USER {USERNAME} 0 * :{REALNAME}")
|
|
|
|
def _send_raw(self, line: str):
|
|
"""Send a single protocol line (no trailing CRLF needed)."""
|
|
log.debug(">> %s", line)
|
|
self.sock.sendall((line + "\r\n").encode("utf-8", "replace"))
|
|
|
|
def send_privmsg(self, target: str, text: str):
|
|
# Strip newlines so a quote can't inject extra IRC commands.
|
|
text = text.replace("\r", " ").replace("\n", " ")
|
|
if len(text) > MAX_QUOTE_LEN:
|
|
text = text[: MAX_QUOTE_LEN - 1] + "…"
|
|
self._send_raw(f"PRIVMSG {target} :{text}")
|
|
time.sleep(SEND_DELAY)
|
|
|
|
def _read_lines(self):
|
|
"""Yield complete protocol lines as they arrive."""
|
|
while True:
|
|
data = self.sock.recv(4096)
|
|
if not data:
|
|
raise ConnectionError("server closed the connection")
|
|
self._recv_buf += data
|
|
while b"\r\n" in self._recv_buf:
|
|
line, self._recv_buf = self._recv_buf.split(b"\r\n", 1)
|
|
yield line.decode("utf-8", "replace")
|
|
|
|
# --- main loop --------------------------------------------------------
|
|
|
|
def run_forever(self):
|
|
backoff = 5
|
|
while True:
|
|
try:
|
|
self.connect()
|
|
self._event_loop()
|
|
except (OSError, ConnectionError) as exc:
|
|
log.warning("Disconnected: %s", exc)
|
|
finally:
|
|
if self.sock:
|
|
try:
|
|
self.sock.close()
|
|
except OSError:
|
|
pass
|
|
self.sock = None
|
|
log.info("Reconnecting in %ss", backoff)
|
|
time.sleep(backoff)
|
|
backoff = min(backoff * 2, 300)
|
|
|
|
def _join_channel(self):
|
|
log.info("Joining %s", CHANNEL)
|
|
self._send_raw(f"JOIN {CHANNEL}")
|
|
|
|
def _event_loop(self):
|
|
registered = False
|
|
joined = False
|
|
for line in self._read_lines():
|
|
log.debug("<< %s", line)
|
|
prefix, command, params = parse_message(line)
|
|
|
|
if command == "PING":
|
|
self._send_raw(f"PONG :{params[-1] if params else ''}")
|
|
continue
|
|
|
|
# 376 = end of MOTD, 422 = no MOTD; either means we're registered.
|
|
if command in ("376", "422") and not registered:
|
|
registered = True
|
|
if NICKSERV_PASS:
|
|
# Identify first, then wait for the login confirmation
|
|
# below before joining. +r channels reject us until the
|
|
# NickServ login has actually landed, so joining now races.
|
|
self.send_privmsg("NickServ", f"IDENTIFY {NICKSERV_PASS}")
|
|
log.info("Identifying with NickServ; join deferred until login")
|
|
else:
|
|
self._join_channel()
|
|
joined = True
|
|
continue
|
|
|
|
# 900 RPL_LOGGEDIN, or the NickServ "now identified" notice —
|
|
# either confirms login is complete, so it's safe to join +r chans.
|
|
if not joined and registered and NICKSERV_PASS:
|
|
identified = command == "900" or (
|
|
command == "NOTICE"
|
|
and prefix.lower().startswith("nickserv!")
|
|
and "now identified" in (params[-1].lower() if params else "")
|
|
)
|
|
if identified:
|
|
self._join_channel()
|
|
joined = True
|
|
continue
|
|
|
|
# Nick already in use — try a variant so we still connect.
|
|
if command == "433":
|
|
global NICK
|
|
NICK = NICK + "_"
|
|
log.warning("Nick in use, retrying as %s", NICK)
|
|
self._send_raw(f"NICK {NICK}")
|
|
continue
|
|
|
|
if command == "PRIVMSG":
|
|
self._handle_privmsg(prefix, params)
|
|
|
|
# --- command handling -------------------------------------------------
|
|
|
|
def _handle_privmsg(self, prefix: str, params: list[str]):
|
|
if len(params) < 2:
|
|
return
|
|
target, message = params[0], params[1]
|
|
sender = prefix.split("!", 1)[0] if prefix else ""
|
|
is_channel = target.startswith(("#", "&"))
|
|
|
|
# Reply to the channel for channel messages, or to the user for PMs.
|
|
reply_to = target if is_channel else sender
|
|
|
|
if not message.startswith(CMD_PREFIX):
|
|
# Remember the last real line each user said, so !grabquote can
|
|
# recall it. Bot commands are deliberately not recorded.
|
|
if is_channel and sender:
|
|
self.last_lines[sender.lower()] = (sender, message)
|
|
self.last_speaker = sender
|
|
return
|
|
parts = message[len(CMD_PREFIX):].strip().split(" ", 1)
|
|
cmd = parts[0].lower()
|
|
arg = parts[1].strip() if len(parts) > 1 else ""
|
|
|
|
handler = COMMANDS.get(cmd)
|
|
if handler:
|
|
handler(self, reply_to, sender, arg)
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Commands
|
|
# --------------------------------------------------------------------------
|
|
|
|
|
|
def _fmt(row) -> str:
|
|
return f"[#{row[0]}] {row[1]}"
|
|
|
|
|
|
def cmd_quote(bot: IRCBot, reply_to: str, sender: str, arg: str):
|
|
"""!quote [id] — random quote, or a specific one by id."""
|
|
if arg:
|
|
if not arg.isdigit():
|
|
bot.send_privmsg(reply_to, "Usage: !quote [id]")
|
|
return
|
|
row = bot.store.get(int(arg))
|
|
if not row:
|
|
bot.send_privmsg(reply_to, f"No quote #{arg}.")
|
|
return
|
|
else:
|
|
row = bot.store.random()
|
|
if not row:
|
|
bot.send_privmsg(reply_to, "No quotes yet — add one with !addquote <text>")
|
|
return
|
|
bot.send_privmsg(reply_to, _fmt(row))
|
|
|
|
|
|
def cmd_addquote(bot: IRCBot, reply_to: str, sender: str, arg: str):
|
|
"""!addquote <text> — store a new quote."""
|
|
if not arg:
|
|
bot.send_privmsg(reply_to, "Usage: !addquote <text>")
|
|
return
|
|
qid = bot.store.add(arg, sender)
|
|
bot.send_privmsg(reply_to, f"Added quote #{qid}.")
|
|
|
|
|
|
def cmd_delquote(bot: IRCBot, reply_to: str, sender: str, arg: str):
|
|
"""!delquote <id> — remove a quote."""
|
|
if not arg.isdigit():
|
|
bot.send_privmsg(reply_to, "Usage: !delquote <id>")
|
|
return
|
|
ok = bot.store.delete(int(arg))
|
|
bot.send_privmsg(reply_to, f"Deleted quote #{arg}." if ok else f"No quote #{arg}.")
|
|
|
|
|
|
def cmd_grabquote(bot: IRCBot, reply_to: str, sender: str, arg: str):
|
|
"""!grabquote [user] — store the last line a user said (default: last speaker)."""
|
|
nick = arg.split()[0] if arg.split() else bot.last_speaker
|
|
if not nick:
|
|
bot.send_privmsg(reply_to, "Nothing to grab yet.")
|
|
return
|
|
rec = bot.last_lines.get(nick.lower())
|
|
if not rec:
|
|
bot.send_privmsg(reply_to, f"I haven't seen {nick} say anything.")
|
|
return
|
|
orig_nick, text = rec
|
|
qid = bot.store.add(f"<{orig_nick}> {text}", sender)
|
|
bot.send_privmsg(reply_to, f"Grabbed quote #{qid}: <{orig_nick}> {text}")
|
|
|
|
|
|
def cmd_search(bot: IRCBot, reply_to: str, sender: str, arg: str):
|
|
"""!search <term> — first quote matching a substring."""
|
|
if not arg:
|
|
bot.send_privmsg(reply_to, "Usage: !search <term>")
|
|
return
|
|
row = bot.store.search(arg)
|
|
bot.send_privmsg(reply_to, _fmt(row) if row else f"No quote matching '{arg}'.")
|
|
|
|
|
|
def cmd_count(bot: IRCBot, reply_to: str, sender: str, arg: str):
|
|
"""!quotecount — how many quotes are stored."""
|
|
bot.send_privmsg(reply_to, f"{bot.store.count()} quotes stored.")
|
|
|
|
|
|
def cmd_help(bot: IRCBot, reply_to: str, sender: str, arg: str):
|
|
bot.send_privmsg(
|
|
reply_to,
|
|
"Commands: !quote [id], !addquote <text>, !grabquote [user], "
|
|
"!delquote <id>, !search <term>, !quotecount, !help",
|
|
)
|
|
|
|
|
|
COMMANDS = {
|
|
"quote": cmd_quote,
|
|
"addquote": cmd_addquote,
|
|
"grabquote": cmd_grabquote,
|
|
"grab": cmd_grabquote,
|
|
"delquote": cmd_delquote,
|
|
"search": cmd_search,
|
|
"quotecount": cmd_count,
|
|
"count": cmd_count,
|
|
"help": cmd_help,
|
|
}
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# IRC message parsing
|
|
# --------------------------------------------------------------------------
|
|
|
|
|
|
def parse_message(line: str):
|
|
"""Split a raw IRC line into (prefix, command, params)."""
|
|
prefix = ""
|
|
if line.startswith(":"):
|
|
prefix, _, line = line[1:].partition(" ")
|
|
# The trailing parameter (after " :") may contain spaces.
|
|
if " :" in line:
|
|
head, _, trailing = line.partition(" :")
|
|
params = head.split()
|
|
params.append(trailing)
|
|
else:
|
|
params = line.split()
|
|
command = params.pop(0) if params else ""
|
|
return prefix, command, params
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Entry point
|
|
# --------------------------------------------------------------------------
|
|
|
|
|
|
def main():
|
|
store = QuoteStore(DB_PATH)
|
|
log.info("Quote DB: %s (%d quotes)", DB_PATH, store.count())
|
|
bot = IRCBot(store)
|
|
try:
|
|
bot.run_forever()
|
|
except KeyboardInterrupt:
|
|
log.info("Shutting down.")
|
|
sys.exit(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|