diff --git a/itd/__init__.py b/itd/__init__.py index e33c29f..5d71f2a 100644 --- a/itd/__init__.py +++ b/itd/__init__.py @@ -1 +1,4 @@ -from itd.client import Client as ITDClient \ No newline at end of file +from itd.client import Client as ITDClient +from itd.models.event import StreamConnect, StreamNotification + +__all__ = ['ITDClient', 'StreamConnect', 'StreamNotification'] \ No newline at end of file diff --git a/itd/client.py b/itd/client.py index c63b8b1..7d36255 100644 --- a/itd/client.py +++ b/itd/client.py @@ -1,16 +1,19 @@ # from warnings import deprecated from uuid import UUID from _io import BufferedReader -from typing import cast +from typing import cast, Iterator from datetime import datetime +import json +import time from requests.exceptions import ConnectionError, HTTPError +from sseclient import SSEClient from itd.routes.users import get_user, update_profile, follow, unfollow, get_followers, get_following, update_privacy from itd.routes.etc import get_top_clans, get_who_to_follow, get_platform_status from itd.routes.comments import get_comments, add_comment, delete_comment, like_comment, unlike_comment, add_reply_comment, get_replies from itd.routes.hashtags import get_hashtags, get_posts_by_hashtag -from itd.routes.notifications import get_notifications, mark_as_read, mark_all_as_read, get_unread_notifications_count +from itd.routes.notifications import get_notifications, mark_as_read, mark_all_as_read, get_unread_notifications_count, stream_notifications from itd.routes.posts import create_post, get_posts, get_post, edit_post, delete_post, pin_post, repost, view_post, get_liked_posts, restore_post, like_post, unlike_post, get_user_posts from itd.routes.reports import report from itd.routes.search import search @@ -30,6 +33,7 @@ from itd.models.verification import Verification, VerificationStatus from itd.models.report import NewReport from itd.models.file import File from itd.models.pin import Pin +from itd.models.event import StreamConnect, StreamNotification from itd.enums import PostsTab, ReportTargetType, ReportTargetReason from itd.request import set_cookies @@ -57,6 +61,7 @@ def refresh_on_error(func): class Client: def __init__(self, token: str | None = None, cookies: str | None = None): self.cookies = cookies + self._stream_active = False # Флаг для остановки stream_notifications if token: self.token = token.replace('Bearer ', '') @@ -1081,4 +1086,101 @@ class Client: raise PinNotOwned(slug) res.raise_for_status() - return res.json()['pin'] \ No newline at end of file + return res.json()['pin'] + + @refresh_on_error + def stream_notifications(self) -> Iterator[StreamConnect | StreamNotification]: + """Слушать SSE поток уведомлений + + Yields: + StreamConnect | StreamNotification: События подключения или уведомления + + Example: + ```python + from itd import ITDClient + + client = ITDClient(cookies='refresh_token=...') + + # Запуск прослушивания + for event in client.stream_notifications(): + if isinstance(event, StreamConnect): + print(f'Подключено: {event.user_id}') + else: + print(f'Уведомление: {event.type} от {event.actor.username}') + + # Остановка из другого потока или обработчика + # client.stop_stream() + ``` + """ + self._stream_active = True + + while self._stream_active: + try: + response = stream_notifications(self.token) + response.raise_for_status() + + client = SSEClient(response) + + for event in client.events(): + if not self._stream_active: + response.close() + return + + try: + if not event.data or event.data.strip() == '': + continue + + data = json.loads(event.data) + + if 'userId' in data and 'timestamp' in data and 'type' not in data: + yield StreamConnect.model_validate(data) + else: + yield StreamNotification.model_validate(data) + + except json.JSONDecodeError: + print(f'Не удалось распарсить сообщение: {event.data}') + continue + except Exception as e: + print(f'Ошибка обработки события: {e}') + continue + + except Unauthorized: + if self.cookies and self._stream_active: + print('Токен истек, обновляем...') + self.refresh_auth() + continue + else: + raise + except Exception as e: + if not self._stream_active: + return + print(f'Ошибка соединения: {e}, переподключение через 5 секунд...') + time.sleep(5) + continue + + def stop_stream(self): + """Остановить прослушивание SSE потока + + Example: + ```python + import threading + from itd import ITDClient + + client = ITDClient(cookies='refresh_token=...') + + # Запуск в отдельном потоке + def listen(): + for event in client.stream_notifications(): + print(event) + + thread = threading.Thread(target=listen) + thread.start() + + # Остановка через 10 секунд + import time + time.sleep(10) + client.stop_stream() + thread.join() + ``` + """ + self._stream_active = False \ No newline at end of file diff --git a/itd/models/__init__.py b/itd/models/__init__.py index e69de29..8acef9a 100644 --- a/itd/models/__init__.py +++ b/itd/models/__init__.py @@ -0,0 +1,3 @@ +from itd.models.event import StreamConnect, StreamNotification + +__all__ = ['StreamConnect', 'StreamNotification'] diff --git a/itd/models/event.py b/itd/models/event.py new file mode 100644 index 0000000..775bd8c --- /dev/null +++ b/itd/models/event.py @@ -0,0 +1,33 @@ +from uuid import UUID +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel, Field + +from itd.enums import NotificationType, NotificationTargetType +from itd.models.user import UserNotification + + +class StreamConnect(BaseModel): + """Событие подключения к SSE потоку""" + user_id: UUID = Field(alias='userId') + timestamp: int + + +class StreamNotification(BaseModel): + """Уведомление из SSE потока""" + id: UUID + type: NotificationType + + target_type: NotificationTargetType | None = Field(None, alias='targetType') + target_id: UUID | None = Field(None, alias='targetId') + + preview: str | None = None + read_at: datetime | None = Field(None, alias='readAt') + created_at: datetime = Field(alias='createdAt') + + user_id: UUID = Field(alias='userId') + actor: UserNotification + + read: bool = False + sound: bool = True diff --git a/itd/request.py b/itd/request.py index e052b7f..c634258 100644 --- a/itd/request.py +++ b/itd/request.py @@ -47,6 +47,17 @@ def fetch(token: str, method: str, url: str, params: dict = {}, files: dict[str, return res +def fetch_stream(token: str, url: str): + """Fetch для SSE streaming запросов""" + base = f'https://xn--d1ah4a.com/api/{url}' + headers = { + "Accept": "text/event-stream", + "Authorization": 'Bearer ' + token, + "Cache-Control": "no-cache" + } + return s.get(base, headers=headers, stream=True, timeout=None) + + def set_cookies(cookies: str): for cookie in cookies.split('; '): s.cookies.set(cookie.split('=')[0], cookie.split('=')[-1], path='/', domain='xn--d1ah4a.com.com') diff --git a/itd/routes/notifications.py b/itd/routes/notifications.py index de0a741..a13111d 100644 --- a/itd/routes/notifications.py +++ b/itd/routes/notifications.py @@ -1,6 +1,6 @@ from uuid import UUID -from itd.request import fetch +from itd.request import fetch, fetch_stream def get_notifications(token: str, limit: int = 20, offset: int = 0): return fetch(token, 'get', 'notifications', {'limit': limit, 'offset': offset}) @@ -12,4 +12,12 @@ def mark_all_as_read(token: str): return fetch(token, 'post', f'notifications/read-all') def get_unread_notifications_count(token: str): - return fetch(token, 'get', 'notifications/count') \ No newline at end of file + return fetch(token, 'get', 'notifications/count') + +def stream_notifications(token: str): + """Получить SSE поток уведомлений + + Returns: + Response: Streaming response для SSE + """ + return fetch_stream(token, 'notifications/stream') \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 541740e..5b7ef1f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ pydantic==2.11.9 -requests==2.32.3 \ No newline at end of file +requests==2.32.3 +sseclient-py==1.8.0 \ No newline at end of file