You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
393 lines
14 KiB
Python
393 lines
14 KiB
Python
from django.shortcuts import render, redirect, get_object_or_404
|
|
from django.http.response import HttpResponseNotAllowed
|
|
from django.contrib.auth.decorators import permission_required, login_required, user_passes_test
|
|
from django.http import HttpResponse, FileResponse, HttpResponseForbidden, HttpResponseBadRequest, HttpResponseNotAllowed, JsonResponse
|
|
from django.contrib.auth import logout
|
|
from django.urls import reverse
|
|
from django.contrib import messages
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils import timezone
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
|
|
import hashlib
|
|
import json
|
|
from phe import paillier
|
|
from cryptography.hazmat.primitives import serialization, hashes
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
|
|
from .forms import FileUploadForm, UploadCertificateForm, RuleAttributeForm, TrustedAuthorityForm
|
|
from .models import File, Rule, UserAttribute, RuleAttribute, AttributeType, User, TrustedAuthority, EncryptedData
|
|
|
|
|
|
@login_required
def landing_page(request):
    """Render the landing page for an authenticated user.

    The template receives the complete ``File`` queryset under the
    ``files`` context key; no per-user filtering is applied here.
    """
    context = {'files': File.objects.all()}
    return render(request, 'landing_page.html', context)
|
|
|
|
def logout_view(request):
    """End the current session and redirect to the ``abac:login`` route."""
    logout(request)
    login_route = 'abac:login'
    return redirect(login_route)
|
|
|
|
@login_required
def upload_file_view(request):
    """Show the file upload form and store a submitted file.

    A valid POST saves the uploaded file with the requesting user as its
    owner and redirects to ``abac:home``. Any other method renders an empty
    form; an invalid POST re-renders the bound form so its errors are shown.
    """
    template = 'file_upload.html'

    if request.method != 'POST':
        return render(request, template, {'form': FileUploadForm()})

    form = FileUploadForm(request.POST, request.FILES)
    if not form.is_valid():
        return render(request, template, {'form': form})

    # Delay the save so the owner can be attached before hitting the database.
    uploaded = form.save(commit=False)
    uploaded.owner = request.user
    uploaded.save()
    return redirect('abac:home')
|
|
|
|
def create_user_view(request):
    """Placeholder view: answers every request with a fixed text body."""
    placeholder_body = 'Create User View'
    return HttpResponse(placeholder_body)
|
|
|
|
@login_required
def upload_certificate_view(request):
    """Verify a signed attribute certificate and store its attributes encrypted.

    On POST, the uploaded certificate bytes are checked against the uploaded
    signature using the public key of each ``TrustedAuthority`` in turn
    (PSS padding with MGF1/SHA-256, SHA-256 digest). As soon as one authority
    verifies the signature, the certificate is parsed as JSON and every
    ``name``/``value`` entry is encrypted with the requesting user's Paillier
    public key and stored via ``UserAttribute.objects.update_or_create``.

    If the user has no public key yet, they are redirected to
    ``abac:upload_public_key``. A failed verification, an invalid form, or a
    non-POST request renders the upload form.
    """
    # Function-scope import: only this view needs these exception types.
    from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm

    if request.method == 'POST':
        form = UploadCertificateForm(request.POST, request.FILES)
        if form.is_valid():
            certificate = form.cleaned_data['certificate']
            signature = form.cleaned_data['signature'].read()

            # Read the payload exactly once. Calling ``certificate.read()``
            # inside the loop left the stream at EOF after the first
            # authority, so every later authority was checked against b''.
            certificate_bytes = certificate.read()
            pss_padding = padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            )

            is_verified = False
            for authority in TrustedAuthority.objects.all():
                try:
                    pub_key = serialization.load_pem_public_key(
                        authority.public_key.encode(),
                        backend=default_backend()
                    )
                    pub_key.verify(
                        signature,
                        certificate_bytes,
                        pss_padding,
                        hashes.SHA256()
                    )
                except (InvalidSignature, UnsupportedAlgorithm, ValueError, TypeError):
                    # Wrong signer, malformed stored key, or a key type that
                    # does not accept these arguments: try the next authority.
                    continue
                is_verified = True
                break

            if is_verified:
                certificate_data = json.loads(certificate_bytes)

                if not request.user.public_key:
                    messages.warning(request, 'Please upload your public key first.')
                    return redirect(reverse('abac:upload_public_key', args=[request.user.username]))

                public_key_native = paillier.PaillierPublicKey(n=int(request.user.public_key))

                for attribute_data in certificate_data:
                    name = attribute_data.get('name')
                    value = attribute_data.get('value')
                    attribute_type = get_or_create_attribute_type(name, value, private=True)
                    # Only the ciphertext is persisted, as a decimal string.
                    encrypted_value = str(public_key_native.encrypt(value).ciphertext())

                    UserAttribute.objects.update_or_create(
                        user=request.user,
                        attribute_type=attribute_type,
                        defaults={'value': encrypted_value, 'last_modified': timezone.now()}
                    )

                messages.success(request, 'Certificate uploaded successfully.')
                return redirect('abac:user_details', username=request.user.username)
            else:
                messages.error(request, 'Certificate verification failed.')

    else:
        form = UploadCertificateForm()

    return render(request, 'upload_certificate.html', {'form': form})
|
|
|
|
@login_required
def file_detail(request, file_id):
    """Show a file together with its access rules, if the user may see it.

    Responds with 404 when the file does not exist and with 403 when
    ``check_permission`` denies access.
    """
    user = request.user
    file = get_object_or_404(File, id=file_id)

    # ``check_permission`` returns a 3-tuple; testing the tuple itself is
    # always truthy, so only its first element (the decision) is evaluated.
    # NOTE(review): the other two elements are ignored here, so nothing tied
    # to them is verified before rendering -- confirm that is acceptable.
    allowed, _, _ = check_permission(user, file)
    if not allowed:
        return HttpResponseForbidden("You don't have the permission to access this file")

    rules = Rule.objects.filter(file=file)
    return render(request, 'file_detail.html', {'file': file, 'rules': rules})
|
|
|
|
|
|
@login_required
def create_rule(request, file_id):
    """Create a named rule for a file and jump to the new rule's detail page.

    A POST carrying a non-empty ``rule_name`` creates the rule and redirects
    to ``abac:rule_detail``. Every other request (non-POST, or a missing or
    empty name) redirects back to ``abac:file_detail``.
    """
    target_file = get_object_or_404(File, id=file_id)

    submitted_name = request.POST.get('rule_name') if request.method == "POST" else None
    if not submitted_name:
        return redirect('abac:file_detail', file_id=file_id)

    new_rule = Rule.objects.create(name=submitted_name, file=target_file)
    return redirect(reverse('abac:rule_detail', args=[file_id, new_rule.id]))
|
|
|
|
|
|
@login_required
def rule_detail(request, file_id, rule_id=None):
    """Display a rule's attributes and handle adding a new attribute to it.

    A valid POST saves a new ``RuleAttribute`` bound to the rule and
    redirects back to this view. Otherwise the page is rendered with the
    file, the rule (or ``None`` when no ``rule_id`` was given), the form and
    the rule's current attributes.
    """
    file = get_object_or_404(File, id=file_id)
    # Scope the lookup to the file named in the URL so that a rule belonging
    # to a different file cannot be shown or modified under this file's path.
    rule = get_object_or_404(Rule, id=rule_id, file=file) if rule_id else None

    if request.method == "POST":
        form = RuleAttributeForm(request.POST)
        if form.is_valid():
            # Delay the save so the rule can be attached first.
            rule_attribute = form.save(commit=False)
            rule_attribute.rule = rule
            rule_attribute.save()
            return redirect('abac:rule_detail', file_id=file_id, rule_id=rule_id)
    else:
        form = RuleAttributeForm()

    rule_attributes = RuleAttribute.objects.filter(rule=rule) if rule else []

    return render(request, 'rule_detail.html', {
        'file': file,
        'rule': rule,
        'form': form,
        'rule_attributes': rule_attributes
    })
|
|
|
|
@login_required
def delete_rule_attribute(request, file_id, rule_id, rule_attribute_id):
    """Delete one attribute of a rule and return to the rule's detail page.

    Responds with 404 unless the attribute exists *and* belongs to the rule
    and file named in the URL.
    """
    # Previously only the attribute id was used, so any URL with a valid
    # attribute id deleted it regardless of the file/rule in the path.
    # NOTE(review): the deletion runs for any HTTP method; consider limiting
    # it to POST as ``delete_authority`` does -- confirm how the template
    # triggers this view first.
    rule_attribute = get_object_or_404(
        RuleAttribute,
        id=rule_attribute_id,
        rule_id=rule_id,
        rule__file_id=file_id,
    )
    rule_attribute.delete()
    return redirect('abac:rule_detail', file_id=file_id, rule_id=rule_id)
|
|
|
|
@login_required
def user_detail_view(request, username):
    """Render a user's detail page with that user's stored attributes.

    Only the user themself or a superuser may view the page; anyone else
    receives a 403 response. An unknown username yields a 404.
    """
    user = get_object_or_404(User, username=username)

    # Access is granted to the profile owner or to any superuser.
    if not (user == request.user or request.user.is_superuser):
        return HttpResponseForbidden('You do not have permission to view this page.')

    context = {
        'user': user,
        'attributes': UserAttribute.objects.filter(user=user),
    }
    return render(request, 'user_detail.html', context)
|
|
|
|
|
|
|
|
|
|
def get_or_create_attribute_type(name, value, private=False):
    """Return the ``AttributeType`` matching ``name`` and the type of ``value``.

    The datatype label is derived from the Python type of ``value``:
    ``bool`` -> ``'BOOLEAN'``, ``int`` -> ``'INTEGER'``, ``float`` ->
    ``'FLOAT'``, ``str`` -> ``'STRING'``. An ``AttributeType`` with that name
    and datatype is fetched, or created with ``is_private=private`` if none
    exists yet.

    Raises:
        ValidationError: if ``value`` is of none of the supported types.
    """
    # ``bool`` is a subclass of ``int``, so it must be tested first;
    # otherwise every boolean would be classified as 'INTEGER'.
    if isinstance(value, bool):
        datatype = 'BOOLEAN'
    elif isinstance(value, int):
        datatype = 'INTEGER'
    elif isinstance(value, float):
        datatype = 'FLOAT'
    elif isinstance(value, str):
        datatype = 'STRING'
    else:
        raise ValidationError('Invalid data type in certificate for attribute: {}'.format(name))

    # The lookup already includes the datatype, so a same-named type with a
    # different datatype never matches and a separate row is created for it.
    attribute_type, _ = AttributeType.objects.get_or_create(
        name=name,
        datatype=datatype,
        defaults={'is_private': private}
    )
    return attribute_type
|
|
|
|
|
|
@login_required
def download_file(request, file_id):
    """Serve a file directly, or start the decrypt-and-forward challenge.

    ``check_permission`` yields ``(allowed, x, enc)``:

    * not allowed -> 403;
    * allowed with empty ``x`` and ``enc`` -> the file is served at once;
    * allowed with ``enc`` values -> the ``x`` values are stored in a new
      ``EncryptedData`` row and ``decrypt_and_forward.html`` is rendered with
      the ``enc`` values (as strings) and the row's token.
      (presumably the client decrypts them and posts the result to
      ``verify_decryption`` -- confirm against the template).
    """
    file = get_object_or_404(File, id=file_id)
    allowed, x, enc = check_permission(request.user, file)

    if not allowed:
        return HttpResponseForbidden('You do not have permission to download this file.')

    if not x and not enc:
        return serve_file(file)

    if not enc:
        # ``x`` without ``enc`` leaves nothing to present as a challenge.
        # The original fell through and returned None here (a server error);
        # deny explicitly instead.
        return HttpResponseForbidden('You do not have permission to download this file.')

    encrypted_data = EncryptedData(file=file, x_values=','.join(map(str, x)))
    encrypted_data.save()
    context = {
        'encryptions_as_strings': [str(e) for e in enc],
        'token': str(encrypted_data.token)
    }
    return render(request, 'decrypt_and_forward.html', context)
|
|
|
|
@login_required
def verify_decryption(request):
    """Check submitted decryptions against stored values and serve the file.

    Accepts POST only (405 otherwise). The ``token`` field selects an
    ``EncryptedData`` row (404 if unknown); the file is served as soon as one
    of the posted ``decryptions`` equals one of the row's stored ``x_values``.
    A non-integer decryption yields 400; no match yields 403.
    """
    if request.method != 'POST':
        return HttpResponseNotAllowed(['POST'])

    token = request.POST.get('token')
    decryptions = request.POST.getlist('decryptions')
    encrypted_data = get_object_or_404(EncryptedData, token=token)

    # Skip empty fragments so an empty ``x_values`` string does not raise.
    expected_values = {
        int(part) for part in encrypted_data.x_values.split(',') if part.strip()
    }

    # NOTE(review): a single matching value grants access. The original TODO
    # says an order-preserving comparison was attempted and did not work --
    # confirm whether every stored value should have to be matched.
    for raw_value in decryptions:
        try:
            candidate = int(raw_value)
        except ValueError:
            # Client-supplied data: reject it instead of raising a 500.
            return HttpResponseBadRequest('Decryptions must be integers.')
        if candidate in expected_values:
            return serve_file(encrypted_data.file)

    return HttpResponseForbidden("You don't have the permission to access this file!")
|
|
|
|
|
|
def check_permission(user, file, mode=None):
    """Decide whether ``user`` may access ``file``.

    Returns a 3-tuple ``(allowed, x, enc)``:

    * The file's owner and superusers are always allowed; ``x`` and ``enc``
      are empty lists.
    * Otherwise every rule in ``file.rule_set`` must be satisfied. Each
      rule's ``is_satisfied_by(user)`` returns ``(result, x_i, enc_i)``; the
      ``x_i`` and ``enc_i`` values of satisfied rules are collected in order.
      The first unsatisfied rule yields ``(False, [], [])``.
    * A file without rules is allowed for everyone.

    ``mode`` is reserved for operation-specific checks.
    TODO: Implement mode. Until then any truthy ``mode`` is denied.
    """
    if mode:
        # Fail closed, but keep the tuple shape that callers unpack; the
        # original hit ``pass`` here and produced no usable result.
        return False, [], []

    if user == file.owner or user.is_superuser:
        return True, [], []

    x = []
    enc = []
    for rule in file.rule_set.all():
        result, x_i, enc_i = rule.is_satisfied_by(user)
        if not result:
            return False, [], []
        x.append(x_i)
        enc.append(enc_i)
    return True, x, enc
|
|
|
|
def serve_file(file):
    """Return a download response streaming the stored file.

    ``file`` is expected to expose the stored file as ``file.file`` and its
    display name as ``file.name``.
    """
    # Let Django build the Content-Disposition header: it quotes/encodes the
    # filename correctly, whereas interpolating the raw name into the header
    # breaks on quotes and non-ASCII characters.
    return FileResponse(file.file.open('rb'), as_attachment=True, filename=file.name)
|
|
|
|
|
|
@login_required
@user_passes_test(lambda u: u.is_superuser)
def user_management(request):
    """List all users and let a superuser create new ones.

    On POST, ``username`` and ``password`` are required and the presence of
    the ``is_superuser`` field marks the new account as superuser. The result
    is reported through the messages framework and the request is redirected
    back to this page. Other methods render the user list, superusers first.
    """
    if request.method == "POST":
        username = request.POST.get('username')
        password = request.POST.get('password')
        # A checkbox is only present in the POST data when it is ticked.
        is_superuser = 'is_superuser' in request.POST

        if not username or not password:
            messages.error(request, 'Username and password are required.')
        elif User.objects.filter(username=username).exists():
            # Report the clash instead of letting create_user fail with an
            # unhandled database error.
            messages.error(request, f'User {username} already exists.')
        else:
            User.objects.create_user(username=username, password=password, is_superuser=is_superuser)
            messages.success(request, f'User {username} created successfully.')

        return redirect('abac:user_management')

    users = User.objects.all().order_by('-is_superuser', 'username')
    return render(request, 'user_management.html', {'users': users})
|
|
|
|
|
|
@login_required
def upload_public_key(request, username):
    """Store the requesting user's public key.

    On POST the ``public_key`` field must be a positive integer in decimal
    notation, because ``upload_certificate_view`` later converts the stored
    value with ``int()`` to build the Paillier public key. A valid key is
    saved on ``request.user`` and the user is redirected to
    ``abac:user_details``; an invalid one re-renders the form with an error.

    NOTE(review): the key is always saved on ``request.user`` while the
    redirect uses the ``username`` from the URL -- confirm these are meant to
    be allowed to differ.
    """
    if request.method == 'POST':
        public_key = (request.POST.get('public_key') or '').strip()
        try:
            modulus = int(public_key)
        except ValueError:
            modulus = 0

        if modulus <= 0:
            messages.error(request, 'Public key must be a positive integer.')
            return render(request, 'upload_public_key.html')

        request.user.public_key = public_key
        request.user.save()
        messages.success(request, 'Public key uploaded successfully.')
        return redirect(reverse('abac:user_details', args=[username]))

    return render(request, 'upload_public_key.html')
|
|
|
|
|
|
def get_hierarchy_data():
    """Serialise the user -> file -> rule -> rule-attribute tree as JSON.

    Every node is a dict with a ``name`` and a ``children`` list. The root is
    named ``"Users"``; leaves describe a rule attribute as
    ``"<attribute type name>; <operator>; <value>"``.
    """
    def attribute_node(rule_attribute):
        label = f"{rule_attribute.attribute_type.name}; {rule_attribute.operator}; {rule_attribute.value}"
        return {'name': label, 'children': []}

    def rule_node(rule):
        return {
            'name': rule.name,
            'children': [attribute_node(attr) for attr in rule.ruleattribute_set.all()],
        }

    def file_node(owned_file):
        return {
            'name': owned_file.name,
            'children': [rule_node(rule) for rule in owned_file.rule_set.all()],
        }

    def user_node(account):
        return {
            'name': account.username,
            'children': [file_node(owned_file) for owned_file in account.file_set.all()],
        }

    tree = {
        "name": "Users",
        "children": [user_node(account) for account in User.objects.all()],
    }
    return json.dumps(tree)
|
|
|
|
|
|
@login_required
@user_passes_test(lambda u: u.is_superuser)
def hierarchy_view(request):
    """Render the tree visualisation page (superusers only).

    The template receives the JSON string produced by
    ``get_hierarchy_data`` under the ``data`` context key.
    """
    hierarchy_json = get_hierarchy_data()
    return render(request, 'vis_tree.html', context={'data': hierarchy_json})
|
|
|
|
@login_required
@user_passes_test(lambda u: u.is_superuser)
def trusted_authorities(request):
    """List trusted authorities and let a superuser add one.

    A POST validates and saves a ``TrustedAuthorityForm``, reports the
    outcome via the messages framework and redirects back to this page.
    Other methods render every authority paired with the SHA-256 hex digest
    of its public key, plus an empty form.
    """
    if request.method == "POST":
        submitted = TrustedAuthorityForm(request.POST)
        if submitted.is_valid():
            submitted.save()
            messages.success(request, 'Authority successfully added!')
        else:
            messages.warning(request, 'Invalid RSA Key')
        # Both outcomes return to the listing page.
        return redirect(reverse('abac:trusted_authorities'))

    authorities_with_hashes = []
    for entry in TrustedAuthority.objects.all():
        digest = hashlib.sha256(entry.public_key.encode()).hexdigest()
        authorities_with_hashes.append((entry, digest))

    return render(request, 'trusted_authorities.html', {
        'authorities_with_hashes': authorities_with_hashes,
        'form': TrustedAuthorityForm(),
    })
|
|
|
|
@login_required
@user_passes_test(lambda u: u.is_superuser)
def delete_authority(request, authority_id):
    """Delete a trusted authority on POST, then return to the listing page.

    An unknown ``authority_id`` yields a 404. Non-POST requests change
    nothing and are simply redirected to ``abac:trusted_authorities``.
    """
    target = get_object_or_404(TrustedAuthority, id=authority_id)

    if request.method == 'POST':
        target.delete()
        messages.success(request, 'Authority successfully deleted!')

    return redirect(reverse('abac:trusted_authorities'))