1. Database Connection & Initialization

MongoDBClient

__init__

🔹 Description:
Initializes a MongoDB connection from environment variables. Implements the Singleton pattern so that the whole application shares a single client instance.
Environment variables (those without a default must be set):
  • DB_USERNAME
  • DB_PASSWORD
  • DB_HOST (default: localhost)
  • DB_PORT (default: 27017)
  • DB_DATABASE (default: customai)
  • DB_CONNECTION (default: mongodb)
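
As a rough illustration of how the variables listed above might be combined, the sketch below builds a connection URI with the documented defaults and shows one common way to write a Singleton in Python. `build_mongo_uri` and `SingletonClient` are hypothetical names, and the URI layout is an assumption; the real `MongoDBClient` may assemble its connection differently.

```python
import os
from urllib.parse import quote_plus


def build_mongo_uri(env=os.environ):
    """Assemble a connection URI from the documented variables (assumed layout)."""
    scheme = env.get("DB_CONNECTION", "mongodb")
    host = env.get("DB_HOST", "localhost")
    port = env.get("DB_PORT", "27017")
    database = env.get("DB_DATABASE", "customai")
    # Credentials have no default, so a missing one raises KeyError.
    user = quote_plus(env["DB_USERNAME"])
    password = quote_plus(env["DB_PASSWORD"])
    return f"{scheme}://{user}:{password}@{host}:{port}/{database}"


class SingletonClient:
    """Hypothetical stand-in for MongoDBClient: every call returns one shared object."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance


uri = build_mongo_uri({"DB_USERNAME": "app", "DB_PASSWORD": "p@ss"})
same_object = SingletonClient() is SingletonClient()
```

Percent-encoding the credentials with `quote_plus` keeps characters such as `@` or `:` in a password from being read as URI delimiters.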

get_database

🔹 Description:
Retrieves the target MongoDB database instance.
Parameters:
  • db_name (optional): custom database name
Returns:
MongoDB database instance
Usage:
db = MongoDBClient().get_database()

2. Callback Handlers

MongoDBCallbackHandler

on_llm_end

🔹 Description:
Handles post-processing after an LLM generates output, including chat updates and token tracking.
Parameters:
  • response: LLMResult object from the model
Functionality:
  • Tracks usage
  • Updates memory and credits
  • Saves chat history
Use this handler when token usage must be accounted for and conversation memory kept up to date after every LLM call.
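
The bookkeeping described above can be sketched without LangChain installed. `TokenTrackingHandler` is a hypothetical stand-in, not the real `MongoDBCallbackHandler`. It assumes the result object exposes `llm_output["token_usage"]` and `generations` the way LangChain's `LLMResult` does for OpenAI-style models, and it keeps history in a list instead of writing to MongoDB.

```python
from types import SimpleNamespace


class TokenTrackingHandler:
    """Hypothetical stand-in; does not subclass LangChain's BaseCallbackHandler."""

    def __init__(self):
        self.total_tokens = 0
        self.history = []

    def on_llm_end(self, response, **kwargs):
        # llm_output can be None when a provider reports no usage data.
        usage = (response.llm_output or {}).get("token_usage", {})
        self.total_tokens += usage.get("total_tokens", 0)
        # Store the first generation's text as the chat-history entry.
        self.history.append(response.generations[0][0].text)


# A fake result object shaped like an LLMResult (assumption for the demo).
fake_result = SimpleNamespace(
    llm_output={"token_usage": {"total_tokens": 42}},
    generations=[[SimpleNamespace(text="Hello!")]],
)
handler = TokenTrackingHandler()
handler.on_llm_end(fake_result)
```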

3. Error Handling

mongodb_connection_error_handler

🔹 Description:
Global handler for MongoDB-related connection issues.
Handles:
  • ServerSelectionTimeoutError
  • General connection failures
Returns:
Standardized HTTP response with error details.
Usage:
@app.exception_handler(MongoDBConnectionError)
async def handler(request, exc):
    return mongodb_connection_error_handler(exc)
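
A minimal sketch of what a standardized error payload could look like is given below. The function name `build_error_response`, the field names, and the choice of status 503 are all assumptions made for illustration; the source does not specify the actual response shape.

```python
class MongoDBConnectionError(Exception):
    """Hypothetical stand-in for the application's own exception type."""


def build_error_response(exc):
    """Return (status_code, body) for a failed MongoDB connection."""
    # 503 Service Unavailable is an assumed choice for an unreachable backend.
    status_code = 503
    body = {
        "status": status_code,
        "error": type(exc).__name__,
        "message": str(exc) or "Could not connect to MongoDB",
    }
    return status_code, body


status, body = build_error_response(
    MongoDBConnectionError("server selection timed out")
)
```

In a FastAPI application the tuple would typically be wrapped in a `JSONResponse` before being returned from the exception handler.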

4. Utility Functions

get_db_instance

🔹 Description:
Returns the shared, Singleton-backed database instance for use anywhere in the app.
Parameters:
  • db_name (optional)
Returns:
MongoDB DB instance
Usage:
db = get_db_instance()

Utility Classes

MessageEncryptor

Utility for encrypting plaintext using AES in ECB mode. ECB is deterministic: identical plaintext blocks always encrypt to identical ciphertext blocks.

Constructor:

encryptor = MessageEncryptor(key=b'sixteenbytekey12')

Methods:

  • encrypt(plaintext: str) -> str: Encrypts text to Base64-encoded ciphertext.

MessageDecryptor

Utility for decrypting AES-encrypted Base64 strings.

Constructor:

decryptor = MessageDecryptor(key=b'sixteenbytekey12')

Methods:

  • decrypt(ciphertext: str) -> str: Decrypts Base64 string back to plaintext.
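
Two details matter for any AES-based encrypt/decrypt pair: the key must be exactly 16, 24, or 32 bytes long, and block modes such as ECB need the plaintext padded to a multiple of 16 bytes. The standard-library sketch below shows both, plus the Base64 step, with the cipher call itself left out. The helper names are hypothetical, and the use of PKCS#7 padding is an assumption, since the source does not say which padding scheme these classes use.

```python
import base64

AES_KEY_SIZES = (16, 24, 32)  # bytes: AES-128, AES-192, AES-256
BLOCK_SIZE = 16               # AES block size in bytes


def check_key(key: bytes) -> bytes:
    """Reject keys whose length AES cannot use."""
    if len(key) not in AES_KEY_SIZES:
        raise ValueError(f"AES key must be 16, 24 or 32 bytes, got {len(key)}")
    return key


def pkcs7_pad(data: bytes) -> bytes:
    """Pad to a whole number of blocks; a full block is added if already aligned."""
    pad_len = BLOCK_SIZE - len(data) % BLOCK_SIZE
    return data + bytes([pad_len]) * pad_len


def pkcs7_unpad(data: bytes) -> bytes:
    """Strip PKCS#7 padding, rejecting malformed input."""
    pad_len = data[-1]
    if not 1 <= pad_len <= BLOCK_SIZE or data[-pad_len:] != bytes([pad_len]) * pad_len:
        raise ValueError("invalid padding")
    return data[:-pad_len]


# The AES step would sit between padding and Base64; it is omitted here.
padded = pkcs7_pad("hello".encode("utf-8"))
encoded = base64.b64encode(padded).decode("ascii")
restored = pkcs7_unpad(base64.b64decode(encoded)).decode("utf-8")
```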

get_user_data(user_id: str) -> dict

Fetches user profile data by user ID.
Returns: A dictionary of user information.

Default Model Repositories

DefaultGeminiModelRepository

Manages the default Gemini model per company.

Methods:

  • get_default_model_key()
  • get_encrypt_key()
  • get_decrypt_key()

DefaultAnthropicModelRepository

Handles Anthropic model defaults.

Methods:

  • get_default_model_key()
  • get_encrypt_key()
  • get_decrypt_key()

DefaultOpenAIModelRepository

Manages the default OpenAI model configuration.

Method:

  • get_default_model_key()

Email & Notification Services

EmailService

Abstraction layer for sending emails via AWS SES or SMTP.

Constructor:

service = EmailService()

Method:

  • send_email(...): Sends templated email via configured provider.
Usage:
service.send_email(email, subject, username, content_body, slug, template_name)
The provider (AWS SES or SMTP) is selected via the EMAIL_PROVIDER environment variable.
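
Provider selection through an environment variable is commonly done with a small dispatch table, as in the sketch below. The backend functions, the accepted values `"ses"` and `"smtp"`, and the SMTP fallback are assumptions for illustration; the actual values that `EmailService` expects are not given in the source.

```python
import os


def send_via_ses(to, subject, body):
    # Placeholder: a real backend would call AWS SES here.
    return f"SES -> {to}: {subject}"


def send_via_smtp(to, subject, body):
    # Placeholder: a real backend would open an SMTP session here.
    return f"SMTP -> {to}: {subject}"


PROVIDERS = {"ses": send_via_ses, "smtp": send_via_smtp}


def send_email(to, subject, body, env=os.environ):
    """Dispatch to the backend named by EMAIL_PROVIDER (assumed default: smtp)."""
    name = env.get("EMAIL_PROVIDER", "smtp").lower()
    try:
        backend = PROVIDERS[name]
    except KeyError:
        raise ValueError(f"unsupported EMAIL_PROVIDER: {name!r}") from None
    return backend(to, subject, body)


result = send_email("a@example.com", "Welcome", "Hi", env={"EMAIL_PROVIDER": "SES"})
```

Failing loudly on an unknown provider name surfaces configuration typos at the first send rather than silently falling back.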

Firebase

Service class for sending push notifications using Firebase Cloud Messaging.

Constructor:

firebase = Firebase()

Method:

  • send_push_notification(tokens: list, title: str, body: str = None)
Example:
firebase.send_push_notification(tokens=["token1"], title="Alert", body="You’ve got mail.")

API Key Decryption

LLMAPIKeyDecryptionHandler

Handles secure decryption of API keys from MongoDB.

Methods:

  • initialization(api_key_id: str, collection_name: str)
  • decrypt() -> bytes
Example:
handler = LLMAPIKeyDecryptionHandler()
handler.initialization("key-id", "keys-collection")
plaintext = handler.decrypt()
Before calling decrypt(), make sure the environment contains the decrypt key that matches the key used for encryption.

Streaming Enhancements

StreamingResponseWithStatusCode

Custom subclass of StreamingResponse (provided by Starlette and re-exported by FastAPI).

Key Feature:

Allows dynamic status code updates while streaming data chunks to the client.

Method:

  • stream_response(send): Streams and sets status code mid-stream
Usage:
return StreamingResponseWithStatusCode(generator_func())
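Use this when streaming model responses where the status might change (e.g. invalid request mid-stream). Note that HTTP transmits the status line before the body, so the status code must be settled by the time the first chunk is sent.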
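
One way such a response can work is sketched below as a standalone coroutine that speaks raw ASGI messages. It assumes the generator yields `(chunk, status_code)` pairs and takes the status from the first pair; `stream_with_status` is a hypothetical name, and the real `stream_response` method may behave differently.

```python
import asyncio


async def stream_with_status(chunks, send, default_status=200):
    """Send an ASGI response whose status comes from the first yielded pair."""
    iterator = chunks.__aiter__()
    try:
        body, status = await iterator.__anext__()
    except StopAsyncIteration:
        body, status = b"", default_status
    # The status line and headers go out once, before any body bytes.
    await send({"type": "http.response.start", "status": status, "headers": []})
    await send({"type": "http.response.body", "body": body, "more_body": True})
    async for body, _status in iterator:
        # Later status values cannot change what was already sent.
        await send({"type": "http.response.body", "body": body, "more_body": True})
    await send({"type": "http.response.body", "body": b"", "more_body": False})


async def demo():
    async def generate():
        yield b"invalid request", 400
        yield b" (details)", 400

    sent = []

    async def send(message):
        sent.append(message)

    await stream_with_status(generate(), send)
    return sent


messages = asyncio.run(demo())
```

Deferring the `http.response.start` message until the first pair arrives is what lets the generator, rather than the route handler, pick the status code.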