feat: add models and partially custom error messages

This commit is contained in:
firedotguy
2026-01-31 12:10:20 +03:00
parent c7e3812ee8
commit a388426d8d
15 changed files with 352 additions and 71 deletions

View File

@@ -1,3 +1,4 @@
from uuid import UUID
from _io import BufferedReader
from typing import cast
@@ -14,7 +15,14 @@ from itd.routes.search import search
from itd.routes.files import upload_file
from itd.routes.auth import refresh_token, change_password, logout
from itd.routes.verification import verificate, get_verification_status
from itd.models.clan import Clan
from itd.models.user import User, UserProfileUpdate, UserPrivacy, UserFollower, UserWhoToFollow
from itd.models.pagination import Pagination
from itd.models.verification import Verification, VerificationStatus
from itd.request import set_cookies
from itd.exceptions import NoCookie, NoAuthData, SamePassword, InvalidOldPassword, UserNotFound, InvalidProfileData, UserBanned, PendingRequestExists
def refresh_on_error(func):
@@ -39,79 +47,261 @@ class Client:
set_cookies(self.cookies)
self.refresh_auth()
else:
raise ValueError('Provide token or cookie')
raise NoAuthData()
def refresh_auth(self):
if self.cookies:
self.token = refresh_token(self.cookies)
return self.token
else:
print('no cookies')
def refresh_auth(self) -> str:
"""Обновить access token
@refresh_on_error
def change_password(self, old: str, new: str):
Raises:
NoCookie: Нет cookie
Returns:
str: Токен
"""
print('refresh token')
if not self.cookies:
print('no cookies')
return
return change_password(self.cookies, self.token, old, new)
raise NoCookie()
res = refresh_token(self.cookies)
res.raise_for_status()
self.token = res.json()['accessToken']
return self.token
@refresh_on_error
def logout(self):
def change_password(self, old: str, new: str) -> dict:
"""Смена пароля
Args:
old (str): Страый пароль
new (str): Новый пароль
Raises:
NoCookie: Нет cookie
SamePassword: Одинаковые пароли
InvalidOldPassword: Старый пароль неверный
Returns:
dict: Ответ API `{'message': 'Password changed successfully'}`
"""
if not self.cookies:
print('no cookies')
return
return logout(self.cookies)
raise NoCookie()
res = change_password(self.cookies, self.token, old, new)
if res.json().get('error', {}).get('code') == 'SAME_PASSWORD':
raise SamePassword()
if res.json().get('error', {}).get('code') == 'INVALID_OLD_PASSWORD':
raise InvalidOldPassword()
res.raise_for_status()
return res.json()
@refresh_on_error
def get_user(self, username: str) -> dict:
return get_user(self.token, username)
def logout(self) -> dict:
"""Выход из аккаунта
Raises:
NoCookie: Нет cookie
Returns:
dict: Ответ API
"""
if not self.cookies:
raise NoCookie()
res = logout(self.cookies)
res.raise_for_status()
return res.json()
@refresh_on_error
def get_me(self) -> dict:
def get_user(self, username: str) -> User:
"""Получить пользователя
Args:
username (str): username или "me"
Raises:
UserNotFound: Пользователь не найден
UserBanned: Пользователь заблокирован
Returns:
User: Пользователь
"""
res = get_user(self.token, username)
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise UserNotFound()
if res.json().get('error', {}).get('code') == 'USER_BLOCKED':
raise UserBanned()
res.raise_for_status()
return User.model_validate(res.json())
@refresh_on_error
def get_me(self) -> User:
"""Получить текущего пользователя (me)
Returns:
User: Пользователь
"""
return self.get_user('me')
@refresh_on_error
def update_profile(self, username: str | None = None, display_name: str | None = None, bio: str | None = None, banner_id: str | None = None) -> dict:
return update_profile(self.token, bio, display_name, username, banner_id)
def update_profile(self, username: str | None = None, display_name: str | None = None, bio: str | None = None, banner_id: UUID | None = None) -> UserProfileUpdate:
"""Обновить профиль
Args:
username (str | None, optional): username. Defaults to None.
display_name (str | None, optional): Отображаемое имя. Defaults to None.
bio (str | None, optional): Биография (о себе). Defaults to None.
banner_id (UUID | None, optional): UUID баннера. Defaults to None.
Raises:
InvalidProfileData: Неправильные данные (валидация не прошла)
Returns:
UserProfileUpdate: Обновленный профиль
"""
res = update_profile(self.token, bio, display_name, username, banner_id)
if res.status_code == 422 and 'found' in res.json():
raise InvalidProfileData(*list(res.json()['found'].items())[0])
res.raise_for_status()
return UserProfileUpdate.model_validate(res.json())
@refresh_on_error
def update_privacy(self, wall_closed: bool = False, private: bool = False):
return update_privacy(self.token, wall_closed, private)
def update_privacy(self, wall_closed: bool = False, private: bool = False) -> UserPrivacy:
"""Обновить настройки приватности
Args:
wall_closed (bool, optional): Закрыть стену. Defaults to False.
private (bool, optional): Приватность. На данный момент неизвестно, что делает этот параметр. Defaults to False.
Returns:
UserPrivacy: Обновленные данные приватности
"""
res = update_privacy(self.token, wall_closed, private)
res.raise_for_status()
return UserPrivacy.model_validate(res.json())
@refresh_on_error
def follow(self, username: str) -> dict:
return follow(self.token, username)
def follow(self, username: str) -> int:
"""Подписаться на пользователя
Args:
username (str): username
Raises:
UserNotFound: Пользователь не найден
Returns:
int: Число подписчиков после подписки
"""
res = follow(self.token, username)
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise UserNotFound()
res.raise_for_status()
return res.json()['followersCount']
@refresh_on_error
def unfollow(self, username: str) -> dict:
return unfollow(self.token, username)
def unfollow(self, username: str) -> int:
"""Отписаться от пользователя
Args:
username (str): username
Raises:
UserNotFound: Пользователь не найден
Returns:
int: Число подписчиков после отписки
"""
res = unfollow(self.token, username)
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise UserNotFound()
res.raise_for_status()
return res.json()['followersCount']
@refresh_on_error
def get_followers(self, username: str) -> dict:
return get_followers(self.token, username)
def get_followers(self, username: str, limit: int = 30, page: int = 1) -> tuple[list[UserFollower], Pagination]:
"""Получить подписчиков пользователя
Args:
username (str): username
limit (int, optional): Лимит. Defaults to 30.
page (int, optional): Страница (при дозагрузке, увеличивайте на 1). Defaults to 1.
Raises:
UserNotFound: Пользователь не найден
Returns:
list[UserFollower]: Список подписчиков
Pagination: Данные пагинации (лимит, страница, сколько всего, есть ли еще)
"""
res = get_followers(self.token, username, limit, page)
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise UserNotFound()
res.raise_for_status()
return [UserFollower.model_validate(user) for user in res.json()['data']['users']], Pagination.model_validate(res.json()['data']['pagination'])
@refresh_on_error
def get_following(self, username: str) -> dict:
return get_following(self.token, username)
def get_following(self, username: str, limit: int = 30, page: int = 1) -> tuple[list[UserFollower], Pagination]:
"""Получить подписки пользователя
Args:
username (str): username
limit (int, optional): Лимит. Defaults to 30.
page (int, optional): Страница (при дозагрузке, увеличивайте на 1). Defaults to 1.
Raises:
UserNotFound: Пользователь не найден
Returns:
list[UserFollower]: Список подписок
Pagination: Данные пагинации (лимит, страница, сколько всего, есть ли еще)
"""
res = get_following(self.token, username, limit, page)
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise UserNotFound()
res.raise_for_status()
return [UserFollower.model_validate(user) for user in res.json()['data']['users']], Pagination.model_validate(res.json()['data']['pagination'])
@refresh_on_error
def verificate(self, file_url: str):
return verificate(self.token, file_url)
def verificate(self, file_url: str) -> Verification:
res = verificate(self.token, file_url)
if res.json().get('error', {}).get('code') == 'PENDING_REQUEST_EXISTS':
raise PendingRequestExists()
res.raise_for_status()
return Verification.model_validate(res.json())
@refresh_on_error
def get_verification_status(self):
return get_verification_status(self.token)
def get_verification_status(self) -> VerificationStatus:
res = get_verification_status(self.token)
res.raise_for_status()
return VerificationStatus.model_validate(res.json())
@refresh_on_error
def get_who_to_follow(self) -> dict:
return get_who_to_follow(self.token)
def get_who_to_follow(self) -> list[UserWhoToFollow]:
res = get_who_to_follow(self.token)
res.raise_for_status()
return [UserWhoToFollow.model_validate(user) for user in res.json()['users']]
@refresh_on_error
def get_top_clans(self) -> dict:
return get_top_clans(self.token)
def get_top_clans(self) -> list[Clan]:
res = get_top_clans(self.token)
res.raise_for_status()
return [Clan.model_validate(clan) for clan in res.json()['clans']]
@refresh_on_error
def get_platform_status(self) -> dict:

43
itd/exceptions.py Normal file
View File

@@ -0,0 +1,43 @@
class NoCookie(Exception):
def __str__(self):
return 'No cookie for refresh-token required action'
class NoAuthData(Exception):
def __str__(self):
return 'No auth data. Provide token or cookies'
class InvalidCookie(Exception):
def __str__(self):
return f'Invalid cookie data'
class InvalidToken(Exception):
def __str__(self):
return f'Invalid access token'
class SamePassword(Exception):
def __str__(self):
return 'Old and new password must not equals'
class InvalidOldPassword(Exception):
def __str__(self):
return 'Old password is incorrect'
class UserNotFound(Exception):
def __str__(self):
return 'User not found'
class UserBanned(Exception):
def __str__(self):
return 'User banned'
class InvalidProfileData(Exception):
def __init__(self, name: str, value: str):
self.name = name
self.value = value
def __str__(self):
return f'Invalid update profile data {self.name}: "{self.value}"'
class PendingRequestExists(Exception):
def __str__(self):
return 'Pending verifiaction request already exists'

View File

@@ -6,7 +6,7 @@ from pydantic import BaseModel, Field
from itd.models.user import UserPost
class _TextObject(BaseModel):
class TextObject(BaseModel):
id: UUID
content: str
author: UserPost

6
itd/models/clan.py Normal file
View File

@@ -0,0 +1,6 @@
from pydantic import BaseModel, Field
class Clan(BaseModel):
avatar: str
member_count: int = Field(0, alias='memberCount')

View File

@@ -1,12 +1,9 @@
from uuid import UUID
from datetime import datetime
from pydantic import Field
from itd.models._text import _TextObject
from itd.models._text import TextObject
class CommentShort(_TextObject):
class CommentShort(TextObject):
likes_count: int = Field(0, alias='likesCount')
replies_count: int = Field(0, alias='repliesCount')
is_liked: bool = Field(False, alias='isLiked')

7
itd/models/pagination.py Normal file
View File

@@ -0,0 +1,7 @@
from pydantic import BaseModel, Field
class Pagination(BaseModel):
page: int = 1
limit: int = 20
total: int | None = None
has_more: bool = Field(True, alias='hasMore')

View File

@@ -1,10 +1,10 @@
from pydantic import Field
from itd.models.user import UserPost
from itd.models._text import _TextObject
from itd.models._text import TextObject
class PostShort(_TextObject):
class PostShort(TextObject):
likes_count: int = Field(0, alias='likesCount')
comments_count: int = Field(0, alias='commentsCount')
reposts_count: int = Field(0, alias='repostsCount')

View File

@@ -3,6 +3,25 @@ from datetime import datetime
from pydantic import BaseModel, Field
class UserPrivacy(BaseModel):
private: bool | None = Field(None, alias='isPrivate') # none for not me
wall_closed: bool = Field(False, alias='wallClosed')
model_config = {'populate_by_name': True}
class UserProfileUpdate(BaseModel):
id: UUID
username: str
display_name: str = Field(alias='displayName')
bio: str | None = None
updated_at: datetime | None = Field(None, alias='updatedAt')
model_config = {'populate_by_name': True}
class UserNotification(BaseModel):
id: UUID
username: str
@@ -16,22 +35,26 @@ class UserPost(UserNotification):
verified: bool = False
class UserSearch(UserPost):
class UserWhoToFollow(UserPost):
followers_count: int = Field(0, alias='followersCount')
class User(UserSearch):
class UserFollower(UserPost):
is_following: bool = Field(False, alias='isFollowing') # none for me
class UserSearch(UserFollower, UserWhoToFollow):
pass
class User(UserSearch, UserPrivacy):
banner: str | None = None
bio: str | None = None
pinned_post_id: UUID | None
private: bool | None = Field(None, alias='isPrivate') # none for not me
wall_closed: bool = Field(False, alias='wallClosed')
pinned_post_id: UUID | None = Field(None, alias='pinnedPostId')
following_count: int = Field(0, alias='followingCount')
posts_count: int = Field(0, alias='postsCount')
is_following: bool | None = Field(None, alias='isFollowing') # none for me
is_followed: bool | None = Field(None, alias='isFollowedBy') # none for me
created_at: datetime = Field(alias='createdAt')

View File

@@ -2,6 +2,8 @@ from _io import BufferedReader
from requests import Session
from itd.exceptions import InvalidToken, InvalidCookie
s = Session()
@@ -29,8 +31,9 @@ def fetch(token: str, method: str, url: str, params: dict = {}, files: dict[str,
else:
res = s.request(method.upper(), base, timeout=20, json=params, headers=headers, files=files)
res.raise_for_status()
return res.json()
print(res.text)
return res
def set_cookies(cookies: str):
for cookie in cookies.split('; '):
@@ -65,5 +68,11 @@ def auth_fetch(cookies: str, method: str, url: str, params: dict = {}, token: st
res = s.get(f'https://xn--d1ah4a.com/api/{url}', timeout=20, params=params, headers=headers)
else:
res = s.request(method, f'https://xn--d1ah4a.com/api/{url}', timeout=20, json=params, headers=headers)
res.raise_for_status()
return res.json()
# print(res.text)
if res.text == 'UNAUTHORIZED':
raise InvalidToken()
if res.json().get('error', {}).get('code') in ('SESSION_NOT_FOUND', 'REFRESH_TOKEN_MISSING'):
raise InvalidCookie()
return res

View File

@@ -1,10 +1,12 @@
from requests import Response
from itd.request import auth_fetch
def refresh_token(cookies: str):
return auth_fetch(cookies, 'post', 'v1/auth/refresh')['accessToken']
def refresh_token(cookies: str) -> Response:
return auth_fetch(cookies, 'post', 'v1/auth/refresh')
def change_password(cookies: str, token: str, old: str, new: str):
def change_password(cookies: str, token: str, old: str, new: str) -> Response:
return auth_fetch(cookies, 'post', 'v1/auth/change-password', {'newPassword': new, 'oldPassword': old}, token)
def logout(cookies: str):
def logout(cookies: str) -> Response:
return auth_fetch(cookies, 'post', 'v1/auth/logout')

View File

@@ -7,7 +7,7 @@ def add_comment(token: str, post_id: str, content: str, reply_comment_id: str |
return fetch(token, 'post', f'posts/{post_id}/comments', data)
def get_comments(token: str, post_id: str, limit: int = 20, cursor: int = 0, sort: str = 'popular'):
return fetch(token, 'get', f'posts/{post_id}/comments', {'limit': limit, 'sort': sort, 'cursor': cursor})['data']
return fetch(token, 'get', f'posts/{post_id}/comments', {'limit': limit, 'sort': sort, 'cursor': cursor})
def like_comment(token: str, comment_id: str):
return fetch(token, 'post', f'comments/{comment_id}/like')

View File

@@ -4,4 +4,4 @@ from itd.request import fetch
def upload_file(token: str, name: str, data: BufferedReader):
return fetch(token, 'post', 'files/upload', files={'file': (name, data)})
return fetch(token, 'post', 'files/upload', files={'file': (name, data)})

View File

@@ -1,7 +1,7 @@
from itd.request import fetch
def get_hastags(token: str, limit: int = 10):
return fetch(token, 'get', 'hashtags/trending', {'limit': limit})['data']
return fetch(token, 'get', 'hashtags/trending', {'limit': limit})
def get_posts_by_hastag(token: str, hashtag: str, limit: int = 20, cursor: int = 0):
return fetch(token, 'get', f'hashtags/{hashtag}/posts', {'limit': limit, 'cursor': cursor})

View File

@@ -18,7 +18,7 @@ def get_posts(token: str, username: str | None = None, limit: int = 20, cursor:
if tab:
data['tab'] = tab
return fetch(token, 'get', 'posts', data)['data']
return fetch(token, 'get', 'posts', data)
def get_post(token: str, id: str):
return fetch(token, 'get', f'posts/{id}')
@@ -42,4 +42,6 @@ def view_post(token: str, id: str):
return fetch(token, 'post', f'posts/{id}/view')
def get_liked_posts(token: str, username: str, limit: int = 20, cursor: int = 0):
return fetch(token, 'get', f'posts/user/{username}/liked', {'limit': limit, 'cursor': cursor})
return fetch(token, 'get', f'posts/user/{username}/liked', {'limit': limit, 'cursor': cursor})
# todo post restore

View File

@@ -1,10 +1,12 @@
from uuid import UUID
from itd.request import fetch
def get_user(token: str, username: str):
return fetch(token, 'get', f'users/{username}')
def update_profile(token: str, bio: str | None = None, display_name: str | None = None, username: str | None = None, banner_id: str | None = None):
def update_profile(token: str, bio: str | None = None, display_name: str | None = None, username: str | None = None, banner_id: UUID | None = None):
data = {}
if bio:
data['bio'] = bio
@@ -13,14 +15,14 @@ def update_profile(token: str, bio: str | None = None, display_name: str | None
if username:
data['username'] = username
if banner_id:
data['bannerId'] = banner_id
data['bannerId'] = str(banner_id)
return fetch(token, 'put', 'users/me', data)
def update_privacy(token: str, wall_closed: bool = False, private: bool = False):
data = {}
if wall_closed:
if wall_closed is not None:
data['wallClosed'] = wall_closed
if private:
if private is not None:
data['isPrivate'] = private
return fetch(token, 'put', 'users/me/privacy', data)