"""Board and Post models, their upload-path helpers, and their signal handlers."""
from datetime import timedelta
from io import BytesIO
import os
from pathlib import Path

from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.files.base import ContentFile
from django.db import models
from django.db.models import signals
from django.dispatch import receiver
from django.urls import reverse
from django.utils import timezone
from PIL import Image

# Extensions accepted for stored images (".jpeg" is normalised to ".jpg").
_ALLOWED_IMAGE_EXTENSIONS = (".jpg", ".png", ".gif")

# Maps a lower-cased file extension to the Pillow format used for thumbnails.
_THUMBNAIL_FORMATS = {
    ".jpg": "JPEG",
    ".jpeg": "JPEG",
    ".gif": "GIF",
    ".png": "PNG",
}

# Pillow modes left untouched when writing a JPEG thumbnail; any other mode
# (e.g. RGBA or P) is converted to RGB first.
_JPEG_SAFE_MODES = ("L", "RGB", "CMYK")


def _upload_timestamp():
    """Return the current time as ``<epoch seconds>.<microseconds>``.

    Built from ``timestamp()`` and the ``microsecond`` field instead of
    ``strftime("%s.%f")``: ``%s`` is a platform-specific directive and ignores
    the datetime's tzinfo.
    """
    now = timezone.now()
    return f"{int(now.timestamp())}.{now.microsecond:06d}"


def image_upload(instance, filename):
    """Build the storage path for a post's image.

    Args:
        instance: The post being saved; only ``instance.board.url`` is read.
        filename: The uploaded file's name; only its extension is used.

    Returns:
        ``"<board url>/<timestamp><ext>"`` with a lower-cased extension.

    Raises:
        ValueError: If the extension is not .jpg/.jpeg, .png or .gif.
    """
    ext = Path(filename).suffix.lower()
    if ext == ".jpeg":
        ext = ".jpg"
    if ext not in _ALLOWED_IMAGE_EXTENSIONS:
        raise ValueError("File type invalid")
    return f"{instance.board.url}/{_upload_timestamp()}{ext}"


def thumbs_upload(instance, filename):
    """Build the storage path for a post's thumbnail.

    Args:
        instance: The post being saved; only ``instance.board.url`` is read.
        filename: The thumbnail's name; only its extension is used.

    Returns:
        ``"<board url>/<timestamp>t<ext>"`` with a lower-cased extension.
    """
    ext = Path(filename).suffix.lower()
    return f"{instance.board.url}/{_upload_timestamp()}t{ext}"


class Board(models.Model):
    """A board that holds threads, with paging and post-cooldown settings."""

    # The short URL name for the board
    url = models.CharField(max_length=255, null=False, blank=False, unique=True)

    # Human-readable name for the board
    name = models.CharField(max_length=255, null=False, blank=False)

    # Max pages
    max_pages = models.IntegerField(default=10)

    # Threads per page
    threads_per_page = models.IntegerField(default=10)

    # Minimum time that must pass between consecutive posts from the same
    # IP address
    post_cooldown = models.DurationField(default=timedelta(seconds=60))

    @property
    def threads(self):
        """Queryset of this board's posts that have no ``op`` (thread starters)."""
        return Post.objects.filter(board=self, op=None)

    @property
    def max_threads(self):
        """Thread capacity of the board: ``max_pages * threads_per_page``."""
        return self.max_pages * self.threads_per_page

    def prune_threads(self):
        """Delete the least recently bumped threads beyond ``max_threads``."""
        overflow = self.threads.order_by("-last_bump")[self.max_threads :]
        # Delete one by one: a sliced queryset cannot be bulk-deleted.
        for thread in overflow:
            thread.delete()


class Post(models.Model):
    """A post on a board: a thread starter when ``op`` is null, else a reply."""

    # Board that this post was made on
    board = models.ForeignKey("Board", on_delete=models.CASCADE)

    # Thread that this is a part of
    op = models.ForeignKey(
        "self", null=True, on_delete=models.CASCADE, related_name="replies"
    )

    # User's supplied name for this post
    name = models.CharField(max_length=255, null=True, blank=True)

    # User's supplied subject for this post
    subject = models.CharField(max_length=255, null=True, blank=True)

    # Text of this post
    text = models.TextField(max_length=10000, null=False, blank=True)

    # The IP address of the user that made this post
    ip = models.GenericIPAddressField()

    # Creation time
    created = models.DateTimeField(auto_now_add=True)

    # Last bump time
    last_bump = models.DateTimeField(auto_now_add=True)

    # Image
    image = models.ImageField(
        upload_to=image_upload,
        null=True,
        blank=True,
        width_field="image_width",
        height_field="image_height",
    )

    # Thumbnail
    thumbnail = models.ImageField(upload_to=thumbs_upload, editable=False, null=True)

    # Original image name
    original_image_name = models.CharField(max_length=255, null=True)

    # Image width and height
    image_width = models.IntegerField(null=True)
    image_height = models.IntegerField(null=True)

    def save(self, *args, **kwargs):
        """Save the post, generating a thumbnail for a newly attached image.

        The original file name and the thumbnail are derived only while the
        image has not been written to storage yet. Re-saving an existing post
        (for example when a thread is bumped) must not replace
        ``original_image_name`` with the storage name or write another
        thumbnail file.
        """
        # ``_committed`` is the flag Django's FileField checks before writing
        # a file to storage. If it is ever missing, fall back to regenerating.
        if self.image and not getattr(self.image, "_committed", False):
            self.original_image_name = Path(self.image.name).name
            self.__make_thumbnail()
        super().save(*args, **kwargs)

    def __make_thumbnail(self):
        """Render ``self.image`` into ``self.thumbnail`` without saving the row.

        Raises:
            ValueError: If the image's extension has no known thumbnail format.
        """
        image_path = Path(self.image.name)
        image_format = _THUMBNAIL_FORMATS.get(image_path.suffix.lower())
        if image_format is None:
            raise ValueError("File type invalid")

        image = Image.open(self.image)
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
        image.thumbnail(settings.THUMB_SIZE, Image.LANCZOS)
        # The pixel mode comes from the file content, not its extension, so a
        # ".jpg" upload may still carry a mode JPEG cannot store.
        if image_format == "JPEG" and image.mode not in _JPEG_SAFE_MODES:
            image = image.convert("RGB")

        buffer = BytesIO()
        image.save(buffer, image_format)

        thumb_name = f"{image_path.stem}t{image_path.suffix}"
        # save=False stops recursion
        self.thumbnail.save(thumb_name, ContentFile(buffer.getvalue()), save=False)

    def get_absolute_url(self):
        """Return the thread's detail URL, with a ``#p<id>`` anchor for replies."""
        # op_id is read instead of op.id so the parent row is not fetched.
        is_thread = self.op_id is None
        thread_url = reverse(
            "board:post_detail",
            kwargs={"url": self.board.url, "id": self.id if is_thread else self.op_id},
        )
        if is_thread:
            return thread_url
        return f"{thread_url}#p{self.id}"

    def clean(self):
        """Validate content, image type and size, and the per-IP cooldown.

        Raises:
            ValidationError: If the post has neither text nor image, the image
                has an unsupported extension or exceeds
                ``settings.MAX_UPLOAD_SIZE``, or the same IP posted on this
                board within the board's ``post_cooldown``.
        """
        # Make sure there is at least some content
        self.text = (self.text or "").strip()
        if not (self.text or self.image):
            raise ValidationError("Please either write a message or upload an image")

        if self.image:
            # Reject unsupported types here so the user gets a validation
            # error instead of an unhandled exception raised during save().
            if Path(self.image.name).suffix.lower() not in _THUMBNAIL_FORMATS:
                raise ValidationError("File type invalid")

            # Image upload size check
            if self.image.size > settings.MAX_UPLOAD_SIZE:
                raise ValidationError(
                    "Image supplied is too large. Maximum image size is %(max)s",
                    params={"max": settings.MAX_UPLOAD_SIZE},
                )

        # The cooldown only applies to new posts; validating an existing post
        # must not be rate-limited against itself.
        if not self._state.adding:
            return

        # Rate limiting for posts
        # TODO BUG: if a user's last post is deleted, and it is their only post,
        # they will not hit rate limit. This could probably be abused
        last_post = self.board.post_set.filter(ip=self.ip).order_by("-created").first()
        if last_post:
            delta = timezone.now() - last_post.created
            if delta < self.board.post_cooldown:
                cooldown = self.board.post_cooldown - delta
                raise ValidationError(
                    f"Please wait {int(cooldown.total_seconds())} seconds before posting again"
                )


@receiver(signals.post_save, sender=Post)
def post_created(sender, instance, created, **kwargs):
    """Bump the parent thread for a new reply, or prune the board for a new thread."""
    if not created:
        return

    if instance.op:
        # Update the bump. Only last_bump is written so stale in-memory values
        # of the thread's other fields are not saved back over the row.
        instance.op.last_bump = timezone.now()
        instance.op.save(update_fields=["last_bump"])
    else:
        # Prune threads for the board
        instance.board.prune_threads()


@receiver(signals.post_delete, sender=Post)
def post_deleted(sender, instance, **kwargs):
    """Delete the stored image and thumbnail files of a deleted post."""
    for stored_file in (instance.image, instance.thumbnail):
        if stored_file:
            # Delete through the storage backend rather than os.remove on
            # .path, which only works for filesystem storage. save=False
            # because the row no longer exists.
            stored_file.delete(save=False)