diff --git a/main/lib/__init__.py b/main/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/main/lib/__pycache__/__init__.cpython-39.pyc b/main/lib/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000..70ee7f6 Binary files /dev/null and b/main/lib/__pycache__/__init__.cpython-39.pyc differ diff --git a/main/lib/__pycache__/extra_func.cpython-39.pyc b/main/lib/__pycache__/extra_func.cpython-39.pyc new file mode 100644 index 0000000..506e57c Binary files /dev/null and b/main/lib/__pycache__/extra_func.cpython-39.pyc differ diff --git a/main/lib/__pycache__/statistic_data.cpython-39.pyc b/main/lib/__pycache__/statistic_data.cpython-39.pyc new file mode 100644 index 0000000..8165837 Binary files /dev/null and b/main/lib/__pycache__/statistic_data.cpython-39.pyc differ diff --git a/main/lib/__pycache__/zendesk_admin.cpython-39.pyc b/main/lib/__pycache__/zendesk_admin.cpython-39.pyc new file mode 100644 index 0000000..8b38c98 Binary files /dev/null and b/main/lib/__pycache__/zendesk_admin.cpython-39.pyc differ diff --git a/main/lib/extra_func.py b/main/lib/extra_func.py new file mode 100644 index 0000000..ad251cd --- /dev/null +++ b/main/lib/extra_func.py @@ -0,0 +1,319 @@ +import logging +from datetime import timedelta + +from django.contrib.auth.models import User +from django.core.exceptions import ObjectDoesNotExist +from django.shortcuts import redirect +from django.utils import timezone +from zenpy import Zenpy +from zenpy.lib.exception import APIException +from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket +from zenpy.lib.generator import SearchResultGenerator + +from access_controller.settings import ZENDESK_ROLES as ROLES, ACTRL_ZENDESK_SUBDOMAIN +from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus +from main.requester import TicketListRequester +from main.lib.zendesk_admin import zenpy + + +def update_role(user_profile: UserProfile, role: int, who_changes: User) -> None: + """ + Функция меняет роль пользователя. + + :param user_profile: Профиль пользователя + :param role: Новая роль + :param who_changes: Пользователь, меняющий роль + :return: Пользователь с обновленной ролью + """ + zendesk = zenpy + user = zendesk.get_user(user_profile.user.email) + user.custom_role_id = role + user_profile.custom_role_id = role + user_profile.save() + log(user_profile, who_changes.userprofile) + zendesk.update_user(user) + + +def make_engineer(user_profile: UserProfile, who_changes: User) -> None: + """ + Функция устанавливает пользователю роль инженера. + + :param user_profile: Профиль пользователя + :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer" + """ + update_role(user_profile, ROLES['engineer'], who_changes) + + +def make_light_agent(user_profile: UserProfile, who_changes: User) -> None: + """ + Функция устанавливает пользователю роль легкого агента. + + :param user_profile: Профиль пользователя + :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent" + """ + tickets: SearchResultGenerator = get_tickets_list(user_profile.user.email) + ticket: ZenpyTicket + for ticket in tickets: + UnassignedTicket.objects.create( + assignee=user_profile.user, + ticket_id=ticket.id, + status=UnassignedTicketStatus.SOLVED if ticket.status == 'solved' else UnassignedTicketStatus.UNASSIGNED + ) + if ticket.status == 'solved': + ticket.assignee_id = zenpy.solved_tickets_user_id + else: + ticket.assignee = None + ticket.group_id = zenpy.buffer_group_id + if tickets: + zenpy.admin.tickets.update(tickets) + attempts, success = 20, False + while not success and attempts != 0: + try: + update_role(user_profile, ROLES['light_agent'], who_changes) + success = True + except APIException as e: + attempts -= 1 + if attempts == 0: + raise e + + +def get_users_list() -> list: + """ + Функция **get_users_list** возвращает список пользователей Zendesk, относящихся к организации SYSTEM. + """ + zendesk = zenpy + + # У пользователей должна быть организация SYSTEM + org = next(zendesk.admin.search(type='organization', name='SYSTEM')) + users = zendesk.admin.organizations.users(org) + return users + + +def get_tickets_list(email): + """ + Функция возвращает список тикетов пользователя Zendesk + """ + return TicketListRequester().get_tickets_list_for_user(zenpy.get_user(email)) + + +def get_tickets_list_for_group(group_name): + """ + Функция возвращает список неназначенных, нерешённых тикетов группы Zendesk + """ + return TicketListRequester().get_tickets_list_for_group(zenpy.get_group(group_name)) + + +def update_profile(user_profile: UserProfile): + """ + Функция обновляет профиль пользователя в соответствии с текущим в Zendesk. + + :param user_profile: Профиль пользователя + :return: Обновленный, в соответствие с текущими данными в Zendesk, профиль пользователя + """ + user = zenpy.get_user(user_profile.user.email) + user_profile.name = user.name + user_profile.role = user.role + user_profile.custom_role_id = user.custom_role_id if user.custom_role_id else 0 + user_profile.image = user.photo['content_url'] if user.photo else None + user_profile.save() + + +def check_user_exist(email: str) -> bool: + """ + Функция проверяет, существует ли пользователь. + + :param email: Email пользователя + :return: Зарегистрирован ли пользователь в Zendesk + """ + return zenpy.check_user(email) + + +def get_user_organization(email: str) -> str: + """ + Функция возвращает организацию пользователя. + + :param email: Email пользователя + :return: Организация пользователя + """ + return zenpy.get_user_org(email) + + +def check_user_auth(email: str, password: str) -> bool: + """ + Функция проверяет, верны ли входные данные. + + :raise: :class:`APIException`: исключение, вызываемое если пользователь не аутентифицирован + """ + creds = { + 'email': email, + 'password': password, + 'subdomain': ACTRL_ZENDESK_SUBDOMAIN, + } + try: + user = Zenpy(**creds) + user.search(email, type='user') + except APIException: + return False + return True + + +def update_user_in_model(profile: UserProfile, zendesk_user: ZenpyUser): + """ + Функция обновляет профиль пользователя при изменении данных пользователя на Zendesk. + + :param profile: Профиль пользователя + :param zendesk_user: Данные пользователя в Zendesk + :return: Обновленный профиль пользователя + """ + profile.name = zendesk_user.name + profile.role = zendesk_user.role + profile.image = zendesk_user.photo['content_url'] if zendesk_user.photo else None + if zendesk_user.custom_role_id is not None: + profile.custom_role_id = int(zendesk_user.custom_role_id) + profile.save() + + +def count_users(users) -> tuple: + """ + Функция подсчета количества сотрудников с ролями engineer и light_agent + """ + engineers, light_agents = 0, 0 + for user in users: + if user.custom_role_id == ROLES['engineer']: + engineers += 1 + elif user.custom_role_id == ROLES['light_agent']: + light_agents += 1 + return engineers, light_agents + + +def update_users_in_model(): + """ + Обновляет пользователей в модели UserProfile по списку пользователей в организации + """ + users = get_users_list() + for user in users: + try: + profile = User.objects.get(email=user.email).userprofile + update_user_in_model(profile, user) + except ObjectDoesNotExist: + pass + return users + + +def daterange(start_date, end_date) -> list: + """ + Функция возвращает список дней с start_date по end_date, исключая правую границу. + + :param start_date: Начальная дата + :param end_date: Конечная дата + :return: Список дней, не включая конечную дату + """ + dates = [] + for n in range(int((end_date - start_date).days)): + dates.append(start_date + timedelta(n)) + return dates + + +def get_timedelta(log, time=None) -> timedelta: + """ + Функция возвращает объект класса timedelta, который хранит промежуток времени от начала суток до момента, + который находится в log (объект класса RoleChangeLogs) или в time(datetime.time), если введён. + + :param log: Лог + :param time: Время + :return: Сколько времени прошло от начала суток до события + """ + if time is None: + time = log.change_time.time() + time = timedelta(hours=time.hour, minutes=time.minute, seconds=time.second) + return time + + +def last_day_of_month(day: int) -> int: + """ + Функция возвращает последний день текущего месяца. + + :param day: Текущий день + :return: Последний день месяца + """ + next_month = day.replace(day=28) + timedelta(days=4) + return next_month - timedelta(days=next_month.day) + + +class DatabaseHandler(logging.Handler): + def __init__(self): + logging.Handler.__init__(self) + + def emit(self, record): + database = RoleChangeLogs() + users = record.msg + if users[1]: + user = users[0] + admin = users[1] + elif not users[1]: + user = users[0] + admin = users[0] + database.name = user.name + database.user = user.user + database.changed_by = admin.user + if user.custom_role_id == ROLES['engineer']: + database.old_role = ROLES['light_agent'] + elif user.custom_role_id == ROLES['light_agent']: + database.old_role = ROLES['engineer'] + database.new_role = user.custom_role_id + database.save() + + +class CsvFormatter(logging.Formatter): + def __init__(self): + logging.Formatter.__init__(self) + + def format(self, record): + users = record.msg + if users[1]: + user = users[0] + admin = users[1] + elif not users[1]: + user = users[0] + admin = users[0] + msg = '' + msg += user.name + if user.custom_role_id == ROLES['engineer']: + msg += ',engineer,' + elif user.custom_role_id == ROLES['light_agent']: + msg += ',light_agent,' + time = str(timezone.now().today()) + msg += time[:16] + msg += ',' + msg += admin.name + return msg + + +def log(user, admin=None): + """ + Осуществляет запись логов в базу данных и csv файл + :param admin: + :param user: + :return: + """ + users = [user, admin] + logger = logging.getLogger('MY_LOGGER') + if not logger.hasHandlers(): + dbhandler = DatabaseHandler() + csvformatter = CsvFormatter() + csvhandler = logging.FileHandler('logs/logs.csv', "a") + csvhandler.setFormatter(csvformatter) + logger.addHandler(dbhandler) + logger.addHandler(csvhandler) + logger.setLevel('INFO') + logger.info(users) + + +def set_session_params_for_work_page(request, count=None, is_confirm=True): + """ + Функция для страницы получения прав + Устанавливает данные сессии о успешности запроса и количестве назначенных тикетов + """ + request.session['is_confirm'] = is_confirm + request.session['count_tickets'] = count + return redirect('work', request.user.id) diff --git a/main/lib/statistic_data.py b/main/lib/statistic_data.py new file mode 100644 index 0000000..2e1061d --- /dev/null +++ b/main/lib/statistic_data.py @@ -0,0 +1,261 @@ +from datetime import date, datetime, timedelta +from typing import Optional + +from django.contrib.auth.models import User +from django.utils import timezone + +from access_controller.settings import ONE_DAY, ZENDESK_ROLES as ROLES +from main.lib.extra_func import last_day_of_month, get_timedelta, daterange +from main.models import RoleChangeLogs + + +class StatisticData: + """ + Класс для учета статистики интервалов работы пользователей. + Передаваемые параметры: start_date, end_date, email, stat. + + :param display: Формат отображения времени (часы, минуты) + :type display: :class:`list` + :param interval: Интервал времени в часах и минутах + :type interval: :class:`list` + :param start_date: Дата начала работы + :type start_date: :class:`date` + :param end_date: Дата окончания работы + :type end_date: :class:`date` + :param email: Email пользователя + :type email: :class:`str` + :param errors: Список ошибок + :type errors: :class:`list` + :param warnings: Список предупреждений + :type warnings: :class:`list` + :param data: Ретроспектива смены ролей пользователя + :type data: :class:`dict` + :param statistic: Интервалы работы пользователя + :type statistic: :class:`dict` + """ + + def __init__(self, start_date, end_date, user_email, stat=None): + self.display = None + self.interval = None + self.start_date = start_date + self.end_date = end_date + self.email = user_email + self.errors = list() + self.warnings = list() + self.data = dict() + self.statistic = dict() + self._init_data() + if stat is None: + self._init_statistic() + else: + self.statistic = stat + + def get_statistic(self) -> dict: + """ + Функция возвращает статистику работы пользователя. + + :return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании. + """ + if self.is_valid_statistic(): + stat = self.statistic + stat = self._use_display(stat) + stat = self._use_interval(stat) + return stat + else: + return None + + def is_valid_statistic(self) -> bool: + """ + Функция проверяет были ли ошибки при создании статистики. + + :return: True, при отсутствии ошибок + """ + return not self.errors and self.statistic + + def set_interval(self, interval: list) -> bool: + """ + Функция проверяет корректность представления интервала работы. + + :param interval: Интервал должен быть указан в днях или месяцах. + :return: True, если указан верно + """ + if interval not in ['months', 'days']: + self.errors += ['Интервал работы должен быть в днях или месяцах'] + return False + self.interval = interval + return True + + def set_display(self, display_format: list) -> bool: + """ + Функция проверяет корректность формата отображения интервала. + + :param display_format: Формат отображения должен быть указан в днях или месяцах. + :return: True, если указан верно + """ + if display_format not in ['days', 'hours']: + self.errors += ['Формат отображения должен быть в часах или днях'] + return False + self.display = display_format + return True + + def get_data(self) -> Optional[dict]: + """ + Функция возвращает данные - список объектов RoleChangeLogs. + """ + if self.is_valid_data(): + return self.data + else: + return None + + def is_valid_data(self) -> bool: + """ + Функция определяет были ли ошибки при получении логов. + + :return: True, если ошибок нет + """ + return not self.errors + + def _use_display(self, stat: list) -> list: + """ + Функция приводит данные к формату отображения. + + :param stat: Список данных статистики пользователя + :return: Обновленный список + """ + if not self.is_valid_statistic() or not self.display: + return stat + new_stat = {} + for key, item in stat.items(): + if self.display == 'hours': + new_stat[key] = item / 3600 + elif self.display == 'days': + new_stat[key] = item / (ONE_DAY * 3600) + return new_stat + + def _use_interval(self, stat: dict) -> dict: + """ + Функция объединяет ключи и значения в соответствии с интервалом работы. + + :param stat: Статистика работы пользователя + :return: Обновленная статистика + """ + if not self.is_valid_statistic() or not self.interval: + return stat + new_stat = {} + if self.interval == 'months': + # Переделываем ключи под формат('начало_месяца - конец_месяца') + for key, value in stat.items(): + current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1)) + current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1))) + index = ' - '.join([str(current_month_start), str(current_month_end)]) + if new_stat.get(index): + new_stat[index] += value + else: + new_stat[index] = value + elif self.interval == 'days': + new_stat = stat # статистика изначально в днях + return new_stat + + def check_time(self) -> bool: + """ + Функция проверяет корректность введенного времени. + + :return: True, если время указано корректно. Иначе, False + """ + if self.end_date < self.start_date or self.end_date > datetime.now().date(): + return False + return True + + def _init_data(self): + """ + Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email. + + :return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку. + """ + if not self.check_time(): + self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени'] + return + try: + self.data = RoleChangeLogs.objects.filter( + change_time__range=[self.start_date, self.end_date + timedelta(days=1)], + user=User.objects.get(email=self.email), + ).order_by('change_time') + except User.DoesNotExist: + self.errors += ['Пользователь не найден'] + + def _init_statistic(self) -> dict: + """ + Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд. + + :return: Статистика работы пользователя (statistic) + """ + self.clear_statistic() + if not self.get_data(): + self.warnings += ['Не обнаружены изменения роли в данном промежутке'] + return None + first_log, last_log = self.data[0], self.data[len(self.data) - 1] + + if first_log.old_role == ROLES['engineer']: + self.prev_engineer_logic(first_log) + + if last_log.new_role == ROLES['engineer']: + self.post_engineer_logic(last_log) + + for log_index in range(len(self.data) - 1): + if self.data[log_index].new_role == ROLES['engineer']: + self.engineer_logic(log_index) + + def engineer_logic(self, log_index): + """ + Функция обрабатывает основную часть работы инженера + """ + current_log, next_log = self.data[log_index], self.data[log_index + 1] + if current_log.change_time.date() != next_log.change_time.date(): + self.statistic[current_log.change_time.date()] += ( + timedelta(days=1) - get_timedelta(current_log)).total_seconds() + self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds() + self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date()) + else: + elapsed_time = next_log.change_time - current_log.change_time + self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds() + + def post_engineer_logic(self, last_log): + """ + Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона + """ + self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1)) + if last_log.change_time.date() == timezone.now().date(): + self.statistic[last_log.change_time.date()] += ( + get_timedelta(None, timezone.now().time()) - get_timedelta(last_log) + ).total_seconds() + else: + self.statistic[last_log.change_time.date()] += ( + timedelta(days=1) - get_timedelta(last_log)).total_seconds() + if self.end_date == timezone.now().date(): + self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds() + + def prev_engineer_logic(self, first_log): + """ + Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона + """ + self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date), + first_log.change_time.date()) + self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds() + + def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict: + """ + Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне). + + :param first: Начальная дата интервала + :param last: Последняя дата интервала + :param val: Количество секунд в одном дне + """ + for day in daterange(first, last): + self.statistic[day] = val + + def clear_statistic(self) -> dict: + """ + Функция осуществляет обновление всех дней. + """ + self.statistic.clear() + self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0) diff --git a/main/lib/zendesk_admin.py b/main/lib/zendesk_admin.py new file mode 100644 index 0000000..2a689ce --- /dev/null +++ b/main/lib/zendesk_admin.py @@ -0,0 +1,99 @@ +from typing import Optional, Dict +from zenpy import Zenpy +from zenpy.lib.api_objects import User as ZenpyUser, Group as ZenpyGroup +from zenpy.lib.exception import APIException +from access_controller.settings import ACTRL_ZENDESK_SUBDOMAIN, ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, \ + ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL + + +class ZendeskAdmin: + """ + Класс **ZendeskAdmin** существует, чтобы в каждой функции отдельно не проверять аккаунт администратора. + + :param credentials: Полномочия (первым указывается учетная запись организации в Zendesk) + :type credentials: :class:`Dict[str, str]` + """ + + def __init__(self, credentials: Dict[str, str]): + self.credentials = credentials + self.admin = self.create_admin() + self.buffer_group_id: int = self.get_group(ZENDESK_GROUPS['buffer']).id + self.solved_tickets_user_id: int = self.get_user(SOLVED_TICKETS_EMAIL).id + + def update_user(self, user: ZenpyUser) -> bool: + """ + Функция сохраняет изменение пользователя в Zendesk. + + :param user: Пользователь с изменёнными данными + """ + self.admin.users.update(user) + + def check_user(self, email: str) -> bool: + """ + Функция осуществляет проверку существования пользователя в Zendesk по email. + + :param email: Email пользователя + :return: Является ли зарегистрированным + """ + return True if self.admin.search(email, type='user') else False + + def get_user(self, email: str) -> ZenpyUser: + """ + Функция возвращает пользователя (объект) по его email. + + :param email: Email пользователя + :return: Объект пользователя, найденного в БД + """ + return self.admin.users.search(email).values[0] + + def get_group(self, name: str) -> Optional[ZenpyGroup]: + """ + Функция возвращает группу по названию + + :param name: Имя пользователя + :return: Группы пользователя (в случае отсутствия None) + """ + groups = self.admin.search(name, type='group') + for group in groups: + return group + return None + + def get_user_org(self, email: str) -> str: + """ + Функция возвращает организацию, к которой относится пользователь по его email. + + :param email: Email пользователя + :return: Организация пользователя + """ + user = self.admin.users.search(email).values[0] + return user.organization.name if user.organization else None + + def create_admin(self) -> Zenpy: + """ + Функция создает администратора, проверяя наличие вводимых данных в env. + + :raise: :class:`ValueError`: исключение, вызываемое если email не введен в env + :raise: :class:`APIException`: исключение, вызываемое если пользователя с таким email не существует в Zendesk + """ + + if self.credentials.get('email') is None: + raise ValueError('access_controller email not in env') + + if self.credentials.get('token') is None and self.credentials.get('password') is None: + raise ValueError('access_controller token or password not in env') + + admin = Zenpy(**self.credentials) + try: + admin.search(self.credentials['email'], type='user') + except APIException: + raise ValueError('invalid access_controller`s login data') + + return admin + + +zenpy = ZendeskAdmin({ + 'subdomain': ACTRL_ZENDESK_SUBDOMAIN, + 'email': ACTRL_API_EMAIL, + 'token': ACTRL_API_TOKEN, + 'password': ACTRL_API_PASSWORD, +})