import xml.etree.ElementTree as et from django.db import transaction from boards.models import KeyPair, GlobalId, Signature, Post ENCODING_UNICODE = 'unicode' TAG_MODEL = 'model' TAG_REQUEST = 'request' TAG_RESPONSE = 'response' TAG_ID = 'id' TAG_STATUS = 'status' TAG_MODELS = 'models' TAG_TITLE = 'title' TAG_TEXT = 'text' TAG_THREAD = 'thread' TAG_PUB_TIME = 'pub-time' TAG_SIGNATURES = 'signatures' TAG_SIGNATURE = 'signature' TAG_CONTENT = 'content' TAG_ATTACHMENTS = 'attachments' TAG_ATTACHMENT = 'attachment' TYPE_GET = 'get' ATTR_VERSION = 'version' ATTR_TYPE = 'type' ATTR_NAME = 'name' ATTR_VALUE = 'value' ATTR_MIMETYPE = 'mimetype' ATTR_KEY = 'key' STATUS_SUCCESS = 'success' class SyncManager: @staticmethod def generate_response_get(model_list: list): response = et.Element(TAG_RESPONSE) status = et.SubElement(response, TAG_STATUS) status.text = STATUS_SUCCESS models = et.SubElement(response, TAG_MODELS) for post in model_list: model = et.SubElement(models, TAG_MODEL) model.set(ATTR_NAME, 'post') content_tag = et.SubElement(model, TAG_CONTENT) tag_id = et.SubElement(content_tag, TAG_ID) post.global_id.to_xml_element(tag_id) title = et.SubElement(content_tag, TAG_TITLE) title.text = post.title text = et.SubElement(content_tag, TAG_TEXT) text.text = post.get_sync_text() if not post.is_opening(): thread = et.SubElement(content_tag, TAG_THREAD) thread_id = et.SubElement(thread, TAG_ID) post.get_thread().get_opening_post().global_id.to_xml_element(thread_id) else: # TODO Output tags here pass pub_time = et.SubElement(content_tag, TAG_PUB_TIME) pub_time.text = str(post.get_pub_time_str()) signatures_tag = et.SubElement(model, TAG_SIGNATURES) post_signatures = post.signature.all() if post_signatures: signatures = post_signatures # TODO Adding signature to a post is not yet added. For now this # block is useless else: # TODO Maybe the signature can be computed only once after # the post is added? Need to add some on_save signal queue # and add this there. key = KeyPair.objects.get(public_key=post.global_id.key) signatures = [Signature( key_type=key.key_type, key=key.public_key, signature=key.sign(et.tostring(content_tag, ENCODING_UNICODE)), )] for signature in signatures: signature_tag = et.SubElement(signatures_tag, TAG_SIGNATURE) signature_tag.set(ATTR_TYPE, signature.key_type) signature_tag.set(ATTR_VALUE, signature.signature) signature_tag.set(ATTR_KEY, signature.key) return et.tostring(response, ENCODING_UNICODE) @staticmethod @transaction.atomic def parse_response_get(response_xml): tag_root = et.fromstring(response_xml) tag_status = tag_root.find(TAG_STATUS) if STATUS_SUCCESS == tag_status.text: tag_models = tag_root.find(TAG_MODELS) for tag_model in tag_models: tag_content = tag_model.find(TAG_CONTENT) valid = SyncManager._verify_model(tag_content, tag_model) if not valid: raise Exception('Invalid model signature') tag_id = tag_content.find(TAG_ID) global_id, exists = GlobalId.from_xml_element(tag_id) if exists: print('Post with same ID already exists') else: global_id.save() title = tag_content.find(TAG_TITLE).text text = tag_content.find(TAG_TEXT).text pub_time = tag_content.find(TAG_PUB_TIME).text thread = tag_content.find(TAG_THREAD) if thread: opening_post = Post.objects.get( id=thread.find(TAG_ID).text) else: opening_post = None # TODO Get tags here # TODO Check that the replied posts are already present # before adding new ones # TODO Get images post = Post.objects.import_post( title=title, text=text, pub_time=pub_time, opening_post=opening_post) post.global_id = global_id else: # TODO Throw an exception? pass @staticmethod def _verify_model(tag_content, tag_model): """ Verifies all signatures for a single model. """ valid = True tag_signatures = tag_model.find(TAG_SIGNATURES) for tag_signature in tag_signatures: signature_type = tag_signature.get(ATTR_TYPE) signature_value = tag_signature.get(ATTR_VALUE) signature_key = tag_signature.get(ATTR_KEY) if not KeyPair.objects.verify( signature_key, et.tostring(tag_content, ENCODING_UNICODE), signature_value, signature_type): valid = False break return valid