Files
interchan/board/models.py

416 lines
15 KiB
Python
Raw Normal View History

from datetime import timedelta
import os
from pathlib import Path
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import ValidationError
from django.core.files.base import ContentFile
from django.db import models
from django.db.models import signals
from django.db.models.deletion import SET_NULL
from django.dispatch import receiver
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext as _
from io import BytesIO
from colorfield.fields import ColorField
from PIL import Image
def image_upload(instance, filename):
    """
    Build the storage path for an uploaded post image.

    The path is ``<board url>/<unix seconds>.<microseconds><ext>``. The
    extension is lower-cased and ``.jpeg`` is normalised to ``.jpg``.

    Raises:
        ValueError: if the extension is not one of .jpg/.jpeg/.png/.gif.
    """
    # Validate the extension first so a bad upload is rejected before any
    # other work is done.
    ext = Path(filename).suffix.lower()
    if ext == ".jpeg":
        ext = ".jpg"
    if ext not in (".jpg", ".png", ".gif"):
        raise ValueError("File type invalid")

    # strftime("%s") is a platform-specific extension and ignores tzinfo on
    # aware datetimes; timestamp() is portable and timezone-correct.
    now = timezone.now()
    now_sec = f"{int(now.timestamp())}.{now.microsecond:06d}"
    return f"{instance.board.url}/{now_sec}{ext}"
def thumbs_upload(instance, filename):
    """
    Build the storage path for a post thumbnail.

    The path is ``<board url>/<unix seconds>.<microseconds>t<ext>``, where
    ``ext`` is the lower-cased extension of ``filename``. The extension is not
    validated here.
    """
    ext = Path(filename).suffix.lower()
    # strftime("%s") is a platform-specific extension and ignores tzinfo on
    # aware datetimes; timestamp() is portable and timezone-correct.
    now = timezone.now()
    now_sec = f"{int(now.timestamp())}.{now.microsecond:06d}"
    return f"{instance.board.url}/{now_sec}t{ext}"
# User model class as returned by django.contrib.auth.get_user_model().
User = get_user_model()
class Board(models.Model):
    """
    A board, identified by a short URL name, together with its posting
    limits and visibility switches.
    """

    # Short, unique name of the board as used in URLs
    url = models.CharField(max_length=255, null=False, blank=False, unique=True)

    # Display name of the board
    name = models.CharField(max_length=255, null=False, blank=False)

    # Number of pages the board may hold
    max_pages = models.IntegerField(default=10)

    # Number of threads shown on each page
    threads_per_page = models.IntegerField(default=10)

    # Cooldown between consecutive posts coming from the same IP address
    post_cooldown = models.DurationField(default=timedelta(seconds=60))

    # Cooldown between consecutive reports coming from the same IP address
    report_cooldown = models.DurationField(default=timedelta(seconds=15))

    # Auto-sink threshold: once a thread has this many replies it is no
    # longer bumped.
    autosink = models.IntegerField(default=300)

    # When set, the board is read-only.
    readonly = models.BooleanField(default=False)

    # When set, the board is left out of the board listing.
    hidden = models.BooleanField(default=False)

    @property
    def threads(self):
        """Queryset of this board's thread starters (posts that have no OP)."""
        return Post.objects.filter(op=None, board=self)

    @property
    def max_threads(self):
        """Thread capacity of the board: pages times threads per page."""
        return self.threads_per_page * self.max_pages

    def prune_threads(self):
        """
        Delete every thread beyond the board's capacity.

        Threads are ranked stickies first, then most recently bumped; whatever
        falls past ``max_threads`` is deleted one instance at a time.
        """
        ranked = self.threads.order_by("-sticky", "-last_bump")
        overflow = ranked[self.max_threads :]
        for stale_thread in overflow:
            stale_thread.delete()

    def __str__(self):
        return f"/{self.url}/ - {self.name}"
class Post(models.Model):
    """
    A single post on a board.

    A post with ``op`` unset is a thread starter; otherwise ``op`` points at
    the thread starter this post replies to.
    """

    # Board that this post was made on
    board = models.ForeignKey("Board", on_delete=models.CASCADE)
    # Thread that this is a part of
    op = models.ForeignKey(
        "self", null=True, on_delete=models.CASCADE, related_name="replies"
    )
    # User's supplied name for this post
    name = models.CharField(max_length=255, null=True, blank=True)
    # User's tripcode, already calculated
    tripcode = models.CharField(max_length=10, blank=True)
    # User's supplied subject for this post
    subject = models.CharField(max_length=255, null=True, blank=True)
    # Text of this post
    text = models.TextField(max_length=2000, null=False, blank=True)
    # The IP address of the user that made this post
    ip = models.GenericIPAddressField()
    # Capcode
    capcode = models.ForeignKey("Capcode", null=True, blank=True, on_delete=SET_NULL)
    # Post is stickied - it remains at the top of the bump list.
    sticky = models.BooleanField(default=False, blank=True)
    # Post is locked - nobody can post in it.
    lock = models.BooleanField(default=False, blank=True)
    # Creation time
    created = models.DateTimeField(auto_now_add=True)
    # Last bump time
    last_bump = models.DateTimeField(auto_now_add=True)
    # Image
    image = models.ImageField(
        upload_to=image_upload,
        null=True,
        blank=True,
        width_field="image_width",
        height_field="image_height",
    )
    # Bump - if this post is an OP, determines whether the OP can be bumped. If
    # this post is not an OP, determines whether the thread SHOULD be bumped.
    bump = models.BooleanField(default=True)
    # Thumbnail
    thumbnail = models.ImageField(upload_to=thumbs_upload, editable=False, null=True)
    # Original image name
    original_image_name = models.CharField(max_length=255, null=True, blank=True)
    # Image width and height
    image_width = models.IntegerField(null=True, blank=True)
    image_height = models.IntegerField(null=True, blank=True)
    # The user token that was used to make this post. This is used for
    # verification when a user wants to delete their post. It is not
    # particularly secret and does not need to be hashed.
    user_token = models.CharField(max_length=30, null=True)

    class Meta:
        permissions = [
            ("set_sticky", "Can sticky post"),
            ("set_bump", "Can bumplock post"),
            ("set_lock", "Can lock post"),
            ("wipe_user", "Can wipe all posts by a user"),
        ]

    def save(self, *args, **kwargs):
        """
        Save the post, recording the upload's original file name and building
        its thumbnail when a new image is attached.
        """
        # Only process an image that has not been written to storage yet.
        # Re-saving an existing post (e.g. when its thread is bumped) would
        # otherwise replace original_image_name with the stored file name and
        # write a fresh thumbnail file each time. If the file object does not
        # expose the flag, fall back to always processing.
        if self.image and not getattr(self.image, "_committed", False):
            self.original_image_name = Path(self.image.name).name
            self.__make_thumbnail()
        super().save(*args, **kwargs)

    def image_replies_count(self) -> int:
        """
        Gets the number of image replies to this OP.
        """
        # I would normally use the filter() and count() functions here, but
        # image__isnull=True does not seem to behave correctly. This is close
        # enough.
        return sum(1 for reply in self.replies.all() if reply.image)

    def unique_posters(self) -> int:
        """
        Gets the number of distinct IP addresses among the replies to this OP.
        """
        return len({reply.ip for reply in self.replies.all()})

    def __make_thumbnail(self):
        """
        Render a thumbnail of ``self.image`` into ``self.thumbnail``.

        The thumbnail is bounded by ``settings.THUMB_SIZE`` and encoded in the
        format implied by the image's file extension. The thumbnail field is
        written with ``save=False``; the caller persists the model.

        Raises:
            ValueError: if the extension is not .jpg/.jpeg/.gif/.png.
        """
        image_path = Path(self.image.name)
        formats = {".jpg": "JPEG", ".jpeg": "JPEG", ".gif": "GIF", ".png": "PNG"}
        ftype = formats.get(image_path.suffix.lower())
        if ftype is None:
            raise ValueError("File type invalid")

        image = Image.open(self.image)
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
        image.thumbnail(settings.THUMB_SIZE, Image.LANCZOS)
        # The format is chosen from the file name, not the decoded content, so
        # an image with an alpha or palette mode can end up here; JPEG cannot
        # encode those modes.
        if ftype == "JPEG" and image.mode not in ("RGB", "L", "CMYK"):
            image = image.convert("RGB")

        thumb_name = image_path.stem + "t" + image_path.suffix
        with BytesIO() as thumb_temp:
            image.save(thumb_temp, ftype)
            # save=False stops recursion
            self.thumbnail.save(
                thumb_name, ContentFile(thumb_temp.getvalue()), save=False
            )

    def get_absolute_url(self):
        """
        URL of the thread this post belongs to, anchored at this post.
        """
        thread = self.op or self
        thread_url = reverse(
            "board:post_detail",
            kwargs={"url": self.board.url, "id": thread.id},
        )
        return thread_url + f"#p{self.id}"

    def clean(self):
        """
        Validate the post: content present, image size, board/thread state and
        the per-IP posting cooldown.

        Raises:
            ValidationError: on the first check that fails.
        """
        # Make sure there is at least some content
        self.text = self.text.strip()
        if not (self.text or self.image):
            raise ValidationError(_("Please either write a message or upload an image"))
        # Image upload size check
        if self.image and self.image.size > settings.MAX_UPLOAD_SIZE:
            raise ValidationError(
                _("Image supplied is too large. Maximum image size is %(max)s"),
                params={"max": settings.MAX_UPLOAD_SIZE},
            )
        # Check if board is readonly
        if self.board.readonly:
            raise ValidationError(
                _("This board is in readonly mode, you cannot create new posts.")
            )
        # Check if OP is locked
        if self.op and self.op.lock:
            raise ValidationError(_("This thread is locked, you cannot reply to it"))
        # Rate limiting for posts
        # TODO BUG: if a user's last post is deleted, and it is their only post,
        # they will not hit rate limit. This could probably be abused
        last_post = self.board.post_set.filter(ip=self.ip).order_by("-created").first()
        if last_post:
            delta = timezone.now() - last_post.created
            if delta < self.board.post_cooldown:
                cooldown = self.board.post_cooldown - delta
                # Plain literal (no f-prefix): the placeholder is filled in
                # from params, and the msgid must be a constant string.
                raise ValidationError(
                    _("Please wait %(cooldown)s seconds before posting again"),
                    params={"cooldown": int(cooldown.total_seconds())},
                )
@receiver(signals.post_save, sender=Post)
def post_created(sender, instance, created, **kwargs):
    """
    React to a newly created post.

    A new reply bumps its thread when both the reply and the thread allow
    bumping, the thread is not stickied, and the thread's reply count is
    still below the board's autosink value. A new thread triggers pruning of
    the board's threads.
    """
    if not created:
        return

    thread = instance.op
    if not thread:
        # Prune threads for the board
        instance.board.prune_threads()
        return

    # Update the bump
    if (
        instance.bump
        and thread.bump
        and not thread.sticky
        and thread.replies.count() < instance.board.autosink
    ):
        thread.last_bump = timezone.now()
        # Write only the bump time: a full save would rewrite every column of
        # the thread starter (overwriting concurrent sticky/lock changes) and
        # persist any other field changes made by Post.save().
        thread.save(update_fields=["last_bump"])
@receiver(signals.post_delete, sender=Post)
def post_deleted(sender, instance, **kwargs):
    """
    Remove a deleted post's image and thumbnail files from storage.
    """
    for stored_file in (instance.image, instance.thumbnail):
        if stored_file:
            # Delete through the field's storage backend instead of checking
            # and removing a local path: no check-then-remove race, and no
            # reliance on the storage exposing a filesystem path. save=False
            # because the row is already gone.
            stored_file.delete(save=False)
class ReportReason(models.Model):
    """
    A selectable reason for reporting a post, optionally tied to one board.
    """

    # Text of the reason.
    reason = models.CharField(max_length=255)

    # Weight of this reason. Higher = more urgent.
    weight = models.IntegerField(default=1)

    # Urgency flag. If true, the reported post is probably illegal content.
    urgent = models.BooleanField(default=False)

    # Board this reason is specific to, if any.
    board = models.ForeignKey(
        "Board",
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )

    def __str__(self):
        return self.reason
class ReportRecord(models.Model):
    """
    Per-post report summary: one record per reported post.
    """

    # The reported post
    post = models.OneToOneField("Post", on_delete=models.CASCADE)

    # Weight of the report
    weight = models.IntegerField(default=0)

    # Whether the report is urgent
    urgent = models.BooleanField(default=False)
class Report(models.Model):
    """
    A single report filed from one IP address against a report record.

    This model exists alongside ReportRecord for two reasons:

    1. The report view in the Django admin probably has no good way to
       specially modify the list of reports. We do not want to see every
       single report there, so the limiting is done *in data* rather than in
       code:
       - Report holds all reports, unique per user.
       - ReportRecord holds the collection of reports, unique per post.
    2. It keeps a log of every report users create, in case we think they are
       abusing the system.
    """

    # Record (and thereby post) that this user's report belongs to
    record = models.ForeignKey("ReportRecord", on_delete=models.CASCADE)

    # Reason given for the report
    reason = models.ForeignKey("ReportReason", on_delete=models.CASCADE)

    # IP address the report came from
    ip = models.GenericIPAddressField()

    # When the report was created
    created = models.DateTimeField(auto_now_add=True)

    class Meta:
        # One report per IP address per record.
        unique_together = [
            ["record", "ip"],
        ]
@receiver(signals.post_save, sender=Report)
def report_created(sender, instance, created, **kwargs):
    """
    Refresh the parent record's weight and urgency when a report is created.

    The weight is recomputed from all of the record's reports rather than
    incremented, and the record becomes urgent as soon as one report's reason
    is urgent.
    """
    if not created:
        return

    record = instance.record
    # Sum in the database: iterating the reports in Python costs one extra
    # query per report to load its reason.
    total = record.report_set.aggregate(total=models.Sum("reason__weight"))["total"]
    # The aggregate is None when there are no rows to sum.
    record.weight = total or 0
    record.urgent |= instance.reason.urgent
    record.save()
class BanCommon(models.Model):
    """
    Abstract base holding the fields shared by the ban models.
    """

    # Why the ban was issued
    ban_reason = models.TextField(blank=False)

    # Board the ban applies to. If null, then all boards.
    # Although nullable, the ban is deleted together with its board
    # (on_delete=CASCADE).
    board = models.ForeignKey(
        "Board",
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )

    # When the ban was created.
    created = models.DateTimeField(auto_now_add=True)

    # When the ban expires. If null, the ban is permanent.
    expires = models.DateTimeField(null=True, blank=True)

    # ID of the post that caused the ban
    post_id = models.IntegerField(null=True, blank=True)

    class Meta:
        abstract = True
class RangeBan(BanCommon):
    """
    A ban covering a range of IP addresses between ``start`` and ``end``.
    """

    # Starting IP address of the ban
    start = models.GenericIPAddressField()
    # Ending IP address of the ban
    end = models.GenericIPAddressField()

    def clean(self):
        """
        Check that start and end are the same IP version and start < end.

        Raises:
            ValidationError: if the versions differ or start is not lower
                than end.
        """
        import ipaddress

        try:
            start = ipaddress.ip_address(self.start)
            end = ipaddress.ip_address(self.end)
        except ValueError:
            # A missing or malformed address cannot be compared. Leave it to
            # the fields' own validation to report, rather than letting the
            # ValueError escape as an unhandled error.
            return

        if start.version != end.version:
            raise ValidationError(
                _("Start and end IP address must be the same protocol (IPv4 or IPv6)")
            )
        if start >= end:
            raise ValidationError(_("Start IP must be lower than end IP"))

    def __str__(self):
        return f"Range ban for {self.start}-{self.end}"
class Ban(BanCommon):
    """
    A ban on a single IP address.
    """

    # The banned IP address
    ip = models.GenericIPAddressField()

    def __str__(self):
        return f"Ban for {self.ip}"
@receiver(signals.post_save, sender=Ban)
def ban_created(sender, instance, created, **kwargs):
    """
    Delete the post that caused a newly created ban, if one is referenced.
    """
    if not (created and instance.post_id):
        return

    # The post may already be gone (e.g. deleted or pruned before the ban was
    # saved). Look it up without raising, so the ban still saves cleanly.
    post = Post.objects.filter(id=instance.post_id).first()
    if post is not None:
        post.delete()
class BanTemplate(models.Model):
    """
    A named preset of ban reason and duration, optionally tied to one board.
    """

    # Name of the template
    name = models.CharField(max_length=100)

    # Ban reason carried by the template
    ban_reason = models.TextField(blank=False)

    # Ban duration, in days
    duration = models.IntegerField(blank=True, null=True)

    # Board the template is for, or none.
    board = models.ForeignKey(
        "Board",
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )

    def __str__(self) -> str:
        # Board-less templates are shown by name alone.
        if not self.board:
            return self.name
        return f"/{self.board.url}/ - {self.name}"
class Capcode(models.Model):
    """
    A capcode: a suffix and colour attached to a capcoded user's name.
    """

    # Text shown *after* the capcoded user's name.
    suffix = models.CharField(max_length=100)

    # Colour associated with the capcode
    color = ColorField()

    class Meta:
        permissions = (("use_capcode", "Can use capcode"),)

    def __str__(self) -> str:
        return self.suffix
class NewsPost(models.Model):
    """
    A news post with a title, an optional author and an optional body.
    """

    # When the news post was created.
    created = models.DateTimeField(auto_now_add=True)

    # Title of the news post.
    title = models.CharField(max_length=300)

    # Author of the news post.
    author = models.CharField(max_length=100, blank=True)

    # Content of the news post.
    body = models.TextField(blank=True)