import logging import os from datetime import timedelta, datetime, date from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist from django.utils import timezone from zenpy import Zenpy from zenpy.lib.exception import APIException from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY, ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus class ZendeskAdmin: """ Класс **ZendeskAdmin** существует, чтобы в каждой функции отдельно не проверять аккаунт администратора. :param credentials: Полномочия (первым указывается учетная запись организации в Zendesk) :type credentials: :class:`dict` :param email: Email администратора, указанный в env :type email: :class:`str` :param token: Токен администратора (формируется в Zendesk, указывается в env) :type token: :class:`str` :param password: Пароль администратора, указанный в env :type password: :class:`str` """ credentials: dict = { 'subdomain': 'ngenix1612197338' } email: str = os.getenv('ACCESS_CONTROLLER_API_EMAIL') token: str = os.getenv('ACCESS_CONTROLLER_API_TOKEN') password: str = os.getenv('ACCESS_CONTROLLER_API_PASSWORD') def __init__(self): self.create_admin() def check_user(self, email: str) -> bool: """ Функция осуществляет проверку существования пользователя в Zendesk по email. :param email: Email пользователя :return: Является ли зарегистрированным """ return True if self.admin.search(email, type='user') else False def get_user_name(self, email: str) -> str: """ Функция **get_user_name** возвращает имя пользователя по его email """ user = self.admin.users.search(email).values[0] return user.name def get_user_role(self, email: str) -> str: """ Функция возвращает роль пользователя по его email. :param email: Email пользователя :return: Роль пользователя """ user = self.admin.users.search(email).values[0] return user.role def get_user_id(self, email: str) -> str: """ Функция возвращает id пользователя по его email :param email: Email пользователя :return: ID пользователя """ user = self.admin.users.search(email).values[0] return user.id def get_user_image(self, email: str) -> str: """ Функция возвращает url-ссылку на аватар пользователя по его email. :param email: Email пользователя :return: Аватар пользователя """ user = self.admin.users.search(email).values[0] return user.photo['content_url'] if user.photo else None def get_user(self, email: str): """ Функция возвращает пользователя (объект) по его email. :param email: Email пользователя :return: Объект пользователя, найденного в БД """ return self.admin.users.search(email).values[0] def get_group(self, name: str) -> str: """ Функция возвращает группы, к которым принадлежит пользователь. :param name: Имя пользователя :return: Группы пользователя (в случае отсутствия None) """ groups = self.admin.search(name) for group in groups: return group return None def get_user_org(self, email: str) -> str: """ Функция возвращает организацию, к которой относится пользователь по его email. :param email: Email пользователя :return: Организация пользователя """ user = self.admin.users.search(email).values[0] return user.organization.name if user.organization else None def create_admin(self) -> Zenpy: """ Функция создает администратора, проверяя наличие вводимых данных в env. :param credentials: В список полномочий администратора вносятся email, token, password из env :type credentials: :class:`dict` :raise: :class:`ValueError`: исключение, вызываемое если email не введен в env :raise: :class:`APIException`: исключение, вызываемое если пользователя с таким email не существует в Zendesk """ if self.email is None: raise ValueError('access_controller email not in env') self.credentials['email'] = self.email if self.token: self.credentials['token'] = self.token elif self.password: self.credentials['password'] = self.password else: raise ValueError('access_controller token or password not in env') self.admin = Zenpy(**self.credentials) try: self.admin.search(self.email, type='user') except APIException: raise ValueError('invalid access_controller`s login data') def update_role(user_profile: UserProfile, role: int) -> UserProfile: """ Функция меняет роль пользователя. :param user_profile: Профиль пользователя :param role: Новая роль :return: Пользователь с обновленной ролью """ zendesk = ZendeskAdmin() user = zendesk.get_user(user_profile.user.email) user.custom_role_id = role user_profile.custom_role_id = role user_profile.save() zendesk.admin.users.update(user) def make_engineer(user_profile: UserProfile, who_changes: User) -> UserProfile: """ Функция устанавливает пользователю роль инженера. :param user_profile: Профиль пользователя :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer" """ update_role(user_profile, ROLES['engineer']) def make_light_agent(user_profile: UserProfile, who_changes: User) -> UserProfile: """ Функция устанавливает пользователю роль легкого агента. :param user_profile: Профиль пользователя :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent" """ tickets = get_tickets_list(user_profile.user.email) for ticket in tickets: UnassignedTicket.objects.create( assignee=user_profile.user, ticket_id=ticket.id, status=UnassignedTicketStatus.SOLVED if ticket.status == 'solved' else UnassignedTicketStatus.UNASSIGNED ) if ticket.status == 'solved': ticket.assignee = ZendeskAdmin().get_user(SOLVED_TICKETS_EMAIL) else: ticket.assignee = None ticket.group = ZendeskAdmin().get_group(ZENDESK_GROUPS['buffer']) ZendeskAdmin().admin.tickets.update(ticket) update_role(user_profile, ROLES['light_agent']) def get_users_list() -> list: """ Функция **get_users_list** возвращает список пользователей Zendesk, относящихся к организации SYSTEM. """ zendesk = ZendeskAdmin() # У пользователей должна быть организация SYSTEM org = next(zendesk.admin.search(type='organization', name='SYSTEM')) users = zendesk.admin.organizations.users(org) return users def get_tickets_list(email): """ Функция возвращает список тикетов пользователя Zendesk """ return ZendeskAdmin().admin.search(assignee=email, type='ticket') def update_profile(user_profile: UserProfile) -> UserProfile: """ Функция обновляет профиль пользователя в соответствии с текущим в Zendesk. :param user_profile: Профиль пользователя :return: Обновленный, в соответствие с текущими данными в Zendesk, профиль пользователя """ user = ZendeskAdmin().get_user(user_profile.user.email) user_profile.name = user.name user_profile.role = user.role user_profile.custom_role_id = user.custom_role_id if user.custom_role_id else 0 user_profile.image = user.photo['content_url'] if user.photo else None user_profile.save() def check_user_exist(email: str) -> bool: """ Функция проверяет, существует ли пользователь. :param email: Email пользователя :return: Зарегистрирован ли пользователь в Zendesk """ return ZendeskAdmin().check_user(email) def get_user_organization(email: str) -> str: """ Функция возвращает организацию пользователя. :param email: Email пользователя :return: Организация пользователя """ return ZendeskAdmin().get_user_org(email) def check_user_auth(email: str, password: str) -> bool: """ Функция проверяет, верны ли входные данные. :raise: :class:`APIException`: исключение, вызываемое если пользователь не аутентифицирован """ creds = { 'email': email, 'password': password, 'subdomain': 'ngenix1612197338', } try: user = Zenpy(**creds) user.search(email, type='user') except APIException: return False return True def update_user_in_model(profile: UserProfile, zendesk_user: User): """ Функция обновляет профиль пользователя при изменении данных пользователя на Zendesk. :param profile: Профиль пользователя :param zendesk_user: Данные пользователя в Zendesk :return: Обновленный профиль пользователя """ profile.name = zendesk_user.name profile.role = zendesk_user.role profile.image = zendesk_user.photo['content_url'] if zendesk_user.photo else None if zendesk_user.custom_role_id is not None: profile.custom_role_id = int(zendesk_user.custom_role_id) profile.save() def count_users(users) -> tuple: """ Функция подсчета количества сотрудников с ролями engineer и light_agent """ engineers, light_agents = 0, 0 for user in users: if user.custom_role_id == ROLES['engineer']: engineers += 1 elif user.custom_role_id == ROLES['light_agent']: light_agents += 1 return engineers, light_agents def update_users_in_model(): """ Обновляет пользователей в модели UserProfile по списку пользователей в организации """ users = get_users_list() for user in users: try: profile = User.objects.get(email=user.email).userprofile update_user_in_model(profile, user) except ObjectDoesNotExist: pass return users def daterange(start_date, end_date) -> list: """ Функция возвращает список дней с start_date по end_date, исключая правую границу. :param start_date: Начальная дата :param end_date: Конечная дата :return: Список дней, не включая конечную дату """ dates = [] for n in range(int((end_date - start_date).days)): dates.append(start_date + timedelta(n)) return dates def get_timedelta(log, time=None) -> timedelta: """ Функция возвращает объект класса timedelta, который хранит промежуток времени от начала суток до момента, который находится в log (объект класса RoleChangeLogs) или в time(datetime.time), если введён. :param log: Лог :param time: Время :return: Сколько времени прошло от начала суток до события """ if time is None: time = log.change_time.time() time = timedelta(hours=time.hour, minutes=time.minute, seconds=time.second) return time def last_day_of_month(day: int) -> int: """ Функция возвращает последний день текущего месяца. :param day: Текущий день :return: Последний день месяца """ next_month = day.replace(day=28) + timedelta(days=4) return next_month - timedelta(days=next_month.day) class StatisticData: """ Класс для учета статистики интервалов работы пользователей. Передаваемые параметры: start_date, end_date, email, stat. :param display: Формат отображения времени (часы, минуты) :type display: :class:`list` :param interval: Интервал времени в часах и минутах :type interval: :class:`list` :param start_date: Дата начала работы :type start_date: :class:`date` :param end_date: Дата окончания работы :type end_date: :class:`date` :param email: Email пользователя :type email: :class:`str` :param errors: Список ошибок :type errors: :class:`list` :param warnings: Список предупреждений :type warnings: :class:`list` :param data: Ретроспектива смены ролей пользователя :type data: :class:`dict` :param statistic: Интервалы работы пользователя :type statistic: :class:`dict` """ def __init__(self, start_date, end_date, user_email, stat=None): self.display = None self.interval = None self.start_date = start_date self.end_date = end_date self.email = user_email self.errors = list() self.warnings = list() self.data = dict() self.statistic = dict() self._init_data() if stat is None: self._init_statistic() else: self.statistic = stat def get_statistic(self) -> dict: """ Функция возвращает статистику работы пользователя. :return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании. """ if self.is_valid_statistic(): stat = self.statistic stat = self._use_display(stat) stat = self._use_interval(stat) return stat else: return None def is_valid_statistic(self) -> bool: """ Функция проверяет были ли ошибки при создании статистики. :return: True, при отсутствии ошибок """ return not self.errors and self.statistic def set_interval(self, interval: list) -> bool: """ Функция проверяет корректность представления интервала работы. :param interval: Интервал должен быть указан в днях или месяцах. :return: True, если указан верно """ if interval not in ['months', 'days']: self.errors += ['Интервал работы должен быть в днях или месяцах'] return False self.interval = interval return True def set_display(self, display_format: list) -> bool: """ Функция проверяет корректность формата отображения интервала. :param display_format: Формат отображения должен быть указан в днях или месяцах. :return: True, если указан верно """ if display_format not in ['days', 'hours']: self.errors += ['Формат отображения должен быть в часах или днях'] return False self.display = display_format return True def get_data(self) -> list: """ Функция возвращает данные - список объектов RoleChangeLogs. """ if self.is_valid_data(): return self.data else: return None def is_valid_data(self) -> bool: """ Функция определяет были ли ошибки при получении логов. :return: True, если ошибок нет """ return not self.errors def _use_display(self, stat: list) -> list: """ Функция приводит данные к формату отображения. :param stat: Список данных статистики пользователя :return: Обновленный список """ if not self.is_valid_statistic() or not self.display: return stat new_stat = {} for key, item in stat.items(): if self.display == 'hours': new_stat[key] = item / 3600 elif self.display == 'days': new_stat[key] = item / (ONE_DAY * 3600) return new_stat def _use_interval(self, stat: dict) -> dict: """ Функция объединяет ключи и значения в соответствии с интервалом работы. :param stat: Статистика работы пользователя :return: Обновленная статистика """ if not self.is_valid_statistic() or not self.interval: return stat new_stat = {} if self.interval == 'months': # Переделываем ключи под формат('начало_месяца - конец_месяца') for key, value in stat.items(): current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1)) current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1))) index = ' - '.join([str(current_month_start), str(current_month_end)]) if new_stat.get(index): new_stat[index] += value else: new_stat[index] = value elif self.interval == 'days': new_stat = stat # статистика изначально в днях return new_stat def check_time(self) -> bool: """ Функция проверяет корректность введенного времени. :return: True, если время указано корректно. Иначе, False """ if self.end_date < self.start_date or self.end_date > datetime.now().date(): return False return True def _init_data(self): """ Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email. :return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку. """ if not self.check_time(): self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени'] return try: self.data = RoleChangeLogs.objects.filter( change_time__range=[self.start_date, self.end_date + timedelta(days=1)], user=User.objects.get(email=self.email), ).order_by('change_time') except User.DoesNotExist: self.errors += ['Пользователь не найден'] def _init_statistic(self) -> dict: """ Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд. :return: Статистика работы пользователя (statistic) """ self.clear_statistic() if not self.get_data(): self.warnings += ['Не обнаружены изменения роли в данном промежутке'] return None first_log, last_log = self.data[0], self.data[len(self.data) - 1] if first_log.old_role == ROLES['engineer']: self.prev_engineer_logic(first_log) if last_log.new_role == ROLES['engineer']: self.post_engineer_logic(last_log) for log_index in range(len(self.data) - 1): if self.data[log_index].new_role == ROLES['engineer']: self.engineer_logic(log_index) def engineer_logic(self, log_index): """ Функция обрабатывает основную часть работы инженера """ current_log, next_log = self.data[log_index], self.data[log_index + 1] if current_log.change_time.date() != next_log.change_time.date(): self.statistic[current_log.change_time.date()] += ( timedelta(days=1) - get_timedelta(current_log)).total_seconds() self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds() self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date()) else: elapsed_time = next_log.change_time - current_log.change_time self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds() def post_engineer_logic(self, last_log): """ Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона """ self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1)) if last_log.change_time.date() == timezone.now().date(): self.statistic[last_log.change_time.date()] += ( get_timedelta(None, timezone.now().time()) - get_timedelta(last_log) ).total_seconds() else: self.statistic[last_log.change_time.date()] += ( timedelta(days=1) - get_timedelta(last_log)).total_seconds() if self.end_date == timezone.now().date(): self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds() def prev_engineer_logic(self, first_log): """ Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона """ self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date), first_log.change_time.date()) self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds() def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict: """ Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне). :param first: Начальная дата интервала :param last: Последняя дата интервала :param val: Количество секунд в одном дне """ for day in daterange(first, last): self.statistic[day] = val def clear_statistic(self) -> dict: """ Функция осуществляет обновление всех дней. """ self.statistic.clear() self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0) class DatabaseHandler(logging.Handler): def __init__(self): logging.Handler.__init__(self) def emit(self, record): database = RoleChangeLogs() users = record.msg if users[1]: user = users[0] admin = users[1] elif not users[1]: user = users[0] admin = users[0] database.name = user.name database.user = user.user database.changed_by = admin.user if user.custom_role_id == ROLES['engineer']: database.old_role = ROLES['light_agent'] elif user.custom_role_id == ROLES['light_agent']: database.old_role = ROLES['engineer'] database.new_role = user.custom_role_id database.save() class CsvFormatter(logging.Formatter): def __init__(self): logging.Formatter.__init__(self) def format(self, record): users = record.msg if users[1]: user = users[0] admin = users[1] elif not users[1]: user = users[0] admin = users[0] msg = '' msg += user.name if user.custom_role_id == ROLES['engineer']: msg += ',engineer,' elif user.custom_role_id == ROLES['light_agent']: msg += ',light_agent,' time = str(timezone.now().today()) msg += time[:16] msg += ',' msg += admin.name return msg def log(user, admin=0): """ Осуществляет запись логов в базу данных и csv файл :param admin: :param user: :return: """ users = [user, admin] logger = logging.getLogger('MY_LOGGER') if not logger.hasHandlers(): dbhandler = DatabaseHandler() csvformatter = CsvFormatter() csvhandler = logging.FileHandler('logs/logs.csv', "a") csvhandler.setFormatter(csvformatter) logger.addHandler(dbhandler) logger.addHandler(csvhandler) logger.setLevel('INFO') logger.info(users)