Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev
Submodule dev updated from b84382 to ad6f67
153 changes: 152 additions & 1 deletion scidk/core/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def init_tables(self):
"""
)

# Active sessions table (updated with user_id)
# Active sessions table (updated with user_id and locked state)
self.db.execute(
"""
CREATE TABLE IF NOT EXISTS auth_sessions (
Expand All @@ -76,6 +76,8 @@ def init_tables(self):
created_at REAL NOT NULL,
expires_at REAL NOT NULL,
last_activity REAL NOT NULL,
locked INTEGER DEFAULT 0,
locked_at REAL,
FOREIGN KEY (user_id) REFERENCES auth_users(id) ON DELETE CASCADE
)
"""
Expand Down Expand Up @@ -112,6 +114,9 @@ def init_tables(self):
# Auto-migrate from single-user to multi-user on first run
self._migrate_to_multi_user()

# Migrate to add lock columns to auth_sessions
self._migrate_add_session_lock_columns()

def _migrate_to_multi_user(self):
"""Migrate from single-user auth_config to multi-user auth_users table.

Expand Down Expand Up @@ -154,6 +159,25 @@ def _migrate_to_multi_user(self):
except Exception as e:
print(f"Migration warning: {e}")

def _migrate_add_session_lock_columns(self):
"""Add locked and locked_at columns to auth_sessions table if they don't exist."""
try:
# Check if locked column exists
cur = self.db.execute("PRAGMA table_info(auth_sessions)")
columns = [row[1] for row in cur.fetchall()]

if 'locked' not in columns:
self.db.execute("ALTER TABLE auth_sessions ADD COLUMN locked INTEGER DEFAULT 0")
print("Added locked column to auth_sessions table")

if 'locked_at' not in columns:
self.db.execute("ALTER TABLE auth_sessions ADD COLUMN locked_at REAL")
print("Added locked_at column to auth_sessions table")

self.db.commit()
except Exception as e:
print(f"Migration warning (session lock columns): {e}")

def is_enabled(self) -> bool:
"""Check if authentication is currently enabled.

Expand Down Expand Up @@ -906,6 +930,133 @@ def get_audit_log(self, since_timestamp: Optional[float] = None,
print(f"AuthManager.get_audit_log error: {e}")
return []

# ========== Session Locking ==========

def lock_session(self, token: str) -> bool:
"""Lock a session (auto-lock feature).

Args:
token: Session token to lock

Returns:
bool: True if successful, False on error
"""
try:
now = time.time()
self.db.execute(
"UPDATE auth_sessions SET locked = 1, locked_at = ? WHERE token = ?",
(now, token)
)
self.db.commit()
return True
except Exception as e:
print(f"AuthManager.lock_session error: {e}")
return False

def unlock_session(self, token: str, password: str) -> bool:
"""Unlock a locked session with password verification.

Args:
token: Session token to unlock
password: Password to verify

Returns:
bool: True if unlock successful, False if password invalid or error
"""
try:
# Get session info
cur = self.db.execute(
"""
SELECT s.username, s.user_id, s.locked
FROM auth_sessions s
WHERE s.token = ?
""",
(token,)
)
row = cur.fetchone()

if not row or not row[2]: # Not found or not locked
return False

username, user_id = row[0], row[1]

# Verify password (try multi-user first)
if user_id is not None:
user = self.verify_user_credentials(username, password)
if not user:
return False
else:
# Legacy single-user verification
if not self.verify_credentials(username, password):
return False

# Unlock session
self.db.execute(
"UPDATE auth_sessions SET locked = 0, locked_at = NULL WHERE token = ?",
(token,)
)
self.db.commit()

# Log successful unlock
ip_address = None # Will be set by API route
self.log_audit(username, 'session_unlocked', 'Session unlocked', ip_address)

return True
except Exception as e:
print(f"AuthManager.unlock_session error: {e}")
return False

def is_session_locked(self, token: str) -> bool:
"""Check if a session is currently locked.

Args:
token: Session token to check

Returns:
bool: True if session is locked, False otherwise
"""
try:
cur = self.db.execute(
"SELECT locked FROM auth_sessions WHERE token = ?",
(token,)
)
row = cur.fetchone()
return bool(row and row[0]) if row else False
except Exception:
return False

def get_session_lock_info(self, token: str) -> Optional[Dict[str, Any]]:
"""Get lock information for a session.

Args:
token: Session token

Returns:
dict or None: Lock info with keys: username, locked, locked_at
"""
try:
cur = self.db.execute(
"""
SELECT username, locked, locked_at
FROM auth_sessions
WHERE token = ?
""",
(token,)
)
row = cur.fetchone()

if not row:
return None

return {
'username': row[0],
'locked': bool(row[1]),
'locked_at': row[2],
}
except Exception as e:
print(f"AuthManager.get_session_lock_info error: {e}")
return None

def close(self):
"""Close database connection."""
try:
Expand Down
Loading