Simple Mapper Class for NDB on App Engine
This class is based on the db Mapper found in the remote_api article, but rewritten for ndb. The purpose of it is to let you iterate over a large number of entities when there is not enough time to do it within a single request. This library helps you map over all entities of a given kind.
You should use this for cases like deleting users that have requested deletion, or updating counters for specific filters.
Here is the NDB version of the Mapper. I have added a few improvements that I have used in the past. Edit: I have removed the memcache-based duplicate prevention. Duplicates should now be handled at the task scope, e.g. with a task name or with different filters per task (which can be done by passing different initial data and overriding the get_query method to use it as a filter; a small sketch of this follows the class below).
import logging

from google.appengine.ext import deferred, ndb
from google.appengine.runtime import DeadlineExceededError


class Mapper(object):
    def __init__(self, use_cache=False):
        ndb.get_context().set_cache_policy(use_cache)
        if not use_cache:
            ndb.get_context().clear_cache()
        self.kind = None
        self.to_put = []
        self.to_delete = []
        self.terminate = False
        # Data you want to carry over in case of error
        self.data = None
        # Temporary data that won't carry over in case of error
        self.tmp_data = None
        self.filters = []
        self.orders = []
        self.keys_only = False
        # Override init() for custom initialization
        self.init()

    def delete(self, entity):
        self.to_delete.append(entity if isinstance(entity, ndb.Key) else entity.key)

    def update(self, entity):
        self.to_put.append(entity)

    def map(self, entity):
        """Processes a single entity.

        Implementers should override this and call self.update() or
        self.delete() to queue writes for the next batch.
        """

    def init(self):
        # Initialize variables
        pass

    def deadline_error(self):
        # Executed when a DeadlineExceededError is raised
        pass

    def finish(self):
        """Called when the mapper has finished, to allow for any final work to be done."""
        pass

    def get_query(self):
        """Returns a query over the specified kind, with any appropriate filters applied."""
        q = self.kind.query()
        for filter in self.filters:
            q = q.filter(filter)
        for order in self.orders:
            q = q.order(order)
        return q

    def run(self, batch_size=100, initial_data=None):
        """Starts the mapper running."""
        if initial_data is None:
            initial_data = self.data
        if hasattr(self, '_pre_run_hook'):
            getattr(self, '_pre_run_hook')()
        self._continue(None, batch_size, initial_data)

    def _batch_write(self):
        """Writes updates and deletes entities in a batch."""
        if self.to_put:
            ndb.put_multi(self.to_put)
            del self.to_put[:]
        if self.to_delete:
            ndb.delete_multi(self.to_delete)
            del self.to_delete[:]

    def _continue(self, cursor, batch_size, data):
        self.data = data
        q = self.get_query()
        if q is None:
            self.finish()
            return
        # If we're resuming, pick up where we left off last time.
        iter = q.iter(produce_cursors=True, start_cursor=cursor, keys_only=self.keys_only)
        try:
            # Step over the results, returning each entity and its index.
            i = 0
            while iter.has_next():
                entity = iter.next()
                self.map(entity)
                # Do updates and deletes in batches.
                if (i + 1) % batch_size == 0:
                    self._batch_write()
                i += 1
                if self.terminate:
                    break
            self._batch_write()
        except DeadlineExceededError:
            # Write any unfinished updates to the datastore.
            self._batch_write()
            self.deadline_error()
            # Queue a new task to pick up where we left off.
            deferred.defer(self._continue, iter.cursor_after(), batch_size, self.data)
            logging.error(self.__class__.__name__ + ' DeadlineExceededError')
            return
        self.finish()
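For example, here is a minimal sketch of the per-task filter idea mentioned above, where each task is scoped by its initial data and an overridden get_query(); the Log model, its shard property, and the task names are made-up assumptions:

# Hypothetical sketch: scope each task to its own slice of data by passing a
# value as initial_data and reading it back inside get_query(). The Log model
# and its 'shard' property are made up for illustration.
from google.appengine.ext import deferred


class CountLogs(Mapper):
    def init(self):
        self.kind = Log
        self.count = 0

    def get_query(self):
        # self.data holds whatever was passed as initial_data (and is carried
        # across deferred retries), so each task only sees its own shard.
        return self.kind.query(Log.shard == self.data)

    def map(self, log):
        self.count += 1


# One deferred task per shard; _name keeps the same shard from being queued twice.
for shard in range(4):
    deferred.defer(CountLogs().run, 100, shard, _name='count-logs-shard-%d' % shard)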
Then here is a sample usage:
import datetime

from google.appengine.ext import ndb
from google.appengine.ext.ndb import blobstore


class DeleteUser(Mapper):
    def init(self):
        self.kind = User
        # I'm using a GenericProperty because User was an Expando model where
        # 'deleted' was added on the deletion request, giving the user enough
        # time to undelete by setting a future date.
        self.filters = [ndb.GenericProperty('deleted') <= datetime.datetime.now()]

    def map(self, user):
        # Sample of why you would want to run this in a mapper
        blobstore.delete_multi(user.photos)
        # Mini batches here
        for_delete = []
        for comment_key in Comment.query(Comment.user == user.key).iter(keys_only=True):
            for_delete.append(comment_key)
            if len(for_delete) >= 100:
                ndb.delete_multi(for_delete)
                for_delete = []
        ndb.delete_multi(for_delete)
        # And more; the more you do here, the smaller you probably want the
        # batches to be, to avoid duplicating work when a run fails.
        self.delete(user)
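For the other use case mentioned at the top, updating counters for specific filters, a rough sketch could look like this; the Account model and its plan and comment_count properties are made-up assumptions:

# Hypothetical sketch of the "updating counters for specific filters" use case.
# Account and its 'plan' and 'comment_count' properties are made up.
class RecountComments(Mapper):
    def init(self):
        self.kind = Account
        self.filters = [Account.plan == 'premium']

    def map(self, account):
        # Recompute the denormalized counter and queue the entity for update;
        # the Mapper flushes queued puts with put_multi() once per batch.
        account.comment_count = Comment.query(Comment.user == account.key).count()
        self.update(account)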
You can use this on both frontend and backend instances; the 10-minute limit is handled automatically, and the mapper continues from the last successful batch. To run it, use the deferred library, or if you will run it from cron, just create a handler that simply runs it:
# On a handler
deleteUser = DeleteUser()
deleteUser.run(1)  # Batch size of 1 since we are doing a lot of work per entity

# With the deferred library (for anyone not familiar, it's a convenient wrapper around the taskqueue API)
from google.appengine.ext import deferred

# Anything that starts with _ is for the taskqueue API; everything before it is passed to your method
deferred.defer(deleteUser.run, 1, _target='backend_name_if_you_want', _name='a_name_to_avoid_dups')
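And if you go the cron route, a minimal handler sketch could look like the following; webapp2 and the /tasks/delete-users URL are assumptions, and you would still point a cron.yaml entry at that URL:

# Minimal cron handler sketch. webapp2 and the URL are assumptions; schedule it
# by pointing a cron.yaml entry at /tasks/delete-users.
import webapp2


class DeleteUsersHandler(webapp2.RequestHandler):
    def get(self):
        # The mapper re-defers itself on DeadlineExceededError, so the handler
        # only needs to kick off the first run.
        DeleteUser().run(1)
        self.response.write('done')


app = webapp2.WSGIApplication([('/tasks/delete-users', DeleteUsersHandler)])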