""" Вспомогательные функции со списками пользователей, статистикой и т.д. """ import logging from datetime import timedelta, datetime, date from typing import Optional, Union from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist from django.core.handlers.wsgi import WSGIRequest from django.http import HttpResponseRedirect, HttpResponsePermanentRedirect from django.shortcuts import redirect from django.utils import timezone from zenpy import Zenpy from zenpy.lib.exception import APIException from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket from zenpy.lib.generator import SearchResultGenerator from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY, ACTRL_ZENDESK_SUBDOMAIN from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus from main.zendesk_admin import zenpy def update_role(user_profile: UserProfile, role: int, who_changes: User) -> None: """ Функция меняет роль пользователя. :param user_profile: Профиль пользователя :param role: Новая роль :param who_changes: Пользователь, меняющий роль :return: Пользователь с обновленной ролью """ zendesk = zenpy user = zendesk.get_user(user_profile.user.email) user.custom_role_id = role user_profile.custom_role_id = role user_profile.save() log(user_profile, who_changes.userprofile) zendesk.admin.users.update(user) def make_engineer(user_profile: UserProfile, who_changes: User) -> None: """ Функция устанавливает пользователю роль инженера. :param user_profile: Профиль пользователя :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer" """ update_role(user_profile, ROLES['engineer'], who_changes) def make_light_agent(user_profile: UserProfile, who_changes: User) -> None: """ Функция устанавливает пользователю роль легкого агента. :param user_profile: Профиль пользователя :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent" """ tickets: SearchResultGenerator = get_tickets_list(user_profile.user.email) ticket: ZenpyTicket for ticket in tickets: UnassignedTicket.objects.create( assignee=user_profile.user, ticket_id=ticket.id, status=UnassignedTicketStatus.SOLVED if ticket.status == 'solved' else UnassignedTicketStatus.UNASSIGNED ) if ticket.status == 'solved': ticket.assignee_id = zenpy.solved_tickets_user_id else: ticket.assignee = None ticket.group_id = zenpy.buffer_group_id if tickets.count: zenpy.admin.tickets.update(tickets.values) attempts, success = 5, False while not success and attempts != 0: try: update_role(user_profile, ROLES['light_agent'], who_changes) success = True except APIException as e: attempts -= 1 if attempts == 0: raise e def get_users_list() -> list: """ Функция **get_users_list** возвращает список пользователей Zendesk, относящихся к организации SYSTEM. """ zendesk = zenpy # У пользователей должна быть организация SYSTEM org = next(zendesk.admin.search(type='organization', name='SYSTEM')) users = zendesk.admin.organizations.users(org) return users def get_tickets_list(email) -> list: """ Функция возвращает список тикетов пользователя Zendesk """ return zenpy.admin.search(assignee=email, type='ticket') def update_profile(user_profile: UserProfile) -> None: """ Функция обновляет профиль пользователя в соответствии с текущим в Zendesk. :param user_profile: Профиль пользователя :return: Обновленный, в соответствие с текущими данными в Zendesk, профиль пользователя """ user = zenpy.get_user(user_profile.user.email) user_profile.name = user.name user_profile.role = user.role user_profile.custom_role_id = user.custom_role_id if user.custom_role_id else 0 user_profile.image = user.photo['content_url'] if user.photo else None user_profile.save() def check_user_exist(email: str) -> bool: """ Функция проверяет, существует ли пользователь. :param email: Email пользователя :return: Зарегистрирован ли пользователь в Zendesk """ return zenpy.check_user(email) def get_user_organization(email: str) -> str: """ Функция возвращает организацию пользователя. :param email: Email пользователя :return: Организация пользователя """ return zenpy.get_user_org(email) def check_user_auth(email: str, password: str) -> bool: """ Функция проверяет, верны ли входные данные. :raise: :class:`APIException`: исключение, вызываемое если пользователь не аутентифицирован """ creds = { 'email': email, 'password': password, 'subdomain': ACTRL_ZENDESK_SUBDOMAIN, } try: user = Zenpy(**creds) user.search(email, type='user') except APIException: return False return True def update_user_in_model(profile: UserProfile, zendesk_user: ZenpyUser) -> None: """ Функция обновляет профиль пользователя при изменении данных пользователя на Zendesk. :param profile: Профиль пользователя :param zendesk_user: Данные пользователя в Zendesk :return: Обновленный профиль пользователя """ profile.name = zendesk_user.name profile.role = zendesk_user.role profile.image = zendesk_user.photo['content_url'] if zendesk_user.photo else None if zendesk_user.custom_role_id is not None: profile.custom_role_id = int(zendesk_user.custom_role_id) profile.save() def count_users(users: list) -> tuple: """ Функция подсчета количества сотрудников с ролями engineer и light_agent. """ engineers, light_agents = 0, 0 for user in users: if user.custom_role_id == ROLES['engineer']: engineers += 1 elif user.custom_role_id == ROLES['light_agent']: light_agents += 1 return engineers, light_agents def update_users_in_model(): """ Обновляет пользователей в модели UserProfile по списку пользователей в организации """ users = get_users_list() for user in users: try: profile = User.objects.get(email=user.email).userprofile update_user_in_model(profile, user) except ObjectDoesNotExist: pass return users def daterange(start_date: timedelta, end_date: timedelta) -> list: """ Функция возвращает список дней с start_date по end_date, исключая правую границу. :param start_date: Начальная дата :param end_date: Конечная дата :return: Список дней, не включая конечную дату """ dates = [] for n in range(int((end_date - start_date).days)): dates.append(start_date + timedelta(n)) return dates def get_timedelta(log: RoleChangeLogs, time: timedelta = None) -> timedelta: """ Функция возвращает объект класса timedelta, который хранит промежуток времени от начала суток до момента, который находится в log (объект класса RoleChangeLogs) или в time(datetime.time), если введён. :param log: Лог :param time: Время :return: Сколько времени прошло от начала суток до события """ if time is None: time = log.change_time.time() time = timedelta(hours=time.hour, minutes=time.minute, seconds=time.second) return time def last_day_of_month(day: int) -> int: """ Функция возвращает последний день текущего месяца. :param day: Текущий день :return: Последний день месяца """ next_month = day.replace(day=28) + timedelta(days=4) return next_month - timedelta(days=next_month.day) class StatisticData: """ Класс для учета статистики интервалов работы пользователей. Передаваемые параметры: start_date, end_date, email, stat. :param display: Формат отображения времени (часы, минуты) :type display: :class:`list` :param interval: Интервал времени в часах и минутах :type interval: :class:`list` :param start_date: Дата начала работы :type start_date: :class:`date` :param end_date: Дата окончания работы :type end_date: :class:`date` :param email: Email пользователя :type email: :class:`str` :param errors: Список ошибок :type errors: :class:`list` :param warnings: Список предупреждений :type warnings: :class:`list` :param data: Ретроспектива смены ролей пользователя :type data: :class:`dict` :param statistic: Интервалы работы пользователя :type statistic: :class:`dict` """ def __init__(self, start_date, end_date, user_email, stat=None): self.display = None self.interval = None self.start_date = start_date self.end_date = end_date self.email = user_email self.errors = list() self.warnings = list() self.data = dict() self.statistic = dict() self._init_data() if stat is None: self._init_statistic() else: self.statistic = stat def get_statistic(self) -> Optional[dict]: """ Функция возвращает статистику работы пользователя. :return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании. """ if self.is_valid_statistic(): stat = self.statistic stat = self._use_display(stat) stat = self._use_interval(stat) return stat else: return None def is_valid_statistic(self) -> bool: """ Функция проверяет были ли ошибки при создании статистики. :return: True, при отсутствии ошибок """ return not self.errors and self.statistic def set_interval(self, interval: list) -> bool: """ Функция проверяет корректность представления интервала работы. :param interval: Интервал должен быть указан в днях или месяцах. :return: True, если указан верно """ if interval not in ['months', 'days']: self.errors += ['Интервал работы должен быть в днях или месяцах'] return False self.interval = interval return True def set_display(self, display_format: list) -> bool: """ Функция проверяет корректность формата отображения интервала. :param display_format: Формат отображения должен быть указан в днях или месяцах. :return: True, если указан верно """ if display_format not in ['days', 'hours']: self.errors += ['Формат отображения должен быть в часах или днях'] return False self.display = display_format return True def get_data(self) -> Optional[dict]: """ Функция возвращает данные - список объектов RoleChangeLogs. """ if self.is_valid_data(): return self.data else: return None def is_valid_data(self) -> bool: """ Функция определяет были ли ошибки при получении логов. :return: True, если ошибок нет """ return not self.errors def _use_display(self, stat: list) -> list: """ Функция приводит данные к формату отображения. :param stat: Список данных статистики пользователя :return: Обновленный список """ if not self.is_valid_statistic() or not self.display: return stat new_stat = {} for key, item in stat.items(): if self.display == 'hours': new_stat[key] = item / 3600 elif self.display == 'days': new_stat[key] = item / (ONE_DAY * 3600) return new_stat def _use_interval(self, stat: dict) -> dict: """ Функция объединяет ключи и значения в соответствии с интервалом работы. :param stat: Статистика работы пользователя :return: Обновленная статистика """ if not self.is_valid_statistic() or not self.interval: return stat new_stat = {} if self.interval == 'months': # Переделываем ключи под формат('начало_месяца - конец_месяца') for key, value in stat.items(): current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1)) current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1))) index = ' - '.join([str(current_month_start), str(current_month_end)]) if new_stat.get(index): new_stat[index] += value else: new_stat[index] = value elif self.interval == 'days': new_stat = stat # статистика изначально в днях return new_stat def check_time(self) -> bool: """ Функция проверяет корректность введенного времени. :return: True, если время указано корректно. Иначе, False """ if self.end_date < self.start_date or self.end_date > datetime.now().date(): return False return True def _init_data(self) -> None: """ Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email. :return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку. """ if not self.check_time(): self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени'] return try: self.data = RoleChangeLogs.objects.filter( change_time__range=[self.start_date, self.end_date + timedelta(days=1)], user=User.objects.get(email=self.email), ).order_by('change_time') except User.DoesNotExist: self.errors += ['Пользователь не найден'] def _init_statistic(self) -> None: """ Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд. :return: Статистика работы пользователя (statistic) """ self.clear_statistic() if not self.get_data(): self.warnings += ['Не обнаружены изменения роли в данном промежутке'] return first_log, last_log = self.data[0], self.data[len(self.data) - 1] if first_log.old_role == ROLES['engineer']: self.prev_engineer_logic(first_log) if last_log.new_role == ROLES['engineer']: self.post_engineer_logic(last_log) for log_index in range(len(self.data) - 1): if self.data[log_index].new_role == ROLES['engineer']: self.engineer_logic(log_index) def engineer_logic(self, log_index): """ Функция обрабатывает основную часть работы инженера. :param log_index: Индекс текущего лога """ current_log, next_log = self.data[log_index], self.data[log_index + 1] if current_log.change_time.date() != next_log.change_time.date(): self.statistic[current_log.change_time.date()] += ( timedelta(days=1) - get_timedelta(current_log)).total_seconds() self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds() self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date()) else: elapsed_time = next_log.change_time - current_log.change_time self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds() def post_engineer_logic(self, last_log: RoleChangeLogs) -> None: """ Функция обрабатывает случай, когда нам известно что инженер работал и после диапазона. :param last_log: Последний лог """ self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1)) if last_log.change_time.date() == timezone.now().date(): self.statistic[last_log.change_time.date()] += ( get_timedelta(None, timezone.now().time()) - get_timedelta(last_log) ).total_seconds() else: self.statistic[last_log.change_time.date()] += ( timedelta(days=1) - get_timedelta(last_log)).total_seconds() if self.end_date == timezone.now().date(): self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds() def prev_engineer_logic(self, first_log: RoleChangeLogs) -> None: """ Функция обрабатывает случай, когда нам извеcтно, что инженер начал работу до диапазона. :param first_log: Первый лог """ self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date), first_log.change_time.date()) self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds() def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict: """ Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне). :param first: Начальная дата интервала :param last: Последняя дата интервала :param val: Количество секунд в одном дне """ for day in daterange(first, last): self.statistic[day] = val def clear_statistic(self) -> None: """ Функция осуществляет обновление всех дней. """ self.statistic.clear() self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0) class DatabaseHandler(logging.Handler): """ Класс записи изменений ролей в базу данных. """ def __init__(self): logging.Handler.__init__(self) def emit(self, record: logging.LogRecord) -> None: """ Функция осуществляет запись об изменении роли пользователя. :param record: Запись в сущность main.rolchangelogs """ database = RoleChangeLogs() users = record.msg if users[1]: user = users[0] admin = users[1] elif not users[1]: user = users[0] admin = users[0] database.name = user.name database.user = user.user database.changed_by = admin.user if user.custom_role_id == ROLES['engineer']: database.old_role = ROLES['light_agent'] elif user.custom_role_id == ROLES['light_agent']: database.old_role = ROLES['engineer'] database.new_role = user.custom_role_id database.save() class CsvFormatter(logging.Formatter): """ Класс преобразования смены ролей пользователей в строковый формат. """ def __init__(self): logging.Formatter.__init__(self) def format(self, record: logging.LogRecord) -> str: """ Функция форматирует запись смены роли пользователя в строку. :param record: Запись смены роли пользователя. :return: Строка с записью смены пользователя. """ users = record.msg if users[1]: user = users[0] admin = users[1] elif not users[1]: user = users[0] admin = users[0] msg = '' msg += user.name if user.custom_role_id == ROLES['engineer']: msg += ',engineer,' elif user.custom_role_id == ROLES['light_agent']: msg += ',light_agent,' time = str(timezone.now().today()) msg += time[:16] msg += ',' msg += admin.name return msg def log(user: User, admin: int = 0) -> None: """ Функция осуществляет запись логов в базу данных и csv файл. :param admin: Админ, который меняет роль :param user: Пользователь, которому изменена роль """ users = [user, admin] logger = logging.getLogger('MY_LOGGER') if not logger.hasHandlers(): dbhandler = DatabaseHandler() csvformatter = CsvFormatter() csvhandler = logging.FileHandler('logs/logs.csv', "a") csvhandler.setFormatter(csvformatter) logger.addHandler(dbhandler) logger.addHandler(csvhandler) logger.setLevel('INFO') logger.info(users) def set_session_params_for_work_page(request: WSGIRequest, count: int = None, is_confirm: bool = True) -> \ Union[HttpResponsePermanentRedirect, HttpResponseRedirect]: """ Функция для страницы получения прав, устанавливает данные сессии о успешности запроса и количестве назначенных тикетов. :param request: Получение данных с рабочей страницы пользователя :param count: Количество запрошенных тикетов :param is_confirm: Назначение тикетов :return: Перезагрузка страницы "Управление правами" соответствующего пользователя """ request.session['is_confirm'] = is_confirm request.session['count_tickets'] = count return redirect('work', request.user.id)