from uuid import UUID

from fastapi import Depends, HTTPException
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from dependencies.security import RBAC
from models import MyFeatureModel
from services.__base.acquire import Acquire

from .schema import MyFeatureCreate, MyFeatureResponse, MyFeatureUpdate, PaginatedMyFeatureResponse
class MyFeatureService:
    """CRUD service for ``MyFeatureModel`` records.

    Every public coroutine receives its database session through
    ``Depends(get_db)`` and a ``user`` value through an ``RBAC("my_feature", ...)``
    dependency. The ``user`` value is not read by any method body; it is kept
    in the signatures so the dependency is still resolved for each call.
    """

    # List of methods exposed as HTTP endpoints; each entry pairs an HTTP verb
    # with the name of a method on this class ("<verb>=<method>").
    http_exposed = ["get=list", "post=create", "get=get", "put=update", "delete=delete"]

    def __init__(self, acquire: Acquire):
        """Store the shared ``Acquire`` object on the instance."""
        self.acquire = acquire

    async def _get_or_404(self, session: AsyncSession, feature_id: UUID) -> MyFeatureModel:
        """Load one ``MyFeatureModel`` by primary key or raise a 404.

        Args:
            session: Session used to run the lookup.
            feature_id: Value matched against ``MyFeatureModel.id``.

        Returns:
            The matching model instance.

        Raises:
            HTTPException: With status 404 when no row matches.
        """
        # NOTE(review): the lookup filters on the id only and is not restricted
        # to an organization - confirm that the RBAC dependency (or the caller)
        # enforces tenant isolation for get/update/delete.
        result = await session.execute(
            select(MyFeatureModel).where(MyFeatureModel.id == feature_id)
        )
        my_feature = result.scalar_one_or_none()
        if my_feature is None:
            raise HTTPException(status_code=404, detail="My feature not found")
        return my_feature

    async def create(
        self,
        org_id: UUID,
        data: MyFeatureCreate,
        session: AsyncSession = Depends(get_db),
        user: dict = Depends(RBAC("my_feature", "write")),
    ) -> MyFeatureResponse:
        """Create a new my feature.

        Args:
            org_id: Stored on the new row as ``organization_id``.
            data: Payload whose dumped fields become the model's constructor kwargs.
            session: Database session.
            user: Result of the RBAC "write" dependency (unused in the body).

        Returns:
            The persisted row validated into a ``MyFeatureResponse``.
        """
        my_feature = MyFeatureModel(
            **data.model_dump(),
            organization_id=org_id,
        )
        session.add(my_feature)
        await session.commit()
        # Reload the instance after the commit before serializing it.
        await session.refresh(my_feature)
        return MyFeatureResponse.model_validate(my_feature)

    async def list(
        self,
        org_id: UUID,
        offset: int = 0,
        limit: int = 10,
        session: AsyncSession = Depends(get_db),
        user: dict = Depends(RBAC("my_feature", "read")),
    ) -> PaginatedMyFeatureResponse:
        """Get paginated list of my features for an organization.

        Args:
            org_id: Only rows whose ``organization_id`` equals this are returned.
            offset: Zero-based page index. It is multiplied by ``limit`` to get
                the number of rows skipped, so it is *not* a raw row offset.
            limit: Maximum number of items returned per page.
            session: Database session.
            user: Result of the RBAC "read" dependency (unused in the body).

        Returns:
            A ``PaginatedMyFeatureResponse`` with the page items (newest
            ``created_at`` first), the total row count for the organization,
            and ``has_more`` telling whether another row exists past this page.
        """
        # Count total items
        count_query = select(func.count(MyFeatureModel.id)).where(
            MyFeatureModel.organization_id == org_id
        )
        total = await session.scalar(count_query)

        # Main query with pagination. One extra row (limit + 1) is requested so
        # the presence of a following page can be detected without a second query.
        query = (
            select(MyFeatureModel)
            .where(MyFeatureModel.organization_id == org_id)
            .order_by(MyFeatureModel.created_at.desc())
            .offset(offset * limit)
            .limit(limit + 1)
        )
        result = await session.execute(query)
        items = result.scalars().all()

        # Check if there are more items
        has_more = len(items) > limit
        items = items[:limit]  # Remove the extra item used to check for more

        return PaginatedMyFeatureResponse(
            items=[MyFeatureResponse.model_validate(item) for item in items],
            total=total or 0,  # fall back to 0 if the count query yields no value
            has_more=has_more,
        )

    async def get(
        self,
        feature_id: UUID,
        session: AsyncSession = Depends(get_db),
        user: dict = Depends(RBAC("my_feature", "read")),
    ) -> MyFeatureResponse:
        """Get a specific my feature by ID.

        Args:
            feature_id: Primary key of the row to fetch.
            session: Database session.
            user: Result of the RBAC "read" dependency (unused in the body).

        Returns:
            The row validated into a ``MyFeatureResponse``.

        Raises:
            HTTPException: With status 404 when no row has this id.
        """
        my_feature = await self._get_or_404(session, feature_id)
        return MyFeatureResponse.model_validate(my_feature)

    async def update(
        self,
        feature_id: UUID,
        data: MyFeatureUpdate,
        session: AsyncSession = Depends(get_db),
        user: dict = Depends(RBAC("my_feature", "write")),
    ) -> MyFeatureResponse:
        """Update an existing my feature.

        Args:
            feature_id: Primary key of the row to update.
            data: Partial payload; only fields explicitly set on it are applied.
            session: Database session.
            user: Result of the RBAC "write" dependency (unused in the body).

        Returns:
            The updated row validated into a ``MyFeatureResponse``.

        Raises:
            HTTPException: With status 404 when no row has this id.
        """
        my_feature = await self._get_or_404(session, feature_id)

        # Update fields with new values; exclude_unset leaves attributes the
        # client did not send untouched.
        update_data = data.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(my_feature, field, value)

        await session.commit()
        await session.refresh(my_feature)
        return MyFeatureResponse.model_validate(my_feature)

    async def delete(
        self,
        feature_id: UUID,
        session: AsyncSession = Depends(get_db),
        user: dict = Depends(RBAC("my_feature", "write")),
    ) -> dict:
        """Delete a my feature.

        Args:
            feature_id: Primary key of the row to delete.
            session: Database session.
            user: Result of the RBAC "write" dependency (unused in the body).

        Returns:
            A dict with ``success`` set to ``True`` and a confirmation ``message``.

        Raises:
            HTTPException: With status 404 when no row has this id.
        """
        my_feature = await self._get_or_404(session, feature_id)
        await session.delete(my_feature)
        await session.commit()
        return {"success": True, "message": "My feature deleted successfully"}