Files
interchan/board/models.py

290 lines
10 KiB
Python
Raw Normal View History

from datetime import timedelta
import os
from pathlib import Path
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.files.base import ContentFile
from django.db import models
from django.db.models import signals
from django.dispatch import receiver
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext as _
from PIL import Image
from io import BytesIO
def image_upload(instance, filename):
    """Build the storage path for an uploaded post image.

    The result is ``<board url>/<epoch seconds>.<microseconds><ext>``. The
    extension comes from ``filename``, lower-cased, with ``.jpeg`` folded into
    ``.jpg``.

    Args:
        instance: Object being saved; only ``instance.board.url`` is read.
        filename: Name of the uploaded file; only its suffix is used.

    Returns:
        The relative path (as a string) to store the image under.

    Raises:
        ValueError: If the suffix is not ``.jpg``/``.jpeg``, ``.png`` or
            ``.gif``.
    """
    # Validate first so an unsupported upload is rejected before any other work.
    ext = Path(filename).suffix.lower()
    if ext == ".jpeg":
        ext = ".jpg"
    if ext not in (".jpg", ".png", ".gif"):
        raise ValueError("File type invalid")

    now = timezone.now()
    # strftime("%s") is a platform-specific libc extension that is not part of
    # Python's documented format codes and ignores tzinfo. Build the epoch
    # stamp from the aware datetime instead; the "<seconds>.<microseconds>"
    # layout is unchanged.
    stamp = f"{int(now.timestamp())}.{now.microsecond:06d}"
    return f"{instance.board.url}/{stamp}{ext}"
def thumbs_upload(instance, filename):
    """Build the storage path for a post thumbnail.

    The result is ``<board url>/<epoch seconds>.<microseconds>t<ext>``: the
    same layout used for full-size images, with a ``t`` before the extension.

    Args:
        instance: Object being saved; only ``instance.board.url`` is read.
        filename: Name of the thumbnail file; only its lower-cased suffix is
            used.

    Returns:
        The relative path (as a string) to store the thumbnail under.
    """
    now = timezone.now()
    # strftime("%s") is a platform-specific libc extension that is not part of
    # Python's documented format codes and ignores tzinfo. Build the epoch
    # stamp from the aware datetime instead.
    stamp = f"{int(now.timestamp())}.{now.microsecond:06d}"
    ext = Path(filename).suffix.lower()
    return f"{instance.board.url}/{stamp}t{ext}"
class Board(models.Model):
    """A board, addressed by its short ``url`` and holding threads of posts."""

    # Short name used in URLs; must be unique across boards.
    url = models.CharField(max_length=255, null=False, blank=False, unique=True)
    # Human-readable title of the board.
    name = models.CharField(max_length=255, null=False, blank=False)
    # Number of pages the board keeps.
    max_pages = models.IntegerField(default=10)
    # Number of threads shown on each page.
    threads_per_page = models.IntegerField(default=10)
    # Minimum time that must pass between consecutive posts from the same IP
    # address.
    post_cooldown = models.DurationField(default=timedelta(seconds=60))
    # Auto-sink threshold: once a thread has this many replies, new replies
    # stop bumping it.
    autosink = models.IntegerField(default=300)

    @property
    def threads(self):
        """Queryset of this board's thread starters (posts without an ``op``)."""
        thread_starters = Post.objects.filter(board=self, op=None)
        return thread_starters

    @property
    def max_threads(self):
        """Thread capacity of the board: pages times threads per page."""
        return self.max_pages * self.threads_per_page

    def prune_threads(self):
        """Delete every thread ranked beyond ``max_threads`` by latest bump."""
        newest_first = self.threads.order_by("-last_bump")
        overflow = newest_first[self.max_threads :]
        # Delete one instance at a time, as before, rather than in bulk.
        for stale_thread in overflow:
            stale_thread.delete()

    def __str__(self):
        return f"/{self.url}/ - {self.name}"
class Post(models.Model):
    """A post on a board: a thread starter (``op`` is null) or a reply to one."""

    # Board that this post was made on
    board = models.ForeignKey("Board", on_delete=models.CASCADE)
    # Thread that this is a part of (null for the thread starter itself)
    op = models.ForeignKey(
        "self", null=True, on_delete=models.CASCADE, related_name="replies"
    )
    # User's supplied name for this post
    name = models.CharField(max_length=255, null=True, blank=True)
    # User's supplied subject for this post
    subject = models.CharField(max_length=255, null=True, blank=True)
    # Text of this post
    text = models.TextField(max_length=10000, null=False, blank=True)
    # The IP address of the user that made this post
    ip = models.GenericIPAddressField()
    # Creation time
    created = models.DateTimeField(auto_now_add=True)
    # Last bump time
    last_bump = models.DateTimeField(auto_now_add=True)
    # Image
    image = models.ImageField(
        upload_to=image_upload,
        null=True,
        blank=True,
        width_field="image_width",
        height_field="image_height",
    )
    # Bump - if this post is an OP, determines whether the OP can be bumped. If
    # this post is not an OP, determines whether the thread SHOULD be bumped.
    bump = models.BooleanField(default=True)
    # Thumbnail
    thumbnail = models.ImageField(upload_to=thumbs_upload, editable=False, null=True)
    # Original image name
    original_image_name = models.CharField(max_length=255, null=True)
    # Image width and height
    image_width = models.IntegerField(null=True)
    image_height = models.IntegerField(null=True)

    def save(self, *args, **kwargs):
        """Save the post, recording the upload's name and building a thumbnail.

        The original file name and thumbnail are only (re)computed for a
        freshly assigned upload, or when they are missing. Previously every
        save of a post with an image (for example the save that bumps a
        thread) wrote a new thumbnail file and replaced
        ``original_image_name`` with the storage-generated name.
        """
        if self.image:
            # A newly assigned file is flagged as not yet committed to storage;
            # this is the flag Django's FileField.pre_save checks. It is a
            # private attribute, so default to "already stored" if absent.
            fresh_upload = not getattr(self.image, "_committed", True)
            if fresh_upload or not self.original_image_name:
                self.original_image_name = Path(self.image.name).name
            if fresh_upload or not self.thumbnail:
                self.__make_thumbnail()
        super().save(*args, **kwargs)

    def __make_thumbnail(self):
        """Render a thumbnail of ``self.image`` into ``self.thumbnail``.

        The thumbnail is bounded by ``settings.THUMB_SIZE`` and written in the
        format implied by the image's extension. The model itself is not saved.

        Raises:
            ValueError: If the image extension is not JPEG, GIF or PNG.
        """
        image_path = Path(self.image.name)
        thumb_name = image_path.stem + "t" + image_path.suffix
        formats = {".jpg": "JPEG", ".jpeg": "JPEG", ".gif": "GIF", ".png": "PNG"}
        try:
            ftype = formats[image_path.suffix.lower()]
        except KeyError:
            raise ValueError("File type invalid") from None

        image = Image.open(self.image)
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
        image.thumbnail(settings.THUMB_SIZE, Image.LANCZOS)
        # The extension is user-controlled, so a ".jpg" upload may really be an
        # RGBA or palette image, which the JPEG writer refuses to encode.
        if ftype == "JPEG" and image.mode not in ("L", "RGB", "CMYK"):
            image = image.convert("RGB")

        with BytesIO() as buffer:
            image.save(buffer, ftype)
            # save=False stops recursion back into Post.save()
            self.thumbnail.save(
                thumb_name, ContentFile(buffer.getvalue()), save=False
            )

    def get_absolute_url(self):
        """Return the thread URL, with a ``#p<id>`` anchor for replies."""
        # Use op_id so the parent post is not loaded just to read its id.
        is_thread = self.op_id is None
        thread_id = self.id if is_thread else self.op_id
        url = reverse(
            "board:post_detail", kwargs={"url": self.board.url, "id": thread_id}
        )
        if is_thread:
            return url
        return url + f"#p{self.id}"

    def clean(self):
        """Validate content, upload size and the per-IP posting cooldown.

        Raises:
            ValidationError: If the post has neither text nor image, the image
                exceeds ``settings.MAX_UPLOAD_SIZE``, or a new post arrives
                before the board's ``post_cooldown`` has elapsed for this IP.
        """
        # Make sure there is at least some content
        self.text = (self.text or "").strip()
        if not (self.text or self.image):
            raise ValidationError(_("Please either write a message or upload an image"))
        # Image upload size check
        if self.image and self.image.size > settings.MAX_UPLOAD_SIZE:
            raise ValidationError(
                _("Image supplied is too large. Maximum image size is %(max)s"),
                params={"max": settings.MAX_UPLOAD_SIZE},
            )
        # Rate limiting only applies to new posts. Without this guard, editing
        # an existing post shortly after it was made is rejected, because the
        # post counts as its own "last post".
        if not self._state.adding:
            return
        # TODO BUG: if a user's last post is deleted, and it is their only post,
        # they will not hit rate limit. This could probably be abused
        last_post = self.board.post_set.filter(ip=self.ip).order_by("-created").first()
        if last_post is None:
            return
        elapsed = timezone.now() - last_post.created
        if elapsed < self.board.post_cooldown:
            remaining = self.board.post_cooldown - elapsed
            raise ValidationError(
                # Plain literal (not an f-string) so gettext can look it up.
                _("Please wait %(cooldown)s seconds before posting again"),
                # Never tell the user to wait "0 seconds".
                params={"cooldown": max(1, int(remaining.total_seconds()))},
            )
@receiver(signals.post_save, sender=Post)
def post_created(sender, instance, created, **kwargs):
    """Bump the parent thread for a new reply, or prune for a new thread.

    Does nothing for saves of existing posts. A new reply bumps its thread
    when the reply and the thread both allow bumping and the thread has fewer
    replies than the board's ``autosink`` threshold. A new thread triggers
    pruning of the board's overflow threads.
    """
    if not created:
        return

    thread = instance.op
    if thread is None:
        # Prune threads for the board
        instance.board.prune_threads()
        return

    can_bump = (
        instance.bump
        and thread.bump
        and thread.replies.count() < instance.board.autosink
    )
    if can_bump:
        thread.last_bump = timezone.now()
        # Write only the bump time: a full save would rewrite every column
        # from this possibly stale in-memory copy of the thread.
        thread.save(update_fields=["last_bump"])
@receiver(signals.post_delete, sender=Post)
def post_deleted(sender, instance, **kwargs):
    """Remove the deleted post's image and thumbnail files from storage."""
    for stored_file in (instance.image, instance.thumbnail):
        if not stored_file:
            continue
        # Delete through the storage backend instead of os.remove() on .path:
        # .path is only available for local filesystem storage, and the old
        # is_file()-then-remove sequence could race with another deletion.
        # save=False because the database row is already gone.
        stored_file.delete(save=False)
class ReportReason(models.Model):
    """A reason that can be selected when reporting a post."""

    # Text of the reason.
    reason = models.CharField(max_length=255)
    # Weight of this reason. Higher = more urgent.
    weight = models.IntegerField(default=1)
    # Urgency flag. If true, the post is probably being reported as illegal
    # content.
    urgent = models.BooleanField(default=False)
    # Board this reason is specific to; nullable, so a reason need not be tied
    # to any board.
    board = models.ForeignKey("Board", on_delete=models.CASCADE, null=True, blank=True)

    def __str__(self):
        return self.reason
class ReportRecord(models.Model):
    """Per-post summary of reports; each post has at most one record."""

    # Post that this record is for
    post = models.OneToOneField("Post", on_delete=models.CASCADE)
    # Report weight
    weight = models.IntegerField(default=0)
    # Whether this record is urgent. Stored as a field; an unfinished,
    # commented-out sketch of a computed ``urgent()`` method used to follow.
    urgent = models.BooleanField(default=False)
class Report(models.Model):
    """
    A single report filed by one user against a post.

    This model exists alongside ReportRecord for two reasons:

    1. The report view in the Django admin probably has no good way to
       specially modify the list of reports. We do not want to see every
       single report there, so the list is limited *in data* rather than in
       code, using two models:

       - Report: every report, unique to users.
       - ReportRecord: the collection of all reports, unique to posts.

    2. We want a log of every report users create, in case we think they are
       abusing the system.
    """

    # The per-post record that this user's report belongs to
    record = models.ForeignKey("ReportRecord", on_delete=models.CASCADE)
    # Reason for this report
    reason = models.ForeignKey("ReportReason", on_delete=models.CASCADE)
    # IP address of the reporter
    ip = models.GenericIPAddressField()
@receiver(signals.post_save, sender=Report)
def report_created(sender, instance, created, **kwargs):
    """Refresh the parent record's weight and urgency when a report is filed.

    Does nothing for saves of existing reports.
    """
    if not created:
        return

    record = instance.record
    # Recompute the total from scratch, which is probably more reliable than
    # adding the weight every time. Summing in the database takes one query,
    # where the previous Python loop loaded each report's reason separately.
    total = record.report_set.aggregate(total=models.Sum("reason__weight"))["total"]
    # Sum() yields None when there are no rows to add up.
    record.weight = total or 0
    # Once a record is urgent it stays urgent.
    record.urgent = record.urgent or instance.reason.urgent
    record.save()
class Ban(models.Model):
    """A ban on an IP address, for one board or for all of them."""

    # IP address that this ban applies to
    ip = models.GenericIPAddressField()
    # The reason for this ban
    ban_reason = models.TextField(blank=False)
    # Board that this ban is for. If null, then all boards.
    # Even though this is nullable, a board's bans are simply deleted along
    # with the board (CASCADE).
    board = models.ForeignKey("Board", on_delete=models.CASCADE, null=True)
    # The time that this ban was created.
    created = models.DateTimeField(auto_now_add=True)
    # Expiration date of this ban.
    expires = models.DateTimeField()
class BanTemplate(models.Model):
    """A reusable ban preset: a reason plus how long the ban lasts."""

    # The reason for this ban
    ban_reason = models.TextField(blank=False)
    # The duration of the ban
    duration = models.DurationField()

    def create_ban(self, ip: str) -> Ban:
        """Create and return a Ban for ``ip`` that expires ``duration`` from now.

        No board is set on the ban.
        """
        return Ban.objects.create(
            ip=ip,
            ban_reason=self.ban_reason,
            expires=timezone.now() + self.duration,
        )