#11 Added SSE listening

This commit is contained in:
Vasily Domakov
2026-02-09 23:21:01 +03:00
committed by firedotguy
parent 13365fc23a
commit f2e18e08c0
6 changed files with 135 additions and 7 deletions

View File

@@ -1 +1,4 @@
from itd.client import Client as ITDClient
from itd.client import Client as ITDClient
from itd.models.event import StreamConnect, StreamNotification
__all__ = ['ITDClient', 'StreamConnect', 'StreamNotification']

View File

@@ -1,16 +1,19 @@
# from warnings import deprecated
from uuid import UUID
from _io import BufferedReader
from typing import cast
from typing import cast, Iterator
from datetime import datetime
import json
import time
from requests.exceptions import ConnectionError, HTTPError
from sseclient import SSEClient
from itd.routes.users import get_user, update_profile, follow, unfollow, get_followers, get_following, update_privacy
from itd.routes.etc import get_top_clans, get_who_to_follow, get_platform_status
from itd.routes.comments import get_comments, add_comment, delete_comment, like_comment, unlike_comment, add_reply_comment, get_replies
from itd.routes.hashtags import get_hashtags, get_posts_by_hashtag
from itd.routes.notifications import get_notifications, mark_as_read, mark_all_as_read, get_unread_notifications_count
from itd.routes.notifications import get_notifications, mark_as_read, mark_all_as_read, get_unread_notifications_count, stream_notifications
from itd.routes.posts import create_post, get_posts, get_post, edit_post, delete_post, pin_post, repost, view_post, get_liked_posts, restore_post, like_post, unlike_post, get_user_posts
from itd.routes.reports import report
from itd.routes.search import search
@@ -30,6 +33,7 @@ from itd.models.verification import Verification, VerificationStatus
from itd.models.report import NewReport
from itd.models.file import File
from itd.models.pin import Pin
from itd.models.event import StreamConnect, StreamNotification
from itd.enums import PostsTab, ReportTargetType, ReportTargetReason
from itd.request import set_cookies
@@ -57,6 +61,7 @@ def refresh_on_error(func):
class Client:
def __init__(self, token: str | None = None, cookies: str | None = None):
self.cookies = cookies
self._stream_active = False # Флаг для остановки stream_notifications
if token:
self.token = token.replace('Bearer ', '')
@@ -1081,4 +1086,101 @@ class Client:
raise PinNotOwned(slug)
res.raise_for_status()
return res.json()['pin']
return res.json()['pin']
@refresh_on_error
def stream_notifications(self) -> Iterator[StreamConnect | StreamNotification]:
"""Слушать SSE поток уведомлений
Yields:
StreamConnect | StreamNotification: События подключения или уведомления
Example:
```python
from itd import ITDClient
client = ITDClient(cookies='refresh_token=...')
# Запуск прослушивания
for event in client.stream_notifications():
if isinstance(event, StreamConnect):
print(f'Подключено: {event.user_id}')
else:
print(f'Уведомление: {event.type} от {event.actor.username}')
# Остановка из другого потока или обработчика
# client.stop_stream()
```
"""
self._stream_active = True
while self._stream_active:
try:
response = stream_notifications(self.token)
response.raise_for_status()
client = SSEClient(response)
for event in client.events():
if not self._stream_active:
response.close()
return
try:
if not event.data or event.data.strip() == '':
continue
data = json.loads(event.data)
if 'userId' in data and 'timestamp' in data and 'type' not in data:
yield StreamConnect.model_validate(data)
else:
yield StreamNotification.model_validate(data)
except json.JSONDecodeError:
print(f'Не удалось распарсить сообщение: {event.data}')
continue
except Exception as e:
print(f'Ошибка обработки события: {e}')
continue
except Unauthorized:
if self.cookies and self._stream_active:
print('Токен истек, обновляем...')
self.refresh_auth()
continue
else:
raise
except Exception as e:
if not self._stream_active:
return
print(f'Ошибка соединения: {e}, переподключение через 5 секунд...')
time.sleep(5)
continue
def stop_stream(self):
"""Остановить прослушивание SSE потока
Example:
```python
import threading
from itd import ITDClient
client = ITDClient(cookies='refresh_token=...')
# Запуск в отдельном потоке
def listen():
for event in client.stream_notifications():
print(event)
thread = threading.Thread(target=listen)
thread.start()
# Остановка через 10 секунд
import time
time.sleep(10)
client.stop_stream()
thread.join()
```
"""
self._stream_active = False

View File

@@ -0,0 +1,3 @@
from itd.models.event import StreamConnect, StreamNotification
__all__ = ['StreamConnect', 'StreamNotification']

View File

@@ -47,6 +47,17 @@ def fetch(token: str, method: str, url: str, params: dict = {}, files: dict[str,
return res
def fetch_stream(token: str, url: str):
"""Fetch для SSE streaming запросов"""
base = f'https://xn--d1ah4a.com/api/{url}'
headers = {
"Accept": "text/event-stream",
"Authorization": 'Bearer ' + token,
"Cache-Control": "no-cache"
}
return s.get(base, headers=headers, stream=True, timeout=None)
def set_cookies(cookies: str):
for cookie in cookies.split('; '):
s.cookies.set(cookie.split('=')[0], cookie.split('=')[-1], path='/', domain='xn--d1ah4a.com.com')

View File

@@ -1,6 +1,6 @@
from uuid import UUID
from itd.request import fetch
from itd.request import fetch, fetch_stream
def get_notifications(token: str, limit: int = 20, offset: int = 0):
return fetch(token, 'get', 'notifications', {'limit': limit, 'offset': offset})
@@ -12,4 +12,12 @@ def mark_all_as_read(token: str):
return fetch(token, 'post', f'notifications/read-all')
def get_unread_notifications_count(token: str):
return fetch(token, 'get', 'notifications/count')
return fetch(token, 'get', 'notifications/count')
def stream_notifications(token: str):
"""Получить SSE поток уведомлений
Returns:
Response: Streaming response для SSE
"""
return fetch_stream(token, 'notifications/stream')

View File

@@ -1,2 +1,3 @@
pydantic==2.11.9
requests==2.32.3
requests==2.32.3
sseclient-py==1.8.0