# Source code for data.sqliteinterface

'''
The class :class:`~!.DataSetInterface` specifies a set of methods that need to be implemented by a database-specific data interface. Currently the framework supports only `SQLite <https://www.sqlite.org>`_, which is implemented in :class:`data.sqliteinterface.SQLiteDI`. Implementing support for other SQL-based database systems, e.g. MySQL, should be straightforward. Support for other database systems would require more effort.
'''

import logging

import sqlite3 
from itertools import repeat

from analytics.performance import time_analysis


from data.datainterface import DataSetInterface



class SQLiteDI(DataSetInterface):
    '''
    A subclass of :class:`.DataSetInterface` that links a PRM to the SQLite
    database that it models.

    Every `loadXXX()` method builds one SQL statement and executes it on
    `self.cur`; the rows are then consumed through :meth:`retrieveRow` or
    :meth:`resultSet`.

    NOTE(review): table names, column names and primary key values are
    inserted into the SQL text verbatim, which presumably assumes that they
    come from the trusted PRM specification and that key values are numeric.
    '''

    def __init__(self, path):
        '''
        Connection to SQLite database is created.

        A :class:`sqlite3.Error` raised while connecting is logged (debug
        level) and not re-raised; in that case `self.con` and/or `self.cur`
        stay `None`.

        :arg path: Path to the SQLite database file
        '''
        DataSetInterface.__init__(self, 'SQLite')

        #: Path to SQLite DB file
        self.path = path
        #: SQLite connection, `None` if the connection could not be created
        self.con = None
        #: SQLite cursor that will execute SQL commands and contain the
        #: result set, `None` if the connection could not be created
        self.cur = None

        try:
            self.con = sqlite3.connect(self.path)  # make connection
            self.con.isolation_level = None
            # self.con.row_factory = sqlite3.Row  # tuples in result set would be Row objects
            self.cur = self.con.cursor()
        except sqlite3.Error as e:
            # The message needs a %s placeholder, otherwise the logging
            # module fails to format the record and the error is lost.
            logging.debug("An error occurred: %s", e)

    @staticmethod
    def _buildSelect(sqlAttr, sqlFrom, conditions=(), sqlGroupBy=''):
        '''
        Assemble a `SELECT` statement, leaving out clauses that are empty.

        :arg sqlAttr: Comma separated string of the queried columns
        :arg sqlFrom: Comma separated string of the queried tables
        :arg conditions: Iterable of condition strings that are joined with
            `AND`; empty strings are skipped and the `WHERE` clause is
            omitted if nothing is left
        :arg sqlGroupBy: Comma separated string of `GROUP BY` columns, the
            clause is omitted if the string is empty
        :returns: The SQL statement, terminated with `;`
        '''
        sqlQuery = 'SELECT %s FROM %s' % (sqlAttr, sqlFrom)
        conditions = [cond for cond in conditions if cond != '']
        if conditions:
            sqlQuery += ' WHERE %s' % ' AND '.join(conditions)
        if sqlGroupBy != '':
            sqlQuery += ' GROUP BY %s' % sqlGroupBy
        return sqlQuery + ';'

    @staticmethod
    def _objectConstraints(gbnVertices, pk=None):
        '''
        Build the part of a where clause that selects a set of objects.

        As an object is identified by possibly multiple primary key values,
        the result looks like this::

            (ob1_pk1=val AND ob1_pk2=val...) OR (ob2_pk1=val AND ob2_pk2=val...) OR ...

        :arg gbnVertices: A list of :class:`network.groundBN.GBNvertex`;
            `v.obj` contains one value for every primary key
        :arg pk: Primary key attributes to use for all vertices. If `None`,
            the primary key `v.attr.erClass.pk` of each vertex is used
        :returns: The constraint string, empty if `gbnVertices` is empty
        '''
        constraints = []
        for v in gbnVertices:
            keys = v.attr.erClass.pk if pk is None else pk
            pairs = ['%s=%s' % (pk_i.fullname, obj_i) for (pk_i, obj_i) in zip(keys, v.obj)]
            constraints.append('(%s)' % ' AND '.join(pairs))
        return ' OR '.join(constraints)

    def loadCountCPDdata(self, attribute):
        '''
        We pass an `attribute`, and construct a query such that the result
        set `self.cur` contains all the data to learn the local distribution
        of that `attribute`.

        The `Count` in the name indicates that the query is constructed such
        that the computation is done on the SQL side, e.g. the number of
        occurrences for each possible parent assignment is counted using
        `COUNT`.

        This function is used by
        :meth:`learners.cpdlearners.CPDTabularLearner.learnCPDsCount`.

        :arg attribute: Subclass of :class:`prm.attribute.Attribute`
        '''
        if len(attribute.parents) == 0:
            sqlQuery = self._buildSelect('COUNT(*)', attribute.erClass.name,
                                         sqlGroupBy=attribute.fullname)
        else:
            sqlAttributes = [attribute.fullname]
            sqlTables = [attribute.erClass.name]
            sqlWhere = []

            for dep in attribute.dependenciesChild:
                if dep.aggregator is None:
                    # attribute
                    sqlAttributes.append(dep.parent.fullname)
                    # tables
                    for er in dep.slotchain:
                        if er.name not in sqlTables:
                            sqlTables.append(er.name)
                    sqlWhere.extend(dep.slotchain_attr_string)
                else:
                    # We created a VIEW to aggregate the parent values
                    vName = dep.name
                    sqlAttributes.append('AGG_%s' % (dep.parent.name))
                    sqlTables.append(vName)
                    sqlWhere.extend(['%s.%s=%s' % (vName, pki.name, pki.fullname)
                                     for pki in attribute.erClass.pk])

            # The WHERE clause is left out if there are no constraints
            # (e.g. a parent without slot chain constraints); previously an
            # empty `WHERE` produced invalid SQL.
            sqlQuery = self._buildSelect('COUNT(*)', ",".join(sqlTables), sqlWhere,
                                         ",".join(sqlAttributes))

        # We execute the query; the result set follows the ordering of the
        # queried attributes
        # logging.debug(sqlQuery)
        self.cur.execute(sqlQuery)

    def loadFullAggCPDdata(self, attribute):
        '''
        We pass an `attribute`, and construct a query such that the result
        set `self.cur` contains all the data to learn the local distribution
        of that `attribute`.

        The `Full` in the name indicates that we don't `COUNT` all
        occurrences in the query but in the learner instead. The `Agg`
        indicates that if the `attribute` has multiple parents for one
        attribute object, the data is aggregated using SQL.

        In practice this has proven to not be a good way to do things. First,
        it is much slower than the other methods. Second, by aggregating the
        values before counting them the number of occurrences per parent
        assignment is much smaller. On the other hand, by not aggregating
        them we introduce a bias in which attribute objects with many parent
        attribute objects for one parent attribute are weighted more.

        This method is currently only used to compute the log likelihood of
        the model given the data using
        :meth:`learners.cpdlearners.CPDTabularLearner.loglikelihood`.

        :arg attribute: Subclass of :class:`prm.attribute.Attribute`
        '''
        # List of attributes that defines what variables to query and the
        # order in which they will be returned; parents with an aggregator
        # are aggregated and rounded.
        attributes = [attribute.fullname]
        for dep in attribute.dependenciesChild:
            if dep.aggregator is None:
                attributes.append(dep.parent.fullname)
            else:
                attributes.append('ROUND(%s( %s))' % (dep.aggregator(self.dsiType),
                                                      dep.parent.fullname))
        sqlAttributes = ",".join(attributes)

        # Using the slotchain(s) of the dependency(ies) of the given
        # attribute we can construct the list of tables
        merged = [attribute.erClass]
        for dep in attribute.dependenciesChild:
            for er in dep.slotchain:
                if er not in merged:
                    merged.append(er)
        sqlTables = ",".join([er.name for er in merged])

        # Again we use the slotchain information of the dependencies to
        # create the list of where constraints
        scWhere = []
        for dep in attribute.dependenciesChild:
            scWhere.extend(dep.slotchain_attr_string)

        sqlGroupBy = ','.join(attribute.erClass.pk_string)

        sqlQuery = self._buildSelect(sqlAttributes, sqlTables, scWhere, sqlGroupBy)

        # logging.debug(sqlQuery)
        self.cur.execute(sqlQuery)

    def loadFullCPDdata(self, attribute):
        '''
        We pass an `attribute`, and construct a query such that the result
        set `self.cur` contains all the data to learn the local distribution
        of that `attribute`.

        The `Full` in the name indicates that we don't `COUNT` all
        occurrences in the query but in the learner instead. Rows in which
        the attribute or one of its parents is `NULL` are excluded.

        This method is used by
        :meth:`learners.cpdlearners.CPDTabularLearner.learnCPDsFull`.

        :arg attribute: Subclass of :class:`prm.attribute.Attribute`
        '''
        # List of attributes that defines what variables to query and the
        # order in which they will be returned
        attributes = [attribute]
        attributes.extend(attribute.parents)
        sqlAttributes = ",".join([a.fullname for a in attributes])

        # Using the slotchain(s) of the dependency(ies) of the given
        # attribute we can construct the list of tables
        merged = [attribute.erClass]
        for dep in attribute.dependenciesChild:
            for er in dep.slotchain:
                if er not in merged:
                    merged.append(er)
        sqlTables = ",".join([er.name for er in merged])

        # Again we use the slotchain information of the dependencies to
        # create the list of where constraints
        scWhere = []
        for dep in attribute.dependenciesChild:
            scWhere.extend(dep.slotchain_attr_string)

        sqlNull = ' AND '.join(['%s IS NOT NULL' % a.fullname for a in attributes])

        # The NOT NULL constraints come first, followed by the slot chain
        # constraints (if any)
        sqlQuery = self._buildSelect(sqlAttributes, sqlTables, [sqlNull] + scWhere)

        # logging.debug(sqlQuery)
        self.cur.execute(sqlQuery)

    # @time_analysis
    def loadObjects(self, qvar):
        '''
        When unrolling a Ground Bayes Net the inference engine
        :mod:`inference.engine` processes a set of event and evidence
        variables that are of type :class:`inference.query.Qvariable`.

        The method executes a SQL query on the table of `qvar.erClass`. The
        result set will have the following structure:
        [attribute, pk1, pk2, ....], e.g.

        * If qvar.erClass is `User` : [User.gender, User.user_id]
        * If qvar.erClass is `rates` : [rates.rating, rates.user_id, rates.item_id]

        .. note:: The where clause that would restrict the result to the
            objects of `qvar` is disabled, so currently all rows of the
            table are returned.

        :arg qvar: :class:`inference.query.Qvariable`
        '''
        sqlAttribute = '%s,%s' % (qvar.attr.fullname,
                                  ",".join([pk.fullname for pk in qvar.erClass.pk]))

        # We are only loading data from one table
        sqlTable = qvar.erClass.name

        # Legacy: the where clause used to be defined by `qvar.objs` and the
        # primary key(s) of the erClass, i.e.
        #   (pk1=val AND pk2=val) OR (pk1=val AND pk2=val) OR ...
        # It is no longer generated, hence no conditions are passed.
        sqlQuery = self._buildSelect(sqlAttribute, sqlTable)

        # logging.debug(sqlQuery)
        self.cur.execute(sqlQuery)

    def loadDependencyParentObjects(self, dep, gbnVertices):
        '''
        Given a set of children attribute objects `gbnVertices` for a given
        dependency `dep`, we are loading the set of parents.

        The result set will consist of rows in the following format:

        | dep_child.pk1,dep_child.pk2,..., dep_parent.pk1,dep_parent.pk2,...,dep_parent.val
        | < child identification > < parent >

        :arg dep: :class:`prm.dependency.Dependency`
        :arg gbnVertices: A list of :class:`network.groundBN.GBNvertex`
        '''
        # attributes list
        query_attrs = []
        query_attrs.extend(dep.child.erClass.pk_string)  # to identify the child vertex

        # We are not using the views for aggregation, with a view it is
        # painfully slow:
        #   SELECT User.user_id,AGG_rating FROM User,rating_gender
        #   WHERE User.user_id=rating_gender.user_id AND (User.user_id=900);
        # this is much much faster:
        #   SELECT User.user_id,AVG(rates.rating) FROM User,rates
        #   WHERE User.user_id=rates.user_id AND (User.user_id=900);
        #
        # TODO : CAN ONE EVEN USE AGGREGATED VERTICES? You would have to
        # guarantee that no other vertex ever needs to sample from one of the
        # collapsed aggregated vertices (see engine.py for the same problem).
        #
        # FOR NOW WE JUST ADD ALL VERTICES, i.e. the parent attributes are
        # queried without aggregation.
        query_attrs.extend(dep.parent.erClass.pk_string)  # identify the parent obj
        query_attrs.append(dep.parent.fullname)  # the parent value

        sqlAttr = ','.join(query_attrs)
        sqlFrom = ','.join(dep.slotchain_string)

        # WHERE ..(slotchain_where)... AND ( (ob1_pk1=val AND ob1_pk2=val...) OR (...) ... )
        # `v.obj` contains a value for every pk of v.attr.erClass (= dep.child.erClass)
        sqlObj = self._objectConstraints(gbnVertices)
        conditions = list(dep.slotchain_attr_string)
        conditions.append('(%s)' % sqlObj)

        sqlQuery = self._buildSelect(sqlAttr, sqlFrom, conditions)

        # NOTE: very long object lists (> 900 objects) used to be handled by
        # a hack that dropped the object constraints; the hack is disabled
        # (also see engine.py).
        self.cur.execute(sqlQuery)

    def loadDependencyChildrenObjects(self, dep, gbnVertices):
        '''
        Given a set of parent attribute objects `gbnVertices` for a given
        dependency `dep`, we are loading the set of children.

        The result set will consist of rows in the following format:

        | dep_parent.pk1,dep_parent.pk2,..., dep_child.pk1,dep_child.pk2,...,dep_child.val
        | < parent identification > < child >

        :arg dep: :class:`prm.dependency.Dependency`
        :arg gbnVertices: A list of :class:`network.groundBN.GBNvertex`
        '''
        # attributes list
        query_attrs = []
        query_attrs.extend(dep.parent.erClass.pk_string)  # to identify the parent vertex
        query_attrs.extend(dep.child.erClass.pk_string)  # to identify the child vertex
        query_attrs.append(dep.child.fullname)  # the child value

        sqlAttr = ','.join(query_attrs)
        sqlFrom = ','.join(dep.slotchain_string)

        # WHERE ..(slotchain_where)... AND ( (ob1_pk1=val AND ob1_pk2=val...) OR (...) ... )
        # `v.obj` contains a value for every pk of v.attr.erClass
        sqlObj = self._objectConstraints(gbnVertices)
        conditions = list(dep.slotchain_attr_string)
        conditions.append('(%s)' % sqlObj)

        sqlQuery = self._buildSelect(sqlAttr, sqlFrom, conditions)

        # NOTE: very long object lists (> 900 objects) used to be handled by
        # a hack that dropped the object constraints; the hack is disabled
        # (also see engine.py).
        self.cur.execute(sqlQuery)

    def loadAttributeParentObjects(self, attr, gbnVertices):
        '''
        Given a set of children objects `gbnVertices` for a given attribute
        `attr`, we are loading the set of parents (for all dependencies that
        `attr` is a child of).

        This method is not used because it performs poorly compared to
        :meth:`loadDependencyChildrenObjects` and
        :meth:`loadDependencyParentObjects`.

        :arg attr: :class:`.Attribute`
        :arg gbnVertices: A list of :class:`network.groundBN.GBNvertex`
        :returns: `None`; no query is executed if `attr` has no parents
        '''
        # if there are no parents we can't do anything
        if len(attr.dependenciesChild) == 0:
            return None

        # attributes list
        query_attrs = []
        query_attrs.extend(attr.erClass.pk_string)  # to identify the child vertex
        # table list
        query_tables = [attr.erClass.name]
        # where list
        query_where = []

        # TODO : All this information can be precomputed except the
        # gbnVertices part of the where clause.
        # PROBLEM: VIEWs make it run really slow! There should be an easy way
        # to bring the aggregation into the query without using the view
        # (e.g. by querying the data once for every parent).
        for dep in attr.dependenciesChild:
            if dep.aggregator is None:
                # n:1 or 1:1 dependency type
                # attributes
                query_attrs.extend(dep.parent.erClass.pk_string)
                query_attrs.append(dep.parent.fullname)
                # from: merge the table names
                for er in dep.slotchain_string:
                    if er not in query_tables:
                        query_tables.append(er)
                # where: merge the constraints (e.g. User.user_id=rates.user_id)
                for att_str in dep.slotchain_attr_string:
                    if att_str not in query_where:
                        query_where.append(att_str)
            else:
                # aggregation 1:n or m:n dependency type, the aggregated
                # value is read from the view named `dep.name`
                query_attrs.append(('AGG_' + dep.parent.name))
                query_tables.append(dep.name)
                for pk_i in attr.erClass.pk:
                    query_where.append('%s=%s.%s' % (pk_i.fullname, dep.name, pk_i.name))

        # Adding obj values to the where clause. The key values of one object
        # are grouped with AND; joining all of them with OR (as done
        # previously) selects too many rows for composite primary keys.
        sqlObj = self._objectConstraints(gbnVertices, attr.erClass.pk)

        sqlAttr = ','.join(query_attrs)
        sqlFrom = ','.join(query_tables)

        sqlQuery = self._buildSelect(sqlAttr, sqlFrom, query_where + ['(%s)' % sqlObj])

        # logging.debug(sqlQuery)
        self.cur.execute(sqlQuery)

    def loadAttributeObjects(self, attr):
        '''
        All attribute objects of the attribute `attr` are queried.

        The result set will consist of rows in the following format:

        | attr.pk1,attr.pk2,........,attr.val
        | < attr identification > < attr >

        :arg attr: :class:`.Attribute`
        '''
        sqlAttribute = '%s,%s' % (",".join([pk.fullname for pk in attr.erClass.pk]),
                                  attr.fullname)

        # We are only loading data from one table
        sqlTable = attr.erClass.name

        sqlQuery = self._buildSelect(sqlAttribute, sqlTable)

        # logging.debug(sqlQuery)
        self.cur.execute(sqlQuery)

    def loadExistParents(self, dep, existdep):
        '''
        In the case of reference uncertainty, the exist attributes have a set
        of parents that need to be included in the ground Bayesian network.
        The SQL query needed is constructed in this method, the result set
        will be of the following format.

        The `k-entity` references the entity on the `k` side of the `n:k`
        relationship (i.e. `Professor` in the student/prof example from
        Pasula). The primary key of the `k-entity` is used to identify it.

        | k_entity.pk1, dep.parent.pk1,dep.parent.pk2,.....,dep.parent.val
        | < k entity id >< parent identification > < parent value >

        :arg dep: :class:`.UncertainDependency`
        :arg existdep: :class:`.Dependency` with the exist attribute as child
        '''
        k_attr_id = ",".join([pk.fullname for pk in dep.kAttribute.erClass.pk])
        parent_id = ",".join([pk.fullname for pk in existdep.parent.erClass.pk])
        parent_val = existdep.parent.fullname

        sqlAttribute = '%s,%s,%s' % (k_attr_id, parent_id, parent_val)

        # There are no entries in the database table of the uncertain
        # relationship, so it is left out of the table list.
        tables = []
        for er in existdep.slotchain:
            if not er == dep.uncertainRelationship:
                tables.append(er.name)
        sqlTable = ','.join(tables)

        # The WHERE clause is omitted if there are no where statements
        sqlQuery = self._buildSelect(
            sqlAttribute, sqlTable,
            existdep.slotchain_erclass_exclusive[dep.uncertainRelationship])

        logging.debug(sqlQuery)
        self.cur.execute(sqlQuery)

    def retrieveRow(self):
        '''
        After executing a `loadXXX()` method, the cursor `self.cur` contains
        the result set for a specific SQL query. This method returns the next
        row in the result set, which allows a caller, e.g.
        :meth:`learners.cpdlearners.CPDTabularLearner.learnCPDsFull` or
        :meth:`inference.engine.unrollGBN`, to iterate over all rows without
        knowledge about the data interface.

        :returns: The result of `self.cur.fetchone()`
        '''
        return self.cur.fetchone()

    def resultSet(self):
        '''
        We return the cursor which is an iterable result set of the executed
        query (after executing a `loadXXX()` method). The result set can then
        be iterated like this in the caller method::

            for currentRow in dsi.resultSet():
                do something with `currentRow`

        :returns: The cursor `self.cur`
        '''
        return self.cur

    def createView(self, dep):
        '''
        If a probabilistic dependency between an attribute and a parent
        attribute is of type m:n or 1:n, some sort of aggregation has to be
        performed. A VIEW is created that performs the necessary aggregation
        and enables the datainterface to query the already aggregated values
        in one query.

        The name of the view is `dep.name` and it can be used when learning
        local distributions or unrolling a ground bayes net. The aggregated
        column is named `AGG_<parent name>`.

        When there are multiple dependencies from and to the same erClass,
        then we could create only one view. Instead we create a view for each
        dependency independently, which is easier but redundant.

        .. note::

            `VIEWS` are implemented but not used. In practice their
            performance proved to be worse than working on the data directly.

        :arg dep: The dependency for which the aggregating view is created
        '''
        # string for set of primary keys (just one for an entity/multiple
        # for a relationship)
        pk_string = ','.join(dep.child.erClass.pk_string)

        # aggregator keyword for SQLite (e.g. AVG)
        aggr_string = dep.aggregator(self.dsiType)
        aggr_attr_name = 'AGG_%s' % dep.parent.name

        sqlAttr = '%s, ROUND(%s(%s)) as %s' % (pk_string, aggr_string,
                                               dep.parent.fullname, aggr_attr_name)
        sqlFrom = ','.join(dep.slotchain_string)

        # The SELECT is grouped by the primary key of the child; the WHERE
        # clause is left out if the slot chain has no constraints.
        sqlSelect = self._buildSelect(sqlAttr, sqlFrom, dep.slotchain_attr_string, pk_string)
        sqlCreate = "CREATE VIEW IF NOT EXISTS %s AS %s" % (dep.name, sqlSelect)

        self.cur.execute(sqlCreate)

    def __repr__(self):
        '''
        String representation for SQLite DI
        '''
        return '%s DataSet Interface connecting to %s' % (self.dsiType,
                                                          self.path.split('/')[-1])