quotebot/bot.py

344 lines
11 KiB
Python

#!/usr/bin/env python3
"""quotebot — a simple quotes IRC bot for Libera.Chat.
Standard library only. Connects over TLS, joins a channel, and serves
quotes stored in a local SQLite database. Quotes are added and recalled
with in-channel `!` commands.
Configuration is read from environment variables (see config.example.env);
sensible defaults connect to Libera and join #r.trees.
"""
from __future__ import annotations
import logging
import os
import socket
import sqlite3
import ssl
import sys
import time
# --------------------------------------------------------------------------
# Configuration (env-overridable; defaults target Libera + #r.trees)
# --------------------------------------------------------------------------
SERVER = os.environ.get("IRC_SERVER", "irc.libera.chat")
PORT = int(os.environ.get("IRC_PORT", "6697"))
USE_TLS = os.environ.get("IRC_TLS", "1") != "0"
NICK = os.environ.get("IRC_NICK", "treesquotes")
USERNAME = os.environ.get("IRC_USER", NICK)
REALNAME = os.environ.get("IRC_REALNAME", "quotebot")
CHANNEL = os.environ.get("IRC_CHANNEL", "#r.trees")
# Optional NickServ password — keep it in the environment, never in git.
NICKSERV_PASS = os.environ.get("IRC_NICKSERV_PASS", "")
DB_PATH = os.environ.get("QUOTEBOT_DB", os.path.join(os.path.dirname(__file__), "quotes.db"))
CMD_PREFIX = os.environ.get("QUOTEBOT_PREFIX", "!")
# Seconds between outbound messages, to stay clear of flood limits.
SEND_DELAY = float(os.environ.get("QUOTEBOT_SEND_DELAY", "1.0"))
# Max characters of a quote echoed back (IRC line length is ~512 with overhead).
MAX_QUOTE_LEN = 400
logging.basicConfig(
level=os.environ.get("QUOTEBOT_LOGLEVEL", "INFO"),
format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("quotebot")
# --------------------------------------------------------------------------
# Quote storage
# --------------------------------------------------------------------------
class QuoteStore:
"""Thin SQLite wrapper for the quotes table."""
def __init__(self, path: str):
self.db = sqlite3.connect(path)
self.db.execute(
"""
CREATE TABLE IF NOT EXISTS quotes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
text TEXT NOT NULL,
added_by TEXT NOT NULL,
added_at INTEGER NOT NULL
)
"""
)
self.db.commit()
def add(self, text: str, added_by: str) -> int:
cur = self.db.execute(
"INSERT INTO quotes (text, added_by, added_at) VALUES (?, ?, ?)",
(text, added_by, int(time.time())),
)
self.db.commit()
return cur.lastrowid
def get(self, quote_id: int):
row = self.db.execute(
"SELECT id, text FROM quotes WHERE id = ?", (quote_id,)
).fetchone()
return row
def random(self):
return self.db.execute(
"SELECT id, text FROM quotes ORDER BY RANDOM() LIMIT 1"
).fetchone()
def search(self, term: str):
return self.db.execute(
"SELECT id, text FROM quotes WHERE text LIKE ? ORDER BY id LIMIT 1",
(f"%{term}%",),
).fetchone()
def delete(self, quote_id: int) -> bool:
cur = self.db.execute("DELETE FROM quotes WHERE id = ?", (quote_id,))
self.db.commit()
return cur.rowcount > 0
def count(self) -> int:
return self.db.execute("SELECT COUNT(*) FROM quotes").fetchone()[0]
# --------------------------------------------------------------------------
# IRC client
# --------------------------------------------------------------------------
class IRCBot:
def __init__(self, store: QuoteStore):
self.store = store
self.sock: socket.socket | None = None
self._recv_buf = b""
# --- low-level I/O ----------------------------------------------------
def connect(self):
log.info("Connecting to %s:%s (tls=%s)", SERVER, PORT, USE_TLS)
raw = socket.create_connection((SERVER, PORT), timeout=300)
if USE_TLS:
ctx = ssl.create_default_context()
self.sock = ctx.wrap_socket(raw, server_hostname=SERVER)
else:
self.sock = raw
self._send_raw(f"NICK {NICK}")
self._send_raw(f"USER {USERNAME} 0 * :{REALNAME}")
def _send_raw(self, line: str):
"""Send a single protocol line (no trailing CRLF needed)."""
log.debug(">> %s", line)
self.sock.sendall((line + "\r\n").encode("utf-8", "replace"))
def send_privmsg(self, target: str, text: str):
# Strip newlines so a quote can't inject extra IRC commands.
text = text.replace("\r", " ").replace("\n", " ")
if len(text) > MAX_QUOTE_LEN:
text = text[: MAX_QUOTE_LEN - 1] + ""
self._send_raw(f"PRIVMSG {target} :{text}")
time.sleep(SEND_DELAY)
def _read_lines(self):
"""Yield complete protocol lines as they arrive."""
while True:
data = self.sock.recv(4096)
if not data:
raise ConnectionError("server closed the connection")
self._recv_buf += data
while b"\r\n" in self._recv_buf:
line, self._recv_buf = self._recv_buf.split(b"\r\n", 1)
yield line.decode("utf-8", "replace")
# --- main loop --------------------------------------------------------
def run_forever(self):
backoff = 5
while True:
try:
self.connect()
self._event_loop()
except (OSError, ConnectionError) as exc:
log.warning("Disconnected: %s", exc)
finally:
if self.sock:
try:
self.sock.close()
except OSError:
pass
self.sock = None
log.info("Reconnecting in %ss", backoff)
time.sleep(backoff)
backoff = min(backoff * 2, 300)
def _event_loop(self):
registered = False
for line in self._read_lines():
log.debug("<< %s", line)
prefix, command, params = parse_message(line)
if command == "PING":
self._send_raw(f"PONG :{params[-1] if params else ''}")
continue
# 376 = end of MOTD, 422 = no MOTD; either means we can join.
if command in ("376", "422") and not registered:
registered = True
if NICKSERV_PASS:
self.send_privmsg("NickServ", f"IDENTIFY {NICKSERV_PASS}")
log.info("Joining %s", CHANNEL)
self._send_raw(f"JOIN {CHANNEL}")
continue
# Nick already in use — try a variant so we still connect.
if command == "433":
global NICK
NICK = NICK + "_"
log.warning("Nick in use, retrying as %s", NICK)
self._send_raw(f"NICK {NICK}")
continue
if command == "PRIVMSG":
self._handle_privmsg(prefix, params)
# --- command handling -------------------------------------------------
def _handle_privmsg(self, prefix: str, params: list[str]):
if len(params) < 2:
return
target, message = params[0], params[1]
sender = prefix.split("!", 1)[0] if prefix else ""
# Reply to the channel for channel messages, or to the user for PMs.
reply_to = target if target.startswith(("#", "&")) else sender
if not message.startswith(CMD_PREFIX):
return
parts = message[len(CMD_PREFIX):].strip().split(" ", 1)
cmd = parts[0].lower()
arg = parts[1].strip() if len(parts) > 1 else ""
handler = COMMANDS.get(cmd)
if handler:
handler(self, reply_to, sender, arg)
# --------------------------------------------------------------------------
# Commands
# --------------------------------------------------------------------------
def _fmt(row) -> str:
return f"[#{row[0]}] {row[1]}"
def cmd_quote(bot: IRCBot, reply_to: str, sender: str, arg: str):
"""!quote [id] — random quote, or a specific one by id."""
if arg:
if not arg.isdigit():
bot.send_privmsg(reply_to, "Usage: !quote [id]")
return
row = bot.store.get(int(arg))
if not row:
bot.send_privmsg(reply_to, f"No quote #{arg}.")
return
else:
row = bot.store.random()
if not row:
bot.send_privmsg(reply_to, "No quotes yet — add one with !addquote <text>")
return
bot.send_privmsg(reply_to, _fmt(row))
def cmd_addquote(bot: IRCBot, reply_to: str, sender: str, arg: str):
"""!addquote <text> — store a new quote."""
if not arg:
bot.send_privmsg(reply_to, "Usage: !addquote <text>")
return
qid = bot.store.add(arg, sender)
bot.send_privmsg(reply_to, f"Added quote #{qid}.")
def cmd_delquote(bot: IRCBot, reply_to: str, sender: str, arg: str):
"""!delquote <id> — remove a quote."""
if not arg.isdigit():
bot.send_privmsg(reply_to, "Usage: !delquote <id>")
return
ok = bot.store.delete(int(arg))
bot.send_privmsg(reply_to, f"Deleted quote #{arg}." if ok else f"No quote #{arg}.")
def cmd_search(bot: IRCBot, reply_to: str, sender: str, arg: str):
"""!search <term> — first quote matching a substring."""
if not arg:
bot.send_privmsg(reply_to, "Usage: !search <term>")
return
row = bot.store.search(arg)
bot.send_privmsg(reply_to, _fmt(row) if row else f"No quote matching '{arg}'.")
def cmd_count(bot: IRCBot, reply_to: str, sender: str, arg: str):
"""!quotecount — how many quotes are stored."""
bot.send_privmsg(reply_to, f"{bot.store.count()} quotes stored.")
def cmd_help(bot: IRCBot, reply_to: str, sender: str, arg: str):
bot.send_privmsg(
reply_to,
"Commands: !quote [id], !addquote <text>, !delquote <id>, "
"!search <term>, !quotecount, !help",
)
COMMANDS = {
"quote": cmd_quote,
"addquote": cmd_addquote,
"delquote": cmd_delquote,
"search": cmd_search,
"quotecount": cmd_count,
"count": cmd_count,
"help": cmd_help,
}
# --------------------------------------------------------------------------
# IRC message parsing
# --------------------------------------------------------------------------
def parse_message(line: str):
"""Split a raw IRC line into (prefix, command, params)."""
prefix = ""
if line.startswith(":"):
prefix, _, line = line[1:].partition(" ")
# The trailing parameter (after " :") may contain spaces.
if " :" in line:
head, _, trailing = line.partition(" :")
params = head.split()
params.append(trailing)
else:
params = line.split()
command = params.pop(0) if params else ""
return prefix, command, params
# --------------------------------------------------------------------------
# Entry point
# --------------------------------------------------------------------------
def main():
store = QuoteStore(DB_PATH)
log.info("Quote DB: %s (%d quotes)", DB_PATH, store.count())
bot = IRCBot(store)
try:
bot.run_forever()
except KeyboardInterrupt:
log.info("Shutting down.")
sys.exit(0)
if __name__ == "__main__":
main()