Remove unnecessary files.

This commit is contained in:
Iurii Tatishchev 2021-05-06 09:07:21 -07:00
parent be1bfdd259
commit e886004069
Signed by: CaZzzer
GPG Key ID: 926BE949E29DCD03
8 changed files with 0 additions and 679 deletions

View File

View File

@ -1,319 +0,0 @@
import logging
from datetime import timedelta
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from django.shortcuts import redirect
from django.utils import timezone
from zenpy import Zenpy
from zenpy.lib.exception import APIException
from zenpy.lib.api_objects import User as ZenpyUser, Ticket as ZenpyTicket
from zenpy.lib.generator import SearchResultGenerator
from access_controller.settings import ZENDESK_ROLES as ROLES, ACTRL_ZENDESK_SUBDOMAIN
from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus
from main.requester import TicketListRequester
from main.lib.zendesk_admin import zenpy
def update_role(user_profile: UserProfile, role: int, who_changes: User) -> None:
"""
Функция меняет роль пользователя.
:param user_profile: Профиль пользователя
:param role: Новая роль
:param who_changes: Пользователь, меняющий роль
:return: Пользователь с обновленной ролью
"""
zendesk = zenpy
user = zendesk.get_user(user_profile.user.email)
user.custom_role_id = role
user_profile.custom_role_id = role
user_profile.save()
log(user_profile, who_changes.userprofile)
zendesk.update_user(user)
def make_engineer(user_profile: UserProfile, who_changes: User) -> None:
"""
Функция устанавливает пользователю роль инженера.
:param user_profile: Профиль пользователя
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "engineer"
"""
update_role(user_profile, ROLES['engineer'], who_changes)
def make_light_agent(user_profile: UserProfile, who_changes: User) -> None:
"""
Функция устанавливает пользователю роль легкого агента.
:param user_profile: Профиль пользователя
:return: Вызов функции **update_role** с параметрами: профиль пользователя, роль "light_agent"
"""
tickets: SearchResultGenerator = get_tickets_list(user_profile.user.email)
ticket: ZenpyTicket
for ticket in tickets:
UnassignedTicket.objects.create(
assignee=user_profile.user,
ticket_id=ticket.id,
status=UnassignedTicketStatus.SOLVED if ticket.status == 'solved' else UnassignedTicketStatus.UNASSIGNED
)
if ticket.status == 'solved':
ticket.assignee_id = zenpy.solved_tickets_user_id
else:
ticket.assignee = None
ticket.group_id = zenpy.buffer_group_id
if tickets:
zenpy.admin.tickets.update(tickets)
attempts, success = 20, False
while not success and attempts != 0:
try:
update_role(user_profile, ROLES['light_agent'], who_changes)
success = True
except APIException as e:
attempts -= 1
if attempts == 0:
raise e
def get_users_list() -> list:
"""
Функция **get_users_list** возвращает список пользователей Zendesk, относящихся к организации SYSTEM.
"""
zendesk = zenpy
# У пользователей должна быть организация SYSTEM
org = next(zendesk.admin.search(type='organization', name='SYSTEM'))
users = zendesk.admin.organizations.users(org)
return users
def get_tickets_list(email):
"""
Функция возвращает список тикетов пользователя Zendesk
"""
return TicketListRequester().get_tickets_list_for_user(zenpy.get_user(email))
def get_tickets_list_for_group(group_name):
"""
Функция возвращает список неназначенных, нерешённых тикетов группы Zendesk
"""
return TicketListRequester().get_tickets_list_for_group(zenpy.get_group(group_name))
def update_profile(user_profile: UserProfile):
"""
Функция обновляет профиль пользователя в соответствии с текущим в Zendesk.
:param user_profile: Профиль пользователя
:return: Обновленный, в соответствие с текущими данными в Zendesk, профиль пользователя
"""
user = zenpy.get_user(user_profile.user.email)
user_profile.name = user.name
user_profile.role = user.role
user_profile.custom_role_id = user.custom_role_id if user.custom_role_id else 0
user_profile.image = user.photo['content_url'] if user.photo else None
user_profile.save()
def check_user_exist(email: str) -> bool:
"""
Функция проверяет, существует ли пользователь.
:param email: Email пользователя
:return: Зарегистрирован ли пользователь в Zendesk
"""
return zenpy.check_user(email)
def get_user_organization(email: str) -> str:
"""
Функция возвращает организацию пользователя.
:param email: Email пользователя
:return: Организация пользователя
"""
return zenpy.get_user_org(email)
def check_user_auth(email: str, password: str) -> bool:
"""
Функция проверяет, верны ли входные данные.
:raise: :class:`APIException`: исключение, вызываемое если пользователь не аутентифицирован
"""
creds = {
'email': email,
'password': password,
'subdomain': ACTRL_ZENDESK_SUBDOMAIN,
}
try:
user = Zenpy(**creds)
user.search(email, type='user')
except APIException:
return False
return True
def update_user_in_model(profile: UserProfile, zendesk_user: ZenpyUser):
"""
Функция обновляет профиль пользователя при изменении данных пользователя на Zendesk.
:param profile: Профиль пользователя
:param zendesk_user: Данные пользователя в Zendesk
:return: Обновленный профиль пользователя
"""
profile.name = zendesk_user.name
profile.role = zendesk_user.role
profile.image = zendesk_user.photo['content_url'] if zendesk_user.photo else None
if zendesk_user.custom_role_id is not None:
profile.custom_role_id = int(zendesk_user.custom_role_id)
profile.save()
def count_users(users) -> tuple:
"""
Функция подсчета количества сотрудников с ролями engineer и light_agent
"""
engineers, light_agents = 0, 0
for user in users:
if user.custom_role_id == ROLES['engineer']:
engineers += 1
elif user.custom_role_id == ROLES['light_agent']:
light_agents += 1
return engineers, light_agents
def update_users_in_model():
"""
Обновляет пользователей в модели UserProfile по списку пользователей в организации
"""
users = get_users_list()
for user in users:
try:
profile = User.objects.get(email=user.email).userprofile
update_user_in_model(profile, user)
except ObjectDoesNotExist:
pass
return users
def daterange(start_date, end_date) -> list:
"""
Функция возвращает список дней с start_date по end_date, исключая правую границу.
:param start_date: Начальная дата
:param end_date: Конечная дата
:return: Список дней, не включая конечную дату
"""
dates = []
for n in range(int((end_date - start_date).days)):
dates.append(start_date + timedelta(n))
return dates
def get_timedelta(log, time=None) -> timedelta:
"""
Функция возвращает объект класса timedelta, который хранит промежуток времени от начала суток до момента,
который находится в log (объект класса RoleChangeLogs) или в time(datetime.time), если введён.
:param log: Лог
:param time: Время
:return: Сколько времени прошло от начала суток до события
"""
if time is None:
time = log.change_time.time()
time = timedelta(hours=time.hour, minutes=time.minute, seconds=time.second)
return time
def last_day_of_month(day: int) -> int:
"""
Функция возвращает последний день текущего месяца.
:param day: Текущий день
:return: Последний день месяца
"""
next_month = day.replace(day=28) + timedelta(days=4)
return next_month - timedelta(days=next_month.day)
class DatabaseHandler(logging.Handler):
def __init__(self):
logging.Handler.__init__(self)
def emit(self, record):
database = RoleChangeLogs()
users = record.msg
if users[1]:
user = users[0]
admin = users[1]
elif not users[1]:
user = users[0]
admin = users[0]
database.name = user.name
database.user = user.user
database.changed_by = admin.user
if user.custom_role_id == ROLES['engineer']:
database.old_role = ROLES['light_agent']
elif user.custom_role_id == ROLES['light_agent']:
database.old_role = ROLES['engineer']
database.new_role = user.custom_role_id
database.save()
class CsvFormatter(logging.Formatter):
def __init__(self):
logging.Formatter.__init__(self)
def format(self, record):
users = record.msg
if users[1]:
user = users[0]
admin = users[1]
elif not users[1]:
user = users[0]
admin = users[0]
msg = ''
msg += user.name
if user.custom_role_id == ROLES['engineer']:
msg += ',engineer,'
elif user.custom_role_id == ROLES['light_agent']:
msg += ',light_agent,'
time = str(timezone.now().today())
msg += time[:16]
msg += ','
msg += admin.name
return msg
def log(user, admin=None):
"""
Осуществляет запись логов в базу данных и csv файл
:param admin:
:param user:
:return:
"""
users = [user, admin]
logger = logging.getLogger('MY_LOGGER')
if not logger.hasHandlers():
dbhandler = DatabaseHandler()
csvformatter = CsvFormatter()
csvhandler = logging.FileHandler('logs/logs.csv', "a")
csvhandler.setFormatter(csvformatter)
logger.addHandler(dbhandler)
logger.addHandler(csvhandler)
logger.setLevel('INFO')
logger.info(users)
def set_session_params_for_work_page(request, count=None, is_confirm=True):
"""
Функция для страницы получения прав
Устанавливает данные сессии о успешности запроса и количестве назначенных тикетов
"""
request.session['is_confirm'] = is_confirm
request.session['count_tickets'] = count
return redirect('work', request.user.id)

View File

@ -1,261 +0,0 @@
from datetime import date, datetime, timedelta
from typing import Optional
from django.contrib.auth.models import User
from django.utils import timezone
from access_controller.settings import ONE_DAY, ZENDESK_ROLES as ROLES
from main.lib.extra_func import last_day_of_month, get_timedelta, daterange
from main.models import RoleChangeLogs
class StatisticData:
"""
Класс для учета статистики интервалов работы пользователей.
Передаваемые параметры: start_date, end_date, email, stat.
:param display: Формат отображения времени (часы, минуты)
:type display: :class:`list`
:param interval: Интервал времени в часах и минутах
:type interval: :class:`list`
:param start_date: Дата начала работы
:type start_date: :class:`date`
:param end_date: Дата окончания работы
:type end_date: :class:`date`
:param email: Email пользователя
:type email: :class:`str`
:param errors: Список ошибок
:type errors: :class:`list`
:param warnings: Список предупреждений
:type warnings: :class:`list`
:param data: Ретроспектива смены ролей пользователя
:type data: :class:`dict`
:param statistic: Интервалы работы пользователя
:type statistic: :class:`dict`
"""
def __init__(self, start_date, end_date, user_email, stat=None):
self.display = None
self.interval = None
self.start_date = start_date
self.end_date = end_date
self.email = user_email
self.errors = list()
self.warnings = list()
self.data = dict()
self.statistic = dict()
self._init_data()
if stat is None:
self._init_statistic()
else:
self.statistic = stat
def get_statistic(self) -> dict:
"""
Функция возвращает статистику работы пользователя.
:return: Словарь statistic с применением формата отображения и интервала работы(если они есть). None, если были ошибки при создании.
"""
if self.is_valid_statistic():
stat = self.statistic
stat = self._use_display(stat)
stat = self._use_interval(stat)
return stat
else:
return None
def is_valid_statistic(self) -> bool:
"""
Функция проверяет были ли ошибки при создании статистики.
:return: True, при отсутствии ошибок
"""
return not self.errors and self.statistic
def set_interval(self, interval: list) -> bool:
"""
Функция проверяет корректность представления интервала работы.
:param interval: Интервал должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if interval not in ['months', 'days']:
self.errors += ['Интервал работы должен быть в днях или месяцах']
return False
self.interval = interval
return True
def set_display(self, display_format: list) -> bool:
"""
Функция проверяет корректность формата отображения интервала.
:param display_format: Формат отображения должен быть указан в днях или месяцах.
:return: True, если указан верно
"""
if display_format not in ['days', 'hours']:
self.errors += ['Формат отображения должен быть в часах или днях']
return False
self.display = display_format
return True
def get_data(self) -> Optional[dict]:
"""
Функция возвращает данные - список объектов RoleChangeLogs.
"""
if self.is_valid_data():
return self.data
else:
return None
def is_valid_data(self) -> bool:
"""
Функция определяет были ли ошибки при получении логов.
:return: True, если ошибок нет
"""
return not self.errors
def _use_display(self, stat: list) -> list:
"""
Функция приводит данные к формату отображения.
:param stat: Список данных статистики пользователя
:return: Обновленный список
"""
if not self.is_valid_statistic() or not self.display:
return stat
new_stat = {}
for key, item in stat.items():
if self.display == 'hours':
new_stat[key] = item / 3600
elif self.display == 'days':
new_stat[key] = item / (ONE_DAY * 3600)
return new_stat
def _use_interval(self, stat: dict) -> dict:
"""
Функция объединяет ключи и значения в соответствии с интервалом работы.
:param stat: Статистика работы пользователя
:return: Обновленная статистика
"""
if not self.is_valid_statistic() or not self.interval:
return stat
new_stat = {}
if self.interval == 'months':
# Переделываем ключи под формат('началоесяца - конец_месяца')
for key, value in stat.items():
current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1))
current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1)))
index = ' - '.join([str(current_month_start), str(current_month_end)])
if new_stat.get(index):
new_stat[index] += value
else:
new_stat[index] = value
elif self.interval == 'days':
new_stat = stat # статистика изначально в днях
return new_stat
def check_time(self) -> bool:
"""
Функция проверяет корректность введенного времени.
:return: True, если время указано корректно. Иначе, False
"""
if self.end_date < self.start_date or self.end_date > datetime.now().date():
return False
return True
def _init_data(self):
"""
Функция возвращает логи в диапазоне дат start_date - end_date для пользователя с указанным email.
:return: Данные о смене статусов пользователя. Если пользователь не найден или интервал времени некорректен - ошибку.
"""
if not self.check_time():
self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени']
return
try:
self.data = RoleChangeLogs.objects.filter(
change_time__range=[self.start_date, self.end_date + timedelta(days=1)],
user=User.objects.get(email=self.email),
).order_by('change_time')
except User.DoesNotExist:
self.errors += ['Пользователь не найден']
def _init_statistic(self) -> dict:
"""
Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд.
:return: Статистика работы пользователя (statistic)
"""
self.clear_statistic()
if not self.get_data():
self.warnings += ['Не обнаружены изменения роли в данном промежутке']
return None
first_log, last_log = self.data[0], self.data[len(self.data) - 1]
if first_log.old_role == ROLES['engineer']:
self.prev_engineer_logic(first_log)
if last_log.new_role == ROLES['engineer']:
self.post_engineer_logic(last_log)
for log_index in range(len(self.data) - 1):
if self.data[log_index].new_role == ROLES['engineer']:
self.engineer_logic(log_index)
def engineer_logic(self, log_index):
"""
Функция обрабатывает основную часть работы инженера
"""
current_log, next_log = self.data[log_index], self.data[log_index + 1]
if current_log.change_time.date() != next_log.change_time.date():
self.statistic[current_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(current_log)).total_seconds()
self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds()
self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date())
else:
elapsed_time = next_log.change_time - current_log.change_time
self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds()
def post_engineer_logic(self, last_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер работал и после диапазона
"""
self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1))
if last_log.change_time.date() == timezone.now().date():
self.statistic[last_log.change_time.date()] += (
get_timedelta(None, timezone.now().time()) - get_timedelta(last_log)
).total_seconds()
else:
self.statistic[last_log.change_time.date()] += (
timedelta(days=1) - get_timedelta(last_log)).total_seconds()
if self.end_date == timezone.now().date():
self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds()
def prev_engineer_logic(self, first_log):
"""
Функция обрабатывает случай, когда нам изветсно что инженер начал работу до диапазона
"""
self.fill_daterange(max(User.objects.get(email=self.email).date_joined.date(), self.start_date),
first_log.change_time.date())
self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds()
def fill_daterange(self, first: date, last: date, val: int = 24 * 3600) -> dict:
"""
Функция заполняет диапазон дат значением val (по умолчанию val = кол-во секунд в 1 дне).
:param first: Начальная дата интервала
:param last: Последняя дата интервала
:param val: Количество секунд в одном дне
"""
for day in daterange(first, last):
self.statistic[day] = val
def clear_statistic(self) -> dict:
"""
Функция осуществляет обновление всех дней.
"""
self.statistic.clear()
self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0)

View File

@ -1,99 +0,0 @@
from typing import Optional, Dict
from zenpy import Zenpy
from zenpy.lib.api_objects import User as ZenpyUser, Group as ZenpyGroup
from zenpy.lib.exception import APIException
from access_controller.settings import ACTRL_ZENDESK_SUBDOMAIN, ACTRL_API_EMAIL, ACTRL_API_TOKEN, ACTRL_API_PASSWORD, \
ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL
class ZendeskAdmin:
"""
Класс **ZendeskAdmin** существует, чтобы в каждой функции отдельно не проверять аккаунт администратора.
:param credentials: Полномочия (первым указывается учетная запись организации в Zendesk)
:type credentials: :class:`Dict[str, str]`
"""
def __init__(self, credentials: Dict[str, str]):
self.credentials = credentials
self.admin = self.create_admin()
self.buffer_group_id: int = self.get_group(ZENDESK_GROUPS['buffer']).id
self.solved_tickets_user_id: int = self.get_user(SOLVED_TICKETS_EMAIL).id
def update_user(self, user: ZenpyUser) -> bool:
"""
Функция сохраняет изменение пользователя в Zendesk.
:param user: Пользователь с изменёнными данными
"""
self.admin.users.update(user)
def check_user(self, email: str) -> bool:
"""
Функция осуществляет проверку существования пользователя в Zendesk по email.
:param email: Email пользователя
:return: Является ли зарегистрированным
"""
return True if self.admin.search(email, type='user') else False
def get_user(self, email: str) -> ZenpyUser:
"""
Функция возвращает пользователя (объект) по его email.
:param email: Email пользователя
:return: Объект пользователя, найденного в БД
"""
return self.admin.users.search(email).values[0]
def get_group(self, name: str) -> Optional[ZenpyGroup]:
"""
Функция возвращает группу по названию
:param name: Имя пользователя
:return: Группы пользователя (в случае отсутствия None)
"""
groups = self.admin.search(name, type='group')
for group in groups:
return group
return None
def get_user_org(self, email: str) -> str:
"""
Функция возвращает организацию, к которой относится пользователь по его email.
:param email: Email пользователя
:return: Организация пользователя
"""
user = self.admin.users.search(email).values[0]
return user.organization.name if user.organization else None
def create_admin(self) -> Zenpy:
"""
Функция создает администратора, проверяя наличие вводимых данных в env.
:raise: :class:`ValueError`: исключение, вызываемое если email не введен в env
:raise: :class:`APIException`: исключение, вызываемое если пользователя с таким email не существует в Zendesk
"""
if self.credentials.get('email') is None:
raise ValueError('access_controller email not in env')
if self.credentials.get('token') is None and self.credentials.get('password') is None:
raise ValueError('access_controller token or password not in env')
admin = Zenpy(**self.credentials)
try:
admin.search(self.credentials['email'], type='user')
except APIException:
raise ValueError('invalid access_controller`s login data')
return admin
zenpy = ZendeskAdmin({
'subdomain': ACTRL_ZENDESK_SUBDOMAIN,
'email': ACTRL_API_EMAIL,
'token': ACTRL_API_TOKEN,
'password': ACTRL_API_PASSWORD,
})