Wednesday, May 1, 2013

Simple Mapper Class for NDB on App Engine

This class is based on the db mapper found in the remote_api article, but uses ndb. Its purpose is to let you iterate through a large number of entities when there is not enough time to do so within a single request. In short, this library helps you map over all the entities of a given kind.

You should use this in cases such as deleting users who have requested deletion, or updating counters for specific filters.

Here is the NDB version of the Mapper. I have added a few improvements that I have used in the past. -Edit- I have removed the memcache-based ability to stop duplicates. That should now be handled at the task level instead, e.g. with a task name or with different filters per task (which can be done by passing different initial data and overriding the get_query method to use that data as a filter).


import logging
from google.appengine.ext import deferred, ndb
from google.appengine.runtime import DeadlineExceededError


class Mapper(object):
    """Walks every entity matched by a query and applies `map` to each one.

    Subclasses set `kind` (and optionally `filters`, `orders`, `keys_only`)
    in `init`, and override `map` to do the per-entity work. Inside `map`,
    call `update(entity)` / `delete(entity)` to queue datastore writes; the
    queued writes are flushed every `batch_size` entities.

    When a DeadlineExceededError is raised, pending writes are flushed and a
    deferred task is queued that resumes the iteration from a query cursor.
    """

    def __init__(self, use_cache=False):
        """Configures the NDB context cache and resets the mapper state.

        Args:
            use_cache: passed to the NDB context's `set_cache_policy`; when
                false the context cache is also cleared.
        """
        context = ndb.get_context()
        context.set_cache_policy(use_cache)
        if not use_cache:
            context.clear_cache()

        self.kind = None
        # Entities / keys queued by update() and delete(), flushed in batches.
        self.to_put = []
        self.to_delete = []
        # Set to True from map() to stop iterating after the current entity.
        self.terminate = False
        # Data you wanna carry on in case of error
        self.data = None
        # Temporary data intended not to carry on in case of error.
        # NOTE(review): nothing in this class resets it -- confirm.
        self.tmp_data = None
        self.filters = []
        self.orders = []
        self.keys_only = False
        # implement init for different initializations
        self.init()

    def delete(self, entity):
        """Queues a deletion; accepts either an entity or an ndb.Key."""
        self.to_delete.append(entity if isinstance(entity, ndb.Key) else entity.key)

    def update(self, entity):
        """Queues an entity to be written with the next batch."""
        self.to_put.append(entity)

    def map(self, entity):
        """Processes a single entity; override in subclasses.

        Queue writes with `self.update(entity)` and `self.delete(entity)`.
        The return value is ignored. Because an interrupted call is repeated
        after a deadline, implementations should be safe to run more than
        once for the same entity.
        """

    def init(self):
        """Hook called at the end of __init__; set kind, filters, etc. here."""
        pass

    def deadline_error(self):
        """Hook called after a deadline error, before the follow-up task is queued."""
        pass

    def finish(self):
        """Called when the mapper has finished, to allow for any final work to be done."""
        pass

    def get_query(self):
        """Returns a query over the specified kind, with any appropriate filters applied.

        Overrides may return None, in which case the mapper finishes
        immediately without iterating.
        """
        q = self.kind.query()
        for query_filter in self.filters:
            q = q.filter(query_filter)
        for ordering in self.orders:
            q = q.order(ordering)

        return q

    def run(self, batch_size=100, initial_data=None):
        """Starts the mapper running.

        Args:
            batch_size: number of mapped entities between flushes of the
                queued puts/deletes; must be at least 1.
            initial_data: value stored in `self.data` for the run; when None
                the current `self.data` is kept.

        Raises:
            ValueError: if `batch_size` is smaller than 1.
        """
        # Fail before any entity is touched instead of part-way through the
        # first batch (a zero size used to raise ZeroDivisionError there).
        if batch_size < 1:
            raise ValueError('batch_size must be at least 1, got %r' % (batch_size,))
        if initial_data is None:
            initial_data = self.data
        if hasattr(self, '_pre_run_hook'):
            self._pre_run_hook()

        self._continue(None, batch_size, initial_data)

    def _batch_write(self):
        """Writes updates and deletes entities in a batch."""
        if self.to_put:
            ndb.put_multi(self.to_put)
            del self.to_put[:]
        if self.to_delete:
            ndb.delete_multi(self.to_delete)
            del self.to_delete[:]

    def _continue(self, cursor, batch_size, data):
        """Iterates the query from `cursor`, re-queuing itself on a deadline.

        Args:
            cursor: query cursor to resume from, or None to start over.
            batch_size: number of mapped entities between write flushes.
            data: value stored in `self.data` for this run.
        """
        self.data = data
        q = self.get_query()
        if q is None:
            self.finish()
            return
        # If we're resuming, pick up where we left off last time.
        results = q.iter(produce_cursors=True, start_cursor=cursor, keys_only=self.keys_only)
        # Position to resume from if a deadline hits. It only advances once
        # map() has completed, so an entity interrupted mid-map is mapped
        # again instead of being skipped, and a deadline before the first
        # result keeps the cursor we were started with.
        resume_cursor = cursor
        try:
            processed = 0
            while results.has_next():
                entity = results.next()
                self.map(entity)
                resume_cursor = results.cursor_after()
                processed += 1
                # Do updates and deletes in batches.
                if processed % batch_size == 0:
                    self._batch_write()
                if self.terminate:
                    break

            self._batch_write()
        except DeadlineExceededError:
            # Write any unfinished updates to the datastore.
            self._batch_write()
            self.deadline_error()
            # Queue a new task to pick up where we left off.
            deferred.defer(self._continue, resume_cursor, batch_size, self.data)
            logging.error('%s DeadlineExceedError', self.__class__.__name__)
            return
        self.finish()


Then here is a sample usage:
from google.appengine.ext.ndb import blobstore

class DeleteUser(Mapper):
    """Sample mapper: removes users whose requested deletion date has passed."""

    def init(self):
        self.kind = User
        # 'deleted' is queried as a generic property because User was an
        # expando model: the field was added when the user asked to be
        # deleted and set to a future date, leaving time to undelete.
        deletion_due = ndb.GenericProperty('deleted') <= datetime.datetime.now()
        self.filters = [deletion_due]

    def map(self, user):
        # Example of per-entity work that is worth running in a mapper.
        blobstore.delete_multi(user.photos)

        # Delete the user's comments in mini batches of 100 keys.
        comment_keys = Comment.query(Comment.user == user.key).iter(keys_only=True)
        chunk = []
        for key in comment_keys:
            chunk.append(key)
            if len(chunk) < 100:
                continue
            ndb.delete_multi(chunk)
            chunk = []
        ndb.delete_multi(chunk)

        # The more work done per entity, the smaller the mapper batches
        # should be, so that a failure repeats as little work as possible.
        self.delete(user)



You can use this on both frontend and backend instances; the 10-minute limit should be handled automatically, with the mapper continuing from the last successful batch. To run it, either use the deferred library or, if you will run it from cron, just create a handler that simply runs it:
# Option 1: run the mapper directly, e.g. inside a request handler.
deleteUser = DeleteUser()
deleteUser.run(1)  # batch size 1: each map() call does a lot of work, so writes are flushed after every user

# Option 2: run it through the deferred library (a convenience wrapper around the task queue).
from google.appengine.ext import deferred
# Arguments before the underscore-prefixed ones are passed to your method (here: batch_size=1);
# arguments starting with _ are options for the task queue API.
deferred.defer(deleteUser.run, 1, _target='backend_name_if_you_want', _name='a_name_to_avoid_dups')