Benutzerdefinierter Embedder und benutzerdefinierte Wissensquelle in CrewAI – KeyError: 'openai_api_key' (Python)

Python-Programme
Anonymous
 Benutzerdefinierter Embedder und benutzerdefinierte Wissensquelle in CrewAI – KeyError: 'openai_api_key'

Post by Anonymous »

Ich habe einen benutzerdefinierten Embedder für Knowledge und Tools eingerichtet, erhalte aber den Fehler: KeyError: 'openai_api_key'
Der benutzerdefinierte Embedder, den ich verwende, ist ein privat gehostetes Azure-OpenAI-Modell, auf das über bestimmte APIs zusammen mit einem abgerufenen Auth-Token zugegriffen werden kann.

Code: Select all

class Embedding(EmbeddingFunction):
    """Custom EmbeddingFunction implementation for Azure OpenAI Embeddings with OAuth authentication.

    Embedding requests are POSTed with ``requests`` to the module-level
    ``base_url``. Every request carries the module-level ``subscription_key``
    and a bearer token obtained from a ``TokenManager``; the token is
    force-refreshed when the API answers 401 or 403.
    """

    # Upper bound (seconds) for one HTTP call, so a stalled endpoint cannot
    # block the caller forever (``requests`` has no timeout by default).
    REQUEST_TIMEOUT_SECONDS = 60

    # Arbitrary threshold: batches with more items get a token estimate logged.
    LARGE_BATCH_THRESHOLD = 20

    def __init__(self):
        """Initialize the embedding function with its own token manager."""
        self.token_manager = TokenManager()

    def _encoding(self) -> "Optional[tiktoken.Encoding]":
        """Return the ``cl100k_base`` tiktoken encoding, or ``None`` on failure.

        A load failure is logged and reported as ``None`` so that
        ``count_tokens`` can use its character-based approximation instead of
        the whole embedding call failing.
        """
        try:
            return tiktoken.get_encoding("cl100k_base")
        except Exception as e:  # best effort: any load failure means "unavailable"
            logger.warning(
                f"Failed to load tiktoken encoding for embeddings: {e}.  Falling back to approximate token counting."
            )
            return None

    def __call__(
        self,
        messages: Documents,
    ) -> Embeddings:
        """Call the Azure OpenAI Embedding API to generate embeddings.

        Args:
            messages: String or list of strings to embed. It is sent unchanged
                as the ``input`` field of the request payload.

        Returns:
            Embeddings: One vector per item of the response's ``data`` list,
            or an empty list when the response contains no ``data``.

        Raises:
            AuthenticationError, RateLimitError, InvalidRequestError,
            APITimeoutError, TokenValidationError, ResponseParsingError:
                Logged and re-raised unchanged.
            LLMError: Wraps any other exception raised while embedding.
        """
        try:
            logger.info(f"Embedding request received for {len(messages)} items")
            api_url = str(base_url)
            payload: Dict[str, Any] = {
                "input": messages,
            }

            # Get cached token from token manager
            access_token = self.token_manager.get_token()
            headers = {
                "Subscription-Key": subscription_key,
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            }

            # Large batches only get a diagnostic estimate; the request itself
            # is always sent as a single call.
            if (
                isinstance(messages, list)
                and len(messages) > self.LARGE_BATCH_THRESHOLD
            ):
                self._log_batch_token_estimate(messages)

            logger.info(f"Sending embedding request to Azure OpenAI API: {api_url}")
            start_time = time.time()

            # 401/403 responses are retried with a refreshed token inside
            # _make_api_request.
            response_data = self._make_api_request(api_url, headers, payload)

            api_time = time.time() - start_time
            logger.info(f"Embedding API call completed in {api_time:.2f}s")

            # Log usage information if available
            usage = response_data.get("usage")
            if usage:
                prompt_tokens = usage.get("prompt_tokens", 0)
                total_tokens = usage.get("total_tokens", 0)
                logger.info(
                    f"Embedding usage: {prompt_tokens} prompt tokens, {total_tokens} total tokens"
                )

            # Always return List[List[float]] as per the Embeddings type
            if not response_data.get("data"):
                logger.warning("No embeddings returned from API")
                return []

            return [item.get("embedding", []) for item in response_data["data"]]

        except (
            AuthenticationError,
            RateLimitError,
            InvalidRequestError,
            APITimeoutError,
            TokenValidationError,
            ResponseParsingError,
        ) as e:
            # Known API errors keep their type for the caller.
            logger.error(f"API Error: {str(e)}")
            raise

        except Exception as e:
            # Anything else is reported as LLMError, chained to the original.
            logger.exception(f"Unexpected error in embedding API call: {str(e)}")
            raise LLMError(
                f"An unexpected error occurred in embedding API call: {str(e)}"
            ) from e

    def _log_batch_token_estimate(self, messages) -> None:
        """Log an estimated token total for a large batch and warn if it is too big.

        The estimate extrapolates the average token count of the first (up to)
        five items. Nothing is split or rejected here; the result is only logged.
        """
        # Keep some buffer below the module-level context window.
        max_batch_tokens = min(8000, context_window - 100)

        logger.info(
            f"Large batch detected ({len(messages)} items), checking token counts"
        )
        try:
            sample_size = min(5, len(messages))
            sample_tokens = sum(
                self.count_tokens(msg) for msg in messages[:sample_size]
            )
            estimated_total = (sample_tokens / sample_size) * len(messages)
            logger.info(
                f"Estimated token count for batch: {estimated_total:.0f} tokens"
            )

            if estimated_total > max_batch_tokens:
                logger.warning(
                    f"Batch size exceeds max tokens ({estimated_total:.0f} >  {max_batch_tokens})"
                )
                logger.warning(
                    "Consider splitting large batches manually for better control"
                )
        except Exception as e:  # the estimate is diagnostic only; never fail the request
            logger.warning(f"Error estimating token count: {e}")

    def count_tokens(self, text: str) -> int:
        """Count the number of tokens in a text string.

        Args:
            text: The text to count tokens for

        Returns:
            The tiktoken count when the encoding is available, otherwise a
            rough approximation of one token per four characters.
        """
        encoding = self._encoding()  # load once instead of once per use
        if encoding is not None:
            return len(encoding.encode(text))
        # Rough approximation: ~4 chars per token
        return len(text) // 4

    @staticmethod
    def _should_retry(exception) -> bool:
        """Return True only for ``requests.HTTPError`` carrying a 401 or 403 response."""
        if not isinstance(exception, requests.HTTPError):
            return False
        response = getattr(exception, "response", None)
        return getattr(response, "status_code", None) in (401, 403)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        # Inside the class body `_should_retry` is still a staticmethod object,
        # which is only callable on Python 3.10+; pass the wrapped function.
        retry=retry_if_exception(_should_retry.__func__),
    )
    def _make_api_request(
        self, url: str, headers: Dict[str, str], payload: Dict[str, Any]
    ) -> Dict[str, Any]:
        """POST ``payload`` to ``url`` and return the decoded JSON body.

        On a 401/403 response the token is force-refreshed and written into
        ``headers`` in place, so the retry triggered by the re-raised
        ``HTTPError`` sends the new token.

        Raises:
            requests.HTTPError: For 401/403 (consumed by the retry decorator).
            RateLimitError: For HTTP 429.
            LLMError: For other HTTP errors, connection errors, timeouts and
                any other ``requests`` failure.
            ResponseParsingError: When the response body is not valid JSON.
        """
        try:
            response = requests.post(
                url,
                headers=headers,
                json=payload,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )

            if response.status_code in (401, 403):
                # Refresh token and update headers before the retry
                logger.warning(
                    f"Authentication error {response.status_code}.  Refreshing token..."
                )
                access_token = self.token_manager.get_token(force_refresh=True)
                headers["Authorization"] = f"Bearer {access_token}"

            response.raise_for_status()

        except requests.HTTPError as e:
            status_code = e.response.status_code
            if status_code in (401, 403):
                # Let the retry mechanism handle it with the refreshed token
                raise
            if status_code == 429:
                raise RateLimitError(
                    "Rate limit exceeded. Please try again later."
                ) from e
            if status_code >= 500:
                raise LLMError(f"Server error: {status_code}") from e
            raise LLMError(f"HTTP error: {status_code}") from e
        except requests.ConnectionError as e:
            raise LLMError(f"Connection error: {str(e)}") from e
        except requests.RequestException as e:
            raise LLMError(f"Request failed: {str(e)}") from e

        try:
            return response.json()
        except ValueError as e:
            raise ResponseParsingError(
                "Failed to parse JSON response from API"
            ) from e
Der Embedder wird wie folgt mit dem Agent verwendet:
# Create one instance of the custom embedding function.
self.embedding = Embedding()

# Build the agent from the "column_identifier_agent" entry of agents_config and
# pass the custom embedding instance through the `embedder` setting.
# NOTE(review): the config key CrewAI reads for a "custom" provider may differ
# between versions (e.g. "embedder" instead of "embedding_model") — verify
# against the installed CrewAI version; an unrecognised key could explain a
# fallback to the default OpenAI embedder.
self.column_identifier_agent = Agent(
role=self.agents_config["column_identifier_agent"]["role"],
goal=self.agents_config["column_identifier_agent"]["goal"],
backstory=self.agents_config["column_identifier_agent"]["backstory"],
verbose=self.verbose,
llm=self.llm,
tools=[dir_tool, file_read_tool],
embedder={
"provider": "custom",
"config": {
"embedding_model": self.embedding,
},
},
)
In ähnlicher Weise habe ich für CrewAI Knowledge einen benutzerdefinierten Wissensspeicher erstellt, indem ich KnowledgeStorage erweitert und Postgres als Vektor-DB verwendet habe:
class PgVectorStorage(KnowledgeStorage):
    """PgVector implementation for knowledge storage using SQLAlchemy.

    All statements run on connections from the module-level ``engine``. The
    table name is interpolated into the SQL text (identifiers cannot be bound
    as parameters), so it must come from trusted configuration.
    """

    def __init__(
        self,
        table_name: str = "knowledge_embeddings",
        vector_dim: int = 1536,
        index_type: str = "hnsw",
    ):
        """Initialize PgVector storage and create the table and index.

        Args:
            table_name (str, optional): Name of the table to store embeddings. Defaults to "knowledge_embeddings".
            vector_dim (int, optional): Dimension of vectors. Defaults to 1536 (OpenAI's text-embedding-ada-002).
            index_type (str, optional): Type of index to create ('hnsw' or 'ivfflat'). Defaults to "hnsw".
                Any other value creates the table without a vector index.
        """
        # NOTE(review): the base-class __init__ is not called here — presumably
        # to skip its default storage/embedder setup; confirm this is intended.
        self.table_name = table_name
        self.vector_dim = vector_dim
        self.index_type = index_type
        self._initialize_storage()

    def _initialize_storage(self) -> None:
        """Create the vector extension, the table and the similarity index if missing."""
        with engine.connect() as conn:
            # Enable vector extension if not already enabled
            conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))

            # Create table if it doesn't exist
            conn.execute(
                text(
                    f"""
                    CREATE TABLE IF NOT EXISTS {self.table_name} (
                        id bigserial PRIMARY KEY,
                        content text NOT NULL,
                        embedding vector({self.vector_dim}) NOT NULL,
                        metadata jsonb
                    );
                    """
                )
            )

            # Create vector similarity search index
            if self.index_type == "hnsw":
                conn.execute(
                    text(
                        f"""
                        CREATE INDEX IF NOT EXISTS {self.table_name}_embedding_idx
                        ON {self.table_name}
                        USING hnsw (embedding vector_cosine_ops)
                        WITH (m = 16, ef_construction = 64);
                        """
                    )
                )
            elif self.index_type == "ivfflat":
                # Number of IVFFlat lists: sqrt(current row count), at least 100.
                result = conn.execute(
                    text(f"SELECT COUNT(*) FROM {self.table_name};")
                )
                list_count = max(100, int(result.scalar() ** 0.5))
                conn.execute(
                    text(
                        f"""
                        CREATE INDEX IF NOT EXISTS {self.table_name}_embedding_idx
                        ON {self.table_name}
                        USING ivfflat (embedding vector_cosine_ops)
                        WITH (lists = {list_count});
                        """
                    )
                )

            conn.commit()

    def search(
        self,
        query: List[str],
        limit: int = 3,
        filter: Optional[dict] = None,
        score_threshold: float = 0.35,
    ) -> List[Dict[str, Any]]:
        """Search for documents in the knowledge base by cosine similarity.

        Args:
            query (List[str]): Components of the query vector; they are joined
                with commas into a pgvector literal.
            limit (int, optional): Maximum number of results to return. Defaults to 3.
            filter (Optional[dict], optional): Metadata key/value pairs that must all match. Defaults to None.
            score_threshold (float, optional): Minimum similarity score. Defaults to 0.35.

        Returns:
            List[Dict[str, Any]]: Matching rows as dicts with "content",
            "metadata" and "score", best match first.
        """
        import json

        # Convert query list to a pgvector literal such as "[0.1,0.2]".
        vector_literal = f"[{','.join(map(str, query))}]"

        logger.info(f"searching to PgVector storage... '{vector_literal}'")

        # `<=>` is pgvector's cosine distance operator; similarity = 1 - distance.
        # The vector is bound as a parameter and cast, not spliced into the SQL.
        similarity = "1 - (embedding <=> CAST(:query_vector AS vector))"
        conditions = [f"{similarity} > :score_threshold"]
        params: Dict[str, Any] = {
            "query_vector": vector_literal,
            "score_threshold": score_threshold,
            "limit": limit,
        }

        # Add metadata filters if provided. Keys and values are both bound, so
        # a key can never alter the statement.
        for idx, (key, value) in enumerate((filter or {}).items()):
            conditions.append(
                f"metadata ->> CAST(:filter_key_{idx} AS text) = :filter_{idx}"
            )
            params[f"filter_key_{idx}"] = key
            # `->>` yields text, so compare against the JSON text of the value.
            params[f"filter_{idx}"] = (
                value if isinstance(value, str) else json.dumps(value)
            )

        where_clause = " AND ".join(conditions)
        statement = text(
            f"""
            SELECT
                content,
                metadata,
                {similarity} AS similarity
            FROM {self.table_name}
            WHERE {where_clause}
            ORDER BY similarity DESC
            LIMIT :limit
            """
        )

        with engine.connect() as conn:
            result = conn.execute(statement, params)
            return [
                {
                    "content": row.content,
                    "metadata": row.metadata or {},
                    "score": float(row.similarity),
                }
                for row in result
            ]

    def save(
        self,
        documents: List[str],
        metadata: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
    ) -> None:
        """Save documents to the knowledge base.

        Args:
            documents (List[str]): List of document vectors to save
            metadata (Dict[str, Any] | List[Dict[str, Any]]): Metadata for the
                documents. A single dict is applied to every document; ``None``
                stores NULL metadata.
        """
        import json

        logger.info("Saving to PgVector storage...")
        if metadata is None:
            # The metadata column is nullable; store NULL for every row
            # instead of failing on zip(documents, None).
            metadata = [None] * len(documents)
        elif isinstance(metadata, dict):
            metadata = [metadata] * len(documents)

        # CAST(... AS type) is used instead of `:name::type` because text()
        # bind-parameter parsing does not reliably handle a parameter that is
        # immediately followed by a `::` cast. Built once, outside the loop.
        insert_stmt = text(
            f"""
            INSERT INTO {self.table_name} (content, embedding, metadata)
            VALUES (:content, CAST(:embedding AS vector), CAST(:metadata AS jsonb))
            """
        )

        with engine.connect() as conn:
            for doc, meta in zip(documents, metadata):
                # NOTE(review): each item is stored as `content` and also joined
                # into the vector literal; no embedding is computed in this
                # class — confirm what callers actually pass in `documents`.
                vector_str = f"[{','.join(map(str, doc))}]"
                conn.execute(
                    insert_stmt,
                    {
                        "content": doc,
                        "embedding": vector_str,
                        # jsonb needs JSON text, not a Python dict.
                        "metadata": None if meta is None else json.dumps(meta),
                    },
                )

            conn.commit()

    def reset(self) -> None:
        """Reset the knowledge base by dropping and recreating the table."""
        logger.info("Reset PgVector storage...")
        with engine.connect() as conn:
            conn.execute(text(f"DROP TABLE IF EXISTS {self.table_name};"))
            conn.commit()

        # Reinitialize the storage
        self._initialize_storage()
Verwendung:
# Knowledge source for the business terms JSON file, using the custom storage
# held in self.storage.
# NOTE(review): the "source" metadata names a .md file while file_paths points
# at a .json file — confirm which one is intended.
business_terms = JSONKnowledgeSource(
file_paths=["business_terms_metadata.json"],
storage=self.storage,
collection_name="business_terms",
metadata={
"description": "Business terms and their definitions",
"source": "business_terms_metadata.md"
}
)
# Presumably loads the file and writes it through the configured storage —
# verify against the JSONKnowledgeSource implementation.
business_terms.add()
Auch hier wird auf den Standard-Embedder zurückgegriffen und versucht, den OPENAI_API_KEY zu finden.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post