from django.conf import settings from django.contrib import admin from django.db import models from django.db.models import Q from django.utils import timezone from django.utils.translation import gettext_lazy as _ # # Workflow Helpers """ States: - inactive (either active=False or publication date not reached) - public (active or archived) - active (public, not archived) - archived (public, not active) """ class WorkflowQuerySet(models.QuerySet): @classmethod def published_filter(cls): return { "is_published": True, "publication_datetime__lte": timezone.now(), } @classmethod def active_filter(cls): return { "is_published": True, "publication_datetime__lte": timezone.now(), "archiving_datetime__gt": timezone.now(), } @classmethod def archived_filter(cls): return { "is_published": True, "publication_datetime__lte": timezone.now(), "archiving_datetime__lte": timezone.now(), } @classmethod def future_filter(cls): return { "is_published": True, "publication_datetime__gt": timezone.now(), "archiving_datetime__gt": timezone.now(), } def unpublished(self): return self.exclude(**self.published_filter()) def published(self): # Active or archived return self.filter(**self.published_filter()) public = published def active(self): return self.filter(**self.active_filter()) def archived(self): return self.filter(**self.archived_filter()) def future(self): return self.filter(**self.future_filter()) class ManyToManyWorkflowQuerySet(WorkflowQuerySet): def _build_related_filters(self, filter_template): """ Transforms all filter rules to match any related models which itself is workflow managed. """ filter = {} for field in self.model._meta.get_fields(): if field.many_to_one and isinstance(field.related_model._meta.default_manager, WorkflowManager): field.name for path, value in filter_template.items(): filter[f'{field.name}__{path}'] = value return filter def published(self): # Active or archived return self.exclude(**self._build_related_filters(self.published_filter())) public = published def active(self): return self.filter(**self._build_related_filters(self.active_filter())) def archived(self): return self.filter(**self._build_related_filters(self.archived_filter())) def future(self): return self.filter(**self._build_related_filters(self.future_filter())) class WorkflowManager(models.Manager.from_queryset(WorkflowQuerySet)): pass class ManyToManyWorkflowManager(models.Manager.from_queryset(ManyToManyWorkflowQuerySet)): pass class WorkflowMixin(models.Model): creation_datetime = models.DateTimeField(auto_now_add=True) modification_datetime = models.DateTimeField(auto_now=True) creation_user = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, editable=False, on_delete=models.SET_NULL, related_name='+') modification_user = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, editable=False, on_delete=models.SET_NULL, related_name='+') is_published = models.BooleanField(_("Published"), default=False) publication_datetime = models.DateTimeField(_("Publication Date"), default=timezone.now) archiving_datetime = models.DateTimeField(_("Archiving Date"), default=timezone.datetime.max) objects = WorkflowManager() class Meta: abstract = True ordering = ['-publication_datetime'] # Most recent first @property def workflow_status(self): now = timezone.now() if not self.is_published or self.publication_datetime > now: return 'unpublished' elif self.publication_datetime <= now and self.archiving_datetime > now: return 'active' else: return 'archived' def display_is_published(obj): return obj.is_published and obj.publication_datetime <= timezone.now() display_is_published.boolean = True display_is_published.short_description = _("Aktiv") class PublicationStateListFilter(admin.SimpleListFilter): title = _("Published") # Parameter for the filter that will be used in the URL query. parameter_name = 'workflow_state' filter_dict = { 'unpublished': 'unpublished', 'public': 'public', 'active': 'active', 'archived': 'archived', } def lookups(self, request, model_admin): return ( ('unpublished', _("Unpublished")), ('public', _("Published (Active or Archived)")), ('active', _("Active (published, not archived)")), ('archived', _("Archived")), ) def queryset(self, request, queryset): if self.value(): filter_method = self.filter_dict[self.value()] return getattr(queryset, filter_method)() else: return queryset