Source code for aiorethink.document

import collections
import functools
import inspect

import inflection
import rethinkdb as r

from . import ALL, DECLARED_ONLY, UNDECLARED_ONLY
from .errors import IllegalSpecError, AlreadyExistsError, NotFoundError
from .registry import registry
from .db import db_conn, CursorAsyncIterator, CursorAsyncMap, ChangesAsyncMap,\
            _run_query
from .field import Field, FieldAlias
from .values_and_valuetypes.field_container import FieldContainer, _MetaFieldContainer

__all__ = [ "Document" ]


class _MetaDocument(_MetaFieldContainer):

    def __init__(cls, name, bases, classdict):
        cls._tablename = cls._get_tablename()

        # make sure that the following runs only for subclasses of Document.
        # There's no really nice way to do this AFAIK, because 'Document' is
        # not known yet when this is called. The best I could come up with is
        # this:
        if bases != (FieldContainer,):
            registry.register(name, cls)

        super().__init__(name, bases, classdict)



[docs]class Document(FieldContainer, metaclass = _MetaDocument):
    """
    Non-obvious customization: cls._table_create_options dict with extra
    kwargs for rethinkdb.table_create
    """

    _tablename = None # customize with _get_tablename - don't set this attr
    _table_create_options = None # dict with extra kwargs for rethinkdb.table_create

    def __init__(self, **kwargs):
        """Makes a Document object, but does not save it yet to the database.
        Call save() for writing to DB, or use the create() classmethod to
        create and save in one step. Use kwargs to set fields.
        """
        super().__init__(**kwargs)

        self._stored_in_db = False # will be set to True if Doc is retrieved
                                   # from DB, and when saved to DB

    ###########################################################################
    # class creation (metaclass constructor calls this)
    ###########################################################################

    @classmethod
    def _get_tablename(cls):
        """Override this in subclasses if you want anything other than a table
        name automatically derived from the class name using
        inflection.tableize(). Mind the dangers of further subclassing, and of
        using the same table for different Document classes.
        """
        return inflection.tableize(cls.__name__)

    @classmethod
    def _check_field_spec(cls):
        # make sure that this runs only for subclasses of Document. There's no
        # really nice way to do this AFAIK, because 'Document' is not known yet
        # when this is called. The best I could come up with is this:
        if cls.__bases__ == (FieldContainer,):
            return

        # make sure that we have at most one primary key field
        pk_name = None
        for fld_name, fld_obj in cls._declared_fields_objects.items():
            if fld_obj.primary_key:
                if pk_name != None:
                    raise IllegalSpecError("Document can't have more than 1 "
                            "primary key")
                pk_name = fld_name

        # primary key: either we have an explicitly declared one, or we add a
        # primary key field named "id"
        if pk_name == None:
            if hasattr(cls, "id"):
                raise IllegalSpecError("Need {}.id for RethinkDB's automatic "
                        "primary key attribute".format(cls.__name__))
            cls.id = Field(primary_key = True)
            cls.id.name = "id"
            cls._declared_fields_objects[cls.id.name] = cls.id
            cls._dbname_to_field_name[cls.id.dbname] = cls.id.name
            pk_name = cls.id.name

        # add cls.pkey alias, pointing to the primary key field
        if getattr(cls, "pkey", None) and not isinstance(cls.pkey, FieldAlias):
            raise IllegalSpecError("'pkey' attribute is reserved for a "
                    "FieldAlias to the primary key field.")
        cls.pkey = FieldAlias(getattr(cls, pk_name))

    ###########################################################################
    # simple properties and due diligence
    ###########################################################################

    def __repr__(self):
        s = "{o.__class__.__name__}({o.__class__.pkey.name}={o.pkey})"
        return s.format(o = self)

    @property
    def stored_in_db(self):
        return self._stored_in_db

    ###########################################################################
    # DB table management
    ###########################################################################

    @classmethod
    async def table_exists(cls, conn = None):
        cn = conn or await db_conn
        db_tables = await r.table_list().run(cn)
        return cls._tablename in db_tables

    @classmethod
    async def _create_table(cls, conn = None):
        cn = conn or await db_conn

        # make sure table doesn't exist yet
        if await cls.table_exists(cn):
            raise AlreadyExistsError("table {} already exists"
                    .format(cls._tablename))

        # assemble kwargs for call to table_create
        create_args = {}
        if cls._table_create_options != None:
            create_args.update(cls._table_create_options)

        ## declare primary key field if it is not "id"
        if cls.pkey.dbname != "id":
            create_args["primary_key"] = cls.pkey.dbname

        await r.table_create(cls._tablename, **create_args).run(cn)

        # create secondary indexes for fields that have indexed == True
        for fld in cls._declared_fields_objects.values():
            if fld.indexed:
                await cls.cq().index_create(fld.dbname).run(cn)

        await cls._create_table_extras(cn)

    @classmethod
    async def _create_table_extras(cls, conn = None):
        """Override this classmethod in subclasses to take care of complex
        secondary indices and other advanced stuff that we can't (or don't)
        automatically deal with. The table exists at this point, so you can
        use cls.cq().
        """
        pass

    @classmethod
    async def _reconfigure_table(cls, conn = None):
        """In the absence of proper migrations in aiorethink, there is no
        nice way to adapt an existing database to changes in either your
        _table_create_options or your _create_table_extras(). When you want
        to adapt an existing DB, you would normally have to do that either
        manually or by running a custom script that makes the necessary
        changes.

        However, aiorethink allows you to do this from within your
        application code, resulting in automatic DB migration, if only in a
        pretty mad way... look at it as a very dirty but sometimes valuable
        hack.

        You can override this method in subclasses to reconfigure database
        tables so that they reflect how your _table_create_options and
        _create_table_extras() change over time. This method is called every
        time aiorethink.db.init(reconfigure_db = True) is run, which might be
        every time your app is started. This has very serious implications
        for how you have to implement this method. Make 100% sure that this
        method works for every version of your database out there, and that
        it is idempotent (i.e. you can run this method N times and the result
        will always be the same). Any deviation from this will likely cause
        problems. aiorethink can not help you with anything here; you are
        entirely on your own.

        You see how this is convenient to use, but probably a pain to write,
        compared to just running a custom script manually when necessary? And
        that it might pollute your code over time? Again: using this function
        is asking for trouble, but sometimes it might just come in very
        handy. Use it carefully. You have been warned.

        The default implementation does nothing.
        """
        pass
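    # Illustration (not part of this module): a sketch of a hypothetical
    # subclass showing where the table-management hooks above come into play.
    # It assumes fields are declared as Field() class attributes (as
    # _check_field_spec() implies) and that Field accepts an `indexed`
    # argument, as used in _create_table(); the class and field names are
    # made up.
    #
    #     class Player(Document):
    #         _table_create_options = {"durability": "soft"}
    #
    #         name = Field(indexed = True)
    #         clan = Field()
    #
    #         @classmethod
    #         async def _create_table_extras(cls, conn = None):
    #             # compound indexes are not created automatically, so do it here
    #             await cls.cq().index_create("name_and_clan",
    #                     [r.row["name"], r.row["clan"]]).run(conn)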
    ###########################################################################
    # DB queries (load, save, delete...) and related funcs
    ###########################################################################

    @classmethod
[docs]    def cq(cls):
        """RethinkDB query prefix for queries on the Document's DB table.
        """
        return r.table(cls._tablename)
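    # Sketch of typical cq() use (MyDoc is a hypothetical subclass): cq() is
    # just the r.table(...) prefix, so any ReQL can be chained onto it. The
    # connection handling mirrors the module-level db_conn default used
    # throughout this file.
    #
    #     conn = await db_conn
    #     n_docs = await MyDoc.cq().count().run(conn)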
    @classmethod
[docs]    async def aiter_table_changes(cls, changes_query = None, conn = None):
        """Executes `changes_query` and returns an asynchronous iterator (a
        ``ChangesAsyncMap``) that yields (document object, changefeed message)
        tuples.

        If `changes_query` is None, `cls.cq().changes(include_types = True)`
        will be used, so the iterator will yield all new or changed documents
        in cls's table, and changefeed messages will have a "type" attribute
        giving you more information about what kind of change happened.

        If you specify `changes_query`, the query must return one complete
        document in new_val on each message. So don't use pluck() or something
        to that effect in your query. The query may or may not already have
        called run():

        * if run() has been called, then the query (strictly speaking, the
          awaitable) is just awaited. This gives the caller the opportunity to
          customize the run() call.
        * if run() has not been called, then the query is run on the given
          connection (or the default connection). This is more convenient for
          the caller than the former version.
        """
        if changes_query == None:
            changes_query = cls.cq().changes(include_types = True)
        feed = await _run_query(changes_query)
        mapper = functools.partial(cls.from_doc, stored_in_db = True)
        return ChangesAsyncMap(feed, mapper)
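    # Usage sketch (MyDoc is a hypothetical subclass): iterate over all
    # changes in the table. Per the docstring above, each iteration yields a
    # document object plus the raw changefeed message.
    #
    #     feed = await MyDoc.aiter_table_changes()
    #     async for doc, msg in feed:
    #         if msg.get("type") == "add":
    #             print("new document:", doc)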
    @classmethod
[docs]    async def load(cls, pkey_val, conn = None):
        """Loads an object from the database, using its primary key for
        identification.
        """
        obj = await cls.from_query(
                cls.cq().get(pkey_val),
                conn)
        if obj == None:
            raise NotFoundError("no matching document")
        return obj
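    # Sketch: load by primary key and handle a missing document (MyDoc is a
    # hypothetical subclass; NotFoundError is the exception imported at the
    # top of this module).
    #
    #     try:
    #         doc = await MyDoc.load("some-primary-key")
    #     except NotFoundError:
    #         doc = None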
    @classmethod
    def from_doc(cls, doc, stored_in_db, **kwargs):
        obj = super().from_doc(doc, **kwargs)
        obj._stored_in_db = stored_in_db
        return obj
    @classmethod
[docs]    async def create(cls, **kwargs):
        """Makes a Document and saves it into the DB. Use keyword arguments
        for fields.
        """
        obj = cls(**kwargs)
        await obj.save()
        return obj
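    # Sketch: create() combines construction and save() (MyDoc is a
    # hypothetical subclass with a declared `name` field).
    #
    #     doc = await MyDoc.create(name = "Alice")
    #     assert doc.stored_in_db
    #     # which is equivalent to:
    #     doc = MyDoc(name = "Alice"); await doc.save()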
[docs]    def q(self):
        """RethinkDB query prefix for queries on the document.
        """
        pkey_dbval = self.__class__.pkey._do_convert_to_doc(self)
        return self.__class__.cq().get(pkey_dbval)
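    # Sketch: q() is the per-document query prefix (r.table(...).get(pkey)),
    # so arbitrary ReQL can be chained onto it, e.g. reading the stored
    # document verbatim (`doc` is a hypothetical saved instance).
    #
    #     conn = await db_conn
    #     raw_doc = await doc.q().run(conn)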
    async def save(self, conn = None):
        cn = conn or await db_conn
        if self._stored_in_db:
            return await self._update_in_db(cn)
        else:
            return await self._insert_into_db(cn)

    async def _update_in_db(self, conn = None):
        cn = conn or await db_conn

        if len(self._updated_fields) == 0:
            return

        self.validate()

        # make the dictionary for the DB query
        update_dict = {}
        for fld_name in self._updated_fields.keys():
            if self.__class__.has_field_attr(fld_name):
                # Field instance: convert field value to DB-serializable format
                fld_obj = getattr(self.__class__, fld_name)
                db_key = fld_obj.dbname
                db_val = fld_obj._do_convert_to_doc(self)
                update_dict[db_key] = db_val
            else:
                # undeclared field: we assume that the value is serializable
                update_dict[fld_name] = self._undeclared_fields.get(fld_name, None)
                # NOTE the field might have been deleted from
                # _undeclared_fields (see __delitem__). But since we can not
                # remove fields from a RethinkDB document, we have to overwrite
                # 'deleted' fields with something (here: None)...
                # TODO: make replace() query then

        # update in DB
        self._updated_fields = {}
        return await self.q().\
                update(update_dict).\
                run(cn)

    async def _insert_into_db(self, conn = None):
        cn = conn or await db_conn

        self.validate()

        # make dict for the DB query
        insert_dict = {}
        for fld_name, fld_obj in self.__class__._declared_fields_objects.items():
            # don't store if primary key and not set (then DB should autogenerate)
            if fld_obj.primary_key and self.get(fld_name, None) == None:
                continue
            # convert field value to DB-serializable format
            db_key = fld_obj.dbname
            db_val = fld_obj._do_convert_to_doc(self)
            insert_dict[db_key] = db_val
        insert_dict.update(self._undeclared_fields)

        # insert document into DB
        self._updated_fields = {}
        insert_result = await self.__class__.cq().\
                insert(insert_dict).\
                run(cn)

        ## the DB might have made an automatic id for us
        if "generated_keys" in insert_result:
            new_key_dbval = insert_result["generated_keys"][0]
            self.__class__.pkey._store_from_doc(self, new_key_dbval)

        self._stored_in_db = True
        return insert_result

    async def delete(self, conn = None, **kwargs_delete):
        cn = conn or await db_conn
        res = await self.q().delete(**kwargs_delete).run(cn)
        self._stored_in_db = False
        return res
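    # Lifecycle sketch for the methods above (MyDoc is a hypothetical
    # subclass; attribute-style access to the declared `name` field is
    # assumed): the first save() issues an insert, later saves update only
    # the changed fields, and delete() removes the document again.
    #
    #     doc = MyDoc(name = "Bob")
    #     await doc.save()          # insert; primary key may be autogenerated
    #     doc.name = "Bobby"
    #     await doc.save()          # update with just the changed field
    #     await doc.delete()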
[docs]    def copy(self, which = ALL):
        """Creates a new Document (same class as self) and (shallow) copies
        all fields except for the primary key field, which remains unset. The
        new Document is returned.
        """
        doc = super().copy(which)
        del doc[self.__class__.pkey.name]
        return doc
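    # Sketch: copy() returns an unsaved clone without a primary key, so
    # saving it creates a new document in the table (`doc` is a hypothetical
    # saved instance).
    #
    #     clone = doc.copy()
    #     await clone.save()        # inserted under a new primary key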
    ###########################################################################
    # Point changefeeds (changefeeds on a single document object)
    ###########################################################################

    class ChangesAsyncIterator(collections.abc.AsyncIterator):
        def __init__(self, doc, conn = None):
            self.doc = doc
            self.conn = conn

        async def __aiter__(self):
            query = self.doc.q().changes(include_initial = True,
                    include_types = True)
            self.cursor = await _run_query(query, self.conn)
            return self

        async def __anext__(self):
            while True:
                try:
                    msg = await self.cursor.next()
                except r.ReqlCursorEmpty:
                    raise StopAsyncIteration
                if "new_val" not in msg:
                    continue

                doc = self.doc

                # update doc and return changed fields
                if msg["new_val"] == None:
                    doc._stored_in_db = False
                    return doc, None, msg
                else:
                    changed_fields = {k: v for k, v in msg["new_val"].items()
                            if k not in doc or v != doc.get_dbvalue(k)}
                    if not changed_fields:
                        continue
                    for k, v in changed_fields.items():
                        doc_key = doc.get_key_for_dbkey(k)
                        doc.set_dbvalue(doc_key, v, mark_updated = False)
                    return doc, list(changed_fields.keys()), msg
[docs]    async def aiter_changes(self, conn = None):
        """Note: be careful what you wish for. The document object is updated
        in place when you iterate. Unsaved changes to it might then be
        overwritten.
        """
        return self.__class__.ChangesAsyncIterator(self, conn)
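    # Usage sketch for point changefeeds (`doc` is a hypothetical saved
    # instance): ChangesAsyncIterator.__anext__ above returns a
    # (doc, changed field names, message) triple, with None for the field
    # names when the document was deleted; `doc` is updated in place.
    #
    #     changes = await doc.aiter_changes()
    #     async for doc, changed, msg in changes:
    #         if changed is None:
    #             break  # document was deleted
    #         print("changed fields:", changed)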