# This file is part of ReportTool # ReportTool (Felicity) is copyright 2004-8 Steve Butterfill. # # ReportTool is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 2 of the License, or # (at your option) any later version. # # ReportTool is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with ReportTool. If not, see . # # If you want to use ReportTool under a difference licence, email # s.butterfill@warwick.ac.uk. import sqlobject import csv import model from model import Dept, Degree, ExamMark, Module, PersonalTutor, Staff, Student, StudentModule, Year, ModelError, StudentDegree from sqlobject.sqlbuilder import Select, AND, IN, NOT #TODO move db code to model from datetime import datetime from felicity.hacks import safe_str import logging log = logging.getLogger(__name__) def _convert_spr_to_student_code(code): "converts spr_code (which identifes student + degree course) to student code" return code.split('/')[0] def omr_fields(): """returns list of StudentModule attribute names in the same order that the values these attributes should take appear in the omr export file to be uploaded""" return ['spr_code', 'lastname', 'firstname', 'year_f', 'module_code_f', 'u1', 'u2', 'assessment_code', 'cats', 'year_of_study', 'degree_code', 'status'] def omr(file, delimiter=',', field_names_in_first_line=False, dataset=None, job=None): """reads a warwick omr export file and creates new modules, students and student_modules. Aborts if the file references years that don't exist. If dataset is specified, StudentModules created or updated will be associated with that dataset so that if the same dataset is uploaded later, missing lines can be deleted. If job is specified, job.progress_msg will be updated as events occur.""" def _abort(msg): "returns an error when processing cannot continue. Usage: return _abort('file not found')" errors = {99999 : ''} error_explanations = {99999 : msg} return dict(total_lines=0, lines_added=0, students=[], existing_students=[], nof_lines_removed=0, student_modules=[], errors=errors, error_explanations=error_explanations) if dataset is not None: dataset_name = dataset.name #avoid weird transaction errors later reader = csv.reader(file, delimiter=delimiter) errors = {} #keys are line numbers, values are the lines error_explanations = {} #keys are line numbers, values are explanations of the errors counter = 0 #first line will be 1 because incremented first readlines = {} year_names = set() student_codes = set() student_degree_tuples = set() student_degree_module_tuples = set() student_lines = {} #keys are codes, values are dictionaries created from lines with info on students module_codes = set() if field_names_in_first_line: try: reader.next() #field names are ignored except StopIteration: return _abort("file has no lines") fields = omr_fields() if job: job.update_progress("Initialisation complete, now parsing file.") for line in reader: counter += 1 try: values = {} for item in fields: values[item]=line[fields.index(item)] values['cats'] = float(values['cats']) values['student_code'] = _convert_spr_to_student_code(values['spr_code']) except (IndexError, TypeError, ValueError): errors[counter] = str(line) error_explanations[counter] = """line has wrong number of items in it (has %i, should have %i), or one of the values is invalid (e.g. check that 'cats' is a number)""" % (len(line), len(omr_fields())) else: readlines[counter] = values year_names.add(values['year_f']) student_code = values['student_code'] if student_code not in student_codes: student_codes.add(student_code) student_lines[student_code]=values module_code = values['module_code_f'] module_codes.add(module_code) degree_code = values['degree_code'] spr_code = values['spr_code'] student_degree_tuples.add((spr_code, degree_code)) student_degree_module_tuples.add((spr_code, degree_code, module_code)) total_lines = counter if job: job.update_progress("Parsing file complete, %s lines read." % total_lines) #if the file contains no lines we abort if len(student_codes) == 0: return _abort("""Sorry, this file contains no usable lines (or only a header line). Have you specified the delimiter correctly?""") #If the file contains new years, we abort if not Year.all_exist(year_names): return _abort("""Sorry, this file contains year names that do not exist in the database of years. Please create the missing years and try again. The names of the years mentioned in the file are: %s""" % str(year_names)) #if the file contains records for more than one year we abort if len(year_names)>1: return _abort("""This file contains records for multiple years (%s). Please split the file by year and upload each year individually.""" % year_names) #this is the single year that this file is about year = Year.from_name(year_names.pop()) #if the file contains unknown module codes we create them existing_modules = Module.which_exist(list(module_codes)) modules = {} #dictionary of all modules, keys are their codes for m in existing_modules: modules[m.code]=m existing_module_codes = modules.keys() unknown_module_codes = [code for code in module_codes if code not in existing_module_codes] nof_new_modules = 0 for code in unknown_module_codes: try: new_module = Module(code=code, name="") modules[code]=new_module nof_new_modules += 1 except Exception, e: return _abort("""This file cannot be imported because it contains unknown module codes, and there was an error creating a module record for one of these module codes (%s, the error was: %s). Please upload or create the missing modules, then try to upload the file again. Alternatively, you could delete the lines mentioning these modules from the file you are trying to upload. The missing module codes are: %s.""" % (code, e, unknown_module_codes)) if job: job.update_progress("Created %s new modules." % nof_new_modules) #add students to database students = {} #dictionary of all students, existing and new; keys are codes existing_students = Student.which_exist(list(student_codes)) for s in existing_students: students[s.code]=s existing_student_codes = students.keys() nof_students_created = 0 for code, line in student_lines.items(): if code in existing_student_codes: continue try: student = Student(code=code, firstname=line['firstname'], lastname=line['lastname']) students[code] = student nof_students_created += 1 except Exception, e: return _abort(""""This file contains a student %s with code %s. I could neither find nor create that Student. Suspect bug or database error. (details: %s)""" % (line['lastname'],student_code,str(e))) if job: job.update_progress("Created %s new students." % nof_students_created) if len(student_degree_tuples)>0: student_modules_q = StudentModule.select(AND(StudentModule.q.yearID==year.id, IN([StudentModule.q.spr_code, StudentModule.q.degree_code], list(student_degree_tuples)))) else: student_modules_q = [] student_modules = {} for ol in student_modules_q: tuple = (ol.spr_code, ol.degree_code, ol.module_code_f,) student_modules[tuple]=ol #add student_modules to database new_student_modules = [] existing_student_modules = [] nof_lines_added = 0 for counter in readlines.keys(): line = readlines[counter] student_code = line['student_code'] spr_code = line['spr_code'] module_code = line['module_code_f'] degree_code = line['degree_code'] #just to be explicit tuple = (spr_code, degree_code, module_code, ) if tuple in student_modules.keys(): student_module = student_modules[tuple] #update this line student_module.update(maybe_withdrawn=False, dataset=dataset, **line) else: student = students[student_code] module = modules[module_code] try: new_line, is_new = StudentModule.update_or_create(year=year, student=student, module=module, maybe_withdrawn=False, dataset=dataset, **line) #add new_line to dict of existing lines, this way we can handle duplicate lines student_modules[tuple] = new_line nof_lines_added += 1 except Exception, e: errors[counter] = line error_explanations[counter] = "Failed to create new omr line %s. (details: %s)" % (str(counter),str(e)) if job: job.update_progress("Added %s module assignments to the databse." % nof_lines_added) #if dataset specified, remove any lines from that dataset not included in the upload nof_lines_removed = 0 if dataset is not None: if job: job.update_progress("Deleting lines that are missing from the %s dataset." % dataset_name ) #convert from unicode to string for constructing query student_degree_module_tuples_str = [(safe_str(s),safe_str(d),safe_str(m)) for s,d,m in student_degree_module_tuples] q = StudentModule.select(AND(StudentModule.q.yearID==year.id, AND(StudentModule.q.datasetID==dataset.id, NOT(IN([StudentModule.q.spr_code, StudentModule.q.degree_code, StudentModule.q.module_code_f], student_degree_module_tuples_str))))) for ol in q: model.hub.begin() spr_code, degree_code, module_code = ol.spr_code, ol.degree_code, ol.module_code_f if ol.maybe_withdrawn != True: ol.maybe_withdrawn=True model.hub.commit() model.hub.begin() if ol.may_delete(): try: ol.destroySelf() model.hub.commit() except Exception, e: #exception may be just because student is in seminar group or because report exists #must complete transaction otherwise errors next time model.hub.rollback() else: nof_lines_removed += 1 #finished with transaction -- docs say do this but doesn't work #trans.commit(close=True) #necessary to start a new transaction or view gives errors! model.hub.begin() return dict(total_lines=total_lines, lines_added=nof_lines_added, nof_students_created=nof_students_created, nof_lines_removed = nof_lines_removed, errors=errors, error_explanations=error_explanations) #helper methods may be used by multiple import method. def format(name): "formats a field name, converting it to the name used as an SQLObject class attribute" return str(name).lower().replace(' ','_') def format_name(name): "formats a person's name (necessary because SITS sometimes uses CAPS" return " ".join([x.title() for x in name.split(" ")]) def is_date_field(name): """returns true if name names a field which should contain a Date. Works for attribute and column headings equally, e.g. 'start_date' and 'START DATE'""" return 'date' in format(name).lower().split('_') def is_year_field(name): "returns true if name names a field which should contain a year. " return 'year' in format(name).lower().split('_') def _todate(s, informat="%d/%b/%Y",outformat="%Y-%m-%d"): """converts s to a date, raises ValueException if conversion not possible. Returns None when s is None or ''. informat is whatever format the date appears in the uploaded file, outformat is whatever format the database layer needs the date to appear in""" if s is None or len(s) <1: return None from time import strptime, strftime date = strptime(s, informat) #ensures is valid date return strftime(outformat,date) def _toyear(s): """converts s to a year, raises ValueError if conversion not possible. Returns None given empty string.""" if s==None or len(s)<1: return None s = str(s) int(s[0:2]) int(s[3:5]) if s[2] != '/': raise ValueError, "%s is supposed to be a year" % s y = Year.from_name(s) if y is not None: return y else: raise ValueError, "%s is supposed to be a year but I can't find it" % s def _abort(msg): """returns an error when processing cannot continue. Usage: return _abort('file not found')""" errors = {99999 : ''} error_explanations = {99999 : msg} return dict(total_lines=0, lines_added=0, errors=errors,error_explanations=error_explanations) def _add_error(counter, line, msg, errors, error_explanations): errors[counter]=line if error_explanations.has_key(counter): error_explanations[counter]+="\n"+msg else: error_explanations[counter]=msg def _get_values(line, field_names, missing_years, last_field_blank=False, check_for_incorrect_length=True): "gets values from the specified line and returns a dict with field_names as keys and bits of the line as values. Assumes that any field_name with 'date' in it is a date and with 'year' in it is a year. If any line is not the same length as field names (depending on last_field_blank), values returned is None. Adds any missing years to missing_years. Also returns a string describing any errors." error_msg = "" if check_for_incorrect_length: if last_field_blank: if len(line) != len(field_names)+1: #we removed one field name (the blank one at the end) error_msg += "line has wrong number of items in it (has %i, should have %i)" % (len(line), len(field_names)+1)+"\n" return None, error_msg #abort processing if not last_field_blank: if len(line) != len(field_names): error_msg += "line has wrong number of items in it (has %i, should have %i)" % (len(line), len(field_names))+"\n" return None, error_msg #abort processing else: if len(line) < len(field_names): error_msg += "line has too few items in it (has %i, must have %i)" % (len(line), len(field_names)+1)+"\n" return None, error_msg #abort processing #first create dict from line, then use it to create new object. values = {} for name in field_names: idx = field_names.index(name) values[name]=line[idx] if is_date_field(name): try: values[name]= _todate(line[idx]) except ValueError: error_msg += "line has date field wrongly formatted (field '%s' has value '%s'--can't parse this as a date)" % (name, line[idx])+"\n" if is_year_field(name): try: values[name]= _toyear(line[idx]) except ValueError: error_msg += "line has year field that I can't find a matching year for in the databse (field '%s' has value '%s'). If '%s' is a valid year, you will have to create a new year for it in the database." % (name, line[idx], line[idx])+"\n" if line[idx] not in missing_years: missing_years.append(line[idx]) return values, error_msg def sits_spr(file, delimiter=','): """read a sits spr bar-delimited file and create new StudentDegree objects. Also creates necessary student objects. Requires that Year and Dept objects exist, skips lines with errors where they do not. First line must contain field names.""" reader = csv.reader(file, delimiter=delimiter) errors = {} #keys are line numbers, values are the lines error_explanations = {} #keys are line numbers, values are explanations of the errors counter = 0 #first line will be 1 because incremented first readlines = {} student_codes = [] try: field_names = reader.next() except StopIteration: return _abort("This file is empty or for some reason could be read.") #check field_names are correct last_field_blank = False for name in field_names: idx = field_names.index(name) field_names[idx]= format(name) name = format(name) if idx==len(field_names)-1 and len(name) < 1: #last field may be empty last_field_blank = True del field_names[idx] #remove field name now prevents setting attribute later continue for name in StudentDegree.required_fields(): if not name in field_names: return _abort("This file does not contain a required field name, '%s'" % name) lines_added = 0; lines_updated = 0 missing_years = []; missing_students = [] #aids testing for line in reader: counter += 1 values, error_msg = _get_values(line=line, field_names=field_names, missing_years=missing_years, last_field_blank=last_field_blank) if error_msg is not None and len(error_msg) > 0: _add_error(counter, line, error_msg, errors, error_explanations) continue #don't process this line further if errors found #link to student, create if none exists firstname=format_name(line[field_names.index('forename_1')]) lastname=format_name(line[field_names.index('surname')]) student_code = line[field_names.index('student_code')] student, is_new = Student.find_or_create(code=student_code, firstname=firstname, lastname=lastname) if student is None: # not found and couldn't create _add_error(counter, line, "could not find or create student with code=%s, firstname=%s, lastname=%s)" % (student_code, firstname, lastname), errors, error_explanations) if student_code not in missing_students: missing_students.append(student_code) continue #abort processing this line values['student']=student #create SitsStudentModule obj, is_new = StudentDegree.update_or_create(**values) if obj is None: _add_error(counter, line, "could not find or create an entry in the student_degree table for this line (suspect database error)." , errors, error_explanations) continue #abort processing this line if is_new: lines_added += 1 else: lines_updated +=1 total_lines=counter return dict(total_lines=total_lines, lines_added=lines_added, lines_updated=lines_updated, errors=errors, error_explanations=error_explanations, missing_years=missing_years, missing_students=missing_students) #modules def module_special_names(): """for Module, returns list of pairs of field with special names, first item is name of attribute in this class, second name is name of heading in sits mod file.""" return [('code','module_code'), ('name','full_name'), ('name','module_name')] def modules(file, delimiter=','): """read a sits modules bar-delimited file and create new Module objects. First line must contain field names. Will silently ignore any fields that are not attributes of Module.""" reader = csv.reader(file, delimiter=delimiter) errors = {} #keys are line numbers, values are the lines error_explanations = {} #keys are line numbers, values are explanations of the errors counter = 0 #first line will be 1 because incremented first readlines = {} try: field_names = reader.next() except StopIteration: return _abort("This file is empty or for some reason could be read.") #record field_names last_field_blank = False for name in field_names: idx = field_names.index(name) field_names[idx]= format(name) name = format(name) if idx==len(field_names)-1 and len(name) < 1: #last field may be empty last_field_blank = True del field_names[idx] #remove field name now prevents setting attribute later continue #modify some field_names for target, current in module_special_names(): if current in field_names: idx = field_names.index(current) field_names[idx]=target #check field names are correct, ignore any that are unexpected ignore_field_names = [] for name in field_names: if not hasattr(Module.q, name): if not hasattr(Module.q, "%sID" % name): #return _abort("This file contains an unexpected field name, '%s'" % name) ignore_field_names.append(name) missing_years = [] #for testing lines_added = 0; lines_updated = 0 for line in reader: counter += 1 values, error_msg = _get_values(line=line, field_names=field_names, missing_years=missing_years, last_field_blank=last_field_blank) if error_msg is not None and len(error_msg) > 0: _add_error(counter, line, error_msg, errors, error_explanations) continue #abort if errors found #remove any field names that should be ignored for ifn in ignore_field_names: del values[ifn] #create Module obj, is_new = Module.update_or_create(**values) if obj is None: _add_error(counter, line, "could not find or create an entry in the modules table for this line (suspect database error).", errors, error_explanations) continue #abort processing this line if is_new: lines_added += 1 else: lines_updated +=1 total_lines=counter return dict(total_lines=total_lines, lines_added=lines_added, lines_updated=lines_updated, errors=errors, error_explanations=error_explanations, missing_years=missing_years ) def depts(file, delimiter=','): "read a csv file, first item is dept code, second item is dept name." reader = csv.reader(file, delimiter=delimiter) errors = {} #keys are line numbers, values are the lines error_explanations = {} #keys are line numbers, values are explanations of the errors counter = 0 #first line will be 1 because incremented first readlines = {} field_names=['code','name'] lines_added = 0; lines_updated = 0 for line in reader: counter += 1 values, error_msg = _get_values(line=line, field_names=field_names, missing_years=[], last_field_blank=False, check_for_incorrect_length=False) if error_msg is not None and len(error_msg) > 0: _add_error(counter, line, error_msg, errors, error_explanations) continue #abort if errors found #create Module obj, is_new = Dept.update_or_create(**values) if obj is None: _add_error(counter, line, "could not find or create an entry in the depts table for this line (suspect database error).", errors, error_explanations ) continue #abort processing this line if is_new: lines_added += 1 else: lines_updated +=1 total_lines=counter return dict(total_lines=total_lines, lines_added=lines_added, lines_updated=lines_updated, errors=errors, error_explanations=error_explanations) def degrees(file, delimiter=','): """read a csv file, first item is dept code, second item is dept name, third item is degree code, fourth item is degree name.""" reader = csv.reader(file, delimiter=delimiter) errors = {} #keys are line numbers, values are the lines error_explanations = {} #keys are line numbers, values are explanations of the errors counter = 0 #first line will be 1 because incremented first readlines = {} field_names=['dept_code','dept_name','degree_code','degree_name'] lines_added = 0; lines_updated = 0 for line in reader: counter += 1 values, error_msg = _get_values(line=line, field_names=field_names, missing_years=[], last_field_blank=False, check_for_incorrect_length=False) if error_msg is not None and len(error_msg) > 0: _add_error(counter, line, error_msg, errors, error_explanations) continue #abort if errors found #find linked dept if it exists dept = Dept.from_code(values['dept_code']) #create Degree obj, is_new = Degree.update_or_create(code=values['degree_code'], name=values['degree_name'], dept=dept) if obj is None: _add_error(counter, line, "could not find or create an entry in the degrees table for this line (suspect database error).", errors, error_explanations ) continue #abort processing this line if is_new: lines_added += 1 else: lines_updated +=1 total_lines=counter return dict(total_lines=total_lines, lines_added=lines_added, lines_updated=lines_updated, errors=errors, error_explanations=error_explanations) ## ---- new style download class ---- ## class UploadException(Exception): pass class AbstractUpload(object): """Base class for creating imports. Override field_names and create_or_update. See 'Degrees' or 'Modules' in felicity2 for simple examples. If necessary, override __init__ to create extra properties. If field_names is a dictionary, keys are column numbers and values are field names (passed as parameters to create_or_update). If field_names is a list, first line of csv file is read to determine the locations of the field names. Because not all field_names may need to be specified, is_requirements_met(actual_field_names) will be called once these are established. If this returns False, the upload is aborted with an error. Convention: where records cannot be created because of missing course/degrees/whatever, store a list of the codes of the things that can't be created in a property called 'missing_xyz'. See method '_items_missing' below. """ field_names = None # dictionary containing row index and parameter name, # or list if first line contains field names _field_names = {} # dictionary containing actual field names inferred def create_or_update(self, *args): """For each line, this function will called with parameters named as field_names.values(), where the value of each parameter will be the item in the corresponding row index given in field_names.keys(). This method may be a staticmethod but needn't be""" raise Exception, "create_or_update must be overriden" @staticmethod def is_requirements_met(actual_field_names): """Only called when field_names is a list (i.e. when the first line of the file contains field names). param actual_field_names is a list of the field names mentioned in 'field_names' and also found in the file. If this returns false, the file will not be processed. """ return len(actual_field_names)==len(self.field_names) def __init__(self, file, delimiter=None): self.file=file if delimiter=="" or delimiter is None: self.delimiter=None else: self.delimiter=str(delimiter) #properties used to report what was done and any errors self.lines_added = 0 #number of lines where create_or_update returned a new object self.lines_updated = 0 #number of lines where create_or_update didn't return a new object self.total_lines = None #number of lines read self.errors = {} #keys are line numbers of csv file containing errors # values are the lines from the file self.error_explanations = {} #keys are line numbers of csv file containing errors #values are descriptions of the errors self.missing_years = [] #list of years named in the csv file that cannot be found #(useful to show this to user so they can fix problem) #import the csv file self._do() def to_dict(self): """call after upload done to get dictionary of results. This is just for compatibility with the old upload functions.""" d = dict(total_lines=self.total_lines, lines_added=self.lines_added, lines_updated=self.lines_updated, errors=self.errors, error_explanations=self.error_explanations) #add the missing_XYZs (includes missing_years) for k,v in self.items_missing: d[k]=v return d @classmethod def display_name(cls): """return the nice name of the file uploader to display. """ dn = cls.__name__[:-6] #strip off Upload #add spaces dn = ' '.join(re.findall(r'([A-Z][a-z0-9]+|[a-z0-9]+|[A-Z0-9]+)', dn)) return dn def _items_missing(self): """Searches self for 'missing_xyz' properties (e.g. 'missing_years'), which are lists of missing xyzs. Returns a dictiony where the keys are 'missing_xyz' and the values are the lists of missing xyzs. """ names_of_lists = [x for x in dir(self) if x.startswith('missing_')] lists = [getattr(self,x) for x in names_of_lists] return zip(names_of_lists, lists) items_missing=property(_items_missing) # --- # functions that do the main work in parsing csv files. # the aim is to be able to write csv upload functions just by specifying # a dictionary mapping csvline indexes to field names and a 'create_or_update' function # see the functions 'degrees' or 'modules' below def _get_values(self, line): """gets values from the specified line and returns a dict with field_names.values() as keys and bits of the line as values, and error_msg describing any errors. field_names is a dictionary containing index and name of field. Adds any missing years to missing_years. (missing_years is a list of the names of years not found, we track these to make it easier for the user to create needed years.) If any line is not long enough, values returned is None. """ error_msg = "" #create dict from line, keys are index of row position, values are from line. values = {} for idx, name in self._field_names.items(): try: values[name]=line[idx] except IndexError: error_msg += "line has too few items in it (only has %i)\n" % len(line) return None, error_msg #abort processing if is_date_field(name): try: values[name]= _todate(line[idx]) except ValueError: error_msg += "line has date field wrongly formatted (item '%i' has value '%s')\n" % (idx, line[idx]) if is_year_field(name): try: values[name]= _toyear(line[idx]) except ValueError: error_msg += "line has year field that I can't parse as a year (item '%i' has value '%s').\n" % (idx, line[idx]) if line[idx] not in self.missing_years: self.missing_years.append(line[idx]) return values, error_msg def _add_error(self, counter, line, msg): """report error described in msg concerning the line whose number is counter""" self.errors[counter]=line if self.error_explanations.has_key(counter): self.error_explanations[counter]+="\n"+msg else: self.error_explanations[counter]=msg def _do(self): """read a csv file. field_names is a dictionary with indexes and names. create_or_update is a function which should (i) return a tuple (obj, is_new) where obj is ignored and is_new is True if an object was created and (ii) raise and exception if it's not possible to create or update. If delimiter is None, attempts to sniff the file (may fail for some we use)""" readlines = {} if self.delimiter is None: #attempt autodiscovery, doesn't always work dialect=csv.Sniffer().sniff(self.file.read()) self.file.seek(0) #reset file to start reader = csv.reader(self.file, dialect=dialect) else: self.file.seek(0) #seems to be necessary, not sure why reader = csv.reader(self.file, delimiter=self.delimiter) #check field_names is not a list if type(self.field_names) != type([]): self._field_names = self.field_names else: #get field_names from first line of csv file header_line = reader.next() self._field_names = {} for pos,name in enumerate(header_line): name = format(name) #convert spaces to _ etc if name in self.field_names: self._field_names[pos]=name ok = self.is_requirements_met(self._field_names.values()) if not ok: self._add_error(0, -1, "The first line of this file should contain field names; some required field names are missing (see the upload page for details of what is required.)") return counter = 0 #need to use this variable outside loop for counter, line in enumerate(reader): values, error_msg = self._get_values(line=line) if error_msg is not None and len(error_msg) > 0: self._add_error(counter, line, error_msg) continue #abort processing this line if errors found #create object try: obj, is_new = self.create_or_update(**values) except UploadException, e: self._add_error(counter, line, "could not find or create an entry for this line (%s)." % e ) continue #abort processing this line if is_new: self.lines_added += 1 else: self.lines_updated +=1 self.total_lines=counter+1 #+1 because started at 0 class PersonalTutorUpload(AbstractUpload): """Assign staff to be students' personal tutors. First item is student SITS SPR code, second item is sits code or name of personal tutor""" field_names= ['student_code','spr_code', 'personnel_tutor_1_code', 'personal_tutor'] @staticmethod def is_requirements_met(actual_field_names): req1 = ('student_code' in actual_field_names) or ('spr_code' in actual_field_names) req2 = ('personnel_tutor_1_code' in actual_field_names) or ('personal_tutor' in actual_field_names) return req1 and req2 @staticmethod def create_or_update(**kw): #sort out what field names are in use if 'student_code' in kw: student_code = kw['student_code'] else: spr_code = kw['spr_code'] student_code = _convert_spr_to_student_code(spr_code) if 'personal_tutor' in kw: personal_tutor = kw['personal_tutor'] else: personal_tutor = kw['personnel_tutor_1_code'] #find student student = Student.from_code(student_code) if student is None: raise UploadException, "Could not find a student with code '%s'." % student_code #find staff staff = Staff.from_code(personal_tutor) if staff is None and len(personal_tutor)>2: #see if first characters are upper case, and if so remove the first two if personal_tutor[0:2] == personal_tutor[0:2].upper(): personal_tutor = personal_tutor[2:] staff = Staff.from_code(personal_tutor) else: #try getting staff from name instead lastname = personal_tutor.split(" ")[-1] staff = Staff.from_lastname(lastname, only_current=True) if staff is None: raise UploadException, "could not find a matching staff for '%s'." % personal_tutor #create the personal tutor return None, PersonalTutor.replace(staff=staff, student=student) class StudentITSCodeUpload(AbstractUpload): """For students already in reporttool, this upload identifies their ITS codes (e.g. 'pyhaj'). This makes it possible to email them. """ field_names={1:'its_code', 2:'sits_code'} @staticmethod def create_or_update(its_code, sits_code): #find student its_code=its_code.strip() sits_code=sits_code.strip() student = Student.from_code(sits_code) if student is None: raise UploadException, "Could not find a student with code '%s'." % sits_code #find staff if student.its_code == its_code and (student.email_address is not None and not student.email_address.startswith(".")): return None, False student.its_code = its_code if student.email_address is None or student.email_address.startswith("."): student.email_address = its_code+"@warwick.ac.uk" return None, True class StaffCodeUpload(AbstractUpload): """For staff already in reporttool, this upload identifies their SITS and ITS codes (e.g. 'pyhaj'). This makes it possible to email them, and to use SITS personal tutor data. It only works where their surname is unique, or where their SITS or ITS code is already known. """ field_names={0:'name', 1:'its_code', 2:'sits_code'} @staticmethod def create_or_update(name, its_code, sits_code): #find staff its_code=its_code.strip() sits_code=sits_code.strip() lastname = name.strip().split(" ")[-1].strip() #try to match staff by (1) sits_code, then (2) its_code them (3) lastname staff = Staff.from_code(sits_code) if staff is None: staff = Staff.from_its_code(its_code) if staff is None: staff = Staff.from_lastname(lastname) if staff is None: #impossible to match raise UploadException, "No unique staff found (name=%s, lastname=%s)." % (name, lastname) #update the staff if ((staff.its_code==its_code and staff.code==sits_code) and not (staff.email_address is None or staff.email_address.startswith("."))): return staff, False #not news staff.its_code = its_code staff.code = sits_code if staff.email_address is None or staff.email_address.startswith("."): staff.email_address = its_code+"@warwick.ac.uk" return staff, True class CreateStaffUpload(AbstractUpload): """For staff not already in reporttool, this creates them along with their SITS and ITS codes (e.g. 'pyhaj'). The import will work if only name is specified, but it's best to include at least SITS or ITS codes because this will avoid errors arising from creating duplicate staff. If email_address is blank, will create using its_code@warwick.ac.uk. """ field_names={0:'name', 1:'its_code', 2:'sits_code', 4:'department_name', 5:'email_address'} @staticmethod def create_or_update(name, its_code, sits_code, department_name, email_address): #find department dept = Dept.from_name(department_name.strip()) if dept is None: raise UploadException, "Could not find department called '%s'." % department_name #either ITS or SITS code must be present its_code=its_code.strip() sits_code=sits_code.strip() if sits_code is None or its_code is None or sits_code=="" or its_code=="": raise UploadException, "Either SITS or ITS code must be specified." #find staff #try to match staff by (1) sits_code, then (2) its_code staff1 = Staff.from_code(sits_code) if staff1 is not None: raise UploadException, "Staff with this SITS code already exists." staff2 = Staff.from_its_code(its_code) if staff2 is not None: raise UploadException, "Staff with this ITS code already exists." #create the staff if email_address is None or email_address=='': if its_code is not None and its_code!='': email_address = its_code + "@warwick.ac.uk" #warwick specific names = name.strip().split(" ") lastname=names[-1] firstname=" ".join(names[0:-1]) staff=Staff(code=sits_code, its_code=its_code, lastname=lastname, firstname=firstname, dept=dept.id, email_address=email_address) return staff, True class ExamMarkUpload(AbstractUpload): """Uploads exam marks. """ field_names={0:'spr_code', 1:'module_code', 2:'cats', 3:'mark'} def __init__(self, file, year, delimiter=None, **kw): self.year = year AbstractUpload.__init__(self, file, delimiter, **kw) def create_or_update(self, spr_code, module_code, cats, mark): #find student and module spr_code=spr_code.strip() student_degree = StudentDegree.from_code(spr_code) try: cats = float(cats) except ValueError: raise UploadException, "CATS must be a number (may be a float), not %s" % cats try: mark = float(mark) except ValueError: raise UploadException, "Mark must be a number (may be a float), not %s" % mark if student_degree is None: raise UploadException, "No student with code %s (maybe their SITS data has not been imported into reporttool)." % spr_code module_code=module_code.strip() module = Module.from_code(module_code) if module is None: raise UploadException, "No module with code %s." % module_code return ExamMark.update_or_create(student_degree, module, cats, mark, self.year)