access-controller/main/extra_func.py

563 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from datetime import timedelta, datetime, date
from typing import Optional
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from django.shortcuts import redirect
from django.utils import timezone
from zenpy import Zenpy
from zenpy.lib.exception import APIException
from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket
from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY, ACTRL_ZENDESK_SUBDOMAIN
from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus
from main.zendesk_admin import zenpy
def update_role(user_profile: UserProfile, role: int) -> None:
"""
Функция меняет роль пользователя.
:param user_profile: Профиль пользователя
:param role: Новая роль
:return: Пользователь с обновленной ролью
"""
zendesk = zenpy
user = zendesk.get_user(user_profile.user.email)
user.custom_role_id = role
user_profile.custom_role_id = role
user_profile.save()
zendesk.admin.users.update(user)
def make_engineer(user_profile: UserProfile, who_changes: User) -> None:
"""
Функция устанавливает пользователю роль инженера.
:param user_profile: Профиль пользователя
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer"
"""
update_role(user_profile, ROLES['engineer'])
def make_light_agent(user_profile: UserProfile, who_changes: User) -> None:
"""
Функция устанавливает пользователю роль легкого агента.
:param user_profile: Профиль пользователя
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent"
"""
tickets = get_tickets_list(user_profile.user.email)
ticket: ZenpyTicket
for ticket in tickets:
UnassignedTicket.objects.create(
assignee=user_profile.user,
ticket_id=ticket.id,
status=UnassignedTicketStatus.SOLVED if ticket.status == 'solved' else UnassignedTicketStatus.UNASSIGNED
)
if ticket.status == 'solved':
ticket.assignee_id = zenpy.solved_tickets_user_id
else:
ticket.assignee = None
ticket.group_id = zenpy.buffer_group_id
zenpy.admin.tickets.update(tickets.values)
attempts, success = 5, False
while not success and attempts != 0:
try:
update_role(user_profile, ROLES['light_agent'])
success = True
except APIException as e:
attempts -= 1
if attempts == 0:
raise e
def get_users_list() -> list:
"""
Функция **get_users_list** возвращает список пользователей Zendesk, относящихся к организации SYSTEM.
"""
zendesk = zenpy
# У пользователей должна быть организация SYSTEM
org = next(zendesk.admin.search(type='organization', name='SYSTEM'))
users = zendesk.admin.organizations.users(org)
return users
def get_tickets_list(email):
"""
Функция возвращает список тикетов пользователя Zendesk
"""
return zenpy.admin.search(assignee=email, type='ticket')
def update_profile(user_profile: UserProfile):
"""
Функция обновляет профиль пользователя в соответствии с текущим в Zendesk.
:param user_profile: Профиль пользователя
:return: Обновленный, в соответствие с текущими данными в Zendesk, профиль пользователя
"""
user = zenpy.get_user(user_profile.user.email)
user_profile.name = user.name
user_profile.role = user.role
user_profile.custom_role_id = user.custom_role_id if user.custom_role_id else 0
user_profile.image = user.photo['content_url'] if user.photo else None
user_profile.save()
def check_user_exist(email: str) -> bool:
"""
Функция проверяет, существует ли пользователь.
:param email: Email пользователя
:return: Зарегистрирован ли пользователь в Zendesk
"""
return zenpy.check_user(email)
def get_user_organization(email: str) -> str:
"""
Функция возвращает организацию пользователя.
:param email: Email пользователя
:return: Организация пользователя
"""
return zenpy.get_user_org(email)
def check_user_auth(email: str, password: str) -> bool:
"""
Функция проверяет, верны ли входные данные.
:raise: :class:`APIException`: исключение, вызываемое если пользователь не аутентифицирован
"""
creds = {
'email': email,
'password': password,
'subdomain': ACTRL_ZENDESK_SUBDOMAIN,
}
try:
user = Zenpy(**creds)
user.search(email, type='user')
except APIException:
return False
return True
def update_user_in_model(profile: UserProfile, zendesk_user: ZenpyUser):
"""
Функция обновляет профиль пользователя при изменении данных пользователя на Zendesk.
:param profile: Профиль пользователя
:param zendesk_user: Данные пользователя в Zendesk
:return: Обновленный профиль пользователя
"""
profile.name = zendesk_user.name
profile.role = zendesk_user.role
profile.image = zendesk_user.photo['content_url'] if zendesk_user.photo else None
if zendesk_user.custom_role_id is not None:
profile.custom_role_id = int(zendesk_user.custom_role_id)
profile.save()
def count_users(users) -> tuple:
"""
Функция подсчета количества сотрудников с ролями engineer и light_agent
"""
engineers, light_agents = 0, 0
for user in users:
if user.custom_role_id == ROLES['engineer']:
engineers += 1
elif user.custom_role_id == ROLES['light_agent']:
light_agents += 1
return engineers, light_agents
def update_users_in_model():
"""
Обновляет пользователей в модели UserProfile по списку пользователей в организации
"""
users = get_users_list()
for user in users:
try:
profile = User.objects.get(email=user.email).userprofile
update_user_in_model(profile, user)
except ObjectDoesNotExist:
pass
return users
def daterange(start_date, end_date) -> list:
"""
Функция возвращает список дней с start_date по end_date, исключая правую границу.
:param start_date: Начальная дата
:param end_date: Конечная дата
:return: Список дней, не включая конечную дату
"""
dates = []
for n in range(int((end_date - start_date).days)):
dates.append(start_date + timedelta(n))
return dates
def get_timedelta(log, time=None) -> timedelta:
"""
Функция возвращает объект класса timedelta, который хранит промежуток времени от начала суток до момента,
который находится в log (объект класса RoleChangeLogs) или в time(datetime.time), если введён.
:param log: Лог
:param time: Время
:return: Сколько времени прошло от начала суток до события
"""
if time is None:
time = log.change_time.time()
time = timedelta(hours=time.hour, minutes=time.minute, seconds=time.second)
return time
def last_day_of_month(day: int) -> int:
"""
Функция возвращает последний день текущего месяца.
:param day: Текущий день
:return: Последний день месяца
"""
next_month = day.replace(day=28) + timedelta(days=4)
return next_month - timedelta(days=next_month.day)
class StatisticData:
"""
Класс для учета статистики интервалов работы пользователей.
Передаваемые параметры: start_date, end_date, email, stat.
:param display: Формат отображения времени (часы, минуты)
:type display: :class:`list`
:param interval: Интервал времени в часах и минутах
:type interval: :class:`list`
:param start_date: Дата начала работы
:type start_date: :class:`date`
:param end_date: Дата окончания работы
:type end_date: :class:`date`
:param email: Email пользователя
:type email: :class:`str`
:param errors: Список ошибок
:type errors: :class:`list`
:param warnings: Список предупреждений
:type warnings: :class:`list`
:param data: Ретроспектива смены ролей пользователя
:type data: :class:`dict`
:param statistic: Интервалы работы пользователя
:type statistic: :class:`dict`
"""
def __init__(self, start_date, end_date, user_email, stat=None):
self.display = None
self.interval = None
self.start_date = start_date
self.end_date = end_date
self.email = user_email
self.errors = list()
self.warnings = list()
self.data = dict()
self.statistic = dict()
self._init_data()
if stat is None:
self._init_statistic()
else:
self.statistic = stat
def get_statistic(self) -> dict:
"""
Функция возвращает статистику работы пользователя.
:return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании.
"""
if self.is_valid_statistic():
stat = self.statistic
stat = self._use_display(stat)
stat = self._use_interval(stat)
return stat
else:
return None
def is_valid_statistic(self) -> bool:
"""
Функция проверяет были ли ошибки при создании статистики.
:return: True, при отсутствии ошибок
"""
return not self.errors and self.statistic
def set_interval(self, interval: list) -> bool:
"""
Функция проверяет корректность представления интервала работы.
:param interval: Интервал должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if interval not in ['months', 'days']:
self.errors += ['Интервал работы должен быть в днях или месяцах']
return False
self.interval = interval
return True
def set_display(self, display_format: list) -> bool:
"""
Функция проверяет корректность формата отображения интервала.
:param display_format: Формат отображения должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if display_format not in ['days', 'hours']:
self.errors += ['Формат отображения должен быть в часах или днях']
return False
self.display = display_format
return True
def get_data(self) -> Optional[dict]:
"""
Функция возвращает данные - список объектов RoleChangeLogs.
"""
if self.is_valid_data():
return self.data
else:
return None
def is_valid_data(self) -> bool:
"""
Функция определяет были ли ошибки при получении логов.
:return: True, если ошибок нет
"""
return not self.errors
def _use_display(self, stat: list) -> list:
"""
Функция приводит данные к формату отображения.
:param stat: Список данных статистики пользователя
:return: Обновленный список
"""
if not self.is_valid_statistic() or not self.display:
return stat
new_stat = {}
for key, item in stat.items():
if self.display == 'hours':
new_stat[key] = item / 3600
elif self.display == 'days':
new_stat[key] = item / (ONE_DAY * 3600)
return new_stat
def _use_interval(self, stat: dict) -> dict:
"""
Функция объединяет ключи и значения в соответствии с интервалом работы.
:param stat: Статистика работы пользователя
:return: Обновленная статистика
"""
if not self.is_valid_statistic() or not self.interval:
return stat
new_stat = {}
if self.interval == 'months':
# Переделываем ключи под формат('началоесяца - конец_месяца')
for key, value in stat.items():
current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1))
current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1)))
index = ' - '.join([str(current_month_start), str(current_month_end)])
if new_stat.get(index):
new_stat[index] += value
else:
new_stat[index] = value
elif self.interval == 'days':
new_stat = stat # статистика изначально в днях
return new_stat
def check_time(self) -> bool:
"""
Функция проверяет корректность введенного времени.
:return: True, если время указано корректно. Иначе, False
"""
if self.end_date < self.start_date or self.end_date > datetime.now().date():
return False
return True
def _init_data(self):
"""
Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email.
:return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку.
"""
if not self.check_time():
self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени']
return
try:
self.data = RoleChangeLogs.objects.filter(
change_time__range=[self.start_date, self.end_date + timedelta(days=1)],
user=User.objects.get(email=self.email),
).order_by('change_time')
except User.DoesNotExist:
self.errors += ['Пользователь не найден']
def _init_statistic(self) -> dict:
"""
Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд.
:return: Статистика работы пользователя (statistic)
"""
self.clear_statistic()
if not self.get_data():
self.warnings += ['Не обнаружены изменения роли в данном промежутке']
return None
first_log, last_log = self.data[0], self.data[len(self.data) - 1]
if first_log.old_role == ROLES['engineer']:
self.prev_engineer_logic(first_log)
if last_log.new_role == ROLES['engineer']:
self.post_engineer_logic(last_log)
for log_index in range(len(self.data) - 1):
if self.data[log_index].new_role == ROLES['engineer']:
self.engineer_logic(log_index)
def engineer_logic(self, log_index):
"""
Функция обрабатывает основную часть работы инженера
"""
current_log, next_log = self.data[log_index], self.data[log_index + 1]
if current_log.change_time.date() != next_log.change_time.date():
self.statistic[current_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(current_log)).total_seconds()
self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds()
self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date())
else:
elapsed_time = next_log.change_time - current_log.change_time
self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds()
def post_engineer_logic(self, last_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона
"""
self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1))
if last_log.change_time.date() == timezone.now().date():
self.statistic[last_log.change_time.date()] += (
get_timedelta(None, timezone.now().time()) - get_timedelta(last_log)
).total_seconds()
else:
self.statistic[last_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(last_log)).total_seconds()
if self.end_date == timezone.now().date():
self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds()
def prev_engineer_logic(self, first_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона
"""
self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date),
first_log.change_time.date())
self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds()
def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict:
"""
Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне).
:param first: Начальная дата интервала
:param last: Последняя дата интервала
:param val: Количество секунд в одном дне
"""
for day in daterange(first, last):
self.statistic[day] = val
def clear_statistic(self) -> dict:
"""
Функция осуществляет обновление всех дней.
"""
self.statistic.clear()
self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0)
class DatabaseHandler(logging.Handler):
def __init__(self):
logging.Handler.__init__(self)
def emit(self, record):
database = RoleChangeLogs()
users = record.msg
if users[1]:
user = users[0]
admin = users[1]
elif not users[1]:
user = users[0]
admin = users[0]
database.name = user.name
database.user = user.user
database.changed_by = admin.user
if user.custom_role_id == ROLES['engineer']:
database.old_role = ROLES['light_agent']
elif user.custom_role_id == ROLES['light_agent']:
database.old_role = ROLES['engineer']
database.new_role = user.custom_role_id
database.save()
class CsvFormatter(logging.Formatter):
def __init__(self):
logging.Formatter.__init__(self)
def format(self, record):
users = record.msg
if users[1]:
user = users[0]
admin = users[1]
elif not users[1]:
user = users[0]
admin = users[0]
msg = ''
msg += user.name
if user.custom_role_id == ROLES['engineer']:
msg += ',engineer,'
elif user.custom_role_id == ROLES['light_agent']:
msg += ',light_agent,'
time = str(timezone.now().today())
msg += time[:16]
msg += ','
msg += admin.name
return msg
def log(user, admin=0):
"""
Осуществляет запись логов в базу данных и csv файл
:param admin:
:param user:
:return:
"""
users = [user, admin]
logger = logging.getLogger('MY_LOGGER')
if not logger.hasHandlers():
dbhandler = DatabaseHandler()
csvformatter = CsvFormatter()
csvhandler = logging.FileHandler('logs/logs.csv', "a")
csvhandler.setFormatter(csvformatter)
logger.addHandler(dbhandler)
logger.addHandler(csvhandler)
logger.setLevel('INFO')
logger.info(users)
def set_session_params_for_work_page(request, count=None, is_confirm=True):
"""
Функция для страницы получения прав
Устанавливает данные сессии о успешности запроса и количестве назначенных тикетов
"""
request.session['is_confirm'] = is_confirm
request.session['count_tickets'] = count
return redirect('work', request.user.id)