Database Issues
Troubleshooting PostgreSQL and SQLAlchemy problems in the Zyeta backend
This guide covers common database-related issues that may arise when working with the Zyeta backend.
Connection Issues
Symptoms:
- Error: `Could not connect to server: Connection refused`
- Error: `OperationalError: (psycopg2.OperationalError) connection to server at "localhost" (::1), port 5432 failed`
Solutions:
- Verify PostgreSQL is running:

  ```bash
  # For Linux
  sudo systemctl status postgresql

  # For macOS
  brew services list | grep postgres

  # For Windows: check the Services application (services.msc)
  ```

- Check connection parameters:

  - Verify hostname, port, username, password, and database name
  - Make sure the database exists:

    ```bash
    psql -U postgres -c "SELECT datname FROM pg_database;"
    ```

- Test the connection with psql:

  ```bash
  psql -U postgres -h localhost -p 5432 -d postgres
  ```

- Check the PostgreSQL logs:

  ```bash
  # Location varies by system
  sudo tail -f /var/log/postgresql/postgresql-14-main.log   # Debian/Ubuntu
  sudo tail -f /var/lib/pgsql/data/log/*.log                # RHEL/CentOS
  ```
Symptoms:
- Error: `asyncpg.exceptions.PostgresConnectionError: connection to server was closed`
- Error: `asyncpg.exceptions.InvalidAuthorizationSpecificationError: password authentication failed`
Solutions:
- Check your connection string format:

  ```python
  # Correct format for asyncpg
  "postgresql+asyncpg://username:password@hostname:port/database"
  ```

- URL-encode special characters in the password:

  ```python
  # If your password contains special characters like @, %, etc.
  from urllib.parse import quote_plus

  password = quote_plus("your@complex!password")
  connection_string = f"postgresql+asyncpg://username:{password}@hostname:port/database"
  ```

- Check the PostgreSQL authentication settings (pg_hba.conf):

  - Ensure it allows password authentication (md5 or scram-sha-256)
  - For development, temporarily set local connections to `trust` (see the illustrative entry below)
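  For reference, a `pg_hba.conf` entry of this shape is what the bullets above refer to; the addresses below are illustrative, so adjust them to your installation:

  ```text
  # TYPE  DATABASE  USER  ADDRESS       METHOD
  host    all       all   127.0.0.1/32  scram-sha-256   # password auth for local TCP connections
  # For development only, 'trust' skips password checks entirely:
  # host  all       all   127.0.0.1/32  trust
  ```

  Reload PostgreSQL after editing (for example, `sudo systemctl reload postgresql`).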
- Test with a minimal example:

  ```python
  import asyncio

  import asyncpg

  async def test_connection():
      conn = await asyncpg.connect("postgresql://username:password@localhost/postgres")
      version = await conn.fetchval("SELECT version();")
      print(version)
      await conn.close()

  asyncio.run(test_connection())
  ```
Symptoms:
- Error: `ssl_error_want_read` or `ssl_error_want_write`
- Error: `SSL SYSCALL error: EOF detected`
Solutions:
- Disable SSL for local development (if needed):

  ```text
  # In your connection string
  postgresql+asyncpg://username:password@hostname:port/database?ssl=false
  ```

- For production, configure SSL properly:

  ```text
  # Using SSL
  postgresql+asyncpg://username:password@hostname:port/database?ssl=true

  # Verify the SSL certificate
  postgresql+asyncpg://username:password@hostname:port/database?ssl=true&sslmode=verify-full
  ```

- For Supabase with SSL issues:

  - Use the connection pooler URL instead of a direct connection
  - Go to Supabase dashboard > Project Settings > Database > Connection Pooling
Migration Issues
Symptoms:
- Error: `FAILED: Multiple head revisions are present`
- Error: `Can't locate revision identified by '...'`
Solutions:
- For multiple heads:

  ```bash
  # List heads
  alembic heads

  # Create a merge migration
  alembic merge -m "merge heads" head1 head2

  # Then upgrade to the merged head
  alembic upgrade head
  ```

- For missing revisions:

  ```bash
  # Fix the down_revision in the problematic migration file
  # and make sure all migration files are in the versions directory.

  # Or start fresh (development only):
  alembic stamp base     # Reset the alembic version table
  alembic upgrade head   # Apply all migrations
  ```
- Check the Alembic configuration:

  - Verify `alembic.ini` has the correct database URL
  - Check that `env.py` imports your SQLAlchemy models correctly, as sketched below
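  A minimal sketch of the relevant part of `alembic/env.py`, assuming your models inherit from the `Base` defined in `src.database` (adjust the import paths to match the project):

  ```python
  # alembic/env.py (excerpt)
  from src.database import Base   # assumed location of the declarative Base
  import src.models                # noqa: F401  -- importing registers the models on Base.metadata

  # Alembic autogenerate compares this metadata against the live database
  target_metadata = Base.metadata
  ```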
Symptoms:
- Error: `Target database is not up to date.`
- Error: `Can't locate revision identified by '...'`
Solutions:
- Synchronize with the team:

  ```bash
  # Pull the latest migrations from version control
  git pull

  # Check the current database revision
  alembic current

  # Upgrade to the latest revision
  alembic upgrade head
  ```

- For development, reset the migration state:

  ```bash
  # WARNING: this will destroy data in the development database.
  # DROP the database and recreate it, then reset Alembic:
  alembic stamp base
  alembic upgrade head
  ```
- Fix the revision chain manually:

  - Edit the `down_revision` in migration files to repair the chain (see the sketch below)
  - Use `alembic history` to understand the current chain
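  For illustration, each Alembic migration file declares its parent via `down_revision`; the revision identifiers below are made up:

  ```python
  # versions/20240101_add_users.py (illustrative revision ids)
  revision = "a1b2c3d4e5f6"        # this migration's id
  down_revision = "9f8e7d6c5b4a"   # must match the id of the previous migration in the chain

  def upgrade():
      ...

  def downgrade():
      ...
  ```

  `alembic history --verbose` prints the chain so you can spot a `down_revision` that points at a missing or wrong parent.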
Symptoms:
- `alembic revision --autogenerate` doesn't detect model changes
- Generated migration has unexpected changes
Solutions:
- Ensure models are imported in env.py:

  ```python
  # In alembic/env.py
  from src.models import *  # Make sure your models are imported here
  ```

- Check the model metadata:

  ```python
  # Make sure your models use the correct metadata
  from src.database import Base

  class YourModel(Base):
      ...  # model definition
  ```

- Run with verbose output:

  ```bash
  alembic revision --autogenerate -m "your message" --verbose
  ```
- Check for unsupported model features:

  - Some SQLAlchemy constructs aren't detected by Alembic
  - Add manual migrations for constraints, indexes, and some column types, as in the sketch below
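  A hedged sketch of adding such objects by hand inside a migration's `upgrade()`; the table, column, and constraint names here are placeholders:

  ```python
  # Inside the generated migration file (names are placeholders)
  from alembic import op

  def upgrade():
      # Index that autogenerate missed
      op.create_index("ix_your_table_your_column", "your_table", ["your_column"])

      # Explicit constraints
      op.create_unique_constraint("uq_your_table_email", "your_table", ["email"])
      op.create_check_constraint("ck_your_table_positive", "your_table", "amount >= 0")

  def downgrade():
      op.drop_constraint("ck_your_table_positive", "your_table")
      op.drop_constraint("uq_your_table_email", "your_table")
      op.drop_index("ix_your_table_your_column", table_name="your_table")
  ```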
Symptoms:
- Error like: `ProgrammingError: column X does not exist`
- Error when executing a migration's upgrade() function
Solutions:
- Edit the migration file:

  - Fix the SQL or Alembic operations causing the errors

- For data corruption, take the clean approach (development only):

  ```bash
  # Reset to the previous working migration
  alembic downgrade your_last_working_revision

  # Fix the problematic migration file, then try upgrading again
  alembic upgrade head
  ```

- For schema issues where tables already exist:

  ```python
  # In your migration file, check whether the table exists before creating it
  from alembic import op
  from sqlalchemy import inspect

  def upgrade():
      inspector = inspect(op.get_bind())
      if not inspector.has_table("table_name"):
          op.create_table(...)
  ```
SQLAlchemy Issues
Symptoms:
- Error: `TimeoutError: Connection attempt timed out`
- Application hangs when connecting to database
Solutions:
- Configure connection pooling correctly:

  ```python
  # Adjust pool settings
  engine = create_async_engine(
      DATABASE_URL,
      pool_size=5,        # Adjust based on your needs
      max_overflow=10,
      pool_timeout=30,    # Seconds
      pool_recycle=1800,  # Recycle connections after 30 minutes
  )
  ```

- Implement retry logic for transient failures:

  ```python
  from tenacity import retry, stop_after_attempt, wait_exponential

  @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
  async def get_connection():
      # Retries transient connection failures; the caller is responsible
      # for closing the returned connection
      return await engine.connect()
  ```

- Check for database resource limitations:

  - Maximum connections (`max_connections` in postgresql.conf)
  - Check current connections:

    ```sql
    SELECT count(*) FROM pg_stat_activity;
    ```
Symptoms:
- Error: `AssertionError: A sync operation occurred within an async transaction.`
- Error: `InterfaceError: connection is closed`
Solutions:
- Ensure you're using async operations throughout:

  ```python
  # Correct async pattern
  async with async_session() as session:
      async with session.begin():
          result = await session.execute(query)
          # Don't mix sync and async operations
  ```
- Check for instances of sync operations:

  - Replace `query.all()` with `(await session.execute(query)).scalars().all()`
  - Replace `query.first()` with `(await session.execute(query)).scalars().first()`
  - Use `await session.commit()` instead of `session.commit()`

  A consolidated example of the async pattern is shown below.
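  A small sketch of the full pattern under these replacements, assuming a `User` model and an `async_session` factory as in the other examples:

  ```python
  from sqlalchemy import select

  async def get_user_by_email(email: str):
      # All I/O goes through awaited calls; the already-awaited Result object
      # is then safe to consume with ordinary (sync) methods
      async with async_session() as session:
          result = await session.execute(select(User).where(User.email == email))
          return result.scalars().first()
  ```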
- Fix connection-closing issues:

  ```python
  # Create a fresh session for each request
  async def get_db():
      async with async_session() as session:
          try:
              yield session
          finally:
              await session.close()
  ```
Symptoms:
- Unexpected query results
- Error: `AttributeError: 'Query' object has no attribute 'xxx'`
Solutions:
- Debug queries by logging SQL:

  ```python
  # Add to your settings or session creation
  engine = create_async_engine(
      DATABASE_URL,
      echo=True,  # This will log all SQL
  )
  ```

- Review SQLAlchemy 2.0-style execution:

  ```python
  # 2.0-style querying
  from sqlalchemy import select

  query = select(User).where(User.email == email)
  result = await session.execute(query)
  user = result.scalar_one_or_none()
  ```
- Check for ORM vs. Core confusion:

  - Result objects differ between ORM queries and Core queries
  - For ORM, use `.scalars()` to get model instances
  - For Core, use `.mappings()` to get dictionary-like rows

  Both are illustrated in the sketch below.
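  A brief sketch of the difference, assuming a `User` model mapped as in the earlier examples:

  ```python
  from sqlalchemy import select

  # ORM: .scalars() yields User instances
  result = await session.execute(select(User))
  users = result.scalars().all()

  # Core-style column selection: .mappings() yields dict-like rows
  result = await session.execute(select(User.id, User.email))
  rows = result.mappings().all()   # e.g. [{"id": ..., "email": ...}, ...]
  ```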
PostgreSQL Issues
Symptoms:
- Error: `ERROR: relation "vector" does not exist`
- Error: `ERROR: function vector_l2_squared_distance(vector, vector) does not exist`
Solutions:
- Install and enable the pgvector extension:

  ```sql
  -- Connect as a superuser
  CREATE EXTENSION IF NOT EXISTS vector;

  -- Verify it's installed
  SELECT * FROM pg_extension WHERE extname = 'vector';
  ```

- Check whether the extension is installed in the database:

  ```bash
  # List installed extensions
  psql -U postgres -d your_database -c "SELECT * FROM pg_extension;"
  ```

- For Docker, use a container with pgvector:

  ```bash
  docker pull ankane/pgvector
  docker run --name postgres -e POSTGRES_PASSWORD=password -p 5432:5432 -d ankane/pgvector
  ```

- For hosted databases (Supabase), enable it via the dashboard:

  - Dashboard > Database > Extensions > Enable vector
Symptoms:
- Error: `permission denied for schema public`
- Error: `permission denied for relation your_table`
Solutions:
- Grant permissions to your database user:

  ```sql
  -- Connect as a superuser
  GRANT ALL PRIVILEGES ON DATABASE your_database TO your_user;
  GRANT ALL PRIVILEGES ON SCHEMA public TO your_user;
  GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO your_user;
  GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO your_user;
  ```

- Check the current user and permissions:

  ```sql
  -- Who am I?
  SELECT current_user;

  -- What permissions do I have?
  SELECT * FROM information_schema.role_table_grants WHERE grantee = current_user;
  ```

- For hosted databases with restricted permissions:

  - Use the admin/owner account instead of a restricted role
  - Contact your database provider for assistance
Symptoms:
- Poor search results
- Performance issues with text search
Solutions:
- Create proper indexes:

  ```sql
  -- Create a GIN index for full-text search
  CREATE INDEX idx_your_table_search
  ON your_table
  USING GIN (to_tsvector('english', your_text_column));
  ```

- Optimize your search queries:

  ```python
  # Efficient full-text search with SQLAlchemy
  from sqlalchemy import text

  query = text("""
      SELECT * FROM your_table
      WHERE to_tsvector('english', your_text_column) @@ plainto_tsquery('english', :search_term)
      ORDER BY ts_rank(
          to_tsvector('english', your_text_column),
          plainto_tsquery('english', :search_term)
      ) DESC
  """)
  result = await session.execute(query, {"search_term": search_term})
  ```
- Consider using a vector database for semantic search:

  - Use pgvector for embedding-based search
  - Create appropriate indexes for vector columns (see the sketch below)
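  A hedged sketch of an embedding column using the `pgvector` Python package (`pip install pgvector`); the model name and the 1536-dimension size are illustrative and must match your embedding model:

  ```python
  import uuid

  from pgvector.sqlalchemy import Vector
  from sqlalchemy import Column, Text, select
  from sqlalchemy.dialects.postgresql import UUID

  from src.database import Base  # assumed location of the declarative Base

  class Document(Base):
      __tablename__ = "documents"  # illustrative table

      id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
      content = Column(Text, nullable=False)
      embedding = Column(Vector(1536))  # dimension must match your embedding model

  # Nearest-neighbour query by cosine distance (query_embedding is a list of floats)
  query = select(Document).order_by(Document.embedding.cosine_distance(query_embedding)).limit(5)
  ```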
Performance Issues
Symptoms:
- Database operations taking too long
- API response times deteriorating
Solutions:
- Identify slow queries:

  ```sql
  -- Find slow queries (requires the pg_stat_statements extension;
  -- if it is not enabled, enable it in postgresql.conf)
  SELECT query, calls, total_time, mean_time
  FROM pg_stat_statements
  ORDER BY mean_time DESC
  LIMIT 10;
  ```

- Use EXPLAIN ANALYZE to understand query plans:

  ```sql
  EXPLAIN ANALYZE your_slow_query;
  ```

- Add appropriate indexes:

  ```sql
  -- For columns used in WHERE clauses
  CREATE INDEX idx_your_table_column ON your_table(your_column);

  -- For foreign keys
  CREATE INDEX idx_your_table_fk_column ON your_table(foreign_key_column);
  ```
- Optimize your queries:

  - Use `LIMIT` to restrict the result size
  - Only select the columns you need
  - Avoid multiple joins when possible
  - Consider pagination for large result sets

  A sketch of these ideas in SQLAlchemy follows this list.
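  A small sketch in SQLAlchemy 2.0 style; `YourModel` and its columns are placeholders:

  ```python
  from sqlalchemy import select

  # Select only the columns you need and cap the result size
  query = (
      select(YourModel.id, YourModel.name)
      .where(YourModel.is_active.is_(True))   # is_active is a placeholder column
      .order_by(YourModel.id)
      .limit(50)
  )
  result = await session.execute(query)
  rows = result.mappings().all()
  ```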
Symptoms:
- Error: `FATAL: too many connections`
- Applications waiting for database connections
Solutions:
- Configure connection pooling properly:

  ```python
  # Adjust pool settings in SQLAlchemy
  engine = create_async_engine(
      DATABASE_URL,
      pool_size=5,          # Start with a reasonable value
      max_overflow=10,
      pool_pre_ping=True,   # Check connections before using them
  )
  ```

- Check current connections and limits:

  ```sql
  -- Current connections
  SELECT count(*) FROM pg_stat_activity;

  -- Connection limit
  SHOW max_connections;
  ```
- For production, consider external connection poolers (a sketch of pointing SQLAlchemy at one follows):

  - PgBouncer
  - AWS RDS Proxy
  - Supabase Connection Pooler
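  As an illustration of the application side, a hedged sketch of connecting through a transaction-mode pooler such as PgBouncer (default port 6432); the host and credentials are placeholders. With an external pooler, SQLAlchemy's own pool is usually disabled and asyncpg's prepared-statement cache turned off, since transaction pooling does not support prepared statements:

  ```python
  from sqlalchemy.ext.asyncio import create_async_engine
  from sqlalchemy.pool import NullPool

  # Point the URL at the pooler, not at PostgreSQL directly
  engine = create_async_engine(
      "postgresql+asyncpg://username:password@localhost:6432/database",
      poolclass=NullPool,                         # let the external pooler do the pooling
      connect_args={"statement_cache_size": 0},   # needed for PgBouncer transaction mode with asyncpg
  )
  ```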
Symptoms:
- Out of memory errors
- Slow API responses when retrieving many records
Solutions:
- Implement pagination:

  ```python
  # With SQLAlchemy
  query = select(YourModel).offset(offset).limit(limit)

  # In API endpoints
  @app.get("/items/")
  async def get_items(offset: int = 0, limit: int = 100):
      ...  # apply pagination
  ```

- Use cursor-based pagination for large datasets:

  ```python
  # Keyset pagination (more efficient than offset/limit)
  # Example: sorting by id
  query = (
      select(YourModel)
      .where(YourModel.id > last_id)
      .order_by(YourModel.id)
      .limit(limit)
  )
  ```

- Stream results for large exports:

  ```python
  # Using SQLAlchemy async streaming
  async def stream_results():
      async with async_session() as session:
          stream = await session.stream(select(YourModel))
          async for row in stream:
              yield row
  ```
Data Integrity Issues
Symptoms:
- Error: `invalid input syntax for type uuid`
- Missing UUID values in records
Solutions:
- Ensure UUIDs are used consistently:

  ```python
  # In your SQLAlchemy models
  import uuid

  from sqlalchemy.dialects.postgresql import UUID

  class YourModel(Base):
      id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
  ```

- For migrations involving UUID columns:

  ```python
  # In Alembic migrations
  from sqlalchemy.dialects import postgresql

  op.create_table(
      'your_table',
      sa.Column(
          'id',
          postgresql.UUID(as_uuid=True),
          server_default=sa.text("gen_random_uuid()"),
          primary_key=True,
      ),
      # other columns...
  )
  ```

- Check database and Python type consistency:

  ```python
  # Convert strings to UUID when needed
  from uuid import UUID

  def validate_uuid(value):
      if isinstance(value, str):
          return UUID(value)
      return value
  ```
Symptoms:
- Error: `violates foreign key constraint`
- Error: `insert or update on table violates foreign key constraint`
Solutions:
- Check referential integrity:

  ```sql
  -- Find the constraint that's failing
  SELECT conname, conrelid::regclass, confrelid::regclass
  FROM pg_constraint
  WHERE contype = 'f';

  -- Check whether the referenced id exists
  SELECT * FROM parent_table WHERE id = 'referenced_id';
  ```

- Fix the application logic:

  - Ensure parent records exist before creating child records
  - Implement proper cascading deletes in models:

    ```python
    # In a SQLAlchemy model
    parent_id = Column(UUID(as_uuid=True), ForeignKey("parent.id", ondelete="CASCADE"))
    ```

- For migrations or data fixing:

  ```sql
  -- Temporarily disable constraint enforcement (for data fixing only)
  SET session_replication_role = 'replica';

  -- Fix data...

  SET session_replication_role = 'origin';
  ```
Symptoms:
- Error: `duplicate key value violates unique constraint`
- Insert or update operations failing
Solutions:
- Check the existing data:

  ```sql
  -- Find duplicate records
  SELECT column_name, COUNT(*)
  FROM your_table
  GROUP BY column_name
  HAVING COUNT(*) > 1;
  ```

- Implement upsert logic:

  ```python
  # Using SQLAlchemy 2.0 style
  from sqlalchemy.dialects.postgresql import insert

  stmt = insert(YourModel).values(
      email="user@example.com",
      name="User",
  )

  # ON CONFLICT DO UPDATE
  stmt = stmt.on_conflict_do_update(
      index_elements=['email'],
      set_=dict(name="User"),
  )
  await session.execute(stmt)
  ```

- For batch operations, find and filter duplicates before inserting:

  ```python
  # Get existing records
  existing = await session.execute(
      select(YourModel.email).where(YourModel.email.in_(emails_to_insert))
  )
  existing_emails = {r[0] for r in existing.fetchall()}

  # Filter out duplicates
  new_items = [item for item in items if item.email not in existing_emails]
  ```
Advanced Issues
Symptoms:
- Error: `Unexpected JSON type`
- Problems querying or updating JSON fields
Solutions:
- Use an appropriate column type:

  ```python
  # In models
  from sqlalchemy.dialects.postgresql import JSONB

  settings = Column(JSONB, nullable=False, server_default='{}')
  ```

- Query JSON data efficiently:

  ```python
  # Get records where settings contains a specific key/value
  query = select(YourModel).where(YourModel.settings.contains({"key": "value"}))

  # Get records based on a JSON path
  query = select(YourModel).where(YourModel.settings["nested"]["key"].astext == "value")
  ```

- Update JSON fields:

  ```python
  # Update a specific key in a JSONB column
  from sqlalchemy import func, update

  await session.execute(
      update(YourModel)
      .where(YourModel.id == model_id)
      .values(settings=func.jsonb_set(YourModel.settings, '{key}', '"new_value"', True))
  )
  ```
Symptoms:
- Deadlocks
- Error: `current transaction is aborted, commands ignored until end of transaction block`
Solutions:
- Use proper transaction patterns:

  ```python
  # Context-manager approach: commits at the end or rolls back on exception
  async with session.begin():
      ...  # all operations here run in one transaction

  # Explicit approach
  try:
      await session.begin()
      # operations...
      await session.commit()
  except Exception:
      await session.rollback()
      raise
  ```

- Handle nested transactions:

  ```python
  # Using savepoints for nested transactions
  try:
      async with session.begin_nested():  # creates a SAVEPOINT
          ...  # operations that might fail
  except Exception:
      # Only the savepoint is rolled back, not the entire transaction
      pass
  ```

- For deadlocks, implement retry logic:

  ```python
  from sqlalchemy.exc import OperationalError
  from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

  @retry(
      stop=stop_after_attempt(3),
      wait=wait_exponential(multiplier=1, min=1, max=10),
      retry=retry_if_exception_type(OperationalError),
  )
  async def execute_with_retry():
      async with session.begin():
          ...  # your database operations
  ```
Symptoms:
- Slow vector similarity searches
- High CPU usage during vector operations
Solutions:
- Create appropriate indexes:

  ```sql
  -- For cosine distance
  CREATE INDEX ON your_table USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

  -- For L2 distance
  CREATE INDEX ON your_table USING ivfflat (embedding vector_l2_ops) WITH (lists = 100);
  ```

- Tune search parameters:

  ```python
  # Limit the number of probes (trades accuracy for speed)
  from sqlalchemy import text

  # Set the session variable for subsequent queries
  await session.execute(text("SET ivfflat.probes = 10;"))
  # Then perform the vector search
  ```
- Optimize embedding dimension and storage:

  - Use the smallest embedding size that maintains accuracy
  - Consider dimensionality-reduction techniques
  - For very large collections, implement clustering or partitioning
Supabase Connection Issues
Symptoms:
- Error: `could not connect to server: Connection timed out (0x0000274C/10060) Is the server running on host "db.abcdefghijkl.supabase.co" and accepting TCP/IP connections on port 5432?`
- Connection works from one network but not another
- Intermittent connection issues
Solutions:
- Force IPv4 connections:

  ```python
  # Prefix the host name with ipv4: to force IPv4
  # Original:
  # postgresql+asyncpg://username:password@db.abcdefghijkl.supabase.co:5432/postgres
  # Modified:
  "postgresql+asyncpg://username:password@ipv4:db.abcdefghijkl.supabase.co:5432/postgres"
  ```

- Disable IPv6 on your local machine (temporary test):

  ```bash
  # For Linux
  sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1

  # For Windows (run in PowerShell as Administrator)
  Disable-NetAdapterBinding -Name "*" -ComponentID ms_tcpip6

  # For macOS
  networksetup -setv6off Wi-Fi  # or your network interface
  ```

- Use the Supabase connection pooler instead of a direct connection:

  - Go to Supabase Dashboard > Project Settings > Database > Connection Pooling
  - Use the provided connection string, which may bypass IP-version issues:

    ```text
    postgresql://[user]:[password]@[host].pooler.supabase.com:6543/postgres
    ```

- Test the connection using the domain instead of an IP address:

  ```bash
  psql -h db.abcdefghijkl.supabase.co -p 5432 -U postgres -d postgres
  ```
Symptoms:
- Connections time out after exactly 30-60 seconds
- Error: `OperationalError: could not connect to server: Connection refused`
- Error: `ssl_error_syscall` with the Supabase connection
Solutions:
- Check whether your network allows outbound connections to port 5432:

  ```bash
  # Test connectivity with netcat
  nc -zv db.abcdefghijkl.supabase.co 5432

  # Alternative with telnet
  telnet db.abcdefghijkl.supabase.co 5432
  ```

- Check whether your IP is allowlisted in Supabase:

  - Go to Supabase Dashboard > Project Settings > Database > Network Restrictions
  - Add your current IP address to the allowed list
  - If using dynamic IPs, you may need to keep this updated

- Use Supabase's connection pooler for restrictive networks:

  ```text
  # The pooler may bypass firewall restrictions on the standard PostgreSQL port
  postgresql://[user]:[password]@[host].pooler.supabase.co:6543/postgres?pgbouncer=true
  ```

- VPN or proxy solutions:

  - If your network has strict firewall rules, consider using a VPN
  - Configure a proxy that allows PostgreSQL connections
Symptoms:
- Error: `remaining connection slots are reserved for non-replication superuser connections`
- Connections work initially but fail under load
- Error: `sorry, too many clients already` when multiple services connect
Solutions:
- Configure proper connection pooling in the application:

  ```python
  # Connection pooling configured conservatively for Supabase
  engine = create_async_engine(
      DATABASE_URL,
      pool_size=3,          # Keep the pool size small for Supabase
      max_overflow=2,
      pool_timeout=30,
      pool_recycle=1800,    # Recycle connections after 30 minutes
      pool_pre_ping=True,   # Verify connections are still alive
  )
  ```

- Use Supabase's connection pooler:

  ```text
  # Add the pgbouncer=true parameter
  postgresql://[user]:[password]@[host].pooler.supabase.co:6543/postgres?pgbouncer=true
  ```

- Implement aggressive connection clean-up:

  ```python
  # Ensure connections are properly closed
  async def get_db():
      async with async_session() as session:
          try:
              yield session
          finally:
              await session.close()

  # For non-FastAPI applications, always use try/finally
  try:
      async with async_session() as session:
          ...  # your code
  finally:
      await engine.dispose()  # Close all connections when done
  ```

- Monitor and adjust connection limits in the dashboard:

  - Go to Supabase Dashboard > Project Settings > Database > Connection Pooling
  - Adjust the pool mode and connection limits based on your tier
  - For the Free and Pro tiers, be especially careful with connection counts
Symptoms:
- High latency on database operations
- Intermittent timeouts despite successful connections
- Connection issues during specific times of day
Solutions:
- Select an appropriate Supabase region:

  - When creating a new project, choose the region closest to your users/servers
  - For existing projects, consider migrating to a closer region if latency is critical

- Configure longer timeouts for high-latency connections:

  ```python
  # Increase the connection timeout and command timeout
  engine = create_async_engine(
      DATABASE_URL,
      connect_args={
          "command_timeout": 30.0,  # Seconds for each query
          "timeout": 60.0,          # Seconds for connection establishment
      },
  )
  ```

- Implement caching for read-heavy workloads:

  ```python
  # Cache frequently accessed data (Redis or in-memory).
  # Note: functools.lru_cache does not work with async functions
  # (it would cache the coroutine object), so use a plain dict or an
  # async-aware cache instead.
  _cache = {}  # simple in-process cache keyed by id

  async def get_cached_data(id):
      if id in _cache:
          return _cache[id]
      async with async_session() as session:
          result = await session.execute(select(Data).where(Data.id == id))
          data = result.scalar_one_or_none()
      if data is not None:
          _cache[id] = data
      return data
  ```

- Consider Supabase Edge Functions for latency-sensitive operations:

  - Deploy Edge Functions closer to the database
  - Reduce the round-trip time for critical operations
Symptoms:
- Error: `SSL SYSCALL error: EOF detected`
- Error: `ssl_error_want_read` or `ssl_error_want_write`
- Error: `SSL error: certificate verify failed`
Solutions:
- Configure SSL properly:

  ```python
  # Configure SSL for the Supabase connection; asyncpg accepts "require",
  # or "verify-full" for stricter validation
  engine = create_async_engine(
      DATABASE_URL,
      connect_args={"ssl": "require"},
  )
  ```

- For development/testing, relax certificate verification (not recommended for production):

  ```python
  # Less strict SSL for development only
  import ssl

  ssl_context = ssl.create_default_context()
  ssl_context.check_hostname = False
  ssl_context.verify_mode = ssl.CERT_NONE  # disables certificate verification

  engine = create_async_engine(
      DATABASE_URL,
      connect_args={"ssl": ssl_context},
  )
  ```

- Update CA certificates:

  ```bash
  # For Linux
  sudo update-ca-certificates

  # For macOS
  brew install openssl@3

  # For Windows: install Windows updates / refresh the certificate store
  ```

- Use a Supabase connection string with SSL parameters:

  ```text
  postgresql://[user]:[password]@[host].supabase.co:5432/postgres?sslmode=require
  ```
Symptoms:
- Hitting connection limits unexpectedly
- Database operations becoming slower at certain times
- Error: `sorry, too many clients already` despite proper connection pooling
Solutions:
- Be aware of tier limitations:

  - Free tier: limited connections and compute, and the project may pause after inactivity
  - Pro tier: higher limits, but still restricted compared to enterprise
  - Check your current tier limits in the Supabase dashboard

- Implement application-level connection management:

  ```python
  # Add retry logic with exponential backoff
  from tenacity import retry, stop_after_attempt, wait_exponential

  @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=60))
  async def execute_db_operation():
      async with async_session() as session:
          ...  # your database operations
  ```

- Add monitoring to track connection usage:

  ```python
  # Log connection pool information
  import logging

  logging.info(f"Engine stats: {engine.pool.status()}")

  # Create dashboards to monitor connection count trends
  ```

- Consider upgrading your tier or optimizing the application:

  - For production workloads, higher tiers provide better guarantees
  - Implement aggressive connection pooling
  - Add read replicas for read-heavy workloads if available in your tier
Next Steps
If you've resolved your database issues, consider reviewing the other development guides in this section.
If you're still experiencing database problems, check the official PostgreSQL and SQLAlchemy documentation or contact the development team for assistance.