Skip to content
Snippets Groups Projects
Commit b215f55d authored by Jonathan Weth's avatar Jonathan Weth :keyboard:
Browse files

Merge branch '408-add-recursive-helper-methods-for-group' into 'master'

Resolve "Add recursive helper methods for Group"

Closes #408

See merge request !852
parents 5bc1a635 75e40770
No related branches found
No related tags found
1 merge request!852Resolve "Add recursive helper methods for Group"
Pipeline #47881 passed with warnings
Pipeline: AlekSIS

#47890

    ......@@ -9,6 +9,11 @@ and this project adheres to `Semantic Versioning`_.
    Unreleased
    ----------
    Added
    ~~~~~
    * Recursive helper methods for group hierarchies
    Fixed
    ~~~~~
    ......
    ......@@ -7,6 +7,7 @@ from django.db.models import QuerySet
    from django.db.models.manager import Manager
    from calendarweek import CalendarWeek
    from django_cte import CTEManager, CTEQuerySet
    from polymorphic.managers import PolymorphicManager
    ......@@ -84,7 +85,7 @@ class SchoolTermRelatedQuerySet(QuerySet):
    return None
    class GroupManager(CurrentSiteManagerWithoutMigrations):
    class GroupManager(CurrentSiteManagerWithoutMigrations, CTEManager):
    """Manager adding specific methods to groups."""
    def get_queryset(self):
    ......@@ -92,7 +93,7 @@ class GroupManager(CurrentSiteManagerWithoutMigrations):
    return super().get_queryset().select_related("school_term")
    class GroupQuerySet(SchoolTermRelatedQuerySet):
    class GroupQuerySet(SchoolTermRelatedQuerySet, CTEQuerySet):
    pass
    ......
    ......@@ -14,7 +14,7 @@ from django.contrib.sites.models import Site
    from django.core.exceptions import ValidationError
    from django.core.validators import MaxValueValidator
    from django.db import models, transaction
    from django.db.models import QuerySet
    from django.db.models import Q, QuerySet
    from django.db.models.signals import m2m_changed
    from django.dispatch import receiver
    from django.forms.widgets import Media
    ......@@ -28,6 +28,7 @@ import jsonstore
    from cachalot.api import cachalot_disabled
    from cache_memoize import cache_memoize
    from django_celery_results.models import TaskResult
    from django_cte import CTEQuerySet, With
    from dynamic_preferences.models import PerInstancePreferenceModel
    from invitations import signals
    from invitations.adapters import get_invitations_adapter
    ......@@ -299,6 +300,22 @@ class Person(ExtensibleModel):
    user_info_tracker = FieldTracker(fields=("first_name", "last_name", "email"))
    @property
    def member_of_recursive(self) -> QuerySet:
    """Get all groups this person is a member of, recursively."""
    q = self.member_of
    for group in q.all():
    q = q.union(group.parent_groups_recursive)
    return q
    @property
    def owner_of_recursive(self) -> QuerySet:
    """Get all groups this person is a member of, recursively."""
    q = self.owner_of
    for group in q.all():
    q = q.union(group.child_groups_recursive)
    return q
    def save(self, *args, **kwargs):
    # Determine all fields that were changed since last load
    dirty = self.pk is None or bool(self.user_info_tracker.changed())
    ......@@ -486,6 +503,50 @@ class Group(SchoolTermRelatedExtensibleModel):
    return stats
    @property
    def parent_groups_recursive(self) -> CTEQuerySet:
    """Get all parent groups recursively."""
    def _make_cte(cte):
    Through = self.parent_groups.through
    return (
    Through.objects.values("to_group_id")
    .filter(from_group=self)
    .union(cte.join(Through, from_group=cte.col.to_group_id), all=True)
    )
    cte = With.recursive(_make_cte)
    return cte.join(Group, id=cte.col.to_group_id).with_cte(cte)
    @property
    def child_groups_recursive(self) -> CTEQuerySet:
    """Get all child groups recursively."""
    def _make_cte(cte):
    Through = self.child_groups.through
    return (
    Through.objects.values("from_group_id")
    .filter(to_group=self)
    .union(cte.join(Through, to_group=cte.col.from_group_id), all=True)
    )
    cte = With.recursive(_make_cte)
    return cte.join(Group, id=cte.col.from_group_id).with_cte(cte)
    @property
    def members_recursive(self) -> QuerySet:
    """Get all members of this group and its child groups."""
    return Person.objects.filter(
    Q(member_of=self) | Q(member_of__in=self.child_groups_recursive)
    )
    @property
    def owners_recursive(self) -> QuerySet:
    """Get all ownerss of this group and its parent groups."""
    return Person.objects.filter(
    Q(owner_of=self) | Q(owner_of__in=self.parent_groups_recursive)
    )
    def __str__(self) -> str:
    if self.school_term:
    return f"{self.name} ({self.short_name}) ({self.school_term})"
    ......
    import pytest
    from aleksis.core.models import Group, Person
    pytestmark = pytest.mark.django_db
    def test_child_groups_recursive():
    g_1st_grade = Group.objects.create(name="1st grade")
    g_1a = Group.objects.create(name="1a")
    g_1b = Group.objects.create(name="1b")
    g_2nd_grade = Group.objects.create(name="2nd grade")
    g_2a = Group.objects.create(name="2a")
    g_2b = Group.objects.create(name="2b")
    g_2c = Group.objects.create(name="2c")
    g_2nd_grade_french = Group.objects.create(name="2nd grade French")
    g_1a.parent_groups.set([g_1st_grade])
    g_1b.parent_groups.set([g_1st_grade])
    g_2a.parent_groups.set([g_2nd_grade])
    g_2b.parent_groups.set([g_2nd_grade])
    g_2c.parent_groups.set([g_2nd_grade])
    g_2nd_grade_french.parent_groups.set([g_2b, g_2c])
    assert g_2nd_grade_french in g_2nd_grade.child_groups_recursive
    assert g_2nd_grade_french in g_2b.child_groups_recursive
    assert g_2nd_grade_french in g_2c.child_groups_recursive
    assert g_2nd_grade_french not in g_2a.child_groups_recursive
    assert g_2nd_grade_french not in g_1st_grade.child_groups_recursive
    def test_parent_groups_recursive():
    g_1st_grade = Group.objects.create(name="1st grade")
    g_1a = Group.objects.create(name="1a")
    g_1b = Group.objects.create(name="1b")
    g_2nd_grade = Group.objects.create(name="2nd grade")
    g_2a = Group.objects.create(name="2a")
    g_2b = Group.objects.create(name="2b")
    g_2c = Group.objects.create(name="2c")
    g_2nd_grade_french = Group.objects.create(name="2nd grade French")
    g_1a.parent_groups.set([g_1st_grade])
    g_1b.parent_groups.set([g_1st_grade])
    g_2a.parent_groups.set([g_2nd_grade])
    g_2b.parent_groups.set([g_2nd_grade])
    g_2c.parent_groups.set([g_2nd_grade])
    g_2nd_grade_french.parent_groups.set([g_2b, g_2c])
    assert g_1st_grade in g_1a.parent_groups_recursive
    assert g_2nd_grade in g_2a.parent_groups_recursive
    assert g_2nd_grade in g_2nd_grade_french.parent_groups_recursive
    assert g_1st_grade not in g_2nd_grade_french.parent_groups_recursive
    def test_members_recursive():
    g_2nd_grade = Group.objects.create(name="2nd grade")
    g_2a = Group.objects.create(name="2a")
    g_2b = Group.objects.create(name="2b")
    g_2c = Group.objects.create(name="2c")
    g_2nd_grade_french = Group.objects.create(name="2nd grade French")
    g_2a.parent_groups.set([g_2nd_grade])
    g_2b.parent_groups.set([g_2nd_grade])
    g_2c.parent_groups.set([g_2nd_grade])
    g_2nd_grade_french.parent_groups.set([g_2b, g_2c])
    p_2a_1 = Person.objects.create(first_name="A", last_name="B")
    p_2a_2 = Person.objects.create(first_name="A", last_name="B")
    p_2b_1 = Person.objects.create(first_name="A", last_name="B")
    p_2b_2 = Person.objects.create(first_name="A", last_name="B")
    p_2c_1 = Person.objects.create(first_name="A", last_name="B")
    p_2c_2 = Person.objects.create(first_name="A", last_name="B")
    p_french_only = Person.objects.create(first_name="A", last_name="B")
    g_2a.members.set([p_2a_1, p_2a_2])
    g_2b.members.set([p_2b_1, p_2b_2])
    g_2c.members.set([p_2c_1, p_2c_2])
    g_2nd_grade_french.members.set([p_2b_1, p_2c_1, p_french_only])
    assert p_2a_1 in g_2nd_grade.members_recursive
    assert p_2a_2 in g_2nd_grade.members_recursive
    assert p_2b_1 in g_2nd_grade.members_recursive
    assert p_2b_2 in g_2nd_grade.members_recursive
    assert p_2c_1 in g_2nd_grade.members_recursive
    assert p_2c_2 in g_2nd_grade.members_recursive
    assert p_french_only in g_2nd_grade.members_recursive
    assert p_french_only in g_2b.members_recursive
    assert p_french_only in g_2c.members_recursive
    assert p_french_only not in g_2a.members_recursive
    def test_member_of_recursive():
    g_2nd_grade = Group.objects.create(name="2nd grade")
    g_2a = Group.objects.create(name="2a")
    g_2b = Group.objects.create(name="2b")
    g_2c = Group.objects.create(name="2c")
    g_2nd_grade_french = Group.objects.create(name="2nd grade French")
    g_2a.parent_groups.set([g_2nd_grade])
    g_2b.parent_groups.set([g_2nd_grade])
    g_2c.parent_groups.set([g_2nd_grade])
    g_2nd_grade_french.parent_groups.set([g_2b, g_2c])
    p_2a_1 = Person.objects.create(first_name="A", last_name="B")
    p_2a_2 = Person.objects.create(first_name="A", last_name="B")
    p_2b_1 = Person.objects.create(first_name="A", last_name="B")
    p_2b_2 = Person.objects.create(first_name="A", last_name="B")
    p_2c_1 = Person.objects.create(first_name="A", last_name="B")
    p_2c_2 = Person.objects.create(first_name="A", last_name="B")
    p_french_only = Person.objects.create(first_name="A", last_name="B")
    g_2a.members.set([p_2a_1, p_2a_2])
    g_2b.members.set([p_2b_1, p_2b_2])
    g_2c.members.set([p_2c_1, p_2c_2])
    g_2nd_grade_french.members.set([p_2b_1, p_2c_1, p_french_only])
    assert g_2nd_grade in p_2a_1.member_of_recursive
    assert g_2nd_grade in p_2a_2.member_of_recursive
    assert g_2nd_grade in p_2b_1.member_of_recursive
    assert g_2nd_grade in p_2b_2.member_of_recursive
    assert g_2nd_grade in p_2c_1.member_of_recursive
    assert g_2nd_grade in p_2c_2.member_of_recursive
    assert g_2nd_grade in p_french_only.member_of_recursive
    assert g_2b in p_french_only.member_of_recursive
    assert g_2c in p_french_only.member_of_recursive
    def test_owners_recursive():
    g_2nd_grade = Group.objects.create(name="2nd grade")
    g_2a = Group.objects.create(name="2a")
    g_2b = Group.objects.create(name="2b")
    g_2a.parent_groups.set([g_2nd_grade])
    g_2b.parent_groups.set([g_2nd_grade])
    p_1 = Person.objects.create(first_name="A", last_name="B")
    p_2 = Person.objects.create(first_name="A", last_name="B")
    g_2nd_grade.owners.set([p_1])
    assert p_1 in g_2a.owners_recursive
    assert p_1 in g_2b.owners_recursive
    assert p_2 not in g_2a.owners_recursive
    assert p_2 not in g_2b.owners_recursive
    def test_owner_of_recursive():
    g_2nd_grade = Group.objects.create(name="2nd grade")
    g_2a = Group.objects.create(name="2a")
    g_2b = Group.objects.create(name="2b")
    g_2a.parent_groups.set([g_2nd_grade])
    g_2b.parent_groups.set([g_2nd_grade])
    p_1 = Person.objects.create(first_name="A", last_name="B")
    p_2 = Person.objects.create(first_name="A", last_name="B")
    g_2nd_grade.owners.set([p_1])
    assert g_2a in p_1.owner_of_recursive.all()
    assert g_2b in p_1.owner_of_recursive.all()
    assert g_2a not in p_2.owner_of_recursive.all()
    assert g_2b not in p_2.owner_of_recursive.all()
    ......@@ -116,6 +116,7 @@ django-titofisto = "^0.2.0"
    haystack-redis = "^0.0.1"
    python-gnupg = "^0.4.7"
    sentry-sdk = {version = "^1.4.3", optional = true}
    django-cte = "^1.1.5"
    [tool.poetry.extras]
    ldap = ["django-auth-ldap"]
    ......
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Finish editing this message first!
    Please register or to comment