You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

158 lines
5.7 KiB
Python

from django.contrib.auth.models import AbstractUser
from django.db import models
from django.utils import timezone
import pickle
from base64 import b64encode, b64decode
from random import randint
from phe import paillier
from uuid import uuid4
class User(AbstractUser):
    """Auth user extended with text columns for a key pair.

    ``save()`` additionally serialises the transient ``_phe_public_key`` /
    ``_phe_private_key`` attributes (when both are present on the instance)
    into base64-encoded pickles, and the ``deserialized_*`` properties
    reverse that encoding.
    """

    public_key = models.TextField()
    private_key = models.TextField()

    # NOTE(review): the serialised blobs are written to ``phe_public_key`` /
    # ``phe_private_key``, neither of which is declared as a model field in
    # this class (the declared fields are ``public_key`` / ``private_key``).
    # As far as this class shows, the blobs therefore live only on the
    # in-memory instance and the properties below raise AttributeError on an
    # instance where ``save()`` has not set them -- confirm the intended
    # field names before relying on this.

    @staticmethod
    def _pack_key(key):
        """Pickle *key* and return the result as base64 text."""
        return b64encode(pickle.dumps(key)).decode('utf-8')

    @staticmethod
    def _unpack_key(encoded):
        """Inverse of ``_pack_key``.

        SECURITY: ``pickle.loads`` executes arbitrary code when fed a crafted
        payload; only ever call this on data this application produced.
        """
        return pickle.loads(b64decode(encoded))

    def save(self, *args, **kwargs):
        """Serialise the in-memory key objects (if both exist), then save."""
        transient_names = ('_phe_public_key', '_phe_private_key')
        if all(hasattr(self, attr) for attr in transient_names):
            self.phe_public_key = self._pack_key(self._phe_public_key)
            self.phe_private_key = self._pack_key(self._phe_private_key)
        super().save(*args, **kwargs)

    @property
    def deserialized_public_key(self):
        """Return the object decoded from ``phe_public_key``."""
        return self._unpack_key(self.phe_public_key)

    @property
    def deserialized_private_key(self):
        """Return the object decoded from ``phe_private_key``."""
        return self._unpack_key(self.phe_private_key)
class AttributeType(models.Model):
    """Describes a kind of attribute: its name, datatype and privacy flag.

    For float datatypes ``save()`` rewrites ``datatype`` to
    ``float_<significant_digits>``; for every other datatype
    ``significant_digits`` has to stay unset.
    """

    # Declared for reference; the ``datatype`` field below is not restricted
    # to these choices, and float datatypes are not listed here.
    DATATYPE_CHOICES = [
        ('string', 'String'),
        ('boolean', 'Boolean'),
        ('integer', 'Integer'),
    ]

    is_private = models.BooleanField(default=False)
    datatype = models.CharField(max_length=15)
    significant_digits = models.PositiveIntegerField(null=True, blank=True)
    name = models.CharField(max_length=40)

    def save(self, *args, **kwargs):
        """Validate the datatype / significant_digits pairing, then save.

        Raises:
            ValueError: if a float datatype has no ``significant_digits``, or
                a non-float datatype has one.
        """
        is_float = self.datatype.startswith('float')
        digits = self.significant_digits

        if is_float and digits is None:
            raise ValueError('significant_digits must be set for float datatype')
        if not is_float and digits is not None:
            raise ValueError('significant_digits must be None for non-float datatype')

        if is_float:
            # Normalise so the stored datatype always carries the precision.
            self.datatype = f'float_{digits}'
        super().save(*args, **kwargs)

    def __str__(self) -> str:
        """Use the attribute type's name as its display form."""
        return self.name
class Attribute(models.Model):
    """A value of a given ``AttributeType``, stored as text.

    Concrete base for ``UserAttribute`` and ``RuleAttribute``.
    """

    # Deleting the type removes every attribute of that type.
    attribute_type = models.ForeignKey(
        to=AttributeType,
        on_delete=models.CASCADE,
    )
    # Always persisted as a string, whatever the attribute type's datatype.
    value = models.CharField(max_length=2048)
class UserAttribute(Attribute):
    """An ``Attribute`` value that belongs to a specific user."""

    # Deleting the user removes their attributes as well.
    user = models.ForeignKey(
        to=User,
        on_delete=models.CASCADE,
    )
    # Refreshed automatically on every save.
    last_modified = models.DateTimeField(auto_now=True)
class File(models.Model):
    """An uploaded file together with its owner and timestamps."""

    owner = models.ForeignKey(
        to=User,
        on_delete=models.CASCADE,
    )
    name = models.CharField(max_length=255)
    # Uploaded content is stored under the ``uploads/`` prefix.
    file = models.FileField(upload_to='uploads/')
    # Set once on creation / refreshed on every save, respectively.
    created_at = models.DateTimeField(auto_now_add=True)
    last_modified = models.DateTimeField(auto_now=True)

    class Meta:
        # Default queryset order: by name, then by creation time.
        ordering = ['name', 'created_at']

    def __str__(self):
        """Use the file's name as its display form."""
        return self.name
class Rule(models.Model):
    """Access rule on a ``File`` carrying read / write / delete flags.

    A rule is made of ``RuleAttribute`` rows (reached through
    ``ruleattribute_set``). ``is_satisfied_by`` checks a user's attributes
    against them: non-private attributes are compared in the clear, private
    ones are folded into a Paillier ciphertext.
    """

    name = models.CharField(max_length=255)
    file = models.ForeignKey(File, on_delete=models.CASCADE)
    read = models.BooleanField(default=False)
    write = models.BooleanField(default=False)
    # NOTE(review): a field named ``delete`` shadows ``Model.delete()`` on
    # instances, so ``rule.delete()`` would hit the boolean instead of the
    # method. Renaming needs a migration, so it is only flagged here.
    delete = models.BooleanField(default=False)

    def is_satisfied_by(self, user) -> (bool, int, int):
        """Evaluate this rule against *user*.

        Non-private rule attributes are checked directly with
        ``attribute_satisfies_rule``. For private ones the stored user value
        is treated as a Paillier ciphertext and the method homomorphically
        accumulates ``x + y * (user_value - rule_value)`` over all of them,
        where ``x`` and ``y`` are random integers picked per call.

        NOTE(review): the private branch never reads ``operator``; it only
        builds an equality-style difference, and the same ``y`` is shared by
        all private attributes.

        Args:
            user: the user whose ``public_key`` and ``UserAttribute`` rows
                are evaluated.

        Returns:
            ``(False, 0, 0)`` when the user's public key cannot be parsed, a
            required attribute is missing, or a non-private attribute fails
            its comparison. Otherwise ``(True, x, ciphertext)`` where
            ``ciphertext`` is the raw integer ciphertext of the accumulated
            sum.
        """
        x = randint(0, 1000)
        # The multiplier must never be zero: a zero would cancel every
        # private-attribute difference and the ciphertext would decrypt to
        # ``x`` regardless of the user's values.
        y = randint(1, 1000)

        try:
            paillier_public_key = paillier.PaillierPublicKey(n=int(user.public_key))
        except (AttributeError, TypeError, ValueError):
            # Missing or non-numeric public key: the rule cannot be evaluated.
            return False, 0, 0

        encrypted_sum = paillier_public_key.encrypt(x)

        for rule_attribute in self.ruleattribute_set.all():
            attribute_type = rule_attribute.attribute_type
            try:
                user_attribute = UserAttribute.objects.get(
                    user=user,
                    attribute_type=attribute_type,
                )
            except UserAttribute.DoesNotExist:
                # The user does not have this attribute; the rule is not satisfied.
                return False, 0, 0

            if not attribute_type.is_private:
                if not self.attribute_satisfies_rule(user_attribute, rule_attribute):
                    return False, 0, 0
                continue

            # Private attribute: the stored value is used as a ciphertext
            # under the user's public key.
            encrypted_attribute_value = paillier.EncryptedNumber(
                paillier_public_key,
                int(user_attribute.value),
            )
            encrypted_sum += (encrypted_attribute_value - int(rule_attribute.value)) * y

        return True, x, encrypted_sum.ciphertext()

    @staticmethod
    def _comparable_values(user_attribute, rule_attribute):
        """Return both attribute values converted for comparison.

        Values are stored as strings; for ``integer`` and ``float_*``
        datatypes they are converted to numbers so ordering is numeric rather
        than lexicographic. If the datatype is unknown or a value does not
        parse, the raw strings are returned unchanged.
        """
        attribute_type = getattr(rule_attribute, 'attribute_type', None)
        datatype = getattr(attribute_type, 'datatype', None) or ''

        if datatype == 'integer':
            convert = int
        elif datatype.startswith('float'):
            convert = float
        else:
            return user_attribute.value, rule_attribute.value

        try:
            return convert(user_attribute.value), convert(rule_attribute.value)
        except (TypeError, ValueError):
            return user_attribute.value, rule_attribute.value

    def attribute_satisfies_rule(self, user_attribute, rule_attribute):
        """Compare a user's attribute with a rule attribute in the clear.

        Args:
            user_attribute: object exposing the user's ``value``.
            rule_attribute: object exposing ``operator`` (``'EQ'``, ``'NEQ'``,
                ``'GT'`` or ``'LT'``) and the reference ``value``.

        Returns:
            The result of applying the operator to the two values; ``False``
            for an unrecognised operator.
        """
        actual, expected = self._comparable_values(user_attribute, rule_attribute)
        operator = rule_attribute.operator

        if operator == 'EQ':
            return actual == expected
        if operator == 'NEQ':
            return not (actual == expected)
        if operator == 'GT':
            return actual > expected
        if operator == 'LT':
            return actual < expected
        return False
class RuleAttribute(Attribute):
    """An ``Attribute`` value a rule compares against, plus the operator."""

    OPTION_EQUALS = 'EQ'
    OPTION_DOES_NOT_EQUAL = 'NEQ'
    OPTION_GREATER_THAN = 'GT'
    OPTION_LESS_THAN = 'LT'

    # (stored value, human-readable label) pairs for the ``operator`` field.
    OPTION_CHOICES = [
        (OPTION_EQUALS, 'Equals'),
        (OPTION_DOES_NOT_EQUAL, "Doesn't Equal"),
        (OPTION_GREATER_THAN, 'Greater Than'),
        (OPTION_LESS_THAN, 'Less Than'),
    ]

    # Deleting the rule removes its attributes as well.
    rule = models.ForeignKey(
        to=Rule,
        on_delete=models.CASCADE,
    )
    operator = models.CharField(
        max_length=3,
        choices=OPTION_CHOICES,
        default=OPTION_EQUALS,
    )
class TrustedAuthority(models.Model):
    """A named authority together with its public key."""

    name = models.CharField(max_length=255)
    # Kept as free text; the key's encoding is not defined by this model.
    public_key = models.TextField()
class EncryptedData(models.Model):
    """Per-file record holding a list of x values and a unique token."""

    # Deleting the file removes its records as well.
    file = models.ForeignKey(
        to=File,
        on_delete=models.CASCADE,
    )
    # The x values, stored as one comma-separated string.
    x_values = models.TextField()
    # Random UUID generated per row; unique and not editable through forms.
    token = models.UUIDField(
        default=uuid4,
        unique=True,
        editable=False,
    )