Skip to content

IDP SDK Documentation

The IDP SDK provides programmatic Python access to all IDP Accelerator capabilities with a clean, namespaced API.

Terminal window
# Install from local development
# Recommended: install everything at once
make setup-venv
source .venv/bin/activate
# Or install just the SDK with pip/uv
uv pip install -e ./lib/idp_sdk

The SDK is designed around a single entry point: IDPClient. Always import from the top-level idp_sdk package — this is the stable public interface. Avoid importing directly from internal submodules (e.g., idp_sdk._core, idp_sdk.operations, idp_sdk.models.*) as these are private implementation details and may change without notice.

# Correct: use the public interface
from idp_sdk import IDPClient, BatchProcessResult, RerunStep, IDPError
# Avoid: importing from private/internal modules
# from idp_sdk.operations.batch import BatchOperation # private
# from idp_sdk._core.batch_processor import BatchProcessor # private

All response models, enums, and exceptions you need are exported directly from idp_sdk — see the Response Models section for the complete list.

from idp_sdk import IDPClient
# Create client with stack configuration
client = IDPClient(stack_name="my-idp-stack", region="us-west-2")
# Upload and process a single document
result = client.document.process(file_path="./invoice.pdf")
print(f"Document ID: {result.document_id}, Status: {result.status}")
# Process a batch of documents
batch_result = client.batch.process(source="./documents/")
print(f"Batch: {batch_result.batch_id}, Queued: {batch_result.queued}")
# Check status
status = client.batch.get_status(batch_id=batch_result.batch_id)
print(f"Progress: {status.completed}/{status.total}")

The SDK follows a namespaced operation pattern for better organization:

client = IDPClient(stack_name="my-stack")
# Stack operations
client.stack.deploy(...)
client.stack.delete(...)
client.stack.get_resources()
client.stack.exists()
client.stack.get_status()
client.stack.check_in_progress()
client.stack.monitor(...)
client.stack.cancel_update()
client.stack.wait_for_stable_state()
client.stack.get_failure_analysis()
client.stack.cleanup_orphaned(...)
client.stack.get_bucket_info()
# Batch operations (multiple documents)
client.batch.process(...)
client.batch.reprocess(...)
client.batch.get_status(...)
client.batch.get_document_ids(...)
client.batch.get_results(...)
client.batch.get_confidence(...)
client.batch.list()
client.batch.download_results(...)
client.batch.download_sources(...)
client.batch.delete_documents(...)
client.batch.stop_workflows()
# Document operations (single document)
client.document.process(...)
client.document.get_status(...)
client.document.get_metadata(...)
client.document.list(...)
client.document.download_results(...)
client.document.download_source(...)
client.document.reprocess(...)
client.document.delete(...)
# Discovery operations (schema generation)
client.discovery.run(...)
client.discovery.run_batch(...)
client.discovery.run_multi_doc(...)
client.discovery.run_multi_section(...)
client.discovery.auto_detect_sections(...)
# Evaluation operations (baseline comparison)
client.evaluation.create_baseline(...)
client.evaluation.get_report(...)
client.evaluation.get_metrics(...)
client.evaluation.list_baselines(...)
client.evaluation.delete_baseline(...)
# Assessment operations (quality metrics)
client.assessment.get_confidence(...)
client.assessment.get_geometry(...)
client.assessment.get_metrics(...)
# Search operations (knowledge base)
client.search.query(...)
# Configuration operations
client.config.create(...)
client.config.validate(...)
client.config.upload(...)
client.config.download(...)
client.config.list()
client.config.activate(...)
client.config.delete(...)
client.config.sync_bda(...)
# Manifest operations
client.manifest.generate(...)
client.manifest.validate(...)
# Testing operations
client.testing.load_test(...)
from idp_sdk import IDPClient
# With stack name (for stack-dependent operations)
client = IDPClient(stack_name="my-stack", region="us-west-2")
# Without stack (for stack-independent operations)
client = IDPClient()
# Stack can be set later
client.stack_name = "new-stack"
ParameterTypeRequiredDescription
stack_namestrNoCloudFormation stack name
regionstrNoAWS region (defaults to boto3 default)

Operations for processing individual documents.

Process a single document (upload and queue for processing).

Parameters:

  • file_path (str, required): Path to local file to upload
  • document_id (str, optional): Custom document ID (defaults to filename without extension)
  • stack_name (str, optional): Stack name override

Returns: DocumentUploadResult with document_id, status, and timestamp

result = client.document.process(
file_path="/path/to/invoice.pdf",
document_id="custom-id" # Optional
)
print(f"Document ID: {result.document_id}")
print(f"Status: {result.status}") # "queued"
print(f"Timestamp: {result.timestamp}")

Get processing status for a single document.

Parameters:

  • document_id (str, required): Document identifier (S3 key format: batch-id/filename)
  • stack_name (str, optional): Stack name override

Returns: DocumentStatus with processing information including status, duration, pages, sections, and errors

status = client.document.get_status(document_id="batch-123/invoice.pdf")
print(f"Status: {status.status.value}")
print(f"Pages: {status.num_pages}")
print(f"Duration: {status.duration_seconds}s")

Download processing results (processed outputs) from OutputBucket for a single document.

Parameters:

  • document_id (str, required): Document identifier (S3 key)
  • output_dir (str, required): Local directory to save results
  • file_types (list[str], optional): File types to download - “pages”, “sections”, “summary”, “evaluation” (defaults to all)
  • stack_name (str, optional): Stack name override

Returns: DocumentDownloadResult with document_id, files_downloaded, and output_dir

result = client.document.download_results(
document_id="batch-123/invoice.pdf",
output_dir="./results",
file_types=["pages", "sections", "summary"] # Optional
)
print(f"Downloaded {result.files_downloaded} files")

Download original document source file from InputBucket.

Parameters:

  • document_id (str, required): Document identifier (S3 key)
  • output_path (str, required): Local file path to save document
  • stack_name (str, optional): Stack name override

Returns: str - Local file path where document was saved

file_path = client.document.download_source(
document_id="batch-123/invoice.pdf",
output_path="./downloads/invoice.pdf"
)
print(f"Downloaded to: {file_path}")

Reprocess a single document from a specific pipeline step.

Parameters:

  • document_id (str, required): Document identifier (S3 key)
  • step (str or RerunStep, required): Pipeline step to reprocess from (e.g., “classification”, “extraction”, RerunStep.EXTRACTION)
  • stack_name (str, optional): Stack name override

Returns: DocumentReprocessResult with document_id, step, and queued status

from idp_sdk import RerunStep
result = client.document.reprocess(
document_id="batch-123/invoice.pdf",
step=RerunStep.EXTRACTION
)
print(f"Queued: {result.queued}")

Permanently delete a single document and all associated data from InputBucket, OutputBucket, and DynamoDB.

Parameters:

  • document_id (str, required): Document identifier (S3 key)
  • stack_name (str, optional): Stack name override
  • dry_run (bool, optional): If True, simulate deletion without actually deleting (default: False)

Returns: DocumentDeletionResult with success, object_key, deleted (dict of deleted items), and errors

result = client.document.delete(
document_id="batch-123/invoice.pdf",
dry_run=False
)
print(f"Success: {result.success}")
print(f"Deleted: {result.deleted}")

List processed documents with pagination support.

Parameters:

  • limit (int, optional): Maximum number of documents to return (default: 100)
  • next_token (str, optional): Pagination token from previous request
  • stack_name (str, optional): Stack name override

Returns: DocumentListResult with documents (list of DocumentInfo), count, and optional next_token

# List documents
result = client.document.list(limit=50)
for doc in result.documents:
print(f"{doc.document_id}: {doc.status}")
# Pagination
if result.next_token:
next_page = client.document.list(limit=50, next_token=result.next_token)

Get extracted metadata and fields for a processed document.

Parameters:

  • document_id (str, required): Document identifier (S3 key)
  • section_id (int, optional): Section number (default: 1)
  • stack_name (str, optional): Stack name override

Returns: DocumentMetadata with document_id, section_id, document_class, fields, confidence, page_count, and metadata

metadata = client.document.get_metadata(document_id="batch-123/invoice.pdf")
print(f"Document Type: {metadata.document_class}")
print(f"Confidence: {metadata.confidence:.2%}")
print(f"Fields:")
for field_name, field_value in metadata.fields.items():
print(f" {field_name}: {field_value}")
# Access specific fields
invoice_number = metadata.fields.get("invoice_number")
total_amount = metadata.fields.get("total_amount")

Operations for processing multiple documents.

Process multiple documents through the IDP pipeline.

Parameters:

  • source (str, optional): Auto-detected source - directory path, manifest file, or S3 URI
  • manifest (str, optional): Path to manifest CSV file
  • directory (str, optional): Local directory path
  • s3_uri (str, optional): S3 URI (s3://bucket/prefix/)
  • test_set (str, optional): Test set identifier
  • batch_id (str, optional): Custom batch ID
  • batch_prefix (str, optional): Batch ID prefix (default: “sdk-batch”)
  • file_pattern (str, optional): File pattern for filtering (default: “*.pdf”)
  • recursive (bool, optional): Recursively process subdirectories (default: True)
  • number_of_files (int, optional): Limit number of files to process
  • config_path (str, optional): Path to custom configuration file
  • config_version (str, optional): Configuration version to use for processing
  • context (str, optional): Context for test set processing
  • stack_name (str, optional): Stack name override

Returns: BatchProcessResult with batch_id, document_ids, queued, uploaded, failed, baselines_uploaded, source, output_prefix, and timestamp

# From directory
result = client.batch.process(source="./documents/")
# From manifest
result = client.batch.process(source="./manifest.csv")
# From S3
result = client.batch.process(source="s3://bucket/path/")
# With options
result = client.batch.process(
source="./documents/",
batch_prefix="my-batch",
file_pattern="*.pdf",
recursive=True,
number_of_files=10,
config_path="./config.yaml",
config_version="v2"
)
print(f"Batch ID: {result.batch_id}")
print(f"Documents queued: {result.queued}")

Get processing status for all documents in a batch.

Parameters:

  • batch_id (str, required): Batch identifier
  • stack_name (str, optional): Stack name override

Returns: BatchStatus with batch_id, documents (list of DocumentStatus), total, completed, failed, in_progress, queued, success_rate, and all_complete

status = client.batch.get_status(batch_id="batch-20250123-123456")
print(f"Total: {status.total}")
print(f"Completed: {status.completed}")
print(f"Failed: {status.failed}")
print(f"Success Rate: {status.success_rate:.1%}")
for doc in status.documents:
print(f" {doc.document_id}: {doc.status.value}")

Get all document IDs belonging to a batch. Useful for pre-fetching a document count before a confirmation prompt without triggering the full reprocess pipeline.

Parameters:

  • batch_id (str, required): Batch identifier
  • stack_name (str, optional): Stack name override

Returns: list[str] - List of document object keys (S3 keys) in the batch

doc_ids = client.batch.get_document_ids(batch_id="batch-20250123-123456")
print(f"Batch contains {len(doc_ids)} documents")

Get extracted metadata and fields for all documents in a batch, with pagination support.

Parameters:

  • batch_id (str, required): Batch identifier
  • section_id (int, optional): Section number within documents (default: 1)
  • limit (int, optional): Maximum documents to return per page (default: 10)
  • next_token (str, optional): Pagination token from previous request
  • stack_name (str, optional): Stack name override

Returns: dict with batch_id, section_id, count, total_in_batch, documents (list of per-document result dicts), and optional next_token

result = client.batch.get_results(batch_id="batch-20250123-123456", limit=20)
print(f"Showing {result['count']} of {result['total_in_batch']} documents")
for doc in result["documents"]:
print(f" {doc['document_id']}: {doc['document_class']} ({doc['status']})")
# Pagination
if result.get("next_token"):
next_page = client.batch.get_results(
batch_id="batch-20250123-123456",
limit=20,
next_token=result["next_token"]
)

Get confidence scores and quality metrics for all documents in a batch, with pagination support.

Parameters:

  • batch_id (str, required): Batch identifier
  • section_id (int, optional): Section number (default: 1)
  • limit (int, optional): Maximum documents to return (default: 10)
  • next_token (str, optional): Pagination token from previous request
  • stack_name (str, optional): Stack name override

Returns: dict with batch_id, section_id, count, total_in_batch, documents (list with per-document confidence attributes), and optional next_token

result = client.batch.get_confidence(batch_id="batch-20250123-123456", limit=20)
for doc in result["documents"]:
low_conf = [f for f, a in doc["attributes"].items()
if not a.get("meets_threshold")]
if low_conf:
print(f"{doc['document_id']} needs review: {', '.join(low_conf)}")

List recent batch processing jobs with pagination support.

Parameters:

  • limit (int, optional): Maximum number of batches to return (default: 10)
  • next_token (str, optional): Pagination token from previous request
  • stack_name (str, optional): Stack name override

Returns: BatchListResult with batches (list of BatchInfo), count, and optional next_token

# List recent batches
result = client.batch.list(limit=20)
for batch in result.batches:
print(f"{batch.batch_id}: {batch.queued} docs, {batch.timestamp}")
# Pagination
if result.next_token:
next_page = client.batch.list(limit=20, next_token=result.next_token)

Download processing results (processed outputs) from OutputBucket for all documents in a batch.

Parameters:

  • batch_id (str, required): Batch identifier
  • output_dir (str, required): Local directory to save results
  • file_types (list[str], optional): File types to download - “pages”, “sections”, “summary”, “evaluation”, or “all” (default: [“all”])
  • stack_name (str, optional): Stack name override

Returns: BatchDownloadResult with files_downloaded, documents_downloaded, and output_dir

result = client.batch.download_results(
batch_id="batch-20250123-123456",
output_dir="./results",
file_types=["summary", "sections"]
)
print(f"Downloaded {result.files_downloaded} files")

Download original source files from InputBucket for all documents in a batch.

Parameters:

  • batch_id (str, required): Batch identifier
  • output_dir (str, required): Local directory to save source files
  • stack_name (str, optional): Stack name override

Returns: BatchDownloadResult with files_downloaded, documents_downloaded, and output_dir

result = client.batch.download_sources(
batch_id="batch-20250123-123456",
output_dir="./source_files"
)
print(f"Downloaded {result.files_downloaded} source files")

Permanently delete documents and their associated data from InputBucket, OutputBucket, and DynamoDB. Select documents by batch ID or wildcard pattern.

Parameters:

  • batch_id (str, optional): Batch identifier (selects all docs containing this string)
  • pattern (str, optional): Wildcard pattern to match document keys (e.g., "batch-123/*.pdf", "*invoice*")
  • status_filter (str, optional): Filter by document status (e.g., “FAILED”, “COMPLETED”)
  • stack_name (str, optional): Stack name override
  • dry_run (bool, optional): If True, simulate deletion without actually deleting (default: False)
  • continue_on_error (bool, optional): Continue deleting if one document fails (default: True)

Note: Must specify either batch_id or pattern (not both).

Returns: BatchDeletionResult with success, deleted_count, failed_count, total_count, dry_run, and results (list of DocumentDeletionResult)

# Delete entire batch
result = client.batch.delete_documents(batch_id="batch-123")
# Delete with status filter
result = client.batch.delete_documents(
batch_id="batch-123",
status_filter="FAILED"
)
# Delete by wildcard pattern
result = client.batch.delete_documents(
pattern="batch-123/*.pdf"
)
# Delete all failed invoices across batches
result = client.batch.delete_documents(
pattern="*invoice*",
status_filter="FAILED"
)
# Dry run
result = client.batch.delete_documents(
batch_id="batch-123",
dry_run=True
)
print(f"Deleted: {result.deleted_count}/{result.total_count}")

Reprocess existing documents from a specific pipeline step.

Parameters:

  • step (str or RerunStep, required): Pipeline step to reprocess from (e.g., “classification”, “extraction”, RerunStep.EXTRACTION)
  • document_ids (list[str], optional): Specific document IDs to reprocess
  • batch_id (str, optional): Batch ID to reprocess all documents in batch
  • stack_name (str, optional): Stack name override

Note: Must specify either document_ids or batch_id

Returns: BatchReprocessResult with documents_queued, documents_failed, failed_documents, and step

from idp_sdk import RerunStep
# Reprocess batch
result = client.batch.reprocess(
step=RerunStep.EXTRACTION,
batch_id="batch-20250123-123456"
)
# Reprocess specific documents
result = client.batch.reprocess(
step="classification",
document_ids=["batch/doc1.pdf", "batch/doc2.pdf"]
)
print(f"Queued: {result.documents_queued}")

Stop all running Step Functions workflows and purge the SQS queue.

Parameters:

  • stack_name (str, optional): Stack name override
  • skip_purge (bool, optional): Skip purging the SQS queue (default: False)
  • skip_stop (bool, optional): Skip stopping executions (default: False)

Returns: StopWorkflowsResult with executions_stopped (ExecutionsStoppedResult), documents_aborted (DocumentsAbortedResult), and queue_purged

result = client.batch.stop_workflows()
print(f"Queue purged: {result.queue_purged}")
if result.executions_stopped:
print(f"Executions stopped: {result.executions_stopped.total_stopped}")
if result.documents_aborted:
print(f"Documents aborted: {result.documents_aborted.documents_aborted}")

Operations for baseline comparison and accuracy measurement.

Create evaluation baseline for a document to enable automated accuracy testing.

Parameters:

  • document_id (str, required): Document identifier (S3 key)
  • baseline_data (dict, required): Expected extraction results with sections and fields
  • metadata (dict, optional): Optional metadata (created_by, purpose, etc.)
  • stack_name (str, optional): Stack name override

Returns: dict with baseline creation result

baseline = {
"sections": [
{
"section_id": 1,
"document_class": "invoice",
"fields": {
"invoice_number": "INV-12345",
"total_amount": "1250.00",
"invoice_date": "2024-01-15"
}
}
]
}
result = client.evaluation.create_baseline(
document_id="test-invoice-001.pdf",
baseline_data=baseline,
metadata={"created_by": "qa_team"}
)

Get evaluation report comparing extraction results to baseline.

Parameters:

  • document_id (str, required): Document identifier (S3 key)
  • section_id (int, optional): Section number (default: 1)
  • stack_name (str, optional): Stack name override

Returns: EvaluationReport with document_id, section_id, accuracy, field_results, and summary

report = client.evaluation.get_report(document_id="test-invoice-001.pdf")
print(f"Accuracy: {report.accuracy:.1%}")
for field, result in report.field_results.items():
if result['match']:
print(f"✓ {field}: {result['extracted']}")
else:
print(f"✗ {field}: expected '{result['expected']}', got '{result['extracted']}'")

Get aggregated evaluation metrics across multiple documents.

Parameters:

  • start_date (str, optional): Filter by start date (ISO format: “2024-01-15”)
  • end_date (str, optional): Filter by end date (ISO format: “2024-01-31”)
  • document_class (str, optional): Filter by document type (e.g., “invoice”)
  • batch_id (str, optional): Filter by batch identifier
  • stack_name (str, optional): Stack name override

Returns: EvaluationMetrics with total_evaluations, average_accuracy, and by_document_class

metrics = client.evaluation.get_metrics(
start_date="2024-01-01",
end_date="2024-01-31"
)
print(f"Total evaluations: {metrics.total_evaluations}")
print(f"Average accuracy: {metrics.average_accuracy:.1%}")
for doc_class, accuracy in metrics.by_document_class.items():
print(f"{doc_class}: {accuracy:.1%}")

List evaluation baselines with pagination support.

Parameters:

  • limit (int, optional): Maximum number of baselines to return (default: 100)
  • next_token (str, optional): Pagination token from previous request
  • stack_name (str, optional): Stack name override

Returns: EvaluationBaselineListResult with baselines, count, and optional next_token

result = client.evaluation.list_baselines(limit=50)
for baseline in result.baselines:
print(f"{baseline['document_id']}: {baseline['created_at']}")
if result.next_token:
next_page = client.evaluation.list_baselines(limit=50, next_token=result.next_token)

Delete evaluation baseline for a document.

Parameters:

  • document_id (str, required): Document identifier (S3 key)
  • stack_name (str, optional): Stack name override

Returns: dict with deletion result

result = client.evaluation.delete_baseline(document_id="test-invoice-001.pdf")
print(f"Deleted: {result['success']}")

Operations for quality metrics and confidence scoring.

Get confidence scores for all extracted fields in a document section.

Parameters:

  • document_id (str, required): Document identifier (S3 key)
  • section_id (int, optional): Section number (default: 1)
  • stack_name (str, optional): Stack name override

Returns: AssessmentConfidenceResult with document_id, section_id, and attributes (dict of AssessmentFieldConfidence)

AssessmentFieldConfidence fields:

  • confidence: Float 0.0-1.0 (e.g., 0.95 = 95% confident)
  • confidence_threshold: Minimum acceptable confidence from config
  • meets_threshold: Boolean indicating if confidence is acceptable
  • reason: Explanation for the confidence level
confidence = client.assessment.get_confidence(document_id="batch-001/invoice.pdf")
# Check if any fields need review
low_conf_fields = [
field for field, attr in confidence.attributes.items()
if not attr.meets_threshold
]
if low_conf_fields:
print(f"Review needed for: {', '.join(low_conf_fields)}")
# Get confidence for specific field
total_amount = confidence.attributes.get("total_amount")
if total_amount:
print(f"Total amount confidence: {total_amount.confidence:.2%}")
print(f"Meets threshold: {total_amount.meets_threshold}")

Get bounding box coordinates for all extracted fields in a document section.

Parameters:

  • document_id (str, required): Document identifier (S3 key)
  • section_id (int, optional): Section number (default: 1)
  • stack_name (str, optional): Stack name override

Returns: AssessmentGeometryResult with document_id, section_id, and attributes (dict of AssessmentFieldGeometry)

AssessmentFieldGeometry fields:

  • page: Page number where field was found (1-indexed)
  • bbox: Normalized bounding box [left, top, width, height] (0.0-1.0)
  • bounding_box: Absolute pixel coordinates {Left, Top, Width, Height}
geometry = client.assessment.get_geometry(document_id="batch-001/invoice.pdf")
# Highlight fields in document viewer
for field_name, geo in geometry.attributes.items():
print(f"{field_name} found on page {geo.page}")
# Draw rectangle at normalized coordinates
draw_highlight(
page=geo.page,
left=geo.bbox[0],
top=geo.bbox[1],
width=geo.bbox[2],
height=geo.bbox[3]
)

Get aggregated quality metrics across multiple documents.

Parameters:

  • start_date (str, optional): Filter by start date (ISO format: “2024-01-15”)
  • end_date (str, optional): Filter by end date (ISO format: “2024-01-31”)
  • document_class (str, optional): Filter by document type (e.g., “invoice”)
  • batch_id (str, optional): Filter by batch identifier
  • stack_name (str, optional): Stack name override

Returns: dict with aggregated metrics

metrics = client.assessment.get_metrics(
start_date="2024-01-15",
end_date="2024-01-15"
)
print(f"Processed: {metrics['total_documents']} documents")
print(f"Avg confidence: {metrics['average_confidence']:.2%}")
print(f"SLA compliance: {metrics['threshold_compliance']:.2%}")

Operations for knowledge base queries and semantic search.

Query knowledge base with natural language questions.

Parameters:

  • question (str, required): Natural language question
  • document_ids (list[str], optional): Limit search to specific documents
  • limit (int, optional): Maximum number of results to return (default: 10)
  • next_token (str, optional): Pagination token from previous request
  • stack_name (str, optional): Stack name override

Returns: SearchResult with answer, confidence, citations, and optional next_token

# Ask a question
result = client.search.query(
question="What is the total amount on invoice INV-12345?"
)
print(f"Answer: {result.answer}")
print(f"Confidence: {result.confidence:.1%}")
for citation in result.citations:
print(f"Source: {citation.document.document_id}")
print(f"Page: {citation.document.page}")
print(f"Text: {citation.text}")
# Search within specific documents
result = client.search.query(
question="What is the vendor name?",
document_ids=["batch-001/invoice1.pdf", "batch-001/invoice2.pdf"]
)

Operations for deploying and managing IDP stacks.

Deploy or update an IDP CloudFormation stack.

Parameters:

  • stack_name (str, optional): CloudFormation stack name (uses client default if not provided)
  • admin_email (str, optional): Admin user email — required for new stacks
  • template_url (str, optional): URL to CloudFormation template in S3
  • template_path (str, optional): Local path to CloudFormation template file
  • from_code (str, optional): Path to project root for building from source
  • custom_config (str, optional): Path to local config file or S3 URI
  • max_concurrent (int, optional): Maximum concurrent workflows
  • log_level (str, optional): Logging level (DEBUG, INFO, WARN, ERROR)
  • enable_hitl (bool, optional): Enable Human-in-the-Loop
  • parameters (dict, optional): Additional CloudFormation parameters
  • wait (bool, optional): Wait for operation to complete (default: True)
  • no_rollback (bool, optional): Disable rollback on failure (default: False)
  • role_arn (str, optional): CloudFormation service role ARN

Returns: StackDeploymentResult with success, operation, status, stack_name, stack_id, outputs, and error

from idp_sdk import Pattern
result = client.stack.deploy(
stack_name="my-new-stack",
admin_email="admin@example.com",
max_concurrent=100,
wait=True
)
if result.success:
print(f"Stack deployed: {result.stack_name}")
print(f"Outputs: {result.outputs}")

Delete an IDP CloudFormation stack.

Parameters:

  • stack_name (str, optional): CloudFormation stack name (uses client default if not provided)
  • empty_buckets (bool, optional): Empty S3 buckets before deletion (default: False)
  • force_delete_all (bool, optional): Force delete all retained resources after deletion (default: False)
  • wait (bool, optional): Wait for deletion to complete (default: True)

Returns: StackDeletionResult with success, status, stack_name, stack_id, error, and cleanup_result

result = client.stack.delete(
empty_buckets=True,
force_delete_all=False,
wait=True
)
print(f"Status: {result.status}")

Get stack resource information.

Returns: StackResources with bucket names, ARNs, and other resource identifiers

resources = client.stack.get_resources()
print(f"Input Bucket: {resources.input_bucket}")
print(f"Output Bucket: {resources.output_bucket}")
print(f"Queue URL: {resources.document_queue_url}")

Check whether a CloudFormation stack exists.

Parameters:

  • stack_name (str, optional): Stack name override

Returns: bool — True if the stack exists, False otherwise

if client.stack.exists():
print("Stack is deployed")
else:
print("Stack not found")

Get the current CloudFormation status of a stack.

Parameters:

  • stack_name (str, optional): Stack name override

Returns: str or None — CloudFormation status string (e.g., "UPDATE_IN_PROGRESS"), or None if the stack does not exist

status = client.stack.get_status()
print(f"Stack status: {status}")

Check whether a CloudFormation stack has an operation currently in progress.

Parameters:

  • stack_name (str, optional): Stack name override

Returns: StackOperationInProgress if an operation is in progress, None otherwise. The operation field is one of "CREATE", "UPDATE", or "DELETE".

in_progress = client.stack.check_in_progress()
if in_progress:
print(f"Operation in progress: {in_progress.operation} ({in_progress.status})")

Monitor a CloudFormation stack operation until it reaches a terminal state. Blocks until the operation completes or fails.

Parameters:

  • stack_name (str, optional): Stack name override
  • operation (str, optional): Operation type being monitored: "CREATE", "UPDATE", or "DELETE" (default: "UPDATE")
  • poll_interval_seconds (int, optional): Seconds between CloudFormation API polls (default: 10)

Returns: StackMonitorResult with success, operation, status, stack_name, outputs, and error

result = client.stack.monitor(operation="UPDATE")
if result.success:
print(f"Operation complete: {result.status}")
else:
print(f"Operation failed: {result.error}")

Cancel an in-progress stack update. Only valid when the stack is in UPDATE_IN_PROGRESS status.

Parameters:

  • stack_name (str, optional): Stack name override

Returns: CancelUpdateResult with success, message, and error

result = client.stack.cancel_update()
if result.success:
print("Update cancelled successfully")

Wait for a CloudFormation stack to reach a stable (non-transitional) state. Useful before triggering an operation on a stack that may be in a transitional state.

Parameters:

  • stack_name (str, optional): Stack name override
  • timeout_seconds (int, optional): Maximum seconds to wait (default: 1200)
  • poll_interval_seconds (int, optional): Seconds between polls (default: 10)

Returns: StackStableStateResult with success, status, and message

result = client.stack.wait_for_stable_state()
if result.success:
print(f"Stack is stable: {result.status}")

Analyze a CloudFormation deployment failure. Recursively collects failed events from the main stack and all nested stacks, identifies root causes vs. cascade failures.

Parameters:

  • stack_name (str, optional): Stack name override

Returns: FailureAnalysis with stack_name, root_causes (list of FailureCause), and all_failures (list of FailureCause)

analysis = client.stack.get_failure_analysis()
print(f"Root causes ({len(analysis.root_causes)}):")
for cause in analysis.root_causes:
print(f" {cause.resource}: {cause.reason}")

Remove residual AWS resources left behind from deleted IDP stacks. Identifies orphaned CloudFront distributions, CloudWatch log groups, AppSync APIs, IAM policies, S3 buckets, DynamoDB tables, and more.

Parameters:

  • dry_run (bool, optional): Preview changes without making them (default: False)
  • auto_approve (bool, optional): Auto-approve all deletions (default: False)
  • regions (list[str], optional): AWS regions to check (default: us-east-1, us-west-2, eu-central-1)
  • profile (str, optional): AWS profile name (default: None)

Returns: OrphanedResourceCleanupResult with results (dict), has_errors, and has_disabled

# Preview what would be cleaned up
result = client.stack.cleanup_orphaned(dry_run=True)
print(f"Has errors: {result.has_errors}")

Get information about S3 buckets associated with a CloudFormation stack.

Parameters:

  • stack_name (str, optional): Stack name override

Returns: list[BucketInfo] — one BucketInfo per S3 bucket, with logical_id, bucket_name, object_count, total_size, and size_display

buckets = client.stack.get_bucket_info()
for bucket in buckets:
print(f"{bucket.logical_id}: {bucket.bucket_name} ({bucket.size_display}, {bucket.object_count} objects)")

Operations for managing IDP configurations.

Generate an IDP configuration template.

Parameters:

  • features (str, optional): Feature set to include — "min", "core", "all", or comma-separated (default: "min")
  • pattern (str, optional): Pattern to use — "pattern-1" or "pattern-2" (default: "pattern-2")
  • output (str, optional): Output file path
  • include_prompts (bool, optional): Include prompt templates (default: False)
  • include_comments (bool, optional): Include explanatory comments (default: True)

Returns: ConfigCreateResult with yaml_content and output_path

result = client.config.create(
features="min", # min, core, all, or comma-separated
pattern="pattern-2",
output="config.yaml",
include_prompts=False,
include_comments=True
)
print(result.yaml_content)

Validate a configuration file against system defaults.

Parameters:

  • config_file (str, required): Path to the configuration file
  • pattern (str, optional): Pattern to validate against (default: "pattern-2")
  • show_merged (bool, optional): Include merged configuration in result (default: False)
  • strict (bool, optional): Report deprecated/unknown fields as errors (default: False)

Returns: ConfigValidationResult with valid, errors, warnings, deprecated_fields, unknown_fields, and optional merged_config

result = client.config.validate(
config_file="./config.yaml",
pattern="pattern-2"
)
if result.valid:
print("Configuration is valid")
else:
for error in result.errors:
print(f"Error: {error}")
for warning in result.warnings:
print(f"Warning: {warning}")

Upload configuration to a deployed stack.

Parameters:

  • config_file (str, required): Path to the YAML or JSON configuration file
  • config_version (str, required): Version to upload to (e.g., "default", "v1", "production"). If the version doesn’t exist, it will be created automatically.
  • stack_name (str, optional): Stack name override
  • validate (bool, optional): Validate configuration before uploading (default: True)
  • pattern (str, optional): Pattern to validate against (default: "pattern-2")
  • description (str, optional): Description for the configuration version

Returns: ConfigUploadResult with success, version, version_created, and error

# Upload to the default version
result = client.config.upload(
config_file="./my-config.yaml",
config_version="default",
validate=True
)
if result.success:
print("Configuration uploaded")
# Create a new named version
result = client.config.upload(
config_file="./my-config.yaml",
config_version="v2",
description="Updated extraction rules"
)
if result.success:
if result.version_created:
print(f"New version created: {result.version}")
else:
print(f"Version updated: {result.version}")

Download configuration from a deployed stack.

Parameters:

  • stack_name (str, optional): Stack name override
  • output (str, optional): Output file path
  • format (str, optional): Format type — "full" or "minimal" (default: "full")
  • pattern (str, optional): Pattern override (auto-detected if not provided)
  • config_version (str, optional): Configuration version to download (default: active version)

Returns: ConfigDownloadResult with config, yaml_content, and output_path

result = client.config.download(
output="downloaded-config.yaml",
format="minimal" # "full" or "minimal"
)
print(result.yaml_content)

List all configuration versions in a deployed stack.

Parameters:

  • stack_name (str, optional): Stack name override

Returns: ConfigListResult with versions (list of ConfigVersionInfo) and count

ConfigVersionInfo fields: version_name, is_active, created_at, updated_at, description

result = client.config.list()
print(f"Found {result.count} versions:")
for version in result.versions:
status = " (ACTIVE)" if version.is_active else ""
print(f" - {version.version_name}{status}")

Activate a configuration version. If the configuration uses BDA (use_bda=True), a BDA blueprint sync is performed before activation.

Parameters:

  • config_version (str, required): Configuration version to activate
  • stack_name (str, optional): Stack name override

Returns: ConfigActivateResult with success, activated_version, bda_synced, bda_classes_synced, bda_classes_failed, and error

result = client.config.activate("v2")
if result.success:
print(f"Activated version: {result.activated_version}")
if result.bda_synced:
print(f"BDA synced: {result.bda_classes_synced} classes")
else:
print(f"Failed to activate: {result.error}")

Delete a configuration version.

Parameters:

  • config_version (str, required): Configuration version to delete
  • stack_name (str, optional): Stack name override

Returns: ConfigDeleteResult with success, deleted_version, and error

result = client.config.delete("old-version")
if result.success:
print(f"Deleted version: {result.deleted_version}")
else:
print(f"Failed to delete: {result.error}")

Note: Cannot delete "default" or currently active versions.

Synchronize IDP document class schemas with BDA (Bedrock Data Automation) blueprints.

Parameters:

  • direction (str, optional): Sync direction — "bidirectional" (default), "bda_to_idp", or "idp_to_bda"
  • mode (str, optional): Sync mode — "replace" (default, full alignment) or "merge" (additive)
  • config_version (str, optional): Configuration version to sync (default: active version)
  • stack_name (str, optional): Stack name override

Returns: ConfigSyncBdaResult with success, direction, mode, classes_synced, classes_failed, processed_classes, and error

# Bidirectional sync (default)
result = client.config.sync_bda()
# Import BDA blueprints to IDP (merge mode)
result = client.config.sync_bda(
direction="bda_to_idp",
mode="merge"
)
# Push IDP classes to BDA
result = client.config.sync_bda(
direction="idp_to_bda",
config_version="v2"
)
if result.success:
print(f"Synced {result.classes_synced} classes")
for cls in result.processed_classes:
print(f" • {cls}")
else:
print(f"Sync failed: {result.error}")

Discover document class schemas from sample documents using Amazon Bedrock.

Two modes:

  • Stack-connected (with stack_name): Uses the stack’s discovery config from DynamoDB, saves discovered schema to config
  • Local (without stack_name): Uses system default Bedrock settings, returns schema without saving

Analyze a document to generate a JSON Schema definition for a document class.

Parameters:

  • document_path (str, required): Local path to document file (PDF, PNG, JPG, TIFF)
  • ground_truth_path (str, optional): Path to JSON ground truth file
  • config_version (str, optional): Config version to save to (stack mode only)
  • stack_name (str, optional): Stack name override
  • page_range (str, optional): Page range to extract from a PDF (e.g., “1-3”)
  • class_name_hint (str, optional): Hint for the document class name (LLM uses this as $id)
  • auto_detect (bool, optional): If True, auto-detect section boundaries and discover each section. Returns DiscoveryBatchResult.

Returns: DiscoveryResult with status, document_class, json_schema, config_version, document_path, page_range, and error. When auto_detect=True, returns DiscoveryBatchResult.

# Local mode — no stack needed
client = IDPClient()
result = client.discovery.run("./invoice.pdf")
print(json.dumps(result.json_schema, indent=2))
# Stack mode — uses stack config, saves schema
client = IDPClient(stack_name="my-stack")
result = client.discovery.run("./w2-form.pdf")
# With ground truth for better accuracy
result = client.discovery.run(
"./invoice.pdf",
ground_truth_path="./invoice-expected.json"
)
# With class name hint
result = client.discovery.run(
"./form.pdf",
class_name_hint="W2 Tax Form"
)
# Discover specific page range from a PDF
result = client.discovery.run(
"./lending_package.pdf",
page_range="3-5",
class_name_hint="W2 Form"
)
# Auto-detect sections and discover each
batch_result = client.discovery.run(
"./lending_package.pdf",
auto_detect=True,
config_version="v2"
)
for r in batch_result.results:
print(f"{r.document_class} (pages {r.page_range}): {r.status}")
# Save to specific config version
result = client.discovery.run(
"./form.pdf",
config_version="v2"
)

Detect document section boundaries in a multi-page PDF using LLM analysis.

Parameters:

  • document_path (str, required): Local path to a PDF document
  • stack_name (str, optional): Stack name override

Returns: AutoDetectResult with status, sections (list of AutoDetectSection), document_path, and error

AutoDetectSection fields: start (int), end (int), type (str, optional)

# Detect section boundaries
result = client.discovery.auto_detect_sections("./lending_package.pdf")
if result.status == "SUCCESS":
for section in result.sections:
print(f"Pages {section.start}-{section.end}: {section.type}")
# Output:
# Pages 1-2: Cover Letter
# Pages 3-5: W2 Form
# Pages 6-8: Bank Statement

Discover multiple document classes from page ranges in a single PDF.

Parameters:

  • document_path (str, required): Local path to a multi-page PDF
  • page_ranges (list, required): List of dicts with start (int), end (int), and optional label (str)
  • config_version (str, optional): Config version to save to
  • stack_name (str, optional): Stack name override

Returns: DiscoveryBatchResult with one result per page range

# Discover specific page ranges
result = client.discovery.run_multi_section(
"./lending_package.pdf",
page_ranges=[
{"start": 1, "end": 2, "label": "Cover Letter"},
{"start": 3, "end": 5, "label": "W2 Form"},
{"start": 6, "end": 8, "label": "Bank Statement"},
],
config_version="v2"
)
print(f"Discovered {result.succeeded}/{result.total} sections")
for r in result.results:
print(f" Pages {r.page_range}: {r.document_class} ({r.status})")

Discover document classes from a collection of documents using embedding-based clustering and agentic analysis. Unlike run() (which analyzes one document at a time), this method analyzes a directory of mixed documents to automatically identify document types, cluster similar documents, and generate JSON Schemas for each discovered class.

Requires: pip install idp-common[multi_document_discovery]

Note: Requires at least 2 documents per expected class. Clusters with fewer than 2 documents are filtered as noise. For discovering schemas from individual documents, use discovery.run() instead.

Parameters:

  • document_dir (str, optional): Directory path containing documents to analyze (recursive scan)
  • document_paths (list[str], optional): List of individual document file paths
  • embedding_model_id (str, optional): Bedrock embedding model ID (default: us.cohere.embed-v4:0)
  • analysis_model_id (str, optional): Bedrock LLM for cluster analysis (default: us.anthropic.claude-sonnet-4-6)
  • output_dir (str, optional): Directory to write individual JSON schema files per discovered class
  • save_to_config (bool, optional): Save discovered schemas to the stack’s configuration (default: False)
  • config_version (str, optional): Configuration version to save schemas to (required with save_to_config)
  • progress_callback (callable, optional): Callback function for pipeline progress updates
  • region (str, optional): AWS region

Returns: MultiDocDiscoveryResult with status (SUCCESS/PARTIAL/FAILED), discovered_classes (list of DiscoveredClassResult), reflection_report, total_documents, total_clusters, noise_documents, config_version, and error

DiscoveredClassResult fields: cluster_id, classification, json_schema, document_count, sample_doc_ids, error

# Basic usage — discover from a directory
client = IDPClient()
result = client.discovery.run_multi_doc(document_dir="./samples/")
print(f"Status: {result.status}")
print(f"Documents: {result.total_documents} → Clusters: {result.total_clusters}")
for dc in result.discovered_classes:
if not dc.error:
print(f" Cluster {dc.cluster_id}: {dc.classification} ({dc.document_count} docs)")
# Save schemas to output directory
result = client.discovery.run_multi_doc(
document_dir="./samples/",
output_dir="./schemas/"
)
# Save to stack configuration
client = IDPClient(stack_name="my-stack")
result = client.discovery.run_multi_doc(
document_dir="./samples/",
save_to_config=True,
config_version="v2"
)
# With explicit document paths
result = client.discovery.run_multi_doc(
document_paths=["./doc1.pdf", "./doc2.png", "./doc3.jpg"]
)
# With progress callback
def on_progress(step, data=None):
print(f" [{step}] {data}")
result = client.discovery.run_multi_doc(
document_dir="./samples/",
progress_callback=on_progress
)
# Print reflection report
if result.reflection_report:
print(result.reflection_report)

Note: If the required dependencies are not installed, this method returns a MultiDocDiscoveryResult with status="FAILED" and an error message instructing the user to install idp-common[multi_document_discovery], rather than raising an exception.

Run discovery on multiple documents sequentially. Ground truth paths are auto-matched to documents by position.

Parameters:

  • document_paths (list, required): List of local file paths
  • ground_truth_paths (list, optional): Parallel list of ground truth paths (use None for docs without ground truth)
  • config_version (str, optional): Config version to save to
  • stack_name (str, optional): Stack name override

Returns: DiscoveryBatchResult with total, succeeded, failed, and results (list of DiscoveryResult)

# Batch without ground truth
result = client.discovery.run_batch([
"./invoice.pdf",
"./w2-form.pdf",
"./paystub.png",
])
print(f"Succeeded: {result.succeeded}/{result.total}")
# Batch with selective ground truth (matched by position)
result = client.discovery.run_batch(
["./invoice.pdf", "./w2.pdf"],
ground_truth_paths=[None, "./w2.json"],
)

Operations for manifest generation and validation.

Generate a manifest file from a directory or S3 URI.

result = client.manifest.generate(
directory="./documents/",
baseline_dir="./baselines/",
output="manifest.csv",
file_pattern="*.pdf",
recursive=True
)
print(f"Documents: {result.document_count}")
print(f"Baselines matched: {result.baselines_matched}")

Validate a manifest file.

result = client.manifest.validate(manifest_path="./manifest.csv")
if result.valid:
print(f"Valid manifest with {result.document_count} documents")
else:
print(f"Invalid: {result.error}")

Operations for load testing and performance validation.

Run load testing by copying files to the input bucket.

Parameters:

  • source_file (str, required): Source file to copy
  • stack_name (str, optional): Stack name override
  • rate (int, optional): Files per minute for constant load (default: 100)
  • duration (int, optional): Duration in minutes (default: 1)
  • schedule_file (str, optional): Optional schedule file for variable load
  • dest_prefix (str, optional): Destination prefix in S3 (default: "load-test")
  • config_version (str, optional): Configuration version to tag files with

Returns: LoadTestResult with success, total_files, duration_minutes, and error

result = client.testing.load_test(
source_file="./sample.pdf",
rate=100, # Files per minute
duration=5, # Duration in minutes
dest_prefix="load-test"
)
print(f"Total files: {result.total_files}")
print(f"Success: {result.success}")

All operations return typed Pydantic models. Import them from the top-level idp_sdk package:

from idp_sdk import (
# Document models
DocumentUploadResult,
DocumentStatus,
DocumentDownloadResult,
DocumentReprocessResult,
DocumentRerunResult,
DocumentDeletionResult,
DocumentMetadata,
DocumentInfo,
DocumentListResult,
# Batch models
BatchResult,
BatchProcessResult,
BatchStatus,
BatchInfo,
BatchListResult,
BatchReprocessResult,
BatchRerunResult,
BatchDownloadResult,
BatchDeletionResult,
# Evaluation models
EvaluationReport,
EvaluationMetrics,
EvaluationBaselineListResult,
BaselineResult,
BaselineInfo,
FieldComparison,
DeleteResult,
# Assessment models
AssessmentConfidenceResult,
AssessmentFieldConfidence,
AssessmentGeometryResult,
AssessmentFieldGeometry,
AssessmentMetrics,
# Search models
SearchResult,
SearchCitation,
SearchDocumentReference,
# Stack models
StackDeploymentResult,
StackDeletionResult,
StackResources,
StackOperationInProgress,
StackMonitorResult,
StackStableStateResult,
FailureCause,
FailureAnalysis,
BucketInfo,
CancelUpdateResult,
OrphanedResourceCleanupResult,
# Config models
ConfigCreateResult,
ConfigValidationResult,
ConfigUploadResult,
ConfigDownloadResult,
ConfigActivateResult,
ConfigVersionInfo,
ConfigListResult,
ConfigDeleteResult,
# Discovery models
DiscoveryResult,
DiscoveryBatchResult,
# Manifest models
ManifestDocument,
ManifestResult,
ManifestValidationResult,
# Testing models
StopWorkflowsResult,
ExecutionsStoppedResult,
DocumentsAbortedResult,
LoadTestResult,
# Enums
DocumentState,
Pattern,
RerunStep,
StackState,
# Exceptions
IDPError,
IDPConfigurationError,
IDPStackError,
IDPProcessingError,
IDPResourceNotFoundError,
IDPValidationError,
IDPTimeoutError,
)

from idp_sdk import IDPClient, IDPProcessingError, IDPResourceNotFoundError
client = IDPClient(stack_name="my-stack")
try:
result = client.document.process(file_path="./invoice.pdf")
except IDPProcessingError as e:
print(f"Processing error: {e}")
except IDPResourceNotFoundError as e:
print(f"Resource not found: {e}")
except Exception as e:
print(f"Unexpected error: {e}")

from idp_sdk import IDPClient, RerunStep
import time
# Initialize client
client = IDPClient(stack_name="my-idp-stack", region="us-west-2")
# Upload single document
doc_result = client.document.process(file_path="./invoice.pdf")
doc_id = doc_result.document_id
# Monitor document processing
while True:
status = client.document.get_status(document_id=doc_id)
print(f"Status: {status.status.value}")
if status.status.value in ["COMPLETED", "FAILED"]:
break
time.sleep(5)
# Download results if successful
if status.status.value == "COMPLETED":
client.document.download_results(
document_id=doc_id,
output_dir="./results"
)
print("Results downloaded successfully")
# Process a batch
batch_result = client.batch.process(source="./documents/")
batch_id = batch_result.batch_id
# Monitor batch progress
while True:
batch_status = client.batch.get_status(batch_id=batch_id)
print(f"Progress: {batch_status.completed}/{batch_status.total}")
if batch_status.all_complete:
break
time.sleep(10)
# Download batch results
client.batch.download_results(
batch_id=batch_id,
output_dir="./batch_results"
)
print(f"Batch complete! Success rate: {batch_status.success_rate:.1%}")