Merge branch 'feature/refactor' into 'develop'

Zendesk API Query Optimizations and Refactor, Small Fixes

Closes #58

See merge request 2020-2021/online/s101/group-02/access_controller!62
This commit is contained in:
Кравченко Артем 2021-04-22 07:05:23 +00:00
commit 1bb951dcda
3 changed files with 141 additions and 175 deletions

View File

@ -1,5 +1,6 @@
import logging
from datetime import timedelta, datetime, date
from typing import Optional
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
@ -7,138 +8,11 @@ from django.shortcuts import redirect
from django.utils import timezone
from zenpy import Zenpy
from zenpy.lib.exception import APIException
from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket
from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY, ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL, \
ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, ACTRL_ZENDESK_SUBDOMAIN
from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY, ACTRL_ZENDESK_SUBDOMAIN
from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus
class ZendeskAdmin:
"""
Класс **ZendeskAdmin** существует, чтобы в каждой функции отдельно не проверять аккаунт администратора.
:param credentials: Полномочия (первым указывается учетная запись организации в Zendesk)
:type credentials: :class:`dict`
:param email: Email администратора, указанный в env
:type email: :class:`str`
:param token: Токен администратора (формируется в Zendesk, указывается в env)
:type token: :class:`str`
:param password: Пароль администратора, указанный в env
:type password: :class:`str`
"""
credentials: dict = {
'subdomain': ACTRL_ZENDESK_SUBDOMAIN
}
email: str = ACTRL_API_EMAIL
token: str = ACTRL_API_TOKEN
password: str = ACTRL_API_PASSWORD
def __init__(self):
self.create_admin()
def check_user(self, email: str) -> bool:
"""
Функция осуществляет проверку существования пользователя в Zendesk по email.
:param email: Email пользователя
:return: Является ли зарегистрированным
"""
return True if self.admin.search(email, type='user') else False
def get_user_name(self, email: str) -> str:
"""
Функция **get_user_name** возвращает имя пользователя по его email
"""
user = self.admin.users.search(email).values[0]
return user.name
def get_user_role(self, email: str) -> str:
"""
Функция возвращает роль пользователя по его email.
:param email: Email пользователя
:return: Роль пользователя
"""
user = self.admin.users.search(email).values[0]
return user.role
def get_user_id(self, email: str) -> str:
"""
Функция возвращает id пользователя по его email
:param email: Email пользователя
:return: ID пользователя
"""
user = self.admin.users.search(email).values[0]
return user.id
def get_user_image(self, email: str) -> str:
"""
Функция возвращает url-ссылку на аватар пользователя по его email.
:param email: Email пользователя
:return: Аватар пользователя
"""
user = self.admin.users.search(email).values[0]
return user.photo['content_url'] if user.photo else None
def get_user(self, email: str):
"""
Функция возвращает пользователя (объект) по его email.
:param email: Email пользователя
:return: Объект пользователя, найденного в БД
"""
return self.admin.users.search(email).values[0]
def get_group(self, name: str) -> str:
"""
Функция возвращает группу по названию
:param name: Имя пользователя
:return: Группы пользователя (в случае отсутствия None)
"""
groups = self.admin.search(name, type='group')
for group in groups:
return group
return None
def get_user_org(self, email: str) -> str:
"""
Функция возвращает организацию, к которой относится пользователь по его email.
:param email: Email пользователя
:return: Организация пользователя
"""
user = self.admin.users.search(email).values[0]
return user.organization.name if user.organization else None
def create_admin(self) -> None:
"""
Функция создает администратора, проверяя наличие вводимых данных в env.
:param credentials: В список полномочий администратора вносятся email, token, password из env
:type credentials: :class:`dict`
:raise: :class:`ValueError`: исключение, вызываемое если email не введен в env
:raise: :class:`APIException`: исключение, вызываемое если пользователя с таким email не существует в Zendesk
"""
if self.email is None:
raise ValueError('access_controller email not in env')
self.credentials['email'] = self.email
if self.token:
self.credentials['token'] = self.token
elif self.password:
self.credentials['password'] = self.password
else:
raise ValueError('access_controller token or password not in env')
self.admin = Zenpy(**self.credentials)
try:
self.admin.search(self.email, type='user')
except APIException:
raise ValueError('invalid access_controller`s login data')
from main.zendesk_admin import zenpy
def update_role(user_profile: UserProfile, role: int) -> None:
@ -149,7 +23,7 @@ def update_role(user_profile: UserProfile, role: int) -> None:
:param role: Новая роль
:return: Пользователь с обновленной ролью
"""
zendesk = ZendeskAdmin()
zendesk = zenpy
user = zendesk.get_user(user_profile.user.email)
user.custom_role_id = role
user_profile.custom_role_id = role
@ -175,6 +49,7 @@ def make_light_agent(user_profile: UserProfile, who_changes: User) -> None:
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent"
"""
tickets = get_tickets_list(user_profile.user.email)
ticket: ZenpyTicket
for ticket in tickets:
UnassignedTicket.objects.create(
assignee=user_profile.user,
@ -182,19 +57,29 @@ def make_light_agent(user_profile: UserProfile, who_changes: User) -> None:
status=UnassignedTicketStatus.SOLVED if ticket.status == 'solved' else UnassignedTicketStatus.UNASSIGNED
)
if ticket.status == 'solved':
ticket.assignee = ZendeskAdmin().get_user(SOLVED_TICKETS_EMAIL)
ticket.assignee_id = zenpy.solved_tickets_user_id
else:
ticket.assignee = None
ticket.group = ZendeskAdmin().get_group(ZENDESK_GROUPS['buffer'])
ZendeskAdmin().admin.tickets.update(ticket)
update_role(user_profile, ROLES['light_agent'])
ticket.group_id = zenpy.buffer_group_id
zenpy.admin.tickets.update(tickets.values)
attempts, success = 5, False
while not success and attempts != 0:
try:
update_role(user_profile, ROLES['light_agent'])
success = True
except APIException as e:
attempts -= 1
if attempts == 0:
raise e
def get_users_list() -> list:
"""
Функция **get_users_list** возвращает список пользователей Zendesk, относящихся к организации SYSTEM.
"""
zendesk = ZendeskAdmin()
zendesk = zenpy
# У пользователей должна быть организация SYSTEM
org = next(zendesk.admin.search(type='organization', name='SYSTEM'))
@ -206,17 +91,17 @@ def get_tickets_list(email):
"""
Функция возвращает список тикетов пользователя Zendesk
"""
return ZendeskAdmin().admin.search(assignee=email, type='ticket')
return zenpy.admin.search(assignee=email, type='ticket')
def update_profile(user_profile: UserProfile) -> UserProfile:
def update_profile(user_profile: UserProfile):
"""
Функция обновляет профиль пользователя в соответствии с текущим в Zendesk.
:param user_profile: Профиль пользователя
:return: Обновленный, в соответствие с текущими данными в Zendesk, профиль пользователя
"""
user = ZendeskAdmin().get_user(user_profile.user.email)
user = zenpy.get_user(user_profile.user.email)
user_profile.name = user.name
user_profile.role = user.role
user_profile.custom_role_id = user.custom_role_id if user.custom_role_id else 0
@ -231,7 +116,7 @@ def check_user_exist(email: str) -> bool:
:param email: Email пользователя
:return: Зарегистрирован ли пользователь в Zendesk
"""
return ZendeskAdmin().check_user(email)
return zenpy.check_user(email)
def get_user_organization(email: str) -> str:
@ -241,7 +126,7 @@ def get_user_organization(email: str) -> str:
:param email: Email пользователя
:return: Организация пользователя
"""
return ZendeskAdmin().get_user_org(email)
return zenpy.get_user_org(email)
def check_user_auth(email: str, password: str) -> bool:
@ -263,7 +148,7 @@ def check_user_auth(email: str, password: str) -> bool:
return True
def update_user_in_model(profile: UserProfile, zendesk_user: User):
def update_user_in_model(profile: UserProfile, zendesk_user: ZenpyUser):
"""
Функция обновляет профиль пользователя при изменении данных пользователя на Zendesk.
@ -435,7 +320,7 @@ class StatisticData:
self.display = display_format
return True
def get_data(self) -> list:
def get_data(self) -> Optional[dict]:
"""
Функция возвращает данные - список объектов RoleChangeLogs.
"""

View File

@ -1,4 +1,5 @@
from smtplib import SMTPException
from typing import Dict, Any
from django.contrib import messages
from django.contrib.auth.decorators import login_required
@ -11,27 +12,26 @@ from django.contrib.contenttypes.models import ContentType
from django.contrib.messages.views import SuccessMessageMixin
from django.core.handlers.wsgi import WSGIRequest
from django.http import HttpResponseRedirect, HttpResponse
from django.shortcuts import render, redirect, get_list_or_404
from django.shortcuts import render, redirect
from django.urls import reverse_lazy, reverse
from django.views.generic import FormView
from django_registration.views import RegistrationView
# Django REST
from rest_framework import viewsets
from rest_framework.response import Response
from zenpy.lib.api_objects import User as ZenpyUser
from access_controller.settings import DEFAULT_FROM_EMAIL, ZENDESK_ROLES, ZENDESK_MAX_AGENTS
from main.extra_func import check_user_exist, update_profile, get_user_organization, \
make_engineer, make_light_agent, get_users_list, update_users_in_model, count_users, \
StatisticData, log, ZendeskAdmin, set_session_params_for_work_page
StatisticData, log, set_session_params_for_work_page
from main.zendesk_admin import zenpy
from main.forms import AdminPageUsers, CustomRegistrationForm, CustomAuthenticationForm, StatisticForm
from main.serializers import ProfileSerializer, ZendeskUserSerializer
from .models import UserProfile
def setup_context(profile_lit: bool = False, control_lit: bool = False, work_lit: bool = False,
registration_lit: bool = False, login_lit: bool = False, stats_lit: bool = False):
print(profile_lit, control_lit, work_lit, registration_lit, login_lit)
registration_lit: bool = False, login_lit: bool = False, stats_lit: bool = False) -> Dict[str, Any]:
context = {
'profile_lit': profile_lit,
@ -158,18 +158,6 @@ def profile_page(request: WSGIRequest) -> HttpResponse:
return render(request, 'pages/profile.html', context)
def auth_user(request: WSGIRequest) -> ZenpyUser:
"""
Функция возвращает профиль пользователя на Zendesk.
:param request: email, subdomain и token пользователя
:return: объект пользователя Zendesk
"""
admin = ZendeskAdmin().admin
zenpy_user: ZenpyUser = admin.users.search(request.user.email).values[0]
return zenpy_user, admin
@login_required()
def work_page(request: WSGIRequest, id: int) -> HttpResponse:
"""
@ -213,8 +201,7 @@ def work_page(request: WSGIRequest, id: int) -> HttpResponse:
@login_required()
def work_hand_over(request: WSGIRequest):
"""
Функция позволяет текущему пользователю (login_required) сдать права, а именно сменить в Zendesk роль с "engineer" на "light agent"
и установить роль "agent" в БД. Действия выполняются, если исходная роль пользователя "engineer".
Функция позволяет текущему пользователю сдать права, а именно сменить в Zendesk роль с "engineer" на "light_agent"
:param request: данные текущего пользователя (login_required)
:return: перезагрузка текущей страницы после выполнения смены роли
@ -226,27 +213,28 @@ def work_hand_over(request: WSGIRequest):
@login_required()
def work_become_engineer(request: WSGIRequest) -> HttpResponseRedirect:
"""
Функция меняет роль пользователя в Zendesk на "engineer" и присваивает роль "agent" в БД (в случае, если исходная роль пользователя была "light_agent").
Функция позволяет текущему пользователю получить права, а именно сменить в Zendesk роль с "light_agent" на "engineer"
:param request: данные текущего пользователя (login_required)
:return: перезагрузка текущей страницы после выполнения смены роли
"""
make_engineer(request.user.userprofile, request.user)
return set_session_params_for_work_page(request)
@login_required()
def work_get_tickets(request):
zenpy_user, admin = auth_user(request)
zenpy_user = zenpy.get_user(request.user.email)
if zenpy_user.role == 'admin' or zenpy_user.custom_role_id == ZENDESK_ROLES['engineer']:
tickets = [ticket for ticket in admin.search(type="ticket") if
tickets = [ticket for ticket in zenpy.admin.search(type="ticket") if
ticket.group.name == 'Сменная группа' and ticket.assignee is None]
count = 0
for i in range(len(tickets)):
if i == int(request.GET.get('count_tickets')):
return set_session_params_for_work_page(request, count)
tickets[i].assignee = zenpy_user
admin.tickets.update(tickets[i])
zenpy.admin.tickets.update(tickets[i])
count += 1
return set_session_params_for_work_page(request, count)
return set_session_params_for_work_page(request, is_confirm=False)
@ -387,16 +375,16 @@ def statistic_page(request: WSGIRequest) -> HttpResponse:
if form.is_valid():
start_date, end_date = form.cleaned_data['range_start'], form.cleaned_data['range_end']
interval, show = form.cleaned_data['interval'], form.cleaned_data['display_format']
Data = StatisticData(start_date, end_date, form.cleaned_data['email'])
Data.set_display(show)
Data.set_interval(interval)
stats = Data.get_statistic()
if Data.errors:
context['errors'] = Data.errors
if Data.warnings:
context['warnings'] = Data.warnings
data = StatisticData(start_date, end_date, form.cleaned_data['email'])
data.set_display(show)
data.set_interval(interval)
stats = data.get_statistic()
if data.errors:
context['errors'] = data.errors
if data.warnings:
context['warnings'] = data.warnings
context['log_stats'] = stats if not context['errors'] else None
if request.method == 'GET':
elif request.method == 'GET':
form = StatisticForm()
context['form'] = form
return render(request, 'pages/statistic.html', context)

93
main/zendesk_admin.py Normal file
View File

@ -0,0 +1,93 @@
from typing import Optional, Dict
from zenpy import Zenpy
from zenpy.lib.api_objects import User as ZenpyUser, Group as ZenpyGroup
from zenpy.lib.exception import APIException
from access_controller.settings import ACTRL_ZENDESK_SUBDOMAIN, ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, \
ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL
class ZendeskAdmin:
"""
Класс **ZendeskAdmin** существует, чтобы в каждой функции отдельно не проверять аккаунт администратора.
:param credentials: Полномочия (первым указывается учетная запись организации в Zendesk)
:type credentials: :class:`Dict[str, str]`
"""
def __init__(self, credentials: Dict[str, str]):
self.credentials = credentials
self.admin = self.create_admin()
self.buffer_group_id: int = self.get_group(ZENDESK_GROUPS['buffer']).id
self.solved_tickets_user_id: int = self.get_user(SOLVED_TICKETS_EMAIL).id
def check_user(self, email: str) -> bool:
"""
Функция осуществляет проверку существования пользователя в Zendesk по email.
:param email: Email пользователя
:return: Является ли зарегистрированным
"""
return True if self.admin.search(email, type='user') else False
def get_user(self, email: str) -> ZenpyUser:
"""
Функция возвращает пользователя (объект) по его email.
:param email: Email пользователя
:return: Объект пользователя, найденного в БД
"""
return self.admin.users.search(email).values[0]
def get_group(self, name: str) -> Optional[ZenpyGroup]:
"""
Функция возвращает группу по названию
:param name: Имя пользователя
:return: Группы пользователя (в случае отсутствия None)
"""
groups = self.admin.search(name, type='group')
for group in groups:
return group
return None
def get_user_org(self, email: str) -> str:
"""
Функция возвращает организацию, к которой относится пользователь по его email.
:param email: Email пользователя
:return: Организация пользователя
"""
user = self.admin.users.search(email).values[0]
return user.organization.name if user.organization else None
def create_admin(self) -> Zenpy:
"""
Функция создает администратора, проверяя наличие вводимых данных в env.
:raise: :class:`ValueError`: исключение, вызываемое если email не введен в env
:raise: :class:`APIException`: исключение, вызываемое если пользователя с таким email не существует в Zendesk
"""
if self.credentials.get('email') is None:
raise ValueError('access_controller email not in env')
if self.credentials.get('token') is None and self.credentials.get('password') is None:
raise ValueError('access_controller token or password not in env')
admin = Zenpy(**self.credentials)
try:
admin.search(self.credentials['email'], type='user')
except APIException:
raise ValueError('invalid access_controller`s login data')
return admin
zenpy = ZendeskAdmin({
'subdomain': ACTRL_ZENDESK_SUBDOMAIN,
'email': ACTRL_API_EMAIL,
'token': ACTRL_API_TOKEN,
'password': ACTRL_API_PASSWORD,
})