PyPI package
Python SDK
Typed sync and async HTTP clients for the Inbox API. Manage email accounts, send and read messages, search threads, automate webhooks, and build integrations with full type annotation support.
Quick start
# Install
pip install inbox-api
# Use in your script
from inbox_api import InboxApiClient
with InboxApiClient("https://api.inbox-api.com", "cw_your_token") as client:
for msg in client.messages.iter_all("account_id"):
print(msg.subject)
Installation
Install from PyPI:
pip install inbox-api
Or with uv:
uv add inbox-api
Requirements: Python 3.11+
Dependencies: httpx, pydantic v2, anyio
Authentication
The SDK authenticates using API tokens (prefixed with cw_).
from inbox_api import InboxApiClient
client = InboxApiClient(
"https://api.inbox-api.com",
"cw_your_token",
)
Sync Client
from inbox_api import InboxApiClient
with InboxApiClient("https://api.inbox-api.com", "cw_your_token") as client:
result = client.messages.list("account_id", page_size=5)
for msg in result.items:
print(f"{msg.subject} — from {msg.from_email}")
# Context manager is optional — also works without 'with'
client = InboxApiClient("https://api.inbox-api.com", "cw_your_token")
accounts = client.accounts.list()
client.close()
Async Client
import asyncio
from inbox_api import AsyncInboxApiClient
async def main():
async with AsyncInboxApiClient("https://api.inbox-api.com", "cw_your_token") as client:
result = await client.messages.list("account_id")
for msg in result.items:
print(msg.subject)
asyncio.run(main())
Configuration
# Custom timeout (default: 30s request, 5s connect)
import httpx
client = InboxApiClient(
"https://api.inbox-api.com",
"cw_your_token",
timeout=httpx.Timeout(60.0, connect=10.0),
)
# Custom retry count (default: 2 retries on 429/5xx)
client = InboxApiClient(
"https://api.inbox-api.com",
"cw_your_token",
max_retries=5,
)
# Disable retries entirely
client = InboxApiClient(
"https://api.inbox-api.com",
"cw_your_token",
max_retries=0,
)
# Inject a custom httpx client (e.g. for proxies)
custom = httpx.Client(proxy="http://proxy:8080")
client = InboxApiClient(
"https://api.inbox-api.com",
"cw_your_token",
http_client=custom,
)
accounts.list()
List all connected email accounts.
accounts = client.accounts.list()
for acc in accounts:
print(f"{acc.email_address} — {acc.provider} — {acc.unread_count} unread")
Returns: list[AccountInfo]
.id str — Account ID
.display_name str — Display name
.email_address str — Email address
.provider str — Provider (gmail, imap, outlook)
.message_count int — Total messages
.unread_count int — Unread messages
accounts.health()
Get IMAP health status for an email account. Returns connection state, sync status, and error details read from the Redis cache; check .data_age to see how old the cached data is.
health = client.accounts.health("account_id")
print(f"Status: {health.status}, Errors (24h): {health.error_count_24h}")
Parameters:
account_id str — Account ID (required)
Returns: AccountHealthDto
.account_id str — Account ID
.status str — Health status
.last_synced_at str | None — Last sync timestamp
.error_count_24h int — Errors in last 24 hours
.average_latency_ms float | None — Average IMAP latency
.last_error str | None — Most recent error
.data_age str | None — Cache data age
accounts.digest()
Get an email digest summary across all accounts. Returns recent messages grouped by account.
digest = client.accounts.digest(since="24h")
print(f"Total: {digest.total_messages} messages across {digest.total_accounts} accounts")
for acc in digest.accounts:
print(f" {acc.email_address}: {acc.unread_count} unread")
for msg in acc.messages:
print(f" {msg.subject}")
Parameters:
since str — Time window, e.g. "24h", "7d" (default: "24h")
Returns: DigestResponse
.since str — Requested time window
.generated_at str — Timestamp of generation
.total_accounts int — Number of accounts
.total_messages int — Total messages in window
.accounts list[DigestAccountDto] — Per-account digests
messages.list()
List messages for an account with pagination and optional folder filtering.
result = client.messages.list("account_id", page=1, page_size=25)
print(f"Showing {len(result.items)} of {result.total_count}")
for msg in result.items:
print(f"{msg.subject} — {msg.from_email}")
# Filter by folder
result = client.messages.list("account_id", folder_id="folder_id")
Parameters:
account_id str — Account ID (required)
page int — Page number (default: 1)
page_size int — Results per page (default: 10)
folder_id str | None — Filter by folder ID
Returns: PagedResult[MessageDto]
messages.iter_all()
Automatically paginate through all messages. Yields individual items.
# Sync
for msg in client.messages.iter_all("account_id"):
print(msg.subject)
# Async (client is an AsyncInboxApiClient; must run inside a coroutine)
async for msg in client.messages.iter_all("account_id"):
print(msg.subject)
# Limit pages for safety
for msg in client.messages.iter_all("account_id", max_pages=5):
print(msg.subject)
Parameters:
account_id str — Account ID (required)
page_size int — Items per page request (default: 10)
folder_id str | None — Filter by folder ID
max_pages int | None — Stop after N pages (None = unlimited)
Yields: MessageDto
messages.get()
Get a specific message by ID.
msg = client.messages.get("message_id")
print(f"From: {msg.from_name} <{msg.from_email}>")
print(f"Subject: {msg.subject}")
Parameters:
message_id str — Message ID (required)
Returns: MessageDto
.id str — Message ID
.account_id str — Account ID
.thread_id str | None — Thread ID
.folder_id str | None — Folder ID
.from_email str — Sender email
.from_name str | None — Sender name
.to_addresses list[str] | None — Recipients
.cc_addresses list[str] | None — CC recipients
.subject str — Subject line
.snippet str | None — Preview text
.sent_at str | None — Send timestamp
.received_at str | None — Receive timestamp
.is_read bool — Read flag
.is_starred bool — Starred flag
.has_attachments bool — Has attachments
messages.get_body()
Get message body content in text, HTML, or LLM-optimized format.
# Plain text (default)
body = client.messages.get_body("message_id")
print(body.text_body)
# HTML
body = client.messages.get_body("message_id", format="html")
print(body.html_body)
# LLM-optimized (cleaned, minimal tokens)
body = client.messages.get_body("message_id", format="llm")
print(body.text_body)
Parameters:
message_id str — Message ID (required)
format "text" | "html" | "llm" — Body format to return (default: "text")
Returns: MessageBody
.text_body str | None — Plain text content
.html_body str | None — HTML content
messages.batch_mark_read()
Mark multiple messages as read in a single operation.
result = client.messages.batch_mark_read(["id1", "id2", "id3"])
print(f"Succeeded: {result.succeeded}/{result.total_requested}")
Parameters:
ids list[str] — Message IDs (required)
Returns: BatchOperationResponse
.total_requested int — Total messages requested
.succeeded int — Successfully processed
.failed int — Failed count
.results list[BatchItemResult] — Per-message results
messages.batch_archive()
Archive multiple messages in a single operation.
result = client.messages.batch_archive(["id1", "id2"])
print(f"Archived: {result.succeeded}")
Parameters:
ids list[str] — Message IDs (required)
Returns: BatchOperationResponse
messages.batch_move()
Move multiple messages to a folder in a single operation.
result = client.messages.batch_move(["id1", "id2"], folder_id="folder_id")
print(f"Moved: {result.succeeded}")
Parameters:
ids list[str] — Message IDs (required)
folder_id str — Target folder ID (required)
Returns: BatchOperationResponse
threads.list()
List conversation threads for an account.
result = client.threads.list("account_id", page_size=10)
for thread in result.items:
print(f"{thread.subject} — {thread.message_count} messages, {thread.unread_count} unread")
Parameters:
account_id str — Account ID (required)
page int — Page number (default: 1)
page_size int — Results per page (default: 10)
Returns: PagedResult[ThreadDetailDto]
threads.iter_all()
Automatically paginate through all threads.
for thread in client.threads.iter_all("account_id"):
print(f"{thread.subject} ({thread.message_count} messages)")
Parameters:
account_id str — Account ID (required)
page_size int — Items per page request (default: 10)
max_pages int | None — Stop after N pages (None = unlimited)
Yields: ThreadDetailDto
threads.get()
Get a full thread with all messages.
thread = client.threads.get("thread_id")
print(f"Thread: {thread.subject}")
for msg in thread.messages:
print(f" {msg.from_email}: {msg.snippet}")
Parameters:
thread_id str — Thread ID (required)
Returns: ThreadDetailDto
.id str — Thread ID
.account_id str — Account ID
.subject str — Thread subject
.snippet str | None — Preview text
.last_message_at str | None — Most recent message timestamp
.message_count int — Number of messages
.unread_count int — Number of unread messages
.has_attachments bool — Any message has attachments
.messages list[MessageDto] — All messages in thread
send.send()
Send a new email.
from inbox_api.models.send import SendEmailRequest
from inbox_api.models.common import AddressDto
client.send.send(SendEmailRequest(
account_id="account_id",
to=[AddressDto(email="alice@example.com", name="Alice")],
cc=[AddressDto(email="bob@example.com")],
subject="Weekly Update",
text_body="Hi team, here's the update.",
html_body="<h1>Weekly Update</h1><p>Hi team...</p>",
))
Parameters: SendEmailRequest
.account_id str — Account to send from (required)
.to list[AddressDto] — Recipients (required)
.cc list[AddressDto] | None — CC recipients
.bcc list[AddressDto] | None — BCC recipients
.subject str — Subject line (required)
.text_body str | None — Plain text body
.html_body str | None — HTML body
Returns: None
send.reply()
Reply to an existing message.
from inbox_api.models.send import ReplyRequest
# Reply to sender only
client.send.reply("message_id", ReplyRequest(
text_body="Thanks, looks good!",
))
# Reply to all recipients
client.send.reply("message_id", ReplyRequest(
text_body="Noted, thanks everyone.",
reply_all=True,
))
Parameters:
message_id str — Message to reply to (required)
request ReplyRequest — Reply content, with the following fields:
.text_body str | None — Plain text reply body
.html_body str | None — HTML reply body
.reply_all bool | None — Reply to all recipients
Returns: None
send.forward()
Forward a message to new recipients. All original attachments are preserved.
from inbox_api.models.send import ForwardEmailRequest
from inbox_api.models.common import AddressDto
client.send.forward("message_id", ForwardEmailRequest(
to=[AddressDto(email="bob@example.com")],
additional_text="FYI — see the message below.",
))
Parameters:
message_id str — Message to forward (required)
request ForwardEmailRequest — Forward details, with the following fields:
.to list[AddressDto] — Recipients (required)
.cc list[AddressDto] | None — CC recipients
.bcc list[AddressDto] | None — BCC recipients
.additional_text str | None — Text to prepend
Returns: None
search.search()
Full-text search within an account.
results = client.search.search("account_id", "invoice")
for msg in results.items:
print(f"{msg.subject} — {msg.snippet}")
Parameters:
account_id str — Account ID (required)
query str — Search query (required)
page int — Page number (default: 1)
page_size int — Results per page (default: 10)
Returns: PagedResult[MessageDto]
search.iter_all()
Automatically paginate through all search results.
for msg in client.search.iter_all("account_id", "quarterly report"):
print(f"{msg.subject}")
Parameters:
account_id str — Account ID (required)
query str — Search query (required)
page_size int — Items per page request (default: 10)
max_pages int | None — Stop after N pages (None = unlimited)
Yields: MessageDto
drafts.list()
List drafts for an account.
drafts = client.drafts.list("account_id")
for draft in drafts:
print(f"{draft.subject} — created {draft.created_at}")
Parameters:
account_id str — Account ID (required)
Returns: list[DraftDto]
.id str — Draft ID
.account_id str — Account ID
.subject str | None — Subject line
.text_body str | None — Plain text body
.html_body str | None — HTML body
.created_at str — Creation timestamp
drafts.create()
Create a new draft.
from inbox_api.models.drafts import CreateDraftRequest
from inbox_api.models.common import AddressDto
draft = client.drafts.create(CreateDraftRequest(
account_id="account_id",
to=[AddressDto(email="alice@example.com")],
subject="Proposal Draft",
text_body="Hi Alice, here's the draft...",
))
print(f"Created draft: {draft.id}")
Parameters: CreateDraftRequest
.account_id str — Account ID (required)
.to list[AddressDto] | None — Recipients
.cc list[AddressDto] | None — CC recipients
.subject str | None — Subject line
.text_body str | None — Plain text body
.html_body str | None — HTML body
Returns: DraftDto
drafts.update()
Update an existing draft.
from inbox_api.models.drafts import UpdateDraftRequest
draft = client.drafts.update("draft_id", UpdateDraftRequest(
account_id="account_id",
subject="Updated Subject",
text_body="Updated content.",
))
Parameters:
draft_id str — Draft ID (required)
request UpdateDraftRequest (same fields as CreateDraftRequest)
Returns: DraftDto
drafts.send()
Send a saved draft. The draft is deleted after sending.
client.drafts.send("draft_id")
Parameters:
draft_id str — Draft ID (required)
Returns: None
drafts.delete()
Delete a draft.
client.drafts.delete("draft_id")
Parameters:
draft_id str — Draft ID (required)
Returns: None
contacts.list()
List contacts extracted from email headers, ranked by frequency.
result = client.contacts.list("account_id")
for contact in result.items:
print(f"{contact.display_name} <{contact.email}> — {contact.frequency} interactions")
Parameters:
account_id str — Account ID (required)
page int — Page number (default: 1)
page_size int — Results per page (default: 10)
Returns: PagedResult[ContactDto]
.email str — Contact email
.display_name str | None — Display name
.frequency int — Interaction count
.last_seen_at str — Last interaction timestamp
.first_seen_at str — First interaction timestamp
.account_ids list[str] — Associated accounts
contacts.iter_all()
Automatically paginate through all contacts.
for contact in client.contacts.iter_all("account_id"):
print(f"{contact.email} — {contact.frequency} interactions")
Parameters:
account_id str — Account ID (required)
page_size int — Items per page request (default: 10)
max_pages int | None — Stop after N pages (None = unlimited)
Yields: ContactDto
webhooks.list()
List all webhook subscriptions.
webhooks = client.webhooks.list()
for wh in webhooks:
print(f"{wh.url} — events: {', '.join(wh.events)} — active: {wh.is_active}")
Returns: list[WebhookDto]
.id str — Webhook ID
.url str — Delivery URL
.events list[str] — Subscribed events
.description str | None — Description
.payload_level str — Minimal, Standard, or Full
.is_active bool — Active status
.created_at str — Creation timestamp
.secret str | None — Signing secret (only returned when the webhook is created)
webhooks.get()
Get a specific webhook by ID.
wh = client.webhooks.get("webhook_id")
print(f"{wh.url} — {wh.payload_level}")
Parameters:
webhook_id str — Webhook ID (required)
Returns: WebhookDto
webhooks.get_delivery()
Get a single webhook delivery by ID.
delivery = client.webhooks.get_delivery("webhook_id", "delivery_id")
print(f"Status: {delivery.status}, Event: {delivery.event_type}")
Parameters:
webhook_id str — Webhook ID (required)
delivery_id str — Delivery ID (required)
Returns: WebhookDeliveryDto
Error Handling
from inbox_api import ApiError, AuthenticationError, NotFoundError, RateLimitError
try:
msg = client.messages.get("nonexistent_id")
except NotFoundError:
print("Message not found")
except AuthenticationError:
print("Invalid or expired token")
except RateLimitError as e:
print(f"Rate limited — retry after {e.retry_after}s")
except ApiError as e:
print(f"API error {e.status_code}: {e.message}")
Exception Hierarchy:
ApiError Base exception (any HTTP error)
.status_code int — HTTP status code
.message str — Error message
AuthenticationError 401 Unauthorized
NotFoundError 404 Not Found
RateLimitError 429 Too Many Requests
.retry_after float | None — Seconds to wait
Auto-Retry
The SDK automatically retries on 429 (rate limited) and 5xx (server error) responses.
Defaults:
Max retries: 2
Base delay: 0.5s (exponential backoff with jitter)
Max delay: 30s
429 responses: Respects Retry-After header when present
# Override
client = InboxApiClient(url, token, max_retries=5)
# Disable
client = InboxApiClient(url, token, max_retries=0)
Model Reference
All response models are Pydantic v2 classes with camelCase JSON aliasing.
They accept both snake_case and camelCase fields and ignore unknown fields for forward compatibility.
Response Models:
AccountInfo Email account summary
AccountHealthDto Account health status
DigestResponse Cross-account digest
DigestAccountDto Per-account digest
DigestMessageDto Digest message summary
MessageDto Email message metadata
MessageBody Message body content
ThreadDetailDto Thread with messages
DraftDto Email draft
ContactDto Extracted contact
WebhookDto Webhook configuration
WebhookDeliveryDto Delivery attempt record
PagedResult[T] Paginated result container
BatchOperationResponse Batch operation result
BatchItemResult Per-item batch result
AddressDto Email address + name
AttachmentDto Attachment metadata
FolderDto IMAP folder metadata
Request Models:
SendEmailRequest Send a new email
ReplyRequest Reply to a message
ForwardEmailRequest Forward a message
CreateDraftRequest Create a draft
UpdateDraftRequest Update a draft
# Import from submodules
from inbox_api.models.messages import MessageDto, MessageBody
from inbox_api.models.send import SendEmailRequest
from inbox_api.models.common import PagedResult, AddressDto
# Or import everything from the top level
from inbox_api import MessageDto, SendEmailRequest, AddressDto