Merge branch 'develop' into feature/react_test

# Conflicts:
#	main/templates/pages/adm_ruleset.html
#	static/main/js/control.js
This commit is contained in:
Andrew Smirnov 2021-05-13 20:02:27 +03:00
commit 8b70827f07
No known key found for this signature in database
GPG Key ID: 0EFE318E5BB2A82A
16 changed files with 1029 additions and 334 deletions

View File

@ -14,7 +14,6 @@ Including another URLconf
2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) 2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
""" """
from django.contrib import admin from django.contrib import admin
from django.contrib.auth import views
from django.urls import path, include from django.urls import path, include
from main.views import main_page, profile_page, CustomRegistrationView, CustomLoginView, registration_error from main.views import main_page, profile_page, CustomRegistrationView, CustomLoginView, registration_error
@ -22,6 +21,7 @@ from main.views import work_page, work_hand_over, work_become_engineer, work_get
AdminPageView, statistic_page AdminPageView, statistic_page
from main.urls import router from main.urls import router
urlpatterns = [ urlpatterns = [
path('admin/', admin.site.urls, name='admin'), path('admin/', admin.site.urls, name='admin'),
path('', main_page, name='index'), path('', main_page, name='index'),

57
fixtures/data.json Normal file
View File

@ -0,0 +1,57 @@
[
{
"model": "auth.user",
"pk": 1,
"fields": {
"password": "pbkdf2_sha256$216000$gHBBCr1jBELf$ZkEDW3IEd8Wij7u8vkv+0Eze32CS01bcaYWhcD9OIC4=",
"last_login": null,
"is_superuser": true,
"username": "admin@gmail.com",
"first_name": "",
"last_name": "",
"email": "admin@gmail.com",
"is_staff": true,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": [33]
}
},
{
"model": "main.userprofile",
"pk": 1,
"fields": {
"name": "ZendeskAdmin",
"user": 1,
"role": "admin"
}
},
{
"model": "auth.user",
"pk": 2,
"fields": {
"password": "pbkdf2_sha256$216000$5qLJgrm2Quq9$KDBNNymVZXkUx0HKBPFst2m83kLe0egPBnkW7KnkORU=",
"last_login": null,
"is_superuser": false,
"username": "123@test.ru",
"first_name": "",
"last_name": "",
"email": "123@test.ru",
"is_staff": false,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "main.userprofile",
"pk": 2,
"fields": {
"name": "UserForAccessTest",
"user": 2,
"role": "agent",
"custom_role_id": "360005209000"
}
}
]

59
fixtures/profile.json Normal file
View File

@ -0,0 +1,59 @@
[
{
"model": "auth.user",
"pk": 1,
"fields": {
"password": "pbkdf2_sha256$216000$gHBBCr1jBELf$ZkEDW3IEd8Wij7u8vkv+0Eze32CS01bcaYWhcD9OIC4=",
"last_login": null,
"is_superuser": true,
"username": "idar.sokurov.05@mail.ru",
"first_name": "",
"last_name": "",
"email": "idar.sokurov.05@mail.ru",
"is_staff": true,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": [
33
]
}
},
{
"model": "main.userprofile",
"pk": 1,
"fields": {
"name": "ZendeskAdmin",
"user": 1,
"role": "admin"
}
},
{
"model": "auth.user",
"pk": 2,
"fields": {
"password": "pbkdf2_sha256$216000$5qLJgrm2Quq9$KDBNNymVZXkUx0HKBPFst2m83kLe0egPBnkW7KnkORU=",
"last_login": null,
"is_superuser": false,
"username": "krav-88@mail.ru",
"first_name": "",
"last_name": "",
"email": "krav-88@mail.ru",
"is_staff": false,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "main.userprofile",
"pk": 2,
"fields": {
"name": "UserForAccessTest",
"user": 2,
"role": "agent",
"custom_role_id": "360005209000"
}
}
]

View File

@ -0,0 +1,85 @@
[
{
"model": "auth.user",
"pk": 1,
"fields": {
"password": "pbkdf2_sha256$216000$gHBBCr1jBELf$ZkEDW3IEd8Wij7u8vkv+0Eze32CS01bcaYWhcD9OIC4=",
"last_login": null,
"is_superuser": true,
"username": "admin@gmail.com",
"first_name": "",
"last_name": "",
"email": "admin@gmail.com",
"is_staff": true,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": [33]
}
},
{
"model": "main.userprofile",
"pk": 1,
"fields": {
"name": "ZendeskAdmin",
"user": 1,
"role": "admin"
}
},
{
"model": "auth.user",
"pk": 2,
"fields": {
"password": "pbkdf2_sha256$216000$5qLJgrm2Quq9$KDBNNymVZXkUx0HKBPFst2m83kLe0egPBnkW7KnkORU=",
"last_login": null,
"is_superuser": false,
"username": "123@test.ru",
"first_name": "",
"last_name": "",
"email": "123@test.ru",
"is_staff": false,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "main.userprofile",
"pk": 2,
"fields": {
"name": "UserForAccessTest",
"user": 2,
"role": "agent",
"custom_role_id": "360005209000"
}
},
{
"model": "auth.user",
"pk": 3,
"fields": {
"password": "pbkdf2_sha256$216000$5qLJgrm2Quq9$KDBNNymVZXkUx0HKBPFst2m83kLe0egPBnkW7KnkORU=",
"last_login": null,
"is_superuser": false,
"username": "customer@example.com",
"first_name": "",
"last_name": "",
"email": "customer@example.com",
"is_staff": false,
"is_active": true,
"date_joined": "2021-04-15T16:38:56.303Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "main.userprofile",
"pk": 3,
"fields": {
"name": "UserForAccessTest",
"user": 3,
"role": "agent",
"custom_role_id": "360005209000"
}
}
]

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

BIN
layouts/work/workv2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

View File

@ -1,26 +1,28 @@
import logging import logging
from datetime import timedelta, datetime, date from datetime import timedelta
from typing import Optional
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from django.shortcuts import redirect from django.shortcuts import redirect
from django.utils import timezone from django.utils import timezone
from zenpy import Zenpy from zenpy import Zenpy
from zenpy.lib.exception import APIException
from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket
from zenpy.lib.exception import APIException
from zenpy.lib.generator import SearchResultGenerator
from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY, ACTRL_ZENDESK_SUBDOMAIN from access_controller.settings import ZENDESK_ROLES as ROLES, ACTRL_ZENDESK_SUBDOMAIN
from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus
from main.requester import TicketListRequester
from main.zendesk_admin import zenpy from main.zendesk_admin import zenpy
def update_role(user_profile: UserProfile, role: int) -> None: def update_role(user_profile: UserProfile, role: int, who_changes: User) -> None:
""" """
Функция меняет роль пользователя. Функция меняет роль пользователя.
:param user_profile: Профиль пользователя :param user_profile: Профиль пользователя
:param role: Новая роль :param role: Новая роль
:param who_changes: Пользователь, меняющий роль
:return: Пользователь с обновленной ролью :return: Пользователь с обновленной ролью
""" """
zendesk = zenpy zendesk = zenpy
@ -28,7 +30,8 @@ def update_role(user_profile: UserProfile, role: int) -> None:
user.custom_role_id = role user.custom_role_id = role
user_profile.custom_role_id = role user_profile.custom_role_id = role
user_profile.save() user_profile.save()
zendesk.admin.users.update(user) log(user_profile, who_changes.userprofile)
zendesk.update_user(user)
def make_engineer(user_profile: UserProfile, who_changes: User) -> None: def make_engineer(user_profile: UserProfile, who_changes: User) -> None:
@ -38,7 +41,7 @@ def make_engineer(user_profile: UserProfile, who_changes: User) -> None:
:param user_profile: Профиль пользователя :param user_profile: Профиль пользователя
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer" :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer"
""" """
update_role(user_profile, ROLES['engineer']) update_role(user_profile, ROLES['engineer'], who_changes)
def make_light_agent(user_profile: UserProfile, who_changes: User) -> None: def make_light_agent(user_profile: UserProfile, who_changes: User) -> None:
@ -48,7 +51,7 @@ def make_light_agent(user_profile: UserProfile, who_changes: User) -> None:
:param user_profile: Профиль пользователя :param user_profile: Профиль пользователя
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent" :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent"
""" """
tickets = get_tickets_list(user_profile.user.email) tickets: SearchResultGenerator = get_tickets_list(user_profile.user.email)
ticket: ZenpyTicket ticket: ZenpyTicket
for ticket in tickets: for ticket in tickets:
UnassignedTicket.objects.create( UnassignedTicket.objects.create(
@ -61,13 +64,12 @@ def make_light_agent(user_profile: UserProfile, who_changes: User) -> None:
else: else:
ticket.assignee = None ticket.assignee = None
ticket.group_id = zenpy.buffer_group_id ticket.group_id = zenpy.buffer_group_id
if tickets:
zenpy.admin.tickets.update(tickets.values) zenpy.admin.tickets.update(tickets)
attempts, success = 20, False
attempts, success = 5, False
while not success and attempts != 0: while not success and attempts != 0:
try: try:
update_role(user_profile, ROLES['light_agent']) update_role(user_profile, ROLES['light_agent'], who_changes)
success = True success = True
except APIException as e: except APIException as e:
attempts -= 1 attempts -= 1
@ -91,7 +93,14 @@ def get_tickets_list(email):
""" """
Функция возвращает список тикетов пользователя Zendesk Функция возвращает список тикетов пользователя Zendesk
""" """
return zenpy.admin.search(assignee=email, type='ticket') return TicketListRequester().get_tickets_list_for_user(zenpy.get_user(email))
def get_tickets_list_for_group(group_name):
"""
Функция возвращает список неназначенных, нерешённых тикетов группы Zendesk
"""
return TicketListRequester().get_tickets_list_for_group(zenpy.get_group(group_name))
def update_profile(user_profile: UserProfile): def update_profile(user_profile: UserProfile):
@ -231,258 +240,6 @@ def last_day_of_month(day: int) -> int:
return next_month - timedelta(days=next_month.day) return next_month - timedelta(days=next_month.day)
class StatisticData:
"""
Класс для учета статистики интервалов работы пользователей.
Передаваемые параметры: start_date, end_date, email, stat.
:param display: Формат отображения времени (часы, минуты)
:type display: :class:`list`
:param interval: Интервал времени в часах и минутах
:type interval: :class:`list`
:param start_date: Дата начала работы
:type start_date: :class:`date`
:param end_date: Дата окончания работы
:type end_date: :class:`date`
:param email: Email пользователя
:type email: :class:`str`
:param errors: Список ошибок
:type errors: :class:`list`
:param warnings: Список предупреждений
:type warnings: :class:`list`
:param data: Ретроспектива смены ролей пользователя
:type data: :class:`dict`
:param statistic: Интервалы работы пользователя
:type statistic: :class:`dict`
"""
def __init__(self, start_date, end_date, user_email, stat=None):
self.display = None
self.interval = None
self.start_date = start_date
self.end_date = end_date
self.email = user_email
self.errors = list()
self.warnings = list()
self.data = dict()
self.statistic = dict()
self._init_data()
if stat is None:
self._init_statistic()
else:
self.statistic = stat
def get_statistic(self) -> dict:
"""
Функция возвращает статистику работы пользователя.
:return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании.
"""
if self.is_valid_statistic():
stat = self.statistic
stat = self._use_display(stat)
stat = self._use_interval(stat)
return stat
else:
return None
def is_valid_statistic(self) -> bool:
"""
Функция проверяет были ли ошибки при создании статистики.
:return: True, при отсутствии ошибок
"""
return not self.errors and self.statistic
def set_interval(self, interval: list) -> bool:
"""
Функция проверяет корректность представления интервала работы.
:param interval: Интервал должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if interval not in ['months', 'days']:
self.errors += ['Интервал работы должен быть в днях или месяцах']
return False
self.interval = interval
return True
def set_display(self, display_format: list) -> bool:
"""
Функция проверяет корректность формата отображения интервала.
:param display_format: Формат отображения должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if display_format not in ['days', 'hours']:
self.errors += ['Формат отображения должен быть в часах или днях']
return False
self.display = display_format
return True
def get_data(self) -> Optional[dict]:
"""
Функция возвращает данные - список объектов RoleChangeLogs.
"""
if self.is_valid_data():
return self.data
else:
return None
def is_valid_data(self) -> bool:
"""
Функция определяет были ли ошибки при получении логов.
:return: True, если ошибок нет
"""
return not self.errors
def _use_display(self, stat: list) -> list:
"""
Функция приводит данные к формату отображения.
:param stat: Список данных статистики пользователя
:return: Обновленный список
"""
if not self.is_valid_statistic() or not self.display:
return stat
new_stat = {}
for key, item in stat.items():
if self.display == 'hours':
new_stat[key] = item / 3600
elif self.display == 'days':
new_stat[key] = item / (ONE_DAY * 3600)
return new_stat
def _use_interval(self, stat: dict) -> dict:
"""
Функция объединяет ключи и значения в соответствии с интервалом работы.
:param stat: Статистика работы пользователя
:return: Обновленная статистика
"""
if not self.is_valid_statistic() or not self.interval:
return stat
new_stat = {}
if self.interval == 'months':
# Переделываем ключи под формат('началоесяца - конец_месяца')
for key, value in stat.items():
current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1))
current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1)))
index = ' - '.join([str(current_month_start), str(current_month_end)])
if new_stat.get(index):
new_stat[index] += value
else:
new_stat[index] = value
elif self.interval == 'days':
new_stat = stat # статистика изначально в днях
return new_stat
def check_time(self) -> bool:
"""
Функция проверяет корректность введенного времени.
:return: True, если время указано корректно. Иначе, False
"""
if self.end_date < self.start_date or self.end_date > datetime.now().date():
return False
return True
def _init_data(self):
"""
Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email.
:return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку.
"""
if not self.check_time():
self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени']
return
try:
self.data = RoleChangeLogs.objects.filter(
change_time__range=[self.start_date, self.end_date + timedelta(days=1)],
user=User.objects.get(email=self.email),
).order_by('change_time')
except User.DoesNotExist:
self.errors += ['Пользователь не найден']
def _init_statistic(self) -> dict:
"""
Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд.
:return: Статистика работы пользователя (statistic)
"""
self.clear_statistic()
if not self.get_data():
self.warnings += ['Не обнаружены изменения роли в данном промежутке']
return None
first_log, last_log = self.data[0], self.data[len(self.data) - 1]
if first_log.old_role == ROLES['engineer']:
self.prev_engineer_logic(first_log)
if last_log.new_role == ROLES['engineer']:
self.post_engineer_logic(last_log)
for log_index in range(len(self.data) - 1):
if self.data[log_index].new_role == ROLES['engineer']:
self.engineer_logic(log_index)
def engineer_logic(self, log_index):
"""
Функция обрабатывает основную часть работы инженера
"""
current_log, next_log = self.data[log_index], self.data[log_index + 1]
if current_log.change_time.date() != next_log.change_time.date():
self.statistic[current_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(current_log)).total_seconds()
self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds()
self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date())
else:
elapsed_time = next_log.change_time - current_log.change_time
self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds()
def post_engineer_logic(self, last_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона
"""
self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1))
if last_log.change_time.date() == timezone.now().date():
self.statistic[last_log.change_time.date()] += (
get_timedelta(None, timezone.now().time()) - get_timedelta(last_log)
).total_seconds()
else:
self.statistic[last_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(last_log)).total_seconds()
if self.end_date == timezone.now().date():
self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds()
def prev_engineer_logic(self, first_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона
"""
self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date),
first_log.change_time.date())
self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds()
def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict:
"""
Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне).
:param first: Начальная дата интервала
:param last: Последняя дата интервала
:param val: Количество секунд в одном дне
"""
for day in daterange(first, last):
self.statistic[day] = val
def clear_statistic(self) -> dict:
"""
Функция осуществляет обновление всех дней.
"""
self.statistic.clear()
self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0)
class DatabaseHandler(logging.Handler): class DatabaseHandler(logging.Handler):
def __init__(self): def __init__(self):
logging.Handler.__init__(self) logging.Handler.__init__(self)
@ -532,7 +289,7 @@ class CsvFormatter(logging.Formatter):
return msg return msg
def log(user, admin=0): def log(user, admin=None):
""" """
Осуществляет запись логов в базу данных и csv файл Осуществляет запись логов в базу данных и csv файл
:param admin: :param admin:

View File

@ -140,3 +140,23 @@ class StatisticForm(forms.Form):
} }
), ),
) )
class WorkGetTicketsForm(forms.Form):
"""
Форма получения количества тикетов для страницы work и work_get_tickets.
:param count_tickets: Поле для ввода количества тикетов
:type count_tickets: :class:`django.forms.fields.IntegerField`
"""
count_tickets = forms.IntegerField(
min_value=0,
max_value=100,
required=True,
widget=forms.NumberInput(
attrs={
'class': 'form-control mb-3',
'value': 1
}
),
)

38
main/requester.py Normal file
View File

@ -0,0 +1,38 @@
import requests
from zenpy import TicketApi
from zenpy.lib.api_objects import Ticket
from main.zendesk_admin import zenpy
class TicketListRequester:
def __init__(self):
self.email = zenpy.credentials['email']
if zenpy.credentials.get('token'):
self.token_or_password = zenpy.credentials.get('token')
self.email += '/token'
else:
self.token_or_password = zenpy.credentials.get('password')
self.prefix = f'https://{zenpy.credentials.get("subdomain")}.zendesk.com/api/v2/'
def get_tickets_list_for_user(self, zendesk_user):
url = self.prefix + f'users/{zendesk_user.id}/tickets/assigned'
return self._get_tickets(url)
def get_tickets_list_for_group(self, group):
url = self.prefix + '/tickets'
all_tickets = self._get_tickets(url)
tickets = list()
for ticket in all_tickets:
if (ticket.status != 'solved') and (ticket.group_id == group.id) and (ticket.assignee_id is None):
tickets.append(ticket)
return tickets
def _get_tickets(self, url):
response = requests.get(url, auth=(self.email, self.token_or_password))
tickets = []
if response.status_code != 200:
return None
for ticket in response.json()['tickets']:
tickets.append(Ticket(api=TicketApi, **ticket))
return tickets

261
main/statistic_data.py Normal file
View File

@ -0,0 +1,261 @@
from datetime import date, datetime, timedelta
from typing import Optional
from django.contrib.auth.models import User
from django.utils import timezone
from access_controller.settings import ONE_DAY, ZENDESK_ROLES as ROLES
from main.extra_func import last_day_of_month, get_timedelta, daterange
from main.models import RoleChangeLogs
class StatisticData:
"""
Класс для учета статистики интервалов работы пользователей.
Передаваемые параметры: start_date, end_date, email, stat.
:param display: Формат отображения времени (часы, минуты)
:type display: :class:`list`
:param interval: Интервал времени в часах и минутах
:type interval: :class:`list`
:param start_date: Дата начала работы
:type start_date: :class:`date`
:param end_date: Дата окончания работы
:type end_date: :class:`date`
:param email: Email пользователя
:type email: :class:`str`
:param errors: Список ошибок
:type errors: :class:`list`
:param warnings: Список предупреждений
:type warnings: :class:`list`
:param data: Ретроспектива смены ролей пользователя
:type data: :class:`dict`
:param statistic: Интервалы работы пользователя
:type statistic: :class:`dict`
"""
def __init__(self, start_date, end_date, user_email, stat=None):
self.display = None
self.interval = None
self.start_date = start_date
self.end_date = end_date
self.email = user_email
self.errors = list()
self.warnings = list()
self.data = dict()
self.statistic = dict()
self._init_data()
if stat is None:
self._init_statistic()
else:
self.statistic = stat
def get_statistic(self) -> dict:
"""
Функция возвращает статистику работы пользователя.
:return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании.
"""
if self.is_valid_statistic():
stat = self.statistic
stat = self._use_display(stat)
stat = self._use_interval(stat)
return stat
else:
return None
def is_valid_statistic(self) -> bool:
"""
Функция проверяет были ли ошибки при создании статистики.
:return: True, при отсутствии ошибок
"""
return not self.errors and self.statistic
def set_interval(self, interval: list) -> bool:
"""
Функция проверяет корректность представления интервала работы.
:param interval: Интервал должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if interval not in ['months', 'days']:
self.errors += ['Интервал работы должен быть в днях или месяцах']
return False
self.interval = interval
return True
def set_display(self, display_format: list) -> bool:
"""
Функция проверяет корректность формата отображения интервала.
:param display_format: Формат отображения должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if display_format not in ['days', 'hours']:
self.errors += ['Формат отображения должен быть в часах или днях']
return False
self.display = display_format
return True
def get_data(self) -> Optional[dict]:
"""
Функция возвращает данные - список объектов RoleChangeLogs.
"""
if self.is_valid_data():
return self.data
else:
return None
def is_valid_data(self) -> bool:
"""
Функция определяет были ли ошибки при получении логов.
:return: True, если ошибок нет
"""
return not self.errors
def _use_display(self, stat: list) -> list:
"""
Функция приводит данные к формату отображения.
:param stat: Список данных статистики пользователя
:return: Обновленный список
"""
if not self.is_valid_statistic() or not self.display:
return stat
new_stat = {}
for key, item in stat.items():
if self.display == 'hours':
new_stat[key] = item / 3600
elif self.display == 'days':
new_stat[key] = item / (ONE_DAY * 3600)
return new_stat
def _use_interval(self, stat: dict) -> dict:
"""
Функция объединяет ключи и значения в соответствии с интервалом работы.
:param stat: Статистика работы пользователя
:return: Обновленная статистика
"""
if not self.is_valid_statistic() or not self.interval:
return stat
new_stat = {}
if self.interval == 'months':
# Переделываем ключи под формат('началоесяца - конец_месяца')
for key, value in stat.items():
current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1))
current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1)))
index = ' - '.join([str(current_month_start), str(current_month_end)])
if new_stat.get(index):
new_stat[index] += value
else:
new_stat[index] = value
elif self.interval == 'days':
new_stat = stat # статистика изначально в днях
return new_stat
def check_time(self) -> bool:
"""
Функция проверяет корректность введенного времени.
:return: True, если время указано корректно. Иначе, False
"""
if self.end_date < self.start_date or self.end_date > datetime.now().date():
return False
return True
def _init_data(self):
"""
Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email.
:return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку.
"""
if not self.check_time():
self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени']
return
try:
self.data = RoleChangeLogs.objects.filter(
change_time__range=[self.start_date, self.end_date + timedelta(days=1)],
user=User.objects.get(email=self.email),
).order_by('change_time')
except User.DoesNotExist:
self.errors += ['Пользователь не найден']
def _init_statistic(self) -> dict:
"""
Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд.
:return: Статистика работы пользователя (statistic)
"""
self.clear_statistic()
if not self.get_data():
self.warnings += ['Не обнаружены изменения роли в данном промежутке']
return None
first_log, last_log = self.data[0], self.data[len(self.data) - 1]
if first_log.old_role == ROLES['engineer']:
self.prev_engineer_logic(first_log)
if last_log.new_role == ROLES['engineer']:
self.post_engineer_logic(last_log)
for log_index in range(len(self.data) - 1):
if self.data[log_index].new_role == ROLES['engineer']:
self.engineer_logic(log_index)
def engineer_logic(self, log_index):
"""
Функция обрабатывает основную часть работы инженера
"""
current_log, next_log = self.data[log_index], self.data[log_index + 1]
if current_log.change_time.date() != next_log.change_time.date():
self.statistic[current_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(current_log)).total_seconds()
self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds()
self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date())
else:
elapsed_time = next_log.change_time - current_log.change_time
self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds()
def post_engineer_logic(self, last_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона
"""
self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1))
if last_log.change_time.date() == timezone.now().date():
self.statistic[last_log.change_time.date()] += (
get_timedelta(None, timezone.now().time()) - get_timedelta(last_log)
).total_seconds()
else:
self.statistic[last_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(last_log)).total_seconds()
if self.end_date == timezone.now().date():
self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds()
def prev_engineer_logic(self, first_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона
"""
self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date),
first_log.change_time.date())
self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds()
def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict:
"""
Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне).
:param first: Начальная дата интервала
:param last: Последняя дата интервала
:param val: Количество секунд в одном дне
"""
for day in daterange(first, last):
self.statistic[day] = val
def clear_statistic(self) -> dict:
"""
Функция осуществляет обновление всех дней.
"""
self.statistic.clear()
self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0)

View File

@ -3,56 +3,62 @@
<meta charset="utf-8"> <meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no"> <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<nav class="navbar navbar-light" style="background-color: #113A60;"> <nav class="navbar navbar-light py-3" style="background-color: #113A60;">
<a class="navbar-brand" href="{% url 'index' %}"> <a class="navbar-brand" href="{% url 'index' %}">
<img src="{% static 'main/img/logo_real.png' %}" width="107" height="22" class="d-inline-block align-top" style="margin-left: 15px" alt="" loading="lazy"> <img src="{% static 'main/img/logo_real.png' %}" width="107" height="22" class="d-inline-block align-top" style="margin-left: 15px" alt="" loading="lazy">
<t style="color:#FFFFFF">Access Controller</t> <t class="px-2" style="color:#FFFFFF">Access Controller</t>
</a> </a>
{% if request.user.is_authenticated %} {% if request.user.is_authenticated %}
<div class="btn-group" role="group" aria-label="Basic example" style="margin-right: 9px"> <div class="btn-group" role="group" aria-label="Basic example" style="margin-right: 9px">
<a {% if profile_lit %} {% url 'profile' as profile_url %}
<a {% if request.path == profile_url %}
class="btn btn-primary" class="btn btn-primary"
{% else %} {% else %}
class="btn btn-secondary" class="btn btn-secondary"
{% endif %} {% endif %}
href="{% url 'profile' %}">Профиль</a> href="{{ profile_url }}">Профиль</a>
{% if perms.main.has_control_access %} {% if perms.main.has_control_access %}
<a {% if control_lit %} {% url 'control' as control_url %}
<a {% if request.path == control_url %}
class="btn btn-primary" class="btn btn-primary"
{% else %} {% else %}
class="btn btn-secondary" class="btn btn-secondary"
{% endif %} {% endif %}
href="{% url 'control' %}">Управление</a> href="{{ control_url }}">Управление</a>
<a {% if stats_lit %} {% url 'statistic' as statistic_url %}
<a {% if request.path == statistic_url %}
class="btn btn-primary" class="btn btn-primary"
{% else %} {% else %}
class="btn btn-secondary" class="btn btn-secondary"
{% endif %} {% endif %}
href="{% url 'statistic' %}">Статистика</a> href="{{ statistic_url }}">Статистика</a>
{% else %} {% else %}
<a {% if work_lit %} {% url 'work' request.user.id as work_url %}
<a {% if request.path == work_url %}
class="btn btn-primary" class="btn btn-primary"
{% else %} {% else %}
class="btn btn-secondary" class="btn btn-secondary"
{% endif %} {% endif %}
href="{% url 'work' request.user.id %}">Запрос прав</a> href="{{ work_url }}">Запрос прав</a>
{% endif %} {% endif %}
<a class="btn btn-secondary" href="{% url 'logout' %}">Выйти</a> <a class="btn btn-secondary" href="{% url 'logout' %}">Выйти</a>
</div> </div>
{% else %} {% else %}
<div class="btn-group" role="group" aria-label="Basic example" style="margin-right: 9px"> <div class="btn-group" role="group" aria-label="Basic example" style="margin-right: 9px">
<a {% if login_lit %} {% url 'login' as login_url %}
<a {% if request.path == login_url %}
class="btn btn-primary" class="btn btn-primary"
{% else %} {% else %}
class="btn btn-secondary" class="btn btn-secondary"
{% endif %} {% endif %}
href="/accounts/login">Войти</a> href="{{ login_url }}">Войти</a>
<a {% if registration_lit %} {% url 'registration' as registration_url %}
<a {% if request.path == registration_url %}
class="btn btn-primary" class="btn btn-primary"
{% else %} {% else %}
class="btn btn-secondary" class="btn btn-secondary"
{% endif %} {% endif %}
href="/accounts/register">Зарегистрироваться</a> href="{{ registration_url }}">Зарегистрироваться</a>
</div> </div>
{% endif %} {% endif %}
</nav> </nav>

View File

@ -66,10 +66,11 @@
<a href="/work/hand_over" class="hand-over-acess-button default-button">Сдать права инженера</a> <a href="/work/hand_over" class="hand-over-acess-button default-button">Сдать права инженера</a>
</div> </div>
<div class="col-10"> <div class="col-10">
<form method="GET" action="/work/get_tickets"> <form method="post" action="{% url 'work_get_tickets' %}">
<input class="form-control mb-3" type="number" min="1" value="1" name="count_tickets"> {% csrf_token %}
<button type="submit" class="default-button">Взять тикеты в работу</button> {{ get_tickets_form.count_tickets }}
</form> <button type="submit" class="default-button">Взять тикеты в работу</button>
</form>
</div> </div>
{% for message in messages %} {% for message in messages %}
<script>create_notification('{{message}}','','{{message.tags}}',2000)</script> <script>create_notification('{{message}}','','{{message.tags}}',2000)</script>

View File

@ -1,2 +1,414 @@
import random
from unittest.mock import patch, Mock
from django.contrib.auth.models import User
from django.core import mail
from django.http import HttpResponseRedirect
from django.template.loader import render_to_string
from django.test import TestCase, Client from django.test import TestCase, Client
from django.urls import reverse, reverse_lazy
from django.utils import translation
import access_controller.settings as sets import access_controller.settings as sets
from main.zendesk_admin import zenpy
class RegistrationTestCase(TestCase):
fixtures = ['fixtures/data.json']
def setUp(self):
self.email_backend = 'django.core.mail.backends.locmem.EmailBackend'
self.any_zendesk_user_email = 'idar.sokurov.05@mail.ru'
self.zendesk_admin_email = 'idar.sokurov.05@mail.ru'
self.client = Client()
def test_registration_complete_redirect(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
resp = self.client.post(reverse('registration'), data={'email': self.any_zendesk_user_email})
self.assertRedirects(resp, reverse('password_reset_done'))
def test_registration_fail_redirect(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
resp = self.client.post(reverse('registration'), data={'email': self.any_zendesk_user_email + 'asd'})
self.assertRedirects(resp, reverse('django_registration_disallowed'))
def test_registration_user_already_exist(self):
with self.settings(EMAIL_BACKEND=self.email_backend) and translation.override('ru'):
resp = self.client.post(reverse('registration'), data={'email': '123@test.ru'})
self.assertContains(resp, 'Этот адрес электронной почты уже используется', count=1, status_code=200)
def test_registration_send_email(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
response: HttpResponseRedirect = \
self.client.post(reverse('registration'), data={'email': self.any_zendesk_user_email})
self.assertEqual(response.status_code, 302)
self.assertEqual(len(mail.outbox), 1)
self.assertEqual(mail.outbox[0].to, [self.any_zendesk_user_email])
# context that the email template was rendered with
email_context = response.context[0].dicts[1]
correct_subject = render_to_string('registration/password_reset_subject.txt', email_context, response.request)
self.assertEqual(mail.outbox[0].subject, correct_subject.strip())
correct_body = render_to_string('registration/password_reset_email.html', email_context, response.request)
self.assertEqual(mail.outbox[0].body, correct_body)
def test_registration_user_creating(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
self.client.post(reverse('registration'), data={'email': self.any_zendesk_user_email})
user = User.objects.get(email=self.any_zendesk_user_email)
zendesk_user = zenpy.get_user(self.any_zendesk_user_email)
self.assertEqual(user.userprofile.name, zendesk_user.name)
def test_permissions_applying(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
self.client.post(reverse('registration'), data={'email': self.zendesk_admin_email})
user = User.objects.get(email=self.zendesk_admin_email)
self.assertEqual(user.userprofile.role, 'admin')
self.assertTrue(user.has_perm('main.has_control_access'))
class MakeEngineerTestCase(TestCase):
fixtures = ['fixtures/test_make_engineer.json']
def setUp(self):
self.light_agent = '123@test.ru'
self.admin = 'admin@gmail.com'
self.engineer = 'customer@example.com'
self.client = Client()
self.client.force_login(User.objects.get(email=self.light_agent))
self.admin_client = Client()
self.admin_client.force_login(User.objects.get(email=self.admin))
@patch('main.extra_func.zenpy')
def test_redirect(self, ZenpyMock):
user = User.objects.get(email=self.light_agent)
resp = self.client.post(reverse_lazy('work_become_engineer'))
self.assertRedirects(resp, reverse('work', args=[user.id]))
self.assertEqual(resp.status_code, 302)
@patch('main.extra_func.zenpy')
def test_light_agent_make_engineer(self, ZenpyMock):
self.client.post(reverse_lazy('work_become_engineer'))
self.assertEqual(ZenpyMock.update_user.call_args[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer'])
@patch('main.extra_func.zenpy')
def test_admin_make_engineer(self, ZenpyMock):
self.admin_client.post(reverse_lazy('work_become_engineer'))
self.assertEqual(ZenpyMock.update_user.call_args[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer'])
@patch('main.extra_func.zenpy')
def test_engineer_make_engineer(self, ZenpyMock):
client = Client()
client.force_login(User.objects.get(email=self.engineer))
client.post(reverse_lazy('work_become_engineer'))
self.assertEqual(ZenpyMock.update_user.call_args[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer'])
@patch('main.extra_func.zenpy')
def test_control_page_make_one(self, ZenpyMock):
self.admin_client.post(
reverse_lazy('control'),
data={'users': [User.objects.get(email=self.light_agent).userprofile.id], 'engineer': 'engineer'}
)
call_list = ZenpyMock.update_user.call_args_list
mock_object = call_list[0][0][0]
self.assertEqual(len(call_list), 1)
self.assertEqual(mock_object.custom_role_id, sets.ZENDESK_ROLES['engineer'])
@patch('main.extra_func.zenpy')
def test_control_page_make_many(self, ZenpyMock):
self.admin_client.post(
reverse_lazy('control'),
data={
'users': [
User.objects.get(email=self.light_agent).userprofile.id,
User.objects.get(email=self.engineer).userprofile.id,
],
'engineer': 'engineer'
}
)
call_list = ZenpyMock.update_user.call_args_list
mock_objects = list(call_list)
self.assertEqual(len(call_list), 2)
for obj in mock_objects:
self.assertEqual(obj[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer'])
class PasswordResetTestCase(TestCase):
fixtures = ['fixtures/test_make_engineer.json']
def setUp(self):
self.user = '123@test.ru'
self.email_backend = 'django.core.mail.backends.locmem.EmailBackend'
self.client = Client()
self.client.force_login(User.objects.get(email=self.user))
def test_redirect(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
resp = self.client.post(reverse_lazy('password_reset'), data={'email': self.user})
self.assertRedirects(resp, reverse('password_reset_done'))
self.assertEqual(resp.status_code, 302)
def test_send_email(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
response: HttpResponseRedirect = \
self.client.post(reverse_lazy('password_reset'), data={'email': self.user})
self.assertEqual(response.status_code, 302)
self.assertEqual(len(mail.outbox), 1)
self.assertEqual(mail.outbox[0].to, [self.user])
# context that the email template was rendered with
email_context = response.context[0].dicts[1]
correct_subject = render_to_string('registration/password_reset_subject.txt', email_context, response.request)
self.assertEqual(mail.outbox[0].subject, correct_subject.strip())
correct_body = render_to_string('registration/password_reset_email.html', email_context, response.request)
self.assertEqual(mail.outbox[0].body, correct_body)
def test_email_invalid(self):
with self.settings(EMAIL_BACKEND=self.email_backend) and translation.override('ru'):
resp = self.client.post(reverse_lazy('password_reset'), data={'email': 1})
self.assertContains(resp, 'Введите правильный адрес электронной почты.', count=1, status_code=200)
def test_user_does_not_exist(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
resp = self.client.post(reverse_lazy('password_reset'), data={'email': self.user + str(random.random())})
self.assertRedirects(resp, reverse('password_reset_done'))
self.assertEqual(resp.status_code, 302)
self.assertEqual(len(mail.outbox), 0)
class PasswordChangeTestCase(TestCase):
fixtures = ['fixtures/test_make_engineer.json']
def setUp(self):
self.user = '123@test.ru'
self.client = Client()
self.client.force_login(User.objects.get(email=self.user))
self.set_password()
def set_password(self):
user: User = User.objects.get(email=self.user)
user.set_password('ImpossiblyHardPassword')
user.save()
self.client.force_login(User.objects.get(email=self.user))
def test_change_successful(self):
self.client.post(
reverse_lazy('password_change'),
data={
'old_password': 'ImpossiblyHardPassword',
'new_password1': 'EasyPassword',
'new_password2': 'EasyPassword',
}
)
user = User.objects.get(email=self.user)
self.assertTrue(user.check_password('EasyPassword'))
def test_invalid_old_password(self):
with translation.override('ru'):
resp = self.client.post(
reverse_lazy('password_change'),
data={
'old_password': 'EasyPassword',
'new_password1': 'EasyPassword',
'new_password2': 'EasyPassword',
}
)
self.assertContains(resp, 'Ваш старый пароль введен неправильно', count=1, status_code=200)
def test_different_new_passwords(self):
with translation.override('ru'):
resp = self.client.post(
reverse_lazy('password_change'),
data={
'old_password': 'ImpossiblyHardPassword',
'new_password1': 'EasyPassword',
'new_password2': 'EasyPassword1',
}
)
self.assertContains(resp, 'Введенные пароли не совпадают', count=1, status_code=200)
def test_invalid_new_password1(self):
with translation.override('ru'):
resp = self.client.post(
reverse_lazy('password_change'),
data={
'old_password': 'ImpossiblyHardPassword',
'new_password1': 'short',
'new_password2': 'short',
}
)
self.assertContains(resp, 'Введённый пароль слишком короткий', count=1, status_code=200)
def test_invalid_new_password2(self):
with translation.override('ru'):
resp = self.client.post(
reverse_lazy('password_change'),
data={
'old_password': 'ImpossiblyHardPassword',
'new_password1': '123123123123123123132123123',
'new_password2': '123123123123123123132123123',
}
)
self.assertContains(resp, 'Введённый пароль состоит только из цифр', count=1, status_code=200)
def test_invalid_new_password3(self):
with translation.override('ru'):
resp = self.client.post(
reverse_lazy('password_change'),
data={
'old_password': 'ImpossiblyHardPassword',
'new_password1': self.user,
'new_password2': self.user,
}
)
self.assertContains(resp, 'Введённый пароль слишком похож на имя пользователя', count=1, status_code=200)
class GetTicketsTestCase(TestCase):
"""
Класс тестов для проверки функции получения тикетов.
"""
fixtures = ['fixtures/test_make_engineer.json']
def setUp(self):
"""
Предустановленные значения для проведения тестов.
"""
self.light_agent = '123@test.ru'
self.engineer = 'customer@example.com'
self.client = Client()
self.client.force_login(User.objects.get(email=self.engineer))
self.light_agent_client = Client()
self.light_agent_client.force_login(User.objects.get(email=self.light_agent))
@patch('main.views.zenpy.get_user')
@patch('main.extra_func.zenpy')
def test_redirect(self, ZenpyMock, GetUserMock):
"""
Функция проверки переадресации пользователя на рабочую страницу.
"""
GetUserMock.return_value = Mock()
user = User.objects.get(email=self.engineer)
resp = self.client.post(reverse('work_get_tickets'))
self.assertRedirects(resp, reverse('work', args=[user.id]))
self.assertEqual(resp.status_code, 302)
@patch('main.views.zenpy')
@patch('main.views.get_tickets_list_for_group')
def test_take_one_ticket(self, TicketsMock, ZenpyViewsMock):
"""
Функция проверки назначения одного тикета на engineer.
"""
TicketsMock.return_value = [Mock()]
ZenpyViewsMock.get_user.return_value = Mock(role='agent', custom_role_id=sets.ZENDESK_ROLES['engineer'])
self.client.post(reverse('work_get_tickets'), data={'count_tickets': 1})
tickets = ZenpyViewsMock.update_tickets.call_args
self.assertEqual(tickets[0][0][0].assignee, ZenpyViewsMock.get_user.return_value)
@patch('main.views.get_tickets_list_for_group')
@patch('main.views.zenpy')
def test_take_many_tickets(self, ZenpyMock, TicketsMock):
"""
Функция проверки назначения нескольких тикетов на engineer.
"""
TicketsMock.return_value = [Mock()] * 3
ZenpyMock.get_user.return_value = Mock(role='agent', custom_role_id=sets.ZENDESK_ROLES['engineer'])
self.client.post(reverse('work_get_tickets'), data={'count_tickets': 3})
tickets = ZenpyMock.update_tickets.call_args
for ticket in tickets[0][0]:
self.assertEqual(ticket.assignee, ZenpyMock.get_user.return_value)
@patch('main.views.zenpy.get_user')
@patch('main.views.zenpy')
def test_light_agent_take_ticket(self, ZenpyMock, GetUserMock):
"""
Функция проверки попытки назначения тикета на light_agent.
"""
GetUserMock.return_value = Mock(role='agent', custom_role_id=sets.ZENDESK_ROLES['light_agent'])
self.light_agent_client.post(reverse('work_get_tickets'), data={'count_tickets': 3})
tickets = ZenpyMock.update_tickets.call_args
self.assertIsNone(tickets)
@patch('main.views.zenpy')
@patch('main.views.get_tickets_list_for_group')
def test_take_zero_tickets(self, TicketsMock, ZenpyMock):
"""
Функция проверки попытки назначения нуля тикета на engineer.
"""
TicketsMock.return_value = [Mock()] * 3
ZenpyMock.get_user.return_value = Mock(role='agent', custom_role_id=sets.ZENDESK_ROLES['engineer'])
self.client.post(reverse('work_get_tickets'), data={'count_tickets': 0})
tickets = ZenpyMock.update_tickets.call_args[0][0]
self.assertListEqual(tickets, [])
@patch('main.views.get_tickets_list_for_group')
@patch('main.views.zenpy')
def test_take_invalid_count_tickets(self, ZenpyMock, TicketsMock, ):
"""
Функция проверки попытки назначения нуля тикетов на engineer.
"""
TicketsMock.return_value = [Mock()] * 3
ZenpyMock.get_user.return_value = Mock(role='agent', custom_role_id=sets.ZENDESK_ROLES['engineer'])
self.client.post(reverse('work_get_tickets'), data={'count_tickets': 'asd'})
tickets = ZenpyMock.update_tickets.call_args
self.assertIsNone(tickets)
class ProfileTestCase(TestCase):
"""
Класс тестов для проверки синхронизации профиля пользователя.
"""
fixtures = ['fixtures/profile.json']
def setUp(self):
"""
Предустановленные значения для проведения тестов.
"""
self.zendesk_agent_email = 'krav-88@mail.ru'
self.zendesk_admin_email = 'idar.sokurov.05@mail.ru'
self.client = Client()
self.client.force_login(User.objects.get(email=self.zendesk_agent_email))
self.admin_client = Client()
self.admin_client.force_login(User.objects.get(email=self.zendesk_admin_email))
def test_correct_username(self):
"""
Функция проверки синхронизации имени пользователя.
"""
resp = self.client.get(reverse('profile'))
self.assertEqual(resp.context['profile'].name, zenpy.get_user(self.zendesk_agent_email).name)
def test_correct_email(self):
"""
Функция проверки синхронизации почты пользователя.
"""
resp = self.client.get(reverse('profile'))
self.assertEqual(resp.context['profile'].user.email, zenpy.get_user(self.zendesk_agent_email).email)
def test_correct_role(self):
"""
Функция проверки синхронизации роли пользователя.
"""
resp = self.client.get(reverse('profile'))
self.assertEqual(resp.context['profile'].role, zenpy.get_user(self.zendesk_agent_email).role)
resp = self.admin_client.get(reverse('profile'))
self.assertEqual(resp.context['profile'].role, zenpy.get_user(self.zendesk_admin_email).role)
def test_correct_custom_role_id(self):
"""
Функция проверки синхронизации рабочей роли пользователя.
"""
resp = self.client.get(reverse('profile'))
user = zenpy.get_user(self.zendesk_agent_email)
self.assertEqual(resp.context['profile'].custom_role_id, user.custom_role_id if user.custom_role_id else 0)
resp = self.admin_client.get(reverse('profile'))
user = zenpy.get_user(self.zendesk_admin_email)
self.assertEqual(resp.context['profile'].custom_role_id, user.custom_role_id if user.custom_role_id else 0)
def test_correct_image(self):
"""
Функция проверки синхронизации изображения пользователя.
"""
resp = self.client.get(reverse('profile'))
user = zenpy.get_user(self.zendesk_agent_email)
self.assertEqual(resp.context['profile'].image, user.photo['content_url'] if user.photo else None)

View File

@ -1,47 +1,34 @@
from smtplib import SMTPException from smtplib import SMTPException
from typing import Dict, Any
from django.contrib import messages from django.contrib import messages
from django.contrib.auth.decorators import login_required from django.contrib.auth.decorators import login_required
from django.contrib.auth.forms import PasswordResetForm
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
from django.contrib.auth.models import User, Permission from django.contrib.auth.models import User, Permission
from django.contrib.auth.tokens import default_token_generator from django.contrib.auth.tokens import default_token_generator
from django.contrib.auth.forms import PasswordResetForm
from django.contrib.auth.views import LoginView from django.contrib.auth.views import LoginView
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.contrib.messages.views import SuccessMessageMixin from django.contrib.messages.views import SuccessMessageMixin
from django.core.handlers.wsgi import WSGIRequest from django.core.handlers.wsgi import WSGIRequest
from django.http import HttpResponseRedirect, HttpResponse from django.http import HttpResponseRedirect, HttpResponse
from django.shortcuts import render, redirect from django.shortcuts import render, redirect
from django.urls import reverse_lazy, reverse from django.urls import reverse_lazy
from django.views.generic import FormView from django.views.generic import FormView
from django_registration.views import RegistrationView from django_registration.views import RegistrationView
# Django REST # Django REST
from rest_framework import viewsets from rest_framework import viewsets
from rest_framework.response import Response from rest_framework.response import Response
from access_controller.settings import DEFAULT_FROM_EMAIL, ZENDESK_ROLES, ZENDESK_MAX_AGENTS from access_controller.settings import DEFAULT_FROM_EMAIL, ZENDESK_ROLES, ZENDESK_MAX_AGENTS, ZENDESK_GROUPS
from main.extra_func import check_user_exist, update_profile, get_user_organization, \ from main.extra_func import check_user_exist, update_profile, get_user_organization, \
make_engineer, make_light_agent, get_users_list, update_users_in_model, count_users, \ make_engineer, make_light_agent, get_users_list, update_users_in_model, count_users, \
StatisticData, log, set_session_params_for_work_page set_session_params_for_work_page, get_tickets_list_for_group
from main.zendesk_admin import zenpy from main.forms import AdminPageUsers, CustomRegistrationForm, CustomAuthenticationForm, StatisticForm, \
from main.forms import AdminPageUsers, CustomRegistrationForm, CustomAuthenticationForm, StatisticForm WorkGetTicketsForm
from main.serializers import ProfileSerializer, ZendeskUserSerializer from main.serializers import ProfileSerializer, ZendeskUserSerializer
from main.zendesk_admin import zenpy
from .models import UserProfile from .models import UserProfile
from .statistic_data import StatisticData
def setup_context(profile_lit: bool = False, control_lit: bool = False, work_lit: bool = False,
registration_lit: bool = False, login_lit: bool = False, stats_lit: bool = False) -> Dict[str, Any]:
context = {
'profile_lit': profile_lit,
'control_lit': control_lit,
'work_lit': work_lit,
'registration_lit': registration_lit,
'login_lit': login_lit,
'stats_lit': stats_lit,
}
return context
class CustomRegistrationView(RegistrationView): class CustomRegistrationView(RegistrationView):
@ -57,7 +44,6 @@ class CustomRegistrationView(RegistrationView):
:param is_allowed: Определение зарегистрирован ли пользователь с введенным email на Zendesk и принадлежит ли он к организации SYSTEM :param is_allowed: Определение зарегистрирован ли пользователь с введенным email на Zendesk и принадлежит ли он к организации SYSTEM
:type is_allowed: :class:`bool` :type is_allowed: :class:`bool`
""" """
extra_context = setup_context(registration_lit=True)
form_class = CustomRegistrationForm form_class = CustomRegistrationForm
template_name = 'django_registration/registration_form.html' template_name = 'django_registration/registration_form.html'
urls = { urls = {
@ -105,7 +91,7 @@ class CustomRegistrationView(RegistrationView):
except SMTPException: except SMTPException:
self.redirect_url = 'email_sending_error' self.redirect_url = 'email_sending_error'
else: else:
raise ValueError('Непредвиденная ошибка') self.redirect_url = 'email_sending_error'
else: else:
self.redirect_url = 'invalid_zendesk_email' self.redirect_url = 'invalid_zendesk_email'
@ -149,12 +135,11 @@ def profile_page(request: WSGIRequest) -> HttpResponse:
""" """
user_profile: UserProfile = request.user.userprofile user_profile: UserProfile = request.user.userprofile
update_profile(user_profile) update_profile(user_profile)
context = setup_context(profile_lit=True) context = {
context.update({
'profile': user_profile, 'profile': user_profile,
'pagename': 'Страница профиля', 'pagename': 'Страница профиля',
'ZENDESK_ROLES': ZENDESK_ROLES, 'ZENDESK_ROLES': ZENDESK_ROLES,
}) }
return render(request, 'pages/profile.html', context) return render(request, 'pages/profile.html', context)
@ -186,14 +171,14 @@ def work_page(request: WSGIRequest, id: int) -> HttpResponse:
engineers.append(user) engineers.append(user)
elif user.custom_role_id == ZENDESK_ROLES['light_agent']: elif user.custom_role_id == ZENDESK_ROLES['light_agent']:
light_agents.append(user) light_agents.append(user)
context = setup_context(work_lit=True) context = {
context.update({
'engineers': engineers, 'engineers': engineers,
'agents': light_agents, 'agents': light_agents,
'messages': messages.get_messages(request), 'messages': messages.get_messages(request),
'licences_remaining': max(0, ZENDESK_MAX_AGENTS - len(engineers)), 'licences_remaining': max(0, ZENDESK_MAX_AGENTS - len(engineers)),
'pagename': 'Управление правами', 'pagename': 'Управление правами',
}) 'get_tickets_form': WorkGetTicketsForm()
}
return render(request, 'pages/work.html', context) return render(request, 'pages/work.html', context)
return redirect("login") return redirect("login")
@ -226,17 +211,18 @@ def work_become_engineer(request: WSGIRequest) -> HttpResponseRedirect:
@login_required() @login_required()
def work_get_tickets(request): def work_get_tickets(request):
zenpy_user = zenpy.get_user(request.user.email) zenpy_user = zenpy.get_user(request.user.email)
if zenpy_user.role == 'admin' or zenpy_user.custom_role_id == ZENDESK_ROLES['engineer']:
tickets = [ticket for ticket in zenpy.admin.search(type="ticket") if if request.method == 'POST':
ticket.group.name == 'Сменная группа' and ticket.assignee is None] if zenpy_user.role == 'admin' or zenpy_user.custom_role_id == ZENDESK_ROLES['engineer']:
count = 0 form = WorkGetTicketsForm(request.POST)
for i in range(len(tickets)): if form.is_valid():
if i == int(request.GET.get('count_tickets')): tickets = get_tickets_list_for_group(ZENDESK_GROUPS['buffer'])
return set_session_params_for_work_page(request, count) assigned_tickets = []
tickets[i].assignee = zenpy_user for i in range(min(form.cleaned_data['count_tickets'], len(tickets))):
zenpy.admin.tickets.update(tickets[i]) tickets[i].assignee = zenpy_user
count += 1 assigned_tickets.append(tickets[i])
return set_session_params_for_work_page(request, count) zenpy.update_tickets(assigned_tickets)
return set_session_params_for_work_page(request, len(assigned_tickets))
return set_session_params_for_work_page(request, is_confirm=False) return set_session_params_for_work_page(request, is_confirm=False)
@ -289,7 +275,6 @@ class AdminPageView(LoginRequiredMixin, PermissionRequiredMixin, SuccessMessageM
""" """
for user in users: for user in users:
make_engineer(user, self.request.user) make_engineer(user, self.request.user)
log(user, self.request.user.userprofile)
def make_light_agents(self, users): def make_light_agents(self, users):
""" """
@ -300,14 +285,12 @@ class AdminPageView(LoginRequiredMixin, PermissionRequiredMixin, SuccessMessageM
""" """
for user in users: for user in users:
make_light_agent(user, self.request.user) make_light_agent(user, self.request.user)
log(user, self.request.user.userprofile)
class CustomLoginView(LoginView): class CustomLoginView(LoginView):
""" """
Отображение страницы авторизации пользователя Отображение страницы авторизации пользователя
""" """
extra_context = setup_context(login_lit=True)
form_class = CustomAuthenticationForm form_class = CustomAuthenticationForm
@ -365,11 +348,10 @@ def statistic_page(request: WSGIRequest) -> HttpResponse:
if not request.user.has_perm("main.has_control_access"): if not request.user.has_perm("main.has_control_access"):
return redirect('index') return redirect('index')
context = setup_context(stats_lit=True) context = {
context.update({
'pagename': 'страница статистики', 'pagename': 'страница статистики',
'errors': list(), 'errors': list(),
}) }
if request.method == "POST": if request.method == "POST":
form = StatisticForm(request.POST) form = StatisticForm(request.POST)
if form.is_valid(): if form.is_valid():

View File

@ -1,7 +1,7 @@
from typing import Optional, Dict from typing import Optional, Dict, List
from zenpy import Zenpy from zenpy import Zenpy
from zenpy.lib.api_objects import User as ZenpyUser, Group as ZenpyGroup from zenpy.lib.api_objects import User as ZenpyUser, Group as ZenpyGroup, Ticket as ZenpyTicket
from zenpy.lib.exception import APIException from zenpy.lib.exception import APIException
from access_controller.settings import ACTRL_ZENDESK_SUBDOMAIN, ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, \ from access_controller.settings import ACTRL_ZENDESK_SUBDOMAIN, ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, \
@ -22,6 +22,23 @@ class ZendeskAdmin:
self.buffer_group_id: int = self.get_group(ZENDESK_GROUPS['buffer']).id self.buffer_group_id: int = self.get_group(ZENDESK_GROUPS['buffer']).id
self.solved_tickets_user_id: int = self.get_user(SOLVED_TICKETS_EMAIL).id self.solved_tickets_user_id: int = self.get_user(SOLVED_TICKETS_EMAIL).id
def update_user(self, user: ZenpyUser) -> bool:
"""
Функция сохраняет изменение пользователя в Zendesk.
:param user: Пользователь с изменёнными данными
"""
self.admin.users.update(user)
def update_tickets(self, tickets: List[ZenpyTicket]):
"""
Функция сохраняет изменение тикетов в Zendesk.
:param tickets: Тикеты с изменёнными данными
"""
if tickets:
self.admin.tickets.update(tickets)
def check_user(self, email: str) -> bool: def check_user(self, email: str) -> bool:
""" """
Функция осуществляет проверку существования пользователя в Zendesk по email. Функция осуществляет проверку существования пользователя в Zendesk по email.

View File