This class is based on the db Mapper found in the remote_api article, but it uses NDB. Its purpose is to let you iterate over a large number of entities when there is not enough time to do it within a single request: the library maps over all entities of a given kind.
Use it for jobs such as deleting users who have requested deletion, or updating counters for specific filters.
Here is the NDB version of the Mapper. I have added a few improvements that I have used in the past, such as using memcache to keep track of running tasks. This tracking is limited to one mapper at a time per mapper class. If you run the same mapper with different filters, give each one a different cache key. It is also meant to be used together with giving each task a distinct name and catching duplicate-task errors, so that the same task is not enqueued twice.
import datetime
import logging
from google.appengine.api import memcache
from google.appengine.ext import deferred, ndb
from google.appengine.runtime import DeadlineExceededError
class Mapper(object):
    """Iterate over every entity of a kind in resumable, batched chunks.

    Subclasses configure the query in ``init()`` (``KIND``, ``FILTERS``,
    ``ORDERS``) and implement ``map(entity)``. Inside ``map`` they call
    ``update()`` / ``delete()`` to queue writes. Queued writes are flushed
    every ``batch_size`` mapped entities and again at the end.

    When the request hits ``DeadlineExceededError``, pending writes are
    flushed and a deferred task is queued to resume from the current query
    cursor. An entity whose ``map()`` call was interrupted is mapped again in
    the continuation, so ``map`` should tolerate being re-run on one entity.

    While a run is active, a memcache entry keyed by ``prefix_key`` plus the
    subclass name holds the time of the last refresh. The entry is refreshed
    before it expires and is deleted when the run finishes normally.
    """

    # Prefix of the memcache key that marks a running mapper.
    prefix_key = 'mapper_'
    # Lifetime, in seconds, of the memcache "running" marker.
    cache_time = 60 * 10

    def __init__(self, use_cache=False):
        """Configure the NDB context cache and initialise mapper state.

        Args:
          use_cache: Passed to ``ndb.get_context().set_cache_policy``. When
            false, the context cache is also cleared. The choice is
            re-applied in every continuation task.
        """
        self._use_cache = use_cache
        self._apply_cache_policy()

        self.KIND = None
        self.to_put = []
        self.to_delete = []
        # Set to True (e.g. from map()) to stop after the current entity.
        self.terminate = False
        # Data handed explicitly to the deferred continuation task.
        self.DATA = None
        # Scratch data that is not passed explicitly to continuations.
        # NOTE(review): deferring the bound self._continue pickles self,
        # so this may still travel with the task -- confirm intent.
        self.TMP_DATA = None
        self.FILTERS = []
        self.ORDERS = []
        # Subclass hook for setting KIND, FILTERS, ORDERS, etc.
        self.init()

    def _apply_cache_policy(self):
        """Apply the stored ``use_cache`` choice to the current NDB context."""
        # Instances pickled before this attribute existed carry no choice.
        if not hasattr(self, '_use_cache'):
            return
        ndb.get_context().set_cache_policy(self._use_cache)
        if not self._use_cache:
            ndb.get_context().clear_cache()

    def delete(self, entity):
        """Queue ``entity``'s key for deletion in the next batch write."""
        self.to_delete.append(entity.key)

    def update(self, entity):
        """Queue ``entity`` to be put in the next batch write."""
        self.to_put.append(entity)

    def map(self, entity):
        """Map a single entity. Override in subclasses; the default does nothing."""

    def init(self):
        """Initialise subclass state; called at the end of ``__init__``."""
        pass

    def deadline_error(self):
        """Hook run after pending writes are flushed on a deadline."""
        pass

    def finish(self):
        """Called when the mapper has finished, to allow for any final work to be done."""
        pass

    def get_query(self):
        """Returns a query over the specified kind, with any appropriate filters applied.

        Subclasses may override this and return None, in which case the run
        finishes immediately.
        """
        query = self.KIND.query()
        for condition in self.FILTERS:
            query = query.filter(condition)
        for order in self.ORDERS:
            query = query.order(order)
        return query

    def run(self, batch_size=100, initial_data=None):
        """Start the mapper running in the current request.

        Args:
          batch_size: Number of mapped entities between batch writes.
          initial_data: Initial value for ``self.DATA``. When None, the
            current ``self.DATA`` is used.
        """
        if initial_data is None:
            initial_data = self.DATA
        if hasattr(self, '_pre_run_hook'):
            getattr(self, '_pre_run_hook')()
        self._continue(None, batch_size, initial_data)

    def _batch_write(self):
        """Writes updates and deletes entities in a batch."""
        if self.to_put:
            ndb.put_multi(self.to_put)
            del self.to_put[:]
        if self.to_delete:
            ndb.delete_multi(self.to_delete)
            del self.to_delete[:]

    def _continue(self, cursor, batch_size, data):
        """Map entities from ``cursor`` until done, terminated, or out of time.

        Args:
          cursor: Query cursor to resume from, or None to start at the beginning.
          batch_size: Number of mapped entities between batch writes.
          data: Value restored into ``self.DATA``.
        """
        # The NDB context belongs to the current request, so a continuation
        # task must re-apply the cache policy chosen in __init__.
        self._apply_cache_policy()
        self.DATA = data
        query = self.get_query()
        if query is None:
            self.finish()
            return
        # If we're resuming, pick up where we left off last time.
        results = query.iter(produce_cursors=True, start_cursor=cursor)
        cache_id = self.prefix_key + self.__class__.__name__
        # Progress tracking for choosing the resume point on a deadline:
        # nothing fetched yet -> reuse `cursor` (there is no result cursor);
        # interrupted inside map() -> resume before that entity (re-map it);
        # otherwise -> resume after the last fetched entity.
        fetched = False
        in_map = False
        try:
            # Mark this mapper as running.
            refreshed_at = datetime.datetime.now()
            memcache.set(cache_id, refreshed_at, self.cache_time)
            i = 0
            while results.has_next():
                entity = results.next()
                fetched = True
                in_map = True
                self.map(entity)
                in_map = False
                # Do updates and deletes in batches.
                if (i + 1) % batch_size == 0:
                    self._batch_write()
                    # Refresh the running marker well before it expires.
                    now = datetime.datetime.now()
                    if (now - refreshed_at).total_seconds() > self.cache_time / 2.0:
                        refreshed_at = now
                        memcache.set(cache_id, refreshed_at, self.cache_time)
                i += 1
                if self.terminate:
                    break
            self._batch_write()
        except DeadlineExceededError:
            # Write any unfinished updates to the datastore.
            self._batch_write()
            self.deadline_error()
            if not fetched:
                resume_cursor = cursor
            elif in_map:
                resume_cursor = results.cursor_before()
            else:
                resume_cursor = results.cursor_after()
            # Queue a new task to pick up where we left off.
            deferred.defer(self._continue, resume_cursor, batch_size, self.DATA)
            logging.error(self.__class__.__name__ + ' DeadlineExceedError')
            return
        self.finish()
        memcache.delete(cache_id)
Here is a sample usage:
from google.appengine.ext.ndb import blobstore
class DeleteUser(Mapper):
    """Delete users whose ``deleted`` date has passed, together with their data."""

    def init(self):
        """Target User entities whose ``deleted`` date is not in the future."""
        self.KIND = User
        # User is an expando model: ``deleted`` is added when the user asks
        # to be removed, set to a future date so they have time to undelete.
        cutoff = datetime.datetime.now()
        self.FILTERS = [ndb.GenericProperty('deleted') <= cutoff]

    def map(self, user):
        """Delete one user's photos and comments, then queue the user itself."""
        # Example of work too heavy to do inside a single request.
        blobstore.delete_multi(user.photos)

        # Stream comment keys and delete them in chunks of 100.
        chunk = []
        comment_query = Comment.query(Comment.user == user.key)
        for key in comment_query.iter(keys_only=True):
            chunk.append(key)
            if len(chunk) < 100:
                continue
            ndb.delete_multi(chunk)
            chunk = []
        # Flush whatever is left (possibly nothing).
        ndb.delete_multi(chunk)

        # The more work map() does, the smaller the batch size should be,
        # so less work is repeated if a run fails.
        self.delete(user)
You can use this on both frontend and backend instances. The 10-minute request deadline is handled automatically, and the mapper continues where it left off in a new task. To run it, either use the deferred library or create a handler that simply runs it (for example, when running from cron):
# From a request handler: run the mapper directly in the current request.
deleteUser = DeleteUser()
# Batch size 1 writes after every user, because map() does a lot of work per user.
deleteUser.run(batch_size=1)
# Or with the deferred library (a convenience wrapper around the task queue API):
from google.appengine.ext import deferred
# Keyword arguments starting with "_" are task options for deferred; the rest go to the callable.
deferred.defer(deleteUser.run, batch_size=1, _target='backend_name_if_you_want', _name='a_name_to_avoid_dups')