Overview
This implementation guide demonstrates an asynchronous workflow pattern for exporting video segments from Amazon Kinesis Video Streams (KVS) using AWS Lambda and AWS Step Functions. The solution is specifically designed to handle long-running KVS streams through an asynchronous, iterative processing workflow.
The architecture uses AWS Step Functions to orchestrate a state machine that coordinates the video export process. The Lambda function processes video fragments in batches of up to 200 fragments per iteration. For longer streams, the Step Functions workflow invokes the Lambda function repeatedly, using the lastExportedTimestamp returned by each invocation as a continuation token to carry state between iterations.
Key technical components of the workflow include:
- A stateful Step Function that manages the export lifecycle
- An iterative Lambda execution pattern for processing large streams
- Efficient fragment batching with FFmpeg for video processing
- Asynchronous S3 upload integration for scalable storage
- Progress tracking and state management between iterations
- Embedded timeline subtitles generated from fragment timestamps
Key Features
- Batch processing of video fragments (max 200 fragments per batch)
- Automatic subtitle generation with timestamps
- MP4 export with configurable quality
- Progress tracking for long exports
- S3 integration for output storage
- Support for partial exports with continuation tokens
Prerequisites
AWS Environment
- AWS Account with appropriate permissions
- An existing S3 bucket for video exports
- An existing Kinesis Video Stream with archived footage
Development Environment
- Java 17 JDK
- Gradle 7.x or higher
- FFmpeg (provided via Lambda layer)
- Git (for version control)
AWS Service Limits
- Lambda execution time: 15 minutes maximum
- Lambda memory: Recommended 1024MB minimum
- KVS GetClip API: 200 fragments per request
AWS KVS GetClip API Quota
The Kinesis Video Streams GetClip API returns at most 200 fragments per request. This is a hard limit imposed by AWS, as documented in the API Reference.
Impact on Implementation
- Batch Size Limitation
- Our implementation processes fragments in batches of 200 to comply with this quota
- Each batch corresponds to one GetClip API call
- Ensures we don’t exceed the API’s per-request fragment limit
- Performance Considerations
- Fragment duration typically ranges from 2-10 seconds
- 200 fragments could represent 400-2000 seconds of video
- Processing time varies based on fragment size and content
- Rate Limiting Strategy
// Example of how we handle the 200-fragment limit
public List<VideoExportBatch> createBatches(List<Fragment> fragments, String s3Path) {
    List<VideoExportBatch> batches = new ArrayList<>();
    int totalFragments = fragments.size();
    for (int i = 0; i < totalFragments; i += MAX_FRAGMENTS_PER_REQUEST) {
        int endIndex = Math.min(i + MAX_FRAGMENTS_PER_REQUEST, totalFragments);
        List<Fragment> batchFragments = fragments.subList(i, endIndex);
        batches.add(new VideoExportBatch(batchFragments, s3Path, batches.size() + 1));
    }
    return batches;
}
- Error Handling
- If a batch fails, only that specific 200-fragment segment needs to be retried
- Subsequent batches can continue processing
- Progress tracking via lastExportedTimestamp enables resumption
- Best Practices
- Always process fragments sequentially within the 200-fragment limit
- Include sufficient wait time between batch processing
- Monitor API throttling errors and adjust timing if needed
- Use Step Functions for orchestrating multiple batches
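The performance figures above (2-10 second fragments, 200 fragments per batch) can be turned into a rough planning estimate. The following is a minimal sketch, assuming a fixed fragment duration; the class and method names are hypothetical and not part of the guide's code.

```java
import java.time.Duration;

/** Rough sizing helper for planning an export. All names here are illustrative. */
final class ExportSizing {
    static final int MAX_FRAGMENTS_PER_REQUEST = 200;

    /** Number of GetClip calls needed for the given fragment count (ceiling division). */
    static int batchCount(int totalFragments) {
        if (totalFragments < 0) {
            throw new IllegalArgumentException("totalFragments must be >= 0");
        }
        return (totalFragments + MAX_FRAGMENTS_PER_REQUEST - 1) / MAX_FRAGMENTS_PER_REQUEST;
    }

    /** Approximate fragment count for a time range, assuming every fragment has the same duration. */
    static int estimateFragments(Duration range, Duration fragmentDuration) {
        if (fragmentDuration.isZero() || fragmentDuration.isNegative()) {
            throw new IllegalArgumentException("fragmentDuration must be positive");
        }
        long rangeMillis = range.toMillis();
        long fragmentMillis = fragmentDuration.toMillis();
        return (int) ((rangeMillis + fragmentMillis - 1) / fragmentMillis);
    }
}
```

For a one-hour range, 2-second fragments give 1,800 fragments and 9 batches, while 10-second fragments give 360 fragments and 2 batches.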
Required IAM Permissions
Operation | AWS Action | Purpose |
---|---|---|
Get KVS endpoint | kinesisvideo:GetDataEndpoint | Get endpoint for KVS operations |
List fragments | kinesisvideo:ListFragments | List video fragments in time range |
Get video clip | kinesisvideo:GetClip | Export video segment as MP4 |
Upload to S3 | s3:PutObject | Store exported video in S3 |
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesisvideo:GetDataEndpoint",
"kinesisvideo:GetClip",
"kinesisvideo:ListFragments"
],
"Resource": "*"
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject"
],
"Resource": "arn:aws:s3:::your-bucket/*"
}
]
}
Building and Deployment
1. Add the Java Code Provided in This Guide
Copy the provided Java code into your project structure:
- Create the model classes in the com.amazonaws.videoanalytics.videologistics.model package
- Create the main handler in com.amazonaws.videoanalytics.videologistics.KVSClipExportHandler
- Create the utility class in com.amazonaws.videoanalytics.videologistics.util.VideoExportUtil
2. Creating the FFmpeg Lambda Layer
# Create layer directory
mkdir -p ffmpeg-layer/bin
cd ffmpeg-layer
# Download and extract FFmpeg
curl -O https://johnvansickle.com/ffmpeg/builds/ffmpeg-git-amd64-static.tar.xz
tar xf ffmpeg-git-amd64-static.tar.xz
cp ffmpeg-git-*/ffmpeg bin/
cp ffmpeg-git-*/ffprobe bin/
# Create layer ZIP
zip -r ffmpeg-layer.zip bin/
# Publish layer
aws lambda publish-layer-version \
--layer-name ffmpeg \
--description "FFmpeg for video processing" \
--zip-file fileb://ffmpeg-layer.zip \
--compatible-runtimes java17
3. Deploying with CDK
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as iam from 'aws-cdk-lib/aws-iam';
export class VideoExportStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// FFmpeg Layer
// Use a template literal (backticks) so the region and account are interpolated
const ffmpegLayer = lambda.LayerVersion.fromLayerVersionArn(this, 'FFmpegLayer',
`arn:aws:lambda:${this.region}:${this.account}:layer:ffmpeg:1`
);
// Lambda Function
const kvsExportLambda = new lambda.Function(this, 'KVSClipExportFunction', {
runtime: lambda.Runtime.JAVA_17,
code: lambda.Code.fromAsset('../build/libs/video-export-lambda-all.jar'),
handler: 'com.amazonaws.videoanalytics.videologistics.KVSClipExportHandler::handleRequest',
timeout: cdk.Duration.minutes(15),
memorySize: 1024,
layers: [ffmpegLayer],
environment: {
// AWS_REGION is reserved and set by the Lambda runtime; defining it here makes the deployment fail
PATH: '/var/task/bin:/opt/bin'
}
});
// Add permissions
kvsExportLambda.addToRolePolicy(new iam.PolicyStatement({
actions: [
'kinesisvideo:GetDataEndpoint',
'kinesisvideo:GetClip',
'kinesisvideo:ListFragments'
],
resources: ['*']
}));
kvsExportLambda.addToRolePolicy(new iam.PolicyStatement({
actions: ['s3:PutObject'],
resources: ['arn:aws:s3:::your-bucket/*']
}));
}
}
Implementation Details
1. Required Dependencies
Add these dependencies to your build.gradle:
dependencies {
implementation platform('com.amazonaws:aws-java-sdk-bom:1.12.261')
implementation 'com.amazonaws:aws-lambda-java-core:1.2.2'
implementation 'com.amazonaws:aws-lambda-java-events:3.11.0'
implementation 'com.amazonaws:aws-java-sdk-kinesisvideo:1.12.261'
implementation 'com.amazonaws:aws-java-sdk-s3:1.12.261'
implementation 'org.bytedeco:javacv-platform:1.5.7'
implementation 'org.bytedeco:ffmpeg-platform:5.0-1.5.7'
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation 'ch.qos.logback:logback-classic:1.2.11'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'
}
2. Request and Response Models
public class KVSExportRequest {
private String streamName;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss'Z'")
private LocalDateTime startTime;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss'Z'")
private LocalDateTime endTime;
private String s3Path;
// Builder pattern implementation
public static class Builder {
private String streamName;
private LocalDateTime startTime;
private LocalDateTime endTime;
private String s3Path;
public Builder streamName(String streamName) {
this.streamName = streamName;
return this;
}
public Builder startTime(LocalDateTime startTime) {
this.startTime = startTime;
return this;
}
public Builder endTime(LocalDateTime endTime) {
this.endTime = endTime;
return this;
}
public Builder s3Path(String s3Path) {
this.s3Path = s3Path;
return this;
}
public KVSExportRequest build() {
validateFields();
KVSExportRequest request = new KVSExportRequest();
request.streamName = this.streamName;
request.startTime = this.startTime;
request.endTime = this.endTime;
request.s3Path = this.s3Path;
return request;
}
private void validateFields() {
if (streamName == null || streamName.isEmpty()) {
throw new IllegalArgumentException("streamName cannot be null or empty");
}
if (startTime == null) {
throw new IllegalArgumentException("startTime cannot be null");
}
if (endTime == null) {
throw new IllegalArgumentException("endTime cannot be null");
}
if (endTime.isBefore(startTime)) {
throw new IllegalArgumentException("endTime cannot be before startTime");
}
if (s3Path == null || s3Path.isEmpty()) {
throw new IllegalArgumentException("s3Path cannot be null or empty");
}
if (!s3Path.startsWith("s3://")) {
throw new IllegalArgumentException("s3Path must start with 's3://'");
}
if (!s3Path.endsWith(".mp4")) {
throw new IllegalArgumentException("s3Path must end with '.mp4'");
}
}
}
public static Builder builder() {
return new Builder();
}
// Getters
public String getStreamName() { return streamName; }
public LocalDateTime getStartTime() { return startTime; }
public LocalDateTime getEndTime() { return endTime; }
public String getS3Path() { return s3Path; }
}
public class KVSExportResponse {
private String s3Path;
private LocalDateTime lastExportedTimestamp;
private boolean complete;
// Builder pattern implementation
public static class Builder {
private String s3Path;
private LocalDateTime lastExportedTimestamp;
private boolean complete;
public Builder s3Path(String s3Path) {
this.s3Path = s3Path;
return this;
}
public Builder lastExportedTimestamp(LocalDateTime lastExportedTimestamp) {
this.lastExportedTimestamp = lastExportedTimestamp;
return this;
}
public Builder complete(boolean complete) {
this.complete = complete;
return this;
}
public KVSExportResponse build() {
KVSExportResponse response = new KVSExportResponse();
response.s3Path = this.s3Path;
response.lastExportedTimestamp = this.lastExportedTimestamp;
response.complete = this.complete;
return response;
}
}
public static Builder builder() {
return new Builder();
}
// Getters
public String getS3Path() { return s3Path; }
public LocalDateTime getLastExportedTimestamp() { return lastExportedTimestamp; }
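// Named getIsComplete() so bean-style JSON serialization emits "isComplete",
// the field name used in the response format and checked by the state machine
public boolean getIsComplete() { return complete; }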
}
public class VideoExportBatch {
private static final DateTimeFormatter TIMESTAMP_FORMAT =
DateTimeFormatter.ofPattern("HH:mm:ss,SSS");
private final List<Fragment> fragments;
private final LocalDateTime startTime;
private final LocalDateTime endTime;
private final String s3Path;
private final int batchNumber;
public VideoExportBatch(List<Fragment> fragments, LocalDateTime startTime,
LocalDateTime endTime, String s3Path, int batchNumber) {
this.fragments = fragments;
this.startTime = startTime;
this.endTime = endTime;
this.s3Path = s3Path;
this.batchNumber = batchNumber;
validateInput(startTime, endTime, s3Path);
}
private void validateInput(LocalDateTime startTime, LocalDateTime endTime, String s3Path) {
Objects.requireNonNull(startTime, "startTime cannot be null");
Objects.requireNonNull(endTime, "endTime cannot be null");
Objects.requireNonNull(s3Path, "s3Path cannot be null");
if (endTime.isBefore(startTime)) {
throw new IllegalArgumentException("endTime cannot be before startTime");
}
if (!s3Path.startsWith("s3://")) {
throw new IllegalArgumentException("s3Path must start with 's3://'");
}
}
public String generateSrtContent() {
if (fragments == null || fragments.isEmpty()) {
return "";
}
StringBuilder srt = new StringBuilder();
int subtitleNumber = 1;
for (Fragment fragment : fragments) {
LocalDateTime fragmentStart = LocalDateTime.ofInstant(
fragment.getProducerTimestamp().toInstant(), ZoneOffset.UTC);
LocalDateTime fragmentEnd = fragmentStart.plusNanos(
fragment.getFragmentLengthInMilliseconds() * 1_000_000L);
if (fragmentEnd.isAfter(endTime)) {
fragmentEnd = endTime;
}
if (fragmentStart.isBefore(startTime)) {
fragmentStart = startTime;
}
if (fragmentStart.isBefore(fragmentEnd)) {
srt.append(subtitleNumber).append("\n");
srt.append(formatTimestamp(fragmentStart))
.append(" --> ")
.append(formatTimestamp(fragmentEnd))
.append("\n");
srt.append("Fragment: ")
.append(fragment.getFragmentNumber())
.append("\n\n");
subtitleNumber++;
}
}
return srt.toString();
}
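private String formatTimestamp(LocalDateTime time) {
// SRT cue times are offsets from the start of the clip, not wall-clock times
java.time.Duration offset = java.time.Duration.between(startTime, time);
return String.format("%02d:%02d:%02d,%03d",
offset.toHours(), offset.toMinutesPart(), offset.toSecondsPart(), offset.toMillisPart());
}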
// Getters
public LocalDateTime getStartTime() { return startTime; }
public LocalDateTime getEndTime() { return endTime; }
public String getS3Path() { return s3Path; }
public int getBatchNumber() { return batchNumber; }
}
3. Main Lambda Handler
@Override
public KVSExportResponse handleRequest(KVSExportRequest request, Context context) {
logger.info("Processing export request for stream: {}", request.getStreamName());
try {
// Get KVS archive media client
String endpointUrl = getArchiveMediaEndpoint(request.getStreamName());
AmazonKinesisVideoArchivedMedia archiveMediaClient = createArchiveMediaClient(endpointUrl);
// List fragments
ListFragmentsResult fragments = archiveMediaClient.listFragments(new ListFragmentsRequest()
.withStreamName(request.getStreamName())
.withFragmentSelector(new FragmentSelector()
.withFragmentSelectorType(FragmentSelectorType.PRODUCER_TIMESTAMP)
.withTimestampRange(new TimestampRange()
.withStartTimestamp(Date.from(request.getStartTime().toInstant(ZoneOffset.UTC)))
.withEndTimestamp(Date.from(request.getEndTime().toInstant(ZoneOffset.UTC))))));
if (fragments.getFragments().isEmpty()) {
logger.info("No fragments found in the specified time range");
return KVSExportResponse.builder()
.s3Path(request.getS3Path())
.complete(true)
.lastExportedTimestamp(request.getStartTime())
.build();
}
// Process fragments in batches
List<Fragment> currentBatch = new ArrayList<>();
LocalDateTime lastExportedTimestamp = request.getStartTime();
boolean isComplete = true;
int batchNumber = 1;
for (Fragment fragment : fragments.getFragments()) {
// Check remaining time
if (context.getRemainingTimeInMillis() < MIN_REMAINING_TIME_MS) {
logger.warn("Insufficient remaining time ({}ms). Stopping early.",
context.getRemainingTimeInMillis());
isComplete = false;
break;
}
currentBatch.add(fragment);
// Process batch if it's full or it's the last fragment
if (currentBatch.size() >= MAX_FRAGMENTS_PER_BATCH ||
fragment == fragments.getFragments().get(fragments.getFragments().size() - 1)) {
LocalDateTime batchStartTime = LocalDateTime.ofInstant(
currentBatch.get(0).getProducerTimestamp().toInstant(),
ZoneOffset.UTC
);
LocalDateTime batchEndTime = LocalDateTime.ofInstant(
currentBatch.get(currentBatch.size() - 1).getProducerTimestamp().toInstant(),
ZoneOffset.UTC
);
VideoExportBatch batch = new VideoExportBatch(
new ArrayList<>(currentBatch),
batchStartTime,
batchEndTime,
request.getS3Path(),
batchNumber++
);
VideoExportUtil.processBatch(batch, archiveMediaClient, request.getStreamName(), s3Client);
lastExportedTimestamp = batchEndTime;
currentBatch.clear();
}
}
return KVSExportResponse.builder()
.s3Path(request.getS3Path())
.complete(isComplete)
.lastExportedTimestamp(lastExportedTimestamp)
.build();
} catch (Exception e) {
logger.error("Error processing video export", e);
throw new RuntimeException("Failed to process video export", e);
}
}
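The handler above batches fragments in the order ListFragments returns them. The ListFragments API reference notes that results come back in no specific order, so it may be safer to sort by producer timestamp before batching. The following is a minimal sketch of that idea; FragmentInfo is a hypothetical stand-in for the SDK's Fragment type, and sortAndPartition is not part of the guide's code.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Sorts fragments chronologically and splits them into fixed-size batches. Names are illustrative. */
final class FragmentOrdering {

    /** Minimal stand-in for the SDK Fragment type; only the fields this sketch needs. */
    static final class FragmentInfo {
        final String fragmentNumber;
        final Instant producerTimestamp;

        FragmentInfo(String fragmentNumber, Instant producerTimestamp) {
            this.fragmentNumber = fragmentNumber;
            this.producerTimestamp = producerTimestamp;
        }
    }

    static List<List<FragmentInfo>> sortAndPartition(List<FragmentInfo> fragments, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        // Copy first so the caller's list is left untouched
        List<FragmentInfo> sorted = new ArrayList<>(fragments);
        sorted.sort(Comparator.comparing((FragmentInfo f) -> f.producerTimestamp));

        List<List<FragmentInfo>> batches = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i += batchSize) {
            int end = Math.min(i + batchSize, sorted.size());
            batches.add(new ArrayList<>(sorted.subList(i, end)));
        }
        return batches;
    }
}
```

With sorted input, the first and last fragment of each batch give its true start and end timestamps, which is what the handler relies on when it sets lastExportedTimestamp.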
4. Video Processing Utility
public static void processBatch(VideoExportBatch batch, AmazonKinesisVideoArchivedMedia archiveMediaClient,
String streamName, AmazonS3 s3Client) {
Path tempDir = null;
try {
// Create temporary directory under TEMP_DIR (the prefix argument must not contain path separators)
tempDir = Files.createTempDirectory(Paths.get(TEMP_DIR), "kvs_export_");
logger.info("Created temp directory: {}", tempDir);
// Generate SRT file
Path srtPath = tempDir.resolve("subtitles.srt");
Files.writeString(srtPath, batch.generateSrtContent());
logger.info("Generated SRT file at: {}", srtPath);
// Download video clip
Path videoPath = tempDir.resolve("clip.mp4");
downloadClip(batch, archiveMediaClient, streamName, videoPath);
logger.info("Downloaded video clip to: {}", videoPath);
// Merge video and subtitles
Path outputPath = tempDir.resolve("output.mp4");
mergeVideoAndSubtitles(videoPath, srtPath, outputPath);
logger.info("Merged video and subtitles to: {}", outputPath);
// Upload to S3
uploadToS3(outputPath.toFile(), batch.getS3Path(), s3Client);
logger.info("Uploaded processed video to S3: {}", batch.getS3Path());
} catch (Exception e) {
logger.error("Error processing batch {}", batch.getBatchNumber(), e);
throw new RuntimeException("Error processing video batch", e);
} finally {
// Cleanup temporary files
if (tempDir != null) {
try {
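// Delete children before parents, and close the stream to release the directory handle
try (Stream<Path> paths = Files.walk(tempDir)) {
paths.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
}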
logger.info("Cleaned up temporary directory: {}", tempDir);
} catch (IOException e) {
logger.warn("Failed to cleanup temporary directory: {}", tempDir, e);
}
}
}
}
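processBatch calls mergeVideoAndSubtitles, which is not shown in this guide. The following is a minimal sketch of one way it might work, assuming the subtitles are added as a soft mov_text track (rather than burned into the picture) and that the ffmpeg binary from the layer is on the PATH. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;

/** Sketch of muxing an SRT file into an MP4 by shelling out to FFmpeg. Names are illustrative. */
final class SubtitleMuxer {

    /** Builds a command that copies video and audio as-is and adds the SRT as a mov_text subtitle track. */
    static List<String> buildMergeCommand(Path video, Path srt, Path output) {
        return List.of(
            "ffmpeg", "-y",            // overwrite the output file if it already exists
            "-i", video.toString(),
            "-i", srt.toString(),
            "-c:v", "copy",            // no video re-encoding
            "-c:a", "copy",            // no audio re-encoding
            "-c:s", "mov_text",        // subtitle codec supported by the MP4 container
            output.toString());
    }

    /** Runs FFmpeg and fails if it exits with a non-zero status. */
    static void merge(Path video, Path srt, Path output) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(buildMergeCommand(video, srt, output))
            .inheritIO()               // FFmpeg output goes to the function's own stdout/stderr
            .start();
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IOException("ffmpeg exited with code " + exitCode);
        }
    }
}
```

Copying the streams instead of re-encoding keeps processing time low, which matters inside the 15-minute Lambda limit. If configurable output quality is needed, the copy options would have to be replaced with explicit encoder settings.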
Usage
Lambda Input Format
{
"streamName": "your-kvs-stream",
"startTime": "2024-01-01T00:00:00Z",
"endTime": "2024-01-01T00:05:00Z",
"s3Path": "s3://your-bucket/exports/video.mp4"
}
Lambda Response Format
{
"s3Path": "s3://your-bucket/exports/video.mp4",
"lastExportedTimestamp": "2024-01-01T00:05:00Z",
"isComplete": true
}
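The s3Path value in these payloads is a single s3:// URI, while an S3 upload needs a separate bucket name and object key. The uploadToS3 helper is not shown in this guide, so the following is only a sketch of how the path might be split; S3Location is a hypothetical name.

```java
/** Splits an s3://bucket/key URI into its bucket and key parts. Names are illustrative. */
final class S3Location {
    private static final String SCHEME = "s3://";

    final String bucket;
    final String key;

    private S3Location(String bucket, String key) {
        this.bucket = bucket;
        this.key = key;
    }

    static S3Location parse(String s3Path) {
        if (s3Path == null || !s3Path.startsWith(SCHEME)) {
            throw new IllegalArgumentException("s3Path must start with 's3://'");
        }
        String rest = s3Path.substring(SCHEME.length());
        int slash = rest.indexOf('/');
        // Reject a missing bucket ("s3:///key") and a missing key ("s3://bucket" or "s3://bucket/")
        if (slash <= 0 || slash == rest.length() - 1) {
            throw new IllegalArgumentException("s3Path must contain both a bucket and a key");
        }
        return new S3Location(rest.substring(0, slash), rest.substring(slash + 1));
    }
}
```

For the sample input, S3Location.parse("s3://your-bucket/exports/video.mp4") yields the bucket your-bucket and the key exports/video.mp4.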
Handling Long Exports
For exports that exceed Lambda’s 15-minute timeout, the function returns:
- isComplete: false
- lastExportedTimestamp: timestamp of the last processed fragment
Use these values to continue the export in subsequent requests.
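The continuation logic can be sketched as a simple loop: call the function, and if the response is not complete, call it again with startTime set to the returned lastExportedTimestamp. The sketch below is an assumption-laden illustration, not the guide's implementation. The Request and Response classes are simplified stand-ins for KVSExportRequest and KVSExportResponse, Instant replaces LocalDateTime for brevity, and the invoke parameter stands in for whatever actually calls the Lambda function.

```java
import java.time.Instant;
import java.util.function.Function;

/** Drives repeated export calls until the response reports completion. Names are illustrative. */
final class ExportDriver {

    static final class Request {
        final String streamName;
        final Instant startTime;
        final Instant endTime;
        final String s3Path;

        Request(String streamName, Instant startTime, Instant endTime, String s3Path) {
            this.streamName = streamName;
            this.startTime = startTime;
            this.endTime = endTime;
            this.s3Path = s3Path;
        }
    }

    static final class Response {
        final Instant lastExportedTimestamp;
        final boolean complete;

        Response(Instant lastExportedTimestamp, boolean complete) {
            this.lastExportedTimestamp = lastExportedTimestamp;
            this.complete = complete;
        }
    }

    /** Returns the number of invocations that were needed to finish the export. */
    static int exportAll(Request initial, Function<Request, Response> invoke, int maxIterations) {
        Request current = initial;
        for (int i = 1; i <= maxIterations; i++) {
            Response response = invoke.apply(current);
            if (response.complete) {
                return i;
            }
            // Guard against a response that does not move forward, which would loop forever
            if (!response.lastExportedTimestamp.isAfter(current.startTime)) {
                throw new IllegalStateException("No progress made; aborting");
            }
            current = new Request(current.streamName, response.lastExportedTimestamp,
                                  current.endTime, current.s3Path);
        }
        throw new IllegalStateException("Export did not complete within " + maxIterations + " iterations");
    }
}
```

The iteration cap and the progress check are design choices added for this sketch: they bound the cost of a run if the function keeps returning the same timestamp.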
Async Workflow Implementation
For long-running exports, we implement an async workflow using AWS Step Functions:
1. State Machine Definition
{
  "Comment": "KVS Video Export State Machine",
  "StartAt": "InitializeExport",
  "States": {
    "InitializeExport": {
      "Type": "Pass",
      "Next": "ExportVideoSegment",
      "Parameters": {
        "streamName.$": "$.streamName",
        "startTime.$": "$.startTime",
        "endTime.$": "$.endTime",
        "s3Path.$": "$.s3Path"
      }
    },
    "ExportVideoSegment": {
      "Type": "Task",
      "Resource": "${KVSExportLambdaArn}",
      "ResultPath": "$.result",
      "Next": "CheckExportComplete",
      "Retry": [
        {
          "ErrorEquals": ["States.TaskFailed"],
          "IntervalSeconds": 30,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "HandleError"
        }
      ]
    },
    "CheckExportComplete": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.result.isComplete",
          "BooleanEquals": true,
          "Next": "ExportComplete"
        }
      ],
      "Default": "WaitBeforeNextBatch"
    },
    "WaitBeforeNextBatch": {
      "Type": "Wait",
      "Seconds": 5,
      "Next": "PrepareNextBatch"
    },
    "PrepareNextBatch": {
      "Type": "Pass",
      "Next": "ExportVideoSegment",
      "Parameters": {
        "streamName.$": "$.streamName",
        "startTime.$": "$.result.lastExportedTimestamp",
        "endTime.$": "$.endTime",
        "s3Path.$": "$.s3Path"
      }
    },
    "ExportComplete": {
      "Type": "Pass",
      "End": true,
      "Parameters": {
        "status": "COMPLETED",
        "s3Path.$": "$.s3Path",
        "startTime.$": "$.startTime",
        "endTime.$": "$.endTime"
      }
    },
    "HandleError": {
      "Type": "Pass",
      "End": true,
      "Parameters": {
        "status": "FAILED",
        "error.$": "$.Error",
        "cause.$": "$.Cause"
      }
    }
  }
}
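The Retry block in the definition above uses IntervalSeconds 30, MaxAttempts 3, and BackoffRate 2.0. Step Functions waits IntervalSeconds before the first retry and multiplies the wait by BackoffRate for each later retry. The following small sketch, with hypothetical names, computes the resulting schedule so the worst-case added delay can be checked against the overall execution timeout.

```java
/** Computes the wait before each retry for an interval/backoff retry policy. Names are illustrative. */
final class RetrySchedule {

    /** Returns the delay in seconds before retry 1, 2, ..., maxAttempts. */
    static long[] delaysSeconds(int intervalSeconds, int maxAttempts, double backoffRate) {
        if (intervalSeconds <= 0 || maxAttempts < 0 || backoffRate < 1.0) {
            throw new IllegalArgumentException("invalid retry parameters");
        }
        long[] delays = new long[maxAttempts];
        double delay = intervalSeconds;
        for (int i = 0; i < maxAttempts; i++) {
            delays[i] = Math.round(delay);
            delay *= backoffRate;   // each retry waits backoffRate times longer than the previous one
        }
        return delays;
    }

    /** Sum of all retry delays, i.e. the worst-case waiting time added by retries. */
    static long totalDelaySeconds(int intervalSeconds, int maxAttempts, double backoffRate) {
        long total = 0;
        for (long d : delaysSeconds(intervalSeconds, maxAttempts, backoffRate)) {
            total += d;
        }
        return total;
    }
}
```

For the values used here the waits are 30, 60, and 120 seconds, or 210 seconds in total, on top of the run time of each failed attempt.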
2. Step Functions CDK Implementation
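import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
// Invoke the export Lambda; keep the original input and store the response under $.result
const exportTask = new tasks.LambdaInvoke(this, 'ExportVideoSegment', {
  lambdaFunction: kvsExportLambda,
  payloadResponseOnly: true,
  resultPath: '$.result'
});
// Create Wait state
const wait = new sfn.Wait(this, 'WaitBeforeNextBatch', {
  time: sfn.WaitTime.duration(cdk.Duration.seconds(5))
});
// Build the next request, resuming from the last exported timestamp
const prepareNextBatch = new sfn.Pass(this, 'PrepareNextBatch', {
  parameters: {
    'streamName.$': '$.streamName',
    'startTime.$': '$.result.lastExportedTimestamp',
    'endTime.$': '$.endTime',
    's3Path.$': '$.s3Path'
  }
});
// Create Choice state: finish when complete, otherwise wait and loop back to the task
const isComplete = new sfn.Choice(this, 'CheckExportComplete')
  .when(sfn.Condition.booleanEquals('$.result.isComplete', true),
    new sfn.Pass(this, 'ExportComplete'))
  .otherwise(wait.next(prepareNextBatch).next(exportTask));
// Create State Machine (a Choice state cannot be followed by .next(), so the chain ends at the Choice)
const stateMachine = new sfn.StateMachine(this, 'VideoExportStateMachine', {
  definitionBody: sfn.DefinitionBody.fromChainable(exportTask.next(isComplete)),
  timeout: cdk.Duration.hours(24)
});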
Verification Steps
1. Verify Lambda Deployment
- Go to AWS Lambda console and check that your function exists with the correct configuration:
- Runtime: Java 17
- Memory: At least 1024MB
- Timeout: 15 minutes
- FFmpeg layer attached
- Required IAM permissions attached
2. Verify Step Functions Deployment
- Go to AWS Step Functions console and verify:
- State machine exists with correct definition
- IAM role has necessary permissions
- Execution history shows successful test runs
3. Test Video Export
- Start a test execution with sample input:
{ "streamName": "your-kvs-stream", "startTime": "2024-01-01T00:00:00Z", "endTime": "2024-01-01T00:05:00Z", "s3Path": "s3://your-bucket/exports/test.mp4" }
- Monitor execution in Step Functions console
- Check CloudWatch logs for Lambda function
- Verify exported video in S3 bucket:
- File exists at specified path
- File size is appropriate
- Video plays correctly with embedded timestamps
4. Verify Error Handling
- Test with invalid stream name
- Test with non-existent time range
- Test with invalid S3 path
- Verify error states are handled gracefully
5. Performance Verification
- Monitor Lambda execution time
- Check memory utilization
- Verify no API throttling errors
- Confirm successful cleanup of temporary files
Best Practices
1. Error Handling
- Implement comprehensive retry policies
- Handle transient failures gracefully
- Clean up temporary files in finally blocks
- Log errors with sufficient context
2. Performance
- Use appropriate batch sizes (max 200 fragments)
- Monitor memory usage
- Implement continuation tokens for long exports
- Configure FFmpeg parameters based on requirements
Common Issues
- Lambda Timeout
- Solution: Use Step Functions for orchestration
- Implement continuation token logic
- Memory Issues
- Solution: Increase Lambda memory
- Optimize batch size
- Monitor memory usage
- FFmpeg Layer Problems
- Solution: Verify layer ARN
- Check PATH environment variable
- Validate FFmpeg installation
- KVS Fragment Access
- Solution: Check IAM permissions
- Verify fragment retention period
- Validate stream name and time range