This guide outlines the process for adding new services to the Zyeta backend codebase.

Overview

Services in our application form the business logic layer, connecting API endpoints with database models. Each service is responsible for a specific domain of functionality, such as user management, authentication, or agent operations.

Step 1: Create the Service Directory

Create a new directory for your service in the src/services directory with a descriptive name that follows the naming convention of existing services (e.g., my_feature).

src/
└── services/
    └── my_feature/
        ├── __init__.py  # Empty file to make the directory a package
        ├── schema.py    # Pydantic models for request/response validation
        └── service.py   # Service implementation

Step 2: Define Schema Models

Create the schema.py file to define Pydantic models for your service’s input and output data validation:

from datetime import datetime
from typing import List, Optional
from uuid import UUID

from pydantic import BaseModel, Field


class MyFeatureBase(BaseModel):
    """Common fields of a my feature, inherited by the create and response schemas."""

    # Required; must contain between 1 and 255 characters.
    name: str = Field(
        ...,
        min_length=1,
        max_length=255,
    )
    # Optional free text; omitted values become None.
    description: Optional[str] = Field(default=None)
    # New features are active unless the caller says otherwise.
    is_active: bool = Field(default=True)
    # default_factory gives every instance its own empty dict.
    settings: dict = Field(default_factory=dict)


class MyFeatureCreate(MyFeatureBase):
    """Request body for creating a my feature; adds nothing to the base fields."""


class MyFeatureUpdate(BaseModel):
    """Request body for updating a my feature.

    Every field is optional and defaults to None, so a caller may send any
    subset of them.
    """

    # Same length limits as on creation, applied only when a name is sent.
    name: Optional[str] = Field(
        default=None,
        min_length=1,
        max_length=255,
    )
    description: Optional[str] = Field(default=None)
    is_active: Optional[bool] = Field(default=None)
    settings: Optional[dict] = Field(default=None)


class MyFeatureResponse(MyFeatureBase):
    """Response schema for my feature.

    Extends the base fields with the record's identifier, its owning
    organization and its timestamps.
    """

    # Pydantic v2 configuration. ``from_attributes`` lets ``model_validate``
    # read values from object attributes (such as an ORM instance) rather
    # than only from a dict. This replaces the inner ``class Config``, which
    # Pydantic v2 still accepts but reports as deprecated.
    model_config = {"from_attributes": True}

    id: UUID
    organization_id: UUID
    created_at: datetime
    updated_at: datetime


class PaginatedMyFeatureResponse(BaseModel):
    """One page of my feature results together with paging metadata."""

    # The my features contained in this page.
    items: List[MyFeatureResponse]

    # Number of my features counted for the whole query, not just this page.
    total: int

    # True when further items exist beyond this page.
    has_more: bool

Step 3: Implement the Service

Create the service.py file implementing your service class:

from uuid import UUID

from fastapi import Depends, HTTPException
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from dependencies.security import RBAC
from models import MyFeatureModel
from services.__base.acquire import Acquire

from .schema import MyFeatureCreate, MyFeatureResponse, MyFeatureUpdate, PaginatedMyFeatureResponse


class MyFeatureService:
    """CRUD service for my feature records.

    The methods named in ``http_exposed`` receive their database session and
    the permission-checked user as FastAPI dependencies. The ``user``
    parameter is not read inside the methods; declaring it is what triggers
    the RBAC dependency.
    """

    # Each entry has the form "<http_method>=<service_method>".
    http_exposed = ["get=list", "post=create", "get=get", "put=update", "delete=delete"]

    def __init__(self, acquire: Acquire):
        """Initialize the service.

        Args:
            acquire: Shared ``Acquire`` instance, stored on ``self.acquire``.
        """
        self.acquire = acquire

    async def _get_or_404(self, feature_id: UUID, session: AsyncSession) -> MyFeatureModel:
        """Load a single my feature by ID.

        Shared by ``get``, ``update`` and ``delete`` so the lookup and the
        not-found response are defined in one place.

        Args:
            feature_id: ID of the my feature to load.
            session: Database session used for the query.

        Returns:
            The matching ``MyFeatureModel`` instance.

        Raises:
            HTTPException: With status 404 when no row has this ID.
        """
        query = select(MyFeatureModel).where(MyFeatureModel.id == feature_id)
        result = await session.execute(query)
        my_feature = result.scalar_one_or_none()
        if my_feature is None:
            raise HTTPException(status_code=404, detail="My feature not found")
        return my_feature

    async def create(
        self,
        org_id: UUID,
        data: MyFeatureCreate,
        session: AsyncSession = Depends(get_db),
        user: dict = Depends(RBAC("my_feature", "write")),
    ) -> MyFeatureResponse:
        """Create a new my feature.

        Args:
            org_id: Organization the new my feature belongs to.
            data: Validated fields for the new my feature.
            session: Database session.
            user: Result of the ``my_feature``/``write`` RBAC check.

        Returns:
            The stored my feature.
        """
        my_feature = MyFeatureModel(
            **data.model_dump(),
            organization_id=org_id,
        )
        session.add(my_feature)
        await session.commit()
        # Reload the row's attributes from the database before serializing.
        await session.refresh(my_feature)
        return MyFeatureResponse.model_validate(my_feature)

    async def list(
        self,
        org_id: UUID,
        offset: int = 0,
        limit: int = 10,
        session: AsyncSession = Depends(get_db),
        user: dict = Depends(RBAC("my_feature", "read")),
    ) -> PaginatedMyFeatureResponse:
        """Get a paginated list of my features for an organization.

        Args:
            org_id: Organization whose my features are listed.
            offset: Zero-based page index; ``offset * limit`` rows are skipped.
            limit: Maximum number of items returned.
            session: Database session.
            user: Result of the ``my_feature``/``read`` RBAC check.

        Returns:
            The requested page, the total count and a ``has_more`` flag.
        """
        # Count total items
        count_query = select(func.count(MyFeatureModel.id)).where(MyFeatureModel.organization_id == org_id)
        total = await session.scalar(count_query)

        # Newest first. One row more than ``limit`` is requested so that the
        # presence of a further page can be detected without a second query.
        query = (
            select(MyFeatureModel)
            .where(MyFeatureModel.organization_id == org_id)
            .order_by(MyFeatureModel.created_at.desc())
            .offset(offset * limit)
            .limit(limit + 1)
        )
        result = await session.execute(query)
        items = result.scalars().all()

        has_more = len(items) > limit
        items = items[:limit]  # Drop the extra look-ahead row

        return PaginatedMyFeatureResponse(
            items=[MyFeatureResponse.model_validate(item) for item in items],
            total=total or 0,
            has_more=has_more,
        )

    async def get(
        self,
        feature_id: UUID,
        session: AsyncSession = Depends(get_db),
        user: dict = Depends(RBAC("my_feature", "read")),
    ) -> MyFeatureResponse:
        """Get a specific my feature by ID.

        Args:
            feature_id: ID of the my feature to return.
            session: Database session.
            user: Result of the ``my_feature``/``read`` RBAC check.

        Returns:
            The matching my feature.

        Raises:
            HTTPException: With status 404 when the my feature does not exist.
        """
        my_feature = await self._get_or_404(feature_id, session)
        return MyFeatureResponse.model_validate(my_feature)

    async def update(
        self,
        feature_id: UUID,
        data: MyFeatureUpdate,
        session: AsyncSession = Depends(get_db),
        user: dict = Depends(RBAC("my_feature", "write")),
    ) -> MyFeatureResponse:
        """Update an existing my feature.

        Args:
            feature_id: ID of the my feature to change.
            data: Fields to change; fields that were not sent are left as-is.
            session: Database session.
            user: Result of the ``my_feature``/``write`` RBAC check.

        Returns:
            The my feature after the update.

        Raises:
            HTTPException: With status 404 when the my feature does not exist.
        """
        my_feature = await self._get_or_404(feature_id, session)

        # exclude_unset limits the dump to fields the caller actually sent.
        update_data = data.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(my_feature, field, value)

        await session.commit()
        await session.refresh(my_feature)
        return MyFeatureResponse.model_validate(my_feature)

    async def delete(
        self,
        feature_id: UUID,
        session: AsyncSession = Depends(get_db),
        user: dict = Depends(RBAC("my_feature", "write")),
    ) -> dict:
        """Delete a my feature.

        Args:
            feature_id: ID of the my feature to delete.
            session: Database session.
            user: Result of the ``my_feature``/``write`` RBAC check.

        Returns:
            A dict with ``success`` and ``message`` keys.

        Raises:
            HTTPException: With status 404 when the my feature does not exist.
        """
        my_feature = await self._get_or_404(feature_id, session)
        await session.delete(my_feature)
        await session.commit()
        return {"success": True, "message": "My feature deleted successfully"}

Step 4: Register the Service

The service auto-discovery system will automatically register your service based on the http_exposed attribute in your service class.

However, the service can only be discovered if its module is actually imported somewhere in the application. If your endpoints do not appear, add an import to src/app.py or a module-level import in the respective service package.

Service Implementation Best Practices

  1. http_exposed Attribute:

    • Format: ["<http_method>=<service_method>", ...]
    • Example: ["get=list", "post=create"]
    • This maps HTTP methods to service methods for API exposure
  2. Method Parameters:

    • Use FastAPI’s Depends for common dependencies like database sessions
    • Include RBAC for proper permission checking
    • Use typed parameters with appropriate annotations
  3. Response Types:

    • Explicitly define return types for all methods
    • Use Pydantic models for consistent serialization/validation
  4. Error Handling:

    • Raise appropriate HTTP exceptions for error conditions
    • Include descriptive error messages
    • Handle edge cases gracefully
  5. Database Operations:

    • Use SQLAlchemy’s async operations for database queries
    • Structure queries for efficient execution
    • Handle database errors appropriately

Example Service Methods

Create Method Example

async def create(
    self,
    org_id: UUID,
    data: MyFeatureCreate,
    session: AsyncSession = Depends(get_db),
    user: dict = Depends(RBAC("my_feature", "write")),
) -> MyFeatureResponse:
    """Store a new my feature for the given organization and return it."""
    # The validated request fields plus the owning organization form the row.
    payload = data.model_dump()
    record = MyFeatureModel(**payload, organization_id=org_id)

    session.add(record)
    await session.commit()

    # Reload the row's attributes from the database before serializing.
    await session.refresh(record)
    response = MyFeatureResponse.model_validate(record)
    return response

List Method Example

async def get_list(
    self,
    org_id: UUID,
    offset: int = 0,
    limit: int = 10,
    session: AsyncSession = Depends(get_db),
    user: dict = Depends(RBAC("agents", "read")),
) -> PaginatedAgentResponse:
    """Get paginated list of agents for an organization.

    Args:
        org_id: Organization whose agents are listed.
        offset: Zero-based page index; ``offset * limit`` rows are skipped.
        limit: Maximum number of agents returned.
        session: Database session.
        user: Result of the ``agents``/``read`` RBAC check.

    Returns:
        The requested page, the total count and a ``has_more`` flag.
    """
    # Base query for total count
    count_query = select(func.count(AgentModel.id)).where(AgentModel.organization_id == org_id)
    total = await session.scalar(count_query)

    # Main query. OFFSET/LIMIT without ORDER BY lets the database return rows
    # in any order, so consecutive pages could repeat or skip agents; ordering
    # by id keeps the pages stable. Swap in a timestamp column if a
    # newest-first listing is wanted.
    query = (
        select(AgentModel)
        .where(AgentModel.organization_id == org_id)
        .order_by(AgentModel.id)
        .offset(offset * limit)
        .limit(limit + 1)  # one look-ahead row to detect a further page
    )
    result = await session.execute(query)
    items = result.scalars().all()

    # Check if there are more items
    has_more = len(items) > limit
    items = items[:limit]  # Remove the extra item used to check for more

    return PaginatedAgentResponse(
        items=[AgentResponse.model_validate(item) for item in items],
        total=total or 0,
        has_more=has_more,
    )

Testing Services

After creating your service, you should thoroughly test its functionality:

  1. Unit Tests:

    • Test individual service methods
    • Mock database dependencies
    • Verify correct error handling
  2. Integration Tests:

    • Test complete service workflows
    • Verify database interactions
    • Test permissions and access control
  3. API Tests:

    • Test HTTP endpoint exposure
    • Verify correct request/response handling
    • Test error responses

Troubleshooting

Common Issues

  1. Service not exposed:

    • Check the http_exposed attribute for correct format
    • Verify service is imported and accessible
  2. Permission errors:

    • Check RBAC configuration for correct resource/action
    • Verify user has appropriate permissions
  3. Database errors:

    • Check query syntax and structure
    • Verify model relationships
    • Handle potential null values

Getting Help

If you encounter issues with service implementation, consult:

  • The FastAPI documentation
  • The SQLAlchemy documentation
  • Existing service implementations in the codebase
  • The development team (reach out to them for assistance)