diff --git a/_example.py b/_example.py index 3428af4..d118fc2 100644 --- a/_example.py +++ b/_example.py @@ -1,41 +1,35 @@ -from django.http import JsonResponse -from django.views.decorators.http import require_http_methods -from django.views.decorators.csrf import csrf_exempt -from .client import supabase import logging -logger = logging.getLogger(" apps.supabase_home") +from fastapi import FastAPI, HTTPException +from fastapi.responses import JSONResponse + +from .client import supabase + +app = FastAPI() +logger = logging.getLogger("apps.supabase_home") -@csrf_exempt -@require_http_methods(["GET"]) -def example_supabase_view(request): + +@app.get("/api/supabase/example/") +async def example_supabase_view(): """ - Example view demonstrating how to use the Supabase client in Django. + Example endpoint demonstrating how to use the Supabase client in FastAPI. - This view shows how to: + This endpoint shows how to: 1. Use the Supabase database service to fetch data 2. Handle errors properly 3. Return JSON responses - URL: /api/supabase/example/ - Method: GET - Returns: - JsonResponse: JSON response with data or error message + JSONResponse: JSON response with data or error message """ try: - # Get the database service from the Supabase client db_service = supabase.get_database_service() - - # Example: Fetch data from a table (replace 'your_table' with an actual table name) - # For demonstration purposes only table_name = "example_table" # Replace with your actual table name try: - # Try to fetch data from the table data = db_service.fetch_data(table=table_name, limit=10) - return JsonResponse( + return JSONResponse( { "success": True, "data": data, @@ -43,11 +37,8 @@ def example_supabase_view(request): } ) except Exception as e: - # If the table doesn't exist or there's another error, log it logger.warning(f"Error fetching data: {str(e)}") - - # Return a fallback response showing the Supabase connection is working - return JsonResponse( + return JSONResponse( { "success": True, "data": None, @@ -57,57 +48,34 @@ def example_supabase_view(request): ) except Exception as e: - # Log any unexpected errors logger.exception(f"Unexpected error in example_supabase_view: {str(e)}") - - # Return an error response - return JsonResponse( - { - "success": False, - "error": str(e), - "message": "An error occurred while connecting to Supabase.", - }, - status=500, + raise HTTPException( + status_code=500, detail="An error occurred while connecting to Supabase." ) -@csrf_exempt -@require_http_methods(["GET"]) -def supabase_health_check(request): +@app.get("/api/supabase/health/") +async def supabase_health_check(): """ Health check endpoint for Supabase connection. - This view attempts to initialize the Supabase client and reports + This endpoint attempts to initialize the Supabase client and reports whether the connection is working properly. - URL: /api/supabase/health/ - Method: GET - Returns: - JsonResponse: JSON response with connection status + JSONResponse: JSON response with connection status """ try: - # Get the raw Supabase client to check if it's initialized raw_client = supabase.get_raw_client() - - # If we get here, the client was initialized successfully - return JsonResponse( + return JSONResponse( { "status": "ok", "message": "Supabase client is configured correctly", - "supabase_url": raw_client.supabase_url, # Safe to show the URL + "supabase_url": raw_client.supabase_url, } ) except Exception as e: - # Log the error logger.exception(f"Supabase health check failed: {str(e)}") - - # Return an error response - return JsonResponse( - { - "status": "error", - "message": "Supabase client configuration error", - "error": str(e), - }, - status=500, + raise HTTPException( + status_code=500, detail="Supabase client configuration error" ) diff --git a/_service.py b/_service.py index f0198a4..a910fba 100644 --- a/_service.py +++ b/_service.py @@ -1,34 +1,31 @@ import json -from typing import Any, Dict, Optional +import logging +import os +from typing import Any import requests -from django.conf import settings -import logging +from fastapi import HTTPException + +from ..supabase_home.client import get_supabase_client -logger = logging.getLogger(" apps.supabase_home") +logger = logging.getLogger("apps.supabase_home") class SupabaseError(Exception): """Base exception for Supabase-related errors""" - pass - class SupabaseAuthError(SupabaseError): """Exception raised for authentication errors""" - pass - class SupabaseAPIError(SupabaseError): """Exception raised for API errors""" - - def __init__(self, message: str, status_code: int = None, details: Dict = None): + def __init__(self, message: str, status_code: int = None, details: dict = None): self.status_code = status_code self.details = details or {} super().__init__(message) - class SupabaseService: """ Service class for interacting with Supabase API. @@ -44,119 +41,85 @@ class SupabaseService: """ def __init__(self): - # Get configuration from settings - self.base_url = settings.SUPABASE_URL - self.anon_key = settings.SUPABASE_ANON_KEY - self.service_role_key = settings.SUPABASE_SERVICE_ROLE_KEY + self.base_url = os.getenv("SUPABASE_URL") + self.anon_key = os.getenv("SUPABASE_ANON_KEY") + self.service_role_key = os.getenv("SUPABASE_SERVICE_ROLE_KEY") + self.raw = get_supabase_client( + url=self.base_url, key=self.anon_key, service_key=self.service_role_key + ) + + self._configure_service() - # Validate required settings if not self.base_url: - logger.error("SUPABASE_URL is not set in settings") - raise ValueError("SUPABASE_URL is not set in settings") + logger.error("SUPABASE_URL is not set in environment variables") + raise ValueError("SUPABASE_URL is not set in environment variables") if not self.anon_key: - logger.error("SUPABASE_ANON_KEY is not set in settings") - raise ValueError("SUPABASE_ANON_KEY is not set in settings") + logger.error("SUPABASE_ANON_KEY is not set in environment variables") + raise ValueError("SUPABASE_ANON_KEY is not set in environment variables") if not self.service_role_key: logger.warning( - "SUPABASE_SERVICE_ROLE_KEY is not set in settings. Admin operations will not work." + "SUPABASE_SERVICE_ROLE_KEY is not set in environment variables. Admin operations will not work." ) def _get_headers( - self, auth_token: Optional[str] = None, is_admin: bool = False - ) -> Dict[str, str]: - """ - Get the headers for a Supabase API request. - - Args: - auth_token: Optional JWT token for authenticated requests - is_admin: Whether to use the service role key (admin access) - - Returns: - Dict of headers - """ + self, auth_token: str | None = None, is_admin: bool = False + ) -> dict[str, str]: headers = { "Content-Type": "application/json", "apikey": self.service_role_key if is_admin else self.anon_key, } - # For storage operations, we need to set the Authorization header correctly - # If is_admin is True, we should use the service role key regardless of auth_token if is_admin: - # Use service role key as bearer token for admin operations if not self.service_role_key: raise SupabaseAuthError( "Service role key is required for admin operations" ) headers["Authorization"] = f"Bearer {self.service_role_key}" elif auth_token: - # Use the provided auth token if not in admin mode headers["Authorization"] = f"Bearer {auth_token}" return headers - def _make_request( + async def _make_request( self, method: str, endpoint: str, - auth_token: Optional[str] = None, + auth_token: str | None = None, is_admin: bool = False, - data: Optional[Dict[str, Any]] = None, - params: Optional[Dict[str, Any]] = None, - headers: Optional[Dict[str, str]] = None, + data: dict[str, Any] | None = None, + params: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, timeout: int = 30, - ) -> Dict[str, Any]: - """ - Make a request to the Supabase API. - - Args: - method: HTTP method (GET, POST, PUT, DELETE, etc.) - endpoint: API endpoint path - auth_token: Optional JWT token for authenticated requests - is_admin: Whether to use the service role key (admin access) - data: Optional request body data - params: Optional query parameters - headers: Optional additional headers - timeout: Request timeout in seconds - - Returns: - Response data as dictionary - - Raises: - SupabaseAuthError: If there's an authentication error - SupabaseAPIError: If the API request fails - SupabaseError: For other Supabase-related errors - Exception: For unexpected errors - """ + ) -> dict[str, Any]: url = f"{self.base_url}{endpoint}" - # Get default headers and merge with any additional headers request_headers = self._get_headers(auth_token, is_admin) if headers: request_headers.update(headers) - # Enhanced logging for debugging logger.info(f"Making {method} request to {url}") logger.info(f"Headers: {request_headers}") - if 'Authorization' in request_headers: - auth_header = request_headers['Authorization'] + if "Authorization" in request_headers: + auth_header = request_headers["Authorization"] logger.info(f"Authorization header: {auth_header[:15]}...") else: logger.info("No Authorization header found") - + logger.info(f"Request data: {data}") logger.info(f"Request params: {params}") - # Ensure data is not None for any requests with application/json content type - # Supabase API expects a valid JSON body (even if empty) when Content-Type is application/json - if data is None and 'Content-Type' in request_headers and request_headers['Content-Type'] == 'application/json': + if ( + data is None + and "Content-Type" in request_headers + and request_headers["Content-Type"] == "application/json" + ): data = {} logger.info("Initialized empty JSON data") try: - logger.debug(f"Making {method} request to {url}") - response = requests.request( + response = await requests.request( method=method, url=url, headers=request_headers, @@ -165,11 +128,9 @@ def _make_request( timeout=timeout, ) - # Log request details at debug level logger.info(f"Request to {url}: {method} - Status: {response.status_code}") logger.info(f"Response headers: {response.headers}") - - # Log response content for debugging + try: if response.content: logger.info(f"Response content: {response.content[:200]}...") @@ -178,16 +139,13 @@ def _make_request( except Exception as e: logger.error(f"Error logging response content: {str(e)}") - # Handle different error scenarios if response.status_code == 401 or response.status_code == 403: error_detail = self._parse_error_response(response) logger.error(f"Authentication error: {error_detail}") - raise SupabaseAuthError(f"Authentication error: {error_detail}") + raise HTTPException(status_code=response.status_code, detail=str(error_detail)) - # Raise exception for other error status codes response.raise_for_status() - # Return JSON response if available, otherwise return empty dict if response.content: return response.json() return {} @@ -195,46 +153,25 @@ def _make_request( except requests.exceptions.HTTPError as e: error_detail = self._parse_error_response(response) logger.error(f"Supabase API error: {str(e)} - Details: {error_detail}") - raise SupabaseAPIError( - message=f"Supabase API error: {str(e)}", - status_code=response.status_code, - details=error_detail, - ) + raise HTTPException(status_code=response.status_code, detail=str(error_detail)) except requests.exceptions.ConnectionError as e: logger.error("Supabase connection error: " + str(e)) - raise SupabaseError( - "Connection error: Unable to connect to Supabase API. Check your network connection and Supabase URL." - ) + raise HTTPException(status_code=503, detail="Unable to connect to Supabase API") except requests.exceptions.Timeout as e: logger.error(f"Supabase request timeout: {str(e)}") - raise SupabaseError( - f"Request timeout: The request to Supabase API timed out after {timeout} seconds." - ) + raise HTTPException(status_code=504, detail="Request to Supabase API timed out") except requests.exceptions.RequestException as e: logger.error(f"Supabase request exception: {str(e)}") - raise SupabaseError(f"Request error: {str(e)}") - - except SupabaseAuthError as e: - # Re-raise SupabaseAuthError without wrapping it in a generic Exception - logger.error(f"Authentication error being re-raised: {str(e)}") - raise + raise HTTPException(status_code=500, detail="Request error") except Exception as e: logger.exception(f"Unexpected error during Supabase request: {str(e)}") - raise Exception(f"Unexpected error during Supabase request: {str(e)}") - - def _parse_error_response(self, response: requests.Response) -> Dict: - """Parse error response from Supabase API - - Args: - response: Response object from requests + raise HTTPException(status_code=500, detail="Unexpected error during Supabase request") - Returns: - Dictionary containing error details - """ + def _parse_error_response(self, response: requests.Response) -> dict: try: return response.json() except json.JSONDecodeError: diff --git a/client.py b/client.py index 3c71409..1c59b5b 100644 --- a/client.py +++ b/client.py @@ -1,14 +1,15 @@ from .auth import SupabaseAuthService -from .database import SupabaseDatabaseService -from .storage import SupabaseStorageService -from .edge_functions import SupabaseEdgeFunctionsService -from .realtime import SupabaseRealtimeService +from .functions.database import SupabaseDatabaseService +from .functions.edge_functions import SupabaseEdgeFunctionsService +from .functions.realtime import SupabaseRealtimeService +from .functions.storage import SupabaseStorageService from .init import get_supabase_client + class SupabaseClient: """ Client for interacting with all Supabase services. - + This class provides a unified interface to access all Supabase services: - Auth - Database @@ -16,74 +17,75 @@ class SupabaseClient: - Edge Functions - Realtime """ - + def __init__(self): # Initialize the raw supabase client self._raw_client = get_supabase_client() - + # Initialize service classes self.auth = SupabaseAuthService() self.database = SupabaseDatabaseService() self.storage = SupabaseStorageService() self.edge_functions = SupabaseEdgeFunctionsService() self.realtime = SupabaseRealtimeService() - + def get_auth_service(self) -> SupabaseAuthService: """ Get the Auth service. - + Returns: SupabaseAuthService instance """ return self.auth - + def get_database_service(self) -> SupabaseDatabaseService: """ Get the Database service. - + Returns: SupabaseDatabaseService instance """ return self.database - + def get_storage_service(self) -> SupabaseStorageService: """ Get the Storage service. - + Returns: SupabaseStorageService instance """ return self.storage - + def get_edge_functions_service(self) -> SupabaseEdgeFunctionsService: """ Get the Edge Functions service. - + Returns: SupabaseEdgeFunctionsService instance """ return self.edge_functions - + def get_realtime_service(self) -> SupabaseRealtimeService: """ Get the Realtime service. - + Returns: SupabaseRealtimeService instance """ return self.realtime - + def get_raw_client(self): """ Get the raw Supabase client from supabase-py. - + This provides direct access to the underlying client if needed for advanced operations not covered by the service classes. - + Returns: supabase.Client instance """ return self._raw_client + # Create a singleton instance supabase = SupabaseClient() diff --git a/auth.py b/functions/auth.py similarity index 87% rename from auth.py rename to functions/auth.py index de2ac33..57615cc 100644 --- a/auth.py +++ b/functions/auth.py @@ -1,6 +1,6 @@ -from typing import Any, Dict, List, Optional +from typing import Any, dict, list -from ._service import SupabaseService +from .._service import SupabaseService class SupabaseAuthService(SupabaseService): @@ -10,10 +10,14 @@ class SupabaseAuthService(SupabaseService): This class provides methods for user management, authentication, and session handling using Supabase Auth. """ + def _configure_service(self): + """Initialize auth-specific client""" + self.auth = self.raw.auth # Gets the GoTrue client + self.admin_auth = self.raw.auth.admin # For admin operations def create_user( - self, email: str, password: str, user_metadata: Optional[Dict[str, Any]] = None - ) -> Dict[str, Any]: + self, email: str, password: str, user_metadata: dict[str, Any] | None = None + ) -> dict[str, Any]: """ Create a new user with email and password. @@ -37,7 +41,7 @@ def create_user( method="POST", endpoint="/auth/v1/admin/users", is_admin=True, data=data ) - def create_anonymous_user(self) -> Dict[str, Any]: + def create_anonymous_user(self) -> dict[str, Any]: """ Create an anonymous user. @@ -48,7 +52,7 @@ def create_anonymous_user(self) -> Dict[str, Any]: def sign_in_with_email( self, email: str, password: str, is_admin: bool = False - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Sign in a user with email and password. @@ -67,7 +71,7 @@ def sign_in_with_email( is_admin=is_admin, ) - def sign_in_with_id_token(self, provider: str, id_token: str) -> Dict[str, Any]: + def sign_in_with_id_token(self, provider: str, id_token: str) -> dict[str, Any]: """ Sign in a user with an ID token from a third-party provider. @@ -84,7 +88,7 @@ def sign_in_with_id_token(self, provider: str, id_token: str) -> Dict[str, Any]: data={"provider": provider, "id_token": id_token}, ) - def sign_in_with_otp(self, email: str) -> Dict[str, Any]: + def sign_in_with_otp(self, email: str) -> dict[str, Any]: """ Send a one-time password to the user's email. @@ -98,7 +102,7 @@ def sign_in_with_otp(self, email: str) -> Dict[str, Any]: method="POST", endpoint="/auth/v1/otp", data={"email": email} ) - def verify_otp(self, email: str, token: str, type: str = "email") -> Dict[str, Any]: + def verify_otp(self, email: str, token: str, type: str = "email") -> dict[str, Any]: """ Verify a one-time password and log in the user. @@ -116,7 +120,7 @@ def verify_otp(self, email: str, token: str, type: str = "email") -> Dict[str, A data={"email": email, "token": token, "type": type}, ) - def sign_in_with_oauth(self, provider: str, redirect_url: str) -> Dict[str, Any]: + def sign_in_with_oauth(self, provider: str, redirect_url: str) -> dict[str, Any]: """ Get the URL to redirect the user for OAuth sign-in. @@ -133,7 +137,7 @@ def sign_in_with_oauth(self, provider: str, redirect_url: str) -> Dict[str, Any] data={"redirect_to": redirect_url}, ) - def sign_in_with_sso(self, domain: str, redirect_url: str) -> Dict[str, Any]: + def sign_in_with_sso(self, domain: str, redirect_url: str) -> dict[str, Any]: """ Sign in a user through SSO with a domain. @@ -150,7 +154,7 @@ def sign_in_with_sso(self, domain: str, redirect_url: str) -> Dict[str, Any]: data={"domain": domain, "redirect_to": redirect_url}, ) - def sign_out(self, auth_token: str) -> Dict[str, Any]: + def sign_out(self, auth_token: str) -> dict[str, Any]: """ Sign out a user. @@ -165,8 +169,8 @@ def sign_out(self, auth_token: str) -> Dict[str, Any]: ) def reset_password( - self, email: str, redirect_url: Optional[str] = None, is_admin: bool = False - ) -> Dict[str, Any]: + self, email: str, redirect_url: str | None = None, is_admin: bool = False + ) -> dict[str, Any]: """ Send a password reset email to the user. @@ -184,7 +188,7 @@ def reset_password( return self._make_request(method="POST", endpoint="/auth/v1/recover", data=data, is_admin=is_admin) - def get_session(self, auth_token: str) -> Dict[str, Any]: + def get_session(self, auth_token: str) -> dict[str, Any]: """ Retrieve the user's session. @@ -198,7 +202,7 @@ def get_session(self, auth_token: str) -> Dict[str, Any]: method="GET", endpoint="/auth/v1/user", auth_token=auth_token ) - def refresh_session(self, refresh_token: str) -> Dict[str, Any]: + def refresh_session(self, refresh_token: str) -> dict[str, Any]: """ Refresh the user's session with a refresh token. @@ -214,7 +218,7 @@ def refresh_session(self, refresh_token: str) -> Dict[str, Any]: data={"refresh_token": refresh_token}, ) - def get_user(self, user_id: str) -> Dict[str, Any]: + def get_user(self, user_id: str) -> dict[str, Any]: """ Retrieve a user by ID (admin only). @@ -228,7 +232,7 @@ def get_user(self, user_id: str) -> Dict[str, Any]: method="GET", endpoint=f"/auth/v1/admin/users/{user_id}", is_admin=True ) - def update_user(self, user_id: str, user_data: Dict[str, Any]) -> Dict[str, Any]: + def update_user(self, user_id: str, user_data: dict[str, Any]) -> dict[str, Any]: """ Update a user's data (admin only). @@ -246,7 +250,7 @@ def update_user(self, user_id: str, user_data: Dict[str, Any]) -> Dict[str, Any] data=user_data, ) - def get_user_identities(self, user_id: str) -> List[Dict[str, Any]]: + def get_user_identities(self, user_id: str) -> list[dict[str, Any]]: """ Retrieve identities linked to a user (admin only). @@ -254,14 +258,14 @@ def get_user_identities(self, user_id: str) -> List[Dict[str, Any]]: user_id: User's ID Returns: - List of identities + list of identities """ user = self.get_user(user_id) return user.get("identities", []) def link_identity( self, auth_token: str, provider: str, redirect_url: str - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Link an identity to a user. @@ -280,7 +284,7 @@ def link_identity( data={"redirect_to": redirect_url}, ) - def unlink_identity(self, auth_token: str, identity_id: str) -> Dict[str, Any]: + def unlink_identity(self, auth_token: str, identity_id: str) -> dict[str, Any]: """ Unlink an identity from a user. @@ -297,7 +301,7 @@ def unlink_identity(self, auth_token: str, identity_id: str) -> Dict[str, Any]: auth_token=auth_token, ) - def set_session_data(self, auth_token: str, data: Dict[str, Any]) -> Dict[str, Any]: + def set_session_data(self, auth_token: str, data: dict[str, Any]) -> dict[str, Any]: """ Set the session data. @@ -315,7 +319,7 @@ def set_session_data(self, auth_token: str, data: Dict[str, Any]) -> Dict[str, A data={"data": data}, ) - def get_user_by_token(self, token: str) -> Dict[str, Any]: + def get_user_by_token(self, token: str) -> dict[str, Any]: """ Get user information from a JWT token. @@ -335,7 +339,7 @@ def get_user_by_token(self, token: str) -> Dict[str, Any]: # MFA methods def enroll_mfa_factor( self, auth_token: str, factor_type: str = "totp" - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Enroll a multi-factor authentication factor. @@ -353,7 +357,7 @@ def enroll_mfa_factor( data={"factor_type": factor_type}, ) - def create_mfa_challenge(self, auth_token: str, factor_id: str) -> Dict[str, Any]: + def create_mfa_challenge(self, auth_token: str, factor_id: str) -> dict[str, Any]: """ Create a multi-factor authentication challenge. @@ -373,7 +377,7 @@ def create_mfa_challenge(self, auth_token: str, factor_id: str) -> Dict[str, Any def verify_mfa_challenge( self, auth_token: str, factor_id: str, challenge_id: str, code: str - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Verify a multi-factor authentication challenge. @@ -393,7 +397,7 @@ def verify_mfa_challenge( data={"factor_id": factor_id, "challenge_id": challenge_id, "code": code}, ) - def unenroll_mfa_factor(self, auth_token: str, factor_id: str) -> Dict[str, Any]: + def unenroll_mfa_factor(self, auth_token: str, factor_id: str) -> dict[str, Any]: """ Unenroll a multi-factor authentication factor. @@ -410,16 +414,16 @@ def unenroll_mfa_factor(self, auth_token: str, factor_id: str) -> Dict[str, Any] auth_token=auth_token, ) - def list_users(self, page: int = 1, per_page: int = 50) -> Dict[str, Any]: + def list_users(self, page: int = 1, per_page: int = 50) -> dict[str, Any]: """ - List all users (admin only). + list all users (admin only). Args: page: Page number for pagination per_page: Number of users per page Returns: - List of users + list of users """ return self._make_request( method="GET", @@ -431,9 +435,9 @@ def admin_create_user( self, email: str, password: str, - user_metadata: Optional[Dict[str, Any]] = None, + user_metadata: dict[str, Any] | None = None, email_confirm: bool = False, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Create a new user with admin privileges (bypassing email verification if needed). diff --git a/database.py b/functions/database.py similarity index 70% rename from database.py rename to functions/database.py index bfe9d47..91d152d 100644 --- a/database.py +++ b/functions/database.py @@ -1,26 +1,34 @@ -from typing import Any, Dict, List, Optional, Union +from typing import Any + +from ...supabase_home.client import get_supabase_client +from .._service import SupabaseService -from ._service import SupabaseService class SupabaseDatabaseService(SupabaseService): """ Service for interacting with Supabase Database (PostgreSQL) API. - + This class provides methods for database operations using Supabase's RESTful API for PostgreSQL. """ + def _configure_service(self): + """Initialize auth-specific client""" + self.auth = self.raw.table # Gets the GoTrue client + self.rpc = self.raw.rpc() # For stored procedures - def fetch_data(self, - table: str, - auth_token: Optional[str] = None, - select: str = "*", - filters: Optional[Dict[str, Any]] = None, - order: Optional[str] = None, - limit: Optional[int] = None, - offset: Optional[int] = None) -> List[Dict[str, Any]]: + def fetch_data( + self, + table: str, + auth_token: str | None = None, + select: str = "*", + filters: dict[str, Any] | None = None, + order: str | None = None, + limit: int | None = None, + offset: int | None = None, + ) -> list[dict[str, Any]]: """ Fetch data from a table with optional filtering, ordering, and pagination. - + Args: table: Table name auth_token: Optional JWT token for authenticated requests @@ -29,186 +37,189 @@ def fetch_data(self, order: Optional order by clause limit: Optional limit of rows to return offset: Optional offset for pagination - + Returns: - List of rows as dictionaries + list of rows as dictionaries """ endpoint = f"/rest/v1/{table}" params = {"select": select} - + # Add filters if provided if filters: for key, value in filters.items(): # Format filter with eq operator for Supabase REST API params[f"{key}"] = f"eq.{value}" - + # Add ordering if provided if order: params["order"] = order - + # Add pagination if provided if limit is not None: params["limit"] = limit if offset is not None: params["offset"] = offset - + return self._make_request( method="GET", endpoint=endpoint, auth_token=auth_token, params=params, - headers={"Prefer": "return=representation"} + headers={"Prefer": "return=representation"}, ) - - def insert_data(self, - table: str, - data: Union[Dict[str, Any], List[Dict[str, Any]]], - auth_token: Optional[str] = None, - upsert: bool = False) -> List[Dict[str, Any]]: + + def insert_data( + self, + table: str, + data: dict[str, Any] | list[dict[str, Any]], + auth_token: str | None = None, + upsert: bool = False, + ) -> list[dict[str, Any]]: """ Insert data into a table. - + Args: table: Table name data: Data to insert (single record or list of records) auth_token: Optional JWT token for authenticated requests upsert: Whether to upsert (update on conflict) - + Returns: Inserted data """ endpoint = f"/rest/v1/{table}" headers = {"Prefer": "return=representation"} - + if upsert: headers["Prefer"] = "resolution=merge-duplicates,return=representation" - + return self._make_request( method="POST", endpoint=endpoint, auth_token=auth_token, data=data, - headers=headers + headers=headers, ) - - def update_data(self, - table: str, - data: Dict[str, Any], - filters: Dict[str, Any], - auth_token: Optional[str] = None) -> List[Dict[str, Any]]: + + def update_data( + self, + table: str, + data: dict[str, Any], + filters: dict[str, Any], + auth_token: str | None = None, + ) -> list[dict[str, Any]]: """ Update data in a table. - + Args: table: Table name data: Data to update filters: Filters to identify rows to update auth_token: Optional JWT token for authenticated requests - + Returns: Updated data """ endpoint = f"/rest/v1/{table}" params = {} - + # Format filters with eq operator for Supabase REST API if filters: for key, value in filters.items(): params[f"{key}"] = f"eq.{value}" - + return self._make_request( method="PATCH", endpoint=endpoint, auth_token=auth_token, data=data, params=params, - headers={"Prefer": "return=representation"} + headers={"Prefer": "return=representation"}, ) - - def upsert_data(self, - table: str, - data: Union[Dict[str, Any], List[Dict[str, Any]]], - auth_token: Optional[str] = None) -> List[Dict[str, Any]]: + + def upsert_data( + self, + table: str, + data: dict[str, Any] | list[dict[str, Any]], + auth_token: str | None = None, + ) -> list[dict[str, Any]]: """ Upsert data in a table (insert or update). - + Args: table: Table name data: Data to upsert auth_token: Optional JWT token for authenticated requests - + Returns: Upserted data """ return self.insert_data(table, data, auth_token, upsert=True) - - def delete_data(self, - table: str, - filters: Dict[str, Any], - auth_token: Optional[str] = None) -> List[Dict[str, Any]]: + + def delete_data( + self, table: str, filters: dict[str, Any], auth_token: str | None = None + ) -> list[dict[str, Any]]: """ Delete data from a table. - + Args: table: Table name filters: Filters to identify rows to delete auth_token: Optional JWT token for authenticated requests - + Returns: Deleted data """ endpoint = f"/rest/v1/{table}" params = {} - + # Format filters with eq operator for Supabase REST API if filters: for key, value in filters.items(): params[f"{key}"] = f"eq.{value}" - + return self._make_request( method="DELETE", endpoint=endpoint, auth_token=auth_token, params=params, - headers={"Prefer": "return=representation"} + headers={"Prefer": "return=representation"}, ) - - def call_function(self, - function_name: str, - params: Optional[Dict[str, Any]] = None, - auth_token: Optional[str] = None) -> Any: + + def call_function( + self, + function_name: str, + params: dict[str, Any] | None = None, + auth_token: str | None = None, + ) -> Any: """ Call a PostgreSQL function. - + Args: function_name: Function name params: Function parameters auth_token: Optional JWT token for authenticated requests - + Returns: Function result """ endpoint = f"/rest/v1/rpc/{function_name}" - + return self._make_request( - method="POST", - endpoint=endpoint, - auth_token=auth_token, - data=params or {} + method="POST", endpoint=endpoint, auth_token=auth_token, data=params or {} ) - def create_test_table(self, - table: str, - auth_token: Optional[str] = None, - is_admin: bool = True) -> Dict[str, Any]: + def create_test_table( + self, table: str, auth_token: str | None = None, is_admin: bool = True + ) -> dict[str, Any]: """ Create a simple test table for integration tests. - + Args: table: Table name to create auth_token: Optional JWT token for authenticated requests is_admin: Whether to use service role key (admin access) - + Returns: Response from the API """ @@ -221,10 +232,8 @@ def create_test_table(self, created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), user_id TEXT ); - -- Set up RLS policies ALTER TABLE {table} ENABLE ROW LEVEL SECURITY; - -- Create policy to allow all operations for authenticated users DROP POLICY IF EXISTS "Allow all operations for authenticated users" ON {table}; CREATE POLICY "Allow all operations for authenticated users" @@ -234,39 +243,38 @@ def create_test_table(self, USING (true) WITH CHECK (true); """ - + # Execute the SQL using the rpc endpoint return self._make_request( method="POST", endpoint="/rest/v1/rpc/exec_sql", auth_token=auth_token, is_admin=is_admin, # Must use admin privileges to create tables - data={"query": sql} + data={"query": sql}, ) - def delete_table(self, - table: str, - auth_token: Optional[str] = None, - is_admin: bool = True) -> Dict[str, Any]: + def delete_table( + self, table: str, auth_token: str | None = None, is_admin: bool = True + ) -> dict[str, Any]: """ Delete a table from the database. - + Args: table: Table name to delete auth_token: Optional JWT token for authenticated requests is_admin: Whether to use service role key (admin access) - + Returns: Response from the API """ # SQL to drop the table sql = f"DROP TABLE IF EXISTS {table};" - + # Execute the SQL using the rpc endpoint return self._make_request( method="POST", endpoint="/rest/v1/rpc/exec_sql", auth_token=auth_token, is_admin=is_admin, # Must use admin privileges to delete tables - data={"query": sql} + data={"query": sql}, ) diff --git a/edge_functions.py b/functions/edge_functions.py similarity index 72% rename from edge_functions.py rename to functions/edge_functions.py index 777c23f..4b6a3dd 100644 --- a/edge_functions.py +++ b/functions/edge_functions.py @@ -1,145 +1,157 @@ -from typing import Any, Dict, List, Optional +from typing import Any + +from ...supabase_home.client import get_supabase_client +from .._service import SupabaseService -from ._service import SupabaseService class SupabaseEdgeFunctionsService(SupabaseService): """ Service for interacting with Supabase Edge Functions. - + This class provides methods for invoking Edge Functions deployed to Supabase. Note: Creating, listing, and deleting functions requires the Supabase CLI or Dashboard. """ - - def invoke_function(self, - function_name: str, - invoke_method: str = "POST", - body: Optional[Dict[str, Any]] = None, - headers: Optional[Dict[str, str]] = None, - auth_token: Optional[str] = None, - is_admin: bool = False) -> Any: + + def _configure_service(self): + """Initialize edge functions client""" + self.functions = self.raw.functions # Reuses base client + + def invoke_function( + self, + function_name: str, + invoke_method: str = "POST", + body: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, + auth_token: str | None = None, + is_admin: bool = False, + ) -> Any: """ Invoke a Supabase Edge Function. - + Args: function_name: Name of the function to invoke - invoke_method: HTTP method to use (GET, POST, etc.) + invoke_method: HTTP maethod to use (GET, POST, etc.) body: Optional request body headers: Optional additional headers auth_token: Optional JWT token for authenticated requests is_admin: Whether to use admin privileges - + Returns: Function response """ endpoint = f"/functions/v1/{function_name}" - + # Get default headers and merge with any additional headers request_headers = self._get_headers(auth_token, is_admin) if headers: request_headers.update(headers) - + return self._make_request( method=invoke_method, endpoint=endpoint, auth_token=auth_token, is_admin=is_admin, data=body, - headers=request_headers + headers=request_headers, ) - + # Note: The following methods are placeholders that would normally require the Supabase Management API # For testing purposes, we'll mock these in the tests - - def list_functions(self) -> List[Dict[str, Any]]: + + def list_functions(self) -> list[dict[str, Any]]: """ - List all Edge Functions in the Supabase project. - + list all Edge Functions in the Supabase project. + Note: This operation requires access to the Supabase Management API, which is not available through the standard API keys. In a real-world scenario, you would use the Supabase CLI or Dashboard. - + Returns: - List of edge functions + list of edge functions """ # For testing purposes, we'll return a mock response return [] - - def create_function(self, - name: str, - source_code: str, - verify_jwt: bool = True, - import_map: Optional[Dict[str, str]] = None) -> Dict[str, Any]: + + def create_function( + self, + name: str, + source_code: str, + verify_jwt: bool = True, + import_map: dict[str, str] | None = None, + ) -> dict[str, Any]: """ Create a new Edge Function in the Supabase project. - + Note: This operation requires access to the Supabase Management API, which is not available through the standard API keys. In a real-world scenario, you would use the Supabase CLI or Dashboard. - + Args: name: Name of the function to create source_code: JavaScript/TypeScript source code for the function verify_jwt: Whether to verify JWT tokens in requests to this function import_map: Optional import map for dependencies - + Returns: Created function details """ # For testing purposes, we'll return a mock response return {"name": name, "status": "MOCK_CREATED"} - - def delete_function(self, function_name: str) -> Dict[str, Any]: + + def delete_function(self, function_name: str) -> dict[str, Any]: """ Delete an Edge Function from the Supabase project. - + Note: This operation requires access to the Supabase Management API, which is not available through the standard API keys. In a real-world scenario, you would use the Supabase CLI or Dashboard. - + Args: function_name: Name of the function to delete - + Returns: Response confirming deletion """ # For testing purposes, we'll return a mock response return {"name": function_name, "status": "MOCK_DELETED"} - - def get_function(self, function_name: str) -> Dict[str, Any]: + + def get_function(self, function_name: str) -> dict[str, Any]: """ Get details of a specific Edge Function. - + Note: This operation requires access to the Supabase Management API, which is not available through the standard API keys. In a real-world scenario, you would use the Supabase CLI or Dashboard. - + Args: function_name: Name of the function to get - + Returns: Function details """ # For testing purposes, we'll return a mock response return {"name": function_name, "status": "MOCK_ACTIVE"} - - def update_function(self, - function_name: str, - source_code: Optional[str] = None, - verify_jwt: Optional[bool] = None, - import_map: Optional[Dict[str, str]] = None) -> Dict[str, Any]: + + def update_function( + self, + function_name: str, + source_code: str | None = None, + verify_jwt: bool | None = None, + import_map: dict[str, str] | None = None, + ) -> dict[str, Any]: """ Update an existing Edge Function. - + Note: This operation requires access to the Supabase Management API, which is not available through the standard API keys. In a real-world scenario, you would use the Supabase CLI or Dashboard. - + Args: function_name: Name of the function to update source_code: Optional new JavaScript/TypeScript source code verify_jwt: Optional setting to verify JWT tokens import_map: Optional import map for dependencies - + Returns: Updated function details """ diff --git a/functions/realtime.py b/functions/realtime.py new file mode 100644 index 0000000..19c4746 --- /dev/null +++ b/functions/realtime.py @@ -0,0 +1,170 @@ +import logging +from collections.abc import Callable +from typing import Any + +from fastapi import HTTPException + +from .._service import SupabaseService + +logger = logging.getLogger(__name__) + + +class SupabaseRealtimeService(SupabaseService): + """ + Service for managing Supabase Realtime subscriptions. + + This class provides methods for creating and managing Realtime subscriptions + using server-side websocket connections. + """ + + def _configure_service(self): + """Initialize Realtime clients""" + self.realtime = self.raw.realtime # Main client + self.active_channels = {} # Track active subscriptions + + async def subscribe_to_channel( + self, channel: str, event: str = "*", callback: Callable | None = None + ) -> Any: + """ + Subscribe to a Realtime channel. + + Args: + channel: Channel name + event: Event to subscribe to (default: "*" for all events) + callback: Optional callback function for events + + Returns: + Subscription object + """ + if not self.client.is_connected(): + await self.client.connect() + + try: + channel_obj = self.client.channel(channel) + if callback: + channel_obj.on(event, callback) + + subscription = await channel_obj.subscribe() + self.active_channels[channel] = subscription + return subscription + + except Exception as e: + logger.error(f"Realtime error: {str(e)}") + raise HTTPException( + status_code=500, detail=f"Failed to subscribe to {channel}: {str(e)}" + ) + + async def unsubscribe_from_channel(self, channel: str) -> None: + """ + Unsubscribe from a Realtime channel. + + Args: + channel: Channel name + """ + if channel in self.active_channels: + try: + await self.active_channels[channel].unsubscribe() + del self.active_channels[channel] + except Exception as e: + logger.error(f"Unsubscribe error: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Failed to unsubscribe from {channel}: {str(e)}", + ) + + async def unsubscribe_all(self) -> None: + """ + Unsubscribe from all Realtime channels. + """ + for channel in list(self.active_channels.keys()): + await self.unsubscribe_from_channel(channel) + + def get_channels(self) -> dict[str, Any]: + """ + Retrieve all subscribed channels. + + Returns: + dict containing list of subscribed channels + """ + return {"channels": list(self.active_channels.keys())} + + async def broadcast_message( + self, channel: str, payload: dict[str, Any], event: str = "broadcast" + ) -> None: + """ + Broadcast a message to a channel. + + Args: + channel: Channel name + payload: Message payload + event: Event name (default: "broadcast") + """ + if channel in self.active_channels: + try: + await self.active_channels[channel].send( + type=event, event=event, payload=payload + ) + except Exception as e: + logger.error(f"Broadcast error: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Failed to broadcast to {channel}: {str(e)}", + ) + else: + raise ValueError(f"Channel '{channel}' is not subscribed") + + async def subscribe_to_private_channel(self, channel: str, user_token: str) -> Any: + """ + Subscribe to a private Realtime channel using RLS. + + Args: + channel: Channel name + user_token: User's authentication token for RLS + + Returns: + Subscription object + """ + if not self.client.is_connected(): + await self.client.connect() + + try: + private_channel = self.client.channel( + channel, + { + "config": {"token": user_token} # RLS token + }, + ) + subscription = await private_channel.subscribe() + self.active_channels[channel] = subscription + return subscription + except Exception as e: + logger.error(f"Private channel subscription error: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Failed to subscribe to private channel {channel}: {str(e)}", + ) + + def add_callback(self, channel: str, event: str, callback: Callable) -> None: + """ + Add a callback function to an existing channel subscription. + + Args: + channel: Channel name + event: Event to listen for + callback: Callback function to execute on event + """ + if channel in self.active_channels: + self.active_channels[channel].on(event, callback) + else: + raise ValueError(f"Channel '{channel}' is not subscribed") + + async def close_all(self) -> None: + """ + Close all websocket connections and clean up resources. + """ + try: + await self.unsubscribe_all() + if self.client.is_connected(): + await self.client.disconnect() + except Exception as e: + logger.warning(f"Cleanup error: {str(e)}") diff --git a/storage.py b/functions/storage.py similarity index 82% rename from storage.py rename to functions/storage.py index f77e9df..18182c9 100644 --- a/storage.py +++ b/functions/storage.py @@ -1,8 +1,9 @@ -from typing import Any, Dict, List, Optional, Union, BinaryIO, Tuple import os +from typing import Any, BinaryIO, Optional, Tuple, Union + import requests -from ._service import SupabaseService +from .._service import SupabaseService class SupabaseStorageService(SupabaseService): @@ -12,16 +13,21 @@ class SupabaseStorageService(SupabaseService): This class provides methods for managing buckets and files in Supabase Storage. """ + def _configure_service(self): + """Initialize storage clients""" + self.storage = self.raw.storage # Main storage client + self.bucket_api = self.storage.BucketAPI() # For bucket operations + self.file_api = self.storage.FileAPI() # For file operations def create_bucket( self, bucket_id: str, public: bool = False, - file_size_limit: Optional[int] = None, - allowed_mime_types: Optional[List[str]] = None, - auth_token: Optional[str] = None, + file_size_limit: int | None = None, + allowed_mime_types: list[str] | None = None, + auth_token: str | None = None, is_admin: bool = False, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Create a new storage bucket. @@ -62,8 +68,8 @@ def create_bucket( ) def get_bucket( - self, bucket_id: str, auth_token: Optional[str] = None, is_admin: bool = False - ) -> Dict[str, Any]: + self, bucket_id: str, auth_token: str | None = None, is_admin: bool = False + ) -> dict[str, Any]: """ Retrieve a bucket by ID. @@ -83,17 +89,17 @@ def get_bucket( ) def list_buckets( - self, auth_token: Optional[str] = None, is_admin: bool = False - ) -> List[Dict[str, Any]]: + self, auth_token: str | None = None, is_admin: bool = False + ) -> list[dict[str, Any]]: """ - List all buckets. + list all buckets. Args: auth_token: Optional JWT token for authenticated requests is_admin: Whether to use service role key (admin access) Returns: - List of buckets + list of buckets """ return self._make_request( method="GET", @@ -105,11 +111,11 @@ def list_buckets( def update_bucket( self, bucket_id: str, - public: Optional[bool] = None, - file_size_limit: Optional[int] = None, - allowed_mime_types: Optional[List[str]] = None, - auth_token: Optional[str] = None, - ) -> Dict[str, Any]: + public: bool | None = None, + file_size_limit: int | None = None, + allowed_mime_types: list[str] | None = None, + auth_token: str | None = None, + ) -> dict[str, Any]: """ Update a bucket. @@ -142,8 +148,8 @@ def update_bucket( ) def delete_bucket( - self, bucket_id: str, auth_token: Optional[str] = None, is_admin: bool = False - ) -> Dict[str, Any]: + self, bucket_id: str, auth_token: str | None = None, is_admin: bool = False + ) -> dict[str, Any]: """ Delete a bucket. @@ -164,8 +170,8 @@ def delete_bucket( ) def empty_bucket( - self, bucket_id: str, auth_token: Optional[str] = None - ) -> Dict[str, Any]: + self, bucket_id: str, auth_token: str | None = None + ) -> dict[str, Any]: """ Empty a bucket (delete all files). @@ -186,11 +192,11 @@ def upload_file( self, bucket_id: str, path: str, - file_data: Union[bytes, BinaryIO], - content_type: Optional[str] = None, - auth_token: Optional[str] = None, + file_data: bytes | BinaryIO, + content_type: str | None = None, + auth_token: str | None = None, is_admin: bool = False, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Upload a file to a bucket. @@ -209,7 +215,7 @@ def upload_file( # Get headers with authentication headers = self._get_headers(auth_token, is_admin) - + # Set content type based on provided value or file extension if content_type: headers["Content-Type"] = content_type @@ -233,52 +239,66 @@ def upload_file( # For file uploads, we need to use requests directly instead of _make_request # because we're not sending JSON data import logging + logger = logging.getLogger("apps.supabase_home") - logger.info(f"Uploading file to {bucket_id}/{path} with content type: {headers.get('Content-Type')}") + logger.info( + f"Uploading file to {bucket_id}/{path} with content type: {headers.get('Content-Type')}" + ) logger.info(f"Headers: {headers}") - + response = requests.post(url, headers=headers, data=file_data, timeout=30) - + # Log the response status and headers logger.info(f"Upload response status: {response.status_code}") logger.info(f"Upload response headers: {response.headers}") - + # Log the response content for debugging if response.status_code >= 400: logger.error(f"Upload error response: {response.text}") - + response.raise_for_status() - + return response.json() except requests.exceptions.RequestException as e: # Log the error and re-raise with more context import logging + logger = logging.getLogger("apps.supabase_home") logger.error(f"Error uploading file to {bucket_id}/{path}: {str(e)}") - + # Log request details logger.error(f"Request URL: {url}") logger.error(f"Request headers: {headers}") - - from ._service import SupabaseAPIError + + from .._service import SupabaseAPIError + error_details = {} if hasattr(e, "response") and e.response is not None: try: error_details = e.response.json() logger.error(f"Error response JSON: {error_details}") except ValueError: - error_details = {"status": e.response.status_code, "text": e.response.text} + error_details = { + "status": e.response.status_code, + "text": e.response.text, + } logger.error(f"Error response text: {e.response.text}") - + raise SupabaseAPIError( message=f"Error uploading file: {str(e)}", - status_code=getattr(e.response, "status_code", None) if hasattr(e, "response") else None, - details=error_details + status_code=getattr(e.response, "status_code", None) + if hasattr(e, "response") + else None, + details=error_details, ) def download_file( - self, bucket_id: str, path: str, auth_token: Optional[str] = None, is_admin: bool = False - ) -> Tuple[bytes, str]: + self, + bucket_id: str, + path: str, + auth_token: str | None = None, + is_admin: bool = False, + ) -> tuple[bytes, str]: """ Download a file from a bucket. @@ -294,39 +314,47 @@ def download_file( try: url = f"{self.base_url}/storage/v1/object/{bucket_id}/{path}" headers = self._get_headers(auth_token, is_admin) - + # For file downloads, we need to use requests directly instead of _make_request # because we want the raw response content response = requests.get(url, headers=headers, timeout=30) response.raise_for_status() - + # Get content type from response headers or guess from file extension - content_type = response.headers.get('Content-Type') + content_type = response.headers.get("Content-Type") if not content_type: import mimetypes + content_type, _ = mimetypes.guess_type(path) if not content_type: content_type = "application/octet-stream" - + return response.content, content_type except requests.exceptions.RequestException as e: # Log the error and re-raise with more context import logging + logger = logging.getLogger("apps.supabase_home") logger.error(f"Error downloading file from {bucket_id}/{path}: {str(e)}") - - from ._service import SupabaseAPIError + + from .._service import SupabaseAPIError + error_details = {} if hasattr(e, "response") and e.response is not None: try: error_details = e.response.json() except ValueError: - error_details = {"status": e.response.status_code, "text": e.response.text} - + error_details = { + "status": e.response.status_code, + "text": e.response.text, + } + raise SupabaseAPIError( message=f"Error downloading file: {str(e)}", - status_code=getattr(e.response, "status_code", None) if hasattr(e, "response") else None, - details=error_details + status_code=getattr(e.response, "status_code", None) + if hasattr(e, "response") + else None, + details=error_details, ) def list_files( @@ -335,12 +363,12 @@ def list_files( path: str = "", limit: int = 100, offset: int = 0, - sort_by: Optional[Dict[str, str]] = None, - auth_token: Optional[str] = None, + sort_by: dict[str, str] | None = None, + auth_token: str | None = None, is_admin: bool = False, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ - List files in a bucket. + list files in a bucket. Args: bucket_id: Bucket identifier @@ -352,13 +380,14 @@ def list_files( is_admin: Whether to use service role key (admin access) Returns: - List of files + list of files """ import logging + logger = logging.getLogger("apps.supabase_home") logger.info(f"Listing files in bucket {bucket_id} with path prefix: {path}") logger.info(f"Using admin access: {is_admin}") - + params = {"prefix": path, "limit": limit, "offset": offset} logger.info(f"Request params: {params}") @@ -399,8 +428,8 @@ def move_file( bucket_id: str, source_path: str, destination_path: str, - auth_token: Optional[str] = None, - ) -> Dict[str, Any]: + auth_token: str | None = None, + ) -> dict[str, Any]: """ Move a file to a new location. @@ -429,8 +458,8 @@ def copy_file( bucket_id: str, source_path: str, destination_path: str, - auth_token: Optional[str] = None, - ) -> Dict[str, Any]: + auth_token: str | None = None, + ) -> dict[str, Any]: """ Copy a file to a new location. @@ -457,11 +486,11 @@ def copy_file( def delete_file( self, bucket_id: str, - paths: Union[str, List[str]] = None, + paths: str | list[str] = None, path: str = None, - auth_token: Optional[str] = None, + auth_token: str | None = None, is_admin: bool = False, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Delete files from a bucket. @@ -476,15 +505,18 @@ def delete_file( Success message """ import logging + logger = logging.getLogger("apps.supabase_home.storage") - logger.info(f"Delete file called with bucket_id: {bucket_id}, paths: {paths}, path: {path}") + logger.info( + f"Delete file called with bucket_id: {bucket_id}, paths: {paths}, path: {path}" + ) logger.info(f"Auth token available: {bool(auth_token)}, is_admin: {is_admin}") - + # Handle both 'path' and 'paths' parameters for backward compatibility if path is not None and paths is None: paths = path logger.info(f"Using path parameter: {path}") - + if isinstance(paths, str): paths = [paths] logger.info(f"Converted string path to list: {paths}") @@ -505,13 +537,15 @@ def delete_file( method="DELETE", endpoint=f"/storage/v1/object/{bucket_id}/{single_path.lstrip('/')}", auth_token=auth_token, - is_admin=is_admin + is_admin=is_admin, ) logger.info(f"Single file deletion successful: {result}") return result except Exception as single_delete_error: - logger.warning(f"Single file deletion failed, trying batch delete: {str(single_delete_error)}") - + logger.warning( + f"Single file deletion failed, trying batch delete: {str(single_delete_error)}" + ) + # Try batch deletion as fallback or for multiple files logger.info("Attempting batch deletion") result = self._make_request( @@ -533,8 +567,8 @@ def create_signed_url( bucket_id: str, path: str, expires_in: int = 60, - auth_token: Optional[str] = None, - ) -> Dict[str, Any]: + auth_token: str | None = None, + ) -> dict[str, Any]: """ Create a signed URL for a file. @@ -557,21 +591,21 @@ def create_signed_url( def create_signed_urls( self, bucket_id: str, - paths: List[str], + paths: list[str], expires_in: int = 60, - auth_token: Optional[str] = None, - ) -> List[Dict[str, Any]]: + auth_token: str | None = None, + ) -> list[dict[str, Any]]: """ Create signed URLs for multiple files. Args: bucket_id: Bucket identifier - paths: List of file paths + paths: list of file paths expires_in: Expiration time in seconds auth_token: Optional JWT token for authenticated requests Returns: - List of signed URL data + list of signed URL data """ return self._make_request( method="POST", @@ -581,8 +615,8 @@ def create_signed_urls( ) def create_signed_upload_url( - self, bucket_id: str, path: str, auth_token: Optional[str] = None - ) -> Dict[str, Any]: + self, bucket_id: str, path: str, auth_token: str | None = None + ) -> dict[str, Any]: """ Create a signed URL for uploading a file. @@ -603,8 +637,8 @@ def create_signed_upload_url( def upload_to_signed_url( self, signed_url: str, - file_data: Union[bytes, BinaryIO], - content_type: Optional[str] = None, + file_data: bytes | BinaryIO, + content_type: str | None = None, ) -> None: """ Upload a file to a signed URL. @@ -623,10 +657,18 @@ def upload_to_signed_url( import requests - response = requests.put(signed_url, headers=headers, data=file_data, timeout=30) # Add 30-second timeout for security + response = requests.put( + signed_url, headers=headers, data=file_data, timeout=30 + ) # Add 30-second timeout for security response.raise_for_status() - def get_public_url(self, bucket_id: str, path: str, auth_token: Optional[str] = None, is_admin: bool = False) -> str: + def get_public_url( + self, + bucket_id: str, + path: str, + auth_token: str | None = None, + is_admin: bool = False, + ) -> str: """ Get a public URL for a file in a public bucket. @@ -641,13 +683,16 @@ def get_public_url(self, bucket_id: str, path: str, auth_token: Optional[str] = """ # Check if the bucket exists and is public try: - bucket = self.get_bucket(bucket_id, auth_token=auth_token, is_admin=is_admin) + bucket = self.get_bucket( + bucket_id, auth_token=auth_token, is_admin=is_admin + ) if not bucket.get("public", False): raise ValueError(f"Bucket {bucket_id} is not public") except Exception as e: # If we can't verify the bucket is public, we'll still try to generate the URL # but log a warning import logging + logger = logging.getLogger("apps.supabase_home") logger.warning(f"Could not verify bucket {bucket_id} is public: {str(e)}") diff --git a/realtime.py b/realtime.py deleted file mode 100644 index 178a318..0000000 --- a/realtime.py +++ /dev/null @@ -1,244 +0,0 @@ -from typing import Any, Dict, Optional - -from ._service import SupabaseService - -class SupabaseRealtimeService(SupabaseService): - """ - Service for interacting with Supabase Realtime API. - - This class provides methods for managing Realtime subscriptions. - Note: This is a server-side implementation and doesn't maintain websocket - connections. For client-side realtime, use the Supabase JavaScript client. - """ - - def subscribe_to_channel(self, - channel: str, - event: str = "*", - auth_token: Optional[str] = None, - is_admin: bool = True) -> Dict[str, Any]: - """ - Subscribe to a Realtime channel. - - Args: - channel: Channel name - event: Event to subscribe to (default: "*" for all events) - auth_token: Optional JWT token for authenticated requests - is_admin: Whether to use the service role key (admin access) - - Returns: - Subscription data - """ - return self._make_request( - method="POST", - endpoint="/realtime/v1/subscribe", - auth_token=auth_token, - is_admin=is_admin, - data={ - "channel": channel, - "event": event, - "config": { - "private": True # Enable private channel for RLS support - } - } - ) - - def unsubscribe_from_channel(self, - subscription_id: str, - auth_token: Optional[str] = None, - is_admin: bool = True) -> Dict[str, Any]: - """ - Unsubscribe from a Realtime channel. - - Args: - subscription_id: Subscription ID - auth_token: Optional JWT token for authenticated requests - is_admin: Whether to use the service role key (admin access) - - Returns: - Success message - """ - return self._make_request( - method="POST", - endpoint="/realtime/v1/unsubscribe", - auth_token=auth_token, - is_admin=is_admin, - data={ - "subscription_id": subscription_id - } - ) - - def unsubscribe_all(self, auth_token: Optional[str] = None, is_admin: bool = True) -> Dict[str, Any]: - """ - Unsubscribe from all Realtime channels. - - Note: Supabase's Realtime API does not support server-side management of - websocket connections. For client applications, it's better to use - the client's native method: client.realtime.remove_all_channels() - - This method attempts to use the server API but will often return permission errors - as the endpoints are not publicly accessible. It's provided for compatibility and - diagnostic purposes only. - - Args: - auth_token: Optional JWT token for authenticated requests - is_admin: Whether to use the service role key (admin access) - - Returns: - Success message or error details when API fails - """ - response = { - "status": "warning", - "message": "Server-side channel management is not fully supported by Supabase Realtime API." - } - - # Add recommendation for client-side approach - response["recommendation"] = ( - "For client applications, use: await client.realtime.remove_all_channels() " + - "instead of this server-side method." - ) - - # For RLS issues, add policy recommendations - response["rls_info"] = ( - "Ensure your channel naming follows the required format for RLS: " + - "'private-[schema]-[table]-[*|id]' and that proper RLS policies are in place." - ) - - # Try the server API but don't expect success - try: - api_response = self._make_request( - method="POST", - endpoint="/realtime/v1/unsubscribe_all", - auth_token=auth_token, - is_admin=is_admin, - data={} - ) - response["api_response"] = api_response - response["status"] = "success" - response["message"] = "Successfully unsubscribed from all channels using server API (unusual)." - return response - except Exception as e: - # Expected behavior - API endpoint is usually restricted - error_info = { - "status": "error", - "message": str(e) - } - - # Add status code if available - if hasattr(e, 'response') and hasattr(e.response, 'status_code'): - error_info["status_code"] = e.response.status_code - - # Special handling for 403 errors - expected for this endpoint - if e.response.status_code == 403: - # This is normal - update status to warning - response["error_details"] = "Permission denied (403) when accessing the unsubscribe_all endpoint. " +\ - "This is expected behavior as Supabase restricts server-side management of realtime channels." - response["sql_policy"] = """-- Recommended RLS policies for Supabase Realtime --- Run these in your Supabase SQL Editor - --- Enable RLS on the realtime schema tables -ALTER TABLE realtime.messages ENABLE ROW LEVEL SECURITY; - --- Create policies for realtime messages -CREATE POLICY "Allow authenticated users to select from realtime messages" -ON realtime.messages -FOR SELECT -TO authenticated -USING (true); - -CREATE POLICY "Allow authenticated users to insert into realtime messages" -ON realtime.messages -FOR INSERT -TO authenticated -WITH CHECK (true); - --- Allow use of presence features -CREATE POLICY "Allow authenticated users to use presence" -ON realtime.presence -FOR ALL -TO authenticated -USING (true); - """ - - response["api_error"] = error_info - return response - - def get_channels(self, auth_token: Optional[str] = None) -> Dict[str, Any]: - """ - Retrieve all subscribed channels. - - Note: When using Supabase Realtime with RLS, channels should follow the naming - convention: 'private-[schema]-[table]-[*|id]' for proper authorization. - Examples: - - 'private-public-users-*' (all users table changes) - - 'private-public-users-123' (specific user ID) - - This method attempts to access the /realtime/v1/channels endpoint which - may require admin privileges. If you encounter 403 errors, consider using - your Supabase client's realtime.channels property directly instead. - - Args: - auth_token: Optional JWT token for authenticated requests - - Returns: - Dict containing list of subscribed channels - - Raises: - Exception: When the request fails and no fallback data is available - """ - try: - # First attempt: Use admin access to get channels via API endpoint - return self._make_request( - method="GET", - endpoint="/realtime/v1/channels", - auth_token=auth_token, - is_admin=True # Always use admin access for this endpoint - ) - except Exception as e: - # If this fails with a 403, it might be that the endpoint requires a specific format - # or doesn't work with the current Supabase version - error_msg = str(e).lower() - if '403' in error_msg or 'forbidden' in error_msg or 'unauthorized' in error_msg: - # Log the issue - import logging - logger = logging.getLogger(__name__) - logger.warning( - "Could not access Realtime channels endpoint; " - "this may be expected if your Supabase version doesn't support it. " - "Consider accessing client.realtime.channels directly instead." - ) - # Return an empty channel list - return {"channels": [], "error": "API endpoint unavailable - use client-side methods"} - else: - # For other errors, re-raise - raise - - def broadcast_message(self, - channel: str, - payload: Dict[str, Any], - event: str = "broadcast", - auth_token: Optional[str] = None, - is_admin: bool = True) -> Dict[str, Any]: - """ - Broadcast a message to a channel. - - Args: - channel: Channel name - payload: Message payload - event: Event name (default: "broadcast") - auth_token: Optional JWT token for authenticated requests - is_admin: Whether to use the service role key (admin access) - - Returns: - Response data - """ - return self._make_request( - method="POST", - endpoint="/realtime/v1/broadcast", - auth_token=auth_token, - is_admin=is_admin, - data={ - "channel": channel, - "event": event, - "payload": payload - } - )