This is beneficial for all plugins, so the bot doesn't accidentally spam things because the plugin writer didn't check their inputs. Signed-off-by: Alek Ratzloff <alekratz@gmail.com>
112 lines
3.8 KiB
Python
112 lines
3.8 KiB
Python
import asyncio
|
|
import ipaddress
|
|
import logging
|
|
import re
|
|
from typing import Tuple
|
|
from urllib.parse import urlparse
|
|
|
|
import aiodns
|
|
import aiohttp
|
|
from asyncirc.protocol import IrcProtocol
|
|
from irclib.parser import Prefix
|
|
from omnibot.plugin import Plugin
|
|
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Matches an http:// or https:// URL up to (not including) the next space.
LINK_RE = re.compile(r"https?://[^ ]+")

# Captures the text of the first <title> element on a single line.
# HTML tag names are case-insensitive and the tag may carry attributes, so
# both "<TITLE>" and '<title id="x">' are accepted; the \b keeps look-alike
# tags such as "<titlebar>" from matching.
TITLE_RE = re.compile(r"<title\b[^>]*>(?P<title>.+?)</title>", re.IGNORECASE)
|
|
|
|
|
|
async def dns_lookup(host: str) -> str | None:
    """Resolve *host* to an IPv4 address string.

    Only ``A`` records are queried, so a host that publishes no IPv4 address
    is reported as unresolvable.

    Args:
        host: The hostname to resolve.

    Returns:
        The address of the first ``A`` record, or ``None`` when the lookup
        fails or yields no records.
    """
    resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop())
    try:
        records = await resolver.query(host, "A")
    except aiodns.error.DNSError:
        # aiodns signals lookup failures (NXDOMAIN, timeout, ...) by raising
        # rather than returning an empty result; report them as "no address"
        # so callers can rely on the documented None return.
        return None
    return records[0].host if records else None
|
|
|
|
|
|
class Linkbot(Plugin):
    """Announce the HTML title of links posted in a channel.

    Every ``http(s)`` URL found in a message is checked against the
    configured blocklist, fetched, and - when the page has a ``<title>`` -
    the title is sent back to the channel.

    Configuration keys read from ``self.plugin_config``:

    * ``blocked``: list of IP networks (anything ``ipaddress.ip_network``
      accepts) and/or hostnames that must never be fetched.
    * ``allow_private``: when true, URLs that resolve to private addresses
      may be fetched. Defaults to ``False``.
    """

    # Upper bound on the number of redirect hops ``fetch`` will follow.
    MAX_REDIRECTS = 5
    # HTTP statuses that ``fetch`` treats as a redirect to ``Location``.
    REDIRECT_STATUSES = frozenset({301, 302, 303, 307, 308})

    def __init__(self, *args, **kwargs):
        """Build the network and hostname blocklists from the plugin config."""
        super().__init__(*args, **kwargs)

        self.network_block = []
        self.host_block = []
        for addr in self.plugin_config.get("blocked", []):
            try:
                # strict=False: an entry with host bits set ("10.1.2.3/8")
                # is still a network; in strict mode it would raise and be
                # silently filed as a hostname that never matches anything.
                self.network_block.append(ipaddress.ip_network(addr, strict=False))
            except ValueError:
                # Not parseable as a network: this is a hostname (probably).
                self.host_block.append(addr.lower())
        self.allow_private = self.plugin_config.get("allow_private", False)

    async def is_blocked(self, url: str) -> bool:
        """Return ``True`` when *url* must not be fetched.

        A URL is blocked when it cannot be parsed, has no hostname, matches a
        blocked hostname, cannot be resolved, resolves to a private address
        (unless ``allow_private`` is set), or resolves into a blocked network.
        """
        try:
            parts = urlparse(url)
        except ValueError:
            # urlparse rejects some malformed URLs (e.g. an unbalanced "[").
            return True
        host = parts.hostname
        if not host:
            return True

        # Make sure that this isn't any blocked hostname.
        if url.lower() in self.host_block or host.lower() in self.host_block:
            return True

        try:
            # An IP-literal host needs no DNS round trip.
            ip_addr = ipaddress.ip_address(host)
        except ValueError:
            try:
                ip = await dns_lookup(host)
            except aiodns.error.DNSError:
                ip = None
            if not ip:
                # Don't try if we can't resolve DNS.
                return True
            ip_addr = ipaddress.ip_address(ip)

        # Private addresses are off limits unless explicitly allowed.
        if ip_addr.is_private and not self.allow_private:
            return True
        # Finally, refuse anything inside a blocked network.
        return any(ip_addr in block for block in self.network_block)

    async def fetch(self, url: str) -> Tuple[int, str, str] | None:
        """Download *url* and return its text body.

        Redirects are followed by hand (at most ``MAX_REDIRECTS`` hops) so
        that every hop is checked with ``is_blocked``; otherwise a permitted
        URL could redirect the bot to a blocked or private address.

        NOTE(review): the address checked by ``is_blocked`` is resolved
        separately from the connection aiohttp opens, so a hostile DNS server
        could still answer differently at connect time.

        Returns:
            ``(status, content_type, text)`` for a text/HTML response, or
            ``None`` when the response is not text, a redirect target is
            blocked, there are too many redirects, or the request fails.
        """
        from urllib.parse import urljoin

        try:
            async with aiohttp.ClientSession() as session:
                for _hop in range(self.MAX_REDIRECTS + 1):
                    async with session.get(url, allow_redirects=False) as response:
                        status = response.status
                        location = response.headers.get("location")
                        if status not in self.REDIRECT_STATUSES or not location:
                            # A response without Content-Type is treated as
                            # non-text instead of raising KeyError.
                            content_type = response.headers.get("content-type", "").lower()
                            if "text" not in content_type and "html" not in content_type:
                                return None
                            # Only download text if it's a text/html
                            # content type; undecodable bytes are replaced
                            # rather than aborting the whole fetch.
                            text = await response.text(errors="replace")
                            return (status, content_type, text)
                    # Location may be relative to the URL just requested.
                    url = urljoin(url, location)
                    if await self.is_blocked(url) or urlparse(url).scheme not in ("http", "https"):
                        log.debug("not following redirect to blocked URL %s", url)
                        return None
                log.debug("giving up on URL %s after too many redirects", url)
                return None
        except (aiohttp.ClientError, asyncio.TimeoutError) as err:
            # One unreachable link must not abort the rest of the message.
            log.debug("could not fetch URL %s: %s", url, err)
            return None

    async def on_message(self, conn: IrcProtocol, channel: str, who: Prefix, line: str):
        """Reply in *channel* with the title of each fetchable link in *line*.

        When a page has no usable title and the response status is outside
        2xx, the status code is reported to the sender instead.
        """
        import html

        for url in LINK_RE.findall(line):
            if await self.is_blocked(url):
                # Skip this URL if it's blocked.
                log.debug("skipping URL %s because it is blocked", url)
                continue

            # Fetch the HTML at the URL.
            result = await self.fetch(url)
            if not result:
                # Could not fetch this URL.
                log.debug("skipping URL %s because it couldn't be fetched", url)
                continue
            status, _content_type, text = result

            message = None
            if match := TITLE_RE.search(text):
                # Decode entities ("&amp;" -> "&") and squeeze whitespace so
                # the title reads as a single clean line.
                message = " ".join(html.unescape(match["title"]).split())
            if not message and not (200 <= status <= 299):
                message = f"{who.nick}: (status {status})"

            if message:
                self.send_to(conn, channel, message)
|
|
|
|
|
|
# Module-level name bound to the plugin class defined in this file
# (presumably what the omnibot plugin loader looks up - TODO confirm).
PLUGIN_TYPE = Linkbot
|