import logging from datetime import timedelta, datetime, date from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist from django.utils import timezone from zenpy import Zenpy from zenpy.lib.exception import APIException from zenpy.lib.api_objects import User as ZenpyUser from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY, ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL, \ ACTRL_ZENDESK_SUBDOMAIN from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus from main.zendesk_admin import zenpy def update_role(user_profile: UserProfile, role: int) -> None: """ Функция меняет роль пользователя. :param user_profile: Профиль пользователя :param role: Новая роль :return: Пользователь с обновленной ролью """ zendesk = zenpy user = zendesk.get_user(user_profile.user.email) user.custom_role_id = role user_profile.custom_role_id = role user_profile.save() zendesk.admin.users.update(user) def make_engineer(user_profile: UserProfile, who_changes: User) -> None: """ Функция устанавливает пользователю роль инженера. :param user_profile: Профиль пользователя :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer" """ update_role(user_profile, ROLES['engineer']) def make_light_agent(user_profile: UserProfile, who_changes: User) -> None: """ Функция устанавливает пользователю роль легкого агента. :param user_profile: Профиль пользователя :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent" """ tickets = get_tickets_list(user_profile.user.email) tickets_to_update = [] buffer_group = zenpy.get_group(ZENDESK_GROUPS['buffer']) for ticket in tickets: UnassignedTicket.objects.create( assignee=user_profile.user, ticket_id=ticket.id, status=UnassignedTicketStatus.SOLVED if ticket.status == 'solved' else UnassignedTicketStatus.UNASSIGNED ) if ticket.status == 'solved': ticket.assignee = zenpy.get_user(SOLVED_TICKETS_EMAIL) else: ticket.assignee = None ticket.group = buffer_group tickets_to_update.append(ticket) print(tickets_to_update) zenpy.admin.tickets.update(tickets.values) update_role(user_profile, ROLES['light_agent']) def get_users_list() -> list: """ Функция **get_users_list** возвращает список пользователей Zendesk, относящихся к организации SYSTEM. """ zendesk = zenpy # У пользователей должна быть организация SYSTEM org = next(zendesk.admin.search(type='organization', name='SYSTEM')) users = zendesk.admin.organizations.users(org) return users def get_tickets_list(email): """ Функция возвращает список тикетов пользователя Zendesk """ return zenpy.admin.search(assignee=email, type='ticket') def update_profile(user_profile: UserProfile) -> UserProfile: """ Функция обновляет профиль пользователя в соответствии с текущим в Zendesk. :param user_profile: Профиль пользователя :return: Обновленный, в соответствие с текущими данными в Zendesk, профиль пользователя """ user = zenpy.get_user(user_profile.user.email) user_profile.name = user.name user_profile.role = user.role user_profile.custom_role_id = user.custom_role_id if user.custom_role_id else 0 user_profile.image = user.photo['content_url'] if user.photo else None user_profile.save() def check_user_exist(email: str) -> bool: """ Функция проверяет, существует ли пользователь. :param email: Email пользователя :return: Зарегистрирован ли пользователь в Zendesk """ return zenpy.check_user(email) def get_user_organization(email: str) -> str: """ Функция возвращает организацию пользователя. :param email: Email пользователя :return: Организация пользователя """ return zenpy.get_user_org(email) def check_user_auth(email: str, password: str) -> bool: """ Функция проверяет, верны ли входные данные. :raise: :class:`APIException`: исключение, вызываемое если пользователь не аутентифицирован """ creds = { 'email': email, 'password': password, 'subdomain': ACTRL_ZENDESK_SUBDOMAIN, } try: user = Zenpy(**creds) user.search(email, type='user') except APIException: return False return True def update_user_in_model(profile: UserProfile, zendesk_user: ZenpyUser): """ Функция обновляет профиль пользователя при изменении данных пользователя на Zendesk. :param profile: Профиль пользователя :param zendesk_user: Данные пользователя в Zendesk :return: Обновленный профиль пользователя """ profile.name = zendesk_user.name profile.role = zendesk_user.role profile.image = zendesk_user.photo['content_url'] if zendesk_user.photo else None if zendesk_user.custom_role_id is not None: profile.custom_role_id = int(zendesk_user.custom_role_id) profile.save() def count_users(users) -> tuple: """ Функция подсчета количества сотрудников с ролями engineer и light_agent """ engineers, light_agents = 0, 0 for user in users: if user.custom_role_id == ROLES['engineer']: engineers += 1 elif user.custom_role_id == ROLES['light_agent']: light_agents += 1 return engineers, light_agents def update_users_in_model(): """ Обновляет пользователей в модели UserProfile по списку пользователей в организации """ users = get_users_list() for user in users: try: profile = User.objects.get(email=user.email).userprofile update_user_in_model(profile, user) except ObjectDoesNotExist: pass return users def daterange(start_date, end_date) -> list: """ Функция возвращает список дней с start_date по end_date, исключая правую границу. :param start_date: Начальная дата :param end_date: Конечная дата :return: Список дней, не включая конечную дату """ dates = [] for n in range(int((end_date - start_date).days)): dates.append(start_date + timedelta(n)) return dates def get_timedelta(log, time=None) -> timedelta: """ Функция возвращает объект класса timedelta, который хранит промежуток времени от начала суток до момента, который находится в log (объект класса RoleChangeLogs) или в time(datetime.time), если введён. :param log: Лог :param time: Время :return: Сколько времени прошло от начала суток до события """ if time is None: time = log.change_time.time() time = timedelta(hours=time.hour, minutes=time.minute, seconds=time.second) return time def last_day_of_month(day: int) -> int: """ Функция возвращает последний день текущего месяца. :param day: Текущий день :return: Последний день месяца """ next_month = day.replace(day=28) + timedelta(days=4) return next_month - timedelta(days=next_month.day) class StatisticData: """ Класс для учета статистики интервалов работы пользователей. Передаваемые параметры: start_date, end_date, email, stat. :param display: Формат отображения времени (часы, минуты) :type display: :class:`list` :param interval: Интервал времени в часах и минутах :type interval: :class:`list` :param start_date: Дата начала работы :type start_date: :class:`date` :param end_date: Дата окончания работы :type end_date: :class:`date` :param email: Email пользователя :type email: :class:`str` :param errors: Список ошибок :type errors: :class:`list` :param warnings: Список предупреждений :type warnings: :class:`list` :param data: Ретроспектива смены ролей пользователя :type data: :class:`dict` :param statistic: Интервалы работы пользователя :type statistic: :class:`dict` """ def __init__(self, start_date, end_date, user_email, stat=None): self.display = None self.interval = None self.start_date = start_date self.end_date = end_date self.email = user_email self.errors = list() self.warnings = list() self.data = dict() self.statistic = dict() self._init_data() if stat is None: self._init_statistic() else: self.statistic = stat def get_statistic(self) -> dict: """ Функция возвращает статистику работы пользователя. :return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании. """ if self.is_valid_statistic(): stat = self.statistic stat = self._use_display(stat) stat = self._use_interval(stat) return stat else: return None def is_valid_statistic(self) -> bool: """ Функция проверяет были ли ошибки при создании статистики. :return: True, при отсутствии ошибок """ return not self.errors and self.statistic def set_interval(self, interval: list) -> bool: """ Функция проверяет корректность представления интервала работы. :param interval: Интервал должен быть указан в днях или месяцах. :return: True, если указан верно """ if interval not in ['months', 'days']: self.errors += ['Интервал работы должен быть в днях или месяцах'] return False self.interval = interval return True def set_display(self, display_format: list) -> bool: """ Функция проверяет корректность формата отображения интервала. :param display_format: Формат отображения должен быть указан в днях или месяцах. :return: True, если указан верно """ if display_format not in ['days', 'hours']: self.errors += ['Формат отображения должен быть в часах или днях'] return False self.display = display_format return True def get_data(self) -> list: """ Функция возвращает данные - список объектов RoleChangeLogs. """ if self.is_valid_data(): return self.data else: return None def is_valid_data(self) -> bool: """ Функция определяет были ли ошибки при получении логов. :return: True, если ошибок нет """ return not self.errors def _use_display(self, stat: list) -> list: """ Функция приводит данные к формату отображения. :param stat: Список данных статистики пользователя :return: Обновленный список """ if not self.is_valid_statistic() or not self.display: return stat new_stat = {} for key, item in stat.items(): if self.display == 'hours': new_stat[key] = item / 3600 elif self.display == 'days': new_stat[key] = item / (ONE_DAY * 3600) return new_stat def _use_interval(self, stat: dict) -> dict: """ Функция объединяет ключи и значения в соответствии с интервалом работы. :param stat: Статистика работы пользователя :return: Обновленная статистика """ if not self.is_valid_statistic() or not self.interval: return stat new_stat = {} if self.interval == 'months': # Переделываем ключи под формат('начало_месяца - конец_месяца') for key, value in stat.items(): current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1)) current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1))) index = ' - '.join([str(current_month_start), str(current_month_end)]) if new_stat.get(index): new_stat[index] += value else: new_stat[index] = value elif self.interval == 'days': new_stat = stat # статистика изначально в днях return new_stat def check_time(self) -> bool: """ Функция проверяет корректность введенного времени. :return: True, если время указано корректно. Иначе, False """ if self.end_date < self.start_date or self.end_date > datetime.now().date(): return False return True def _init_data(self): """ Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email. :return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку. """ if not self.check_time(): self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени'] return try: self.data = RoleChangeLogs.objects.filter( change_time__range=[self.start_date, self.end_date + timedelta(days=1)], user=User.objects.get(email=self.email), ).order_by('change_time') except User.DoesNotExist: self.errors += ['Пользователь не найден'] def _init_statistic(self) -> dict: """ Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд. :return: Статистика работы пользователя (statistic) """ self.clear_statistic() if not self.get_data(): self.warnings += ['Не обнаружены изменения роли в данном промежутке'] return None first_log, last_log = self.data[0], self.data[len(self.data) - 1] if first_log.old_role == ROLES['engineer']: self.prev_engineer_logic(first_log) if last_log.new_role == ROLES['engineer']: self.post_engineer_logic(last_log) for log_index in range(len(self.data) - 1): if self.data[log_index].new_role == ROLES['engineer']: self.engineer_logic(log_index) def engineer_logic(self, log_index): """ Функция обрабатывает основную часть работы инженера """ current_log, next_log = self.data[log_index], self.data[log_index + 1] if current_log.change_time.date() != next_log.change_time.date(): self.statistic[current_log.change_time.date()] += ( timedelta(days=1) - get_timedelta(current_log)).total_seconds() self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds() self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date()) else: elapsed_time = next_log.change_time - current_log.change_time self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds() def post_engineer_logic(self, last_log): """ Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона """ self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1)) if last_log.change_time.date() == timezone.now().date(): self.statistic[last_log.change_time.date()] += ( get_timedelta(None, timezone.now().time()) - get_timedelta(last_log) ).total_seconds() else: self.statistic[last_log.change_time.date()] += ( timedelta(days=1) - get_timedelta(last_log)).total_seconds() if self.end_date == timezone.now().date(): self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds() def prev_engineer_logic(self, first_log): """ Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона """ self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date), first_log.change_time.date()) self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds() def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict: """ Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне). :param first: Начальная дата интервала :param last: Последняя дата интервала :param val: Количество секунд в одном дне """ for day in daterange(first, last): self.statistic[day] = val def clear_statistic(self) -> dict: """ Функция осуществляет обновление всех дней. """ self.statistic.clear() self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0) class DatabaseHandler(logging.Handler): def __init__(self): logging.Handler.__init__(self) def emit(self, record): database = RoleChangeLogs() users = record.msg if users[1]: user = users[0] admin = users[1] elif not users[1]: user = users[0] admin = users[0] database.name = user.name database.user = user.user database.changed_by = admin.user if user.custom_role_id == ROLES['engineer']: database.old_role = ROLES['light_agent'] elif user.custom_role_id == ROLES['light_agent']: database.old_role = ROLES['engineer'] database.new_role = user.custom_role_id database.save() class CsvFormatter(logging.Formatter): def __init__(self): logging.Formatter.__init__(self) def format(self, record): users = record.msg if users[1]: user = users[0] admin = users[1] elif not users[1]: user = users[0] admin = users[0] msg = '' msg += user.name if user.custom_role_id == ROLES['engineer']: msg += ',engineer,' elif user.custom_role_id == ROLES['light_agent']: msg += ',light_agent,' time = str(timezone.now().today()) msg += time[:16] msg += ',' msg += admin.name return msg def log(user, admin=0): """ Осуществляет запись логов в базу данных и csv файл :param admin: :param user: :return: """ users = [user, admin] logger = logging.getLogger('MY_LOGGER') if not logger.hasHandlers(): dbhandler = DatabaseHandler() csvformatter = CsvFormatter() csvhandler = logging.FileHandler('logs/logs.csv', "a") csvhandler.setFormatter(csvformatter) logger.addHandler(dbhandler) logger.addHandler(csvhandler) logger.setLevel('INFO') logger.info(users)