from datetime import timedelta import os from pathlib import Path from django.conf import settings from django.core.exceptions import ValidationError from django.core.files.base import ContentFile from django.db import models from django.db.models import signals from django.dispatch import receiver from django.urls import reverse from django.utils import timezone from django.utils.translation import gettext as _ from PIL import Image from io import BytesIO def image_upload(instance, filename): op_id = instance.op.id if instance.op else instance.id now = timezone.now() now_sec = now.strftime("%s.%f") ext = Path(filename).suffix.lower() if ext in (".jpeg", ".jpg"): ext = ".jpg" if ext not in (".jpg", ".png", ".gif"): raise Exception("File type invalid") return f"{instance.board.url}/{now_sec}{ext}" def thumbs_upload(instance, filename): op_id = instance.op.id if instance.op else instance.id now = timezone.now() now_sec = now.strftime("%s.%f") ext = Path(filename).suffix.lower() return f"{instance.board.url}/{now_sec}t{ext}" class Board(models.Model): # The short URL name for the board url = models.CharField(max_length=255, null=False, blank=False, unique=True) # Human-readable name for the board name = models.CharField(max_length=255, null=False, blank=False) # Max pages max_pages = models.IntegerField(default=10) # Threads per page threads_per_page = models.IntegerField(default=10) # The amount of time that users from the same IP address are allowed to make # consecutive posts post_cooldown = models.DurationField(default=timedelta(seconds=60)) # Auto-sink threshhold. This is the number of replies that a thread can have # before it stops being bumped. autosink = models.IntegerField(default=300) @property def threads(self): return Post.objects.filter(board=self, op=None) @property def max_threads(self): return self.max_pages * self.threads_per_page def prune_threads(self): to_remove = self.threads.order_by("-last_bump")[self.max_threads :] for thread in to_remove: thread.delete() def __str__(self): return f"/{self.url}/ - {self.name}" class Post(models.Model): # Board that this post was made on board = models.ForeignKey("Board", on_delete=models.CASCADE) # Thread that this is a part of op = models.ForeignKey( "self", null=True, on_delete=models.CASCADE, related_name="replies" ) # User's supplied name for this post name = models.CharField(max_length=255, null=True, blank=True) # User's supplied subject for this post subject = models.CharField(max_length=255, null=True, blank=True) # Text of this post text = models.TextField(max_length=10000, null=False, blank=True) # The IP address of the user that made this post ip = models.GenericIPAddressField() # Creation time created = models.DateTimeField(auto_now_add=True) # Last bump time last_bump = models.DateTimeField(auto_now_add=True) # Image image = models.ImageField( upload_to=image_upload, null=True, blank=True, width_field="image_width", height_field="image_height", ) # Bump - if this post is an OP, determines whether the OP can be bumped. If # this post is not an OP, determines whether the thread SHOULD be bumped. bump = models.BooleanField(default=True) # Thumbnail thumbnail = models.ImageField(upload_to=thumbs_upload, editable=False, null=True) # Original image name original_image_name = models.CharField(max_length=255, null=True) # Image width and height image_width = models.IntegerField(null=True) image_height = models.IntegerField(null=True) def save(self, *args, **kwargs): if self.image: self.original_image_name = Path(self.image.name).parts[-1] self.__make_thumbnail() super(Post, self).save(*args, **kwargs) def __make_thumbnail(self): image = Image.open(self.image) image.thumbnail(settings.THUMB_SIZE, Image.ANTIALIAS) image_path = Path(self.image.name) thumb_path = Path(image_path.stem + "t" + image_path.suffix) ext = thumb_path.suffix.lower() if ext in (".jpg", ".jpeg"): FTYPE = "JPEG" elif ext == ".gif": FTYPE = "GIF" elif ext == ".png": FTYPE = "PNG" else: raise Exception("File type invalid") thumb_temp = BytesIO() image.save(thumb_temp, FTYPE) thumb_temp.seek(0) # save=False stops recursion self.thumbnail.save(thumb_path, ContentFile(thumb_temp.read()), save=False) thumb_temp.close() def get_absolute_url(self): if self.op is None: return reverse( "board:post_detail", kwargs={"url": self.board.url, "id": self.id} ) else: return ( reverse( "board:post_detail", kwargs={"url": self.board.url, "id": self.op.id}, ) + f"#p{self.id}" ) def clean(self): # Make sure there is at least some content self.text = self.text.strip() if not (self.text or self.image): raise ValidationError(_("Please either write a message or upload an image")) # Image upload size check if self.image and self.image.size > settings.MAX_UPLOAD_SIZE: raise ValidationError( _("Image supplied is too large. Maximum image size is %(max)s"), params={"max": settings.MAX_UPLOAD_SIZE}, ) # Rate limiting for posts # TODO BUG: if a user's last post is deleted, and it is their only post, # they will not hit rate limit. This could probably be abused last_post = self.board.post_set.filter(ip=self.ip).order_by("-created").first() if last_post: now = timezone.now() delta = now - last_post.created if delta < self.board.post_cooldown: cooldown = self.board.post_cooldown - delta raise ValidationError( _(f"Please wait %(cooldown)s seconds before posting again"), params={"cooldown": int(cooldown.total_seconds())}, ) @receiver(signals.post_save, sender=Post) def post_created(sender, instance, created, **kwargs): if created: if instance.op: # Update the bump if ( instance.bump and instance.op.bump and instance.op.replies.count() < instance.board.autosink ): instance.op.last_bump = timezone.now() instance.op.save() else: # Prune threads for the board instance.board.prune_threads() @receiver(signals.post_delete, sender=Post) def post_deleted(sender, instance, **kwargs): if instance.image and Path(instance.image.path).is_file(): os.remove(instance.image.path) if instance.thumbnail and Path(instance.thumbnail.path).is_file(): os.remove(instance.thumbnail.path) class ReportReason(models.Model): # The reason for this report. reason = models.CharField(max_length=255) # The weight of this report reason. # Heigher = more urgent. weight = models.IntegerField(default=1) # Urgency. If true, this post is probably reported as illegal content. urgent = models.BooleanField(default=False) # This is a board-specific report reason board = models.ForeignKey("Board", on_delete=models.CASCADE, null=True, blank=True) def __str__(self): return self.reason class ReportRecord(models.Model): # Post that this report is for post = models.OneToOneField("Post", on_delete=models.CASCADE) # Report weight weight = models.IntegerField(default=0) # If this report is urgent or not urgent = models.BooleanField(default=False) class Report(models.Model): """ There are two reasons why this model exists: 1. In the report view in the Django admin, there is probably not a good way to specially modify the list of reports. Since we don't want to see every single report, we need a way to limit this *in data* rather than in code. We accomplish this by having: - Report model, which is all reports unique to users. - ReportRecord model, which is the collection of all reports unique to posts. 2. We want to keep a log of all reports created by users, in case we think they are abusing the system. """ # The report that was made by this user record = models.ForeignKey("ReportRecord", on_delete=models.CASCADE) # Reason for this report reason = models.ForeignKey("ReportReason", on_delete=models.CASCADE) # IP address of the reporter ip = models.GenericIPAddressField() @receiver(signals.post_save, sender=Report) def report_created(sender, instance, created, **kwargs): if created: # get the total weight, this is probably going to be more reliable than # adding the weight every time weights = [report.reason.weight for report in instance.record.report_set.all()] instance.record.weight = sum(weights) instance.record.urgent |= instance.reason.urgent instance.record.save() # class BanTemplate(models.Model): # board = class RangeBan(models.Model): # Starting IP address of the ban start = models.GenericIPAddressField() # Ending IP address of the ban end = models.GenericIPAddressField() # The reason for this ban ban_reason = models.TextField(blank=False) # The time that this ban was created. created = models.DateTimeField(auto_now_add=True) # Expiration date of this ban. If it is null, it is permanent. expires = models.DateTimeField(null=True) class Ban(models.Model): # IP address of the ban ip = models.GenericIPAddressField() # The reason for this ban ban_reason = models.TextField(blank=False) # Board that this ban is for. If null, then all boards. # Even though this is nullable, we just want to delete all reports for a # board if that board is deleted. board = models.ForeignKey("Board", on_delete=models.CASCADE, null=True) # The time that this ban was created. created = models.DateTimeField(auto_now_add=True) # Expiration date of this ban. If it is null, it is permanent. expires = models.DateTimeField(null=True) class BanTemplate(models.Model): # The reason for this ban ban_reason = models.TextField(blank=False) # The duration of the ban duration = models.DurationField() def create_ban(self, ip: str) -> Ban: expires = timezone.now() + self.duration return Ban.objects.create(ip=ip, ban_reason=self.ban_reason, expires=expires)