InsuranceLake Collect-to-Cleanse Transform Reference

This section describes each of the user-configured data transforms provided with the InsuranceLake ETL. The library of transforms can be extended by users of InsuranceLake using PySpark.

Transformation Reference Table

| Formatting | Description |
| --- | --- |
| currency | Convert specified numeric field with currency formatting to decimal (fixed precision) |
| changetype | Convert specified fields to decimal (fixed precision), int, bigint, string, and more |
| date | Convert specified date fields to International Organization for Standardization (ISO) format based on a known input format |
| implieddecimal | Convert specified numeric fields to decimal (fixed precision) type with implied decimal point support (in other words, the last two digits are to the right of decimal) |
| timestamp | Convert specified datetime fields to ISO format based on a known input format |
| titlecase | Convert specified string column in DataFrame to title or proper case |

| String Manipulation | Description |
| --- | --- |
| columnfromcolumn | Add or replace column in DataFrame based on regular expression group match pattern |
| columnreplace | Add or replace a column in DataFrame with regular expression substitution on an existing column |
| combinecolumns | Add column to DataFrame using format string and source columns |
| literal | Add column to DataFrame with static or literal value supplied in specification |
| filename | Add column in DataFrame based on regular expression group match pattern on the filename argument to the AWS Glue job |

| Data Security | Description |
| --- | --- |
| hash | Hash specified column values using SHA256 |
| redact | Redact specified column values using supplied redaction string |
| tokenize | Replace specified column values with hash and store original value in DynamoDB table |

| Policy Data Operations | Description |
| --- | --- |
| flipsign | Flip the sign of a numeric column in a Spark DataFrame, optionally in a new column |
| addcolumns | Add two or more columns together in a new or existing column |
| multiplycolumns | Multiply two or more columns together in a new or existing column |
| earnedpremium | Calculate monthly earned premium |
| enddate | Add a number of months to a specified date to get an ending or expiration date |
| expandpolicymonths | Expand dataset to one row for each month the policy is active with a calculated earned premium |
| policymonths | Calculate number of months between policy start and end dates |

| Structured Data | Description |
| --- | --- |
| jsonexpandarray | Expand array type columns from JSON files into multiple rows |
| jsonexpandmap | Expand struct or map type columns from JSON files into multiple rows |
| flatten | Flattens one level of a column containing nested data |
| jsonstructured | Convert string column with JSON data to structured column |
| xmlstructured | Convert string column with XML data to structured column |

| Miscellaneous Data Operations | Description |
| --- | --- |
| lookup | Replace specified column values with values looked up from an external table |
| multilookup | Add columns looked up from an external table using multiple conditions, returning any number of attributes |
| filldown | Fill starting column value down the columns for all null values until the next non-null |
| filterrows | Filter out rows based on standard SQL WHERE statement |
| merge | Merge columns using coalesce |
| rownumber | Adds row number column to rows based on a partition column list |

Using Transforms

  • Transform configuration is specified in the transform_spec section of the workflow’s JSON configuration file. The filename follows the convention <database name>-<table name>.json and is stored in the /etl/transformation-spec folder in the etl-scripts bucket. When using AWS CDK for deployment, the contents of the /lib/glue_scripts/lib/transformation-spec directory are automatically deployed to this location.

  • For an example of all transforms in one place, refer to the all-transforms-example.json in the transformation-spec directory of the repository.

  • The order in which you enter transforms in the JSON file is important and should be chosen deliberately. Transforms are executed on the incoming dataset in the order they are defined, starting from the beginning of the transform_spec section of the file.

  • If the configuration specifies a transform name that is undefined (no transform function exists), the workflow will not fail. Instead, you will see a warning message in the logs, as shown below. The remaining transforms will still be executed; this behavior is designed to make iterative development easier.

      Transform function transform_futuretransform called for in SyntheticGeneralData-PolicyData.json not implemented
    
  • Except where noted, transforms will overwrite an existing field if specified as the result field. Where available, use the source parameter to indicate that a different column should be used as the source data, and the column specified in the field parameter should be used for the result value. Specifying a source field to create a new column for transforms is useful for debugging issues with a transform, preserving original data, or having a backup datapoint available when incoming data formats are less clear.

Using Transforms More Than Once

Transform types can be specified more than once in the transform specification by adding an optional unique suffix, in the form of a colon (:) followed by a string. The string can be any number or identifier that is meaningful to the data pipeline designer. The suffix does not determine the order of execution; the transforms are executed in the order they are defined in the transform specification.

InsuranceLake-provided transforms are optimized to run in a single group using Apache Spark’s withColumns and select DataFrame methods. Specifying a transform type multiple times limits this optimization and should only be done when strictly necessary for the workflow. Read more about optimizing workflow performance when running a large number of transforms in The hidden cost of Spark withColumn.

    "transform_spec": {
        "lookup:1": [
            {
                "field": "CoverageCode",
                "source": "CoverageName",
                "lookup": "CoverageCode"
            }
        ],
        "combinecolumns": [
            {
                "field": "Program",
                "format": "{}-{}",
                "source_columns": [ "CoverageCode", "PolicyYear" ]
            }
        ],
        "lookup:2": [
            {
                "field": "ProgramCode",
                "source": "Program",
                "lookup": "ProgramCode"
            }
        ]
    }

Behavior When There is No Transformation Specification

When there is no transformation specification file or an empty transformation specification in the ETL Scripts S3 bucket for the workflow, the ETL will perform no transformations. However, the ETL will save a recommended transformation specification file to the AWS Glue Temp bucket, <environment>-insurancelake-<account>-<region>-glue-temp, in the folder /etl/collect_to_cleanse following the naming convention <database>-<table>.json.

When this behavior occurs, you will see the following log message in the AWS Glue Job Output Logs:

No transformation specification found, generating recommended spec to: s3://...

This recommended transformation specification file can be used as a starting point to build your own transformations. Currently, the recommended transformation specification supports the following:

  • decimal transforms for any fields that Spark identifies as float or double.
  • date transforms for any fields that contain the text date in the field names.
  • timestamp transforms for any fields that contain the text time in the field names.
  • input_spec section for Excel files identified by their extension with default values.

Formatting

currency

Convert specified numeric field with currency formatting to decimal (fixed precision).

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the resulting decimal conversion, and source field if source not specified separately |
| format | optional | Decimal precision and scale (separated by comma), defaults to 16,2 |
| source | optional | Name of source field, defaults to destination field |
| euro | optional | If true, handle European (5.000.000,12 or 5 000 000,12) currency format, otherwise handle 5,000,000.12; defaults to false |

  • While this transform will work on numeric fields, we recommend changetype to convert to decimal values, because it is more efficient when combined with other data type changes.
  • This conversion essentially extracts any valid number from a string value; it removes any character that is not in [0-9,-.].
"currency": [
    {
        "field": "SmallDollars",
        "format": "6,2"
    },
    {
        "field": "EuroValue",
        "source": "EuroValueFormatted",
        "euro": true
    }
]
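
For context, a minimal PySpark sketch of roughly what this transform does, using the sample value shown in the comment (not the packaged implementation, which also handles the euro format and other options):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("$1,234.50",)], ["SmallDollars"])

# Strip characters that are not digits, a minus sign, or a decimal point,
# then cast to fixed-precision decimal(6,2)
df = df.withColumn(
    "SmallDollars",
    F.regexp_replace("SmallDollars", r"[^0-9.-]", "").cast("decimal(6,2)"),
)
df.show()  # 1234.50
```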

changetype

Convert specified fields to decimal (fixed precision), int, bigint, string, and more.

| Parameter | Type | Description |
| --- | --- | --- |
| key | required | Name of the field to convert |
| value | required | Destination data type expressed using the Spark simpleString syntax |

  • Transform specification is a single JSON object containing a list of string value pairs for each field to convert.

  • Transform can be used to rename a nested field in place by redefining the struct data type, with new field names using Spark’s simpleString syntax for struct types, for example: struct<name:type,name2:array<int>>. See all-transforms-example.json for a more complex example.

"changetype": {
    "ExpiringPremiumAmount": "decimal(10,2)",
    "WrittenPremiumAmount": "decimal(10,2)",
    "EarnedPremium": "decimal(10,2)",
    "PrimaryKeyId": "bigint",
    "GrowingCount": "bigint",
    "PolicyKey": "string",
    "notes_struct": "json"
}
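
Each key/value pair in the specification maps to a Spark cast using the simpleString type name. A minimal PySpark sketch of the equivalent casts for two of the fields above (hypothetical sample data, not the packaged implementation):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("123456", "1500.257")], ["PrimaryKeyId", "WrittenPremiumAmount"])

# Cast each field to the type given in the specification
df = (
    df.withColumn("PrimaryKeyId", F.col("PrimaryKeyId").cast("bigint"))
      .withColumn("WrittenPremiumAmount", F.col("WrittenPremiumAmount").cast("decimal(10,2)"))
)
df.printSchema()
```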

date

Convert specified date fields to ISO format based on known input format.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the resulting date conversion, and source field if source not specified separately |
| format | required | Date format specified using Spark datetime patterns |
| source | optional | Name of source field, defaults to destination field |

  • With Spark datetime patterns, M (uppercase) means month and m (lowercase) means minutes. Mixing these up will result in date parse errors.

  • Use dd to indicate exactly two-digit dates and d to indicate either one- or two-digit dates. This applies to all other symbols in the datetime pattern. Single character symbols are the most flexible.

  • An error similar to the following typically means that some or all of your dates are not formatted in the way the date pattern expects. Consider using data quality rules to test your data.

      You may get a different result due to the upgrading of Spark 3.0
      Fail to parse 'YYYY-M-d' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.
    
"date": [
    {
        "field": "StartDate",
        "format": "M/d/yy"
    },
    {
        "field": "EndDate",
        "format": "yy-MM-dd"
    },
    {
        "field": "valuationdate",
        "source": "valuationdatestring",
        "format": "yyyyMMdd"
    }
]
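
Conceptually, each entry parses the source string with its known pattern and stores an ISO-formatted date, similar to this hedged PySpark sketch (sample values are hypothetical):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("3/1/24", "20240331")], ["StartDate", "valuationdatestring"])

df = (
    df.withColumn("StartDate", F.to_date("StartDate", "M/d/yy"))                  # 2024-03-01
      .withColumn("valuationdate", F.to_date("valuationdatestring", "yyyyMMdd"))  # 2024-03-31
)
df.show()
```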

implieddecimal

Convert specified numeric fields to decimal (fixed precision) type with implied decimal point support (in other words, last two digits are to the right of decimal).

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the resulting decimal conversion, and source field if source not specified separately |
| format | required | Decimal precision and scale (separated by comma) |
| source | optional | Name of source field, defaults to destination field |
| num_implied | optional | Number of implied decimal digits in the source field, defaults to 2 |

  • Use this transform to interpret decimal precision data stored in integer format, common in mainframe or flat file data formats.
"implieddecimal": [
    {
        "field": "indemnity_paid_current_period",
        "num_implied": "4",
        "format": "16,4"
    },
    {
        "field": "claim_amount",
        "source": "claim_amount_string",
        "format": "16,2"
    }
]
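
As an illustration of the implied decimal concept (not the packaged implementation), an integer-formatted value with four implied digits can be converted by dividing by 10^4:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("1250075",)], ["indemnity_paid_current_period"])

# Four implied decimal digits: "1250075" is interpreted as 125.0075
df = df.withColumn(
    "indemnity_paid_current_period",
    (F.col("indemnity_paid_current_period").cast("decimal(20,0)") / 10 ** 4).cast("decimal(16,4)"),
)
df.show()
```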

timestamp

Convert specified datetime fields to ISO format based on known input format.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the resulting timestamp conversion, and source field if source not specified separately |
| format | required | Timestamp format specified using Spark datetime patterns |
| source | optional | Name of source field, defaults to destination field |

"timestamp": [
    {
        "field": "GenerationDate",
        "format": "yyyy-MM-dd HH:mm:ss.SSS+0000"
    },
    {
        "field": "DataLoadTimestamp",
        "source": "DataLoadString",
        "format": "yyyy-MM-dd HH:mm:ss.SSSZ"
    }
]

titlecase

Convert specified string field to title or proper case (for example, “my name” will become “My Name”).

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of field to convert in place to title case |

  • Transform specification is a simple list of fields of type string to convert.
"titlecase": [
    "CompanyName",
    "AddressStreet"
]

String Manipulation

columnfromcolumn

Add or replace column based on a regular expression group match pattern.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the extracted pattern, and source field if source not specified separately |
| pattern | required | Regular expression pattern with one match group following the Java Pattern syntax |
| source | optional | Name of source field, defaults to destination field |

  • We recommend building and testing your regular expressions using a visualization tool, such as Regex Vis.
  • Only the first match group will be used per specification block. For multiple groups, use multiple specification blocks and shift the parentheses.
  • This transform uses the Spark regexp_extract function.
"columnfromcolumn": [
    {
        "field": "username",
        "source": "emailaddress",
        "pattern": "(\\S+)@\\S+"
    },
    {
        "field": "policyyear",
        "source": "policyeffectivedate",
        "pattern": "(\\d\\d\\d\\d)/\\d\\d/\\d\\d"
    }
]
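
Because the transform uses regexp_extract, the first example above behaves roughly like this sketch (sample value is hypothetical):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("jdoe@example.com",)], ["emailaddress"])

# Extract match group 1 (everything before the @) into a new column
df = df.withColumn("username", F.regexp_extract("emailaddress", r"(\S+)@\S+", 1))
df.show()  # username = jdoe
```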

columnreplace

Add or replace a column with regular expression substitution on an existing column.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the resulting substituted value, and source field if source not specified separately |
| pattern | required | Regular expression pattern following the Java Pattern syntax |
| replacement | required | String value to replace anything matched by the pattern |
| source | optional | Name of source field, defaults to destination field |

"columnreplace": [
    {
        "field": "clean_date_field",
        "source": "bad_date_field",
        "pattern": "0000-00-00",
        "replacement": ""
    },
    {
        "field": "field_with_extra_data",
        "pattern": "[a-zA-z]{3,5}",
        "replacement": ""
    }
]

combinecolumns

Add a column using a format string and list of source columns.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the resulting combined value, and source field if source not specified separately |
| format | required | Format string using Python format string syntax. Implicit references, positional references, and the format specification mini-language are supported. Keyword arguments are not supported. |
| source_columns | required | List of source column names specified as a JSON array (at least one is required) |

"combinecolumns": [
    {
        "field": "RowKey",
        "format": "{}-{}-{}",
        "source_columns": [ "LOBCode", "PolicyNumber", "StartDate" ]
    }
]
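
For the simple {}-{}-{} format in the example, the result is the source columns joined with hyphens, roughly equivalent to this PySpark sketch (sample values are hypothetical; the packaged transform supports the fuller Python format string syntax):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("GL", "P-1001", "2024-01-01")], ["LOBCode", "PolicyNumber", "StartDate"])

df = df.withColumn("RowKey", F.concat_ws("-", "LOBCode", "PolicyNumber", "StartDate"))
df.show(truncate=False)  # RowKey = GL-P-1001-2024-01-01
```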

literal

Add or replace a column with the supplied static or literal value.

| Parameter | Type | Description |
| --- | --- | --- |
| key | required | Name of the field to add or replace |
| value | required | Literal value to store in the field (all JSON data types supported, including objects, arrays, and null) |

  • Transform specification is a single JSON object containing a list of string value pairs for each field to create or replace.
"literal": {
    "source": "syntheticdata",
    "line_of_business": "synthetic"
}

filename

Add or replace a column using a regular expression group match pattern applied to the incoming source data filename.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the extracted pattern |
| pattern | required | Regular expression pattern with one match group following the Python regular expression syntax |
| required | required | true/false value indicating whether to halt the workflow if the pattern is not matched; if required is false and the pattern is not matched, a null value will be used |

  • We recommend building and testing your regular expressions using a visualization tool, such as Regex Vis.
  • Only the first match group will be used per specification block. For multiple groups, use multiple specification blocks and shift the parentheses.
"filename": [
    {
        "field": "valuationdate",
        "pattern": "\\S+-(\\d{8})\\.csv",
        "required": true
    },
    {
        "field": "program",
        "pattern": "([A-Za-z0-9]+)\\S+\\.csv",
        "required": true
    }
]
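
Because the pattern follows Python regular expression syntax, the first example above can be reasoned about with plain Python (the filename shown is hypothetical):

```python
import re

filename = "PolicyData-20240331.csv"  # hypothetical filename argument to the AWS Glue job

match = re.search(r"\S+-(\d{8})\.csv", filename)
valuationdate = match.group(1) if match else None
print(valuationdate)  # 20240331
```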

Data Security

hash

Apply a SHA256 hash function to specified column values.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of field to convert in place to SHA256 hash |

  • Transform specification is a simple list of fields of type string to convert.
  • If the field does not exist, the workflow will be halted to prevent unexpected schema changes from exposing sensitive data.
"hash": [
    "InsuredContactCellPhone",
    "InsuredContactEmail"
]
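
The hashing amounts to applying Spark’s sha2 function with 256 bits; a minimal sketch (sample value is hypothetical, not the packaged implementation):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("jane@example.com",)], ["InsuredContactEmail"])

# Replace the value in place with its SHA256 digest (hex string)
df = df.withColumn("InsuredContactEmail", F.sha2("InsuredContactEmail", 256))
df.show(truncate=False)
```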

redact

Redact or replace specified column values using supplied redaction string.

| Parameter | Type | Description |
| --- | --- | --- |
| key | required | Name of the field to replace |
| value | required | Literal value to store in the field (all JSON data types supported, including objects, arrays, and null) |

  • Transform spec is a single JSON object containing a list of string value pairs for each field to convert.
  • If the field does not exist, the workflow will be halted to prevent unexpected schema changes from exposing sensitive data.
"redact": {
    "CustomerNo": "****"
}

tokenize

Replace the specified column values with a SHA256 hash and store original values in a DynamoDB table.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of field to convert in place to SHA256 hash; original value will be stored in a DynamoDB table |

  • Transform specification is a simple list of fields of type string to convert.
  • The <environment>-insurancelake-etl-hash-values DynamoDB table will be used for storage of all tokens for all fields and data sets. Since the hashing is deterministic, each value will only be stored once, regardless of how many columns contain the value.
  • If the field does not exist, the workflow will be halted to prevent unexpected schema changes from exposing sensitive data.
"tokenize": [
    "EIN"
]

Policy Data Operations

flipsign

Flips the sign of a numeric column, optionally in a new column.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of numeric format destination field for which to flip the sign (+/-), and source field if source not specified separately |
| source | optional | Name of source field, defaults to destination field |

"flipsign": [
    {
        "field": "Balance"
    },
    {
        "field": "NewAccountBalance",
        "source": "AccountBalance"
    }
]

addcolumns

Mathematically add two or more columns together in a new column.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the result of adding the source columns; can be the same field as one of the source columns, which will overwrite the original value with the sum |
| source_columns | required | List of numeric source column names specified as a JSON array (at least one is required) |

  • Empty (null) source columns will be treated as 0 values.
"addcolumns": [
    {
        "field": "TotalWrittenPremium",
        "source_columns": [ "WrittenPremiumAmount" ]
    }
]
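
A minimal PySpark sketch of the addition with null handling (TaxesAndFees is a hypothetical second source column added for illustration; this is not the packaged implementation):

```python
from functools import reduce
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(100.0, 23.5), (250.0, None)],
    ["WrittenPremiumAmount", "TaxesAndFees"],
)

source_columns = ["WrittenPremiumAmount", "TaxesAndFees"]
# Null source values are treated as 0 before summing
total = reduce(lambda a, b: a + b, [F.coalesce(F.col(c), F.lit(0)) for c in source_columns])
df = df.withColumn("TotalWrittenPremium", total)
df.show()  # 123.5 and 250.0
```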

multiplycolumns

Multiply two or more columns together in a new or existing column.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the result of multiplying the source columns; can be the same field as one of the source columns, which will overwrite the original value with the product |
| source_columns | required | List of numeric source column names specified as a JSON array (at least one is required) |
| empty_value | optional | Specifies the value to use for empty (null) fields, defaults to a value of 1 |

  • Use cases for this transform include calculating premium splits or allocating expenses.
"multiplycolumns": [
    {
        "field": "SplitPremium",
        "source_columns": [ "WrittenPremiumAmount", "SplitPercent1", "SplitPercent2" ],
        "empty_value": 0
    }
]

earnedpremium

Calculate monthly earned premium.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the calculated earned premium result; can be the same field as one of the written_premium_list columns, which will overwrite the original value with the result |
| written_premium_list | required | List of numeric source column names containing written premium amounts specified as a JSON array (at least one is required) |
| policy_effective_date | required | Indicates the existing date column to use for determining the start of the policy. When byday is true, this date will be used to pro-rate the earned premium for the first month of the policy; when byday is false, it will be used to identify the number of active policy months (always a whole number). |
| policy_expiration_date | required | Indicates the existing date column to use for determining the end of the policy. When byday is true, this date will be used to pro-rate the earned premium for the last month of the policy; when byday is false, it will be used to identify the number of active policy months (always a whole number). |
| period_start_date | required | Indicates the existing date column to use for determining the start of the earned premium calculation period for each row of data; usually this is the first day of the month and is created by the expandpolicymonths transform |
| period_end_date | required | Indicates the existing date column to use for determining the end of the earned premium calculation period for each row of data; usually this is the last day of the month and is created by the expandpolicymonths transform |
| byday | optional | Used to specify the calculation method: if true, earned premium will be proportional to the number of days in the reporting period; if false, earned premium will be divided evenly across all active policy months; defaults to false |

  • If you are overwriting an existing field and calculating the earned premium multiple times (for example, different methods), be aware that the operations will be processed in sequence and impact subsequent operations. In other words, the value that overwrites the field in the first operation will be used in the second operation and so on. If you need to calculate earned premium multiple times using the same inputs, you should use a new field for the result.

  • If any of the date inputs have null values, the earned premium will be null. Empty or null written premium values are treated as 0 values.

"earnedpremium": [
    {
        "field": "CalcEarnedPremium",
        "written_premium_list": [
                "WrittenPremiumAmount"
        ],
        "policy_effective_date": "EffectiveDate",
        "policy_expiration_date": "ExpirationDate",
        "period_start_date": "StartDate",
        "period_end_date": "EndDate",
        "byday": true
    }
]
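
To make the byday method concrete, here is a hedged sketch of a day-proportional calculation for a single reporting month (sample values are hypothetical; the packaged implementation may differ in its exact proration rules):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1200.0, "2024-01-01", "2024-12-31", "2024-03-01", "2024-03-31")],
    ["WrittenPremiumAmount", "EffectiveDate", "ExpirationDate", "StartDate", "EndDate"],
)
for c in ["EffectiveDate", "ExpirationDate", "StartDate", "EndDate"]:
    df = df.withColumn(c, F.col(c).cast("date"))

# Earned premium proportional to days in the reporting period over days in the policy term
df = df.withColumn(
    "CalcEarnedPremium",
    (F.col("WrittenPremiumAmount")
     * (F.datediff("EndDate", "StartDate") + 1)
     / (F.datediff("ExpirationDate", "EffectiveDate") + 1)).cast("decimal(16,2)"),
)
df.show()  # 1200 * 31 / 366 = 101.64
```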

enddate

Add a number of months to a specified date to calculate an ending or expiration date.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the calculated end date; can be an existing field, which will overwrite the original value with the result |
| start_date | required | Indicates the existing date column to use for determining the start of the policy |
| num_months | required | Indicates the existing numeric column to use for determining the number of policy months |

"enddate": [
    {
        "field": "CalcExpirationDate",
        "start_date": "EffectiveDate",
        "num_months": "Term"
    }
]
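
Conceptually this is an add_months operation driven by a per-row month count, as in this sketch (sample values are hypothetical, not the packaged implementation):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("2024-01-15", 12)], ["EffectiveDate", "Term"])
df = df.withColumn("EffectiveDate", F.col("EffectiveDate").cast("date"))

# Add the per-row number of months in Term to the start date
df = df.withColumn("CalcExpirationDate", F.expr("add_months(EffectiveDate, Term)"))
df.show()  # 2025-01-15
```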

expandpolicymonths

Expand the dataset to one row for each month the policy is active with a calculated earned premium.

| Parameter | Type | Description |
| --- | --- | --- |
| policy_effective_date | required | Indicates the existing date column to use for determining the start of the policy. When byday is true, this date will be used to pro-rate the earned premium for the first month of the policy; when byday is false, it will be used to identify the number of active policy months (always a whole number). |
| policy_expiration_date | required | Indicates the existing date column to use for determining the end of the policy. When byday is true, this date will be used to pro-rate the earned premium for the first month of the policy; when byday is false, it will be used to identify the number of active policy months (always a whole number). |
| policy_month_start_field | required | Indicates the name of the field to add to the dataset containing the first day of the month for the expanded row of data |
| policy_month_end_field | required | Indicates the name of the field to add to the dataset containing the last day of the month for the expanded row of data |
| policy_month_index | required | Indicates the name of the field to add to the dataset containing the expanded policy month index |
| uniqueid | optional | Use to specify a field name to add with a generated GUID, unique to each policy |

  • Use this transform to convert a list of insurance policies (one row per policy) to a list of active policy months (one row per month per policy). This transform will change the shape and size of the input data; specifically, it will increase the number of rows to number of policies x number of policy months.
  • This transform will add columns to each row containing the first day of the month, the last day of the month, and the policy month number or index.
  • The index column is required so that it is always possible to recover the original number of rows using a simple WHERE statement (in other words, WHERE PolicyMonthIndex = 1).
  • If either policy_effective_date or policy_expiration_date field values are null, the policy row will not be expanded to any additional rows and will have null values for the policy month and index fields.
  • Index column values are 1-based, matching the array reference standard in Athena SQL.
"expandpolicymonths": {
    "policy_effective_date": "EffectiveDate",
    "policy_expiration_date": "ExpirationDate",
    "uniqueid": "generated_policy_number",
    "policy_month_start_field": "StartDate",
    "policy_month_end_field": "EndDate",
    "policy_month_index": "PolicyMonthIndex"
}
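
The row expansion is conceptually similar to exploding a generated sequence of month start dates, as in this hedged sketch (hypothetical sample policy; the packaged implementation also provides the uniqueid GUID and the other behaviors described above):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
policies = spark.createDataFrame(
    [("P-1001", "2024-01-01", "2024-06-30")],
    ["policynumber", "EffectiveDate", "ExpirationDate"],
)
for c in ["EffectiveDate", "ExpirationDate"]:
    policies = policies.withColumn(c, F.col(c).cast("date"))

expanded = (
    policies
    # One output row per calendar month the policy is active
    .withColumn("month_start", F.explode(F.expr(
        "sequence(date_trunc('month', EffectiveDate),"
        " date_trunc('month', ExpirationDate), interval 1 month)")))
    .withColumn("StartDate", F.col("month_start").cast("date"))
    .withColumn("EndDate", F.last_day("StartDate"))
    .withColumn("PolicyMonthIndex",
                F.months_between("month_start", F.date_trunc("month", "EffectiveDate")).cast("int") + 1)
    .drop("month_start")
)
expanded.show()  # 6 rows, PolicyMonthIndex 1 through 6
```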

policymonths

Calculate the number of months between policy start and end dates.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the calculated number of months; can be an existing field, which will overwrite the original value with the result |
| policy_effective_date | required | Indicates the existing date column to use for determining the start of the policy |
| policy_expiration_date | required | Indicates the existing date column to use for determining the end of the policy |
| normalized | optional | If true, the calculated number of months will always be a whole number (uses the Python dateutil rrule function to perform a calendar walk); if false, the calculated number of months will be a fractional number based on the exact number of days between the effective and expiration dates; defaults to false |

"policymonths": [
    {
        "field": "CalcNumMonths",
        "policy_effective_date": "EffectiveDate",
        "policy_expiration_date": "ExpirationDate",
        "normalized": true
    }
]

Structured Data

jsonexpandarray

Convert an ArrayType column (typically created from loading JSON nested data) to one row per array element with index.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold expanded array elements, and source ArrayType field if source is not specified separately |
| source | optional | Source ArrayType field; defaults to destination field |
| index_field | required | Name of field to hold the expanded array element index |

  • This transform uses Apache Spark’s posexplode_outer function, so empty or null array values will remain in the dataset as a single row with null in the destination field.
  • Index column values are one-based, not zero-based (in other words, the first element is numbered 1), which matches the array reference standard in Amazon Athena SQL.
  • The index column is required so that there is always an easy way to get back to the data before being expanded (in other words, where index == 1).
"jsonexpandarray": [
    {
        "field": "policyaddress",
        "source": "policyaddresses",
        "index_field": "policyaddress_index"
    }
]
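
Because the transform is built on posexplode_outer, the example above behaves roughly like this sketch (sample data is hypothetical):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("P-1001", ["123 Main St", "456 Oak Ave"])],
    ["policynumber", "policyaddresses"],
)

# posexplode_outer keeps rows with empty or null arrays and returns a 0-based position
df = (
    df.select("*", F.posexplode_outer("policyaddresses").alias("pos", "policyaddress"))
      .withColumn("policyaddress_index", F.col("pos") + 1)  # 1-based, matching Athena SQL
      .drop("pos")
)
df.show(truncate=False)
```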

jsonexpandmap

Convert a MapType or StructType column (typically created from loading JSON nested data) to one row per map key, value pair with index column.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold expanded map values, and source MapType or StructType field if source is not specified separately |
| source | optional | Source MapType or StructType field; defaults to destination field |
| index_field | required | Name of field to hold the expanded map key, value pair index |
| key_field | required | Name of field to hold the expanded map key name |

  • This transform uses Spark’s posexplode_outer function, so empty or null map values will remain in the dataset as a single row with null in the destination field.
  • Index column values are one-based, not zero-based (in other words, the first element is numbered 1), which matches the array reference standard in Amazon Athena SQL.
  • The index column is required so that there is always an easy way to get back to the data before being expanded (in other words, where index == 1).
"jsonexpandmap": [
    {
        "field": "activities",
        "index_field": "activity_index",
        "key_field": "activity_id"

    }
]

flatten

Shifts all fields in a column containing structured data up one level, effectively flattening one level of nested data. No conversion to a map or exploding of rows is performed.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of field of type Struct to flatten |
| keep_field | optional | Controls whether to keep the specified field or drop it; defaults to true |

"flatten": [
    {
        "field": "general_policy_details",
        "keep_field": false
    }
]
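
The effect is similar to selecting the struct’s nested fields as top-level columns, as in this sketch (hypothetical struct contents, not the packaged implementation):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = (
    spark.createDataFrame([("GL", "2024")], ["lobcode", "policyyear"])
         .select(F.struct("lobcode", "policyyear").alias("general_policy_details"))
)

# Promote the struct's fields to top-level columns and drop the struct (keep_field: false)
df = df.select("*", "general_policy_details.*").drop("general_policy_details")
df.printSchema()
```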

jsonstructured

Convert string column containing JSON data to a structured or nested data type column.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of string field to convert in place to structured data |

"json": [
    "jsonfield"
]

xmlstructured

Convert string column containing XML data to a structured or nested data type column.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of string field to convert in place to structured data |

This transform will not work without installing the Spark-XML driver. If you attempt to run this transform without installing the driver properly, you will see the following error:

TypeError: 'JavaPackage' object is not callable
  • Similar to XML input file format support, this transform uses the Databricks Spark-XML driver for full XML parsing functionality through Apache Spark DataFrames. Refer to the XML section of the File Formats and Input Specification Documentation for instructions on obtaining and installing the Spark-XML driver.

  • This function uses Spark’s from_xml function. At this time, it is not possible to specify options, but the transform can be extended to support this.

"xml": [
    "xmlfield"
]

Miscellaneous Data Operations

lookup

Replace or add specified column values with values looked up from a DynamoDB table using a single value lookup key.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold looked up values, and source field if source is not specified separately |
| source | optional | Source field with values matching the lookup data; defaults to destination field |
| lookup | required | Name of lookup set of data which is used to match the column_name attribute in the DynamoDB table |
| nomatch | optional | Value to use for lookups that have no match; defaults to null; must be the same data type as the looked up data |
| source_system | optional | Value to use for the source_system attribute in the DynamoDB table; defaults to the database name (the first level folder structure name in the Collect bucket). Use this override parameter to share lookups across different databases. |

"lookup": [
    {
        "field": "smokingclass",
        "lookup": "smokingclass"
    },
    {
        "field": "issuestatename",
        "source": "issuestate",
        "lookup": "StateCd",
        "nomatch": "N/A",
        "source_system": "global"
    }
]
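
Conceptually, the lookup behaves like a left join of the incoming dataset against a small lookup table, with the nomatch value filling unmatched rows. A hedged PySpark sketch (the lookup DataFrame here is hypothetical; the packaged transform reads the lookup data from the DynamoDB table described below):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
data = spark.createDataFrame([("WI",), ("ZZ",)], ["issuestate"])
lookup = spark.createDataFrame([("WI", "Wisconsin")], ["lookup_value", "lookedup_value"])

data = (
    data.join(F.broadcast(lookup), data.issuestate == lookup.lookup_value, "left")
        .withColumn("issuestatename", F.coalesce("lookedup_value", F.lit("N/A")))  # nomatch value
        .drop("lookup_value", "lookedup_value")
)
data.show()  # Wisconsin for WI, N/A for ZZ
```
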
  • If your lookup data exceeds the DynamoDB item size limit, consider using the multilookup transform instead, which will split the lookup data into multiple items.

  • The provided resources/load_dynamodb_lookup_table.py script can be used to load prepared JSON data into the DynamoDB table:

    • Script parameters:

      | Parameter | Type | Description |
      | --- | --- | --- |
      | source_system | required | String value that should match the source system name (first level folder structure name in the Collect bucket) for the workflow that will use the lookup |
      | table_name | required | The name of the DynamoDB table deployed by the InsuranceLake CDK stack for single value lookups, in the form <environment>-<resource prefix>-etl-value-lookup |
      | data_file | required | Filename of the local JSON file containing lookup data to load into DynamoDB (format below) |

    • Example usage:
        ./load_dynamodb_lookup_table.py SyntheticGeneralData dev-insurancelake-etl-value-lookup syntheticgeneral_lookup_data.json
      
    • JSON format of the lookup data file:
        {
                "column_name1": { "lookup_value1": "lookedup_value1", "lookup_value2": "lookedup_value2", ... },
                "column_name2": { ... }
        }
      

multilookup

Add columns looked up from an external table using multiple conditions, returning any number of attributes.

| Parameter | Type | Description |
| --- | --- | --- |
| lookup_group | required | Name of lookup set of data which is used to match the lookup_group attribute in the DynamoDB table; use to uniquely identify the set of lookup data |
| match_columns | required | List of one or more columns specified as a JSON array to use for matching the lookup data; the order of columns specified must match the order of the columns specified during the data load |
| return_attributes | required | Specifies the attribute names in the DynamoDB lookup table to add to the incoming dataset; defined as a JSON array and must contain at least one attribute |
| nomatch | optional | Value to use for lookups that have no match, defaults to null. Used as the value for all return_attributes columns. Must be the same data type as the looked up data. |

"multilookup": [
    {
        "lookup_group": "LOBCoverage",
        "match_columns": [
            "program",
            "coverage"
        ],
        "return_attributes": [
            "coveragenormalized",
            "lob"
        ],
        "nomatch": "N/A"
    }
]
  • The match_columns names only refer to the incoming dataset. The column names in your lookup data (in DynamoDB) do not matter, because all the lookup column values are stored in a concatenated string in the lookup_item sort key.

  • If a column specified in return_attributes already exists, a duplicate column will be created, which will raise an error when saving to Apache Parquet format. Take care to map your incoming dataset correctly so that it has unique column names after performing the multilookup transform. For example, suppose your incoming data has a lineofbusiness column, but it is composed of bespoke values that you want to normalize. A best practice would be to use the schema map to rename lineofbusiness to originallineofbusiness so the incoming data is preserved, and use the multilookup to return a new (normalized) lineofbusiness attribute value.

  • The provided resources/load_dynamodb_multilookup_table.py script can be used to load prepared CSV data into the DynamoDB table:

    • Script parameters:

      | Parameter | Type | Description |
      | --- | --- | --- |
      | table_name | required | The name of the DynamoDB table deployed by the InsuranceLake CDK stack for multi-lookups, in the form <environment>-<resource prefix>-etl-multi-lookup. All multilookup lookup datasets are stored in the same table and grouped by lookup_group. |
      | data_file | required | Filename of the local CSV file containing lookup data to load into DynamoDB |
      | lookup_group | required | Any meaningful name to uniquely identify the lookup data in the DynamoDB table |
      | lookup_columns | required | One or more columns in the CSV file to use as lookup values, listed last, separated by spaces. Note that field values from each specified column will be concatenated with a hyphen (-) separator to form a lookup key that matches the lookup_item attribute in the DynamoDB table. This is important to understand when editing the data in the future. |

    • Example usage:
        ./load_dynamodb_multilookup_table.py dev-insurancelake-etl-multi-lookup lookups.csv PolicyData-LOBCoverage originalprogram originalcoverage
      
    • The lookup data file should be saved as CSV and include all the match columns and return value columns. It is acceptable to have some columns that are not used, because the transform specification allows the user to select the specific return columns they want in each transform.

    • All columns that are not specified as lookup columns in the CSV file will be imported as separate attributes in the DynamoDB table and be available as return attributes.

filldown

Fill starting column value down the columns for all null values until the next non-null.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of column on which to perform the filldown operation |
| sort | optional | List of columns to use for sorting of the data before filling down, specified as a JSON array. This will change the order of the data for subsequent transforms. Defaults to no sort (data is left in the state from which it was loaded or from the last transform). |

  • This function is useful for replacing null values created by pivot tables in Excel that have category headers inline with only the first row of data. This will normalize the data, ensuring that the headers are on all rows.

  • This function works by partitioning the data over non-null values in the columns, so it is important that your rows of data are organized such that the non-null values indicate the values you want to fill in the subsequent rows of data. If your data is not already organized in this way, use the sort optional parameter.

  • This is a Spark implementation of Pandas ffill, based on the article How to utilize nested tables and window functions to fill down over null values by Robert O’Brien.

"filldown": [
    {
        "field": "category"
    },
    {
        "field": "subcategory",
        "sort": [ "timestamp" ]
    }
]
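
The packaged transform follows the nested-table approach from the referenced article; a common window-function way to express the same fill-down in PySpark looks like this (sample data is hypothetical):

```python
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, "Commercial"), (2, None), (3, None), (4, "Personal"), (5, None)],
    ["timestamp", "category"],
)

# Carry the last non-null value forward, ordered by the sort column
w = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("category", F.last("category", ignorenulls=True).over(w))
df.show()  # rows 2-3 become Commercial, row 5 becomes Personal
```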

filterrows

Filter out rows based on standard SQL WHERE statement.

| Parameter | Type | Description |
| --- | --- | --- |
| condition | required | String filter condition using Apache Spark WHERE clause syntax; rows that match will remain in the data set |
| description | optional | This parameter will be ignored, but we recommend using it to document the purpose of each filter condition |

"filterrows": [
    {
        "description": "Claim number or file number is required",
        "condition": "claim_number is not null or file_number is not null"
    },
    {
        "condition": "`startdate` >= cast('1970-01-01' as date)"
    }
]
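
Each condition is applied the same way a Spark WHERE clause is, as in this sketch (sample data is hypothetical):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("C-1", None), (None, "F-2"), (None, None)],
    ["claim_number", "file_number"],
)

# Rows matching the condition remain; the all-null row is dropped
df = df.where("claim_number is not null or file_number is not null")
df.show()
```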

merge

Merge column values using coalesce.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the result of merging or coalescing the source columns; can be the same field as one of the source columns, which will overwrite the original value |
| source_list | required | List of source column names specified as a JSON array (at least one is required) |
| default | optional | Specifies the literal value to use when all source columns have empty (null) values, defaults to null |
| empty_string_is_null | optional | Specifies whether empty strings should be treated as null values, in other words, whether empty string values should be replaced; defaults to false |

"merge": [
    {
        "field": "insuredstatemerge",
        "source_list": [
            "insuredstatename", "insuredstatecode"
        ],
        "default": "Unknown",
        "empty_string_is_null": true
    }
]
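
The example above amounts to a coalesce with empty strings treated as null and the default as the final fallback, roughly as in this sketch (sample data is hypothetical, not the packaged implementation):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("", "WI"), (None, None)],
    ["insuredstatename", "insuredstatecode"],
)

source_list = ["insuredstatename", "insuredstatecode"]
# empty_string_is_null: treat empty strings as null before coalescing
cleaned = [F.when(F.trim(F.col(c)) == "", F.lit(None)).otherwise(F.col(c)) for c in source_list]
df = df.withColumn("insuredstatemerge", F.coalesce(*cleaned, F.lit("Unknown")))
df.show()  # WI for the first row, Unknown for the second
```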

rownumber

Adds a row number column based on an optional partition column list and an optional sort column list. Use this transform to add row numbers, to index rows within categories, or to enumerate possible duplicate rows based on primary keys.

| Parameter | Type | Description |
| --- | --- | --- |
| field | required | Name of destination field to hold the rownumber result; can be an existing field, which will overwrite the original value |
| partition | optional | List of columns to partition over (look for changing values) specified as a JSON array; if not specified, the function will number every row of data in the set sequentially |
| sort | optional | List of columns to use for sorting of the data before numbering, specified as a JSON array. This will change the order of the data for subsequent transforms. Defaults to no sort (data is left in the state from which it was loaded or from the last transform). |

"rownumber": [
    {
        "field": "row_number"
    },
    {
        "field": "policy_month_index",
        "partition": [ "policynumber" ],
        "sort": [ "start_date" ]
    }
]
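
The second example above corresponds to a row_number window function partitioned by policy and ordered by start date, as in this sketch (sample data is hypothetical):

```python
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("P-1001", "2024-02-01"), ("P-1001", "2024-01-01"), ("P-1002", "2024-01-01")],
    ["policynumber", "start_date"],
)

# Number rows within each policy, ordered by start date
w = Window.partitionBy("policynumber").orderBy("start_date")
df = df.withColumn("policy_month_index", F.row_number().over(w))
df.show()
```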

Copyright Amazon.com and its affiliates; all rights reserved. This file is Amazon Web Services Content and may not be duplicated or distributed without permission.

Page last modified: Nov 18 2024.