Ich stelle meine Function App über VS Code bereit und falle oft auf die traditionelle Azure-Functions-Struktur zurück, da meine Funktionen nicht erkannt werden, wenn ich das dekoratorbasierte Modell verwende.
de-NewsletterAI-dev-01-fa/
├── function_app.py
├── __init__.py
├── RSSNewsletter.py
├── news_scrapper.py
├── host.json
└── requirements.txt
</code>
RSSNewsletter.py:
# Azure Function entry point
# Azure Function entry point
def main_timer_trigger(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP trigger function to run the newsletter generation.

    Args:
        req: Incoming HTTP request. Unused; the request only starts the run.

    Returns:
        200 with a success message when ``main()`` completes,
        500 with the error text if it raises.
    """
    try:
        # main() is defined elsewhere in this module: it generates the
        # reports and sends the emails.
        main()
        return func.HttpResponse(
            "Successfully generated reports and sent emails.",
            status_code=200
        )
    except Exception as e:
        # logging.exception records the full traceback in addition to the
        # message. The previous duplicate print() was removed: stdout is
        # not reliably captured by the Azure Functions host.
        logging.exception(f"Error in main function: {e}")
        return func.HttpResponse(
            f"An error occurred: {str(e)}",
            status_code=500
        )
def get_company_news(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP trigger: return recent news items for one company as JSON.

    Query parameters:
        company: required company name to search for.
        days:    optional look-back window in days (default 7).

    Returns:
        200 with ``{"news": [...], "count": N}`` on success,
        400 when 'company' is missing or 'days' is not an integer,
        500 on any scraping error.
    """
    logging.info('Processing request for company news')
    # Get parameters from query string
    company_name = req.params.get('company')
    try:
        days_back = int(req.params.get('days', 7))
    except ValueError:
        # Previously a non-numeric 'days' raised before any response was
        # built and surfaced as an unhandled 500; report it as a client
        # error instead.
        return func.HttpResponse(
            "Please provide an integer value for 'days' in the query string",
            status_code=400
        )
    if not company_name:
        return func.HttpResponse(
            "Please provide a company name in the query string",
            status_code=400
        )
    try:
        # Get news using the RSS-first approach
        news_items = news_scraper.get_news_with_fallback(company_name, days_back)
        # Return the news items as JSON
        return func.HttpResponse(
            json.dumps({"news": news_items, "count": len(news_items)}),
            mimetype="application/json",
            status_code=200
        )
    except Exception as e:
        logging.error(f"Error retrieving news: {str(e)}")
        return func.HttpResponse(
            f"An error occurred: {str(e)}",
            status_code=500
        )
def scheduled_news_collector(newsTimer: func.TimerRequest) -> None:
    """Timer-triggered collector: scrape news for each configured company
    and persist per-company and combined results to blob storage.

    NOTE(review): this docstring previously claimed "every 4 hours", but
    the schedule registered in function_app.py is "0 0 7 1 * *" (07:00 on
    the 1st of each month) — confirm which cadence is intended.

    Args:
        newsTimer: Timer binding payload supplied by the Functions host.
    """
    if newsTimer.past_due:
        logging.info('The news timer is past due!')
    logging.info('Starting scheduled news collection')
    # Companies to monitor - could be moved to configuration
    companies = ["Abbott Diabetes Care", "Dexcom", "Medtronic Diabetes"]
    all_results = {}
    # Create a blob storage manager using the existing class
    blob_storage = BlobStorageManager()
    for company in companies:
        try:
            news_items = news_scraper.get_news_with_fallback(company)
            all_results[company] = news_items
            logging.info(f"Collected {len(news_items)} news items for {company}")
            # Store individual company results
            if news_items:
                # Create a clean company name for the filename
                clean_company_name = company.replace(" ", "_").lower()
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                blob_name = f"news_{clean_company_name}_{timestamp}.json"
                # Store as JSON in the output container
                blob_storage.upload_blob(
                    container_name="output",
                    blob_name=blob_name,
                    data=news_items,
                    content_type="application/json"
                )
                logging.info(f"Stored {len(news_items)} news items for {company} in blob: {blob_name}")
        except Exception as e:
            # One failing company must not abort the whole run. Use
            # logging.exception so the traceback is preserved (plain
            # logging.error dropped it).
            logging.exception(f"Error collecting news for {company}: {e}")
    # Store the combined results with all companies
    if all_results:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        combined_blob_name = f"news_all_companies_{timestamp}.json"
        # Add metadata about the collection
        collection_data = {
            "collection_time": datetime.now().isoformat(),
            "companies": companies,
            "news_counts": {company: len(items) for company, items in all_results.items()},
            "total_items": sum(len(items) for items in all_results.values()),
            "data": all_results
        }
        # Store combined results
        blob_storage.upload_blob(
            container_name="output",
            blob_name=combined_blob_name,
            data=collection_data,
            content_type="application/json"
        )
        logging.info(f"Stored combined results for all companies in blob: {combined_blob_name}")
    logging.info('Completed scheduled news collection')
</code>
function_app.py:
# function_app.py — entry module for the Azure Functions Python v2
# (decorator-based) programming model; the host discovers functions by
# importing this file and reading the registrations on `app`.
import logging
# Emitted at import time so the host log shows whether this module loaded.
logging.info("function app starting")
import azure.functions as func
# NOTE(review): the relative import requires this directory to be treated
# as a package (__init__.py is present). If the host fails to discover the
# functions, the absolute form `import RSSNewsletter` is the usual fix for
# the v2 model — confirm on deployment.
from . import RSSNewsletter
# Single FunctionApp instance that the decorators below register into.
app = func.FunctionApp()
@app.route(route="get_company_news", methods=["GET"])
def get_company_news(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP GET endpoint; all real work lives in RSSNewsletter."""
    response = RSSNewsletter.get_company_news(req)
    return response
# NOTE(review): "0 0 7 1 * *" is an NCRONTAB expression meaning 07:00 on
# day 1 of every month, but RSSNewsletter.scheduled_news_collector's
# docstring says it runs every 4 hours (that would be "0 0 */4 * * *") —
# confirm which cadence is intended before relying on this timer.
@app.schedule(schedule="0 0 7 1 * *", arg_name="newsTimer", run_on_startup=False)
def scheduled_news_collector(newsTimer: func.TimerRequest) -> None:
    """Timer trigger that delegates to RSSNewsletter.scheduled_news_collector."""
    return RSSNewsletter.scheduled_news_collector(newsTimer)
# HTTP entry point for the full newsletter pipeline (reports + emails).
@app.route(route="generate_newsletter", methods=["GET", "POST"])
def generate_newsletter(req: func.HttpRequest) -> func.HttpResponse:
    """Run the complete newsletter generation via RSSNewsletter."""
    result = RSSNewsletter.main_timer_trigger(req)
    return result
Ich stelle meine Funktion App über VS -Code ein und ich zurücklieg oft zur traditionellen Azure -Funktionsstruktur, da meine Funktionen, wenn ich das dekoratorbasierte Modell verwende.[code]de-NewsletterAI-dev-01-fa/ ├── function_app.py ├── __init__.py ├── RSSNewsletter.py ├── news_scrapper.py ├── host.json └── requirements.txt < /code> rssnewsletter.py:
# Azure Function entry point def main_timer_trigger(req: func.HttpRequest) -> func.HttpResponse: """HTTP trigger function to run the newsletter generation""" try: main() return func.HttpResponse( "Successfully generated reports and sent emails.", status_code=200 ) except Exception as e: print(f"Error in main function: {e}") logging.error(f"Error in main function: {e}") return func.HttpResponse( f"An error occurred: {str(e)}", status_code=500 )
def get_company_news(req: func.HttpRequest) -> func.HttpResponse: logging.info('Processing request for company news')
# Get parameters from query string company_name = req.params.get('company') days_back = int(req.params.get('days', 7))
if not company_name: return func.HttpResponse( "Please provide a company name in the query string", status_code=400 )
try: # Get news using the RSS-first approach news_items = news_scraper.get_news_with_fallback(company_name, days_back)
def scheduled_news_collector(newsTimer: func.TimerRequest) -> None: """Runs every 4 hours to collect news for configured companies""" if newsTimer.past_due: logging.info('The news timer is past due!')
# Companies to monitor - could be moved to configuration companies = ["Abbott Diabetes Care", "Dexcom", "Medtronic Diabetes"]
all_results = {}
# Create a blob storage manager using the existing class blob_storage = BlobStorageManager()
for company in companies: try: news_items = news_scraper.get_news_with_fallback(company) all_results[company] = news_items logging.info(f"Collected {len(news_items)} news items for {company}")
# Store individual company results if news_items: # Create a clean company name for the filename clean_company_name = company.replace(" ", "_").lower() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") blob_name = f"news_{clean_company_name}_{timestamp}.json"
# Store as JSON in the output container blob_storage.upload_blob( container_name="output", blob_name=blob_name, data=news_items, content_type="application/json" )
logging.info(f"Stored {len(news_items)} news items for {company} in blob: {blob_name}")
except Exception as e: logging.error(f"Error collecting news for {company}: {e}")
# Store the combined results with all companies if all_results: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") combined_blob_name = f"news_all_companies_{timestamp}.json"
# Add metadata about the collection collection_data = { "collection_time": datetime.now().isoformat(), "companies": companies, "news_counts": {company: len(items) for company, items in all_results.items()}, "total_items": sum(len(items) for items in all_results.values()), "data": all_results }
# Store combined results blob_storage.upload_blob( container_name="output", blob_name=combined_blob_name, data=collection_data, content_type="application/json" )
logging.info(f"Stored combined results for all companies in blob: {combined_blob_name}")
# Add this new function to trigger the main newsletter generation @app.route(route="generate_newsletter", methods=["GET", "POST"]) def generate_newsletter(req: func.HttpRequest) -> func.HttpResponse: return RSSNewsletter.main_timer_trigger(req) [/code]
Ich bin neu in Avalonia/ WPF-, XAML- und Desktop -Entwicklung im Allgemeinen, also bitte verzeihen und klären Sie alle verwandten Missverständnisse, die ich zeige. Ich werde weiterhin die verfügbaren...
Ich baue eine Python -Funktion. Wenn ich es lokal ausführe, funktioniert alles wie erwartet. Alt = Aktion auf Github -Aktionen src =
Wenn ich jedoch die Azure -Funktionsprotokolle anschaue, sehe...
Ich baue eine Python -Funktion. Wenn ich es lokal ausführe, funktioniert alles wie erwartet. Alt = Aktion auf Github -Aktionen src =
Wenn ich jedoch die Azure -Funktionsprotokolle anschaue, sehe...
Ich habe einen vorhandenen Python -Dekorateur, der sicherstellt, dass eine Methode eine Psycopg Asyncconnection -Instanz erhält. Ich versuche, die Eingabe zu aktualisieren, um ParamSpec und verkettet...