fix
This commit is contained in:
parent
6bc1c6d108
commit
d426e9a2c4
0
main/lib/__init__.py
Normal file
0
main/lib/__init__.py
Normal file
BIN
main/lib/__pycache__/__init__.cpython-39.pyc
Normal file
BIN
main/lib/__pycache__/__init__.cpython-39.pyc
Normal file
Binary file not shown.
BIN
main/lib/__pycache__/extra_func.cpython-39.pyc
Normal file
BIN
main/lib/__pycache__/extra_func.cpython-39.pyc
Normal file
Binary file not shown.
BIN
main/lib/__pycache__/statistic_data.cpython-39.pyc
Normal file
BIN
main/lib/__pycache__/statistic_data.cpython-39.pyc
Normal file
Binary file not shown.
BIN
main/lib/__pycache__/zendesk_admin.cpython-39.pyc
Normal file
BIN
main/lib/__pycache__/zendesk_admin.cpython-39.pyc
Normal file
Binary file not shown.
319
main/lib/extra_func.py
Normal file
319
main/lib/extra_func.py
Normal file
@ -0,0 +1,319 @@
|
||||
import logging
|
||||
from datetime import timedelta
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.exceptions import ObjectDoesNotExist
|
||||
from django.shortcuts import redirect
|
||||
from django.utils import timezone
|
||||
from zenpy import Zenpy
|
||||
from zenpy.lib.exception import APIException
|
||||
from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket
|
||||
from zenpy.lib.generator import SearchResultGenerator
|
||||
|
||||
from access_controller.settings import ZENDESK_ROLES as ROLES, ACTRL_ZENDESK_SUBDOMAIN
|
||||
from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus
|
||||
from main.requester import TicketListRequester
|
||||
from main.lib.zendesk_admin import zenpy
|
||||
|
||||
|
||||
def update_role(user_profile: UserProfile, role: int, who_changes: User) -> None:
|
||||
"""
|
||||
Функция меняет роль пользователя.
|
||||
|
||||
:param user_profile: Профиль пользователя
|
||||
:param role: Новая роль
|
||||
:param who_changes: Пользователь, меняющий роль
|
||||
:return: Пользователь с обновленной ролью
|
||||
"""
|
||||
zendesk = zenpy
|
||||
user = zendesk.get_user(user_profile.user.email)
|
||||
user.custom_role_id = role
|
||||
user_profile.custom_role_id = role
|
||||
user_profile.save()
|
||||
log(user_profile, who_changes.userprofile)
|
||||
zendesk.update_user(user)
|
||||
|
||||
|
||||
def make_engineer(user_profile: UserProfile, who_changes: User) -> None:
|
||||
"""
|
||||
Функция устанавливает пользователю роль инженера.
|
||||
|
||||
:param user_profile: Профиль пользователя
|
||||
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer"
|
||||
"""
|
||||
update_role(user_profile, ROLES['engineer'], who_changes)
|
||||
|
||||
|
||||
def make_light_agent(user_profile: UserProfile, who_changes: User) -> None:
|
||||
"""
|
||||
Функция устанавливает пользователю роль легкого агента.
|
||||
|
||||
:param user_profile: Профиль пользователя
|
||||
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent"
|
||||
"""
|
||||
tickets: SearchResultGenerator = get_tickets_list(user_profile.user.email)
|
||||
ticket: ZenpyTicket
|
||||
for ticket in tickets:
|
||||
UnassignedTicket.objects.create(
|
||||
assignee=user_profile.user,
|
||||
ticket_id=ticket.id,
|
||||
status=UnassignedTicketStatus.SOLVED if ticket.status == 'solved' else UnassignedTicketStatus.UNASSIGNED
|
||||
)
|
||||
if ticket.status == 'solved':
|
||||
ticket.assignee_id = zenpy.solved_tickets_user_id
|
||||
else:
|
||||
ticket.assignee = None
|
||||
ticket.group_id = zenpy.buffer_group_id
|
||||
if tickets:
|
||||
zenpy.admin.tickets.update(tickets)
|
||||
attempts, success = 20, False
|
||||
while not success and attempts != 0:
|
||||
try:
|
||||
update_role(user_profile, ROLES['light_agent'], who_changes)
|
||||
success = True
|
||||
except APIException as e:
|
||||
attempts -= 1
|
||||
if attempts == 0:
|
||||
raise e
|
||||
|
||||
|
||||
def get_users_list() -> list:
|
||||
"""
|
||||
Функция **get_users_list** возвращает список пользователей Zendesk, относящихся к организации SYSTEM.
|
||||
"""
|
||||
zendesk = zenpy
|
||||
|
||||
# У пользователей должна быть организация SYSTEM
|
||||
org = next(zendesk.admin.search(type='organization', name='SYSTEM'))
|
||||
users = zendesk.admin.organizations.users(org)
|
||||
return users
|
||||
|
||||
|
||||
def get_tickets_list(email):
|
||||
"""
|
||||
Функция возвращает список тикетов пользователя Zendesk
|
||||
"""
|
||||
return TicketListRequester().get_tickets_list_for_user(zenpy.get_user(email))
|
||||
|
||||
|
||||
def get_tickets_list_for_group(group_name):
|
||||
"""
|
||||
Функция возвращает список неназначенных, нерешённых тикетов группы Zendesk
|
||||
"""
|
||||
return TicketListRequester().get_tickets_list_for_group(zenpy.get_group(group_name))
|
||||
|
||||
|
||||
def update_profile(user_profile: UserProfile):
|
||||
"""
|
||||
Функция обновляет профиль пользователя в соответствии с текущим в Zendesk.
|
||||
|
||||
:param user_profile: Профиль пользователя
|
||||
:return: Обновленный, в соответствие с текущими данными в Zendesk, профиль пользователя
|
||||
"""
|
||||
user = zenpy.get_user(user_profile.user.email)
|
||||
user_profile.name = user.name
|
||||
user_profile.role = user.role
|
||||
user_profile.custom_role_id = user.custom_role_id if user.custom_role_id else 0
|
||||
user_profile.image = user.photo['content_url'] if user.photo else None
|
||||
user_profile.save()
|
||||
|
||||
|
||||
def check_user_exist(email: str) -> bool:
|
||||
"""
|
||||
Функция проверяет, существует ли пользователь.
|
||||
|
||||
:param email: Email пользователя
|
||||
:return: Зарегистрирован ли пользователь в Zendesk
|
||||
"""
|
||||
return zenpy.check_user(email)
|
||||
|
||||
|
||||
def get_user_organization(email: str) -> str:
|
||||
"""
|
||||
Функция возвращает организацию пользователя.
|
||||
|
||||
:param email: Email пользователя
|
||||
:return: Организация пользователя
|
||||
"""
|
||||
return zenpy.get_user_org(email)
|
||||
|
||||
|
||||
def check_user_auth(email: str, password: str) -> bool:
|
||||
"""
|
||||
Функция проверяет, верны ли входные данные.
|
||||
|
||||
:raise: :class:`APIException`: исключение, вызываемое если пользователь не аутентифицирован
|
||||
"""
|
||||
creds = {
|
||||
'email': email,
|
||||
'password': password,
|
||||
'subdomain': ACTRL_ZENDESK_SUBDOMAIN,
|
||||
}
|
||||
try:
|
||||
user = Zenpy(**creds)
|
||||
user.search(email, type='user')
|
||||
except APIException:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def update_user_in_model(profile: UserProfile, zendesk_user: ZenpyUser):
|
||||
"""
|
||||
Функция обновляет профиль пользователя при изменении данных пользователя на Zendesk.
|
||||
|
||||
:param profile: Профиль пользователя
|
||||
:param zendesk_user: Данные пользователя в Zendesk
|
||||
:return: Обновленный профиль пользователя
|
||||
"""
|
||||
profile.name = zendesk_user.name
|
||||
profile.role = zendesk_user.role
|
||||
profile.image = zendesk_user.photo['content_url'] if zendesk_user.photo else None
|
||||
if zendesk_user.custom_role_id is not None:
|
||||
profile.custom_role_id = int(zendesk_user.custom_role_id)
|
||||
profile.save()
|
||||
|
||||
|
||||
def count_users(users) -> tuple:
|
||||
"""
|
||||
Функция подсчета количества сотрудников с ролями engineer и light_agent
|
||||
"""
|
||||
engineers, light_agents = 0, 0
|
||||
for user in users:
|
||||
if user.custom_role_id == ROLES['engineer']:
|
||||
engineers += 1
|
||||
elif user.custom_role_id == ROLES['light_agent']:
|
||||
light_agents += 1
|
||||
return engineers, light_agents
|
||||
|
||||
|
||||
def update_users_in_model():
|
||||
"""
|
||||
Обновляет пользователей в модели UserProfile по списку пользователей в организации
|
||||
"""
|
||||
users = get_users_list()
|
||||
for user in users:
|
||||
try:
|
||||
profile = User.objects.get(email=user.email).userprofile
|
||||
update_user_in_model(profile, user)
|
||||
except ObjectDoesNotExist:
|
||||
pass
|
||||
return users
|
||||
|
||||
|
||||
def daterange(start_date, end_date) -> list:
|
||||
"""
|
||||
Функция возвращает список дней с start_date по end_date, исключая правую границу.
|
||||
|
||||
:param start_date: Начальная дата
|
||||
:param end_date: Конечная дата
|
||||
:return: Список дней, не включая конечную дату
|
||||
"""
|
||||
dates = []
|
||||
for n in range(int((end_date - start_date).days)):
|
||||
dates.append(start_date + timedelta(n))
|
||||
return dates
|
||||
|
||||
|
||||
def get_timedelta(log, time=None) -> timedelta:
|
||||
"""
|
||||
Функция возвращает объект класса timedelta, который хранит промежуток времени от начала суток до момента,
|
||||
который находится в log (объект класса RoleChangeLogs) или в time(datetime.time), если введён.
|
||||
|
||||
:param log: Лог
|
||||
:param time: Время
|
||||
:return: Сколько времени прошло от начала суток до события
|
||||
"""
|
||||
if time is None:
|
||||
time = log.change_time.time()
|
||||
time = timedelta(hours=time.hour, minutes=time.minute, seconds=time.second)
|
||||
return time
|
||||
|
||||
|
||||
def last_day_of_month(day: int) -> int:
|
||||
"""
|
||||
Функция возвращает последний день текущего месяца.
|
||||
|
||||
:param day: Текущий день
|
||||
:return: Последний день месяца
|
||||
"""
|
||||
next_month = day.replace(day=28) + timedelta(days=4)
|
||||
return next_month - timedelta(days=next_month.day)
|
||||
|
||||
|
||||
class DatabaseHandler(logging.Handler):
|
||||
def __init__(self):
|
||||
logging.Handler.__init__(self)
|
||||
|
||||
def emit(self, record):
|
||||
database = RoleChangeLogs()
|
||||
users = record.msg
|
||||
if users[1]:
|
||||
user = users[0]
|
||||
admin = users[1]
|
||||
elif not users[1]:
|
||||
user = users[0]
|
||||
admin = users[0]
|
||||
database.name = user.name
|
||||
database.user = user.user
|
||||
database.changed_by = admin.user
|
||||
if user.custom_role_id == ROLES['engineer']:
|
||||
database.old_role = ROLES['light_agent']
|
||||
elif user.custom_role_id == ROLES['light_agent']:
|
||||
database.old_role = ROLES['engineer']
|
||||
database.new_role = user.custom_role_id
|
||||
database.save()
|
||||
|
||||
|
||||
class CsvFormatter(logging.Formatter):
|
||||
def __init__(self):
|
||||
logging.Formatter.__init__(self)
|
||||
|
||||
def format(self, record):
|
||||
users = record.msg
|
||||
if users[1]:
|
||||
user = users[0]
|
||||
admin = users[1]
|
||||
elif not users[1]:
|
||||
user = users[0]
|
||||
admin = users[0]
|
||||
msg = ''
|
||||
msg += user.name
|
||||
if user.custom_role_id == ROLES['engineer']:
|
||||
msg += ',engineer,'
|
||||
elif user.custom_role_id == ROLES['light_agent']:
|
||||
msg += ',light_agent,'
|
||||
time = str(timezone.now().today())
|
||||
msg += time[:16]
|
||||
msg += ','
|
||||
msg += admin.name
|
||||
return msg
|
||||
|
||||
|
||||
def log(user, admin=None):
|
||||
"""
|
||||
Осуществляет запись логов в базу данных и csv файл
|
||||
:param admin:
|
||||
:param user:
|
||||
:return:
|
||||
"""
|
||||
users = [user, admin]
|
||||
logger = logging.getLogger('MY_LOGGER')
|
||||
if not logger.hasHandlers():
|
||||
dbhandler = DatabaseHandler()
|
||||
csvformatter = CsvFormatter()
|
||||
csvhandler = logging.FileHandler('logs/logs.csv', "a")
|
||||
csvhandler.setFormatter(csvformatter)
|
||||
logger.addHandler(dbhandler)
|
||||
logger.addHandler(csvhandler)
|
||||
logger.setLevel('INFO')
|
||||
logger.info(users)
|
||||
|
||||
|
||||
def set_session_params_for_work_page(request, count=None, is_confirm=True):
|
||||
"""
|
||||
Функция для страницы получения прав
|
||||
Устанавливает данные сессии о успешности запроса и количестве назначенных тикетов
|
||||
"""
|
||||
request.session['is_confirm'] = is_confirm
|
||||
request.session['count_tickets'] = count
|
||||
return redirect('work', request.user.id)
|
261
main/lib/statistic_data.py
Normal file
261
main/lib/statistic_data.py
Normal file
@ -0,0 +1,261 @@
|
||||
from datetime import date, datetime, timedelta
|
||||
from typing import Optional
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.utils import timezone
|
||||
|
||||
from access_controller.settings import ONE_DAY, ZENDESK_ROLES as ROLES
|
||||
from main.lib.extra_func import last_day_of_month, get_timedelta, daterange
|
||||
from main.models import RoleChangeLogs
|
||||
|
||||
|
||||
class StatisticData:
|
||||
"""
|
||||
Класс для учета статистики интервалов работы пользователей.
|
||||
Передаваемые параметры: start_date, end_date, email, stat.
|
||||
|
||||
:param display: Формат отображения времени (часы, минуты)
|
||||
:type display: :class:`list`
|
||||
:param interval: Интервал времени в часах и минутах
|
||||
:type interval: :class:`list`
|
||||
:param start_date: Дата начала работы
|
||||
:type start_date: :class:`date`
|
||||
:param end_date: Дата окончания работы
|
||||
:type end_date: :class:`date`
|
||||
:param email: Email пользователя
|
||||
:type email: :class:`str`
|
||||
:param errors: Список ошибок
|
||||
:type errors: :class:`list`
|
||||
:param warnings: Список предупреждений
|
||||
:type warnings: :class:`list`
|
||||
:param data: Ретроспектива смены ролей пользователя
|
||||
:type data: :class:`dict`
|
||||
:param statistic: Интервалы работы пользователя
|
||||
:type statistic: :class:`dict`
|
||||
"""
|
||||
|
||||
def __init__(self, start_date, end_date, user_email, stat=None):
|
||||
self.display = None
|
||||
self.interval = None
|
||||
self.start_date = start_date
|
||||
self.end_date = end_date
|
||||
self.email = user_email
|
||||
self.errors = list()
|
||||
self.warnings = list()
|
||||
self.data = dict()
|
||||
self.statistic = dict()
|
||||
self._init_data()
|
||||
if stat is None:
|
||||
self._init_statistic()
|
||||
else:
|
||||
self.statistic = stat
|
||||
|
||||
def get_statistic(self) -> dict:
|
||||
"""
|
||||
Функция возвращает статистику работы пользователя.
|
||||
|
||||
:return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании.
|
||||
"""
|
||||
if self.is_valid_statistic():
|
||||
stat = self.statistic
|
||||
stat = self._use_display(stat)
|
||||
stat = self._use_interval(stat)
|
||||
return stat
|
||||
else:
|
||||
return None
|
||||
|
||||
def is_valid_statistic(self) -> bool:
|
||||
"""
|
||||
Функция проверяет были ли ошибки при создании статистики.
|
||||
|
||||
:return: True, при отсутствии ошибок
|
||||
"""
|
||||
return not self.errors and self.statistic
|
||||
|
||||
def set_interval(self, interval: list) -> bool:
|
||||
"""
|
||||
Функция проверяет корректность представления интервала работы.
|
||||
|
||||
:param interval: Интервал должен быть указан в днях или месяцах.
|
||||
:return: True, если указан верно
|
||||
"""
|
||||
if interval not in ['months', 'days']:
|
||||
self.errors += ['Интервал работы должен быть в днях или месяцах']
|
||||
return False
|
||||
self.interval = interval
|
||||
return True
|
||||
|
||||
def set_display(self, display_format: list) -> bool:
|
||||
"""
|
||||
Функция проверяет корректность формата отображения интервала.
|
||||
|
||||
:param display_format: Формат отображения должен быть указан в днях или месяцах.
|
||||
:return: True, если указан верно
|
||||
"""
|
||||
if display_format not in ['days', 'hours']:
|
||||
self.errors += ['Формат отображения должен быть в часах или днях']
|
||||
return False
|
||||
self.display = display_format
|
||||
return True
|
||||
|
||||
def get_data(self) -> Optional[dict]:
|
||||
"""
|
||||
Функция возвращает данные - список объектов RoleChangeLogs.
|
||||
"""
|
||||
if self.is_valid_data():
|
||||
return self.data
|
||||
else:
|
||||
return None
|
||||
|
||||
def is_valid_data(self) -> bool:
|
||||
"""
|
||||
Функция определяет были ли ошибки при получении логов.
|
||||
|
||||
:return: True, если ошибок нет
|
||||
"""
|
||||
return not self.errors
|
||||
|
||||
def _use_display(self, stat: list) -> list:
|
||||
"""
|
||||
Функция приводит данные к формату отображения.
|
||||
|
||||
:param stat: Список данных статистики пользователя
|
||||
:return: Обновленный список
|
||||
"""
|
||||
if not self.is_valid_statistic() or not self.display:
|
||||
return stat
|
||||
new_stat = {}
|
||||
for key, item in stat.items():
|
||||
if self.display == 'hours':
|
||||
new_stat[key] = item / 3600
|
||||
elif self.display == 'days':
|
||||
new_stat[key] = item / (ONE_DAY * 3600)
|
||||
return new_stat
|
||||
|
||||
def _use_interval(self, stat: dict) -> dict:
|
||||
"""
|
||||
Функция объединяет ключи и значения в соответствии с интервалом работы.
|
||||
|
||||
:param stat: Статистика работы пользователя
|
||||
:return: Обновленная статистика
|
||||
"""
|
||||
if not self.is_valid_statistic() or not self.interval:
|
||||
return stat
|
||||
new_stat = {}
|
||||
if self.interval == 'months':
|
||||
# Переделываем ключи под формат('начало_месяца - конец_месяца')
|
||||
for key, value in stat.items():
|
||||
current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1))
|
||||
current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1)))
|
||||
index = ' - '.join([str(current_month_start), str(current_month_end)])
|
||||
if new_stat.get(index):
|
||||
new_stat[index] += value
|
||||
else:
|
||||
new_stat[index] = value
|
||||
elif self.interval == 'days':
|
||||
new_stat = stat # статистика изначально в днях
|
||||
return new_stat
|
||||
|
||||
def check_time(self) -> bool:
|
||||
"""
|
||||
Функция проверяет корректность введенного времени.
|
||||
|
||||
:return: True, если время указано корректно. Иначе, False
|
||||
"""
|
||||
if self.end_date < self.start_date or self.end_date > datetime.now().date():
|
||||
return False
|
||||
return True
|
||||
|
||||
def _init_data(self):
|
||||
"""
|
||||
Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email.
|
||||
|
||||
:return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку.
|
||||
"""
|
||||
if not self.check_time():
|
||||
self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени']
|
||||
return
|
||||
try:
|
||||
self.data = RoleChangeLogs.objects.filter(
|
||||
change_time__range=[self.start_date, self.end_date + timedelta(days=1)],
|
||||
user=User.objects.get(email=self.email),
|
||||
).order_by('change_time')
|
||||
except User.DoesNotExist:
|
||||
self.errors += ['Пользователь не найден']
|
||||
|
||||
def _init_statistic(self) -> dict:
|
||||
"""
|
||||
Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд.
|
||||
|
||||
:return: Статистика работы пользователя (statistic)
|
||||
"""
|
||||
self.clear_statistic()
|
||||
if not self.get_data():
|
||||
self.warnings += ['Не обнаружены изменения роли в данном промежутке']
|
||||
return None
|
||||
first_log, last_log = self.data[0], self.data[len(self.data) - 1]
|
||||
|
||||
if first_log.old_role == ROLES['engineer']:
|
||||
self.prev_engineer_logic(first_log)
|
||||
|
||||
if last_log.new_role == ROLES['engineer']:
|
||||
self.post_engineer_logic(last_log)
|
||||
|
||||
for log_index in range(len(self.data) - 1):
|
||||
if self.data[log_index].new_role == ROLES['engineer']:
|
||||
self.engineer_logic(log_index)
|
||||
|
||||
def engineer_logic(self, log_index):
|
||||
"""
|
||||
Функция обрабатывает основную часть работы инженера
|
||||
"""
|
||||
current_log, next_log = self.data[log_index], self.data[log_index + 1]
|
||||
if current_log.change_time.date() != next_log.change_time.date():
|
||||
self.statistic[current_log.change_time.date()] += (
|
||||
timedelta(days=1) - get_timedelta(current_log)).total_seconds()
|
||||
self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds()
|
||||
self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date())
|
||||
else:
|
||||
elapsed_time = next_log.change_time - current_log.change_time
|
||||
self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds()
|
||||
|
||||
def post_engineer_logic(self, last_log):
|
||||
"""
|
||||
Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона
|
||||
"""
|
||||
self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1))
|
||||
if last_log.change_time.date() == timezone.now().date():
|
||||
self.statistic[last_log.change_time.date()] += (
|
||||
get_timedelta(None, timezone.now().time()) - get_timedelta(last_log)
|
||||
).total_seconds()
|
||||
else:
|
||||
self.statistic[last_log.change_time.date()] += (
|
||||
timedelta(days=1) - get_timedelta(last_log)).total_seconds()
|
||||
if self.end_date == timezone.now().date():
|
||||
self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds()
|
||||
|
||||
def prev_engineer_logic(self, first_log):
|
||||
"""
|
||||
Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона
|
||||
"""
|
||||
self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date),
|
||||
first_log.change_time.date())
|
||||
self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds()
|
||||
|
||||
def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict:
|
||||
"""
|
||||
Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне).
|
||||
|
||||
:param first: Начальная дата интервала
|
||||
:param last: Последняя дата интервала
|
||||
:param val: Количество секунд в одном дне
|
||||
"""
|
||||
for day in daterange(first, last):
|
||||
self.statistic[day] = val
|
||||
|
||||
def clear_statistic(self) -> dict:
|
||||
"""
|
||||
Функция осуществляет обновление всех дней.
|
||||
"""
|
||||
self.statistic.clear()
|
||||
self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0)
|
99
main/lib/zendesk_admin.py
Normal file
99
main/lib/zendesk_admin.py
Normal file
@ -0,0 +1,99 @@
|
||||
from typing import Optional, Dict
|
||||
from zenpy import Zenpy
|
||||
from zenpy.lib.api_objects import User as ZenpyUser, Group as ZenpyGroup
|
||||
from zenpy.lib.exception import APIException
|
||||
from access_controller.settings import ACTRL_ZENDESK_SUBDOMAIN, ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, \
|
||||
ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL
|
||||
|
||||
|
||||
class ZendeskAdmin:
|
||||
"""
|
||||
Класс **ZendeskAdmin** существует, чтобы в каждой функции отдельно не проверять аккаунт администратора.
|
||||
|
||||
:param credentials: Полномочия (первым указывается учетная запись организации в Zendesk)
|
||||
:type credentials: :class:`Dict[str, str]`
|
||||
"""
|
||||
|
||||
def __init__(self, credentials: Dict[str, str]):
|
||||
self.credentials = credentials
|
||||
self.admin = self.create_admin()
|
||||
self.buffer_group_id: int = self.get_group(ZENDESK_GROUPS['buffer']).id
|
||||
self.solved_tickets_user_id: int = self.get_user(SOLVED_TICKETS_EMAIL).id
|
||||
|
||||
def update_user(self, user: ZenpyUser) -> bool:
|
||||
"""
|
||||
Функция сохраняет изменение пользователя в Zendesk.
|
||||
|
||||
:param user: Пользователь с изменёнными данными
|
||||
"""
|
||||
self.admin.users.update(user)
|
||||
|
||||
def check_user(self, email: str) -> bool:
|
||||
"""
|
||||
Функция осуществляет проверку существования пользователя в Zendesk по email.
|
||||
|
||||
:param email: Email пользователя
|
||||
:return: Является ли зарегистрированным
|
||||
"""
|
||||
return True if self.admin.search(email, type='user') else False
|
||||
|
||||
def get_user(self, email: str) -> ZenpyUser:
|
||||
"""
|
||||
Функция возвращает пользователя (объект) по его email.
|
||||
|
||||
:param email: Email пользователя
|
||||
:return: Объект пользователя, найденного в БД
|
||||
"""
|
||||
return self.admin.users.search(email).values[0]
|
||||
|
||||
def get_group(self, name: str) -> Optional[ZenpyGroup]:
|
||||
"""
|
||||
Функция возвращает группу по названию
|
||||
|
||||
:param name: Имя пользователя
|
||||
:return: Группы пользователя (в случае отсутствия None)
|
||||
"""
|
||||
groups = self.admin.search(name, type='group')
|
||||
for group in groups:
|
||||
return group
|
||||
return None
|
||||
|
||||
def get_user_org(self, email: str) -> str:
|
||||
"""
|
||||
Функция возвращает организацию, к которой относится пользователь по его email.
|
||||
|
||||
:param email: Email пользователя
|
||||
:return: Организация пользователя
|
||||
"""
|
||||
user = self.admin.users.search(email).values[0]
|
||||
return user.organization.name if user.organization else None
|
||||
|
||||
def create_admin(self) -> Zenpy:
|
||||
"""
|
||||
Функция создает администратора, проверяя наличие вводимых данных в env.
|
||||
|
||||
:raise: :class:`ValueError`: исключение, вызываемое если email не введен в env
|
||||
:raise: :class:`APIException`: исключение, вызываемое если пользователя с таким email не существует в Zendesk
|
||||
"""
|
||||
|
||||
if self.credentials.get('email') is None:
|
||||
raise ValueError('access_controller email not in env')
|
||||
|
||||
if self.credentials.get('token') is None and self.credentials.get('password') is None:
|
||||
raise ValueError('access_controller token or password not in env')
|
||||
|
||||
admin = Zenpy(**self.credentials)
|
||||
try:
|
||||
admin.search(self.credentials['email'], type='user')
|
||||
except APIException:
|
||||
raise ValueError('invalid access_controller`s login data')
|
||||
|
||||
return admin
|
||||
|
||||
|
||||
zenpy = ZendeskAdmin({
|
||||
'subdomain': ACTRL_ZENDESK_SUBDOMAIN,
|
||||
'email': ACTRL_API_EMAIL,
|
||||
'token': ACTRL_API_TOKEN,
|
||||
'password': ACTRL_API_PASSWORD,
|
||||
})
|
Loading…
x
Reference in New Issue
Block a user