"""IRC plugin that fetches links posted in a channel and announces their HTML title."""

import asyncio
import ipaddress
import logging
import re
from typing import Tuple
from urllib.parse import urlparse

import aiodns
import aiohttp
from asyncirc.protocol import IrcProtocol
from irclib.parser import Prefix

from omnibot.plugin import Plugin

log = logging.getLogger(__name__)

# Anything that starts with http:// or https:// and runs up to the next space.
LINK_RE = re.compile(r"https?://[^ ]+")

# Contents of the first <title> element. Case-insensitive because HTML tag
# names are; DOTALL because titles are frequently wrapped across lines.
TITLE_RE = re.compile(
    r"<title\b[^>]*>(?P<title>.+?)</title>",
    re.IGNORECASE | re.DOTALL,
)


async def dns_lookup(host: str) -> str | None:
    """Resolve *host* to an IPv4 address.

    Args:
        host: The hostname to look up.

    Returns:
        The address from the first ``A`` record, or ``None`` when the query
        fails or returns no records.
    """
    resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop())
    try:
        result = await resolver.query(host, "A")
    except aiodns.error.DNSError as exc:
        # NXDOMAIN, timeouts, etc. are reported as exceptions; callers only
        # care that there is no usable address.
        log.debug("DNS lookup for %s failed: %s", host, exc)
        return None
    return result[0].host if result else None


class Linkbot(Plugin):
    """Announce the title of every link posted in a channel.

    Configuration keys read from ``self.plugin_config``:

    * ``blocked`` -- list of IP networks and/or hostnames that must never be
      fetched (default: empty).
    * ``allow_private`` -- whether hosts with private IP addresses may be
      fetched (default: ``False``).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.network_block = []
        self.host_block = []
        for addr in self.plugin_config.get("blocked", []):
            try:
                # Try to parse this as an IP network
                self.network_block.append(ipaddress.ip_network(addr))
            except ValueError:
                # This is a hostname (probably)
                self.host_block.append(addr.lower())
        self.allow_private = self.plugin_config.get("allow_private", False)

    async def is_blocked(self, url: str) -> bool:
        """Return ``True`` when *url* must not be fetched.

        A URL is blocked when it cannot be parsed, has no hostname, matches a
        blocked hostname, cannot be resolved, resolves to a private address
        while private addresses are disallowed, or resolves into a blocked
        network.
        """
        try:
            hostname = urlparse(url).hostname
        except ValueError:
            # e.g. "http://[foo" -- a malformed IPv6 literal
            return True
        if not hostname:
            return True
        hostname = hostname.lower()

        # Make sure that this isn't any blocked hostname
        if url.lower() in self.host_block or hostname in self.host_block:
            return True

        try:
            # The host may already be an IP literal; no DNS query is needed
            # (and an "A" query for a literal would not resolve anyway).
            ip_addr = ipaddress.ip_address(hostname)
        except ValueError:
            resolved = await dns_lookup(hostname)
            if not resolved:
                # Don't try if we can't resolve DNS
                return True
            ip_addr = ipaddress.ip_address(resolved)

        # If the IP address is private and we've disallowed private IP
        # lookups, don't fetch
        if ip_addr.is_private and not self.allow_private:
            return True

        # If the IP address is in any blocked network, don't fetch
        return any(ip_addr in block for block in self.network_block)

    async def fetch(self, url: str) -> Tuple[int, str, str] | None:
        """Download *url* and return its text body.

        Returns:
            ``(status, content_type, text)`` when the response declares a
            text or HTML content type; ``None`` when it does not, or when the
            request or decoding fails.
        """
        # NOTE(review): aiohttp resolves the host again and follows redirects
        # by default, so the address actually contacted is not re-checked
        # against the block list -- confirm whether that matters here.
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    status = response.status
                    # The header is optional; treat a missing one as non-text.
                    content_type = response.headers.get("content-type", "").lower()
                    if "text" not in content_type and "html" not in content_type:
                        return None
                    # Only download text if it's a text/html content_type
                    text = await response.text()
                    return (status, content_type, text)
        except (aiohttp.ClientError, asyncio.TimeoutError, UnicodeDecodeError) as exc:
            log.debug("failed to fetch URL %s: %s", url, exc)
            return None

    async def on_message(self, conn: IrcProtocol, channel: str, who: Prefix, line: str):
        """Look for links in *line* and report each one's title to *channel*.

        For every URL that is not blocked and can be fetched, sends the page
        title if one is found; otherwise, for a non-2xx response, sends the
        status code addressed to the nick that posted the link.
        """
        for url in LINK_RE.findall(line):
            if await self.is_blocked(url):
                # Skip this URL if it's blocked
                log.debug("skipping URL %s because it is blocked", url)
                continue

            # Fetch the HTML at the URL
            result = await self.fetch(url)
            if not result:
                # Could not fetch this URL
                log.debug("skipping URL %s because it couldn't be fetched", url)
                continue

            status, _content_type, text = result
            if match := TITLE_RE.search(text):
                # Collapse all whitespace runs (including CR/LF) so a
                # multi-line title becomes a single IRC line.
                message = " ".join(match["title"].split())
            elif not (200 <= status <= 299):
                message = f"{who.nick}: (status {status})"
            else:
                message = None

            if message:
                self.send_to(conn, channel, message)


# Module-level alias for the plugin class; presumably what the plugin loader
# looks up -- verify against omnibot's loader.
PLUGIN_TYPE = Linkbot