"""Core SMART data collection logic with bilingual support."""

import locale
import os
import shutil
import subprocess
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class DiskSmartData:
    """S.M.A.R.T. data for a single disk.

    Instances are created by ``parse_smart_data`` and later completed by
    ``calculate_health``, which fills in ``health`` and ``warnings``.
    """

    disk: str
    model: str = "Unknown"
    size: str = "Unknown"
    status: str = "UNKNOWN"  # "GOOD", "BAD" or "UNKNOWN"
    temp: str = "N/A"
    power_hours: str = "N/A"
    power_cycles: str = "N/A"
    reallocated: int = 0
    pending: int = 0
    uncorrectable: int = 0
    # Every parsed ATA attribute row, keyed by attribute ID (as a string).
    attrs: Dict[str, dict] = field(default_factory=dict)
    health: int = 100
    warnings: List[str] = field(default_factory=list)
    error: Optional[str] = None
    smart_supported: bool = True
    # Additional SMART attributes for health calculation
    ssd_life_left: int = 100
    remaining_lifetime: int = 100  # ID 169 - more reliable for some SSDs
    media_wearout_indicator: int = 100  # ID 233 - Intel/Crucial
    crc_errors: int = 0
    program_fail_count: int = 0
    erase_fail_count: int = 0
    command_timeout: int = 0
    spin_retry_count: int = 0
    reallocated_event_count: int = 0
    reported_uncorrect: int = 0
    host_writes_gb: float = 0  # Derived from the raw value of attribute 241


def get_locale() -> str:
    """Detect the system locale and return ``'ru'`` or ``'en'``.

    Any failure while querying the locale falls back to ``'en'``.
    """
    try:
        # getdefaultlocale() is deprecated; use getlocale() when it is
        # missing so Russian systems are not silently reported as English.
        getter = getattr(locale, "getdefaultlocale", None) or locale.getlocale
        loc = getter()[0] or ""
    except Exception:
        return "en"
    return "ru" if loc.startswith("ru") else "en"


MESSAGES = {
    "en": {
        "smart_not_installed": "❌ smartmontools is not installed!",
        "install_command": "Install: sudo pacman -S smartmontools",
        "no_disks_found": "❌ No disks found",
        "disk_monitor": "DISK HEALTH MONITORING (S.M.A.R.T.)",
        "disk": "Disk",
        "model": "Model",
        "size": "Size",
        "health": "Health",
        "status": "Status",
        "temperature": "Temperature",
        "power_hours": "Power-On Hours",
        "power_cycles": "Cycles",
        "critical_attrs": "Critical Attributes",
        "reallocated": "Reallocated Sectors",
        "pending": "Pending Sectors",
        "uncorrectable": "Uncorrectable Errors",
        "smart_status_bad": "🔴 S.M.A.R.T. status: BAD",
        "critical_reallocated_500": "🔴 CRITICAL: {0} reallocated sectors! Disk may fail!",
        "warning_reallocated_100": "🟠 WARNING: {0} reallocated sectors. Start backup!",
        "warning_reallocated_10": "🟡 WARNING: {0} reallocated sectors",
        "critical_pending": "🔴 CRITICAL: {0} pending sectors!",
        "critical_uncorrectable": "🔴 CRITICAL: {0} uncorrectable errors!",
        "smart_not_supported": "S.M.A.R.T.: Not supported",
        "running_as_root": "✓ Running as root",
        "run_with_sudo": "⚠️ Run with sudo for full access",
        "collecting_data": "Collecting data...",
        "disks_found": "Found {0} disk(s)",
        "error": "Error",
        "refresh": "🔄 Refresh",
        "disk_health_report": "📊 S.M.A.R.T. Disk Health Report",
        # Additional health warnings
        "warning_ssd_life": "🟠 SSD life remaining: {0}%",
        "warning_crc_errors": "🟡 CRC errors: {0} (check SATA cable)",
        "warning_program_fail": "🔴 Program failures: {0}",
        "warning_erase_fail": "🔴 Erase failures: {0}",
        "warning_command_timeout": "🟡 Command timeouts: {0}",
        "warning_spin_retry": "🟡 Spin retry count: {0}",
        "warning_reallocated_event": "🟡 Reallocation events: {0}",
        "warning_reported_uncorrect": "🔴 Reported uncorrect errors: {0}",
    },
    "ru": {
        "smart_not_installed": "❌ smartmontools не установлен!",
        "install_command": "Установите: sudo pacman -S smartmontools",
        "no_disks_found": "❌ Диски не найдены",
        "disk_monitor": "МОНИТОРИНГ ЗДОРОВЬЯ ДИСКОВ (S.M.A.R.T.)",
        "disk": "Диск",
        "model": "Модель",
        "size": "Размер",
        "health": "Здоровье",
        "status": "Статус",
        "temperature": "Температура",
        "power_hours": "Часов работы",
        "power_cycles": "Циклов",
        "critical_attrs": "Критические атрибуты",
        "reallocated": "Переназначенные сектора",
        "pending": "Ожидающие сектора",
        "uncorrectable": "Неисправимые ошибки",
        "smart_status_bad": "🔴 S.M.A.R.T. статус: BAD",
        "critical_reallocated_500": "🔴 КРИТИЧНО: {0} переназначенных секторов! Диск может отказать!",
        "warning_reallocated_100": "🟠 ВНИМАНИЕ: {0} переназначенных секторов. Начните резервное копирование!",
        "warning_reallocated_10": "🟡 ВНИМАНИЕ: {0} переназначенных секторов",
        "critical_pending": "🔴 КРИТИЧНО: {0} ожидающих секторов!",
        "critical_uncorrectable": "🔴 КРИТИЧНО: {0} неисправимых ошибок!",
        "smart_not_supported": "S.M.A.R.T.: Не поддерживается",
        "running_as_root": "✓ Запуск от root",
        "run_with_sudo": "⚠️ Запустите с sudo для полного доступа",
        "collecting_data": "Сбор данных...",
        "disks_found": "Найдено дисков: {0}",
        "error": "Ошибка",
        "refresh": "🔄 Обновить",
        "disk_health_report": "📊 Отчет о здоровье дисков (S.M.A.R.T.)",
        # Additional health warnings
        "warning_ssd_life": "🟠 Остаток ресурса SSD: {0}%",
        "warning_crc_errors": "🟡 Ошибки CRC: {0} (проверьте SATA кабель)",
        "warning_program_fail": "🔴 Ошибки программирования: {0}",
        "warning_erase_fail": "🔴 Ошибки стирания: {0}",
        "warning_command_timeout": "🟡 Таймауты команд: {0}",
        "warning_spin_retry": "🟡 Повторы раскрутки: {0}",
        "warning_reallocated_event": "🟡 События переназначения: {0}",
        "warning_reported_uncorrect": "🔴 Сообщённые ошибки: {0}",
    },
}


def get_message(key: str, lang: Optional[str] = None, *args) -> str:
    """Return the localized message for ``key``.

    Args:
        key: Message identifier in ``MESSAGES``.
        lang: Language code; detected via ``get_locale`` when ``None``.
            Unknown languages fall back to English.
        *args: Positional values substituted into the ``{0}``-style
            placeholders of the message.

    Returns:
        The formatted message, the English text when the key is missing
        for ``lang``, or ``key`` itself when no translation exists at all.
    """
    if lang is None:
        lang = get_locale()
    english = MESSAGES["en"]
    msg = MESSAGES.get(lang, english).get(key, english.get(key, key))
    if args:
        return msg.format(*args)
    return msg


def check_smartctl() -> bool:
    """Check whether the ``smartctl`` executable is available on PATH."""
    # shutil.which avoids spawning the external `which` tool, whose absence
    # would otherwise surface as an uncaught FileNotFoundError.
    return shutil.which("smartctl") is not None


def get_disk_list() -> List[str]:
    """Return the ``/dev/<name>`` path of every device reported by ``lsblk -d``.

    Returns an empty list when ``lsblk`` cannot be run or exits with an error.
    """
    try:
        result = subprocess.run(
            ["lsblk", "-d", "-n", "-o", "NAME"],
            capture_output=True,
            text=True,
            check=True,
        )
    except Exception:
        # Best effort: a missing or failing lsblk simply means "no disks".
        return []
    return [f"/dev/{name}" for name in result.stdout.strip().split("\n") if name]


def get_disk_info(disk: str) -> tuple:
    """Return ``(model, size)`` for ``disk`` as reported by ``lsblk``.

    Either element is ``"Unknown"`` when lsblk prints nothing for it; both
    are ``"Unknown"`` when lsblk cannot be run.
    """

    def _query(column: str) -> str:
        # One lsblk call per column: the model may contain spaces, so a
        # combined "MODEL,SIZE" line could not be split reliably.
        completed = subprocess.run(
            ["lsblk", "-d", "-n", "-o", column, disk],
            capture_output=True,
            text=True,
        )
        return completed.stdout.strip() or "Unknown"

    try:
        return _query("MODEL"), _query("SIZE")
    except Exception:
        return "Unknown", "Unknown"


# ATA attributes whose RAW_VALUE column is stored as an integer counter:
# (attribute IDs, attribute-name fragments, DiskSmartData field).
_ATA_RAW_COUNTERS = (
    (("5",), ("Reallocated_Sector_Ct",), "reallocated"),
    (("197",), ("Current_Pending_Sect",), "pending"),
    (("198",), ("Offline_Uncorrectable",), "uncorrectable"),
    (("231",), ("SSD_Life_Left",), "ssd_life_left"),
    (("199",), ("CRC_Error_Count", "UDMA_CRC_Error"), "crc_errors"),
    (("181",), ("Program_Fail_Count",), "program_fail_count"),
    (("172", "182"), ("Erase_Fail_Count",), "erase_fail_count"),
    (("188",), ("Command_Timeout",), "command_timeout"),
    (("10",), ("Spin_Retry_Count",), "spin_retry_count"),
    (("196",), ("Reallocated_Event_Count",), "reallocated_event_count"),
    (("187",), ("Reported_Uncorrect",), "reported_uncorrect"),
)


def _parse_count(text: str) -> int:
    """Convert a smartctl counter such as ``" 1,234"`` to an int."""
    # NVMe counters may be printed with thousands separators ("1,234"),
    # which a plain int() rejects.
    return int(text.strip().replace(",", ""))


def _detect_status(output: str) -> str:
    """Map the overall-health line of smartctl output to GOOD/BAD/UNKNOWN."""
    # Only the overall-health line is inspected: searching the whole output
    # would match the "WHEN_FAILED" column header of the attribute table.
    for line in output.splitlines():
        if "SMART overall-health" not in line:
            continue
        verdict = line.partition(":")[2]
        if "PASSED" in verdict:
            return "GOOD"
        if "FAILED" in verdict:
            return "BAD"
    return "UNKNOWN"


def _row_matches(parts: List[str], line: str, ids, names) -> bool:
    """Return True when the row has one of ``ids`` or mentions one of ``names``."""
    return parts[0] in ids or any(name in line for name in names)


def _apply_nvme_line(data: DiskSmartData, line: str) -> None:
    """Update ``data`` from one "Key: value" line of NVMe smartctl output."""
    # NVMe: "Temperature: 35 Celsius"
    if "Temperature:" in line:
        try:
            temp_val = line.split(":")[1].strip().split()[0]
            data.temp = f"{temp_val}°C"
        except (IndexError, ValueError):
            pass
    # NVMe: "Power On Hours: 1,234"
    if "Power On Hours:" in line:
        try:
            hours = _parse_count(line.split(":")[1])
            data.power_hours = f"{hours}h ({hours // 24}d)"
        except (IndexError, ValueError):
            pass
    # NVMe: "Power Cycle Count: 5678" (kept as the printed text)
    if "Power Cycle Count:" in line:
        try:
            data.power_cycles = line.split(":")[1].strip()
        except (IndexError, ValueError):
            pass
    # NVMe: "Media and Data Integrity Errors: 0"
    if "Media and Data Integrity Errors:" in line:
        try:
            data.uncorrectable = _parse_count(line.split(":")[1])
        except (IndexError, ValueError):
            pass


def _apply_ata_line(data: DiskSmartData, parts: List[str], line: str) -> None:
    """Update ``data`` from one ATA attribute row (at least 10 columns).

    ``parts[3]`` is the normalized VALUE column and ``parts[9]`` the first
    token of RAW_VALUE. Values that cannot be converted are skipped.
    """
    raw = parts[9]

    # Temperature (ID 194)
    if _row_matches(parts, line, ("194",), ("Temperature_Celsius",)):
        data.temp = f"{raw}°C"

    # Power-on hours (ID 9)
    if _row_matches(parts, line, ("9",), ("Power_On_Hours",)):
        try:
            hours = int(raw)
            data.power_hours = f"{hours}h ({hours // 24}d)"
        except ValueError:
            pass

    # Power cycle count (ID 12), kept as the printed text
    if _row_matches(parts, line, ("12",), ("Power_Cycle_Count",)):
        data.power_cycles = raw

    # Plain integer counters taken from the raw value
    for ids, names, attr_field in _ATA_RAW_COUNTERS:
        if _row_matches(parts, line, ids, names):
            try:
                setattr(data, attr_field, int(raw))
            except ValueError:
                pass

    # Remaining Lifetime Percent (ID 169) - more reliable for some SSDs.
    # NOTE: uses the normalized VALUE (parts[3]), not the raw value.
    if parts[0] == "169" and "Remaining_Lifetime" in line:
        try:
            data.remaining_lifetime = int(parts[3])
        except ValueError:
            pass

    # Media Wearout Indicator (ID 233) - Intel/Crucial/WD.
    # NOTE: uses the normalized VALUE (parts[3]), not the raw value.
    if parts[0] == "233" and ("Media_Wearout" in line or "Wear_Leveling" in line):
        try:
            data.media_wearout_indicator = int(parts[3])
        except ValueError:
            pass

    # Host Writes (ID 241) - for calculating actual write volume
    if _row_matches(parts, line, ("241",), ("Host_Writes", "Lifetime_Writes")):
        try:
            # The raw value is treated as a count of 32 MiB blocks.
            data.host_writes_gb = round(int(raw) * 32 / 1024, 1)
        except ValueError:
            pass

    # Store every attribute row for later display
    if parts[0].isdigit():
        data.attrs[parts[0]] = {
            "name": parts[1],
            "value": parts[3],
            "worst": parts[4],
            "threshold": parts[5],
            "raw": raw,
        }


def parse_smart_data(disk: str) -> Optional[DiskSmartData]:
    """Parse S.M.A.R.T. data for a disk (supports both ATA and NVMe).

    Runs ``sudo smartctl -a <disk>`` and extracts status, temperature,
    power statistics and the attributes used for health scoring.

    Args:
        disk: Device path, e.g. ``/dev/sda``.

    Returns:
        A ``DiskSmartData``. ``smart_supported`` is False when smartctl
        cannot be run (``error`` then holds the reason) or prints nothing.
    """
    data = DiskSmartData(disk=disk)
    try:
        result = subprocess.run(
            ["sudo", "smartctl", "-a", disk],
            capture_output=True,
            text=True,
        )
    except Exception as e:
        data.error = str(e)
        data.smart_supported = False
        return data

    output = result.stdout
    if not output.strip():
        data.smart_supported = False
        return data

    data.status = _detect_status(output)

    # Treat the output as NVMe when it says so, or when the overall-health
    # line is absent.
    is_nvme = "NVMe" in output or "SMART overall-health" not in output

    for line in output.split("\n"):
        parts = line.split()
        if len(parts) < 10:
            # Too short for an ATA attribute row; try the NVMe format.
            if is_nvme:
                _apply_nvme_line(data, line)
            continue
        _apply_ata_line(data, parts, line)

    return data


def calculate_health(data: DiskSmartData, lang: Optional[str] = None) -> Tuple[int, List[str]]:
    """Calculate disk health percentage and warnings from SMART attributes.

    The result is also stored on ``data.health`` and ``data.warnings`` on
    every path, including the early exits, so callers that ignore the
    return value still see it.

    Args:
        data: Parsed SMART data; updated in place.
        lang: Language for warning texts; detected when ``None``.

    Returns:
        ``(health, warnings)``. ``health`` is 50 when SMART data is
        unavailable and 5 when the drive reports status BAD; otherwise it
        is 100 minus penalties, never below 5.
    """
    if lang is None:
        lang = get_locale()

    if data.error or not data.smart_supported:
        data.health, data.warnings = 50, []
        return data.health, data.warnings

    if data.status == "BAD":
        data.health = 5
        data.warnings = [get_message("smart_status_bad", lang)]
        return data.health, data.warnings

    health = 100
    warnings = []

    # === SSD WEAR INDICATORS - use the most reliable one ===
    # Priority: remaining_lifetime (169), else the lower of media_wearout
    # (233) and ssd_life_left (231). Some manufacturers (ADATA, Silicon
    # Motion) have an unreliable ID 231. Values of 0 or 100 are ignored.
    wear = None
    if 0 < data.remaining_lifetime < 100:
        wear = data.remaining_lifetime
    else:
        fallbacks = [
            value
            for value in (data.media_wearout_indicator, data.ssd_life_left)
            if 0 < value < 100
        ]
        if fallbacks:
            wear = min(fallbacks)
    if wear is not None:
        health = min(health, wear)
        if wear < 50:
            warnings.append(get_message("warning_ssd_life", lang, wear))

    # === REALLOCATED SECTORS (ID 5) ===
    if data.reallocated > 500:
        health -= min(80, data.reallocated * 0.5)
        warnings.append(get_message("critical_reallocated_500", lang, data.reallocated))
    elif data.reallocated > 100:
        health -= min(70, data.reallocated * 0.3)
        warnings.append(get_message("warning_reallocated_100", lang, data.reallocated))
    elif data.reallocated > 10:
        health -= data.reallocated * 0.2
        warnings.append(get_message("warning_reallocated_10", lang, data.reallocated))
    elif data.reallocated > 0:
        # A handful of reallocated sectors costs a little but is not reported.
        health -= data.reallocated * 0.1

    # === REALLOCATION EVENTS (ID 196) ===
    if data.reallocated_event_count > 100:
        health -= min(40, data.reallocated_event_count * 0.4)
        warnings.append(
            get_message("warning_reallocated_event", lang, data.reallocated_event_count)
        )
    elif data.reallocated_event_count > 0:
        health -= min(20, data.reallocated_event_count * 0.2)

    # === PENDING SECTORS (ID 197) ===
    if data.pending > 0:
        health -= min(70, data.pending * 2)
        warnings.append(get_message("critical_pending", lang, data.pending))

    # === UNCORRECTABLE ERRORS (ID 198) ===
    if data.uncorrectable > 0:
        health -= min(80, data.uncorrectable * 5)
        warnings.append(get_message("critical_uncorrectable", lang, data.uncorrectable))

    # === REPORTED UNCORRECT ERRORS (ID 187) ===
    if data.reported_uncorrect > 0:
        health -= min(60, data.reported_uncorrect * 5)
        warnings.append(
            get_message("warning_reported_uncorrect", lang, data.reported_uncorrect)
        )

    # === PROGRAM FAIL COUNT (ID 181) ===
    if data.program_fail_count > 0:
        health -= min(50, data.program_fail_count * 10)
        warnings.append(get_message("warning_program_fail", lang, data.program_fail_count))

    # === ERASE FAIL COUNT (ID 172/182) ===
    if data.erase_fail_count > 0:
        health -= min(50, data.erase_fail_count * 10)
        warnings.append(get_message("warning_erase_fail", lang, data.erase_fail_count))

    # === CRC ERRORS (ID 199) - Usually cable issue ===
    if data.crc_errors > 0:
        if data.crc_errors > 100:
            health -= min(30, data.crc_errors * 0.3)
        else:
            health -= min(15, data.crc_errors * 0.15)
        warnings.append(get_message("warning_crc_errors", lang, data.crc_errors))

    # === COMMAND TIMEOUT (ID 188) ===
    if data.command_timeout > 0:
        health -= min(25, data.command_timeout * 2)
        warnings.append(get_message("warning_command_timeout", lang, data.command_timeout))

    # === SPIN RETRY COUNT (ID 10) - For HDDs ===
    if data.spin_retry_count > 0:
        health -= min(30, data.spin_retry_count * 5)
        warnings.append(get_message("warning_spin_retry", lang, data.spin_retry_count))

    # Never report less than 5%, matching the score used for status BAD.
    data.health = max(5, int(health))
    data.warnings = warnings
    return data.health, warnings


def collect_all_disks_data(lang: Optional[str] = None) -> List[DiskSmartData]:
    """Collect S.M.A.R.T. data for all disks.

    Args:
        lang: Language for warning texts; detected when ``None``.

    Returns:
        One ``DiskSmartData`` per device from ``get_disk_list``, with
        model, size, health and warnings filled in.
    """
    if lang is None:
        lang = get_locale()

    results = []
    for disk in get_disk_list():
        model, size = get_disk_info(disk)
        smart_data = parse_smart_data(disk)
        if smart_data is not None:
            smart_data.model = model
            smart_data.size = size
            # Stores health and warnings on smart_data.
            calculate_health(smart_data, lang)
            results.append(smart_data)
    return results


def is_root() -> bool:
    """Check if running as root (effective user ID 0)."""
    try:
        return os.geteuid() == 0
    except AttributeError:
        # os.geteuid is unavailable on non-Unix platforms.
        return False