Troubleshooting JWT, RBAC, and user authentication problems in the Definable backend
Token validation failures
import jwt
from datetime import datetime
# Decode token without verification to check expiration
try:
payload = jwt.decode(token, options={"verify_signature": False})
exp = payload.get("exp", 0)
current_time = datetime.now().timestamp()
if current_time > exp:
print(f"Token expired {(current_time - exp) / 60:.1f} minutes ago")
else:
print(f"Token valid for {(exp - current_time) / 60:.1f} more minutes")
except Exception as e:
print(f"Failed to decode token: {e}")
# Definable doesn't use refresh tokens by default
# You'll need to re-login when tokens expire
# Default expiration time is set in settings.jwt_expire_minutes
# For client-side expiration check:
async def should_refresh_auth():
    """Send the user back to login when the token is about to expire.

    Compares ``token_exp`` (the expiration taken from the decoded payload,
    in epoch seconds; it must be available in the enclosing scope) with the
    current time.

    Returns:
        True if fewer than 300 seconds remain and ``redirect_to_login()`` was
        called, False otherwise.
    """
    seconds_remaining = token_exp - datetime.now().timestamp()
    expiring_soon = seconds_remaining < 300  # 5 minutes buffer
    if expiring_soon:
        # No refresh tokens are used, so the only recovery is a fresh login.
        redirect_to_login()
    return expiring_soon
# Definable uses HS256 algorithm for JWT
# Tokens should have header.payload.signature format
def is_valid_jwt_format(token):
    """Check that ``token`` is shaped like a JWT (``header.payload.signature``).

    Only the structure is checked: three dot-separated parts whose header and
    payload are non-empty, unpadded base64url strings. The signature and the
    claims are NOT verified, so a True result says nothing about whether the
    token is authentic or unexpired.

    Args:
        token: The candidate token. Anything that is not a ``str`` is rejected.

    Returns:
        True if the token has a valid JWT layout, False otherwise.
    """
    import base64
    import re

    if not isinstance(token, str):
        return False

    parts = token.split(".")
    if len(parts) != 3:
        return False

    for part in parts[:2]:  # Header and payload must be valid base64url
        # JWT segments use the URL-safe alphabet ("-" and "_") without padding.
        # The explicit alphabet check matters because a lenient base64 decode
        # silently drops unknown characters instead of failing.
        if not re.fullmatch(r"[A-Za-z0-9_-]+", part):
            return False
        # Restore only the padding that is actually missing (0-2 characters).
        padding = "=" * (-len(part) % 4)
        try:
            base64.b64decode(part + padding, altchars=b"-_", validate=True)
        except ValueError:  # binascii.Error is a ValueError subclass
            return False
    return True
Verify that the JWT_SECRET environment variable is properly set and matches the value in the .env file:
# Check JWT secret in environment variables
echo $JWT_SECRET
# Verify it matches what's in .env file
grep JWT_SECRET .env
Check the jwt_expire_minutes setting in the settings.py file:
# From utils/auth_util.py
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Build a signed HS256 JWT from ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is copied, not modified.
        expires_delta: Token lifetime. When omitted (or falsy) a 15-minute
            fallback is used.

    Returns:
        The token encoded with ``settings.jwt_secret``.
    """
    lifetime = expires_delta or timedelta(minutes=15)  # Default fallback
    claims = {**data, "exp": datetime.now(timezone.utc) + lifetime}
    return jwt.encode(claims, settings.jwt_secret, algorithm="HS256")
Authorization header problems
# Correct format for Definable's JWTBearer middleware
headers = {
"Authorization": f"Bearer {token}" # Note the space after "Bearer"
}
# Incorrect formats that will fail with Definable's JWTBearer:
# headers = {"Authorization": token} # Missing 'Bearer' prefix
# headers = {"Authorization": f"bearer {token}"} # Case sensitive, must be 'Bearer'
# headers = {"Authorization": f"Bearer{token}"} # Missing space after 'Bearer'
// Using a request interceptor (axios example)
// Attach the stored JWT to every outgoing request as a Bearer token.
axios.interceptors.request.use(
  (config) => {
    // The token is stored under 'access_token' (see securelyStoreToken);
    // reading any other key would leave the header unset.
    const token = localStorage.getItem('access_token');
    if (token) {
      config.headers.Authorization = `Bearer ${token}`;
    }
    return config;
  },
  (error) => Promise.reject(error)
);
credentials.scheme == "Bearer"
websocket.query_params.get("token")
# For debugging JWT extraction in your middleware:
def debug_token_extraction(request=None, websocket=None):
    """Print how a token would be pulled from a request or a WebSocket.

    For ``request`` the ``Authorization`` header is printed and, when it
    starts with ``"Bearer "``, the first 10 characters of the token. For
    ``websocket`` the ``token`` query parameter is previewed instead.
    ``request`` takes precedence when both are supplied.
    """
    if not request and not websocket:
        print("No request or websocket provided")
        return

    if not request:
        ws_token = websocket.query_params.get("token")
        preview = ws_token[:10] if ws_token else "None"
        print(f"WebSocket token query param: {preview}...")
        return

    header_value = request.headers.get("Authorization", "")
    print(f"Auth header: {header_value}")

    scheme_prefix = "Bearer "
    if not header_value.startswith(scheme_prefix):
        print("Invalid auth header format")
        return

    bearer_token = header_value[len(scheme_prefix):]  # Remove 'Bearer ' prefix
    print(f"Extracted token: {bearer_token[:10]}...")
curl -v -H "Authorization: Bearer your-token" https://api.example.com/endpoint
Token security with Definable's implementation
# Definable JWT payload typically contains:
# - id: The user's UUID
# - exp: Expiration timestamp
# When accessed with RBAC middleware, additional claims are added:
# - org_id: Current organization context
# - required_permission: The permission used for the request
# - role: User's role name
# - role_level: User's role hierarchy level
# - role_id: User's role UUID
# - permissions: List of permissions the user has
# Decode token to check contents:
import jwt
def inspect_token(token):
    """Decode a JWT without verification and print its main claims.

    The signature is deliberately not verified, so the output is for
    debugging only and must never be used to make an authorization decision.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload dict, or None if the token cannot be decoded.
    """
    try:
        # Decode without verification for inspection
        payload = jwt.decode(token, options={"verify_signature": False})
    except jwt.PyJWTError as e:
        print(f"Failed to decode token: {e}")
        return None

    print(f"User ID: {payload.get('id')}")

    # A missing or odd "exp" claim is a property of the payload, not a decode
    # failure, so report it instead of discarding the whole payload.
    exp = payload.get("exp")
    if exp is None:
        print("Expiration: Not present")
    else:
        try:
            print(f"Expiration: {datetime.fromtimestamp(exp).isoformat()}")
        except (TypeError, ValueError, OverflowError, OSError):
            print(f"Expiration: invalid value {exp!r}")

    print(f"Organization ID: {payload.get('org_id', 'Not present')}")
    print(f"Role: {payload.get('role', 'Not present')}")
    return payload
// Since Definable doesn't implement token revocation by default,
// implement these client-side security measures:
// 1. Store token securely
/**
 * Persist the access token and the moment it was stored.
 *
 * Use HttpOnly cookies in production if possible; for SPAs localStorage is
 * often used despite security concerns.
 */
function securelyStoreToken(token) {
  const storage = localStorage;
  storage.setItem('access_token', token);
  // Issue time is tracked so the session age can be checked later.
  const issuedAt = String(Date.now());
  storage.setItem('token_issued_at', issuedAt);
}
// 2. Implement logout across tabs
window.addEventListener('storage', (event) => {
if (event.key === 'access_token' && !event.newValue) {
// Token was removed in another tab, logout in this tab too
window.location.href = '/login';
}
});
// 3. Clear token on suspicious activity
/**
 * Force re-authentication when the stored session is older than 12 hours.
 *
 * Does nothing when no access token is stored. A token whose issue time is
 * missing or unreadable is treated as expired, since its age cannot be proven.
 */
function detectSuspiciousActivity() {
  if (!localStorage.getItem('access_token')) {
    // No session to expire; calling logout() here would only cause
    // redundant redirects for signed-out visitors.
    return;
  }

  const issuedAt = Number.parseInt(localStorage.getItem('token_issued_at') || '', 10);
  const maxSessionTime = 12 * 60 * 60 * 1000; // 12 hours

  if (Number.isNaN(issuedAt) || Date.now() - issuedAt > maxSessionTime) {
    // Session too long (or of unknown age), force re-auth
    logout();
  }
}
Definable uses settings.jwt_expire_minutes to control token lifetime. Edit the .env file and update JWT_EXPIRE_MINUTES to an appropriate value:
# Recommended values based on security requirements:
# High security: 15-30 minutes
# Medium security: 1-2 hours
# Low security: 8-24 hours
JWT_EXPIRE_MINUTES=60
# Example of enhanced JWT validation middleware
async def enhanced_jwt_validation(
    request: Request,
    token_payload: dict = Depends(JWTBearer()),
    session: AsyncSession = Depends(get_db)
):
    """Re-check the token's user against the database and record activity.

    Args:
        request: The incoming request (not read by this function).
        token_payload: Decoded JWT payload produced by ``JWTBearer``.
        session: Database session used to load and update the user.

    Returns:
        The unchanged ``token_payload`` when the user exists and is active.

    Raises:
        HTTPException: 401 if the payload has no usable ``id`` claim,
            403 if the user does not exist or is not active.
    """
    user_id = token_payload.get("id")

    # A missing or malformed id would make UUID() raise and surface as a 500;
    # it is really an authentication failure.
    if not isinstance(user_id, str):
        raise HTTPException(status_code=401, detail="Invalid token payload")
    try:
        user_uuid = UUID(user_id)
    except ValueError:
        raise HTTPException(status_code=401, detail="Invalid token payload") from None

    # Check if user is still active
    user = await session.get(UserModel, user_uuid)
    if not user or not user.is_active:
        raise HTTPException(status_code=403, detail="User account is deactivated")

    # Add last activity tracking
    # NOTE(review): datetime.now() is naive local time; confirm this matches
    # how the last_active column is stored.
    user.last_active = datetime.now()
    session.add(user)
    await session.commit()
    return token_payload
Insufficient permissions
# Definable's RBAC implementation is organization-specific
# Check current user's role in the organization:
response = requests.get(
f"{API_URL}/roles/list_roles?org_id={org_id}",
headers={"Authorization": f"Bearer {token}"}
)
if response.status_code == 200:
roles = response.json()
print(f"Available roles: {[role['name'] for role in roles]}")
# Get user's permissions
response = requests.get(
f"{API_URL}/users/me?org_id={org_id}",
headers={"Authorization": f"Bearer {token}"}
)
if response.status_code == 200:
user_data = response.json()
role = user_data.get("role")
print(f"User role: {role}")
user: dict = Depends(RBAC("resource", "action"))
Permissions follow the resource:action pattern, with wildcard support (*).
# Example of Definable's RBAC middleware usage
@app.get("/api/protected")
async def protected_route(user: dict = Depends(RBAC("kb", "read"))):
    """Example endpoint guarded by the ``kb:read`` permission.

    The body only runs once the ``RBAC("kb", "read")`` dependency has
    resolved, i.e. the caller is authorized.
    """
    authorized_user_id = user["id"]
    return {"message": "Access granted", "user_id": authorized_user_id}
# For wildcards, Definable supports patterns like:
# - "*" (any resource and any action)
# - "kb:*" (any action on kb resource)
# - "*:read" (read action on any resource)
# - "kb:r*" (any action starting with "r" on kb resource)
OrganizationMemberModel.status == "active"
# Check user's status in organization
member_query = select(OrganizationMemberModel).where(
and_(
OrganizationMemberModel.user_id == user_id,
OrganizationMemberModel.organization_id == org_id
)
)
members = await session.execute(member_query)
member = members.unique().scalar_one_or_none()
if not member:
print("User is not a member of this organization")
elif member.status != "active":
print(f"User's status in organization is: {member.status}")
else:
print("User is an active member of this organization")
# Assign a role to a user (admin operation)
response = requests.post(
f"{API_URL}/roles/{org_id}/users/{user_id}/roles",
headers={"Authorization": f"Bearer {admin_token}"},
json={"role_id": "editor_role_id"}
)
# Create a new permission
response = requests.post(
f"{API_URL}/roles/permission",
headers={"Authorization": f"Bearer {admin_token}"},
json={
"resource": "kb",
"action": "read",
"description": "Read knowledge base"
}
)
# Definable uses a custom wildcard matcher in the RBAC class
# You can recreate it for testing:
def check_wildcard_match(permission_value: str, required_value: str) -> bool:
    """Check if permission matches required value with wildcard support.

    ``*`` in ``permission_value`` matches any run of characters (including
    none). Every literal piece of the pattern must appear in
    ``required_value`` in order and without overlapping another piece.

    Args:
        permission_value: Granted permission pattern, e.g. ``"kb:*"``.
        required_value: Concrete permission being requested, e.g. ``"kb:read"``.

    Returns:
        True if the pattern covers the required value, False otherwise.
    """
    if "*" not in permission_value:
        return permission_value == required_value

    prefix, *middle_parts, suffix = permission_value.split("*")

    # Prefix and suffix must both fit without sharing characters
    # ("ab*ba" must not match "aba").
    if len(required_value) < len(prefix) + len(suffix):
        return False
    if not required_value.startswith(prefix) or not required_value.endswith(suffix):
        return False

    # Only the text between prefix and suffix is available to middle parts.
    remaining = required_value[len(prefix):len(required_value) - len(suffix)]
    for part in middle_parts:
        index = remaining.find(part)
        if index == -1:
            return False
        # Move past the matched part so later parts must appear after it.
        remaining = remaining[index + len(part):]
    return True
# Test it:
print(check_wildcard_match("kb:*", "kb:read")) # True
print(check_wildcard_match("*:read", "kb:read")) # True
print(check_wildcard_match("kb:r*", "kb:read")) # True
print(check_wildcard_match("kb:w*", "kb:read")) # False
Role assignment issues
# Definable stores role assignments in OrganizationMemberModel
member_query = select(OrganizationMemberModel).where(
and_(
OrganizationMemberModel.user_id == user_id,
OrganizationMemberModel.organization_id == org_id
)
)
# Check user's current role in organization
response = requests.get(
f"{API_URL}/organizations/{org_id}/users/{user_id}",
headers={"Authorization": f"Bearer {admin_token}"}
)
if response.status_code == 200:
user_data = response.json()
print(f"User role in this organization: {user_data.get('role', {}).get('name')}")
# Get role definition using Definable's roles service
response = requests.get(
f"{API_URL}/roles/{role_id}?org_id={org_id}",
headers={"Authorization": f"Bearer {admin_token}"}
)
if response.status_code == 200:
role_data = response.json()
print(f"Role name: {role_data.get('name')}")
print(f"Role hierarchy level: {role_data.get('hierarchy_level')}")
print(f"Role is system role: {role_data.get('is_system_role')}")
print(f"Role permissions: {role_data.get('permissions', [])}")
# Definable uses hierarchy_level to determine role precedence
# Higher numbers = higher privilege
# RoleService._validate_hierarchy_level ensures hierarchy levels are unique
# Get all roles in the organization to check hierarchy
response = requests.get(
f"{API_URL}/roles/list_roles?org_id={org_id}",
headers={"Authorization": f"Bearer {admin_token}"}
)
if response.status_code == 200:
roles = response.json()
# Sort by hierarchy level
sorted_roles = sorted(roles, key=lambda r: r.get('hierarchy_level', 0), reverse=True)
for role in sorted_roles:
print(f"Role: {role.get('name')}, Level: {role.get('hierarchy_level')}")
# To resolve persistent issues, recreate the role assignment
# 1. First get the user's current organization member record
member_query = select(OrganizationMemberModel).where(
and_(
OrganizationMemberModel.user_id == user_id,
OrganizationMemberModel.organization_id == org_id
)
)
# 2. Update the role_id
# Using direct SQL (example)
update_query = (
update(OrganizationMemberModel)
.where(
and_(
OrganizationMemberModel.user_id == user_id,
OrganizationMemberModel.organization_id == org_id
)
)
.values(role_id=new_role_id, status="active")
)
# 3. Verify user needs to get a new token after role changes
print("Role updated. User must log out and log in again for changes to take effect.")
Permission inheritance and organization context
org_id parameter missing errors
# Definable's RBAC is organization-scoped
# The org_id query parameter is required for most endpoints
# Correct request with org_id:
response = requests.get(
f"{API_URL}/some-endpoint?org_id={org_id}",
headers={"Authorization": f"Bearer {token}"}
)
# For WebSockets:
websocket = WebSocket(f"wss://{API_URL}/ws/connect?token={token}&org_id={org_id}")
# RBAC middleware checks:
# 1. User has org_id in request
# 2. User is an active member of that organization
# 3. User has appropriate permissions in that organization
# If getting: "Invalid org id" errors:
# For HTTP requests, check:
if request:
org_id = request.query_params.get("org_id")
print(f"Request org_id: {org_id}")
# For WebSockets, check:
elif websocket:
org_id = websocket.query_params.get("org_id")
print(f"WebSocket org_id: {org_id}")
# If org_id is None, you need to add it to your request
# In Definable's implementation, permissions are tied to:
# 1. A user's role in a specific organization
# 2. The PermissionModel defines resource & action
# 3. RolePermissionModel links roles to permissions
# Query to check what permissions are assigned to a role:
role_perms_query = (
select(RolePermissionModel, PermissionModel)
.join(PermissionModel, RolePermissionModel.permission_id == PermissionModel.id)
.where(RolePermissionModel.role_id == role_id)
)
# List a user's permissions in an organization:
async def list_user_permissions(user_id, org_id, session):
    """Return the permissions a user holds in an organization.

    Looks up the user's active membership row for ``org_id`` and then loads
    every ``PermissionModel`` linked to that membership's role.

    Returns:
        A list of ``PermissionModel`` rows; empty when the user has no active
        membership in the organization.
    """
    # Step 1: the user's active membership (and therefore role) in this org.
    membership_filter = and_(
        OrganizationMemberModel.user_id == user_id,
        OrganizationMemberModel.organization_id == org_id,
        OrganizationMemberModel.status == "active",
    )
    membership_result = await session.execute(
        select(OrganizationMemberModel).where(membership_filter)
    )
    membership = membership_result.scalar_one_or_none()
    if not membership:
        return []

    # Step 2: permissions attached to that role through RolePermissionModel.
    permissions_stmt = (
        select(PermissionModel)
        .join(RolePermissionModel, PermissionModel.id == RolePermissionModel.permission_id)
        .where(RolePermissionModel.role_id == membership.role_id)
    )
    permission_rows = await session.execute(permissions_stmt)
    return list(permission_rows.scalars().all())
// For clients working with multiple organizations:
// Store current organization context
/** Remember which organization subsequent API calls should be scoped to. */
function setCurrentOrganization(orgId) {
  const storageKey = 'current_org_id';
  localStorage.setItem(storageKey, orgId);
}
// Add organization context to all API requests
// Inject the stored organization id into outgoing requests unless the
// caller already supplied one.
axios.interceptors.request.use((config) => {
  const orgId = localStorage.getItem('current_org_id');
  if (!orgId) {
    return config;
  }

  // Query string: create the params object on demand, keep an explicit org_id.
  config.params = config.params || {};
  if (!config.params.org_id) {
    config.params.org_id = orgId;
  }

  // JSON body (POST/PUT): only object payloads without an org_id are touched.
  const body = config.data;
  const hasObjectBody = body && typeof body === 'object';
  if (hasObjectBody && !body.org_id) {
    body.org_id = orgId;
  }

  return config;
});
Login failures
# Definable's AuthService.post_login uses verify_password to check credentials
# Example login request:
response = requests.post(
f"{API_URL}/auth/login",
json={
"email": "user@example.com",
"password": "password123"
}
)
if response.status_code != 200:
error_data = response.json()
print(f"Login error: {error_data.get('detail')}")
else:
token_data = response.json()
print(f"Login successful. Token obtained: {token_data.get('access_token')[:10]}...")
# Definable uses verify_password from auth_util:
def verify_password(plain_password, hashed_password):
    """Return the result of checking ``plain_password`` against ``hashed_password``.

    The comparison is delegated to ``pwd_context.verify``.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
# Password hash algorithm is bcrypt used by passlib's CryptContext
# If hashing algorithm was changed, older passwords may fail verification
# Definable checks user status during login
# Check user status in the database:
user_query = select(UserModel).where(UserModel.email == email)
user = await session.execute(user_query)
user = user.scalar_one_or_none()
if not user:
print("User not found in database")
else:
print(f"User status: {user.status}")
# Status can be "active", "inactive", "pending", etc.
# Only "active" users can log in
# Definable may require email verification before login
# Check if email verified status:
user_query = select(UserModel).where(UserModel.email == email)
user = await session.execute(user_query)
user = user.scalar_one_or_none()
if user and not user.email_verified:
print("Email not verified. Verification required before login.")
# Resend verification email if needed:
response = requests.post(
f"{API_URL}/auth/resend-verification",
json={"email": email}
)
# Definable's implementation may have rate limiting on login attempts
# Check for specific error messages indicating lockout
# If using RateLimiter middleware from ratelimit.py:
# Typical lockout message: "Rate limit exceeded"
# To reset a locked account (admin operation):
# 1. Reset the user's rate limit counter in Redis
# 2. Or enable a temporary bypass for that user
// Example client-side exponential backoff:
/**
 * Log in, retrying with exponential backoff while the server rate-limits us.
 *
 * @param {object} credentials - JSON body sent to the login endpoint.
 * @param {number} [maxRetries=5] - Maximum number of rate-limited attempts.
 * @returns {Promise<object>} Parsed JSON body of the successful response.
 * @throws {Error} On a network failure, on any error that is not rate
 *   limiting, or after maxRetries rate-limited attempts.
 */
async function loginWithBackoff(credentials, maxRetries = 5) {
  let delay = 1000; // Start with 1 second

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    let response;
    try {
      response = await fetch(`${API_URL}/auth/login`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(credentials)
      });
    } catch (err) {
      console.error("Login error:", err);
      throw err;
    }

    if (response.ok) {
      return await response.json();
    }

    // Error bodies are not guaranteed to be JSON, and `detail` is not
    // guaranteed to be a string, so normalise before inspecting it.
    let detail = `HTTP ${response.status}`;
    try {
      const error = await response.json();
      if (typeof error.detail === 'string') {
        detail = error.detail;
      } else if (error.detail !== undefined) {
        detail = JSON.stringify(error.detail);
      }
    } catch (parseError) {
      // Keep the status-based fallback message.
    }

    // 429 is the standard HTTP status for rate limiting; the message check
    // covers servers that signal it only in the body.
    const rateLimited = response.status === 429 || detail.includes("Rate limit exceeded");
    if (!rateLimited) {
      // Other error, not rate limiting
      const err = new Error(detail);
      console.error("Login error:", err);
      throw err;
    }

    console.log(`Rate limited. Waiting ${delay/1000} seconds before retry.`);
    await new Promise(resolve => setTimeout(resolve, delay));
    delay *= 2; // Exponential backoff
  }

  throw new Error("Max retries exceeded for login");
}
Invitation and signup issues
# Definable handles invitations in AuthService.post_signup_invite and get_signup_invite
# Check invitation status:
response = requests.get(
f"{API_URL}/auth/signup-invite?token={invitation_token}"
)
if response.status_code != 200:
error_data = response.json()
print(f"Invitation error: {error_data.get('detail')}")
else:
invite_data = response.json()
print(f"Invitation valid for: {invite_data.get('email')}")
print(f"Organization: {invite_data.get('organization', {}).get('name')}")
# Definable validates invite tokens by:
# 1. Checking if invite exists in the database
# 2. Verifying it's not expired
# 3. Ensuring it's not already used
# Common issues:
# - Token expired: invites have expiration timestamps
# - Token already used: invite can be used only once
# - Email mismatch: token is tied to a specific email
# Check invitation in database:
invite_query = select(InvitationModel).where(InvitationModel.token == token)
invite = await session.execute(invite_query)
invite = invite.scalar_one_or_none()
if not invite:
print("Invitation not found")
elif invite.used:
print("Invitation already used")
elif invite.expires_at < datetime.utcnow():
print("Invitation expired")
else:
print("Invitation valid")
# Admin can resend invitation
response = requests.post(
f"{API_URL}/auth/invite",
headers={"Authorization": f"Bearer {admin_token}"},
json={
"email": "user@example.com",
"org_id": organization_id,
"role_id": role_id
}
)
if response.status_code == 200:
print("New invitation sent successfully")
# Complete signup using invitation
response = requests.post(
f"{API_URL}/auth/signup-invite",
json={
"email": "user@example.com",
"password": "securePassword123",
"first_name": "John",
"last_name": "Doe",
"token": invitation_token
}
)
if response.status_code == 201:
print("Signup with invitation successful")
user_data = response.json()
print(f"User created with ID: {user_data.get('id')}")
else:
error_data = response.json()
print(f"Signup error: {error_data.get('detail')}")
Account management issues
# Definable's password reset flow:
# 1. Request password reset token
response = requests.post(
f"{API_URL}/auth/forgot-password",
json={"email": "user@example.com"}
)
if response.status_code == 200:
print("Password reset email sent")
# 2. Reset password with token
response = requests.post(
f"{API_URL}/auth/reset-password",
json={
"token": "reset_token_from_email",
"password": "newSecurePassword123"
}
)
if response.status_code == 200:
print("Password reset successful")
else:
error_data = response.json()
print(f"Password reset error: {error_data.get('detail')}")
# Definable tracks email verification status
# Get user profile to check verification status
response = requests.get(
f"{API_URL}/auth/me",
headers={"Authorization": f"Bearer {token}"}
)
if response.status_code == 200:
user_data = response.json()
print(f"Email verified: {user_data.get('email_verified', False)}")
# Verify email with token:
response = requests.post(
f"{API_URL}/auth/verify-email",
json={"token": "verification_token_from_email"}
)
if response.status_code == 200:
print("Email verification successful")
# Update user profile in Definable
response = requests.put(
f"{API_URL}/auth/me",
headers={"Authorization": f"Bearer {token}"},
json={
"first_name": "John",
"last_name": "Smith",
# Other fields as needed
}
)
if response.status_code == 200:
print("Profile updated successfully")
else:
error_data = response.json()
print(f"Profile update error: {error_data.get('detail')}")
# Definable process for account deactivation
# Deactivate (soft delete) account:
response = requests.delete(
f"{API_URL}/auth/me",
headers={"Authorization": f"Bearer {token}"}
)
if response.status_code == 200:
print("Account deactivated successfully")
# Administrator hard delete (permanent)
response = requests.delete(
f"{API_URL}/users/{user_id}",
headers={"Authorization": f"Bearer {admin_token}"}
)
if response.status_code == 204:
print("User permanently deleted")
WebSocket connection authentication failures
// Definable's WebSocket authentication requires token in query parameters
// Correct format:
const socket = new WebSocket(`wss://${API_URL}/ws/connect?token=${jwt_token}&org_id=${org_id}`);
socket.onopen = () => {
console.log("WebSocket connection established");
};
socket.onerror = (error) => {
console.error("WebSocket error:", error);
};
socket.onclose = (event) => {
console.log(`WebSocket closed with code: ${event.code}, reason: ${event.reason}`);
};
# Definable's JWTBearer validates WebSocket connections
# For WebSockets, validate_socket_token is called
# Server-side debugging:
async def authenticate_ws_connection(websocket: WebSocket):
    """Validate the ``token`` and ``org_id`` query parameters of a WebSocket.

    On any failure the socket is closed with code 1008 (policy violation) and
    a short reason, and None is returned.

    Args:
        websocket: The incoming WebSocket connection.

    Returns:
        The decoded JWT payload when the token is valid and carries a user
        ``id``; otherwise None.
    """
    # Extract token from query params
    token = websocket.query_params.get("token")
    if not token:
        await websocket.close(code=1008, reason="Missing token")
        return None

    # Extract org_id from query params
    org_id = websocket.query_params.get("org_id")
    if not org_id:
        await websocket.close(code=1008, reason="Missing org_id")
        return None

    # Validate JWT token
    try:
        payload = jwt.decode(
            token,
            settings.jwt_secret,
            algorithms=["HS256"]
        )
    except jwt.PyJWTError as e:
        # A close frame carries at most 123 bytes of reason text, so trim it.
        reason = f"Invalid token: {e}".encode("utf-8")[:123].decode("utf-8", errors="ignore")
        await websocket.close(code=1008, reason=reason)
        return None

    # The user identifier is carried in the "id" claim.
    if not payload.get("id"):
        await websocket.close(code=1008, reason="Invalid token: missing user id")
        return None

    # Further validation as needed
    return payload
// Client-side token refresh and reconnection
/**
 * WebSocket wrapper that authenticates through query parameters and
 * reconnects with exponential backoff.
 *
 * - `getToken` / `getOrgId` are called on every (re)connect so fresh values
 *   are always used.
 * - Reconnection stops after `maxReconnectAttempts` failures in a row, or as
 *   soon as `close()` is called.
 */
class AuthenticatedWebSocket {
  constructor(baseUrl, getToken, getOrgId) {
    this.baseUrl = baseUrl;
    this.getToken = getToken;
    this.getOrgId = getOrgId;
    this.socket = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.reconnectDelay = 1000;
    this.reconnectTimer = null;
    // Set by close() so an intentional shutdown is not "healed" by a reconnect.
    this.manuallyClosed = false;
    this.listeners = {
      message: [],
      error: [],
      close: [],
      open: []
    };
  }

  connect() {
    const token = this.getToken();
    const orgId = this.getOrgId();
    if (!token) {
      console.error("No token available for WebSocket connection");
      return;
    }
    if (!orgId) {
      console.error("No organization ID available for WebSocket connection");
      return;
    }

    this.manuallyClosed = false;

    // Definable's WebSocket endpoint with required auth parameters.
    // Values are encoded so they cannot break the query string.
    const query = `token=${encodeURIComponent(token)}&org_id=${encodeURIComponent(orgId)}`;
    this.socket = new WebSocket(`${this.baseUrl}/ws/connect?${query}`);

    this.socket.onopen = (event) => {
      console.log("WebSocket connected");
      this.reconnectAttempts = 0;
      this.listeners.open.forEach(listener => listener(event));
    };
    this.socket.onmessage = (event) => {
      this.listeners.message.forEach(listener => listener(event));
    };
    this.socket.onerror = (event) => {
      this.listeners.error.forEach(listener => listener(event));
    };
    this.socket.onclose = (event) => {
      this.listeners.close.forEach(listener => listener(event));

      if (this.manuallyClosed) {
        return;
      }
      // One shared limit for every failure type, so a permanently rejected
      // token cannot cause an endless retry loop.
      if (this.reconnectAttempts >= this.maxReconnectAttempts) {
        console.error("WebSocket reconnect attempts exhausted");
        return;
      }

      // Handle authentication failures (1008 = Policy Violation)
      if (event.code === 1008 && event.reason.includes("token")) {
        console.error("WebSocket authentication failed:", event.reason);
        // Try to refresh token before reconnecting
        this.refreshTokenAndReconnect();
      } else {
        // Handle other disconnects; reconnect() applies the backoff delay.
        this.reconnect();
      }
    };
  }

  async refreshTokenAndReconnect() {
    try {
      // Implement token refresh logic here
      await refreshAuthToken();
      // close() may have been called while the refresh was in flight.
      if (!this.manuallyClosed) {
        this.reconnect();
      }
    } catch (error) {
      console.error("Failed to refresh token:", error);
    }
  }

  reconnect() {
    this.reconnectAttempts++;
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
    console.log(`Attempting to reconnect in ${delay}ms (attempt ${this.reconnectAttempts})`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, delay);
  }

  // Event listener methods
  on(event, callback) {
    if (this.listeners[event]) {
      this.listeners[event].push(callback);
    }
    return this;
  }

  close() {
    this.manuallyClosed = true;
    // Cancel a reconnect that is already scheduled.
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.socket) {
      this.socket.close();
    }
  }

  send(data) {
    if (this.socket && this.socket.readyState === WebSocket.OPEN) {
      this.socket.send(typeof data === 'string' ? data : JSON.stringify(data));
    } else {
      console.error("WebSocket not connected");
    }
  }
}
// Usage:
const ws = new AuthenticatedWebSocket(
'wss://api.definable.ai',
() => localStorage.getItem('access_token'),
() => localStorage.getItem('current_org_id')
);
ws.on('open', () => console.log('Connected!'))
.on('message', (event) => console.log('Received:', event.data))
.connect();
# Definable's WebSocketService uses RBAC for permission checking
# WebSocket routes are protected with RBAC dependency
# In WebSocketService.ws_connect, RBAC dependency is used:
@websocket_endpoint.websocket("/connect")
async def ws_connect(
websocket: WebSocket,
user: dict = Depends(RBAC("ws", "connect"))
):
# If execution reaches here, user is authorized
# Client debugging:
# If WebSocket connection fails with 403, check user permissions
response = requests.get(
f"{API_URL}/roles/list_permissions?org_id={org_id}",
headers={"Authorization": f"Bearer {token}"}
)
if response.status_code == 200:
permissions = response.json()
# Look for "ws:connect" permission
has_ws_permission = any(
p.get("resource") == "ws" and p.get("action") == "connect"
for p in permissions
)
print(f"Has WebSocket connect permission: {has_ws_permission}")
Was this page helpful?