diff --git a/pkg/freecad/silo_commands.py b/pkg/freecad/silo_commands.py index 1d1c7dd..9e7c698 100644 --- a/pkg/freecad/silo_commands.py +++ b/pkg/freecad/silo_commands.py @@ -91,23 +91,11 @@ def _get_ssl_context() -> ssl.SSLContext: def _get_auth_token() -> str: - """Get stored auth token from preferences, checking expiry.""" - param = FreeCAD.ParamGet(_PREF_GROUP) - token = param.GetString("AuthToken", "") - if not token: - return "" - expiry = param.GetString("AuthTokenExpiry", "") - if expiry: - try: - from datetime import datetime, timezone + """Get the active API token for authenticating requests. - exp_dt = datetime.fromisoformat(expiry.replace("Z", "+00:00")) - if datetime.now(timezone.utc) >= exp_dt: - _clear_auth() - return "" - except (ValueError, TypeError): - pass - return token + Priority: ApiToken preference > SILO_API_TOKEN env var. + """ + return _get_api_token() def _get_auth_username() -> str: @@ -116,25 +104,43 @@ def _get_auth_username() -> str: return param.GetString("AuthUsername", "") -def _get_auth_headers() -> Dict[str, str]: - """Return Authorization header dict if authenticated, else empty dict. +def _get_auth_role() -> str: + """Get stored authenticated user role from preferences.""" + param = FreeCAD.ParamGet(_PREF_GROUP) + return param.GetString("AuthRole", "") - Checks login-based token first, then falls back to static API token. - """ + +def _get_auth_source() -> str: + """Get stored authentication source from preferences.""" + param = FreeCAD.ParamGet(_PREF_GROUP) + return param.GetString("AuthSource", "") + + +def _get_auth_headers() -> Dict[str, str]: + """Return Authorization header dict if a token is configured, else empty.""" token = _get_auth_token() - if not token: - token = _get_api_token() if token: return {"Authorization": f"Bearer {token}"} return {} +def _save_auth_info(username: str, role: str = "", source: str = "", token: str = ""): + """Store authentication info in preferences.""" + param = FreeCAD.ParamGet(_PREF_GROUP) + param.SetString("AuthUsername", username) + param.SetString("AuthRole", role) + param.SetString("AuthSource", source) + if token: + param.SetString("ApiToken", token) + + def _clear_auth(): """Clear stored authentication credentials from preferences.""" param = FreeCAD.ParamGet(_PREF_GROUP) - param.SetString("AuthToken", "") + param.SetString("ApiToken", "") param.SetString("AuthUsername", "") - param.SetString("AuthTokenExpiry", "") + param.SetString("AuthRole", "") + param.SetString("AuthSource", "") # Category name mapping for folder structure @@ -636,50 +642,190 @@ class SiloClient: except urllib.error.URLError as e: raise RuntimeError(f"Connection error: {e.reason}") - def login(self, username: str, password: str) -> Dict[str, Any]: - """Authenticate with the Silo API and store the token.""" - url = f"{self.base_url}/auth/login" - headers = {"Content-Type": "application/json"} - body = json.dumps({"username": username, "password": password}).encode() - req = urllib.request.Request(url, data=body, headers=headers, method="POST") + # -- Authentication methods --------------------------------------------- + def login(self, username: str, password: str) -> Dict[str, Any]: + """Authenticate with credentials and obtain an API token. + + Performs a session-based login (POST /login), then uses the + session to create a persistent API token via POST /api/auth/tokens. + The API token is stored in preferences for future requests. + """ + import http.cookiejar + + # Build a cookie-aware opener for the session flow + base = self.base_url + # Strip /api suffix to get the server root for /login + origin = base.rsplit("/api", 1)[0] if base.endswith("/api") else base + ctx = _get_ssl_context() + cookie_jar = http.cookiejar.CookieJar() + opener = urllib.request.build_opener( + urllib.request.HTTPCookieProcessor(cookie_jar), + urllib.request.HTTPSHandler(context=ctx), + ) + + # Step 1: POST form-encoded credentials to /login + login_url = f"{origin}/login" + form_data = urllib.parse.urlencode( + {"username": username, "password": password} + ).encode() + req = urllib.request.Request( + login_url, + data=form_data, + method="POST", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) try: - with urllib.request.urlopen(req, context=_get_ssl_context()) as resp: - result = json.loads(resp.read().decode()) + opener.open(req) except urllib.error.HTTPError as e: - error_body = e.read().decode() - try: - detail = json.loads(error_body).get("error", error_body) - except (json.JSONDecodeError, AttributeError): - detail = error_body - raise RuntimeError(f"Login failed: {detail}") + if e.code in (302, 303): + pass # Redirect after login is expected + else: + raise RuntimeError( + f"Login failed (HTTP {e.code}): invalid credentials or server error" + ) except urllib.error.URLError as e: raise RuntimeError(f"Connection error: {e.reason}") - param = FreeCAD.ParamGet(_PREF_GROUP) - param.SetString("AuthToken", result.get("token", "")) - param.SetString("AuthUsername", result.get("username", username)) - param.SetString("AuthTokenExpiry", result.get("expires_at", "")) - return result + # Step 2: Verify session by calling /api/auth/me + me_url = f"{origin}/api/auth/me" + me_req = urllib.request.Request(me_url, method="GET") + try: + with opener.open(me_req) as resp: + user_info = json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + if e.code == 401: + raise RuntimeError("Login failed: invalid username or password") + raise RuntimeError(f"Login verification failed (HTTP {e.code})") + except urllib.error.URLError as e: + raise RuntimeError(f"Connection error: {e.reason}") + + # Step 3: Create a persistent API token for FreeCAD + token_url = f"{origin}/api/auth/tokens" + import socket + + hostname = socket.gethostname() + token_body = json.dumps( + {"name": f"FreeCAD ({hostname})", "expires_in_days": 90} + ).encode() + token_req = urllib.request.Request( + token_url, + data=token_body, + method="POST", + headers={"Content-Type": "application/json"}, + ) + try: + with opener.open(token_req) as resp: + token_result = json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + raise RuntimeError(f"Failed to create API token (HTTP {e.code})") + except urllib.error.URLError as e: + raise RuntimeError(f"Connection error: {e.reason}") + + raw_token = token_result.get("token", "") + if not raw_token: + raise RuntimeError("Server did not return an API token") + + # Store token and user info + _save_auth_info( + username=user_info.get("username", username), + role=user_info.get("role", ""), + source=user_info.get("auth_source", ""), + token=raw_token, + ) + + return { + "username": user_info.get("username", username), + "role": user_info.get("role", ""), + "auth_source": user_info.get("auth_source", ""), + "token_name": token_result.get("name", ""), + "token_prefix": token_result.get("token_prefix", ""), + } def logout(self): - """Clear stored authentication credentials.""" + """Clear stored API token and authentication info.""" _clear_auth() def is_authenticated(self) -> bool: - """Return True if a valid auth token is stored.""" + """Return True if a valid API token is configured.""" return bool(_get_auth_token()) def auth_username(self) -> str: """Return the stored authenticated username.""" return _get_auth_username() + def auth_role(self) -> str: + """Return the stored user role.""" + return _get_auth_role() + + def auth_source(self) -> str: + """Return the stored authentication source (local, ldap, oidc).""" + return _get_auth_source() + + def get_current_user(self) -> Optional[Dict[str, Any]]: + """Fetch the current user info from the server. + + Returns user dict or None if not authenticated. + """ + try: + return self._request("GET", "/auth/me") + except RuntimeError: + return None + + def refresh_auth_info(self) -> bool: + """Refresh locally cached user info from the server. + + Returns True if authenticated, False otherwise. + """ + user = self.get_current_user() + if user and user.get("username"): + _save_auth_info( + username=user["username"], + role=user.get("role", ""), + source=user.get("auth_source", ""), + ) + return True + return False + + def list_tokens(self) -> List[Dict[str, Any]]: + """List API tokens for the current user.""" + return self._request("GET", "/auth/tokens") + + def create_token( + self, name: str, expires_in_days: Optional[int] = None + ) -> Dict[str, Any]: + """Create a new API token. + + Returns dict with 'token' (raw, shown once), 'id', 'name', etc. + """ + data: Dict[str, Any] = {"name": name} + if expires_in_days is not None: + data["expires_in_days"] = expires_in_days + return self._request("POST", "/auth/tokens", data) + + def revoke_token(self, token_id: str) -> None: + """Revoke an API token by its ID.""" + url = f"{self.base_url}/auth/tokens/{token_id}" + headers = {"Content-Type": "application/json"} + headers.update(_get_auth_headers()) + req = urllib.request.Request(url, headers=headers, method="DELETE") + try: + urllib.request.urlopen(req, context=_get_ssl_context()) + except urllib.error.HTTPError as e: + error_body = e.read().decode() + raise RuntimeError(f"API error {e.code}: {error_body}") + except urllib.error.URLError as e: + raise RuntimeError(f"Connection error: {e.reason}") + def check_connection(self) -> Tuple[bool, str]: """Check connectivity to the Silo API. Returns (reachable, message). """ - url = f"{self.base_url}/health" + # Use origin /health (not /api/health) since health is at root + base = self.base_url + origin = base.rsplit("/api", 1)[0] if base.endswith("/api") else base + url = f"{origin}/health" req = urllib.request.Request(url, method="GET") try: with urllib.request.urlopen( @@ -687,7 +833,6 @@ class SiloClient: ) as resp: return True, f"OK ({resp.status})" except urllib.error.HTTPError as e: - # Server responded - reachable even if error return True, f"Server error ({e.code})" except urllib.error.URLError as e: return False, str(e.reason) @@ -2145,31 +2290,74 @@ class Silo_Settings: layout.addWidget(auth_heading) auth_user = _get_auth_username() - auth_status_text = ( - f"Logged in as {auth_user}" - if auth_user and _get_auth_token() - else "Not logged in" - ) + auth_role = _get_auth_role() + auth_source = _get_auth_source() + has_token = bool(_get_auth_token()) + + if has_token and auth_user: + auth_parts = [f"Logged in as {auth_user}"] + if auth_role: + auth_parts.append(f"(role: {auth_role})") + if auth_source: + auth_parts.append(f"via {auth_source}") + auth_status_text = " ".join(auth_parts) + else: + auth_status_text = "Not logged in" + auth_status_lbl = QtGui.QLabel(auth_status_text) auth_status_lbl.setTextFormat(QtCore.Qt.RichText) layout.addWidget(auth_status_lbl) - auth_hint = QtGui.QLabel( - "Use the Database Auth panel to log in. " - "Credentials are stored locally in FreeCAD preferences." - ) - auth_hint.setWordWrap(True) - auth_hint.setStyleSheet("color: #888; font-size: 11px;") - layout.addWidget(auth_hint) + # API token input + token_label = QtGui.QLabel("API Token:") + layout.addWidget(token_label) - clear_auth_btn = QtGui.QPushButton("Clear Saved Credentials") - clear_auth_btn.setEnabled(bool(_get_auth_token())) + token_row = QtGui.QHBoxLayout() + token_input = QtGui.QLineEdit() + token_input.setEchoMode(QtGui.QLineEdit.Password) + token_input.setPlaceholderText("silo_... (paste token or use Login)") + current_token = param.GetString("ApiToken", "") + if current_token: + token_input.setText(current_token) + token_row.addWidget(token_input) + + token_show_btn = QtGui.QToolButton() + token_show_btn.setText("\U0001f441") + token_show_btn.setCheckable(True) + token_show_btn.setFixedSize(28, 28) + token_show_btn.setToolTip("Show/hide token") + + def on_toggle_show(checked): + if checked: + token_input.setEchoMode(QtGui.QLineEdit.Normal) + else: + token_input.setEchoMode(QtGui.QLineEdit.Password) + + token_show_btn.toggled.connect(on_toggle_show) + token_row.addWidget(token_show_btn) + layout.addLayout(token_row) + + token_hint = QtGui.QLabel( + "Paste an API token generated from the Silo web UI, " + "or use Login in the Database Auth panel to create one " + "automatically. Tokens can also be set via the " + "SILO_API_TOKEN environment variable." + ) + token_hint.setWordWrap(True) + token_hint.setStyleSheet("color: #888; font-size: 11px;") + layout.addWidget(token_hint) + + layout.addSpacing(4) + + clear_auth_btn = QtGui.QPushButton("Clear Token and Logout") + clear_auth_btn.setEnabled(has_token) def on_clear_auth(): _clear_auth() + token_input.setText("") auth_status_lbl.setText("Not logged in") clear_auth_btn.setEnabled(False) - FreeCAD.Console.PrintMessage("Silo: Saved credentials cleared\n") + FreeCAD.Console.PrintMessage("Silo: API token and credentials cleared\n") clear_auth_btn.clicked.connect(on_clear_auth) layout.addWidget(clear_auth_btn) @@ -2178,11 +2366,14 @@ class Silo_Settings: # Current effective values (read-only) cert_display = param.GetString("SslCertPath", "") or "(system defaults)" - auth_display = ( - f"logged in as {auth_user}" - if auth_user and _get_auth_token() - else "not logged in" - ) + if has_token and auth_user: + auth_display = f"{auth_user} ({auth_role or 'unknown role'})" + if auth_source: + auth_display += f" via {auth_source}" + elif has_token: + auth_display = "token configured (user unknown)" + else: + auth_display = "not configured" status_label = QtGui.QLabel( f"Active URL: {_get_api_url()}
" f"SSL verification: {'enabled' if _get_ssl_verify() else 'disabled'}
" @@ -2209,6 +2400,18 @@ class Silo_Settings: param.SetBool("SslVerify", ssl_checkbox.isChecked()) cert_path = cert_input.text().strip() param.SetString("SslCertPath", cert_path) + # Save API token if changed + new_token = token_input.text().strip() + old_token = param.GetString("ApiToken", "") + if new_token != old_token: + param.SetString("ApiToken", new_token) + if new_token and not old_token: + FreeCAD.Console.PrintMessage("Silo: API token configured\n") + elif not new_token and old_token: + _clear_auth() + FreeCAD.Console.PrintMessage("Silo: API token removed\n") + else: + FreeCAD.Console.PrintMessage("Silo: API token updated\n") FreeCAD.Console.PrintMessage( f"Silo settings saved. URL: {_get_api_url()}, " f"SSL verify: {_get_ssl_verify()}, " @@ -2728,6 +2931,18 @@ class SiloAuthDockWidget: user_row.addStretch() layout.addLayout(user_row) + # Role row + role_row = QtGui.QHBoxLayout() + role_row.setSpacing(6) + role_lbl = QtGui.QLabel("Role:") + role_lbl.setStyleSheet("color: #888;") + self._role_label = QtGui.QLabel("") + self._role_label.setStyleSheet("font-size: 11px;") + role_row.addWidget(role_lbl) + role_row.addWidget(self._role_label) + role_row.addStretch() + layout.addLayout(role_row) + layout.addSpacing(4) # URL row (compact display) @@ -2771,13 +2986,10 @@ class SiloAuthDockWidget: # Update URL display self._url_label.setText(_get_api_url()) - authed = _client.is_authenticated() + has_token = _client.is_authenticated() username = _client.auth_username() - - if authed and username: - self._user_label.setText(username) - else: - self._user_label.setText("(not logged in)") + role = _client.auth_role() + source = _client.auth_source() # Check server connectivity try: @@ -2785,32 +2997,53 @@ class SiloAuthDockWidget: except Exception: reachable = False + # If reachable and we have a token, validate it against the server + authed = False + if reachable and has_token: + user = _client.get_current_user() + if user and user.get("username"): + authed = True + username = user["username"] + role = user.get("role", "") + source = user.get("auth_source", "") + _save_auth_info(username=username, role=role, source=source) + + if authed: + self._user_label.setText(username) + role_text = role or "" + if source: + role_text += f" ({source})" if role_text else source + self._role_label.setText(role_text) + else: + self._user_label.setText("(not logged in)") + self._role_label.setText("") + + # Update button state + try: + self._login_btn.clicked.disconnect() + except RuntimeError: + pass + if reachable and authed: self._status_dot.setStyleSheet("color: #4CAF50; font-size: 10px;") self._status_label.setText("Connected") self._login_btn.setText("Logout") - try: - self._login_btn.clicked.disconnect() - except RuntimeError: - pass self._login_btn.clicked.connect(self._on_logout_clicked) - elif reachable and not authed: + elif reachable and has_token and not authed: + # Token exists but is invalid/expired + self._status_dot.setStyleSheet("color: #FF9800; font-size: 10px;") + self._status_label.setText("Token invalid") + self._login_btn.setText("Login") + self._login_btn.clicked.connect(self._on_login_clicked) + elif reachable and not has_token: self._status_dot.setStyleSheet("color: #FFC107; font-size: 10px;") self._status_label.setText("Connected (no auth)") self._login_btn.setText("Login") - try: - self._login_btn.clicked.disconnect() - except RuntimeError: - pass self._login_btn.clicked.connect(self._on_login_clicked) else: self._status_dot.setStyleSheet("color: #F44336; font-size: 10px;") self._status_label.setText("Disconnected") self._login_btn.setText("Login") - try: - self._login_btn.clicked.disconnect() - except RuntimeError: - pass self._login_btn.clicked.connect(self._on_login_clicked) # -- Actions ------------------------------------------------------------ @@ -2833,7 +3066,7 @@ class SiloAuthDockWidget: dialog = QtGui.QDialog(self.widget) dialog.setWindowTitle("Silo Login") - dialog.setMinimumWidth(350) + dialog.setMinimumWidth(380) layout = QtGui.QVBoxLayout(dialog) @@ -2842,14 +3075,23 @@ class SiloAuthDockWidget: server_label.setStyleSheet("color: #888; font-size: 11px;") layout.addWidget(server_label) + layout.addSpacing(4) + + info_label = QtGui.QLabel( + "Enter your credentials to create a persistent API token. " + "Supports local accounts and LDAP (FreeIPA)." + ) + info_label.setWordWrap(True) + info_label.setStyleSheet("color: #888; font-size: 11px;") + layout.addWidget(info_label) + layout.addSpacing(8) # Username user_label = QtGui.QLabel("Username:") layout.addWidget(user_label) user_input = QtGui.QLineEdit() - user_input.setPlaceholderText("LDAP username") - # Pre-fill with last known username + user_input.setPlaceholderText("Username") last_user = _get_auth_username() if last_user: user_input.setText(last_user) @@ -2862,17 +3104,16 @@ class SiloAuthDockWidget: layout.addWidget(pass_label) pass_input = QtGui.QLineEdit() pass_input.setEchoMode(QtGui.QLineEdit.Password) - pass_input.setPlaceholderText("LDAP password") + pass_input.setPlaceholderText("Password") layout.addWidget(pass_input) layout.addSpacing(4) - # Error label (hidden initially) - error_label = QtGui.QLabel("") - error_label.setStyleSheet("color: #F44336;") - error_label.setWordWrap(True) - error_label.setVisible(False) - layout.addWidget(error_label) + # Error / status label + status_label = QtGui.QLabel("") + status_label.setWordWrap(True) + status_label.setVisible(False) + layout.addWidget(status_label) layout.addSpacing(8) @@ -2889,16 +3130,35 @@ class SiloAuthDockWidget: username = user_input.text().strip() password = pass_input.text() if not username or not password: - error_label.setText("Username and password are required.") - error_label.setVisible(True) + status_label.setText("Username and password are required.") + status_label.setStyleSheet("color: #F44336;") + status_label.setVisible(True) return + # Disable inputs during login + login_btn.setEnabled(False) + status_label.setText("Logging in...") + status_label.setStyleSheet("color: #888;") + status_label.setVisible(True) + # Process events so the user sees the status update + from PySide.QtWidgets import QApplication + + QApplication.processEvents() try: - _client.login(username, password) - FreeCAD.Console.PrintMessage(f"Silo: Logged in as {username}\n") + result = _client.login(username, password) + role = result.get("role", "") + source = result.get("auth_source", "") + msg = f"Silo: Logged in as {username}" + if role: + msg += f" ({role})" + if source: + msg += f" via {source}" + FreeCAD.Console.PrintMessage(msg + "\n") dialog.accept() except RuntimeError as e: - error_label.setText(str(e)) - error_label.setVisible(True) + status_label.setText(str(e)) + status_label.setStyleSheet("color: #F44336;") + status_label.setVisible(True) + login_btn.setEnabled(True) login_btn.clicked.connect(on_login) cancel_btn.clicked.connect(dialog.reject)