Python 으로 DB 다루는 법: 장고 ORM

RDBMS 쿼리문을 Python으로 작성해보자!

SoniaComp
10 min readMar 12, 2021

출처

STEP 1

  1. ORM → 실제 query set 확인: query
  2. or 조건, and 조건 ( Q 객체 )
SELECT first_name, last_name FROM auth_user WHERE first_name LIKE 'R%' OR last_name LIKE 'D%';
SELECT first_name, last_name FROM auth_user WHERE first_name LIKE 'R%' AND last_name LIKE 'D%';
queryset = User.objects.filter(
first_name__startswith='R'
) | User.objects.filter(
last_name__startswith='D'
)
queryset_1 = User.objects.filter(
first_name__startswith='R',
last_name__startswith='D'
)

3. NOT 연산 ( Q 객체 )

SELECT id, username FROM auth_user WHERE NOT id < 5;queryset = User.objects.exclude(id__lt=5)

4. Union 연산

합 하려는 쿼리 셋의 모델이 서로 다른 경우, 각 쿼리 셋에 포함된 필드와 데이터 유형이 서로 맞아야 합니다.

Hero.objects.all().values_list(
"name", "gender"
).union(
Villain.objects.all().values_list(
"name", "gender"
))

5. 필요한 열만 조회: values, values_list, only

User.objects.filter(
first_name__startswith='R'
).values('first_name', 'last_name')
User.objects.filter(
first_name__startswith='R'
).only("first_name", "last_name")

6. 서브 쿼리식(질의문 내 하위 질의)

from django.db.models import Subquery
users = User.objects.all()
UserParent.objects.filter(user_id__in=Subquery(users.values('id')))
hero_qs = Hero.objects.filter(
category=OuterRef("pk")
).order_by("-benevolence_factor")
Category.objects.all().annotate(
most_benevolent_hero=Subquery(
hero_qs.values('name')[:1]
)
)

7. 필드의 값을 서로 비교할 때 → F 객체

User.objects.filter(last_name=F("first_name"))User.objects.annotate(first=Substr("first_name", 1, 1)
last=Substr("last_name", 1, 1)).filter(first=F("last"))

8. File Field 들어있지 않은 행 확인

no_files_objects = MyModel.objects.filter(
Q(file='')|Q(file=None)
)

9. 결합 연산

a1 = Article.objects.select_related('reporter')

10. N번째로 큰 항목 → 인덱싱 연산

  • 첫 번째 항목: first ( )
  • 마지막 항목: last( )
user = User.objects.order_by('-last_login')[1]

장고 ORM은 데이터베이스에서 전체 데이터를 가져온 뒤 인덱싱하는 것이 아니라, LIMIT … OFFSET SQL 구문을 이용해 필요한 데이터만 읽어 옵니다.

11. 특정 열의 값이 동일한 항목

duplicates = User.objects.values(
'first_name'
).annotate(name_count=Count('first_name')).filter(name_count__gt=1)
records = User.objects.filter(first_name__in=[item['first_name'] for item in duplicates])

12. 쿼리셋에서 고유한 필드 값을 가진 항목 ( 다른 row 와 겹치지 않음 )

distinct = User.objects.values(
'first_name'
).annotate(
name_count=Count('first_name')
).filter(name_count=1)
records = User.objects.filter(first_name__in=[item['first_name'] for item in distinct])

13. Q 객체: SQL 질의문의 WHERE 절에 해당하는 기능을 온전히 활용할 수 있다.

from django.db.models import Q
User.objects.filter(
Q(first_name__startswith='R') | Q(last_name__startswith='D')
)
User.objects.filter(
Q(first_name__startswith='R') & ~Q(last_name__startswith='D')
)

14. 집계값

from django.db.models import Avg, Max, Min, Sum, Count
>>> User.objects.all().aggregate(Avg('id'))
>>> User.objects.all().aggregate(Max('id'))
>>> User.objects.all().aggregate(Min('id'))
>>> User.objects.all().aggregate(Sum('id'))

15. 무작위 항목

class Category(models.Model):
name = models.CharField(max_length=100)
class Meta:
verbose_name_plural = "Categories"
def __str__(self):
return self.name

# 사용하는 데이터베이스 시스템에 따라 order_by('?') 의 실행 비용이 비싸고 성능이 느릴 수 있습니다.
def get_random():
return Category.objects.order_by("?").first()

# 난수
def get_random2():
max_id = Category.objects.all().aggregate(max_id=Max("id"))['max_id']
pk = random.randint(1, max_id)
return Category.objects.get(pk=pk)
# 중간에 삭제한 데이터가 있을 경우
def get_random3():
max_id = Category.objects.all().aggregate(max_id=Max("id"))['max_id']
while True:
pk = random.randint(1, max_id)
category = Category.objects.filter(pk=pk).first()
if category:
return category
# get_random3 의 방법은 장고의 기본 ID 생성 방식(auto increment, 자동 증가)을 재정의한 경우나 삭제된 항목이 너무 많을 때에는 사용하기가 어려울 수 있습니다.

16. 장고가 지원하지 않는 데이터베이스 함수

PostgreSQL에는 fuzzystrmatch 확장 기능이 있습니다. 이 확장에는 텍스트 데이터의 유사도를 측정하기 위한 함수가 여러 가지 포함되어 있습니다.

from django.db.models import Func, F
Hero.objects.annotate(like_zeus=Func(F('name'), function='levenshtein', template="%(function)s(%(expressions)s, 'Zeus')"))
# 클래스를 확장하여 정의
class LevenshteinLikeZeus(Func):
function='levenshtein'
template="%(function)s(%(expressions)s, 'Zeus')"
# Hero.objects.annotate(
like_zeus=LevenshteinLikeZeus(F("name"))
).filter(
like_zeus__lt=2
)

STEP 2

  1. 여러 개의 행을 한번에 생성하는 방법: bulk_create
>>> Category.objects.bulk_create(
[Category(name="God"),
Category(name="Demi God"),
Category(name="Mortal")]
)
[<Category: God>, <Category: Demi God>, <Category: Mortal>]

2. 기존에 저장된 행을 복사해 새로 저장하는 방법

모델 인스턴스를 저장할 때, pk 필드 값=None 으로 지정되어 있으면 데이터베이스에 새 행으로 저장됩니다.

hero = Hero.objects.first()
hero.pk = None
hero.save()

3. 특정 모델의 항목이 하나만 생성되도록 강제: singleton

class Origin(models.Model):
name = models.CharField(max_length=100)
def save(self, *args, **kwargs):
if self.__class__.objects.count():
self.pk = self.__class__.objects.first().pk
super().save(*args, **kwargs)
// save 메서드를 재정의하여 pk 필드를 이미 존재하는 값으로 지정하도록 강제
// create 메서드를 호출하는 경우 IntegrityError 예외 발생

더 알아볼 것

  • annotate
  • values
  • OuterRef

--

--

SoniaComp

Data Engineer interested in Data Infrastructure Powering Fintech Innovation (https://www.linkedin.com/in/sonia-comp/)