Merge branch 'develop' into feature/registration_failed/html

This commit is contained in:
Andrew Smirnov 2021-05-13 19:57:44 +03:00
commit c1329e6788
No known key found for this signature in database
GPG Key ID: 0EFE318E5BB2A82A
17 changed files with 1042 additions and 348 deletions

View File

@ -19,10 +19,10 @@ BASE_DIR = Path(__file__).resolve().parent.parent
# See https://docs.djangoproject.com/en/3.1/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = os.getenv('ACTRL_SECRET_KEY','empty')
SECRET_KEY = os.getenv('ACTRL_SECRET_KEY', 'empty')
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = bool(int(os.getenv('ACTRL_DEBUG',1)))
DEBUG = bool(int(os.getenv('ACTRL_DEBUG', 1)))
ALLOWED_HOSTS = [
'127.0.0.1',
@ -57,13 +57,13 @@ MIDDLEWARE = [
ROOT_URLCONF = 'access_controller.urls'
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = os.getenv('ACTRL_EMAIL_HOST','smtp.gmail.com')
EMAIL_PORT = int(os.getenv('ACTRL_EMAIL_PORT',587))
EMAIL_USE_TLS = bool(int(os.getenv('ACTRL_EMAIL_TLS',1)))
EMAIL_HOST_USER = os.getenv('ACTRL_EMAIL_HOST_USER','group02django@gmail.com')
EMAIL_HOST_PASSWORD = os.getenv('ACTRL_EMAIL_HOST_PASSWORD','djangogroup02')
DEFAULT_FROM_EMAIL = os.getenv('ACTRL_FROM_EMAIL',EMAIL_HOST_USER)
SERVER_EMAIL = os.getenv('ACTRL_SERVER_EMAIL',EMAIL_HOST_USER)
EMAIL_HOST = os.getenv('ACTRL_EMAIL_HOST', 'smtp.gmail.com')
EMAIL_PORT = int(os.getenv('ACTRL_EMAIL_PORT', 587))
EMAIL_USE_TLS = bool(int(os.getenv('ACTRL_EMAIL_TLS', 1)))
EMAIL_HOST_USER = os.getenv('ACTRL_EMAIL_HOST_USER', 'group02django@gmail.com')
EMAIL_HOST_PASSWORD = os.getenv('ACTRL_EMAIL_HOST_PASSWORD', 'djangogroup02')
DEFAULT_FROM_EMAIL = os.getenv('ACTRL_FROM_EMAIL', EMAIL_HOST_USER)
SERVER_EMAIL = os.getenv('ACTRL_SERVER_EMAIL', EMAIL_HOST_USER)
TEMPLATES = [
{
@ -150,8 +150,8 @@ AUTHENTICATION_BACKENDS = [
ZENDESK_ROLES = {
'engineer': int(os.getenv('ENG_CROLE_ID',0)),
'light_agent': int(os.getenv('LA_CROLE_ID',0)),
'engineer': int(os.getenv('ENG_CROLE_ID', 0)),
'light_agent': int(os.getenv('LA_CROLE_ID', 0)),
}
ZENDESK_GROUPS = {
@ -161,7 +161,7 @@ ZENDESK_GROUPS = {
SOLVED_TICKETS_EMAIL = os.getenv('ST_EMAIL')
ZENDESK_MAX_AGENTS = int(os.getenv('LICENSE_NO',0))
ZENDESK_MAX_AGENTS = int(os.getenv('LICENSE_NO', 0))
REST_FRAMEWORK = {
# Use Django's standard `django.contrib.auth` permissions,
@ -171,7 +171,7 @@ REST_FRAMEWORK = {
]
}
ONE_DAY = int(os.getenv('SHIFTH',0)) # Количество часов в 1 рабочем дне
ONE_DAY = int(os.getenv('SHIFTH', 0)) # Количество часов в 1 рабочем дне
ACTRL_ZENDESK_SUBDOMAIN = os.getenv('ACTRL_ZENDESK_SUBDOMAIN') or os.getenv('ZD_DOMAIN')
ACTRL_API_EMAIL = os.getenv('ACTRL_API_EMAIL') or os.getenv('ACCESS_CONTROLLER_API_EMAIL')

View File

@ -22,6 +22,7 @@ from main.views import registration_failed
from main.views import work_page, work_hand_over, work_become_engineer, work_get_tickets, \
AdminPageView, statistic_page
urlpatterns = [
path('admin/', admin.site.urls, name='admin'),
path('', main_page, name='index'),

57
fixtures/data.json Normal file
View File

@ -0,0 +1,57 @@
[
{
"model": "auth.user",
"pk": 1,
"fields": {
"password": "pbkdf2_sha256$216000$gHBBCr1jBELf$ZkEDW3IEd8Wij7u8vkv+0Eze32CS01bcaYWhcD9OIC4=",
"last_login": null,
"is_superuser": true,
"username": "admin@gmail.com",
"first_name": "",
"last_name": "",
"email": "admin@gmail.com",
"is_staff": true,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": [33]
}
},
{
"model": "main.userprofile",
"pk": 1,
"fields": {
"name": "ZendeskAdmin",
"user": 1,
"role": "admin"
}
},
{
"model": "auth.user",
"pk": 2,
"fields": {
"password": "pbkdf2_sha256$216000$5qLJgrm2Quq9$KDBNNymVZXkUx0HKBPFst2m83kLe0egPBnkW7KnkORU=",
"last_login": null,
"is_superuser": false,
"username": "123@test.ru",
"first_name": "",
"last_name": "",
"email": "123@test.ru",
"is_staff": false,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "main.userprofile",
"pk": 2,
"fields": {
"name": "UserForAccessTest",
"user": 2,
"role": "agent",
"custom_role_id": "360005209000"
}
}
]

59
fixtures/profile.json Normal file
View File

@ -0,0 +1,59 @@
[
{
"model": "auth.user",
"pk": 1,
"fields": {
"password": "pbkdf2_sha256$216000$gHBBCr1jBELf$ZkEDW3IEd8Wij7u8vkv+0Eze32CS01bcaYWhcD9OIC4=",
"last_login": null,
"is_superuser": true,
"username": "idar.sokurov.05@mail.ru",
"first_name": "",
"last_name": "",
"email": "idar.sokurov.05@mail.ru",
"is_staff": true,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": [
33
]
}
},
{
"model": "main.userprofile",
"pk": 1,
"fields": {
"name": "ZendeskAdmin",
"user": 1,
"role": "admin"
}
},
{
"model": "auth.user",
"pk": 2,
"fields": {
"password": "pbkdf2_sha256$216000$5qLJgrm2Quq9$KDBNNymVZXkUx0HKBPFst2m83kLe0egPBnkW7KnkORU=",
"last_login": null,
"is_superuser": false,
"username": "krav-88@mail.ru",
"first_name": "",
"last_name": "",
"email": "krav-88@mail.ru",
"is_staff": false,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "main.userprofile",
"pk": 2,
"fields": {
"name": "UserForAccessTest",
"user": 2,
"role": "agent",
"custom_role_id": "360005209000"
}
}
]

View File

@ -0,0 +1,85 @@
[
{
"model": "auth.user",
"pk": 1,
"fields": {
"password": "pbkdf2_sha256$216000$gHBBCr1jBELf$ZkEDW3IEd8Wij7u8vkv+0Eze32CS01bcaYWhcD9OIC4=",
"last_login": null,
"is_superuser": true,
"username": "admin@gmail.com",
"first_name": "",
"last_name": "",
"email": "admin@gmail.com",
"is_staff": true,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": [33]
}
},
{
"model": "main.userprofile",
"pk": 1,
"fields": {
"name": "ZendeskAdmin",
"user": 1,
"role": "admin"
}
},
{
"model": "auth.user",
"pk": 2,
"fields": {
"password": "pbkdf2_sha256$216000$5qLJgrm2Quq9$KDBNNymVZXkUx0HKBPFst2m83kLe0egPBnkW7KnkORU=",
"last_login": null,
"is_superuser": false,
"username": "123@test.ru",
"first_name": "",
"last_name": "",
"email": "123@test.ru",
"is_staff": false,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "main.userprofile",
"pk": 2,
"fields": {
"name": "UserForAccessTest",
"user": 2,
"role": "agent",
"custom_role_id": "360005209000"
}
},
{
"model": "auth.user",
"pk": 3,
"fields": {
"password": "pbkdf2_sha256$216000$5qLJgrm2Quq9$KDBNNymVZXkUx0HKBPFst2m83kLe0egPBnkW7KnkORU=",
"last_login": null,
"is_superuser": false,
"username": "customer@example.com",
"first_name": "",
"last_name": "",
"email": "customer@example.com",
"is_staff": false,
"is_active": true,
"date_joined": "2021-04-15T16:38:56.303Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "main.userprofile",
"pk": 3,
"fields": {
"name": "UserForAccessTest",
"user": 3,
"role": "agent",
"custom_role_id": "360005209000"
}
}
]

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

BIN
layouts/work/workv2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

View File

@ -1,27 +1,28 @@
import logging
from datetime import timedelta, datetime, date
from typing import Optional
from datetime import timedelta
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from django.shortcuts import redirect
from django.utils import timezone
from zenpy import Zenpy
from zenpy.lib.exception import APIException
from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket
from zenpy.lib.exception import APIException
from zenpy.lib.generator import SearchResultGenerator
from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY, ACTRL_ZENDESK_SUBDOMAIN
from access_controller.settings import ZENDESK_ROLES as ROLES, ACTRL_ZENDESK_SUBDOMAIN
from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus
from main.requester import TicketListRequester
from main.zendesk_admin import zenpy
def update_role(user_profile: UserProfile, role: int) -> None:
def update_role(user_profile: UserProfile, role: int, who_changes: User) -> None:
"""
Функция меняет роль пользователя.
:param user_profile: Профиль пользователя
:param role: Новая роль
:param who_changes: Пользователь, меняющий роль
:return: Пользователь с обновленной ролью
"""
zendesk = zenpy
@ -29,7 +30,8 @@ def update_role(user_profile: UserProfile, role: int) -> None:
user.custom_role_id = role
user_profile.custom_role_id = role
user_profile.save()
zendesk.admin.users.update(user)
log(user_profile, who_changes.userprofile)
zendesk.update_user(user)
def make_engineer(user_profile: UserProfile, who_changes: User) -> None:
@ -39,7 +41,7 @@ def make_engineer(user_profile: UserProfile, who_changes: User) -> None:
:param user_profile: Профиль пользователя
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer"
"""
update_role(user_profile, ROLES['engineer'])
update_role(user_profile, ROLES['engineer'], who_changes)
def make_light_agent(user_profile: UserProfile, who_changes: User) -> None:
@ -62,14 +64,12 @@ def make_light_agent(user_profile: UserProfile, who_changes: User) -> None:
else:
ticket.assignee = None
ticket.group_id = zenpy.buffer_group_id
if tickets.count:
zenpy.admin.tickets.update(tickets.values)
attempts, success = 5, False
if tickets:
zenpy.admin.tickets.update(tickets)
attempts, success = 20, False
while not success and attempts != 0:
try:
update_role(user_profile, ROLES['light_agent'])
update_role(user_profile, ROLES['light_agent'], who_changes)
success = True
except APIException as e:
attempts -= 1
@ -93,7 +93,14 @@ def get_tickets_list(email):
"""
Функция возвращает список тикетов пользователя Zendesk
"""
return zenpy.admin.search(assignee=email, type='ticket')
return TicketListRequester().get_tickets_list_for_user(zenpy.get_user(email))
def get_tickets_list_for_group(group_name):
"""
Функция возвращает список неназначенных, нерешённых тикетов группы Zendesk
"""
return TicketListRequester().get_tickets_list_for_group(zenpy.get_group(group_name))
def update_profile(user_profile: UserProfile):
@ -233,258 +240,6 @@ def last_day_of_month(day: int) -> int:
return next_month - timedelta(days=next_month.day)
class StatisticData:
"""
Класс для учета статистики интервалов работы пользователей.
Передаваемые параметры: start_date, end_date, email, stat.
:param display: Формат отображения времени (часы, минуты)
:type display: :class:`list`
:param interval: Интервал времени в часах и минутах
:type interval: :class:`list`
:param start_date: Дата начала работы
:type start_date: :class:`date`
:param end_date: Дата окончания работы
:type end_date: :class:`date`
:param email: Email пользователя
:type email: :class:`str`
:param errors: Список ошибок
:type errors: :class:`list`
:param warnings: Список предупреждений
:type warnings: :class:`list`
:param data: Ретроспектива смены ролей пользователя
:type data: :class:`dict`
:param statistic: Интервалы работы пользователя
:type statistic: :class:`dict`
"""
def __init__(self, start_date, end_date, user_email, stat=None):
self.display = None
self.interval = None
self.start_date = start_date
self.end_date = end_date
self.email = user_email
self.errors = list()
self.warnings = list()
self.data = dict()
self.statistic = dict()
self._init_data()
if stat is None:
self._init_statistic()
else:
self.statistic = stat
def get_statistic(self) -> dict:
"""
Функция возвращает статистику работы пользователя.
:return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании.
"""
if self.is_valid_statistic():
stat = self.statistic
stat = self._use_display(stat)
stat = self._use_interval(stat)
return stat
else:
return None
def is_valid_statistic(self) -> bool:
"""
Функция проверяет были ли ошибки при создании статистики.
:return: True, при отсутствии ошибок
"""
return not self.errors and self.statistic
def set_interval(self, interval: list) -> bool:
"""
Функция проверяет корректность представления интервала работы.
:param interval: Интервал должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if interval not in ['months', 'days']:
self.errors += ['Интервал работы должен быть в днях или месяцах']
return False
self.interval = interval
return True
def set_display(self, display_format: list) -> bool:
"""
Функция проверяет корректность формата отображения интервала.
:param display_format: Формат отображения должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if display_format not in ['days', 'hours']:
self.errors += ['Формат отображения должен быть в часах или днях']
return False
self.display = display_format
return True
def get_data(self) -> Optional[dict]:
"""
Функция возвращает данные - список объектов RoleChangeLogs.
"""
if self.is_valid_data():
return self.data
else:
return None
def is_valid_data(self) -> bool:
"""
Функция определяет были ли ошибки при получении логов.
:return: True, если ошибок нет
"""
return not self.errors
def _use_display(self, stat: list) -> list:
"""
Функция приводит данные к формату отображения.
:param stat: Список данных статистики пользователя
:return: Обновленный список
"""
if not self.is_valid_statistic() or not self.display:
return stat
new_stat = {}
for key, item in stat.items():
if self.display == 'hours':
new_stat[key] = item / 3600
elif self.display == 'days':
new_stat[key] = item / (ONE_DAY * 3600)
return new_stat
def _use_interval(self, stat: dict) -> dict:
"""
Функция объединяет ключи и значения в соответствии с интервалом работы.
:param stat: Статистика работы пользователя
:return: Обновленная статистика
"""
if not self.is_valid_statistic() or not self.interval:
return stat
new_stat = {}
if self.interval == 'months':
# Переделываем ключи под формат('началоесяца - конец_месяца')
for key, value in stat.items():
current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1))
current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1)))
index = ' - '.join([str(current_month_start), str(current_month_end)])
if new_stat.get(index):
new_stat[index] += value
else:
new_stat[index] = value
elif self.interval == 'days':
new_stat = stat # статистика изначально в днях
return new_stat
def check_time(self) -> bool:
"""
Функция проверяет корректность введенного времени.
:return: True, если время указано корректно. Иначе, False
"""
if self.end_date < self.start_date or self.end_date > datetime.now().date():
return False
return True
def _init_data(self):
"""
Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email.
:return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку.
"""
if not self.check_time():
self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени']
return
try:
self.data = RoleChangeLogs.objects.filter(
change_time__range=[self.start_date, self.end_date + timedelta(days=1)],
user=User.objects.get(email=self.email),
).order_by('change_time')
except User.DoesNotExist:
self.errors += ['Пользователь не найден']
def _init_statistic(self) -> dict:
"""
Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд.
:return: Статистика работы пользователя (statistic)
"""
self.clear_statistic()
if not self.get_data():
self.warnings += ['Не обнаружены изменения роли в данном промежутке']
return None
first_log, last_log = self.data[0], self.data[len(self.data) - 1]
if first_log.old_role == ROLES['engineer']:
self.prev_engineer_logic(first_log)
if last_log.new_role == ROLES['engineer']:
self.post_engineer_logic(last_log)
for log_index in range(len(self.data) - 1):
if self.data[log_index].new_role == ROLES['engineer']:
self.engineer_logic(log_index)
def engineer_logic(self, log_index):
"""
Функция обрабатывает основную часть работы инженера
"""
current_log, next_log = self.data[log_index], self.data[log_index + 1]
if current_log.change_time.date() != next_log.change_time.date():
self.statistic[current_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(current_log)).total_seconds()
self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds()
self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date())
else:
elapsed_time = next_log.change_time - current_log.change_time
self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds()
def post_engineer_logic(self, last_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона
"""
self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1))
if last_log.change_time.date() == timezone.now().date():
self.statistic[last_log.change_time.date()] += (
get_timedelta(None, timezone.now().time()) - get_timedelta(last_log)
).total_seconds()
else:
self.statistic[last_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(last_log)).total_seconds()
if self.end_date == timezone.now().date():
self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds()
def prev_engineer_logic(self, first_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона
"""
self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date),
first_log.change_time.date())
self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds()
def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict:
"""
Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне).
:param first: Начальная дата интервала
:param last: Последняя дата интервала
:param val: Количество секунд в одном дне
"""
for day in daterange(first, last):
self.statistic[day] = val
def clear_statistic(self) -> dict:
"""
Функция осуществляет обновление всех дней.
"""
self.statistic.clear()
self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0)
class DatabaseHandler(logging.Handler):
def __init__(self):
logging.Handler.__init__(self)
@ -534,7 +289,7 @@ class CsvFormatter(logging.Formatter):
return msg
def log(user, admin=0):
def log(user, admin=None):
"""
Осуществляет запись логов в базу данных и csv файл
:param admin:

View File

@ -140,3 +140,23 @@ class StatisticForm(forms.Form):
}
),
)
class WorkGetTicketsForm(forms.Form):
"""
Форма получения количества тикетов для страницы work и work_get_tickets.
:param count_tickets: Поле для ввода количества тикетов
:type count_tickets: :class:`django.forms.fields.IntegerField`
"""
count_tickets = forms.IntegerField(
min_value=0,
max_value=100,
required=True,
widget=forms.NumberInput(
attrs={
'class': 'form-control mb-3',
'value': 1
}
),
)

38
main/requester.py Normal file
View File

@ -0,0 +1,38 @@
import requests
from zenpy import TicketApi
from zenpy.lib.api_objects import Ticket
from main.zendesk_admin import zenpy
class TicketListRequester:
def __init__(self):
self.email = zenpy.credentials['email']
if zenpy.credentials.get('token'):
self.token_or_password = zenpy.credentials.get('token')
self.email += '/token'
else:
self.token_or_password = zenpy.credentials.get('password')
self.prefix = f'https://{zenpy.credentials.get("subdomain")}.zendesk.com/api/v2/'
def get_tickets_list_for_user(self, zendesk_user):
url = self.prefix + f'users/{zendesk_user.id}/tickets/assigned'
return self._get_tickets(url)
def get_tickets_list_for_group(self, group):
url = self.prefix + '/tickets'
all_tickets = self._get_tickets(url)
tickets = list()
for ticket in all_tickets:
if (ticket.status != 'solved') and (ticket.group_id == group.id) and (ticket.assignee_id is None):
tickets.append(ticket)
return tickets
def _get_tickets(self, url):
response = requests.get(url, auth=(self.email, self.token_or_password))
tickets = []
if response.status_code != 200:
return None
for ticket in response.json()['tickets']:
tickets.append(Ticket(api=TicketApi, **ticket))
return tickets

261
main/statistic_data.py Normal file
View File

@ -0,0 +1,261 @@
from datetime import date, datetime, timedelta
from typing import Optional
from django.contrib.auth.models import User
from django.utils import timezone
from access_controller.settings import ONE_DAY, ZENDESK_ROLES as ROLES
from main.extra_func import last_day_of_month, get_timedelta, daterange
from main.models import RoleChangeLogs
class StatisticData:
"""
Класс для учета статистики интервалов работы пользователей.
Передаваемые параметры: start_date, end_date, email, stat.
:param display: Формат отображения времени (часы, минуты)
:type display: :class:`list`
:param interval: Интервал времени в часах и минутах
:type interval: :class:`list`
:param start_date: Дата начала работы
:type start_date: :class:`date`
:param end_date: Дата окончания работы
:type end_date: :class:`date`
:param email: Email пользователя
:type email: :class:`str`
:param errors: Список ошибок
:type errors: :class:`list`
:param warnings: Список предупреждений
:type warnings: :class:`list`
:param data: Ретроспектива смены ролей пользователя
:type data: :class:`dict`
:param statistic: Интервалы работы пользователя
:type statistic: :class:`dict`
"""
def __init__(self, start_date, end_date, user_email, stat=None):
self.display = None
self.interval = None
self.start_date = start_date
self.end_date = end_date
self.email = user_email
self.errors = list()
self.warnings = list()
self.data = dict()
self.statistic = dict()
self._init_data()
if stat is None:
self._init_statistic()
else:
self.statistic = stat
def get_statistic(self) -> dict:
"""
Функция возвращает статистику работы пользователя.
:return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании.
"""
if self.is_valid_statistic():
stat = self.statistic
stat = self._use_display(stat)
stat = self._use_interval(stat)
return stat
else:
return None
def is_valid_statistic(self) -> bool:
"""
Функция проверяет были ли ошибки при создании статистики.
:return: True, при отсутствии ошибок
"""
return not self.errors and self.statistic
def set_interval(self, interval: list) -> bool:
"""
Функция проверяет корректность представления интервала работы.
:param interval: Интервал должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if interval not in ['months', 'days']:
self.errors += ['Интервал работы должен быть в днях или месяцах']
return False
self.interval = interval
return True
def set_display(self, display_format: list) -> bool:
"""
Функция проверяет корректность формата отображения интервала.
:param display_format: Формат отображения должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if display_format not in ['days', 'hours']:
self.errors += ['Формат отображения должен быть в часах или днях']
return False
self.display = display_format
return True
def get_data(self) -> Optional[dict]:
"""
Функция возвращает данные - список объектов RoleChangeLogs.
"""
if self.is_valid_data():
return self.data
else:
return None
def is_valid_data(self) -> bool:
"""
Функция определяет были ли ошибки при получении логов.
:return: True, если ошибок нет
"""
return not self.errors
def _use_display(self, stat: list) -> list:
"""
Функция приводит данные к формату отображения.
:param stat: Список данных статистики пользователя
:return: Обновленный список
"""
if not self.is_valid_statistic() or not self.display:
return stat
new_stat = {}
for key, item in stat.items():
if self.display == 'hours':
new_stat[key] = item / 3600
elif self.display == 'days':
new_stat[key] = item / (ONE_DAY * 3600)
return new_stat
def _use_interval(self, stat: dict) -> dict:
"""
Функция объединяет ключи и значения в соответствии с интервалом работы.
:param stat: Статистика работы пользователя
:return: Обновленная статистика
"""
if not self.is_valid_statistic() or not self.interval:
return stat
new_stat = {}
if self.interval == 'months':
# Переделываем ключи под формат('началоесяца - конец_месяца')
for key, value in stat.items():
current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1))
current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1)))
index = ' - '.join([str(current_month_start), str(current_month_end)])
if new_stat.get(index):
new_stat[index] += value
else:
new_stat[index] = value
elif self.interval == 'days':
new_stat = stat # статистика изначально в днях
return new_stat
def check_time(self) -> bool:
"""
Функция проверяет корректность введенного времени.
:return: True, если время указано корректно. Иначе, False
"""
if self.end_date < self.start_date or self.end_date > datetime.now().date():
return False
return True
def _init_data(self):
"""
Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email.
:return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку.
"""
if not self.check_time():
self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени']
return
try:
self.data = RoleChangeLogs.objects.filter(
change_time__range=[self.start_date, self.end_date + timedelta(days=1)],
user=User.objects.get(email=self.email),
).order_by('change_time')
except User.DoesNotExist:
self.errors += ['Пользователь не найден']
def _init_statistic(self) -> dict:
"""
Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд.
:return: Статистика работы пользователя (statistic)
"""
self.clear_statistic()
if not self.get_data():
self.warnings += ['Не обнаружены изменения роли в данном промежутке']
return None
first_log, last_log = self.data[0], self.data[len(self.data) - 1]
if first_log.old_role == ROLES['engineer']:
self.prev_engineer_logic(first_log)
if last_log.new_role == ROLES['engineer']:
self.post_engineer_logic(last_log)
for log_index in range(len(self.data) - 1):
if self.data[log_index].new_role == ROLES['engineer']:
self.engineer_logic(log_index)
def engineer_logic(self, log_index):
"""
Функция обрабатывает основную часть работы инженера
"""
current_log, next_log = self.data[log_index], self.data[log_index + 1]
if current_log.change_time.date() != next_log.change_time.date():
self.statistic[current_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(current_log)).total_seconds()
self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds()
self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date())
else:
elapsed_time = next_log.change_time - current_log.change_time
self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds()
def post_engineer_logic(self, last_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона
"""
self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1))
if last_log.change_time.date() == timezone.now().date():
self.statistic[last_log.change_time.date()] += (
get_timedelta(None, timezone.now().time()) - get_timedelta(last_log)
).total_seconds()
else:
self.statistic[last_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(last_log)).total_seconds()
if self.end_date == timezone.now().date():
self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds()
def prev_engineer_logic(self, first_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона
"""
self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date),
first_log.change_time.date())
self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds()
def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict:
"""
Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне).
:param first: Начальная дата интервала
:param last: Последняя дата интервала
:param val: Количество секунд в одном дне
"""
for day in daterange(first, last):
self.statistic[day] = val
def clear_statistic(self) -> dict:
"""
Функция осуществляет обновление всех дней.
"""
self.statistic.clear()
self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0)

View File

@ -3,56 +3,62 @@
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<nav class="navbar navbar-light" style="background-color: #113A60;">
<nav class="navbar navbar-light py-3" style="background-color: #113A60;">
<a class="navbar-brand" href="{% url 'index' %}">
<img src="{% static 'main/img/logo_real.png' %}" width="107" height="22" class="d-inline-block align-top" style="margin-left: 15px" alt="" loading="lazy">
<t style="color:#FFFFFF">Access Controller</t>
<t class="px-2" style="color:#FFFFFF">Access Controller</t>
</a>
{% if request.user.is_authenticated %}
<div class="btn-group" role="group" aria-label="Basic example" style="margin-right: 9px">
<a {% if profile_lit %}
{% url 'profile' as profile_url %}
<a {% if request.path == profile_url %}
class="btn btn-primary"
{% else %}
class="btn btn-secondary"
{% endif %}
href="{% url 'profile' %}">Профиль</a>
href="{{ profile_url }}">Профиль</a>
{% if perms.main.has_control_access %}
<a {% if control_lit %}
{% url 'control' as control_url %}
<a {% if request.path == control_url %}
class="btn btn-primary"
{% else %}
class="btn btn-secondary"
{% endif %}
href="{% url 'control' %}">Управление</a>
<a {% if stats_lit %}
href="{{ control_url }}">Управление</a>
{% url 'statistic' as statistic_url %}
<a {% if request.path == statistic_url %}
class="btn btn-primary"
{% else %}
class="btn btn-secondary"
{% endif %}
href="{% url 'statistic' %}">Статистика</a>
href="{{ statistic_url }}">Статистика</a>
{% else %}
<a {% if work_lit %}
{% url 'work' request.user.id as work_url %}
<a {% if request.path == work_url %}
class="btn btn-primary"
{% else %}
class="btn btn-secondary"
{% endif %}
href="{% url 'work' request.user.id %}">Запрос прав</a>
href="{{ work_url }}">Запрос прав</a>
{% endif %}
<a class="btn btn-secondary" href="{% url 'logout' %}">Выйти</a>
</div>
{% else %}
<div class="btn-group" role="group" aria-label="Basic example" style="margin-right: 9px">
<a {% if login_lit %}
{% url 'login' as login_url %}
<a {% if request.path == login_url %}
class="btn btn-primary"
{% else %}
class="btn btn-secondary"
{% endif %}
href="/accounts/login">Войти</a>
<a {% if registration_lit %}
href="{{ login_url }}">Войти</a>
{% url 'registration' as registration_url %}
<a {% if request.path == registration_url %}
class="btn btn-primary"
{% else %}
class="btn btn-secondary"
{% endif %}
href="/accounts/register">Зарегистрироваться</a>
href="{{ registration_url }}">Зарегистрироваться</a>
</div>
{% endif %}
</nav>

View File

@ -17,9 +17,9 @@
<script src="https://unpkg.com/babel-standalone@6/babel.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/axios/dist/axios.min.js"></script>
<script src="{% static 'modules/notifications/dist/notifications.js' %}"></script>
<script src="{% static 'main/js/control.js'%}" type="text/babel"></script>
<script src="{% static 'main/js/notifications.js' %}"></script>
<script src="{% static 'main/js/notifications.js' %}"></script> {# Для #}
<script src="{% static 'modules/notifications/dist/notifications.js' %}"></script> {# Уведомлений #}
{% endblock%}
{% block content %}
<div class="container-md">

View File

@ -66,10 +66,11 @@
<a href="/work/hand_over" class="hand-over-acess-button default-button">Сдать права инженера</a>
</div>
<div class="col-10">
<form method="GET" action="/work/get_tickets">
<input class="form-control mb-3" type="number" min="1" value="1" name="count_tickets">
<button type="submit" class="default-button">Взять тикеты в работу</button>
</form>
<form method="post" action="{% url 'work_get_tickets' %}">
{% csrf_token %}
{{ get_tickets_form.count_tickets }}
<button type="submit" class="default-button">Взять тикеты в работу</button>
</form>
</div>
{% for message in messages %}
<script>create_notification('{{message}}','','{{message.tags}}',2000)</script>

View File

@ -1,2 +1,414 @@
import random
from unittest.mock import patch, Mock
from django.contrib.auth.models import User
from django.core import mail
from django.http import HttpResponseRedirect
from django.template.loader import render_to_string
from django.test import TestCase, Client
from django.urls import reverse, reverse_lazy
from django.utils import translation
import access_controller.settings as sets
from main.zendesk_admin import zenpy
class RegistrationTestCase(TestCase):
fixtures = ['fixtures/data.json']
def setUp(self):
self.email_backend = 'django.core.mail.backends.locmem.EmailBackend'
self.any_zendesk_user_email = 'idar.sokurov.05@mail.ru'
self.zendesk_admin_email = 'idar.sokurov.05@mail.ru'
self.client = Client()
def test_registration_complete_redirect(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
resp = self.client.post(reverse('registration'), data={'email': self.any_zendesk_user_email})
self.assertRedirects(resp, reverse('password_reset_done'))
def test_registration_fail_redirect(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
resp = self.client.post(reverse('registration'), data={'email': self.any_zendesk_user_email + 'asd'})
self.assertRedirects(resp, reverse('django_registration_disallowed'))
def test_registration_user_already_exist(self):
with self.settings(EMAIL_BACKEND=self.email_backend) and translation.override('ru'):
resp = self.client.post(reverse('registration'), data={'email': '123@test.ru'})
self.assertContains(resp, 'Этот адрес электронной почты уже используется', count=1, status_code=200)
def test_registration_send_email(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
response: HttpResponseRedirect = \
self.client.post(reverse('registration'), data={'email': self.any_zendesk_user_email})
self.assertEqual(response.status_code, 302)
self.assertEqual(len(mail.outbox), 1)
self.assertEqual(mail.outbox[0].to, [self.any_zendesk_user_email])
# context that the email template was rendered with
email_context = response.context[0].dicts[1]
correct_subject = render_to_string('registration/password_reset_subject.txt', email_context, response.request)
self.assertEqual(mail.outbox[0].subject, correct_subject.strip())
correct_body = render_to_string('registration/password_reset_email.html', email_context, response.request)
self.assertEqual(mail.outbox[0].body, correct_body)
def test_registration_user_creating(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
self.client.post(reverse('registration'), data={'email': self.any_zendesk_user_email})
user = User.objects.get(email=self.any_zendesk_user_email)
zendesk_user = zenpy.get_user(self.any_zendesk_user_email)
self.assertEqual(user.userprofile.name, zendesk_user.name)
def test_permissions_applying(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
self.client.post(reverse('registration'), data={'email': self.zendesk_admin_email})
user = User.objects.get(email=self.zendesk_admin_email)
self.assertEqual(user.userprofile.role, 'admin')
self.assertTrue(user.has_perm('main.has_control_access'))
class MakeEngineerTestCase(TestCase):
fixtures = ['fixtures/test_make_engineer.json']
def setUp(self):
self.light_agent = '123@test.ru'
self.admin = 'admin@gmail.com'
self.engineer = 'customer@example.com'
self.client = Client()
self.client.force_login(User.objects.get(email=self.light_agent))
self.admin_client = Client()
self.admin_client.force_login(User.objects.get(email=self.admin))
@patch('main.extra_func.zenpy')
def test_redirect(self, ZenpyMock):
user = User.objects.get(email=self.light_agent)
resp = self.client.post(reverse_lazy('work_become_engineer'))
self.assertRedirects(resp, reverse('work', args=[user.id]))
self.assertEqual(resp.status_code, 302)
@patch('main.extra_func.zenpy')
def test_light_agent_make_engineer(self, ZenpyMock):
self.client.post(reverse_lazy('work_become_engineer'))
self.assertEqual(ZenpyMock.update_user.call_args[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer'])
@patch('main.extra_func.zenpy')
def test_admin_make_engineer(self, ZenpyMock):
self.admin_client.post(reverse_lazy('work_become_engineer'))
self.assertEqual(ZenpyMock.update_user.call_args[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer'])
@patch('main.extra_func.zenpy')
def test_engineer_make_engineer(self, ZenpyMock):
client = Client()
client.force_login(User.objects.get(email=self.engineer))
client.post(reverse_lazy('work_become_engineer'))
self.assertEqual(ZenpyMock.update_user.call_args[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer'])
@patch('main.extra_func.zenpy')
def test_control_page_make_one(self, ZenpyMock):
self.admin_client.post(
reverse_lazy('control'),
data={'users': [User.objects.get(email=self.light_agent).userprofile.id], 'engineer': 'engineer'}
)
call_list = ZenpyMock.update_user.call_args_list
mock_object = call_list[0][0][0]
self.assertEqual(len(call_list), 1)
self.assertEqual(mock_object.custom_role_id, sets.ZENDESK_ROLES['engineer'])
@patch('main.extra_func.zenpy')
def test_control_page_make_many(self, ZenpyMock):
self.admin_client.post(
reverse_lazy('control'),
data={
'users': [
User.objects.get(email=self.light_agent).userprofile.id,
User.objects.get(email=self.engineer).userprofile.id,
],
'engineer': 'engineer'
}
)
call_list = ZenpyMock.update_user.call_args_list
mock_objects = list(call_list)
self.assertEqual(len(call_list), 2)
for obj in mock_objects:
self.assertEqual(obj[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer'])
class PasswordResetTestCase(TestCase):
fixtures = ['fixtures/test_make_engineer.json']
def setUp(self):
self.user = '123@test.ru'
self.email_backend = 'django.core.mail.backends.locmem.EmailBackend'
self.client = Client()
self.client.force_login(User.objects.get(email=self.user))
def test_redirect(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
resp = self.client.post(reverse_lazy('password_reset'), data={'email': self.user})
self.assertRedirects(resp, reverse('password_reset_done'))
self.assertEqual(resp.status_code, 302)
def test_send_email(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
response: HttpResponseRedirect = \
self.client.post(reverse_lazy('password_reset'), data={'email': self.user})
self.assertEqual(response.status_code, 302)
self.assertEqual(len(mail.outbox), 1)
self.assertEqual(mail.outbox[0].to, [self.user])
# context that the email template was rendered with
email_context = response.context[0].dicts[1]
correct_subject = render_to_string('registration/password_reset_subject.txt', email_context, response.request)
self.assertEqual(mail.outbox[0].subject, correct_subject.strip())
correct_body = render_to_string('registration/password_reset_email.html', email_context, response.request)
self.assertEqual(mail.outbox[0].body, correct_body)
def test_email_invalid(self):
with self.settings(EMAIL_BACKEND=self.email_backend) and translation.override('ru'):
resp = self.client.post(reverse_lazy('password_reset'), data={'email': 1})
self.assertContains(resp, 'Введите правильный адрес электронной почты.', count=1, status_code=200)
def test_user_does_not_exist(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
resp = self.client.post(reverse_lazy('password_reset'), data={'email': self.user + str(random.random())})
self.assertRedirects(resp, reverse('password_reset_done'))
self.assertEqual(resp.status_code, 302)
self.assertEqual(len(mail.outbox), 0)
class PasswordChangeTestCase(TestCase):
fixtures = ['fixtures/test_make_engineer.json']
def setUp(self):
self.user = '123@test.ru'
self.client = Client()
self.client.force_login(User.objects.get(email=self.user))
self.set_password()
def set_password(self):
user: User = User.objects.get(email=self.user)
user.set_password('ImpossiblyHardPassword')
user.save()
self.client.force_login(User.objects.get(email=self.user))
def test_change_successful(self):
self.client.post(
reverse_lazy('password_change'),
data={
'old_password': 'ImpossiblyHardPassword',
'new_password1': 'EasyPassword',
'new_password2': 'EasyPassword',
}
)
user = User.objects.get(email=self.user)
self.assertTrue(user.check_password('EasyPassword'))
def test_invalid_old_password(self):
with translation.override('ru'):
resp = self.client.post(
reverse_lazy('password_change'),
data={
'old_password': 'EasyPassword',
'new_password1': 'EasyPassword',
'new_password2': 'EasyPassword',
}
)
self.assertContains(resp, 'Ваш старый пароль введен неправильно', count=1, status_code=200)
def test_different_new_passwords(self):
with translation.override('ru'):
resp = self.client.post(
reverse_lazy('password_change'),
data={
'old_password': 'ImpossiblyHardPassword',
'new_password1': 'EasyPassword',
'new_password2': 'EasyPassword1',
}
)
self.assertContains(resp, 'Введенные пароли не совпадают', count=1, status_code=200)
def test_invalid_new_password1(self):
with translation.override('ru'):
resp = self.client.post(
reverse_lazy('password_change'),
data={
'old_password': 'ImpossiblyHardPassword',
'new_password1': 'short',
'new_password2': 'short',
}
)
self.assertContains(resp, 'Введённый пароль слишком короткий', count=1, status_code=200)
def test_invalid_new_password2(self):
with translation.override('ru'):
resp = self.client.post(
reverse_lazy('password_change'),
data={
'old_password': 'ImpossiblyHardPassword',
'new_password1': '123123123123123123132123123',
'new_password2': '123123123123123123132123123',
}
)
self.assertContains(resp, 'Введённый пароль состоит только из цифр', count=1, status_code=200)
def test_invalid_new_password3(self):
with translation.override('ru'):
resp = self.client.post(
reverse_lazy('password_change'),
data={
'old_password': 'ImpossiblyHardPassword',
'new_password1': self.user,
'new_password2': self.user,
}
)
self.assertContains(resp, 'Введённый пароль слишком похож на имя пользователя', count=1, status_code=200)
class GetTicketsTestCase(TestCase):
"""
Класс тестов для проверки функции получения тикетов.
"""
fixtures = ['fixtures/test_make_engineer.json']
def setUp(self):
"""
Предустановленные значения для проведения тестов.
"""
self.light_agent = '123@test.ru'
self.engineer = 'customer@example.com'
self.client = Client()
self.client.force_login(User.objects.get(email=self.engineer))
self.light_agent_client = Client()
self.light_agent_client.force_login(User.objects.get(email=self.light_agent))
@patch('main.views.zenpy.get_user')
@patch('main.extra_func.zenpy')
def test_redirect(self, ZenpyMock, GetUserMock):
"""
Функция проверки переадресации пользователя на рабочую страницу.
"""
GetUserMock.return_value = Mock()
user = User.objects.get(email=self.engineer)
resp = self.client.post(reverse('work_get_tickets'))
self.assertRedirects(resp, reverse('work', args=[user.id]))
self.assertEqual(resp.status_code, 302)
@patch('main.views.zenpy')
@patch('main.views.get_tickets_list_for_group')
def test_take_one_ticket(self, TicketsMock, ZenpyViewsMock):
"""
Функция проверки назначения одного тикета на engineer.
"""
TicketsMock.return_value = [Mock()]
ZenpyViewsMock.get_user.return_value = Mock(role='agent', custom_role_id=sets.ZENDESK_ROLES['engineer'])
self.client.post(reverse('work_get_tickets'), data={'count_tickets': 1})
tickets = ZenpyViewsMock.update_tickets.call_args
self.assertEqual(tickets[0][0][0].assignee, ZenpyViewsMock.get_user.return_value)
@patch('main.views.get_tickets_list_for_group')
@patch('main.views.zenpy')
def test_take_many_tickets(self, ZenpyMock, TicketsMock):
"""
Функция проверки назначения нескольких тикетов на engineer.
"""
TicketsMock.return_value = [Mock()] * 3
ZenpyMock.get_user.return_value = Mock(role='agent', custom_role_id=sets.ZENDESK_ROLES['engineer'])
self.client.post(reverse('work_get_tickets'), data={'count_tickets': 3})
tickets = ZenpyMock.update_tickets.call_args
for ticket in tickets[0][0]:
self.assertEqual(ticket.assignee, ZenpyMock.get_user.return_value)
@patch('main.views.zenpy.get_user')
@patch('main.views.zenpy')
def test_light_agent_take_ticket(self, ZenpyMock, GetUserMock):
"""
Функция проверки попытки назначения тикета на light_agent.
"""
GetUserMock.return_value = Mock(role='agent', custom_role_id=sets.ZENDESK_ROLES['light_agent'])
self.light_agent_client.post(reverse('work_get_tickets'), data={'count_tickets': 3})
tickets = ZenpyMock.update_tickets.call_args
self.assertIsNone(tickets)
@patch('main.views.zenpy')
@patch('main.views.get_tickets_list_for_group')
def test_take_zero_tickets(self, TicketsMock, ZenpyMock):
"""
Функция проверки попытки назначения нуля тикета на engineer.
"""
TicketsMock.return_value = [Mock()] * 3
ZenpyMock.get_user.return_value = Mock(role='agent', custom_role_id=sets.ZENDESK_ROLES['engineer'])
self.client.post(reverse('work_get_tickets'), data={'count_tickets': 0})
tickets = ZenpyMock.update_tickets.call_args[0][0]
self.assertListEqual(tickets, [])
@patch('main.views.get_tickets_list_for_group')
@patch('main.views.zenpy')
def test_take_invalid_count_tickets(self, ZenpyMock, TicketsMock, ):
"""
Функция проверки попытки назначения нуля тикетов на engineer.
"""
TicketsMock.return_value = [Mock()] * 3
ZenpyMock.get_user.return_value = Mock(role='agent', custom_role_id=sets.ZENDESK_ROLES['engineer'])
self.client.post(reverse('work_get_tickets'), data={'count_tickets': 'asd'})
tickets = ZenpyMock.update_tickets.call_args
self.assertIsNone(tickets)
class ProfileTestCase(TestCase):
"""
Класс тестов для проверки синхронизации профиля пользователя.
"""
fixtures = ['fixtures/profile.json']
def setUp(self):
"""
Предустановленные значения для проведения тестов.
"""
self.zendesk_agent_email = 'krav-88@mail.ru'
self.zendesk_admin_email = 'idar.sokurov.05@mail.ru'
self.client = Client()
self.client.force_login(User.objects.get(email=self.zendesk_agent_email))
self.admin_client = Client()
self.admin_client.force_login(User.objects.get(email=self.zendesk_admin_email))
def test_correct_username(self):
"""
Функция проверки синхронизации имени пользователя.
"""
resp = self.client.get(reverse('profile'))
self.assertEqual(resp.context['profile'].name, zenpy.get_user(self.zendesk_agent_email).name)
def test_correct_email(self):
"""
Функция проверки синхронизации почты пользователя.
"""
resp = self.client.get(reverse('profile'))
self.assertEqual(resp.context['profile'].user.email, zenpy.get_user(self.zendesk_agent_email).email)
def test_correct_role(self):
"""
Функция проверки синхронизации роли пользователя.
"""
resp = self.client.get(reverse('profile'))
self.assertEqual(resp.context['profile'].role, zenpy.get_user(self.zendesk_agent_email).role)
resp = self.admin_client.get(reverse('profile'))
self.assertEqual(resp.context['profile'].role, zenpy.get_user(self.zendesk_admin_email).role)
def test_correct_custom_role_id(self):
"""
Функция проверки синхронизации рабочей роли пользователя.
"""
resp = self.client.get(reverse('profile'))
user = zenpy.get_user(self.zendesk_agent_email)
self.assertEqual(resp.context['profile'].custom_role_id, user.custom_role_id if user.custom_role_id else 0)
resp = self.admin_client.get(reverse('profile'))
user = zenpy.get_user(self.zendesk_admin_email)
self.assertEqual(resp.context['profile'].custom_role_id, user.custom_role_id if user.custom_role_id else 0)
def test_correct_image(self):
"""
Функция проверки синхронизации изображения пользователя.
"""
resp = self.client.get(reverse('profile'))
user = zenpy.get_user(self.zendesk_agent_email)
self.assertEqual(resp.context['profile'].image, user.photo['content_url'] if user.photo else None)

View File

@ -1,47 +1,34 @@
from smtplib import SMTPException
from typing import Dict, Any
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.contrib.auth.forms import PasswordResetForm
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
from django.contrib.auth.models import User, Permission
from django.contrib.auth.tokens import default_token_generator
from django.contrib.auth.forms import PasswordResetForm
from django.contrib.auth.views import LoginView
from django.contrib.contenttypes.models import ContentType
from django.contrib.messages.views import SuccessMessageMixin
from django.core.handlers.wsgi import WSGIRequest
from django.http import HttpResponseRedirect, HttpResponse
from django.shortcuts import render, redirect
from django.urls import reverse_lazy, reverse
from django.urls import reverse_lazy
from django.views.generic import FormView
from django_registration.views import RegistrationView
# Django REST
from rest_framework import viewsets
from rest_framework.response import Response
from access_controller.settings import DEFAULT_FROM_EMAIL, ZENDESK_ROLES, ZENDESK_MAX_AGENTS
from access_controller.settings import DEFAULT_FROM_EMAIL, ZENDESK_ROLES, ZENDESK_MAX_AGENTS, ZENDESK_GROUPS
from main.extra_func import check_user_exist, update_profile, get_user_organization, \
make_engineer, make_light_agent, get_users_list, update_users_in_model, count_users, \
StatisticData, log, set_session_params_for_work_page
from main.zendesk_admin import zenpy
from main.forms import AdminPageUsers, CustomRegistrationForm, CustomAuthenticationForm, StatisticForm
set_session_params_for_work_page, get_tickets_list_for_group
from main.forms import AdminPageUsers, CustomRegistrationForm, CustomAuthenticationForm, StatisticForm, \
WorkGetTicketsForm
from main.serializers import ProfileSerializer, ZendeskUserSerializer
from main.zendesk_admin import zenpy
from .models import UserProfile
def setup_context(profile_lit: bool = False, control_lit: bool = False, work_lit: bool = False,
registration_lit: bool = False, login_lit: bool = False, stats_lit: bool = False) -> Dict[str, Any]:
context = {
'profile_lit': profile_lit,
'control_lit': control_lit,
'work_lit': work_lit,
'registration_lit': registration_lit,
'login_lit': login_lit,
'stats_lit': stats_lit,
}
return context
from .statistic_data import StatisticData
class CustomRegistrationView(RegistrationView):
@ -57,7 +44,6 @@ class CustomRegistrationView(RegistrationView):
:param is_allowed: Определение зарегистрирован ли пользователь с введенным email на Zendesk и принадлежит ли он к организации SYSTEM
:type is_allowed: :class:`bool`
"""
extra_context = setup_context(registration_lit=True)
form_class = CustomRegistrationForm
template_name = 'django_registration/registration_form.html'
urls = {
@ -105,7 +91,7 @@ class CustomRegistrationView(RegistrationView):
except SMTPException:
self.redirect_url = 'email_sending_error'
else:
raise ValueError('Непредвиденная ошибка')
self.redirect_url = 'email_sending_error'
else:
self.redirect_url = 'invalid_zendesk_email'
@ -149,12 +135,11 @@ def profile_page(request: WSGIRequest) -> HttpResponse:
"""
user_profile: UserProfile = request.user.userprofile
update_profile(user_profile)
context = setup_context(profile_lit=True)
context.update({
context = {
'profile': user_profile,
'pagename': 'Страница профиля',
'ZENDESK_ROLES': ZENDESK_ROLES,
})
}
return render(request, 'pages/profile.html', context)
@ -186,14 +171,14 @@ def work_page(request: WSGIRequest, id: int) -> HttpResponse:
engineers.append(user)
elif user.custom_role_id == ZENDESK_ROLES['light_agent']:
light_agents.append(user)
context = setup_context(work_lit=True)
context.update({
context = {
'engineers': engineers,
'agents': light_agents,
'messages': messages.get_messages(request),
'licences_remaining': max(0, ZENDESK_MAX_AGENTS - len(engineers)),
'pagename': 'Управление правами',
})
'get_tickets_form': WorkGetTicketsForm()
}
return render(request, 'pages/work.html', context)
return redirect("login")
@ -226,17 +211,18 @@ def work_become_engineer(request: WSGIRequest) -> HttpResponseRedirect:
@login_required()
def work_get_tickets(request):
zenpy_user = zenpy.get_user(request.user.email)
if zenpy_user.role == 'admin' or zenpy_user.custom_role_id == ZENDESK_ROLES['engineer']:
tickets = [ticket for ticket in zenpy.admin.search(type="ticket") if
ticket.group.name == 'Сменная группа' and ticket.assignee is None]
count = 0
for i in range(len(tickets)):
if i == int(request.GET.get('count_tickets')):
return set_session_params_for_work_page(request, count)
tickets[i].assignee = zenpy_user
zenpy.admin.tickets.update(tickets[i])
count += 1
return set_session_params_for_work_page(request, count)
if request.method == 'POST':
if zenpy_user.role == 'admin' or zenpy_user.custom_role_id == ZENDESK_ROLES['engineer']:
form = WorkGetTicketsForm(request.POST)
if form.is_valid():
tickets = get_tickets_list_for_group(ZENDESK_GROUPS['buffer'])
assigned_tickets = []
for i in range(min(form.cleaned_data['count_tickets'], len(tickets))):
tickets[i].assignee = zenpy_user
assigned_tickets.append(tickets[i])
zenpy.update_tickets(assigned_tickets)
return set_session_params_for_work_page(request, len(assigned_tickets))
return set_session_params_for_work_page(request, is_confirm=False)
@ -289,7 +275,6 @@ class AdminPageView(LoginRequiredMixin, PermissionRequiredMixin, SuccessMessageM
"""
for user in users:
make_engineer(user, self.request.user)
log(user, self.request.user.userprofile)
def make_light_agents(self, users):
"""
@ -300,14 +285,12 @@ class AdminPageView(LoginRequiredMixin, PermissionRequiredMixin, SuccessMessageM
"""
for user in users:
make_light_agent(user, self.request.user)
log(user, self.request.user.userprofile)
class CustomLoginView(LoginView):
"""
Отображение страницы авторизации пользователя
"""
extra_context = setup_context(login_lit=True)
form_class = CustomAuthenticationForm
@ -365,11 +348,10 @@ def statistic_page(request: WSGIRequest) -> HttpResponse:
if not request.user.has_perm("main.has_control_access"):
return redirect('index')
context = setup_context(stats_lit=True)
context.update({
context = {
'pagename': 'страница статистики',
'errors': list(),
})
}
if request.method == "POST":
form = StatisticForm(request.POST)
if form.is_valid():

View File

@ -1,7 +1,7 @@
from typing import Optional, Dict
from typing import Optional, Dict, List
from zenpy import Zenpy
from zenpy.lib.api_objects import User as ZenpyUser, Group as ZenpyGroup
from zenpy.lib.api_objects import User as ZenpyUser, Group as ZenpyGroup, Ticket as ZenpyTicket
from zenpy.lib.exception import APIException
from access_controller.settings import ACTRL_ZENDESK_SUBDOMAIN, ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, \
@ -22,6 +22,23 @@ class ZendeskAdmin:
self.buffer_group_id: int = self.get_group(ZENDESK_GROUPS['buffer']).id
self.solved_tickets_user_id: int = self.get_user(SOLVED_TICKETS_EMAIL).id
def update_user(self, user: ZenpyUser) -> bool:
"""
Функция сохраняет изменение пользователя в Zendesk.
:param user: Пользователь с изменёнными данными
"""
self.admin.users.update(user)
def update_tickets(self, tickets: List[ZenpyTicket]):
"""
Функция сохраняет изменение тикетов в Zendesk.
:param tickets: Тикеты с изменёнными данными
"""
if tickets:
self.admin.tickets.update(tickets)
def check_user(self, email: str) -> bool:
"""
Функция осуществляет проверку существования пользователя в Zendesk по email.