The processing module provides classes for tracking document processing history, recording knowledge items, and managing execution pipelines.

ProcessingStep

A ProcessingStep represents a unit of work in a document processing pipeline. Steps form a DAG (directed acyclic graph) with parent-child relationships, enabling you to track the full lineage of how a document was processed.

Creating Steps

from kodexa_document import ProcessingStep

# Create a processing step
step = ProcessingStep(name="PDF Extraction")

# With metadata
step = ProcessingStep(
    name="Invoice Classification",
    metadata={"model_version": "2.1", "threshold": 0.85},
    presentation_metadata={"icon": "file-invoice", "color": "blue"}
)

Fields

| Field | Type | Description |
|---|---|---|
| id | str | UUID (auto-generated) |
| name | str | Step name (required) |
| start_timestamp | datetime | When the step started |
| duration | int | Duration in milliseconds |
| metadata | dict | Arbitrary key-value metadata |
| presentation_metadata | dict | UI display hints |
| children | List[ProcessingStep] | Child steps |
| parents | List[ProcessingStep] | Parent steps |
| internal_steps | List[ProcessingStep] | Internal sub-steps |
| knowledge_items | List[KnowledgeItem] | Associated knowledge items |
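The start_timestamp and duration fields pair naturally: record a timestamp when the step begins, then store the elapsed time in milliseconds when it ends. A minimal sketch using plain datetime (the helper below is illustrative, not part of the library's API):

```python
from datetime import datetime, timezone

# Record when the step starts.
start_timestamp = datetime.now(timezone.utc)

# ... do the step's work here ...

# Compute the elapsed time as an integer millisecond duration,
# matching the field types in the table above.
elapsed = datetime.now(timezone.utc) - start_timestamp
duration = int(elapsed.total_seconds() * 1000)
```

Both values can then be passed to the ProcessingStep constructor alongside name and metadata.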

Parent-Child Relationships

Build processing hierarchies:
# Create a pipeline
pipeline = ProcessingStep(name="Document Pipeline")

# Add child steps
extraction = ProcessingStep(name="Text Extraction")
pipeline.add_child(extraction)  # Bidirectional link

classification = ProcessingStep(name="Classification")
pipeline.add_child(classification)

tagging = ProcessingStep(name="Entity Tagging")
extraction.add_child(tagging)

# Children know their parents
print(tagging.parents[0].name)  # "Text Extraction"

Merging Steps

Combine multiple processing branches:
# Two independent processing steps
ocr_step = ProcessingStep(name="OCR Processing")
nlp_step = ProcessingStep(name="NLP Analysis")

# Merge them into a combined step
merged = ProcessingStep.merge_with(ocr_step, nlp_step)
print(merged.name)  # "Merged Step"
print(len(merged.parents))  # 2

Serialization

Steps serialize to JSON with circular reference handling:
step = ProcessingStep(name="My Step")
child = ProcessingStep(name="Child Step")
step.add_child(child)

# To dict/JSON
step_dict = step.to_dict()
json_str = step.to_json()

# From dict/JSON
restored = ProcessingStep.from_dict(step_dict)
restored = ProcessingStep.from_json(json_str)
The to_dict() method uses a seen set to handle circular references from bidirectional parent-child links. The from_dict() method uses a step_cache to reconstruct these references.
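The seen-set pattern can be sketched in plain Python (an illustration of the technique, not the library's implementation): children are serialized recursively, while parent links are emitted as id references so the resulting dict stays acyclic:

```python
def step_to_dict(step, seen=None):
    """Serialize a step graph; a `seen` set breaks parent/child cycles."""
    seen = set() if seen is None else seen
    if id(step) in seen:
        # Already serialized on this path: emit a reference, not a copy.
        return {"ref": step["id"]}
    seen.add(id(step))
    return {
        "id": step["id"],
        "name": step["name"],
        # Parents are written as id references only, so recursion
        # never follows the back-edge of the bidirectional link.
        "parents": [p["id"] for p in step["parents"]],
        "children": [step_to_dict(c, seen) for c in step["children"]],
    }

parent = {"id": "p1", "name": "Pipeline", "parents": [], "children": []}
child = {"id": "c1", "name": "Extraction", "parents": [parent], "children": []}
parent["children"].append(child)

d = step_to_dict(parent)
print(d["children"][0]["parents"])  # ['p1']
```

On the way back, a cache keyed by id (the step_cache mentioned above) lets deserialization resolve those references into real object links.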

KnowledgeItem

A KnowledgeItem represents a piece of knowledge produced or consumed during processing.
from kodexa_document.processing.processing_step import KnowledgeItem, KnowledgeFeature

item = KnowledgeItem(
    title="Invoice #12345",
    description="Extracted invoice data",
    slug="invoice-12345",
    knowledge_item_type_ref="knowledge-item-type://my-org/invoice",
    properties={"total": 1234.56, "vendor": "Acme Corp"}
)

Fields

| Field | Type | Description |
|---|---|---|
| id | str | UUID (auto-generated) |
| title | str | Display title |
| description | str | Description |
| slug | str | URL-friendly identifier |
| sequence_order | int | Ordering within a set |
| knowledge_item_type_ref | str | Reference to the item type |
| properties | dict | Arbitrary properties |
| features | List[KnowledgeFeature] | Associated features |

KnowledgeFeature

A KnowledgeFeature attaches structured metadata to knowledge items.
feature = KnowledgeFeature(
    feature_type_ref="knowledge-feature-type://my-org/confidence-score",
    slug="extraction-confidence",
    properties={"score": 0.95, "model": "invoice-v2"},
    extended_properties={"per_field_scores": {"total": 0.99, "date": 0.91}}
)

item.features.append(feature)

Fields

| Field | Type | Description |
|---|---|---|
| id | str | UUID (auto-generated) |
| feature_type_ref | str | Reference to the feature type |
| slug | str | URL-friendly identifier |
| active | bool | Whether this feature is active (default: True) |
| properties | dict | Core properties |
| extended_properties | dict | Additional properties |

Attaching to Processing Steps

Knowledge items are associated with processing steps:
step = ProcessingStep(name="Invoice Extraction")

item = KnowledgeItem(
    title="Invoice #12345",
    knowledge_item_type_ref="knowledge-item-type://my-org/invoice"
)

feature = KnowledgeFeature(
    feature_type_ref="knowledge-feature-type://my-org/extraction-result",
    properties={"status": "complete", "field_count": 12}
)
item.features.append(feature)

step.knowledge_items.append(item)

PipelineContext

PipelineContext tracks the state of a running execution pipeline. It is primarily used by module developers building custom processing steps.
from kodexa_document.platform.kodexa import PipelineContext

context = PipelineContext(
    execution_id="exec-abc-123",
    content_provider=my_content_provider
)

Fields

| Field | Type | Description |
|---|---|---|
| execution_id | str | UUID for this execution (auto-generated if not provided) |
| statistics | PipelineStatistics | Tracks documents processed |
| output_document | Document | The output document |
| content_objects | list | Content objects in the pipeline |
| stop_on_exception | bool | Whether to halt on errors (default: True) |
| current_document | Document | The document currently being processed |
| document_family | DocumentFamily | The document family context |
| document_store | Store | The associated document store |

Status Updates

Report progress during execution:
context.update_status("Extracting page 5 of 20", progress=5, progress_max=20)

Cancellation

Check for user-initiated cancellation:
if context.is_cancelled():
    print("Execution cancelled by user")
    return
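The cooperative-cancellation pattern generalizes: a long-running loop polls the flag between units of work and exits cleanly, preserving what has already been done. A minimal stand-in sketch (is_cancelled here is backed by a threading.Event, not by PipelineContext):

```python
import threading

cancel_event = threading.Event()

def process_pages(pages, is_cancelled):
    """Process pages one at a time, polling for cancellation between units."""
    done = []
    for page in pages:
        if is_cancelled():
            break  # Exit cleanly, keeping the work completed so far.
        done.append(f"processed {page}")
        if page == 2:
            cancel_event.set()  # Simulate a user cancelling mid-run.
    return done

result = process_pages([1, 2, 3, 4], cancel_event.is_set)
print(result)  # ['processed 1', 'processed 2']
```

Polling between units rather than mid-unit keeps each unit of work atomic, so a cancelled run never leaves a half-processed page behind.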

RemoteStep

RemoteStep wraps a reference to a module on the platform and can process documents remotely:
from kodexa_document.platform.client import RemoteStep

step = RemoteStep(
    ref="my-org/fast-pdf-model:1.0.0",
    step_type="ACTION",
    options={"dpi": 300, "language": "en"}
)

# Process a document through the remote module
result_doc = step.process(document, context)

Complete Example

from datetime import datetime
from kodexa_document import ProcessingStep
from kodexa_document.processing.processing_step import KnowledgeItem, KnowledgeFeature

# Build a processing pipeline history
pipeline = ProcessingStep(
    name="Invoice Processing Pipeline",
    start_timestamp=datetime.now(),
    metadata={"version": "3.0"}
)

# Stage 1: PDF extraction
extraction = ProcessingStep(
    name="PDF Extraction",
    metadata={"model": "fast-pdf-model", "pages": 3}
)
pipeline.add_child(extraction)

# Stage 2: Classification
classification = ProcessingStep(
    name="Document Classification",
    metadata={"result": "invoice", "confidence": 0.98}
)
pipeline.add_child(classification)

# Stage 3: Data extraction (depends on both previous steps)
data_extraction = ProcessingStep(
    name="Data Extraction",
    metadata={"fields_extracted": 12}
)
extraction.add_child(data_extraction)
classification.add_child(data_extraction)

# Attach knowledge item to the data extraction step
item = KnowledgeItem(
    title="Invoice #12345",
    knowledge_item_type_ref="knowledge-item-type://acme/invoice",
    properties={"vendor": "Acme Corp", "total": 1234.56}
)

item.features.append(KnowledgeFeature(
    feature_type_ref="knowledge-feature-type://acme/extraction-score",
    properties={"overall_confidence": 0.95}
))

data_extraction.knowledge_items.append(item)

# Serialize the full pipeline
pipeline_json = pipeline.to_json()

# Deserialize later
restored = ProcessingStep.from_json(pipeline_json)
print(f"Pipeline: {restored.name}")
print(f"Steps: {len(restored.children)}")