Split up logic into common files

All functions were previously in __main__.py but now they've been split
out to separate modules for better reuse.

Signed-off-by: Alek Ratzloff <alekratz@gmail.com>
This commit is contained in:
2023-06-20 23:22:56 -07:00
parent 99624c17a5
commit 54a0488e3c
4 changed files with 171 additions and 159 deletions

View File

@@ -1,163 +1,6 @@
import asyncio
import functools
import json
from pathlib import Path
import re
import sqlite3
import time
from typing import Optional, Union
import httpx
from bs4 import BeautifulSoup as Soup
from .pull import pull
BANS_URL = "https://4chan.org/bans"
PREVIEW_RE = re.compile(r"var postPreviews = (.+)")
DB_PATH = "bans.db"
THUMBS_DIR = Path("thumbs")
CACHE_DIR = Path("bans")
def get_db(db_path: Optional[str] = None):
    """Open a connection to the bans sqlite database.

    Args:
        db_path: path to the database file; when None (the default), the
            module-level DB_PATH ("bans.db") is used.

    Returns:
        sqlite3.Connection: an open connection; the caller is responsible
        for closing it.
    """
    # Resolve the default lazily so DB_PATH remains the single source of
    # truth and callers may still override it (e.g. ":memory:" in tests).
    return sqlite3.connect(DB_PATH if db_path is None else db_path)
def file_cache(
    directory: Union[str, Path] = ".", format: str = "%Y%m%d%H%M", suffix: str = "", deduplicate=True
):
    """Decorator factory that caches an async function's text result on disk.

    The cache key is the current time formatted with *format* plus *suffix*,
    so repeated calls within the same time bucket reuse the stored file.

    Args:
        directory: folder that holds the cache files (created if missing).
        format: time.strftime() pattern used to name each cache file.
        suffix: filename suffix appended to the timestamp (e.g. ".html").
        deduplicate: when True and the fresh result equals the newest
            existing cache file, that file is renamed instead of writing a
            duplicate copy.
    """
    def newest_in_dir(path: Union[str, Path]) -> Optional[Path]:
        # Most recently modified regular file in *path* matching *suffix*,
        # or None when the directory holds no matching files.
        d = Path(path)
        newest = None
        newest_time = 0
        for p in d.glob("*" + suffix):
            if not p.is_file():
                continue
            stats = p.stat()
            if stats.st_mtime > newest_time:
                newest = p
                newest_time = stats.st_mtime
        return newest

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            d = Path(directory)
            d.mkdir(parents=True, exist_ok=True)
            path = Path(directory, time.strftime(format) + suffix)
            if path.exists():
                # Cache hit for the current timestamp bucket.
                return path.read_text()
            else:
                text = await func(*args, **kwargs)
                if deduplicate:
                    # find the most recent file in the path
                    newest_path = newest_in_dir(directory)
                    if newest_path:
                        newest_text = newest_path.read_text()
                        if newest_text == text:
                            # Move the old cache to the new path
                            newest_path.rename(path)
                            return text
                path.write_text(text)
                return text
        return wrapper
    return decorator
@file_cache(directory=CACHE_DIR, suffix=".html")
async def get_bans_html() -> str:
    """Fetch the HTML of the 4chan bans page.

    The file_cache decorator stores the result under CACHE_DIR, so the
    network is contacted at most once per timestamp bucket.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(BANS_URL)
    return response.text
async def get_thumb(thumb_path: Union[str, Path], post: dict) -> Optional[bytes]:
    """Return the thumbnail bytes for *post*, downloading on cache miss.

    Args:
        thumb_path: on-disk location of the cached thumbnail (str or Path).
        post: post-preview dict; must contain 'thumb' and 'board' keys for
            a thumbnail to exist.

    Returns:
        The thumbnail image bytes, or None when the post has no thumbnail.
    """
    if "thumb" in post:
        # BUG FIX: the annotation accepts str, but the body called Path
        # methods (.exists/.read_bytes/.write_bytes) directly on the
        # argument, crashing for str inputs — coerce explicitly.
        thumb_path = Path(thumb_path)
        if thumb_path.exists():
            return thumb_path.read_bytes()
        else:
            url = f"https://i.4cdn.org/bans/thumb/{post['board']}/{post['thumb']}s.jpg"
            print("Downloading", url)
            async with httpx.AsyncClient() as c:
                r = await c.get(url)
            THUMBS_DIR.mkdir(parents=True, exist_ok=True)
            thumb_path.write_bytes(r.content)
            return r.content
    else:
        return None
async def main():
    """Scrape the current 4chan ban list into the local sqlite database.

    Side effects: creates/updates the bans table in bans.db, caches the
    page HTML under CACHE_DIR (via get_bans_html) and downloads post
    thumbnails into THUMBS_DIR.
    """
    # Ensure DB tables
    db = get_db()
    db.executescript(
        """
        create table if not exists bans (
            id integer primary key,
            action varchar(5),
            board varchar(10),
            length varchar(10),
            post text unique,
            thumb_path text,
            reason varchar(200)
        );
        """
    )
    # Get HTML
    html = await get_bans_html()
    # Get post JSON embedded in an inline script; [:-1] drops the trailing
    # ";" of the JS assignment so the remainder parses as JSON.
    m = PREVIEW_RE.search(html)
    posts = json.loads(m[1][:-1])
    # Parse HTML
    soup = Soup(html, "html.parser")
    rows = soup.find_all("tr")
    # Header row supplies the column labels, e.g.
    # ['board', 'action', 'length', 'post', 'reason', 'time']
    labels = [next(head.children).lower() for head in [c for c in rows[0] if c != "\n"]]
    rows = rows[1:]
    download_jobs = []
    # FIX: dropped an unused `db.cursor()` and the unused enumerate index /
    # `curs` result binding from the original.
    for row in rows:
        # Map label -> first child of each cell, skipping the 'time' column.
        cols = {
            key: next(value.children)
            for key, value in zip(labels, [c for c in row if c != "\n"])
            if key != "time"
        }
        post = posts[cols["post"]["data-pid"]]
        cols["post"] = post
        if 'thumb' in post:
            thumb_path = Path(THUMBS_DIR, f"{post['thumb']}s.jpg")
            download_jobs += [get_thumb(thumb_path, post)]
        else:
            thumb_path = ""
        # Try to create post in database; UNIQUE violations mean this ban
        # was already recorded and are silently skipped.
        try:
            with db:
                db.execute(
                    "insert into bans (action, board, length, post, thumb_path, reason) values(?, ?, ?, ?, ?, ?)",
                    (
                        cols["action"],
                        cols["board"],
                        cols["length"],
                        json.dumps(cols["post"]),
                        str(thumb_path),
                        cols["reason"],
                    ),
                )
        except Exception as ex:
            msg = str(ex)
            if 'UNIQUE' not in msg:
                print("error:", ex)
    # Finish off thumbnail jobs
    await asyncio.gather(*download_jobs)
# Script entry points: the legacy inline scraper and the refactored pull().
# NOTE(review): running both at import looks like it duplicates the whole
# scrape — confirm only one entry point should remain after the refactor,
# and consider guarding with `if __name__ == "__main__":`.
asyncio.run(main())
asyncio.run(pull())

23
chanbans/db.py Normal file
View File

@@ -0,0 +1,23 @@
import sqlite3
# Default location of the bans database.
DB_PATH = "bans.db"


def get_db(db_path: str = DB_PATH):
    """Open the bans database and make sure its schema exists.

    Args:
        db_path: sqlite database file to open; defaults to DB_PATH.

    Returns:
        sqlite3.Connection: an open connection with the bans table ready.
    """
    connection = sqlite3.connect(db_path)
    # Idempotent schema setup: safe to execute on every connect.
    connection.executescript(
        """
        create table if not exists bans (
            id integer primary key,
            action varchar(5),
            board varchar(10),
            length varchar(10),
            post text unique,
            thumb_path text,
            reason varchar(200)
        );
        """
    )
    return connection

50
chanbans/files.py Normal file
View File

@@ -0,0 +1,50 @@
import functools
from pathlib import Path
import time
from typing import Optional, Union
def file_cache(
    directory: Union[str, Path] = ".",
    format: str = "%Y%m%d%H%M",
    suffix: str = "",
    deduplicate=True,
):
    """Decorator factory that caches an async function's text result on disk.

    The cache file name is the current time rendered with *format* plus
    *suffix*, so calls within the same time bucket reuse the stored file.

    Args:
        directory: folder holding the cache files (created if missing).
        format: time.strftime() pattern naming each cache file.
        suffix: filename suffix appended to the timestamp (e.g. ".html").
        deduplicate: when True and the fresh result matches the newest
            existing cache file, that file is renamed forward instead of
            writing a duplicate copy.
    """

    def _latest_cached(folder: Union[str, Path]) -> Optional[Path]:
        # Most recently modified regular file matching the suffix, or None.
        candidates = (
            entry for entry in Path(folder).glob("*" + suffix) if entry.is_file()
        )
        return max(candidates, key=lambda entry: entry.stat().st_mtime, default=None)

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            folder = Path(directory)
            folder.mkdir(parents=True, exist_ok=True)
            target = folder / (time.strftime(format) + suffix)
            if target.exists():
                # Cache hit for the current timestamp bucket.
                return target.read_text()
            text = await func(*args, **kwargs)
            if deduplicate:
                previous = _latest_cached(directory)
                if previous is not None and previous.read_text() == text:
                    # Same content as last time: just carry the old file forward.
                    previous.rename(target)
                    return text
            target.write_text(text)
            return text

        return wrapper

    return decorator

96
chanbans/pull.py Normal file
View File

@@ -0,0 +1,96 @@
import asyncio
import json
from pathlib import Path
import re
from typing import Optional, Union
import httpx
from bs4 import BeautifulSoup as Soup
from .db import get_db
from .files import file_cache
BANS_URL = "https://4chan.org/bans"
PREVIEW_RE = re.compile(r"var postPreviews = (.+)")
THUMBS_DIR = Path("thumbs")
CACHE_DIR = Path("bans")
@file_cache(directory=CACHE_DIR, suffix=".html")
async def get_bans_html() -> str:
    """Return the HTML of the 4chan bans page.

    Results are cached on disk under CACHE_DIR by the file_cache
    decorator, so the network is hit at most once per timestamp bucket.
    """
    async with httpx.AsyncClient() as c:
        r = await c.get(BANS_URL)
        return r.text
async def get_thumb(thumb_path: Union[str, Path], post: dict) -> Optional[bytes]:
    """Return the thumbnail bytes for *post*, downloading on cache miss.

    Args:
        thumb_path: on-disk location of the cached thumbnail (str or Path).
        post: post-preview dict; 'thumb' and 'board' keys identify the image.

    Returns:
        The thumbnail image bytes, or None when the post has no thumbnail.
    """
    if "thumb" not in post:
        return None
    cache_file = Path(thumb_path)
    if cache_file.exists():
        return cache_file.read_bytes()
    url = f"https://i.4cdn.org/bans/thumb/{post['board']}/{post['thumb']}s.jpg"
    print("Downloading", url)
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
    THUMBS_DIR.mkdir(parents=True, exist_ok=True)
    cache_file.write_bytes(response.content)
    return response.content
async def pull():
    """Scrape the current 4chan ban list into the local sqlite database.

    Side effects: inserts new rows into the bans table (get_db creates the
    schema), caches the page HTML under CACHE_DIR via get_bans_html, and
    downloads post thumbnails into THUMBS_DIR.
    """
    # TODO(args) --db-path arg
    db = get_db()
    # Get HTML
    html = await get_bans_html()
    # Get post JSON embedded in an inline script; [:-1] drops the trailing
    # ";" of the JS assignment so the remainder parses as JSON.
    m = PREVIEW_RE.search(html)
    posts = json.loads(m[1][:-1])
    # Parse HTML
    soup = Soup(html, "html.parser")
    rows = soup.find_all("tr")
    # Header row supplies the column labels, e.g.
    # ['board', 'action', 'length', 'post', 'reason', 'time']
    labels = [next(head.children).lower() for head in [c for c in rows[0] if c != "\n"]]
    rows = rows[1:]
    download_jobs = []
    # FIX: the index from enumerate() was never used — iterate directly.
    for row in rows:
        # Map label -> first child of each cell, skipping the 'time' column.
        cols = {
            key: next(value.children)
            for key, value in zip(labels, [c for c in row if c != "\n"])
            if key != "time"
        }
        post = posts[cols["post"]["data-pid"]]
        cols["post"] = post
        if 'thumb' in post:
            thumb_path = Path(THUMBS_DIR, f"{post['thumb']}s.jpg")
            download_jobs += [get_thumb(thumb_path, post)]
        else:
            thumb_path = ""
        # Try to create post in database; UNIQUE violations mean this ban
        # was already recorded and are silently skipped.
        try:
            with db:
                db.execute(
                    "insert into bans (action, board, length, post, thumb_path, reason) values(?, ?, ?, ?, ?, ?)",
                    (
                        cols["action"],
                        cols["board"],
                        cols["length"],
                        json.dumps(cols["post"]),
                        str(thumb_path),
                        cols["reason"],
                    ),
                )
        except Exception as ex:
            msg = str(ex)
            if 'UNIQUE' not in msg:
                print("error:", ex)
    # Finish off thumbnail jobs
    await asyncio.gather(*download_jobs)