31 Commits
v1.0.1 ... main

Author SHA1 Message Date
kilyabin
3bc20a1192 Merge branch 'main' into main 2026-03-01 14:09:26 +04:00
firedotguy
994a38e945 fix: add unescape for spans 2026-03-01 00:06:16 +03:00
firedotguy
86a378b613 feat: add spans in client 2026-02-28 23:48:26 +03:00
firedotguy
ec58bae1e8 Merge branch 'main' of https://github.com/firedotguy/itd-sdk 2026-02-28 23:27:53 +03:00
firedotguy
cd27baa8d6 feat: add spans 2026-02-28 23:27:41 +03:00
firedotguy
839ad8aaa8 Update README.md 2026-03-01 02:09:28 +06:00
firedotguy
1c452c147c docs: новки умри 2026-02-14 03:00:30 +06:00
firedotguy
7795fb6d7e docs: main account ban 2026-02-13 23:27:47 +03:00
firedotguy
6edc40308a feat: add profiel required exception; add spans to post 2026-02-13 22:38:28 +03:00
firedotguy
b3b109613b feat: update privacy data 2026-02-12 23:40:59 +03:00
firedotguy
62730b48e9 feat: add polls 2026-02-12 19:56:57 +03:00
firedotguy
c1042d32ae fix: add BannedAccount error 2026-02-12 18:25:25 +03:00
firedotguy
7cc343dab5 docs: remove plans; remove sse example 2026-02-10 18:39:43 +03:00
firedotguy
aad83b55d5 chore: update version; move scripts into examples folder 2026-02-10 18:28:16 +03:00
firedotguy
8e8b0b3bb9 chore: stylize sse code; fix: add sseclient-py to requirements 2026-02-10 17:53:48 +03:00
firedotguy
51518ce0d7 fix: stylize examples 2026-02-10 15:49:06 +03:00
kilyabin
d49fb2d4cb feat(scripts): скрипты для постинга и смены баннера 2026-02-10 15:29:05 +03:00
firedotguy
337a1eb17b fix: remove deprecated functions 2026-02-10 15:29:05 +03:00
firedotguy
3ff5b90380 docs: update banner updare example 2026-02-10 15:29:04 +03:00
Vasily Domakov
a3a3c012ff #12 Added examples 2026-02-10 15:29:04 +03:00
Vasily Domakov
f2e18e08c0 #11 Added SSE listening 2026-02-10 15:29:04 +03:00
Vasily Domakov
13365fc23a #11 Add model for events 2026-02-10 15:29:04 +03:00
kilyabin
a47b4d01b5 feat(scripts): скрипты для постинга и смены баннера 2026-02-10 12:27:58 +04:00
firedotguy
0f3ada9148 Merge pull request #14 from droptrigger/itd-sdk-12
#12 Added examples
2026-02-10 02:27:47 +06:00
firedotguy
8935233276 Merge pull request #13 from droptrigger/itd-sdk-11
#11 Added SSE listening
2026-02-10 02:27:05 +06:00
Vasily Domakov
ff5e410307 #12 Added examples 2026-02-09 23:22:30 +03:00
Vasily Domakov
1a4f9f6c5a #11 Added SSE listening 2026-02-09 23:21:01 +03:00
Vasily Domakov
de48c30c78 #11 Add model for events 2026-02-09 23:17:42 +03:00
firedotguy
9a4c47bd8e feat: add get and delete files 2026-02-08 23:11:59 +03:00
firedotguy
c2413277d6 fix: fix error 'str' object has not attribute 'get' (rate limit); add AlreadyFollowing exception on follow 2026-02-08 22:49:06 +03:00
firedotguy
5ebcdb1ad5 feat: add get replies and get_user_posts 2026-02-07 23:20:00 +03:00
36 changed files with 1395 additions and 112 deletions

0
' Normal file
View File

3
.gitignore vendored
View File

@@ -4,4 +4,5 @@ venv/
__pycache__/
dist
itd_sdk.egg-info
nowkie.gif
nowkie.gif
g.gif

View File

@@ -1,5 +1,9 @@
# itd-sdk
Клиент ITD для python
> [!CAUTION]
> ~~Мой основной аккаунт itd_sdk был забанен. Новый акк - itdsdk. #димонверниаккаунты~~
> ~~Проект больше не будет обнолвяться! яPR буду мержить~~
> ладно, буду мейнтйнить потихоньку...
## Установка
@@ -18,10 +22,25 @@ c = ITDClient('TOKEN', 'refresh_token=...; __ddg1_=...; __ddgid_=...; is_auth=1;
print(c.get_me())
```
<!--
> [!NOTE]
> Берите куки из запроса /auth/refresh. В остальных запросах нету refresh_token
> ![cookie](cookie-screen.png)
> ![cookie](cookie-screen.png) -->
### Получение cookies
Для получения access_token требуются cookies с `refresh_token`. Как их получить:
1. Откройте [итд.com](https://xn--d1ah4a.com) в браузере
2. Откройте DevTools (F12)
3. Перейдите на вкладку **Network**
4. Обновите страницу
5. Найдите запрос к `/auth/refresh`
6. Скопируйте значение **Cookie** из Request Headers
> Пример: `refresh_token=123123A67BCdEfGG; is_auth=1`
> В cookies также могут присутствовать значения типа `__ddgX__` (DDoS-Guard cookies) или `_ym_XXXX` (`X` - любое число или буква). Они необязательные и их наличие не влияет на результат
![cookie](cookie-screen.png)
---
### Скрипт на обновление имени
@@ -46,10 +65,9 @@ while True:
```python
from itd import ITDClient
c = ITDClient(None, '...')
c = ITDClient(None, 'Ваши cookies')
id = c.upload_file('любое-имя.png', open('реальное-имя-файла.png', 'rb'))['id']
c.update_profile(banner_id=id)
c.update_banner('имя-файла.png')
print('баннер обновлен')
```
@@ -64,6 +82,52 @@ c.create_post('тест1') # создание постов
# итд
```
### Стилизация постов ("спаны")
С обновления 1.3.0 добавлена функция "спанов". Для парсинга пока поддерживается только html, но в будущем будет добавлен markdown.
```python
from itd import ITDClient
from itd.utils import parse_html
с = ITDClient(cookies='refresh_token=123')
print(с.create_post(*parse_html('значит, я это щас отправил со своего клиента, <b>воот</b>. И еще тут спаны написаны через html, по типу < i > <i>11</i>')))
```
Поддерживаемые теги:
- `<b>`: жирный
- `<i>`: курсивный
- `<s>`: зачеркнутый
- `<u>`: подчеркнутый
- `<code>`: код
- `<spoiler>`: спойлер
- `<a href="https://google.com">`: ссылка
- `<q>`: цитата
<!-- ### SSE - прослушивание уведомлений в реальном времени
```python
from itd import ITDClient, StreamConnect, StreamNotification
# Используйте cookies для автоматического обновления токена
c = ITDClient(cookies='refresh_token=...; __ddg1_=...; is_auth=1')
for event in c.stream_notifications():
if isinstance(event, StreamConnect):
print(f'! Подключено к SSE: {event.user_id}')
elif isinstance(event, StreamNotification):
print(f'-- {event.type.value}: {event.actor.display_name} (@{event.actor.username})')
```
> [!NOTE]
> SSE автоматически переподключается при истечении токена
Типы уведомлений:
- `like` - лайк на пост
- `follow` - новый подписчик
- `wall_post` - пост на вашей стене
- `comment` - комментарий к посту
- `reply` - ответ на комментарий
- `repost` - репост вашего поста -->
### Кастомные запросы
```python
@@ -77,17 +141,10 @@ fetch(c.token, 'метод', 'эндпоинт', {'данные': 'данные'
> [!NOTE]
> `xn--d1ah4a.com` - punycode от "итд.com"
## Планы
- Форматированные сообщения об ошибках
- Логирование (через logging)
- Добавление ООП (отдеьные классы по типу User или Post вместо обычного JSON)
- Голосовые сообщения
## Прочее
Лицезия: [MIT](./LICENSE)
Лицезия: [MIT](./LICENSE)
Идея (и часть эндпоинтов): https://github.com/FriceKa/ITD-SDK-js
- По сути этот проект является реворком, просто на другом языке
Автор: [itd_sdk](https://xn--d1ah4a.com/itd_sdk) (в итд) [@desicars](https://t.me/desicars) (в тг)
Автор: ~~[itd_sdk](https://xn--d1ah4a.com/itd_sdk) забанили~~ [itdsdk](https://xn--d1ah4a.com/itdsdk) (в итд) [@desicars](https://t.me/desicars) (в тг)

17
examples/README.md Normal file
View File

@@ -0,0 +1,17 @@
# Примеры использования ITD SDK
Эта папка содержит примеры использования ITD SDK для различных сценариев.
## Дополнительная информация
- [Основной README](../README.md) - Документация по всему SDK
- Каждая папка с примерами содержит свой README с подробностями
## Помощь
Если примеры не работают:
1. Проверьте, что cookies актуальные (не истекли)
2. Убедитесь, что установлены все зависимости
3. Проверьте формат cookies (должен содержать `refresh_token=`)
4. Используйте Python 3.13+ (для поддержки `deprecated`)

View File

@@ -0,0 +1,7 @@
# Коллекция скриптов от [@kilyabin](https://github.com/kilyabin)
Пока всего два скрипта, однако будет дополняться
Работают через аргументы командной строки (например, `--file` или `--text`)
Есть помощь при аргументе `-h`, потому - разберетесь

View File

@@ -0,0 +1,56 @@
#!/usr/bin/env python3
from argparse import ArgumentParser
from os import getenv
from os.path import isfile
import sys
from itd import ITDClient
def main():
parser = ArgumentParser(
description='Upload image and set it as profile banner'
)
parser.add_argument(
'--token',
default=getenv('ITD_TOKEN'),
help='API token (or ITD_TOKEN env var)'
)
parser.add_argument(
'--file',
required=True,
help='Path to image file'
)
args = parser.parse_args()
if not args.token:
print('❌ Токен не задан (--token или ITD_TOKEN)', file=sys.stderr)
quit()
file_path = args.file
if not isfile(file_path):
print(f'❌ Файл не найден: {file_path}', file=sys.stderr)
quit()
try:
client = ITDClient(None, args.token)
data, _ = client.update_banner_new(file_path)
print('✅ Баннер обновлён!')
print('📄 Информация о файле:')
print(f' id: {data.id}')
print(f' filename: {data.filename}')
print(f' mime_type: {data.mime_type}')
print(f' size: {data.size} bytes')
print(f' url: {data.url}')
except Exception as e:
print('❌ Произошла ошибка:', e, file=sys.stderr)
quit()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
from uuid import UUID
from argparse import ArgumentParser
from os import getenv
from os.path import isfile, basename
from itd import ITDClient
def main():
parser = ArgumentParser(
description='Create a post on ITD via CLI'
)
parser.add_argument(
'--token',
default=getenv('ITD_TOKEN'),
help='Refresh token (or set ITD_TOKEN environment variable)'
)
parser.add_argument(
'--text',
required=True,
help='Text content of the post'
)
parser.add_argument(
'--file',
help='Optional file to attach to the post'
)
parser.add_argument(
'--filename',
help='Filename on server (if --file is used, default: local filename)'
)
args = parser.parse_args()
if not args.token:
print('❌ Token not provided (--token or ITD_TOKEN)')
quit()
try:
client = ITDClient(None, args.token)
file_id = None
if args.file:
if not isfile(args.file):
print(f'❌ File not found: {args.file}')
quit()
server_name = args.filename or basename(args.file)
with open(args.file, 'rb') as f:
response = client.upload_file(server_name, f)
file_id = str(getattr(response, 'id', None))
if not file_id:
print('❌ Failed to get file ID')
quit()
print(f'✅ File uploaded: {response.filename} (id={file_id})')
# Создаём пост с правильным аргументом 'content'
if file_id:
post_resp = client.create_post(content=args.text, attach_ids=[UUID(file_id)])
else:
post_resp = client.create_post(content=args.text)
print('✅ Post created successfully!')
print(f' id: {post_resp.id}')
print(f' text: {args.text}')
if file_id:
print(f' attached file id: {file_id}')
except Exception as e:
print('❌ Error:', e)
quit()
if __name__ == '__main__':
main()

75
examples/stream/README.md Normal file
View File

@@ -0,0 +1,75 @@
# Stream - Прослушивание уведомлений
Примеры работы с SSE (Server-Sent Events) потоком уведомлений в реальном времени.
## Подготовка
1. Установите зависимости:
```bash
pip install -r ../../requirements.txt
```
2. Получите cookies с `refresh_token` (см. [главный README](../README.md))
3. Запускайте примеры из корня проекта или из папки `examples/stream/`
## Примеры
### basic_stream.py
Базовое прослушивание всех уведомлений.
```bash
python basic_stream.py
```
Показывает все входящие уведомления в реальном времени.
### stop_stream.py
Программная остановка потока через `client.stop_stream()`.
```bash
python stop_stream.py
```
Полезно для интеграции в многопоточные приложения.
### filter_notifications.py
Фильтрация уведомлений по типу.
```bash
python filter_notifications.py
```
Показывает только выбранные типы (like, follow, comment). Настраивается через `SHOW_TYPES`.
### notification_logger.py
Логирование всех уведомлений в JSON файл.
```bash
python notification_logger.py
```
Создает файл `notifications_YYYYMMDD_HHMMSS.log` с полной историей событий.
## Типы уведомлений
- **like** - Лайк на пост
- **follow** - Новый подписчик
- **wall_post** - Пост на вашей стене
- **comment** - Комментарий к посту
- **reply** - Ответ на комментарий
- **repost** - Репост вашего поста
## Особенности
- ✅ Автоматическое переподключение при разрыве
- ✅ Автоматическое обновление токена (при использовании cookies)
- ✅ Обработка всех типов уведомлений
- ✅ Graceful shutdown по Ctrl+C
## API Reference
Подробная документация по методам и моделям:
- [Основной README](../../README.md) - Общая информация об SDK
- [itd/client.py](../../itd/client.py) - Метод `stream_notifications()`
- [itd/models/event.py](../../itd/models/event.py) - Модели `StreamConnect` и `StreamNotification`

View File

@@ -0,0 +1,37 @@
"""
Базовый пример прослушивания SSE потока уведомлений
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from itd import ITDClient, StreamConnect
def main():
cookies = 'YOUR_COOKIES_HERE'
if cookies == 'YOUR_COOKIES_HERE':
print('! Укажите cookies в переменной cookies')
print(' См. examples/README.md для инструкций')
return
client = ITDClient(cookies=cookies)
print('-- Подключение к SSE...')
try:
for event in client.stream_notifications():
if isinstance(event, StreamConnect):
print(f'-- Подключено! User ID: {event.user_id}')
print('-- Ожидание уведомлений...\n')
else:
print(f'* {event.type.value}: {event.actor.username}')
if event.preview:
preview = event.preview[:50] + '...' if len(event.preview) > 50 else event.preview
print(f' {preview}')
except KeyboardInterrupt:
print(f'\n! Отключение...')
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,54 @@
"""
Пример фильтрации уведомлений по типу
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from itd import ITDClient, StreamConnect
from itd.enums import NotificationType
def main():
cookies = 'YOUR_COOKIES_HERE'
if cookies == 'YOUR_COOKIES_HERE':
print('! Укажите cookies в переменной cookies')
print(' См. examples/README.md для инструкций')
return
client = ITDClient(cookies=cookies)
SHOW_TYPES = {
NotificationType.LIKE,
NotificationType.FOLLOW,
NotificationType.COMMENT
}
print('-- Подключение к SSE...')
print(f'-- Фильтр: {", ".join(t.value for t in SHOW_TYPES)}\n')
try:
for event in client.stream_notifications():
if isinstance(event, StreamConnect):
print(f'✅ Подключено! User ID: {event.user_id}\n')
continue
if event.type not in SHOW_TYPES:
continue
# Обработка разных типов
if event.type == NotificationType.LIKE:
print(f'❤️ {event.actor.display_name} лайкнул ваш пост')
elif event.type == NotificationType.FOLLOW:
print(f'👤 {event.actor.display_name} подписался на вас')
elif event.type == NotificationType.COMMENT:
print(f'💬 {event.actor.display_name}: {event.preview}')
except KeyboardInterrupt:
print(f'\n! Отключение...')
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,60 @@
"""
Пример логирования уведомлений в файл
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from itd import ITDClient, StreamConnect
from datetime import datetime
import json
def main():
cookies = 'YOUR_COOKIES_HERE'
if cookies == 'YOUR_COOKIES_HERE':
print('! Укажите cookies в переменной cookies')
print(' См. examples/README.md для инструкций')
return
client = ITDClient(cookies=cookies)
log_file = f'notifications_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
print(f'-- Подключение к SSE...')
print(f'-- Логирование в: {log_file}\n')
try:
with open(log_file, 'w', encoding='utf-8') as f:
for event in client.stream_notifications():
timestamp = datetime.now().isoformat()
if isinstance(event, StreamConnect):
log_entry = {
'timestamp': timestamp,
'type': 'connect',
'user_id': str(event.user_id)
}
print(f'-- Подключено! User ID: {event.user_id}')
else:
log_entry = {
'timestamp': timestamp,
'type': event.type.value,
'id': str(event.id),
'actor': {
'username': event.actor.username,
'display_name': event.actor.display_name
},
'preview': event.preview,
'target_type': event.target_type.value if event.target_type else None,
'target_id': str(event.target_id) if event.target_id else None
}
print(f'* {event.type.value}: {event.actor.username}')
f.write(json.dumps(log_entry, ensure_ascii=False) + '\n')
f.flush()
except KeyboardInterrupt:
print(f'\n! Отключение... Лог сохранен в {log_file}')
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,44 @@
"""
Пример остановки SSE потока из кода
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import threading
from itd import ITDClient, StreamConnect
def main():
cookies = 'YOUR_COOKIES_HERE'
if cookies == 'YOUR_COOKIES_HERE':
print('! Укажите cookies в переменной cookies')
return
client = ITDClient(cookies=cookies)
def listen():
print('! Начинаем прослушивание...')
try:
for event in client.stream_notifications():
if isinstance(event, StreamConnect):
print(f'-- Подключено! User ID: {event.user_id}')
else:
print(f'🔔 {event.type.value}: {event.actor.username}')
except Exception as e:
print(f'! Ошибка: {e}')
thread = threading.Thread(target=listen, daemon=True)
thread.start()
print('Прослушивание запущено. Нажмите Enter для остановки...')
input()
print('!! Останавливаем прослушивание...')
client.stop_stream()
thread.join(timeout=5)
print('! Остановлено')
if __name__ == '__main__':
main()

View File

@@ -1 +1,4 @@
from itd.client import Client as ITDClient
from itd.client import Client as ITDClient
from itd.models.event import StreamConnect, StreamNotification
__all__ = ['ITDClient', 'StreamConnect', 'StreamNotification']

View File

@@ -1,42 +1,46 @@
from warnings import deprecated
from uuid import UUID
from _io import BufferedReader
from typing import cast
from typing import cast, Iterator
from datetime import datetime
from json import JSONDecodeError, loads
from time import sleep
from requests.exceptions import ConnectionError, HTTPError
from sseclient import SSEClient
from itd.routes.users import get_user, update_profile, follow, unfollow, get_followers, get_following, update_privacy
from itd.routes.users import get_user, update_profile, follow, unfollow, get_followers, get_following, update_privacy, update_privacy_new
from itd.routes.etc import get_top_clans, get_who_to_follow, get_platform_status
from itd.routes.comments import get_comments, add_comment, delete_comment, like_comment, unlike_comment, add_reply_comment
from itd.routes.comments import get_comments, add_comment, delete_comment, like_comment, unlike_comment, add_reply_comment, get_replies
from itd.routes.hashtags import get_hashtags, get_posts_by_hashtag
from itd.routes.notifications import get_notifications, mark_as_read, mark_all_as_read, get_unread_notifications_count
from itd.routes.posts import create_post, get_posts, get_post, edit_post, delete_post, pin_post, repost, view_post, get_liked_posts, restore_post, like_post, unlike_post
from itd.routes.notifications import get_notifications, mark_as_read, mark_all_as_read, get_unread_notifications_count, stream_notifications
from itd.routes.posts import create_post, get_posts, get_post, edit_post, delete_post, pin_post, repost, view_post, get_liked_posts, restore_post, like_post, unlike_post, get_user_posts, vote
from itd.routes.reports import report
from itd.routes.search import search
from itd.routes.files import upload_file
from itd.routes.files import upload_file, get_file, delete_file
from itd.routes.auth import refresh_token, change_password, logout
from itd.routes.verification import verify, get_verification_status
from itd.routes.pins import get_pins, remove_pin, set_pin
from itd.models.comment import Comment
from itd.models.notification import Notification
from itd.models.post import Post, NewPost
from itd.models.post import Post, NewPost, PollData, Poll, Span
from itd.models.clan import Clan
from itd.models.hashtag import Hashtag
from itd.models.user import User, UserProfileUpdate, UserPrivacy, UserFollower, UserWhoToFollow
from itd.models.user import User, UserProfileUpdate, UserPrivacy, UserFollower, UserWhoToFollow, UserPrivacyData
from itd.models.pagination import Pagination, PostsPagintaion, LikedPostsPagintaion
from itd.models.verification import Verification, VerificationStatus
from itd.models.report import NewReport
from itd.models.file import File
from itd.models.pin import Pin
from itd.models.event import StreamConnect, StreamNotification
from itd.enums import PostsTab, ReportTargetType, ReportTargetReason
from itd.request import set_cookies
from itd.exceptions import (
NoCookie, NoAuthData, SamePassword, InvalidOldPassword, NotFound, ValidationError, UserBanned,
PendingRequestExists, Forbidden, UsernameTaken, CantFollowYourself, Unauthorized,
CantRepostYourPost, AlreadyReposted, AlreadyReported, TooLarge, PinNotOwned
CantRepostYourPost, AlreadyReposted, AlreadyReported, TooLarge, PinNotOwned, NoContent,
AlreadyFollowing, NotFoundOrForbidden, OptionsNotBelong, NotMultipleChoice, EmptyOptions
)
@@ -56,6 +60,7 @@ def refresh_on_error(func):
class Client:
def __init__(self, token: str | None = None, cookies: str | None = None):
self.cookies = cookies
self._stream_active = False # Флаг для остановки stream_notifications
if token:
self.token = token.replace('Bearer ', '')
@@ -189,7 +194,7 @@ class Client:
@refresh_on_error
def update_privacy(self, wall_closed: bool = False, private: bool = False) -> UserPrivacy:
"""Обновить настройки приватности
"""(УСТАРЕЛО! Используйте update_privacy_new) настройки приватности
Args:
wall_closed (bool, optional): Закрыть стену. Defaults to False.
@@ -203,6 +208,21 @@ class Client:
return UserPrivacy.model_validate(res.json())
@refresh_on_error
def update_privacy_new(self, privacy: UserPrivacyData) -> UserPrivacy:
"""Обновить настройки приватности
Args:
privacy (UserPrivacyData): Данные приватности
Returns:
UserPrivacy: Обновленные данные приватности
"""
res = update_privacy_new(self.token, privacy)
res.raise_for_status()
return UserPrivacy.model_validate(res.json())
@refresh_on_error
def follow(self, username: str) -> int:
"""Подписаться на пользователя
@@ -220,6 +240,8 @@ class Client:
res = follow(self.token, username)
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise NotFound('User')
if res.json().get('error', {}).get('code') == 'CONFLICT':
raise AlreadyFollowing()
if res.json().get('error', {}).get('code') == 'VALIDATION_ERROR' and res.status_code == 400:
raise CantFollowYourself()
res.raise_for_status()
@@ -292,21 +314,6 @@ class Client:
return [UserFollower.model_validate(user) for user in res.json()['data']['users']], Pagination.model_validate(res.json()['data']['pagination'])
@deprecated("verificate устарел используйте verify")
@refresh_on_error
def verificate(self, file_url: str) -> Verification:
"""Отправить запрос на верификацию
Args:
file_url (str): Ссылка на видео
Raises:
PendingRequestExists: Запрос уже отправлен
Returns:
Verification: Верификация
"""
return self.verify(file_url)
@refresh_on_error
def verify(self, file_url: str) -> Verification:
@@ -426,6 +433,8 @@ class Client:
raise NotFound('User')
if res.status_code == 422 and 'found' in res.json():
raise ValidationError(*list(res.json()['found'].items())[0])
if res.json().get('error', {}).get('code') == 'VALIDATION_ERROR':
raise NoContent()
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise NotFound('Comment')
res.raise_for_status()
@@ -458,6 +467,32 @@ class Client:
return [Comment.model_validate(comment) for comment in data['comments']], Pagination(page=(cursor // limit) or 1, limit=limit, total=data['total'], hasMore=data['hasMore'], nextCursor=None)
@refresh_on_error
def get_replies(self, comment_id: UUID, limit: int = 50, page: int = 1, sort: str = 'oldest') -> tuple[list[Comment], Pagination]:
"""Получить список ответов на комментарий
Args:
comment_id (UUID): UUID поста
limit (int, optional): Лимит. Defaults to 50.
page (int, optional): Курсор (сколько пропустить). Defaults to 1.
sort (str, optional): Сортировка. Defaults to 'oldest'.
Raises:
NotFound: Пост не найден
Returns:
list[Comment]: Список комментариев
Pagination: Пагинация
"""
res = get_replies(self.token, comment_id, page, limit, sort)
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise NotFound('Comment')
res.raise_for_status()
data = res.json()['data']
return [Comment.model_validate(comment) for comment in data['replies']], Pagination.model_validate(data['pagination'])
@refresh_on_error
def like_comment(self, id: UUID) -> int:
"""Лайкнуть комментарий
@@ -518,19 +553,6 @@ class Client:
raise Forbidden('delete comment')
res.raise_for_status()
@deprecated("get_hastags устарел используйте get_hashtags")
@refresh_on_error
def get_hastags(self, limit: int = 10) -> list[Hashtag]:
"""Получить список популярных хэштэгов
Args:
limit (int, optional): Лимит. Defaults to 10.
Returns:
list[Hashtag]: Список хэштэгов
"""
return self.get_hashtags(limit)
@refresh_on_error
def get_hashtags(self, limit: int = 10) -> list[Hashtag]:
"""Получить список популярных хэштэгов
@@ -608,7 +630,6 @@ class Client:
res = mark_all_as_read(self.token)
res.raise_for_status()
@refresh_on_error
def get_unread_notifications_count(self) -> int:
"""Получить количество непрочитанных уведомлений
@@ -623,13 +644,15 @@ class Client:
@refresh_on_error
def create_post(self, content: str, wall_recipient_id: UUID | None = None, attach_ids: list[UUID] = []) -> NewPost:
def create_post(self, content: str | None = None, spans: list[Span] = [], wall_recipient_id: UUID | None = None, attachment_ids: list[UUID] = [], poll: PollData | None = None) -> NewPost:
"""Создать пост
Args:
content (str): Содержимое
content (str | None, optional): Содержимое. Defaults to None.
spans (lsit[Span], optional): Стилизация содержимого. Defaults to [].
wall_recipient_id (UUID | None, optional): UUID пользователя (чтобы создать пост ему на стене). Defaults to None.
attach_ids (list[UUID], optional): UUID вложений. Defaults to [].
attachment_ids (list[UUID], optional): UUID вложений. Defaults to [].
poll (PollData | None, optional): Опрос. Defaults to None.
Raises:
NotFound: Пользователь не найден
@@ -638,7 +661,8 @@ class Client:
Returns:
NewPost: Новый пост
"""
res = create_post(self.token, content, wall_recipient_id, attach_ids)
res = create_post(self.token, content, [span.model_dump(mode="json") for span in spans], wall_recipient_id, attachment_ids, poll.poll if poll else None)
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise NotFound('Wall recipient')
if res.status_code == 422 and 'found' in res.json():
@@ -647,6 +671,41 @@ class Client:
return NewPost.model_validate(res.json())
@refresh_on_error
def vote(self, id: UUID, option_ids: list[UUID]) -> Poll:
"""Проголосовать в опросе
Args:
id (UUID): UUID поста
option_ids (list[UUID]): Список UUID вариантов
Raises:
EmptyOptions: Пустые варианты
NotFound: Пост не найден или в посте нет опроса
NotFound: _description_
OptionsNotBelong: Неверные варианты (варинты не пренадлежат опросу)
NotMultipleChoice: Можно выбрать только 1 вариант (для опросов, где не разрешены несколько ответов)
Returns:
Poll: Опрос
"""
if not option_ids:
raise EmptyOptions()
res = vote(self.token, id, option_ids)
if res.json().get('error', {}).get('code') == 'NOT_FOUND' and res.json().get('error', {}).get('message') == 'Опрос не найден':
raise NotFound('Poll')
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise NotFound('Post')
if res.json().get('error', {}).get('code') == 'VALIDATION_ERROR' and res.json().get('error', {}).get('message') == 'Один или несколько вариантов не принадлежат этому опросу':
raise OptionsNotBelong()
if res.json().get('error', {}).get('code') == 'VALIDATION_ERROR' and res.json().get('error', {}).get('message') == 'В этом опросе можно выбрать только один вариант':
raise NotMultipleChoice()
res.raise_for_status()
return Poll.model_validate(res.json()['data'])
@refresh_on_error
def get_posts(self, cursor: int = 0, tab: PostsTab = PostsTab.POPULAR) -> tuple[list[Post], PostsPagintaion]:
"""Получить список постов
@@ -801,6 +860,30 @@ class Client:
raise NotFound('Post')
res.raise_for_status()
@refresh_on_error
def get_user_posts(self, username_or_id: str | UUID, limit: int = 20, cursor: datetime | None = None) -> tuple[list[Post], LikedPostsPagintaion]:
"""Получить список постов пользователя
Args:
username_or_id (str | UUID): UUID или username пользователя
limit (int, optional): Лимит. Defaults to 20.
cursor (datetime | None, optional): Сдвиг (next_cursor). Defaults to None.
Raises:
NotFound: Пользователь не найден
Returns:
list[Post]: Список постов
LikedPostsPagintaion: Пагинация
"""
res = get_user_posts(self.token, username_or_id, limit, cursor)
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise NotFound('User')
res.raise_for_status()
data = res.json()['data']
return [Post.model_validate(post) for post in data['posts']], LikedPostsPagintaion.model_validate(data['pagination'])
@refresh_on_error
def get_liked_posts(self, username_or_id: str | UUID, limit: int = 20, cursor: datetime | None = None) -> tuple[list[Post], LikedPostsPagintaion]:
"""Получить список лайкнутых постов пользователя
@@ -925,8 +1008,46 @@ class Client:
return File.model_validate(res.json())
@refresh_on_error
def get_file(self, id: UUID) -> File:
"""Получить файл
Args:
id (UUID): UUID файла
Raises:
NotFoundOrForbidden: Файл не найден или нет доступа
Returns:
File: Файл
"""
res = get_file(self.token, id)
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise NotFoundOrForbidden('File')
res.raise_for_status()
return File.model_validate(res.json())
@refresh_on_error
def delete_file(self, id: UUID) -> File:
"""Удалить файл
Args:
id (UUID): UUID файла
Raises:
NotFound: Файл не найден
"""
res = delete_file(self.token, id)
if res.json().get('error', {}).get('code') == 'NOT_FOUND':
raise NotFound('File')
res.raise_for_status()
return File.model_validate(res.json())
# @deprecated # Этот декоратор появился в 3.13, а наша библиотека поддерживает с 3.9
def update_banner(self, name: str) -> UserProfileUpdate:
"""Обновить банер (шорткат из upload_file + update_profile)
"""[DEPRECATED] Обновить банер (шорткат из upload_file + update_profile)
Args:
name (str): Имя файла
@@ -937,6 +1058,19 @@ class Client:
id = self.upload_file(name, cast(BufferedReader, open(name, 'rb'))).id
return self.update_profile(banner_id=id)
def update_banner_new(self, name: str) -> tuple[File, UserProfileUpdate]:
"""Обновить банер (шорткат из upload_file + update_profile)
Args:
name (str): Имя файла
Returns:
File: Загруженный файл
UserProfileUpdate: Обновленный профиль
"""
file = self.upload_file(name, cast(BufferedReader, open(name, 'rb')))
return file, self.update_profile(banner_id=file.id)
@refresh_on_error
def restore_post(self, post_id: UUID) -> None:
"""Восстановить удалённый пост
@@ -1017,4 +1151,103 @@ class Client:
raise PinNotOwned(slug)
res.raise_for_status()
return res.json()['pin']
return res.json()['pin']
@refresh_on_error
def stream_notifications(self) -> Iterator[StreamConnect | StreamNotification]:
"""Слушать SSE поток уведомлений
Yields:
StreamConnect | StreamNotification: События подключения или уведомления
Example:
```python
from itd import ITDClient
client = ITDClient(cookies='refresh_token=...')
# Запуск прослушивания
for event in client.stream_notifications():
if isinstance(event, StreamConnect):
print(f'Подключено: {event.user_id}')
else:
print(f'Уведомление: {event.type} от {event.actor.username}')
# Остановка из другого потока или обработчика
# client.stop_stream()
```
"""
self._stream_active = True
while self._stream_active:
try:
response = stream_notifications(self.token)
response.raise_for_status()
client = SSEClient(response)
for event in client.events():
if not self._stream_active:
response.close()
return
try:
if not event.data or event.data.strip() == '':
continue
data = loads(event.data)
if 'userId' in data and 'timestamp' in data and 'type' not in data:
yield StreamConnect.model_validate(data)
else:
yield StreamNotification.model_validate(data)
except JSONDecodeError:
print(f'Не удалось распарсить сообщение: {event.data}')
continue
except Exception as e:
print(f'Ошибка обработки события: {e}')
continue
except Unauthorized:
if self.cookies and self._stream_active:
print('Токен истек, обновляем...')
self.refresh_auth()
continue
else:
raise
except Exception as e:
if not self._stream_active:
return
print(f'Ошибка соединения: {e}, переподключение через 5 секунд...')
sleep(5)
continue
def stop_stream(self):
"""Остановить прослушивание SSE потока
Example:
```python
import threading
from itd import ITDClient
client = ITDClient(cookies='refresh_token=...')
# Запуск в отдельном потоке
def listen():
for event in client.stream_notifications():
print(event)
thread = threading.Thread(target=listen)
thread.start()
# Остановка через 10 секунд
import time
time.sleep(10)
client.stop_stream()
thread.join()
```
"""
print('stop event')
self._stream_active = False

View File

@@ -28,7 +28,26 @@ class AttachType(Enum):
AUDIO = 'audio'
IMAGE = 'image'
VIDEO = 'video'
FILE = 'file'
class PostsTab(Enum):
FOLLOWING = 'following'
POPULAR = 'popular'
class AccessType(Enum):
"""Типы разрешений для видимости лайков и записей на стене"""
NOBODY = 'nobody' # никто
MUTUAL = 'mutual' # взаимные
FOLLOWERS = 'followers' # подписчики
EVERYONE = 'everyone' # все
class SpanType(Enum):
MONOSPACE = 'monospace' # моноширный (код)
STRIKE = 'strike' # зачеркнутый
BOLD = 'bold' # жирный
ITALIC = 'italic' # курсив
SPOILER = 'spoiler' # спойлер
UNDERLINE = 'underline' # подчеркнутый
HASHTAG = 'hashtag' # хэштэг ? (появляется только при получении постов, при создании нету)
LINK = 'link' # ссылка
QUOTE = 'quote' # цитата

View File

@@ -37,16 +37,23 @@ class NotFound(Exception):
def __str__(self):
return f'{self.obj} not found'
class NotFoundOrForbidden(Exception):
def __init__(self, obj: str):
self.obj = obj
def __str__(self):
return f'{self.obj} not found or access denied'
class UserBanned(Exception):
def __str__(self):
return 'User banned'
class ValidationError(Exception):
def __init__(self, name: str, value: str):
self.name = name
self.value = value
# def __init__(self, name: str, value: str):
# self.name = name
# self.value = value
def __str__(self):
return f'Failed validation on {self.name}: "{self.value}"'
return 'Failed validation'# on {self.name}: "{self.value}"'
class PendingRequestExists(Exception):
def __str__(self):
@@ -98,4 +105,32 @@ class PinNotOwned(Exception):
def __init__(self, pin: str) -> None:
self.pin = pin
def __str__(self):
return f'You do not own "{self.pin}" pin'
return f'You do not own "{self.pin}" pin'
class NoContent(Exception):
def __str__(self) -> str:
return 'Content or attachments required'
class AlreadyFollowing(Exception):
def __str__(self) -> str:
return 'Already following user'
class AccountBanned(Exception):
def __str__(self) -> str:
return 'Account has been deactivated'
class OptionsNotBelong(Exception):
def __str__(self) -> str:
return 'One or more options do not belong to poll'
class NotMultipleChoice(Exception):
def __str__(self) -> str:
return 'Only one option can be choosen in this poll'
class EmptyOptions(Exception):
def __str__(self) -> str:
return 'Options cannot be empty (pre-validation)'
class ProfileRequired(Exception):
def __str__(self) -> str:
return 'No profile. Please create your profile first'

View File

@@ -0,0 +1,3 @@
from itd.models.event import StreamConnect, StreamNotification
__all__ = ['StreamConnect', 'StreamNotification']

17
itd/models/event.py Normal file
View File

@@ -0,0 +1,17 @@
from uuid import UUID
from pydantic import BaseModel, Field
from itd.models.notification import Notification
class StreamConnect(BaseModel):
"""Событие подключения к SSE потоку"""
user_id: UUID = Field(alias='userId')
timestamp: int
class StreamNotification(Notification):
"""Уведомление из SSE потока"""
user_id: UUID = Field(alias='userId')
sound: bool = True

View File

@@ -1,4 +1,5 @@
from uuid import UUID
from datetime import datetime
from pydantic import BaseModel, Field
@@ -10,6 +11,7 @@ class File(BaseModel):
filename: str
mime_type: str = Field(alias='mimeType')
size: int
created_at: datetime | None = Field(None, alias='createdAt')
class PostAttach(BaseModel):

View File

@@ -1,46 +1,104 @@
from uuid import UUID
from datetime import datetime
from pydantic import Field, BaseModel
from pydantic import Field, BaseModel, field_validator
from itd.models.user import UserPost, UserNewPost
from itd.models._text import TextObject
from itd.models.file import PostAttach
from itd.models.comment import Comment
from itd.enums import SpanType
class _PostShort(TextObject):
class NewPollOption(BaseModel):
text: str
class PollOption(NewPollOption):
id: UUID
position: int = 0
votes: int = Field(0, alias='votesCount')
class _Poll(BaseModel):
multiple: bool = Field(False, alias='multipleChoice')
question: str
class NewPoll(_Poll):
options: list[NewPollOption]
model_config = {'serialize_by_alias': True}
class PollData:
def __init__(self, question: str, options: list[str], multiple: bool = False):
self.poll = NewPoll(question=question, options=[NewPollOption(text=option) for option in options], multipleChoice=multiple)
class Poll(_Poll):
id: UUID
post_id: UUID = Field(alias='postId')
options: list[PollOption]
votes: int = Field(0, alias='totalVotes')
is_voted: bool = Field(False, alias='hasVoted')
voted_option_ids: list[UUID] = Field([], alias='votedOptionIds')
created_at: datetime = Field(alias='createdAt')
@field_validator('created_at', mode='plain')
@classmethod
def validate_created_at(cls, v: str):
try:
return datetime.strptime(v + '00', '%Y-%m-%d %H:%M:%S.%f%z')
except ValueError:
return datetime.strptime(v, '%Y-%m-%dT%H:%M:%S.%f')
class Span(BaseModel):
length: int
offset: int
type: SpanType
url: str | None = None
class _PostCounts(TextObject):
likes_count: int = Field(0, alias='likesCount')
comments_count: int = Field(0, alias='commentsCount')
reposts_count: int = Field(0, alias='repostsCount')
views_count: int = Field(0, alias='viewsCount')
spans: list[Span] = []
class PostShort(_PostShort):
class _PostAuthor(_PostCounts):
author: UserPost
class OriginalPost(PostShort):
class OriginalPost(_PostAuthor):
is_deleted: bool = Field(False, alias='isDeleted')
class _Post(_PostShort):
class _Post(_PostCounts):
is_liked: bool = Field(False, alias='isLiked')
is_reposted: bool = Field(False, alias='isReposted')
is_viewed: bool = Field(False, alias='isViewed')
is_owner: bool = Field(False, alias='isOwner')
is_pinned: bool = Field(False, alias='isPinned') # only for user wall
attachments: list[PostAttach] = []
comments: list[Comment] = []
original_post: OriginalPost | None = None
original_post: OriginalPost | None = None # for reposts
wall_recipient_id: UUID | None = Field(None, alias='wallRecipientId')
wall_recipient: UserPost | None = Field(None, alias='wallRecipient')
class Post(_Post, PostShort):
pass
class Post(_Post, _PostAuthor):
poll: Poll | None = None
class NewPost(_Post):
author: UserNewPost
poll: NewPoll | None = None

View File

@@ -4,13 +4,41 @@ from datetime import datetime
from pydantic import BaseModel, Field
from itd.models.pin import ShortPin
from itd.enums import AccessType
class UserPrivacy(BaseModel):
class _UserPrivacy(BaseModel):
private: bool | None = Field(None, alias='isPrivate') # none for not me
wall_closed: bool = Field(False, alias='wallClosed')
wall_closed: bool | None = Field(None, alias='wallClosed', deprecated=True)
wall_access: AccessType = Field(AccessType.EVERYONE, alias='wallAccess')
likes_visibility: AccessType = Field(AccessType.EVERYONE, alias='likesVisibility')
model_config = {'populate_by_name': True}
model_config = {'serialize_by_alias': True}
class UserPrivacy(_UserPrivacy):
show_last_seen: bool = Field(True, alias='showLastSeen')
class UserPrivacyData:
def __init__(self, private: bool | None = None, wall_access: AccessType | None = None, likes_visibility: AccessType | None = None, show_last_seen: bool | None = None) -> None:
self.private = private
self.wall_access = wall_access
self.likes_visibility = likes_visibility
self.show_last_seen = show_last_seen
def to_dict(self):
data = {}
if self.private is not None:
data['isPrivate'] = self.private
if self.wall_access is not None:
data['wallAccess'] = self.wall_access.value
if self.likes_visibility is not None:
data['likesVisibility'] = self.likes_visibility.value
if self.show_last_seen is not None:
data['showLastSeen'] = self.show_last_seen
return data
class UserProfileUpdate(BaseModel):
@@ -51,7 +79,7 @@ class UserSearch(UserFollower, UserWhoToFollow):
pass
class User(UserSearch, UserPrivacy):
class User(UserSearch, _UserPrivacy):
banner: str | None = None
bio: str | None = None
pinned_post_id: UUID | None = Field(None, alias='pinnedPostId')
@@ -62,3 +90,5 @@ class User(UserSearch, UserPrivacy):
is_followed: bool | None = Field(None, alias='isFollowedBy') # none for me
created_at: datetime = Field(alias='createdAt')
last_seen_at: datetime | None = Field(None, alias='lastSeen')
online: bool = False

View File

@@ -3,7 +3,7 @@ from _io import BufferedReader
from requests import Session
from requests.exceptions import JSONDecodeError
from itd.exceptions import InvalidToken, InvalidCookie, RateLimitExceeded, Unauthorized
from itd.exceptions import InvalidToken, InvalidCookie, RateLimitExceeded, Unauthorized, AccountBanned, ProfileRequired
s = Session()
@@ -33,11 +33,17 @@ def fetch(token: str, method: str, url: str, params: dict = {}, files: dict[str,
res = s.request(method.upper(), base, timeout=120 if files else 20, json=params, headers=headers, files=files)
try:
if res.json().get('error') == 'Too Many Requests':
raise RateLimitExceeded(0)
if res.json().get('error', {}).get('code') == 'RATE_LIMIT_EXCEEDED':
raise RateLimitExceeded(res.json()['error'].get('retryAfter', 0))
if res.json().get('error', {}).get('code') == 'UNAUTHORIZED':
raise Unauthorized()
except JSONDecodeError:
if res.json().get('error', {}).get('code') in ('ACCOUNT_BANNED', 'USER_BLOCKED'):
raise AccountBanned()
if res.json().get('error', {}).get('code') == 'PROFILE_REQUIRED':
raise ProfileRequired()
except (JSONDecodeError, AttributeError):
pass # todo
if not res.ok:
@@ -45,6 +51,17 @@ def fetch(token: str, method: str, url: str, params: dict = {}, files: dict[str,
return res
def fetch_stream(token: str, url: str):
"""Fetch для SSE streaming запросов"""
base = f'https://xn--d1ah4a.com/api/{url}'
headers = {
"Accept": "text/event-stream",
"Authorization": 'Bearer ' + token,
"Cache-Control": "no-cache"
}
return s.get(base, headers=headers, stream=True, timeout=None)
def set_cookies(cookies: str):
for cookie in cookies.split('; '):
s.cookies.set(cookie.split('=')[0], cookie.split('=')[-1], path='/', domain='xn--d1ah4a.com.com')
@@ -79,10 +96,11 @@ def auth_fetch(cookies: str, method: str, url: str, params: dict = {}, token: st
else:
res = s.request(method, f'https://xn--d1ah4a.com/api/{url}', timeout=20, json=params, headers=headers)
# print(res.text)
if res.text == 'UNAUTHORIZED':
raise InvalidToken()
try:
if res.json().get('error') == 'Too Many Requests':
raise RateLimitExceeded(0)
if res.json().get('error', {}).get('code') == 'RATE_LIMIT_EXCEEDED':
raise RateLimitExceeded(res.json()['error'].get('retryAfter', 0))
if res.json().get('error', {}).get('code') in ('SESSION_NOT_FOUND', 'REFRESH_TOKEN_MISSING', 'SESSION_REVOKED', 'SESSION_EXPIRED'):

View File

@@ -19,3 +19,6 @@ def unlike_comment(token: str, comment_id: UUID):
def delete_comment(token: str, comment_id: UUID):
return fetch(token, 'delete', f'comments/{comment_id}')
def get_replies(token: str, comment_id: UUID, page: int = 1, limit: int = 50, sort: str = 'oldest'):
return fetch(token, 'get', f'comments/{comment_id}/replies', {'page': page, 'limit': limit, 'sort': sort})

View File

@@ -1,7 +1,14 @@
from _io import BufferedReader
from uuid import UUID
from itd.request import fetch
def upload_file(token: str, name: str, data: BufferedReader):
return fetch(token, 'post', 'files/upload', files={'file': (name, data)})
def get_file(token: str, id: UUID):
return fetch(token, 'get', f'files/{id}')
def delete_file(token: str, id: UUID):
return fetch(token, 'delete', f'files/{id}')

View File

@@ -1,17 +1,8 @@
from warnings import deprecated
from uuid import UUID
from itd.request import fetch
@deprecated("get_hastags устарела используйте get_hashtags")
def get_hastags(token: str, limit: int = 10):
return fetch(token, 'get', 'hashtags/trending', {'limit': limit})
def get_hashtags(token: str, limit: int = 10):
return fetch(token, 'get', 'hashtags/trending', {'limit': limit})
@deprecated("get_posts_by_hastag устерла используй get_posts_by_hashtag")
def get_posts_by_hastag(token: str, hashtag: str, limit: int = 20, cursor: UUID | None = None):
return fetch(token, 'get', f'hashtags/{hashtag}/posts', {'limit': limit, 'cursor': cursor})
def get_posts_by_hashtag(token: str, hashtag: str, limit: int = 20, cursor: UUID | None = None):
return fetch(token, 'get', f'hashtags/{hashtag}/posts', {'limit': limit, 'cursor': cursor})

View File

@@ -1,6 +1,6 @@
from uuid import UUID
from itd.request import fetch
from itd.request import fetch, fetch_stream
def get_notifications(token: str, limit: int = 20, offset: int = 0):
return fetch(token, 'get', 'notifications', {'limit': limit, 'offset': offset})
@@ -12,4 +12,12 @@ def mark_all_as_read(token: str):
return fetch(token, 'post', f'notifications/read-all')
def get_unread_notifications_count(token: str):
return fetch(token, 'get', 'notifications/count')
return fetch(token, 'get', 'notifications/count')
def stream_notifications(token: str):
"""Получить SSE поток уведомлений
Returns:
Response: Streaming response для SSE
"""
return fetch_stream(token, 'notifications/stream')

View File

@@ -3,13 +3,18 @@ from uuid import UUID
from itd.request import fetch
from itd.enums import PostsTab
from itd.models.post import NewPoll
def create_post(token: str, content: str, wall_recipient_id: UUID | None = None, attachment_ids: list[UUID] = []):
data: dict = {'content': content}
def create_post(token: str, content: str | None = None, spans: list[dict] = [], wall_recipient_id: UUID | None = None, attachment_ids: list[UUID] = [], poll: NewPoll | None = None):
data: dict = {'content': content or ''}
if spans:
data['spans'] = spans
if wall_recipient_id:
data['wallRecipientId'] = str(wall_recipient_id)
if attachment_ids:
data['attachmentIds'] = list(map(str, attachment_ids))
if poll:
data['poll'] = poll.model_dump()
return fetch(token, 'post', 'posts', data)
@@ -40,11 +45,17 @@ def view_post(token: str, id: UUID):
def get_liked_posts(token: str, username_or_id: str | UUID, limit: int = 20, cursor: datetime | None = None):
return fetch(token, 'get', f'posts/user/{username_or_id}/liked', {'limit': limit, 'cursor': cursor})
def restore_post(token: str, post_id: UUID):
return fetch(token, "post", f"posts/{post_id}/restore",)
def get_user_posts(token: str, username_or_id: str | UUID, limit: int = 20, cursor: datetime | None = None):
return fetch(token, 'get', f'posts/user/{username_or_id}', {'limit': limit, 'cursor': cursor})
def like_post(token: str, post_id: UUID):
return fetch(token, "post", f"posts/{post_id}/like")
def restore_post(token: str, id: UUID):
return fetch(token, "post", f"posts/{id}/restore",)
def unlike_post(token: str, post_id: UUID):
return fetch(token, "delete", f"posts/{post_id}/like")
def like_post(token: str, id: UUID):
return fetch(token, "post", f"posts/{id}/like")
def unlike_post(token: str, id: UUID):
return fetch(token, "delete", f"posts/{id}/like")
def vote(token: str, id: UUID, options: list[UUID]):
return fetch(token, 'post', f'posts/{id}/poll/vote', {'optionIds': [str(option) for option in options]})

View File

@@ -1,6 +1,7 @@
from uuid import UUID
from itd.request import fetch
from itd.models.user import UserPrivacyData
def get_user(token: str, username: str):
@@ -26,6 +27,9 @@ def update_privacy(token: str, wall_closed: bool = False, private: bool = False)
data['isPrivate'] = private
return fetch(token, 'put', 'users/me/privacy', data)
def update_privacy_new(token: str, privacy: UserPrivacyData):
return fetch(token, 'put', 'users/me/privacy', privacy.to_dict())
def follow(token: str, username: str):
return fetch(token, 'post', f'users/{username}/follow')

View File

@@ -1,14 +1,8 @@
from warnings import deprecated
from itd.request import fetch
def verify(token: str, file_url: str):
# {"success":true,"request":{"id":"fc54e54f-8586-4d8c-809e-df93161f99da","userId":"9096a85b-c319-483e-8940-6921be427ad0","videoUrl":"https://943701f000610900cbe86b72234e451d.bckt.ru/videos/354f28a6-9ac7-48a6-879a-a454062b1d6b.mp4","status":"pending","rejectionReason":null,"reviewedBy":null,"reviewedAt":null,"createdAt":"2026-01-30T12:58:14.228Z","updatedAt":"2026-01-30T12:58:14.228Z"}}
return fetch(token, 'post', 'verification/submit', {'videoUrl': file_url})
@deprecated("verificate устарела используйте verify")
def verificate(token: str, file_url: str):
return verify(token, file_url)
def get_verification_status(token: str):
return fetch(token, 'get', 'verification/status')

195
itd/utils.py Normal file
View File

@@ -0,0 +1,195 @@
# новая версия от чат гпт. у меня самого не получилось сделать
import re
from itd.models.post import Span
from itd.enums import SpanType
class Tag:
def __init__(self, open: str, close: str, type: SpanType):
self.open = open
self.close = close
self.type = type
def _parse_spans(text: str, tags: list[Tag]) -> tuple[str, list[Span]]:
spans: list[Span] = []
stack: list[tuple[int, SpanType, int, int, str | None]] = []
clean_chars: list[str] = []
i = 0
while i < len(text):
# Проверка на экранирование
escaped = text[i] == '\\'
# Сначала проверяем закрывающие теги (с проверкой на экранирование)
closed = False
for idx, tag in enumerate(tags):
if text.startswith(tag.close, i) and stack and stack[-1][0] == idx:
if escaped:
# Экранированный закрывающий тег — выводим как текст (без слэша)
clean_chars.append(tag.close)
i += len(tag.close)
closed = True
break
_, span_type, offset, _, url = stack.pop()
spans.append(Span(length=len(clean_chars) - offset, offset=offset, type=span_type, url=url))
i += len(tag.close)
closed = True
break
if closed:
continue
# Затем проверяем открывающие теги
opened = False
for idx, tag in enumerate(tags):
if tag.type == SpanType.LINK:
if escaped:
match = re.match(tag.open, text[i+1:])
if match:
# Экранированный открывающий тег — выводим как текст (без слэша)
clean_chars.append(match.group(0))
i += 1 + match.end()
opened = True
break
else:
match = re.match(tag.open, text[i:])
if match:
url = match.group(1) if match.groups() else None
stack.append((idx, tag.type, len(clean_chars), i, url))
i += match.end()
opened = True
break
elif text.startswith(tag.open, i):
if escaped:
# Экранированный обычный тег — пропускаем, будет обработан в блоке is_escape
break
stack.append((idx, tag.type, len(clean_chars), i, None))
i += len(tag.open)
opened = True
break
if opened:
continue
# Если это слэш, проверяем, не экранирует ли он следующий тег
if escaped:
# Проверяем, следует ли за слэшем тег
is_escape = False
for tag in tags:
if tag.type == SpanType.LINK:
if re.match(tag.open, text[i+1:]):
is_escape = True
break
elif text.startswith(tag.open, i+1):
is_escape = True
break
# Проверяем закрывающие теги
if not is_escape:
for tag in tags:
if text.startswith(tag.close, i+1):
is_escape = True
break
if is_escape:
# Пропускаем слэш и выводим следующий тег как текст
i += 1
# Находим и выводим экранированный тег
skip = False
for tag in tags:
if tag.type == SpanType.LINK:
match = re.match(tag.open, text[i:])
if match:
clean_chars.append(match.group(0))
i += match.end()
skip = True
break
elif text.startswith(tag.open, i):
clean_chars.append(tag.open)
i += len(tag.open)
skip = True
break
if not skip:
# Проверяем закрывающие теги
for tag in tags:
if text.startswith(tag.close, i):
clean_chars.append(tag.close)
i += len(tag.close)
skip = True
break
continue
clean_chars.append(text[i])
i += 1
if stack:
_, last_type, _, raw_pos, _ = stack[-1]
raise ValueError(f'No closing tag for {last_type.value} at pos {raw_pos}')
spans.sort(key=lambda span: span.offset)
return ''.join(clean_chars), spans
def parse_html(text: str) -> tuple[str, list[Span]]:
return _parse_spans(
text,
[
Tag('<b>', '</b>', SpanType.BOLD),
Tag('<i>', '</i>', SpanType.ITALIC),
Tag('<s>', '</s>', SpanType.STRIKE),
Tag('<u>', '</u>', SpanType.UNDERLINE),
Tag('<code>', '</code>', SpanType.MONOSPACE),
Tag('<spoiler>', '</spoiler>', SpanType.SPOILER),
Tag(r'<a href="([^"]+)">', '</a>', SpanType.LINK),
Tag(r'<q>', '</q>', SpanType.QUOTE),
],
)
# версия от человека (не работает с вложенными тэгами)
# from re import finditer, Match
# from itd.models.post import Span
# from itd.enums import SpanType
# class Tag:
# def __init__(self, open: str, close: str, type: SpanType):
# self.open = open
# self.close = close
# self.type = type
# def raise_error(self, pos: int):
# raise ValueError(f'No closing tag for {self.type.value} at pos {pos - len(self.open)}')
# def to_span(self, start: int, end: int) -> Span:
# return Span(length=end - (start - len(self.open)), offset=start - len(self.open), type=self.type)
# def get_pos(self, match: Match[str], text: str, offset: int) -> tuple[int, int, str]:
# start = match.end() - offset
# text = text[:match.start() - offset] + text[start:]
# end = text.find(self.close, start)
# if end == -1:
# self.raise_error(start)
# return start - len(self.open), end, text[:end] + text[end + len(self.close):]
# def parse_html(text: str) -> tuple[str, list[Span]]:
# spans = []
# for tag in [
# Tag('<b>', '</b>', SpanType.BOLD),
# Tag('<i>', '</i>', SpanType.ITALIC),
# Tag('<s>', '</s>', SpanType.STRIKE),
# Tag('<u>', '</u>', SpanType.UNDERLINE),
# Tag('<code>', '</code>', SpanType.MONOSPACE),
# Tag('<spoiler>', '</spoiler>', SpanType.SPOILER),
# ]:
# offset = 0
# full_text = text
# for match in finditer(tag.open, full_text):
# start, end, text = tag.get_pos(match, text, offset)
# spans.append(tag.to_span(start, end))
# offset += len(tag.open) + len(tag.close)
# return text, spans

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "itd-sdk"
version = "1.0.1"
version = "1.2.0"
description = "ITD client for python"
readme = "README.md"
authors = [
@@ -12,6 +12,6 @@ authors = [
]
license = "MIT"
dependencies = [
"requests", "pydantic"
"requests", "pydantic", "sseclient-py"
]
requires-python = ">=3.9"

View File

@@ -1,2 +1,3 @@
pydantic==2.11.9
requests==2.32.3
requests==2.32.3
sseclient-py==1.8.0

7
scripts/README.md Normal file
View File

@@ -0,0 +1,7 @@
# Коллекция скриптов от [@kilyabin](https://github.com/kilyabin)
Пока всего два скрипта, однако будет дополняться
Работают через аргументы командной строки (например, `--file` или `--text`)
Есть помощь при аргументе `-h`, потому - разберетесь

View File

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
import argparse
import os
import sys
from itd import ITDClient
def main():
parser = argparse.ArgumentParser(
description='Upload image and set it as profile banner'
)
parser.add_argument(
'--token',
default=os.getenv('ITD_TOKEN'),
help='API token (or ITD_TOKEN env var)'
)
parser.add_argument(
'--file',
required=True,
help='Path to image file'
)
parser.add_argument(
'--name',
help='Filename on server (default: local filename)'
)
args = parser.parse_args()
if not args.token:
print('❌ Токен не задан (--token или ITD_TOKEN)', file=sys.stderr)
sys.exit(1)
file_path = args.file
if not os.path.isfile(file_path):
print(f'❌ Файл не найден: {file_path}', file=sys.stderr)
sys.exit(1)
server_name = args.name or os.path.basename(file_path)
try:
client = ITDClient(None, args.token)
# Загружаем файл
with open(file_path, 'rb') as f:
response = client.upload_file(server_name, f)
# Проверяем, что получили id
file_id = getattr(response, 'id', None)
if file_id is None:
print('Не удалось получить id файла')
print(response)
sys.exit(1)
# Преобразуем UUID в строку
file_id_str = str(file_id)
# Обновляем баннер
update_resp = client.update_profile(banner_id=file_id_str)
print('✅ Баннер обновлён!')
print('📄 Информация о файле:')
print(f' id: {file_id_str}')
print(f' filename: {response.filename}')
print(f' mime_type: {response.mime_type}')
print(f' size: {response.size} bytes')
print(f' url: {response.url}')
except Exception as e:
print('❌ Произошла ошибка:', e, file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python3
import argparse
import os
import sys
from itd import ITDClient
def main():
parser = argparse.ArgumentParser(
description='Create a post on ITD via CLI'
)
parser.add_argument(
'--token',
default=os.getenv('ITD_TOKEN'),
help='Refresh token (or set ITD_TOKEN environment variable)'
)
parser.add_argument(
'--text',
required=True,
help='Text content of the post'
)
parser.add_argument(
'--file',
help='Optional file to attach to the post'
)
parser.add_argument(
'--filename',
help='Filename on server (if --file is used, default: local filename)'
)
args = parser.parse_args()
if not args.token:
print('❌ Token not provided (--token or ITD_TOKEN)', file=sys.stderr)
sys.exit(1)
try:
client = ITDClient(None, args.token)
file_id = None
if args.file:
if not os.path.isfile(args.file):
print(f'❌ File not found: {args.file}', file=sys.stderr)
sys.exit(1)
server_name = args.filename or os.path.basename(args.file)
with open(args.file, 'rb') as f:
response = client.upload_file(server_name, f)
file_id = str(getattr(response, 'id', None))
if not file_id:
print('❌ Failed to get file ID')
sys.exit(1)
print(f'✅ File uploaded: {response.filename} (id={file_id})')
# Создаём пост с правильным аргументом 'content'
if file_id:
post_resp = client.create_post(content=args.text, file_ids=[file_id])
else:
post_resp = client.create_post(content=args.text)
# Вывод результата
print('✅ Post created successfully!')
print(f' id: {post_resp.id}')
if hasattr(post_resp, 'url'):
print(f' url: {post_resp.url}')
print(f' text: {args.text}')
if file_id:
print(f' attached file id: {file_id}')
except Exception as e:
print('❌ Error:', e, file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -2,10 +2,10 @@ from setuptools import setup, find_packages
setup(
name='itd-sdk',
version='1.0.1',
version='1.2.0',
packages=find_packages(),
install_requires=[
'requests', 'pydantic'
'requests', 'pydantic', 'sseclient-py'
],
python_requires=">=3.9"
)