diff --git a/dev b/dev index b84382d..ad6f67f 160000 --- a/dev +++ b/dev @@ -1 +1 @@ -Subproject commit b84382db5b9299231daee71579262d2792d7c39c +Subproject commit ad6f67fbedb7d04036c5a19ce8cbad949bab692e diff --git a/scidk/core/auth.py b/scidk/core/auth.py index 5ba0afd..83f642f 100644 --- a/scidk/core/auth.py +++ b/scidk/core/auth.py @@ -66,7 +66,7 @@ def init_tables(self): """ ) - # Active sessions table (updated with user_id) + # Active sessions table (updated with user_id and locked state) self.db.execute( """ CREATE TABLE IF NOT EXISTS auth_sessions ( @@ -76,6 +76,8 @@ def init_tables(self): created_at REAL NOT NULL, expires_at REAL NOT NULL, last_activity REAL NOT NULL, + locked INTEGER DEFAULT 0, + locked_at REAL, FOREIGN KEY (user_id) REFERENCES auth_users(id) ON DELETE CASCADE ) """ @@ -112,6 +114,9 @@ def init_tables(self): # Auto-migrate from single-user to multi-user on first run self._migrate_to_multi_user() + # Migrate to add lock columns to auth_sessions + self._migrate_add_session_lock_columns() + def _migrate_to_multi_user(self): """Migrate from single-user auth_config to multi-user auth_users table. @@ -154,6 +159,25 @@ def _migrate_to_multi_user(self): except Exception as e: print(f"Migration warning: {e}") + def _migrate_add_session_lock_columns(self): + """Add locked and locked_at columns to auth_sessions table if they don't exist.""" + try: + # Check if locked column exists + cur = self.db.execute("PRAGMA table_info(auth_sessions)") + columns = [row[1] for row in cur.fetchall()] + + if 'locked' not in columns: + self.db.execute("ALTER TABLE auth_sessions ADD COLUMN locked INTEGER DEFAULT 0") + print("Added locked column to auth_sessions table") + + if 'locked_at' not in columns: + self.db.execute("ALTER TABLE auth_sessions ADD COLUMN locked_at REAL") + print("Added locked_at column to auth_sessions table") + + self.db.commit() + except Exception as e: + print(f"Migration warning (session lock columns): {e}") + def is_enabled(self) -> bool: """Check if authentication is currently enabled. @@ -906,6 +930,133 @@ def get_audit_log(self, since_timestamp: Optional[float] = None, print(f"AuthManager.get_audit_log error: {e}") return [] + # ========== Session Locking ========== + + def lock_session(self, token: str) -> bool: + """Lock a session (auto-lock feature). + + Args: + token: Session token to lock + + Returns: + bool: True if successful, False on error + """ + try: + now = time.time() + self.db.execute( + "UPDATE auth_sessions SET locked = 1, locked_at = ? WHERE token = ?", + (now, token) + ) + self.db.commit() + return True + except Exception as e: + print(f"AuthManager.lock_session error: {e}") + return False + + def unlock_session(self, token: str, password: str) -> bool: + """Unlock a locked session with password verification. + + Args: + token: Session token to unlock + password: Password to verify + + Returns: + bool: True if unlock successful, False if password invalid or error + """ + try: + # Get session info + cur = self.db.execute( + """ + SELECT s.username, s.user_id, s.locked + FROM auth_sessions s + WHERE s.token = ? + """, + (token,) + ) + row = cur.fetchone() + + if not row or not row[2]: # Not found or not locked + return False + + username, user_id = row[0], row[1] + + # Verify password (try multi-user first) + if user_id is not None: + user = self.verify_user_credentials(username, password) + if not user: + return False + else: + # Legacy single-user verification + if not self.verify_credentials(username, password): + return False + + # Unlock session + self.db.execute( + "UPDATE auth_sessions SET locked = 0, locked_at = NULL WHERE token = ?", + (token,) + ) + self.db.commit() + + # Log successful unlock + ip_address = None # Will be set by API route + self.log_audit(username, 'session_unlocked', 'Session unlocked', ip_address) + + return True + except Exception as e: + print(f"AuthManager.unlock_session error: {e}") + return False + + def is_session_locked(self, token: str) -> bool: + """Check if a session is currently locked. + + Args: + token: Session token to check + + Returns: + bool: True if session is locked, False otherwise + """ + try: + cur = self.db.execute( + "SELECT locked FROM auth_sessions WHERE token = ?", + (token,) + ) + row = cur.fetchone() + return bool(row and row[0]) if row else False + except Exception: + return False + + def get_session_lock_info(self, token: str) -> Optional[Dict[str, Any]]: + """Get lock information for a session. + + Args: + token: Session token + + Returns: + dict or None: Lock info with keys: username, locked, locked_at + """ + try: + cur = self.db.execute( + """ + SELECT username, locked, locked_at + FROM auth_sessions + WHERE token = ? + """, + (token,) + ) + row = cur.fetchone() + + if not row: + return None + + return { + 'username': row[0], + 'locked': bool(row[1]), + 'locked_at': row[2], + } + except Exception as e: + print(f"AuthManager.get_session_lock_info error: {e}") + return None + def close(self): """Close database connection.""" try: diff --git a/scidk/ui/templates/base.html b/scidk/ui/templates/base.html index 7e832d7..7df4b81 100644 --- a/scidk/ui/templates/base.html +++ b/scidk/ui/templates/base.html @@ -122,6 +122,220 @@

Session Locked

+

Locked at ${timeStr}

+

User: ${username || 'Unknown'}

+
+ + + +
+ + `; + + lockScreen.appendChild(lockDialog); + document.body.appendChild(lockScreen); + + // Handle unlock form submission + const unlockForm = document.getElementById('unlock-form'); + const unlockPassword = document.getElementById('unlock-password'); + const unlockError = document.getElementById('unlock-error'); + + unlockForm.addEventListener('submit', async (e) => { + e.preventDefault(); + + const password = unlockPassword.value; + + if (!password) { + unlockError.textContent = 'Password is required'; + unlockError.style.display = 'block'; + return; + } + + try { + const response = await fetch('/api/auth/unlock', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ password }), + }); + + if (response.ok) { + // Unlock successful - remove lock screen and restart activity monitor + lockScreen.remove(); + + // Restart activity monitor if it was active + if (activityMonitor) { + const configResponse = await fetch('/api/settings/security/auto-lock'); + const configData = await configResponse.json(); + if (configData.status === 'success' && configData.config.enabled) { + activityMonitor.start(); + } + } + + // Optionally show success toast + window.toast('Session unlocked', 'success', 2000); + } else { + const data = await response.json(); + unlockError.textContent = data.error || 'Invalid password'; + unlockError.style.display = 'block'; + unlockPassword.value = ''; + unlockPassword.focus(); + } + } catch (error) { + unlockError.textContent = 'Unlock failed. Please try again.'; + unlockError.style.display = 'block'; + console.error('Unlock error:', error); + } + }); + + // Focus password input + unlockPassword.focus(); + } + + // Initialize on page load + initAutoLock(); + + // Export for external use + window.scidkActivityMonitor = activityMonitor; + })(); diff --git a/scidk/ui/templates/settings/_general.html b/scidk/ui/templates/settings/_general.html index aa83e69..e15cac0 100644 --- a/scidk/ui/templates/settings/_general.html +++ b/scidk/ui/templates/settings/_general.html @@ -62,6 +62,25 @@

Security

+
+ +
+ + +
+ +

@@ -447,8 +466,9 @@

Configuration Backups

const usernameInput = document.getElementById('security-username'); const passwordInput = document.getElementById('security-password'); const passwordShowCheck = document.getElementById('security-password-show'); - const lockEnabledCheck = document.getElementById('security-lock-enabled'); - const lockTimeoutContainer = document.getElementById('security-lock-timeout-container'); + const autoLockEnabledCheck = document.getElementById('auto-lock-enabled'); + const autoLockTimeoutContainer = document.getElementById('auto-lock-timeout-container'); + const autoLockTimeoutInput = document.getElementById('auto-lock-timeout'); const saveSecurityBtn = document.getElementById('btn-save-security'); const statusP = document.getElementById('security-status'); @@ -475,6 +495,23 @@

Configuration Backups

} } + // Load auto-lock config + async function loadAutoLockConfig() { + try { + const response = await fetch('/api/settings/security/auto-lock'); + if (response.ok) { + const data = await response.json(); + if (data.status === 'success' && data.config) { + autoLockEnabledCheck.checked = data.config.enabled || false; + autoLockTimeoutInput.value = data.config.timeout_minutes || 5; + autoLockTimeoutContainer.style.display = data.config.enabled ? 'block' : 'none'; + } + } + } catch (error) { + console.error('Failed to load auto-lock config:', error); + } + } + // Toggle auth settings visibility authEnabledCheck.addEventListener('change', () => { authSettingsDiv.style.display = authEnabledCheck.checked ? 'block' : 'none'; @@ -487,10 +524,10 @@

Configuration Backups

}); } - // Toggle lock timeout input (not yet implemented - future task) - if (lockEnabledCheck && lockTimeoutContainer) { - lockEnabledCheck.addEventListener('change', () => { - lockTimeoutContainer.style.display = lockEnabledCheck.checked ? 'block' : 'none'; + // Toggle auto-lock timeout input + if (autoLockEnabledCheck && autoLockTimeoutContainer) { + autoLockEnabledCheck.addEventListener('change', () => { + autoLockTimeoutContainer.style.display = autoLockEnabledCheck.checked ? 'block' : 'none'; }); } @@ -501,6 +538,10 @@

Configuration Backups

const username = usernameInput.value.trim(); const password = passwordInput.value.trim(); + // Auto-lock settings + const autoLockEnabled = autoLockEnabledCheck.checked; + const autoLockTimeout = parseInt(autoLockTimeoutInput.value, 10); + // Validation if (enabled && !username) { toast('Username is required when enabling authentication', 'error'); @@ -513,48 +554,80 @@

Configuration Backups

return; } + // Validate auto-lock timeout + if (autoLockEnabled && (isNaN(autoLockTimeout) || autoLockTimeout < 1 || autoLockTimeout > 120)) { + toast('Auto-lock timeout must be between 1 and 120 minutes', 'error'); + return; + } + // Disable button during save saveSecurityBtn.disabled = true; saveSecurityBtn.textContent = 'Saving...'; statusP.textContent = ''; try { - const payload = { enabled, username }; + // Save auth settings + const authPayload = { enabled, username }; if (password) { - payload.password = password; + authPayload.password = password; } - const response = await fetch('/api/settings/security/auth', { + const authResponse = await fetch('/api/settings/security/auth', { method: 'POST', headers: { 'Content-Type': 'application/json', }, - body: JSON.stringify(payload), + body: JSON.stringify(authPayload), }); - const data = await response.json(); + const authData = await authResponse.json(); + + if (!authResponse.ok || authData.status !== 'success') { + toast(authData.error || 'Failed to save authentication settings', 'error'); + statusP.textContent = '❌ ' + (authData.error || 'Failed to save'); + statusP.style.color = '#b00'; + return; + } - if (response.ok && data.status === 'success') { - toast('Security settings saved successfully', 'success'); - statusP.textContent = enabled ? - '✅ Authentication is enabled. Users will need to log in to access SciDK.' : - 'Authentication is disabled. SciDK is accessible without login.'; - statusP.style.color = '#0a6'; + // Save auto-lock settings + const autoLockPayload = { + enabled: autoLockEnabled, + timeout_minutes: autoLockTimeout + }; - // Clear password field after successful save - passwordInput.value = ''; - passwordInput.placeholder = '••••••••'; + const autoLockResponse = await fetch('/api/settings/security/auto-lock', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(autoLockPayload), + }); - // If auth was enabled, redirect to login after a short delay - if (enabled) { - setTimeout(() => { - window.location.href = '/login'; - }, 2000); - } - } else { - toast(data.error || 'Failed to save security settings', 'error'); - statusP.textContent = '❌ ' + (data.error || 'Failed to save'); - statusP.style.color = '#b00'; + const autoLockData = await autoLockResponse.json(); + + if (!autoLockResponse.ok || autoLockData.status !== 'success') { + toast('Auth saved, but auto-lock settings failed to save', 'error'); + statusP.textContent = '⚠️ Partial save - auto-lock settings not saved'; + statusP.style.color = '#f80'; + return; + } + + // Both saved successfully + toast('Security settings saved successfully', 'success'); + statusP.textContent = enabled ? + '✅ Authentication is enabled. Users will need to log in to access SciDK.' : + 'Authentication is disabled. SciDK is accessible without login.'; + statusP.style.color = '#0a6'; + + // Clear password field after successful save + passwordInput.value = ''; + passwordInput.placeholder = '••••••••'; + + // If auth was enabled, redirect to login after a short delay + if (enabled) { + setTimeout(() => { + window.location.href = '/login'; + }, 2000); } } catch (error) { toast('Connection error. Please try again.', 'error'); @@ -569,6 +642,7 @@

Configuration Backups

// Load initial config loadAuthConfig(); + loadAutoLockConfig(); // Check if current user is admin and show user management/audit sections checkAdminAccess(); diff --git a/scidk/web/auth_middleware.py b/scidk/web/auth_middleware.py index 4340fc6..c19bc30 100644 --- a/scidk/web/auth_middleware.py +++ b/scidk/web/auth_middleware.py @@ -13,6 +13,8 @@ '/login', '/api/auth/login', '/api/auth/status', + '/api/auth/lock', # Lock session (requires valid session, but allows locking) + '/api/auth/unlock', # Unlock session (requires password, checked by endpoint) '/api/settings/security/auth', # Allow disabling/checking auth config '/api/health', # Health check endpoint (legitimately needs to be public) '/static', # Prefix for static files @@ -99,6 +101,24 @@ def check_auth(): user = {'username': username, 'role': 'admin'} if user: + # Check if session is locked (only for non-lock-related routes) + if not is_public_route(request.path): + is_locked = auth.is_session_locked(token) + if is_locked: + # Session is locked - return 423 (Locked) status + if request.path.startswith('/api/'): + from flask import jsonify + lock_info = auth.get_session_lock_info(token) + return jsonify({ + 'error': 'Session locked', + 'locked': True, + 'locked_at': lock_info['locked_at'] if lock_info else None, + 'username': lock_info['username'] if lock_info else None, + }), 423 + else: + # UI requests - this will be handled by JavaScript lock screen + pass + # Authentication successful - store user info in Flask g for access in routes from flask import g g.scidk_user = user['username'] diff --git a/scidk/web/routes/api_auth.py b/scidk/web/routes/api_auth.py index a4af662..507ac13 100644 --- a/scidk/web/routes/api_auth.py +++ b/scidk/web/routes/api_auth.py @@ -167,13 +167,15 @@ def api_auth_status(): "authenticated": true, "username": "admin", "auth_enabled": true, - "token_valid": true + "token_valid": true, + "session_locked": false } or 200: { "authenticated": false, "auth_enabled": true, - "token_valid": false + "token_valid": false, + "session_locked": false } """ auth = _get_auth_manager() @@ -186,6 +188,7 @@ def api_auth_status(): 'username': None, 'auth_enabled': False, 'token_valid': False, + 'session_locked': False, }), 200 # Check if user has valid session (try multi-user first) @@ -198,6 +201,9 @@ def api_auth_status(): if username: user = {'username': username, 'role': 'admin', 'id': None} + # Check if session is locked + session_locked = auth.is_session_locked(token) if token else False + if user: return jsonify({ 'authenticated': True, @@ -206,6 +212,7 @@ def api_auth_status(): 'user_id': user.get('id'), 'auth_enabled': True, 'token_valid': True, + 'session_locked': session_locked, }), 200 else: return jsonify({ @@ -215,4 +222,102 @@ def api_auth_status(): 'user_id': None, 'auth_enabled': True, 'token_valid': False, + 'session_locked': False, }), 200 + + +@bp.post('/lock') +def api_auth_lock(): + """Lock current session (auto-lock feature). + + Returns: + 200: {"success": true, "locked_at": timestamp} + 400: {"success": false, "error": "No active session"} + 503: {"success": false, "error": "Authentication not enabled"} + """ + auth = _get_auth_manager() + + # Check if auth is enabled + if not auth.is_enabled(): + return jsonify({'success': False, 'error': 'Authentication not enabled'}), 503 + + token = _get_session_token() + + if not token: + return jsonify({'success': False, 'error': 'No active session'}), 400 + + # Lock the session + success = auth.lock_session(token) + + if success: + # Get lock info + lock_info = auth.get_session_lock_info(token) + + # Log lock event + if lock_info: + ip_address = request.remote_addr + auth.log_audit(lock_info['username'], 'session_locked', 'Session locked', ip_address) + + return jsonify({ + 'success': True, + 'locked_at': lock_info['locked_at'] if lock_info else None, + }), 200 + else: + return jsonify({'success': False, 'error': 'Failed to lock session'}), 500 + + +@bp.post('/unlock') +def api_auth_unlock(): + """Unlock a locked session with password verification. + + Request body: + { + "password": "password123" + } + + Returns: + 200: {"success": true} + 400: {"success": false, "error": "Missing password"} + 401: {"success": false, "error": "Invalid password"} + 400: {"success": false, "error": "No active session"} + 503: {"success": false, "error": "Authentication not enabled"} + """ + auth = _get_auth_manager() + + # Check if auth is enabled + if not auth.is_enabled(): + return jsonify({'success': False, 'error': 'Authentication not enabled'}), 503 + + token = _get_session_token() + + if not token: + return jsonify({'success': False, 'error': 'No active session'}), 400 + + # Parse request body + data = request.get_json() or {} + password = data.get('password', '') + + if not password: + return jsonify({'success': False, 'error': 'Missing password'}), 400 + + # Attempt unlock + success = auth.unlock_session(token, password) + + if success: + # Get session info for audit log + lock_info = auth.get_session_lock_info(token) + + if lock_info: + ip_address = request.remote_addr + auth.log_audit(lock_info['username'], 'session_unlocked', 'Session unlocked successfully', ip_address) + + return jsonify({'success': True}), 200 + else: + # Log failed unlock attempt + lock_info = auth.get_session_lock_info(token) + if lock_info: + ip_address = request.remote_addr + auth.log_failed_attempt(lock_info['username'], ip_address) + auth.log_audit(lock_info['username'], 'unlock_failed', 'Failed unlock attempt', ip_address) + + return jsonify({'success': False, 'error': 'Invalid password'}), 401 diff --git a/scidk/web/routes/api_settings.py b/scidk/web/routes/api_settings.py index b294aa1..66e9532 100644 --- a/scidk/web/routes/api_settings.py +++ b/scidk/web/routes/api_settings.py @@ -886,6 +886,181 @@ def update_security_auth_config(): }), 500 +@bp.route('/settings/security/auto-lock', methods=['GET']) +def get_auto_lock_config(): + """ + Get auto-lock configuration. + + Returns: + { + "status": "success", + "config": { + "enabled": false, + "timeout_minutes": 5 + } + } + """ + try: + settings_db = current_app.config.get('SCIDK_SETTINGS_DB', 'scidk_settings.db') + import sqlite3 + conn = sqlite3.connect(settings_db) + cur = conn.execute( + """ + SELECT value FROM settings + WHERE key IN ('auto_lock_enabled', 'auto_lock_timeout_minutes') + """ + ) + rows = cur.fetchall() + conn.close() + + config = { + 'enabled': False, + 'timeout_minutes': 5, # Default 5 minutes + } + + # Parse settings (stored as key-value pairs) + settings_dict = {} + conn = sqlite3.connect(settings_db) + cur = conn.execute("SELECT key, value FROM settings") + for row in cur.fetchall(): + settings_dict[row[0]] = row[1] + conn.close() + + if 'auto_lock_enabled' in settings_dict: + config['enabled'] = settings_dict['auto_lock_enabled'] == 'true' + + if 'auto_lock_timeout_minutes' in settings_dict: + try: + config['timeout_minutes'] = int(settings_dict['auto_lock_timeout_minutes']) + except (ValueError, TypeError): + config['timeout_minutes'] = 5 + + return jsonify({ + 'status': 'success', + 'config': config + }), 200 + except Exception as e: + return jsonify({ + 'status': 'error', + 'error': str(e) + }), 500 + + +@bp.route('/settings/security/auto-lock', methods=['POST', 'PUT']) +def update_auto_lock_config(): + """ + Update auto-lock configuration. + + Request body: + { + "enabled": true, + "timeout_minutes": 5 + } + + Returns: + { + "status": "success", + "config": { + "enabled": true, + "timeout_minutes": 5 + } + } + """ + try: + data = request.get_json() + if not data: + return jsonify({ + 'status': 'error', + 'error': 'Request body must be JSON' + }), 400 + + enabled = data.get('enabled', False) + timeout_minutes = data.get('timeout_minutes', 5) + + # Validation + if not isinstance(enabled, bool): + return jsonify({ + 'status': 'error', + 'error': 'enabled must be a boolean' + }), 400 + + try: + timeout_minutes = int(timeout_minutes) + except (ValueError, TypeError): + return jsonify({ + 'status': 'error', + 'error': 'timeout_minutes must be an integer' + }), 400 + + if timeout_minutes < 1 or timeout_minutes > 120: + return jsonify({ + 'status': 'error', + 'error': 'timeout_minutes must be between 1 and 120' + }), 400 + + # Save settings + settings_db = current_app.config.get('SCIDK_SETTINGS_DB', 'scidk_settings.db') + import sqlite3 + conn = sqlite3.connect(settings_db) + + # Create settings table if it doesn't exist + conn.execute( + """ + CREATE TABLE IF NOT EXISTS settings ( + key TEXT PRIMARY KEY, + value TEXT, + updated_at REAL + ) + """ + ) + + import time + now = time.time() + + # Upsert settings + conn.execute( + """ + INSERT OR REPLACE INTO settings (key, value, updated_at) + VALUES ('auto_lock_enabled', ?, ?) + """, + ('true' if enabled else 'false', now) + ) + + conn.execute( + """ + INSERT OR REPLACE INTO settings (key, value, updated_at) + VALUES ('auto_lock_timeout_minutes', ?, ?) + """, + (str(timeout_minutes), now) + ) + + conn.commit() + conn.close() + + # Log audit event + auth = _get_auth_manager() + username = getattr(g, 'scidk_user', 'system') + auth.log_audit( + username, + 'auto_lock_configured', + f'Auto-lock {"enabled" if enabled else "disabled"}, timeout: {timeout_minutes} minutes', + request.remote_addr + ) + + return jsonify({ + 'status': 'success', + 'config': { + 'enabled': enabled, + 'timeout_minutes': timeout_minutes, + } + }), 200 + except Exception as e: + return jsonify({ + 'status': 'error', + 'error': str(e) + }), 500 + + def _get_backup_manager(): """Get or create BackupManager instance.""" from ...core.backup_manager import get_backup_manager diff --git a/tests/test_auto_lock.py b/tests/test_auto_lock.py new file mode 100644 index 0000000..648f89a --- /dev/null +++ b/tests/test_auto_lock.py @@ -0,0 +1,208 @@ +"""Tests for auto-lock functionality.""" + +import pytest +import time +import sqlite3 +from scidk.core.auth import AuthManager + + +@pytest.fixture +def auth_manager(tmp_path): + """Create a temporary AuthManager for testing.""" + db_path = tmp_path / 'test_auth.db' + manager = AuthManager(db_path=str(db_path)) + + # Create a test user + user_id = manager.create_user('testuser', 'testpass123', role='admin', created_by='system') + assert user_id is not None + + yield manager + + manager.close() + + +def test_session_lock_columns_migration(tmp_path): + """Test that lock columns are added to auth_sessions table.""" + db_path = tmp_path / 'test_migration.db' + + # Create database without lock columns (simulate old schema) + conn = sqlite3.connect(str(db_path)) + conn.execute('PRAGMA journal_mode=WAL;') + conn.execute( + """ + CREATE TABLE auth_sessions ( + token TEXT PRIMARY KEY, + username TEXT NOT NULL, + created_at REAL NOT NULL, + expires_at REAL NOT NULL, + last_activity REAL NOT NULL + ) + """ + ) + conn.commit() + conn.close() + + # Initialize AuthManager - should trigger migration + manager = AuthManager(db_path=str(db_path)) + + # Check that lock columns exist + conn = sqlite3.connect(str(db_path)) + cur = conn.execute("PRAGMA table_info(auth_sessions)") + columns = [row[1] for row in cur.fetchall()] + conn.close() + + assert 'locked' in columns, "locked column should be added by migration" + assert 'locked_at' in columns, "locked_at column should be added by migration" + + manager.close() + + +def test_lock_session(auth_manager): + """Test locking a session.""" + # Create a session + token = auth_manager.create_user_session(1, 'testuser', duration_hours=24) + assert token + + # Session should not be locked initially + assert not auth_manager.is_session_locked(token) + + # Lock the session + result = auth_manager.lock_session(token) + assert result is True + + # Session should now be locked + assert auth_manager.is_session_locked(token) + + +def test_unlock_session(auth_manager): + """Test unlocking a locked session.""" + # Create and lock a session + token = auth_manager.create_user_session(1, 'testuser', duration_hours=24) + auth_manager.lock_session(token) + assert auth_manager.is_session_locked(token) + + # Unlock with correct password + result = auth_manager.unlock_session(token, 'testpass123') + assert result is True + + # Session should be unlocked + assert not auth_manager.is_session_locked(token) + + +def test_unlock_session_wrong_password(auth_manager): + """Test that unlock fails with wrong password.""" + # Create and lock a session + token = auth_manager.create_user_session(1, 'testuser', duration_hours=24) + auth_manager.lock_session(token) + + # Try to unlock with wrong password + result = auth_manager.unlock_session(token, 'wrongpassword') + assert result is False + + # Session should still be locked + assert auth_manager.is_session_locked(token) + + +def test_get_session_lock_info(auth_manager): + """Test retrieving session lock information.""" + # Create a session + token = auth_manager.create_user_session(1, 'testuser', duration_hours=24) + + # Get lock info before locking + lock_info = auth_manager.get_session_lock_info(token) + assert lock_info is not None + assert lock_info['username'] == 'testuser' + assert lock_info['locked'] is False + assert lock_info['locked_at'] is None + + # Lock the session + auth_manager.lock_session(token) + + # Get lock info after locking + lock_info = auth_manager.get_session_lock_info(token) + assert lock_info is not None + assert lock_info['username'] == 'testuser' + assert lock_info['locked'] is True + assert lock_info['locked_at'] is not None + assert isinstance(lock_info['locked_at'], float) + assert lock_info['locked_at'] <= time.time() + + +def test_lock_nonexistent_session(auth_manager): + """Test locking a session that doesn't exist.""" + result = auth_manager.lock_session('invalid_token_123') + # Should succeed but have no effect (UPDATE with no matching row) + assert result is True + + +def test_unlock_nonlocked_session(auth_manager): + """Test unlocking a session that isn't locked.""" + # Create a session + token = auth_manager.create_user_session(1, 'testuser', duration_hours=24) + + # Try to unlock (session not locked) + result = auth_manager.unlock_session(token, 'testpass123') + assert result is False # Should fail because session is not locked + + +def test_audit_log_on_unlock(auth_manager): + """Test that successful unlock is logged in audit trail.""" + # Create and lock a session + token = auth_manager.create_user_session(1, 'testuser', duration_hours=24) + auth_manager.lock_session(token) + + # Unlock session + auth_manager.unlock_session(token, 'testpass123') + + # Check audit log + audit_log = auth_manager.get_audit_log(limit=10) + assert len(audit_log) > 0 + + # Find the unlock event + unlock_events = [entry for entry in audit_log if entry['action'] == 'session_unlocked'] + assert len(unlock_events) > 0 + assert unlock_events[0]['username'] == 'testuser' + + +@pytest.mark.parametrize('timeout_minutes,expected_enabled', [ + (5, True), + (10, True), + (60, True), + (120, True), +]) +def test_auto_lock_settings_valid(tmp_path, timeout_minutes, expected_enabled): + """Test saving valid auto-lock settings.""" + db_path = tmp_path / 'test_settings.db' + conn = sqlite3.connect(str(db_path)) + + # Create settings table + conn.execute( + """ + CREATE TABLE IF NOT EXISTS settings ( + key TEXT PRIMARY KEY, + value TEXT, + updated_at REAL + ) + """ + ) + + # Save settings + now = time.time() + conn.execute( + "INSERT OR REPLACE INTO settings (key, value, updated_at) VALUES (?, ?, ?)", + ('auto_lock_enabled', 'true', now) + ) + conn.execute( + "INSERT OR REPLACE INTO settings (key, value, updated_at) VALUES (?, ?, ?)", + ('auto_lock_timeout_minutes', str(timeout_minutes), now) + ) + conn.commit() + + # Read back settings + cur = conn.execute("SELECT key, value FROM settings") + settings_dict = {row[0]: row[1] for row in cur.fetchall()} + + assert settings_dict['auto_lock_enabled'] == 'true' + assert int(settings_dict['auto_lock_timeout_minutes']) == timeout_minutes + + conn.close()