Der RSA-Schlüssel kann nicht entschlüsselt und die Gesundheitsprüfung der WhatsApp-Flows nicht abgeschlossen werdenPython

Python-Programme
Guest
 Der RSA-Schlüssel kann nicht entschlüsselt und die Gesundheitsprüfung der WhatsApp-Flows nicht abgeschlossen werden

Post by Guest »

Das Problem
Ich versuche, einen Ablauf mit der WhatsApp Business API einzurichten, kann aber die Entschlüsselung des Schlüssels im Rahmen der Endpunkt-Gesundheitsprüfung nicht abschließen.
Ich erhalte diese Fehlermeldung: Verschlüsselung/Entschlüsselung fehlgeschlagen

Code: Select all

in decrypt_request
aes_key = private_key.decrypt(encrypted_aes_key, OAEP(mgf=MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
ValueError: Encryption/decryption failed.
So reproduzieren Sie das Problem
Im Folgenden sind die Schritte aufgeführt, die ich gemäß den Dokumenten hier und hier befolgt habe:
  • Richten Sie den Flow-Endpunkt ein.
  • Generieren Sie ein 2048-Bit-RSA-Schlüsselpaar.< /p>

Code: Select all

openssl genrsa -des3 -out private.pem 2048
Der Schlüssel sieht so aus:

Code: Select all

-----BEGIN ENCRYPTED PRIVATE KEY-----
AAA........
BBB....
....
-----END ENCRYPTED PRIVATE KEY-----
  • Exportieren Sie den öffentlichen RSA-Schlüssel in eine Datei:

Code: Select all

openssl rsa -in private.pem -outform PEM -pubout -out public.pem
  • Öffentlichen Geschäftsschlüssel festlegen.

Code: Select all

curl -X POST \
'https://graph.facebook.com/v21.0/PHONE_NUMBER_ID/whatsapp_business_encryption' \
-H 'Authorization: Bearer ACCESS_TOKEN' \
-H 'Content-Type: application/x-www-form-urlencoded' \
--data-urlencode 'business_public_key=-----BEGIN PUBLIC KEY-----
AAA
BBB
CCC
-----END PUBLIC KEY-----'
  • Verschlüsselungs-/Entschlüsselungscode erstellen:

Code: Select all

import json
import os
from base64 import b64decode, b64encode
from cryptography.hazmat.primitives.asymmetric.padding import OAEP, MGF1, hashes
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from fastapi import HTTPException, Request
import logging

# Load the private key string
# PRIVATE_KEY = os.environ.get('PRIVATE_KEY')
PRIVATE_KEY_PATH = "./private.pem"
with open(PRIVATE_KEY_PATH, "rb") as pem_file:

PRIVATE_KEY = pem_file.read().decode("utf-8")

async def data(request: Request):
try:
# Parse the request body
body = json.loads(await request.body())
logging.info(f"body = {body}")

# Read the request fields
encrypted_flow_data_b64 = body['encrypted_flow_data']
encrypted_aes_key_b64 = body['encrypted_aes_key']
initial_vector_b64 = body['initial_vector']

decrypted_data, aes_key, iv = await decrypt_request(
encrypted_flow_data_b64, encrypted_aes_key_b64, initial_vector_b64)
print(decrypted_data)

# Return the next screen &  data to the client
response = {
"screen": "SCREEN_NAME",
"data": {
"some_key": "some_value"
}
}

# Return the response as plaintext
return await encrypt_response(response, aes_key, iv)
except Exception as e:
print(e)
raise HTTPException(status_code=500, detail="Internal Server Error")

async def decrypt_request(encrypted_flow_data_b64, encrypted_aes_key_b64, initial_vector_b64):
flow_data = b64decode(encrypted_flow_data_b64)
iv = b64decode(initial_vector_b64)

# Decrypt the AES encryption key
encrypted_aes_key = b64decode(encrypted_aes_key_b64)
private_key = load_pem_private_key(
PRIVATE_KEY.encode("utf-8"), password=os.getenv("PASSPHRASE").encode('utf-8'))
aes_key = private_key.decrypt(encrypted_aes_key, OAEP(
mgf=MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None))

# Decrypt the Flow data
encrypted_flow_data_body = flow_data[:-16]
encrypted_flow_data_tag = flow_data[-16:]
decryptor = Cipher(algorithms.AES(aes_key),
modes.GCM(iv, encrypted_flow_data_tag)).decryptor()
decrypted_data_bytes = decryptor.update(
encrypted_flow_data_body) + decryptor.finalize()
decrypted_data = json.loads(decrypted_data_bytes.decode("utf-8"))
return decrypted_data, aes_key, iv

async def encrypt_response(response, aes_key, iv):
# Flip the initialization vector
flipped_iv = bytearray()
for byte in iv:
flipped_iv.append(byte ^ 0xFF)

# Encrypt the response data
encryptor = Cipher(algorithms.AES(aes_key),
modes.GCM(flipped_iv)).encryptor()
return b64encode(
encryptor.update(json.dumps(response).encode("utf-8")) +
encryptor.finalize() +
encryptor.tag
).decode("utf-8")
Kann mir jemand sagen, was ich falsch mache?

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post