Meldung: Ungültige Sitzungs-ID: Sitzung gelöscht, da der Browser die Verbindung geschlossen hat (Selenium in Docker, Python)

Python-Programme
Guest
 Meldung: Ungültige Sitzungs-ID: Sitzung gelöscht, da der Browser die Verbindung Selenium in Docker geschlossen hat

Post by Guest »

Das Skript läuft nicht in Docker, funktioniert aber außerhalb. Der Fehler, der in Docker auftritt, ist:

Code: Select all

Message: invalid session id: session deleted as the browser has closed the connection.
Können Sie mir helfen zu verstehen, was falsch ist und wie ich den Code ändern kann, damit er in Docker funktioniert? Warum funktioniert es außerhalb von Docker einwandfrei?
Vorher habe ich Chrome nicht in Docker installiert, daher verstehe ich den Fehler nicht.
docker-compose.yml

Code: Select all

version: '3.9'

services:
  web:
    build: .
    container_name: stock_parser
    # Chrome's default /dev/shm inside a container is only 64 MB; when it
    # fills up, Chrome dies and Selenium reports "invalid session id:
    # session deleted as the browser has closed the connection".
    shm_size: '2gb'
    volumes:
      - .:/app
    environment:
      - PYTHONUNBUFFERED=1
      # Inside the compose network MongoDB is reachable via its service
      # name, not localhost.
      - MONGO_HOST=mongo
    depends_on:
      - mongo

  mongo:
    image: mongo:latest
    container_name: stock_parser_db
    restart: unless-stopped
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db

volumes:
  mongo_data:
Docker-Datei

Code: Select all

FROM --platform=linux/amd64 python:3.9-slim

WORKDIR /app

COPY requirements.txt .

# Install the required Python packages first so this layer is cached.
RUN pip install --no-cache-dir -r requirements.txt

# Chrome runtime libraries plus the tools needed to fetch the .deb.
# (Previously this was split across two apt layers, the second of which
# left /var/lib/apt/lists in the image.)
RUN apt-get update && apt-get install -y --no-install-recommends \
    wget \
    curl \
    unzip \
    gnupg \
    libnss3 \
    libgconf-2-4 \
    libxss1 \
    libxrandr2 \
    libx11-xcb1 \
    libxcomposite1 \
    libxcursor1 \
    libxi6 \
    libxtst6 \
    libglib2.0-0 \
    libasound2 \
    fonts-liberation \
    libappindicator3-1 \
    libgtk-3-0 \
    && rm -rf /var/lib/apt/lists/*

# Install Chrome from the official standalone .deb; apt resolves its
# dependencies. The Google apt-repository/keyring setup that used to be
# here was dead weight - nothing was ever installed from that repo.
RUN apt-get update && \
    curl -LO https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb && \
    apt-get install -y ./google-chrome-stable_current_amd64.deb && \
    rm google-chrome-stable_current_amd64.deb && \
    rm -rf /var/lib/apt/lists/*

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

COPY . .

# Expose any necessary ports, e.g., for MongoDB
EXPOSE 27017

CMD ["python", "stock_parser.py"]
stock_parser.py
from dotenv import load_dotenv
import os

import time
import json
import pickle
import logging
import requests
from db import send_database_data
from requests.exceptions import RequestException, ReadTimeout
from datetime import date, datetime

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager

load_dotenv()

file_handler = logging.FileHandler("stockanalysis.log")
stream_handler = logging.StreamHandler()

# basicConfig(filename=...) alone never writes to the console; the two
# handlers above used to be created but never attached to any logger.
# Passing them via handlers= sends every record to both the log file
# and stderr.
logging.basicConfig(format='%(asctime)s %(levelname)s - %(message)s',
                    level=logging.INFO,
                    handlers=[file_handler, stream_handler])

class JsonWorker:
    """Small helper for reading and writing JSON files with logged errors."""

    def save_file(self, data, file):
        """Serialize *data* to *file* as pretty-printed UTF-8 JSON.

        All failures are logged rather than raised.
        """
        try:
            with open(file, 'w', encoding='utf-8') as out:
                json.dump(data, out, ensure_ascii=False, indent=4)
        except IOError as e:
            logging.error(f"Произошла ошибка ввода-вывода: {e}")
        except TypeError as e:
            logging.error(f"Ошибка типа данных: {e}")
        except Exception as e:
            logging.error(f"Произошла непредвиденная ошибка: {e}")

    def open_json_file(self, file_path):
        """Load and return the JSON content of *file_path*.

        Returns None (after logging) when the file is missing, malformed,
        or unreadable.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as src:
                return json.load(src)
        except FileNotFoundError:
            logging.error(f"Файл {file_path} не найден.")
        except json.JSONDecodeError:
            logging.error(f"Ошибка декодирования JSON в файле {file_path}.")
        except Exception as e:
            logging.error(f"Произошла ошибка: {e}")

class SeleniumAuth(JsonWorker):
    """Selenium-based scraper for stockanalysis.com."""

    def __init__(self):
        super().__init__()
        # Constructing the object already launches headless Chrome, so a
        # working Chrome + chromedriver installation is required up front.
        self.driver = self.get_chromedriver()
        self.link = 'https://stockanalysis.com'

def record_cookies(self):
pickle.dump(self.driver.get_cookies(), open(f"stock_cookies", "wb"))

def add_cookies(self):
print("Добавление куки..")
for cookie in pickle.load(open("stock_cookies", "rb")):
self.driver.add_cookie(cookie)

    def get_chromedriver(self):
        """Create and return a configured headless Chrome WebDriver."""
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        # /dev/shm is tiny inside Docker; without this flag Chrome can crash
        # with "session deleted because the browser has closed the connection".
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-gpu")
        # Required when Chrome runs as root inside a container.
        chrome_options.add_argument("--no-sandbox")
        # Spoof a regular desktop-browser user agent.
        chrome_options.add_argument(
            f"--user-agent={'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.135 YaBrowser/21.6.2.855 Yowser/2.5 Safari/537.36'}")
        # Hide the usual webdriver-automation fingerprints.
        chrome_options.add_experimental_option("useAutomationExtension", False)
        chrome_options.add_experimental_option(
            "excludeSwitches", ["enable-automation"])
        driver = webdriver.Chrome(options=chrome_options, service=Service(ChromeDriverManager().install()))
        # Strip the chromedriver-injected window properties some sites sniff
        # for bot detection, before any page script runs.
        driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            'source': '''
            delete window.cdc_adoQpoasnfa76pfcZLmcfl_Array;
            delete window.cdc_adoQpoasnfa76pfcZLmcfl_Promise;
            delete window.cdc_adoQpoasnfa76pfcZLmcfl_Symbol;
            '''
        })
        return driver

    def restart_driver(self):
        """Quit the current Chrome session (best effort) and start a new one."""
        try:
            self.driver.quit()  # close the current session
        except Exception as e:
            logging.error(f"Ошибка при закрытии драйвера: {e}")
        self.driver = self.get_chromedriver()  # launch a fresh driver

def transform_key(self, key):

try:
key_without_spaces = key.replace(" ", "")
date_object = datetime.strptime(key, "%b %d, %Y")
return "D"  + date_object.strftime("%Y%m%d")
except (ValueError, TypeError):
return key_without_spaces

def remove_spaces_from_keys(self, data):
if isinstance(data, dict):
new_data = {}
for key, value in data.items():
new_key = self.transform_key(key=key)
new_data[new_key] = self.remove_spaces_from_keys(value)
return new_data
elif isinstance(data, list):
# Рекурсивно обрабатываем списки
return [self.remove_spaces_from_keys(item) for item in data]
else:
return data

    def parsing_financial_table(self, table):
        '''
        For the complex financial tables (the Financials tabs).

        Builds {'Info': {header: [{period: [{row_name: value}, ...]}, ...]}}
        where each period column collects one entry per data row.
        '''
        thead = table.find_element(By.TAG_NAME, 'thead')
        tbody = table.find_element(By.TAG_NAME, "tbody")
        rows = tbody.find_elements(By.TAG_NAME, 'tr')
        dict_table = {}
        dict_table['Info'] = []
        for headers in thead.find_elements(By.TAG_NAME, 'tr'):
            # First line of the header row's text names that header group.
            key_name = headers.text.split('\n')[0]
            # NOTE(review): `key_name not in 'Fiscal'` is a SUBSTRING test
            # against the string 'Fiscal', not membership in a collection -
            # confirm this is intended.
            if key_name not in 'Fiscal':
                dict_table['Info'] = {key_name: []}
                for th in headers.find_elements(By.TAG_NAME, 'th'):
                    if key_name != th.text:
                        dict_table['Info'][key_name].append({th.text: []})

        for row in rows:
            cols = row.find_elements(By.TAG_NAME, "td")
            cols = [ele.text for ele in cols]
            key_name = cols[0]  # first cell is the metric name
            cols = cols[1::]    # remaining cells are the per-period values
            # NOTE(review): assumes the header group is literally named
            # 'Period Ending' - verify against the live page.
            for i in range(0, len(dict_table['Info']['Period Ending'])):
                data_name = list(dict_table['Info']
                                 ['Period Ending'][i].keys())[0]
                try:
                    dict_table['Info']['Period Ending'][i][data_name].append(
                        {key_name: cols[i]})
                except IndexError:
                    # Row shorter than the header: pad with '-'.
                    dict_table['Info']['Period Ending'][i][data_name].append({
                        key_name: '-'})
        return dict_table

def parsing_table(self, table):
'''Для вкладки Overview'''
rows = table.find_elements(By.TAG_NAME, 'tr')
dict_table = {}
for row in rows:
# Заменить на "th", если нужно извлечь заголовки
cols = row.find_elements(By.TAG_NAME, "td")
cols = [ele.text for ele in cols]
# print(cols)
key = cols[0]
dict_table[key] = cols[-1]
return dict_table

def parsing_table_statics(self, table):
'''Для вкладок Statics/Dividends'''
thead = table.find_element(By.TAG_NAME, "thead")
tbody = table.find_element(By.TAG_NAME, "tbody")
rows = tbody.find_elements(By.TAG_NAME, 'tr')
data_table = [{header.text: []}
for header in thead.find_elements(By.TAG_NAME, 'th')]
for row in rows:
cols = row.find_elements(By.TAG_NAME, "td")
cols = [ele.text for ele in cols]

for i in range(0, len(data_table)):
try:
for key in data_table[i].keys():
data_table[i][key].append(cols[i])
except IndexError:
pass

return data_table

def get_stock_data(self, url):

try:
response = requests.get(url)
response.raise_for_status()  # Вызывает исключение для HTTP-ошибок (4xx, 5xx)
data = response.json()
return data
except (RequestException, ReadTimeout) as e:
logging.error(f"Ошибка запроса: {e}")
return None
except json.JSONDecodeError as e:
logging.error(f"Ошибка декодирования JSON: {e}")
return None

def rename_data_keys(self, data: dict) ->  dict:
key_mapping = {
"t": "time",
"o": "open",
"h": "high",
"l": "low",
"c": "close",
"v": "vol",
}
renamed_data = {}
for key, value in data.items():
# Используем get с default значением key, если ключ не найден в словаре
renamed_data[key_mapping.get(key, key)] = value
return renamed_data

def replace_key_text(self, key: str) -> str:
return key.replace(' ', '_')

# def login(self):
#     self.driver = self.get_chromedriver()
#     self.driver.get(self.link + '/login/')
#     time.sleep(10)
#     try:
#         self.driver.find_element(
#             By.XPATH, "//input[@type='email']").send_keys(os.environ.get("LOGIN"))
#         time.sleep(1)
#         self.driver.find_element(
#             By.XPATH, "//input[@type='password']").send_keys(os.environ.get("PASSWORD"))
#         self.driver.find_element(
#             By.XPATH, "//button[text()='Log In']").click()
#         self.record_cookies()
#         self.add_cookies()
#         self.driver.get(self.link + '/stocks/screener/')
#         time.sleep(10)
#         country_list = ['Russia', 'United States', 'US OTC', 'Saudi Arabia', 'United Arab Emirates',
#                         'Hong Kong', 'China', 'Brazil', 'United Kingdom', 'Kazakhstan', 'India', 'Japan']
#         for country in country_list:
#             self.get_country_company_list(country=country)
#     except NoSuchElementException as e:
#         logging.error(f"Ошибка авторизации: {e}")
#     finally:
#         self.driver.close()
#         self.driver.quit()

def login(self):
attempt = 0
while attempt < 3:  # Попробовать три раза
try:
self.driver = self.get_chromedriver()
self.driver.get(self.link + '/login/')
time.sleep(10)
self.driver.find_element(
By.XPATH, "//input[@type='email']").send_keys(os.environ.get("LOGIN"))
time.sleep(1)
self.driver.find_element(
By.XPATH, "//input[@type='password']").send_keys(os.environ.get("PASSWORD"))
self.driver.find_element(
By.XPATH, "//button[text()='Log In']").click()
self.record_cookies()
self.add_cookies()
self.driver.get(self.link + '/stocks/screener/')
time.sleep(10)
# Ваш код для получения списка компаний
country_list = ['Russia', 'United States', 'US OTC', 'Saudi Arabia', 'United Arab Emirates',
'Hong Kong', 'China', 'Brazil', 'United Kingdom', 'Kazakhstan', 'India', 'Japan']
for country in country_list:
self.get_country_company_list(country=country)
break  # Успешный выход из цикла
except NoSuchElementException as e:
logging.error(f"Ошибка авторизации: {e}")
# Переход к следующему разу
attempt += 1
if attempt < 3:
self.restart_driver()  # Перезапускаем драйвер
except Exception as e:
logging.error(f"Произошла ошибка: {e}")
self.restart_driver()  # Перезапускаем драйвер в случае других ошибок
finally:
if self.driver:
self.driver.quit()

def get_ticker_data(self, ticker_data: dict, ticker: str, func) -> dict:
try:
ticker_data = func(ticker_data=ticker_data)
except Exception as e:
logging.error(f"Ошибка: {e}")
return ticker_data

    def get_overview_company(self, ticker_data: dict) -> dict:
        """Scrape the two Overview tables on the current page and store the
        merged result under ticker_data['Overview']."""
        if ticker_data is None:
            ticker_data = {}

        overview_info = self.driver.find_element(
            By.XPATH, "//table[@data-test='overview-info']")
        overview_quote = self.driver.find_element(
            By.XPATH, "//table[@data-test='overview-quote']")
        overview_info_dict = self.parsing_table(overview_info)
        overview_quote_dict = self.parsing_table(overview_quote)
        # Quote values win on key collisions (merged last).
        profile = {**overview_info_dict, **overview_quote_dict}
        ticker_data['Overview'] = profile
        return ticker_data

    def get_financials_company(self, ticker_data: dict) -> dict:
        """Walk the Financials sections (Income/Balance Sheet/Cash Flow/
        Ratios), each in Annual/Quarterly/TTM view, and store the parsed
        tables under ticker_data['Financials'][section][period]."""
        filters_fin = ['Income', 'Balance Sheet', 'Cash Flow', 'Ratios']
        filters = ['Annual', 'Quarterly', 'TTM']
        ticker_data['Financials'] = {}
        for filter_fin in filters_fin:
            print(filter_fin)
            self.driver.find_element(
                By.XPATH, f"//a[text()='{filter_fin}']").click()
            # Open the units dropdown and switch to Billions when offered.
            self.driver.find_element(
                By.XPATH, '//span[@class="truncate"]').click()
            time.sleep(1)
            try:
                self.driver.find_element(
                    By.XPATH, '//button[contains(text(), "Billions")]').click()
            except NoSuchElementException:
                pass  # no "Billions" option for this section
            time.sleep(5)
            ticker_data['Financials'][filter_fin] = {}
            for filter_ in filters:
                print(filter_)
                self.driver.find_element(
                    By.XPATH, f"//button[text()='{filter_}']").click()
                time.sleep(5)
                income = self.driver.find_element(
                    By.XPATH, "//table[@data-test='financials']")
                income_dict = self.parsing_financial_table(income)
                ticker_data['Financials'][filter_fin][filter_] = income_dict

        return ticker_data

    def get_statistics_company(self, ticker_data: dict) -> dict:
        """Scrape the Statistics, Market Cap and Revenue sub-tabs into
        ticker_data['Statistics']."""
        filters_stat = ['Statistics', 'Market Cap', 'Revenue']
        for filter_stat in filters_stat:
            self.driver.find_element(
                By.XPATH, f"//a[text()='{filter_stat}']").click()
            time.sleep(5)
            if filter_stat == 'Statistics':
                # The Statistics page is a sequence of <h2> headings, each
                # followed by its own table; pair them up by index.
                h2_tags = self.driver.find_elements(By.TAG_NAME, 'h2')
                statistics_table = self.driver.find_elements(
                    By.XPATH, "//table[@data-test='statistics-table']")
                ticker_data['Statistics'] = {}
                for index in range(0, len(h2_tags) - 1):
                    h2_tags_text = h2_tags[index].text
                    try:
                        ticker_data['Statistics'][h2_tags_text] = self.parsing_table(
                            statistics_table[index])
                    except IndexError:
                        # More headings than tables: store an empty section.
                        ticker_data['Statistics'][h2_tags_text] = {}
            else:
                ticker_data['Statistics'][filter_stat] = {}
                # NOTE(review): absolute XPath is brittle - it breaks on any
                # page-layout change.
                market_cap_infos = self.driver.find_element(
                    By.XPATH, "/html/body/div/div[1]/div[2]/main/div[2]/div[1]/div[2]").text
                # The text alternates label/value per line; fold into a dict.
                data_list = market_cap_infos.split('\n')
                data_dict = {data_list[i]: data_list[i + 1]
                             for i in range(0, len(data_list), 2)}
                ticker_data['Statistics'][filter_stat]['Info'] = data_dict
                data_table = self.parsing_table_statics(
                    self.driver.find_element(By.TAG_NAME, 'table'))
                for data in data_table:
                    for key, value in data.items():
                        ticker_data['Statistics'][filter_stat][key] = value
        return ticker_data

    def get_dividend_company(self, ticker_data: dict) -> dict:
        """Scrape the Dividends tab: the info box (label/value pairs) and the
        dividend history table, stored under ticker_data['Dividends']."""
        ticker_data['Dividends'] = {}
        time.sleep(5)
        data_info = self.driver.find_element(
            By.XPATH, "//div[@data-test='dividend-infotable']").text
        # The text alternates label/value per line; fold into a dict.
        data_list = data_info.split('\n')
        data_dict = {data_list[i]: data_list[i + 1]
                     for i in range(0, len(data_list), 2)}
        ticker_data['Dividends']['Info'] = data_dict
        data_table = self.parsing_table_statics(
            self.driver.find_element(By.TAG_NAME, 'table'))
        for data in data_table:
            for key, value in data.items():
                ticker_data['Dividends'][key] = value
        return ticker_data

def get_chart_company(self, ticker_data: dict, ticker: str, link: str) ->  dict:
link = link.lower().split('/')
today = date.today().strftime("%Y-%m-%d")
url = f'https://api.stockanalysis.com/api/charts/a/{link[-2]}-{link[-1]}/stream/c?chartiq=true&start=2024-04-09&end={today}&interval=30min'
try:
chart_data = self.get_stock_data(url=url)
if chart_data is not None:
ticker_data['Charts'] = [self.rename_data_keys(
data=data) for data in chart_data['data']]
except Exception as e:
print(f'Error {e}')
return ticker_data

    def get_country_company_list(self, country: str):
        """On the screener page, pick *country* in the flag filter, walk every
        result page collecting quote links, then scrape each company."""
        self.driver.find_element(
            By.XPATH, "//img[contains(@alt, 'flag')]").click()
        time.sleep(10)
        input_country = self.driver.find_element(
            By.XPATH, "//*[@id='main']/div[1]/div[2]/div[1]/div[2]/div/div[1]/input")
        input_country.send_keys(country)
        time.sleep(10)
        self.driver.find_element(
            By.XPATH, f"//img[@alt='{country} flag']").click()
        time.sleep(5)
        href_list = []
        while True:
            button_next = self.driver.find_element(
                By.XPATH, "//span[text()='Next']")
            # Collect every quote link on the current results page.
            href_list += [href.get_attribute('href') for href in self.driver.find_elements(
                By.XPATH, "//a[contains(@href, 'quote')]")]
            time.sleep(5)
            try:
                # A disabled Next button marks the last page.
                self.driver.find_element(
                    By.XPATH, "//button[@disabled][./span[contains(text(), 'Next')]]")
                break
            except NoSuchElementException:
                button_next.click()
        print(len(href_list))
        for href in href_list:
            href = href[0:-1]  # drop the trailing slash
            print(href)
            self.get_company(link=href, country=country)
        print(len(href_list))

    def get_company(self, link: str, country: str):
        """Visit one company page, gather Overview/Financials/Statistics/
        Dividends/Charts data, then push the accumulated records to MongoDB."""
        self.driver.get(link)
        ticker = link.split('/')[-1]
        print(ticker)
        # Each entry: page to open first (None = already there) and its
        # parser; 'href_in_method' marks the parser that needs link/ticker
        # passed directly.
        datas_func_dict = [{'href': None, 'func': self.get_overview_company},
                           {'href': link + '/financials/',
                            'func': self.get_financials_company},
                           {'href': link + '/statistics/',
                            'func': self.get_statistics_company},
                           {'href': link + '/dividend/',
                            'func': self.get_dividend_company},
                           {'href': link,
                            'func': self.get_chart_company,
                            'href_in_method': True
                            }
                           ]

        ticker_datas = self.open_json_file(file_path='stockanalysis.json')
        if ticker_datas is None:
            ticker_datas = []
        ticker_data = {'ticker': ticker, 'country': country}
        for data in datas_func_dict:
            if data['href'] is not None:
                self.driver.get(data['href'])
                time.sleep(2)
            if data.get('href_in_method', None):
                # The chart scraper builds its API URL from the link itself.
                ticker_data = data['func'](
                    ticker_data=ticker_data, link=link, ticker=ticker)
            else:
                ticker_data = self.get_ticker_data(
                    ticker_data=ticker_data, func=data['func'], ticker=ticker)
        ticker_datas.append(self.remove_spaces_from_keys(ticker_data))
        try:
            send_database_data(ticker_datas)
        except Exception as e:
            logging.error(f"Ошибка записи в базу: {e}")
        time.sleep(10)

if __name__ == '__main__':
    # Entry point: constructing SeleniumAuth launches Chrome, then login()
    # drives the whole scraping run.
    stock = SeleniumAuth()
    stock.login()
db.py

Code: Select all

import os

from pymongo import MongoClient

# Inside the docker-compose network the database is reachable via its
# service name ("mongo"), not localhost - the hard-coded 'localhost' is
# why the script worked outside Docker but not inside. The env-var
# default keeps the old behaviour for bare-metal runs.
client = MongoClient(os.environ.get('MONGO_HOST', 'localhost'), 27017)


def send_database_data(data_list: list):
    """Insert *data_list* documents into stockanalysis.stockanalysis_data."""
    if not data_list:
        return  # insert_many raises InvalidOperation on an empty list

    # Database and collection handles (created lazily by pymongo).
    db = client.stockanalysis
    collection = db.stockanalysis_data

    result = collection.insert_many(data_list)
    print(result)

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post