Source code for labbookdb.db.query

#!/usr/bin/python

import argh
import json
import sys

import pandas as pd

import datetime
import os
from copy import deepcopy
from sqlalchemy.orm import sessionmaker, aliased, contains_eager
import sqlalchemy

from .common_classes import *

ALLOWED_CLASSES = {
	"Animal": Animal,
	"AnimalExternalIdentifier": AnimalExternalIdentifier,
	"AnesthesiaProtocol": AnesthesiaProtocol,
	"Arena": Arena,
	"Biopsy": Biopsy,
	"BrainBiopsy": BrainBiopsy,
	"BrainExtractionProtocol": BrainExtractionProtocol,
	"Cage": Cage,
	"CageStay": CageStay,
	"DrinkingMeasurement": DrinkingMeasurement,
	"DNAExtractionProtocol": DNAExtractionProtocol,
	"Evaluation": Evaluation,
	"FluorescentMicroscopyMeasurement": FluorescentMicroscopyMeasurement,
	"FMRIMeasurement": FMRIMeasurement,
	"FMRIScannerSetup": FMRIScannerSetup,
	"ForcedSwimTestMeasurement": ForcedSwimTestMeasurement,
	"Genotype": Genotype,
	"HandlingHabituationProtocol": HandlingHabituationProtocol,
	"HandlingHabituation": HandlingHabituation,
	"Incubation": Incubation,
	"Ingredient": Ingredient,
	"Irregularity": Irregularity,
	"MeasurementUnit": MeasurementUnit,
	"Measurement": Measurement,
	"Observation": Observation,
	"OpenFieldTestMeasurement": OpenFieldTestMeasurement,
	"Operator": Operator,
	"Operation": Operation,
	"OpticFiberImplant": OpticFiberImplant,
	"OpticFiberImplantProtocol": OpticFiberImplantProtocol,
	"OrthogonalStereotacticTarget": OrthogonalStereotacticTarget,
	"Protocol": Protocol,
	"SectioningProtocol": SectioningProtocol,
	"StimulationProtocol": StimulationProtocol,
	"StimulationEvent": StimulationEvent,
	"Substance": Substance,
	"SucrosePreferenceMeasurement": SucrosePreferenceMeasurement,
	"Solution": Solution,
	"Treatment": Treatment,
	"TreatmentProtocol": TreatmentProtocol,
	"Virus": Virus,
	"VirusInjectionProtocol": VirusInjectionProtocol,
	"WeightMeasurement": WeightMeasurement,
	}



@argh.arg('-p', '--db_path', type=str)
@argh.arg('database', default=None, nargs="?")
def animal_info(identifier, database,
	db_path=None,
	):
	"""Print the __str__ attribute of an Animal object queried either by the Animal.id column OR by attributes of its external_ids objects.

	Parameters
	----------
	db_path : string
		Path to a LabbookDB formatted database.
	identifier : int or string
		The identifier of the animal.
	database : string or None, optional
		If specified, gives a constraint on the AnimalExternalIdentifier.database column AND turns the identifier argument into a constraint on the AnimalExternalIdentifier.identifier column. If unspecified, the identifier argument is used as a constraint on the Animal.id column.
	"""
	if not db_path:
		db_path = os.environ["LDB_PATH"]
	if database and identifier:
		# Resolve the external identifier to the canonical Animal.id value.
		session, engine = load_session(db_path)
		sql_query = session.query(Animal)
		sql_query = sql_query.join(Animal.external_ids)\
			.filter(AnimalExternalIdentifier.database == database)\
			.filter(AnimalExternalIdentifier.identifier == identifier)\
			.options(contains_eager('external_ids'))
		identifier = [i for i in sql_query][0].id
		session.close()
		engine.dispose()
	session, engine = load_session(db_path)
	sql_query = session.query(Animal)
	sql_query = sql_query.filter(Animal.id == identifier)
	try:
		animal = [i.__str__() for i in sql_query][0]
	except IndexError:
		if database is None:
			print("No entry was found with {id} in the Animal.id column of the database located at {loc}.".format(id=identifier, loc=db_path))
		else:
			print("No entry was found with {id} in the AnimalExternalIdentifier.identifier column and {db} in the AnimalExternalIdentifier.database column of the database located at {loc}.".format(id=identifier, db=database, loc=db_path))
	else:
		print(animal)
	session.close()
	engine.dispose()

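# Usage sketch (not part of the original module; the identifier values and database name below are
# illustrative placeholders, the path is taken from the docstring example further down):
# `animal_info` can be called directly from Python as well as via the argh CLI.
#
# from labbookdb.db.query import animal_info
# animal_info(1, None, db_path="~/syncdata/meta.db")			# constrain Animal.id directly
# animal_info("6255", "example_db", db_path="~/syncdata/meta.db")	# resolve via AnimalExternalIdentifier
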
def cage_info(db_path, identifier,
	):
	"""Print the __str__ attribute of a Cage object queried by the Cage.id column.

	Parameters
	----------
	db_path : string
		Path to a LabbookDB formatted database.
	identifier : int or string
		The identifier of the cage, used as a constraint on the Cage.id column.
	"""
	session, engine = load_session(db_path)
	sql_query = session.query(Cage)
	sql_query = sql_query.filter(Cage.id == identifier)
	try:
		cage = [i.__str__() for i in sql_query][0]
	except IndexError:
		print("No entry was found with {id} in the Cage.id column of the database located at {loc}.".format(id=identifier, loc=db_path))
	else:
		print(cage)
	session.close()
	engine.dispose()

def load_session(db_path):
	"""Return a new SQLAlchemy session and engine for the SQLite database at `db_path`, creating any missing tables."""
	db_path = "sqlite:///" + os.path.expanduser(db_path)
	engine = sqlalchemy.create_engine(db_path, echo=False)
	Session = sessionmaker(bind=engine)
	session = Session()
	Base.metadata.create_all(engine)
	return session, engine

def commit_and_close(session, engine):
	"""Commit the session (warning on integrity errors, e.g. from double entries) and release the session and engine."""
	try:
		session.commit()
	except sqlalchemy.exc.IntegrityError:
		print("Please make sure this was not a double entry.")
	session.close()
	engine.dispose()

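# Usage sketch (not part of the original module; the path is a placeholder): the typical session
# lifecycle built from the two helpers above.
#
# session, engine = load_session("~/syncdata/meta.db")
# my_animals = session.query(Animal).filter(Animal.id == 1).all()	# read, and/or session.add(...) to write
# commit_and_close(session, engine)
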
def add_all_columns(cols, class_name):
	"""Append one labelled column per mapped attribute of the `class_name` entry of ALLOWED_CLASSES to the `cols` list (in-place)."""
	joinclassobject = ALLOWED_CLASSES[class_name]
	# We need to catch this exception, because for aliased classes a mapper is not directly returned.
	try:
		col_name_cols = sqlalchemy.inspection.inspect(joinclassobject).columns.items()
	except AttributeError:
		col_name_cols = sqlalchemy.inspection.inspect(joinclassobject).mapper.columns.items()
	for col_name, col in col_name_cols:
		column = getattr(joinclassobject, col.key)
		cols.append(column.label("{}_{}".format(class_name, col_name)))

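# Usage sketch (not part of the original module): `add_all_columns` extends a column list in place,
# labelling every column as "<ClassName>_<column>" so the list can be passed to session.query(*cols),
# as done in `get_df` below.
#
# cols = []
# add_all_columns(cols, "Animal")
# # cols now holds e.g. Animal.id labelled "Animal_id", and so on for every mapped column.
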
def get_for_protocolize(db_path, class_name, code):
	"""Return a dataframe containing a specific entry from a given class name, joined with its related tables up to three levels down.
	"""
	session, engine = load_session(db_path)
	cols = []
	joins = []
	classobject = ALLOWED_CLASSES[class_name]
	insp = sqlalchemy.inspection.inspect(classobject)
	for name, col in insp.columns.items():
		cols.append(col.label(name))
	for name, rel in insp.relationships.items():
		alias = aliased(rel.mapper.class_, name=name)
		joins.append((alias, rel.class_attribute))
		for col_name, col in sqlalchemy.inspection.inspect(rel.mapper).columns.items():
			# The id column causes double entries, as it is mapped once on the parent table (related_table_id) and once on the child table (table_id).
			if col.key != "id":
				aliased_col = getattr(alias, col.key)
				cols.append(aliased_col.label("{}_{}".format(name, col_name)))
		sub_insp = sqlalchemy.inspection.inspect(rel.mapper.class_)
		for sub_name, sub_rel in sub_insp.relationships.items():
			if "contains" not in sub_name:
				sub_alias = aliased(sub_rel.mapper.class_, name=name+"_"+sub_name)
				joins.append((sub_alias, sub_rel.class_attribute))
				for sub_col_name, sub_col in sqlalchemy.inspection.inspect(sub_rel.mapper).columns.items():
					# The id column causes double entries, as it is mapped once on the parent table (related_table_id) and once on the child table (table_id).
					if sub_col.key != "id":
						sub_aliased_col = getattr(sub_alias, sub_col.key)
						cols.append(sub_aliased_col.label("{}_{}_{}".format(name, sub_name, sub_col_name)))
	sql_query = session.query(*cols).select_from(classobject)
	for join in joins:
		sql_query = sql_query.outerjoin(*join)
	sql_query = sql_query.filter(classobject.code == code)
	mystring = sql_query.statement
	mydf = pd.read_sql_query(mystring, engine)
	session.close()
	engine.dispose()
	return mydf

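# Usage sketch (not part of the original module; the path and code value are placeholders): fetch
# one protocol entry together with its related records, up to three levels down.
#
# protocol_df = get_for_protocolize("~/syncdata/meta.db", "DNAExtractionProtocol", "my_protocol_code")
# print(protocol_df.columns)
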
def get_df(db_path,
	col_entries=[],
	default_join="inner",
	filters=[],
	join_entries=[],
	join_types=[],
	):
	"""Return a dataframe from a complex query of a LabbookDB-style database

	Arguments
	---------
	db_path : string
		Path to database file.
	col_entries : list
		A list of tuples containing the columns to be queried:

		* 1-tuples indicate all attributes of a class are to be queried
		* 2-tuples indicate that only the attribute specified by the second element, of the class specified by the first element, is to be queried
		* 3-tuples indicate that an aliased class of the type given by the second element is to be created and named according to the first and second elements, separated by an underscore. The attribute given by the third element will be queried; if the third element is empty, all attributes will be queried
	join_entries : list
		A list of tuples specifying the desired join operations:

		* 1-tuples give the join
		* 2-tuples give the class to be joined on the first element, and the explicit relationship (attribute of another class) on the second element

		If any of the elements contains a period, the expression will be evaluated as a class (preceding the period) attribute (after the period).
	filters : list
		A list of lists giving filters for the query. In each sub-list the first and second elements give the class and attribute to be matched. Every following element specifies a possible value for the class attribute (implemented as inclusive disjunction). If the attribute name ends in "date" the function computes datetime objects from the subsequent strings containing numbers separated by commas.
	default_join : {"inner", "outer"}, optional
		Join type applied to any join for which `join_types` does not give an explicit type.
	join_types : list, optional
		A list of join types ("inner" or "outer"), in the same order as `join_entries`. If shorter than `join_entries`, the remaining joins fall back to `default_join`.

	Examples
	--------
	>>> col_entries=[
			("Animal","id"),
			("Treatment",),
			("FMRIMeasurement",),
			("TreatmentProtocol","code"),
			("Cage","id"),
			("Cage","Treatment",""),
			("Cage","TreatmentProtocol","code")
			]
	>>> join_entries=[
			("Animal.treatments",),
			("FMRIMeasurement",),
			("Treatment.protocol",),
			("Animal.cage_stays",),
			("CageStay.cage",),
			("Cage_Treatment","Cage.treatments"),
			("Cage_TreatmentProtocol","Cage_Treatment.protocol")
			]
	>>> filters = [["Cage_Treatment","start_date","2016,4,25,19,30"]]
	>>> reference_df = get_df("~/syncdata/meta.db", col_entries=col_entries, join_entries=join_entries, filters=filters)
	"""
	session, engine = load_session(db_path)

	cols = []
	for col_entry in col_entries:
		if len(col_entry) == 1:
			add_all_columns(cols, col_entry[0])
		if len(col_entry) == 2:
			cols.append(getattr(ALLOWED_CLASSES[col_entry[0]], col_entry[1]).label("{}_{}".format(*col_entry)))
		if len(col_entry) == 3:
			aliased_class = aliased(ALLOWED_CLASSES[col_entry[1]])
			ALLOWED_CLASSES[col_entry[0]+"_"+col_entry[1]] = aliased_class
			if col_entry[2] == "":
				add_all_columns(cols, col_entry[0]+"_"+col_entry[1])
			else:
				cols.append(getattr(aliased_class, col_entry[2]).label("{}_{}_{}".format(*col_entry)))

	joins = []
	for join_entry in join_entries:
		join_parameters = []
		for join_entry_substring in join_entry:
			if "." in join_entry_substring:
				class_name, table_name = join_entry_substring.split(".") # if this unpacks >2 values, the user-specified strings are malformed
				join_parameters.append(getattr(ALLOWED_CLASSES[class_name], table_name))
			else:
				join_parameters.append(ALLOWED_CLASSES[join_entry_substring])
		joins.append(join_parameters)

	sql_query = session.query(*cols)

	# We need to make sure we don't edit the join_types variable passed to this function.
	join_types = deepcopy(join_types)
	while len(join_types) < len(joins):
		join_types.append(default_join)
	for ix, join in enumerate(joins):
		if join_types[ix] == "inner":
			sql_query = sql_query.join(*join)
		elif join_types[ix] == "outer":
			sql_query = sql_query.outerjoin(*join)

	for sub_filter in filters:
		if sub_filter:
			if sub_filter[1][-4:] == "date" and isinstance(sub_filter[2], str):
				for ix, i in enumerate(sub_filter[2:]):
					sub_filter[2+ix] = datetime.datetime(*[int(a) for a in i.split(",")])
			if len(sub_filter) == 3:
				sql_query = sql_query.filter(getattr(ALLOWED_CLASSES[sub_filter[0]], sub_filter[1]) == sub_filter[2])
			else:
				sql_query = sql_query.filter(sqlalchemy.or_(getattr(ALLOWED_CLASSES[sub_filter[0]], sub_filter[1]) == v for v in sub_filter[2:]))

	mystring = sql_query.statement
	df = pd.read_sql_query(mystring, engine)
	session.close()
	engine.dispose()
	return df

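# Usage sketch (not part of the original module): the same classes and relationships as in the
# docstring example above, but with explicit join types — unmatched rows from the outer-joined
# table are kept (with NaN columns) rather than dropped. The path is a placeholder.
#
# df = get_df("~/syncdata/meta.db",
# 	col_entries=[("Animal","id"), ("Cage","id")],
# 	join_entries=[("Animal.cage_stays",), ("CageStay.cage",)],
# 	join_types=["inner","outer"],
# 	)
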
# THIS IS KEPT TO REMEMBER WHENCE THE ABOVE AWKWARD ROUTINES CAME AND HOW THE CODE IS SUPPOSED TO LOOK IF TYPED OUT
# CageTreatment = aliased(Treatment)
# CageTreatmentProtocol = aliased(TreatmentProtocol)
# sql_query = session.query(
# 	Animal.id.label("Animal_id"),
# 	TreatmentProtocol.code.label("TreatmentProtocol_code"),
# 	Cage.id.label("Cage_id"),
# 	CageTreatment.id.label("Cage_Treatment_id"),
# 	CageTreatmentProtocol.code.label("Cage_TreatmentProtocol_code"),
# 	)\
# 	.join(Animal.treatments)\
# 	.join(Treatment.protocol)\
# 	.join(Animal.cage_stays)\
# 	.join(CageStay.cage)\
# 	.join(CageTreatment, Cage.treatments)\
# 	.join(CageTreatmentProtocol, CageTreatment.protocol)\
# 	.filter(Animal.id == 43)
# mystring = sql_query.statement
# reference_df = pd.read_sql_query(mystring, engine)
# print(reference_df.columns)
# print(reference_df)