import ipaddress
import os
from datetime import timedelta
from io import BytesIO
from pathlib import Path

from colorfield.fields import ColorField
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import ValidationError
from django.core.files.base import ContentFile
from django.db import models
from django.db.models import signals
from django.db.models.deletion import SET_NULL
from django.dispatch import receiver
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext as _
from PIL import Image

# File extensions accepted for uploaded images (after ".jpeg" -> ".jpg").
_ALLOWED_IMAGE_EXTENSIONS = (".jpg", ".png", ".gif")

# Maps a (lower-cased) file extension to the Pillow format name used when
# writing the thumbnail.
_THUMBNAIL_FORMATS = {
    ".jpg": "JPEG",
    ".jpeg": "JPEG",
    ".gif": "GIF",
    ".png": "PNG",
}


def _upload_timestamp() -> str:
    """Return the current time as ``<epoch seconds>.<microseconds>``.

    This replaces ``strftime("%s.%f")``: ``%s`` is a platform-specific
    extension that is not available everywhere and ignores the timezone of an
    aware datetime.
    """
    now = timezone.now()
    seconds = int(now.replace(microsecond=0).timestamp())
    return f"{seconds}.{now.microsecond:06d}"


def image_upload(instance, filename) -> str:
    """Build the storage path for a post's image.

    The path is ``<board url>/<timestamp><ext>``; ``.jpeg`` is normalised to
    ``.jpg``.

    Raises:
        ValueError: if the extension is not one of .jpg/.jpeg/.png/.gif.
    """
    ext = Path(filename).suffix.lower()
    if ext == ".jpeg":
        ext = ".jpg"
    if ext not in _ALLOWED_IMAGE_EXTENSIONS:
        raise ValueError("File type invalid")
    return f"{instance.board.url}/{_upload_timestamp()}{ext}"


def thumbs_upload(instance, filename) -> str:
    """Build the storage path for a post's thumbnail.

    The path is ``<board url>/<timestamp>t<ext>``; the extension is taken
    (lower-cased) from ``filename`` without further validation.
    """
    ext = Path(filename).suffix.lower()
    return f"{instance.board.url}/{_upload_timestamp()}t{ext}"


User = get_user_model()


class Board(models.Model):
    """A single board, holding threads and per-board posting limits."""

    # The short URL name for the board
    url = models.CharField(max_length=255, null=False, blank=False, unique=True)
    # Human-readable name for the board
    name = models.CharField(max_length=255, null=False, blank=False)
    # Max pages
    max_pages = models.IntegerField(default=10)
    # Threads per page
    threads_per_page = models.IntegerField(default=10)
    # The amount of time that users from the same IP address must wait between
    # consecutive posts
    post_cooldown = models.DurationField(default=timedelta(seconds=60))
    # The amount of time that users from the same IP address must wait between
    # consecutive reports
    report_cooldown = models.DurationField(default=timedelta(seconds=15))
    # Auto-sink threshold. This is the number of replies that a thread can have
    # before it stops being bumped.
    autosink = models.IntegerField(default=300)
    # Whether this board is read-only or not.
    readonly = models.BooleanField(default=False)
    # Whether this board appears in the board listing or not
    hidden = models.BooleanField(default=False)

    @property
    def threads(self):
        """Queryset of the opening posts (posts without an ``op``) on this board."""
        return Post.objects.filter(board=self, op=None)

    @property
    def max_threads(self):
        """Maximum number of threads kept on the board (pages * threads per page)."""
        return self.max_pages * self.threads_per_page

    def prune_threads(self):
        """Delete every thread that falls beyond ``max_threads``.

        Threads are ranked sticky-first, then by most recent bump. Each thread
        is deleted individually via ``Model.delete()``.
        """
        overflow = self.threads.order_by("-sticky", "-last_bump")[self.max_threads :]
        for thread in overflow:
            thread.delete()

    def __str__(self) -> str:
        return f"/{self.url}/ - {self.name}"


class Post(models.Model):
    """A post on a board: either a thread opener (``op`` is null) or a reply."""

    # Board that this post was made on
    board = models.ForeignKey("Board", on_delete=models.CASCADE)
    # Thread that this is a part of
    op = models.ForeignKey(
        "self", null=True, on_delete=models.CASCADE, related_name="replies"
    )
    # User's supplied name for this post
    name = models.CharField(max_length=255, null=True, blank=True)
    # User's tripcode, already calculated
    tripcode = models.CharField(max_length=10, blank=True)
    # User's supplied subject for this post
    subject = models.CharField(max_length=255, null=True, blank=True)
    # Text of this post
    text = models.TextField(max_length=2000, null=False, blank=True)
    # The IP address of the user that made this post
    ip = models.GenericIPAddressField()
    # Capcode
    capcode = models.ForeignKey("Capcode", null=True, blank=True, on_delete=SET_NULL)
    # Post is stickied - it remains at the top of the bump list.
    sticky = models.BooleanField(default=False, blank=True)
    # Post is locked - nobody can post in it.
    lock = models.BooleanField(default=False, blank=True)
    # Creation time
    created = models.DateTimeField(auto_now_add=True)
    # Last bump time
    last_bump = models.DateTimeField(auto_now_add=True)
    # Image
    image = models.ImageField(
        upload_to=image_upload,
        null=True,
        blank=True,
        width_field="image_width",
        height_field="image_height",
    )
    # Bump - if this post is an OP, determines whether the OP can be bumped. If
    # this post is not an OP, determines whether the thread SHOULD be bumped.
    bump = models.BooleanField(default=True)
    # Thumbnail
    thumbnail = models.ImageField(upload_to=thumbs_upload, editable=False, null=True)
    # Original image name
    original_image_name = models.CharField(max_length=255, null=True, blank=True)
    # Image width and height
    image_width = models.IntegerField(null=True, blank=True)
    image_height = models.IntegerField(null=True, blank=True)
    # The user token that was used to make this post. This is used for
    # verification when a user wants to delete their post. It is not
    # particularly secret and does not need to be hashed.
    user_token = models.CharField(max_length=30, null=True)

    class Meta:
        permissions = [
            ("set_sticky", "Can sticky post"),
            ("set_bump", "Can bumplock post"),
            ("set_lock", "Can lock post"),
            ("wipe_user", "Can wipe all posts by a user"),
        ]

    def save(self, *args, **kwargs):
        """Save the post, generating a thumbnail for a newly attached image.

        The original file name and the thumbnail are only derived while the
        image has not yet been written to storage. Re-saving an existing post
        (for example when a thread is bumped) would otherwise overwrite
        ``original_image_name`` with the storage file name and write a fresh
        thumbnail file each time.
        """
        # Fall back to "not committed" (always regenerate) if the attribute is
        # ever missing, so thumbnails are never silently skipped.
        if self.image and not getattr(self.image, "_committed", False):
            self.original_image_name = Path(self.image.name).name
            self.__make_thumbnail()
        super().save(*args, **kwargs)

    def image_replies_count(self) -> int:
        """
        Gets the number of image replies to this OP.
        """
        # An empty image may be stored either as NULL or as an empty string,
        # so both have to be excluded; image__isnull alone is not enough.
        return self.replies.exclude(image__isnull=True).exclude(image="").count()

    def unique_posters(self) -> int:
        """Return the number of distinct IP addresses among this post's replies."""
        return self.replies.values("ip").distinct().count()

    def __make_thumbnail(self):
        """Create the thumbnail for ``self.image`` and attach it to ``self.thumbnail``.

        The thumbnail is written in the format implied by the image's file
        extension and is not saved to the database here (``save=False``).

        Raises:
            ValueError: if the image's extension is not a supported type.
        """
        source_path = Path(self.image.name)
        try:
            file_type = _THUMBNAIL_FORMATS[source_path.suffix.lower()]
        except KeyError:
            raise ValueError("File type invalid") from None

        image = Image.open(self.image)
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
        image.thumbnail(settings.THUMB_SIZE, Image.LANCZOS)
        # JPEG cannot store alpha or palette modes (e.g. a PNG uploaded with a
        # .jpg extension), so convert those before writing.
        if file_type == "JPEG" and image.mode not in ("RGB", "L", "CMYK"):
            image = image.convert("RGB")

        thumb_name = f"{source_path.stem}t{source_path.suffix}"
        with BytesIO() as buffer:
            image.save(buffer, file_type)
            # save=False stops recursion
            self.thumbnail.save(thumb_name, ContentFile(buffer.getvalue()), save=False)

    def get_absolute_url(self):
        """Return the thread URL for this post, anchored at the post itself."""
        thread = self.op or self
        thread_url = reverse(
            "board:post_detail",
            kwargs={"url": self.board.url, "id": thread.id},
        )
        return f"{thread_url}#p{self.id}"

    def clean(self):
        """Validate the post before it is saved.

        Checks, in order: the post has text or an image, the image is within
        ``settings.MAX_UPLOAD_SIZE``, the board is not read-only, the thread is
        not locked, and (for new posts only) the per-IP post cooldown.

        Raises:
            ValidationError: if any of the checks fails.
        """
        # Make sure there is at least some content
        self.text = (self.text or "").strip()
        if not (self.text or self.image):
            raise ValidationError(_("Please either write a message or upload an image"))

        # Image upload size check
        if self.image and self.image.size > settings.MAX_UPLOAD_SIZE:
            raise ValidationError(
                _("Image supplied is too large. Maximum image size is %(max)s"),
                params={"max": settings.MAX_UPLOAD_SIZE},
            )

        # Check if board is readonly
        if self.board.readonly:
            raise ValidationError(
                _("This board is in readonly mode, you cannot create new posts.")
            )

        # Check if OP is locked
        if self.op and self.op.lock:
            raise ValidationError(_("This thread is locked, you cannot reply to it"))

        # Rate limiting only applies to new posts; without this guard, editing
        # an existing post would be rate limited against itself.
        if not self._state.adding:
            return

        # Rate limiting for posts
        # TODO BUG: if a user's last post is deleted, and it is their only post,
        # they will not hit rate limit. This could probably be abused
        last_post = self.board.post_set.filter(ip=self.ip).order_by("-created").first()
        if last_post is None:
            return
        elapsed = timezone.now() - last_post.created
        if elapsed < self.board.post_cooldown:
            remaining = self.board.post_cooldown - elapsed
            raise ValidationError(
                _("Please wait %(cooldown)s seconds before posting again"),
                params={"cooldown": int(remaining.total_seconds())},
            )


@receiver(signals.post_save, sender=Post)
def post_created(sender, instance, created, **kwargs):
    """Bump the thread for a new reply, or prune the board for a new thread."""
    if not created:
        return

    thread = instance.op
    if thread is None:
        # Prune threads for the board
        instance.board.prune_threads()
        return

    # Update the bump
    should_bump = (
        instance.bump
        and thread.bump
        and not thread.sticky
        and thread.replies.count() < instance.board.autosink
    )
    if should_bump:
        thread.last_bump = timezone.now()
        # Only write the bump time so other columns of the OP are untouched.
        thread.save(update_fields=["last_bump"])


@receiver(signals.post_delete, sender=Post)
def post_deleted(sender, instance, **kwargs):
    """Remove the image and thumbnail files of a deleted post from storage."""
    for stored_file in (instance.image, instance.thumbnail):
        if stored_file:
            # Delete through the storage backend rather than os.remove(): this
            # avoids the check-then-remove race and does not require a local
            # filesystem path.
            stored_file.delete(save=False)


class ReportReason(models.Model):
    """A selectable reason for reporting a post."""

    # The reason for this report.
    reason = models.CharField(max_length=255)
    # The weight of this report reason.
    # Higher = more urgent.
    weight = models.IntegerField(default=1)
    # Urgency. If true, this post is probably reported as illegal content.
    urgent = models.BooleanField(default=False)
    # This is a board-specific report reason
    board = models.ForeignKey("Board", on_delete=models.CASCADE, null=True, blank=True)

    def __str__(self) -> str:
        return self.reason


class ReportRecord(models.Model):
    """The per-post aggregate of all reports made against that post."""

    # Post that this report is for
    post = models.OneToOneField("Post", on_delete=models.CASCADE)
    # Report weight
    weight = models.IntegerField(default=0)
    # If this report is urgent or not
    urgent = models.BooleanField(default=False)


class Report(models.Model):
    """
    There are two reasons why this model exists:

    1. In the report view in the Django admin, there is probably not a good way
       to specially modify the list of reports. Since we don't want to see every
       single report, we need a way to limit this *in data* rather than in code.
       We accomplish this by having:
       - Report model, which is all reports unique to users.
       - ReportRecord model, which is the collection of all reports unique to
         posts.
    2. We want to keep a log of all reports created by users, in case we think
       they are abusing the system.
    """

    class Meta:
        unique_together = [
            ["record", "ip"],
        ]

    # The report that was made by this user
    record = models.ForeignKey("ReportRecord", on_delete=models.CASCADE)
    # Reason for this report
    reason = models.ForeignKey("ReportReason", on_delete=models.CASCADE)
    # IP address of the reporter
    ip = models.GenericIPAddressField()
    # Time that this report was created
    created = models.DateTimeField(auto_now_add=True)


@receiver(signals.post_save, sender=Report)
def report_created(sender, instance, created, **kwargs):
    """Recompute the record's total weight and urgency when a report is added."""
    if not created:
        return

    record = instance.record
    # get the total weight, this is probably going to be more reliable than
    # adding the weight every time. Summed in the database to avoid loading
    # every report and its reason.
    totals = record.report_set.aggregate(total=models.Sum("reason__weight"))
    record.weight = totals["total"] or 0
    record.urgent |= instance.reason.urgent
    record.save()


class BanCommon(models.Model):
    """Fields shared by every kind of ban (abstract base)."""

    # The reason for this ban
    ban_reason = models.TextField(blank=False)
    # Board that this ban is for. If null, then all boards.
    # Even though this is nullable, we just want to delete all bans for a
    # board if that board is deleted.
    board = models.ForeignKey("Board", on_delete=models.CASCADE, null=True, blank=True)
    # The time that this ban was created.
    created = models.DateTimeField(auto_now_add=True)
    # Expiration date of this ban. If it is null, it is permanent.
    expires = models.DateTimeField(null=True, blank=True)
    # The post ID that caused this ban
    post_id = models.IntegerField(null=True, blank=True)

    class Meta:
        abstract = True


class RangeBan(BanCommon):
    """A ban covering a range of IP addresses."""

    # Starting IP address of the ban
    start = models.GenericIPAddressField()
    # Ending IP address of the ban
    end = models.GenericIPAddressField()

    def clean(self):
        """Check that ``start`` and ``end`` form a valid range.

        Raises:
            ValidationError: if either address cannot be parsed, the two
                addresses use different protocols, or ``start`` is not
                strictly lower than ``end``.
        """
        # clean() also runs when field validation has already failed, so the
        # values may be missing or malformed here.
        try:
            start = ipaddress.ip_address(self.start)
            end = ipaddress.ip_address(self.end)
        except ValueError:
            raise ValidationError(
                _("Start and end must both be valid IP addresses")
            ) from None

        if start.version != end.version:
            raise ValidationError(
                _("Start and end IP address must be the same protocol (IPv4 or IPv6)")
            )
        if start >= end:
            raise ValidationError(_("Start IP must be lower than end IP"))

    def __str__(self) -> str:
        return f"Range ban for {self.start}-{self.end}"


class Ban(BanCommon):
    """A ban for a single IP address."""

    # IP address of the ban
    ip = models.GenericIPAddressField()

    def __str__(self) -> str:
        return f"Ban for {self.ip}"


@receiver(signals.post_save, sender=Ban)
def ban_created(sender, instance, created, **kwargs):
    """Delete the post that caused a newly created ban, if it still exists."""
    if not (created and instance.post_id):
        return
    # filter() instead of get(): the post may already have been deleted, and
    # that must not make saving the ban fail.
    for post in Post.objects.filter(id=instance.post_id):
        post.delete()


class BanTemplate(models.Model):
    """A reusable preset of ban reason and duration."""

    # The name of this template
    name = models.CharField(max_length=100)
    # The reason for this ban
    ban_reason = models.TextField(blank=False)
    # The duration of the ban, in days
    duration = models.IntegerField(blank=True, null=True)
    # The board that this template is for, or none.
    board = models.ForeignKey("Board", on_delete=models.CASCADE, null=True, blank=True)

    def __str__(self) -> str:
        if self.board:
            return f"/{self.board.url}/ - {self.name}"
        return self.name


class Capcode(models.Model):
    """A coloured suffix shown after a capcoded user's name."""

    # Content to display *after* the capcoded user's name.
    suffix = models.CharField(max_length=100)
    color = ColorField()

    class Meta:
        permissions = (("use_capcode", "Can use capcode"),)

    def __str__(self) -> str:
        return self.suffix


class NewsPost(models.Model):
    """A news entry with a title, optional author and optional body."""

    # The time that this news post was created.
    created = models.DateTimeField(auto_now_add=True)
    # The title for this news post.
    title = models.CharField(max_length=300)
    # The author of this news post.
    author = models.CharField(max_length=100, blank=True)
    # The content of this news post.
    body = models.TextField(blank=True)