diff --git a/main/extra_func.py b/main/extra_func.py index 109f9c6..86b6739 100644 --- a/main/extra_func.py +++ b/main/extra_func.py @@ -1,5 +1,6 @@ import logging from datetime import timedelta, datetime, date +from typing import Optional from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist @@ -7,138 +8,11 @@ from django.shortcuts import redirect from django.utils import timezone from zenpy import Zenpy from zenpy.lib.exception import APIException +from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket -from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY, ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL, \ - ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, ACTRL_ZENDESK_SUBDOMAIN +from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY, ACTRL_ZENDESK_SUBDOMAIN from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus - - -class ZendeskAdmin: - """ - Класс **ZendeskAdmin** существует, чтобы в каждой функции отдельно не проверять аккаунт администратора. - - :param credentials: Полномочия (первым указывается учетная запись организации в Zendesk) - :type credentials: :class:`dict` - :param email: Email администратора, указанный в env - :type email: :class:`str` - :param token: Токен администратора (формируется в Zendesk, указывается в env) - :type token: :class:`str` - :param password: Пароль администратора, указанный в env - :type password: :class:`str` - """ - - credentials: dict = { - 'subdomain': ACTRL_ZENDESK_SUBDOMAIN - } - email: str = ACTRL_API_EMAIL - token: str = ACTRL_API_TOKEN - password: str = ACTRL_API_PASSWORD - - def __init__(self): - self.create_admin() - - def check_user(self, email: str) -> bool: - """ - Функция осуществляет проверку существования пользователя в Zendesk по email. - - :param email: Email пользователя - :return: Является ли зарегистрированным - """ - return True if self.admin.search(email, type='user') else False - - def get_user_name(self, email: str) -> str: - """ - Функция **get_user_name** возвращает имя пользователя по его email - """ - user = self.admin.users.search(email).values[0] - return user.name - - def get_user_role(self, email: str) -> str: - """ - Функция возвращает роль пользователя по его email. - - :param email: Email пользователя - :return: Роль пользователя - """ - user = self.admin.users.search(email).values[0] - return user.role - - def get_user_id(self, email: str) -> str: - """ - Функция возвращает id пользователя по его email - - :param email: Email пользователя - :return: ID пользователя - """ - user = self.admin.users.search(email).values[0] - return user.id - - def get_user_image(self, email: str) -> str: - """ - Функция возвращает url-ссылку на аватар пользователя по его email. - - :param email: Email пользователя - :return: Аватар пользователя - """ - user = self.admin.users.search(email).values[0] - return user.photo['content_url'] if user.photo else None - - def get_user(self, email: str): - """ - Функция возвращает пользователя (объект) по его email. - - :param email: Email пользователя - :return: Объект пользователя, найденного в БД - """ - return self.admin.users.search(email).values[0] - - def get_group(self, name: str) -> str: - """ - Функция возвращает группу по названию - - :param name: Имя пользователя - :return: Группы пользователя (в случае отсутствия None) - """ - groups = self.admin.search(name, type='group') - for group in groups: - return group - return None - - def get_user_org(self, email: str) -> str: - """ - Функция возвращает организацию, к которой относится пользователь по его email. - - :param email: Email пользователя - :return: Организация пользователя - """ - user = self.admin.users.search(email).values[0] - return user.organization.name if user.organization else None - - def create_admin(self) -> None: - """ - Функция создает администратора, проверяя наличие вводимых данных в env. - - :param credentials: В список полномочий администратора вносятся email, token, password из env - :type credentials: :class:`dict` - :raise: :class:`ValueError`: исключение, вызываемое если email не введен в env - :raise: :class:`APIException`: исключение, вызываемое если пользователя с таким email не существует в Zendesk - """ - - if self.email is None: - raise ValueError('access_controller email not in env') - self.credentials['email'] = self.email - - if self.token: - self.credentials['token'] = self.token - elif self.password: - self.credentials['password'] = self.password - else: - raise ValueError('access_controller token or password not in env') - self.admin = Zenpy(**self.credentials) - try: - self.admin.search(self.email, type='user') - except APIException: - raise ValueError('invalid access_controller`s login data') +from main.zendesk_admin import zenpy def update_role(user_profile: UserProfile, role: int) -> None: @@ -149,7 +23,7 @@ def update_role(user_profile: UserProfile, role: int) -> None: :param role: Новая роль :return: Пользователь с обновленной ролью """ - zendesk = ZendeskAdmin() + zendesk = zenpy user = zendesk.get_user(user_profile.user.email) user.custom_role_id = role user_profile.custom_role_id = role @@ -175,6 +49,7 @@ def make_light_agent(user_profile: UserProfile, who_changes: User) -> None: :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent" """ tickets = get_tickets_list(user_profile.user.email) + ticket: ZenpyTicket for ticket in tickets: UnassignedTicket.objects.create( assignee=user_profile.user, @@ -182,19 +57,29 @@ def make_light_agent(user_profile: UserProfile, who_changes: User) -> None: status=UnassignedTicketStatus.SOLVED if ticket.status == 'solved' else UnassignedTicketStatus.UNASSIGNED ) if ticket.status == 'solved': - ticket.assignee = ZendeskAdmin().get_user(SOLVED_TICKETS_EMAIL) + ticket.assignee_id = zenpy.solved_tickets_user_id else: ticket.assignee = None - ticket.group = ZendeskAdmin().get_group(ZENDESK_GROUPS['buffer']) - ZendeskAdmin().admin.tickets.update(ticket) - update_role(user_profile, ROLES['light_agent']) + ticket.group_id = zenpy.buffer_group_id + + zenpy.admin.tickets.update(tickets.values) + + attempts, success = 5, False + while not success and attempts != 0: + try: + update_role(user_profile, ROLES['light_agent']) + success = True + except APIException as e: + attempts -= 1 + if attempts == 0: + raise e def get_users_list() -> list: """ Функция **get_users_list** возвращает список пользователей Zendesk, относящихся к организации SYSTEM. """ - zendesk = ZendeskAdmin() + zendesk = zenpy # У пользователей должна быть организация SYSTEM org = next(zendesk.admin.search(type='organization', name='SYSTEM')) @@ -206,17 +91,17 @@ def get_tickets_list(email): """ Функция возвращает список тикетов пользователя Zendesk """ - return ZendeskAdmin().admin.search(assignee=email, type='ticket') + return zenpy.admin.search(assignee=email, type='ticket') -def update_profile(user_profile: UserProfile) -> UserProfile: +def update_profile(user_profile: UserProfile): """ Функция обновляет профиль пользователя в соответствии с текущим в Zendesk. :param user_profile: Профиль пользователя :return: Обновленный, в соответствие с текущими данными в Zendesk, профиль пользователя """ - user = ZendeskAdmin().get_user(user_profile.user.email) + user = zenpy.get_user(user_profile.user.email) user_profile.name = user.name user_profile.role = user.role user_profile.custom_role_id = user.custom_role_id if user.custom_role_id else 0 @@ -231,7 +116,7 @@ def check_user_exist(email: str) -> bool: :param email: Email пользователя :return: Зарегистрирован ли пользователь в Zendesk """ - return ZendeskAdmin().check_user(email) + return zenpy.check_user(email) def get_user_organization(email: str) -> str: @@ -241,7 +126,7 @@ def get_user_organization(email: str) -> str: :param email: Email пользователя :return: Организация пользователя """ - return ZendeskAdmin().get_user_org(email) + return zenpy.get_user_org(email) def check_user_auth(email: str, password: str) -> bool: @@ -263,7 +148,7 @@ def check_user_auth(email: str, password: str) -> bool: return True -def update_user_in_model(profile: UserProfile, zendesk_user: User): +def update_user_in_model(profile: UserProfile, zendesk_user: ZenpyUser): """ Функция обновляет профиль пользователя при изменении данных пользователя на Zendesk. @@ -435,7 +320,7 @@ class StatisticData: self.display = display_format return True - def get_data(self) -> list: + def get_data(self) -> Optional[dict]: """ Функция возвращает данные - список объектов RoleChangeLogs. """ diff --git a/main/views.py b/main/views.py index e299607..d21a6e9 100644 --- a/main/views.py +++ b/main/views.py @@ -1,4 +1,5 @@ from smtplib import SMTPException +from typing import Dict, Any from django.contrib import messages from django.contrib.auth.decorators import login_required @@ -11,27 +12,26 @@ from django.contrib.contenttypes.models import ContentType from django.contrib.messages.views import SuccessMessageMixin from django.core.handlers.wsgi import WSGIRequest from django.http import HttpResponseRedirect, HttpResponse -from django.shortcuts import render, redirect, get_list_or_404 +from django.shortcuts import render, redirect from django.urls import reverse_lazy, reverse from django.views.generic import FormView from django_registration.views import RegistrationView # Django REST from rest_framework import viewsets from rest_framework.response import Response -from zenpy.lib.api_objects import User as ZenpyUser from access_controller.settings import DEFAULT_FROM_EMAIL, ZENDESK_ROLES, ZENDESK_MAX_AGENTS from main.extra_func import check_user_exist, update_profile, get_user_organization, \ make_engineer, make_light_agent, get_users_list, update_users_in_model, count_users, \ - StatisticData, log, ZendeskAdmin, set_session_params_for_work_page + StatisticData, log, set_session_params_for_work_page +from main.zendesk_admin import zenpy from main.forms import AdminPageUsers, CustomRegistrationForm, CustomAuthenticationForm, StatisticForm from main.serializers import ProfileSerializer, ZendeskUserSerializer from .models import UserProfile def setup_context(profile_lit: bool = False, control_lit: bool = False, work_lit: bool = False, - registration_lit: bool = False, login_lit: bool = False, stats_lit: bool = False): - print(profile_lit, control_lit, work_lit, registration_lit, login_lit) + registration_lit: bool = False, login_lit: bool = False, stats_lit: bool = False) -> Dict[str, Any]: context = { 'profile_lit': profile_lit, @@ -158,18 +158,6 @@ def profile_page(request: WSGIRequest) -> HttpResponse: return render(request, 'pages/profile.html', context) -def auth_user(request: WSGIRequest) -> ZenpyUser: - """ - Функция возвращает профиль пользователя на Zendesk. - - :param request: email, subdomain и token пользователя - :return: объект пользователя Zendesk - """ - admin = ZendeskAdmin().admin - zenpy_user: ZenpyUser = admin.users.search(request.user.email).values[0] - return zenpy_user, admin - - @login_required() def work_page(request: WSGIRequest, id: int) -> HttpResponse: """ @@ -213,8 +201,7 @@ def work_page(request: WSGIRequest, id: int) -> HttpResponse: @login_required() def work_hand_over(request: WSGIRequest): """ - Функция позволяет текущему пользователю (login_required) сдать права, а именно сменить в Zendesk роль с "engineer" на "light agent" - и установить роль "agent" в БД. Действия выполняются, если исходная роль пользователя "engineer". + Функция позволяет текущему пользователю сдать права, а именно сменить в Zendesk роль с "engineer" на "light_agent" :param request: данные текущего пользователя (login_required) :return: перезагрузка текущей страницы после выполнения смены роли @@ -226,27 +213,28 @@ def work_hand_over(request: WSGIRequest): @login_required() def work_become_engineer(request: WSGIRequest) -> HttpResponseRedirect: """ - Функция меняет роль пользователя в Zendesk на "engineer" и присваивает роль "agent" в БД (в случае, если исходная роль пользователя была "light_agent"). + Функция позволяет текущему пользователю получить права, а именно сменить в Zendesk роль с "light_agent" на "engineer" :param request: данные текущего пользователя (login_required) :return: перезагрузка текущей страницы после выполнения смены роли """ + make_engineer(request.user.userprofile, request.user) return set_session_params_for_work_page(request) @login_required() def work_get_tickets(request): - zenpy_user, admin = auth_user(request) + zenpy_user = zenpy.get_user(request.user.email) if zenpy_user.role == 'admin' or zenpy_user.custom_role_id == ZENDESK_ROLES['engineer']: - tickets = [ticket for ticket in admin.search(type="ticket") if + tickets = [ticket for ticket in zenpy.admin.search(type="ticket") if ticket.group.name == 'Сменная группа' and ticket.assignee is None] count = 0 for i in range(len(tickets)): if i == int(request.GET.get('count_tickets')): return set_session_params_for_work_page(request, count) tickets[i].assignee = zenpy_user - admin.tickets.update(tickets[i]) + zenpy.admin.tickets.update(tickets[i]) count += 1 return set_session_params_for_work_page(request, count) return set_session_params_for_work_page(request, is_confirm=False) @@ -387,16 +375,16 @@ def statistic_page(request: WSGIRequest) -> HttpResponse: if form.is_valid(): start_date, end_date = form.cleaned_data['range_start'], form.cleaned_data['range_end'] interval, show = form.cleaned_data['interval'], form.cleaned_data['display_format'] - Data = StatisticData(start_date, end_date, form.cleaned_data['email']) - Data.set_display(show) - Data.set_interval(interval) - stats = Data.get_statistic() - if Data.errors: - context['errors'] = Data.errors - if Data.warnings: - context['warnings'] = Data.warnings + data = StatisticData(start_date, end_date, form.cleaned_data['email']) + data.set_display(show) + data.set_interval(interval) + stats = data.get_statistic() + if data.errors: + context['errors'] = data.errors + if data.warnings: + context['warnings'] = data.warnings context['log_stats'] = stats if not context['errors'] else None - if request.method == 'GET': + elif request.method == 'GET': form = StatisticForm() context['form'] = form return render(request, 'pages/statistic.html', context) diff --git a/main/zendesk_admin.py b/main/zendesk_admin.py new file mode 100644 index 0000000..8ecb877 --- /dev/null +++ b/main/zendesk_admin.py @@ -0,0 +1,93 @@ +from typing import Optional, Dict + +from zenpy import Zenpy +from zenpy.lib.api_objects import User as ZenpyUser, Group as ZenpyGroup +from zenpy.lib.exception import APIException + +from access_controller.settings import ACTRL_ZENDESK_SUBDOMAIN, ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, \ + ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL + + +class ZendeskAdmin: + """ + Класс **ZendeskAdmin** существует, чтобы в каждой функции отдельно не проверять аккаунт администратора. + + :param credentials: Полномочия (первым указывается учетная запись организации в Zendesk) + :type credentials: :class:`Dict[str, str]` + """ + + def __init__(self, credentials: Dict[str, str]): + self.credentials = credentials + self.admin = self.create_admin() + self.buffer_group_id: int = self.get_group(ZENDESK_GROUPS['buffer']).id + self.solved_tickets_user_id: int = self.get_user(SOLVED_TICKETS_EMAIL).id + + def check_user(self, email: str) -> bool: + """ + Функция осуществляет проверку существования пользователя в Zendesk по email. + + :param email: Email пользователя + :return: Является ли зарегистрированным + """ + return True if self.admin.search(email, type='user') else False + + def get_user(self, email: str) -> ZenpyUser: + """ + Функция возвращает пользователя (объект) по его email. + + :param email: Email пользователя + :return: Объект пользователя, найденного в БД + """ + return self.admin.users.search(email).values[0] + + def get_group(self, name: str) -> Optional[ZenpyGroup]: + """ + Функция возвращает группу по названию + + :param name: Имя пользователя + :return: Группы пользователя (в случае отсутствия None) + """ + groups = self.admin.search(name, type='group') + for group in groups: + return group + return None + + def get_user_org(self, email: str) -> str: + """ + Функция возвращает организацию, к которой относится пользователь по его email. + + :param email: Email пользователя + :return: Организация пользователя + """ + user = self.admin.users.search(email).values[0] + return user.organization.name if user.organization else None + + def create_admin(self) -> Zenpy: + """ + Функция создает администратора, проверяя наличие вводимых данных в env. + + :raise: :class:`ValueError`: исключение, вызываемое если email не введен в env + :raise: :class:`APIException`: исключение, вызываемое если пользователя с таким email не существует в Zendesk + """ + + if self.credentials.get('email') is None: + raise ValueError('access_controller email not in env') + + if self.credentials.get('token') is None and self.credentials.get('password') is None: + raise ValueError('access_controller token or password not in env') + + admin = Zenpy(**self.credentials) + try: + admin.search(self.credentials['email'], type='user') + except APIException: + raise ValueError('invalid access_controller`s login data') + + return admin + + +zenpy = ZendeskAdmin({ + 'subdomain': ACTRL_ZENDESK_SUBDOMAIN, + 'email': ACTRL_API_EMAIL, + 'token': ACTRL_API_TOKEN, + 'password': ACTRL_API_PASSWORD, +})