InsuranceLake Developer Guide
This section provides details for developing with InsuranceLake.
Contents
- Local AWS CDK Deployment
- Local AWS Glue and Apache Spark Development
- Codebase
- Code Quality
- Unit Testing
- AWS CodePipeline and GitHub Integration
- Known Issues
Local AWS CDK Deployment
Refer to the AWS CDK Instructions for standard AWS CDK project setup and prerequisites.
Local AWS Glue and Apache Spark Development
To set up a local AWS Glue and Apache Spark environment for testing, use the AWS-provided AWS Glue Docker container image, which can be retrieved from the AWS Glue Docker Hub repository. Ensure you use the tag that matches the version of AWS Glue used in the stack (currently v4).
For detailed instructions on setting up a local AWS Glue and Apache Spark environment, refer to the following guides:
- AWS Developer Guide: Developing and testing AWS Glue job scripts locally
- AWS Blog: Develop and test AWS Glue version 3.0 and 4.0 jobs locally using a Docker container
- Developing AWS Glue ETL jobs locally using a container
AWS Glue Data Quality is not available in the AWS Glue Docker images.
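Because of this limitation, code paths that exercise AWS Glue Data Quality cannot run inside the container. The following is a minimal, purely illustrative sketch of guarding such code; it assumes the awsgluedq module provided by the AWS Glue runtime and is not how InsuranceLake itself handles the limitation:
# Illustrative guard only: skip data quality logic when the awsgluedq module
# is absent (for example, inside the AWS Glue Docker image)
try:
    from awsgluedq.transforms import EvaluateDataQuality
    data_quality_available = True
except ImportError:
    data_quality_available = False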
Local Apache Iceberg Development
Local development and testing of Apache Iceberg tables is possible using the AWS-provided Glue Docker container. To enable this capability, the Docker container needs one or both of the following additional libraries:
- Spark with Scala runtime JAR: required to run unit tests
- aws-bundle JAR: required for integration testing or for local running and debugging with an AWS session
Both JAR files can be downloaded from the Apache Iceberg Releases page. The last versions known to work with AWS Glue 4.0 are iceberg-spark-runtime-3.3_2.12-1.5.2.jar and iceberg-aws-bundle-1.5.2.jar.
To enable these libraries, copy them to /home/glue_user/spark/jars/ in the AWS Glue Docker image.
For more detailed information on Iceberg unit testing, refer to the article Navigating the Iceberg: unit testing Iceberg tables with PySpark.
If you attempt to run the unit tests without installing the Iceberg library, you will see the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o119.create.
: org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'local': org.apache.iceberg.spark.SparkCatalog
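A minimal sketch of a pytest fixture that configures a local Iceberg catalog for unit tests follows, assuming the Iceberg Spark runtime JAR has been copied into the container as described above. The catalog name matches the error message, but the warehouse path and fixture name are illustrative and this is not the actual InsuranceLake test fixture:
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope='session')
def spark_iceberg(tmp_path_factory):
    # Local Hadoop-type Iceberg catalog named 'local', warehoused in a temp directory
    warehouse = str(tmp_path_factory.mktemp('iceberg-warehouse'))
    spark = (
        SparkSession.builder
        .master('local[*]')
        .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        .config('spark.sql.catalog.local', 'org.apache.iceberg.spark.SparkCatalog')
        .config('spark.sql.catalog.local.type', 'hadoop')
        .config('spark.sql.catalog.local.warehouse', warehouse)
        .getOrCreate()
    )
    yield spark
    spark.stop()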
Visual Studio Code
To run and debug the ETL AWS Glue jobs in Visual Studio Code, you’ll need a launch configuration defined in launch.json. The following configuration provides the required parameters for all three of the InsuranceLake ETL AWS Glue jobs. You can also find this sample configuration in the launch.json.default Visual Studio Code configuration file included in the repository. Be sure to replace the AWS Account ID placeholders with the AWS Account ID where InsuranceLake is deployed.
{
"name": "Python: Glue pySpark jobs",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": true,
"args": [
"--JOB_NAME=VisualStudioCode",
"--enable-glue-datacatalog=true",
"--TempDir=s3://dev-insurancelake-ACCOUNT_ID-us-east-2-glue-temp/etl/collect-to-cleanse",
"--enable-metrics=true",
"--enable-spark-ui=true",
"--spark-event-logs-path=s3://dev-insurancelake-ACCOUNT_ID-us-east-2-glue-temp/etl/collect-to-cleanse/",
"--enable-job-insights=true",
"--enable-continuous-cloudwatch-log=true",
"--state_machine_name=dev-insurancelake-etl-state-machine",
"--execution_id=test_execution_id",
"--environment=Dev",
"--source_bucket=s3://dev-insurancelake-ACCOUNT_ID-us-east-2-collect",
"--source_key=SyntheticGeneralData/PolicyData",
"--source_path=SyntheticGeneralData/PolicyData",
"--target_database_name=SyntheticGeneralData",
"--target_bucket=s3://dev-insurancelake-ACCOUNT_ID-us-east-2-cleanse",
"--base_file_name=syntheticgeneral-policy-data.csv",
"--p_year=2024",
"--p_month=07",
"--p_day=15",
"--table_name=PolicyData",
"--txn_bucket=s3://dev-insurancelake-ACCOUNT_ID-us-east-2-etl-scripts",
"--txn_spec_prefix_path=/etl/transformation-spec/",
"--txn_sql_prefix_path=/etl/transformation-sql/",
"--hash_value_table=dev-insurancelake-etl-hash-values",
"--value_lookup_table=dev-insurancelake-etl-value-lookup",
"--multi_lookup_table=dev-insurancelake-etl-multi-lookup",
"--dq_results_table=dev-insurancelake-etl-dq-results",
"--data_lineage_table=dev-insurancelake-etl-data-lineage",
"--additional-python-modules=rapidfuzz",
"--iceberg_catalog=glue_catalog"
],
"env": {
"AWS_DEFAULT_REGION": "us-east-2",
"PYDEVD_WARN_EVALUATION_TIMEOUT": "15"
}
}
Codebase
Source Code Structure
The table below explains how this source code is structured.
File / Folder | Description |
---|---|
app.py | Application entry point |
code_commit_stack | Optional stack to deploy an empty CodeCommit repository for mirroring |
pipeline_stack | CodePipeline stack entry point |
pipeline_deploy_stage | CodePipeline deploy stage entry point |
dynamodb_stack | Stack to create DynamoDB tables for job auditing and ETL transformation rules |
glue_stack | Stack to create AWS Glue jobs and supporting resources such as AWS Glue Connections, S3 Buckets (script and temporary), and an IAM execution role |
step_functions_stack | Stack to create an ETL Step Function State Machine which invokes AWS Glue jobs and supporting Lambda functions (state machine trigger and status notification) |
athena_helper_stack | Stack to create an Athena workgroup with query results bucket ready for demonstration SQL queries |
Collect-to-Cleanse AWS Glue Script | AWS Glue PySpark job data processing logic for Collect bucket data, which stores results in the Cleanse bucket |
Cleanse-to-Consume AWS Glue Script | AWS Glue PySpark job data processing logic for Cleanse bucket data, which stores results in the Consume bucket |
Entity Match AWS Glue Script | AWS Glue PySpark job data processing logic for entity matching, which stores results in the Consume bucket |
ETL Job Auditor | Lambda function to update DynamoDB in case of AWS Glue job success or failure |
ETL Trigger | Lambda function to trigger the Step Functions state machine and create the initial DynamoDB job audit entry |
ETL Transformation Mapping and Specification | Field mapping and transformation specification logic to be used for data processing from Collect to Cleanse |
ETL Transformation SQL | Transformation SQL logic to be used for data processing from Cleanse to Consume |
ETL Data Quality Rules | AWS Glue Data Quality rules for quality checks from Cleanse to Consume |
test | This folder contains pytest unit tests |
resources | This folder has sample data and helper scripts |
docs | This folder contains all the documentation for InsuranceLake, including architecture diagrams, developer documentation, and user documentation |
Transformation Modules
The table below lists the AWS Glue transformation modules.
File / Folder | Description |
---|---|
datatransform_dataprotection | PySpark logic to redact, hash, and tokenize sensitive data columns |
datatransform_lookup | PySpark logic to perform column value lookup operations |
datatransform_misc | PySpark logic for miscellaneous data transformation functions, such as filtering rows |
datatransform_premium | PySpark logic to perform common insurance industry data transforms |
datatransform_stringmanipulation | PySpark logic to perform regular expression transforms and Python formatting string operations on data |
datatransform_structureddata | PySpark logic to perform operations on nested data structures usually created from JSON files |
datatransform_typeconversion | PySpark logic to convert date columns and other data types to standard format |
custom_mapping | PySpark logic to rename columns according to a map file |
dataquality_check | AWS Glue logic to run AWS Glue Data Quality rules according to a rules file |
datalineage | Custom data lineage tracking class designed to work with InsuranceLake transforms |
Code Quality
Style Guide for Python Code
AWS InsuranceLake follows PEP8 enforced through flake8 and pre-commit (instructions are shown below).
To enable pre-commit checks, install and setup the pre-commit library:
pip install pre-commit
pre-commit install
The above commands create a Git hook that validates code with flake8 prior to each commit. Configuration for these standards can be found in:
AWS Glue Job/Apache Spark Code
Due to their complexity, InsuranceLake AWS Glue jobs are not editable in the AWS Glue Visual Editor. AWS Glue job development and testing is best done in AWS Glue Notebooks or in a local Python-enabled integrated development environment (IDE) with an AWS Glue Docker container. Documentation on setting up an AWS Glue Docker container can be found in the Local AWS Glue and Apache Spark Development section.
To more quickly diagnose issues with features not available in the AWS Glue Docker container, or when working with large datasets that may be too slow to process locally, it can be helpful to bypass the Step Functions workflow and run specific AWS Glue jobs directly. The following are example executions of the Cleanse-to-Consume and Entity Match AWS Glue jobs with the minimum required parameters:
aws glue start-job-run --job-name dev-insurancelake-cleanse-to-consume-job --arguments '
{
"--execution_id": "manual_execution_identifier",
"--source_bucketname": "dev-insurancelake-<Account ID>-us-east-2-glue-temp",
"--source_key": "MyDB/MyTable",
"--base_file_name": "input_file.csv",
"--database_name_prefix": "MyDB",
"--table_name": "MyTable",
"--p_year": "2024",
"--p_month": "01",
"--p_day": "01",
"--data_lineage_table": "dev-insurancelake-etl-data-lineage",
"--state_machine_name": "dev-insurancelake-etl-state-machine"
}'
aws glue start-job-run --job-name dev-insurancelake-consume-entity-match-job --arguments '
{
"--execution_id": "manual_execution_identifier",
"--source_key": "MyDB/MyTable",
"--database_name_prefix": "MyDB",
"--table_name": "MyTable",
"--data_lineage_table": "dev-insurancelake-etl-data-lineage",
"--state_machine_name": "dev-insurancelake-etl-state-machine",
"--p_year": "2024",
"--p_month": "01",
"--p_day": "01"
}'
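If you prefer to script these test runs in Python, the same invocation can be made with boto3. The following sketch mirrors the Cleanse-to-Consume CLI example above; the argument values are the same placeholders:
import boto3

glue = boto3.client('glue')

# Mirrors the CLI example above; argument values are placeholders
response = glue.start_job_run(
    JobName='dev-insurancelake-cleanse-to-consume-job',
    Arguments={
        '--execution_id': 'manual_execution_identifier',
        '--source_bucketname': 'dev-insurancelake-<Account ID>-us-east-2-glue-temp',
        '--source_key': 'MyDB/MyTable',
        '--base_file_name': 'input_file.csv',
        '--database_name_prefix': 'MyDB',
        '--table_name': 'MyTable',
        '--p_year': '2024',
        '--p_month': '01',
        '--p_day': '01',
        '--data_lineage_table': 'dev-insurancelake-etl-data-lineage',
        '--state_machine_name': 'dev-insurancelake-etl-state-machine',
    },
)
print(response['JobRunId'])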
The following are additional code considerations:
- To include third-party Python libraries, use the AWS Glue job parameter --additional-python-modules specified in the AWS Glue job definition in the AWS Glue stack. Review the AWS Glue User Guide for more information.
- To include your own Python libraries, create the additional library in lib/glue_scripts/lib and redeploy the AWS Glue stack using AWS CDK. The CDK AWS Glue stack will automatically identify the new library and include it in the --extra-py-files parameter for all AWS Glue jobs. Ensure you also import the module in the AWS Glue job script.
- To include Spark JAR libraries, copy the JAR file to the lib/glue_scripts/lib folder and redeploy the AWS Glue stack using AWS CDK. The CDK AWS Glue stack will automatically identify the new library and include it in the --extra-jars parameter for all AWS Glue jobs.
- InsuranceLake AWS Glue jobs follow the AWS best practice of getting the Spark session from the AWS Glue context and calling job.init(). job.commit() is always called at the end of the script. These functions are used to communicate state changes to the AWS Glue service.
- InsuranceLake AWS Glue jobs do not make use of AWS Glue job bookmarks because almost all transformation of data is done in Spark DataFrames. AWS Glue job bookmarks are disabled by default for all InsuranceLake AWS Glue jobs. For future use, the AWS Glue jobs set the transformation_ctx for all DynamicFrame operations (see the sketch after the skeleton code below).
- A difference between the standard AWS Glue job source code and the InsuranceLake code is that job initialization is contained within main and not in the global Python context. This is done to facilitate unit testing of the AWS Glue jobs in a local environment using the AWS Glue Docker container.
- The suggested skeleton code to construct a new AWS Glue job in the InsuranceLake pipeline follows:
import sys
import os
# Other Python imports

from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
# Other pyspark and awsglue imports

# For running in local Glue container
sys.path.append(os.path.dirname(__file__) + '/lib')

from glue_catalog_helpers import upsert_catalog_table, clean_column_names, generate_spec, put_s3_object
from datalineage import DataLineageGenerator
# Other local libraries needed from lib/glue_scripts/lib

# Minimum job arguments (add others here if required)
expected_arguments = [
    'JOB_NAME',
    'environment',
    'TempDir',
    'state_machine_name',
    'execution_id',
    'source_key',
]

# Handle optional arguments
for arg in sys.argv:
    if '--data_lineage_table' in arg:
        expected_arguments.append('data_lineage_table')

def main():
    args = getResolvedOptions(sys.argv, expected_arguments)

    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)

    lineage = DataLineageGenerator(args)

    # Job specific code here

    job.commit()

if __name__ == '__main__':
    main()
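As noted in the job bookmarks consideration above, the following is a minimal illustration of setting transformation_ctx on a DynamicFrame operation; the database and table names are placeholders, and glueContext is assumed to come from a job like the skeleton above:
# Illustrative only: transformation_ctx names this operation so job bookmarks
# could track its state if they were ever enabled
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database='mydb',
    table_name='mytable',
    transformation_ctx='read_mytable',
)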
The majority of InsuranceLake operations are done using Spark-native DataFrames because conversions to AWS Glue DynamicFrames and Pandas DataFrames come with a cost. InsuranceLake was also designed to be as portable as possible to other Spark environments (with the exception of AWS Glue Data Quality). We recommend you follow the practice of avoiding DataFrame conversions in your AWS Glue jobs.
- When there is functionality needed from Pandas that is not available in Spark, there are three methods to consider:
  - Pandas API on Spark: using DataFrame.pandas_api() is performant because the data and operations are distributed. Avoid operations like DataFrame.to_numpy() that require the data to be collected on the driver (non-distributed). The Pandas API on Spark does not target 100% compatibility, so you may experience errors running your workflow.
  - Pandas UDFs: Pandas user-defined functions (UDFs) are executed by Spark using Apache Arrow to transfer data and Pandas to work with the data. When used in combination with withColumn or select, a Pandas UDF can perform Pandas library vectorized operations in a distributed manner on individual columns supplied as arguments (see the sketch after this list).
  - Full conversion to a Pandas DataFrame: if a full conversion is needed, ensure your AWS Glue job enables Apache Arrow support for data conversion as follows:
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
pandas_df = spark_df.toPandas()
# Pandas operations here
spark_df = spark.createDataFrame(pandas_df)
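The following is a brief, hypothetical illustration of the Pandas UDF approach described in the list above; the column name and normalization logic are placeholders, not an InsuranceLake transform:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def normalize_policy_number(policy_number: pd.Series) -> pd.Series:
    # Vectorized Pandas string operations run on Arrow batches across executors
    return policy_number.str.strip().str.upper()

# Applied to a single column in a distributed manner:
# df = df.withColumn('policynumber', normalize_policy_number('policynumber'))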
When implementing custom transforms or custom AWS Glue jobs, developers should carefully consider the use of Spark actions and operations that cause data shuffling or that force immediate evaluation of transforms. Use of these operations can significantly impact the performance of your AWS Glue job.
For a list of operations that cause data shuffling, refer to the Spark Programming Guide list of shuffle operations.
Spark actions cause any unevaluated transformation to immediately execute. For a full list of Spark functions, refer to the list of transformations and the list of actions.
Use cache when you need to perform multiple operations on the same dataset to avoid reading from storage repeatedly.
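As a brief illustration of this practice, the following sketch caches a DataFrame that feeds multiple actions; the path and column names are placeholders:
# Cache a DataFrame reused by multiple actions to avoid re-reading from S3
policy_df = spark.read.parquet('s3://example-bucket/cleanse/policydata/')
policy_df.cache()

# Both actions below reuse the cached data instead of re-reading from storage
policy_df.groupBy('state').sum('writtenpremium').show()
row_count = policy_df.count()

policy_df.unpersist()  # Release the cached data when finished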
Unit Testing
The InsuranceLake Infrastructure and ETL unit tests for CDK code, Lambda functions, and AWS Glue jobs use pytest.
To install the Python dependencies for development and testing, use the following pip command:
pip install -r requirements-dev.txt
Once the dependencies are installed, run tests with the following command (--cov will include a code coverage report):
python -m pytest --cov
AWS Glue job unit tests will be automatically skipped if no AWS Glue or Spark environment is detected, and a message similar to the following will be displayed:
test/test_custom_mapping.py::test_custommapping_renames_field SKIPPED (No PySpark environment found) [ 17%]
To set up a local AWS Glue and Spark environment for testing, refer to the Local AWS Glue and Apache Spark Development section.
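For reference, the following is a minimal sketch of the skip pattern described above; the test and fixture logic are illustrative and this is not the actual InsuranceLake test code:
import pytest

try:
    from pyspark.sql import SparkSession
    pyspark_available = True
except ImportError:
    pyspark_available = False

@pytest.mark.skipif(not pyspark_available, reason='No PySpark environment found')
def test_example_transform():
    # Runs only when a local Spark environment (such as the Glue container) is present
    spark = SparkSession.builder.master('local[*]').getOrCreate()
    df = spark.createDataFrame([('a', 1)], ['letter', 'number'])
    assert df.count() == 1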
AWS CodePipeline and GitHub Integration
Integration between CodePipeline and GitHub requires a personal access token, which is stored in AWS Secrets Manager. This is a one-time setup that applies to all target AWS environments and all repositories created under the organization in GitHub. Follow these steps:
- Create a personal access token in GitHub. Refer to Creating a personal access token for details.
- Run the following command:
python3 ./lib/prerequisites/configure_account_secrets.py
- Enter or paste the GitHub personal access token when prompted. The access token value will not appear on the screen.
- Confirm the information for the Secrets Manager secret is correct and type ‘y’.
- Expected output:
Pushing secret: /InsuranceLake/GitHubToken
A secret is added to AWS Secrets Manager with the name **/InsuranceLake/GitHubToken**.
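For reference, the following is an illustrative boto3 sketch of what storing the token involves; configure_account_secrets.py is the supported method, and only the secret name /InsuranceLake/GitHubToken is taken from the expected output above:
import getpass
import boto3

# Illustrative only: prompt for the token without echoing it, then store it
token = getpass.getpass('GitHub personal access token: ')

secretsmanager = boto3.client('secretsmanager')
secretsmanager.create_secret(
    Name='/InsuranceLake/GitHubToken',
    Description='GitHub personal access token for CodePipeline integration',
    SecretString=token,
)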
Known Issues
CodeBuild Quotas
Ensure you are aware of the service quotas for AWS CodeBuild. Exceeding a quota will result in an error similar to the following:
Action execution failed Error calling startBuild: Cannot have more than 1 builds in queue for the account (Service: AWSCodeBuild; Status Code: 400; Error Code: AccountLimitExceededException; Request ID: e123456-d617-40d5-abcd-9b92307d238c; Proxy: null)
S3 Object Lock
Enabling S3 Object Lock on the Cleanse or Consume S3 buckets breaks ETL data writes to those buckets. This is caused by known limitations of Hadoop’s S3A driver used by Spark, tracked as open issues HADOOP-19080 and HADOOP-15224. Enabling S3 Object Lock on these S3 buckets will result in an error similar to the following:
An error occurred while calling o237.saveAsTable. Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: <request_id>; S3 Extended Request ID: <extended_request_id>; Proxy: null)
It is possible to convert the ETL data write operations to use the GlueContext getSink method which supports writing to S3 buckets with Object Lock. However, this introduces a side effect of creating a new version of the Data Catalog table schema for every write operation.
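The following is an illustrative sketch of the getSink approach described above, not the current InsuranceLake implementation; the bucket, database, table, and partition names are placeholders, and glueContext and spark_df are assumed to come from a job like the skeleton shown earlier:
from awsglue.dynamicframe import DynamicFrame

# Write through the Data Catalog-aware sink, which supports Object Lock buckets
sink = glueContext.getSink(
    connection_type='s3',
    path='s3://example-cleanse-bucket/mydb/mytable/',
    enableUpdateCatalog=True,
    updateBehavior='UPDATE_IN_DATABASE',
    partitionKeys=['year', 'month', 'day'],
    transformation_ctx='write_mytable',
)
sink.setCatalogInfo(catalogDatabase='mydb', catalogTableName='mytable')
sink.setFormat('glueparquet')
sink.writeFrame(DynamicFrame.fromDF(spark_df, glueContext, 'write_mytable_df'))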