Custom functions in flow

Enterprise Single-tenant

Flows let you create custom functions (also called user-defined functions, or UDFs) to implement your own processing logic. A flow supports four main types of UDF: map, reduce, pre-flow, and post-flow.

Map UDF

Use a map UDF to process each input file or record in a flow in parallel. Because each invocation receives a single record, use a map UDF only when your processing logic operates on one document at a time.

Input variables

| Name | Type | Description | Values |
| --- | --- | --- | --- |
| INPUT_RECORD | dictionary | Dictionary containing information about the input file that's being processed by the custom function. | Contains the following keys: input_filepath (path to the input file), content (contents of the input file, as a string), and output_filename (name to use for the output file generated by the custom function). |
| ROOT_OUTPUT_FOLDER | string | Root output folder for the flow. | |
| STEP_FOLDER | string | Output folder for the current step in the flow. | |
| CONFIG | dictionary | Dictionary containing the runtime configuration settings for the flow. | Dictionary containing key-value pairs representing the configuration settings. |
| CLIENTS | object | Custom function clients object containing different client objects that can be used by the custom function to interact with other systems. | Object containing client objects such as the LLM client (llm_client) and the file client (ibfile). |
| TOKEN_FRAMEWORK_REGISTRY | object | Object that provides access to default and custom token matchers and tokenizers. | The token matchers and tokenizers can be used to perform text tokenization and matching operations within the custom function. |
| JOB_ID | string | ID for the job reported by the initial execution request. | |
| INPUT_FOLDER | string | Input folder for the flow job, as specified in the execution request. | |

Output variables

| Name | Type | Description | Values |
| --- | --- | --- | --- |
| out_files | array | Array of output file dictionaries. | Each dictionary contains the following keys: filename (name of the output file, as a string) and content (contents of the output file, as bytes). |

Each output file dictionary in the out_files array represents one output file generated by the custom function. The custom function must return a dictionary containing the out_files array for the output files to be passed to the next step in the flow.
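
For example, a return value that emits a single output file has the following shape. The filename and content values here are placeholders:

# Shape of the dictionary a map UDF returns; the values are placeholders.
result = {
  'out_files': [
    {
      'filename': 'renamed-output.ibmsg',  # name of the output file, as a string
      'content': b'...',                   # contents of the output file, as bytes
    }
  ]
}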

Example

The following map UDF renames the input file.

from instabase.udf_utils.clients.udf_helpers import get_output_ibmsg

def map_udf_func(input_record, step_folder, *args, **kwargs):
  input_filepath = input_record['input_filepath']
  file_content = input_record['content']
  output_filename = input_record['output_filename']

  # Copy output ibmsg from input.
  output_ibmsg, err = get_output_ibmsg(input_filepath, step_folder,
                                       file_content)
  if err:
    raise Exception(err)

  return {
    "out_files": [
      {
        "filename": f"map-udf-{output_filename}",
        "content": output_ibmsg
      }
    ]
  }

def register(name_to_fn):
  name_to_fn.update({
    'map_udf': {
      'fn': map_udf_func
    }
  })

Call this custom function in the Map UDF step by using the formula map_udf(INPUT_RECORD, STEP_FOLDER).

All input variables are also accessible through a context object passed in as the _FN_CONTEXT_KEY keyword argument, as shown in the following example:

# Inside the custom function body, retrieve the context object from kwargs
# and look up input variables by name.
fn_context = kwargs.get('_FN_CONTEXT_KEY')
input_record, err = fn_context.get_by_col_name('INPUT_RECORD')
file_content = input_record['content']

Reduce UDF

Use a reduce UDF to combine the output files from previous steps in a flow, optionally applying additional logic to modify their values.

Input variables

| Name | Type | Description | Values |
| --- | --- | --- | --- |
| INPUT_RECORDS | generator | Python generator that is used in a for loop to provide an input_record in each loop iteration. | Generator that yields dictionaries containing information about the input files being processed by the custom function. Each dictionary contains the following keys: input_filepath (path to the input file), content (contents of the input file, as a string), and output_filename (name to use for the output file generated by the custom function). |
| ROOT_OUTPUT_FOLDER | string | Root output folder for the flow. | |
| STEP_FOLDER | string | Output folder for the current step in the flow. | |
| CONFIG | dictionary | Dictionary containing the runtime configuration settings for the flow. | Dictionary containing key-value pairs representing the configuration settings. |
| CLIENTS | object | Custom function clients object containing different client objects that can be used by the custom function to interact with other systems. | Object containing client objects such as the LLM client (llm_client) and the file client (ibfile). |
| TOKEN_FRAMEWORK_REGISTRY | object | Object that provides access to default and custom token matchers and tokenizers. | The token matchers and tokenizers can be used to perform text tokenization and matching operations within the custom function. |
| JOB_ID | string | ID for the job reported by the initial execution request. | |
| INPUT_FOLDER | string | Input folder for the flow job, as specified in the execution request. | |
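
Structurally, a reduce UDF is a generator: it consumes INPUT_RECORDS in a for loop and yields one out_files dictionary per record. The following minimal sketch re-emits each input file unchanged; the function name reduce_passthrough is illustrative, and the full example in the next section builds on the same pattern.

from instabase.udf_utils.clients.udf_helpers import get_output_ibmsg

def reduce_passthrough(input_records, step_folder, *args, **kwargs):
  # Consume the input records and yield one out_files dictionary per record.
  for record in input_records:
    # Copy the output ibmsg from the input, as in the other examples.
    output_ibmsg, err = get_output_ibmsg(
        record['input_filepath'], step_folder, record['content'])
    if err:
      raise Exception(err)
    yield {
      'out_files': [{
        'filename': record['output_filename'],
        'content': output_ibmsg
      }]
    }

def register(name_to_fn):
  name_to_fn.update({
    'reduce_passthrough': {
      'fn': reduce_passthrough
    }
  })

You could call a function like this in the Reduce UDF step with a formula such as reduce_passthrough(INPUT_RECORDS, STEP_FOLDER).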

Example

The following reduce UDF generates a summary of the processed files and writes it to a file named summary.json.

import json
import logging
import os
from typing import Any, Dict, Generator, List, Text

from instabase.ocr.client.libs.ibocr import ParsedIBOCRBuilder, RefinedPhrase
from instabase.udf_utils.clients.udf_helpers import get_output_ibmsg

_SUMMARY_FILENAME = 'summary.json'

def get_by_col_name(kwargs, col) -> Any:
  val, err = kwargs['_FN_CONTEXT_KEY'].get_by_col_name(col)
  if err:
    raise Exception(f'{col} is not available in UDF context')
  return val

def generate_summary(*args: Any, **kwargs: Any) -> Generator[Dict, None, None]:
  logging.info('Started running Reduce UDF function')

  job_id = get_by_col_name(kwargs, 'JOB_ID')
  input_records = get_by_col_name(kwargs, 'INPUT_RECORDS')
  root_output_folder = get_by_col_name(kwargs, 'ROOT_OUTPUT_FOLDER')
  step_folder = get_by_col_name(kwargs, 'STEP_FOLDER')
  clients = get_by_col_name(kwargs, 'CLIENTS')
  summary_dict = {}

  for payload in input_records:
    input_filepath = payload['input_filepath']
    output_filename = payload['output_filename']
    content = payload['content']

    # Load the ibmsg so we can get records from it.
    builder, err = ParsedIBOCRBuilder.load_from_str(input_filepath, content)
    if err:
      raise Exception(err)

    # Pull out the ibmsg records only.
    for ibocr_record in builder.get_ibocr_records():
      raw_input_filepath = ibocr_record.get_document_path()
      if raw_input_filepath not in summary_dict:
        summary_dict[raw_input_filepath] = []
      result = {}
      if ibocr_record.has_class_label():
        result['class_label'] = ibocr_record.get_class_label()
      if ibocr_record.has_classify_page_range():
        result['page_range'] = ibocr_record.get_classify_page_range()
      else:
        result['page_range'] = {}
        result['page_range']['start_page'] = ibocr_record.get_page_numbers()[0] + 1
        result['page_range']['end_page'] = ibocr_record.get_page_numbers()[-1] + 1

      result['extracted_fields'] = {}
      refined_phrases, _ = ibocr_record.get_refined_phrases()
      for phrase in refined_phrases:
        name = phrase.get_column_name()
        value = phrase.get_column_value()
        result['extracted_fields'][name] = value
      summary_dict[raw_input_filepath].append(result)

    # Copy output ibmsg from input.
    output_ibmsg, err = get_output_ibmsg(input_filepath, step_folder, content)
    if err:
      raise Exception(err)

    output_ibmsg_dict = {
      'out_files': [{
        'filename': output_filename,
        'content': output_ibmsg
      }]
    }
    yield output_ibmsg_dict

  out_path = os.path.join(root_output_folder, _SUMMARY_FILENAME)
  clients.ibfile.write_file(out_path, json.dumps(summary_dict))

  return

def register(name_to_fn):
  name_to_fn.update({
    'generate_summary': {
      'fn': generate_summary
    }
  })

Call this custom function in the Reduce UDF step by using the formula generate_summary(INPUT_RECORDS, ROOT_OUTPUT_FOLDER, STEP_FOLDER, CLIENTS).

Pre-flow UDF

A pre-flow UDF is a hook that runs at the start of the flow before any of the steps have started execution. Pre-flow UDFs can be used to perform any necessary setup such as copying files into the input folder from another directory. You can add a pre-flow UDF in the flow editor by selecting Events > Pre-flow UDF.

Input variables

| Name | Type | Description | Values |
| --- | --- | --- | --- |
| ROOT_OUTPUT_FOLDER | string | Root output folder for the flow. | |
| CONFIG | dictionary | Dictionary containing the runtime configuration settings for the flow. | Dictionary containing key-value pairs representing the configuration settings. |
| CLIENTS | object | Custom function clients object containing different client objects that can be used by the custom function to interact with other systems. | Object containing client objects such as the LLM client (llm_client) and the file client (ibfile). |
| JOB_ID | string | ID for the job reported by the initial execution request. | |
| INPUT_FOLDER | string | Input folder for the flow job, as specified in the execution request. | |
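
For example, the setup task mentioned above, copying a file into the flow's input folder, could be sketched as follows using the file client's read_file and write_file methods shown elsewhere in this article. The source path and the function name stage_input_file are placeholders; adjust them to your environment.

import os

def stage_input_file(clients, input_folder, **kwargs):
  # Hypothetical source location to copy from.
  source_path = '/shared/incoming/document.pdf'

  # Read the source file and write a copy into the flow's input folder.
  content, err = clients.ibfile.read_file(source_path)
  if err:
    raise Exception(f'Failed to read {source_path}: {err}')

  dest_path = os.path.join(input_folder, os.path.basename(source_path))
  _, err = clients.ibfile.write_file(dest_path, content)
  if err:
    raise Exception(f'Failed to write {dest_path}: {err}')

def register(name_to_fn):
  name_to_fn.update({
    'stage_input_file': {
      'fn': stage_input_file
    }
  })

You could call a function like this in the pre-flow UDF hook with a formula such as stage_input_file(CLIENTS, INPUT_FOLDER).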

Example

The following custom function writes a file named flowinfo.json containing the job ID and the flow start timestamp.

from typing import Any, Text
import json
import time

def flow_info(clients: Any, root_output_folder: Text, job_id: Text, **kwargs):
  flow_info = {
    'Job ID': job_id,
    'Start timestamp': time.time(),
  }
  out_path = root_output_folder + '/flowinfo.json'
  clients.ibfile.write_file(out_path, json.dumps(flow_info))
  return

def register(name_to_fn):
  name_to_fn.update({
    'flow_info': {
      'fn': flow_info
    }
  })

Call this custom function in the pre-flow UDF hook by using the formula flow_info(CLIENTS, ROOT_OUTPUT_FOLDER, JOB_ID).

Post-flow UDF

A post-flow UDF is a hook that runs after the flow completes execution. Post-flow UDFs can be used to perform post-processing tasks, such as sending results to a downstream system or cleaning up folders. You can add a post-flow UDF in the flow editor by selecting Events > Post-flow UDF.

Input variables

| Name | Type | Description | Values |
| --- | --- | --- | --- |
| ROOT_OUTPUT_FOLDER | string | Root output folder for the flow. | |
| CONFIG | dictionary | Dictionary containing the runtime configuration settings for the flow. | Dictionary containing key-value pairs representing the configuration settings. |
| CLIENTS | object | Custom function clients object containing different client objects that can be used by the custom function to interact with other systems. | Object containing client objects such as the LLM client (llm_client) and the file client (ibfile). |
| JOB_ID | string | ID for the job reported by the initial execution request. | |
| INPUT_FOLDER | string | Input folder for the flow job, as specified in the execution request. | |

Example

In a post-flow UDF, you can consume flow results by reading the batch.ibflowresults file or by using the API, and then send them to a downstream system. The following example can be used as a starting point for this integration.

import logging
import os
import json

from instabase.ocr.client.libs.ibocr import ParsedIBOCRBuilder

def write_summary(**kwargs):
  summary_dict = {}

  fn_context = kwargs.get('_FN_CONTEXT_KEY')
  clients, _ = fn_context.get_by_col_name('CLIENTS')
  root_out_folder, _ = fn_context.get_by_col_name('ROOT_OUTPUT_FOLDER')

  res_path = os.path.join(root_out_folder, 'batch.ibflowresults')
  output, err = clients.ibfile.read_file(res_path)
  if err:
    return None, err

  results = json.loads(output)
  if results['can_resume']:
    # can_resume = True implies the flow is stopped at a checkpoint.
    # If so, skip writing the summary.
    return None, None

  for result_id in results['results']:
    result = results['results'][result_id]
    for record in result['records']:
      if record['status'] == 'OK':
        ibocr_path = record['ibocr_full_path']
        ibocr, err = clients.ibfile.read_file(ibocr_path)
        if err:
          return None, f'Failed to fetch ibocr path={ibocr_path} err={err}'

        builder, err = ParsedIBOCRBuilder.load_from_str(ibocr_path, ibocr)
        if err:
          return None, f'Failed to parse ibocr path={ibocr_path} err={err}'

        # Iterate over the IBOCR records.
        for ibocr_record in builder.get_ibocr_records():
          raw_input_filepath = ibocr_record.get_document_path()
          if raw_input_filepath not in summary_dict:
            summary_dict[raw_input_filepath] = []
          result = {}
          if ibocr_record.has_class_label():
            result['class_label'] = ibocr_record.get_class_label()
          if ibocr_record.has_classify_page_range():
            result['page_range'] = ibocr_record.get_classify_page_range()
          else:
            result['page_range'] = {}
            result['page_range']['start_page'] = ibocr_record.get_page_numbers()[0] + 1
            result['page_range']['end_page'] = ibocr_record.get_page_numbers()[-1] + 1

          result['extracted_fields'] = {}
          refined_phrases, _ = ibocr_record.get_refined_phrases()
          for phrase in refined_phrases:
            name = phrase.get_column_name()
            value = phrase.get_column_value()
            result['extracted_fields'][name] = value
          summary_dict[raw_input_filepath].append(result)

  _, err = clients.ibfile.write_file(
      os.path.join(root_out_folder, 'summary.json'),
      json.dumps(summary_dict, indent=4))
  if err:
    return None, f'Failed to write summary file err={err}'
  return summary_dict, None


def send_results(**kwargs):
  summary, err = write_summary(**kwargs)
  if err:
    logging.error(err)
    return

  if not summary:
    return

  # TODO: Send result to downstream system
  pass

def register(name_to_fn):
  name_to_fn.update({
    'send_results': {
      'fn': send_results
    }
  })

Call this custom function in the post-flow UDF hook by using the formula send_results().

The example implements two functions: send_results and write_summary. A post-flow UDF is called every time a flow stops, which means it runs both when a flow completes and when a checkpoint fails and the flow is stopped. The write_summary function reads the flow results file and checks whether the flow completed by inspecting the can_resume flag. If the flow completed, it reads the IBOCR records, constructs a summary containing the extracted results, and returns it. Complete the send_results function to send the returned summary to the downstream system.
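
For example, if the downstream system exposes an HTTP endpoint that accepts JSON, the final step could look like the following sketch. The endpoint URL and the post_summary helper are placeholders for your own integration, not part of the platform API.

import json
import logging
import urllib.request

# Hypothetical endpoint; replace with your downstream system's API.
_DOWNSTREAM_URL = 'https://example.com/api/flow-results'

def post_summary(summary):
  # Serialize the summary and POST it as JSON to the downstream endpoint.
  data = json.dumps(summary).encode('utf-8')
  req = urllib.request.Request(
      _DOWNSTREAM_URL,
      data=data,
      headers={'Content-Type': 'application/json'},
      method='POST')
  with urllib.request.urlopen(req) as resp:
    logging.info('Downstream system responded with status %s', resp.status)

In send_results, you would then replace the TODO with a call such as post_summary(summary).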