access-controller/main/extra_func.py
2021-03-14 12:42:18 +03:00

467 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from datetime import timedelta, datetime, date
from django.contrib.auth.models import User
from zenpy import Zenpy
from zenpy.lib.exception import APIException
from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY
from main.models import UserProfile, RoleChangeLogs
class ZendeskAdmin:
"""
Класс **ZendeskAdmin** существует, чтобы в каждой фунциии отдельно не проверять аккаунт администратора.
:param credentials: Полномочия (первым указывается учетная запись организации в Zendesk)
:type credentials: :class:`dict`
:param email: Email администратора, указанный в env
:type email: :class:`str`
:param token: Токен администратора (формируется в Zendesk, указывается в env)
:type token: :class:`str`
:param password: Пароль администратора, указанный в env
:type password: :class:`str`
"""
credentials: dict = {
'subdomain': 'ngenix1612197338'
}
email: str = os.getenv('ACCESS_CONTROLLER_API_EMAIL')
token: str = os.getenv('ACCESS_CONTROLLER_API_TOKEN')
password: str = os.getenv('ACCESS_CONTROLLER_API_PASSWORD')
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
self.create_admin()
def check_user(self, email: str) -> bool:
"""
Функция осуществляет проверку существования пользователя в Zendesk по email.
:param email: Email пользователя
:return: Является ли зарегистрированным
"""
return True if self.admin.search(email, type='user') else False
def get_user_name(self, email: str) -> str:
"""
Функция возвращает имя пользователя по его email.
:param email: Email пользователя
:return: Имя пользователя
"""
user = self.admin.users.search(email).values[0]
return user.name
def get_user_role(self, email: str) -> str:
"""
Функция возвращает роль пользователя по его email.
:param email: Email пользователя
:return: Роль пользователя
"""
user = self.admin.users.search(email).values[0]
return user.role
def get_user_id(self, email: str) -> str:
"""
Функция возвращает id пользователя по его email
:param email: Email пользователя
:return: ID пользователя
"""
user = self.admin.users.search(email).values[0]
return user.id
def get_user_image(self, email: str) -> str:
"""
Функция возвращает url-ссылку на аватар пользователя по его email.
:param email: Email пользователя
:return: Аватар пользователя
"""
user = self.admin.users.search(email).values[0]
return user.photo['content_url'] if user.photo else None
def get_user(self, email: str):
"""
Функция возвращает пользователя (объект) по его email.
:param email: Email пользователя
:return: Объект пользователя, найденного в БД
"""
return self.admin.users.search(email).values[0]
def get_user_org(self, email: str) -> str:
"""
Функция возвращает организацию, к которой относится пользователь по его email.
:param email: Email пользователя
:return: Организация пользователя
"""
user = self.admin.users.search(email).values[0]
return user.organization.name if user.organization else None
def create_admin(self) -> Zenpy:
"""
Функция создает администратора, проверяя наличие вводимых данных в env.
:param credentials: В список полномочий администратора вносятся email, token, password из env
:type credentials: :class:`dict`
:raise: :class:`ValueError`: исключение, вызываемое если email не введен в env
:raise: :class:`APIException`: исключение, вызываемое если пользователя с таким email не существует в Zendesk
"""
if self.email is None:
raise ValueError('access_controller email not in env')
self.credentials['email'] = self.email
if self.token:
self.credentials['token'] = self.token
elif self.password:
self.credentials['password'] = self.password
else:
raise ValueError('access_controller token or password not in env')
self.admin = Zenpy(**self.credentials)
try:
self.admin.search(self.email, type='user')
except APIException:
raise ValueError('invalid access_controller`s login data')
def update_role(user_profile: UserProfile, role: str) -> UserProfile:
"""
Функция меняет роль пользователя.
:param user_profile: Профиль пользователя
:param role: Новая роль
:return: Пользователь с обновленной ролью
"""
zendesk = ZendeskAdmin()
user = zendesk.get_user(user_profile.user.email)
user.custom_role_id = role
zendesk.admin.users.update(user)
def make_engineer(user_profile: UserProfile) -> UserProfile:
"""
Функция устанавливапет пользователю роль инженера.
:param user_profile: Профиль пользователя
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "endineer"
"""
update_role(user_profile, ROLES['engineer'])
def make_light_agent(user_profile: UserProfile) -> UserProfile:
"""
Функция устанавливапет пользователю роль легкого агента.
:param user_profile: Профиль пользователя
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent"
"""
update_role(user_profile, ROLES['light_agent'])
def get_users_list() -> list:
"""
Функция возвращает список пользователей Zendesk, относящихся к организации.
"""
zendesk = ZendeskAdmin()
admin = zendesk.get_user(zendesk.email)
org = next(zendesk.admin.users.organizations(user=admin))
return zendesk.admin.organizations.users(org)
def update_profile(user_profile: UserProfile) -> UserProfile:
"""
Функция обновляет профиль пользователя в соотвтетствии с текущим в Zendesk.
:param user_profile: Профиль пользователя
:return: Обновленный, в соответствие с текущими данными в Zendesk, профиль пользователя
"""
user = ZendeskAdmin().get_user(user_profile.user.email)
user_profile.name = user.name
user_profile.role = user.role
user_profile.custom_role_id = user.custom_role_id if user.custom_role_id else 0
user_profile.image = user.photo['content_url'] if user.photo else None
user_profile.save()
def check_user_exist(email: str) -> bool:
"""
Функция проверяет, существует ли пользователь.
:param email: Email пользователя
:return: Зарегистрирован ли пользователь в Zendesk
"""
return ZendeskAdmin().check_user(email)
def get_user_organization(email: str) -> str:
"""
Функция возвращает организацию пользователя.
:param email: Email пользователя
:return: Организацния пользователя
"""
return ZendeskAdmin().get_user_org(email)
def daterange(start_date: date, end_date: date) -> list:
"""
Функция возвращает список дней с start_date по end_date, исключая правую границу.
:param start_date: Начальная дата
:param end_date: Конечная дата
:return: Список дней, не включая конечную дату
"""
dates = []
for n in range(int((end_date - start_date).days)):
dates.append(start_date + timedelta(n))
return dates
def get_timedelta(log: RoleChangeLogs, time=None) -> timedelta:
"""
Функция возвращает объект класса timedelta, который хранит промежуток времени от начала суток до момента,
который находится в log (объект класса RoleChangeLogs) или в time(datetime.time), если введён.
:param log: Лог
:param time: Время
:return: Сколько времени прошло от начала суток до события
"""
if time is None:
time = log.change_time.time()
time = timedelta(hours=time.hour, minutes=time.minute, seconds=time.second)
return time
def last_day_of_month(day):
"""
Функция возвращает последний день текущего месяца.
.. todo::
Дописать документацию по данной функции (не поняла типы и суть)
:param day: Текущий день
:return: Последний день месяца
"""
next_month = day.replace(day=28) + timedelta(days=4)
return next_month - timedelta(days=next_month.day)
class StatisticData:
"""
Класс для учета статистики интервалов работы пользователей.
Передаваемые параметры: start_date, end_date, user_email.
.. todo::
Дописать описание данного класса
"""
def __init__(self, start_date, end_date, user_email, stat=None):
self.display = None
self.interval = None
self.start_date = start_date
self.end_date = end_date
self.email = user_email
self.errors = list()
self.warnings = list()
self.data = dict()
self.statistic = dict()
self._set_data()
if stat is None:
self._set_statistic()
else:
self.statistic = stat
def get_statistic(self) -> dict:
"""
Функция возвращает статистику работы пользователя.
:return: Cловарь statistic с применением формата отображения и интеравала работы(если они есть). None, если были ошибки при создании.
"""
if self.is_valid_statistic():
stat = self.statistic
stat = self._use_display(stat)
stat = self._use_interval(stat)
return stat
else:
return None
def is_valid_statistic(self) -> bool:
"""
Функция проверяет были ли ошибки при создании статистики.
:return: True, при отутствии ошибок
"""
return not self.errors and self.statistic
def set_interval(self, interval: list) -> bool:
"""
Функция проверяет корректность представления интервала работы.
:param interval: Интервал должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if interval not in ['months', 'days']:
self.errors += ['Интервал работы должен быть в днях или месяцах']
return False
self.interval = interval
return True
def set_display(self, display_format: list) -> bool:
"""
Функция проверяет корректность формата отображения интервала.
:param display_format: Формат отображения должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if display_format not in ['days', 'hours']:
self.errors += ['Формат отображения должен быть в часах или днях']
return False
self.display = display_format
return True
def get_data(self) -> list:
"""
Функция возвращает данные - список объектов RoleChangeLogs.
"""
if self.is_valid_data():
return self.data
else:
return None
def is_valid_data(self) -> bool:
"""
Функция определяет были ли ошибки при получении логов.
:return: True, если ошибок нет
"""
return not self.errors
def _use_display(self, stat: list) -> list:
"""
Функция приводит данные к формату отображения.
:param stat: Список данных статистики пользователя
:return: Обновленный список
"""
if not self.is_valid_statistic() or not self.display:
return stat
new_stat = {}
for key, item in stat.items():
if self.display == 'hours':
new_stat[key] = item / 3600
elif self.display == 'days':
new_stat[key] = item / (ONE_DAY * 3600)
return new_stat
def _use_interval(self, stat):
"""
Функция объединяет ключи и значения в соответствии с интервалом работы.
:param stat:
:return:
"""
if not self.is_valid_statistic() or not self.interval:
return stat
new_stat = {}
if self.interval == 'months':
# Переделываем ключи под формат('началоесяца - конец_месяца')
for key, value in stat.items():
current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1))
current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1)))
index = ' - '.join([str(current_month_start), str(current_month_end)])
if new_stat.get(index):
new_stat[index] += value
else:
new_stat[index] = value
elif self.interval == 'days':
new_stat = stat # статистика изначально в днях
return new_stat
def check_time(self):
"""
Проверка на правильность введенного времени
"""
if self.end_date < self.start_date or self.end_date > datetime.now().date():
return False
return True
def _set_data(self):
"""
Функция возвращает логи в диапазоне дат start_date-end_date для пользователя с указанным email.
:return:
"""
if not self.check_time():
self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени']
return
try:
self.data = RoleChangeLogs.objects.filter(
change_time__range=[self.start_date, self.end_date + timedelta(days=1)],
user=User.objects.get(email=self.email),
).order_by('change_time')
except User.DoesNotExist:
self.errors += ['Пользователь не найден']
def _set_statistic(self):
"""
Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд.
:return:
"""
self.clear_statistic()
if not self.get_data():
self.warnings += ['Не обнаружены изменения роли в данном промежутке']
return None
first_log, last_log = self.data[0], self.data[len(self.data) - 1]
if first_log.old_role == ROLES['engineer']:
self.fill_daterange(self.start_date, first_log.change_time.date())
self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds()
if last_log.new_role == ROLES['engineer']:
self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1))
self.statistic[last_log.change_time.date()] += (timedelta(days=1) - get_timedelta(last_log)).total_seconds()
if self.end_date == datetime.now().date():
self.statistic[self.end_date] = get_timedelta(None, datetime.now().time()).total_seconds()
for log_index in range(len(self.data) - 1):
if self.data[log_index].new_role == ROLES['engineer']:
current_log, next_log = self.data[log_index], self.data[log_index + 1]
if current_log.change_time.date() != next_log.change_time.date():
self.statistic[current_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(current_log)).total_seconds()
self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds()
self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date())
else:
elapsed_time = next_log.change_time - current_log.change_time
self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds()
def fill_daterange(self, first, last, val=24 * 3600):
"""
Функция заполеняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне).
:param first:
:param last:
:param val:
:return:
"""
for day in daterange(first, last):
self.statistic[day] = val
def clear_statistic(self):
"""
Функция осуществляет обновление всех дней
:return:
"""
self.statistic.clear()
self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0)