261 lines
11 KiB
Python
261 lines
11 KiB
Python
import os
|
||
from datetime import timedelta, datetime
|
||
|
||
from django.contrib.auth.models import User
|
||
from zenpy import Zenpy
|
||
from zenpy.lib.exception import APIException
|
||
|
||
from access_controller.settings import ZENDESK_ROLES as ROLES
|
||
from main.models import UserProfile, RoleChangeLogs
|
||
|
||
|
||
class ZendeskAdmin:
|
||
"""
|
||
Класс **ZendeskAdmin** существует, чтобы в каждой фунциии отдельно не проверять аккаунт администратора
|
||
|
||
:param credentials: Полномочия (первым указывается учетная запись организации в Zendesk)
|
||
:type credentials: :class:`dict`
|
||
:param email: Email администратора, указанный в env
|
||
:type email: :class:`str`
|
||
:param token: Токен администратора (формируется в Zendesk, указывается в env)
|
||
:type token: :class:`str`
|
||
:param password: Пароль администратора, указанный в env
|
||
:type password: :class:`str`
|
||
"""
|
||
|
||
credentials: dict = {
|
||
'subdomain': 'ngenix1612197338'
|
||
}
|
||
email: str = os.getenv('ACCESS_CONTROLLER_API_EMAIL')
|
||
token: str = os.getenv('ACCESS_CONTROLLER_API_TOKEN')
|
||
password: str = os.getenv('ACCESS_CONTROLLER_API_PASSWORD')
|
||
_instance = None
|
||
|
||
def __new__(cls, *args, **kwargs):
|
||
if cls._instance is None:
|
||
cls._instance = super().__new__(cls)
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
self.create_admin()
|
||
|
||
def check_user(self, email: str) -> bool:
|
||
"""
|
||
Функция **check_user** осуществляет проверку существования пользователя в Zendesk по email
|
||
"""
|
||
return True if self.admin.search(email, type='user') else False
|
||
|
||
def get_user_name(self, email: str) -> str:
|
||
"""
|
||
Функция **get_user_name** возвращает имя пользователя по его email
|
||
"""
|
||
user = self.admin.users.search(email).values[0]
|
||
return user.name
|
||
|
||
def get_user_role(self, email: str) -> str:
|
||
"""
|
||
Функция **get_user_role** возвращает роль пользователя по его email
|
||
"""
|
||
user = self.admin.users.search(email).values[0]
|
||
return user.role
|
||
|
||
def get_user_id(self, email: str) -> str:
|
||
"""
|
||
Функция **get_user_id** возвращает id пользователя по его email
|
||
"""
|
||
user = self.admin.users.search(email).values[0]
|
||
return user.id
|
||
|
||
def get_user_image(self, email: str) -> str:
|
||
"""
|
||
Функция **get_user_image** возвращает url-ссылку на аватар пользователя по его email
|
||
"""
|
||
user = self.admin.users.search(email).values[0]
|
||
return user.photo['content_url'] if user.photo else None
|
||
|
||
def get_user(self, email: str) -> str:
|
||
"""
|
||
Функция **get_user** возвращает пользователя (объект) по его email
|
||
|
||
:param email: email пользователя
|
||
:return: email пользователя, найденного в БД
|
||
"""
|
||
return self.admin.users.search(email).values[0]
|
||
|
||
def get_user_org(self, email: str) -> str:
|
||
"""
|
||
Функция **get_user_org** возвращает организацию, к которой относится пользователь по его email
|
||
"""
|
||
user = self.admin.users.search(email).values[0]
|
||
return user.organization.name if user.organization else None
|
||
|
||
def create_admin(self) -> Zenpy:
|
||
"""
|
||
Функция **Create_admin()** создает администратора, проверяя наличие вводимых данных в env.
|
||
|
||
:param credentials: В список полномочий администратора вносятся email, token, password из env
|
||
:type credentials: :class:`dict`
|
||
:raise: :class:`ValueError`: исключение, вызываемое если email не введен в env
|
||
:raise: :class:`APIException`: исключение, вызываемое если пользователя с таким email не существует в Zendesk
|
||
"""
|
||
|
||
if self.email is None:
|
||
raise ValueError('access_controller email not in env')
|
||
self.credentials['email'] = self.email
|
||
|
||
if self.token:
|
||
self.credentials['token'] = self.token
|
||
elif self.password:
|
||
self.credentials['password'] = self.password
|
||
else:
|
||
raise ValueError('access_controller token or password not in env')
|
||
self.admin = Zenpy(**self.credentials)
|
||
try:
|
||
self.admin.search(self.email, type='user')
|
||
except APIException:
|
||
raise ValueError('invalid access_controller`s login data')
|
||
|
||
|
||
def update_role(user_profile: UserProfile, role: str) -> UserProfile:
|
||
"""
|
||
Функция **update_role** меняет роль пользователя.
|
||
"""
|
||
zendesk = ZendeskAdmin()
|
||
user = zendesk.get_user(user_profile.user.email)
|
||
user.custom_role_id = role
|
||
zendesk.admin.users.update(user)
|
||
|
||
|
||
def make_engineer(user_profile: UserProfile) -> UserProfile:
|
||
"""
|
||
Функция **make_engineer** устанавливапет пользователю роль инженера.
|
||
"""
|
||
update_role(user_profile, ROLES['engineer'])
|
||
|
||
|
||
def make_light_agent(user_profile: UserProfile) -> UserProfile:
|
||
"""
|
||
Функция **make_light_agent** устанавливапет пользователю роль легкого агента.
|
||
"""
|
||
update_role(user_profile, ROLES['light_agent'])
|
||
|
||
|
||
def get_users_list() -> list:
|
||
"""
|
||
Функция **get_users_list** возвращает список пользователей Zendesk, относящихся к организации.
|
||
"""
|
||
zendesk = ZendeskAdmin()
|
||
admin = zendesk.get_user(zendesk.email)
|
||
org = next(zendesk.admin.users.organizations(user=admin))
|
||
return zendesk.admin.organizations.users(org)
|
||
|
||
|
||
def update_profile(user_profile: UserProfile) -> UserProfile:
|
||
"""
|
||
Функция обновляет профиль пользователя в соотвтетствии с текущим в Zendesk
|
||
"""
|
||
user = ZendeskAdmin().get_user(user_profile.user.email)
|
||
user_profile.name = user.name
|
||
user_profile.role = user.role
|
||
user_profile.image = user.photo['content_url'] if user.photo else None
|
||
user_profile.save()
|
||
|
||
|
||
def check_user_exist(email: str) -> bool:
|
||
"""
|
||
Функция проверяет, существует ли пользователь
|
||
"""
|
||
return ZendeskAdmin().check_user(email)
|
||
|
||
|
||
def get_user_organization(email: str) -> str:
|
||
"""
|
||
Функция возвращает организацию пользователя
|
||
"""
|
||
return ZendeskAdmin().get_user_org(email)
|
||
|
||
|
||
def daterange(start_date, end_date) -> list:
|
||
"""
|
||
Возвращает список дней с start_date по end_date исключая правую границу
|
||
"""
|
||
dates = []
|
||
for n in range(int((end_date - start_date).days)):
|
||
dates.append(start_date + timedelta(n))
|
||
return dates
|
||
|
||
|
||
def get_timedelta(log) -> timedelta:
|
||
"""
|
||
Возвращает объект класса timedelta, который хранит промежуток времени от начала суток до момента,
|
||
который находится в log (объект класса RoleChangeLogs)
|
||
"""
|
||
time = log.change_time.time()
|
||
time = timedelta(hours=time.hour, minutes=time.minute, seconds=time.second)
|
||
return time
|
||
|
||
|
||
def last_day_of_month(day):
|
||
"""
|
||
Возвращает последний день любого месяца
|
||
"""
|
||
next_month = day.replace(day=28) + timedelta(days=4)
|
||
return next_month - timedelta(days=next_month.day)
|
||
|
||
|
||
def get_statistic_from_data(data, start_date, end_date):
|
||
"""
|
||
Функция возвращает словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд
|
||
data - массив объектов RoleChangeLogs, является списком логов пользователя
|
||
"""
|
||
if not data:
|
||
return None
|
||
# Обнуление всех дней
|
||
stat = {}
|
||
for day in daterange(start_date, end_date + timedelta(days=1)):
|
||
stat[day] = 0
|
||
first_log, last_log = data[0], data[len(data) - 1]
|
||
|
||
# Если инженер работал ещё до начала диапазона
|
||
if int(first_log.old_role) == ROLES['engineer']:
|
||
for day in daterange(start_date, first_log.change_time.date()):
|
||
stat[day] = 24 * 3600
|
||
stat[first_log.change_time.date()] += get_timedelta(first_log).total_seconds()
|
||
|
||
# Если инженер закончил работать после диапазона
|
||
if int(last_log.new_role) == ROLES['engineer']:
|
||
for day in daterange(last_log.change_time.date() + timedelta(days=1), end_date + timedelta(days=1)):
|
||
stat[day] = 24 * 3600
|
||
stat[last_log.change_time.date()] += (timedelta(days=1) - get_timedelta(last_log)).total_seconds()
|
||
# Цикл по логам
|
||
for log_index in range(len(data) - 1):
|
||
if int(data[log_index].new_role) == ROLES['engineer']:
|
||
current_log, next_log = data[log_index], data[log_index + 1]
|
||
# Если сессия закончилась НЕ в тот же день, что и началась
|
||
if current_log.change_time.date() != next_log.change_time.date():
|
||
stat[current_log.change_time.date()] += (timedelta(days=1) - get_timedelta(current_log)).total_seconds()
|
||
stat[next_log.change_time.date()] += get_timedelta(next_log).total_seconds()
|
||
# Если проработал несколько дней подряд, то заполнить эти дни по 24 часа
|
||
for day in daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date()):
|
||
stat[day] = 24 * 3600
|
||
# Если сессия закончилась в тот же день, что и началась
|
||
else:
|
||
elapsed_time = next_log.change_time - current_log.change_time
|
||
stat[current_log.change_time.date()] += elapsed_time.total_seconds()
|
||
return stat
|
||
|
||
|
||
def get_data_logs(context, start_date, end_date, email):
|
||
"""
|
||
Функция возвращает список из лог-ов в диапазоне дат start_date-end_date для пользователя с почтой email
|
||
"""
|
||
data = []
|
||
try:
|
||
data = RoleChangeLogs.objects.filter(
|
||
change_time__range=[start_date, end_date + timedelta(days=1)],
|
||
user=User.objects.get(email=email),
|
||
).order_by('change_time')
|
||
except User.DoesNotExist:
|
||
context['errors'] = ['Пользователь не найден']
|
||
return data
|