So konsumieren Sie Millionen von Nachrichten aus Azure Service Bus -Thema Trigger mit dauerhaften FunktionenPython

Python-Programme
Anonymous
 So konsumieren Sie Millionen von Nachrichten aus Azure Service Bus -Thema Trigger mit dauerhaften Funktionen

Post by Anonymous »

Ich verwende VSCODE, um Millionen von Nachrichten pro Tag vom Azure Service Bus -Thema zu verbrauchen und es mit Python auf ADLGen2 zu speichern. Die Funktion App ist ein Premium -App -Serviceplan. < /P>
Hier ist die Projektstruktur < /p>

Code: Select all

/project-azure-function-app
│
├── /custom_package
│   └── __init__.py
│   └── functions.py  # Contains my business logic (e.g., append_dataset, save_to_db, etc.)
│
├── function_app.py        # Main entry point: Service Bus trigger
│
├── host.json              # Function app configuration
├── requirements.txt       # List of Python dependencies
└── local.settings.json    # Local settings
In Host.json verwende ich die Standardparameter. Ich bin mir nicht sicher, ob es in meinem Fall bessere Alternativen gibt. Ich begrüße Ihre Eingabe und Ideen < /p>

Code: Select all

{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"durableTask": {
"hubName": "orchestrator_function",
"maxConcurrentActivityFunctions": 10,
"maxConcurrentOrchestratorFunctions": 10
}
},
"functionTimeOut": "00:10:00"
}
< /code>
function_app.py
enthält im Stammverzeichnis den folgenden Code:

Code: Select all

import azure.functions as func
import logging
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DatalakeDirectoryClient, DataLakeClient
from custom_package.functions import save_to_db_activity, get_file_syst, arg_name, subscription_name, topic_name, connection, append_dataset, global_data

app = func.FunctionApp()

@app.service_bus_topic_trigger(arg_name, subscription_name, topic_name, connection)
@app.durable_client_input(client_name="client")
async def servicebus_topic_trigger(azservicebus: func.ServiceBusMessage, client: df.DurableOrchestrationContext) -> None:
get_msg = azservicebus.get_body().decode('utf-8')
_ = await client client.start_new('orchestrator_function', None, get_msg)

@app.durable_orchestration_trigger(context_name="context")
async def orchestrator_function(context: df.DurableOrchestrationContext):
data = context.get_input()

# Call the activity function to save data to Data Lake.
await context.call_activity('save_to_db_activity', data)
return True

@app.activity_trigger(input_name="save_to_db_activity")
def save_to_db_activity(data: str) -> None:
append_dataset(data, False)
char_nb = 1000
if len(global_data) > char_nb:
// Data lake stuff to save the data into a file.
.....

// Reset global_data
append_dataset("", True)
< /code>
/custom_package/functions.py

....

global_data = ""

def append_dataset(data: str, reset: bool)-> None:
global global_data
global_data += data + '\n'

if reset:
global_data = ""
< /code>
While debugging locally, I get the error: FunctionApp object has no attribute durable_client_object< /code>.
Leider wird die Funktion in der Cloud nicht ausgelöst.>

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post