Harden uploads and enforce language-prefixed routes
This commit is contained in:
115
claims/validators.py
Normal file
115
claims/validators.py
Normal file
@@ -0,0 +1,115 @@
|
||||
import io
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
DEFAULT_ALLOWED_EXTENSIONS = ("pdf", "png", "jpg", "jpeg")
|
||||
DEFAULT_ALLOWED_CONTENT_TYPES = ("application/pdf", "image/png", "image/jpeg")
|
||||
|
||||
PDF_SIGNATURE = b"%PDF-"
|
||||
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"
|
||||
JPEG_PREFIX = b"\xff\xd8"
|
||||
|
||||
|
||||
def _peek(file_obj, length=8):
|
||||
"""Read the first bytes of a file without consuming the stream."""
|
||||
stream = getattr(file_obj, "file", file_obj)
|
||||
if isinstance(stream, io.BytesIO):
|
||||
pos = stream.tell()
|
||||
stream.seek(0)
|
||||
data = stream.read(length)
|
||||
stream.seek(pos)
|
||||
return data
|
||||
|
||||
try:
|
||||
pos = stream.tell()
|
||||
except (AttributeError, OSError):
|
||||
pos = None
|
||||
try:
|
||||
stream.seek(0)
|
||||
data = stream.read(length)
|
||||
finally:
|
||||
if pos is not None:
|
||||
stream.seek(pos)
|
||||
else:
|
||||
try:
|
||||
stream.seek(0)
|
||||
except (AttributeError, OSError):
|
||||
pass
|
||||
return data
|
||||
|
||||
|
||||
def _allowed_extensions():
|
||||
exts = getattr(settings, "CLAIMS_ALLOWED_RECEIPT_EXTENSIONS", DEFAULT_ALLOWED_EXTENSIONS)
|
||||
normalized = {ext.lower() for ext in exts}
|
||||
return normalized or set(DEFAULT_ALLOWED_EXTENSIONS)
|
||||
|
||||
|
||||
def _allowed_content_types():
|
||||
cts = getattr(settings, "CLAIMS_ALLOWED_RECEIPT_CONTENT_TYPES", DEFAULT_ALLOWED_CONTENT_TYPES)
|
||||
normalized = {ct.lower() for ct in cts}
|
||||
return normalized or set(DEFAULT_ALLOWED_CONTENT_TYPES)
|
||||
|
||||
|
||||
def _max_file_size():
|
||||
return int(getattr(settings, "CLAIMS_MAX_RECEIPT_BYTES", 10 * 1024 * 1024))
|
||||
|
||||
|
||||
def _extension(validated_file):
|
||||
name = validated_file.name or ""
|
||||
if "." not in name:
|
||||
return ""
|
||||
return name.rsplit(".", 1)[-1].lower()
|
||||
|
||||
|
||||
def _signature_matches(ext, header):
|
||||
if not header:
|
||||
return False
|
||||
if ext == "pdf":
|
||||
return header.startswith(PDF_SIGNATURE)
|
||||
if ext == "png":
|
||||
return header.startswith(PNG_SIGNATURE)
|
||||
if ext in {"jpg", "jpeg"}:
|
||||
return header.startswith(JPEG_PREFIX)
|
||||
return False
|
||||
|
||||
|
||||
def validate_receipt_file(uploaded_file):
|
||||
"""Ensure uploaded receipts comply with size/format requirements."""
|
||||
if not uploaded_file:
|
||||
return
|
||||
|
||||
max_bytes = _max_file_size()
|
||||
if uploaded_file.size > max_bytes:
|
||||
max_mb = round(max_bytes / (1024 * 1024), 2)
|
||||
raise ValidationError(
|
||||
_("Kvitton får vara max %(size)s MB."),
|
||||
code="file_too_large",
|
||||
params={"size": max_mb},
|
||||
)
|
||||
|
||||
ext = _extension(uploaded_file)
|
||||
extensions = _allowed_extensions()
|
||||
if ext not in extensions:
|
||||
raise ValidationError(
|
||||
_("Otillåtet filformat. Tillåtna format är %(formats)s."),
|
||||
code="invalid_extension",
|
||||
params={"formats": ", ".join(sorted(extensions))},
|
||||
)
|
||||
|
||||
content_type = getattr(uploaded_file, "content_type", "")
|
||||
allowed_types = _allowed_content_types()
|
||||
if content_type and content_type.lower() not in allowed_types:
|
||||
raise ValidationError(
|
||||
_("Otillåten MIME-typ: %(type)s."),
|
||||
code="invalid_mime",
|
||||
params={"type": content_type},
|
||||
)
|
||||
|
||||
header = _peek(uploaded_file, length=8)
|
||||
if not _signature_matches(ext, header):
|
||||
raise ValidationError(
|
||||
_("Filens innehåll matchar inte förväntat format."),
|
||||
code="invalid_signature",
|
||||
)
|
||||
Reference in New Issue
Block a user