InsuranceLake File Formats and Input Specification
This section describes the input file formats supported by InsuranceLake and how to configure the input specification for each.
Contents
- Input Specification
- CSV (Comma Separated Value)
- TSV (Tab Separated Value)
- Pipe-delimited
- JSON
- XML
- Fixed Width
- Parquet
- Microsoft Excel Format Support
Input Specification
Input specification configuration is defined in the input_spec section of the workflow’s JSON configuration file. The filename follows the convention of <database name>-<table name>.json and is stored in the /etl/transformation-spec folder in the etl-scripts bucket. When using AWS CDK for deployment, the contents of the /lib/glue_scripts/lib/transformation-spec directory will be automatically deployed to this location. The input_spec section is used to specify input file format configuration and other data pipeline configuration, and it co-exists with the transform_spec section.
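As an illustration of this naming convention, the following is a minimal sketch (not part of the ETL; the helper name and the database and table names are hypothetical) of how a workflow's configuration file key could be built:

# Hypothetical helper illustrating the <database name>-<table name>.json convention
# under etl/transformation-spec in the etl-scripts bucket
def spec_object_key(database_name: str, table_name: str) -> str:
    """Build the S3 object key for a workflow's configuration file."""
    return f"etl/transformation-spec/{database_name}-{table_name}.json"

print(spec_object_key("my_database", "my_table"))
# etl/transformation-spec/my_database-my_table.json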
Parameter | Description |
---|---|
table_description | Text string description to use for the table in the Data Catalog; only applies to the table in the Cleanse bucket |
allow_schema_change | Setting to control permitted schema evolution; supported values: permissive , strict , reorder , evolve ; more information is provided in the Schema Evolution documentation |
strict_schema_mapping | Boolean value that controls whether to halt the pipeline operation if fields specified in the schema mapping are not present in the input file; more information is provided in the Schema Mapping Dropping Columns documentation |
csv | Section to specify CSV file specific configuration |
tsv | Section to specify TSV file specific configuration |
pipe | Section to specify pipe-delimited file specific configuration |
parquet | Section to indicate Apache Parquet input file support |
json | Section to specify JSON file specific configuration |
xml | Section to specify XML file specific configuration |
fixed | Section to indicate fixed width input file support |
excel | Section to specify Excel file specific configuration |
Example of other data pipeline configuration parameters:
{
    "input_spec": {
        "table_description": "Description will only apply to cleanse bucket",
        "allow_schema_change": "permissive",
        "strict_schema_mapping": true
    }
}
CSV (Comma Separated Value)
Comma separated value is the default file format for the InsuranceLake ETL. If no input specification configuration is provided and the file extension is not recognized, the ETL will assume the file is in CSV format.
The csv configuration section can be used to specify additional format options, including a custom field delimiter:
Format Option | Default | Description |
---|---|---|
header | true | Specifies whether the first row should be interpreted as a header row; to understand the schema when no header row is present, refer to the Schema Mapping Files with No Header documentation |
quote_character | " | Character used for escaping quoted field values containing the field separator |
escape_character | " | Character used for escaping quotes inside an already quoted field value |
delimiter | Dependent on the section header | Field separator character or characters; overrides the character indicated by the specified file format |
Spark CSV read defaults differ in some ways from the Request For Comments (RFC) 4180 definition of CSV. For example, multi-line quoted values are not supported by default, and the default escape character is the backslash. InsuranceLake ETL changes the default escape character to the double quote (") to match RFC 4180. However, multi-line quoted values remain unsupported due to their negative impact on parallelization in Spark. Other Spark CSV read defaults also affect the way the ETL interprets delimited files; refer to the Spark CSV File Data Source Option documentation for details.
Overriding the delimiter indicated by the section header could reduce the self-documenting effectiveness of your input specification configuration.
The following is an example of an input specification for a CSV file with no header, single quote for the quote character, and a backslash escape character (instead of the default double-quote character):
{
    "input_spec": {
        "csv": {
            "header": false,
            "quote_character": "'",
            "escape_character": "\\"
        }
    }
}
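To relate these settings to Spark, the following is a hedged sketch (not the ETL's actual code) of an equivalent DataFrame read using Spark's standard CSV options; the bucket and file names are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder path for the input object in the Collect bucket
source_path = "s3://my-collect-bucket/my_database/my_table/my_file.csv"

# Rough Spark equivalent of the csv configuration shown above
df = (
    spark.read
    .option("header", False)   # no header row in the input file
    .option("quote", "'")      # single quote as the quote character
    .option("escape", "\\")    # backslash as the escape character
    .option("sep", ",")        # comma delimiter indicated by the csv section
    .csv(source_path)
)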
TSV (Tab Separated Value)
Tab separated value input files are not identified by any file extension; the ETL will only interpret an input file as tab separated value if the file format is specified in the input_spec configuration.
The tsv configuration section can be used to indicate if a header row is expected, to specify a different quote character, to specify a different escape character, or to specify a custom field delimiter. CSV, TSV, and pipe-delimited formats share the same configuration section options. Complete details can be found in the CSV format support documentation.
The following is an example of a configuration for a TSV file with a header row:
{
    "input_spec": {
        "tsv": {
            "header": true
        }
    }
}
Pipe-delimited
Pipe-delimited input files are not identified by any file extension; the ETL will only interpret an input file as pipe-delimited if the file format is specified in the input_spec configuration.
The pipe configuration section can be used to indicate if a header row is expected, to specify a different quote character, to specify a different escape character, or to specify a custom field delimiter. CSV, TSV, and pipe-delimited formats share the same configuration section options. Complete details can be found in the CSV format support documentation.
The following is an example of a configuration for a pipe-delimited file with a header row:
{
    "input_spec": {
        "pipe": {
            "header": true
        }
    }
}
JSON
JSON input files are identified with the file extensions .json and .jsonl.
Single line JSON or JSONLines formatted multi-record files are supported by default. To support multiline single-record JSON, use the multiline parameter.
Example of a configuration for a single-record multiline JSON file:
{
    "input_spec": {
        "json": {
            "multiline": true
        }
    }
}
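For context, the following is a hedged sketch (not the ETL's actual code) showing how the multiline setting corresponds to Spark's multiLine JSON read option; the path is a placeholder:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder path for a single-record, multiline JSON document
source_path = "s3://my-collect-bucket/my_database/my_table/my_file.json"

# multiLine=True tells Spark the file contains one JSON document spanning
# multiple lines rather than one JSON record per line (JSONLines)
df = spark.read.option("multiLine", True).json(source_path)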
XML
While AWS Glue has support for XML parsing through DynamicFrames, InsuranceLake uses the Databricks Spark-XML driver for full XML parsing functionality through Spark DataFrames.
Obtaining the XML Driver
This library is not provided by AWS Glue or AWS; you must download the library and its dependencies from a third party and install them in your environment.
You can download the pre-compiled Spark XML library at MVN Repository.
The last version known to work with AWS Glue 4.0 is: 2.12 / 0.18.0. AWS Glue 4.0 provides all the required dependencies for this version.
XML Driver Installation
To include the Spark JAR package in the AWS Glue session, simply copy the JAR files to the lib/glue_scripts/lib folder and redeploy the AWS Glue stack using CDK. The CDK code will automatically identify the new libraries and include them in the --extra-jars parameter for all AWS Glue jobs.
If you attempt to load an XML file without installing the driver properly, you will see the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o125.load.: java.lang.ClassNotFoundException: Failed to find data source: xml. Please find packages at https://spark.apache.org/third-party-projects.html
XML Configuration
XML input files are identified with the file extension .xml.
The Spark XML driver is designed to read multiple rows of data from an XML file. This is accomplished by identifying a tag in the XML file that represents a new row, called the row tag. InsuranceLake ETL uses row as the default row tag. If your data uses a different tag to identify rows of data, you must specify it in the input specification using the row_tag parameter. If the row tag is not found in the XML source data, the ETL will not load any rows of data.
Example of a configuration for an XML file:
{
    "input_spec": {
        "xml": {
            "row_tag": "xmlrow"
        }
    }
}
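For reference, the following is a hedged sketch (not the ETL's actual code) showing how the row_tag setting corresponds to the Spark-XML rowTag read option; the path is a placeholder, and the Spark-XML JAR must be installed as described above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder path for an XML input file
source_path = "s3://my-collect-bucket/my_database/my_table/my_file.xml"

# Every <xmlrow> element in the file is read as one row of the DataFrame
df = (
    spark.read.format("xml")
    .option("rowTag", "xmlrow")
    .load(source_path)
)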
Fixed Width
The InsuranceLake ETL will only interpret an input file as fixed width if the file format is specified in the input_spec configuration; fixed width input files are not identified by any file extension. The fixed configuration section has no other parameters; the value should be defined as an empty JSON object (for forward compatibility).
To specify the width and position of each column in the file, use the schema mapping file with the extra column Width. More details and an example schema mapping file are available in the Schema Mapping Fixed Width File Format documentation.
Example of a configuration for a fixed width file:
{
    "input_spec": {
        "fixed": {}
    }
}
Handling Encoding Issues
Fixed width format data files may have characters encoded in formats that are not expected by the source system (for example, characters originating from more modern upstream systems). Specifically, there may be multi-byte single characters in the fixed width file that are counted as multiple characters toward the field width. Spark text file reading will detect and decode these characters automatically, which can result in incorrectly calculated widths for specific rows.
To work around this issue, the ETL’s fixed width handling can be modified to decode the text data using US-ASCII or ISO-8859-1 and split each line based on the width of fields in the schema mapping. This allows all bytes of a multi-byte character to count toward the field width, thus parsing the fields correctly.
To implement this change, modify the fixed width parsing Spark statement in etl_collect_to_cleanse.py by adding a decode() function as shown.
Remember to add the decode function to the list of imports from pyspark.sql.functions.
initial_df = spark.read.text(source_path)
initial_df = initial_df.select(
    [
        # Remove all encoding to calculate width, select column widths, then trim
        trim(decode(initial_df.value, 'US-ASCII').substr(
            # Calculate start index based on all prior column widths
            reduce(lambda a, b: a + int(b['width']), mapping_data[:index], 0) + 1,
            int(field_data['width'])
        )).alias(field_data['destname'])
        for index, field_data in enumerate(mapping_data)
        if field_data['destname'].lower() != 'null'
    ]
)
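For reference, the corresponding import change might look like the following (a sketch only; it assumes reduce comes from functools as in standard Python 3 and that trim is already imported, so your existing import lines will differ):

# Add decode alongside the functions already imported from pyspark.sql.functions
from functools import reduce
from pyspark.sql.functions import trim, decode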
Parquet
Apache Parquet files are identified either by the input file extension parquet or by specifying parquet in the input_spec configuration (for Apache Parquet files with varying extensions).
By default, Parquet files will be read one at a time, each initiating a separate ETL pipeline execution, and no folder structure discovery will be performed. Refer to the next section, Handling Multi-file Data Sets, for instructions on how to change this default behavior.
The following compression formats are supported transparently: uncompressed, snappy, gzip, lzo (AWS Glue documentation reference).
Handling Multi-file Data Sets
In the future, handling of multi-file Parquet data sources will be better integrated into InsuranceLake. The instructions in this section detail how to modify the source code of the Collect-to-Cleanse AWS Glue job and the etl-trigger Lambda function to support multi-file Parquet data sources.
Multi-file incoming data sets are often landed in the Collect S3 bucket in a nested folder structure. A nested folder structure for a Parquet data set could look similar to a partition override folder structure. For this reason, you must comment out and disable the partition override support in the etl-trigger Lambda function on line 127 as follows:
# try:
#     # Check if uploader has provided folders to override the default partitions
#     p_year = path_components[2]
#     p_month = path_components[3]
#     p_day = path_components[4]
# except IndexError:
#     # Keep any defaults
#     pass
To support multiple files that belong to the same dataset landed in the Collect bucket, we will expect one file to be added to the collection with a unique prefix that can be used to trigger the workflow; all other files will be ignored by the etl-trigger AWS Lambda function.
Add the following code at Line 123 of the same AWS Lambda function to skip processing of all files except the “success” or “trigger” file. The code assumes the trigger filename will start with the _ character.
if len(path_components) > 2 and not object_base_file_name.startswith('_'):
    logger.error(f'Ignoring individual files in folders beyond 2 levels (assuming collection): {object_full_path}')
    return {
        'statusCode': 400,
        'body': json.dumps('Received PutObject for individual file in collection; will not process')
    }
Lastly, add the following code to the Collect-to-Cleanse AWS Glue job on Line 204 to handle ETL executions initiated by the trigger file. This change ensures that the entire folder containing the trigger file is loaded for those executions, while individual files continue to be loaded for all other executions.
elif ext.lower() == '.parquet' or ( 'parquet' in input_spec and args['base_file_name'].startswith('_') ):
    if args['base_file_name'].startswith('_'):
        initial_df = spark.read.format('parquet').load(args['source_bucket'] + '/' + args['source_path'] + '/')
    else:
        initial_df = spark.read.format('parquet').load(source_path)
After making these code modifications, you will need to ensure that the parquet format is specified in the input_spec section of the workflow’s JSON configuration file. You will then be able to upload a folder of Parquet files inside the second level of the folder structure within the Collect S3 bucket (in other words, you still must have folders representing the database and table name to use), and the ETL will initiate a single execution to process the entire uploaded folder.
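To make the expected layout concrete, the following is a hypothetical example of a multi-file Parquet upload; all database, table, folder, and file names are placeholders:

# Hypothetical Collect bucket keys for one multi-file Parquet data set.
# The first two folder levels are the database and table name; the trigger
# file, whose name starts with '_', initiates a single workflow execution
# that loads the whole folder, while the individual Parquet parts are skipped.
example_collect_keys = [
    "my_database/my_table/batch_2024_01/part-00000.parquet",  # skipped by etl-trigger
    "my_database/my_table/batch_2024_01/part-00001.parquet",  # skipped by etl-trigger
    "my_database/my_table/batch_2024_01/_SUCCESS",            # triggers one ETL execution
]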
Microsoft Excel Format Support
InsuranceLake has been designed to support Microsoft Excel format input files using the open source Crealytics Spark Excel driver.
Obtaining the Driver
This library and its dependencies are not provided by AWS Glue or AWS; you must download them from a third party and install them in your environment.
You can download the pre-compiled Spark Excel library at MVN Repository.
The last version known to work with AWS Glue 4.0 is: 2.12 / 3.3.1_0.18.7.
The Spark Excel library also has dependencies that are not included with AWS Glue by default. Specifically, you must also download the Apache POI API Based On OPC and OOXML Schemas library and a newer version of XML Beans.
The versions that are required for the Spark Excel version 3.3.1_0.18.7 are poi-ooxml-5.2.3 and xmlbeans-5.1.1.
If you want to work with another version of Spark Excel, follow these steps to ensure AWS Glue compatibility:
1. Verify the version of Apache Spark included with the AWS Glue version in the AWS documentation.
2. Verify the version of Scala included with the Apache Spark version, either from the Spark package dependencies or the Spark compatibility matrix.
3. Download the version of Spark Excel corresponding to the desired Apache Spark and Scala version. If the Apache Spark version doesn’t exist, find the closest match. For example, AWS Glue 4.0 uses Apache Spark 3.3.0 and Scala 2.12; the closest match from the pre-built Spark Excel packages is Spark 3.3.1 and Scala 2.12.
4. Check the Compile Dependencies of the Spark Excel library, specifically the Apache POI, XML Beans, and Apache Commons versions. Library version details included with AWS Glue can be found here. Take note of libraries missing from AWS Glue or where a significantly newer version is needed than the version included with AWS Glue. Typically, this is limited to Apache POI and XML Beans.
5. Download the required versions of the dependency libraries.
Driver Installation
To include all Spark JAR packages in the AWS Glue session, simply copy the JAR files to the lib/glue_scripts/lib folder and redeploy the AWS Glue stack using CDK. The CDK code will automatically identify the new libraries and include them in the --extra-jars parameter for all AWS Glue jobs.
If you attempt to load an Excel file without installing the driver properly, you will see the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o124.load.: java.lang.ClassNotFoundException: Failed to find data source: excel. Please find packages at https://spark.apache.org/third-party-projects.html
Configuration
InsuranceLake recognizes Excel files by their extension. Currently, the following extensions (case insensitive) are supported: xls, xlsx, xlm, xlsm. Regardless of your configuration, the source data file must have one of these extensions to be detected as Excel.
With no additional configuration, InsuranceLake will attempt to read tabular data from the first sheet in the workbook, starting at cell A1.
Parameter | Type | Description |
---|---|---|
sheet_names | optional | A JSON list object of possible sheet names to try. Only one sheet is matched; the first match will be used. Sheets can be named or referenced by index. For referencing by index, specify the sheet number (starting from 0) as a string (for example, "1" ). Default: [ "0" ] |
data_address | optional | Either a Cell Reference representing the top left corner of the data, or a Range Reference specifying a specific block of data. Default: A1 |
header | optional | JSON boolean indicating whether the data has a header row or not. Default: true |
password | optional | Password to use to unlock password-protected Excel workbooks. Default: none |
Example configuration:
{
    "input_spec": {
        "excel": {
            "sheet_names": [ "Sheet1" ],
            "data_address": "A2",
            "header": true,
            "password": ""
        }
    }
}
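As a rough guide, the following is a hedged sketch (not the ETL's actual code) of how these settings correspond to Spark Excel read options; the path is a placeholder, and the sheet name, data address, and header values mirror the example above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder path for an Excel workbook in the Collect bucket
source_path = "s3://my-collect-bucket/my_database/my_table/my_file.xlsx"

# dataAddress combines the matched sheet name with the configured data address
df = (
    spark.read.format("excel")
    .option("header", "true")
    .option("dataAddress", "'Sheet1'!A2")
    # .option("workbookPassword", "my_password")  # only for protected workbooks
    .load(source_path)
)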
To load multiple sheets of data in the workbook into multiple tables, simply copy the same Excel file to multiple Collect bucket folders, each with its own transformation spec file and Excel configuration (each referencing the desired sheet).
Notes on Excel Support
- Original date formats in Excel are not always visible in some versions of Excel. In other words, when viewing values of date fields in Excel, the date format you see on the screen may not be what is stored in the file. This is important if you are writing a date or timestamp transform spec and need to specify the correct date pattern. If you have an incorrect date format, symptoms include:
  - Null/empty date field values.
  - An error similar to one of the following:
    You may get a different result due to the upgrading of Spark 3.0 Fail to parse 'YYYY-M-d' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.
    Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z into Parquet files can be dangerous, as the files may be read by Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set spark.sql.parquet.datetimeRebaseModeInWrite to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during writing, to get maximum interoperability. Or set spark.sql.parquet.datetimeRebaseModeInWrite to 'CORRECTED' to write the datetime values as it is, if you are 100% sure that the written files will only be read by Spark 3.0+ or other systems that use Proleptic Gregorian calendar.
- To avoid this issue, we recommend importing the Excel file first with no date conversion. Specifically:
  - Import the Excel workbook with no date or timestamp transform.
  - Note the field type that Spark chooses for your date columns.
  - If the field is already a date or timestamp, it may mean that Apache Spark was able to detect the date format and there is no need for a date transform. To confirm, ensure the date field values are correct.
  - If the field is a string, note the format of the date field values and add a date or timestamp transform for the field.
You may experience issues importing Excel workbooks with macros that are used to write cells containing data you want to import. The Spark Excel library is subject to the limitations of the Apache POI project.
You may encounter the error Zip bomb detected when reading some Excel workbooks. The error message will look similar to this:
py4j.protocol.Py4JJavaError: An error occurred while calling o37.load. : java.io.IOException: Zip bomb detected! The file would exceed the max. ratio of compressed file size to the size of the expanded data. This may indicate that the file is used to inflate memory usage and thus could pose a security risk. You can adjust this limit via ZipSecureFile.setMinInflateRatio() if you need to work with files which exceed this limit. Uncompressed size: 106496, Raw/compressed size: 859, ratio: 0.008066 Limits: MIN_INFLATE_RATIO: 0.010000
This is a security feature of the Apache POI library that tries to detect and block malicious Excel files. It has led to false positives for many users of Spark Excel. You can read details and support a solution in the Spark Excel Issue Tracker.
Because there is no method to access setMinInflateRatio() from PySpark at this time, we recommend working around the issue by opening the Excel file, saving the desired tab in a separate file, and uploading the newly saved file into the Collect bucket.
The Spark Excel driver has a file size limit by default. If you exceed this limit, you will see an error message similar to this:
Caused by: org.apache.poi.util.RecordFormatException: Tried to allocate an array of length 322,589,970, but the maximum length for this record type is 100,000,000. If the file is not corrupt or large, please open an issue on bugzilla to request Increasing the maximum allowable size for this record type. As a temporary workaround, consider setting a higher override value with IOUtils.setByteArrayMaxOverride()
You can correct this issue by increasing the Apache POI ByteArrayMaxOverride value in the Spark Excel read options in etl_collect_to_cleanse.py.
Example supporting up to 2 GB files:
initial_df = spark.read.format('excel') \
    .option('header', header) \
    .option('inferSchema', 'true') \
    .option('mode', 'PERMISSIVE') \
    .option('usePlainNumberFormat', 'true') \
    .option('dataAddress', f"'{sheet_name}'!{data_address}") \
    .option('workbookPassword', workbook_password) \
    .option('maxByteArraySize', 2147483647) \
    .load(source_path)