import asyncio
import ipaddress
import logging
import re
from typing import Tuple
from urllib.parse import urlparse
import aiodns
import aiohttp
from asyncirc.protocol import IrcProtocol
from irclib.parser import Prefix
from omnibot.plugin import Plugin
# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Matches an http:// or https:// URL: the scheme followed by every character
# up to (not including) the next space.
LINK_RE = re.compile(r"https?://[^ ]+")

# Captures the text between <title> and </title> in the named group "title"
# (the group name the message handler reads). The match is non-greedy, so it
# stops at the first closing tag, and "." does not cross line feeds.
TITLE_RE = re.compile(r"<title>(?P<title>.+?)</title>")
async def dns_lookup(host: str) -> str | None:
    """Resolve *host* to the address of its first ``A`` record.

    Args:
        host: Hostname to look up.

    Returns:
        The ``host`` field of the first ``A`` record, or ``None`` when the
        lookup fails or returns no records.
    """
    resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop())
    try:
        records = await resolver.query(host, "A")
    except aiodns.error.DNSError:
        # aiodns signals failed lookups (unknown domain, no data, timeout)
        # by raising, not by returning an empty result. Map that onto the
        # ``None`` this function's signature promises so callers can treat
        # "unresolvable" as an ordinary outcome.
        return None
    return records[0].host if records else None
class Linkbot(Plugin):
    """Plugin that answers links posted in a channel with the page's title.

    Configuration keys read from ``self.plugin_config``:

    * ``blocked`` -- list of entries that must never be fetched. Each entry
      that parses as an IP address/network goes into ``network_block``;
      anything else is treated as a hostname and goes into ``host_block``.
      Defaults to an empty list.
    * ``allow_private`` -- when false (the default), URLs whose host
      resolves to a private IP address are not fetched.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base class and load the block lists."""
        super().__init__(*args, **kwargs)
        self.network_block = []
        self.host_block = []
        for addr in self.plugin_config.get("blocked", []):
            try:
                # strict=False: an entry written with host bits set (such as
                # "10.0.0.1/8") is still a network. In strict mode it raises
                # ValueError and would be filed as a hostname that can never
                # match anything.
                self.network_block.append(ipaddress.ip_network(addr, strict=False))
            except ValueError:
                # This is a hostname (probably)
                self.host_block.append(addr.lower())
        self.allow_private = self.plugin_config.get("allow_private", False)

    async def is_blocked(self, url: str) -> bool:
        """Decide whether *url* must not be fetched.

        A URL is blocked when it cannot be parsed, has no hostname, matches
        an entry in ``host_block`` (either the whole URL or its hostname),
        cannot be resolved, resolves to a private address while
        ``allow_private`` is false, or resolves into a blocked network.

        Args:
            url: The URL as found in the chat line.

        Returns:
            ``True`` if the URL must be skipped, ``False`` if it may be
            fetched.
        """
        try:
            hostname = urlparse(url).hostname
        except ValueError:
            # urlparse rejects some malformed inputs (for example an
            # unbalanced "[" in the host part); chat text is untrusted, so
            # treat anything unparseable as blocked instead of crashing.
            return True
        if not hostname:
            return True

        # Make sure that this isn't any blocked hostname
        if url.lower() in self.host_block or hostname.lower() in self.host_block:
            return True

        # Resolve the hostname and make sure that this IP address is not
        # blocked
        try:
            ip = await dns_lookup(hostname)
        except aiodns.error.DNSError:
            ip = None
        if not ip:
            # Don't try if we can't resolve DNS
            return True

        ip_addr = ipaddress.ip_address(ip)
        # If the IP address is private and we've disallowed private IP
        # lookups, don't fetch
        if ip_addr.is_private and not self.allow_private:
            return True
        # If the IP address is in any blocked network, don't fetch
        return any(ip_addr in block for block in self.network_block)

    async def fetch(self, url: str) -> Tuple[int, str, str] | None:
        """Download *url* and return its body if it is textual.

        Args:
            url: The URL to GET.

        Returns:
            ``(status, content_type, text)`` when the response's content
            type contains "text" or "html"; ``None`` for any other content
            type (including a missing header) or when the request fails.
        """
        # NOTE(review): only the URL's own host is vetted by is_blocked()
        # before this is called; the HTTP client resolves the name again and
        # may follow redirects to hosts that were never checked -- confirm
        # whether that is acceptable for this deployment.
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    # .get(): a response is not required to carry a
                    # content-type header; a missing one is "not text".
                    content_type = response.headers.get("content-type", "").lower()
                    if "text" not in content_type and "html" not in content_type:
                        return None
                    # Only download text if it's a text/html content_type
                    text = await response.text()
                    return (response.status, content_type, text)
        except (aiohttp.ClientError, asyncio.TimeoutError, UnicodeDecodeError) as exc:
            # Connection problems, timeouts and undecodable bodies all mean
            # "could not fetch"; report that through the None return value.
            log.debug("fetching URL %s failed: %s", url, exc)
            return None

    async def on_message(self, conn: IrcProtocol, channel: str, who: Prefix, line: str):
        """Reply to every fetchable link in *line* with its page title.

        For each URL found in the line: skip it if blocked or unfetchable;
        otherwise send the page title to *channel*, or, when no title is
        found and the status is outside 200-299, a status notice addressed
        to the sender's nick.

        Args:
            conn: Connection the message arrived on; passed to ``send_to``.
            channel: Target that replies are sent to.
            who: Prefix of the sender; its ``nick`` is used in status
                notices.
            line: Text of the message.
        """
        for url in LINK_RE.findall(line):
            if await self.is_blocked(url):
                # Skip this URL if it's blocked
                log.debug("skipping URL %s because it is blocked", url)
                continue

            # Fetch the HTML at the URL
            result = await self.fetch(url)
            if not result:
                # Could not fetch this URL
                log.debug("skipping URL %s because it couldn't be fetched", url)
                continue

            status, _content_type, text = result
            if title := TITLE_RE.search(text):
                # The title is attacker-controlled page content. Collapse
                # every whitespace run (including CR/LF) into one space so
                # it cannot smuggle extra protocol lines into the reply.
                message = " ".join(title["title"].split())
            elif not (200 <= status <= 299):
                message = f"{who.nick}: (status {status})"
            else:
                message = None

            if message:
                self.send_to(conn, channel, message)
# Module-level name bound to the plugin class defined in this file.
# NOTE(review): presumably the hook the omnibot plugin loader looks up to
# find the class to instantiate -- confirm against the loader.
PLUGIN_TYPE = Linkbot