import os from datetime import timedelta, datetime, date from django.contrib.auth.models import User from django.utils import timezone from zenpy import Zenpy from zenpy.lib.exception import APIException from main.models import UserProfile, RoleChangeLogs from django.core.exceptions import ObjectDoesNotExist from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY from access_controller.settings import ZENDESK_ROLES as ROLES, ONE_DAY, ZENDESK_GROUPS, SOLVED_TICKETS_EMAIL from main.models import UserProfile, RoleChangeLogs, UnassignedTicket, UnassignedTicketStatus class ZendeskAdmin: """ Класс **ZendeskAdmin** существует, чтобы в каждой фунциии отдельно не проверять аккаунт администратора :param credentials: Полномочия (первым указывается учетная запись организации в Zendesk) :type credentials: :class:`dict` :param email: Email администратора, указанный в env :type email: :class:`str` :param token: Токен администратора (формируется в Zendesk, указывается в env) :type token: :class:`str` :param password: Пароль администратора, указанный в env :type password: :class:`str` """ credentials: dict = { 'subdomain': 'ngenix1612197338' } email: str = os.getenv('ACCESS_CONTROLLER_API_EMAIL') token: str = os.getenv('ACCESS_CONTROLLER_API_TOKEN') password: str = os.getenv('ACCESS_CONTROLLER_API_PASSWORD') def __init__(self): self.create_admin() def check_user(self, email: str) -> bool: """ Функция **check_user** осуществляет проверку существования пользователя в Zendesk по email """ return True if self.admin.search(email, type='user') else False def get_user_name(self, email: str) -> str: """ Функция **get_user_name** возвращает имя пользователя по его email """ user = self.admin.users.search(email).values[0] return user.name def get_user_role(self, email: str) -> str: """ Функция **get_user_role** возвращает роль пользователя по его email """ user = self.admin.users.search(email).values[0] return user.role def get_user_id(self, email: str) -> str: """ Функция **get_user_id** возвращает id пользователя по его email """ user = self.admin.users.search(email).values[0] return user.id def get_user_image(self, email: str) -> str: """ Функция **get_user_image** возвращает url-ссылку на аватар пользователя по его email """ user = self.admin.users.search(email).values[0] return user.photo['content_url'] if user.photo else None def get_user(self, email: str): """ Функция **get_user** возвращает пользователя (объект) по его email :param email: email пользователя :return: email пользователя, найденного в БД """ return self.admin.users.search(email).values[0] def get_group(self, name): groups = self.admin.search(name) for group in groups: return group return None def get_user_org(self, email: str) -> str: """ Функция **get_user_org** возвращает организацию, к которой относится пользователь по его email """ user = self.admin.users.search(email).values[0] return user.organization.name if user.organization else None def create_admin(self) -> Zenpy: """ Функция **Create_admin()** создает администратора, проверяя наличие вводимых данных в env. :param credentials: В список полномочий администратора вносятся email, token, password из env :type credentials: :class:`dict` :raise: :class:`ValueError`: исключение, вызываемое если email не введен в env :raise: :class:`APIException`: исключение, вызываемое если пользователя с таким email не существует в Zendesk """ if self.email is None: raise ValueError('access_controller email not in env') self.credentials['email'] = self.email if self.token: self.credentials['token'] = self.token elif self.password: self.credentials['password'] = self.password else: raise ValueError('access_controller token or password not in env') self.admin = Zenpy(**self.credentials) try: self.admin.search(self.email, type='user') except APIException: raise ValueError('invalid access_controller`s login data') def update_role(user_profile: UserProfile, role: str) -> UserProfile: """ Функция **update_role** меняет роль пользователя. """ zendesk = ZendeskAdmin() user = zendesk.get_user(user_profile.user.email) user.custom_role_id = role zendesk.admin.users.update(user) def make_engineer(user_profile: UserProfile, who_changes: User) -> UserProfile: """ Функция **make_engineer** устанавливает пользователю роль инженера. """ RoleChangeLogs.objects.create( user=user_profile.user, old_role=user_profile.custom_role_id, new_role=ROLES['engineer'], changed_by=who_changes ) update_role(user_profile, ROLES['engineer']) def make_light_agent(user_profile: UserProfile, who_changes: User) -> UserProfile: """ Функция **make_light_agent** устанавливапет пользователю роль легкого агента. """ tickets = get_tickets_list(user_profile.user.email) for ticket in tickets: UnassignedTicket.objects.create( assignee=user_profile.user, ticket_id=ticket.id, status=UnassignedTicketStatus.SOLVED if ticket.status == 'solved' else UnassignedTicketStatus.UNASSIGNED ) if ticket.status == 'solved': ticket.assignee = ZendeskAdmin().get_user(SOLVED_TICKETS_EMAIL) else: ticket.assignee = None ticket.group = ZendeskAdmin().get_group(ZENDESK_GROUPS['buffer']) ZendeskAdmin().admin.tickets.update(ticket) RoleChangeLogs.objects.create( user=user_profile.user, old_role=user_profile.custom_role_id, new_role=ROLES['light_agent'], changed_by=who_changes ) update_role(user_profile, ROLES['light_agent']) def get_users_list() -> list: """ Функция **get_users_list** возвращает список пользователей Zendesk, относящихся к организации. """ zendesk = ZendeskAdmin() # У пользователей должна быть организация SYSTEM org = next(zendesk.admin.search(type='organization', name='SYSTEM')) users = zendesk.admin.organizations.users(org) return users def get_tickets_list(email): """ Функция возвращает список тикетов пользователя Zendesk """ return ZendeskAdmin().admin.search(assignee=email, type='ticket') def update_profile(user_profile: UserProfile) -> UserProfile: """ Функция обновляет профиль пользователя в соотвтетствии с текущим в Zendesk """ user = ZendeskAdmin().get_user(user_profile.user.email) user_profile.name = user.name user_profile.role = user.role user_profile.custom_role_id = user.custom_role_id if user.custom_role_id else 0 user_profile.image = user.photo['content_url'] if user.photo else None user_profile.save() def check_user_exist(email: str) -> bool: """ Функция проверяет, существует ли пользователь """ return ZendeskAdmin().check_user(email) def get_user_organization(email: str) -> str: """ Функция возвращает организацию пользователя """ return ZendeskAdmin().get_user_org(email) def check_user_auth(email: str, password: str) -> bool: """ Функция проверяет, верны ли входные данные :raise: :class:`APIException`: исключение, вызываемое если пользователь не аутентифицирован """ creds = { 'email': email, 'password': password, 'subdomain': 'ngenix1612197338', } try: user = Zenpy(**creds) user.search(email, type='user') except APIException: return False return True def update_user_in_model(profile, zendesk_user): profile.name = zendesk_user.name profile.role = zendesk_user.role profile.image = zendesk_user.photo['content_url'] if zendesk_user.photo else None profile.custom_role_id = zendesk_user.custom_role_id profile.save() def count_users(users) -> tuple: """ Функция подсчета количества сотрудников с ролями engineer и light_a .. todo:: this func counts users from all zendesk instead of just from a model: """ engineers, light_agents = 0, 0 for user in users: if user.custom_role_id == ROLES['engineer']: engineers += 1 elif user.custom_role_id == ROLES['light_agent']: light_agents += 1 return engineers, light_agents def update_users_in_model(): """ Обновляет пользователей в модели UserProfile по списку пользователей в организации """ users = get_users_list() for user in users: try: profile = User.objects.get(email=user.email).userprofile update_user_in_model(profile, user) except ObjectDoesNotExist: pass return users def daterange(start_date, end_date) -> list: """ Возвращает список дней с start_date по end_date исключая правую границу """ dates = [] for n in range(int((end_date - start_date).days)): dates.append(start_date + timedelta(n)) return dates def get_timedelta(log, time=None) -> timedelta: """ Возвращает объект класса timedelta, который хранит промежуток времени от начала суток до момента, который находится в log (объект класса RoleChangeLogs) или в time(datetime.time), если введён """ if time is None: time = log.change_time.time() time = timedelta(hours=time.hour, minutes=time.minute, seconds=time.second) return time def last_day_of_month(day): """ Возвращает последний день любого месяца """ next_month = day.replace(day=28) + timedelta(days=4) return next_month - timedelta(days=next_month.day) class StatisticData: def __init__(self, start_date, end_date, user_email, stat=None): self.display = None self.interval = None self.start_date = start_date self.end_date = end_date self.email = user_email self.errors = list() self.warnings = list() self.data = dict() self.statistic = dict() self._init_data() if stat is None: self._init_statistic() else: self.statistic = stat def get_statistic(self): """ Вернуть словарь statistic с применением формата отображения и интеравала работы(если они есть) None, если были ошибки при создании """ if self.is_valid_statistic(): stat = self.statistic stat = self._use_display(stat) stat = self._use_interval(stat) return stat else: return None def is_valid_statistic(self): """ Были ли ошибки при создании статистики """ return not self.errors and self.statistic def set_interval(self, interval): """ Устанавливает интервал работы """ if interval not in ['months', 'days']: self.errors += ['Интервал работы должен быть в днях или месяцах'] return False self.interval = interval return True def set_display(self, display_format): """ Устанавливает формат отображения """ if display_format not in ['days', 'hours']: self.errors += ['Формат отображения должен быть в часах или днях'] return False self.display = display_format return True def get_data(self): """ Вернуть данные data - массив объектов RoleChangeLogs, является списком логов пользователя data может быть пустым списком """ if self.is_valid_data(): return self.data else: return None def is_valid_data(self): """ Были ли ошибки при получении логов """ return not self.errors def _use_display(self, stat): """ Приводит данные к формату отображения """ if not self.is_valid_statistic() or not self.display: return stat new_stat = {} for key, item in stat.items(): if self.display == 'hours': new_stat[key] = item / 3600 elif self.display == 'days': new_stat[key] = item / (ONE_DAY * 3600) return new_stat def _use_interval(self, stat): """ Объединяет ключи и значения в соответствии с интервалом работы """ if not self.is_valid_statistic() or not self.interval: return stat new_stat = {} if self.interval == 'months': # Переделываем ключи под формат('начало_месяца - конец_месяца') for key, value in stat.items(): current_month_start = max(self.start_date, date(year=key.year, month=key.month, day=1)) current_month_end = min(self.end_date, last_day_of_month(date(year=key.year, month=key.month, day=1))) index = ' - '.join([str(current_month_start), str(current_month_end)]) if new_stat.get(index): new_stat[index] += value else: new_stat[index] = value elif self.interval == 'days': new_stat = stat # статистика изначально в днях return new_stat def check_time(self): """ Проверка на правильность введенного времени """ if self.end_date < self.start_date or self.end_date > datetime.now().date(): return False return True def _init_data(self): """ Получение логов в диапазоне дат start_date-end_date для пользователя с почтой email """ if not self.check_time(): self.errors += ['Конец диапазона должен быть позже начала диапазона и раньше текущего времени'] return try: self.data = RoleChangeLogs.objects.filter( change_time__range=[self.start_date, self.end_date + timedelta(days=1)], user=User.objects.get(email=self.email), ).order_by('change_time') except User.DoesNotExist: self.errors += ['Пользователь не найден'] def _init_statistic(self): """ Функция заполняет словарь, в котором ключ - дата, значение - кол-во проработанных в этот день секунд """ self.clear_statistic() if not self.get_data(): self.warnings += ['Не обнаружены изменения роли в данном промежутке'] return None first_log, last_log = self.data[0], self.data[len(self.data) - 1] if first_log.old_role == ROLES['engineer']: self.statistic[first_log.change_time.date()] += get_timedelta(first_log).total_seconds() if last_log.new_role == ROLES['engineer']: self.fill_daterange(last_log.change_time.date() + timedelta(days=1), self.end_date + timedelta(days=1)) if last_log.change_time.date() == timezone.now().date(): self.statistic[last_log.change_time.date()] += ( get_timedelta(None, timezone.now().time()) - get_timedelta(last_log) ).total_seconds() else: self.statistic[last_log.change_time.date()] += ( timedelta(days=1) - get_timedelta(last_log)).total_seconds() if self.end_date == timezone.now().date(): self.statistic[self.end_date] = get_timedelta(None, timezone.now().time()).total_seconds() for log_index in range(len(self.data) - 1): if self.data[log_index].new_role == ROLES['engineer']: current_log, next_log = self.data[log_index], self.data[log_index + 1] if current_log.change_time.date() != next_log.change_time.date(): self.statistic[current_log.change_time.date()] += ( timedelta(days=1) - get_timedelta(current_log)).total_seconds() self.statistic[next_log.change_time.date()] += get_timedelta(next_log).total_seconds() self.fill_daterange(current_log.change_time.date() + timedelta(days=1), next_log.change_time.date()) else: elapsed_time = next_log.change_time - current_log.change_time self.statistic[current_log.change_time.date()] += elapsed_time.total_seconds() def fill_daterange(self, first, last, val=24 * 3600): """ Заполение диапазона дат значением val по умолчанию val = кол-во секунд в 1 дне """ for day in daterange(first, last): self.statistic[day] = val def clear_statistic(self): """ Обнуление всех дней """ self.statistic.clear() self.fill_daterange(self.start_date, self.end_date + timedelta(days=1), 0)