Apply checkpoint

Enterprise Single-tenant

The Apply checkpoint step verifies values identified in a previous step against validation formulas and triggers a review when any validation fails.

Rules are specified in validations and connected as modules to the Apply checkpoint step in a flow.

When validation fails and a review is triggered, the flow job is paused at the checkpoint until a reviewer confirms or corrects data and resumes the flow. You can modify this behavior by enabling straight-through processing.

As a best practice, insert a checkpoint at these points in a flow:

  • After classification but before branching, to ensure that records are routed to the correct branch for data extraction.

  • After extraction, refinement, and/or redaction, to ensure that extracted data is accurate.

  • Anywhere that data accuracy is critical. For example, add a checkpoint before inserting values into a database.

Straight-through processing

By default, the Apply checkpoint step pauses all documents in the batch when any one document fails validation. This behavior ensures that all steps after the checkpoint are executed with the complete batch of documents, but it can slow processing overall.

To change this behavior, in the Apply checkpoint step, set Enable Straight-Through Processing to Yes. You can enable straight-through processing on only one Apply checkpoint step in a given flow.

With straight-through processing enabled, documents that pass validation continue executing regardless of validation failures in other documents in a batch. When the validation failures are corrected and the flow is retried, the flow runs again from the checkpoint step with all documents, including any documents that originally passed validation. This behavior ensures that any reduce operations are executed with the full batch of documents, but it means that some steps might be re-executed.

If you choose to enable straight-through processing, you can account for the potential re-execution of steps in these ways:

  • Make sure any custom functions are built to correctly handle repeat execution. See the example below.

  • In review, use the Finalized Records filter to review only documents that aren’t subject to re-execution of steps. Documents subject to re-execution display a warning indicating that the record can’t be corrected because it’s regenerated on the next flow resume.

Handling repeat execution in custom functions

With straight-through processing enabled, documents can be processed multiple times until all documents in a batch pass the checkpoint. If you don’t want documents to be sent to subsequent steps multiple times, you can use a summary file to skip over documents that have already been processed.

For example, imagine you have an Apply checkpoint step followed by a reduce custom function that sends results to a downstream system. The first time the flow runs, three out of four documents pass the checkpoint and are passed to the reduce step. After review, the reduce custom function is re-executed with all four documents. If your downstream system requires that you send only the new document, the reduce custom function needs to be modified to handle this requirement.
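The skip-already-processed pattern in that scenario is independent of any particular SDK. Here is a minimal sketch using the local filesystem as a stand-in for the flow's file client; the function names (`load_summary`, `process_new_documents`) are illustrative, not part of any product API:

```python
import json
import os
from typing import Dict, List

SUMMARY_FILENAME = 'summary.json'

def load_summary(folder: str) -> Dict[str, bool]:
    # Load the map of already-processed documents, if one exists.
    path = os.path.join(folder, SUMMARY_FILENAME)
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return {}

def process_new_documents(folder: str, filepaths: List[str]) -> List[str]:
    # Return only documents not seen on a previous run, and record every
    # document so that a re-execution of this step skips it next time.
    summary = load_summary(folder)
    new_docs = [p for p in filepaths if p not in summary]
    for p in filepaths:
        summary[p] = True
    with open(os.path.join(folder, SUMMARY_FILENAME), 'w') as f:
        json.dump(summary, f)
    return new_docs
```

On the first run with three documents, all three are returned; when the step is re-executed with a fourth document added, only the fourth is returned, because the first three are already in the summary file.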

Below is an example reduce custom function that filters out documents that were already processed.

import json
import os
from typing import Any, Dict, List, Text

from instabase.ocr.client.libs.ibocr import ParsedIBOCRBuilder

_SUMMARY_FILENAME = 'summary.json'

def send_results(input_payloads: List[Dict],
                 root_output_folder: Text,
                 step_folder: Text,
                 clients: Any, *args: Any,
                 **kwargs: Any):
  # Maintain a file at summary_path that contains a JSON map of documents
  # already processed.
  summary_path = os.path.join(root_output_folder, _SUMMARY_FILENAME)
  summary = {}
  output_result = {}

  # If the summary file exists, load it.
  if clients.ibfile.exists(summary_path):
    summary_txt = clients.ibfile.read_file(summary_path)
    summary = json.loads(summary_txt)

  for payload in input_payloads:
    input_filepath = payload['input_filepath']

    # Skip any files that have already been processed.
    if input_filepath in summary:
      continue

    summary[input_filepath] = True

    # Load the ibmsg so we can get records from it.
    content = payload['content']
    builder, err = ParsedIBOCRBuilder.load_from_str(input_filepath, content)
    if err:
      raise Exception(err)

    # Pull out the ibmsg records only.
    for ibocr_record in builder.get_ibocr_records():
      refined_phrases, _ = ibocr_record.get_refined_phrases()
      for phrase in refined_phrases:
        name = phrase.get_column_name()
        value = phrase.get_column_value()
        # TODO: populate output result

  # TODO: send output result to downstream system

  # Write the updated summary file.
  clients.ibfile.write_file(summary_path, json.dumps(summary))
  return

def register(name_to_fn):
  name_to_fn.update({
      'send_results': {
          'fn': send_results
      }
  })