265 lines
12 KiB
Python
265 lines
12 KiB
Python
"""
|
||
Обработка статистики.
|
||
"""
|
||
from datetime import date, datetime, timedelta
|
||
from typing import Optional
|
||
|
||
from django.contrib.auth import get_user_model
|
||
from django.utils import timezone
|
||
|
||
from access_controller.settings import ONE_DAY, ZENDESK_ROLES as ROLES
|
||
from main.extra_func import last_day_of_month, get_timedelta, daterange
|
||
from main.models import RoleChangeLogs
|
||
|
||
|
||
class StatisticData:
|
||
"""
|
||
Класс для учета статистики интервалов работы пользователей.
|
||
Передаваемые параметры: start_date, end_date, email, stat.
|
||
|
||
:param display: Формат отображения времени (часы, минуты)
|
||
:type display: :class:`list`
|
||
:param interval: Интервал времени в часах и минутах
|
||
:type interval: :class:`list`
|
||
:param start_date: Дата начала работы
|
||
:type start_date: :class:`date`
|
||
:param end_date: Дата окончания работы
|
||
:type end_date: :class:`date`
|
||
:param email: Email пользователя
|
||
:type email: :class:`str`
|
||
:param errors: Список ошибок
|
||
:type errors: :class:`list`
|
||
:param warnings: Список предупреждений
|
||
:type warnings: :class:`list`
|
||
:param data: Ретроспектива смены ролей пользователя
|
||
:type data: :class:`dict`
|
||
:param statistic: Интервалы работы пользователя
|
||
:type statistic: :class:`dict`
|
||
"""
|
||
|
||
def __init__(self, start_date, end_date, user_email, stat=None):
|
||
self.display = None
|
||
self.interval = None
|
||
self.start_date = start_date
|
||
self.end_date = end_date
|
||
self.email = user_email
|
||
self.errors = list()
|
||
self.warnings = list()
|
||
self.data = dict()
|
||
self.statistic = dict()
|
||
self._init_data()
|
||
if stat is None:
|
||
self._init_statistic()
|
||
else:
|
||
self.statistic = stat
|
||
|
||
def get_statistic(self) -> Optional[dict]:
|
||
"""
|
||
Функция возвращает статистику работы пользователя.
|
||
|
||
:return: Словарь statistic с применением формата отображения и интервала работы(если они есть).
|
||
None, если были ошибки при создании.
|
||
"""
|
||
if self.is_valid_statistic():
|
||
stat = self.statistic
|
||
stat = self._use_display(stat)
|
||
stat = self._use_interval(stat)
|
||
return stat
|
||
return None
|
||
|
||
def is_valid_statistic(self) -> bool:
|
||
"""
|
||
Функция проверяет были ли ошибки при создании статистики.
|
||
|
||
:return: True, при отсутствии ошибок
|
||
"""
|
||
return not self.errors and self.statistic
|
||
|
||
def set_interval(self, interval: list) -> bool:
|
||
"""
|
||
Функция проверяет корректность представления интервала работы.
|
||
|
||
:param interval: Интервал должен быть указан в днях или месяцах.
|
||
:return: True, если указан верно
|
||
"""
|
||
if interval not in ['months', 'days']:
|
||
self.errors += ['Интервал работы должен быть в днях или месяцах']
|
||
return False
|
||
self.interval = interval
|
||
return True
|
||
|
||
def set_display(self, display_format: list) -> bool:
|
||
"""
|
||
Функция проверяет корректность формата отображения интервала.
|
||
|
||
:param display_format: Формат отображения должен быть указан в днях или месяцах.
|
||
:return: True, если указан верно
|
||
"""
|
||
if display_format not in ['days', 'hours']:
|
||
self.errors += ['Формат отображения должен быть в часах или днях']
|
||
return False
|
||
self.display = display_format
|
||
return True
|
||
|
||
def get_data(self) -> Optional[dict]:
|
||
"""
|
||
Функция возвращает данные - список объектов RoleChangeLogs.
|
||
"""
|
||
if self.is_valid_data():
|
||
return self.data
|
||
return None
|
||
|
||
def is_valid_data(self) -> bool:
|
||
"""
|
||
Функция определяет были ли ошибки при получении логов.
|
||
|
||
:return: True, если ошибок нет
|
||
"""
|
||
return not self.errors
|
||
|
||
def _use_display(self, stat: list) -> list:
|
||
"""
|
||
Функция приводит данные к формату отображения.
|
||
|
||
:param stat: Список данных статистики пользователя
|
||
:return: Обновленный список
|
||
"""
|
||
if not self.is_valid_statistic() or not self.display:
|
||
return stat
|
||
new_stat = {}
|
||
for key, item in stat.items():
|
||
if self.display == 'hours':
|
||
new_stat[key] = item / 3600
|
||
elif self.display == 'days':
|
||
new_stat[key] = item / (ONE_DAY * 3600)
|
||
return new_stat
|
||
|
||
def _use_interval(self, stat: dict) -> dict:
|
||
"""
|
||
Функция объединяет ключи и значения в соответствии с интервалом работы.
|
||
|
||
:param stat: Статистика работы пользователя
|
||
:return: Обновленная статистика
|
||
"""
|
||
if not self.is_valid_statistic() or not self.interval:
|
||
return stat
|
||
new_stat = {}
|
||
if self.interval == 'months':
|
||
# Переделываем ключи под формат('начало_месяца - конец_месяца')
|
||
for key, value in stat.items():
|
||
current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1))
|
||
current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1)))
|
||
index = ' - '.join([str(current_month_start), str(current_month_end)])
|
||
if new_stat.get(index):
|
||
new_stat[index] += value
|
||
else:
|
||
new_stat[index] = value
|
||
elif self.interval == 'days':
|
||
new_stat = stat # статистика изначально в днях
|
||
return new_stat
|
||
|
||
def check_time(self) -> bool:
|
||
"""
|
||
Функция проверяет корректность введенного времени.
|
||
|
||
:return: True, если время указано корректно. Иначе, False
|
||
"""
|
||
if self.end_date < self.start_date or self.end_date > datetime.now().date():
|
||
return False
|
||
return True
|
||
|
||
def _init_data(self):
|
||
"""
|
||
Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email.
|
||
|
||
:return: Данные о смене статусов пользователя. Если пользователь не найден или
|
||
интервал времени некорректен - ошибку.
|
||
"""
|
||
if not self.check_time():
|
||
self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени']
|
||
return
|
||
try:
|
||
self.data = RoleChangeLogs.objects.filter(
|
||
change_time__range=[self.start_date, self.end_date + timedelta(days=1)],
|
||
user=get_user_model().objects.get(email=self.email),
|
||
).order_by('change_time')
|
||
except get_user_model().DoesNotExist:
|
||
self.errors += ['Пользователь не найден']
|
||
|
||
def _init_statistic(self) -> None:
|
||
"""
|
||
Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд.
|
||
|
||
:return: Статистика работы пользователя (statistic)
|
||
"""
|
||
self.clear_statistic()
|
||
if not self.get_data():
|
||
self.warnings += ['Не обнаружены изменения роли в данном промежутке']
|
||
else:
|
||
first_log, last_log = self.data[0], self.data[len(self.data) - 1]
|
||
|
||
if first_log.old_role == ROLES['engineer']:
|
||
self.prev_engineer_logic(first_log)
|
||
|
||
if last_log.new_role == ROLES['engineer']:
|
||
self.post_engineer_logic(last_log)
|
||
|
||
for log_index in range(len(self.data) - 1):
|
||
if self.data[log_index].new_role == ROLES['engineer']:
|
||
self.engineer_logic(log_index)
|
||
|
||
def engineer_logic(self, log_index):
|
||
"""
|
||
Функция обрабатывает основную часть работы инженера
|
||
"""
|
||
current_log, next_log = self.data[log_index], self.data[log_index + 1]
|
||
if current_log.change_time.date() != next_log.change_time.date():
|
||
self.statistic[current_log.change_time.date()] += (
|
||
timedelta(days=1) - get_timedelta(current_log)).total_seconds()
|
||
self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds()
|
||
self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date())
|
||
else:
|
||
elapsed_time = next_log.change_time - current_log.change_time
|
||
self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds()
|
||
|
||
def post_engineer_logic(self, last_log):
|
||
"""
|
||
Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона
|
||
"""
|
||
self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1))
|
||
if last_log.change_time.date() == timezone.now().date():
|
||
self.statistic[last_log.change_time.date()] += (
|
||
get_timedelta(None, timezone.now().time()) - get_timedelta(last_log)
|
||
).total_seconds()
|
||
else:
|
||
self.statistic[last_log.change_time.date()] += (
|
||
timedelta(days=1) - get_timedelta(last_log)).total_seconds()
|
||
if self.end_date == timezone.now().date():
|
||
self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds()
|
||
|
||
def prev_engineer_logic(self, first_log):
|
||
"""
|
||
Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона
|
||
"""
|
||
self.fill_daterange(max(get_user_model().objects.get(email=self.email).date_joined.date(), self.start_date),
|
||
first_log.change_time.date())
|
||
self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds()
|
||
|
||
def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict:
|
||
"""
|
||
Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне).
|
||
|
||
:param first: Начальная дата интервала
|
||
:param last: Последняя дата интервала
|
||
:param val: Количество секунд в одном дне
|
||
"""
|
||
for day in daterange(first, last):
|
||
self.statistic[day] = val
|
||
|
||
def clear_statistic(self) -> dict:
|
||
"""
|
||
Функция осуществляет обновление всех дней.
|
||
"""
|
||
self.statistic.clear()
|
||
self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0)
|