Hier ist der Code, den ich zum Erstellen des X.509-Zertifikats verwende:
Code: Select all
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from datetime import datetime, timedelta, timezone
def create_x509_certificate() -> tuple[x509.Certificate, rsa.RSAPrivateKey]:
private_key = rsa.generate_private_key(public_exponent=65537, key_size=4096)
subject = issuer = x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, "Self-Signed Certificate"),
]
)
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(timezone.utc))
.not_valid_after(datetime.now(timezone.utc) + timedelta(days=365))
.sign(private_key, hashes.SHA256())
)
return (cert, private_key)
Code: Select all
import base64
import time
import requests
client_state = APP.config.setdefault("client_state", str(int(time.time())))
expiration_time = (datetime.now(timezone.utc) + timedelta(minutes=120)).isoformat()
cert, private_key = create_x509_certificate()
APP.config["private_key"] = private_key
b64_encoded_der_public_key = base64.b64encode(cert.public_bytes(Encoding.DER)).decode()
LOGGER.debug("Public key: '%s'", b64_encoded_der_public_key)
subscription_payload = {
"resource": "communications/onlineMeetings/getAllTranscripts",
"changeType": "created",
"notificationUrl": f"{SERVER_URL}/webhook",
"lifecycleNotificationUrl": f"{SERVER_URL}/webhook",
"expirationDateTime": expiration_time,
"clientState": client_state,
"includeResourceData": True,
"encryptionCertificate": b64_encoded_der_public_key,
"encryptionCertificateId": f"CertificateId-{int(time.time())}",
}
response = requests.post(
"https://graph.microsoft.com/v1.0/subscriptions",
headers=_get_auth_headers(),
json=subscription_payload,
timeout=(5, 60),
)
from cryptography.hazmat.primitives.asymmetric.padding import OAEP, MGF1
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives.padding import PKCS7
import base64
def _decrypt_payload(
private_key: rsa.RSAPrivateKey,
b64_encoded_payload: str,
b64_encoded_data_key: str,
b64_encoded_data_signature: str,
) -> str:
# Decrypt the symmetric key
symmetric_key = private_key.decrypt(
base64.b64decode(b64_encoded_data_key),
OAEP(
mgf=MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
)
encrypted_payload = base64.b64decode(b64_encoded_payload)
# Create HMAC using decrypted symmetric key
h = hmac.HMAC(symmetric_key, hashes.SHA256())
h.update(encrypted_payload)
calculated_signature = base64.b64encode(h.finalize()).decode()
# Verify signature
if b64_encoded_data_signature != calculated_signature:
raise Exception("Signature is invalid. Possible tampering detected.")
# Decrypt the content
iv = symmetric_key[:16] # First 16 bytes of the symmetric key as IV
cipher = Cipher(algorithms.AES(symmetric_key), modes.CBC(iv))
decryptor = cipher.decryptor()
decrypted_payload = decryptor.update(encrypted_payload) + decryptor.finalize()
unpadder = PKCS7(algorithms.AES.block_size).unpadder()
unpadded_data = unpadder.update(decrypted_payload) + unpadder.finalize()
return unpadded_data.decode()
< /code>
Ausgabe: < /strong>
Entschlüsselung schlägt konsequent fehl, wenn versucht wird, den AES -Schlüssel mit privat_key.decrypt () < /code> zu entschlüsseln. Ich vermute, dass das Problem möglicherweise damit zusammenhängt, wie ich das Zertifikat für Abonnement oder die Verarbeitung der Nutzlast während der Entschlüsselung codieren kann. />
Bin ich das Zertifikat in der und base64 korrekt für das Feld "Verschlüsselungszertifikat" < /code> in der Abonnementanforderung? < /li>
codiert Irgendwas falsch in der Art, wie ich den symmetrischen Schlüssel entschlüsse oder die HMAC -Signatur überprüfe? >
< /ol>
Alle Erkenntnisse oder Vorschläge wären sehr geschätzt! < /p>