diff --git a/.gitignore b/.gitignore index ddc7b0c..0faebdf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ test.py +like.py venv/ __pycache__/ dist diff --git a/itd/client.py b/itd/client.py index ba7a36b..2b5deaa 100644 --- a/itd/client.py +++ b/itd/client.py @@ -1,239 +1,1020 @@ +from warnings import deprecated +from uuid import UUID from _io import BufferedReader from typing import cast +from datetime import datetime -from requests.exceptions import HTTPError +from requests.exceptions import ConnectionError, HTTPError from itd.routes.users import get_user, update_profile, follow, unfollow, get_followers, get_following, update_privacy from itd.routes.etc import get_top_clans, get_who_to_follow, get_platform_status -from itd.routes.comments import get_comments, add_comment, delete_comment, like_comment, unlike_comment -from itd.routes.hashtags import get_hastags, get_posts_by_hastag +from itd.routes.comments import get_comments, add_comment, delete_comment, like_comment, unlike_comment, add_reply_comment +from itd.routes.hashtags import get_hashtags, get_posts_by_hashtag from itd.routes.notifications import get_notifications, mark_as_read, mark_all_as_read, get_unread_notifications_count -from itd.routes.posts import create_post, get_posts, get_post, edit_post, delete_post, pin_post, repost, view_post, get_liked_posts +from itd.routes.posts import create_post, get_posts, get_post, edit_post, delete_post, pin_post, repost, view_post, get_liked_posts, restore_post, like_post, unlike_post from itd.routes.reports import report from itd.routes.search import search from itd.routes.files import upload_file from itd.routes.auth import refresh_token, change_password, logout -from itd.routes.verification import verificate, get_verification_status +from itd.routes.verification import verify, get_verification_status +from itd.routes.pins import get_pins, remove_pin, set_pin + +from itd.models.comment import Comment +from itd.models.notification import Notification +from itd.models.post import Post, NewPost +from itd.models.clan import Clan +from itd.models.hashtag import Hashtag +from itd.models.user import User, UserProfileUpdate, UserPrivacy, UserFollower, UserWhoToFollow +from itd.models.pagination import Pagination, PostsPagintaion, LikedPostsPagintaion +from itd.models.verification import Verification, VerificationStatus +from itd.models.report import NewReport +from itd.models.file import File +from itd.models.pin import Pin + +from itd.enums import PostsTab, ReportTargetType, ReportTargetReason +from itd.request import set_cookies +from itd.exceptions import ( + NoCookie, NoAuthData, SamePassword, InvalidOldPassword, NotFound, ValidationError, UserBanned, + PendingRequestExists, Forbidden, UsernameTaken, CantFollowYourself, Unauthorized, + CantRepostYourPost, AlreadyReposted, AlreadyReported, TooLarge, PinNotOwned +) def refresh_on_error(func): def wrapper(self, *args, **kwargs): - try: - return func(self, *args, **kwargs) - except HTTPError as e: - if '401' in str(e): + if self.cookies: + try: + return func(self, *args, **kwargs) + except (Unauthorized, ConnectionError, HTTPError): self.refresh_auth() return func(self, *args, **kwargs) - raise e + else: + return func(self, *args, **kwargs) return wrapper class Client: - def __init__(self, token: str | None, cookies: str | None = None): + def __init__(self, token: str | None = None, cookies: str | None = None): self.cookies = cookies if token: self.token = token.replace('Bearer ', '') elif self.cookies: + set_cookies(self.cookies) self.refresh_auth() else: - raise ValueError('Provide token or cookie') + raise NoAuthData() - def refresh_auth(self): - if self.cookies: - self.token = refresh_token(self.cookies) - return self.token - else: - print('no cookies') + def refresh_auth(self) -> str: + """Обновить access token - @refresh_on_error - def change_password(self, old: str, new: str): + Raises: + NoCookie: Нет cookie + + Returns: + str: Токен + """ + print('refresh token') if not self.cookies: - print('no cookies') - return - return change_password(self.cookies, self.token, old, new) + raise NoCookie() + + res = refresh_token(self.cookies) + res.raise_for_status() + + self.token = res.json()['accessToken'] + return self.token @refresh_on_error - def logout(self): + def change_password(self, old: str, new: str) -> dict: + """Смена пароля + + Args: + old (str): Старый пароль + new (str): Новый пароль + + Raises: + NoCookie: Нет cookie + SamePassword: Одинаковые пароли + InvalidOldPassword: Старый пароль неверный + + Returns: + dict: Ответ API `{'message': 'Password changed successfully'}` + """ if not self.cookies: - print('no cookies') - return - return logout(self.cookies) + raise NoCookie() + res = change_password(self.cookies, self.token, old, new) + if res.json().get('error', {}).get('code') == 'SAME_PASSWORD': + raise SamePassword() + if res.json().get('error', {}).get('code') == 'INVALID_OLD_PASSWORD': + raise InvalidOldPassword() + res.raise_for_status() + + return res.json() @refresh_on_error - def get_user(self, username: str) -> dict: - return get_user(self.token, username) + def logout(self) -> dict: + """Выход из аккаунта + + Raises: + NoCookie: Нет cookie + + Returns: + dict: Ответ API + """ + if not self.cookies: + raise NoCookie() + + res = logout(self.cookies) + res.raise_for_status() + + return res.json() @refresh_on_error - def get_me(self) -> dict: + def get_user(self, username: str) -> User: + """Получить пользователя + + Args: + username (str): username или "me" + + Raises: + NotFound: Пользователь не найден + UserBanned: Пользователь заблокирован + + Returns: + User: Пользователь + """ + res = get_user(self.token, username) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('User') + if res.json().get('error', {}).get('code') == 'USER_BLOCKED': + raise UserBanned() + res.raise_for_status() + + return User.model_validate(res.json()) + + @refresh_on_error + def get_me(self) -> User: + """Получить текущего пользователя (me) + + Returns: + User: Пользователь + """ return self.get_user('me') @refresh_on_error - def update_profile(self, username: str | None = None, display_name: str | None = None, bio: str | None = None, banner_id: str | None = None) -> dict: - return update_profile(self.token, bio, display_name, username, banner_id) + def update_profile(self, username: str | None = None, display_name: str | None = None, bio: str | None = None, banner_id: UUID | None = None) -> UserProfileUpdate: + """Обновить профиль + + Args: + username (str | None, optional): username. Defaults to None. + display_name (str | None, optional): Отображаемое имя. Defaults to None. + bio (str | None, optional): Биография (о себе). Defaults to None. + banner_id (UUID | None, optional): UUID баннера. Defaults to None. + + Raises: + ValidationError: Ошибка валидации + + Returns: + UserProfileUpdate: Обновленный профиль + """ + res = update_profile(self.token, bio, display_name, username, banner_id) + if res.status_code == 422 and 'found' in res.json(): + raise ValidationError(*list(res.json()['found'].items())[0]) + if res.json().get('error', {}).get('code') == 'USERNAME_TAKEN': + raise UsernameTaken() + res.raise_for_status() + + return UserProfileUpdate.model_validate(res.json()) @refresh_on_error - def update_privacy(self, wall_closed: bool = False, private: bool = False): - return update_privacy(self.token, wall_closed, private) + def update_privacy(self, wall_closed: bool = False, private: bool = False) -> UserPrivacy: + """Обновить настройки приватности + + Args: + wall_closed (bool, optional): Закрыть стену. Defaults to False. + private (bool, optional): Приватность. На данный момент неизвестно, что делает этот параметр. Defaults to False. + + Returns: + UserPrivacy: Обновленные данные приватности + """ + res = update_privacy(self.token, wall_closed, private) + res.raise_for_status() + + return UserPrivacy.model_validate(res.json()) @refresh_on_error - def follow(self, username: str) -> dict: - return follow(self.token, username) + def follow(self, username: str) -> int: + """Подписаться на пользователя + + Args: + username (str): username + + Raises: + NotFound: Пользователь не найден + CantFollowYourself: Невозможно подписаться на самого себе + + Returns: + int: Число подписчиков после подписки + """ + res = follow(self.token, username) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('User') + if res.json().get('error', {}).get('code') == 'VALIDATION_ERROR' and res.status_code == 400: + raise CantFollowYourself() + res.raise_for_status() + + return res.json()['followersCount'] @refresh_on_error - def unfollow(self, username: str) -> dict: - return unfollow(self.token, username) + def unfollow(self, username: str) -> int: + """Отписаться от пользователя + + Args: + username (str): username + + Raises: + NotFound: Пользователь не найден + + Returns: + int: Число подписчиков после отписки + """ + res = unfollow(self.token, username) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('User') + res.raise_for_status() + + return res.json()['followersCount'] @refresh_on_error - def get_followers(self, username: str) -> dict: - return get_followers(self.token, username) + def get_followers(self, username: str, limit: int = 30, page: int = 1) -> tuple[list[UserFollower], Pagination]: + """Получить подписчиков пользователя + + Args: + username (str): username + limit (int, optional): Лимит. Defaults to 30. + page (int, optional): Страница (при дозагрузке, увеличивайте на 1). Defaults to 1. + + Raises: + NotFound: Пользователь не найден + + Returns: + list[UserFollower]: Список подписчиков + Pagination: Данные пагинации (лимит, страница, сколько всего, есть ли еще) + """ + res = get_followers(self.token, username, limit, page) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('User') + res.raise_for_status() + + return [UserFollower.model_validate(user) for user in res.json()['data']['users']], Pagination.model_validate(res.json()['data']['pagination']) @refresh_on_error - def get_following(self, username: str) -> dict: - return get_following(self.token, username) + def get_following(self, username: str, limit: int = 30, page: int = 1) -> tuple[list[UserFollower], Pagination]: + """Получить подписки пользователя + + Args: + username (str): username + limit (int, optional): Лимит. Defaults to 30. + page (int, optional): Страница (при дозагрузке, увеличивайте на 1). Defaults to 1. + + Raises: + NotFound: Пользователь не найден + + Returns: + list[UserFollower]: Список подписок + Pagination: Данные пагинации (лимит, страница, сколько всего, есть ли еще) + """ + res = get_following(self.token, username, limit, page) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('User') + res.raise_for_status() + + return [UserFollower.model_validate(user) for user in res.json()['data']['users']], Pagination.model_validate(res.json()['data']['pagination']) + + @deprecated("verificate устарел используйте verify") + @refresh_on_error + def verificate(self, file_url: str) -> Verification: + """Отправить запрос на верификацию + + Args: + file_url (str): Ссылка на видео + + Raises: + PendingRequestExists: Запрос уже отправлен + + Returns: + Verification: Верификация + """ + return self.verify(file_url) + + @refresh_on_error + def verify(self, file_url: str) -> Verification: + """Отправить запрос на верификацию + + Args: + file_url (str): Ссылка на видео + + Raises: + PendingRequestExists: Запрос уже отправлен + + Returns: + Verification: Верификация + """ + res = verify(self.token, file_url) + if res.json().get('error', {}).get('code') == 'PENDING_REQUEST_EXISTS': + raise PendingRequestExists() + res.raise_for_status() + + return Verification.model_validate(res.json()) + + @refresh_on_error + def get_verification_status(self) -> VerificationStatus: + """Получить статус верификации + + Returns: + VerificationStatus: Верификация + """ + res = get_verification_status(self.token) + res.raise_for_status() + + return VerificationStatus.model_validate(res.json()) + + @refresh_on_error + def get_who_to_follow(self) -> list[UserWhoToFollow]: + """Получить список популярных пользователей (кого читать) + + Returns: + list[UserWhoToFollow]: Список пользователей + """ + res = get_who_to_follow(self.token) + res.raise_for_status() + + return [UserWhoToFollow.model_validate(user) for user in res.json()['users']] + + @refresh_on_error + def get_top_clans(self) -> list[Clan]: + """Получить топ кланов + + Returns: + list[Clan]: Топ кланов + """ + res = get_top_clans(self.token) + res.raise_for_status() + + return [Clan.model_validate(clan) for clan in res.json()['clans']] + + @refresh_on_error + def get_platform_status(self) -> bool: + """Получить статус платформы + + Returns: + bool: read only + """ + res = get_platform_status(self.token) + res.raise_for_status() + + return res.json()['readOnly'] @refresh_on_error - def verificate(self, file_url: str): - return verificate(self.token, file_url) + def add_comment(self, post_id: UUID, content: str, attachment_ids: list[UUID] = []) -> Comment: + """Добавить комментарий - @refresh_on_error - def get_verification_status(self): - return get_verification_status(self.token) + Args: + post_id (str): UUID поста + content (str): Содержание + attachment_ids (list[UUID]): Список UUID прикреплённых файлов + reply_comment_id (UUID | None, optional): ID коммента для ответа. Defaults to None. + + Raises: + ValidationError: Ошибка валидации + NotFound: Пост не найден + + Returns: + Comment: Комментарий + """ + res = add_comment(self.token, post_id, content, attachment_ids) + if res.status_code == 422 and 'found' in res.json(): + raise ValidationError(*list(res.json()['found'].items())[0]) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Post') + res.raise_for_status() + + return Comment.model_validate(res.json()) @refresh_on_error - def get_who_to_follow(self) -> dict: - return get_who_to_follow(self.token) + def add_reply_comment(self, comment_id: UUID, content: str, author_id: UUID, attachment_ids: list[UUID] = []) -> Comment: + """Добавить ответный комментарий - @refresh_on_error - def get_top_clans(self) -> dict: - return get_top_clans(self.token) + Args: + comment_id (str): UUID комментария + content (str): Содержание + author_id (UUID | None, optional): ID пользователя, отправившего комментарий. Defaults to None. + attachment_ids (list[UUID]): Список UUID прикреплённых файлов - @refresh_on_error - def get_platform_status(self) -> dict: - return get_platform_status(self.token) + Raises: + ValidationError: Ошибка валидации + NotFound: Пользователь или комментарий не найден + + Returns: + Comment: Комментарий + """ + res = add_reply_comment(self.token, comment_id, content, author_id, attachment_ids) + if res.status_code == 500 and 'Failed query' in res.text: + raise NotFound('User') + if res.status_code == 422 and 'found' in res.json(): + raise ValidationError(*list(res.json()['found'].items())[0]) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Comment') + res.raise_for_status() + + return Comment.model_validate(res.json()) @refresh_on_error - def add_comment(self, post_id: str, content: str, reply_comment_id: str | None = None): - return add_comment(self.token, post_id, content, reply_comment_id) + def get_comments(self, post_id: UUID, limit: int = 20, cursor: int = 0, sort: str = 'popular') -> tuple[list[Comment], Pagination]: + """Получить список комментариев + + Args: + post_id (UUID): UUID поста + limit (int, optional): Лимит. Defaults to 20. + cursor (int, optional): Курсор (сколько пропустить). Defaults to 0. + sort (str, optional): Сортировка. Defaults to 'popular'. + + Raises: + NotFound: Пост не найден + + Returns: + list[Comment]: Список комментариев + Pagination: Пагинация + """ + res = get_comments(self.token, post_id, limit, cursor, sort) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Post') + res.raise_for_status() + data = res.json()['data'] + + return [Comment.model_validate(comment) for comment in data['comments']], Pagination(page=(cursor // limit) or 1, limit=limit, total=data['total'], hasMore=data['hasMore'], nextCursor=None) @refresh_on_error - def get_comments(self, post_id: str, limit: int = 20, cursor: int = 0, sort: str = 'popular'): - return get_comments(self.token, post_id, limit, cursor, sort) + def like_comment(self, id: UUID) -> int: + """Лайкнуть комментарий + + Args: + id (UUID): UUID комментария + + Raises: + NotFound: Комментарий не найден + + Returns: + int: Количество лайков + """ + res = like_comment(self.token, id) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Comment') + res.raise_for_status() + + return res.json()['likesCount'] @refresh_on_error - def like_comment(self, id: str): - return like_comment(self.token, id) + def unlike_comment(self, id: UUID) -> int: + """Убрать лайк с комментария + + Args: + id (UUID): UUID комментария + + Raises: + NotFound: Комментарий не найден + + Returns: + int: Количество лайков + """ + res = unlike_comment(self.token, id) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Comment') + res.raise_for_status() + + return res.json()['likesCount'] @refresh_on_error - def unlike_comment(self, id: str): - return unlike_comment(self.token, id) + def delete_comment(self, id: UUID) -> None: + """Удалить комментарий + + Args: + id (UUID): UUID комментария + + Raises: + NotFound: Комментарий не найден + Forbidden: Нет прав на удаление + """ + res = delete_comment(self.token, id) + if res.status_code == 204: + return + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Comment') + if res.json().get('error', {}).get('code') == 'FORBIDDEN': + raise Forbidden('delete comment') + res.raise_for_status() + + @deprecated("get_hastags устарел используйте get_hashtags") + @refresh_on_error + def get_hastags(self, limit: int = 10) -> list[Hashtag]: + """Получить список популярных хэштэгов + + Args: + limit (int, optional): Лимит. Defaults to 10. + + Returns: + list[Hashtag]: Список хэштэгов + """ + return self.get_hashtags(limit) @refresh_on_error - def delete_comment(self, id: str): - return delete_comment(self.token, id) + def get_hashtags(self, limit: int = 10) -> list[Hashtag]: + """Получить список популярных хэштэгов + + Args: + limit (int, optional): Лимит. Defaults to 10. + + Returns: + list[Hashtag]: Список хэштэгов + """ + res = get_hashtags(self.token, limit) + res.raise_for_status() + + return [Hashtag.model_validate(hashtag) for hashtag in res.json()['data']['hashtags']] + + @refresh_on_error + def get_posts_by_hashtag(self, hashtag: str, limit: int = 20, cursor: UUID | None = None) -> tuple[Hashtag | None, list[Post], Pagination]: + """Получить посты по хэштэгу + + Args: + hashtag (str): Хэштэг (без #) + limit (int, optional): Лимит. Defaults to 20. + cursor (UUID | None, optional): Курсор (UUID последнего поста, после которого брать данные). Defaults to None. + + Returns: + Hashtag | None: Хэштэг + list[Post]: Посты + Pagination: Пагинация + """ + res = get_posts_by_hashtag(self.token, hashtag, limit, cursor) + res.raise_for_status() + data = res.json()['data'] + + return Hashtag.model_validate(data['hashtag']), [Post.model_validate(post) for post in data['posts']], Pagination.model_validate(data['pagination']) @refresh_on_error - def get_hastags(self, limit: int = 10): - return get_hastags(self.token, limit) + def get_notifications(self, limit: int = 20, offset: int = 0) -> tuple[list[Notification], Pagination]: + """Получить уведомления + + Args: + limit (int, optional): Лимит. Defaults to 20. + offset (int, optional): Сдвиг. Defaults to 0. + + Returns: + list[Notification]: Уведомления + Pagination: Пагинация + """ + res = get_notifications(self.token, limit, offset) + res.raise_for_status() + + return ( + [Notification.model_validate(notification) for notification in res.json()['notifications']], + Pagination(page=(offset // limit) + 1, limit=limit, hasMore=res.json()['hasMore'], nextCursor=None) + ) @refresh_on_error - def get_posts_by_hashtag(self, hashtag: str, limit: int = 20, cursor: int = 0): - return get_posts_by_hastag(self.token, hashtag, limit, cursor) + def mark_as_read(self, id: UUID) -> bool: + """Прочитать уведомление + + Args: + id (UUID): UUID уведомления + + Returns: + bool: Успешно (False - уже прочитано) + """ + res = mark_as_read(self.token, id) + res.raise_for_status() + + return res.json()['success'] + + @refresh_on_error + def mark_all_as_read(self) -> None: + """Прочитать все уведомления""" + res = mark_all_as_read(self.token) + res.raise_for_status() @refresh_on_error - def get_notifications(self, limit: int = 20, cursor: int = 0, type: str | None = None): - return get_notifications(self.token, limit, cursor, type) + def get_unread_notifications_count(self) -> int: + """Получить количество непрочитанных уведомлений - @refresh_on_error - def mark_as_read(self, id: str): - return mark_as_read(self.token, id) + Returns: + int: Количество + """ + res = get_unread_notifications_count(self.token) + res.raise_for_status() - @refresh_on_error - def mark_all_as_read(self): - return mark_all_as_read(self.token) - - @refresh_on_error - def get_unread_notifications_count(self): - return get_unread_notifications_count(self.token) + return res.json()['count'] @refresh_on_error - def create_post(self, content: str, wall_recipient_id: int | None = None, attach_ids: list[str] = []): - return create_post(self.token, content, wall_recipient_id, attach_ids) + def create_post(self, content: str, wall_recipient_id: UUID | None = None, attach_ids: list[UUID] = []) -> NewPost: + """Создать пост + + Args: + content (str): Содержимое + wall_recipient_id (UUID | None, optional): UUID пользователя (чтобы создать пост ему на стене). Defaults to None. + attach_ids (list[UUID], optional): UUID вложений. Defaults to []. + + Raises: + NotFound: Пользователь не найден + ValidationError: Ошибка валидации + + Returns: + NewPost: Новый пост + """ + res = create_post(self.token, content, wall_recipient_id, attach_ids) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Wall recipient') + if res.status_code == 422 and 'found' in res.json(): + raise ValidationError(*list(res.json()['found'].items())[0]) + res.raise_for_status() + + return NewPost.model_validate(res.json()) @refresh_on_error - def get_posts(self, username: str | None = None, limit: int = 20, cursor: int = 0, sort: str = '', tab: str = ''): - return get_posts(self.token, username, limit, cursor, sort, tab) + def get_posts(self, cursor: int = 0, tab: PostsTab = PostsTab.POPULAR) -> tuple[list[Post], PostsPagintaion]: + """Получить список постов + + Args: + cursor (int, optional): Страница. Defaults to 0. + tab (PostsTab, optional): Вкладка (популярное или подписки). Defaults to PostsTab.POPULAR. + + Returns: + list[Post]: Список постов + Pagination: Пагинация + """ + res = get_posts(self.token, cursor, tab) + res.raise_for_status() + data = res.json()['data'] + + return [Post.model_validate(post) for post in data['posts']], PostsPagintaion.model_validate(data['pagination']) @refresh_on_error - def get_post(self, id: str): - return get_post(self.token, id) + def get_post(self, id: UUID) -> Post: + """Получить пост + + Args: + id (UUID): UUID поста + + Raises: + NotFound: Пост не найден + + Returns: + Post: Пост + """ + res = get_post(self.token, id) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Post') + res.raise_for_status() + + return Post.model_validate(res.json()['data']) @refresh_on_error - def edit_post(self, id: str, content: str): - return edit_post(self.token, id, content) + def edit_post(self, id: UUID, content: str) -> str: + """Редактировать пост + + Args: + id (UUID): UUID поста + content (str): Содержимое + + Raises: + NotFound: Пост не найден + Forbidden: Нет доступа + ValidationError: Ошибка валидации + + Returns: + str: Новое содержимое + """ + res = edit_post(self.token, id, content) + + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Post') + if res.json().get('error', {}).get('code') == 'FORBIDDEN': + raise Forbidden('edit post') + if res.status_code == 422 and 'found' in res.json(): + raise ValidationError(*list(res.json()['found'].items())[0]) + res.raise_for_status() + + return res.json()['content'] @refresh_on_error - def delete_post(self, id: str): - return delete_post(self.token, id) + def delete_post(self, id: UUID) -> None: + """Удалить пост + + Args: + id (UUID): UUID поста + + Raises: + NotFound: Пост не найден + Forbidden: Нет доступа + """ + res = delete_post(self.token, id) + if res.status_code == 204: + return + + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Post') + if res.json().get('error', {}).get('code') == 'FORBIDDEN': + raise Forbidden('delete post') + res.raise_for_status() @refresh_on_error - def pin_post(self, id: str): - return pin_post(self.token, id) + def pin_post(self, id: UUID): + """Закрепить пост + + Args: + id (UUID): UUID поста + + Raises: + NotFound: Пост не найден + Forbidden: Нет доступа + """ + res = pin_post(self.token, id) + + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Post') + if res.json().get('error', {}).get('code') == 'FORBIDDEN': + raise Forbidden('pin post') + res.raise_for_status() @refresh_on_error - def repost(self, id: str, content: str | None = None): - return repost(self.token, id, content) + def repost(self, id: UUID, content: str | None = None) -> NewPost: + """Репостнуть пост + + Args: + id (UUID): UUID поста + content (str | None, optional): Содержимое (доп. комментарий). Defaults to None. + + Raises: + NotFound: Пост не найден + AlreadyReposted: Пост уже репостнут + CantRepostYourPost: Нельзя репостить самого себя + ValidationError: Ошибка валидации + + Returns: + NewPost: Новый пост + """ + res = repost(self.token, id, content) + + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Post') + if res.json().get('error', {}).get('code') == 'CONFLICT': + raise AlreadyReposted() + if res.status_code == 422 and res.json().get('message') == 'Cannot repost your own post': + raise CantRepostYourPost() + if res.status_code == 422 and 'found' in res.json(): + raise ValidationError(*list(res.json()['found'].items())[0]) + res.raise_for_status() + + return NewPost.model_validate(res.json()) @refresh_on_error - def view_post(self, id: str): - return view_post(self.token, id) + def view_post(self, id: UUID) -> None: + """Просмотреть пост + + Args: + id (UUID): UUID поста + + Raises: + NotFound: Пост не найден + """ + res = view_post(self.token, id) + if res.status_code == 204: + return + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('Post') + res.raise_for_status() @refresh_on_error - def get_liked_posts(self, username: str, limit: int = 20, cursor: int = 0): - return get_liked_posts(self.token, username, limit, cursor) + def get_liked_posts(self, username_or_id: str | UUID, limit: int = 20, cursor: datetime | None = None) -> tuple[list[Post], LikedPostsPagintaion]: + """Получить список лайкнутых постов пользователя + + Args: + username_or_id (str | UUID): UUID или username пользователя + limit (int, optional): Лимит. Defaults to 20. + cursor (datetime | None, optional): Сдвиг (next_cursor). Defaults to None. + + Raises: + NotFound: Пользователь не найден + + Returns: + list[Post]: Список постов + LikedPostsPagintaion: Пагинация + """ + res = get_liked_posts(self.token, username_or_id, limit, cursor) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise NotFound('User') + res.raise_for_status() + data = res.json()['data'] + + return [Post.model_validate(post) for post in data['posts']], LikedPostsPagintaion.model_validate(data['pagination']) @refresh_on_error - def report(self, id: str, type: str = 'post', reason: str = 'other', description: str = ''): - return report(self.token, id, type, reason, description) + def report(self, id: UUID, type: ReportTargetType = ReportTargetType.POST, reason: ReportTargetReason = ReportTargetReason.OTHER, description: str | None = None) -> NewReport: + """Отправить жалобу - @refresh_on_error - def report_user(self, id: str, reason: str = 'other', description: str = ''): - return report(self.token, id, 'user', reason, description) + Args: + id (UUID): UUID цели + type (ReportTargetType, optional): Тип цели (пост/пользователь/комментарий). Defaults to ReportTargetType.POST. + reason (ReportTargetReason, optional): Причина. Defaults to ReportTargetReason.OTHER. + description (str | None, optional): Описание. Defaults to None. - @refresh_on_error - def report_post(self, id: str, reason: str = 'other', description: str = ''): - return report(self.token, id, 'post', reason, description) + Raises: + NotFound: Цель не найдена + AlreadyReported: Жалоба уже отправлена + ValidationError: Ошибка валидации - @refresh_on_error - def report_comment(self, id: str, reason: str = 'other', description: str = ''): - return report(self.token, id, 'comment', reason, description) + Returns: + NewReport: Новая жалоба + """ + res = report(self.token, id, type, reason, description) + + if res.json().get('error', {}).get('code') == 'VALIDATION_ERROR' and 'не найден' in res.json()['error'].get('message', ''): + raise NotFound(type.value.title()) + if res.json().get('error', {}).get('code') == 'VALIDATION_ERROR' and 'Вы уже отправляли жалобу' in res.json()['error'].get('message', ''): + raise AlreadyReported(type.value.title()) + if res.status_code == 422 and 'found' in res.json(): + raise ValidationError(*list(res.json()['found'].items())[-1]) + res.raise_for_status() + + return NewReport.model_validate(res.json()['data']) @refresh_on_error - def search(self, query: str, user_limit: int = 5, hashtag_limit: int = 5): - return search(self.token, query, user_limit, hashtag_limit) + def search(self, query: str, user_limit: int = 5, hashtag_limit: int = 5) -> tuple[list[UserWhoToFollow], list[Hashtag]]: + """Поиск по пользователям и хэштэгам + + Args: + query (str): Запрос + user_limit (int, optional): Лимит пользователей. Defaults to 5. + hashtag_limit (int, optional): Лимит хэштэгов. Defaults to 5. + + Raises: + TooLarge: Слишком длинный запрос + + Returns: + list[UserWhoToFollow]: Список пользователей + list[Hashtag]: Список хэштэгов + """ + res = search(self.token, query, user_limit, hashtag_limit) + + if res.status_code == 414: + raise TooLarge() + res.raise_for_status() + data = res.json()['data'] + + return [UserWhoToFollow.model_validate(user) for user in data['users']], [Hashtag.model_validate(hashtag) for hashtag in data['hashtags']] @refresh_on_error - def search_user(self, query: str, limit: int = 5): - return search(self.token, query, limit, 0) + def search_user(self, query: str, limit: int = 5) -> list[UserWhoToFollow]: + """Поиск пользователей + + Args: + query (str): Запрос + limit (int, optional): Лимит. Defaults to 5. + + Returns: + list[UserWhoToFollow]: Список пользователей + """ + return self.search(query, limit, 0)[0] @refresh_on_error - def search_hashtag(self, query: str, limit: int = 5): - return search(self.token, query, 0, limit) + def search_hashtag(self, query: str, limit: int = 5) -> list[Hashtag]: + """Поиск хэштэгов + + Args: + query (str): Запрос + limit (int, optional): Лимит. Defaults to 5. + + Returns: + list[Hashtag]: Список хэштэгов + """ + return self.search(query, 0, limit)[1] @refresh_on_error - def upload_file(self, name: str, data: BufferedReader): - return upload_file(self.token, name, data) + def upload_file(self, name: str, data: BufferedReader) -> File: + """Загрузить файл - def update_banner(self, name: str): - id = self.upload_file(name, cast(BufferedReader, open(name, 'rb')))['id'] - return self.update_profile(banner_id=id) \ No newline at end of file + Args: + name (str): Имя файла + data (BufferedReader): Содержимое (open('имя', 'rb')) + + Returns: + File: Файл + """ + res = upload_file(self.token, name, data) + res.raise_for_status() + + return File.model_validate(res.json()) + + def update_banner(self, name: str) -> UserProfileUpdate: + """Обновить банер (шорткат из upload_file + update_profile) + + Args: + name (str): Имя файла + + Returns: + UserProfileUpdate: Обновленный профиль + """ + id = self.upload_file(name, cast(BufferedReader, open(name, 'rb'))).id + return self.update_profile(banner_id=id) + + @refresh_on_error + def restore_post(self, post_id: UUID) -> None: + """Восстановить удалённый пост + + Args: + post_id: UUID поста + """ + res = restore_post(self.token, post_id) + res.raise_for_status() + + @refresh_on_error + def like_post(self, post_id: UUID) -> int: + """Лайкнуть пост + + Args: + post_id (UUID): UUID поста + + Raises: + NotFound: Пост не найден + + Returns: + int: Количество лайков + """ + res = like_post(self.token, post_id) + + if res.status_code == 404: + raise NotFound("Post") + + return res.json()['likesCount'] + + @refresh_on_error + def unlike_post(self, post_id: UUID) -> int: + """Убрать лайк с поста + + Args: + post_id (UUID): UUID поста + + Raises: + NotFound: Пост не найден + + Returns: + int: Количество лайков + """ + res = unlike_post(self.token, post_id) + + if res.status_code == 404: + raise NotFound("Post not found") + + return res.json()['likesCount'] + + + @refresh_on_error + def get_pins(self) -> tuple[list[Pin], str]: + """Список пинов + + Returns: + list[Pin]: Список пинов + str: Активный пин + """ + res = get_pins(self.token) + res.raise_for_status() + data = res.json()['data'] + + return [Pin.model_validate(pin) for pin in data['pins']], data['activePin'] + + @refresh_on_error + def remove_pin(self): + """Снять пин""" + res = remove_pin(self.token) + res.raise_for_status() + + @refresh_on_error + def set_pin(self, slug: str): + res = set_pin(self.token, slug) + if res.status_code == 422 and 'found' in res.json(): + raise ValidationError(*list(res.json()['found'].items())[0]) + if res.json().get('error', {}).get('code') == 'PIN_NOT_OWNED': + raise PinNotOwned(slug) + res.raise_for_status() + + return res.json()['pin'] \ No newline at end of file diff --git a/itd/enums.py b/itd/enums.py new file mode 100644 index 0000000..ff474b9 --- /dev/null +++ b/itd/enums.py @@ -0,0 +1,33 @@ +from enum import Enum + +class NotificationType(Enum): + WALL_POST = 'wall_post' + REPLY = 'reply' + REPOST = 'repost' + COMMENT = 'comment' + FOLLOW = 'follow' + LIKE = 'like' + +class NotificationTargetType(Enum): + POST = 'post' + +class ReportTargetType(Enum): + POST = 'post' + USER = 'user' + COMMENT = 'comment' + +class ReportTargetReason(Enum): + SPAM = 'spam' # спам + VIOLENCE = 'violence' # насилие + HATE = 'hate' # ненависть + ADULT = 'adult' # 18+ + FRAUD = 'fraud' # обман\мошенничество + OTHER = 'other' # другое + +class AttachType(Enum): + AUDIO = 'audio' + IMAGE = 'image' + +class PostsTab(Enum): + FOLLOWING = 'following' + POPULAR = 'popular' diff --git a/itd/exceptions.py b/itd/exceptions.py new file mode 100644 index 0000000..121929a --- /dev/null +++ b/itd/exceptions.py @@ -0,0 +1,101 @@ +class NoCookie(Exception): + def __str__(self): + return 'No cookie for refresh-token required action' + +class NoAuthData(Exception): + def __str__(self): + return 'No auth data. Provide token or cookies' + +class InvalidCookie(Exception): + def __init__(self, code: str): + self.code = code + def __str__(self): + if self.code == 'SESSION_NOT_FOUND': + return 'Invalid cookie data: Session not found (incorrect refresh token)' + elif self.code == 'REFRESH_TOKEN_MISSING': + return 'Invalid cookie data: No refresh token' + elif self.code == 'SESSION_EXPIRED': + return 'Invalid cookie data: Session expired' + # SESSION_REVOKED + return 'Invalid cookie data: Session revoked (logged out)' + +class InvalidToken(Exception): + def __str__(self): + return 'Invalid access token' + +class SamePassword(Exception): + def __str__(self): + return 'Old and new password must not equals' + +class InvalidOldPassword(Exception): + def __str__(self): + return 'Old password is incorrect' + +class NotFound(Exception): + def __init__(self, obj: str): + self.obj = obj + def __str__(self): + return f'{self.obj} not found' + +class UserBanned(Exception): + def __str__(self): + return 'User banned' + +class ValidationError(Exception): + def __init__(self, name: str, value: str): + self.name = name + self.value = value + def __str__(self): + return f'Failed validation on {self.name}: "{self.value}"' + +class PendingRequestExists(Exception): + def __str__(self): + return 'Pending verifiaction request already exists' + +class RateLimitExceeded(Exception): + def __init__(self, retry_after: int): + self.retry_after = retry_after + def __str__(self): + return f'Rate limit exceeded - too much requests. Retry after {self.retry_after} seconds' + +class Forbidden(Exception): + def __init__(self, action: str): + self.action = action + def __str__(self): + return f'Forbidden to {self.action}' + +class UsernameTaken(Exception): + def __str__(self): + return 'Username is already taken' + +class CantFollowYourself(Exception): + def __str__(self): + return 'Cannot follow yourself' + +class Unauthorized(Exception): + def __str__(self): + return 'Auth required - refresh token' + +class CantRepostYourPost(Exception): + def __str__(self): + return 'Cannot repost your own post' + +class AlreadyReposted(Exception): + def __str__(self): + return 'Post already reposted' + +class AlreadyReported(Exception): + def __init__(self, obj: str) -> None: + self.obj = obj + def __str__(self): + return f'{self.obj} already reported' + +class TooLarge(Exception): + def __str__(self): + return 'Search query too large' + +class PinNotOwned(Exception): + def __init__(self, pin: str) -> None: + self.pin = pin + def __str__(self): + return f'You do not own "{self.pin}" pin' \ No newline at end of file diff --git a/itd/models/_text.py b/itd/models/_text.py new file mode 100644 index 0000000..96aa99e --- /dev/null +++ b/itd/models/_text.py @@ -0,0 +1,21 @@ +from uuid import UUID +from datetime import datetime + +from pydantic import BaseModel, Field, field_validator + + +class TextObject(BaseModel): + id: UUID + content: str + + created_at: datetime = Field(alias='createdAt') + + model_config = {'populate_by_name': True} + + @field_validator('created_at', mode='plain') + @classmethod + def validate_created_at(cls, v: str): + try: + return datetime.strptime(v + '00', '%Y-%m-%d %H:%M:%S.%f%z') + except ValueError: + return datetime.strptime(v, '%Y-%m-%dT%H:%M:%S.%fZ') \ No newline at end of file diff --git a/itd/models/clan.py b/itd/models/clan.py new file mode 100644 index 0000000..10bd4b4 --- /dev/null +++ b/itd/models/clan.py @@ -0,0 +1,6 @@ +from pydantic import BaseModel, Field + + +class Clan(BaseModel): + avatar: str + member_count: int = Field(0, alias='memberCount') \ No newline at end of file diff --git a/itd/models/comment.py b/itd/models/comment.py new file mode 100644 index 0000000..1330225 --- /dev/null +++ b/itd/models/comment.py @@ -0,0 +1,17 @@ +from pydantic import Field + +from itd.models._text import TextObject +from itd.models.user import UserPost +from itd.models.file import Attach + + +class Comment(TextObject): + author: UserPost + + likes_count: int = Field(0, alias='likesCount') + replies_count: int = Field(0, alias='repliesCount') + is_liked: bool = Field(False, alias='isLiked') + + attachments: list[Attach] = [] + replies: list['Comment'] = [] + reply_to: UserPost | None = None # author of replied comment, if this comment is reply \ No newline at end of file diff --git a/itd/models/file.py b/itd/models/file.py new file mode 100644 index 0000000..635df8c --- /dev/null +++ b/itd/models/file.py @@ -0,0 +1,29 @@ +from uuid import UUID + +from pydantic import BaseModel, Field + +from itd.enums import AttachType + +class File(BaseModel): + id: UUID + url: str + filename: str + mime_type: str = Field(alias='mimeType') + size: int + + +class PostAttach(BaseModel): + id: UUID + type: AttachType = AttachType.IMAGE + url: str + thumbnail_url: str | None = Field(None, alias='thumbnailUrl') + width: int | None = None + height: int | None = None + + +class Attach(PostAttach): + filename: str + mime_type: str = Field(alias='mimeType') + size: int + duration: int | None = None + order: int = 0 \ No newline at end of file diff --git a/itd/models/hashtag.py b/itd/models/hashtag.py new file mode 100644 index 0000000..d1d970f --- /dev/null +++ b/itd/models/hashtag.py @@ -0,0 +1,8 @@ +from uuid import UUID + +from pydantic import BaseModel, Field + +class Hashtag(BaseModel): + id: UUID + name: str + posts_count: int = Field(0, alias='postsCount') \ No newline at end of file diff --git a/itd/models/notification.py b/itd/models/notification.py new file mode 100644 index 0000000..4dca9fe --- /dev/null +++ b/itd/models/notification.py @@ -0,0 +1,22 @@ +from uuid import UUID +from datetime import datetime + +from pydantic import BaseModel, Field + +from itd.enums import NotificationType, NotificationTargetType +from itd.models.user import UserNotification + +class Notification(BaseModel): + id: UUID + type: NotificationType + + target_type: NotificationTargetType | None = Field(None, alias='targetType') # none - follows, other - NotificationTragetType.POST + target_id: UUID | None = Field(None, alias='targetId') # none - follows + + preview: str | None = None # follow - none, comment/reply - content, repost - original post content, like - post content, wall_post - wall post content + + read: bool = False + read_at: datetime | None = Field(None, alias='readAt') + created_at: datetime = Field(alias='createdAt') + + actor: UserNotification \ No newline at end of file diff --git a/itd/models/pagination.py b/itd/models/pagination.py new file mode 100644 index 0000000..a970999 --- /dev/null +++ b/itd/models/pagination.py @@ -0,0 +1,23 @@ +from uuid import UUID +from datetime import datetime + +from pydantic import BaseModel, Field + +class Pagination(BaseModel): + page: int | None = 1 + limit: int = 20 + total: int | None = None + has_more: bool = Field(True, alias='hasMore') + next_cursor: UUID | None = Field(None, alias='nextCursor') + + +class PostsPagintaion(BaseModel): + limit: int = 20 + next_cursor: int | None = Field(1, alias='nextCursor') + has_more: bool = Field(True, alias='hasMore') + + +class LikedPostsPagintaion(BaseModel): + limit: int = 20 + next_cursor: datetime | None = Field(None, alias='nextCursor') + has_more: bool = Field(True, alias='hasMore') diff --git a/itd/models/pin.py b/itd/models/pin.py new file mode 100644 index 0000000..b0a07b1 --- /dev/null +++ b/itd/models/pin.py @@ -0,0 +1,12 @@ +from datetime import datetime + +from pydantic import BaseModel, Field + +class ShortPin(BaseModel): + slug: str + name: str + description: str + + +class Pin(ShortPin): + granted_at: datetime = Field(alias='grantedAt') \ No newline at end of file diff --git a/itd/models/post.py b/itd/models/post.py new file mode 100644 index 0000000..429de17 --- /dev/null +++ b/itd/models/post.py @@ -0,0 +1,46 @@ +from uuid import UUID + +from pydantic import Field, BaseModel + +from itd.models.user import UserPost, UserNewPost +from itd.models._text import TextObject +from itd.models.file import PostAttach +from itd.models.comment import Comment + + +class _PostShort(TextObject): + likes_count: int = Field(0, alias='likesCount') + comments_count: int = Field(0, alias='commentsCount') + reposts_count: int = Field(0, alias='repostsCount') + views_count: int = Field(0, alias='viewsCount') + + +class PostShort(_PostShort): + author: UserPost + + +class OriginalPost(PostShort): + is_deleted: bool = Field(False, alias='isDeleted') + + +class _Post(_PostShort): + is_liked: bool = Field(False, alias='isLiked') + is_reposted: bool = Field(False, alias='isReposted') + is_viewed: bool = Field(False, alias='isViewed') + is_owner: bool = Field(False, alias='isOwner') + + attachments: list[PostAttach] = [] + comments: list[Comment] = [] + + original_post: OriginalPost | None = None + + wall_recipient_id: UUID | None = Field(None, alias='wallRecipientId') + wall_recipient: UserPost | None = Field(None, alias='wallRecipient') + + +class Post(_Post, PostShort): + pass + + +class NewPost(_Post): + author: UserNewPost diff --git a/itd/models/report.py b/itd/models/report.py new file mode 100644 index 0000000..425bec1 --- /dev/null +++ b/itd/models/report.py @@ -0,0 +1,19 @@ +from uuid import UUID +from datetime import datetime + +from pydantic import BaseModel, Field + +from itd.enums import ReportTargetType, ReportTargetReason + + +class NewReport(BaseModel): + id: UUID + created_at: datetime = Field(alias='createdAt') + + +class Report(NewReport): + reason: ReportTargetReason + description: str | None = None + + target_type: ReportTargetType = Field(alias='targetType') + target_id: UUID diff --git a/itd/models/user.py b/itd/models/user.py new file mode 100644 index 0000000..5a90f02 --- /dev/null +++ b/itd/models/user.py @@ -0,0 +1,64 @@ +from uuid import UUID +from datetime import datetime + +from pydantic import BaseModel, Field + +from itd.models.pin import ShortPin + + +class UserPrivacy(BaseModel): + private: bool | None = Field(None, alias='isPrivate') # none for not me + wall_closed: bool = Field(False, alias='wallClosed') + + model_config = {'populate_by_name': True} + + +class UserProfileUpdate(BaseModel): + id: UUID + username: str | None = None + display_name: str = Field(alias='displayName') + bio: str | None = None + + updated_at: datetime | None = Field(None, alias='updatedAt') + + +class UserNewPost(BaseModel): + username: str | None = None + display_name: str = Field(alias='displayName') + avatar: str + pin: ShortPin | None = None + + verified: bool = False + + +class UserNotification(UserNewPost): + id: UUID + + +class UserPost(UserNotification, UserNewPost): + pass + + +class UserWhoToFollow(UserPost): + followers_count: int = Field(0, alias='followersCount') + + +class UserFollower(UserPost): + is_following: bool = Field(False, alias='isFollowing') # none for me + + +class UserSearch(UserFollower, UserWhoToFollow): + pass + + +class User(UserSearch, UserPrivacy): + banner: str | None = None + bio: str | None = None + pinned_post_id: UUID | None = Field(None, alias='pinnedPostId') + + following_count: int = Field(0, alias='followingCount') + posts_count: int = Field(0, alias='postsCount') + + is_followed: bool | None = Field(None, alias='isFollowedBy') # none for me + + created_at: datetime = Field(alias='createdAt') diff --git a/itd/models/verification.py b/itd/models/verification.py new file mode 100644 index 0000000..6b55b0d --- /dev/null +++ b/itd/models/verification.py @@ -0,0 +1,23 @@ +from uuid import UUID +from datetime import datetime + +from pydantic import BaseModel, Field + +class Verification(BaseModel): + id: UUID + user_id: UUID = Field(alias='userId') + video_url: str = Field(alias='videoUrl') + status: str # should be enum, but we dont know all statuses (what status for accepted?) + + reject_reason: str | None = Field(None, alias='rejectionReason') + reviewer: str | None = Field(None, alias='reviewedBy') + reviewed_at: datetime | None = Field(None, alias='reviewedAt') + + created_at: datetime = Field(alias='createdAt') + updated_at: datetime = Field(alias='updatedAt') + + +class VerificationStatus(BaseModel): + status: str # should be enum, but we dont know all statuses (what status for accepted?) + request_id: UUID = Field(alias='requestId') + submitted_at: datetime = Field(alias='submittedAt') \ No newline at end of file diff --git a/itd/request.py b/itd/request.py index 10d73f2..b943969 100644 --- a/itd/request.py +++ b/itd/request.py @@ -1,6 +1,9 @@ from _io import BufferedReader from requests import Session +from requests.exceptions import JSONDecodeError + +from itd.exceptions import InvalidToken, InvalidCookie, RateLimitExceeded, Unauthorized s = Session() @@ -27,10 +30,20 @@ def fetch(token: str, method: str, url: str, params: dict = {}, files: dict[str, if method == "get": res = s.get(base, timeout=120 if files else 20, params=params, headers=headers) else: - res = s.request(method.upper(), base, timeout=20, json=params, headers=headers, files=files) + res = s.request(method.upper(), base, timeout=120 if files else 20, json=params, headers=headers, files=files) + + try: + if res.json().get('error', {}).get('code') == 'RATE_LIMIT_EXCEEDED': + raise RateLimitExceeded(res.json()['error'].get('retryAfter', 0)) + if res.json().get('error', {}).get('code') == 'UNAUTHORIZED': + raise Unauthorized() + except JSONDecodeError: + pass # todo + + if not res.ok: + print(res.text) + return res - res.raise_for_status() - return res.json() def set_cookies(cookies: str): for cookie in cookies.split('; '): @@ -65,5 +78,18 @@ def auth_fetch(cookies: str, method: str, url: str, params: dict = {}, token: st res = s.get(f'https://xn--d1ah4a.com/api/{url}', timeout=20, params=params, headers=headers) else: res = s.request(method, f'https://xn--d1ah4a.com/api/{url}', timeout=20, json=params, headers=headers) - res.raise_for_status() - return res.json() + + # print(res.text) + if res.text == 'UNAUTHORIZED': + raise InvalidToken() + try: + if res.json().get('error', {}).get('code') == 'RATE_LIMIT_EXCEEDED': + raise RateLimitExceeded(res.json()['error'].get('retryAfter', 0)) + if res.json().get('error', {}).get('code') in ('SESSION_NOT_FOUND', 'REFRESH_TOKEN_MISSING', 'SESSION_REVOKED', 'SESSION_EXPIRED'): + raise InvalidCookie(res.json()['error']['code']) + if res.json().get('error', {}).get('code') == 'UNAUTHORIZED': + raise Unauthorized() + except JSONDecodeError: + print('fail to parse json') + + return res diff --git a/itd/routes/auth.py b/itd/routes/auth.py index 0401467..73f3c5b 100644 --- a/itd/routes/auth.py +++ b/itd/routes/auth.py @@ -1,10 +1,12 @@ +from requests import Response + from itd.request import auth_fetch -def refresh_token(cookies: str): - return auth_fetch(cookies, 'post', 'v1/auth/refresh')['accessToken'] +def refresh_token(cookies: str) -> Response: + return auth_fetch(cookies, 'post', 'v1/auth/refresh') -def change_password(cookies: str, token: str, old: str, new: str): +def change_password(cookies: str, token: str, old: str, new: str) -> Response: return auth_fetch(cookies, 'post', 'v1/auth/change-password', {'newPassword': new, 'oldPassword': old}, token) -def logout(cookies: str): +def logout(cookies: str) -> Response: return auth_fetch(cookies, 'post', 'v1/auth/logout') diff --git a/itd/routes/comments.py b/itd/routes/comments.py index e0e7395..c211b17 100644 --- a/itd/routes/comments.py +++ b/itd/routes/comments.py @@ -1,19 +1,21 @@ +from uuid import UUID + from itd.request import fetch -def add_comment(token: str, post_id: str, content: str, reply_comment_id: str | None = None): - data = {'content': content} - if reply_comment_id: - data['replyTo'] = str(reply_comment_id) - return fetch(token, 'post', f'posts/{post_id}/comments', data) +def add_comment(token: str, post_id: UUID, content: str, attachment_ids: list[UUID] = []): + return fetch(token, 'post', f'posts/{post_id}/comments', {'content': content, "attachmentIds": list(map(str, attachment_ids))}) -def get_comments(token: str, post_id: str, limit: int = 20, cursor: int = 0, sort: str = 'popular'): +def add_reply_comment(token: str, comment_id: UUID, content: str, author_id: UUID, attachment_ids: list[UUID] = []): + return fetch(token, 'post', f'comments/{comment_id}/replies', {'content': content, 'replyToUserId': str(author_id), "attachmentIds": list(map(str, attachment_ids))}) + +def get_comments(token: str, post_id: UUID, limit: int = 20, cursor: int = 0, sort: str = 'popular'): return fetch(token, 'get', f'posts/{post_id}/comments', {'limit': limit, 'sort': sort, 'cursor': cursor}) -def like_comment(token: str, comment_id: str): +def like_comment(token: str, comment_id: UUID): return fetch(token, 'post', f'comments/{comment_id}/like') -def unlike_comment(token: str, comment_id: str): +def unlike_comment(token: str, comment_id: UUID): return fetch(token, 'delete', f'comments/{comment_id}/like') -def delete_comment(token: str, comment_id: str): +def delete_comment(token: str, comment_id: UUID): return fetch(token, 'delete', f'comments/{comment_id}') diff --git a/itd/routes/files.py b/itd/routes/files.py index d13f48f..9303b82 100644 --- a/itd/routes/files.py +++ b/itd/routes/files.py @@ -4,4 +4,4 @@ from itd.request import fetch def upload_file(token: str, name: str, data: BufferedReader): - return fetch(token, 'post', 'files/upload', files={'file': (name, data)}) \ No newline at end of file + return fetch(token, 'post', 'files/upload', files={'file': (name, data)}) diff --git a/itd/routes/hashtags.py b/itd/routes/hashtags.py index e6e9a63..cd4a836 100644 --- a/itd/routes/hashtags.py +++ b/itd/routes/hashtags.py @@ -1,7 +1,17 @@ +from warnings import deprecated +from uuid import UUID from itd.request import fetch +@deprecated("get_hastags устарела используйте get_hashtags") def get_hastags(token: str, limit: int = 10): return fetch(token, 'get', 'hashtags/trending', {'limit': limit}) -def get_posts_by_hastag(token: str, hashtag: str, limit: int = 20, cursor: int = 0): +def get_hashtags(token: str, limit: int = 10): + return fetch(token, 'get', 'hashtags/trending', {'limit': limit}) + +@deprecated("get_posts_by_hastag устерла используй get_posts_by_hashtag") +def get_posts_by_hastag(token: str, hashtag: str, limit: int = 20, cursor: UUID | None = None): + return fetch(token, 'get', f'hashtags/{hashtag}/posts', {'limit': limit, 'cursor': cursor}) + +def get_posts_by_hashtag(token: str, hashtag: str, limit: int = 20, cursor: UUID | None = None): return fetch(token, 'get', f'hashtags/{hashtag}/posts', {'limit': limit, 'cursor': cursor}) diff --git a/itd/routes/notifications.py b/itd/routes/notifications.py index 8ad6759..de0a741 100644 --- a/itd/routes/notifications.py +++ b/itd/routes/notifications.py @@ -1,16 +1,15 @@ +from uuid import UUID + from itd.request import fetch -def get_notifications(token: str, limit: int = 20, cursor: int = 0, type: str | None = None): - data = {'limit': str(limit), 'cursor': str(cursor)} - if type: - data['type'] = type - return fetch(token, 'get', 'notifications', data) +def get_notifications(token: str, limit: int = 20, offset: int = 0): + return fetch(token, 'get', 'notifications', {'limit': limit, 'offset': offset}) -def mark_as_read(token: str, id: str): - return fetch(token, 'post', f'notification/{id}/read') +def mark_as_read(token: str, id: UUID): + return fetch(token, 'post', f'notifications/{id}/read') def mark_all_as_read(token: str): - return fetch(token, 'post', f'notification/read-all') + return fetch(token, 'post', f'notifications/read-all') def get_unread_notifications_count(token: str): return fetch(token, 'get', 'notifications/count') \ No newline at end of file diff --git a/itd/routes/pins.py b/itd/routes/pins.py new file mode 100644 index 0000000..e53e257 --- /dev/null +++ b/itd/routes/pins.py @@ -0,0 +1,10 @@ +from itd.request import fetch + +def get_pins(token: str): + return fetch(token, 'get', 'users/me/pins') + +def remove_pin(token: str): + return fetch(token, 'delete', 'users/me/pin') + +def set_pin(token: str, slug: str): + return fetch(token, 'put', 'users/me/pin', {'slug': slug}) \ No newline at end of file diff --git a/itd/routes/posts.py b/itd/routes/posts.py index 55a129b..0cd1290 100644 --- a/itd/routes/posts.py +++ b/itd/routes/posts.py @@ -1,45 +1,50 @@ -from itd.request import fetch +from datetime import datetime +from uuid import UUID -def create_post(token: str, content: str, wall_recipient_id: int | None = None, attach_ids: list[str] = []): +from itd.request import fetch +from itd.enums import PostsTab + +def create_post(token: str, content: str, wall_recipient_id: UUID | None = None, attachment_ids: list[UUID] = []): data: dict = {'content': content} if wall_recipient_id: - data['wallRecipientId'] = wall_recipient_id - if attach_ids: - data['attachmentIds'] = attach_ids + data['wallRecipientId'] = str(wall_recipient_id) + if attachment_ids: + data['attachmentIds'] = list(map(str, attachment_ids)) return fetch(token, 'post', 'posts', data) -def get_posts(token: str, username: str | None = None, limit: int = 20, cursor: int = 0, sort: str = '', tab: str = ''): - data: dict = {'limit': limit, 'cursor': cursor} - if username: - data['username'] = username - if sort: - data['sort'] = sort - if tab: - data['tab'] = tab +def get_posts(token: str, cursor: int = 0, tab: PostsTab = PostsTab.POPULAR): + return fetch(token, 'get', 'posts', {'cursor': cursor, 'tab': tab.value}) - return fetch(token, 'get', 'posts', data) - -def get_post(token: str, id: str): +def get_post(token: str, id: UUID): return fetch(token, 'get', f'posts/{id}') -def edit_post(token: str, id: str, content: str): +def edit_post(token: str, id: UUID, content: str): return fetch(token, 'put', f'posts/{id}', {'content': content}) -def delete_post(token: str, id: str): +def delete_post(token: str, id: UUID): return fetch(token, 'delete', f'posts/{id}') -def pin_post(token: str, id: str): +def pin_post(token: str, id: UUID): return fetch(token, 'post', f'posts/{id}/pin') -def repost(token: str, id: str, content: str | None = None): +def repost(token: str, id: UUID, content: str | None = None): data = {} if content: data['content'] = content return fetch(token, 'post', f'posts/{id}/repost', data) -def view_post(token: str, id: str): +def view_post(token: str, id: UUID): return fetch(token, 'post', f'posts/{id}/view') -def get_liked_posts(token: str, username: str, limit: int = 20, cursor: int = 0): - return fetch(token, 'get', f'posts/user/{username}/liked', {'limit': limit, 'cursor': cursor}) \ No newline at end of file +def get_liked_posts(token: str, username_or_id: str | UUID, limit: int = 20, cursor: datetime | None = None): + return fetch(token, 'get', f'posts/user/{username_or_id}/liked', {'limit': limit, 'cursor': cursor}) + +def restore_post(token: str, post_id: UUID): + return fetch(token, "post", f"posts/{post_id}/restore",) + +def like_post(token: str, post_id: UUID): + return fetch(token, "post", f"posts/{post_id}/like") + +def unlike_post(token: str, post_id: UUID): + return fetch(token, "delete", f"posts/{post_id}/like") diff --git a/itd/routes/reports.py b/itd/routes/reports.py index 933f48d..7306bac 100644 --- a/itd/routes/reports.py +++ b/itd/routes/reports.py @@ -1,4 +1,9 @@ -from itd.request import fetch +from uuid import UUID -def report(token: str, id: str, type: str = 'post', reason: str = 'other', description: str = ''): - return fetch(token, 'post', 'reports', {'targetId': id, 'targetType': type, 'reason': reason, 'description': description}) \ No newline at end of file +from itd.request import fetch +from itd.enums import ReportTargetReason, ReportTargetType + +def report(token: str, id: UUID, type: ReportTargetType = ReportTargetType.POST, reason: ReportTargetReason = ReportTargetReason.OTHER, description: str | None = None): + if description is None: + description = '' + return fetch(token, 'post', 'reports', {'targetId': str(id), 'targetType': type.value, 'reason': reason.value, 'description': description}) \ No newline at end of file diff --git a/itd/routes/users.py b/itd/routes/users.py index 1fa7424..ddbbfa4 100644 --- a/itd/routes/users.py +++ b/itd/routes/users.py @@ -1,10 +1,12 @@ +from uuid import UUID + from itd.request import fetch def get_user(token: str, username: str): return fetch(token, 'get', f'users/{username}') -def update_profile(token: str, bio: str | None = None, display_name: str | None = None, username: str | None = None, banner_id: str | None = None): +def update_profile(token: str, bio: str | None = None, display_name: str | None = None, username: str | None = None, banner_id: UUID | None = None): data = {} if bio: data['bio'] = bio @@ -13,14 +15,14 @@ def update_profile(token: str, bio: str | None = None, display_name: str | None if username: data['username'] = username if banner_id: - data['bannerId'] = banner_id + data['bannerId'] = str(banner_id) return fetch(token, 'put', 'users/me', data) def update_privacy(token: str, wall_closed: bool = False, private: bool = False): data = {} - if wall_closed: + if wall_closed is not None: data['wallClosed'] = wall_closed - if private: + if private is not None: data['isPrivate'] = private return fetch(token, 'put', 'users/me/privacy', data) diff --git a/itd/routes/verification.py b/itd/routes/verification.py index 8c66426..ec643d5 100644 --- a/itd/routes/verification.py +++ b/itd/routes/verification.py @@ -1,8 +1,14 @@ +from warnings import deprecated from itd.request import fetch -def verificate(token: str, file_url: str): +def verify(token: str, file_url: str): # {"success":true,"request":{"id":"fc54e54f-8586-4d8c-809e-df93161f99da","userId":"9096a85b-c319-483e-8940-6921be427ad0","videoUrl":"https://943701f000610900cbe86b72234e451d.bckt.ru/videos/354f28a6-9ac7-48a6-879a-a454062b1d6b.mp4","status":"pending","rejectionReason":null,"reviewedBy":null,"reviewedAt":null,"createdAt":"2026-01-30T12:58:14.228Z","updatedAt":"2026-01-30T12:58:14.228Z"}} return fetch(token, 'post', 'verification/submit', {'videoUrl': file_url}) +@deprecated("verificate устарела используйте verify") +def verificate(token: str, file_url: str): + return verify(token, file_url) + + def get_verification_status(token: str): return fetch(token, 'get', 'verification/status') \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 8ed5ff2..a3d862d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,6 @@ authors = [ ] license = "MIT" dependencies = [ - "requests" + "requests", "pydantic" ] requires-python = ">=3.9" diff --git a/requirements.txt b/requirements.txt index e69de29..541740e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -0,0 +1,2 @@ +pydantic==2.11.9 +requests==2.32.3 \ No newline at end of file diff --git a/setup.py b/setup.py index 5cab16b..40cf6e8 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( version='0.3.0', packages=find_packages(), install_requires=[ - 'requests' + 'requests', 'pydantic' ], python_requires=">=3.9" )