Files
itd-sdk/examples/stream/stop_stream.py
2026-02-10 15:29:04 +03:00

48 lines
1.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Пример остановки SSE потока из кода
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import threading
import time
from itd import ITDClient, StreamConnect, StreamNotification
def main():
cookies = 'YOUR_COOKIES_HERE'
if cookies == 'YOUR_COOKIES_HERE':
print('! Укажите cookies в переменной cookies')
return
client = ITDClient(cookies=cookies)
# Функция для прослушивания в отдельном потоке
def listen():
print('! Начинаем прослушивание...')
try:
for event in client.stream_notifications():
if isinstance(event, StreamConnect):
print(f'-- Подключено! User ID: {event.user_id}')
else:
print(f'🔔 {event.type.value}: {event.actor.username}')
except Exception as e:
print(f'! Ошибка: {e}')
# В отдельном потоке
thread = threading.Thread(target=listen, daemon=True)
thread.start()
print('Прослушивание запущено. Нажмите Enter для остановки...')
input()
print('!! Останавливаем прослушивание...')
client.stop_stream()
thread.join(timeout=5)
print('! Остановлено')
if __name__ == '__main__':
main()