PyPI package

Python SDK

Typed sync and async HTTP clients for the Inbox API. Manage email accounts, send and read messages, search threads, automate webhooks, and build integrations with full type annotation support.

Quick start
# Install
pip install inbox-api

# Use in your script
from inbox_api import InboxApiClient

with InboxApiClient("https://api.inbox-api.com", "cw_your_token") as client:
    for msg in client.messages.iter_all("account_id"):
        print(msg.subject)

Installation

                  Install from PyPI:

pip install inbox-api

Or with uv:

uv add inbox-api

Requirements: Python 3.11+
Dependencies: httpx, pydantic v2, anyio
                

Authentication

                  The SDK authenticates using API tokens (prefixed with cw_).

from inbox_api import InboxApiClient

client = InboxApiClient(
    "https://api.inbox-api.com",
    "cw_your_token",
)
                

Sync Client

                  from inbox_api import InboxApiClient

with InboxApiClient("https://api.inbox-api.com", "cw_your_token") as client:
    result = client.messages.list("account_id", page_size=5)
    for msg in result.items:
        print(f"{msg.subject} — from {msg.from_email}")

# Context manager is optional — also works without 'with'
client = InboxApiClient("https://api.inbox-api.com", "cw_your_token")
accounts = client.accounts.list()
client.close()
                

Async Client

                  import asyncio
from inbox_api import AsyncInboxApiClient

async def main():
    async with AsyncInboxApiClient("https://api.inbox-api.com", "cw_your_token") as client:
        result = await client.messages.list("account_id")
        for msg in result.items:
            print(msg.subject)

asyncio.run(main())
                

Configuration

                  # Custom timeout (default: 30s request, 5s connect)
import httpx

client = InboxApiClient(
    "https://api.inbox-api.com",
    "cw_your_token",
    timeout=httpx.Timeout(60.0, connect=10.0),
)

# Custom retry count (default: 2 retries on 429/5xx)
client = InboxApiClient(
    "https://api.inbox-api.com",
    "cw_your_token",
    max_retries=5,
)

# Disable retries entirely
client = InboxApiClient(
    "https://api.inbox-api.com",
    "cw_your_token",
    max_retries=0,
)

# Inject a custom httpx client (e.g. for proxies)
custom = httpx.Client(proxy="http://proxy:8080")
client = InboxApiClient(
    "https://api.inbox-api.com",
    "cw_your_token",
    http_client=custom,
)
                

accounts.list()

                  List all connected email accounts.

accounts = client.accounts.list()
for acc in accounts:
    print(f"{acc.email_address} — {acc.provider} — {acc.unread_count} unread")

Returns: list[AccountInfo]

  .id              str — Account ID
  .display_name    str — Display name
  .email_address   str — Email address
  .provider        str — Provider (gmail, imap, outlook)
  .message_count   int — Total messages
  .unread_count    int — Unread messages
                

accounts.health()

                  Get IMAP health status for an email account. Returns connection state, sync status, and error details from Redis cache.

health = client.accounts.health("account_id")
print(f"Status: {health.status}, Errors (24h): {health.error_count_24h}")

Parameters:
  account_id  str — Account ID (required)

Returns: AccountHealthDto

  .account_id         str — Account ID
  .status             str — Health status
  .last_synced_at     str | None — Last sync timestamp
  .error_count_24h    int — Errors in last 24 hours
  .average_latency_ms float | None — Average IMAP latency
  .last_error         str | None — Most recent error
  .data_age           str | None — Cache data age
                

accounts.digest()

                  Get an email digest summary across all accounts. Returns recent messages grouped by account.

digest = client.accounts.digest(since="24h")
print(f"Total: {digest.total_messages} messages across {digest.total_accounts} accounts")

for acc in digest.accounts:
    print(f"  {acc.email_address}: {acc.unread_count} unread")
    for msg in acc.messages:
        print(f"    {msg.subject}")

Parameters:
  since  str — Time window, e.g. "24h", "7d" (default: "24h")

Returns: DigestResponse

  .since           str — Requested time window
  .generated_at    str — Timestamp of generation
  .total_accounts  int — Number of accounts
  .total_messages  int — Total messages in window
  .accounts        list[DigestAccountDto] — Per-account digests
                

messages.list()

                  List messages for an account with pagination and optional folder filtering.

result = client.messages.list("account_id", page=1, page_size=25)
print(f"Showing {len(result.items)} of {result.total_count}")

for msg in result.items:
    print(f"{msg.subject} — {msg.from_email}")

# Filter by folder
result = client.messages.list("account_id", folder_id="folder_id")

Parameters:
  account_id  str — Account ID (required)
  page        int — Page number (default: 1)
  page_size   int — Results per page (default: 10)
  folder_id   str | None — Filter by folder ID

Returns: PagedResult[MessageDto]
                

messages.iter_all()

                  Automatically paginate through all messages. Yields individual items.

# Sync
for msg in client.messages.iter_all("account_id"):
    print(msg.subject)

# Async
async for msg in client.messages.iter_all("account_id"):
    print(msg.subject)

# Limit pages for safety
for msg in client.messages.iter_all("account_id", max_pages=5):
    print(msg.subject)

Parameters:
  account_id  str — Account ID (required)
  page_size   int — Items per page request (default: 10)
  folder_id   str | None — Filter by folder ID
  max_pages   int | None — Stop after N pages (None = unlimited)

Yields: MessageDto
                

messages.get()

                  Get a specific message by ID.

msg = client.messages.get("message_id")
print(f"From: {msg.from_name} <{msg.from_email}>")
print(f"Subject: {msg.subject}")

Parameters:
  message_id  str — Message ID (required)

Returns: MessageDto

  .id               str — Message ID
  .account_id       str — Account ID
  .thread_id        str | None — Thread ID
  .folder_id        str | None — Folder ID
  .from_email       str — Sender email
  .from_name        str | None — Sender name
  .to_addresses     list[str] | None — Recipients
  .cc_addresses     list[str] | None — CC recipients
  .subject          str — Subject line
  .snippet          str | None — Preview text
  .sent_at          str | None — Send timestamp
  .received_at      str | None — Receive timestamp
  .is_read          bool — Read flag
  .is_starred       bool — Starred flag
  .has_attachments  bool — Has attachments
                

messages.get_body()

                  Get message body content in text, HTML, or LLM-optimized format.

# Plain text (default)
body = client.messages.get_body("message_id")
print(body.text_body)

# HTML
body = client.messages.get_body("message_id", format="html")
print(body.html_body)

# LLM-optimized (cleaned, minimal tokens)
body = client.messages.get_body("message_id", format="llm")
print(body.text_body)

Parameters:
  message_id  str — Message ID (required)
  format      "text" | "html" | "llm" (default: "text")

Returns: MessageBody

  .text_body  str | None — Plain text content
  .html_body  str | None — HTML content
                

messages.batch_mark_read()

                  Mark multiple messages as read in a single operation.

result = client.messages.batch_mark_read(["id1", "id2", "id3"])
print(f"Succeeded: {result.succeeded}/{result.total_requested}")

Parameters:
  ids  list[str] — Message IDs (required)

Returns: BatchOperationResponse

  .total_requested  int — Total messages requested
  .succeeded        int — Successfully processed
  .failed           int — Failed count
  .results          list[BatchItemResult] — Per-message results
                

messages.batch_archive()

                  Archive multiple messages in a single operation.

result = client.messages.batch_archive(["id1", "id2"])
print(f"Archived: {result.succeeded}")

Parameters:
  ids  list[str] — Message IDs (required)

Returns: BatchOperationResponse
                

messages.batch_move()

                  Move multiple messages to a folder in a single operation.

result = client.messages.batch_move(["id1", "id2"], folder_id="folder_id")
print(f"Moved: {result.succeeded}")

Parameters:
  ids        list[str] — Message IDs (required)
  folder_id  str — Target folder ID (required)

Returns: BatchOperationResponse
                

threads.list()

                  List conversation threads for an account.

result = client.threads.list("account_id", page_size=10)
for thread in result.items:
    print(f"{thread.subject} — {thread.message_count} messages, {thread.unread_count} unread")

Parameters:
  account_id  str — Account ID (required)
  page        int — Page number (default: 1)
  page_size   int — Results per page (default: 10)

Returns: PagedResult[ThreadDetailDto]
                

threads.iter_all()

                  Automatically paginate through all threads.

for thread in client.threads.iter_all("account_id"):
    print(f"{thread.subject} ({thread.message_count} messages)")

Parameters:
  account_id  str — Account ID (required)
  page_size   int — Items per page request (default: 10)
  max_pages   int | None — Stop after N pages (None = unlimited)

Yields: ThreadDetailDto
                

threads.get()

                  Get a full thread with all messages.

thread = client.threads.get("thread_id")
print(f"Thread: {thread.subject}")
for msg in thread.messages:
    print(f"  {msg.from_email}: {msg.snippet}")

Parameters:
  thread_id  str — Thread ID (required)

Returns: ThreadDetailDto

  .id               str — Thread ID
  .account_id       str — Account ID
  .subject          str — Thread subject
  .snippet          str | None — Preview text
  .last_message_at  str | None — Most recent message timestamp
  .message_count    int — Number of messages
  .unread_count     int — Number of unread messages
  .has_attachments  bool — Any message has attachments
  .messages         list[MessageDto] — All messages in thread
                

send.send()

                  Send a new email.

from inbox_api.models.send import SendEmailRequest
from inbox_api.models.common import AddressDto

client.send.send(SendEmailRequest(
    account_id="account_id",
    to=[AddressDto(email="alice@example.com", name="Alice")],
    cc=[AddressDto(email="bob@example.com")],
    subject="Weekly Update",
    text_body="Hi team, here's the update.",
    html_body="<h1>Weekly Update</h1><p>Hi team...</p>",
))

Parameters: SendEmailRequest
  .account_id  str — Account to send from (required)
  .to          list[AddressDto] — Recipients (required)
  .cc          list[AddressDto] | None — CC recipients
  .bcc         list[AddressDto] | None — BCC recipients
  .subject     str — Subject line (required)
  .text_body   str | None — Plain text body
  .html_body   str | None — HTML body

Returns: None
                

send.reply()

                  Reply to an existing message.

from inbox_api.models.send import ReplyRequest

# Reply to sender only
client.send.reply("message_id", ReplyRequest(
    text_body="Thanks, looks good!",
))

# Reply to all recipients
client.send.reply("message_id", ReplyRequest(
    text_body="Noted, thanks everyone.",
    reply_all=True,
))

Parameters:
  message_id  str — Message to reply to (required)
  request     ReplyRequest
    .text_body   str | None — Plain text reply body
    .html_body   str | None — HTML reply body
    .reply_all   bool | None — Reply to all recipients

Returns: None
                

send.forward()

                  Forward a message to new recipients. All original attachments are preserved.

from inbox_api.models.send import ForwardEmailRequest
from inbox_api.models.common import AddressDto

client.send.forward("message_id", ForwardEmailRequest(
    to=[AddressDto(email="bob@example.com")],
    additional_text="FYI — see the message below.",
))

Parameters:
  message_id  str — Message to forward (required)
  request     ForwardEmailRequest
    .to               list[AddressDto] — Recipients (required)
    .cc               list[AddressDto] | None — CC recipients
    .bcc              list[AddressDto] | None — BCC recipients
    .additional_text  str | None — Text to prepend

Returns: None
                

search.iter_all()

                  Automatically paginate through all search results.

for msg in client.search.iter_all("account_id", "quarterly report"):
    print(f"{msg.subject}")

Parameters:
  account_id  str — Account ID (required)
  query       str — Search query (required)
  page_size   int — Items per page request (default: 10)
  max_pages   int | None — Stop after N pages (None = unlimited)

Yields: MessageDto
                

drafts.list()

                  List drafts for an account.

drafts = client.drafts.list("account_id")
for draft in drafts:
    print(f"{draft.subject} — created {draft.created_at}")

Parameters:
  account_id  str — Account ID (required)

Returns: list[DraftDto]

  .id          str — Draft ID
  .account_id  str — Account ID
  .subject     str | None — Subject line
  .text_body   str | None — Plain text body
  .html_body   str | None — HTML body
  .created_at  str — Creation timestamp
                

drafts.create()

                  Create a new draft.

from inbox_api.models.drafts import CreateDraftRequest
from inbox_api.models.common import AddressDto

draft = client.drafts.create(CreateDraftRequest(
    account_id="account_id",
    to=[AddressDto(email="alice@example.com")],
    subject="Proposal Draft",
    text_body="Hi Alice, here's the draft...",
))
print(f"Created draft: {draft.id}")

Parameters: CreateDraftRequest
  .account_id  str — Account ID (required)
  .to          list[AddressDto] | None — Recipients
  .cc          list[AddressDto] | None — CC recipients
  .subject     str | None — Subject line
  .text_body   str | None — Plain text body
  .html_body   str | None — HTML body

Returns: DraftDto
                

drafts.update()

                  Update an existing draft.

from inbox_api.models.drafts import UpdateDraftRequest

draft = client.drafts.update("draft_id", UpdateDraftRequest(
    account_id="account_id",
    subject="Updated Subject",
    text_body="Updated content.",
))

Parameters:
  draft_id  str — Draft ID (required)
  request   UpdateDraftRequest (same fields as CreateDraftRequest)

Returns: DraftDto
                

drafts.send()

                  Send a saved draft. The draft is deleted after sending.

client.drafts.send("draft_id")

Parameters:
  draft_id  str — Draft ID (required)

Returns: None
                

drafts.delete()

                  Delete a draft.

client.drafts.delete("draft_id")

Parameters:
  draft_id  str — Draft ID (required)

Returns: None
                

contacts.list()

                  List contacts extracted from email headers, ranked by frequency.

result = client.contacts.list("account_id")
for contact in result.items:
    print(f"{contact.display_name} <{contact.email}> — {contact.frequency} interactions")

Parameters:
  account_id  str — Account ID (required)
  page        int — Page number (default: 1)
  page_size   int — Results per page (default: 10)

Returns: PagedResult[ContactDto]

  .email          str — Contact email
  .display_name   str | None — Display name
  .frequency      int — Interaction count
  .last_seen_at   str — Last interaction timestamp
  .first_seen_at  str — First interaction timestamp
  .account_ids    list[str] — Associated accounts
                

contacts.iter_all()

                  Automatically paginate through all contacts.

for contact in client.contacts.iter_all("account_id"):
    print(f"{contact.email} — {contact.frequency} interactions")

Parameters:
  account_id  str — Account ID (required)
  page_size   int — Items per page request (default: 10)
  max_pages   int | None — Stop after N pages (None = unlimited)

Yields: ContactDto
                

webhooks.list()

                  List all webhook subscriptions.

webhooks = client.webhooks.list()
for wh in webhooks:
    print(f"{wh.url} — events: {', '.join(wh.events)} — active: {wh.is_active}")

Returns: list[WebhookDto]

  .id             str — Webhook ID
  .url            str — Delivery URL
  .events         list[str] — Subscribed events
  .description    str | None — Description
  .payload_level  str — Minimal, Standard, or Full
  .is_active      bool — Active status
  .created_at     str — Creation timestamp
  .secret         str | None — Signing secret (only on create)
                

webhooks.get()

                  Get a specific webhook by ID.

wh = client.webhooks.get("webhook_id")
print(f"{wh.url} — {wh.payload_level}")

Parameters:
  webhook_id  str — Webhook ID (required)

Returns: WebhookDto
                

webhooks.get_delivery()

                  Get a single webhook delivery by ID.

delivery = client.webhooks.get_delivery("webhook_id", "delivery_id")
print(f"Status: {delivery.status}, Event: {delivery.event_type}")

Parameters:
  webhook_id   str — Webhook ID (required)
  delivery_id  str — Delivery ID (required)

Returns: WebhookDeliveryDto
                

Error Handling

                  from inbox_api import ApiError, AuthenticationError, NotFoundError, RateLimitError

try:
    msg = client.messages.get("nonexistent_id")
except NotFoundError:
    print("Message not found")
except AuthenticationError:
    print("Invalid or expired token")
except RateLimitError as e:
    print(f"Rate limited — retry after {e.retry_after}s")
except ApiError as e:
    print(f"API error {e.status_code}: {e.message}")

Exception Hierarchy:

  ApiError               Base exception (any HTTP error)
    .status_code         int — HTTP status code
    .message             str — Error message
  AuthenticationError    401 Unauthorized
  NotFoundError          404 Not Found
  RateLimitError         429 Too Many Requests
    .retry_after         float | None — Seconds to wait
                

Auto-Retry

                  The SDK automatically retries on 429 (rate limited) and 5xx (server error) responses.

Defaults:
  Max retries:   2
  Base delay:    0.5s (exponential backoff with jitter)
  Max delay:     30s
  429 responses: Respects Retry-After header when present

# Override
client = InboxApiClient(url, token, max_retries=5)

# Disable
client = InboxApiClient(url, token, max_retries=0)
                

Model Reference

                  All response models are Pydantic v2 classes with camelCase JSON aliasing.
They accept both snake_case and camelCase fields and ignore unknown fields for forward compatibility.

Response Models:
  AccountInfo             Email account summary
  AccountHealthDto        Account health status
  DigestResponse          Cross-account digest
  DigestAccountDto        Per-account digest
  DigestMessageDto        Digest message summary
  MessageDto              Email message metadata
  MessageBody             Message body content
  ThreadDetailDto         Thread with messages
  DraftDto                Email draft
  ContactDto              Extracted contact
  WebhookDto              Webhook configuration
  WebhookDeliveryDto      Delivery attempt record
  PagedResult[T]          Paginated result container
  BatchOperationResponse  Batch operation result
  BatchItemResult         Per-item batch result
  AddressDto              Email address + name
  AttachmentDto           Attachment metadata
  FolderDto               IMAP folder metadata

Request Models:
  SendEmailRequest        Send a new email
  ReplyRequest            Reply to a message
  ForwardEmailRequest     Forward a message
  CreateDraftRequest      Create a draft
  UpdateDraftRequest      Update a draft

# Import from submodules
from inbox_api.models.messages import MessageDto, MessageBody
from inbox_api.models.send import SendEmailRequest
from inbox_api.models.common import PagedResult, AddressDto

# Or import everything from the top level
from inbox_api import MessageDto, SendEmailRequest, AddressDto