Authentication Issues
Troubleshooting JWT, RBAC, and user authentication problems in the Zyeta backend
This guide covers common authentication-related issues that may arise when working with the Zyeta backend.
JWT Token Issues
Symptoms:
- Error: “Invalid or expired token”
- Authentication works initially but fails after some time
- Inconsistent authentication errors
Solutions:
- Check token expiration:

    import jwt
    from datetime import datetime

    # Decode token without verification to check expiration
    try:
        payload = jwt.decode(token, options={"verify_signature": False})
        exp = payload.get("exp", 0)
        current_time = datetime.now().timestamp()
        if current_time > exp:
            print(f"Token expired {(current_time - exp) / 60:.1f} minutes ago")
        else:
            print(f"Token valid for {(exp - current_time) / 60:.1f} more minutes")
    except Exception as e:
        print(f"Failed to decode token: {e}")
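- Implement proper token refresh:

    # Zyeta doesn't use refresh tokens by default
    # You'll need to re-login when tokens expire
    # Default expiration time is set in settings.jwt_expire_minutes
    import jwt
    from datetime import datetime

    # For a client-side expiration check:
    def should_refresh_auth(token, buffer_seconds=300):  # 5 minutes buffer
        """Return True when the token expires within the buffer."""
        # Get token expiration from the decoded payload (signature not verified here)
        payload = jwt.decode(token, options={"verify_signature": False})
        token_exp = payload.get("exp", 0)
        current_time = datetime.now().timestamp()
        return token_exp - current_time < buffer_seconds

    # When this returns True, redirect the user to the login page.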
- Verify token format:

    import base64

    # Zyeta uses HS256 algorithm for JWT
    # Tokens should have header.payload.signature format
    def is_valid_jwt_format(token):
        parts = token.split(".")
        if len(parts) != 3:
            return False
        try:
            # Header and payload should be valid base64url (JWTs strip the "=" padding)
            for part in parts[0:2]:
                padding = "=" * (-len(part) % 4)
                base64.urlsafe_b64decode(part + padding)
            return True
        except Exception:
            return False
- Check JWT secret in environment:
  - Verify that the JWT_SECRET environment variable is properly set
  - Ensure the same secret is used across all services/instances
  - Verify that it matches the value in your .env file

    # Check JWT secret in environment variables
    echo $JWT_SECRET
    # Verify it matches what's in .env file
    grep JWT_SECRET .env
- Verify client settings for JWT expiration:
  - Zyeta’s JWT expiration is configured in the settings.py file
  - Default setting is jwt_expire_minutes
  - The token creation uses this setting:

    # From utils/auth_util.py
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.now(timezone.utc) + expires_delta
        else:
            expire = datetime.now(timezone.utc) + timedelta(minutes=15)  # Default fallback
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, settings.jwt_secret, algorithm="HS256")
        return encoded_jwt
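When tokens are rejected on only some instances, a secret mismatch between services is a likely cause. The following is a minimal standard-library sketch, not Zyeta code, that checks whether a token's HS256 signature matches a given secret. The helper names `signed_with` and `make_demo_token` and the demo secrets are hypothetical.

```python
import base64
import hashlib
import hmac
import json


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding JWTs strip."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def b64url_encode(raw: bytes) -> str:
    """Encode bytes as unpadded base64url, as used in JWTs."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def signed_with(token: str, secret: str) -> bool:
    """Return True if the token's HS256 signature matches the secret."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        signature = b64url_decode(signature_b64)
        signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    except ValueError:
        # Wrong number of parts, invalid base64, or non-ASCII input
        return False
    expected = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
    return hmac.compare_digest(signature, expected)


def make_demo_token(payload: dict, secret: str) -> str:
    """Build an HS256 token for demonstration only (hypothetical helper)."""
    header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url_encode(json.dumps(payload).encode())
    signature = hmac.new(secret.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{b64url_encode(signature)}"


if __name__ == "__main__":
    token = make_demo_token({"id": "demo-user"}, "secret-a")
    print(signed_with(token, "secret-a"))
    print(signed_with(token, "secret-b"))
```

Running `signed_with` with the secret from each instance against the same failing token shows which instance holds a different value, without printing the secret itself.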
Symptoms:
- Error: “Invalid authorization” or “Missing authentication credentials”
- Authentication works in some tools (e.g., Postman) but not in your code
Solutions:
- Check header format against JWTBearer implementation:

    # Correct format for Zyeta's JWTBearer middleware
    headers = {
        "Authorization": f"Bearer {token}"  # Note the space after "Bearer"
    }

    # Incorrect formats that will fail with Zyeta's JWTBearer:
    # headers = {"Authorization": token}  # Missing 'Bearer' prefix
    # headers = {"Authorization": f"bearer {token}"}  # Case sensitive, must be 'Bearer'
    # headers = {"Authorization": f"Bearer{token}"}  # Missing space after 'Bearer'
- Verify token inclusion in all requests:

    // Using a request interceptor (axios example)
    axios.interceptors.request.use(
      config => {
        // Use the same storage key the token was saved under
        const token = localStorage.getItem('access_token');
        if (token) {
          config.headers.Authorization = `Bearer ${token}`;
        }
        return config;
      },
      error => Promise.reject(error)
    );
- Check request object in JWTBearer middleware:
  - Zyeta’s JWTBearer middleware extracts the token from either an HTTP Request or a WebSocket
  - For HTTP, it checks credentials.scheme == "Bearer"
  - For WebSocket, it checks websocket.query_params.get("token")

    # For debugging JWT extraction in your middleware:
    def debug_token_extraction(request=None, websocket=None):
        if request:
            auth_header = request.headers.get("Authorization", "")
            print(f"Auth header: {auth_header}")
            if auth_header.startswith("Bearer "):
                token = auth_header[7:]  # Remove 'Bearer ' prefix
                print(f"Extracted token: {token[:10]}...")
            else:
                print("Invalid auth header format")
        elif websocket:
            token = websocket.query_params.get("token")
            print(f"WebSocket token query param: {token[:10] if token else 'None'}...")
        else:
            print("No request or websocket provided")
- Check for header modifications in proxies:
  - Some proxies may strip or modify authentication headers
  - Verify headers reach the API intact using debugging tools

    curl -v -H "Authorization: Bearer your-token" https://api.example.com/endpoint
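The header rules listed above (the exact `Bearer` scheme, a single space, then the token) can be checked in client code before a request is sent. This is a minimal sketch that mirrors those rules as described here; it is not the actual JWTBearer implementation, and `extract_bearer_token` is a hypothetical helper name.

```python
from typing import Optional


def extract_bearer_token(auth_header: Optional[str]) -> Optional[str]:
    """Return the token from an Authorization header, or None if malformed."""
    if not auth_header:
        return None
    # Split on the first space: "<scheme> <token>"
    scheme, separator, token = auth_header.partition(" ")
    # The scheme comparison is case sensitive, matching the rules above
    if scheme != "Bearer" or not separator or not token.strip():
        return None
    return token.strip()


if __name__ == "__main__":
    for header in ["Bearer abc.def.ghi", "bearer abc.def.ghi", "Bearerabc.def.ghi", "abc.def.ghi"]:
        print(repr(header), "->", extract_bearer_token(header))
```

A `None` result for a header your client builds points to a formatting problem on the client side rather than to an expired or invalid token.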
Symptoms:
- Unexpected logins from unknown locations
- Security alerts about unauthorized access
- Token becoming invalid unexpectedly
Solutions:
- Understand Zyeta’s token content:

    # Zyeta JWT payload typically contains:
    # - id: The user's UUID
    # - exp: Expiration timestamp
    # When accessed with RBAC middleware, additional claims are added:
    # - org_id: Current organization context
    # - required_permission: The permission used for the request
    # - role: User's role name
    # - role_level: User's role hierarchy level
    # - role_id: User's role UUID
    # - permissions: List of permissions the user has

    # Decode token to check contents:
    import jwt
    from datetime import datetime

    def inspect_token(token):
        try:
            # Decode without verification for inspection
            payload = jwt.decode(token, options={"verify_signature": False})
            exp = payload.get("exp")
            print(f"User ID: {payload.get('id')}")
            # Guard against a missing exp claim instead of failing on fromtimestamp(None)
            print(f"Expiration: {datetime.fromtimestamp(exp).isoformat() if exp else 'Not present'}")
            print(f"Organization ID: {payload.get('org_id', 'Not present')}")
            print(f"Role: {payload.get('role', 'Not present')}")
            return payload
        except Exception as e:
            print(f"Failed to decode token: {e}")
            return None
- Implement client-side security measures:

    // Since Zyeta doesn't implement token revocation by default,
    // implement these client-side security measures:

    // 1. Store token securely
    function securelyStoreToken(token) {
      // Use HttpOnly cookies in production if possible
      // For SPAs, localStorage is often used despite security concerns
      localStorage.setItem('access_token', token);
      // Also store token issue time for tracking
      localStorage.setItem('token_issued_at', Date.now().toString());
    }

    // 2. Implement logout across tabs
    window.addEventListener('storage', (event) => {
      if (event.key === 'access_token' && !event.newValue) {
        // Token was removed in another tab, logout in this tab too
        window.location.href = '/login';
      }
    });

    // 3. Clear token on suspicious activity
    function detectSuspiciousActivity() {
      const issuedAt = parseInt(localStorage.getItem('token_issued_at') || '0', 10);
      const maxSessionTime = 12 * 60 * 60 * 1000; // 12 hours
      if (Date.now() - issuedAt > maxSessionTime) {
        // Session too long, force re-auth
        logout();
      }
    }
- Configure JWT validity period:
  - Zyeta uses settings.jwt_expire_minutes to control token lifetime
  - Check your .env file and update JWT_EXPIRE_MINUTES to an appropriate value:

    # Recommended values based on security requirements:
    # High security: 15-30 minutes
    # Medium security: 1-2 hours
    # Low security: 8-24 hours
    JWT_EXPIRE_MINUTES=60
- Additional security for production:
  - Zyeta doesn’t include fingerprinting or IP validation by default
  - For production, consider implementing an enhanced authentication middleware:

    # Example of enhanced JWT validation middleware
    async def enhanced_jwt_validation(
        request: Request,
        token_payload: dict = Depends(JWTBearer()),
        session: AsyncSession = Depends(get_db)
    ):
        user_id = token_payload.get("id")

        # Check if user is still active
        user = await session.get(UserModel, UUID(user_id))
        if not user or not user.is_active:
            raise HTTPException(status_code=403, detail="User account is deactivated")

        # Add last activity tracking
        user.last_active = datetime.now()
        session.add(user)
        await session.commit()

        return token_payload
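A mistyped JWT_EXPIRE_MINUTES value is easy to miss until tokens start expiring unexpectedly. The sketch below validates the value at startup. It is an illustration only: `read_expire_minutes` is a hypothetical helper, the 15-minute default mirrors the fallback shown in create_access_token, and the 24-hour ceiling is an assumption taken from the upper end of the recommended range above.

```python
import os
from typing import Mapping, Optional


def read_expire_minutes(
    env: Optional[Mapping[str, str]] = None,
    default: int = 15,        # assumption: mirrors the 15-minute fallback
    maximum: int = 24 * 60,   # assumption: upper end of the recommended range
) -> int:
    """Read and validate JWT_EXPIRE_MINUTES from an environment mapping."""
    env = os.environ if env is None else env
    raw = env.get("JWT_EXPIRE_MINUTES", "").strip()
    if not raw:
        return default
    try:
        minutes = int(raw)
    except ValueError:
        raise ValueError(f"JWT_EXPIRE_MINUTES must be an integer, got {raw!r}") from None
    if not 1 <= minutes <= maximum:
        raise ValueError(f"JWT_EXPIRE_MINUTES must be between 1 and {maximum}, got {minutes}")
    return minutes


if __name__ == "__main__":
    print(read_expire_minutes({"JWT_EXPIRE_MINUTES": "60"}))
```

Failing fast on a bad value is a design choice; falling back silently to the default would hide the misconfiguration that this section is trying to surface.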
RBAC Permission Issues
Symptoms:
- Error: “Access denied. Required: <resource>:<action>” with status code 403
- User can access some endpoints but not others
- Permissions work for some users but not others
Solutions:
- Verify user roles in organization context:

    import requests

    # Zyeta's RBAC implementation is organization-specific
    # List the roles available in the organization:
    response = requests.get(
        f"{API_URL}/roles/list_roles?org_id={org_id}",
        headers={"Authorization": f"Bearer {token}"}
    )
    if response.status_code == 200:
        roles = response.json()
        print(f"Available roles: {[role['name'] for role in roles]}")

    # Get the current user's role
    response = requests.get(
        f"{API_URL}/users/me?org_id={org_id}",
        headers={"Authorization": f"Bearer {token}"}
    )
    if response.status_code == 200:
        user_data = response.json()
        role = user_data.get("role")
        print(f"User role: {role}")
- Understand Zyeta’s RBAC middleware:
  - Zyeta’s RBAC middleware checks for specific resource and action permissions
  - Routes are protected with: user: dict = Depends(RBAC("resource", "action"))
  - Permissions follow the resource:action pattern, with wildcard support (*)

    # Example of Zyeta's RBAC middleware usage
    @app.get("/api/protected")
    async def protected_route(user: dict = Depends(RBAC("kb", "read"))):
        # If execution reaches here, the user is authorized
        return {"message": "Access granted", "user_id": user["id"]}

    # For wildcards, Zyeta supports patterns like:
    # - "*" (any resource and any action)
    # - "kb:*" (any action on kb resource)
    # - "*:read" (read action on any resource)
    # - "kb:r*" (any action starting with "r" on kb resource)
- Check organization membership status:
  - Zyeta’s RBAC checks OrganizationMemberModel.status == "active"
  - If the user is not active in the organization, they’ll get a 403 error

    # Check user's status in organization
    member_query = select(OrganizationMemberModel).where(
        and_(
            OrganizationMemberModel.user_id == user_id,
            OrganizationMemberModel.organization_id == org_id
        )
    )
    members = await session.execute(member_query)
    member = members.unique().scalar_one_or_none()

    if not member:
        print("User is not a member of this organization")
    elif member.status != "active":
        print(f"User's status in organization is: {member.status}")
    else:
        print("User is an active member of this organization")
- Add missing permissions or update role:

    # Assign a role to a user (admin operation)
    response = requests.post(
        f"{API_URL}/roles/{org_id}/users/{user_id}/roles",
        headers={"Authorization": f"Bearer {admin_token}"},
        json={"role_id": "editor_role_id"}
    )

    # Create a new permission
    response = requests.post(
        f"{API_URL}/roles/permission",
        headers={"Authorization": f"Bearer {admin_token}"},
        json={
            "resource": "kb",
            "action": "read",
            "description": "Read knowledge base"
        }
    )
- Debug Zyeta’s RBAC wildcard matching:

    # Zyeta uses a custom wildcard matcher in the RBAC class
    # You can recreate it for testing:
    def check_wildcard_match(permission_value: str, required_value: str) -> bool:
        """Check if permission matches required value with wildcard support."""
        if permission_value == "*":
            return True
        if "*" not in permission_value:
            return permission_value == required_value

        # Handle pattern matching with wildcards
        pattern_parts = permission_value.split("*")
        value = required_value

        # Check prefix
        if pattern_parts[0] and not required_value.startswith(pattern_parts[0]):
            return False
        # Check suffix
        if pattern_parts[-1] and not required_value.endswith(pattern_parts[-1]):
            return False
        # Check middle parts
        for part in pattern_parts[1:-1]:
            if part not in required_value:
                return False
            # Move past the matched part for next check
            value = value[value.find(part) + len(part):]
        return True

    # Test it:
    print(check_wildcard_match("kb:*", "kb:read"))    # True
    print(check_wildcard_match("*:read", "kb:read"))  # True
    print(check_wildcard_match("kb:r*", "kb:read"))   # True
    print(check_wildcard_match("kb:w*", "kb:read"))   # False
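To cross-check a user's granted permissions against a required `resource:action` pair, the standard library's `fnmatch` module offers a comparable glob match. This is a sketch and not Zyeta's matcher: `has_permission` is a hypothetical helper, and `fnmatch` also gives special meaning to `?` and `[...]`, so results can differ from Zyeta's for patterns containing those characters.

```python
from fnmatch import fnmatchcase
from typing import Iterable


def has_permission(granted: Iterable[str], resource: str, action: str) -> bool:
    """Return True if any granted pattern matches the required resource:action."""
    required = f"{resource}:{action}"
    # fnmatchcase(name, pattern) does a case-sensitive glob match
    return any(fnmatchcase(required, pattern) for pattern in granted)


if __name__ == "__main__":
    print(has_permission(["kb:*"], "kb", "read"))
    print(has_permission(["kb:w*"], "kb", "read"))
```

If this check and Zyeta's own matcher disagree for a given pattern, the pattern itself is the first thing to inspect.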
Symptoms:
- User doesn’t see expected role in their profile
- Role assignments don’t persist
- User loses access after logging out and back in
Solutions:
- Verify role was correctly assigned in Zyeta’s database:

    # Zyeta stores role assignments in OrganizationMemberModel
    member_query = select(OrganizationMemberModel).where(
        and_(
            OrganizationMemberModel.user_id == user_id,
            OrganizationMemberModel.organization_id == org_id
        )
    )

    # Check user's current role in organization
    response = requests.get(
        f"{API_URL}/organizations/{org_id}/users/{user_id}",
        headers={"Authorization": f"Bearer {admin_token}"}
    )
    if response.status_code == 200:
        user_data = response.json()
        print(f"User role in this organization: {user_data.get('role', {}).get('name')}")
- Check role existence and permissions:

    # Get role definition using Zyeta's roles service
    response = requests.get(
        f"{API_URL}/roles/{role_id}?org_id={org_id}",
        headers={"Authorization": f"Bearer {admin_token}"}
    )
    if response.status_code == 200:
        role_data = response.json()
        print(f"Role name: {role_data.get('name')}")
        print(f"Role hierarchy level: {role_data.get('hierarchy_level')}")
        print(f"Role is system role: {role_data.get('is_system_role')}")
        print(f"Role permissions: {role_data.get('permissions', [])}")
- Check for hierarchy level conflicts:

    # Zyeta uses hierarchy_level to determine role precedence
    # Higher numbers = higher privilege
    # RoleService._validate_hierarchy_level ensures hierarchy levels are unique

    # Get all roles in the organization to check hierarchy
    response = requests.get(
        f"{API_URL}/roles/list_roles?org_id={org_id}",
        headers={"Authorization": f"Bearer {admin_token}"}
    )
    if response.status_code == 200:
        roles = response.json()
        # Sort by hierarchy level
        sorted_roles = sorted(roles, key=lambda r: r.get('hierarchy_level', 0), reverse=True)
        for role in sorted_roles:
            print(f"Role: {role.get('name')}, Level: {role.get('hierarchy_level')}")
- Recreate role assignment:

    from sqlalchemy import and_, select, update

    # To resolve persistent issues, recreate the role assignment
    # 1. First get the user's current organization member record
    member_query = select(OrganizationMemberModel).where(
        and_(
            OrganizationMemberModel.user_id == user_id,
            OrganizationMemberModel.organization_id == org_id
        )
    )
    member = (await session.execute(member_query)).scalar_one_or_none()

    # 2. Update the role_id with a SQLAlchemy update statement (example)
    update_query = (
        update(OrganizationMemberModel)
        .where(
            and_(
                OrganizationMemberModel.user_id == user_id,
                OrganizationMemberModel.organization_id == org_id
            )
        )
        .values(role_id=new_role_id, status="active")
    )
    await session.execute(update_query)
    await session.commit()

    # 3. The user needs a new token after role changes
    print("Role updated. User must log out and log in again for changes to take effect.")
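Because hierarchy levels are meant to be unique, two roles sharing a level is worth ruling out when role assignments behave oddly. The sketch below scans a role list, shaped like the `list_roles` response used above, for duplicated levels. `find_level_conflicts` is a hypothetical helper and the sample data is invented for illustration.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Mapping


def find_level_conflicts(roles: Iterable[Mapping[str, Any]]) -> Dict[Any, List[str]]:
    """Group role names by hierarchy_level and return levels used more than once."""
    names_by_level: Dict[Any, List[str]] = defaultdict(list)
    for role in roles:
        names_by_level[role.get("hierarchy_level")].append(role.get("name"))
    return {level: names for level, names in names_by_level.items() if len(names) > 1}


if __name__ == "__main__":
    # Invented sample data, shaped like the roles used in the examples above
    sample_roles = [
        {"name": "owner", "hierarchy_level": 100},
        {"name": "editor", "hierarchy_level": 50},
        {"name": "reviewer", "hierarchy_level": 50},
    ]
    print(find_level_conflicts(sample_roles))
```

An empty result means every role has its own level, so the problem lies elsewhere, for example in a stale token issued before the role change.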
Symptoms:
- User with admin role cannot perform expected actions
- Permissions don’t apply across organizations
- Required org_id parameter missing errors
Solutions:
- Understand Zyeta’s organization-based permissions:

    import requests

    # Zyeta's RBAC is organization-scoped
    # The org_id query parameter is required for most endpoints

    # Correct request with org_id:
    response = requests.get(
        f"{API_URL}/some-endpoint?org_id={org_id}",
        headers={"Authorization": f"Bearer {token}"}
    )

    # For WebSockets, pass token and org_id in the connection URL
    # and open it with your WebSocket client:
    ws_url = f"wss://{API_URL}/ws/connect?token={token}&org_id={org_id}"

    # RBAC middleware checks:
    # 1. The request includes an org_id
    # 2. User is an active member of that organization
    # 3. User has appropriate permissions in that organization
- Debug missing organization context:

    # If getting "Invalid org id" errors:
    # For HTTP requests, check:
    if request:
        org_id = request.query_params.get("org_id")
        print(f"Request org_id: {org_id}")
    # For WebSockets, check:
    elif websocket:
        org_id = websocket.query_params.get("org_id")
        print(f"WebSocket org_id: {org_id}")

    # If org_id is None, you need to add it to your request
- Verify permission scope in Zyeta:

    # In Zyeta's implementation, permissions are tied to:
    # 1. A user's role in a specific organization
    # 2. The PermissionModel defines resource & action
    # 3. RolePermissionModel links roles to permissions

    # Query to check what permissions are assigned to a role:
    role_perms_query = (
        select(RolePermissionModel, PermissionModel)
        .join(PermissionModel, RolePermissionModel.permission_id == PermissionModel.id)
        .where(RolePermissionModel.role_id == role_id)
    )

    # List a user's permissions in an organization:
    async def list_user_permissions(user_id, org_id, session):
        # Get user's role in organization
        member_query = select(OrganizationMemberModel).where(
            and_(
                OrganizationMemberModel.user_id == user_id,
                OrganizationMemberModel.organization_id == org_id,
                OrganizationMemberModel.status == "active"
            )
        )
        member = await session.execute(member_query)
        member = member.scalar_one_or_none()
        if not member:
            return []

        # Get permissions for this role
        role_perms_query = (
            select(PermissionModel)
            .join(RolePermissionModel, PermissionModel.id == RolePermissionModel.permission_id)
            .where(RolePermissionModel.role_id == member.role_id)
        )
        perms = await session.execute(role_perms_query)
        return list(perms.scalars().all())
- Handle multi-organization scenarios:

    // For clients working with multiple organizations:
    // Store current organization context
    function setCurrentOrganization(orgId) {
      localStorage.setItem('current_org_id', orgId);
    }

    // Add organization context to all API requests
    axios.interceptors.request.use(config => {
      const orgId = localStorage.getItem('current_org_id');
      if (orgId) {
        // For GET requests
        if (!config.params) {
          config.params = {};
        }
        if (!config.params.org_id) {
          config.params.org_id = orgId;
        }
        // For POST/PUT requests with JSON body
        if (config.data && typeof config.data === 'object' && !config.data.org_id) {
          config.data.org_id = orgId;
        }
      }
      return config;
    });
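Python clients can apply the same idea by adding `org_id` to a URL before sending the request. The sketch below uses only `urllib.parse`, keeps any existing query parameters, and leaves an `org_id` that is already present untouched, as the interceptor above does. `with_org_id` is a hypothetical helper name and the URLs are placeholders.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_org_id(url: str, org_id: str) -> str:
    """Return the URL with org_id added to its query string if it is missing."""
    parts = urlsplit(url)
    query = parse_qsl(parts.query, keep_blank_values=True)
    if any(key == "org_id" for key, _ in query):
        return url  # Respect an explicitly provided org_id
    query.append(("org_id", org_id))
    return urlunsplit(parts._replace(query=urlencode(query)))


if __name__ == "__main__":
    print(with_org_id("https://api.example.com/kb?page=2", "org-1"))
```

Building the query with `urlencode` also escapes characters that would otherwise corrupt the URL, which string concatenation does not.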
Login and Account Issues
Symptoms:
- Unable to log in with valid credentials
- Error message: “Incorrect username or password”
- Persistent login failures
Solutions:
- Verify credentials against Zyeta’s auth service:

    # Zyeta's AuthService.post_login uses verify_password to check credentials
    # Example login request:
    response = requests.post(
        f"{API_URL}/auth/login",
        json={
            "email": "user@example.com",
            "password": "password123"
        }
    )
    if response.status_code != 200:
        error_data = response.json()
        print(f"Login error: {error_data.get('detail')}")
    else:
        token_data = response.json()
        print(f"Login successful. Token obtained: {token_data.get('access_token')[:10]}...")
- Check password verification in Zyeta:

    # Zyeta uses verify_password from auth_util:
    def verify_password(plain_password, hashed_password):
        """Verify a password against a hash."""
        return pwd_context.verify(plain_password, hashed_password)

    # The password hash algorithm is bcrypt, via passlib's CryptContext
    # If the hashing algorithm was changed, older passwords may fail verification
- Check for account status issues:

    # Zyeta checks user status during login
    # Check user status in the database:
    user_query = select(UserModel).where(UserModel.email == email)
    user = await session.execute(user_query)
    user = user.scalar_one_or_none()

    if not user:
        print("User not found in database")
    else:
        print(f"User status: {user.status}")
        # Status can be "active", "inactive", "pending", etc.
        # Only "active" users can log in
- Check for email verification requirements:

    # Zyeta may require email verification before login
    # Check email verified status:
    user_query = select(UserModel).where(UserModel.email == email)
    user = await session.execute(user_query)
    user = user.scalar_one_or_none()

    if user and not user.email_verified:
        print("Email not verified. Verification required before login.")
        # Resend verification email if needed:
        response = requests.post(
            f"{API_URL}/auth/resend-verification",
            json={"email": email}
        )
- Handle login lockouts:

    // Zyeta's implementation may have rate limiting on login attempts
    // Check for specific error messages indicating lockout
    // If using RateLimiter middleware from ratelimit.py:
    // Typical lockout message: "Rate limit exceeded"

    // To reset a locked account (admin operation):
    // 1. Reset the user's rate limit counter in Redis
    // 2. Or enable a temporary bypass for that user

    // Example client-side exponential backoff:
    async function loginWithBackoff(credentials, maxRetries = 5) {
      let retryCount = 0;
      let delay = 1000; // Start with 1 second

      while (retryCount < maxRetries) {
        try {
          const response = await fetch(`${API_URL}/auth/login`, {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(credentials)
          });
          if (response.ok) {
            return await response.json();
          }

          const error = await response.json();
          // detail may be missing or not a string, so normalize it first
          const detail = String(error.detail ?? '');
          if (detail.includes("Rate limit exceeded")) {
            console.log(`Rate limited. Waiting ${delay/1000} seconds before retry.`);
            await new Promise(resolve => setTimeout(resolve, delay));
            delay *= 2; // Exponential backoff
            retryCount++;
          } else {
            // Other error, not rate limiting
            throw new Error(detail);
          }
        } catch (err) {
          console.error("Login error:", err);
          throw err;
        }
      }
      throw new Error("Max retries exceeded for login");
    }
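The same exponential backoff can be used from a Python client or test script. This is a minimal sketch under stated assumptions: `RateLimited` is a hypothetical exception that your own login wrapper would raise when the API reports "Rate limit exceeded", and `call_with_backoff` and `backoff_delays` are illustrative helper names.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical signal raised by the caller's login function when rate limited."""


def backoff_delays(max_retries: int = 5, base_delay: float = 1.0) -> List[float]:
    """Return the wait before each retry: base, 2x base, 4x base, and so on."""
    return [base_delay * 2 ** attempt for attempt in range(max_retries)]


def call_with_backoff(
    attempt: Callable[[], T],
    max_retries: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call attempt() until it succeeds, sleeping with doubling delays when rate limited."""
    for delay in backoff_delays(max_retries, base_delay):
        try:
            return attempt()
        except RateLimited:
            sleep(delay)
    raise RuntimeError("Max retries exceeded for login")


if __name__ == "__main__":
    print(backoff_delays())
```

Passing `sleep` as a parameter keeps the function testable without real waiting. Errors other than `RateLimited` propagate immediately, matching the JavaScript version above.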
Symptoms:
- Cannot accept invitation
- Error in signup process
- Invitation links expired or invalid
Solutions:
- Verify invitation token:

    # Zyeta handles invitations in AuthService.post_signup_invite and get_signup_invite
    # Check invitation status:
    response = requests.get(
        f"{API_URL}/auth/signup-invite?token={invitation_token}"
    )
    if response.status_code != 200:
        error_data = response.json()
        print(f"Invitation error: {error_data.get('detail')}")
    else:
        invite_data = response.json()
        print(f"Invitation valid for: {invite_data.get('email')}")
        print(f"Organization: {invite_data.get('organization', {}).get('name')}")
- Debug invitation token issues:

    # Zyeta validates invite tokens by:
    # 1. Checking if invite exists in the database
    # 2. Verifying it's not expired
    # 3. Ensuring it's not already used

    # Common issues:
    # - Token expired: invites have expiration timestamps
    # - Token already used: invite can be used only once
    # - Email mismatch: token is tied to a specific email

    # Check invitation in database:
    invite_query = select(InvitationModel).where(InvitationModel.token == token)
    invite = await session.execute(invite_query)
    invite = invite.scalar_one_or_none()

    if not invite:
        print("Invitation not found")
    elif invite.used:
        print("Invitation already used")
    elif invite.expires_at < datetime.utcnow():
        print("Invitation expired")
    else:
        print("Invitation valid")
- Resend invitation if needed:

    # Admin can resend invitation
    response = requests.post(
        f"{API_URL}/auth/invite",
        headers={"Authorization": f"Bearer {admin_token}"},
        json={
            "email": "user@example.com",
            "org_id": organization_id,
            "role_id": role_id
        }
    )
    if response.status_code == 200:
        print("New invitation sent successfully")
- Complete signup with invitation:

    # Complete signup using invitation
    response = requests.post(
        f"{API_URL}/auth/signup-invite",
        json={
            "email": "user@example.com",
            "password": "securePassword123",
            "first_name": "John",
            "last_name": "Doe",
            "token": invitation_token
        }
    )
    if response.status_code == 201:
        print("Signup with invitation successful")
        user_data = response.json()
        print(f"User created with ID: {user_data.get('id')}")
    else:
        error_data = response.json()
        print(f"Signup error: {error_data.get('detail')}")
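The invitation checks described above (exists, not used, not expired) can be expressed as a small pure function, which makes the rules easy to unit test without a database. This is a sketch only: the `Invite` dataclass is a hypothetical stand-in for InvitationModel with just the `used` and `expires_at` fields mentioned above, and it assumes timezone-aware UTC timestamps.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class Invite:
    """Hypothetical stand-in for an invitation record."""
    used: bool
    expires_at: datetime  # assumed to be timezone-aware (UTC)


def invitation_status(invite: Optional[Invite], now: Optional[datetime] = None) -> str:
    """Classify an invitation as not_found, used, expired, or valid."""
    now = now or datetime.now(timezone.utc)
    if invite is None:
        return "not_found"
    if invite.used:
        return "used"
    if invite.expires_at < now:
        return "expired"
    return "valid"


if __name__ == "__main__":
    fresh = Invite(used=False, expires_at=datetime.now(timezone.utc) + timedelta(days=7))
    print(invitation_status(fresh))
```

If your stored timestamps are naive, convert them consistently before comparing, because Python refuses to compare naive and aware datetimes.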
Symptoms:
- Cannot update profile information
- Password reset fails
- Email verification issues
Solutions:
- Debug password reset flow:

    # Zyeta's password reset flow:
    # 1. Request password reset token
    response = requests.post(
        f"{API_URL}/auth/forgot-password",
        json={"email": "user@example.com"}
    )
    if response.status_code == 200:
        print("Password reset email sent")

    # 2. Reset password with token
    response = requests.post(
        f"{API_URL}/auth/reset-password",
        json={
            "token": "reset_token_from_email",
            "password": "newSecurePassword123"
        }
    )
    if response.status_code == 200:
        print("Password reset successful")
    else:
        error_data = response.json()
        print(f"Password reset error: {error_data.get('detail')}")
- Check email verification status:

    # Zyeta tracks email verification status
    # Get user profile to check verification status
    response = requests.get(
        f"{API_URL}/auth/me",
        headers={"Authorization": f"Bearer {token}"}
    )
    if response.status_code == 200:
        user_data = response.json()
        print(f"Email verified: {user_data.get('email_verified', False)}")

    # Verify email with token:
    response = requests.post(
        f"{API_URL}/auth/verify-email",
        json={"token": "verification_token_from_email"}
    )
    if response.status_code == 200:
        print("Email verification successful")
- Update user profile:

    # Update user profile in Zyeta
    response = requests.put(
        f"{API_URL}/auth/me",
        headers={"Authorization": f"Bearer {token}"},
        json={
            "first_name": "John",
            "last_name": "Smith",
            # Other fields as needed
        }
    )
    if response.status_code == 200:
        print("Profile updated successfully")
    else:
        error_data = response.json()
        print(f"Profile update error: {error_data.get('detail')}")
- Handle account deletion:

    # Zyeta process for account deactivation
    # Deactivate (soft delete) account:
    response = requests.delete(
        f"{API_URL}/auth/me",
        headers={"Authorization": f"Bearer {token}"}
    )
    if response.status_code == 200:
        print("Account deactivated successfully")

    # Administrator hard delete (permanent)
    response = requests.delete(
        f"{API_URL}/users/{user_id}",
        headers={"Authorization": f"Bearer {admin_token}"}
    )
    if response.status_code == 204:
        print("User permanently deleted")
WebSocket Authentication Issues
Symptoms:
- WebSocket connection is rejected
- Error: “Not authenticated” or “Invalid token”
- Connection drops immediately after establishing
Solutions:
- Verify WebSocket connection with token:

    // Zyeta's WebSocket authentication requires token in query parameters
    // Correct format:
    const socket = new WebSocket(`wss://${API_URL}/ws/connect?token=${jwt_token}&org_id=${org_id}`);

    socket.onopen = () => {
      console.log("WebSocket connection established");
    };
    socket.onerror = (error) => {
      console.error("WebSocket error:", error);
    };
    socket.onclose = (event) => {
      console.log(`WebSocket closed with code: ${event.code}, reason: ${event.reason}`);
    };
- Debug WebSocket auth errors:

    # Zyeta's JWTBearer validates WebSocket connections
    # For WebSockets, validate_socket_token is called

    # Server-side debugging:
    async def authenticate_ws_connection(websocket: WebSocket):
        # Extract token from query params
        token = websocket.query_params.get("token")
        if not token:
            await websocket.close(code=1008, reason="Missing token")
            return None

        # Extract org_id from query params
        org_id = websocket.query_params.get("org_id")
        if not org_id:
            await websocket.close(code=1008, reason="Missing org_id")
            return None

        # Validate JWT token
        try:
            payload = jwt.decode(
                token,
                settings.jwt_secret,
                algorithms=["HS256"]
            )
            # Zyeta's payload stores the user's UUID under "id"
            user_id = payload.get("id")
            # Further validation as needed
            return payload
        except Exception as e:
            await websocket.close(code=1008, reason=f"Invalid token: {e}")
            return None
- Handle WebSocket reconnection with token refresh:

    // Client-side token refresh and reconnection
    class AuthenticatedWebSocket {
      constructor(baseUrl, getToken, getOrgId) {
        this.baseUrl = baseUrl;
        this.getToken = getToken;
        this.getOrgId = getOrgId;
        this.socket = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.reconnectDelay = 1000;
        this.listeners = { message: [], error: [], close: [], open: [] };
      }

      connect() {
        const token = this.getToken();
        const orgId = this.getOrgId();
        if (!token) {
          console.error("No token available for WebSocket connection");
          return;
        }
        if (!orgId) {
          console.error("No organization ID available for WebSocket connection");
          return;
        }

        // Zyeta's WebSocket endpoint with required auth parameters
        this.socket = new WebSocket(`${this.baseUrl}/ws/connect?token=${token}&org_id=${orgId}`);

        this.socket.onopen = (event) => {
          console.log("WebSocket connected");
          this.reconnectAttempts = 0;
          this.listeners.open.forEach(listener => listener(event));
        };
        this.socket.onmessage = (event) => {
          this.listeners.message.forEach(listener => listener(event));
        };
        this.socket.onerror = (event) => {
          this.listeners.error.forEach(listener => listener(event));
        };
        this.socket.onclose = (event) => {
          this.listeners.close.forEach(listener => listener(event));
          // Handle authentication failures (1008 = Policy Violation)
          if (event.code === 1008 && event.reason.includes("token")) {
            console.error("WebSocket authentication failed:", event.reason);
            // Try to refresh token before reconnecting
            this.refreshTokenAndReconnect();
          } else if (this.reconnectAttempts < this.maxReconnectAttempts) {
            // Handle other disconnects; reconnect() applies the backoff delay itself
            this.reconnect();
          }
        };
      }

      async refreshTokenAndReconnect() {
        try {
          // Implement token refresh logic here. refreshAuthToken is a placeholder:
          // without refresh tokens, this means logging in again.
          await refreshAuthToken();
          this.reconnect();
        } catch (error) {
          console.error("Failed to refresh token:", error);
        }
      }

      reconnect() {
        this.reconnectAttempts++;
        const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
        console.log(`Attempting to reconnect in ${delay}ms (attempt ${this.reconnectAttempts})`);
        setTimeout(() => {
          this.connect();
        }, delay);
      }

      // Event listener methods
      on(event, callback) {
        if (this.listeners[event]) {
          this.listeners[event].push(callback);
        }
        return this;
      }

      close() {
        if (this.socket) {
          this.socket.close();
        }
      }

      send(data) {
        if (this.socket && this.socket.readyState === WebSocket.OPEN) {
          this.socket.send(typeof data === 'string' ? data : JSON.stringify(data));
        } else {
          console.error("WebSocket not connected");
        }
      }
    }

    // Usage:
    const ws = new AuthenticatedWebSocket(
      'wss://api.zyeta.io',
      () => localStorage.getItem('access_token'),
      () => localStorage.getItem('current_org_id')
    );
    ws.on('open', () => console.log('Connected!'))
      .on('message', (event) => console.log('Received:', event.data))
      .connect();
- Check RBAC permissions for WebSocket connections:

    # Zyeta's WebSocketService uses RBAC for permission checking
    # WebSocket routes are protected with RBAC dependency

    # In WebSocketService.ws_connect, RBAC dependency is used:
    @websocket_endpoint.websocket("/connect")
    async def ws_connect(
        websocket: WebSocket,
        user: dict = Depends(RBAC("ws", "connect"))
    ):
        # If execution reaches here, user is authorized
        ...

    # Client debugging:
    # If WebSocket connection fails with 403, check user permissions
    response = requests.get(
        f"{API_URL}/roles/list_permissions?org_id={org_id}",
        headers={"Authorization": f"Bearer {token}"}
    )
    if response.status_code == 200:
        permissions = response.json()
        # Look for "ws:connect" permission
        has_ws_permission = any(
            p.get("resource") == "ws" and p.get("action") == "connect"
            for p in permissions
        )
        print(f"Has WebSocket connect permission: {has_ws_permission}")
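A frequent cause of rejected WebSocket connections is a hand-built URL with the wrong scheme or unescaped parameters. The sketch below derives the WebSocket URL from an HTTP base URL. It assumes the `/ws/connect` path and the `token` and `org_id` query parameters used in the examples above; `build_ws_url` is a hypothetical helper name and the host is a placeholder.

```python
from urllib.parse import urlencode, urlsplit, urlunsplit


def build_ws_url(api_url: str, token: str, org_id: str, path: str = "/ws/connect") -> str:
    """Build a ws(s):// URL with token and org_id as escaped query parameters."""
    parts = urlsplit(api_url)
    # Map http -> ws and https -> wss; leave ws/wss unchanged
    scheme = {"http": "ws", "https": "wss"}.get(parts.scheme, parts.scheme)
    query = urlencode({"token": token, "org_id": org_id})
    return urlunsplit((scheme, parts.netloc, parts.path.rstrip("/") + path, query, ""))


if __name__ == "__main__":
    print(build_ws_url("https://api.example.com", "header.payload.signature", "org-1"))
```

Printing the resulting URL (with the token truncated in real logs) and comparing it with what the server receives helps separate client-side URL problems from token or permission failures.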
Next Steps
If you’ve resolved your authentication issues, consider reviewing these related guides:
- JWT Authentication - Detailed information about JWT implementation
- RBAC Permissions - How role-based access control works
- API Errors - Common API errors and their solutions
- Invitation Flow - User invitation and onboarding process
If you’re still experiencing authentication problems, check the API documentation or contact the development team for assistance.