From 48f182e41ac7b608b253155517883b286193cbd0 Mon Sep 17 00:00:00 2001
From: Alek Ratzloff
Date: Mon, 23 May 2022 21:02:51 -0700
Subject: [PATCH] Add linkbot and config example for linkbot

Signed-off-by: Alek Ratzloff
---
 config.example.toml |  26 +++++-
 plugins/linkbot.py  | 175 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 200 insertions(+), 1 deletion(-)
 create mode 100644 plugins/linkbot.py

diff --git a/config.example.toml b/config.example.toml
index 576796a..6383024 100644
--- a/config.example.toml
+++ b/config.example.toml
@@ -34,4 +34,28 @@ channels = ["#fortune_telling"]
 # documentation for the values you can set.
 
 # A list of fortunes, or a path to a list of fortunes - one per line.
-fortunes = "data/fortunes.txt"
\ No newline at end of file
+fortunes = "data/fortunes.txt"
+
+[[plugins]]
+# Linkbot module will fetch the title of the page at a URL.
+module = "plugins.linkbot"
+
+# Whether URLs that are pointing to private IP addresses are allowed.
+# This is recommended to be left false.
+# default: false
+# allow_private = false
+
+# Additional blocked hosts, URLs, and IP ranges.
+# You don't need to block things like localhost, 192.168.0.0/16, etc if you are
+# using `allow_private = false` above
+# default: []
+blocked = [
+    # This can take the form of a URL,
+    "https://www.example.com/full-page/",
+    # a hostname,
+    "www.example.com",
+    # an IP address,
+    "8.8.8.8",
+    # or an IP subnet
+    "8.8.0.0/16"
+]
\ No newline at end of file
diff --git a/plugins/linkbot.py b/plugins/linkbot.py
new file mode 100644
index 0000000..ba2f814
--- /dev/null
+++ b/plugins/linkbot.py
@@ -0,0 +1,175 @@
+"""Linkbot plugin: post the <title> of web pages linked in a channel."""
+import asyncio
+import html
+import ipaddress
+import logging
+import re
+from typing import Tuple
+from urllib.parse import urljoin, urlparse
+
+import aiodns
+import aiohttp
+from asyncirc.protocol import IrcProtocol
+from irclib.parser import Prefix
+from omnibot.plugin import Plugin
+
+
+log = logging.getLogger(__name__)
+LINK_RE = re.compile(r"https?://[^ ]+")
+# Tag names are case-insensitive and a title may wrap across several lines.
+TITLE_RE = re.compile(
+    r"<title[^>]*>(?P<title>.+?)</title>", re.IGNORECASE | re.DOTALL
+)
+# Upper bound, in seconds, on each HTTP request.
+FETCH_TIMEOUT = 10
+# Redirects are followed by hand so that every hop can be re-checked.
+MAX_REDIRECTS = 5
+REDIRECT_STATUSES = frozenset({301, 302, 303, 307, 308})
+
+
+async def dns_lookup(host: str) -> str | None:
+    """Resolve *host* to an IPv4 address.
+
+    Returns the first A record as a string, or None when the name has no A
+    record or the lookup fails.
+    """
+    resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop())
+    try:
+        result = await resolver.query(host, "A")
+    except aiodns.error.DNSError:
+        # aiodns reports NXDOMAIN, timeouts, etc. by raising rather than by
+        # returning an empty result.
+        return None
+    return result[0].host if result else None
+
+
+class Linkbot(Plugin):
+    """Plugin that posts the page title for each URL seen in a message.
+
+    Configuration keys read from ``plugin_config``:
+
+    * ``blocked`` -- URLs, hostnames, IP addresses or IP networks that must
+      never be fetched (default: empty list).
+    * ``allow_private`` -- whether URLs that resolve to private IP addresses
+      may be fetched (default: False).
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self.network_block = []
+        self.host_block = []
+        for addr in self.plugin_config.get("blocked", []):
+            try:
+                # strict=False so that an entry with host bits set, such as
+                # "8.8.8.8/16", still blocks the network it belongs to
+                network = ipaddress.ip_network(addr, strict=False)
+            except ValueError:
+                # This is a hostname or URL (probably)
+                self.host_block.append(addr.lower())
+            else:
+                self.network_block.append(network)
+        self.allow_private = self.plugin_config.get("allow_private", False)
+
+    async def is_blocked(self, url: str) -> bool:
+        """Return True if *url* must not be fetched.
+
+        A URL is blocked when it cannot be parsed, is not http(s), has no
+        hostname, matches a configured URL or hostname, cannot be resolved,
+        or points at an address that is private (unless ``allow_private`` is
+        set) or inside a configured network.
+        """
+        try:
+            parts = urlparse(url)
+            hostname = parts.hostname
+        except ValueError:
+            # urlparse rejects some inputs outright, e.g. a bad IPv6 literal
+            return True
+        if parts.scheme not in ("http", "https") or not hostname:
+            return True
+
+        # Make sure that this isn't any blocked URL or hostname (urlparse
+        # has already lower-cased the hostname)
+        if url.lower() in self.host_block or hostname in self.host_block:
+            return True
+
+        try:
+            # An IP literal is checked directly, without a DNS lookup
+            ip_addr = ipaddress.ip_address(hostname)
+        except ValueError:
+            ip = await dns_lookup(hostname)
+            if not ip:
+                # Don't try if we can't resolve DNS
+                return True
+            ip_addr = ipaddress.ip_address(ip)
+
+        # If the IP address is private and we've disallowed private IP
+        # lookups, don't fetch
+        if ip_addr.is_private and not self.allow_private:
+            return True
+        # If the IP address is in any blocked network, don't fetch
+        return any(ip_addr in block for block in self.network_block)
+
+    async def fetch(self, url: str) -> Tuple[int, str, str] | None:
+        """Download the text behind *url*.
+
+        Returns ``(status, content_type, text)`` for text/HTML responses and
+        None for any other content type, a blocked redirect target, too many
+        redirects, or a network error.
+        """
+        timeout = aiohttp.ClientTimeout(total=FETCH_TIMEOUT)
+        try:
+            async with aiohttp.ClientSession(timeout=timeout) as session:
+                for _ in range(MAX_REDIRECTS + 1):
+                    async with session.get(url, allow_redirects=False) as response:
+                        location = response.headers.get("location")
+                        if response.status in REDIRECT_STATUSES and location:
+                            # Re-check every hop: an allowed URL must not be
+                            # able to bounce us to a blocked/private target
+                            url = urljoin(url, location)
+                            if await self.is_blocked(url):
+                                log.debug("redirect to blocked URL %s", url)
+                                return None
+                            continue
+                        status = response.status
+                        # The header is optional, so don't index it directly
+                        content_type = response.headers.get("content-type", "")
+                        content_type = content_type.lower()
+                        if "text" in content_type or "html" in content_type:
+                            # Only download text if it's a text/html content_type
+                            text = await response.text(errors="replace")
+                            return (status, content_type, text)
+                        return None
+        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
+            log.debug("could not fetch %s: %s", url, exc)
+        return None
+
+    async def on_message(self, conn: IrcProtocol, channel: str, who: Prefix, line: str):
+        """Post the title (or failing status) of every URL found in *line*."""
+        for url in LINK_RE.findall(line):
+            if await self.is_blocked(url):
+                # Skip this URL if it's blocked
+                log.debug("skipping URL %s because it is blocked", url)
+                continue
+            # Fetch the HTML at the URL
+            result = await self.fetch(url)
+            if not result:
+                # Could not fetch this URL
+                log.debug("skipping URL %s because it couldn't be fetched", url)
+                continue
+            status, _content_type, text = result
+            if title := TITLE_RE.search(text):
+                # Decode entities, then collapse all whitespace: a raw CR or
+                # LF in the title would otherwise end the IRC message early
+                # and could let the page inject its own IRC commands
+                message = " ".join(html.unescape(title["title"]).split())
+            elif not (200 <= status <= 299):
+                message = f"{who.nick}: (status {status})"
+            else:
+                message = None
+
+            if message:
+                self.send_to(conn, channel, message)
+
+
+PLUGIN_TYPE = Linkbot