IDP SDK Documentation
The IDP SDK provides programmatic Python access to all IDP Accelerator capabilities with a clean, namespaced API.
Installation
```bash
# Install from local development
# Recommended: install everything at once
make setup-venv
source .venv/bin/activate

# Or install just the SDK with pip/uv
uv pip install -e ./lib/idp_sdk
```

Quick Start
Using the Public Interface

The SDK is designed around a single entry point: IDPClient. Always import from the top-level idp_sdk package, which is the stable public interface. Avoid importing directly from internal submodules (e.g., idp_sdk._core, idp_sdk.operations, idp_sdk.models.*); these are private implementation details and may change without notice.

```python
# Correct: use the public interface
from idp_sdk import IDPClient, BatchProcessResult, RerunStep, IDPError

# Avoid: importing from private/internal modules
# from idp_sdk.operations.batch import BatchOperation  # private
# from idp_sdk._core.batch_processor import BatchProcessor  # private
```

All response models, enums, and exceptions you need are exported directly from idp_sdk. See the Response Models section for the complete list.
```python
from idp_sdk import IDPClient

# Create client with stack configuration
client = IDPClient(stack_name="my-idp-stack", region="us-west-2")

# Upload and process a single document
result = client.document.process(file_path="./invoice.pdf")
print(f"Document ID: {result.document_id}, Status: {result.status}")

# Process a batch of documents
batch_result = client.batch.process(source="./documents/")
print(f"Batch: {batch_result.batch_id}, Queued: {batch_result.queued}")

# Check status
status = client.batch.get_status(batch_id=batch_result.batch_id)
print(f"Progress: {status.completed}/{status.total}")
```

Architecture
The SDK groups its operations into namespaces on the client:

```python
client = IDPClient(stack_name="my-stack")

# Stack operations
client.stack.deploy(...)
client.stack.delete(...)
client.stack.get_resources()
client.stack.exists()
client.stack.get_status()
client.stack.check_in_progress()
client.stack.monitor(...)
client.stack.cancel_update()
client.stack.wait_for_stable_state()
client.stack.get_failure_analysis()
client.stack.cleanup_orphaned(...)
client.stack.get_bucket_info()

# Batch operations (multiple documents)
client.batch.process(...)
client.batch.reprocess(...)
client.batch.get_status(...)
client.batch.get_document_ids(...)
client.batch.get_results(...)
client.batch.get_confidence(...)
client.batch.list()
client.batch.download_results(...)
client.batch.download_sources(...)
client.batch.delete_documents(...)
client.batch.stop_workflows()

# Document operations (single document)
client.document.process(...)
client.document.get_status(...)
client.document.get_metadata(...)
client.document.list(...)
client.document.download_results(...)
client.document.download_source(...)
client.document.reprocess(...)
client.document.delete(...)

# Discovery operations (schema generation)
client.discovery.run(...)
client.discovery.run_batch(...)
client.discovery.run_multi_doc(...)
client.discovery.run_multi_section(...)
client.discovery.auto_detect_sections(...)

# Evaluation operations (baseline comparison)
client.evaluation.create_baseline(...)
client.evaluation.get_report(...)
client.evaluation.get_metrics(...)
client.evaluation.list_baselines(...)
client.evaluation.delete_baseline(...)

# Assessment operations (quality metrics)
client.assessment.get_confidence(...)
client.assessment.get_geometry(...)
client.assessment.get_metrics(...)

# Search operations (knowledge base)
client.search.query(...)

# Configuration operations
client.config.create(...)
client.config.validate(...)
client.config.upload(...)
client.config.download(...)
client.config.list()
client.config.activate(...)
client.config.delete(...)
client.config.sync_bda(...)

# Manifest operations
client.manifest.generate(...)
client.manifest.validate(...)

# Testing operations
client.testing.load_test(...)
```

Client Initialization
```python
from idp_sdk import IDPClient

# With stack name (for stack-dependent operations)
client = IDPClient(stack_name="my-stack", region="us-west-2")

# Without stack (for stack-independent operations)
client = IDPClient()

# Stack can be set later
client.stack_name = "new-stack"
```

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| stack_name | str | No | CloudFormation stack name |
| region | str | No | AWS region (defaults to boto3 default) |
Document Operations
Operations for processing individual documents.
document.process()
Process a single document (upload and queue for processing).

Parameters:

- file_path (str, required): Path to local file to upload
- document_id (str, optional): Custom document ID (defaults to filename without extension)
- stack_name (str, optional): Stack name override

Returns: DocumentUploadResult with document_id, status, and timestamp

```python
result = client.document.process(
    file_path="/path/to/invoice.pdf",
    document_id="custom-id",  # Optional
)

print(f"Document ID: {result.document_id}")
print(f"Status: {result.status}")  # "queued"
print(f"Timestamp: {result.timestamp}")
```

document.get_status()
Get processing status for a single document.

Parameters:

- document_id (str, required): Document identifier (S3 key format: batch-id/filename)
- stack_name (str, optional): Stack name override

Returns: DocumentStatus with processing information including status, duration, pages, sections, and errors

```python
status = client.document.get_status(document_id="batch-123/invoice.pdf")

print(f"Status: {status.status.value}")
print(f"Pages: {status.num_pages}")
print(f"Duration: {status.duration_seconds}s")
```

document.download_results()
Download processing results (processed outputs) from OutputBucket for a single document.

Parameters:

- document_id (str, required): Document identifier (S3 key)
- output_dir (str, required): Local directory to save results
- file_types (list[str], optional): File types to download: "pages", "sections", "summary", "evaluation" (defaults to all)
- stack_name (str, optional): Stack name override

Returns: DocumentDownloadResult with document_id, files_downloaded, and output_dir

```python
result = client.document.download_results(
    document_id="batch-123/invoice.pdf",
    output_dir="./results",
    file_types=["pages", "sections", "summary"],  # Optional
)

print(f"Downloaded {result.files_downloaded} files")
```

document.download_source()
Download the original source file for a document from InputBucket.

Parameters:

- document_id (str, required): Document identifier (S3 key)
- output_path (str, required): Local file path to save document
- stack_name (str, optional): Stack name override

Returns: str, the local file path where the document was saved

```python
file_path = client.document.download_source(
    document_id="batch-123/invoice.pdf",
    output_path="./downloads/invoice.pdf",
)

print(f"Downloaded to: {file_path}")
```

document.reprocess()
Reprocess a single document from a specific pipeline step.

Parameters:

- document_id (str, required): Document identifier (S3 key)
- step (str or RerunStep, required): Pipeline step to reprocess from (e.g., "classification", "extraction", RerunStep.EXTRACTION)
- stack_name (str, optional): Stack name override

Returns: DocumentReprocessResult with document_id, step, and queued status

```python
from idp_sdk import RerunStep

result = client.document.reprocess(
    document_id="batch-123/invoice.pdf",
    step=RerunStep.EXTRACTION,
)

print(f"Queued: {result.queued}")
```

document.delete()
Permanently delete a single document and all associated data from InputBucket, OutputBucket, and DynamoDB.

Parameters:

- document_id (str, required): Document identifier (S3 key)
- stack_name (str, optional): Stack name override
- dry_run (bool, optional): If True, simulate deletion without actually deleting (default: False)

Returns: DocumentDeletionResult with success, object_key, deleted (dict of deleted items), and errors

```python
result = client.document.delete(
    document_id="batch-123/invoice.pdf",
    dry_run=False,
)

print(f"Success: {result.success}")
print(f"Deleted: {result.deleted}")
```

document.list()
List processed documents with pagination support.

Parameters:

- limit (int, optional): Maximum number of documents to return (default: 100)
- next_token (str, optional): Pagination token from previous request
- stack_name (str, optional): Stack name override

Returns: DocumentListResult with documents (list of DocumentInfo), count, and optional next_token
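Because each page carries an optional next_token, walking every document comes down to a short loop. The following is a minimal sketch rather than part of the SDK: iter_all_documents is a hypothetical helper that assumes only the documented document.list(limit=..., next_token=...) call and the documents and next_token fields on its result.

```python
from typing import Any, Iterator


def iter_all_documents(client: Any, page_size: int = 100) -> Iterator[Any]:
    """Yield every document by following next_token until none is returned.

    Hypothetical helper: `client` is expected to be an IDPClient, or any
    object exposing the same document.list() signature.
    """
    next_token = None
    while True:
        kwargs = {"limit": page_size}
        if next_token:
            kwargs["next_token"] = next_token
        page = client.document.list(**kwargs)
        yield from page.documents
        # A missing or empty token means this was the last page.
        next_token = getattr(page, "next_token", None)
        if not next_token:
            break
```

A generator keeps memory use flat for large stacks, since only one page of results is held at a time.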
```python
# List documents
result = client.document.list(limit=50)

for doc in result.documents:
    print(f"{doc.document_id}: {doc.status}")

# Pagination
if result.next_token:
    next_page = client.document.list(limit=50, next_token=result.next_token)
```

document.get_metadata()
Get extracted metadata and fields for a processed document.

Parameters:

- document_id (str, required): Document identifier (S3 key)
- section_id (int, optional): Section number (default: 1)
- stack_name (str, optional): Stack name override

Returns: DocumentMetadata with document_id, section_id, document_class, fields, confidence, page_count, and metadata

```python
metadata = client.document.get_metadata(document_id="batch-123/invoice.pdf")

print(f"Document Type: {metadata.document_class}")
print(f"Confidence: {metadata.confidence:.2%}")
print("Fields:")
for field_name, field_value in metadata.fields.items():
    print(f"  {field_name}: {field_value}")

# Access specific fields
invoice_number = metadata.fields.get("invoice_number")
total_amount = metadata.fields.get("total_amount")
```

Batch Operations
Operations for processing multiple documents.
batch.process()
Process multiple documents through the IDP pipeline.

Parameters:

- source (str, optional): Auto-detected source: directory path, manifest file, or S3 URI
- manifest (str, optional): Path to manifest CSV file
- directory (str, optional): Local directory path
- s3_uri (str, optional): S3 URI (s3://bucket/prefix/)
- test_set (str, optional): Test set identifier
- batch_id (str, optional): Custom batch ID
- batch_prefix (str, optional): Batch ID prefix (default: "sdk-batch")
- file_pattern (str, optional): File pattern for filtering (default: "*.pdf")
- recursive (bool, optional): Recursively process subdirectories (default: True)
- number_of_files (int, optional): Limit number of files to process
- config_path (str, optional): Path to custom configuration file
- config_version (str, optional): Configuration version to use for processing
- context (str, optional): Context for test set processing
- stack_name (str, optional): Stack name override

Returns: BatchProcessResult with batch_id, document_ids, queued, uploaded, failed, baselines_uploaded, source, output_prefix, and timestamp
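The source parameter is described as auto-detected, but the SDK's actual detection rules are not documented here. Purely as an illustration of how such a dispatch could work, the sketch below maps a source string onto one of the explicit parameters. Both classify_source and its rules (an s3:// prefix, a .csv suffix, otherwise a directory) are assumptions, not the library's implementation.

```python
from pathlib import Path


def classify_source(source: str) -> str:
    """Guess which explicit parameter a `source` value corresponds to.

    Hypothetical rules, for illustration only:
      - starts with "s3://"  -> "s3_uri"
      - ends with ".csv"     -> "manifest"
      - anything else        -> "directory"
    """
    if source.startswith("s3://"):
        return "s3_uri"
    if Path(source).suffix.lower() == ".csv":
        return "manifest"
    return "directory"
```

When a path is unusual enough that a guess could go wrong, passing manifest, directory, or s3_uri explicitly should sidestep the ambiguity.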
```python
# From directory
result = client.batch.process(source="./documents/")

# From manifest
result = client.batch.process(source="./manifest.csv")

# From S3
result = client.batch.process(source="s3://bucket/path/")

# With options
result = client.batch.process(
    source="./documents/",
    batch_prefix="my-batch",
    file_pattern="*.pdf",
    recursive=True,
    number_of_files=10,
    config_path="./config.yaml",
    config_version="v2",
)

print(f"Batch ID: {result.batch_id}")
print(f"Documents queued: {result.queued}")
```

batch.get_status()
Get processing status for all documents in a batch.

Parameters:

- batch_id (str, required): Batch identifier
- stack_name (str, optional): Stack name override

Returns: BatchStatus with batch_id, documents (list of DocumentStatus), total, completed, failed, in_progress, queued, success_rate, and all_complete
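Since BatchStatus exposes all_complete, a caller can block until a batch finishes by polling. The sketch below is one hedged way to do that: wait_for_batch is a hypothetical helper, and its polling interval and timeout defaults are arbitrary choices rather than SDK settings.

```python
import time
from typing import Any


def wait_for_batch(client: Any, batch_id: str,
                   poll_seconds: float = 10.0,
                   timeout_seconds: float = 3600.0) -> Any:
    """Poll batch.get_status() until all_complete is true or the timeout passes.

    Hypothetical helper; returns the final BatchStatus on success and
    raises TimeoutError otherwise.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.batch.get_status(batch_id=batch_id)
        if status.all_complete:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Batch {batch_id} not complete after {timeout_seconds}s"
            )
        time.sleep(poll_seconds)
```

The returned status can then be inspected for failed and success_rate before downloading results.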
```python
status = client.batch.get_status(batch_id="batch-20250123-123456")

print(f"Total: {status.total}")
print(f"Completed: {status.completed}")
print(f"Failed: {status.failed}")
print(f"Success Rate: {status.success_rate:.1%}")

for doc in status.documents:
    print(f"  {doc.document_id}: {doc.status.value}")
```

batch.get_document_ids()
Get all document IDs belonging to a batch. Useful for pre-fetching a document count before a confirmation prompt without triggering the full reprocess pipeline.

Parameters:

- batch_id (str, required): Batch identifier
- stack_name (str, optional): Stack name override

Returns: list[str], the document object keys (S3 keys) in the batch

```python
doc_ids = client.batch.get_document_ids(batch_id="batch-20250123-123456")
print(f"Batch contains {len(doc_ids)} documents")
```

batch.get_results()
Get extracted metadata and fields for all documents in a batch, with pagination support.

Parameters:

- batch_id (str, required): Batch identifier
- section_id (int, optional): Section number within documents (default: 1)
- limit (int, optional): Maximum documents to return per page (default: 10)
- next_token (str, optional): Pagination token from previous request
- stack_name (str, optional): Stack name override

Returns: dict with batch_id, section_id, count, total_in_batch, documents (list of per-document result dicts), and optional next_token

```python
result = client.batch.get_results(batch_id="batch-20250123-123456", limit=20)

print(f"Showing {result['count']} of {result['total_in_batch']} documents")
for doc in result["documents"]:
    print(f"  {doc['document_id']}: {doc['document_class']} ({doc['status']})")

# Pagination
if result.get("next_token"):
    next_page = client.batch.get_results(
        batch_id="batch-20250123-123456",
        limit=20,
        next_token=result["next_token"],
    )
```

batch.get_confidence()
Get confidence scores and quality metrics for all documents in a batch, with pagination support.

Parameters:

- batch_id (str, required): Batch identifier
- section_id (int, optional): Section number (default: 1)
- limit (int, optional): Maximum documents to return (default: 10)
- next_token (str, optional): Pagination token from previous request
- stack_name (str, optional): Stack name override

Returns: dict with batch_id, section_id, count, total_in_batch, documents (list with per-document confidence attributes), and optional next_token

```python
result = client.batch.get_confidence(batch_id="batch-20250123-123456", limit=20)

for doc in result["documents"]:
    low_conf = [f for f, a in doc["attributes"].items() if not a.get("meets_threshold")]
    if low_conf:
        print(f"{doc['document_id']} needs review: {', '.join(low_conf)}")
```

batch.list()
List recent batch processing jobs with pagination support.

Parameters:

- limit (int, optional): Maximum number of batches to return (default: 10)
- next_token (str, optional): Pagination token from previous request
- stack_name (str, optional): Stack name override

Returns: BatchListResult with batches (list of BatchInfo), count, and optional next_token

```python
# List recent batches
result = client.batch.list(limit=20)

for batch in result.batches:
    print(f"{batch.batch_id}: {batch.queued} docs, {batch.timestamp}")

# Pagination
if result.next_token:
    next_page = client.batch.list(limit=20, next_token=result.next_token)
```

batch.download_results()
Download processing results (processed outputs) from OutputBucket for all documents in a batch.

Parameters:

- batch_id (str, required): Batch identifier
- output_dir (str, required): Local directory to save results
- file_types (list[str], optional): File types to download: "pages", "sections", "summary", "evaluation", or "all" (default: ["all"])
- stack_name (str, optional): Stack name override

Returns: BatchDownloadResult with files_downloaded, documents_downloaded, and output_dir

```python
result = client.batch.download_results(
    batch_id="batch-20250123-123456",
    output_dir="./results",
    file_types=["summary", "sections"],
)

print(f"Downloaded {result.files_downloaded} files")
```

batch.download_sources()
Download original source files from InputBucket for all documents in a batch.

Parameters:

- batch_id (str, required): Batch identifier
- output_dir (str, required): Local directory to save source files
- stack_name (str, optional): Stack name override

Returns: BatchDownloadResult with files_downloaded, documents_downloaded, and output_dir

```python
result = client.batch.download_sources(
    batch_id="batch-20250123-123456",
    output_dir="./source_files",
)

print(f"Downloaded {result.files_downloaded} source files")
```

batch.delete_documents()
Permanently delete documents and their associated data from InputBucket, OutputBucket, and DynamoDB. Select documents by batch ID or wildcard pattern.

Parameters:

- batch_id (str, optional): Batch identifier (selects all docs containing this string)
- pattern (str, optional): Wildcard pattern to match document keys (e.g., "batch-123/*.pdf", "*invoice*")
- status_filter (str, optional): Filter by document status (e.g., "FAILED", "COMPLETED")
- stack_name (str, optional): Stack name override
- dry_run (bool, optional): If True, simulate deletion without actually deleting (default: False)
- continue_on_error (bool, optional): Continue deleting if one document fails (default: True)

Note: Specify exactly one of batch_id or pattern, not both.
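Deletion is permanent, so it can be worth previewing the match set with dry_run=True before committing. The sketch below shows one possible guard. delete_with_preview and its max_documents limit are hypothetical, and it relies only on the documented dry_run parameter and the total_count field of the result.

```python
from typing import Any, Optional


def delete_with_preview(client: Any, *, batch_id: Optional[str] = None,
                        pattern: Optional[str] = None,
                        max_documents: int = 100) -> Any:
    """Dry-run first, then delete only if the match count is within a limit.

    Hypothetical helper; `max_documents` is an arbitrary safety threshold.
    """
    # Mirror the documented rule: exactly one selector must be given.
    if (batch_id is None) == (pattern is None):
        raise ValueError("Specify exactly one of batch_id or pattern")
    selector = {"batch_id": batch_id} if batch_id is not None else {"pattern": pattern}

    preview = client.batch.delete_documents(dry_run=True, **selector)
    if preview.total_count > max_documents:
        raise RuntimeError(
            f"Refusing to delete {preview.total_count} documents "
            f"(limit {max_documents})"
        )
    return client.batch.delete_documents(dry_run=False, **selector)
```

The broad batch_id match ("all docs containing this string") is the main reason a preview helps: a short identifier can select more documents than intended.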
Returns: BatchDeletionResult with success, deleted_count, failed_count, total_count, dry_run, and results (list of DocumentDeletionResult)

```python
# Delete entire batch
result = client.batch.delete_documents(batch_id="batch-123")

# Delete with status filter
result = client.batch.delete_documents(
    batch_id="batch-123",
    status_filter="FAILED",
)

# Delete by wildcard pattern
result = client.batch.delete_documents(
    pattern="batch-123/*.pdf",
)

# Delete all failed invoices across batches
result = client.batch.delete_documents(
    pattern="*invoice*",
    status_filter="FAILED",
)

# Dry run
result = client.batch.delete_documents(
    batch_id="batch-123",
    dry_run=True,
)

print(f"Deleted: {result.deleted_count}/{result.total_count}")
```

batch.reprocess()
Reprocess existing documents from a specific pipeline step.

Parameters:

- step (str or RerunStep, required): Pipeline step to reprocess from (e.g., "classification", "extraction", RerunStep.EXTRACTION)
- document_ids (list[str], optional): Specific document IDs to reprocess
- batch_id (str, optional): Batch ID to reprocess all documents in batch
- stack_name (str, optional): Stack name override

Note: Must specify either document_ids or batch_id.

Returns: BatchReprocessResult with documents_queued, documents_failed, failed_documents, and step

```python
from idp_sdk import RerunStep

# Reprocess batch
result = client.batch.reprocess(
    step=RerunStep.EXTRACTION,
    batch_id="batch-20250123-123456",
)

# Reprocess specific documents
result = client.batch.reprocess(
    step="classification",
    document_ids=["batch/doc1.pdf", "batch/doc2.pdf"],
)

print(f"Queued: {result.documents_queued}")
```

batch.stop_workflows()
Stop all running Step Functions workflows and purge the SQS queue.

Parameters:

- stack_name (str, optional): Stack name override
- skip_purge (bool, optional): Skip purging the SQS queue (default: False)
- skip_stop (bool, optional): Skip stopping executions (default: False)

Returns: StopWorkflowsResult with executions_stopped (ExecutionsStoppedResult), documents_aborted (DocumentsAbortedResult), and queue_purged

```python
result = client.batch.stop_workflows()

print(f"Queue purged: {result.queue_purged}")
if result.executions_stopped:
    print(f"Executions stopped: {result.executions_stopped.total_stopped}")
if result.documents_aborted:
    print(f"Documents aborted: {result.documents_aborted.documents_aborted}")
```

Evaluation Operations
Operations for baseline comparison and accuracy measurement.
evaluation.create_baseline()
Create an evaluation baseline for a document to enable automated accuracy testing.

Parameters:

- document_id (str, required): Document identifier (S3 key)
- baseline_data (dict, required): Expected extraction results with sections and fields
- metadata (dict, optional): Optional metadata (created_by, purpose, etc.)
- stack_name (str, optional): Stack name override

Returns: dict with baseline creation result
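The baseline_data argument is a nested dict: a sections list whose entries carry section_id, document_class, and fields, which is the shape used in this method's example. When baselines are generated from many reviewed documents, a small builder keeps that layout consistent. build_baseline below is a hypothetical convenience function, sketched for the single-section case only.

```python
from typing import Any, Dict


def build_baseline(document_class: str, fields: Dict[str, Any],
                   section_id: int = 1) -> Dict[str, Any]:
    """Wrap one section's expected fields in the baseline_data layout."""
    return {
        "sections": [
            {
                "section_id": section_id,
                "document_class": document_class,
                # Copy so later edits to `fields` do not alter the baseline.
                "fields": dict(fields),
            }
        ]
    }
```

The result can be passed directly as baseline_data to evaluation.create_baseline().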
```python
baseline = {
    "sections": [
        {
            "section_id": 1,
            "document_class": "invoice",
            "fields": {
                "invoice_number": "INV-12345",
                "total_amount": "1250.00",
                "invoice_date": "2024-01-15",
            },
        }
    ]
}

result = client.evaluation.create_baseline(
    document_id="test-invoice-001.pdf",
    baseline_data=baseline,
    metadata={"created_by": "qa_team"},
)
```

evaluation.get_report()
Get an evaluation report comparing extraction results to the baseline.

Parameters:

- document_id (str, required): Document identifier (S3 key)
- section_id (int, optional): Section number (default: 1)
- stack_name (str, optional): Stack name override

Returns: EvaluationReport with document_id, section_id, accuracy, field_results, and summary

```python
report = client.evaluation.get_report(document_id="test-invoice-001.pdf")

print(f"Accuracy: {report.accuracy:.1%}")

for field, result in report.field_results.items():
    if result['match']:
        print(f"✓ {field}: {result['extracted']}")
    else:
        print(f"✗ {field}: expected '{result['expected']}', got '{result['extracted']}'")
```

evaluation.get_metrics()
Get aggregated evaluation metrics across multiple documents.

Parameters:

- start_date (str, optional): Filter by start date (ISO format: "2024-01-15")
- end_date (str, optional): Filter by end date (ISO format: "2024-01-31")
- document_class (str, optional): Filter by document type (e.g., "invoice")
- batch_id (str, optional): Filter by batch identifier
- stack_name (str, optional): Stack name override

Returns: EvaluationMetrics with total_evaluations, average_accuracy, and by_document_class

```python
metrics = client.evaluation.get_metrics(
    start_date="2024-01-01",
    end_date="2024-01-31",
)

print(f"Total evaluations: {metrics.total_evaluations}")
print(f"Average accuracy: {metrics.average_accuracy:.1%}")

for doc_class, accuracy in metrics.by_document_class.items():
    print(f"{doc_class}: {accuracy:.1%}")
```

evaluation.list_baselines()
List evaluation baselines with pagination support.

Parameters:

- limit (int, optional): Maximum number of baselines to return (default: 100)
- next_token (str, optional): Pagination token from previous request
- stack_name (str, optional): Stack name override

Returns: EvaluationBaselineListResult with baselines, count, and optional next_token

```python
result = client.evaluation.list_baselines(limit=50)

for baseline in result.baselines:
    print(f"{baseline['document_id']}: {baseline['created_at']}")

if result.next_token:
    next_page = client.evaluation.list_baselines(limit=50, next_token=result.next_token)
```

evaluation.delete_baseline()
Delete the evaluation baseline for a document.

Parameters:

- document_id (str, required): Document identifier (S3 key)
- stack_name (str, optional): Stack name override

Returns: dict with deletion result

```python
result = client.evaluation.delete_baseline(document_id="test-invoice-001.pdf")
print(f"Deleted: {result['success']}")
```

Assessment Operations
Operations for quality metrics and confidence scoring.
assessment.get_confidence()
Get confidence scores for all extracted fields in a document section.

Parameters:

- document_id (str, required): Document identifier (S3 key)
- section_id (int, optional): Section number (default: 1)
- stack_name (str, optional): Stack name override

Returns: AssessmentConfidenceResult with document_id, section_id, and attributes (dict of AssessmentFieldConfidence)

AssessmentFieldConfidence fields:

- confidence: Float 0.0-1.0 (e.g., 0.95 = 95% confident)
- confidence_threshold: Minimum acceptable confidence from config
- meets_threshold: Boolean indicating if confidence is acceptable
- reason: Explanation for the confidence level

```python
confidence = client.assessment.get_confidence(document_id="batch-001/invoice.pdf")

# Check if any fields need review
low_conf_fields = [
    field for field, attr in confidence.attributes.items()
    if not attr.meets_threshold
]

if low_conf_fields:
    print(f"Review needed for: {', '.join(low_conf_fields)}")

# Get confidence for specific field
total_amount = confidence.attributes.get("total_amount")
if total_amount:
    print(f"Total amount confidence: {total_amount.confidence:.2%}")
    print(f"Meets threshold: {total_amount.meets_threshold}")
```

assessment.get_geometry()
Get bounding box coordinates for all extracted fields in a document section.

Parameters:

- document_id (str, required): Document identifier (S3 key)
- section_id (int, optional): Section number (default: 1)
- stack_name (str, optional): Stack name override

Returns: AssessmentGeometryResult with document_id, section_id, and attributes (dict of AssessmentFieldGeometry)

AssessmentFieldGeometry fields:

- page: Page number where field was found (1-indexed)
- bbox: Normalized bounding box [left, top, width, height] (0.0-1.0)
- bounding_box: Absolute pixel coordinates {Left, Top, Width, Height}
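The normalized bbox is convenient when a page is rendered at a resolution other than the one behind bounding_box, because it can be scaled to any image size. A minimal sketch of that conversion follows. bbox_to_pixels is a hypothetical helper, and the page width and height are whatever the caller's renderer uses.

```python
from typing import Sequence, Tuple


def bbox_to_pixels(bbox: Sequence[float], page_width: int,
                   page_height: int) -> Tuple[int, int, int, int]:
    """Scale normalized [left, top, width, height] to pixel corners.

    Returns (x0, y0, x1, y1): the top-left and bottom-right corners.
    """
    left, top, width, height = bbox
    x0 = round(left * page_width)
    y0 = round(top * page_height)
    x1 = round((left + width) * page_width)
    y1 = round((top + height) * page_height)
    return x0, y0, x1, y1
```

Corner coordinates are returned rather than width and height because many drawing APIs take a rectangle as two opposite corners.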
```python
geometry = client.assessment.get_geometry(document_id="batch-001/invoice.pdf")

# Highlight fields in document viewer
# (draw_highlight is a placeholder for your viewer's own drawing function)
for field_name, geo in geometry.attributes.items():
    print(f"{field_name} found on page {geo.page}")
    # Draw rectangle at normalized coordinates
    draw_highlight(
        page=geo.page,
        left=geo.bbox[0],
        top=geo.bbox[1],
        width=geo.bbox[2],
        height=geo.bbox[3],
    )
```

assessment.get_metrics()
Get aggregated quality metrics across multiple documents.

Parameters:

- start_date (str, optional): Filter by start date (ISO format: "2024-01-15")
- end_date (str, optional): Filter by end date (ISO format: "2024-01-31")
- document_class (str, optional): Filter by document type (e.g., "invoice")
- batch_id (str, optional): Filter by batch identifier
- stack_name (str, optional): Stack name override

Returns: dict with aggregated metrics

```python
metrics = client.assessment.get_metrics(
    start_date="2024-01-15",
    end_date="2024-01-15",
)

print(f"Processed: {metrics['total_documents']} documents")
print(f"Avg confidence: {metrics['average_confidence']:.2%}")
print(f"SLA compliance: {metrics['threshold_compliance']:.2%}")
```

Search Operations
Operations for knowledge base queries and semantic search.
search.query()
Query the knowledge base with natural language questions.

Parameters:

- question (str, required): Natural language question
- document_ids (list[str], optional): Limit search to specific documents
- limit (int, optional): Maximum number of results to return (default: 10)
- next_token (str, optional): Pagination token from previous request
- stack_name (str, optional): Stack name override

Returns: SearchResult with answer, confidence, citations, and optional next_token

```python
# Ask a question
result = client.search.query(
    question="What is the total amount on invoice INV-12345?"
)

print(f"Answer: {result.answer}")
print(f"Confidence: {result.confidence:.1%}")

for citation in result.citations:
    print(f"Source: {citation.document.document_id}")
    print(f"Page: {citation.document.page}")
    print(f"Text: {citation.text}")

# Search within specific documents
result = client.search.query(
    question="What is the vendor name?",
    document_ids=["batch-001/invoice1.pdf", "batch-001/invoice2.pdf"],
)
```

Stack Operations
Operations for deploying and managing IDP stacks.
stack.deploy()
Deploy or update an IDP CloudFormation stack.

Parameters:

- stack_name (str, optional): CloudFormation stack name (uses client default if not provided)
- admin_email (str, optional): Admin user email (required for new stacks)
- template_url (str, optional): URL to CloudFormation template in S3
- template_path (str, optional): Local path to CloudFormation template file
- from_code (str, optional): Path to project root for building from source
- custom_config (str, optional): Path to local config file or S3 URI
- max_concurrent (int, optional): Maximum concurrent workflows
- log_level (str, optional): Logging level (DEBUG, INFO, WARN, ERROR)
- enable_hitl (bool, optional): Enable Human-in-the-Loop
- parameters (dict, optional): Additional CloudFormation parameters
- wait (bool, optional): Wait for operation to complete (default: True)
- no_rollback (bool, optional): Disable rollback on failure (default: False)
- role_arn (str, optional): CloudFormation service role ARN

Returns: StackDeploymentResult with success, operation, status, stack_name, stack_id, outputs, and error

```python
result = client.stack.deploy(
    stack_name="my-new-stack",
    admin_email="admin@example.com",
    max_concurrent=100,
    wait=True,
)

if result.success:
    print(f"Stack deployed: {result.stack_name}")
    print(f"Outputs: {result.outputs}")
```

stack.delete()
Delete an IDP CloudFormation stack.

Parameters:

- stack_name (str, optional): CloudFormation stack name (uses client default if not provided)
- empty_buckets (bool, optional): Empty S3 buckets before deletion (default: False)
- force_delete_all (bool, optional): Force delete all retained resources after deletion (default: False)
- wait (bool, optional): Wait for deletion to complete (default: True)

Returns: StackDeletionResult with success, status, stack_name, stack_id, error, and cleanup_result

```python
result = client.stack.delete(
    empty_buckets=True,
    force_delete_all=False,
    wait=True,
)

print(f"Status: {result.status}")
```

stack.get_resources()
Get stack resource information.

Returns: StackResources with bucket names, ARNs, and other resource identifiers

```python
resources = client.stack.get_resources()

print(f"Input Bucket: {resources.input_bucket}")
print(f"Output Bucket: {resources.output_bucket}")
print(f"Queue URL: {resources.document_queue_url}")
```

stack.exists()
Check whether a CloudFormation stack exists.

Parameters:

- stack_name (str, optional): Stack name override

Returns: bool (True if the stack exists, False otherwise)

```python
if client.stack.exists():
    print("Stack is deployed")
else:
    print("Stack not found")
```

stack.get_status()
Get the current CloudFormation status of a stack.

Parameters:

- stack_name (str, optional): Stack name override

Returns: str or None. The CloudFormation status string (e.g., "UPDATE_IN_PROGRESS"), or None if the stack does not exist
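CloudFormation status strings follow a naming convention: transitional states end in _IN_PROGRESS, and failure states end in _FAILED or contain ROLLBACK. Callers that only need a coarse answer can bucket the return value accordingly. The helper below is a hypothetical sketch; the category names are this example's own convention and not SDK values.

```python
from typing import Optional


def describe_stack_status(status: Optional[str]) -> str:
    """Bucket a get_status() return value into a coarse category."""
    if status is None:
        return "missing"          # stack does not exist
    if status.endswith("_IN_PROGRESS"):
        return "in_progress"      # e.g. UPDATE_IN_PROGRESS
    if status.endswith("_FAILED") or "ROLLBACK" in status:
        return "needs_attention"  # e.g. CREATE_FAILED, UPDATE_ROLLBACK_COMPLETE
    return "ok"                   # e.g. CREATE_COMPLETE, UPDATE_COMPLETE
```

The in-progress check runs first so that a state such as UPDATE_ROLLBACK_IN_PROGRESS is reported as still changing rather than as a settled failure.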
```python
status = client.stack.get_status()
print(f"Stack status: {status}")
```

stack.check_in_progress()
Check whether a CloudFormation stack has an operation currently in progress.

Parameters:

- stack_name (str, optional): Stack name override

Returns: StackOperationInProgress if an operation is in progress, None otherwise. The operation field is one of "CREATE", "UPDATE", or "DELETE".

```python
in_progress = client.stack.check_in_progress()
if in_progress:
    print(f"Operation in progress: {in_progress.operation} ({in_progress.status})")
```

stack.monitor()
Monitor a CloudFormation stack operation until it reaches a terminal state. Blocks until the operation completes or fails.

Parameters:

- stack_name (str, optional): Stack name override
- operation (str, optional): Operation type being monitored: "CREATE", "UPDATE", or "DELETE" (default: "UPDATE")
- poll_interval_seconds (int, optional): Seconds between CloudFormation API polls (default: 10)

Returns: StackMonitorResult with success, operation, status, stack_name, outputs, and error

```python
result = client.stack.monitor(operation="UPDATE")
if result.success:
    print(f"Operation complete: {result.status}")
else:
    print(f"Operation failed: {result.error}")
```

stack.cancel_update()
Cancel an in-progress stack update. Only valid when the stack is in UPDATE_IN_PROGRESS status.

Parameters:

- stack_name (str, optional): Stack name override

Returns: CancelUpdateResult with success, message, and error

```python
result = client.stack.cancel_update()
if result.success:
    print("Update cancelled successfully")
```

stack.wait_for_stable_state()
Section titled “stack.wait_for_stable_state()”Wait for a CloudFormation stack to reach a stable (non-transitional) state. Useful before triggering an operation on a stack that may be in a transitional state.
Parameters:
stack_name(str, optional): Stack name overridetimeout_seconds(int, optional): Maximum seconds to wait (default: 1200)poll_interval_seconds(int, optional): Seconds between polls (default: 10)
Returns: StackStableStateResult with success, status, and message
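The interplay of `timeout_seconds` and `poll_interval_seconds` can be illustrated with a generic polling loop. This is a sketch under stated assumptions, not the SDK's code: `wait_until_stable` and its `sleep` and `clock` parameters are hypothetical, and a status is treated as stable when it is `None` or does not end in `_IN_PROGRESS`.

```python
import time
from typing import Callable, Optional, Tuple


def wait_until_stable(
    get_status: Callable[[], Optional[str]],
    timeout_seconds: float = 1200,
    poll_interval_seconds: float = 10,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[bool, Optional[str]]:
    """Poll get_status until it reports a non-transitional status or time runs out.

    Returns (reached_stable_state, last_status_seen).
    """
    deadline = clock() + timeout_seconds
    while True:
        status = get_status()
        # Assumption: None (no such stack) and anything not *_IN_PROGRESS is stable.
        if status is None or not status.endswith("_IN_PROGRESS"):
            return True, status
        if clock() >= deadline:
            return False, status
        sleep(poll_interval_seconds)


# Simulated status sequence in place of real CloudFormation calls.
fake_statuses = iter(["UPDATE_IN_PROGRESS", "UPDATE_IN_PROGRESS", "UPDATE_COMPLETE"])
print(wait_until_stable(lambda: next(fake_statuses), sleep=lambda seconds: None))
```

Because `stack.get_status()` returns a status string or `None`, passing `client.stack.get_status` as `get_status` would follow the same pattern. The `sleep` and `clock` hooks exist only so the loop can be exercised without waiting.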
result = client.stack.wait_for_stable_state()
if result.success:
    print(f"Stack is stable: {result.status}")

stack.get_failure_analysis()
Analyze a CloudFormation deployment failure. Recursively collects failed events from the main stack and all nested stacks, then separates root causes from cascade failures.
Parameters:
stack_name (str, optional): Stack name override
Returns: FailureAnalysis with stack_name, root_causes (list of FailureCause), and all_failures (list of FailureCause)
analysis = client.stack.get_failure_analysis()
print(f"Root causes ({len(analysis.root_causes)}):")
for cause in analysis.root_causes:
    print(f"  {cause.resource}: {cause.reason}")

stack.cleanup_orphaned()
Remove residual AWS resources left behind from deleted IDP stacks. Identifies orphaned CloudFront distributions, CloudWatch log groups, AppSync APIs, IAM policies, S3 buckets, DynamoDB tables, and more.
Parameters:
dry_run (bool, optional): Preview changes without making them (default: False)
auto_approve (bool, optional): Auto-approve all deletions (default: False)
regions (list[str], optional): AWS regions to check (default: us-east-1, us-west-2, eu-central-1)
profile (str, optional): AWS profile name (default: None)
Returns: OrphanedResourceCleanupResult with results (dict), has_errors, and has_disabled
# Preview what would be cleaned up
result = client.stack.cleanup_orphaned(dry_run=True)
print(f"Has errors: {result.has_errors}")

stack.get_bucket_info()
Get information about S3 buckets associated with a CloudFormation stack.
Parameters:
stack_name (str, optional): Stack name override
Returns: list[BucketInfo] — one BucketInfo per S3 bucket, with logical_id, bucket_name, object_count, total_size, and size_display
buckets = client.stack.get_bucket_info()
for bucket in buckets:
    print(f"{bucket.logical_id}: {bucket.bucket_name} ({bucket.size_display}, {bucket.object_count} objects)")

Configuration Operations
Operations for managing IDP configurations.

config.create()
Generate an IDP configuration template.
Parameters:
features (str, optional): Feature set to include: "min", "core", "all", or a comma-separated list (default: "min")
pattern (str, optional): Pattern to use: "pattern-1" or "pattern-2" (default: "pattern-2")
output (str, optional): Output file path
include_prompts (bool, optional): Include prompt templates (default: False)
include_comments (bool, optional): Include explanatory comments (default: True)
Returns: ConfigCreateResult with yaml_content and output_path
result = client.config.create(
    features="min",  # min, core, all, or comma-separated
    pattern="pattern-2",
    output="config.yaml",
    include_prompts=False,
    include_comments=True
)

print(result.yaml_content)

config.validate()
Validate a configuration file against system defaults.
Parameters:
config_file (str, required): Path to the configuration file
pattern (str, optional): Pattern to validate against (default: "pattern-2")
show_merged (bool, optional): Include merged configuration in result (default: False)
strict (bool, optional): Report deprecated/unknown fields as errors (default: False)
Returns: ConfigValidationResult with valid, errors, warnings, deprecated_fields, unknown_fields, and optional merged_config
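One way to act on these fields, for example as a gate in a CI job, is sketched below. `validation_exit_code` is a hypothetical helper, and the `SimpleNamespace` object is a stand-in that only mimics the documented `valid`, `errors`, and `warnings` fields. It is not a real `ConfigValidationResult`.

```python
from types import SimpleNamespace


def validation_exit_code(result, fail_on_warnings: bool = False) -> int:
    """Map a validation result to a process exit code (0 = pass, 1 = fail)."""
    if not result.valid or result.errors:
        return 1
    if fail_on_warnings and result.warnings:
        return 1
    return 0


# Stand-in object used here in place of a real client.config.validate() call.
sample = SimpleNamespace(valid=True, errors=[], warnings=["field 'x' is deprecated"])
print(validation_exit_code(sample))
print(validation_exit_code(sample, fail_on_warnings=True))
```

With a deployed SDK, the value returned by `client.config.validate(...)` could be passed in the same way. Whether warnings should fail a build is a policy choice, which is why it is left as a flag here.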
result = client.config.validate(
    config_file="./config.yaml",
    pattern="pattern-2"
)

if result.valid:
    print("Configuration is valid")
else:
    for error in result.errors:
        print(f"Error: {error}")

for warning in result.warnings:
    print(f"Warning: {warning}")

config.upload()
Upload configuration to a deployed stack.
Parameters:
config_file (str, required): Path to the YAML or JSON configuration file
config_version (str, required): Version to upload to (e.g., "default", "v1", "production"). If the version doesn't exist, it is created automatically.
stack_name (str, optional): Stack name override
validate (bool, optional): Validate configuration before uploading (default: True)
pattern (str, optional): Pattern to validate against (default: "pattern-2")
description (str, optional): Description for the configuration version
Returns: ConfigUploadResult with success, version, version_created, and error
# Upload to the default version
result = client.config.upload(
    config_file="./my-config.yaml",
    config_version="default",
    validate=True
)

if result.success:
    print("Configuration uploaded")

# Create a new named version
result = client.config.upload(
    config_file="./my-config.yaml",
    config_version="v2",
    description="Updated extraction rules"
)

if result.success:
    if result.version_created:
        print(f"New version created: {result.version}")
    else:
        print(f"Version updated: {result.version}")

config.download()
Download configuration from a deployed stack.
Parameters:
stack_name (str, optional): Stack name override
output (str, optional): Output file path
format (str, optional): Format type: "full" or "minimal" (default: "full")
pattern (str, optional): Pattern override (auto-detected if not provided)
config_version (str, optional): Configuration version to download (default: active version)
Returns: ConfigDownloadResult with config, yaml_content, and output_path
result = client.config.download(
    output="downloaded-config.yaml",
    format="minimal"  # "full" or "minimal"
)

print(result.yaml_content)

config.list()
List all configuration versions in a deployed stack.
Parameters:
stack_name (str, optional): Stack name override
Returns: ConfigListResult with versions (list of ConfigVersionInfo) and count
ConfigVersionInfo fields: version_name, is_active, created_at, updated_at, description
result = client.config.list()

print(f"Found {result.count} versions:")
for version in result.versions:
    status = " (ACTIVE)" if version.is_active else ""
    print(f"  - {version.version_name}{status}")

config.activate()
Activate a configuration version. If the configuration uses BDA (use_bda=True), a BDA blueprint sync is performed before activation.
Parameters:
config_version (str, required): Configuration version to activate
stack_name (str, optional): Stack name override
Returns: ConfigActivateResult with success, activated_version, bda_synced, bda_classes_synced, bda_classes_failed, and error
result = client.config.activate("v2")

if result.success:
    print(f"Activated version: {result.activated_version}")
    if result.bda_synced:
        print(f"BDA synced: {result.bda_classes_synced} classes")
else:
    print(f"Failed to activate: {result.error}")

config.delete()
Delete a configuration version.
Parameters:
config_version (str, required): Configuration version to delete
stack_name (str, optional): Stack name override
Returns: ConfigDeleteResult with success, deleted_version, and error
result = client.config.delete("old-version")

if result.success:
    print(f"Deleted version: {result.deleted_version}")
else:
    print(f"Failed to delete: {result.error}")

Note: The "default" version and the currently active version cannot be deleted.
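A pre-flight check for this restriction might look like the sketch below. `deletion_blocker` is a hypothetical helper, not part of the SDK. It relies only on the `version_name` and `is_active` fields documented for `ConfigVersionInfo`, and the sample list uses stand-in objects rather than a real `config.list()` response.

```python
from types import SimpleNamespace
from typing import Iterable, Optional


def deletion_blocker(version_name: str, versions: Iterable) -> Optional[str]:
    """Return the reason a version cannot be deleted, or None if deletion may proceed."""
    if version_name == "default":
        return 'the "default" version cannot be deleted'
    for version in versions:
        if version.version_name == version_name:
            if version.is_active:
                return f'"{version_name}" is the currently active version'
            return None
    return f'"{version_name}" does not exist'


# Stand-in data shaped like the entries of ConfigListResult.versions.
versions = [
    SimpleNamespace(version_name="default", is_active=False),
    SimpleNamespace(version_name="v1", is_active=False),
    SimpleNamespace(version_name="v2", is_active=True),
]
for name in ("default", "v1", "v2"):
    print(name, "->", deletion_blocker(name, versions))
```

With a live stack, `versions` would come from `client.config.list().versions`. The SDK still reports the failure through `result.error`, so a check like this only saves a round trip.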
config.sync_bda()
Synchronize IDP document class schemas with BDA (Bedrock Data Automation) blueprints.
Parameters:
direction (str, optional): Sync direction: "bidirectional" (default), "bda_to_idp", or "idp_to_bda"
mode (str, optional): Sync mode: "replace" (default, full alignment) or "merge" (additive)
config_version (str, optional): Configuration version to sync (default: active version)
stack_name (str, optional): Stack name override
Returns: ConfigSyncBdaResult with success, direction, mode, classes_synced, classes_failed, processed_classes, and error
# Bidirectional sync (default)
result = client.config.sync_bda()

# Import BDA blueprints to IDP (merge mode)
result = client.config.sync_bda(
    direction="bda_to_idp",
    mode="merge"
)

# Push IDP classes to BDA
result = client.config.sync_bda(
    direction="idp_to_bda",
    config_version="v2"
)

if result.success:
    print(f"Synced {result.classes_synced} classes")
    for cls in result.processed_classes:
        print(f" • {cls}")
else:
    print(f"Sync failed: {result.error}")

Discovery Operations
Discover document class schemas from sample documents using Amazon Bedrock.
Two modes:
- Stack-connected (with stack_name): Uses the stack's discovery config from DynamoDB and saves the discovered schema to the config
- Local (without stack_name): Uses system default Bedrock settings and returns the schema without saving
discovery.run()
Analyze a document to generate a JSON Schema definition for a document class.
Parameters:
document_path (str, required): Local path to document file (PDF, PNG, JPG, TIFF)
ground_truth_path (str, optional): Path to JSON ground truth file
config_version (str, optional): Config version to save to (stack mode only)
stack_name (str, optional): Stack name override
page_range (str, optional): Page range to extract from a PDF (e.g., "1-3")
class_name_hint (str, optional): Hint for the document class name (LLM uses this as $id)
auto_detect (bool, optional): If True, auto-detect section boundaries and discover each section. Returns DiscoveryBatchResult.
Returns: DiscoveryResult with status, document_class, json_schema, config_version, document_path, page_range, and error. When auto_detect=True, returns DiscoveryBatchResult.
import json

# Local mode: no stack needed
client = IDPClient()
result = client.discovery.run("./invoice.pdf")
print(json.dumps(result.json_schema, indent=2))

# Stack mode: uses stack config, saves schema
client = IDPClient(stack_name="my-stack")
result = client.discovery.run("./w2-form.pdf")

# With ground truth for better accuracy
result = client.discovery.run(
    "./invoice.pdf",
    ground_truth_path="./invoice-expected.json"
)

# With class name hint
result = client.discovery.run(
    "./form.pdf",
    class_name_hint="W2 Tax Form"
)

# Discover specific page range from a PDF
result = client.discovery.run(
    "./lending_package.pdf",
    page_range="3-5",
    class_name_hint="W2 Form"
)

# Auto-detect sections and discover each
batch_result = client.discovery.run(
    "./lending_package.pdf",
    auto_detect=True,
    config_version="v2"
)
for r in batch_result.results:
    print(f"{r.document_class} (pages {r.page_range}): {r.status}")

# Save to specific config version
result = client.discovery.run(
    "./form.pdf",
    config_version="v2"
)

discovery.auto_detect_sections()
Detect document section boundaries in a multi-page PDF using LLM analysis.
Parameters:
document_path (str, required): Local path to a PDF document
stack_name (str, optional): Stack name override
Returns: AutoDetectResult with status, sections (list of AutoDetectSection), document_path, and error
AutoDetectSection fields: start (int), end (int), type (str, optional)
# Detect section boundaries
result = client.discovery.auto_detect_sections("./lending_package.pdf")

if result.status == "SUCCESS":
    for section in result.sections:
        print(f"Pages {section.start}-{section.end}: {section.type}")
    # Output:
    # Pages 1-2: Cover Letter
    # Pages 3-5: W2 Form
    # Pages 6-8: Bank Statement

discovery.run_multi_section()
Discover multiple document classes from page ranges in a single PDF.
Parameters:
document_path (str, required): Local path to a multi-page PDF
page_ranges (list, required): List of dicts with start (int), end (int), and optional label (str)
config_version (str, optional): Config version to save to
stack_name (str, optional): Stack name override
Returns: DiscoveryBatchResult with one result per page range
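The sections reported by `discovery.auto_detect_sections()` carry `start`, `end`, and an optional `type`, which is close to the `page_ranges` shape expected here. A possible conversion is sketched below. `sections_to_page_ranges` is a hypothetical helper, and mapping `type` to `label` is an assumption rather than documented behavior. The sample sections are stand-in objects.

```python
from types import SimpleNamespace
from typing import Dict, Iterable, List


def sections_to_page_ranges(sections: Iterable) -> List[Dict]:
    """Convert section objects (start, end, optional type) into page_ranges dicts."""
    page_ranges = []
    for section in sections:
        entry = {"start": section.start, "end": section.end}
        # Assumption: the detected section type is a reasonable label for the range.
        label = getattr(section, "type", None)
        if label:
            entry["label"] = label
        page_ranges.append(entry)
    return page_ranges


# Stand-in objects shaped like AutoDetectSection.
sections = [
    SimpleNamespace(start=1, end=2, type="Cover Letter"),
    SimpleNamespace(start=3, end=5, type=None),
]
print(sections_to_page_ranges(sections))
```

Going through an intermediate list like this leaves room to review or correct the detected boundaries before running discovery on them.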
# Discover specific page ranges
result = client.discovery.run_multi_section(
    "./lending_package.pdf",
    page_ranges=[
        {"start": 1, "end": 2, "label": "Cover Letter"},
        {"start": 3, "end": 5, "label": "W2 Form"},
        {"start": 6, "end": 8, "label": "Bank Statement"},
    ],
    config_version="v2"
)

print(f"Discovered {result.succeeded}/{result.total} sections")
for r in result.results:
    print(f"  Pages {r.page_range}: {r.document_class} ({r.status})")

discovery.run_multi_doc()
Discover document classes from a collection of documents using embedding-based clustering and agentic analysis. Unlike run() (which analyzes one document at a time), this method analyzes a directory of mixed documents to automatically identify document types, cluster similar documents, and generate JSON Schemas for each discovered class.
Requires: pip install idp-common[multi_document_discovery]
Note: Requires at least 2 documents per expected class. Clusters with fewer than 2 documents are filtered as noise. For discovering schemas from individual documents, use discovery.run() instead.
Parameters:
document_dir (str, optional): Directory path containing documents to analyze (recursive scan)
document_paths (list[str], optional): List of individual document file paths
embedding_model_id (str, optional): Bedrock embedding model ID (default: us.cohere.embed-v4:0)
analysis_model_id (str, optional): Bedrock LLM for cluster analysis (default: us.anthropic.claude-sonnet-4-6)
output_dir (str, optional): Directory to write individual JSON schema files per discovered class
save_to_config (bool, optional): Save discovered schemas to the stack's configuration (default: False)
config_version (str, optional): Configuration version to save schemas to (required with save_to_config)
progress_callback (callable, optional): Callback function for pipeline progress updates
region (str, optional): AWS region
Returns: MultiDocDiscoveryResult with status (SUCCESS/PARTIAL/FAILED), discovered_classes (list of DiscoveredClassResult), reflection_report, total_documents, total_clusters, noise_documents, config_version, and error
DiscoveredClassResult fields: cluster_id, classification, json_schema, document_count, sample_doc_ids, error
# Basic usage: discover from a directory
client = IDPClient()
result = client.discovery.run_multi_doc(document_dir="./samples/")

print(f"Status: {result.status}")
print(f"Documents: {result.total_documents} → Clusters: {result.total_clusters}")

for dc in result.discovered_classes:
    if not dc.error:
        print(f"  Cluster {dc.cluster_id}: {dc.classification} ({dc.document_count} docs)")

# Save schemas to output directory
result = client.discovery.run_multi_doc(
    document_dir="./samples/",
    output_dir="./schemas/"
)

# Save to stack configuration
client = IDPClient(stack_name="my-stack")
result = client.discovery.run_multi_doc(
    document_dir="./samples/",
    save_to_config=True,
    config_version="v2"
)

# With explicit document paths
result = client.discovery.run_multi_doc(
    document_paths=["./doc1.pdf", "./doc2.png", "./doc3.jpg"]
)

# With progress callback
def on_progress(step, data=None):
    print(f"  [{step}] {data}")

result = client.discovery.run_multi_doc(
    document_dir="./samples/",
    progress_callback=on_progress
)

# Print reflection report
if result.reflection_report:
    print(result.reflection_report)

Note: If the required dependencies are not installed, this method returns a MultiDocDiscoveryResult with status="FAILED" and an error message instructing the user to install idp-common[multi_document_discovery], rather than raising an exception.
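Because failures are reported through `status` and `error` rather than an exception, calling code needs to inspect the result explicitly. A minimal sketch of that check follows. `summarize_multi_doc` is a hypothetical helper, and the sample objects are stand-ins that mimic only the documented `status`, `error`, `total_clusters`, and `discovered_classes` fields.

```python
from types import SimpleNamespace


def summarize_multi_doc(result) -> str:
    """Build a one-line summary by checking status instead of relying on exceptions."""
    if result.status == "FAILED":
        return f"FAILED: {result.error}"
    usable = [c for c in result.discovered_classes if not c.error]
    names = ", ".join(c.classification for c in usable)
    return (
        f"{result.status}: {len(usable)} of {result.total_clusters} clusters "
        f"produced a schema ({names})"
    )


# Stand-in results used here in place of a real discovery run.
missing_deps = SimpleNamespace(
    status="FAILED",
    error="install idp-common[multi_document_discovery]",
    total_clusters=0,
    discovered_classes=[],
)
partial = SimpleNamespace(
    status="PARTIAL",
    error=None,
    total_clusters=2,
    discovered_classes=[
        SimpleNamespace(classification="Invoice", error=None),
        SimpleNamespace(classification="W2", error="analysis timed out"),
    ],
)
print(summarize_multi_doc(missing_deps))
print(summarize_multi_doc(partial))
```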
discovery.run_batch()
Run discovery on multiple documents sequentially. Ground truth paths are matched to documents by list position.
Parameters:
document_paths (list, required): List of local file paths
ground_truth_paths (list, optional): Parallel list of ground truth paths (use None for docs without ground truth)
config_version (str, optional): Config version to save to
stack_name (str, optional): Stack name override
Returns: DiscoveryBatchResult with total, succeeded, failed, and results (list of DiscoveryResult)
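Since matching is positional, the ground truth list has to line up index by index with the document list. One way to build it is sketched below. `match_ground_truth` is a hypothetical helper, and the naming convention it assumes (a `<document stem>.json` file in a single ground truth directory) is an illustration, not something the SDK requires.

```python
from pathlib import Path
from typing import List, Optional, Sequence


def match_ground_truth(
    document_paths: Sequence[str], ground_truth_dir: str
) -> List[Optional[str]]:
    """Return a list parallel to document_paths, with None where no ground truth exists."""
    gt_dir = Path(ground_truth_dir)
    matched: List[Optional[str]] = []
    for document in document_paths:
        # Assumed convention: invoice.pdf pairs with <ground_truth_dir>/invoice.json
        candidate = gt_dir / f"{Path(document).stem}.json"
        matched.append(str(candidate) if candidate.is_file() else None)
    return matched


documents = ["./invoice.pdf", "./w2.pdf"]
ground_truth = match_ground_truth(documents, "./ground-truth")
# The two lists always have the same length, so positions stay aligned.
print(list(zip(documents, ground_truth)))
```

The resulting list can be passed as `ground_truth_paths` alongside the same `document_paths`.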
# Batch without ground truth
result = client.discovery.run_batch([
    "./invoice.pdf",
    "./w2-form.pdf",
    "./paystub.png",
])
print(f"Succeeded: {result.succeeded}/{result.total}")

# Batch with selective ground truth (matched by position)
result = client.discovery.run_batch(
    ["./invoice.pdf", "./w2.pdf"],
    ground_truth_paths=[None, "./w2.json"],
)

Manifest Operations
Operations for manifest generation and validation.
manifest.generate()
Generate a manifest file from a directory or S3 URI.

result = client.manifest.generate(
    directory="./documents/",
    baseline_dir="./baselines/",
    output="manifest.csv",
    file_pattern="*.pdf",
    recursive=True
)

print(f"Documents: {result.document_count}")
print(f"Baselines matched: {result.baselines_matched}")

manifest.validate()
Validate a manifest file.
result = client.manifest.validate(manifest_path="./manifest.csv")

if result.valid:
    print(f"Valid manifest with {result.document_count} documents")
else:
    print(f"Invalid: {result.error}")

Testing Operations
Operations for load testing and performance validation.
testing.load_test()
Run load testing by copying files to the input bucket.
Parameters:
source_file (str, required): Source file to copy
stack_name (str, optional): Stack name override
rate (int, optional): Files per minute for constant load (default: 100)
duration (int, optional): Duration in minutes (default: 1)
schedule_file (str, optional): Schedule file for variable load
dest_prefix (str, optional): Destination prefix in S3 (default: "load-test")
config_version (str, optional): Configuration version to tag files with
Returns: LoadTestResult with success, total_files, duration_minutes, and error
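For a constant-rate test, the expected volume follows directly from `rate` (files per minute) and `duration` (minutes). The arithmetic is sketched below. `load_test_plan` is a hypothetical helper, and the even spacing between copies is an assumption, since the SDK may pace uploads differently.

```python
def load_test_plan(rate: int = 100, duration: int = 1) -> dict:
    """Estimate volume for a constant-rate test (rate in files/minute, duration in minutes)."""
    if rate <= 0 or duration <= 0:
        raise ValueError("rate and duration must be positive")
    return {
        "total_files": rate * duration,
        # Assumption: copies are spread evenly across each minute.
        "seconds_between_files": 60 / rate,
    }


plan = load_test_plan(rate=100, duration=5)
print(plan["total_files"])  # 500 files over 5 minutes
```

Comparing this estimate with `total_files` on the returned `LoadTestResult` is a quick sanity check that the test ran for its full duration.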
result = client.testing.load_test(
    source_file="./sample.pdf",
    rate=100,  # Files per minute
    duration=5,  # Duration in minutes
    dest_prefix="load-test"
)

print(f"Total files: {result.total_files}")
print(f"Success: {result.success}")

Response Models
All operations return typed Pydantic models. Import them from the top-level idp_sdk package:
from idp_sdk import (
    # Document models
    DocumentUploadResult, DocumentStatus, DocumentDownloadResult,
    DocumentReprocessResult, DocumentRerunResult, DocumentDeletionResult,
    DocumentMetadata, DocumentInfo, DocumentListResult,

    # Batch models
    BatchResult, BatchProcessResult, BatchStatus, BatchInfo, BatchListResult,
    BatchReprocessResult, BatchRerunResult, BatchDownloadResult, BatchDeletionResult,

    # Evaluation models
    EvaluationReport, EvaluationMetrics, EvaluationBaselineListResult,
    BaselineResult, BaselineInfo, FieldComparison, DeleteResult,

    # Assessment models
    AssessmentConfidenceResult, AssessmentFieldConfidence,
    AssessmentGeometryResult, AssessmentFieldGeometry, AssessmentMetrics,

    # Search models
    SearchResult, SearchCitation, SearchDocumentReference,

    # Stack models
    StackDeploymentResult, StackDeletionResult, StackResources,
    StackOperationInProgress, StackMonitorResult, StackStableStateResult,
    FailureCause, FailureAnalysis, BucketInfo, CancelUpdateResult,
    OrphanedResourceCleanupResult,

    # Config models
    ConfigCreateResult, ConfigValidationResult, ConfigUploadResult,
    ConfigDownloadResult, ConfigActivateResult, ConfigVersionInfo,
    ConfigListResult, ConfigDeleteResult,

    # Discovery models
    DiscoveryResult, DiscoveryBatchResult,

    # Manifest models
    ManifestDocument, ManifestResult, ManifestValidationResult,

    # Testing models
    StopWorkflowsResult, ExecutionsStoppedResult, DocumentsAbortedResult,
    LoadTestResult,

    # Enums
    DocumentState, Pattern, RerunStep, StackState,

    # Exceptions
    IDPError, IDPConfigurationError, IDPStackError, IDPProcessingError,
    IDPResourceNotFoundError, IDPValidationError, IDPTimeoutError,
)

Error Handling

from idp_sdk import IDPClient, IDPProcessingError, IDPResourceNotFoundError

client = IDPClient(stack_name="my-stack")

try:
    result = client.document.process(file_path="./invoice.pdf")
except IDPProcessingError as e:
    print(f"Processing error: {e}")
except IDPResourceNotFoundError as e:
    print(f"Resource not found: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Complete Example

from idp_sdk import IDPClient, RerunStep
import time
# Initialize client
client = IDPClient(stack_name="my-idp-stack", region="us-west-2")

# Upload single document
doc_result = client.document.process(file_path="./invoice.pdf")
doc_id = doc_result.document_id

# Monitor document processing
while True:
    status = client.document.get_status(document_id=doc_id)
    print(f"Status: {status.status.value}")

    if status.status.value in ["COMPLETED", "FAILED"]:
        break

    time.sleep(5)

# Download results if successful
if status.status.value == "COMPLETED":
    client.document.download_results(
        document_id=doc_id,
        output_dir="./results"
    )
    print("Results downloaded successfully")

# Process a batch
batch_result = client.batch.process(source="./documents/")
batch_id = batch_result.batch_id

# Monitor batch progress
while True:
    batch_status = client.batch.get_status(batch_id=batch_id)
    print(f"Progress: {batch_status.completed}/{batch_status.total}")

    if batch_status.all_complete:
        break

    time.sleep(10)

# Download batch results
client.batch.download_results(
    batch_id=batch_id,
    output_dir="./batch_results"
)

print(f"Batch complete! Success rate: {batch_status.success_rate:.1%}")

See Also
- IDP CLI Documentation - Command-line interface
- SDK Examples - Code examples
- API Reference - Detailed API documentation