This commit is contained in:
Dmitriy Andreev 2021-05-09 15:05:43 +03:00
commit 818a3b7dbf
11 changed files with 646 additions and 292 deletions

View File

@ -19,10 +19,10 @@ BASE_DIR = Path(__file__).resolve().parent.parent
# See https://docs.djangoproject.com/en/3.1/howto/deployment/checklist/ # See https://docs.djangoproject.com/en/3.1/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret! # SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = os.getenv('ACTRL_SECRET_KEY','empty') SECRET_KEY = os.getenv('ACTRL_SECRET_KEY', 'empty')
# SECURITY WARNING: don't run with debug turned on in production! # SECURITY WARNING: don't run with debug turned on in production!
DEBUG = bool(int(os.getenv('ACTRL_DEBUG',1))) DEBUG = bool(int(os.getenv('ACTRL_DEBUG', 1)))
ALLOWED_HOSTS = [ ALLOWED_HOSTS = [
'127.0.0.1', '127.0.0.1',
@ -57,13 +57,13 @@ MIDDLEWARE = [
ROOT_URLCONF = 'access_controller.urls' ROOT_URLCONF = 'access_controller.urls'
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend' EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = os.getenv('ACTRL_EMAIL_HOST','smtp.gmail.com') EMAIL_HOST = os.getenv('ACTRL_EMAIL_HOST', 'smtp.gmail.com')
EMAIL_PORT = int(os.getenv('ACTRL_EMAIL_PORT',587)) EMAIL_PORT = int(os.getenv('ACTRL_EMAIL_PORT', 587))
EMAIL_USE_TLS = bool(int(os.getenv('ACTRL_EMAIL_TLS',1))) EMAIL_USE_TLS = bool(int(os.getenv('ACTRL_EMAIL_TLS', 1)))
EMAIL_HOST_USER = os.getenv('ACTRL_EMAIL_HOST_USER','group02django@gmail.com') EMAIL_HOST_USER = os.getenv('ACTRL_EMAIL_HOST_USER', 'group02django@gmail.com')
EMAIL_HOST_PASSWORD = os.getenv('ACTRL_EMAIL_HOST_PASSWORD','djangogroup02') EMAIL_HOST_PASSWORD = os.getenv('ACTRL_EMAIL_HOST_PASSWORD', 'djangogroup02')
DEFAULT_FROM_EMAIL = os.getenv('ACTRL_FROM_EMAIL',EMAIL_HOST_USER) DEFAULT_FROM_EMAIL = os.getenv('ACTRL_FROM_EMAIL', EMAIL_HOST_USER)
SERVER_EMAIL = os.getenv('ACTRL_SERVER_EMAIL',EMAIL_HOST_USER) SERVER_EMAIL = os.getenv('ACTRL_SERVER_EMAIL', EMAIL_HOST_USER)
TEMPLATES = [ TEMPLATES = [
{ {
@ -150,8 +150,8 @@ AUTHENTICATION_BACKENDS = [
ZENDESK_ROLES = { ZENDESK_ROLES = {
'engineer': int(os.getenv('ENG_CROLE_ID',0)), 'engineer': int(os.getenv('ENG_CROLE_ID', 0)),
'light_agent': int(os.getenv('LA_CROLE_ID',0)), 'light_agent': int(os.getenv('LA_CROLE_ID', 0)),
} }
ZENDESK_GROUPS = { ZENDESK_GROUPS = {
@ -161,7 +161,7 @@ ZENDESK_GROUPS = {
SOLVED_TICKETS_EMAIL = os.getenv('ST_EMAIL') SOLVED_TICKETS_EMAIL = os.getenv('ST_EMAIL')
ZENDESK_MAX_AGENTS = int(os.getenv('LICENSE_NO',0)) ZENDESK_MAX_AGENTS = int(os.getenv('LICENSE_NO', 0))
REST_FRAMEWORK = { REST_FRAMEWORK = {
# Use Django's standard `django.contrib.auth` permissions, # Use Django's standard `django.contrib.auth` permissions,
@ -171,7 +171,7 @@ REST_FRAMEWORK = {
] ]
} }
ONE_DAY = int(os.getenv('SHIFTH',0)) # Количество часов в 1 рабочем дне ONE_DAY = int(os.getenv('SHIFTH', 0)) # Количество часов в 1 рабочем дне
ACTRL_ZENDESK_SUBDOMAIN = os.getenv('ACTRL_ZENDESK_SUBDOMAIN') or os.getenv('ZD_DOMAIN') ACTRL_ZENDESK_SUBDOMAIN = os.getenv('ACTRL_ZENDESK_SUBDOMAIN') or os.getenv('ZD_DOMAIN')
ACTRL_API_EMAIL = os.getenv('ACTRL_API_EMAIL') or os.getenv('ACCESS_CONTROLLER_API_EMAIL') ACTRL_API_EMAIL = os.getenv('ACTRL_API_EMAIL') or os.getenv('ACCESS_CONTROLLER_API_EMAIL')

View File

@ -14,7 +14,6 @@ Including another URLconf
2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) 2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
""" """
from django.contrib import admin from django.contrib import admin
from django.contrib.auth import views
from django.urls import path, include from django.urls import path, include
from main.views import main_page, profile_page, CustomRegistrationView, CustomLoginView, registration_error from main.views import main_page, profile_page, CustomRegistrationView, CustomLoginView, registration_error
@ -22,6 +21,7 @@ from main.views import work_page, work_hand_over, work_become_engineer, work_get
AdminPageView, statistic_page AdminPageView, statistic_page
from main.urls import router from main.urls import router
urlpatterns = [ urlpatterns = [
path('admin/', admin.site.urls, name='admin'), path('admin/', admin.site.urls, name='admin'),
path('', main_page, name='index'), path('', main_page, name='index'),

57
fixtures/data.json Normal file
View File

@ -0,0 +1,57 @@
[
{
"model": "auth.user",
"pk": 1,
"fields": {
"password": "pbkdf2_sha256$216000$gHBBCr1jBELf$ZkEDW3IEd8Wij7u8vkv+0Eze32CS01bcaYWhcD9OIC4=",
"last_login": null,
"is_superuser": true,
"username": "admin@gmail.com",
"first_name": "",
"last_name": "",
"email": "admin@gmail.com",
"is_staff": true,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": [33]
}
},
{
"model": "main.userprofile",
"pk": 1,
"fields": {
"name": "ZendeskAdmin",
"user": 1,
"role": "admin"
}
},
{
"model": "auth.user",
"pk": 2,
"fields": {
"password": "pbkdf2_sha256$216000$5qLJgrm2Quq9$KDBNNymVZXkUx0HKBPFst2m83kLe0egPBnkW7KnkORU=",
"last_login": null,
"is_superuser": false,
"username": "123@test.ru",
"first_name": "",
"last_name": "",
"email": "123@test.ru",
"is_staff": false,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "main.userprofile",
"pk": 2,
"fields": {
"name": "UserForAccessTest",
"user": 2,
"role": "agent",
"custom_role_id": "360005209000"
}
}
]

View File

@ -0,0 +1,85 @@
[
{
"model": "auth.user",
"pk": 1,
"fields": {
"password": "pbkdf2_sha256$216000$gHBBCr1jBELf$ZkEDW3IEd8Wij7u8vkv+0Eze32CS01bcaYWhcD9OIC4=",
"last_login": null,
"is_superuser": true,
"username": "admin@gmail.com",
"first_name": "",
"last_name": "",
"email": "admin@gmail.com",
"is_staff": true,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": [33]
}
},
{
"model": "main.userprofile",
"pk": 1,
"fields": {
"name": "ZendeskAdmin",
"user": 1,
"role": "admin"
}
},
{
"model": "auth.user",
"pk": 2,
"fields": {
"password": "pbkdf2_sha256$216000$5qLJgrm2Quq9$KDBNNymVZXkUx0HKBPFst2m83kLe0egPBnkW7KnkORU=",
"last_login": null,
"is_superuser": false,
"username": "123@test.ru",
"first_name": "",
"last_name": "",
"email": "123@test.ru",
"is_staff": false,
"is_active": true,
"date_joined": "2021-03-10T16:38:56.303Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "main.userprofile",
"pk": 2,
"fields": {
"name": "UserForAccessTest",
"user": 2,
"role": "agent",
"custom_role_id": "360005209000"
}
},
{
"model": "auth.user",
"pk": 3,
"fields": {
"password": "pbkdf2_sha256$216000$5qLJgrm2Quq9$KDBNNymVZXkUx0HKBPFst2m83kLe0egPBnkW7KnkORU=",
"last_login": null,
"is_superuser": false,
"username": "customer@example.com",
"first_name": "",
"last_name": "",
"email": "customer@example.com",
"is_staff": false,
"is_active": true,
"date_joined": "2021-04-15T16:38:56.303Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "main.userprofile",
"pk": 3,
"fields": {
"name": "UserForAccessTest",
"user": 3,
"role": "agent",
"custom_role_id": "360005209000"
}
}
]

View File

@ -1,6 +1,5 @@
import logging import logging
from datetime import timedelta, datetime, date from datetime import timedelta
from typing import Optional
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
@ -11,17 +10,19 @@ from zenpy.lib.exception import APIException
from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket
from zenpy.lib.generator import SearchResultGenerator from zenpy.lib.generator import SearchResultGenerator
from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY, ACTRL_ZENDESK_SUBDOMAIN from access_controller.settings import ZENDESK_ROLES as ROLES, ACTRL_ZENDESK_SUBDOMAIN
from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus
from main.requester import TicketListRequester
from main.zendesk_admin import zenpy from main.zendesk_admin import zenpy
def update_role(user_profile: UserProfile, role: int) -> None: def update_role(user_profile: UserProfile, role: int, who_changes: User) -> None:
""" """
Функция меняет роль пользователя. Функция меняет роль пользователя.
:param user_profile: Профиль пользователя :param user_profile: Профиль пользователя
:param role: Новая роль :param role: Новая роль
:param who_changes: Пользователь, меняющий роль
:return: Пользователь с обновленной ролью :return: Пользователь с обновленной ролью
""" """
zendesk = zenpy zendesk = zenpy
@ -29,7 +30,8 @@ def update_role(user_profile: UserProfile, role: int) -> None:
user.custom_role_id = role user.custom_role_id = role
user_profile.custom_role_id = role user_profile.custom_role_id = role
user_profile.save() user_profile.save()
zendesk.admin.users.update(user) log(user_profile, who_changes.userprofile)
zendesk.update_user(user)
def make_engineer(user_profile: UserProfile, who_changes: User) -> None: def make_engineer(user_profile: UserProfile, who_changes: User) -> None:
@ -39,7 +41,7 @@ def make_engineer(user_profile: UserProfile, who_changes: User) -> None:
:param user_profile: Профиль пользователя :param user_profile: Профиль пользователя
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer" :return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer"
""" """
update_role(user_profile, ROLES['engineer']) update_role(user_profile, ROLES['engineer'], who_changes)
def make_light_agent(user_profile: UserProfile, who_changes: User) -> None: def make_light_agent(user_profile: UserProfile, who_changes: User) -> None:
@ -62,14 +64,12 @@ def make_light_agent(user_profile: UserProfile, who_changes: User) -> None:
else: else:
ticket.assignee = None ticket.assignee = None
ticket.group_id = zenpy.buffer_group_id ticket.group_id = zenpy.buffer_group_id
if tickets:
if tickets.count: zenpy.admin.tickets.update(tickets)
zenpy.admin.tickets.update(tickets.values) attempts, success = 20, False
attempts, success = 5, False
while not success and attempts != 0: while not success and attempts != 0:
try: try:
update_role(user_profile, ROLES['light_agent']) update_role(user_profile, ROLES['light_agent'], who_changes)
success = True success = True
except APIException as e: except APIException as e:
attempts -= 1 attempts -= 1
@ -93,7 +93,14 @@ def get_tickets_list(email):
""" """
Функция возвращает список тикетов пользователя Zendesk Функция возвращает список тикетов пользователя Zendesk
""" """
return zenpy.admin.search(assignee=email, type='ticket') return TicketListRequester().get_tickets_list_for_user(zenpy.get_user(email))
def get_tickets_list_for_group(group_name):
"""
Функция возвращает список неназначенных, нерешённых тикетов группы Zendesk
"""
return TicketListRequester().get_tickets_list_for_group(zenpy.get_group(group_name))
def update_profile(user_profile: UserProfile): def update_profile(user_profile: UserProfile):
@ -233,258 +240,6 @@ def last_day_of_month(day: int) -> int:
return next_month - timedelta(days=next_month.day) return next_month - timedelta(days=next_month.day)
class StatisticData:
"""
Класс для учета статистики интервалов работы пользователей.
Передаваемые параметры: start_date, end_date, email, stat.
:param display: Формат отображения времени (часы, минуты)
:type display: :class:`list`
:param interval: Интервал времени в часах и минутах
:type interval: :class:`list`
:param start_date: Дата начала работы
:type start_date: :class:`date`
:param end_date: Дата окончания работы
:type end_date: :class:`date`
:param email: Email пользователя
:type email: :class:`str`
:param errors: Список ошибок
:type errors: :class:`list`
:param warnings: Список предупреждений
:type warnings: :class:`list`
:param data: Ретроспектива смены ролей пользователя
:type data: :class:`dict`
:param statistic: Интервалы работы пользователя
:type statistic: :class:`dict`
"""
def __init__(self, start_date, end_date, user_email, stat=None):
self.display = None
self.interval = None
self.start_date = start_date
self.end_date = end_date
self.email = user_email
self.errors = list()
self.warnings = list()
self.data = dict()
self.statistic = dict()
self._init_data()
if stat is None:
self._init_statistic()
else:
self.statistic = stat
def get_statistic(self) -> dict:
"""
Функция возвращает статистику работы пользователя.
:return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании.
"""
if self.is_valid_statistic():
stat = self.statistic
stat = self._use_display(stat)
stat = self._use_interval(stat)
return stat
else:
return None
def is_valid_statistic(self) -> bool:
"""
Функция проверяет были ли ошибки при создании статистики.
:return: True, при отсутствии ошибок
"""
return not self.errors and self.statistic
def set_interval(self, interval: list) -> bool:
"""
Функция проверяет корректность представления интервала работы.
:param interval: Интервал должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if interval not in ['months', 'days']:
self.errors += ['Интервал работы должен быть в днях или месяцах']
return False
self.interval = interval
return True
def set_display(self, display_format: list) -> bool:
"""
Функция проверяет корректность формата отображения интервала.
:param display_format: Формат отображения должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if display_format not in ['days', 'hours']:
self.errors += ['Формат отображения должен быть в часах или днях']
return False
self.display = display_format
return True
def get_data(self) -> Optional[dict]:
"""
Функция возвращает данные - список объектов RoleChangeLogs.
"""
if self.is_valid_data():
return self.data
else:
return None
def is_valid_data(self) -> bool:
"""
Функция определяет были ли ошибки при получении логов.
:return: True, если ошибок нет
"""
return not self.errors
def _use_display(self, stat: list) -> list:
"""
Функция приводит данные к формату отображения.
:param stat: Список данных статистики пользователя
:return: Обновленный список
"""
if not self.is_valid_statistic() or not self.display:
return stat
new_stat = {}
for key, item in stat.items():
if self.display == 'hours':
new_stat[key] = item / 3600
elif self.display == 'days':
new_stat[key] = item / (ONE_DAY * 3600)
return new_stat
def _use_interval(self, stat: dict) -> dict:
"""
Функция объединяет ключи и значения в соответствии с интервалом работы.
:param stat: Статистика работы пользователя
:return: Обновленная статистика
"""
if not self.is_valid_statistic() or not self.interval:
return stat
new_stat = {}
if self.interval == 'months':
# Переделываем ключи под формат('началоесяца - конец_месяца')
for key, value in stat.items():
current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1))
current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1)))
index = ' - '.join([str(current_month_start), str(current_month_end)])
if new_stat.get(index):
new_stat[index] += value
else:
new_stat[index] = value
elif self.interval == 'days':
new_stat = stat # статистика изначально в днях
return new_stat
def check_time(self) -> bool:
"""
Функция проверяет корректность введенного времени.
:return: True, если время указано корректно. Иначе, False
"""
if self.end_date < self.start_date or self.end_date > datetime.now().date():
return False
return True
def _init_data(self):
"""
Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email.
:return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку.
"""
if not self.check_time():
self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени']
return
try:
self.data = RoleChangeLogs.objects.filter(
change_time__range=[self.start_date, self.end_date + timedelta(days=1)],
user=User.objects.get(email=self.email),
).order_by('change_time')
except User.DoesNotExist:
self.errors += ['Пользователь не найден']
def _init_statistic(self) -> dict:
"""
Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд.
:return: Статистика работы пользователя (statistic)
"""
self.clear_statistic()
if not self.get_data():
self.warnings += ['Не обнаружены изменения роли в данном промежутке']
return None
first_log, last_log = self.data[0], self.data[len(self.data) - 1]
if first_log.old_role == ROLES['engineer']:
self.prev_engineer_logic(first_log)
if last_log.new_role == ROLES['engineer']:
self.post_engineer_logic(last_log)
for log_index in range(len(self.data) - 1):
if self.data[log_index].new_role == ROLES['engineer']:
self.engineer_logic(log_index)
def engineer_logic(self, log_index):
"""
Функция обрабатывает основную часть работы инженера
"""
current_log, next_log = self.data[log_index], self.data[log_index + 1]
if current_log.change_time.date() != next_log.change_time.date():
self.statistic[current_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(current_log)).total_seconds()
self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds()
self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date())
else:
elapsed_time = next_log.change_time - current_log.change_time
self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds()
def post_engineer_logic(self, last_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона
"""
self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1))
if last_log.change_time.date() == timezone.now().date():
self.statistic[last_log.change_time.date()] += (
get_timedelta(None, timezone.now().time()) - get_timedelta(last_log)
).total_seconds()
else:
self.statistic[last_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(last_log)).total_seconds()
if self.end_date == timezone.now().date():
self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds()
def prev_engineer_logic(self, first_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона
"""
self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date),
first_log.change_time.date())
self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds()
def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict:
"""
Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне).
:param first: Начальная дата интервала
:param last: Последняя дата интервала
:param val: Количество секунд в одном дне
"""
for day in daterange(first, last):
self.statistic[day] = val
def clear_statistic(self) -> dict:
"""
Функция осуществляет обновление всех дней.
"""
self.statistic.clear()
self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0)
class DatabaseHandler(logging.Handler): class DatabaseHandler(logging.Handler):
def __init__(self): def __init__(self):
logging.Handler.__init__(self) logging.Handler.__init__(self)
@ -534,7 +289,7 @@ class CsvFormatter(logging.Formatter):
return msg return msg
def log(user, admin=0): def log(user, admin=None):
""" """
Осуществляет запись логов в базу данных и csv файл Осуществляет запись логов в базу данных и csv файл
:param admin: :param admin:

38
main/requester.py Normal file
View File

@ -0,0 +1,38 @@
import requests
from zenpy import TicketApi
from zenpy.lib.api_objects import Ticket
from main.zendesk_admin import zenpy
class TicketListRequester:
def __init__(self):
self.email = zenpy.credentials['email']
if zenpy.credentials.get('token'):
self.token_or_password = zenpy.credentials.get('token')
self.email += '/token'
else:
self.token_or_password = zenpy.credentials.get('password')
self.prefix = f'https://{zenpy.credentials.get("subdomain")}.zendesk.com/api/v2/'
def get_tickets_list_for_user(self, zendesk_user):
url = self.prefix + f'users/{zendesk_user.id}/tickets/assigned'
return self._get_tickets(url)
def get_tickets_list_for_group(self, group):
url = self.prefix + '/tickets'
all_tickets = self._get_tickets(url)
tickets = list()
for ticket in all_tickets:
if (ticket.status != 'solved') and (ticket.group_id == group.id) and (ticket.assignee_id is None):
tickets.append(ticket)
return tickets
def _get_tickets(self, url):
response = requests.get(url, auth=(self.email, self.token_or_password))
tickets = []
if response.status_code != 200:
return None
for ticket in response.json()['tickets']:
tickets.append(Ticket(api=TicketApi, **ticket))
return tickets

261
main/statistic_data.py Normal file
View File

@ -0,0 +1,261 @@
from datetime import date, datetime, timedelta
from typing import Optional
from django.contrib.auth.models import User
from django.utils import timezone
from access_controller.settings import ONE_DAY, ZENDESK_ROLES as ROLES
from main.extra_func import last_day_of_month, get_timedelta, daterange
from main.models import RoleChangeLogs
class StatisticData:
"""
Класс для учета статистики интервалов работы пользователей.
Передаваемые параметры: start_date, end_date, email, stat.
:param display: Формат отображения времени (часы, минуты)
:type display: :class:`list`
:param interval: Интервал времени в часах и минутах
:type interval: :class:`list`
:param start_date: Дата начала работы
:type start_date: :class:`date`
:param end_date: Дата окончания работы
:type end_date: :class:`date`
:param email: Email пользователя
:type email: :class:`str`
:param errors: Список ошибок
:type errors: :class:`list`
:param warnings: Список предупреждений
:type warnings: :class:`list`
:param data: Ретроспектива смены ролей пользователя
:type data: :class:`dict`
:param statistic: Интервалы работы пользователя
:type statistic: :class:`dict`
"""
def __init__(self, start_date, end_date, user_email, stat=None):
self.display = None
self.interval = None
self.start_date = start_date
self.end_date = end_date
self.email = user_email
self.errors = list()
self.warnings = list()
self.data = dict()
self.statistic = dict()
self._init_data()
if stat is None:
self._init_statistic()
else:
self.statistic = stat
def get_statistic(self) -> dict:
"""
Функция возвращает статистику работы пользователя.
:return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании.
"""
if self.is_valid_statistic():
stat = self.statistic
stat = self._use_display(stat)
stat = self._use_interval(stat)
return stat
else:
return None
def is_valid_statistic(self) -> bool:
"""
Функция проверяет были ли ошибки при создании статистики.
:return: True, при отсутствии ошибок
"""
return not self.errors and self.statistic
def set_interval(self, interval: list) -> bool:
"""
Функция проверяет корректность представления интервала работы.
:param interval: Интервал должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if interval not in ['months', 'days']:
self.errors += ['Интервал работы должен быть в днях или месяцах']
return False
self.interval = interval
return True
def set_display(self, display_format: list) -> bool:
"""
Функция проверяет корректность формата отображения интервала.
:param display_format: Формат отображения должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if display_format not in ['days', 'hours']:
self.errors += ['Формат отображения должен быть в часах или днях']
return False
self.display = display_format
return True
def get_data(self) -> Optional[dict]:
"""
Функция возвращает данные - список объектов RoleChangeLogs.
"""
if self.is_valid_data():
return self.data
else:
return None
def is_valid_data(self) -> bool:
"""
Функция определяет были ли ошибки при получении логов.
:return: True, если ошибок нет
"""
return not self.errors
def _use_display(self, stat: list) -> list:
"""
Функция приводит данные к формату отображения.
:param stat: Список данных статистики пользователя
:return: Обновленный список
"""
if not self.is_valid_statistic() or not self.display:
return stat
new_stat = {}
for key, item in stat.items():
if self.display == 'hours':
new_stat[key] = item / 3600
elif self.display == 'days':
new_stat[key] = item / (ONE_DAY * 3600)
return new_stat
def _use_interval(self, stat: dict) -> dict:
"""
Функция объединяет ключи и значения в соответствии с интервалом работы.
:param stat: Статистика работы пользователя
:return: Обновленная статистика
"""
if not self.is_valid_statistic() or not self.interval:
return stat
new_stat = {}
if self.interval == 'months':
# Переделываем ключи под формат('началоесяца - конец_месяца')
for key, value in stat.items():
current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1))
current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1)))
index = ' - '.join([str(current_month_start), str(current_month_end)])
if new_stat.get(index):
new_stat[index] += value
else:
new_stat[index] = value
elif self.interval == 'days':
new_stat = stat # статистика изначально в днях
return new_stat
def check_time(self) -> bool:
"""
Функция проверяет корректность введенного времени.
:return: True, если время указано корректно. Иначе, False
"""
if self.end_date < self.start_date or self.end_date > datetime.now().date():
return False
return True
def _init_data(self):
"""
Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email.
:return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку.
"""
if not self.check_time():
self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени']
return
try:
self.data = RoleChangeLogs.objects.filter(
change_time__range=[self.start_date, self.end_date + timedelta(days=1)],
user=User.objects.get(email=self.email),
).order_by('change_time')
except User.DoesNotExist:
self.errors += ['Пользователь не найден']
def _init_statistic(self) -> dict:
"""
Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд.
:return: Статистика работы пользователя (statistic)
"""
self.clear_statistic()
if not self.get_data():
self.warnings += ['Не обнаружены изменения роли в данном промежутке']
return None
first_log, last_log = self.data[0], self.data[len(self.data) - 1]
if first_log.old_role == ROLES['engineer']:
self.prev_engineer_logic(first_log)
if last_log.new_role == ROLES['engineer']:
self.post_engineer_logic(last_log)
for log_index in range(len(self.data) - 1):
if self.data[log_index].new_role == ROLES['engineer']:
self.engineer_logic(log_index)
def engineer_logic(self, log_index):
"""
Функция обрабатывает основную часть работы инженера
"""
current_log, next_log = self.data[log_index], self.data[log_index + 1]
if current_log.change_time.date() != next_log.change_time.date():
self.statistic[current_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(current_log)).total_seconds()
self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds()
self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date())
else:
elapsed_time = next_log.change_time - current_log.change_time
self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds()
def post_engineer_logic(self, last_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона
"""
self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1))
if last_log.change_time.date() == timezone.now().date():
self.statistic[last_log.change_time.date()] += (
get_timedelta(None, timezone.now().time()) - get_timedelta(last_log)
).total_seconds()
else:
self.statistic[last_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(last_log)).total_seconds()
if self.end_date == timezone.now().date():
self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds()
def prev_engineer_logic(self, first_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона
"""
self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date),
first_log.change_time.date())
self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds()
def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict:
"""
Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне).
:param first: Начальная дата интервала
:param last: Последняя дата интервала
:param val: Количество секунд в одном дне
"""
for day in daterange(first, last):
self.statistic[day] = val
def clear_statistic(self) -> dict:
"""
Функция осуществляет обновление всех дней.
"""
self.statistic.clear()
self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0)

View File

@ -17,9 +17,9 @@
<script src="https://unpkg.com/babel-standalone@6/babel.min.js"></script> <script src="https://unpkg.com/babel-standalone@6/babel.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/axios/dist/axios.min.js"></script> <script src="https://cdn.jsdelivr.net/npm/axios/dist/axios.min.js"></script>
<script src="{% static 'modules/notifications/dist/notifications.js' %}"></script>
<script src="{% static 'main/js/control.js'%}" type="text/babel"></script> <script src="{% static 'main/js/control.js'%}" type="text/babel"></script>
<script src="{% static 'main/js/notifications.js' %}"></script> <script src="{% static 'main/js/notifications.js' %}"></script> {# Для #}
<script src="{% static 'modules/notifications/dist/notifications.js' %}"></script> {# Уведомлений #}
{% endblock%} {% endblock%}
{% block content %} {% block content %}
<div class="container-md"> <div class="container-md">

View File

@ -1,2 +1,150 @@
from unittest.mock import patch
from urllib.parse import urlparse
from django.contrib.auth.models import User
from django.core import mail
from django.test import TestCase, Client from django.test import TestCase, Client
from django.urls import reverse, reverse_lazy
from django.utils import translation
import access_controller.settings as sets import access_controller.settings as sets
from main.zendesk_admin import zenpy
class RegistrationTestCase(TestCase):
fixtures = ['fixtures/data.json']
def setUp(self):
self.email_backend = 'django.core.mail.backends.locmem.EmailBackend'
self.any_zendesk_user_email = 'idar.sokurov.05@mail.ru'
self.zendesk_admin_email = 'idar.sokurov.05@mail.ru'
self.client = Client()
def test_registration_complete_redirect(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
resp = self.client.post(reverse('registration'), data={'email': self.any_zendesk_user_email})
self.assertRedirects(resp, reverse('password_reset_done'))
def test_registration_fail_redirect(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
resp = self.client.post(reverse('registration'), data={'email': self.any_zendesk_user_email + 'asd'})
self.assertRedirects(resp, reverse('django_registration_disallowed'))
def test_registration_user_already_exist(self):
with self.settings(EMAIL_BACKEND=self.email_backend) and translation.override('ru'):
resp = self.client.post(reverse('registration'), data={'email': '123@test.ru'})
self.assertContains(resp, 'Этот адрес электронной почты уже используется', count=1, status_code=200)
def test_registration_email_sending(self):
# TODO: Найти способ лучше проверять сообщения
email_template = [
'',
'Вы получили это письмо, потому что вы (или кто-то другой) запросили восстановление пароля '
'от учётной записи на сайте testserver, которая связана с этим адресом электронной почты.',
'',
'Пожалуйста, перейдите на эту страницу и введите новый пароль:',
'',
'url',
'',
f'Ваше имя пользователя (на случай, если вы его забыли): {self.any_zendesk_user_email}',
'',
'Спасибо, что используете наш сайт!',
'',
'Команда сайта testserver',
'',
'',
'',
]
with self.settings(EMAIL_BACKEND=self.email_backend) and translation.override('ru'):
self.client.post(reverse('registration'), data={'email': self.any_zendesk_user_email})
self.assertEqual(len(mail.outbox), 1)
self.assertEqual(mail.outbox[0].to, [self.zendesk_admin_email])
self.assertEqual(mail.outbox[0].from_email, sets.DEFAULT_FROM_EMAIL)
message = mail.outbox[0].body.split('\n')
for i in range(len(message)):
if email_template[i] != 'url':
self.assertEqual(message[i], email_template[i])
else:
self.assertTrue(urlparse(message[i]).scheme)
def test_registration_user_creating(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
self.client.post(reverse('registration'), data={'email': self.any_zendesk_user_email})
user = User.objects.get(email=self.any_zendesk_user_email)
zendesk_user = zenpy.get_user(self.any_zendesk_user_email)
self.assertEqual(user.userprofile.name, zendesk_user.name)
def test_permissions_applying(self):
with self.settings(EMAIL_BACKEND=self.email_backend):
self.client.post(reverse('registration'), data={'email': self.zendesk_admin_email})
user = User.objects.get(email=self.zendesk_admin_email)
self.assertEqual(user.userprofile.role, 'admin')
self.assertTrue(user.has_perm('main.has_control_access'))
class MakeEngineerTestCase(TestCase):
fixtures = ['fixtures/test_make_engineer.json']
def setUp(self):
self.light_agent = '123@test.ru'
self.admin = 'admin@gmail.com'
self.engineer = 'customer@example.com'
self.client = Client()
self.client.force_login(User.objects.get(email=self.light_agent))
self.admin_client = Client()
self.admin_client.force_login(User.objects.get(email=self.admin))
@patch('main.extra_func.zenpy')
def test_redirect(self, ZenpyMock):
user = User.objects.get(email=self.light_agent)
resp = self.client.post(reverse_lazy('work_become_engineer'))
self.assertRedirects(resp, reverse('work', args=[user.id]))
self.assertEqual(resp.status_code, 302)
@patch('main.extra_func.zenpy')
def test_light_agent_make_engineer(self, ZenpyMock):
self.client.post(reverse_lazy('work_become_engineer'))
self.assertEqual(ZenpyMock.update_user.call_args[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer'])
@patch('main.extra_func.zenpy')
def test_admin_make_engineer(self, ZenpyMock):
self.admin_client.post(reverse_lazy('work_become_engineer'))
self.assertEqual(ZenpyMock.update_user.call_args[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer'])
@patch('main.extra_func.zenpy')
def test_engineer_make_engineer(self, ZenpyMock):
client = Client()
client.force_login(User.objects.get(email=self.engineer))
client.post(reverse_lazy('work_become_engineer'))
self.assertEqual(ZenpyMock.update_user.call_args[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer'])
@patch('main.extra_func.zenpy')
def test_control_page_make_one(self, ZenpyMock):
self.admin_client.post(
reverse_lazy('control'),
data={'users': [User.objects.get(email=self.light_agent).userprofile.id], 'engineer': 'engineer'}
)
call_list = ZenpyMock.update_user.call_args_list
mock_object = call_list[0][0][0]
self.assertEqual(len(call_list), 1)
self.assertEqual(mock_object.custom_role_id, sets.ZENDESK_ROLES['engineer'])
@patch('main.extra_func.zenpy')
def test_control_page_make_many(self, ZenpyMock):
self.admin_client.post(
reverse_lazy('control'),
data={
'users': [
User.objects.get(email=self.light_agent).userprofile.id,
User.objects.get(email=self.engineer).userprofile.id,
],
'engineer': 'engineer'
}
)
call_list = ZenpyMock.update_user.call_args_list
mock_objects = list(call_list)
self.assertEqual(len(call_list), 2)
for obj in mock_objects:
self.assertEqual(obj[0][0].custom_role_id, sets.ZENDESK_ROLES['engineer'])

View File

@ -20,11 +20,13 @@ from django_registration.views import RegistrationView
from rest_framework import viewsets from rest_framework import viewsets
from rest_framework.response import Response from rest_framework.response import Response
from access_controller.settings import DEFAULT_FROM_EMAIL, ZENDESK_ROLES, ZENDESK_MAX_AGENTS from access_controller.settings import DEFAULT_FROM_EMAIL, ZENDESK_ROLES, ZENDESK_MAX_AGENTS, ZENDESK_GROUPS
from main.extra_func import check_user_exist, update_profile, get_user_organization, \ from main.extra_func import check_user_exist, update_profile, get_user_organization, \
make_engineer, make_light_agent, get_users_list, update_users_in_model, count_users, \ make_engineer, make_light_agent, get_users_list, update_users_in_model, count_users, \
StatisticData, log, set_session_params_for_work_page log, set_session_params_for_work_page, get_tickets_list_for_group
from .statistic_data import StatisticData
from main.zendesk_admin import zenpy from main.zendesk_admin import zenpy
from main.requester import TicketListRequester
from main.forms import AdminPageUsers, CustomRegistrationForm, CustomAuthenticationForm, StatisticForm from main.forms import AdminPageUsers, CustomRegistrationForm, CustomAuthenticationForm, StatisticForm
from main.serializers import ProfileSerializer, ZendeskUserSerializer from main.serializers import ProfileSerializer, ZendeskUserSerializer
from .models import UserProfile from .models import UserProfile
@ -105,7 +107,7 @@ class CustomRegistrationView(RegistrationView):
except SMTPException: except SMTPException:
self.redirect_url = 'email_sending_error' self.redirect_url = 'email_sending_error'
else: else:
raise ValueError('Непредвиденная ошибка') self.redirect_url = 'email_sending_error'
else: else:
self.redirect_url = 'invalid_zendesk_email' self.redirect_url = 'invalid_zendesk_email'
@ -227,15 +229,19 @@ def work_become_engineer(request: WSGIRequest) -> HttpResponseRedirect:
def work_get_tickets(request): def work_get_tickets(request):
zenpy_user = zenpy.get_user(request.user.email) zenpy_user = zenpy.get_user(request.user.email)
if zenpy_user.role == 'admin' or zenpy_user.custom_role_id == ZENDESK_ROLES['engineer']: if zenpy_user.role == 'admin' or zenpy_user.custom_role_id == ZENDESK_ROLES['engineer']:
tickets = [ticket for ticket in zenpy.admin.search(type="ticket") if tickets = get_tickets_list_for_group(ZENDESK_GROUPS['buffer'])
ticket.group.name == 'Сменная группа' and ticket.assignee is None] assigned_tickets = []
count = 0 count = 0
for i in range(len(tickets)): for i in range(len(tickets)):
if i == int(request.GET.get('count_tickets')): if i == int(request.GET.get('count_tickets')):
if assigned_tickets:
zenpy.admin.tickets.update(assigned_tickets)
return set_session_params_for_work_page(request, count) return set_session_params_for_work_page(request, count)
tickets[i].assignee = zenpy_user tickets[i].assignee = zenpy_user
zenpy.admin.tickets.update(tickets[i]) assigned_tickets.append(tickets[i])
count += 1 count += 1
if assigned_tickets:
zenpy.admin.tickets.update(assigned_tickets)
return set_session_params_for_work_page(request, count) return set_session_params_for_work_page(request, count)
return set_session_params_for_work_page(request, is_confirm=False) return set_session_params_for_work_page(request, is_confirm=False)
@ -289,7 +295,6 @@ class AdminPageView(LoginRequiredMixin, PermissionRequiredMixin, SuccessMessageM
""" """
for user in users: for user in users:
make_engineer(user, self.request.user) make_engineer(user, self.request.user)
log(user, self.request.user.userprofile)
def make_light_agents(self, users): def make_light_agents(self, users):
""" """
@ -300,7 +305,6 @@ class AdminPageView(LoginRequiredMixin, PermissionRequiredMixin, SuccessMessageM
""" """
for user in users: for user in users:
make_light_agent(user, self.request.user) make_light_agent(user, self.request.user)
log(user, self.request.user.userprofile)
class CustomLoginView(LoginView): class CustomLoginView(LoginView):

View File

@ -1,9 +1,7 @@
from typing import Optional, Dict from typing import Optional, Dict
from zenpy import Zenpy from zenpy import Zenpy
from zenpy.lib.api_objects import User as ZenpyUser, Group as ZenpyGroup from zenpy.lib.api_objects import User as ZenpyUser, Group as ZenpyGroup
from zenpy.lib.exception import APIException from zenpy.lib.exception import APIException
from access_controller.settings import ACTRL_ZENDESK_SUBDOMAIN, ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, \ from access_controller.settings import ACTRL_ZENDESK_SUBDOMAIN, ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, \
ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL
@ -22,6 +20,14 @@ class ZendeskAdmin:
self.buffer_group_id: int = self.get_group(ZENDESK_GROUPS['buffer']).id self.buffer_group_id: int = self.get_group(ZENDESK_GROUPS['buffer']).id
self.solved_tickets_user_id: int = self.get_user(SOLVED_TICKETS_EMAIL).id self.solved_tickets_user_id: int = self.get_user(SOLVED_TICKETS_EMAIL).id
def update_user(self, user: ZenpyUser) -> bool:
"""
Функция сохраняет изменение пользователя в Zendesk.
:param user: Пользователь с изменёнными данными
"""
self.admin.users.update(user)
def check_user(self, email: str) -> bool: def check_user(self, email: str) -> bool:
""" """
Функция осуществляет проверку существования пользователя в Zendesk по email. Функция осуществляет проверку существования пользователя в Zendesk по email.