From a388426d8d9dd3598643d45191a920697a9c2d40 Mon Sep 17 00:00:00 2001 From: firedotguy Date: Sat, 31 Jan 2026 12:10:20 +0300 Subject: [PATCH] feat: add models and partially custom error messages --- itd/client.py | 268 +++++++++++++++++++++++++++++++++------ itd/exceptions.py | 43 +++++++ itd/models/_text.py | 2 +- itd/models/clan.py | 6 + itd/models/comment.py | 7 +- itd/models/pagination.py | 7 + itd/models/post.py | 4 +- itd/models/user.py | 37 +++++- itd/request.py | 17 ++- itd/routes/auth.py | 10 +- itd/routes/comments.py | 2 +- itd/routes/files.py | 2 +- itd/routes/hashtags.py | 2 +- itd/routes/posts.py | 6 +- itd/routes/users.py | 10 +- 15 files changed, 352 insertions(+), 71 deletions(-) create mode 100644 itd/exceptions.py create mode 100644 itd/models/clan.py create mode 100644 itd/models/pagination.py diff --git a/itd/client.py b/itd/client.py index 326241b..d42861c 100644 --- a/itd/client.py +++ b/itd/client.py @@ -1,3 +1,4 @@ +from uuid import UUID from _io import BufferedReader from typing import cast @@ -14,7 +15,14 @@ from itd.routes.search import search from itd.routes.files import upload_file from itd.routes.auth import refresh_token, change_password, logout from itd.routes.verification import verificate, get_verification_status + +from itd.models.clan import Clan +from itd.models.user import User, UserProfileUpdate, UserPrivacy, UserFollower, UserWhoToFollow +from itd.models.pagination import Pagination +from itd.models.verification import Verification, VerificationStatus + from itd.request import set_cookies +from itd.exceptions import NoCookie, NoAuthData, SamePassword, InvalidOldPassword, UserNotFound, InvalidProfileData, UserBanned, PendingRequestExists def refresh_on_error(func): @@ -39,79 +47,261 @@ class Client: set_cookies(self.cookies) self.refresh_auth() else: - raise ValueError('Provide token or cookie') + raise NoAuthData() - def refresh_auth(self): - if self.cookies: - self.token = refresh_token(self.cookies) - return self.token - else: - print('no cookies') + def refresh_auth(self) -> str: + """Обновить access token - @refresh_on_error - def change_password(self, old: str, new: str): + Raises: + NoCookie: Нет cookie + + Returns: + str: Токен + """ + print('refresh token') if not self.cookies: - print('no cookies') - return - return change_password(self.cookies, self.token, old, new) + raise NoCookie() + + res = refresh_token(self.cookies) + res.raise_for_status() + + self.token = res.json()['accessToken'] + return self.token @refresh_on_error - def logout(self): + def change_password(self, old: str, new: str) -> dict: + """Смена пароля + + Args: + old (str): Страый пароль + new (str): Новый пароль + + Raises: + NoCookie: Нет cookie + SamePassword: Одинаковые пароли + InvalidOldPassword: Старый пароль неверный + + Returns: + dict: Ответ API `{'message': 'Password changed successfully'}` + """ if not self.cookies: - print('no cookies') - return - return logout(self.cookies) + raise NoCookie() + res = change_password(self.cookies, self.token, old, new) + if res.json().get('error', {}).get('code') == 'SAME_PASSWORD': + raise SamePassword() + if res.json().get('error', {}).get('code') == 'INVALID_OLD_PASSWORD': + raise InvalidOldPassword() + res.raise_for_status() + + return res.json() @refresh_on_error - def get_user(self, username: str) -> dict: - return get_user(self.token, username) + def logout(self) -> dict: + """Выход из аккаунта + + Raises: + NoCookie: Нет cookie + + Returns: + dict: Ответ API + """ + if not self.cookies: + raise NoCookie() + + res = logout(self.cookies) + res.raise_for_status() + + return res.json() @refresh_on_error - def get_me(self) -> dict: + def get_user(self, username: str) -> User: + """Получить пользователя + + Args: + username (str): username или "me" + + Raises: + UserNotFound: Пользователь не найден + UserBanned: Пользователь заблокирован + + Returns: + User: Пользователь + """ + res = get_user(self.token, username) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise UserNotFound() + if res.json().get('error', {}).get('code') == 'USER_BLOCKED': + raise UserBanned() + res.raise_for_status() + + return User.model_validate(res.json()) + + @refresh_on_error + def get_me(self) -> User: + """Получить текущего пользователя (me) + + Returns: + User: Пользователь + """ return self.get_user('me') @refresh_on_error - def update_profile(self, username: str | None = None, display_name: str | None = None, bio: str | None = None, banner_id: str | None = None) -> dict: - return update_profile(self.token, bio, display_name, username, banner_id) + def update_profile(self, username: str | None = None, display_name: str | None = None, bio: str | None = None, banner_id: UUID | None = None) -> UserProfileUpdate: + """Обновить профиль + + Args: + username (str | None, optional): username. Defaults to None. + display_name (str | None, optional): Отображаемое имя. Defaults to None. + bio (str | None, optional): Биография (о себе). Defaults to None. + banner_id (UUID | None, optional): UUID баннера. Defaults to None. + + Raises: + InvalidProfileData: Неправильные данные (валидация не прошла) + + Returns: + UserProfileUpdate: Обновленный профиль + """ + res = update_profile(self.token, bio, display_name, username, banner_id) + if res.status_code == 422 and 'found' in res.json(): + raise InvalidProfileData(*list(res.json()['found'].items())[0]) + res.raise_for_status() + + return UserProfileUpdate.model_validate(res.json()) @refresh_on_error - def update_privacy(self, wall_closed: bool = False, private: bool = False): - return update_privacy(self.token, wall_closed, private) + def update_privacy(self, wall_closed: bool = False, private: bool = False) -> UserPrivacy: + """Обновить настройки приватности + + Args: + wall_closed (bool, optional): Закрыть стену. Defaults to False. + private (bool, optional): Приватность. На данный момент неизвестно, что делает этот параметр. Defaults to False. + + Returns: + UserPrivacy: Обновленные данные приватности + """ + res = update_privacy(self.token, wall_closed, private) + res.raise_for_status() + + return UserPrivacy.model_validate(res.json()) @refresh_on_error - def follow(self, username: str) -> dict: - return follow(self.token, username) + def follow(self, username: str) -> int: + """Подписаться на пользователя + + Args: + username (str): username + + Raises: + UserNotFound: Пользователь не найден + + Returns: + int: Число подписчиков после подписки + """ + res = follow(self.token, username) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise UserNotFound() + res.raise_for_status() + + return res.json()['followersCount'] @refresh_on_error - def unfollow(self, username: str) -> dict: - return unfollow(self.token, username) + def unfollow(self, username: str) -> int: + """Отписаться от пользователя + + Args: + username (str): username + + Raises: + UserNotFound: Пользователь не найден + + Returns: + int: Число подписчиков после отписки + """ + res = unfollow(self.token, username) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise UserNotFound() + res.raise_for_status() + + return res.json()['followersCount'] @refresh_on_error - def get_followers(self, username: str) -> dict: - return get_followers(self.token, username) + def get_followers(self, username: str, limit: int = 30, page: int = 1) -> tuple[list[UserFollower], Pagination]: + """Получить подписчиков пользователя + + Args: + username (str): username + limit (int, optional): Лимит. Defaults to 30. + page (int, optional): Страница (при дозагрузке, увеличивайте на 1). Defaults to 1. + + Raises: + UserNotFound: Пользователь не найден + + Returns: + list[UserFollower]: Список подписчиков + Pagination: Данные пагинации (лимит, страница, сколько всего, есть ли еще) + """ + res = get_followers(self.token, username, limit, page) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise UserNotFound() + res.raise_for_status() + + return [UserFollower.model_validate(user) for user in res.json()['data']['users']], Pagination.model_validate(res.json()['data']['pagination']) @refresh_on_error - def get_following(self, username: str) -> dict: - return get_following(self.token, username) + def get_following(self, username: str, limit: int = 30, page: int = 1) -> tuple[list[UserFollower], Pagination]: + """Получить подписки пользователя + + Args: + username (str): username + limit (int, optional): Лимит. Defaults to 30. + page (int, optional): Страница (при дозагрузке, увеличивайте на 1). Defaults to 1. + + Raises: + UserNotFound: Пользователь не найден + + Returns: + list[UserFollower]: Список подписок + Pagination: Данные пагинации (лимит, страница, сколько всего, есть ли еще) + """ + res = get_following(self.token, username, limit, page) + if res.json().get('error', {}).get('code') == 'NOT_FOUND': + raise UserNotFound() + res.raise_for_status() + + return [UserFollower.model_validate(user) for user in res.json()['data']['users']], Pagination.model_validate(res.json()['data']['pagination']) @refresh_on_error - def verificate(self, file_url: str): - return verificate(self.token, file_url) + def verificate(self, file_url: str) -> Verification: + res = verificate(self.token, file_url) + if res.json().get('error', {}).get('code') == 'PENDING_REQUEST_EXISTS': + raise PendingRequestExists() + res.raise_for_status() + + return Verification.model_validate(res.json()) @refresh_on_error - def get_verification_status(self): - return get_verification_status(self.token) + def get_verification_status(self) -> VerificationStatus: + res = get_verification_status(self.token) + res.raise_for_status() + + return VerificationStatus.model_validate(res.json()) @refresh_on_error - def get_who_to_follow(self) -> dict: - return get_who_to_follow(self.token) + def get_who_to_follow(self) -> list[UserWhoToFollow]: + res = get_who_to_follow(self.token) + res.raise_for_status() + + return [UserWhoToFollow.model_validate(user) for user in res.json()['users']] @refresh_on_error - def get_top_clans(self) -> dict: - return get_top_clans(self.token) + def get_top_clans(self) -> list[Clan]: + res = get_top_clans(self.token) + res.raise_for_status() + + return [Clan.model_validate(clan) for clan in res.json()['clans']] @refresh_on_error def get_platform_status(self) -> dict: diff --git a/itd/exceptions.py b/itd/exceptions.py new file mode 100644 index 0000000..a3e39ca --- /dev/null +++ b/itd/exceptions.py @@ -0,0 +1,43 @@ +class NoCookie(Exception): + def __str__(self): + return 'No cookie for refresh-token required action' + +class NoAuthData(Exception): + def __str__(self): + return 'No auth data. Provide token or cookies' + +class InvalidCookie(Exception): + def __str__(self): + return f'Invalid cookie data' + +class InvalidToken(Exception): + def __str__(self): + return f'Invalid access token' + + +class SamePassword(Exception): + def __str__(self): + return 'Old and new password must not equals' + +class InvalidOldPassword(Exception): + def __str__(self): + return 'Old password is incorrect' + +class UserNotFound(Exception): + def __str__(self): + return 'User not found' + +class UserBanned(Exception): + def __str__(self): + return 'User banned' + +class InvalidProfileData(Exception): + def __init__(self, name: str, value: str): + self.name = name + self.value = value + def __str__(self): + return f'Invalid update profile data {self.name}: "{self.value}"' + +class PendingRequestExists(Exception): + def __str__(self): + return 'Pending verifiaction request already exists' \ No newline at end of file diff --git a/itd/models/_text.py b/itd/models/_text.py index aa6a372..70944a0 100644 --- a/itd/models/_text.py +++ b/itd/models/_text.py @@ -6,7 +6,7 @@ from pydantic import BaseModel, Field from itd.models.user import UserPost -class _TextObject(BaseModel): +class TextObject(BaseModel): id: UUID content: str author: UserPost diff --git a/itd/models/clan.py b/itd/models/clan.py new file mode 100644 index 0000000..10bd4b4 --- /dev/null +++ b/itd/models/clan.py @@ -0,0 +1,6 @@ +from pydantic import BaseModel, Field + + +class Clan(BaseModel): + avatar: str + member_count: int = Field(0, alias='memberCount') \ No newline at end of file diff --git a/itd/models/comment.py b/itd/models/comment.py index ecf4324..2187242 100644 --- a/itd/models/comment.py +++ b/itd/models/comment.py @@ -1,12 +1,9 @@ -from uuid import UUID -from datetime import datetime - from pydantic import Field -from itd.models._text import _TextObject +from itd.models._text import TextObject -class CommentShort(_TextObject): +class CommentShort(TextObject): likes_count: int = Field(0, alias='likesCount') replies_count: int = Field(0, alias='repliesCount') is_liked: bool = Field(False, alias='isLiked') diff --git a/itd/models/pagination.py b/itd/models/pagination.py new file mode 100644 index 0000000..df81f56 --- /dev/null +++ b/itd/models/pagination.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel, Field + +class Pagination(BaseModel): + page: int = 1 + limit: int = 20 + total: int | None = None + has_more: bool = Field(True, alias='hasMore') \ No newline at end of file diff --git a/itd/models/post.py b/itd/models/post.py index 8969cad..dfe33dd 100644 --- a/itd/models/post.py +++ b/itd/models/post.py @@ -1,10 +1,10 @@ from pydantic import Field from itd.models.user import UserPost -from itd.models._text import _TextObject +from itd.models._text import TextObject -class PostShort(_TextObject): +class PostShort(TextObject): likes_count: int = Field(0, alias='likesCount') comments_count: int = Field(0, alias='commentsCount') reposts_count: int = Field(0, alias='repostsCount') diff --git a/itd/models/user.py b/itd/models/user.py index 760128b..a388647 100644 --- a/itd/models/user.py +++ b/itd/models/user.py @@ -3,6 +3,25 @@ from datetime import datetime from pydantic import BaseModel, Field + +class UserPrivacy(BaseModel): + private: bool | None = Field(None, alias='isPrivate') # none for not me + wall_closed: bool = Field(False, alias='wallClosed') + + model_config = {'populate_by_name': True} + + +class UserProfileUpdate(BaseModel): + id: UUID + username: str + display_name: str = Field(alias='displayName') + bio: str | None = None + + updated_at: datetime | None = Field(None, alias='updatedAt') + + model_config = {'populate_by_name': True} + + class UserNotification(BaseModel): id: UUID username: str @@ -16,22 +35,26 @@ class UserPost(UserNotification): verified: bool = False -class UserSearch(UserPost): +class UserWhoToFollow(UserPost): followers_count: int = Field(0, alias='followersCount') -class User(UserSearch): +class UserFollower(UserPost): + is_following: bool = Field(False, alias='isFollowing') # none for me + + +class UserSearch(UserFollower, UserWhoToFollow): + pass + + +class User(UserSearch, UserPrivacy): banner: str | None = None bio: str | None = None - pinned_post_id: UUID | None - - private: bool | None = Field(None, alias='isPrivate') # none for not me - wall_closed: bool = Field(False, alias='wallClosed') + pinned_post_id: UUID | None = Field(None, alias='pinnedPostId') following_count: int = Field(0, alias='followingCount') posts_count: int = Field(0, alias='postsCount') - is_following: bool | None = Field(None, alias='isFollowing') # none for me is_followed: bool | None = Field(None, alias='isFollowedBy') # none for me created_at: datetime = Field(alias='createdAt') diff --git a/itd/request.py b/itd/request.py index 10d73f2..ed02f31 100644 --- a/itd/request.py +++ b/itd/request.py @@ -2,6 +2,8 @@ from _io import BufferedReader from requests import Session +from itd.exceptions import InvalidToken, InvalidCookie + s = Session() @@ -29,8 +31,9 @@ def fetch(token: str, method: str, url: str, params: dict = {}, files: dict[str, else: res = s.request(method.upper(), base, timeout=20, json=params, headers=headers, files=files) - res.raise_for_status() - return res.json() + print(res.text) + return res + def set_cookies(cookies: str): for cookie in cookies.split('; '): @@ -65,5 +68,11 @@ def auth_fetch(cookies: str, method: str, url: str, params: dict = {}, token: st res = s.get(f'https://xn--d1ah4a.com/api/{url}', timeout=20, params=params, headers=headers) else: res = s.request(method, f'https://xn--d1ah4a.com/api/{url}', timeout=20, json=params, headers=headers) - res.raise_for_status() - return res.json() + + # print(res.text) + if res.text == 'UNAUTHORIZED': + raise InvalidToken() + if res.json().get('error', {}).get('code') in ('SESSION_NOT_FOUND', 'REFRESH_TOKEN_MISSING'): + raise InvalidCookie() + + return res diff --git a/itd/routes/auth.py b/itd/routes/auth.py index 0401467..73f3c5b 100644 --- a/itd/routes/auth.py +++ b/itd/routes/auth.py @@ -1,10 +1,12 @@ +from requests import Response + from itd.request import auth_fetch -def refresh_token(cookies: str): - return auth_fetch(cookies, 'post', 'v1/auth/refresh')['accessToken'] +def refresh_token(cookies: str) -> Response: + return auth_fetch(cookies, 'post', 'v1/auth/refresh') -def change_password(cookies: str, token: str, old: str, new: str): +def change_password(cookies: str, token: str, old: str, new: str) -> Response: return auth_fetch(cookies, 'post', 'v1/auth/change-password', {'newPassword': new, 'oldPassword': old}, token) -def logout(cookies: str): +def logout(cookies: str) -> Response: return auth_fetch(cookies, 'post', 'v1/auth/logout') diff --git a/itd/routes/comments.py b/itd/routes/comments.py index 0cdc702..e0e7395 100644 --- a/itd/routes/comments.py +++ b/itd/routes/comments.py @@ -7,7 +7,7 @@ def add_comment(token: str, post_id: str, content: str, reply_comment_id: str | return fetch(token, 'post', f'posts/{post_id}/comments', data) def get_comments(token: str, post_id: str, limit: int = 20, cursor: int = 0, sort: str = 'popular'): - return fetch(token, 'get', f'posts/{post_id}/comments', {'limit': limit, 'sort': sort, 'cursor': cursor})['data'] + return fetch(token, 'get', f'posts/{post_id}/comments', {'limit': limit, 'sort': sort, 'cursor': cursor}) def like_comment(token: str, comment_id: str): return fetch(token, 'post', f'comments/{comment_id}/like') diff --git a/itd/routes/files.py b/itd/routes/files.py index d13f48f..9303b82 100644 --- a/itd/routes/files.py +++ b/itd/routes/files.py @@ -4,4 +4,4 @@ from itd.request import fetch def upload_file(token: str, name: str, data: BufferedReader): - return fetch(token, 'post', 'files/upload', files={'file': (name, data)}) \ No newline at end of file + return fetch(token, 'post', 'files/upload', files={'file': (name, data)}) diff --git a/itd/routes/hashtags.py b/itd/routes/hashtags.py index b255096..e6e9a63 100644 --- a/itd/routes/hashtags.py +++ b/itd/routes/hashtags.py @@ -1,7 +1,7 @@ from itd.request import fetch def get_hastags(token: str, limit: int = 10): - return fetch(token, 'get', 'hashtags/trending', {'limit': limit})['data'] + return fetch(token, 'get', 'hashtags/trending', {'limit': limit}) def get_posts_by_hastag(token: str, hashtag: str, limit: int = 20, cursor: int = 0): return fetch(token, 'get', f'hashtags/{hashtag}/posts', {'limit': limit, 'cursor': cursor}) diff --git a/itd/routes/posts.py b/itd/routes/posts.py index 87223cc..b2a0705 100644 --- a/itd/routes/posts.py +++ b/itd/routes/posts.py @@ -18,7 +18,7 @@ def get_posts(token: str, username: str | None = None, limit: int = 20, cursor: if tab: data['tab'] = tab - return fetch(token, 'get', 'posts', data)['data'] + return fetch(token, 'get', 'posts', data) def get_post(token: str, id: str): return fetch(token, 'get', f'posts/{id}') @@ -42,4 +42,6 @@ def view_post(token: str, id: str): return fetch(token, 'post', f'posts/{id}/view') def get_liked_posts(token: str, username: str, limit: int = 20, cursor: int = 0): - return fetch(token, 'get', f'posts/user/{username}/liked', {'limit': limit, 'cursor': cursor}) \ No newline at end of file + return fetch(token, 'get', f'posts/user/{username}/liked', {'limit': limit, 'cursor': cursor}) + +# todo post restore \ No newline at end of file diff --git a/itd/routes/users.py b/itd/routes/users.py index 1fa7424..ddbbfa4 100644 --- a/itd/routes/users.py +++ b/itd/routes/users.py @@ -1,10 +1,12 @@ +from uuid import UUID + from itd.request import fetch def get_user(token: str, username: str): return fetch(token, 'get', f'users/{username}') -def update_profile(token: str, bio: str | None = None, display_name: str | None = None, username: str | None = None, banner_id: str | None = None): +def update_profile(token: str, bio: str | None = None, display_name: str | None = None, username: str | None = None, banner_id: UUID | None = None): data = {} if bio: data['bio'] = bio @@ -13,14 +15,14 @@ def update_profile(token: str, bio: str | None = None, display_name: str | None if username: data['username'] = username if banner_id: - data['bannerId'] = banner_id + data['bannerId'] = str(banner_id) return fetch(token, 'put', 'users/me', data) def update_privacy(token: str, wall_closed: bool = False, private: bool = False): data = {} - if wall_closed: + if wall_closed is not None: data['wallClosed'] = wall_closed - if private: + if private is not None: data['isPrivate'] = private return fetch(token, 'put', 'users/me/privacy', data)