tokens index
parent
3763ceefd3
commit
547a1042b3
|
@ -1,296 +1,299 @@
|
|||
from __future__ import unicode_literals
|
||||
from __future__ import print_function
|
||||
import requests
|
||||
import urllib.parse
|
||||
import json
|
||||
import collections
|
||||
import datetime
|
||||
|
||||
|
||||
def quote_named_param(p):
|
||||
return urllib.parse.quote(str(p)).replace('/', '%2f')
|
||||
|
||||
|
||||
def json_decode_ordered(txt):
|
||||
return json.JSONDecoder(object_pairs_hook=collections.OrderedDict).decode(txt)
|
||||
|
||||
|
||||
class CakeCMS:
|
||||
def __init__(self, url, token=None, course='system'):
|
||||
self.url = url.rstrip('/')
|
||||
self.course = course
|
||||
self.session = requests.Session()
|
||||
self.debug = False
|
||||
self.token = token
|
||||
|
||||
def get(self, uri, course=None, named_params={}, **kwargs):
|
||||
if self.token:
|
||||
self.session.headers['X-CMS-API-TOKEN'] = self.token
|
||||
url = self.url + '/' + (self.course if course is None else course) + '/api/' + uri
|
||||
for k, v in named_params.items():
|
||||
url += '/{}:{}'.format(quote_named_param(k), quote_named_param(v))
|
||||
if self.debug:
|
||||
print('> GET', url, ' ', end='')
|
||||
r = self.session.get(url, **kwargs)
|
||||
if self.debug:
|
||||
print('[', r.status_code, ']')
|
||||
return r
|
||||
|
||||
def post(self, uri, data, course=None, named_params={}, **kwargs):
|
||||
if self.token:
|
||||
self.session.headers['X-CMS-API-TOKEN'] = self.token
|
||||
url = self.url + '/' + (self.course if course is None else course) + '/api/' + uri
|
||||
for k, v in named_params.items():
|
||||
url += '/{}:{}'.format(quote_named_param(k), quote_named_param(v))
|
||||
if self.debug:
|
||||
print('> POST', url, ' ', end='')
|
||||
r = self.session.post(url, json=data, **kwargs)
|
||||
if self.debug:
|
||||
print('[', r.status_code, ']')
|
||||
return r
|
||||
|
||||
def assert_success(self, result):
|
||||
if 'error' in result and result['error'] == 'exception':
|
||||
if self.debug: print('<!', result['type'], ':', result['message'])
|
||||
raise Exception(str(result['type']) + ': ' + str(result['message']))
|
||||
if self.debug and 'message' in result:
|
||||
print('<', result['state'], ':', result['message'])
|
||||
if result['state'] == 'success':
|
||||
return True
|
||||
else:
|
||||
raise Exception(str(result['state']) + ': ' + str(result['message']))
|
||||
|
||||
def courses_index(self):
|
||||
"""
|
||||
:return:all Course objects of the given instance that you can see.
|
||||
"""
|
||||
return self.get('courses/index', course='system').json()['courses']
|
||||
|
||||
def courses_list(self):
|
||||
"""
|
||||
:return: a dict containing all courses. Format is id => shortname
|
||||
"""
|
||||
courses = self.courses_index()
|
||||
return {course['Course']['id']: course['Course']['shortname'] for course in courses}
|
||||
|
||||
def courses_get(self, id=None, shortname=None):
|
||||
"""
|
||||
:param id:
|
||||
:param shortname:
|
||||
:return: a course description object (as courses_index gives) of a specific course, identified by id or shortname
|
||||
"""
|
||||
assert id != None or shortname != None
|
||||
for course in self.courses_index():
|
||||
if course['Course']['id'] == id or course['Course']['shortname'] == shortname:
|
||||
return course
|
||||
return None
|
||||
|
||||
def students_index(self, course=None, columns=None, named_params={}, **kwargs):
|
||||
if columns:
|
||||
named_params['cols'] = '~'.join(columns)
|
||||
return self.get('students/index', course=course, named_params=named_params, **kwargs).json()
|
||||
|
||||
def students_index_csv(self, course=None, columns=None, excel_compatible=False, named_params={}, **kwargs):
|
||||
named_params['format'] = 'excelcsv' if excel_compatible else 'csv'
|
||||
if columns:
|
||||
named_params['cols'] = '~'.join(columns)
|
||||
return self.get('students/index', course=course, named_params=named_params, **kwargs).content
|
||||
|
||||
def students_index_get_cols(self, course=None, named_params={}, **kwargs):
|
||||
"""
|
||||
:param course:
|
||||
:param named_params:
|
||||
:param kwargs:
|
||||
:rtype: dict[unicode, unicode]
|
||||
:return: an ordered dictionary of all available student columns (short key => model.field name)
|
||||
"""
|
||||
r = self.get('students/index', course=course, named_params={'limit': 1})
|
||||
return json_decode_ordered(r.text)['compressionTable']
|
||||
|
||||
def testingresults_index_by_student_id(self, course=None, testing_id=1):
|
||||
"""
|
||||
:param course:
|
||||
:param testing_id:
|
||||
:rtype: dict[int, dict]
|
||||
:return: A dict mapping student id's to a testingresult entry (containing points, admitted, and mode)
|
||||
"""
|
||||
students = self.students_index(course=course,
|
||||
columns=['Si', 'Sm', 'T' + str(testing_id)]) # id, matriculation, points
|
||||
key = 'Testingresult' + str(testing_id)
|
||||
return {entry['Student']['id']: entry[key] for entry in students['students']}
|
||||
|
||||
def admissions_index(self, course=None, testing_id=None):
|
||||
return \
|
||||
self.get('admissions/index', course=course, named_params={'bytesting': testing_id or 0, 'limit': 0}).json()[
|
||||
'admissions']
|
||||
|
||||
def admissions_edit(self, id, data, course=None):
|
||||
data = dict(data)
|
||||
data['id'] = id
|
||||
return self.post('admissions/edit/' + str(id), {'Admission': data}, course=course).json()
|
||||
|
||||
def admissions_edit_many(self, data, course=None):
|
||||
return self.post('admissions/edit_many', {'Admission': data}, course=course).json()
|
||||
|
||||
def calendar_events_index(self, course=None):
|
||||
return self.get('full_calendar/events/index', course=course).json()
|
||||
|
||||
def submissions_index(self, course=None):
|
||||
"""
|
||||
:param course:
|
||||
:return: A list of all submissions in the system (submission = something where students can submit to). Contains details.
|
||||
"""
|
||||
return self.get('submissions/index', course=course).json()['submissions']
|
||||
|
||||
def submission_items_index(self, course=None, submission_id=None, tutorial_id=None):
|
||||
"""
|
||||
:param course:
|
||||
:param submission_id: the submission you want to get the file list for (optional)
|
||||
:param tutorial_id: filter by tutorial (optional)
|
||||
:return: A list of all submission items (files submitted to a single submission), including many details.
|
||||
"""
|
||||
named_params = {'limit': '0'}
|
||||
if submission_id: named_params['bySub'] = submission_id
|
||||
if tutorial_id: named_params['byTutorial'] = tutorial_id
|
||||
return self.get('submission_items/index', course=course, named_params=named_params).json()['submissionItems']
|
||||
|
||||
def submission_items_download(self, course=None, submission_item_id=None):
|
||||
"""
|
||||
:param course:
|
||||
:param submission_item_id: the submission item (file) id you want to download
|
||||
:return: The file content of a single submission file
|
||||
"""
|
||||
assert submission_item_id
|
||||
return self.get('submission_items/download/' + str(submission_item_id), course=course).content
|
||||
|
||||
def submission_items_download_all(self, course=None, submission_id=None, tutorial_id=None):
|
||||
"""
|
||||
Downloads a zip archive of submitted files. Be aware:
|
||||
- The archive is stored in memory!
|
||||
- Zip archives can't be larger than 2GB
|
||||
:param course:
|
||||
:param submission_id: the submission you want to get the file list for (optional)
|
||||
:param tutorial_id: filter by tutorial (optional)
|
||||
:return: A zip archive containing all files submitted to a "submission".
|
||||
"""
|
||||
named_params = {}
|
||||
if submission_id: named_params['bySub'] = submission_id
|
||||
if tutorial_id: named_params['byTutorial'] = tutorial_id
|
||||
return self.get('submission_items/downloadAll', course=course, named_params=named_params).content
|
||||
|
||||
def submission_items_download_all_streamed(self, course=None, submission_id=None, tutorial_id=None):
|
||||
"""
|
||||
Downloads a zip archive of submitted files. Be aware:
|
||||
- This method returns a file-descriptor-like object (that can be used to copy the data chunk by chunk)
|
||||
- Zip archives can't be larger than 2GB
|
||||
- You have to manually close the returned object. Use this function in a "with as" statement.
|
||||
:param course:
|
||||
:param submission_id: the submission you want to get the file list for (optional)
|
||||
:param tutorial_id: filter by tutorial (optional)
|
||||
:return: A file-like object that can be used to read a zip archive from the server. The zip file contains all submitted files.
|
||||
"""
|
||||
named_params = {}
|
||||
if submission_id: named_params['bySub'] = submission_id
|
||||
if tutorial_id: named_params['byTutorial'] = tutorial_id
|
||||
return self.get('submission_items/downloadAll', course=course, named_params=named_params, stream=True).raw
|
||||
|
||||
def registrations_import(self, id, matriculations, system='hispos', complete=True, timestamp=None, course=None):
|
||||
if not timestamp:
|
||||
dt = datetime.datetime.now()
|
||||
timestamp = dt.strftime('%d.%m.%Y %H:00:00')
|
||||
body = {'Import': {'text': '\n'.join(map(str, matriculations)),
|
||||
'timestamp': timestamp,
|
||||
'file': {'error': 4},
|
||||
'system': system}}
|
||||
resp = self.post('registration_items/import/' + str(id), body)
|
||||
data = resp.json()
|
||||
if 'message' in data:
|
||||
print(data['message'])
|
||||
|
||||
register = []
|
||||
unregister = []
|
||||
for student in data['students']:
|
||||
if student['contained'] and not student['registered']:
|
||||
register.append(student['matriculation'])
|
||||
elif not student['contained'] and complete:
|
||||
unregister.append(student['matriculation'])
|
||||
|
||||
body = {'Import': {'timestamp': timestamp,
|
||||
'complete': complete,
|
||||
'register': json.dumps(register),
|
||||
'unregister': json.dumps(unregister),
|
||||
'system': system}}
|
||||
return self.post('registration_items/import/' + str(id), body).json()
|
||||
|
||||
def notes_index(self, course=None):
|
||||
"""
|
||||
:param course:
|
||||
:return: List of all notes configured in a course.
|
||||
"""
|
||||
return self.get('notes/index', course=course).json()['notes']
|
||||
|
||||
def notes_change(self, note_id, student_id, value, course=None):
|
||||
"""
|
||||
Edit the value of a note for a single student.
|
||||
:param note_id:
|
||||
:param student_id:
|
||||
:param value: str, int or bool
|
||||
:param course:
|
||||
:return:
|
||||
"""
|
||||
if value is True: value = 1
|
||||
if value is False: value = 0
|
||||
return self.post('notes_entries/change', {'NotesEntry': {'student_id': student_id, 'note_id': note_id, 'value': value}}, course=course)
|
||||
|
||||
def testings_index(self, course=None):
|
||||
"""
|
||||
Get all testings in nested tree structure
|
||||
:param course:
|
||||
:return:
|
||||
"""
|
||||
return self.get('testings/index', course=course).json()['testings']
|
||||
|
||||
def testingresults_change(self, testing_id, student_id, points, course=None):
|
||||
"""
|
||||
Edit the points of a student.
|
||||
:param testing_id:
|
||||
:param student_id:
|
||||
:param points: int or float
|
||||
:param course:
|
||||
:return:
|
||||
"""
|
||||
return self.post('testingresults/change', {'Testingresult': {'student_id': student_id, 'testing_id': testing_id, 'points': points}}, course=course).json()
|
||||
|
||||
def materials_index(self, course=None):
|
||||
"""
|
||||
Get a list of all accessible material categories and files.
|
||||
Files can be either stored files (to be retrieved with material_download) or links to other sites.
|
||||
:param course:
|
||||
:return:
|
||||
"""
|
||||
return self.get('material_categories/index', course=course).json()
|
||||
|
||||
def __get_content_disposition_filename(self, response):
|
||||
filename = response.headers['Content-Disposition'].split('filename=')[1]
|
||||
if filename.startswith('"'):
|
||||
return filename[1:-1]
|
||||
return filename
|
||||
|
||||
def material_download(self, id, filename=None, course=None):
|
||||
"""
|
||||
Download a material file. If filename is given, the result is a pair (filename, file-content).
|
||||
If filename is given, the download is written to this file (and the suggested name is returned).
|
||||
:param id:
|
||||
:param filename:
|
||||
:param course:
|
||||
:return:
|
||||
"""
|
||||
if not filename:
|
||||
response = self.get('material_files/download/' + str(id), course=course)
|
||||
return self.__get_content_disposition_filename(response), response.content
|
||||
else:
|
||||
response = self.get('material_files/download/' + str(id), course=course, stream=True)
|
||||
with open(filename, 'wb') as fd:
|
||||
for chunk in response.iter_content(chunk_size=4096):
|
||||
fd.write(chunk)
|
||||
return self.__get_content_disposition_filename(response)
|
||||
from __future__ import unicode_literals
|
||||
from __future__ import print_function
|
||||
import requests
|
||||
import urllib.parse
|
||||
import json
|
||||
import collections
|
||||
import datetime
|
||||
|
||||
|
||||
def quote_named_param(p):
|
||||
return urllib.parse.quote(str(p)).replace('/', '%2f')
|
||||
|
||||
|
||||
def json_decode_ordered(txt):
|
||||
return json.JSONDecoder(object_pairs_hook=collections.OrderedDict).decode(txt)
|
||||
|
||||
|
||||
class CakeCMS:
|
||||
def __init__(self, url, token=None, course='system'):
|
||||
self.url = url.rstrip('/')
|
||||
self.course = course
|
||||
self.session = requests.Session()
|
||||
self.debug = False
|
||||
self.token = token
|
||||
|
||||
def get(self, uri, course=None, named_params={}, **kwargs):
|
||||
if self.token:
|
||||
self.session.headers['X-CMS-API-TOKEN'] = self.token
|
||||
url = self.url + '/' + (self.course if course is None else course) + '/api/' + uri
|
||||
for k, v in named_params.items():
|
||||
url += '/{}:{}'.format(quote_named_param(k), quote_named_param(v))
|
||||
if self.debug:
|
||||
print('> GET', url, ' ', end='')
|
||||
r = self.session.get(url, **kwargs)
|
||||
if self.debug:
|
||||
print('[', r.status_code, ']')
|
||||
return r
|
||||
|
||||
def post(self, uri, data, course=None, named_params={}, **kwargs):
|
||||
if self.token:
|
||||
self.session.headers['X-CMS-API-TOKEN'] = self.token
|
||||
url = self.url + '/' + (self.course if course is None else course) + '/api/' + uri
|
||||
for k, v in named_params.items():
|
||||
url += '/{}:{}'.format(quote_named_param(k), quote_named_param(v))
|
||||
if self.debug:
|
||||
print('> POST', url, ' ', end='')
|
||||
r = self.session.post(url, json=data, **kwargs)
|
||||
if self.debug:
|
||||
print('[', r.status_code, ']')
|
||||
return r
|
||||
|
||||
def assert_success(self, result):
|
||||
if 'error' in result and result['error'] == 'exception':
|
||||
if self.debug: print('<!', result['type'], ':', result['message'])
|
||||
raise Exception(str(result['type']) + ': ' + str(result['message']))
|
||||
if self.debug and 'message' in result:
|
||||
print('<', result['state'], ':', result['message'])
|
||||
if result['state'] == 'success':
|
||||
return True
|
||||
else:
|
||||
raise Exception(str(result['state']) + ': ' + str(result['message']))
|
||||
|
||||
def courses_index(self):
|
||||
"""
|
||||
:return:all Course objects of the given instance that you can see.
|
||||
"""
|
||||
return self.get('courses/index', course='system').json()['courses']
|
||||
|
||||
def courses_list(self):
|
||||
"""
|
||||
:return: a dict containing all courses. Format is id => shortname
|
||||
"""
|
||||
courses = self.courses_index()
|
||||
return {course['Course']['id']: course['Course']['shortname'] for course in courses}
|
||||
|
||||
def courses_get(self, id=None, shortname=None):
|
||||
"""
|
||||
:param id:
|
||||
:param shortname:
|
||||
:return: a course description object (as courses_index gives) of a specific course, identified by id or shortname
|
||||
"""
|
||||
assert id != None or shortname != None
|
||||
for course in self.courses_index():
|
||||
if course['Course']['id'] == id or course['Course']['shortname'] == shortname:
|
||||
return course
|
||||
return None
|
||||
|
||||
def students_index(self, course=None, columns=None, named_params={}, **kwargs):
|
||||
if columns:
|
||||
named_params['cols'] = '~'.join(columns)
|
||||
return self.get('students/index', course=course, named_params=named_params, **kwargs).json()
|
||||
|
||||
def students_index_csv(self, course=None, columns=None, excel_compatible=False, named_params={}, **kwargs):
|
||||
named_params['format'] = 'excelcsv' if excel_compatible else 'csv'
|
||||
if columns:
|
||||
named_params['cols'] = '~'.join(columns)
|
||||
return self.get('students/index', course=course, named_params=named_params, **kwargs).content
|
||||
|
||||
def students_index_get_cols(self, course=None, named_params={}, **kwargs):
|
||||
"""
|
||||
:param course:
|
||||
:param named_params:
|
||||
:param kwargs:
|
||||
:rtype: dict[unicode, unicode]
|
||||
:return: an ordered dictionary of all available student columns (short key => model.field name)
|
||||
"""
|
||||
r = self.get('students/index', course=course, named_params={'limit': 1})
|
||||
return json_decode_ordered(r.text)['compressionTable']
|
||||
|
||||
def testingresults_index_by_student_id(self, course=None, testing_id=1):
|
||||
"""
|
||||
:param course:
|
||||
:param testing_id:
|
||||
:rtype: dict[int, dict]
|
||||
:return: A dict mapping student id's to a testingresult entry (containing points, admitted, and mode)
|
||||
"""
|
||||
students = self.students_index(course=course,
|
||||
columns=['Si', 'Sm', 'T' + str(testing_id)]) # id, matriculation, points
|
||||
key = 'Testingresult' + str(testing_id)
|
||||
return {entry['Student']['id']: entry[key] for entry in students['students']}
|
||||
|
||||
def admissions_index(self, course=None, testing_id=None):
|
||||
return \
|
||||
self.get('admissions/index', course=course, named_params={'bytesting': testing_id or 0, 'limit': 0}).json()[
|
||||
'admissions']
|
||||
|
||||
def admissions_edit(self, id, data, course=None):
|
||||
data = dict(data)
|
||||
data['id'] = id
|
||||
return self.post('admissions/edit/' + str(id), {'Admission': data}, course=course).json()
|
||||
|
||||
def admissions_edit_many(self, data, course=None):
|
||||
return self.post('admissions/edit_many', {'Admission': data}, course=course).json()
|
||||
|
||||
def calendar_events_index(self, course=None):
|
||||
return self.get('full_calendar/events/index', course=course).json()
|
||||
|
||||
def submissions_index(self, course=None):
|
||||
"""
|
||||
:param course:
|
||||
:return: A list of all submissions in the system (submission = something where students can submit to). Contains details.
|
||||
"""
|
||||
return self.get('submissions/index', course=course).json()['submissions']
|
||||
|
||||
def submission_items_index(self, course=None, submission_id=None, tutorial_id=None):
|
||||
"""
|
||||
:param course:
|
||||
:param submission_id: the submission you want to get the file list for (optional)
|
||||
:param tutorial_id: filter by tutorial (optional)
|
||||
:return: A list of all submission items (files submitted to a single submission), including many details.
|
||||
"""
|
||||
named_params = {'limit': '0'}
|
||||
if submission_id: named_params['bySub'] = submission_id
|
||||
if tutorial_id: named_params['byTutorial'] = tutorial_id
|
||||
return self.get('submission_items/index', course=course, named_params=named_params).json()['submissionItems']
|
||||
|
||||
def submission_items_download(self, course=None, submission_item_id=None):
|
||||
"""
|
||||
:param course:
|
||||
:param submission_item_id: the submission item (file) id you want to download
|
||||
:return: The file content of a single submission file
|
||||
"""
|
||||
assert submission_item_id
|
||||
return self.get('submission_items/download/' + str(submission_item_id), course=course).content
|
||||
|
||||
def submission_items_download_all(self, course=None, submission_id=None, tutorial_id=None):
|
||||
"""
|
||||
Downloads a zip archive of submitted files. Be aware:
|
||||
- The archive is stored in memory!
|
||||
- Zip archives can't be larger than 2GB
|
||||
:param course:
|
||||
:param submission_id: the submission you want to get the file list for (optional)
|
||||
:param tutorial_id: filter by tutorial (optional)
|
||||
:return: A zip archive containing all files submitted to a "submission".
|
||||
"""
|
||||
named_params = {}
|
||||
if submission_id: named_params['bySub'] = submission_id
|
||||
if tutorial_id: named_params['byTutorial'] = tutorial_id
|
||||
return self.get('submission_items/downloadAll', course=course, named_params=named_params).content
|
||||
|
||||
def submission_items_download_all_streamed(self, course=None, submission_id=None, tutorial_id=None):
|
||||
"""
|
||||
Downloads a zip archive of submitted files. Be aware:
|
||||
- This method returns a file-descriptor-like object (that can be used to copy the data chunk by chunk)
|
||||
- Zip archives can't be larger than 2GB
|
||||
- You have to manually close the returned object. Use this function in a "with as" statement.
|
||||
:param course:
|
||||
:param submission_id: the submission you want to get the file list for (optional)
|
||||
:param tutorial_id: filter by tutorial (optional)
|
||||
:return: A file-like object that can be used to read a zip archive from the server. The zip file contains all submitted files.
|
||||
"""
|
||||
named_params = {}
|
||||
if submission_id: named_params['bySub'] = submission_id
|
||||
if tutorial_id: named_params['byTutorial'] = tutorial_id
|
||||
return self.get('submission_items/downloadAll', course=course, named_params=named_params, stream=True).raw
|
||||
|
||||
def registrations_import(self, id, matriculations, system='hispos', complete=True, timestamp=None, course=None):
|
||||
if not timestamp:
|
||||
dt = datetime.datetime.now()
|
||||
timestamp = dt.strftime('%d.%m.%Y %H:00:00')
|
||||
body = {'Import': {'text': '\n'.join(map(str, matriculations)),
|
||||
'timestamp': timestamp,
|
||||
'file': {'error': 4},
|
||||
'system': system}}
|
||||
resp = self.post('registration_items/import/' + str(id), body)
|
||||
data = resp.json()
|
||||
if 'message' in data:
|
||||
print(data['message'])
|
||||
|
||||
register = []
|
||||
unregister = []
|
||||
for student in data['students']:
|
||||
if student['contained'] and not student['registered']:
|
||||
register.append(student['matriculation'])
|
||||
elif not student['contained'] and complete:
|
||||
unregister.append(student['matriculation'])
|
||||
|
||||
body = {'Import': {'timestamp': timestamp,
|
||||
'complete': complete,
|
||||
'register': json.dumps(register),
|
||||
'unregister': json.dumps(unregister),
|
||||
'system': system}}
|
||||
return self.post('registration_items/import/' + str(id), body).json()
|
||||
|
||||
def notes_index(self, course=None):
|
||||
"""
|
||||
:param course:
|
||||
:return: List of all notes configured in a course.
|
||||
"""
|
||||
return self.get('notes/index', course=course).json()['notes']
|
||||
|
||||
def notes_change(self, note_id, student_id, value, course=None):
|
||||
"""
|
||||
Edit the value of a note for a single student.
|
||||
:param note_id:
|
||||
:param student_id:
|
||||
:param value: str, int or bool
|
||||
:param course:
|
||||
:return:
|
||||
"""
|
||||
if value is True: value = 1
|
||||
if value is False: value = 0
|
||||
return self.post('notes_entries/change', {'NotesEntry': {'student_id': student_id, 'note_id': note_id, 'value': value}}, course=course)
|
||||
|
||||
def testings_index(self, course=None):
|
||||
"""
|
||||
Get all testings in nested tree structure
|
||||
:param course:
|
||||
:return:
|
||||
"""
|
||||
return self.get('testings/index', course=course).json()['testings']
|
||||
|
||||
def testingresults_change(self, testing_id, student_id, points, course=None):
|
||||
"""
|
||||
Edit the points of a student.
|
||||
:param testing_id:
|
||||
:param student_id:
|
||||
:param points: int or float
|
||||
:param course:
|
||||
:return:
|
||||
"""
|
||||
return self.post('testingresults/change', {'Testingresult': {'student_id': student_id, 'testing_id': testing_id, 'points': points}}, course=course).json()
|
||||
|
||||
def materials_index(self, course=None):
|
||||
"""
|
||||
Get a list of all accessible material categories and files.
|
||||
Files can be either stored files (to be retrieved with material_download) or links to other sites.
|
||||
:param course:
|
||||
:return:
|
||||
"""
|
||||
return self.get('material_categories/index', course=course).json()
|
||||
|
||||
def __get_content_disposition_filename(self, response):
|
||||
filename = response.headers['Content-Disposition'].split('filename=')[1]
|
||||
if filename.startswith('"'):
|
||||
return filename[1:-1]
|
||||
return filename
|
||||
|
||||
def material_download(self, id, filename=None, course=None):
|
||||
"""
|
||||
Download a material file. If filename is given, the result is a pair (filename, file-content).
|
||||
If filename is given, the download is written to this file (and the suggested name is returned).
|
||||
:param id:
|
||||
:param filename:
|
||||
:param course:
|
||||
:return:
|
||||
"""
|
||||
if not filename:
|
||||
response = self.get('material_files/download/' + str(id), course=course)
|
||||
return self.__get_content_disposition_filename(response), response.content
|
||||
else:
|
||||
response = self.get('material_files/download/' + str(id), course=course, stream=True)
|
||||
with open(filename, 'wb') as fd:
|
||||
for chunk in response.iter_content(chunk_size=4096):
|
||||
fd.write(chunk)
|
||||
return self.__get_content_disposition_filename(response)
|
||||
|
||||
def tokens_index(self, course=None):
|
||||
return self.get('tokens/index', course=course).json()['tokens']
|
||||
|
|
Loading…
Reference in New Issue