"""IRC plugin that replies with the title of links posted in a channel."""

import asyncio
import html
from html.parser import HTMLParser
import ipaddress
import logging
import re
from typing import Tuple
from urllib.parse import urlparse

import aiodns
import aiohttp
from asyncirc.protocol import IrcProtocol
from irclib.parser import Prefix

from omnibot.plugin import Plugin

log = logging.getLogger(__name__)

LINK_RE = re.compile(r"https?://[^ ]+")


class TitleParser(HTMLParser):
    """HTML parser that captures the first usable title of a document.

    The title is taken from whichever of these appears first with a non-empty
    value: the text of a ``<title>`` element, a ``<meta property="og:title">``
    tag, or a ``<meta name="title">`` tag. Once a title has been captured,
    later candidates are ignored.
    """

    def __init__(self, *args, **kwargs):
        # HTMLParser.__init__ calls reset(), which initialises the parser
        # state below; the explicit assignments keep the attributes visible.
        super().__init__(*args, **kwargs)
        self.parsing_title = False
        self.__parsed_title: str | None = None

    @property
    def parsed_title(self) -> str | None:
        """The captured title with surrounding whitespace removed, or None."""
        return self.__parsed_title

    @parsed_title.setter
    def parsed_title(self, value: str | None):
        if value:
            value = value.strip()
        self.__parsed_title = value

    def reset(self) -> None:
        """Reset the parser, discarding any captured title and tag state."""
        super().reset()
        # Clear the "inside <title>" flag too, so a parser that is reused
        # after an unterminated <title> does not treat new text as the title.
        self.parsing_title = False
        self.parsed_title = None

    def handle_starttag(
        self, tag: str, attrs_list: list[tuple[str, str | None]]
    ) -> None:
        """Track entry into ``<title>`` and read titles from ``<meta>`` tags."""
        if self.parsed_title:
            return

        tag = tag.lower()
        if tag == "title":
            self.parsing_title = True
        elif tag == "meta":
            # Filter out attributes with None/empty values.
            attrs = {name: value for name, value in attrs_list if value}
            # Where property="og:title" or name="title", use the "content"
            # attribute as the title.
            is_title_meta = (
                attrs.get("property") == "og:title" or attrs.get("name") == "title"
            )
            if is_title_meta and "content" in attrs:
                self.parsed_title = html.unescape(attrs["content"])

    def handle_endtag(self, tag: str) -> None:
        """Stop collecting title text when ``</title>`` is reached."""
        if tag.lower() == "title":
            self.parsing_title = False

    def handle_data(self, data: str) -> None:
        """Capture text found inside ``<title>`` if no title is set yet."""
        if self.parsing_title and not self.parsed_title:
            self.parsed_title = html.unescape(data)


async def dns_lookup(host: str) -> str | None:
    """Resolve *host* to an IPv4 address using an ``A`` record query.

    Returns the address of the first record, or None when the query yields no
    records or fails. Lookup failures are reported as None rather than raised
    so that callers can treat "cannot resolve" uniformly.
    """
    resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop())
    try:
        result = await resolver.query(host, "A")
    except aiodns.error.DNSError as exc:
        log.debug("DNS lookup for %s failed: %s", host, exc)
        return None
    return result[0].host if result else None


class Linkbot(Plugin):
    """Plugin that posts the page title of URLs mentioned in messages.

    Configuration keys read from ``self.plugin_config``:

    * ``blocked`` -- list of IP networks (CIDR notation or single addresses)
      and/or hostnames/URLs that must never be fetched.
    * ``allow_private`` -- when false (the default), URLs that resolve to a
      private IP address are not fetched.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.network_block = []
        self.host_block = []
        blocked = self.plugin_config.get("blocked", [])
        for addr in blocked:
            try:
                # Try to parse this as an IP network. strict=False accepts
                # entries with host bits set (e.g. "192.168.1.1/24") instead
                # of silently treating them as hostnames that never match.
                self.network_block.append(ipaddress.ip_network(addr, strict=False))
            except ValueError:
                # This is a hostname (probably)
                self.host_block.append(addr.lower())
        self.allow_private = self.plugin_config.get("allow_private", False)

    async def is_blocked(self, url: str) -> bool:
        """Return True if *url* must not be fetched.

        A URL is blocked when it cannot be parsed, has no hostname, matches a
        blocked hostname/URL, cannot be resolved via DNS, resolves to a private
        address while private lookups are disallowed, or resolves to an address
        inside a blocked network.
        """
        try:
            hostname = urlparse(url).hostname
        except ValueError:
            # urlparse rejects malformed hosts such as unbalanced "[" brackets;
            # LINK_RE is permissive enough to match those.
            return True
        if not hostname:
            return True

        # Make sure that this isn't any blocked hostname
        if url.lower() in self.host_block or hostname.lower() in self.host_block:
            return True

        # Resolve the hostname and make sure that this IP address is not blocked
        ip = await dns_lookup(hostname)
        if not ip:
            # Don't try if we can't resolve DNS
            return True

        ip_addr = ipaddress.ip_address(ip)
        # If the IP address is private and we've disallowed private IP
        # lookups, don't fetch
        if ip_addr.is_private and not self.allow_private:
            return True

        # If the IP address is in any blocked network, don't fetch
        return any(ip_addr in block for block in self.network_block)

    async def fetch(self, url: str) -> Tuple[int, str, str] | None:
        """Download *url* and return ``(status, content_type, text)``.

        Returns None when the response does not declare a text or HTML content
        type (including when the Content-Type header is missing), in which case
        the body is not downloaded.
        """
        headers = {
            "User-Agent": "Omnibot",
        }
        # NOTE(review): session.get follows redirects by default, so a
        # redirect target is not checked by is_blocked -- confirm whether
        # that is acceptable for this deployment.
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.get(url) as response:
                status = response.status
                # The header is optional; treat its absence as "not text".
                content_type = response.headers.get("content-type", "").lower()
                if "text" in content_type or "html" in content_type:
                    # Only download text if it's a text/html content_type
                    text = await response.text()
                    return (status, content_type, text)
                return None

    async def on_message(self, conn: IrcProtocol, channel: str, who: Prefix, line: str):
        """Look for URLs in *line* and send each page's title to *channel*.

        For every URL that is not blocked and returns text/HTML, the page title
        is sent to the channel; for non-2xx responses a status notice addressed
        to the sender's nick is sent instead. URLs that cannot be fetched are
        skipped so the remaining URLs in the message are still processed.
        """
        for url in LINK_RE.findall(line):
            log.debug("trying URL %s", url)
            if await self.is_blocked(url):
                # Skip this URL if it's blocked
                log.debug("skipping URL %s because it is blocked", url)
                continue

            # Fetch the HTML at the URL
            try:
                result = await self.fetch(url)
            except (aiohttp.ClientError, asyncio.TimeoutError, UnicodeError) as exc:
                # One unreachable or undecodable page must not stop the
                # remaining URLs in the same message from being handled.
                log.warning("skipping URL %s because fetching failed: %s", url, exc)
                continue

            if not result:
                # Could not fetch this URL
                log.debug("skipping URL %s because it couldn't be fetched", url)
                continue

            (status, content_type, text) = result
            log.debug("got %s characters back", len(text))

            message: str | None
            if not (200 <= status <= 299):
                message = f"{who.nick}: (status {status})"
            else:
                title_parser = TitleParser()
                title_parser.feed(text)
                log.debug("got title %r", title_parser.parsed_title)
                message = title_parser.parsed_title

            if message:
                self.send_to(conn, channel, message)


PLUGIN_TYPE = Linkbot