Skip to main content

Installation

pip install getimmutable
Requirements: Python 3.10+. Single dependency: httpx.

Configuration

from getimmutable import ImmutableClient

# Build the client; every option is passed by keyword.
client = ImmutableClient(
    api_key="imk_your_api_key_here",  # placeholder: substitute your own imk_ key
    base_url="https://getimmutable.dev",  # base domain only; the SDK appends /api/v1/...
    timeout=5.0,  # seconds (default)
)
| Parameter | Required | Description |
| --- | --- | --- |
| api_key | Yes | Your imk_-prefixed API key |
| base_url | Yes | Base domain URL (SDK appends /api/v1/...) |
| timeout | No | Request timeout in seconds (default: 5.0) |

Tracking Events

Fluent Builder

# Start the chain with the actor, then send the event with .track().
(
    client.actor("user_2hG9kLm", name="Sarah Chen", type="user")
    .track("document.created", "document", {"size": 1024})
)
The actor() method starts a fluent chain. Call .track() at the end to send.

With Resource Object

# Here the resource is passed as a dict (type, id, name) rather than a type string.
(
    client.actor("user_2hG9kLm", name="Sarah Chen")
    .track(
        "document.created",
        {"type": "document", "id": "doc_8nXpQr3", "name": "Q4 Report"},
        {"size": 1024, "mime_type": "application/pdf"},
    )
)

Full Builder Example

# Every builder method is chained before the final .track() call, which sends the event.
(
    client.actor("user_2hG9kLm", name="Sarah Chen", type="user")
    .idempotency_key("unique_key_123")
    .version(1)
    .session("sess_4kN8pLm")
    .action_category("documents")
    # target() may be called repeatedly; each call adds one target.
    .target("folder", "folder_3mK9pLq", "Shared Reports")
    .target("team", "team_5bR7cNx", "Engineering")
    .track(
        "document.created",
        "document",
        {
            "folder": "reports",
            "template": "quarterly",
        },
    )
)

PendingEvent Methods

| Method | Parameters | Description |
| --- | --- | --- |
| idempotency_key(key) | str | Set deduplication key |
| version(v) | int | Set event schema version |
| session(id) | str | Set session identifier |
| action_category(cat) | str | Set action category |
| target(type, id?, name?, metadata?) | str, str?, str?, dict? | Add a single target |
| targets(arr) | list[dict] | Set all targets at once |
| track(action, resource?, metadata?) | str, str \| dict?, dict? | Send the event |

Raw Tracking

For full control, pass a payload dict directly:
# The whole event is supplied as one flat dict instead of through the builder.
client.track({
    "actor_id": "user_2hG9kLm",
    "actor_name": "Sarah Chen",
    "action": "document.created",
    "resource": "document",        # resource type
    "resource_id": "doc_8nXpQr3",  # ID of the specific resource
    "metadata": {"size": 1024},
})

Batch Ingestion

# Send several events in a single call; each list item is one event payload dict.
# A batch may contain at most 100 events.
client.track_batch([
    {
        "actor_id": "user_2hG9kLm",
        "action": "document.created",
        "resource_id": "doc_8nXpQr3",
    },
    {
        "actor_id": "user_2hG9kLm",
        "action": "document.shared",
        "resource_id": "doc_8nXpQr3",
        # Targets are given as a list of dicts with "type", "id" and "name".
        "targets": [
            {"type": "team", "id": "team_5bR7cNx", "name": "Engineering"},
        ],
    },
])
Maximum 100 events per batch.

Querying Events

# First page: events for one actor, matching an action wildcard, from a start date.
result = client.get_events(
    actor_id="user_2hG9kLm",
    action="document.*",
    from_date="2026-03-01T00:00:00Z",
    limit=25,
)

for event in result["data"]:
    print(f'{event["action"]} at {event["created_at"]}')

# Cursor pagination: repeat the same filters and page size alongside the
# cursor so the next page continues the same query rather than a broader one.
if result["pagination"]["has_more"]:
    next_page = client.get_events(
        actor_id="user_2hG9kLm",
        action="document.*",
        from_date="2026-03-01T00:00:00Z",
        limit=25,
        cursor=result["pagination"]["next_cursor"],
    )
from_date and to_date map to the API’s from and to query parameters; the SDK renames them because from is a reserved keyword in Python and cannot be used as an argument name.

Available Filters

| Parameter | Type | Description |
| --- | --- | --- |
| actor_id | str | Filter by actor |
| action | str | Filter by action (supports wildcards) |
| resource | str | Filter by resource type |
| resource_id | str | Filter by resource ID |
| tenant_id | str | Filter by tenant |
| session_id | str | Filter by session |
| target_type | str | Filter by target type |
| target_id | str | Filter by target ID |
| search | str | Full-text search |
| from_date | str | Start date (ISO 8601) |
| to_date | str | End date (ISO 8601) |
| cursor | str | Pagination cursor |
| limit | int | Results per page |

Retrieving a Single Event

# Fetch one event by ID; its fields are nested under "data".
event = client.get_event("9b1deb4d-3b7d-4bad-9bdd-2b0d7b3dcb6d")

record = event["data"]
print(record["action"])

# The hashes for this event live under the "integrity" key.
integrity = record["integrity"]
print(integrity["event_hash"])
print(integrity["previous_event_hash"])

Verification

# No arguments
result = client.verify()
report = result["data"]
print(report["valid"])           # True
print(report["events_checked"])  # 4821

# Restricted to a date range
result = client.verify(
    from_date="2026-03-01T00:00:00Z",
    to_date="2026-03-31T23:59:59Z",
)

# Custom limit (default: 10000)
result = client.verify(limit=1000)

# When the result is not valid, list every reported break
report = result["data"]
if not report["valid"]:
    for brk in report["breaks"]:
        print(f'Break: {brk["type"]} at event {brk["event_id"]}')

Viewer Tokens

# Create a viewer token for one tenant and actor.
token = client.create_viewer_token(
    tenant_id="org_7rT2xBc",
    actor_id="user_2hG9kLm",
    ttl=7200,  # 2 hours (min: 60, max: 86400)
)

# NOTE(review): these fields are read from the top level of the response,
# whereas the other examples on this page read from response["data"]; confirm.
print(token["token"])      # JWT string
print(token["expires_at"]) # Unix timestamp (integer)

Alerts

# First page of alerts for one rule type.
alerts = client.get_alerts(
    rule_type="new_country",
    limit=25,
)

for alert in alerts["data"]:
    print(f'{alert["rule_type"]}: {alert["reason"]}')

# Paginate: repeat the same filter and page size alongside the cursor so the
# next page continues the same query instead of listing alerts of every type.
if alerts["pagination"]["has_more"]:
    next_page = client.get_alerts(
        rule_type="new_country",
        limit=25,
        cursor=alerts["pagination"]["next_cursor"],
    )

Exports

import time

# Create export
export = client.create_export(
    actor_id="user_2hG9kLm",
    from_date="2026-03-01T00:00:00Z",
)

# Poll for completion, but stop after an overall deadline so an export that
# never reaches "completed" or "failed" cannot block the caller forever.
POLL_INTERVAL = 2.0   # seconds between status checks
POLL_TIMEOUT = 300.0  # total seconds to wait before giving up

deadline = time.monotonic() + POLL_TIMEOUT
while True:
    status = client.get_export(export["data"]["id"])
    state = status["data"]["status"]
    if state == "completed":
        print(f'Download: {status["data"]["download_url"]}')
        break
    if state == "failed":
        print(f'Error: {status["data"].get("error_message")}')
        break
    # monotonic() is unaffected by wall-clock adjustments, so the budget holds.
    if time.monotonic() >= deadline:
        print(f'Timed out waiting for export {export["data"]["id"]}')
        break
    time.sleep(POLL_INTERVAL)

Error Handling

The SDK raises ImmutableError for API failures:
from getimmutable import ImmutableClient, ImmutableError

# A client with an invalid key, used to provoke an API failure.
client = ImmutableClient(
    api_key="imk_invalid",
    base_url="https://getimmutable.dev",
)

payload = {"actor_id": "user_1", "action": "test"}
try:
    client.track(payload)
except ImmutableError as err:
    print(err)                # Error message
    print(err.status_code)    # HTTP status (401, 422, 500, etc.)
    print(err.response_data)  # Full response body as dict
| Exception | When |
| --- | --- |
| ImmutableError (status 401) | Invalid or missing API key |
| ImmutableError (status 403) | IP not in allowlist, or quota exceeded |
| ImmutableError (status 422) | Validation error (missing fields, invalid values) |
| ImmutableError (status 0) | Network timeout or connection error |

Django Integration

# settings.py
import os

# Required: indexing os.environ raises KeyError if the variable is unset.
IMMUTABLE_API_KEY = os.environ["IMMUTABLE_API_KEY"]
# Optional: falls back to the hosted service URL when the variable is unset.
IMMUTABLE_BASE_URL = os.environ.get("IMMUTABLE_BASE_URL", "https://getimmutable.dev")

# services.py
from django.conf import settings
from getimmutable import ImmutableClient

# Module-level client built from the IMMUTABLE_* Django settings;
# views import it from this module.
immutable = ImmutableClient(
    api_key=settings.IMMUTABLE_API_KEY,
    base_url=settings.IMMUTABLE_BASE_URL,
)

# views.py
from .services import immutable

def create_document(request):
    """Create a Document from the posted title and record the event.

    Sends a "document.created" event attributed to the requesting user,
    then redirects to the new document's detail page.
    """
    document = Document.objects.create(title=request.POST["title"])

    # The actor is the requesting user; the resource describes the new document.
    actor = immutable.actor(
        str(request.user.id),
        name=request.user.get_full_name(),
    )
    resource = {
        "type": "document",
        "id": str(document.id),
        "name": document.title,
    }
    actor.track("document.created", resource)

    return redirect("documents:detail", pk=document.pk)

FastAPI Integration

from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from getimmutable import ImmutableClient

# Module-level client used by the route handler below.
client = ImmutableClient(
    api_key="imk_your_api_key",  # placeholder: substitute your own imk_ key
    base_url="https://getimmutable.dev",
)

app = FastAPI()

@app.post("/documents")
def create_document(request: Request):
    """Record a "document.created" event for the request's user.

    The session is taken from the "session_id" cookie (empty string when
    the cookie is absent). Returns the placeholder document id.
    """
    # ... create document ...

    pending = client.actor(
        str(request.state.user.id),
        name=request.state.user.name,
    )
    pending = pending.session(request.cookies.get("session_id", ""))
    pending.track("document.created", "document", {"document_id": "doc_123"})

    return {"id": "doc_123"}

Flask Integration

from flask import Flask, request, g
from getimmutable import ImmutableClient

import os

app = Flask(__name__)

# A freshly created Flask app's config holds only Flask's built-in defaults,
# so the Immutable settings must be loaded before they are read below;
# otherwise app.config["IMMUTABLE_API_KEY"] raises KeyError.
app.config.update(
    IMMUTABLE_API_KEY=os.environ["IMMUTABLE_API_KEY"],
    IMMUTABLE_BASE_URL=os.environ.get(
        "IMMUTABLE_BASE_URL", "https://getimmutable.dev"
    ),
)

client = ImmutableClient(
    api_key=app.config["IMMUTABLE_API_KEY"],
    base_url=app.config["IMMUTABLE_BASE_URL"],
)

@app.route("/documents", methods=["POST"])
def create_document():
    """Record a "document.created" event for the current user (g.user).

    The submitted form title is attached as event metadata. Returns the
    placeholder document id.
    """
    # ... create document ...

    actor = client.actor(str(g.user.id), name=g.user.name)
    actor.track(
        "document.created",
        "document",
        {"title": request.form["title"]},
    )

    return {"id": "doc_123"}