From 6bc1c6d1089ceadd24bcd10a809816a51abb5068 Mon Sep 17 00:00:00 2001 From: Iurii Tatishchev Date: Thu, 6 May 2021 08:45:15 -0700 Subject: [PATCH] Move main helper files to main/lib. --- main/apiauth.py | 49 ------- main/extra_func.py | 319 ----------------------------------------- main/requester.py | 2 +- main/statistic_data.py | 261 --------------------------------- main/tests.py | 14 +- main/views.py | 11 +- main/zendesk_admin.py | 99 ------------- 7 files changed, 13 insertions(+), 742 deletions(-) delete mode 100644 main/apiauth.py delete mode 100644 main/extra_func.py delete mode 100644 main/statistic_data.py delete mode 100644 main/zendesk_admin.py diff --git a/main/apiauth.py b/main/apiauth.py deleted file mode 100644 index 08a018c..0000000 --- a/main/apiauth.py +++ /dev/null @@ -1,49 +0,0 @@ -import os - -from zenpy import Zenpy -from zenpy.lib.api_objects import User as ZenpyUser - -from access_controller.settings import ACTRL_ZENDESK_SUBDOMAIN, ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD - - -def api_auth() -> dict: - """ - Функция создания пользователя с использованием Zendesk API. - - Получает из env Zendesk - email, token, password пользователя. - Если данные валидны и пользователь Zendesk с указанным email и токеном или паролем существует, - создается словарь данных пользователя, полученных через API c Zendesk. - - :return: данные пользователя - """ - credentials = { - 'subdomain': ACTRL_ZENDESK_SUBDOMAIN - } - email = ACTRL_API_EMAIL - token = ACTRL_API_TOKEN - password = ACTRL_API_PASSWORD - - if email is None: - raise ValueError('access_controller email not in env') - credentials['email'] = email - - # prefer token, use password if token not provided - if token: - credentials['token'] = token - elif password: - credentials['password'] = password - else: - raise ValueError('access_controller token or password not in env') - - zenpy_client = Zenpy(**credentials) - zenpy_user: ZenpyUser = zenpy_client.users.search(email).values[0] - - user = { - 'id': zenpy_user.id, - 'name': zenpy_user.name, # Zendesk doesn't have separate first and last name fields - 'email': zenpy_user.email, - 'role': zenpy_user.role, # str like 'admin' or 'agent', not id - 'photo': zenpy_user.photo['content_url'] if zenpy_user.photo is not None else None, - } - - return user diff --git a/main/extra_func.py b/main/extra_func.py deleted file mode 100644 index e6a2a97..0000000 --- a/main/extra_func.py +++ /dev/null @@ -1,319 +0,0 @@ -import logging -from datetime import timedelta - -from django.contrib.auth.models import User -from django.core.exceptions import ObjectDoesNotExist -from django.shortcuts import redirect -from django.utils import timezone -from zenpy import Zenpy -from zenpy.lib.exception import APIException -from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket -from zenpy.lib.generator import SearchResultGenerator - -from access_controller.settings import ZENDESK_ROLES as ROLES, ACTRL_ZENDESK_SUBDOMAIN -from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus -from main.requester import TicketListRequester -from main.zendesk_admin import zenpy - - -def update_role(user_profile: UserProfile, role: int, who_changes: User) -> None: - """ - Функция меняет роль пользователя. - - :param user_profile: Профиль пользователя - :param role: Новая роль - :param who_changes: Пользователь, меняющий роль - :return: Пользователь с обновленной ролью - """ - zendesk = zenpy - user = zendesk.get_user(user_profile.user.email) - user.custom_role_id = role - user_profile.custom_role_id = role - user_profile.save() - log(user_profile, who_changes.userprofile) - zendesk.update_user(user) - - -def make_engineer(user_profile: UserProfile, who_changes: User) -> None: - """ - Функция устанавливает пользователю роль инженера. - - :param user_profile: Профиль пользователя - :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer" - """ - update_role(user_profile, ROLES['engineer'], who_changes) - - -def make_light_agent(user_profile: UserProfile, who_changes: User) -> None: - """ - Функция устанавливает пользователю роль легкого агента. - - :param user_profile: Профиль пользователя - :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent" - """ - tickets: SearchResultGenerator = get_tickets_list(user_profile.user.email) - ticket: ZenpyTicket - for ticket in tickets: - UnassignedTicket.objects.create( - assignee=user_profile.user, - ticket_id=ticket.id, - status=UnassignedTicketStatus.SOLVED if ticket.status == 'solved' else UnassignedTicketStatus.UNASSIGNED - ) - if ticket.status == 'solved': - ticket.assignee_id = zenpy.solved_tickets_user_id - else: - ticket.assignee = None - ticket.group_id = zenpy.buffer_group_id - if tickets: - zenpy.admin.tickets.update(tickets) - attempts, success = 20, False - while not success and attempts != 0: - try: - update_role(user_profile, ROLES['light_agent'], who_changes) - success = True - except APIException as e: - attempts -= 1 - if attempts == 0: - raise e - - -def get_users_list() -> list: - """ - Функция **get_users_list** возвращает список пользователей Zendesk, относящихся к организации SYSTEM. - """ - zendesk = zenpy - - # У пользователей должна быть организация SYSTEM - org = next(zendesk.admin.search(type='organization', name='SYSTEM')) - users = zendesk.admin.organizations.users(org) - return users - - -def get_tickets_list(email): - """ - Функция возвращает список тикетов пользователя Zendesk - """ - return TicketListRequester().get_tickets_list_for_user(zenpy.get_user(email)) - - -def get_tickets_list_for_group(group_name): - """ - Функция возвращает список неназначенных, нерешённых тикетов группы Zendesk - """ - return TicketListRequester().get_tickets_list_for_group(zenpy.get_group(group_name)) - - -def update_profile(user_profile: UserProfile): - """ - Функция обновляет профиль пользователя в соответствии с текущим в Zendesk. - - :param user_profile: Профиль пользователя - :return: Обновленный, в соответствие с текущими данными в Zendesk, профиль пользователя - """ - user = zenpy.get_user(user_profile.user.email) - user_profile.name = user.name - user_profile.role = user.role - user_profile.custom_role_id = user.custom_role_id if user.custom_role_id else 0 - user_profile.image = user.photo['content_url'] if user.photo else None - user_profile.save() - - -def check_user_exist(email: str) -> bool: - """ - Функция проверяет, существует ли пользователь. - - :param email: Email пользователя - :return: Зарегистрирован ли пользователь в Zendesk - """ - return zenpy.check_user(email) - - -def get_user_organization(email: str) -> str: - """ - Функция возвращает организацию пользователя. - - :param email: Email пользователя - :return: Организация пользователя - """ - return zenpy.get_user_org(email) - - -def check_user_auth(email: str, password: str) -> bool: - """ - Функция проверяет, верны ли входные данные. - - :raise: :class:`APIException`: исключение, вызываемое если пользователь не аутентифицирован - """ - creds = { - 'email': email, - 'password': password, - 'subdomain': ACTRL_ZENDESK_SUBDOMAIN, - } - try: - user = Zenpy(**creds) - user.search(email, type='user') - except APIException: - return False - return True - - -def update_user_in_model(profile: UserProfile, zendesk_user: ZenpyUser): - """ - Функция обновляет профиль пользователя при изменении данных пользователя на Zendesk. - - :param profile: Профиль пользователя - :param zendesk_user: Данные пользователя в Zendesk - :return: Обновленный профиль пользователя - """ - profile.name = zendesk_user.name - profile.role = zendesk_user.role - profile.image = zendesk_user.photo['content_url'] if zendesk_user.photo else None - if zendesk_user.custom_role_id is not None: - profile.custom_role_id = int(zendesk_user.custom_role_id) - profile.save() - - -def count_users(users) -> tuple: - """ - Функция подсчета количества сотрудников с ролями engineer и light_agent - """ - engineers, light_agents = 0, 0 - for user in users: - if user.custom_role_id == ROLES['engineer']: - engineers += 1 - elif user.custom_role_id == ROLES['light_agent']: - light_agents += 1 - return engineers, light_agents - - -def update_users_in_model(): - """ - Обновляет пользователей в модели UserProfile по списку пользователей в организации - """ - users = get_users_list() - for user in users: - try: - profile = User.objects.get(email=user.email).userprofile - update_user_in_model(profile, user) - except ObjectDoesNotExist: - pass - return users - - -def daterange(start_date, end_date) -> list: - """ - Функция возвращает список дней с start_date по end_date, исключая правую границу. - - :param start_date: Начальная дата - :param end_date: Конечная дата - :return: Список дней, не включая конечную дату - """ - dates = [] - for n in range(int((end_date - start_date).days)): - dates.append(start_date + timedelta(n)) - return dates - - -def get_timedelta(log, time=None) -> timedelta: - """ - Функция возвращает объект класса timedelta, который хранит промежуток времени от начала суток до момента, - который находится в log (объект класса RoleChangeLogs) или в time(datetime.time), если введён. - - :param log: Лог - :param time: Время - :return: Сколько времени прошло от начала суток до события - """ - if time is None: - time = log.change_time.time() - time = timedelta(hours=time.hour, minutes=time.minute, seconds=time.second) - return time - - -def last_day_of_month(day: int) -> int: - """ - Функция возвращает последний день текущего месяца. - - :param day: Текущий день - :return: Последний день месяца - """ - next_month = day.replace(day=28) + timedelta(days=4) - return next_month - timedelta(days=next_month.day) - - -class DatabaseHandler(logging.Handler): - def __init__(self): - logging.Handler.__init__(self) - - def emit(self, record): - database = RoleChangeLogs() - users = record.msg - if users[1]: - user = users[0] - admin = users[1] - elif not users[1]: - user = users[0] - admin = users[0] - database.name = user.name - database.user = user.user - database.changed_by = admin.user - if user.custom_role_id == ROLES['engineer']: - database.old_role = ROLES['light_agent'] - elif user.custom_role_id == ROLES['light_agent']: - database.old_role = ROLES['engineer'] - database.new_role = user.custom_role_id - database.save() - - -class CsvFormatter(logging.Formatter): - def __init__(self): - logging.Formatter.__init__(self) - - def format(self, record): - users = record.msg - if users[1]: - user = users[0] - admin = users[1] - elif not users[1]: - user = users[0] - admin = users[0] - msg = '' - msg += user.name - if user.custom_role_id == ROLES['engineer']: - msg += ',engineer,' - elif user.custom_role_id == ROLES['light_agent']: - msg += ',light_agent,' - time = str(timezone.now().today()) - msg += time[:16] - msg += ',' - msg += admin.name - return msg - - -def log(user, admin=None): - """ - Осуществляет запись логов в базу данных и csv файл - :param admin: - :param user: - :return: - """ - users = [user, admin] - logger = logging.getLogger('MY_LOGGER') - if not logger.hasHandlers(): - dbhandler = DatabaseHandler() - csvformatter = CsvFormatter() - csvhandler = logging.FileHandler('logs/logs.csv', "a") - csvhandler.setFormatter(csvformatter) - logger.addHandler(dbhandler) - logger.addHandler(csvhandler) - logger.setLevel('INFO') - logger.info(users) - - -def set_session_params_for_work_page(request, count=None, is_confirm=True): - """ - Функция для страницы получения прав - Устанавливает данные сессии о успешности запроса и количестве назначенных тикетов - """ - request.session['is_confirm'] = is_confirm - request.session['count_tickets'] = count - return redirect('work', request.user.id) diff --git a/main/requester.py b/main/requester.py index 468abee..d3bdd18 100644 --- a/main/requester.py +++ b/main/requester.py @@ -2,7 +2,7 @@ import requests from zenpy import TicketApi from zenpy.lib.api_objects import Ticket -from main.zendesk_admin import zenpy +from main.lib.zendesk_admin import zenpy class TicketListRequester: diff --git a/main/statistic_data.py b/main/statistic_data.py deleted file mode 100644 index fa1ab24..0000000 --- a/main/statistic_data.py +++ /dev/null @@ -1,261 +0,0 @@ -from datetime import date, datetime, timedelta -from typing import Optional - -from django.contrib.auth.models import User -from django.utils import timezone - -from access_controller.settings import ONE_DAY, ZENDESK_ROLES as ROLES -from main.extra_func import last_day_of_month, get_timedelta, daterange -from main.models import RoleChangeLogs - - -class StatisticData: - """ - Класс для учета статистики интервалов работы пользователей. - Передаваемые параметры: start_date, end_date, email, stat. - - :param display: Формат отображения времени (часы, минуты) - :type display: :class:`list` - :param interval: Интервал времени в часах и минутах - :type interval: :class:`list` - :param start_date: Дата начала работы - :type start_date: :class:`date` - :param end_date: Дата окончания работы - :type end_date: :class:`date` - :param email: Email пользователя - :type email: :class:`str` - :param errors: Список ошибок - :type errors: :class:`list` - :param warnings: Список предупреждений - :type warnings: :class:`list` - :param data: Ретроспектива смены ролей пользователя - :type data: :class:`dict` - :param statistic: Интервалы работы пользователя - :type statistic: :class:`dict` - """ - - def __init__(self, start_date, end_date, user_email, stat=None): - self.display = None - self.interval = None - self.start_date = start_date - self.end_date = end_date - self.email = user_email - self.errors = list() - self.warnings = list() - self.data = dict() - self.statistic = dict() - self._init_data() - if stat is None: - self._init_statistic() - else: - self.statistic = stat - - def get_statistic(self) -> dict: - """ - Функция возвращает статистику работы пользователя. - - :return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании. - """ - if self.is_valid_statistic(): - stat = self.statistic - stat = self._use_display(stat) - stat = self._use_interval(stat) - return stat - else: - return None - - def is_valid_statistic(self) -> bool: - """ - Функция проверяет были ли ошибки при создании статистики. - - :return: True, при отсутствии ошибок - """ - return not self.errors and self.statistic - - def set_interval(self, interval: list) -> bool: - """ - Функция проверяет корректность представления интервала работы. - - :param interval: Интервал должен быть указан в днях или месяцах. - :return: True, если указан верно - """ - if interval not in ['months', 'days']: - self.errors += ['Интервал работы должен быть в днях или месяцах'] - return False - self.interval = interval - return True - - def set_display(self, display_format: list) -> bool: - """ - Функция проверяет корректность формата отображения интервала. - - :param display_format: Формат отображения должен быть указан в днях или месяцах. - :return: True, если указан верно - """ - if display_format not in ['days', 'hours']: - self.errors += ['Формат отображения должен быть в часах или днях'] - return False - self.display = display_format - return True - - def get_data(self) -> Optional[dict]: - """ - Функция возвращает данные - список объектов RoleChangeLogs. - """ - if self.is_valid_data(): - return self.data - else: - return None - - def is_valid_data(self) -> bool: - """ - Функция определяет были ли ошибки при получении логов. - - :return: True, если ошибок нет - """ - return not self.errors - - def _use_display(self, stat: list) -> list: - """ - Функция приводит данные к формату отображения. - - :param stat: Список данных статистики пользователя - :return: Обновленный список - """ - if not self.is_valid_statistic() or not self.display: - return stat - new_stat = {} - for key, item in stat.items(): - if self.display == 'hours': - new_stat[key] = item / 3600 - elif self.display == 'days': - new_stat[key] = item / (ONE_DAY * 3600) - return new_stat - - def _use_interval(self, stat: dict) -> dict: - """ - Функция объединяет ключи и значения в соответствии с интервалом работы. - - :param stat: Статистика работы пользователя - :return: Обновленная статистика - """ - if not self.is_valid_statistic() or not self.interval: - return stat - new_stat = {} - if self.interval == 'months': - # Переделываем ключи под формат('начало_месяца - конец_месяца') - for key, value in stat.items(): - current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1)) - current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1))) - index = ' - '.join([str(current_month_start), str(current_month_end)]) - if new_stat.get(index): - new_stat[index] += value - else: - new_stat[index] = value - elif self.interval == 'days': - new_stat = stat # статистика изначально в днях - return new_stat - - def check_time(self) -> bool: - """ - Функция проверяет корректность введенного времени. - - :return: True, если время указано корректно. Иначе, False - """ - if self.end_date < self.start_date or self.end_date > datetime.now().date(): - return False - return True - - def _init_data(self): - """ - Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email. - - :return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку. - """ - if not self.check_time(): - self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени'] - return - try: - self.data = RoleChangeLogs.objects.filter( - change_time__range=[self.start_date, self.end_date + timedelta(days=1)], - user=User.objects.get(email=self.email), - ).order_by('change_time') - except User.DoesNotExist: - self.errors += ['Пользователь не найден'] - - def _init_statistic(self) -> dict: - """ - Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд. - - :return: Статистика работы пользователя (statistic) - """ - self.clear_statistic() - if not self.get_data(): - self.warnings += ['Не обнаружены изменения роли в данном промежутке'] - return None - first_log, last_log = self.data[0], self.data[len(self.data) - 1] - - if first_log.old_role == ROLES['engineer']: - self.prev_engineer_logic(first_log) - - if last_log.new_role == ROLES['engineer']: - self.post_engineer_logic(last_log) - - for log_index in range(len(self.data) - 1): - if self.data[log_index].new_role == ROLES['engineer']: - self.engineer_logic(log_index) - - def engineer_logic(self, log_index): - """ - Функция обрабатывает основную часть работы инженера - """ - current_log, next_log = self.data[log_index], self.data[log_index + 1] - if current_log.change_time.date() != next_log.change_time.date(): - self.statistic[current_log.change_time.date()] += ( - timedelta(days=1) - get_timedelta(current_log)).total_seconds() - self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds() - self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date()) - else: - elapsed_time = next_log.change_time - current_log.change_time - self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds() - - def post_engineer_logic(self, last_log): - """ - Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона - """ - self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1)) - if last_log.change_time.date() == timezone.now().date(): - self.statistic[last_log.change_time.date()] += ( - get_timedelta(None, timezone.now().time()) - get_timedelta(last_log) - ).total_seconds() - else: - self.statistic[last_log.change_time.date()] += ( - timedelta(days=1) - get_timedelta(last_log)).total_seconds() - if self.end_date == timezone.now().date(): - self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds() - - def prev_engineer_logic(self, first_log): - """ - Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона - """ - self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date), - first_log.change_time.date()) - self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds() - - def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict: - """ - Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне). - - :param first: Начальная дата интервала - :param last: Последняя дата интервала - :param val: Количество секунд в одном дне - """ - for day in daterange(first, last): - self.statistic[day] = val - - def clear_statistic(self) -> dict: - """ - Функция осуществляет обновление всех дней. - """ - self.statistic.clear() - self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0) diff --git a/main/tests.py b/main/tests.py index 99d58cc..6271ae8 100644 --- a/main/tests.py +++ b/main/tests.py @@ -8,7 +8,7 @@ from django.urls import reverse, reverse_lazy from django.utils import translation import access_controller.settings as sets -from main.zendesk_admin import zenpy +from main.lib.zendesk_admin import zenpy class RegistrationTestCase(TestCase): @@ -96,31 +96,31 @@ class MakeEngineerTestCase(TestCase): self.admin_client = Client() self.admin_client.force_login(User.objects.get(email=self.admin)) - @patch('main.extra_func.zenpy') + @patch('main.lib.extra_func.zenpy') def test_redirect(self, ZenpyMock): user = User.objects.get(email=self.light_agent) resp = self.client.post(reverse_lazy('work_become_engineer')) self.assertRedirects(resp, reverse('work', args=[user.id])) self.assertEqual(resp.status_code, 302) - @patch('main.extra_func.zenpy') + @patch('main.lib.extra_func.zenpy') def test_light_agent_make_engineer(self, ZenpyMock): self.client.post(reverse_lazy('work_become_engineer')) self.assertEqual(ZenpyMock.update_user.call_args[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer']) - @patch('main.extra_func.zenpy') + @patch('main.lib.extra_func.zenpy') def test_admin_make_engineer(self, ZenpyMock): self.admin_client.post(reverse_lazy('work_become_engineer')) self.assertEqual(ZenpyMock.update_user.call_args[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer']) - @patch('main.extra_func.zenpy') + @patch('main.lib.extra_func.zenpy') def test_engineer_make_engineer(self, ZenpyMock): client = Client() client.force_login(User.objects.get(email=self.engineer)) client.post(reverse_lazy('work_become_engineer')) self.assertEqual(ZenpyMock.update_user.call_args[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer']) - @patch('main.extra_func.zenpy') + @patch('main.lib.extra_func.zenpy') def test_control_page_make_one(self, ZenpyMock): self.admin_client.post( reverse_lazy('control'), @@ -131,7 +131,7 @@ class MakeEngineerTestCase(TestCase): self.assertEqual(len(call_list), 1) self.assertEqual(mock_object.custom_role_id, sets.ZENDESK_ROLES['engineer']) - @patch('main.extra_func.zenpy') + @patch('main.lib.extra_func.zenpy') def test_control_page_make_many(self, ZenpyMock): self.admin_client.post( reverse_lazy('control'), diff --git a/main/views.py b/main/views.py index 467e925..4355141 100644 --- a/main/views.py +++ b/main/views.py @@ -13,7 +13,7 @@ from django.contrib.messages.views import SuccessMessageMixin from django.core.handlers.wsgi import WSGIRequest from django.http import HttpResponseRedirect, HttpResponse from django.shortcuts import render, redirect -from django.urls import reverse_lazy, reverse +from django.urls import reverse_lazy from django.views.generic import FormView from django_registration.views import RegistrationView # Django REST @@ -21,12 +21,11 @@ from rest_framework import viewsets from rest_framework.response import Response from access_controller.settings import DEFAULT_FROM_EMAIL, ZENDESK_ROLES, ZENDESK_MAX_AGENTS, ZENDESK_GROUPS -from main.extra_func import check_user_exist, update_profile, get_user_organization, \ +from main.lib.extra_func import check_user_exist, update_profile, get_user_organization, \ make_engineer, make_light_agent, get_users_list, update_users_in_model, count_users, \ - log, set_session_params_for_work_page, get_tickets_list_for_group -from .statistic_data import StatisticData -from main.zendesk_admin import zenpy -from main.requester import TicketListRequester + set_session_params_for_work_page, get_tickets_list_for_group +from main.lib.statistic_data import StatisticData +from main.lib.zendesk_admin import zenpy from main.forms import AdminPageUsers, CustomRegistrationForm, CustomAuthenticationForm, StatisticForm from main.serializers import ProfileSerializer, ZendeskUserSerializer from .models import UserProfile diff --git a/main/zendesk_admin.py b/main/zendesk_admin.py deleted file mode 100644 index 2a689ce..0000000 --- a/main/zendesk_admin.py +++ /dev/null @@ -1,99 +0,0 @@ -from typing import Optional, Dict -from zenpy import Zenpy -from zenpy.lib.api_objects import User as ZenpyUser, Group as ZenpyGroup -from zenpy.lib.exception import APIException -from access_controller.settings import ACTRL_ZENDESK_SUBDOMAIN, ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, \ - ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL - - -class ZendeskAdmin: - """ - Класс **ZendeskAdmin** существует, чтобы в каждой функции отдельно не проверять аккаунт администратора. - - :param credentials: Полномочия (первым указывается учетная запись организации в Zendesk) - :type credentials: :class:`Dict[str, str]` - """ - - def __init__(self, credentials: Dict[str, str]): - self.credentials = credentials - self.admin = self.create_admin() - self.buffer_group_id: int = self.get_group(ZENDESK_GROUPS['buffer']).id - self.solved_tickets_user_id: int = self.get_user(SOLVED_TICKETS_EMAIL).id - - def update_user(self, user: ZenpyUser) -> bool: - """ - Функция сохраняет изменение пользователя в Zendesk. - - :param user: Пользователь с изменёнными данными - """ - self.admin.users.update(user) - - def check_user(self, email: str) -> bool: - """ - Функция осуществляет проверку существования пользователя в Zendesk по email. - - :param email: Email пользователя - :return: Является ли зарегистрированным - """ - return True if self.admin.search(email, type='user') else False - - def get_user(self, email: str) -> ZenpyUser: - """ - Функция возвращает пользователя (объект) по его email. - - :param email: Email пользователя - :return: Объект пользователя, найденного в БД - """ - return self.admin.users.search(email).values[0] - - def get_group(self, name: str) -> Optional[ZenpyGroup]: - """ - Функция возвращает группу по названию - - :param name: Имя пользователя - :return: Группы пользователя (в случае отсутствия None) - """ - groups = self.admin.search(name, type='group') - for group in groups: - return group - return None - - def get_user_org(self, email: str) -> str: - """ - Функция возвращает организацию, к которой относится пользователь по его email. - - :param email: Email пользователя - :return: Организация пользователя - """ - user = self.admin.users.search(email).values[0] - return user.organization.name if user.organization else None - - def create_admin(self) -> Zenpy: - """ - Функция создает администратора, проверяя наличие вводимых данных в env. - - :raise: :class:`ValueError`: исключение, вызываемое если email не введен в env - :raise: :class:`APIException`: исключение, вызываемое если пользователя с таким email не существует в Zendesk - """ - - if self.credentials.get('email') is None: - raise ValueError('access_controller email not in env') - - if self.credentials.get('token') is None and self.credentials.get('password') is None: - raise ValueError('access_controller token or password not in env') - - admin = Zenpy(**self.credentials) - try: - admin.search(self.credentials['email'], type='user') - except APIException: - raise ValueError('invalid access_controller`s login data') - - return admin - - -zenpy = ZendeskAdmin({ - 'subdomain': ACTRL_ZENDESK_SUBDOMAIN, - 'email': ACTRL_API_EMAIL, - 'token': ACTRL_API_TOKEN, - 'password': ACTRL_API_PASSWORD, -})