Add linkbot and config example for linkbot
Signed-off-by: Alek Ratzloff <alekratz@gmail.com>
This commit is contained in:
@@ -35,3 +35,27 @@ channels = ["#fortune_telling"]
|
|||||||
|
|
||||||
# A list of fortunes, or a path to a list of fortunes - one per line.
|
# A list of fortunes, or a path to a list of fortunes - one per line.
|
||||||
fortunes = "data/fortunes.txt"
|
fortunes = "data/fortunes.txt"
|
||||||
|
|
||||||
|
[[plugins]]
|
||||||
|
# The linkbot module fetches the title of the page at each URL posted in a channel.
|
||||||
|
module = "plugins.linkbot"
|
||||||
|
|
||||||
|
# Whether URLs that are pointing to private IP addresses are allowed.
|
||||||
|
# This is recommended to be left false.
|
||||||
|
# default: false
|
||||||
|
# allow_private = false
|
||||||
|
|
||||||
|
# Additional blocked hosts, URLs, and IP ranges.
|
||||||
|
# You don't need to block things like localhost, 192.168.0.0/16, etc if you are
|
||||||
|
# using `allow_private = false` above
|
||||||
|
# default: []
|
||||||
|
blocked = [
|
||||||
|
# This can take the form of a URL,
|
||||||
|
"https://www.example.com/full-page/",
|
||||||
|
# a hostname,
|
||||||
|
"www.example.com",
|
||||||
|
# an IP address,
|
||||||
|
"8.8.8.8",
|
||||||
|
# or an IP subnet
|
||||||
|
"8.8.0.0/16"
|
||||||
|
]
|
||||||
111
plugins/linkbot.py
Normal file
111
plugins/linkbot.py
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
import asyncio
|
||||||
|
import ipaddress
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from typing import Tuple
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
import aiodns
|
||||||
|
import aiohttp
|
||||||
|
from asyncirc.protocol import IrcProtocol
|
||||||
|
from irclib.parser import Prefix
|
||||||
|
from omnibot.plugin import Plugin
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Matches http(s) URLs inside a chat line. A URL ends at the first whitespace
# character of any kind (not only a plain space), so tabs are not swallowed.
LINK_RE = re.compile(r"https?://\S+")

# Captures the text of an HTML <title> element in the named group "title".
# Tag names are matched case-insensitively and the opening tag may carry
# attributes (e.g. <title data-rh="true">). "." still does not match a
# newline, so a captured title never spans multiple lines.
TITLE_RE = re.compile(
    r"<title(?:\s[^>]*)?>(?P<title>.+?)</title>",
    re.IGNORECASE,
)
|
||||||
|
|
||||||
|
|
||||||
|
async def dns_lookup(host: str) -> str | None:
    """Resolve *host* to an IPv4 address using an ``A`` record query.

    Args:
        host: The hostname to resolve.

    Returns:
        The address of the first ``A`` record, or ``None`` when the query
        returns no records or the lookup fails.
    """
    resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop())
    try:
        result = await resolver.query(host, "A")
    except aiodns.error.DNSError:
        # aiodns signals NXDOMAIN, timeouts, etc. by raising instead of
        # returning an empty result; map that onto the documented None.
        return None
    return result[0].host if result else None
|
||||||
|
|
||||||
|
|
||||||
|
class Linkbot(Plugin):
    """Plugin that replies with the HTML title of URLs seen in messages.

    Configuration keys read from ``plugin_config``:

    * ``blocked`` - list of URLs, hostnames, IP addresses or IP networks
      that must never be fetched (default: empty list).
    * ``allow_private`` - whether URLs resolving to private IP addresses
      may be fetched (default: ``False``).
    """

    def __init__(self, *args, **kwargs):
        """Build the network and hostname block lists from the config."""
        super().__init__(*args, **kwargs)

        self.network_block = []
        self.host_block = []
        blocked = self.plugin_config.get("blocked", [])
        for addr in blocked:
            try:
                # Try to parse this as an IP address or network. strict=False
                # accepts entries with host bits set (e.g. "8.8.8.8/16")
                # instead of silently treating them as hostnames.
                self.network_block.append(ipaddress.ip_network(addr, strict=False))
            except ValueError:
                # This is a hostname or a full URL (probably)
                self.host_block.append(addr.lower())
        self.allow_private = self.plugin_config.get("allow_private", False)

    async def is_blocked(self, url: str) -> bool:
        """Return ``True`` when *url* must not be fetched.

        A URL is blocked when it cannot be parsed, has no hostname, matches
        an entry in the host block list (as a full URL or by hostname),
        cannot be resolved, resolves to a private address while
        ``allow_private`` is false, or resolves into a blocked network.
        """
        try:
            parts = urlparse(url)
        except ValueError:
            # Malformed URL (e.g. an unterminated IPv6 literal)
            return True
        if not parts.hostname:
            return True

        hostname = parts.hostname.lower()

        # Make sure that this isn't any blocked URL or hostname
        if url.lower() in self.host_block or hostname in self.host_block:
            return True

        try:
            # A literal IP address needs no DNS lookup; check it directly.
            ip_addr = ipaddress.ip_address(hostname)
        except ValueError:
            # Resolve the hostname and make sure that this IP address is not
            # blocked
            try:
                ip = await dns_lookup(hostname)
            except aiodns.error.DNSError:
                ip = None
            if not ip:
                # Don't try if we can't resolve DNS
                return True
            ip_addr = ipaddress.ip_address(ip)

        # Judge IPv4-mapped IPv6 literals (::ffff:a.b.c.d) by the IPv4
        # address they carry so they cannot dodge the checks below.
        mapped = getattr(ip_addr, "ipv4_mapped", None)
        if mapped is not None:
            ip_addr = mapped

        # If the IP address is private and we've disallowed private IP
        # lookups, don't fetch
        if ip_addr.is_private and not self.allow_private:
            return True

        # If the IP address is in any blocked network, don't fetch
        return any(ip_addr in block for block in self.network_block)

    async def fetch(self, url: str) -> Tuple[int, str, str] | None:
        """Fetch *url* and return its body when it is textual.

        Returns:
            ``(status, content_type, text)`` when the response declares a
            content type containing "text" or "html"; ``None`` otherwise
            (including when no content type header is present).

        NOTE(review): redirects are followed by the HTTP client and the
        connection performs its own DNS resolution, so a redirect target or
        a re-resolved address is not re-checked with ``is_blocked``.
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                status = response.status
                # The header is optional; treat a missing one as non-text.
                content_type = response.headers.get("content-type", "").lower()
                if "text" in content_type or "html" in content_type:
                    # Only download text if it's a text/html content_type
                    text = await response.text()
                    return (status, content_type, text)
                return None

    async def on_message(self, conn: IrcProtocol, channel: str, who: Prefix, line: str):
        """Reply in *channel* with the title of every fetchable URL in *line*.

        Blocked or unfetchable URLs are skipped with a debug log entry. When
        a page has no title and the status is outside 2xx, the status code
        is reported to the sender instead.
        """
        for url in LINK_RE.findall(line):
            if await self.is_blocked(url):
                # Skip this URL if it's blocked
                log.debug("skipping URL %s because it is blocked", url)
                continue

            # Fetch the HTML at the URL. One failing URL must not prevent
            # the remaining URLs in the line from being handled.
            try:
                result = await self.fetch(url)
            except (aiohttp.ClientError, asyncio.TimeoutError, UnicodeDecodeError) as err:
                log.debug("skipping URL %s because fetching failed: %s", url, err)
                continue
            if not result:
                # Could not fetch this URL
                log.debug("skipping URL %s because it couldn't be fetched", url)
                continue

            status, _content_type, text = result
            if title := TITLE_RE.search(text):
                # Collapse all whitespace runs (including stray carriage
                # returns) so the title is a single clean line.
                message = " ".join(title["title"].split())
            elif not (200 <= status <= 299):
                message = f"{who.nick}: (status {status})"
            else:
                message = None

            if message:
                self.send_to(conn, channel, message)
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level name bound to the plugin class defined in this file.
# NOTE(review): presumably looked up by the plugin loader — confirm.
PLUGIN_TYPE = Linkbot
|
||||||
Reference in New Issue
Block a user