from datetime import datetime from turbogears.database import PackageHub from sqlobject import * from sqlobject.sqlbuilder import JOIN, LEFTJOINConditional, NATURALJOIN, Select, EXISTS from turbogears import identity import turbogears as tg from felicity.hacks import safe_str hub = PackageHub("felicity") __connection__ = hub #--- #custom imports import logging log = logging.getLogger("felicity.model") #-- #sqlobject has problems translating True and "1==1" #the settings below work with both postgresql and sqlite trueClause = AND(True, True) falseClause = AND(False, False) #=========== #tg generated, minimal modifications # identity models. class Visit(SQLObject): class sqlmeta: table = "visit" visit_key = StringCol(length=40, alternateID=True, alternateMethodName="by_visit_key") created = DateTimeCol(default=datetime.now) expiry = DateTimeCol() def lookup_visit(cls, visit_key): try: return cls.by_visit_key(visit_key) except SQLObjectNotFound: return None lookup_visit = classmethod(lookup_visit) class VisitIdentity(SQLObject): visit_key = StringCol(length=40, alternateID=True, alternateMethodName="by_visit_key") user_id = IntCol() class Group(SQLObject): """ An ultra-simple group definition. """ # names like "Group", "Order" and "User" are reserved words in SQL # so we set the name to something safe for SQL class sqlmeta: table = "tg_group" group_name = UnicodeCol(length=16, alternateID=True, alternateMethodName="by_group_name") display_name = UnicodeCol(length=255) created = DateTimeCol(default=datetime.now) # collection of all users belonging to this group users = RelatedJoin("User", intermediateTable="user_group", joinColumn="group_id", otherColumn="user_id") # collection of all permissions for this group permissions = RelatedJoin("Permission", joinColumn="group_id", intermediateTable="group_permission", otherColumn="permission_id") class User(SQLObject): """ Reasonably basic User definition. Email_address need not be unique. """ # names like "Group", "Order" and "User" are reserved words in SQL # so we set the name to something safe for SQL class sqlmeta: table = "tg_user" user_name = UnicodeCol(length=16, alternateID=True, alternateMethodName="by_user_name") #nb may not be unique email_address = UnicodeCol(length=255, alternateMethodName="by_email_address") display_name = UnicodeCol(length=255) password = UnicodeCol(length=40) campus_address = UnicodeCol(default=None) telephone = UnicodeCol(default=None) created = DateTimeCol(default=datetime.now) # groups this user belongs to groups = RelatedJoin("Group", intermediateTable="user_group", joinColumn="user_id", otherColumn="group_id") def _get_permissions(self): perms = set() for g in self.groups: perms = perms | set(g.permissions) return perms def _set_password(self, cleartext_password): "Runs cleartext_password through the hash algorithm before saving." hash = identity.encrypt_password(cleartext_password) self._SO_set_password(hash) def set_password_raw(self, password): "Saves the password as-is to the database." self._SO_set_password(password) class Permission(SQLObject): permission_name = UnicodeCol(length=16, alternateID=True, alternateMethodName="by_permission_name") description = UnicodeCol(length=255) groups = RelatedJoin("Group", intermediateTable="group_permission", joinColumn="permission_id", otherColumn="group_id") #=========== #custom bits #--------------- # functions def _do_query(expr): """execute a SQLObject Select statement""" conn = User._connection sql = conn.sqlrepr(expr) return conn.queryAll(sql) def to_key(obj): """Attempts to convert obj to whatever is being used for keys in database. Assumes same type is being used for all tables/classes.""" return int(obj) def _get_id(obj): """converts obj to an id if it is not already an id. Allows methods to be called with either an object or its id.""" if hasattr(obj,"id"): return obj.id return obj def _remove_non_persistent_items(obj, kw): """param obj is a SQLObject (can be class or instance). param kw is a dictionary. Function removes items from kw where the item is not a persistent attribute of obj.""" to_remove = [] for item in kw: if not hasattr(obj.q, item): itemID = "%sID" % item if not hasattr(obj.q, itemID): to_remove.append(item) for item in to_remove: del kw[item] return kw #--------------- # exceptions class FelicityException(Exception): """base for all application errors where it is safe to catch and dispaly details to users""" pass class ModelError(FelicityException): "Some error occured in an action on a model." class UpdateError(FelicityException): "Some error occurred while updating a model object." class OutOfYearsException(FelicityException): "Attempted to move term back or forwards but there are no more years" class LogicError(FelicityException): "Error occured in logic module" #----------- #abstract base class for objects -- defines common class methods class FelicitySQLObject(SQLObject): @classmethod def from_id(cls, note_id): """attempts to get the note by id, returns None if not found. Usable providing this object has an id attribute""" try: return cls.get(note_id) except (SQLObjectNotFound, ValueError): return None @classmethod def from_code(cls, code): """returns object with code or None if not found. Usable providing this object has a code attribute.""" try: return cls.byCode(code) except SQLObjectNotFound: return None #knows its own url _url_base = '/error/' #override def _url(self, raw=False): """all FelicitySQLObjects know their own url, if they have one.""" if hasattr(self.q, 'code'): raw_url = self._url_base + safe_str(self.code) else: raw_url = self._url_base + safe_str(self.id) if raw: return raw_url return tg.url(raw_url) url = property(_url) def _url_raw(self): return self._url(raw=True) url_raw = property(_url_raw) #----------- # model classes class Student(FelicitySQLObject): firstname = UnicodeCol() lastname = UnicodeCol() email_address = UnicodeCol(default=None) #code of student in some external system, e.g. SITS code code = UnicodeCol(alternateID=True, unique=True, notNone=True) #code of student in another external system, Warick IT code its_code = UnicodeCol(default=None) user = ForeignKey('User', default=None) notes = MultipleJoin('StudentNote') created = DateTimeCol(default=datetime.now) def _set_email_address(self, email_address): if self.user is not None: self.user.email_address=email_address self._SO_set_email_address(email_address) @classmethod def from_user(cls, user): "returns the student corresponding to the User parameter" r = list(cls.select(cls.q.userID==user.id)) if len(r) != 1: return None return r[0] @staticmethod def search(lastname_or_code_fragment): """returns a list of students where the code or name contains the parameter""" lastname_or_code_fragment = safe_str(lastname_or_code_fragment) results = Student.select(OR(func.upper(Student.q.lastname).contains(lastname_or_code_fragment.upper()), func.upper(Student.q.code).contains(lastname_or_code_fragment.upper()) ), orderBy=Student.q.lastname) return list(results) def _is_current_property(self): """returns True if student is current, False otherwise. Being current means either has current StudentDegree or has current StudentModule. Warning: slow""" #first try to fill using sits q1 = StudentDegree.all(student=self, only_current=True, query_only=True) if q1.count() > 0: return True q2 = StudentModule.all(student=self, only_current=True, query_only=True) if q2.count() > 0: return True return False is_current = property(_is_current_property) def _has_submitted_reports(self): clause = trueClause clause = AND(clause, Report.q.studentID==self.id) clause = AND(clause, Report.q.submitted!=None) return (Report.select(clause).count())>0 has_submitted_reports = property(_has_submitted_reports) def degree_codes(self, year_id=None): """returns a comma-separated list of codes of current degrees for the specified year followed by SITS level code in brackets. If no year specified, works for the current year. NB: assumes that degree codes are recorded in StudentDegree data under route_code""" if year_id is None: #work with the current year year_id = Year.current_year().id lines = StudentDegree.get_current(student=self, year_id=year_id) degree_codes=",".join([line.route_code+"("+line.level_code+")" for line in lines]) return degree_codes @classmethod def find_or_create(cls,code,firstname,lastname): "returns a student with the code or creates and returns a new one if non exists. Returns None if an error occurs. Also returns True if object is new, False if not." try: return Student.byCode(code), False except SQLObjectNotFound: try: return Student(code=code,firstname=firstname,lastname=lastname), True except Exception, e: log.error("Exception in attempting to create Student: %s" % str(e)) except Exception, e: log.error("Undexpected exception when searching for a Student: %s" % str(e)) return None, None @staticmethod def all(for_paginate=False, dept=None, only_current=False): """returns list of all students sorted by lastname. Limit: when param only_current==True, relies on StudentDegree so will not return students that don't feature there. I.e. 'only_current' means has a current degree assignment. param dept relies on StudentDegree and DeptDegree -- it means get all students on a degree related by DeptDegree to specified dept""" if dept is None and only_current==False: q = Student.select() else: #either only_current or a specific dept is requested student_degrees = StudentDegree.all(dept=dept, only_current=only_current) student_ids = [str(sd.student.id) for sd in student_degrees] if student_ids is not None and len(student_ids)>0: q = Student.select(IN(Student.q.id,student_ids)) else: #no students found in dept q=Student.select(falseClause) q = q.orderBy((Student.q.lastname,Student.q.firstname)) if for_paginate is True: return q else: return list(q) @staticmethod def all_userless(only_current=True): """return a list of all CURRENT (unles only_current==False) students with no user account for reporttool. Current means 'has current degree'""" students = Student.all(only_current=only_current) return [s for s in students if s.user == None ] @staticmethod def all_userless_by_module(only_current=True): """returns a list of all CURRENT (unles only_current==False) students with no user account for reporttool. Current means 'has current module assignment'""" lines = StudentModule.all(only_current=only_current) students = set() for l in lines: students.add(l.student) #filter to only those without user accounts return [s for s in students if s.user is None] @staticmethod def which_exist(student_codes): "returns list of students which have codes in the list provided" if student_codes is None or len(student_codes ) < 1: #causes syntax error in query below return [] return list(Student.select(IN(Student.q.code, student_codes))) #-------- def module_group(self,module,year,term): "returns the module_group this student is in for the module/term/year. Assumes max 1" groups=module.groups(year=year,term=term) for group in groups: if self in group.students: return group #no group found return None def _personal_tutors(self): return PersonalTutor.find(student=self) personal_tutors = property(_personal_tutors) def _get_from_student_degree(self,attr_name): """method used to create a properties that are actually properties of StudentDegree""" sprs = self._get_student_degrees() if sprs is None or len(sprs) < 1: log.debug("No %s found for student %i because no matching student_degrees." % (attr_name,self.id)) return None findings = {} #keys are dates so we can get the attr from the most recent line for spr in sprs: value = getattr(spr,attr_name) if value is not None: if spr.updated is not None: findings[spr.updated]=value else: findings[spr.created]=value if len(findings) >0: #return attr from the most recently updated or created StudentDegree return findings[max(findings.keys())] else: log.debug("No %s found for student %i despite %i matching student_degrees." % (attr_name,self.id,len(sprs))) return None def _get_student_degrees(self): sprs = list(StudentDegree.select(StudentDegree.q.studentID==self.id)) return sprs student_degrees = property(_get_student_degrees, doc='entries for this student from the sits spr table') def _has_student_degrees(self): return StudentDegree.select(StudentDegree.q.studentID==self.id).count()>0 has_student_degrees = property(_has_student_degrees, doc='True if there are entries for this student from the sits spr table') def _has_exam_marks(self): return ExamMark.select(ExamMark.q.student_fID==self.id).count()>0 has_exam_marks = property(_has_exam_marks, doc="True if there are exam marks for this student in the Exam Marks table") def exam_marks(self): q = ExamMark.select(ExamMark.q.student_fID==self.id) return list(q) def notes(self): "returns StudentNotes on this student" return list(StudentNote.select(StudentNote.q.studentID==self.id,orderBy=StudentNote.q.date)) def __str__(self): return self.firstname+" "+self.lastname _url_base = '/student/' class StudentNote(FelicitySQLObject): """Note on a student, may be entered on the web or sent by email. If sent by email, sender should manually confirm that the email has been correctly recognised.""" student = ForeignKey('Student',notNone=True) staff = ForeignKey('Staff',default=None) #if staff is None, note was filed by the student def _author(self): """returns either staff or student depending who the author is""" if self.staff is not None: return self.staff else: return self.student author = property(_author) confirmed = DateTimeCol(dbName="confirmed_h",default=None) is_confirmed = property(lambda self: self.confirmed is not None) sender = UnicodeCol(default=None) recepient = UnicodeCol(default=None) subject = UnicodeCol(default=None) body = UnicodeCol(default=None) date = DateTimeCol(default=datetime.now) parent_note = ForeignKey('StudentNote', default=None) def _child_notes(self): return list(StudentNote.select(StudentNote.q.parent_noteID==self.id)) child_notes = property(_child_notes) def confirm(self): self.confirmed = datetime.now() class FileItRecord(FelicitySQLObject): """Records operation of the FileIt module--which emails have been processed etc. Two types of record: where email has become StudentNote, id of StudentNote is recorded. Where email had error, error_msg is recorded.""" uid = UnicodeCol(notNone=True) #stores uid of message on server (if any) uid_account = UnicodeCol(notNone=True) #stores email account this msg obtained from (if any) student_note = ForeignKey('StudentNote',default=None) error_msg = UnicodeCol(default=None) @staticmethod def get_uids(uid_account): recs = list(FileItRecord.select(FileItRecord.q.uid_account==uid_account)) uids = [rec.uid for rec in recs] return uids class Staff(FelicitySQLObject): """represents a member of staff.""" firstname = UnicodeCol() lastname = UnicodeCol() #code is used to link object to records in external data, e.g. SITS #useful for imports because SITS uses code for personal tutor etc, e.g. P0123821 code = UnicodeCol(alternateID=True) #its_code is code of object in another external system, used to link to ITS, e.g. pysdag its_code= UnicodeCol(default=None) created = DateTimeCol(default=datetime.now) user = ForeignKey('User', default=None) dept = ForeignKey('Dept', notNone=True) email_address = UnicodeCol(default=None) def _set_email_address(self, email_address): if self.user is not None: self.user.email_address=email_address self._SO_set_email_address(email_address) start_date = DateCol(default=lambda:datetime.date(datetime.now())) end_date = DateCol(default=None) def _has_login_account(self): return self.user != None has_login_account = property(_has_login_account) def _user_id(self): if self._has_login_account(): return self.user.id return "" user_id = property(_user_id) @classmethod def all(cls, dept=None, only_current=True, lastname=None): """return list of staff from specified dept (or all staff if dept is none). Ignores start_date in calculating whether staff are current. Sorts by lastname""" today = datetime.date(datetime.today()) clause=trueClause if only_current: clause = AND(clause, OR(Staff.q.end_date==None, Staff.q.end_date >= today)) if dept is not None: clause = AND(clause, Staff.q.deptID==dept.id) if lastname is not None: clause = AND(clause, Staff.q.lastname==lastname) return list(Staff.select(clause).orderBy((Staff.q.lastname,Staff.q.firstname))) @classmethod def from_its_code(cls, its_code): "returns staff with its_code or None if not found" try: return cls.select(cls.q.its_code==its_code)[0] except IndexError: return None @classmethod def from_lastname(cls, lastname, only_current=False): """returns none unless exactly one staff has lastname""" results = cls.all(lastname=lastname, only_current=only_current) if results is not None and len(results)==1: return results[0] return None @classmethod def from_user(cls, user): "returns the staff corresponding to the User parameter, or None if current User not related to any staff." q = Staff.select(Staff.q.userID==user.id) if q.count() != 1: return None return q[0] def get_name(self): return str(self.firstname)+" "+str(self.lastname) _url_base = '/staff/code/' def __str__(self): return self.firstname+" "+self.lastname class Module(FelicitySQLObject): name = UnicodeCol(notNone=True) #sits calls this 'Full name' #code is used to link to omr and sits data. SITS calls this 'module_code' code = UnicodeCol(alternateID=True,unique=True,notNone=True) _dept_key = ForeignKey('Dept',default=None,dbName='dept_id') #used by dept fake property created = DateTimeCol(default=datetime.now) updated = DateTimeCol(default=datetime.now,notNone=True,dbName='updated_') #controlls whether students can join or switch module groups for this module allow_module_group_change = BoolCol(default=False, notNone=True) #code of department that owns this module (might be best to use foreign key to dept) department_code = UnicodeCol(default=None,) #attempt to work out dept when a module is created def _set_code(self,module_code): self._SO_set_code(module_code) #attempt to set _dept_key if self._dept_key is None: dept = Module._dept_from_module_code(module_code) if dept is not None: self._dept_key = dept def _set_department_code(self, department_code): self._SO_set_department_code(department_code) #attempt to set _dept_key if department_code is not None and self._dept_key is None: dept_q = Dept.select(Dept.q.code==department_code) if dept_q.count() == 1: self._dept_key = list(dept_q)[0] @staticmethod def _dept_from_module_code(code): """attempts to determine a department from the module code. Currently does this by taking initial letters of module code then searching for a dept whose code is those letters""" n = 0 dept_code ="" # while n 0: dd[0].destroySelf() class Programme(FelicitySQLObject): """A programme is like a degree but may include several degree codes, e.g. PPE. ReportTool programmes simply as clusters of degrees to simplify admin tasks""" code = UnicodeCol(alternateID=True,notNone=True) name = UnicodeCol(alternateID=True,notNone=True) created = DateTimeCol(default=datetime.now) def all_degrees(self): """returns all degress in programme""" degree_ids = [x.degreeID for x in ProgrammeDegree.select(ProgrammeDegree.q.programmeID==self.id)] if len(degree_ids)==0: return [] return list(Degree.select(IN(Degree.q.id, degree_ids))) @staticmethod def create(code, name): return Programme(code=code, name=name) def __str__(self): return self.code +" "+self.name class DeptProgramme(FelicitySQLObject): """specifies that a particular programme is in a particular dept""" programme = ForeignKey('Programme', notNone=True) dept = ForeignKey('Dept', notNone=True) dept_degree_index = DatabaseIndex('programme', 'dept', unique=True) @staticmethod def create(dept, programme): """raises an exception if not possible -- e.g. because would be a duplicate""" return DeptProgramme(dept=dept, programme=programme) @staticmethod def destroy(dept, programme): """remove the association -- silently do nothing if none existed""" dp = DeptProgramme.select(AND(DeptProgramme.q.programmeID==programme.id, DeptProgramme.q.deptID==dept.id)) if dp.count() > 0: dp[0].destroySelf() class ProgrammeDegree(FelicitySQLObject): """specifies that a particular degree is part of a particular programme""" programme = ForeignKey('Programme', notNone=True) degree = ForeignKey('Degree', notNone=True) programme_degree_index = DatabaseIndex('degree', 'programme', unique=True) @staticmethod def create(programme, degree): """raises an exception if not possible -- e.g. because would be a duplicate""" return ProgrammeDegree(programme=programme, degree=degree) @staticmethod def destroy(programme, degree): """remove the association -- silently do nothing if none existed""" pd = ProgrammeDegree.select(AND(ProgrammeDegree.q.degreeID==degree.id, ProgrammeDegree.q.programmeID==programme.id)) if pd.count() > 0: pd[0].destroySelf() class StudentModule(FelicitySQLObject): class sqlmeta: table ='student_module' idName ='id' student = ForeignKey('Student',notNone=True) #this is the code of the student in some external system, e.g. Warwick's OMR student_code = UnicodeCol(notNone=True) #like student_code but also contains info about degree (can use for linking to StudentDegree) spr_code = UnicodeCol(default=None) firstname = UnicodeCol() lastname = UnicodeCol() #fubar property -- used for recovery in case things go wrong, may also be used # when displaying to save a join year_f = UnicodeCol() year = ForeignKey('Year',notNone=True) module = ForeignKey('Module',notNone=True) module_code = property(lambda self:self.module.code) module_name = property(lambda self:self.module.name) module_code_f = UnicodeCol(default=None) #the dataset this record was uploaded as part of. If linked to a dataset, # it can be deleted when the same dataset is next uploaded without this record in it dataset = ForeignKey('UploadDataSet', default=None) #maybe_withdrawn is true if there's reason to think the student may not be taking this module after all maybe_withdrawn = BoolCol(notNone=True, default=False) def _set_module(self,m): self._SO_set_module(m) if self.module_code_f is None: try: self.module_code_f=m.code #set recovery property except Exception, e: log.error("exception setting module_code_f: %s" % str(e)) cats = FloatCol() assessment_code = UnicodeCol() year_of_study = UnicodeCol(notNone=True) degree_code = UnicodeCol(notNone=True) def _degree(self): code = self.degree_code if code is None: return None return Degree.from_code(code) degree = property(_degree) status = UnicodeCol(default=None) created = DateTimeCol(default=datetime.now) updated = DateTimeCol(default=datetime.now, notNone=True, dbName='updated_') def may_delete(self): """returns True or False. Call this before removing an StudentModule""" #check not linked to any seminar groups #TODO use EXISTS nof_seminar_groups = ModuleGroupStudent.select(ModuleGroupStudent.q.student_moduleID==self.id).count() if nof_seminar_groups > 0: return False #check not linked to any reports #TODO for efficiency change to "select exists (select 1 from report where student_module_id=10);" nof_reports = Report.select(Report.q.student_moduleID==self.id).count() if nof_reports > 0: return False return True @classmethod def all(cls, module=None, student=None, year_id=None, spr_code=None, only_current=False, query_only=False, maybe_withdrawn=None, dept=None, clause=trueClause, modules=None): "returns all StudentModule objects." if year_id is None: year_id = Year.current_year().id if only_current: clause = AND(clause, StudentModule.q.yearID==year_id) if module is not None: clause = AND(clause, StudentModule.q.moduleID==module.id) if student is not None: clause = AND(clause, StudentModule.q.studentID==student.id) if spr_code is not None: clause = AND(clause, StudentModule.q.spr_code==spr_code) if dept is not None: #identify StudentModule in Dept by selecting students on modules associated with this Dept modules = Module.all(dept=dept) module_ids = [module.id for module in modules] if module_ids is None or len(module_ids)<1: #there are no modules in dept so there are no students taking modules in this dept #so query returns nothing clause = AND(clause, falseClause) else: clause = AND(clause, IN(StudentModule.q.moduleID, module_ids)) if maybe_withdrawn is not None: clause = AND(clause, StudentModule.q.maybe_withdrawn==maybe_withdrawn) if query_only: return StudentModule.select(clause) else: return list(StudentModule.select(clause)) @staticmethod def modules_with_students(dept, year=None, result_as_dict=False ): """returns list of modules in dept that have students in the current year (or the specified year). If result_as_dict, return a dictionary with module_id as key and nof students as value""" clause = trueClause if year is None: year_id = Year.current_year().id else: year_id = year.id #limit to modules in department modules = Module.all(dept=dept) module_ids = [m.id for m in modules] #select module_id, count(student_id) from student_module group by module_id q = """select module_id, count(student_id) from %(tableName)s where year_id = %(year_id)s and %(in_clause)s group by module_id;""" in_clause = IN(StudentModule.q.moduleID, module_ids) q = q % dict(tableName=StudentModule.q.tableName, year_id=year_id, in_clause=in_clause) results = StudentModule._connection.queryAll(q) if not result_as_dict: #return list of modules return [Module.get(row[0]) for row in results] #return result as dictionary d = {} for row in results: d[row[0]] = row[1] return d @staticmethod def all_maybe_withdrawn(dept, year): q = StudentModule.all(dept=dept, year_id=year.id, maybe_withdrawn=True, query_only=True) return q.orderBy([StudentModule.q.lastname, StudentModule.q.firstname]) @staticmethod def mark_maybe_withdrawn(student_degrees, year): spr_codes = [str(s.spr_code) for s in student_degrees] #str becuase no unicode allowed q = StudentModule.select(AND(StudentModule.q.yearID==year.id, AND(StudentModule.q.maybe_withdrawn!=True, IN(StudentModule.q.spr_code, spr_codes)))) for line in q: q.maybe_withdrawn=True @classmethod def update_or_create(cls, year, student, module, degree_code, **kw): """either creates new StudentModule or updates and existing one. matches if year, student, degree_code and module match. Silently ignores any values in kw that are not persistent attributes. Also returns True if object is new, False if it is old.""" #clean kw kw = _remove_non_persistent_items(cls, kw) try: clause = AND(StudentModule.q.studentID==student.id, StudentModule.q.degree_code==degree_code, StudentModule.q.moduleID==module.id, StudentModule.q.yearID==year.id) if 'spr_code' in kw: spr_code = kw['spr_code'] clause = AND(clause, StudentModule.q.spr_code==spr_code) student_module = list(StudentModule.select(clause))[0] except IndexError: #nothing found--create new one #first fill in any essential values for creation if 'student_code' not in kw: kw['student_code'] = student.code if 'firstname' not in kw: kw['firstname'] = student.firstname if 'lastname' not in kw: kw['lastname'] = student.lastname if 'year_f' not in kw: kw['year_f'] = year.name if 'cats' not in kw: kw['cats'] = 999 for name in ('assessment_code', 'year_of_study'): if name not in kw: kw[name] = "unknown" return StudentModule(year=year, student=student, module=module, degree_code=degree_code, **kw), True #omr line exists, update it student_module.update(degree_code=degree_code, **kw) #don't need kws that can't be updated return student_module, False def update(self, **kw): """updates a record. silently ignores kw that are not persistent attributes. """ kw = _remove_non_persistent_items(self, kw) unmodifiable_attrs = ('spr_code','student','module','degree', 'year') for attr_name in unmodifiable_attrs: if kw.has_key(attr_name): self_value = getattr(self,attr_name) called_value = kw[attr_name] if self_value != called_value: msg = "StudentModule.update called with wrong %s (called with '%s', should have been '%s')." % (attr_name, called_value, self_value) log.debug(msg) raise ModelError, msg is_updated = False for attr_name in kw.keys(): if attr_name in unmodifiable_attrs: continue #don't update these old_value = getattr(self, attr_name) new_value = kw[attr_name] if old_value != new_value: is_updated = True setattr(self,attr_name,new_value) if is_updated: self.updated=datetime.now() @staticmethod def modules_for_student(student, student_degree=None, only_current=False): """returns list of pairs of year names and StudentModules for specified student, ordered by year and then module code. If student_degree is specified, only returns modules associated with the sits_spr_degree code """ if student_degree==None: lines = StudentModule.all(student=student, only_current=only_current) else: lines = StudentModule.all(student=student, only_current=only_current, spr_code = student_degree.spr_code) years = [] for line in lines: if line.year not in years: years.append(line.year) years.sort(key=lambda x: x.ordering, reverse=True) results = [] for year in years: q = StudentModule.select(AND(StudentModule.q.studentID==student.id, StudentModule.q.yearID==year.id), orderBy=StudentModule.q.module_code_f) results += [(year.name, list(q))] return results #the following are used to make functions for setting default values. def _next_ordering(obj): """where param obj has an ordering attribute, this will get the least value of ordering not yet used.""" conn = obj._connection table_name = obj.q.tableName max_ordering_tuple = conn.queryAll("SELECT MAX(ordering) FROM %s ;" % table_name) result = max_ordering_tuple[0][0] if result is None: return 1 else: return result+1 def year_next_ordering(): return _next_ordering(Year) def term_next_ordering(): return _next_ordering(Term) def component_next_ordering(): return _next_ordering(Component) def _remove_duplicates(list): return [x for x in set(list)] def _current(sql_cls): result = list(sql_cls.select(sql_cls.q.current==True)) if len(result) != 1: return None return result[0] def _current_year_id(): return _current(Year).id def _current_term_id(): return _current(Term).id class Year(FelicitySQLObject): name = UnicodeCol(alternateID=True,unique=True,notNone=True) start_date = DateCol(notNone=True) end_date=DateCol(default=None) ordering = IntCol(default=year_next_ordering,unique=True) current = BoolCol(default=False) @classmethod def from_name(cls,name): "returns a year with name name or None if none found" try: return Year.byName(name) except SQLObjectNotFound, e: return None @staticmethod def current_year(): "returns the current academic year" return _current(Year) @staticmethod def all_exist(names): "returns true if there is a year for every name in names" for name in names: try: Year.byName(name) except SQLObjectNotFound: return False return True @staticmethod def all(): "returns all years, ordered appropriately" return list(Year.select(orderBy=Year.q.ordering)) def previous(self): "returns previous year, or None" q = Year.select(Year.q.ordering < self.ordering, orderBy=Year.q.ordering) nof = q.count() if nof == 0: return None return q[nof-1] def next(self): "returns next year, or None" q = Year.select(Year.q.ordering > self.ordering, orderBy=Year.q.ordering) nof = q.count() if nof == 0: return None return q[0] def __str__(self): return self.name class Term(FelicitySQLObject): name = UnicodeCol(alternateID=True,unique=True,notNone=True) ordering = IntCol(default=term_next_ordering,unique=True) current = BoolCol(default=False) @classmethod def from_name(cls,name): "returns a term with name name or None if none found" try: return Term.byName(name) except SQLObjectNotFound, e: return None @staticmethod def current_term(): return _current(Term) @staticmethod def last_term(): conn = Term._connection max_ordering_tuple = conn.queryAll("SELECT MAX(ordering) FROM %s ;" % Term.q.tableName) max_ordering = max_ordering_tuple[0][0] return Term.select(Term.q.ordering == max_ordering)[0] @staticmethod def first_term(): conn = Term._connection min_ordering_tuple = conn.queryAll("SELECT MIN(ordering) FROM %s ;" % Term.q.tableName) min_ordering = min_ordering_tuple[0][0] return Term.select(Term.q.ordering == min_ordering)[0] @staticmethod def previous_year_term(year=None, term=None): """returns the year and term prior to the specified one""" if year is None: year = Year.current_year() if term is None: term = Term.current_term() previous_terms = Term.select(Term.q.ordering < term.ordering) if previous_terms.count()>0: previous_term = previous_terms[-1] previous_year = year else: #there are no previous terms, return previous year and last term previous_year = year.previous() previous_term = Term.last_term() return (previous_year, previous_term) @staticmethod def all(): "returns all currently in use terms, ordered appropriately" return list(Term.select(orderBy=Term.q.ordering)) def all_so_far(self): "returns all the terms upto and including self" return list(Term.select(Term.q.ordering<=self.ordering, orderBy=Term.q.ordering)) def __str__(self): return self.name+" Term" class ModuleLeader(FelicitySQLObject): """assigns a staff to lead a module for a term, usually module's lecturer. Although this is not enforced, software assumes max 1 module leader per module, term and year.""" staff = ForeignKey('Staff', notNone=True) module = ForeignKey('Module', notNone=True) year = ForeignKey('Year',default=_current_year_id, notNone=True) term = ForeignKey('Term',default=_current_term_id, notNone=True) created = DateTimeCol(default=datetime.now) @staticmethod def update_or_create(staff_id,module_id,year_id,term_id): "updates staff if there's already a record for this module/year/term. Will delete if staff_id is None and the record exists. If staff_id is None and the record does not exist, will return None without doing anything." current = list(ModuleLeader.select(AND(ModuleLeader.q.moduleID==module_id, ModuleLeader.q.yearID==year_id, ModuleLeader.q.termID==term_id))) if len(current)==0 and staff_id is None: return None #treat as request to delete record if any exists, and none exists if len(current)==0: return ModuleLeader(staff=staff_id, module=module_id, term=term_id, year=year_id) if len(current)==1: ml = current[0] if staff_id is None: ml.destroySelf() return None else: ml.staff=staff_id return ml raise ModelError, "Multiple ModuleLeaders assigned for the same term/year/module combination, %s/%s/%s" % (term_id,year_id,module_id) @staticmethod def modules_for_staff(staff, year, term): """return a list of all modules staff is teaching in year and term""" q = ModuleLeader.select(AND(ModuleLeader.q.staffID==staff.id, AND(ModuleLeader.q.yearID==year.id, ModuleLeader.q.termID==term.id))) return [ml.module for ml in list(q)] class PaymentType(FelicitySQLObject): """For calculating payment for making, each ModuleAssignment is assigned a PaymentType that then gets converted to cash when the claim is made. """ name = UnicodeCol(notNone=True) amount = FloatCol(notNone=True) class ModuleAssignment(FelicitySQLObject): """Lecturers create assignments for their modules. Their seminar group leaders can then report student's progress on the assignment. """ name = UnicodeCol(notNone=True) staff = ForeignKey('Staff', notNone=True) module = ForeignKey('Module', notNone=True) year = ForeignKey('Year', default=_current_year_id, notNone=True) term = ForeignKey('Term', default=_current_term_id, notNone=True) payment_type = ForeignKey('PaymentType', default=None) #-- indexes module_term_year_index = DatabaseIndex('module','term','year') #-- methods @staticmethod def create(name, staff, module, year, term, payment_type=None, **kw): q = ModuleAssignment.select(AND(ModuleAssignment.q.moduleID==module.id, AND(ModuleAssignment.q.name==name, AND(ModuleAssignment.q.termID==term.id, ModuleAssignment.q.yearID==year.id)))) if q.count() > 0: raise ModelError, "attempt to create duplicate module assignment" return ModuleAssignment(name=name, staff=staff, module=module, year=year, term=term, payment_type=payment_type, **kw) @staticmethod def assignments_for_module(module, term, year): q = ModuleAssignment.select(AND(ModuleAssignment.q.moduleID==module.id, AND(ModuleAssignment.q.termID==term.id, ModuleAssignment.q.yearID==year.id))) return list(q) class ModuleAssignmentStudent(FelicitySQLObject): """records the fact that a student has completed a ModuleAssignment, (or not completed it)""" #header fields module_assignment = ForeignKey('ModuleAssignment', notNone=True) student = ForeignKey('Student', notNone=True) staff_marker = ForeignKey('Staff', notNone=True) module_group = ForeignKey('ModuleGroup', notNone=True) module_f = ForeignKey('Module', notNone=True) term_f = ForeignKey('Term', notNone=True) year_f = ForeignKey('Year', notNone=True) #content fields marked = DateTimeCol(default=None) mark = FloatCol(default=None) late = BoolCol(notNone=True, default=False) comments = UnicodeCol(notNone=True, default="") #-- indexes module_assignment_index = DatabaseIndex('module_assignment') staff_marker_index = DatabaseIndex('staff_marker') #-- methods @staticmethod def create(module_assignment, student, **kw): module = module_assignment.module term = module_assignment.term year = module_assignment.year module_group = ModuleGroup.groups_for_student(student, module, term, year) staff_marker = module_group.staff return ModuleAssignmentStudent(module_assignment=module_assignment, student=student, term=term, year=year, module_group=module_group, staff_marker=staff_marker, **kw) class ModuleGroup(FelicitySQLObject): """sub-group of a module--tutorial or seminar group. Use a special method for creating (because I can't work out where the hook for multi-column validation is.""" staff = ForeignKey('Staff') name = UnicodeCol(notNone=True) time = UnicodeCol(default="") venue = UnicodeCol(default="") max_students = IntCol(default=None) module = ForeignKey('Module',notNone=True) module_code_f = UnicodeCol(default="") module_name_f = UnicodeCol(default="") students = RelatedJoin('Student', intermediateTable='module_group_student', createRelatedTable=False, orderBy=('lastname', 'firstname')) #list of students in this group year = ForeignKey('Year') #,default=_current_year_id term = ForeignKey('Term') #,default=_current_term_id created = DateTimeCol(default=datetime.now) #these properties will be duplicated (excludes students) properties_to_duplicate = ('staff', 'name', 'time', 'venue', 'max_students', 'module') #override addStudent so that StudentModule record is added as well def addStudent(self, student, ignore_max=False): """overrides built in method. If student is not registered for this module (no StudentModule), this will also register the student for the module""" #check not too many students if not ignore_max: if self.max_students is not None and len(self.students) >= self.max_students: raise ModelError, "Group %s is full." % self #create the record module = self.module year_id = self.yearID student_modules = StudentModule.all(module=module, student=student, year_id=year_id) if len(student_modules) > 0: student_module = student_modules[0] else: student_module, is_new = StudentModule.update_or_create(year=self.year, student=student, module=module, degree_code="") ModuleGroupStudent(module_group=self, student=student, student_module=student_module) #-- methods for controlling updatesSQLObject def _set_module(self,m): self._SO_set_module(m) #set recovery properties m = self.module if m is not None: #helps debug if self.module_code_f is None or self.module_code_f == "": self.module_code_f = m.code if self.module_name_f is None or self.module_name_f == "": self.module_name_f = m.name #-- general methods def _nof_students(self): q = "select count(student_id) from module_group_student where module_group_id=%(module_group_id)s;" q = q % dict(module_group_id=self.id) results = ModuleGroup._connection.queryAll(q) return results[0][0] nof_students = property(_nof_students) def is_full(self): """returns true if the group does not have at least as many students as max_students. Returns true if max_students is None. Students can sign up to a group that is not full""" if self.max_students is None: return True if self.students is None and self.max_students > 0: return False return self.nof_students >= self.max_students full = property(is_full) @staticmethod def create(**kw): "always use this method to create--it supplies validation" mg, novel= ModuleGroup.update_or_create(fail_if_exists=True, **kw) return mg @staticmethod def all(module=None, term=None, year=None, staff=None, only_current=True, clause=trueClause, only_open_to_students=False, query_only=False): """Used by Report to work out who a student's seminar group leader is. term_id and year_id are ignored unless only_current is true. raise Exception, "this is untested, esp. the student clause".""" if term is None: term = Term.current_term() if year is None: year = Year.current_year() if only_current: clause = AND(clause, ModuleGroup.q.termID==term.id, ModuleGroup.q.yearID==year.id) if module is not None: clause = AND(clause, ModuleGroup.q.moduleID==module.id) if staff is not None: clause = AND(clause, ModuleGroup.q.staffID==staff.id) if only_open_to_students: clause = AND(clause, ModuleGroup.q.max_students is not None) clause = AND(clause, ModuleGroup.q.max_students > 0 ) q = ModuleGroup.select(clause, orderBy=ModuleGroup.q.name) if query_only: return q return list(q) @staticmethod def group_for_student(student, module, term=None, year=None): """for finding which ModuleGroup a student is in. Returns None if cannot find one. Assumes that a student is never in more than one module group per module/term/year""" if term is None: term = Term.current_term() if year is None: year = Year.current_year() mgs = ModuleGroup.all(module=module, term=term, year=year, only_current=True) for mg in mgs: if student in mg.students: return mg return None @staticmethod def groups_for_student(student, term=None, year=None): """returns all module groups a student is for term and year (which default to the current term and year). Assumes that the table joining ModuleGroup to Student is module_group_student . """ #first get all module groups student is member of q = "select module_group_id from module_group_student where student_id=%(student_id)s;" q = q % dict(student_id = student.id) results = ModuleGroup._connection.queryAll(q) all_module_groups = [row[0] for row in results] if len(all_module_groups)==0: return [] #no current module groups #now get all module groups that are current for term and year if term is None: term = Term.current_term() if year is None: year = Year.current_year() q = ModuleGroup.select(AND(IN(ModuleGroup.q.id, all_module_groups), AND(ModuleGroup.q.yearID==year.id, ModuleGroup.q.termID==term.id))) return list(q) @staticmethod def update_or_create(name, module, term=None, year=None, fail_if_exists=False, **kw): "returns object plus True if novel or False otherwise. year and term default to current" if year is None: year = _current_year_id() if term is None: term = _current_term_id() name_id, module_id= _get_id(name), _get_id(module) term_id, year_id =_get_id(term), _get_id(year) #groups with this combination of name,module, term and year q = ModuleGroup.select(AND(ModuleGroup.q.name==name, ModuleGroup.q.moduleID==module_id, ModuleGroup.q.termID==term_id, ModuleGroup.q.yearID==year_id)) num_exist = q.count() if num_exist==0: return ModuleGroup(name=name, module=module, term=term, year=year, **kw), True if num_exist > 1: raise ModelError, "duplicate groups with this name already exists (which should never happen)" if num_exist > 0: #module group exists if fail_if_exists: raise ModelError, "a module group already exists with that name" #do the update existing_group = list(q)[0] for key in kw: setattr(existing_group,key,kw[key]) return existing_group, False def duplicate(self, term, year): """duplicates this group, including students, for the specified term and year""" kw = dict(term=term, year=year) for attr_name in self.properties_to_duplicate: kw[attr_name] = getattr(self, attr_name) new_group = ModuleGroup(**kw) #now add all students for student in self.students: new_group.addStudent(student, ignore_max=True) return new_group def _url_join(self): "url for student to join this module group" return tg.url('/join_module_group/%s' % self.id) url_join = property(_url_join) def __str__(self): return self.name _url_base = '/module_group/id/' class ModuleGroupStudent(FelicitySQLObject): class sqlmeta: #nb do not delete -- this is the join table for ModuleGroup.students table = "module_group_student" student = ForeignKey('Student', notNone=True) module_group = ForeignKey('ModuleGroup', notNone=True) #this is included to avoid deleting module assigments where student is in module group student_module = ForeignKey('StudentModule', notNone=True) class UserAction(FelicitySQLObject): "records actions such as creating reports, logging in etc" user = ForeignKey('User') item = UnicodeCol() #name of class of item modified or created item_id = IntCol() #id of object modified description = UnicodeCol() #not an SQLObject, pickled in ReportFormat.fields class ReportField(object): """Defines a field in a ReportFormat or a FeedbackFormat object. Param field_type can be: text, slider, select-one, select-many, heading Param options is the options for select Param label_above == True means that the label appears above the input box""" def __init__(self, name="", field_type="text", size=1, options={}, hide_name=False, slider_left_text=None, slider_right_text=None, slider_steps=5, label_above=False): self.name=name if name=="" and slider_right_text is not None: self.name = "%s -> %s" % (slider_left_text, slider_right_text) self.field_type=field_type #field_type may be "text", "integer", "select-one" or "select-many" or "slider" or "heading" self.size=size #nof rows for text fields self.options=options #list of text labels for field_type=="select-one" or "select-many" or "slider" self.hide_name=hide_name self.slider_left_text=slider_left_text self.slider_right_text=slider_right_text self.slider_steps=slider_steps self.label_above = label_above class ReportFormatDept(FelicitySQLObject): """defines ReportFormats used in a department including a default one""" report_format = ForeignKey('ReportFormat', notNone=True) dept = ForeignKey('Dept', notNone=True) is_default = BoolCol(default=False) @staticmethod def for_dept(dept): """return default ReportFormat for dept, if any""" q = ReportFormatDept.select(AND(ReportFormatDept.q.deptID==dept.id, ReportFormatDept.q.is_default==True)) if q.count()==1: return q[0] class ReportFormat(FelicitySQLObject): """defines a format for reports. fields should be a list of ReportField, they appear in order.""" name = UnicodeCol(notNone=True) fields = PickleCol() #a list of ReportField objects @staticmethod def all(dept=None): """"result ordered by name. Dept is currently ignored.""" return list(ReportFormat.select(orderBy=ReportFormat.q.name)) class FeedbackFormatDept(FelicitySQLObject): """defines ReportFormats used in a department including a default one""" feedback_format = ForeignKey('FeedbackFormat', notNone=True) dept = ForeignKey('Dept', notNone=True) is_default = BoolCol(default=False) @classmethod def for_dept(cls, dept): """return default FeedbackFormat for dept, if any""" q = cls.select(AND(cls.q.deptID==dept.id, cls.q.is_default==True)) if q.count()==1: return q[0] class FeedbackFormat(FelicitySQLObject): """defines a format for feedback forms. fields should be a list of ReportField (yes, despite the name), they appear in order.""" name = UnicodeCol(notNone=True) fields = PickleCol() #a list of ReportField objects @staticmethod def all(dept=None): """"result ordered by name. Dept is currently ignored.""" return list(ReportFormat.select(orderBy=ReportFormat.q.name)) def _create_default_content(format): """creates a default content object for FeedbackFormat or ReportFormat. Requires that format.fields be a list of ReportField objects""" content = [] for rf in format.fields: data_to_add = "" if rf.field_type=="slider": #where slider, defaults to midpoint of scale data_to_add = (rf.slider_steps+1)/2 content.append(data_to_add) return content class StudentComment(object): """comment by a student on a report. text is the text of the comment, name is the name of the person who made it (which should be a string). NB: this is not a persistent object, it is persisted as a list under the 'student_comments' attribute of Report""" def __init__(self, text, name, created=datetime.now()): self.text = text self.name = name self.created = created def _created_formatted(self): if self.created is None: return "" return self.created.strftime("%d %b %Y %H:%M") created_formatted = property(_created_formatted) class ReportOrFeedback(FelicitySQLObject): """abstract base class for Report and Feedback which have many fields and methods in common. NB: There is no table corresponding to this class, it should not be persisted.""" #-- update functions def _init(self, *args, **kw): """called after object retrived or inserted""" is_new_object = False if not hasattr(self, 'id') or self.id is None: is_new_object = True SQLObject._init(self, *args, **kw) if not is_new_object: return #don't attempt to fill in these fields unless new object if self.lecturerID is None: l = self.module.leader(term=self.term, year=self.year) if l is not None: self.lecturerID = l.id if self.seminar_group_leaderID is None: sgl = ModuleGroup.group_for_student(student=self.student, module=self.module, term=self.term, year=self.year) if sgl is not None: self.seminar_group_leaderID = sgl.staffID #-- fake properties def _degree_code(self): if self.degree_code_f is not None: return self.degree_code_f if self.degree is None: return "" #set fubar property for later use self.degree_code_f = self.degree.code return self.degree.code degree_code=property(_degree_code) def _degree_name(self): if self.degree is None: return "" return self.degree.name degree_name = property(_degree_name) def _submitted_formatted(self): if self.submitted is None: return "" return self.submitted.strftime("%d %b %Y %H:%M") submitted_formatted = property(_submitted_formatted) def _approved_formatted(self): if self.approved is None: return "" return self.approved.strftime("%d %b %Y %H:%M") approved_formatted = property(_approved_formatted) def submit(self): self.submitted = datetime.now() def get_module_lecturer(self): """returns module lecturer in term module was taken, if known. If no or multiple assignments, 'unknown' is returned. NB does not return None""" leader = self.lecturer if leader is not None: return "%s, %s" % (leader.lastname, leader.firstname) return 'unknown' def get_seminar_tutor(self): """returns seminar tutor in term module was taken, if known. If no or multiple assignments, 'unknown' is returned. Does not return None.""" result = self.seminar_group_leader if result is None: return 'unknown' return "%s, %s" % (result.lastname, result.firstname) seminar_group_leader_str = property(get_seminar_tutor) @classmethod def all(cls, clause=trueClause, module=None, author=None, student=None, students=None, dept_or_programme=None, not_in_dept_or_programme=None, limit_to_modules_in_dept=None, only_submitted=False, only_unsubmitted=False, only_approved=False, only_unapproved=False, term=None, year=None, only_current=True, query_only=False, ids_only=False, start_date=None, order_by=('degree_code_f','lastname_f', 'firstname_f', 'module_code_f')): """Param dept_or_programme restricts to students on degrees in dept (according to DeptDegree), param not_in_dept_or_programme restricts to students not on degrees in dept (or in programme). param limit_to_modules_in_dept requires a Dept; if set, will restrict results to modules in that Dept. The clause parameter allows you to add conditions. Sorts by degree then by student lastname, firstname, module by default (unless param order_by is specified). Uses fubar properties for sorting by default. param ids_only returns a list of ids, this list is not sorted. param ids_only cannot be combined with query_only. Not all params are appropriate for Feedback (e.g. only_approved makes no sense).""" if term is None: term = Term.current_term() else: #term specified so must also have only_current (even if explicity set to False) only_current = True if year is None: year = Year.current_year() else: #year specified so must also have only_current (even if explicity set to False) only_current = True if only_submitted and only_unsubmitted: raise ModelError, "scripting error: only_submitted and only_unsubmitted both true--this query cannot return any results" if only_submitted: clause = AND(clause, cls.q.submitted!=None) if only_unsubmitted: clause = AND(clause, cls.q.submitted==None) if only_approved: clause = AND(clause, cls.q.approved!=None) if only_unapproved: clause = AND(clause, cls.q.approved==None) if only_current: clause = AND(clause, cls.q.yearID==year.id) clause = AND(clause, cls.q.termID==term.id) if module is not None: clause = AND(clause, cls.q.moduleID==module.id) if author is not None: clause = AND(clause, cls.q.authorID==author.id) if start_date is not None: clause = AND(clause, cls.q.submitted>=safe_str(start_date)) if student is not None: clause = AND(clause, cls.q.studentID==student.id) if students is not None: if len(students)==0: #no students so query will return no reports clause = AND(clause, falseClause) else: clause = AND(clause, IN(cls.q.studentID, [s.id for s in students])) if dept_or_programme is not None: #identify reports from dept_or_programme by finding degrees associated with dept_or_programme degrees = dept_or_programme.all_degrees() if len(degrees)>0: clause = AND(clause, IN(cls.q.degreeID, [d.id for d in degrees])) else: #no degrees so query will return nothing clause = AND(clause,falseClause) if not_in_dept_or_programme is not None: #this means: not in a degree associated with the dept_or programme degrees = not_in_dept_or_programme.all_degrees() if len(degrees)>0: clause = AND(clause, NOT(IN(cls.q.degreeID, [d.id for d in degrees]))) else: pass #no degrees associated with dept_or_programme so condition met if limit_to_modules_in_dept is not None: modules = Module.all(dept=limit_to_modules_in_dept) module_ids = [m.id for m in modules] if len(module_ids)>0: clause = AND(clause, IN(cls.q.moduleID, module_ids)) else: #no modules in dept so nothing to return clause = AND(clause, falseClause) if ids_only: expr = Select(cls.q.id, where=clause) q = _do_query(expr) #convert list of tuples of ids to list of ids q = [x[0] for x in q] else: #this is the normal case q = cls.select(clause, orderBy=order_by) if query_only: return q return list(q) class Report(ReportOrFeedback): """ Creating multiple reports is a very slow process because lots of database lookup is done per report. """ format = ForeignKey('ReportFormat',notNone=True) author = ForeignKey('Staff',notNone=True) student = ForeignKey('Student',notNone=True) #a list of StudentComments which are comments # misleading name--staff can leave these comments too student_comments = PickleCol(default=[], notNone=True) approved = DateTimeCol(default=None) viewed = DateTimeCol(default=None) #personal_tutor is staff who was personal tutor at time of report, NOT current personal tutor personal_tutor = ForeignKey('Staff', default=None) personal_tutor_name_f = UnicodeCol(default="") #fubar properties author_firstname_f = UnicodeCol(default=None) author_lastname_f = UnicodeCol(default=None) #-- GENERIC PROPERTIES COMMON TO FEEDBACK AND REPORT, DON'T CHANGE EXCEPT CHANGE ALL content = PickleCol(default=None) #a list of items corresponding to the fields in format module = ForeignKey('Module',notNone=True) degree = ForeignKey('Degree', default=None) term = ForeignKey('Term', default=_current_term_id, notNone=True) year = ForeignKey('Year', default=_current_year_id, notNone=True) student_module = ForeignKey('StudentModule') #may be null student_degree = ForeignKey('StudentDegree', default=None) #may be null lecturer = ForeignKey('Staff', default=None) #auto filled on create seminar_group_leader = ForeignKey('Staff', default=None) #auto filled on create submitted = DateTimeCol(default=None) created = DateTimeCol(default=datetime.now) #fubar recovery properties, also used for sorting student_code_f = UnicodeCol(default=None) firstname_f = UnicodeCol(default=None) lastname_f = UnicodeCol(default=None) module_code_f = UnicodeCol(default=None) module_name_f = UnicodeCol(default="") degree_code_f = UnicodeCol(default=None) term_name_f = UnicodeCol(default=None) #NB not working year_name_f = UnicodeCol(default=None) #NB not working def _set_degree(self, d): self._SO_set_degree(d) #set recovery properties d = self.degree if self.degree_code_f is None: if d is not None: #helps debug self.degree_code_f = d.code def _set_module(self, m): self._SO_set_module(m) #set recovery properties m = self.module if m is not None: #helps debug if self.module_code_f is None or self.module_code_f == "": self.module_code_f = m.code if self.module_name_f is None or self.module_name_f == "": self.module_name_f = m.name def _set_year(self,y): self._SO_set_year(y) #set recovery properties y = self.year if self.year_name_f is None: if y is not None: #helps debug self.year_name_f = y.name def _set_term(self,t): self._SO_set_term(t) #set recovery properties t=self.term if self.term_name_f is None: if t is not None: #helps debug self.term_name_f = t.name #-- END GENERIC SECTION #used for formatting (see felicity.report_out) _standard_headers = [("report id", 'id'), ("student code",'student.code'), ("firstname", 'student.firstname'), ("lastname",'student.lastname'), ("degree code", 'degree_code'), ("degree name", 'degree_name'), ("module code", 'module.code'), ("module name", 'module.name'), ("module lecturer", 'lecturer'), ("seminar group leader", 'seminar_group_leader_str'), ("personal tutor", 'personal_tutor_name_f'), ("term", 'term'), ("year", 'year'), ("report author", 'author')] _pdf_headers = [("Report Id", 'id'), ("Student Code",'student.code'), ("Student Name", 'student'), ("Degree", 'degree'), ("Module", 'module'), ("Module Lecturer", 'lecturer'), ("Seminar Group Leader", 'seminar_group_leader_str'), ("Personal Tutor", 'personal_tutor_name_f'), ("Term", 'term'), ("Year", 'year'), ("Report Author", 'author'), ('Date Submitted', 'submitted_formatted'), ('Date Approved', 'approved_formatted')] #-- update functions def _set_student(self,s): self._SO_set_student(s) #set recovery properties s = self.student if s is None: raise Exception, "Programming error--student is null" if self.student_code_f is None: self.student_code_f=s.code if self.firstname_f is None: self.firstname_f=s.firstname if self.lastname_f is None: self.lastname_f=s.lastname if self.personal_tutor is None: personal_tutors = PersonalTutor.find(student=s) if personal_tutors is not None and len(personal_tutors)>0: self.personal_tutor = personal_tutors[0] self.personal_tutor_name_f = safe_str(personal_tutors[0]) def _set_author(self,a): self._SO_set_author(a) #set recovery properties a=self.author if self.author_firstname_f is None: if a is not None: #helps debug self.author_firstname_f = a.firstname if self.author_lastname_f is None: if a is not None: #helps debug self.author_lastname_f = a.lastname #-- ordinary methods def approve(self): self.approved = datetime.now() def get_personal_tutor(self): """returns name of personal tutor of student in term module was taken, if known. Otherwise '' is returned.""" if self.personal_tutor_name_f is not None: return self.personal_tutor_name_f return "" @staticmethod def create(student, author, module, format, degree=None, student_degree=None, student_module=None): """create a report. auto searches for any StudentDegree and StudentModule links""" if student_module is None: lines = StudentModule.all(student=student, module=module, only_current=True) if len(lines)==1: student_module=lines[0] if student_degree is None: if student_module is not None and student_module.spr_code is not None: student_degree = StudentDegree.from_code(student_module.spr_code) else: lines = StudentDegree.all(student=student, only_current=True) if len(lines)==1: student_degree=lines[0] if degree is None: #slightly tricky question which source should take priority. No guarantee that they agree if student_module is not None and student_module.degree is not None: degree = student_module.degree elif student_degree is not None and student_degree.degree is not None: degree=student_degree.degree content = _create_default_content(format) return Report(student=student, author=author, module=module, format=format, degree=degree, content=content, student_degree=student_degree, student_module=student_module) def __str__(self): return "%s (%s)" % (self.id, self.author_lastname_f) _url_base = '/report/edit/' class Feedback(ReportOrFeedback): """Must specify student when creating feedback. lecturer and seminar_group_leader will be set automatically""" student = ForeignKey('Student') format = ForeignKey('FeedbackFormat',notNone=True) #-- GENERIC PROPERTIES COMMON TO FEEDBACK AND REPORT, DON'T CHANGE EXCEPT CHANGE ALL content = PickleCol(default=None) #a list of items corresponding to the fields in format module = ForeignKey('Module',notNone=True) degree = ForeignKey('Degree', default=None) term = ForeignKey('Term', default=_current_term_id, notNone=True) year = ForeignKey('Year', default=_current_year_id, notNone=True) student_module = ForeignKey('StudentModule') #may be null student_degree = ForeignKey('StudentDegree', default=None) #may be null lecturer = ForeignKey('Staff', default=None) #auto filled on create seminar_group_leader = ForeignKey('Staff', default=None) #auto filled on create submitted = DateTimeCol(default=None) created = DateTimeCol(default=datetime.now) #fubar recovery properties, also used for sorting student_code_f = UnicodeCol(default=None) firstname_f = UnicodeCol(default=None) lastname_f = UnicodeCol(default=None) module_code_f = UnicodeCol(default=None) module_name_f = UnicodeCol(default="") degree_code_f = UnicodeCol(default=None) term_name_f = UnicodeCol(default=None) #NB not working year_name_f = UnicodeCol(default=None) #NB not working def _set_degree(self, d): self._SO_set_degree(d) #set recovery properties d = self.degree if self.degree_code_f is None: if d is not None: #helps debug self.degree_code_f = d.code def _set_module(self, m): self._SO_set_module(m) #set recovery properties m = self.module if m is not None: #helps debug if self.module_code_f is None or self.module_code_f == "": self.module_code_f = m.code if self.module_name_f is None or self.module_name_f == "": self.module_name_f = m.name def _set_year(self,y): self._SO_set_year(y) #set recovery properties y = self.year if self.year_name_f is None: if y is not None: #helps debug self.year_name_f = y.name def _set_term(self,t): self._SO_set_term(t) #set recovery properties t=self.term if self.term_name_f is None: if t is not None: #helps debug self.term_name_f = t.name #-- END GENERIC SECTION #used for formatting _standard_headers = [("feedback id", 'id'), ("degree code", 'degree_code'), ("degree name", 'degree_name'), ("module code", 'module.code'), ("module name", 'module.name'), ("module lecturer", 'lecturer'), ("seminar group leader", 'seminar_group_leader_str'), ("term", 'term'), ("year", 'year')] _pdf_headers = [("Feedback Id", 'id'), ("Degree", 'degree'), ("Module", 'module'), ("Module lecturer", 'lecturer'), ("Term", 'term'), ("Year", 'year')] #update functions def _set_student(self,s): self._SO_set_student(s) #set recovery properties s = self.student if s is None: raise Exception, "Programming error--student is null" if self.student_code_f is None: self.student_code_f=s.code if self.firstname_f is None: self.firstname_f=s.firstname if self.lastname_f is None: self.lastname_f=s.lastname @staticmethod def create(student, module, format, degree=None, student_degree=None, student_module=None): """create a feedback form. auto searches for any StudentDegree and StudentModule links""" if student_module is None: lines = StudentModule.all(student=student, module=module, only_current=True) if len(lines)==1: student_module=lines[0] if student_degree is None: if student_module is not None and student_module.spr_code is not None: student_degree = StudentDegree.from_code(student_module.spr_code) else: lines = StudentDegree.all(student=student, only_current=True) if len(lines)==1: student_degree=lines[0] if degree is None: #slightly tricky question which source should take priority. No guarantee that they agree if student_module is not None and student_module.degree is not None: degree = student_module.degree elif student_degree is not None and student_degree.degree is not None: degree=student_degree.degree content = _create_default_content(format) return Feedback(student=student, module=module, format=format, degree=degree, content=content, student_degree=student_degree, student_module=student_module) @staticmethod def is_current_feedback_submitted(student, module): """has student submitted feedback for module in the current term?""" q = Feedback.all(student=student, module=module, only_submitted=True, only_current=True, query_only=True) return ( q.count() > 0 ) @staticmethod def find_or_create_url(student_id, module_id, raw=False): raw_url = "/feedback/find_or_create/%s/%s" % (student_id, module_id) if raw: return raw_url return tg.url(raw_url) def __str__(self): return "Feedback form %s by %s %s on module %s" % (self.id, self.firstname_f, self.lastname_f, self.module_code_f) _url_base = '/feedback/id/' class StudentDegree(FelicitySQLObject): """Assignment of a student to a degree. The year_f fields are not updated unless they None. - For required fields, see StudentDegree.required_fields.""" student = ForeignKey('Student', notNone=True) #degree will be auto_set from route_code where possible. # If import Degree objects later, link will not work. degree = ForeignKey('Degree', default=None) def _degree_code(self): "temporary measure--SITS seems to use route_code to mean degree code" return self.route_code degree_code = property(_degree_code) #spr_code is an identifier for this StudentDegree from an external system (eg. SITS) spr_code = UnicodeCol(default=None,alternateID=True,notNone=True,) created = DateTimeCol(default=datetime.now) updated = DateTimeCol(default=datetime.now, notNone=True, dbName='updated_') #treat spr_code as 'code'; this is a bit clumsy @classmethod def from_code(cls, code): try: return cls.bySpr_code(code) except SQLObjectNotFound: return None @classmethod def required_fields(cls): """returns a list of names of attributes that cannot be null because they are used in the logic. This is used by autogeneration scripts.""" return cls.unique_fields()+('start_date', 'expected_completion_date', 'student_code', 'surname', 'forename_1', 'scheduling_group', 'student_status_code') @classmethod def unique_fields(cls): return cls.alternate_id_fields()+() @classmethod def alternate_id_fields(cls): return ('spr_code',) surname = UnicodeCol(notNone=True,) forename_1 = UnicodeCol(notNone=True,) student_code = UnicodeCol(notNone=True,) student_status_code = UnicodeCol(default='', notNone=True) route_code = UnicodeCol(default=None,) def _set_route_code(self,_route_code): self._SO_set_route_code(_route_code) degree = Degree.from_code(_route_code) self.degree=degree mode_of_attendance_code = UnicodeCol(default=None,) start_date = DateCol(notNone=True,) expected_completion_date = DateCol(notNone=True,) level_code = UnicodeCol(default=None,) scheduling_group = UnicodeCol(notNone=True,) def _degree_name(self): _degree = self.degree if _degree is None or _degree.name is None: return "" return _degree.name degree_name= property(_degree_name) #degree, personal tutor assignments are created from these data. def is_current(self, year_id=None): """returns True if this line refers to a degree currently working. Likely to be slow, could improve by adding is_expired = BoolCol(default=False), setting this here but ensuring that it is unset when expected_completion_date is updated. If year_id is specified, is_current returns true when the StudentDegree was current on the start_date of the year.""" if self.student_status_code == 'P': #this code means not current return False if year_id is None: today = datetime.date(datetime.today()) else: today = Year.get(year_id).start_date if today > self.expected_completion_date: return False #completed if self.start_date > today: return False #not yet started return True @classmethod def get_current(cls,dept=None,student=None,query_only=False,year_id=None): "returns all current StudentDegree objects." return cls.all(dept=dept, student=student, only_current=True, year_id=year_id, query_only=query_only) @classmethod def all(cls, dept=None, student=None, student_ids=None, route_code=None, only_current=False, query_only=False, year_id=None, only_status_code_not_c=False): """returns all StudentDegree objects. param only_current means that today (or the start of year_id if year_id is specified) is between the start_date and expected_completion date, and that student_status_code != 'P'. param dept, if specified, relies on DeptDegree -- i.e. looks for StudentDegrees where the degree stands in DeptDegree to the specified dept.""" if year_id is None: today = datetime.date(datetime.today()) else: #if not current year, we define being current as being current # on the start_date of the specified year today = Year.get(year_id).start_date clause = trueClause if only_current: #nb student_status_code can't be null (constraint required for this query to work) clause = AND(clause, AND(StudentDegree.q.student_status_code != 'P', AND(StudentDegree.q.expected_completion_date >= today, StudentDegree.q.start_date <= today))) if dept is not None: #we find the degrees in this dept and require a match degree_ids_in_dept = [dd.degreeID for dd in list(DeptDegree.select(DeptDegree.q.deptID==dept.id))] if len(degree_ids_in_dept)==0: #no degrees in this dept so no matches clause = AND(clause, falseClause) else: clause = AND(clause, IN(StudentDegree.q.degreeID, degree_ids_in_dept)) if student is not None: clause = AND(clause, StudentDegree.q.studentID==student.id) if student_ids is not None: if student_ids is None or len(student_ids)<1: caluse = AND(clause, falseClause) else: clause = AND(clause, IN(StudentDegree.q.studentID, student_ids)) if route_code is not None: clause = AND(clause, StudentDegree.q.route_code==route_code) if only_status_code_not_c: clause = AND(clause, StudentDegree.q.student_status_code!="C") q = StudentDegree.select(clause) if query_only: return q else: return list(q) #general methods @staticmethod def create(spr_code, student, start_date, expected_completion_date, degree=None, **kw): """User-friendly create method for ad-hoc creates (mainly in testing)""" if degree is not None: if 'route_code' not in kw: kw['route_code']=degree.code new_obj = StudentDegree(spr_code=spr_code, student=student, surname=student.lastname, forename_1=student.firstname, student_code=student.code, start_date=start_date, expected_completion_date=expected_completion_date, degree=degree, **kw) return new_obj @classmethod def update_or_create(cls, spr_code, **values): """returns a student_degree with the code or creates and returns a new one if non exists. Will silently ignore params in values that are not attributes. Returns None if an error occurs. Also returns True if object is new, False if it is old.""" #first prune values values = _remove_non_persistent_items(cls, values) #now find or create the new item try: result = StudentDegree.bySpr_code(spr_code) for item in values: if hasattr(result, item): setattr(result, item, values[item]) result.updated = datetime.now() return result, False except SQLObjectNotFound: try: return StudentDegree(spr_code=spr_code, **values), True except Exception, e: print "Exception in attempting to create StudentDegree: %s" % str(e) log.error("Exception in attempting to create StudentDegree: %s" % str(e)) except Exception, e: print "Undexpected exception when searching for or updating a StudentDegree: %s" % str(e) log.error("Undexpected exception when searching for or updating a StudentDegree: %s" % str(e)) return None, None def student_modules(self, year=None): clause = StudentModule.q.spr_code==self.spr_code if year is not None: clause = AND(clause, StudentModule.q.yearID==year.id) return StudentModule.select(clause) def _url(self, raw=False): """NB This is not the normal url method!!! (TODO rename) returns a url for identifying this object. Can retrieve the object using byUrl. Assumes that no spr code contanis both / and _. Assumes that where spr codes start with the student codes+'/', the part of the spr code not including the student code+'/' does not match any spr codes. """ if self.spr_code.startswith(self.student_code+"/"): u = self.spr_code.replace(self.student_code+"/","",1) else: u = self.spr_code.replace('/','_') if raw: return u else: return tg.url(u) url=property(_url) @classmethod def byUrl(cls, student_code, url): """Given the student_code and the StudentDegree.url for this record, returns the actual record.""" full_spr_code = student_code+'/'+url res = StudentDegree.select(AND(StudentDegree.q.student_code==student_code, StudentDegree.q.spr_code==full_spr_code)) if res.count() < 1: res = StudentDegree.select(StudentDegree.q.spr_code==url) res = list(res) assert len(res) > 0 #possible programming error if result not found assert len(res) < 2 #programming error if result not unique return res[0] class PersonalTutor(FelicitySQLObject): staff = ForeignKey('Staff', notNone=True) student = ForeignKey('Student', notNone=True) start_date = DateCol(default=(lambda : datetime.date(datetime.today()))) end_date = DateCol(default=None) @staticmethod def find(student=None, staff=None, return_assignments=False, return_count=False): """Use with either staff or student set (not both unless either return_assignments or return_count is set). Returns current personal tutors of student or tutees of staff. """ today = datetime.date(datetime.today()) clause = AND(PersonalTutor.q.start_date <= today, OR( PersonalTutor.q.end_date==None, PersonalTutor.q.end_date >= today)) if student: clause = AND(clause, PersonalTutor.q.studentID==student.id) if staff: clause = AND(clause, PersonalTutor.q.staffID==staff.id) query = PersonalTutor.select(clause) if return_count: return query.count() result = list(query) if return_assignments: return result if student: return [r.staff for r in result] if staff: return [r.student for r in result] raise Exception, "PersonalTutor.find cannot be called with no paramters" @staticmethod def remove(student, except_staff=None): """expire all current personal tutor assignments except those with a specified end_date""" today = datetime.date(datetime.today()) yesterday = today.fromordinal(today.toordinal()-1) current_pts = PersonalTutor.find(student=student, return_assignments=True) for pt in current_pts: if except_staff is not None: if pt.staff.id == except_staff.id: continue #don't remove the 'except_staff' parameter if pt.end_date is None: pt.end_date = yesterday @staticmethod def replace(student,staff): """expires all other current personal tutors of student and assigns staff starting today, except where current personal tutors have a set end date. Returns True if the assignment is new, False if the staff was already the student's personal tutor.""" PersonalTutor.remove(student, except_staff=staff) if staff is not None and PersonalTutor.find(student=student, staff=staff, return_count=True) > 0: #staff is already student's personal tutor, don't do anything return False PersonalTutor(student=student, staff=staff) return True #-------- # delete? class Component(FelicitySQLObject): module = ForeignKey('Module', notNone=True) term = ForeignKey('Term', notNone=True) year = ForeignKey('Year', notNone=True) name = UnicodeCol(notNone=True) ordering = IntCol(default=component_next_ordering) @staticmethod def all(module, term=None, year=None): """param module is required""" clause=trueClause clause = AND(clause, Component.q.moduleID==module.id) if term is not None: clause = AND(clause, Component.q.termID==term.id) if year is not None: clause = AND(clause, Component.q.yearID==year.id) return Component.select(clause, orderBy=Component.q.ordering) def create_components(self, module_group): for student in module_group.students: StudentComponent(component=self, student=student) class StudentComponent(FelicitySQLObject): component = ForeignKey('Component', notNone=True) student = ForeignKey('Student', notNone=True) attended = BoolCol(default=None) absent = BoolCol(default=None) mark = IntCol(default=None) notes = UnicodeCol(default="") emailed = BoolCol(default=False, notNone=True) # delete? #-------- class UploadDataSet(FelicitySQLObject): """introduced for OMR uploads. When you upload data, you can specify a dataset by name. Next time you upload data, if you specify that it is the same dataset, any records missing from the upload will be marked maybe_withdrawn or removed from the database. """ name = UnicodeCol(notNone=True, unique=True) dept = ForeignKey('Dept', notNone=True) #name of import type, e.g. OMR type = UnicodeCol(notNone=True) @staticmethod def get_datasets(dept, type): q = UploadDataSet.select(AND(UploadDataSet.q.deptID==dept.id, UploadDataSet.q.type==type)) return list(q) @staticmethod def fetch(dept, name): q = UploadDataSet.select(AND(UploadDataSet.q.name==name, UploadDataSet.q.deptID==dept.id)) if q.count() > 0: return q[0] else: return None class Job(FelicitySQLObject): """manages an asynchronous job performed in a thread""" class sqlmeta: cacheValues = False #no caching because jobs accessed in multiple threads user = ForeignKey('User', notNone=True) created = DateTimeCol(default=datetime.now) progress_msg = UnicodeCol(default="", notNone=True) #can check if job complete by seing if this field is None completed = DateTimeCol(default=None) succeeded = BoolCol(default=None) display_result_using_template = StringCol(default=None) #result object will be passed to template (if job succeeds) result = PickleCol(default=None) error_msg = UnicodeCol(default=None) #use this field to pass other data to the template custom_settings = PickleCol(default=None) #user who created the job def update_progress(self, msg): hub.begin() self.progress_msg += msg self.progress_msg += "\n" hub.commit() def done(self, result): self.result = result self.succeeded = True self.completed = datetime.now() def failed(self, error_msg): self.completed = datetime.now() self.succeeded = False self.error_msg = error_msg @staticmethod def new(user): "creates a new job" return Job(user=user) _url_base = '/job/' class ExamMark(FelicitySQLObject): """must be unique per student_degree, module and year """ student_degree = ForeignKey('StudentDegree', notNone=True) student_f = ForeignKey('Student', notNone=True) module = ForeignKey('Module', notNone=True) module_code_f = UnicodeCol(default="", notNone=True) cats = FloatCol(notNone=True) year = ForeignKey('Year', notNone=True) mark = FloatCol(notNone=True) @staticmethod def create(student_degree, module, cats, mark, year): module_code = module.code student = student_degree.student return ExamMark(student_f=student, student_degree=student_degree, module=module, module_code_f=module_code, cats=cats, mark=mark, year=year) @classmethod def update_or_create(cls, student_degree, module, cats, mark, year): """returns an ExamMark or creates and returns a new one if non exists. Also returns True if object is new, False if it is old.""" q = ExamMark.select(AND(ExamMark.q.student_degreeID==student_degree.id, AND(ExamMark.q.moduleID==module.id, ExamMark.q.yearID==year.id))) if q.count() > 0: return q[0], False return cls.create(student_degree, module, cats, mark, year), True #---- #This is used for tests and init model_objects_in_creation_order = [Visit, VisitIdentity, Group, Permission, User, Student, Dept, Degree, DeptDegree, Programme, DeptProgramme, ProgrammeDegree, Module, Staff, StudentNote, FileItRecord, Year, Term, StudentModule, ModuleLeader, ModuleGroup, ModuleGroupStudent, UserAction, StudentDegree, ReportFormat, Report, PersonalTutor, FeedbackFormat, Feedback, UploadDataSet, Job, ExamMark, Component, StudentComponent, ModuleAssignment, ModuleAssignmentStudent, ]