
Overview

Implementation Guide (IG) for OpenSearch integration

Query Inferences enables users to perform advanced search and analytics operations, using Amazon OpenSearch Service, on inference data collected from edge devices. This part of the Guidance adds functionality to efficiently search, analyze, and visualize inference results collected from distributed edge camera environments.

This document provides instructions for adding OpenSearch on top of the existing Guidance solution so that it can:

  • Store an AI event in the cloud
  • Retrieve an AI event stored in the cloud
  • Search for events by event type, time range, and camera
  • Search for attributes by event type, time range, and camera
  • Manage the data lifecycle based on retention settings

Architecture workflow

PutInference: At a high level, cloud models and devices follow the same flow to store an AI inference in the cloud. The inference passes through Fathom’s Data Plane and is ingested into a buffer; a Lambda function reads from this buffer, aggregates the results, and saves them to the OpenSearch cluster.
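As a rough illustration of the "aggregate and save" step, the sketch below batches buffered inference documents into a single request body for the OpenSearch _bulk API, which expects newline-delimited JSON: one action line followed by one source line per document. It assumes that "aggregate" means batching, and that each inference is already serialized as single-line JSON; the class name BulkBodySketch and the index name inferences-event-1.0 are hypothetical, and this is not the Guidance's actual ingestion Lambda.

```java
import java.util.List;

// Hypothetical sketch: builds an OpenSearch _bulk request body from a batch of
// buffered inference documents. Not the Guidance's actual ingestion code.
class BulkBodySketch {

    // Assumes each document is already serialized as single-line JSON.
    // An empty batch yields an empty body; callers should skip the request then.
    static String buildBulkBody(String indexName, List<String> jsonDocuments) {
        StringBuilder body = new StringBuilder();
        for (String doc : jsonDocuments) {
            // Action line: index the following document into indexName
            body.append("{\"index\":{\"_index\":\"").append(indexName).append("\"}}\n");
            // Source line: the inference document itself
            body.append(doc).append('\n');
        }
        // _bulk requires the body to end with a newline; the loop guarantees it
        return body.toString();
    }

    public static void main(String[] args) {
        List<String> batch = List.of(
                "{\"metadata\":{\"deviceId\":\"cam-1\"},\"timestamp\":\"2023-10-22T13:00:00.000-0700\"}",
                "{\"metadata\":{\"deviceId\":\"cam-2\"},\"timestamp\":\"2023-10-22T13:00:05.000-0700\"}");
        // Hypothetical index name
        System.out.print(buildBulkBody("inferences-event-1.0", batch));
    }
}
```

Sending one bulk request per batch keeps the number of HTTP calls to the domain low compared with indexing each inference individually.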

QueryInferences: At a high level, the user calls the QueryInferences API to search for inferences in the cluster. The request flows through Fathom’s Control Plane, which reads from the cluster.

Prerequisites

AWS Environment

  • An AWS account with appropriate permissions
  • An OpenSearch domain for storing and querying inferences

Development Environment

  • Java 17 JDK
  • Gradle 7.x or higher
  • Git (for version control)

Required Permissions

  • Add the following permissions to deployment/video-logistics-cdk/VideoAnalyticsVideoLogisticsCDK/lib/stacks/serviceStack/serviceStack.ts:
    • QueryInferencesActivityLambda requires the following policy statement (domainName is the name of the OpenSearch domain that stores the inferences):
new PolicyStatement({
  effect: Effect.ALLOW,
  actions: [
    'es:ESHttpPost',
    'es:ESHttpGet'
  ],
  resources: [
    `arn:aws:es:${this.region}:${this.account}:domain/${domainName}/*`
  ]
})

Model Definition

Sample model definitions for the OpenAPI example. The samples include all required subtypes, enums, and structures.

Request Model

import com.google.gson.annotations.SerializedName;
import java.math.BigDecimal;

public class QueryInferencesRequestContent {
  public static final String SERIALIZED_NAME_MODEL_NAME = "modelName";
  @SerializedName(SERIALIZED_NAME_MODEL_NAME)
  private String modelName;

  public static final String SERIALIZED_NAME_MODEL_VERSION = "modelVersion";
  @SerializedName(SERIALIZED_NAME_MODEL_VERSION)
  private String modelVersion;

  public static final String SERIALIZED_NAME_NEXT_TOKEN = "nextToken";
  @SerializedName(SERIALIZED_NAME_NEXT_TOKEN)
  private String nextToken;

  public static final String SERIALIZED_NAME_MAX_RESULTS = "maxResults";
  @SerializedName(SERIALIZED_NAME_MAX_RESULTS)
  private BigDecimal maxResults;

  public static final String SERIALIZED_NAME_SEARCH_EXPRESSION = "searchExpression";
  @SerializedName(SERIALIZED_NAME_SEARCH_EXPRESSION)
  private SearchExpression searchExpression;

  public static final String SERIALIZED_NAME_SORT_BY = "sortBy";
  @SerializedName(SERIALIZED_NAME_SORT_BY)
  private String sortBy;

  public static final String SERIALIZED_NAME_SORT_ORDER = "sortOrder";
  @SerializedName(SERIALIZED_NAME_SORT_ORDER)
  private SortOrder sortOrder;

  public QueryInferencesRequestContent() {
  }

  public QueryInferencesRequestContent modelName(String modelName) {
    this.modelName = modelName;
    return this;
  }

  /**
   * Get modelName
   * @return modelName
   */
  @javax.annotation.Nonnull
  public String getModelName() {
    return modelName;
  }

  public void setModelName(String modelName) {
    this.modelName = modelName;
  }


  public QueryInferencesRequestContent modelVersion(String modelVersion) {
    this.modelVersion = modelVersion;
    return this;
  }

  /**
   * Get modelVersion
   * @return modelVersion
   */
  @javax.annotation.Nullable
  public String getModelVersion() {
    return modelVersion;
  }

  public void setModelVersion(String modelVersion) {
    this.modelVersion = modelVersion;
  }


  public QueryInferencesRequestContent nextToken(String nextToken) {
    this.nextToken = nextToken;
    return this;
  }

  /**
   * Get nextToken
   * @return nextToken
   */
  @javax.annotation.Nullable
  public String getNextToken() {
    return nextToken;
  }

  public void setNextToken(String nextToken) {
    this.nextToken = nextToken;
  }


  public QueryInferencesRequestContent maxResults(BigDecimal maxResults) {
    this.maxResults = maxResults;
    return this;
  }

  /**
   * Get maxResults
   * @return maxResults
   */
  @javax.annotation.Nullable
  public BigDecimal getMaxResults() {
    return maxResults;
  }

  public void setMaxResults(BigDecimal maxResults) {
    this.maxResults = maxResults;
  }


  public QueryInferencesRequestContent searchExpression(SearchExpression searchExpression) {
    this.searchExpression = searchExpression;
    return this;
  }

  /**
   * Get searchExpression
   * @return searchExpression
   */
  @javax.annotation.Nullable
  public SearchExpression getSearchExpression() {
    return searchExpression;
  }

  public void setSearchExpression(SearchExpression searchExpression) {
    this.searchExpression = searchExpression;
  }


  public QueryInferencesRequestContent sortBy(String sortBy) {
    this.sortBy = sortBy;
    return this;
  }

  /**
   * Get sortBy
   * @return sortBy
   */
  @javax.annotation.Nullable
  public String getSortBy() {
    return sortBy;
  }

  public void setSortBy(String sortBy) {
    this.sortBy = sortBy;
  }


  public QueryInferencesRequestContent sortOrder(SortOrder sortOrder) {
    this.sortOrder = sortOrder;
    return this;
  }

  /**
   * Get sortOrder
   * @return sortOrder
   */
  @javax.annotation.Nullable
  public SortOrder getSortOrder() {
    return sortOrder;
  }

  public void setSortOrder(SortOrder sortOrder) {
    this.sortOrder = sortOrder;
  }
}

Response Model

import com.google.gson.annotations.SerializedName;
import java.math.BigDecimal;

public class QueryInferencesResponseContent {
  public static final String SERIALIZED_NAME_DATA = "data";
  @SerializedName(SERIALIZED_NAME_DATA)
  private QueryData data;

  public static final String SERIALIZED_NAME_NEXT_TOKEN = "nextToken";
  @SerializedName(SERIALIZED_NAME_NEXT_TOKEN)
  private String nextToken;

  public static final String SERIALIZED_NAME_RESULT_COUNT = "resultCount";
  @SerializedName(SERIALIZED_NAME_RESULT_COUNT)
  private BigDecimal resultCount;

  public QueryInferencesResponseContent() {
  }

  public QueryInferencesResponseContent data(QueryData data) {
    this.data = data;
    return this;
  }

  /**
   * Get data
   * @return data
   */
  @javax.annotation.Nullable
  public QueryData getData() {
    return data;
  }

  public void setData(QueryData data) {
    this.data = data;
  }


  public QueryInferencesResponseContent nextToken(String nextToken) {
    this.nextToken = nextToken;
    return this;
  }

  /**
   * Get nextToken
   * @return nextToken
   */
  @javax.annotation.Nullable
  public String getNextToken() {
    return nextToken;
  }

  public void setNextToken(String nextToken) {
    this.nextToken = nextToken;
  }


  public QueryInferencesResponseContent resultCount(BigDecimal resultCount) {
    this.resultCount = resultCount;
    return this;
  }

  /**
   * Get resultCount
   * @return resultCount
   */
  @javax.annotation.Nullable
  public BigDecimal getResultCount() {
    return resultCount;
  }

  public void setResultCount(BigDecimal resultCount) {
    this.resultCount = resultCount;
  }
}

All Subtypes Required by the Models

{
  "ResourceName": {
    "type": "string",
    "pattern": "^[a-zA-Z0-9:_\\-]+$",
    "minLength": 1,
    "maxLength": 128
  },
  "NextToken": {
    "type": "string",
    "pattern": "^[^\\n\\r<>&'\"\\x08]+$"
  },
  "SortOrder": {
    "type": "enum",
    "values": ["Ascending", "Descending"]
  },
  "QueryData": {
    "type": "structure",
    "members": {
      "modelName": {"type": "ResourceName"},
      "modelVersion": {"type": "string"},
      "timestamp": {"type": "Timestamp"},
      "inferenceGenerated": {"type": "DocumentList"}
    }
  },
  "SearchExpression": {
    "type": "structure",
    "members": {
      "filters": {"type": "QueryFilterList"},
      "operator": {"type": "SearchExpressionOperator"},
      "subExpressions": {"type": "SearchExpressionList"},
      "aggregation": {"type": "Aggregation"}
    }
  },
  "QueryFilterList": {
    "type": "list",
    "member": {"type": "QueryFilter"}
  },
  "QueryFilter": {
    "type": "structure",
    "members": {
      "name": {"type": "string"},
      "operator": {"type": "QueryOperator"},
      "value": {"type": "StringList"}
    }
  },
  "SearchExpressionOperator": {
    "type": "enum",
    "values": ["And", "Or"]
  },
  "SearchExpressionList": {
    "type": "list",
    "member": {"type": "SearchExpression"}
  },
  "DocumentList": {
    "type": "list",
    "member": {"type": "Document"}
  },
  "StringList": {
    "type": "list",
    "member": {"type": "string", "minLength": 1, "maxLength": 100},
    "minItems": 1,
    "maxItems": 100,
    "uniqueItems": true
  },
  "GroupByPropertyList": {
    "type": "list",
    "member": {"type": "GroupByProperty"},
    "minItems": 1,
    "maxItems": 5,
    "uniqueItems": true
  },
  "AggregationResult": {
    "type": "structure",
    "members": {
      "inclusiveProperties": {"type": "InferencePropertyList"},
      "dedupeByProperties": {"type": "InferencePropertyList"}
    }
  },
  "SortList": {
    "type": "list",
    "member": {"type": "Sort"},
    "minItems": 1,
    "maxItems": 5,
    "uniqueItems": true
  },
  "QueryOperator": {
    "type": "enum",
    "values": [
      "Equals", "NotEquals", "GreaterThan", "GreaterThanOrEqualTo",
      "LessThan", "LessThanOrEqualTo", "Contains", "Exists",
      "NotExists", "In"
    ]
  },
  "Aggregation": {
    "type": "structure",
    "members": {
      "groupByProperties": {"type": "GroupByPropertyList"},
      "aggregationResult": {"type": "AggregationResult"},
      "maxInferencesPerBucket": {"type": "integer"},
      "sortPerBucket": {"type": "SortList"}
    }
  },
  "GroupByProperty": {
    "type": "structure",
    "members": {
      "name": {"type": "InferenceProperty"},
      "sortOrder": {"type": "SortOrder"},
      "nullPropertyInclusive": {"type": "boolean"}
    }
  },
  "InferencePropertyList": {
    "type": "list",
    "member": {"type": "InferenceProperty"},
    "minItems": 1,
    "maxItems": 10,
    "uniqueItems": true
  },
  "Sort": {
    "type": "structure",
    "members": {
      "sortBy": {"type": "InferenceProperty"},
      "sortOrder": {"type": "SortOrder"}
    }
  },
  "InferenceProperty": {
    "type": "string",
    "minLength": 1,
    "maxLength": 256
  }
}
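To make the string and list constraints above concrete, here is a minimal validator for ResourceName (the pattern plus a length of 1 to 128 characters) and StringList (1 to 100 unique items, each 1 to 100 characters long). SchemaConstraintSketch is a hypothetical helper written only from the schema; the service's own validation code is not shown in this guide.

```java
import java.util.HashSet;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical validator mirroring the ResourceName and StringList definitions above.
class SchemaConstraintSketch {

    // ResourceName pattern from the schema: ^[a-zA-Z0-9:_\-]+$
    private static final Pattern RESOURCE_NAME = Pattern.compile("^[a-zA-Z0-9:_\\-]+$");

    // ResourceName: 1 to 128 characters matching the pattern
    static boolean isValidResourceName(String name) {
        return name != null
                && !name.isEmpty()
                && name.length() <= 128
                && RESOURCE_NAME.matcher(name).matches();
    }

    // StringList: 1 to 100 unique items, each 1 to 100 characters long
    static boolean isValidStringList(List<String> values) {
        if (values == null || values.isEmpty() || values.size() > 100) {
            return false;
        }
        for (String value : values) {
            if (value == null || value.isEmpty() || value.length() > 100) {
                return false;
            }
        }
        // uniqueItems: duplicates are not allowed
        return new HashSet<>(values).size() == values.size();
    }

    public static void main(String[] args) {
        System.out.println(isValidResourceName("Event"));                  // true
        System.out.println(isValidStringList(List.of("your-device-id")));  // true
        System.out.println(isValidStringList(List.of("cam-1", "cam-1")));  // false
    }
}
```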

Implementation Details

‘QueryInferences’ supports both simple queries and aggregation queries over inference data stored in OpenSearch. It uses OpenSearch’s Point in Time (PIT) feature for consistent pagination and supports both flatten and nested aggregation patterns. Before running a query, it validates that the requested model name and version exist, and it applies defaults when optional parameters are omitted (for example, maxResults and sorting by timestamp).

  1. Query Types
    1. Simple queries (non-aggregated)
    2. Flatten aggregation queries
    3. Nested aggregation queries
  2. Request Handlers
    1. QueryInferencesHandler
    2. FlattenAggregationHandler
    3. NestedAggregationHandler
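The PIT-based pagination described above can be pictured as a search body that references a PIT id instead of an index, sorts on a field, and, for follow-up pages, passes the previous page's last sort values in search_after. The sketch below builds such a body as a string. The class name, the keep_alive value, treating sort values as strings, and leaving out the query clause are all assumptions for brevity; the Guidance code builds its request through buildSearchRequest, whose output is not shown here.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of a PIT search body (query clause omitted for brevity).
// Assumes the PIT id and sort values need no JSON escaping.
class PitSearchBodySketch {

    static String buildBody(String pitId, String keepAlive, int size,
                            String sortField, String order, List<String> searchAfter) {
        StringBuilder json = new StringBuilder("{");
        json.append("\"size\":").append(size).append(',');
        // The PIT id pins the search to one snapshot, so no index name is needed in the URL
        json.append("\"pit\":{\"id\":\"").append(pitId)
            .append("\",\"keep_alive\":\"").append(keepAlive).append("\"},");
        json.append("\"sort\":[{\"").append(sortField)
            .append("\":{\"order\":\"").append(order).append("\"}}]");
        if (searchAfter != null && !searchAfter.isEmpty()) {
            // Sort values of the last hit on the previous page (treated as strings here)
            String values = searchAfter.stream()
                    .map(v -> "\"" + v + "\"")
                    .collect(Collectors.joining(","));
            json.append(",\"search_after\":[").append(values).append(']');
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        // First page, then a follow-up page (hypothetical PIT id and sort value)
        System.out.println(buildBody("example-pit-id", "1m", 100, "timestamp", "desc", null));
        System.out.println(buildBody("example-pit-id", "1m", 100, "timestamp", "desc",
                List.of("2023-10-22T13:00:00.000-0700")));
    }
}
```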

Activity Layer

@Service(SERVICE_NAME)
public class QueryInferencesActivity extends Activity {
    private static final Logger LOG = LogManager.getLogger(QueryInferencesActivity.class);
    private final VLQueryBuilder vlQueryBuilder;
    private final OpenSearchClientProvider openSearchClientProvider;
    private final NextTokenSerializerDeserializer nextTokenSerializerDeserializer;
    private final OpenSearchPitDAO openSearchPitDAO;
    private final SchemaRepository schemaRepository;
    private final AggregationResponseHelper aggregationResponseHelper;

    @Inject
    public QueryInferencesActivity(final VLQueryBuilder vlQueryBuilder,
                                   final OpenSearchClientProvider openSearchClientProvider,
                                   final NextTokenSerializerDeserializer nextTokenSerializerDeserializer,
                                   final OpenSearchPitDAO openSearchPitDAO,
                                   final SchemaRepository schemaRepository,
                                   final AggregationResponseHelper aggregationResponseHelper) {
        this.vlQueryBuilder = vlQueryBuilder;
        this.openSearchClientProvider = openSearchClientProvider;
        this.nextTokenSerializerDeserializer = nextTokenSerializerDeserializer;
        this.openSearchPitDAO = openSearchPitDAO;
        this.schemaRepository = schemaRepository;
        this.aggregationResponseHelper = aggregationResponseHelper;
    }

    @Operation("QueryInferences")
    public QueryInferencesResponse queryInferences(final QueryInferencesRequest input) throws Exception {
        if (input == null) {
            throw new ValidationException(INVALID_INPUT);
        }

        String modelName = input.getModelName();
        String modelVersion = input.getModelVersion();

        LOG.info("Entered queryInferences method for {}-{}", modelName, modelVersion);
        if (!schemaRepository.exists(modelName, modelVersion)) {
            throw new ValidationException(INVALID_MODEL_VERSION);
        }

        if (input.getMaxResults() == null) {
            input.setMaxResults(DEFAULT_MAX_RESULTS);
        }

        InferencesRequestHandler handler = getInferencesRequestHandler(input);
        return handler.execute();
    }

    InferencesRequestHandler getInferencesRequestHandler(QueryInferencesRequest input) {
        InferencesRequestHandler handler;
        SearchExpression expression = input.getSearchExpression();
        if (expression == null || expression.getAggregation() == null) {
            handler = new QueryInferencesHandler(input, openSearchPitDAO,
                    nextTokenSerializerDeserializer, openSearchClientProvider, vlQueryBuilder, schemaRepository);
        } else {
            AggregationMetadata metadata = new AggregationMetadata(input, schemaRepository);

            if (metadata.getAggregationType() == AggregationType.Flatten) {
                handler = new FlattenAggregationHandler(input, openSearchPitDAO,
                        nextTokenSerializerDeserializer, openSearchClientProvider, vlQueryBuilder, metadata,
                        aggregationResponseHelper, schemaRepository);
            } else if (metadata.getAggregationType() == AggregationType.Nested) {
                handler = new NestedAggregationHandler(input, openSearchPitDAO,
                        nextTokenSerializerDeserializer, openSearchClientProvider, vlQueryBuilder, metadata,
                        aggregationResponseHelper, schemaRepository);
            } else {
                throw new ValidationException(NOT_SUPPORT_FLATTEN_NESTED_AGGREGATION);
            }
        }

        return handler;
    }
}

Main Lambda Handler

There are three cases, each served by its own handler that builds an OpenSearch search request and returns a QueryInferencesResponse. All three handlers extend the abstract parent class InferencesRequestHandler, which is defined below:

public abstract class InferencesRequestHandler {
    private static final Logger LOG = LogManager.getLogger(InferencesRequestHandler.class);

    protected final QueryInferencesRequest input;
    protected final List<SortBy> sortByList;
    protected final List<FieldSortBuilder> sortBuilderList;
    protected final VLQueryBuilder vlQueryBuilder;
    protected NextTokenSerializerDeserializer nextTokenSerializerDeserializer;
    protected String pitId;
    protected NextToken nextTokenObject;
    protected boolean isPaginationRequest;
    private final OpenSearchPitDAO openSearchPitDAO;
    private final OpenSearchClient openSearchClient;
    private final SchemaRepository schemaRepository;

    public InferencesRequestHandler(final QueryInferencesRequest input,
                                    final OpenSearchPitDAO openSearchPitDAO,
                                    final NextTokenSerializerDeserializer nextTokenSerializerDeserializer,
                                    final OpenSearchClientProvider openSearchClientProvider,
                                    final VLQueryBuilder vlQueryBuilder,
                                    final SchemaRepository schemaRepository) {

        this.input = input;
        this.sortByList = getSortList(input);
        this.sortBuilderList = Lists.newArrayList();
        this.openSearchPitDAO = openSearchPitDAO;
        this.nextTokenSerializerDeserializer = nextTokenSerializerDeserializer;
        this.vlQueryBuilder = vlQueryBuilder;
        this.schemaRepository = schemaRepository;

        // Hard-coded OpenSearch endpoint: replace it with your own domain endpoint
        // (e.g. search-<domain-name>-<id>.<region>.es.amazonaws.com), or with a separate
        // util that fetches the endpoint based on the caller's AWS account.
        // String openSearchEndpoint = routingUtils.getOpenSearchEndpoint(customerAccountId);
        String openSearchEndpoint = "<your-opensearch-domain-endpoint>";
        this.openSearchClient = openSearchClientProvider.getInstance(openSearchEndpoint);

        init(openSearchEndpoint);
    }

    public QueryInferencesResponse execute() throws Exception {
        ActionRequest actionRequest = createActionRequest();
        SearchResponse searchResponse = openSearchClient.search(actionRequest);
        return createQueryResponse(searchResponse);
    }

    protected abstract ActionRequest createActionRequest() throws Exception;

    protected abstract QueryInferencesResponse createQueryResponse(SearchResponse searchResponse);

    protected QueryInferencesResponse wrapDocuments(List<Document> documentList) {
        QueryData data = QueryData.builder()
                .withModelName(input.getModelName())
                .withModelVersion(input.getModelVersion())
                .withInferenceGenerated(documentList).build();

        return QueryInferencesResponse.builder()
                .withData(data)
                .withResultCount(documentList.size()).build();
    }

    protected QueryInferencesResponse wrapAggregationDocuments(AggregationResponseHelper utils, ParsedComposite compositeResult,
                                                               AggregationMetadata metadata) {

        List<Document> documentList = utils.extractResponseFromCompositeResult(compositeResult, metadata);
        QueryInferencesResponse response = wrapDocuments(documentList);
        if (response.getResultCount() > 0) {
            NextToken nextToken = new NextToken(pitId, null, compositeResult.afterKey());
            response.setNextToken(nextTokenSerializerDeserializer.serialize(nextToken));
        }

        return response;
    }

    private FieldSortBuilder getNestedQueryBuilder(SortBy sortBy) {
        List<String> nestedObjects = schemaRepository.getNestedAncestorProperties(sortBy.getName(), input.getModelName(),
                input.getModelVersion());
        FieldSortBuilder sortBuilder = new FieldSortBuilder(sortBy.getName())
                .order(OpenSearchRequestUtils.getSortOrder(sortBy));

        if (nestedObjects.isEmpty()) {
            return sortBuilder;
        }

        NestedSortBuilder nestedSortBuilder = null;
        for (String nestedObject : nestedObjects) {
            if (nestedSortBuilder == null) {
                nestedSortBuilder = new NestedSortBuilder(nestedObject);
            } else {
                nestedSortBuilder = new NestedSortBuilder(nestedObject).setNestedSort(nestedSortBuilder);
            }
        }

        return sortBuilder.setNestedSort(nestedSortBuilder);
    }

    private void init(String openSearchEndpoint) {
        if (input.getNextToken() == null) {
            // Get a PIT snapshot id from DDB
            // Hard-coded routing AWS account: replace it with the account that owns the
            // OpenSearch domain, or with a separate util that resolves it.
            // String openSearchAwsAccountId = routingUtils.getOpenSearchAwsAccountId(customerAccountId);
            String openSearchAwsAccountId = "<opensearch-aws-account-id>";
            OpenSearchPit pit = openSearchPitDAO.load(openSearchAwsAccountId, input.getModelName(), openSearchEndpoint);
            if (pit == null) {
                LOG.error(NO_POINT_IN_TIME_AVAILABLE, input.getModelName(), openSearchEndpoint);
                throw new InternalServerException(INTERNAL_SERVER_EXCEPTION);
            }
            this.pitId = pit.getPitId();
        } else {
            // For pagination request
            this.nextTokenObject = nextTokenSerializerDeserializer.deserialize(input.getNextToken());
            this.pitId = nextTokenObject.getPitId();
            this.isPaginationRequest = true;
        }

        for (SortBy sortBy : sortByList) {
            sortBuilderList.add(getNestedQueryBuilder(sortBy));
        }
    }

    private List<SortBy> getSortList(QueryInferencesRequest input) {
        String sortBy = Objects.requireNonNullElse(input.getSortBy(), TIMESTAMP_KEY);
        String sortOrder = Objects.requireNonNullElse(input.getSortOrder(), DEFAULT_SORTING_ORDER);

        return Lists.newArrayList(new SortBy(sortBy, sortOrder));
    }
}
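The NextTokenSerializerDeserializer implementation is not shown in this guide. As one possible shape, the sketch below packs the PIT id and the last hit's sort values into bytes and Base64url-encodes them, so the token stays opaque to callers. The Token record and the byte layout are hypothetical; the sketch leaves out the aggregation afterKey that the real NextToken also carries, and it does not sign or encrypt the token.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

// Hypothetical opaque pagination token: PIT id plus last sort values, Base64url-encoded.
class NextTokenSketch {

    record Token(String pitId, List<String> sortValues) { }

    static String serialize(Token token) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(token.pitId());
            out.writeInt(token.sortValues().size());
            for (String value : token.sortValues()) {
                // A flag marks null sort values (sort field missing in the last hit)
                out.writeBoolean(value != null);
                if (value != null) {
                    out.writeUTF(value);
                }
            }
            out.flush();
            return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Token deserialize(String encoded) {
        try {
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(Base64.getUrlDecoder().decode(encoded)));
            String pitId = in.readUTF();
            int count = in.readInt();
            List<String> values = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                values.add(in.readBoolean() ? in.readUTF() : null);
            }
            return new Token(pitId, values);
        } catch (IOException e) {
            // Truncated tokens run out of bytes and fail here
            throw new IllegalArgumentException("Malformed next token", e);
        }
    }
}
```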

Case: QueryInferencesHandler

Called for simple queries (non-aggregated).

public class QueryInferencesHandler extends InferencesRequestHandler {
    private static final Logger LOG = LogManager.getLogger(QueryInferencesHandler.class);

    private final Object[] searchAfter;

    public QueryInferencesHandler(final QueryInferencesRequest input,
                                  final OpenSearchPitDAO openSearchPitDAO,
                                  final NextTokenSerializerDeserializer nextTokenSerializerDeserializer,
                                  final OpenSearchClientProvider openSearchClientProvider,
                                  final VLQueryBuilder vlQueryBuilder,
                                  final SchemaRepository schemaRepository) {

        super(input, openSearchPitDAO, nextTokenSerializerDeserializer,
                openSearchClientProvider, vlQueryBuilder, schemaRepository);

        if (isPaginationRequest) {
            // Pagination token validation does not cover sort settings, so verify here that the
            // caller has not changed sortBy/sortOrder since the original request.
            validateSortList(sortByList, nextTokenObject.getSortByList());
            searchAfter = nextTokenObject.getSortByList().stream().map(SortBy::getValue).toArray();
        } else {
            searchAfter = null;
        }
    }

    protected ActionRequest createActionRequest() throws Exception {
        QueryBuilder queryBuilder = this.vlQueryBuilder.transformSearchExpression(input.getSearchExpression(), input.getModelName(),
                input.getModelVersion());
        // Note that if a pagination token is passed and
        // the request params are different from the original request, the request should fail
        // https://tiny.amazon.com/8a66jkhh/codeamazpackAmazblobf54dsrc
        return buildSearchRequest(queryBuilder, null, input.getMaxResults(), pitId, sortBuilderList, searchAfter);
    }

    protected QueryInferencesResponse createQueryResponse(SearchResponse searchResponse) {
        SearchHit[] searchHits = searchResponse.getHits().getHits();
        List<Document> documentList = new ArrayList<>();
        for (SearchHit searchHit : searchHits) {
            documentList.add(Document.fromMap(searchHit.getSourceAsMap()));
        }

        int currentDocumentCount = documentList.size();
        QueryInferencesResponse response = wrapDocuments(documentList);

        if (currentDocumentCount > 0) {
            SearchHit lastSearchHit = searchHits[currentDocumentCount - 1];
            String nextToken = getNextToken(lastSearchHit, pitId, sortByList);
            response.setNextToken(nextToken);
        }

        return response;
    }

    /**
     * Make sure customer doesn't modify sort in the query
     */
    private void validateSortList(List<SortBy> sortByListFromInput, List<SortBy> sortByListFromToken) {
        if (!sortByListFromToken.equals(sortByListFromInput)) {
            throw new ValidationException(String.format(
                    FathomExceptionMessage.DIFFERENT_QUERY_FROM_ORIGINAL, sortByListFromToken, sortByListFromInput));
        }
    }

    private String getNextToken(SearchHit lastSearchHit, String pitId, List<SortBy> sortByList) {
        // Construct SortBy object with sortBy attribute value from last document as pagination token
        List<SortBy> sortListWithSortValue = sortByList.stream().map(sortBy -> {
            Object sortValue;
            try {
                sortValue = JsonPath.read(lastSearchHit.getSourceAsMap(), "$." + sortBy.getName());
            } catch (PathNotFoundException e) {
                LOG.info("No sort-by {} in last inference with id: {}, default to null",
                        sortBy.getName(), lastSearchHit.getId());
                sortValue = null;
            }

            return new SortBy(sortBy.getName(), sortBy.getOrder(), sortValue);
        }).collect(Collectors.toList());

        return nextTokenSerializerDeserializer.serialize(new NextToken(pitId, sortListWithSortValue, null));
    }
}
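QueryInferencesHandler builds the next token from the sort values of the last hit and passes them back as searchAfter on the following request, so each page starts strictly after the previous one. The in-memory sketch below illustrates the same idea on a list sorted by timestamp in descending order. It adds a document id as a tiebreaker, which the handler above does not do; with a unique tiebreaker, hits that share a timestamp are neither skipped nor repeated at page boundaries. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// In-memory illustration of search_after pagination (hypothetical names).
class SearchAfterSketch {

    record Hit(String id, long timestampMillis) { }

    // Sort: timestamp descending, then id ascending as a unique tiebreaker
    static final Comparator<Hit> ORDER =
            Comparator.comparingLong(Hit::timestampMillis).reversed()
                    .thenComparing(Hit::id);

    // Returns up to size hits that sort strictly after the cursor (null = first page)
    static List<Hit> page(List<Hit> sortedHits, Hit cursor, int size) {
        List<Hit> result = new ArrayList<>();
        for (Hit hit : sortedHits) {
            if (cursor != null && ORDER.compare(hit, cursor) <= 0) {
                continue; // at or before the cursor: already returned on an earlier page
            }
            result.add(hit);
            if (result.size() == size) {
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Hit> hits = new ArrayList<>(List.of(
                new Hit("a", 300), new Hit("c", 200), new Hit("b", 200), new Hit("d", 100)));
        hits.sort(ORDER);
        Hit cursor = null;
        List<Hit> current;
        while (!(current = page(hits, cursor, 2)).isEmpty()) {
            System.out.println(current);
            // The last hit's sort values become the cursor for the next page
            cursor = current.get(current.size() - 1);
        }
    }
}
```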

Case: FlattenAggregationHandler

Called for flatten aggregation queries.

public class FlattenAggregationHandler extends InferencesRequestHandler {
    private static final Logger LOG = LogManager.getLogger(FlattenAggregationHandler.class);
    private final AggregationMetadata metadata;
    private final AggregationResponseHelper responseUtils;

    public FlattenAggregationHandler(final QueryInferencesRequest input,
                                     final OpenSearchPitDAO openSearchPitDAO,
                                     final NextTokenSerializerDeserializer nextTokenSerializerDeserializer,
                                     final OpenSearchClientProvider openSearchClientProvider,
                                     final VLQueryBuilder vlQueryBuilder,
                                     final AggregationMetadata metadata,
                                     final AggregationResponseHelper responseUtils,
                                     final SchemaRepository schemaRepository) {

        super(input, openSearchPitDAO, nextTokenSerializerDeserializer,
                openSearchClientProvider, vlQueryBuilder, schemaRepository);

        this.metadata = metadata;
        this.responseUtils = responseUtils;
    }

    protected ActionRequest createActionRequest() throws Exception {
        LOG.info("Process aggregation request with {}", metadata.toString());
        Aggregation aggregation = input.getSearchExpression().getAggregation();
        AggregationBuilder compositeAggregationBuilder = metadata.getCompositeBuilder(aggregation.getGroupByProperties(), nextTokenObject);
        compositeAggregationBuilder.subAggregation(metadata.getTopHitsBuilder());
        LOG.info("Composite aggregation: {}", compositeAggregationBuilder);

        QueryBuilder queryBuilder = vlQueryBuilder.transformSearchExpression(input.getSearchExpression(), input.getModelName(),
                input.getModelVersion());
        // Set maxResults to 0 since we don't need the actual inference documents for return
        // For aggregation, searchAfter is always null since aggregation has separate pagination token in aggregationBuilder
        return buildSearchRequest(queryBuilder, compositeAggregationBuilder, 0, pitId, sortBuilderList, null);
    }

    protected QueryInferencesResponse createQueryResponse(SearchResponse searchResponse) {
        ParsedComposite compositeResult = searchResponse.getAggregations().get(AggregationMetadata.COMPOSITE_AGGREGATOR_NAME);
        return wrapAggregationDocuments(responseUtils, compositeResult, metadata);
    }
}
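AggregationMetadata.getCompositeBuilder and getTopHitsBuilder are not shown, but the request they feed follows the OpenSearch composite aggregation shape: one terms source per group-by property, a page size, an optional after key taken from the previous response, and a top_hits sub-aggregation that returns up to maxInferencesPerBucket inferences per bucket. The string-building sketch below illustrates that shape with size 0 at the top level, matching the handler passing 0 for maxResults. The aggregation names (composite_agg, top_inferences) and the replacement of dots in source names are assumptions.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of a flatten aggregation request body.
class CompositeAggregationSketch {

    static String buildBody(String compositeName, List<String> groupByFields, int pageSize,
                            int maxInferencesPerBucket, String afterKeyJson) {
        // One terms source per group-by field; dots are replaced in the source name only
        String sources = groupByFields.stream()
                .map(f -> "{\"" + f.replace('.', '_') + "\":{\"terms\":{\"field\":\"" + f + "\"}}}")
                .collect(Collectors.joining(","));
        StringBuilder composite = new StringBuilder()
                .append("{\"size\":").append(pageSize)
                .append(",\"sources\":[").append(sources).append(']');
        if (afterKeyJson != null) {
            // after_key from the previous response resumes bucket pagination
            composite.append(",\"after\":").append(afterKeyJson);
        }
        composite.append('}');
        // Top-level size 0: only buckets (with their top hits) are returned, not raw hits
        return "{\"size\":0,\"aggs\":{\"" + compositeName + "\":{\"composite\":" + composite
                + ",\"aggs\":{\"top_inferences\":{\"top_hits\":{\"size\":"
                + maxInferencesPerBucket + "}}}}}}";
    }

    public static void main(String[] args) {
        System.out.println(buildBody("composite_agg", List.of("metadata.deviceId"), 10, 3, null));
    }
}
```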

Case: NestedAggregationHandler

Called for nested aggregation queries.

public class NestedAggregationHandler extends InferencesRequestHandler {
    private static final Logger LOG = LogManager.getLogger(NestedAggregationHandler.class);
    private final AggregationMetadata metadata;
    private final AggregationResponseHelper responseUtils;

    public NestedAggregationHandler(final QueryInferencesRequest input,
                                    final OpenSearchPitDAO openSearchPitDAO,
                                    final NextTokenSerializerDeserializer nextTokenSerializerDeserializer,
                                    final OpenSearchClientProvider openSearchClientProvider,
                                    final VLQueryBuilder vlQueryBuilder,
                                    final AggregationMetadata metadata,
                                    final AggregationResponseHelper responseUtils,
                                    final SchemaRepository schemaRepository) {

        super(input, openSearchPitDAO, nextTokenSerializerDeserializer,
                openSearchClientProvider, vlQueryBuilder, schemaRepository);

        this.metadata = metadata;
        this.responseUtils = responseUtils;
    }

    protected ActionRequest createActionRequest() throws Exception {
        LOG.info("Process aggregation request with {}", metadata.toString());
        Aggregation aggregation = input.getSearchExpression().getAggregation();
        // All group-by properties can be put into composite for pagination
        CompositeAggregationBuilder compositeAggregationBuilder = metadata.getCompositeBuilder(aggregation.getGroupByProperties(),
                nextTokenObject);
        compositeAggregationBuilder.subAggregation(metadata.getTopHitsBuilder());
        AggregationBuilder nestedAggregationBuilder = metadata.getNestedAggregationBuilder(compositeAggregationBuilder);
        LOG.info("Nested aggregation: {}", nestedAggregationBuilder);

        QueryBuilder queryBuilder = vlQueryBuilder.transformSearchExpression(input.getSearchExpression(), input.getModelName(),
                input.getModelVersion());
        // Set maxResults to 0 since we don't need the actual inference documents for return
        // For aggregation, searchAfter is always null since aggregation has separate pagination token in aggregationBuilder
        return buildSearchRequest(queryBuilder, nestedAggregationBuilder, 0, pitId, sortBuilderList, null);
    }

    protected QueryInferencesResponse createQueryResponse(SearchResponse searchResponse) {
        ParsedNested nestedResult = responseUtils.getMostInnerNestedResponse(searchResponse, metadata);
        ParsedComposite compositeResult = nestedResult.getAggregations().get(AggregationMetadata.COMPOSITE_AGGREGATOR_NAME);
        return wrapAggregationDocuments(responseUtils, compositeResult, metadata);
    }
}
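getNestedAggregationBuilder is not shown either. A plausible reading is that it wraps the composite aggregation in one nested aggregation per nested ancestor path, and that getMostInnerNestedResponse, as its name suggests, walks back down to the innermost level to read the composite result. The sketch below shows only the wrapping step on JSON strings, with paths given outermost first; the wrapper names (nested_0, nested_1, ...) and the example paths are hypothetical.

```java
import java.util.List;

// Hypothetical sketch: wraps an aggregation in one nested aggregation per path.
class NestedAggregationSketch {

    // nestedPaths are full paths ordered outermost first, e.g. ["a", "a.b"]
    static String wrapInNested(List<String> nestedPaths, String innerName, String innerJson) {
        String name = innerName;
        String json = innerJson;
        // Build from the innermost path outward
        for (int i = nestedPaths.size() - 1; i >= 0; i--) {
            json = "{\"nested\":{\"path\":\"" + nestedPaths.get(i) + "\"},"
                    + "\"aggs\":{\"" + name + "\":" + json + "}}";
            name = "nested_" + i; // hypothetical wrapper name
        }
        return "{\"" + name + "\":" + json + "}";
    }

    public static void main(String[] args) {
        String composite = "{\"composite\":{\"size\":10,\"sources\":"
                + "[{\"label\":{\"terms\":{\"field\":\"modelOutput.objects.label\"}}}]}}";
        System.out.println(wrapInNested(List.of("modelOutput", "modelOutput.objects"),
                "composite_agg", composite));
    }
}
```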

Usage

Lambda Input Format

{
  "modelName": "Event",
  "modelVersion": "1.0",
  "maxResults": 100,
  "searchExpression": {
    "filters": [
      {
        "name": "metadata.deviceId",
        "operator": "Equals",
        "value": [
          "your-device-id"
        ]
      },
      {
        "name": "timestamp",
        "operator": "GreaterThanOrEqualTo",
        "value": [
          "2023-10-22T13:00:00.000-0700"
        ]
      },
      {
        "name": "timestamp",
        "operator": "LessThanOrEqualTo",
        "value": [
          "2023-11-10T20:00:00.000-0700"
        ]
      },
      {
        "name": "modelOutput.MetadataStream.VideoAnalytics.Frame.Source",
        "operator": "Equals",
        "value": [
          "VA_HEAT_BASIC"
        ]
      }
    ],
    "operator": "And"
  }
}
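VLQueryBuilder.transformSearchExpression, which turns a searchExpression like the one above into an OpenSearch query, is not shown in this guide. The sketch below is one plausible mapping for a flat list of filters: each QueryOperator becomes a term, terms, range, or exists clause; And combines clauses in a bool filter, and Or in a bool should with minimum_should_match set to 1. It covers only some operators, ignores subExpressions, and assumes keyword and date field mappings that accept the values as written; all names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical mapping from a flat searchExpression to OpenSearch query DSL.
class SearchExpressionSketch {

    record Filter(String name, String operator, List<String> value) { }

    static String quote(String s) {
        return "\"" + s + "\""; // assumes no characters need JSON escaping
    }

    static String range(String field, String bound, String value) {
        return "{\"range\":{" + quote(field) + ":{" + quote(bound) + ":" + quote(value) + "}}}";
    }

    static String toClause(Filter f) {
        return switch (f.operator()) {
            case "Equals" -> "{\"term\":{" + quote(f.name()) + ":" + quote(f.value().get(0)) + "}}";
            case "In" -> "{\"terms\":{" + quote(f.name()) + ":["
                    + f.value().stream().map(SearchExpressionSketch::quote).collect(Collectors.joining(","))
                    + "]}}";
            case "GreaterThan" -> range(f.name(), "gt", f.value().get(0));
            case "GreaterThanOrEqualTo" -> range(f.name(), "gte", f.value().get(0));
            case "LessThan" -> range(f.name(), "lt", f.value().get(0));
            case "LessThanOrEqualTo" -> range(f.name(), "lte", f.value().get(0));
            case "Exists" -> "{\"exists\":{\"field\":" + quote(f.name()) + "}}";
            default -> throw new IllegalArgumentException("Operator not covered by this sketch: " + f.operator());
        };
    }

    static String toBoolQuery(List<Filter> filters, String operator) {
        String clauses = filters.stream()
                .map(SearchExpressionSketch::toClause)
                .collect(Collectors.joining(","));
        return switch (operator) {
            // And: every clause must match (filter context, no scoring)
            case "And" -> "{\"bool\":{\"filter\":[" + clauses + "]}}";
            // Or: at least one clause must match
            case "Or" -> "{\"bool\":{\"should\":[" + clauses + "],\"minimum_should_match\":1}}";
            default -> throw new IllegalArgumentException("Unknown operator: " + operator);
        };
    }

    public static void main(String[] args) {
        System.out.println(toBoolQuery(List.of(
                new Filter("metadata.deviceId", "Equals", List.of("your-device-id")),
                new Filter("timestamp", "GreaterThanOrEqualTo", List.of("2023-10-22T13:00:00.000-0700")),
                new Filter("timestamp", "LessThanOrEqualTo", List.of("2023-11-10T20:00:00.000-0700"))),
                "And"));
    }
}
```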

Lambda Response Format

{
  "data": {
    "modelName": "Event", 
    "modelVersion": "1.0",
    "timestamp": "2023-05-25T10:30:00Z", 
    "inferenceGenerated": [
      // DocumentList 
    ]
  },
  "nextToken": "string", 
  "resultCount": 123
}
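The handlers set nextToken whenever a page contains at least one result, so a caller typically keeps requesting pages until an empty page comes back. The sketch below shows that loop against a stand-in function for the QueryInferences call; the Page record, the maxPages safety limit, and the use of plain strings for documents are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical client-side loop over QueryInferences pages.
class PaginationLoopSketch {

    // Minimal stand-in for the response fields used here
    record Page(List<String> documents, String nextToken) { }

    // queryInferences sends a request with the given nextToken (null for the first page)
    static List<String> fetchAll(Function<String, Page> queryInferences, int maxPages) {
        List<String> all = new ArrayList<>();
        String token = null;
        for (int i = 0; i < maxPages; i++) {
            Page page = queryInferences.apply(token);
            all.addAll(page.documents());
            // A token comes back for every non-empty page, so stop on an empty page
            if (page.documents().isEmpty() || page.nextToken() == null) {
                break;
            }
            token = page.nextToken();
        }
        return all;
    }

    public static void main(String[] args) {
        List<String> data = List.of("inference-1", "inference-2", "inference-3");
        // Fake service: two results per page, token = index of the next result
        Function<String, Page> fake = token -> {
            int start = token == null ? 0 : Integer.parseInt(token);
            int end = Math.min(start + 2, data.size());
            List<String> docs = data.subList(start, end);
            return new Page(docs, docs.isEmpty() ? null : String.valueOf(end));
        };
        System.out.println(fetchAll(fake, 10)); // [inference-1, inference-2, inference-3]
    }
}
```

Each follow-up request must repeat the original sortBy and sortOrder; QueryInferencesHandler rejects a token whose sort settings differ from the request.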

Verification Steps

  1. Verify Lambda Deployment
    1. Open the AWS Lambda console and confirm that your function exists with the correct configuration:
      • Runtime: Java 17
      • Memory: At least 1024 MB
      • Timeout: 15 minutes
      • Required IAM permissions attached
  2. Verify Step Functions Deployment
    1. Open the AWS Step Functions console and verify that:
      • The state machine exists with the correct definition
      • The IAM role has the necessary permissions
      • The execution history shows successful test runs
  3. Test Query Inferences
    1. Start a test execution with sample input:
       {
         "modelName": "Event",
         "modelVersion": "1.0",
         "maxResults": 100,
         "searchExpression": {
           "filters": [
             {
               "name": "metadata.deviceId",
               "operator": "Equals",
               "value": [
                 "your-device-id"
               ]
             },
             {
               "name": "timestamp",
               "operator": "GreaterThanOrEqualTo",
               "value": [
                 "2023-10-22T13:00:00.000-0700"
               ]
             },
             {
               "name": "timestamp",
               "operator": "LessThanOrEqualTo",
               "value": [
                 "2023-11-10T20:00:00.000-0700"
               ]
             },
             {
               "name": "modelOutput.MetadataStream.VideoAnalytics.Frame.Source",
               "operator": "Equals",
               "value": [
                 "VA_HEAT_BASIC"
               ]
             }
           ],
           "operator": "And"
         }
       }

    2. Monitor the execution in the Step Functions console
    3. Check the CloudWatch logs for the Lambda function
    4. Verify that:
      1. A single page of inference results is returned
      2. Results are sorted by timestamp in the expected sort order
  4. Verify Error Handling
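    1. Test pagination by passing the returned nextToken in a follow-up request
    2. Test with different maxResults values
    3. Test with different sortBy and sortOrder settings; changing them between paginated requests should fail with a validation error
    4. Verify that invalid input, such as an unknown model name or version, returns a validation error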
  5. Performance Verification
    1. Monitor Lambda execution time
    2. Check memory utilization
    3. Verify no API throttling errors
    4. Confirm successful cleanup of temporary files
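The timestamp-ordering check from step 3 can also be scripted. The sketch below parses timestamps in the format used by the sample filters (for example 2023-10-22T13:00:00.000-0700) and compares instants, so results with different UTC offsets are still ordered correctly. The class name is hypothetical, it assumes the returned documents use that same timestamp format, and the expected direction should match the sortOrder you requested or the service default.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

// Hypothetical check for the "results ordered by timestamp" verification step.
class TimestampOrderCheck {

    // Matches timestamps such as 2023-10-22T13:00:00.000-0700
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

    static boolean isSorted(List<String> timestamps, boolean descending) {
        for (int i = 1; i < timestamps.size(); i++) {
            OffsetDateTime previous = OffsetDateTime.parse(timestamps.get(i - 1), FORMAT);
            OffsetDateTime current = OffsetDateTime.parse(timestamps.get(i), FORMAT);
            // isAfter/isBefore compare instants, so differing UTC offsets are handled
            boolean outOfOrder = descending ? current.isAfter(previous) : current.isBefore(previous);
            if (outOfOrder) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> page = List.of("2023-11-10T20:00:00.000-0700", "2023-10-22T13:00:00.000-0700");
        System.out.println(isSorted(page, true));  // true
        System.out.println(isSorted(page, false)); // false
    }
}
```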