Just like 4chan has!!!!!!! Signed-off-by: Alek Ratzloff <alekratz@gmail.com>
390 lines
14 KiB
Python
390 lines
14 KiB
Python
from datetime import timedelta
|
|
import os
|
|
from pathlib import Path
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.files.base import ContentFile
|
|
from django.db import models
|
|
from django.db.models import signals
|
|
from django.db.models.deletion import SET_NULL
|
|
from django.dispatch import receiver
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from django.utils.translation import gettext as _
|
|
from io import BytesIO
|
|
|
|
from colorfield.fields import ColorField
|
|
from PIL import Image
|
|
|
|
|
|
def image_upload(instance, filename):
|
|
op_id = instance.op.id if instance.op else instance.id
|
|
now = timezone.now()
|
|
now_sec = now.strftime("%s.%f")
|
|
ext = Path(filename).suffix.lower()
|
|
if ext in (".jpeg", ".jpg"):
|
|
ext = ".jpg"
|
|
|
|
if ext not in (".jpg", ".png", ".gif"):
|
|
raise Exception("File type invalid")
|
|
|
|
return f"{instance.board.url}/{now_sec}{ext}"
|
|
|
|
|
|
def thumbs_upload(instance, filename):
|
|
op_id = instance.op.id if instance.op else instance.id
|
|
now = timezone.now()
|
|
now_sec = now.strftime("%s.%f")
|
|
ext = Path(filename).suffix.lower()
|
|
return f"{instance.board.url}/{now_sec}t{ext}"
|
|
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
class Board(models.Model):
|
|
# The short URL name for the board
|
|
url = models.CharField(max_length=255, null=False, blank=False, unique=True)
|
|
# Human-readable name for the board
|
|
name = models.CharField(max_length=255, null=False, blank=False)
|
|
# Max pages
|
|
max_pages = models.IntegerField(default=10)
|
|
# Threads per page
|
|
threads_per_page = models.IntegerField(default=10)
|
|
# The amount of time that users from the same IP address are allowed to make
|
|
# consecutive posts
|
|
post_cooldown = models.DurationField(default=timedelta(seconds=60))
|
|
# Auto-sink threshhold. This is the number of replies that a thread can have
|
|
# before it stops being bumped.
|
|
autosink = models.IntegerField(default=300)
|
|
# Whether this board is read-only or not.
|
|
readonly = models.BooleanField(default=False)
|
|
|
|
@property
|
|
def threads(self):
|
|
return Post.objects.filter(board=self, op=None)
|
|
|
|
@property
|
|
def max_threads(self):
|
|
return self.max_pages * self.threads_per_page
|
|
|
|
def prune_threads(self):
|
|
to_remove = self.threads.order_by("-sticky", "-last_bump")[self.max_threads :]
|
|
for thread in to_remove:
|
|
thread.delete()
|
|
|
|
def __str__(self):
|
|
return f"/{self.url}/ - {self.name}"
|
|
|
|
|
|
class Post(models.Model):
|
|
# Board that this post was made on
|
|
board = models.ForeignKey("Board", on_delete=models.CASCADE)
|
|
# Thread that this is a part of
|
|
op = models.ForeignKey(
|
|
"self", null=True, on_delete=models.CASCADE, related_name="replies"
|
|
)
|
|
# User's supplied name for this post
|
|
name = models.CharField(max_length=255, null=True, blank=True)
|
|
# User's supplied subject for this post
|
|
subject = models.CharField(max_length=255, null=True, blank=True)
|
|
# Text of this post
|
|
text = models.TextField(max_length=2000, null=False, blank=True)
|
|
# The IP address of the user that made this post
|
|
ip = models.GenericIPAddressField()
|
|
# Capcode
|
|
capcode = models.ForeignKey("Capcode", null=True, blank=True, on_delete=SET_NULL)
|
|
# Post is stickied - it remains at the top of the bump list.
|
|
sticky = models.BooleanField(default=False, blank=True)
|
|
# Post is locked - nobody can post in it.
|
|
lock = models.BooleanField(default=False, blank=True)
|
|
# Creation time
|
|
created = models.DateTimeField(auto_now_add=True)
|
|
# Last bump time
|
|
last_bump = models.DateTimeField(auto_now_add=True)
|
|
# Image
|
|
image = models.ImageField(
|
|
upload_to=image_upload,
|
|
null=True,
|
|
blank=True,
|
|
width_field="image_width",
|
|
height_field="image_height",
|
|
)
|
|
# Bump - if this post is an OP, determines whether the OP can be bumped. If
|
|
# this post is not an OP, determines whether the thread SHOULD be bumped.
|
|
bump = models.BooleanField(default=True)
|
|
# Thumbnail
|
|
thumbnail = models.ImageField(upload_to=thumbs_upload, editable=False, null=True)
|
|
# Original image name
|
|
original_image_name = models.CharField(max_length=255, null=True, blank=True)
|
|
# Image width and height
|
|
image_width = models.IntegerField(null=True, blank=True)
|
|
image_height = models.IntegerField(null=True, blank=True)
|
|
|
|
class Meta:
|
|
permissions = [
|
|
("set_sticky", "Can sticky post"),
|
|
("set_bump", "Can bumplock post"),
|
|
("set_lock", "Can lock post"),
|
|
]
|
|
|
|
def save(self, *args, **kwargs):
|
|
if self.image:
|
|
self.original_image_name = Path(self.image.name).parts[-1]
|
|
self.__make_thumbnail()
|
|
super(Post, self).save(*args, **kwargs)
|
|
|
|
def image_replies_count(self) -> int:
|
|
"""
|
|
Gets the number of image replies to this OP.
|
|
"""
|
|
# I would normally use the filter() and count() functions here, but
|
|
# image__isnull=True does not seem to behave correctly. This is close
|
|
# enough.
|
|
return len([p for p in self.replies.all() if p.image])
|
|
|
|
def unique_posters(self) -> int:
|
|
return len(set(p.ip for p in self.replies.all()))
|
|
|
|
def __make_thumbnail(self):
|
|
image = Image.open(self.image)
|
|
image.thumbnail(settings.THUMB_SIZE, Image.ANTIALIAS)
|
|
|
|
image_path = Path(self.image.name)
|
|
thumb_path = Path(image_path.stem + "t" + image_path.suffix)
|
|
|
|
ext = thumb_path.suffix.lower()
|
|
if ext in (".jpg", ".jpeg"):
|
|
FTYPE = "JPEG"
|
|
elif ext == ".gif":
|
|
FTYPE = "GIF"
|
|
elif ext == ".png":
|
|
FTYPE = "PNG"
|
|
else:
|
|
raise Exception("File type invalid")
|
|
|
|
thumb_temp = BytesIO()
|
|
image.save(thumb_temp, FTYPE)
|
|
thumb_temp.seek(0)
|
|
|
|
# save=False stops recursion
|
|
self.thumbnail.save(thumb_path, ContentFile(thumb_temp.read()), save=False)
|
|
thumb_temp.close()
|
|
|
|
def get_absolute_url(self):
|
|
if self.op is None:
|
|
return reverse(
|
|
"board:post_detail", kwargs={"url": self.board.url, "id": self.id}
|
|
)
|
|
else:
|
|
return (
|
|
reverse(
|
|
"board:post_detail",
|
|
kwargs={"url": self.board.url, "id": self.op.id},
|
|
)
|
|
+ f"#p{self.id}"
|
|
)
|
|
|
|
def clean(self):
|
|
# Make sure there is at least some content
|
|
self.text = self.text.strip()
|
|
if not (self.text or self.image):
|
|
raise ValidationError(_("Please either write a message or upload an image"))
|
|
# Image upload size check
|
|
if self.image and self.image.size > settings.MAX_UPLOAD_SIZE:
|
|
raise ValidationError(
|
|
_("Image supplied is too large. Maximum image size is %(max)s"),
|
|
params={"max": settings.MAX_UPLOAD_SIZE},
|
|
)
|
|
# Check if board is readonly
|
|
if self.board.readonly:
|
|
raise ValidationError(
|
|
_("This board is in readonly mode, you cannot create new posts.")
|
|
)
|
|
# Check if OP is locked
|
|
if self.op and self.op.lock:
|
|
raise ValidationError(_("This thread is locked, you cannot reply to it"))
|
|
|
|
# Rate limiting for posts
|
|
# TODO BUG: if a user's last post is deleted, and it is their only post,
|
|
# they will not hit rate limit. This could probably be abused
|
|
last_post = self.board.post_set.filter(ip=self.ip).order_by("-created").first()
|
|
if last_post:
|
|
now = timezone.now()
|
|
delta = now - last_post.created
|
|
if delta < self.board.post_cooldown:
|
|
cooldown = self.board.post_cooldown - delta
|
|
raise ValidationError(
|
|
_(f"Please wait %(cooldown)s seconds before posting again"),
|
|
params={"cooldown": int(cooldown.total_seconds())},
|
|
)
|
|
|
|
|
|
@receiver(signals.post_save, sender=Post)
|
|
def post_created(sender, instance, created, **kwargs):
|
|
if created:
|
|
if instance.op:
|
|
# Update the bump
|
|
if (
|
|
instance.bump
|
|
and instance.op.bump
|
|
and instance.op.replies.count() < instance.board.autosink
|
|
and not instance.op.sticky
|
|
):
|
|
instance.op.last_bump = timezone.now()
|
|
instance.op.save()
|
|
else:
|
|
# Prune threads for the board
|
|
instance.board.prune_threads()
|
|
|
|
|
|
@receiver(signals.post_delete, sender=Post)
|
|
def post_deleted(sender, instance, **kwargs):
|
|
if instance.image and Path(instance.image.path).is_file():
|
|
os.remove(instance.image.path)
|
|
if instance.thumbnail and Path(instance.thumbnail.path).is_file():
|
|
os.remove(instance.thumbnail.path)
|
|
|
|
|
|
class ReportReason(models.Model):
|
|
# The reason for this report.
|
|
reason = models.CharField(max_length=255)
|
|
# The weight of this report reason.
|
|
# Heigher = more urgent.
|
|
weight = models.IntegerField(default=1)
|
|
# Urgency. If true, this post is probably reported as illegal content.
|
|
urgent = models.BooleanField(default=False)
|
|
# This is a board-specific report reason
|
|
board = models.ForeignKey("Board", on_delete=models.CASCADE, null=True, blank=True)
|
|
|
|
def __str__(self):
|
|
return self.reason
|
|
|
|
|
|
class ReportRecord(models.Model):
|
|
# Post that this report is for
|
|
post = models.OneToOneField("Post", on_delete=models.CASCADE)
|
|
# Report weight
|
|
weight = models.IntegerField(default=0)
|
|
# If this report is urgent or not
|
|
urgent = models.BooleanField(default=False)
|
|
|
|
|
|
class Report(models.Model):
|
|
"""
|
|
There are two reasons why this model exists:
|
|
|
|
1. In the report view in the Django admin, there is probably not a good way
|
|
to specially modify the list of reports. Since we don't want to see every
|
|
single report, we need a way to limit this *in data* rather than in code.
|
|
We accomplish this by having:
|
|
- Report model, which is all reports unique to users.
|
|
- ReportRecord model, which is the collection of all reports unique to posts.
|
|
2. We want to keep a log of all reports created by users, in case we think
|
|
they are abusing the system.
|
|
"""
|
|
|
|
# The report that was made by this user
|
|
record = models.ForeignKey("ReportRecord", on_delete=models.CASCADE)
|
|
# Reason for this report
|
|
reason = models.ForeignKey("ReportReason", on_delete=models.CASCADE)
|
|
# IP address of the reporter
|
|
ip = models.GenericIPAddressField()
|
|
|
|
|
|
@receiver(signals.post_save, sender=Report)
|
|
def report_created(sender, instance, created, **kwargs):
|
|
if created:
|
|
# get the total weight, this is probably going to be more reliable than
|
|
# adding the weight every time
|
|
weights = [report.reason.weight for report in instance.record.report_set.all()]
|
|
instance.record.weight = sum(weights)
|
|
instance.record.urgent |= instance.reason.urgent
|
|
instance.record.save()
|
|
|
|
|
|
class BanCommon(models.Model):
|
|
# The reason for this ban
|
|
ban_reason = models.TextField(blank=False)
|
|
# Board that this ban is for. If null, then all boards.
|
|
# Even though this is nullable, we just want to delete all reports for a
|
|
# board if that board is deleted.
|
|
board = models.ForeignKey("Board", on_delete=models.CASCADE, null=True, blank=True)
|
|
# The time that this ban was created.
|
|
created = models.DateTimeField(auto_now_add=True)
|
|
# Expiration date of this ban. If it is null, it is permanent.
|
|
expires = models.DateTimeField(null=True, blank=True)
|
|
# The post ID that caused this ban
|
|
post_id = models.IntegerField(null=True, blank=True)
|
|
|
|
class Meta:
|
|
abstract = True
|
|
|
|
|
|
class RangeBan(BanCommon):
|
|
# Starting IP address of the ban
|
|
start = models.GenericIPAddressField()
|
|
# Ending IP address of the ban
|
|
end = models.GenericIPAddressField()
|
|
|
|
def clean(self):
|
|
import ipaddress
|
|
|
|
start = ipaddress.ip_address(self.start)
|
|
end = ipaddress.ip_address(self.end)
|
|
if start.version != end.version:
|
|
raise ValidationError(
|
|
_("Start and end IP address must be the same protocol (IPv4 or IPv6)")
|
|
)
|
|
if start >= end:
|
|
raise ValidationError(_("Start IP must be lower than end IP"))
|
|
|
|
def __str__(self):
|
|
return f"Range ban for {self.start}-{self.end}"
|
|
|
|
|
|
class Ban(BanCommon):
|
|
# IP address of the ban
|
|
ip = models.GenericIPAddressField()
|
|
|
|
def __str__(self):
|
|
return f"Ban for {self.ip}"
|
|
|
|
|
|
@receiver(signals.post_save, sender=Ban)
|
|
def ban_created(sender, instance, created, **kwargs):
|
|
if created:
|
|
if instance.post_id:
|
|
post = Post.objects.get(id=instance.post_id)
|
|
post.delete()
|
|
|
|
|
|
class BanTemplate(models.Model):
|
|
# The name of this template
|
|
name = models.CharField(max_length=100)
|
|
# The reason for this ban
|
|
ban_reason = models.TextField(blank=False)
|
|
# The duration of the ban, in days
|
|
duration = models.IntegerField(blank=True, null=True)
|
|
# The board that this template is for, or none.
|
|
board = models.ForeignKey("Board", on_delete=models.CASCADE, null=True, blank=True)
|
|
|
|
def __str__(self) -> str:
|
|
if self.board:
|
|
return f"/{self.board.url}/ - {self.name}"
|
|
else:
|
|
return self.name
|
|
|
|
|
|
class Capcode(models.Model):
|
|
# Content to display *after* the capcoded user's name.
|
|
suffix = models.CharField(max_length=100)
|
|
color = ColorField()
|
|
|
|
class Meta:
|
|
permissions = (("use_capcode", "Can use capcode"),)
|
|
|
|
def __str__(self) -> str:
|
|
return self.suffix
|