My table is around 220 MB and holds 250k records. I'm trying to pull all of this data into Python. I realize this needs to be a chunked batch process that loops through the table, but I'm not sure how to make each batch start where the previous one left off.
Is there some way to filter my scan? From what I've read, filtering happens after the data is loaded, and loading stops at 1 MB, so a filter wouldn't actually let me scan in new objects.
Any assistance would be appreciated.
```python
import boto3

dynamodb = boto3.resource(
    'dynamodb',
    aws_session_token=aws_session_token,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region,
)
table = dynamodb.Table('widgetsTableName')
data = table.scan()
```
Best Answer
I think the Amazon DynamoDB documentation regarding table scanning answers your question.
In short, you'll need to check for `LastEvaluatedKey` in the response. Here is an example using your code:
```python
import boto3

dynamodb = boto3.resource(
    'dynamodb',
    aws_session_token=aws_session_token,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region,
)
table = dynamodb.Table('widgetsTableName')

response = table.scan()
data = response['Items']

# Keep scanning until a response no longer carries a LastEvaluatedKey.
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    data.extend(response['Items'])
```
DynamoDB limits the `scan` method to 1 MB of data per call.
Documentation: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan
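To see how the 1 MB limit interacts with a filter, the following sketch tallies the `Count` and `ScannedCount` fields that each Scan response reports. The helper name `scan_stats` is made up for illustration, and `table` is assumed to be a boto3 `Table` resource (any object with a compatible `scan` method would do). Because the filter is applied after each page is read, a page can come back with few or no items and still carry a `LastEvaluatedKey`, so the loop keys off that field rather than off the number of items returned.

```python
def scan_stats(table, **scan_kwargs):
    """Scan every page and report how many items were read versus returned.

    Hypothetical helper: `table` is assumed to be a boto3 DynamoDB Table
    resource; `scan_kwargs` are passed to table.scan() unchanged.
    """
    pages = scanned = matched = 0
    kwargs = dict(scan_kwargs)  # copy so the caller's dict is not modified
    while True:
        response = table.scan(**kwargs)
        pages += 1
        scanned += response.get('ScannedCount', 0)  # items read before filtering
        matched += response.get('Count', 0)         # items left after filtering
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
        # Resume where the previous page stopped, even if it returned no items.
        kwargs['ExclusiveStartKey'] = last_key
    return {'pages': pages, 'scanned': scanned, 'matched': matched}
```

Calling `scan_stats(table, FilterExpression=...)` on a real table should show `scanned` covering the whole table while `matched` counts only the filtered items, which is why a filter reduces the data returned but not the number of pages to loop over.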
Here is an example loop to get all the data from a DynamoDB table using `LastEvaluatedKey`:
```python
import boto3

client = boto3.client('dynamodb')


def dump_table(table_name):
    results = []
    last_evaluated_key = None
    while True:
        if last_evaluated_key:
            response = client.scan(
                TableName=table_name,
                ExclusiveStartKey=last_evaluated_key
            )
        else:
            response = client.scan(TableName=table_name)
        last_evaluated_key = response.get('LastEvaluatedKey')

        results.extend(response['Items'])

        if not last_evaluated_key:
            break
    return results


# Usage
data = dump_table('your-table-name')

# do something with data
```
boto3 offers paginators that handle all the pagination details for you. Here is the doc page for the scan paginator. Basically, you would use it like so:
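```python
import boto3

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')

# Scan requires TableName; 'your-table-name' is a placeholder.
for page in paginator.paginate(TableName='your-table-name'):
    items = page['Items']
    # do something with items
```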
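The paginator also accepts a `PaginationConfig` argument for controlling page size and the total number of items. The sketch below wraps that in a generator; the function name `scan_with_paginator` and its arguments are illustrative, and `client` is assumed to be the object returned by `boto3.client('dynamodb')`. Items come back in the low-level typed format (for example `{'id': {'S': '1'}}`).

```python
def scan_with_paginator(client, table_name, page_size=None, max_items=None):
    """Yield items from a Scan, letting the boto3 paginator follow LastEvaluatedKey.

    `client` is assumed to be boto3.client('dynamodb'); the function name and
    its arguments are illustrative.
    """
    config = {}
    if page_size is not None:
        config['PageSize'] = page_size  # page size for each underlying Scan call
    if max_items is not None:
        config['MaxItems'] = max_items  # stop after this many items overall
    paginator = client.get_paginator('scan')
    pages = paginator.paginate(TableName=table_name, PaginationConfig=config)
    for page in pages:
        yield from page['Items']
```

With a real client, `list(scan_with_paginator(client, 'your-table-name'))` would collect the whole table, while iterating over the generator directly keeps only one page in memory at a time.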
Riffing off of Jordon Phillips's answer, here's how you'd pass a `FilterExpression` in with the pagination:
```python
import boto3

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
operation_parameters = {
    'TableName': 'foo',
    'FilterExpression': 'bar > :x AND bar < :y',
    'ExpressionAttributeValues': {
        ':x': {'S': '2017-01-31T01:35'},
        ':y': {'S': '2017-01-31T02:08'},
    }
}

page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
    items = page['Items']
    # do something with items
```
Code for stripping the DynamoDB type descriptors from the results, as @kungphu mentioned:
```python
import boto3
from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.transform import TransformationInjector

client = boto3.client('dynamodb')
paginator = client.get_paginator('query')
service_model = client._service_model.operation_model('Query')
trans = TransformationInjector(deserializer=TypeDeserializer())

# paginate() also needs the usual Query parameters (TableName, key condition, ...).
for page in paginator.paginate():
    # Converts the typed attribute values in the page to plain Python values, in place.
    trans.inject_attribute_value_output(page, service_model)
```
Turns out that Boto3 captures the "LastEvaluatedKey" as part of the returned response. This can be used as the start point for a scan:
```python
data = table.scan(ExclusiveStartKey=data['LastEvaluatedKey'])
```
I plan on building a loop around this that runs until the response no longer contains a LastEvaluatedKey.
The two approaches suggested above both have problems: you either write lengthy, repetitive code that handles paging explicitly in a loop, or you use Boto paginators with the low-level client and forgo the advantages of the higher-level Boto objects.
A solution using Python functional code to provide a high-level abstraction allows higher-level Boto methods to be used, while hiding the complexity of AWS paging:
```python
import itertools
import typing


def iterate_result_pages(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Generator:
    """A wrapper for functions using AWS paging, that returns a generator which yields a sequence of items for
    every response

    Args:
        function_returning_response: A function (or callable), that returns an AWS response with 'Items' and optionally 'LastEvaluatedKey'
        This could be a bound method of an object.

    Returns:
        A generator which yields the 'Items' field of the result for every response
    """
    response = function_returning_response(*args, **kwargs)
    yield response["Items"]
    while "LastEvaluatedKey" in response:
        kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        response = function_returning_response(*args, **kwargs)
        yield response["Items"]

    return


def iterate_paged_results(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Iterator:
    """A wrapper for functions using AWS paging, that returns an iterator of all the items in the responses.
    Items are yielded to the caller as soon as they are received.

    Args:
        function_returning_response: A function (or callable), that returns an AWS response with 'Items' and optionally 'LastEvaluatedKey'
        This could be a bound method of an object.

    Returns:
        An iterator which yields one response item at a time
    """
    return itertools.chain.from_iterable(iterate_result_pages(function_returning_response, *args, **kwargs))


# Example, assuming 'table' is a Boto DynamoDB table object.
# The first argument is the bound method to call, here table.scan:
all_items = list(iterate_paged_results(table.scan, ProjectionExpression='my_field'))
```
If you are landing here looking for a paginated scan with some filtering expression(s):
```python
def scan(table, **kwargs):
    response = table.scan(**kwargs)
    yield from response['Items']
    while response.get('LastEvaluatedKey'):
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], **kwargs)
        yield from response['Items']
```
Example usage:
```python
import boto3
from boto3.dynamodb.conditions import Attr

table = boto3.Session(...).resource('dynamodb').Table('widgetsTableName')
items = list(scan(table, FilterExpression=Attr('name').contains('foo')))
```
I had some problems with Vincent's answer related to the transformation being applied to the LastEvaluatedKey and messing up the pagination. Solved as follows:
```python
import boto3
from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.transform import TransformationInjector

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
operation_model = client._service_model.operation_model('Scan')
trans = TransformationInjector(deserializer=TypeDeserializer())
operation_parameters = {
    'TableName': 'tablename',
}
items = []

for page in paginator.paginate(**operation_parameters):
    # Save the raw LastEvaluatedKey before the transformation rewrites it.
    has_last_key = 'LastEvaluatedKey' in page
    if has_last_key:
        last_key = page['LastEvaluatedKey'].copy()
    trans.inject_attribute_value_output(page, operation_model)
    # Restore the raw key so the paginator can request the next page.
    if has_last_key:
        page['LastEvaluatedKey'] = last_key
    items.extend(page['Items'])
```
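An alternative that may be simpler is to leave the page untouched and deserialize only the items, using the public `TypeDeserializer.deserialize` method. The helper below is a sketch under that assumption; `plain_items` is a made-up name, and the `deserializer` parameter exists only so a stand-in object can be supplied. Since `LastEvaluatedKey` is never modified, the pagination problem described above should not arise.

```python
def plain_items(pages, deserializer=None):
    """Yield each item from low-level Scan/Query pages as a plain Python dict.

    Illustrative helper: `pages` is any iterable of response dicts, such as
    the page iterator returned by paginator.paginate(...).
    """
    if deserializer is None:
        # Imported lazily so that the helper can be given a stand-in object.
        from boto3.dynamodb.types import TypeDeserializer
        deserializer = TypeDeserializer()
    for page in pages:
        for item in page.get('Items', []):
            # Convert only the item attributes; LastEvaluatedKey is left alone,
            # so the paginator can still use it to request the next page.
            yield {name: deserializer.deserialize(value) for name, value in item.items()}
```

Usage would look like `items = list(plain_items(paginator.paginate(TableName='tablename')))`. Note that boto3's deserializer returns numbers as `decimal.Decimal` values.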
I can't work out why Boto3 provides high-level resource abstraction but doesn't provide pagination. When it does provide pagination, it's hard to use!
The other answers to this question were good but I wanted a super simple way to wrap the boto3 methods and provide memory-efficient paging using generators:
```python
import typing
import boto3
import boto3.dynamodb.conditions


def paginate_dynamodb_response(dynamodb_action: typing.Callable, **kwargs) -> typing.Generator[dict, None, None]:

    # Using the syntax from https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/python/example_code/dynamodb/GettingStarted/scenario_getting_started_movies.py
    keywords = kwargs

    done = False
    start_key = None

    while not done:
        if start_key:
            keywords['ExclusiveStartKey'] = start_key

        response = dynamodb_action(**keywords)

        start_key = response.get('LastEvaluatedKey', None)
        done = start_key is None

        for item in response.get("Items", []):
            yield item


## Usage ##
dynamodb_res = boto3.resource('dynamodb')
dynamodb_table = dynamodb_res.Table('my-table')

query = paginate_dynamodb_response(
    dynamodb_table.query,  # The boto3 method. E.g. query or scan
    # Regular Query or Scan parameters
    #
    # IndexName='myindex'  # If required
    KeyConditionExpression=boto3.dynamodb.conditions.Key('id').eq('1234')
)

for x in query:
    print(x)
```
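Since the original question asked about processing the table in chunks, a generator like the one above can be combined with a small batching helper. This is only a sketch; `in_batches` is a hypothetical name, and it works on any iterable rather than on anything specific to boto3.

```python
import itertools


def in_batches(items, batch_size):
    """Group any iterable into lists of at most `batch_size` elements."""
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')
    iterator = iter(items)
    while True:
        # islice pulls at most batch_size elements without consuming the rest.
        batch = list(itertools.islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

For example, `for batch in in_batches(paginate_dynamodb_response(dynamodb_table.scan), 500): ...` would hand the table to the loop body 500 items at a time, with pages fetched from DynamoDB only as they are needed.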