import warnings
from decimal import Decimal
from typing import Dict, Type, Tuple
from django.conf import settings
from django.db import models, transaction
from django.utils import timezone
from gpp.model.exceptions import InvalidTaskStatus
[docs]def archive_model(queryset, ARCHIVE_MODEL: Type[models.Model], user_id: int, delete_instance: bool):
"""
archive model
:param queryset:
:param ARCHIVE_MODEL:
:param user_id:
:param delete_instance:
:return:
"""
archive_list = []
now = timezone.now()
for instance in chunk_queryset(queryset=queryset, chunk_size=100):
data = {
field.attname: getattr(instance, field.attname)
for field in instance._meta.fields if hasattr(ARCHIVE_MODEL, field.attname)
}
data.update({
'id': None,
'old_pk': instance.pk,
'archive_date': now,
'archive_user_id': user_id,
})
archive_list.append(ARCHIVE_MODEL(**data))
if archive_list:
ARCHIVE_MODEL.objects.bulk_create(archive_list, 100)
if delete_instance:
queryset.delete()
[docs]def restore_model(queryset, SOURCE_MODEL: Type[models.Model], delete_instance: bool):
"""
:param queryset: Queryset
:param SOURCE_MODEL:
:return:
"""
source_objects = []
for instance in chunk_queryset(queryset=queryset, chunk_size=100):
data = {
field.attname: getattr(instance, field.attname)
for field in instance._meta.fields if hasattr(SOURCE_MODEL, field.attname)
}
for attr in ['archive_date', 'archive_user_id']:
data.pop(attr, None)
src_instance = SOURCE_MODEL(**data)
src_instance.id = instance.old_pk
source_objects.append(src_instance)
if source_objects and SOURCE_MODEL.objects.bulk_create(source_objects, 100):
if delete_instance:
queryset.delete()
[docs]def get_model_differs(src: models.Model, dest: models.Model) -> Dict[str, tuple]:
"""
src 기준 dest 모델과의 차이
Args:
src:
dest:
Returns:
"""
def convert_python_value(instance, field_name):
"""
python variable type 으로 변환
Args:
instance:
field_name:
Returns:
"""
value = getattr(instance, field_name)
src_field = instance._meta._forward_fields_map.get(field_name)
if src_field:
return src_field.get_prep_value(value)
return None # pragma: no cover
ret = {}
for field in src._meta.fields:
field_name = field.attname
if field_name in ('id', 'pk', 'created', 'modified'):
continue
src_value = convert_python_value(instance=src, field_name=field_name)
dest_value = None
if hasattr(dest, field_name):
dest_value = convert_python_value(instance=dest, field_name=field_name)
if isinstance(dest_value, Decimal):
src_field = src._meta._forward_fields_map.get(field_name)
if abs(src_value - dest_value) < 0.1 ** src_field.decimal_places:
continue
elif src_value == dest_value:
continue
ret.update({field_name: (src_value, dest_value)})
return ret
[docs]def chunk_queryset(queryset, chunk_size):
last_pk = None
while True:
inner_queryset = queryset.order_by('id')
if last_pk:
inner_queryset = inner_queryset.filter(id__gt=last_pk)
data_list = list(inner_queryset.all()[:chunk_size])
for row in data_list:
yield row
if len(data_list) < chunk_size:
break
last_pk = data_list[-1].id
[docs]def chunk_list(data, chunk_size):
if not isinstance(data, list):
data = list(data)
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]
[docs]def truncate_model(MODEL):
# allowed only specified model
allowed_model = getattr(settings, 'TRUNCATE_ALLOWED_LABEL', [])
app_label = MODEL._meta.app_label or ''
model_name = MODEL._meta.model_name or ''
keyword = f'{app_label}.{model_name}'
if keyword in allowed_model:
db_alias = MODEL.objects.db
MODEL.objects.using(db_alias).delete()
warnings.warn(
"truncate model (TRUNCATE `[DB Table]`) is deprecated.",
DeprecationWarning,
stacklevel=2,
)
# if db_alias in connections:
# with connections[db_alias].cursor() as cursor:
# cursor.execute(
# 'TRUNCATE TABLE `{table}`'.format(table=MODEL._meta.db_table)
# )
# return True
return False
[docs]def check_task_status(MODEL: Type[models.Model], pk: int) -> Tuple[models.Model, int]:
"""
MODEL class의 pk를 row-level lock 상태로 load 후,
status 체크.
Args:
MODEL:
pk:
Returns:
Tuple[instance, previous status]
"""
db_alias = MODEL.objects.db
with transaction.atomic(using=db_alias):
version = MODEL.objects.filter(id=pk).select_for_update().first()
old_status = version.task_status
if version.is_processing_task:
raise InvalidTaskStatus(version.CHOICE_TASK_STATUS_QUEUED, version.task_status)
version.set_processing(save=True, update_fields=[])
return version, old_status