When processing documents in Kodexa, effective state management is crucial for building reliable and maintainable systems. This article explores how to implement a state machine approach for document processing, addressing common challenges and providing concrete implementation examples.
Many document processing implementations start with simple event reactions. While intuitive, this approach can become problematic as your system grows:
Event-driven systems often become “chatty” with excessive message volume
Processing may occur without awareness of the document’s current state
Document family events occur frequently, making it difficult to track meaningful state changes
End-of-Pipeline Status Updates: Each pipeline should conclude by setting an appropriate document status:
    # At the end of a preparation pipeline
    document_family.set_document_status(prepared_status)

    # At the end of a labeling pipeline
    document_family.set_document_status(labeled_status)
Status-Driven Workflows: Subsequent processing steps should trigger based on document status changes rather than generic events
External System Synchronization: Use the Apply Status model to synchronize with external systems:
    # Configuration for the Apply Status model
    {
      "status_id": "PUBLISHED",
      "external_endpoint": "https://your-system.com/api/document-status",
      "payload_template": {
        "docId": "{{document.id}}",
        "status": "COMPLETE",
        "processedDate": "{{now}}"
      }
    }
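To make the status-driven idea concrete, the allowed status changes can be written down as an explicit transition table. The following is a minimal sketch in plain Python, not part of the Kodexa SDK: the `DocStatus` names, the `ALLOWED_TRANSITIONS` table, and the `transition` helper are all hypothetical, and the real status ids would come from your project's configuration.

```python
from enum import Enum


class DocStatus(str, Enum):
    # Hypothetical status ids; real ones are defined in the project.
    NEW = "NEW"
    PREPARED = "PREPARED"
    LABELED = "LABELED"
    PUBLISHED = "PUBLISHED"
    FAILED = "FAILED"


# Explicit transition table: current status -> statuses it may move to.
ALLOWED_TRANSITIONS = {
    DocStatus.NEW: {DocStatus.PREPARED, DocStatus.FAILED},
    DocStatus.PREPARED: {DocStatus.LABELED, DocStatus.FAILED},
    DocStatus.LABELED: {DocStatus.PUBLISHED, DocStatus.FAILED},
    DocStatus.PUBLISHED: set(),  # terminal
    DocStatus.FAILED: set(),     # terminal
}


def can_transition(current: DocStatus, target: DocStatus) -> bool:
    """Return True if the table allows moving from current to target."""
    return target in ALLOWED_TRANSITIONS[current]


def transition(current: DocStatus, target: DocStatus) -> DocStatus:
    """Return the new status, or raise if the move is not in the table."""
    if not can_transition(current, target):
        raise ValueError(f"Illegal transition: {current.value} -> {target.value}")
    return target
```

A pipeline step could consult such a table before setting a document status, so that a labeling pipeline, for example, never runs against a document that has not been prepared.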
Failure handling requires special consideration. Rather than immediately marking documents as failed, Kodexa’s Document Retry model provides a robust approach:
    import logging

    # KodexaClient, the *Endpoint types, BaseEvent, and ScheduledEvent come from the Kodexa Python SDK
    logger = logging.getLogger(__name__)


    def handle_event(event: BaseEvent, document_store_ref: str, retry_filter: str, failed_status_id: str,
                     project: ProjectEndpoint, number_of_retries: int, assistant: AssistantEndpoint,
                     label_prefix: str, database_host: str, database_port: int, database_username: str,
                     database_password: str, database_name: str):
        if isinstance(event, ScheduledEvent):
            # Get necessary endpoints
            client = KodexaClient()
            document_store: DocumentStoreEndpoint = client.get_object_by_ref("store", document_store_ref)

            # Find the failed status definition
            possible_statuses = [status for status in project.document_statuses if status.id == failed_status_id]
            failed_status = possible_statuses[0] if possible_statuses else None
            if failed_status is None:
                # Without a failed status there is no terminal state to move documents into
                logger.error(f"Document status '{failed_status_id}' not found in project")
                return

            # Find documents that need retry but aren't in final failed state
            final_status = f"({retry_filter}) and not(documentStatus.id:'{failed_status.id}')"
            document_iterator = document_store.stream_filter(final_status)

            # Process each document needing retry
            for document_family in document_iterator:
                # Determine current retry count from labels
                current_retry_count = 0
                for label in document_family.labels:
                    if label.label.startswith(f"{label_prefix}-"):
                        try:
                            count = int(label.label.split('-')[-1])
                            if count > current_retry_count:
                                current_retry_count = count
                        except ValueError:
                            logger.warning(f"Could not parse retry count from label: {label.label}")

                next_retry_attempt = current_retry_count + 1

                # Retry if attempts remain, otherwise mark as failed
                if next_retry_attempt <= number_of_retries:
                    document_family.add_label(f"{label_prefix}-{next_retry_attempt}")
                    document_family.reprocess(assistant)
                else:
                    document_family.set_document_status(failed_status)
This implementation has several key advantages:
Scheduled Execution: The model runs on a schedule rather than being event-triggered, allowing for controlled retry attempts
Label-Based Tracking: It uses document labels with incrementing counters (e.g., “retry-1”, “retry-2”) to track attempts
Configurable Retry Limit: It applies a cap on retry attempts before marking as permanently failed
Efficient Filtering: It uses query filters to only process documents that need attention
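The label-based counting is the part of this model that is easiest to get subtly wrong, so it can help to isolate it as a pure function and test it without a Kodexa connection. The sketch below assumes labels are available as plain strings; `current_retry_count` and `should_retry` are hypothetical helper names, not SDK functions.

```python
from typing import Iterable


def current_retry_count(labels: Iterable[str], label_prefix: str) -> int:
    """Return the highest counter found in labels shaped like '<prefix>-<n>'."""
    highest = 0
    for label in labels:
        if not label.startswith(f"{label_prefix}-"):
            continue
        try:
            highest = max(highest, int(label.rsplit("-", 1)[-1]))
        except ValueError:
            # Label has the prefix but no numeric suffix; ignore it.
            continue
    return highest


def should_retry(labels: Iterable[str], label_prefix: str, number_of_retries: int) -> bool:
    """True while the next attempt would still be within the retry limit."""
    return current_retry_count(labels, label_prefix) + 1 <= number_of_retries
```

With a limit of 3, a document labeled `retry-1` and `retry-2` would be retried once more, while a document already carrying `retry-3` would move to the failed status.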
For reporting failures to external systems, implement a scheduled model that checks for failed documents:
    # notify_external_system and fail_reported_status are not defined in this snippet;
    # supply them for your own integration
    def check_failures(event: ScheduledEvent, document_store_ref: str, failed_status_id: str,
                       notification_endpoint: str):
        # Get necessary endpoints
        client = KodexaClient()
        document_store = client.get_object_by_ref("store", document_store_ref)

        # Find documents in failed status without notification label
        query = f"documentStatus.id:'{failed_status_id}' and not(label:'failure_reported')"
        failed_docs = document_store.stream_filter(query)

        for doc in failed_docs:
            # Notify external system
            response = notify_external_system(notification_endpoint, doc)
            if response.status_code == 200:
                # Mark as reported
                doc.add_label("failure_reported")
                # Optionally update to a different status
                doc.set_document_status(fail_reported_status)
This approach allows you to:
Pause retries during system issues without losing track of failures
Create dedicated monitoring for failure conditions
Control the cadence of failure reporting independently from processing
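The `check_failures` example calls a `notify_external_system` helper without showing it. One possible shape, using only the Python standard library, is sketched below. Everything here is an assumption for illustration: the payload fields mirror the `docId` and `status` keys from the Apply Status configuration, the `"FAILED"` value is a placeholder, the document is assumed to expose an `id` attribute, and `NotifyResult` exists only to provide the `status_code` attribute that `check_failures` reads.

```python
import json
import urllib.error
import urllib.request
from dataclasses import dataclass


@dataclass
class NotifyResult:
    # Mirrors the status_code attribute that check_failures inspects.
    status_code: int


def build_failure_request(endpoint: str, doc_id: str, status: str = "FAILED") -> urllib.request.Request:
    """Build a JSON POST request describing one failed document."""
    body = json.dumps({"docId": doc_id, "status": status}).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def notify_external_system(endpoint, doc, opener=urllib.request.urlopen, timeout=10.0) -> NotifyResult:
    """Send the notification; the opener is injectable so it can be faked in tests."""
    request = build_failure_request(endpoint, doc.id)  # assumes doc has an id attribute
    try:
        with opener(request, timeout=timeout) as response:
            return NotifyResult(status_code=response.status)
    except urllib.error.HTTPError as exc:
        # The server answered with an error status.
        return NotifyResult(status_code=exc.code)
    except urllib.error.URLError:
        # No response at all; 0 means the document stays unreported and is retried next run.
        return NotifyResult(status_code=0)
```

Because the `failure_reported` label is only added after a 200 response, a network failure simply leaves the document in the query results for the next scheduled run.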
The state machine approach provides a robust foundation for document processing in Kodexa. By explicitly modeling state transitions and separating failure handling from normal processing flow, your pipelines become more reliable and maintainable.
Rather than building event-driven architectures that react to every document change, focus on defining clear document states, making explicit transitions between states, and implementing resilient error handling through scheduled models. This approach will result in more predictable behavior and easier system maintenance as your document processing needs grow.