Code: Select all
Message: invalid session id: session deleted as the browser has closed the connection.
Vorher habe ich Chrome nicht in Docker installiert, daher verstehe ich den Fehler nicht.
docker-compose.yml
Code: Select all
version: '3.9'

services:
  web:
    build: .
    container_name: stock_parser
    # Headless Chrome crashes with "session deleted as the browser has
    # closed the connection" when /dev/shm is too small (Docker's default
    # is 64 MB). Give the browser a realistic shared-memory size.
    shm_size: '2gb'
    volumes:
      - .:/app
    environment:
      - PYTHONUNBUFFERED=1
      # Inside Compose the database is reachable via its service name,
      # not localhost; db.py should read this variable.
      - MONGO_HOST=mongo
    depends_on:
      - mongo

  mongo:
    image: mongo:latest
    container_name: stock_parser_db
    restart: unless-stopped
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db

volumes:
  mongo_data:
Code: Select all
FROM --platform=linux/amd64 python:3.9-slim

WORKDIR /app

# Install the OS packages needed to download and run Chrome, in a single
# layer. Chrome's own library dependencies are resolved automatically by
# "apt-get install ./google-chrome...deb", so the hand-written library
# list is gone (it included libgconf-2-4 / libappindicator3-1, which no
# longer exist in current Debian releases and would break the build).
# The Google apt-repository setup was also removed: it was dead code,
# since the browser is installed from the direct .deb below.
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        wget curl unzip gnupg ca-certificates fonts-liberation \
    && curl -LO https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb \
    && apt-get install -y --no-install-recommends ./google-chrome-stable_current_amd64.deb \
    && rm google-chrome-stable_current_amd64.deb \
    && rm -rf /var/lib/apt/lists/*

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

# Install Python dependencies before copying the application code so this
# layer stays cached while the code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# NOTE: MongoDB runs in its own container (see docker-compose.yml); this
# image does not need to expose 27017 itself.
CMD ["python", "stock_parser.py"]
stock_parser.py
from dotenv import load_dotenv
import os
import time
import json
import pickle
import logging
import requests
from db import send_database_data
from requests.exceptions import RequestException, ReadTimeout
from datetime import date, datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
load_dotenv()

# Log to both the log file and the console. The previous code created
# these two handlers but never attached them to any logger, so console
# output was silently lost (and the file was opened a second time).
file_handler = logging.FileHandler("stockanalysis.log")
stream_handler = logging.StreamHandler()
logging.basicConfig(format='%(asctime)s %(levelname)s - %(message)s',
                    level=logging.INFO,
                    handlers=[file_handler, stream_handler])
class JsonWorker:
    """Small helper mixin for reading and writing JSON files."""

    def save_file(self, data, file):
        """Serialize ``data`` to ``file`` as pretty-printed UTF-8 JSON.

        Errors are logged, never raised.
        """
        try:
            with open(file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=4)
        except IOError as e:
            logging.error(f"Произошла ошибка ввода-вывода: {e}")
        except TypeError as e:
            logging.error(f"Ошибка типа данных: {e}")
        except Exception as e:
            logging.error(f"Произошла непредвиденная ошибка: {e}")

    def open_json_file(self, file_path):
        """Load and return the JSON content of ``file_path``.

        Returns None (implicitly) when the file is missing or invalid;
        the error is logged.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as fh:
                return json.load(fh)
        except FileNotFoundError:
            logging.error(f"Файл {file_path} не найден.")
        except json.JSONDecodeError:
            logging.error(f"Ошибка декодирования JSON в файле {file_path}.")
        except Exception as e:
            logging.error(f"Произошла ошибка: {e}")
class SeleniumAuth(JsonWorker):
    """Selenium-based scraper for stockanalysis.com."""

    def __init__(self):
        super().__init__()
        # Base URL of the site being scraped.
        self.link = 'https://stockanalysis.com'
        # Starts a headless Chrome immediately.
        self.driver = self.get_chromedriver()
def record_cookies(self):
pickle.dump(self.driver.get_cookies(), open(f"stock_cookies", "wb"))
def add_cookies(self):
print("Добавление куки..")
for cookie in pickle.load(open("stock_cookies", "rb")):
self.driver.add_cookie(cookie)
def get_chromedriver(self):
    """Build and return a stealth-configured headless Chrome driver.

    --no-sandbox and --disable-dev-shm-usage are required when Chrome
    runs inside a container.
    """
    opts = Options()
    for flag in ("--headless",
                 "--disable-dev-shm-usage",
                 "--disable-gpu",
                 "--no-sandbox"):
        opts.add_argument(flag)
    ua = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
          'AppleWebKit/537.36 (KHTML, like Gecko) '
          'Chrome/91.0.4472.135 YaBrowser/21.6.2.855 '
          'Yowser/2.5 Safari/537.36')
    opts.add_argument("--user-agent=" + ua)
    # Hide the usual "controlled by automation" markers.
    opts.add_experimental_option("useAutomationExtension", False)
    opts.add_experimental_option("excludeSwitches", ["enable-automation"])
    driver = webdriver.Chrome(options=opts,
                              service=Service(ChromeDriverManager().install()))
    # Strip the cdc_* globals that ChromeDriver injects — a common
    # bot-detection fingerprint.
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
        'source': '''
delete window.cdc_adoQpoasnfa76pfcZLmcfl_Array;
delete window.cdc_adoQpoasnfa76pfcZLmcfl_Promise;
delete window.cdc_adoQpoasnfa76pfcZLmcfl_Symbol;
'''
    })
    return driver
def restart_driver(self):
    """Quit the current browser session and start a fresh one."""
    try:
        # Close the current session; tolerate an already-dead driver.
        self.driver.quit()
    except Exception as e:
        logging.error(f"Ошибка при закрытии драйвера: {e}")
    # Start a replacement driver.
    self.driver = self.get_chromedriver()
def transform_key(self, key):
    """Normalize a table key for use as a document field name.

    Date-like keys ("Mar 31, 2024") become "DYYYYMMDD"; any other string
    simply has its spaces removed. Non-string keys are returned as-is.

    Fix: ``key_without_spaces`` was previously computed inside the try,
    so a non-string key raised an uncaught AttributeError before the
    fallback value existed.
    """
    if not isinstance(key, str):
        return key
    key_without_spaces = key.replace(" ", "")
    try:
        date_object = datetime.strptime(key, "%b %d, %Y")
        return "D" + date_object.strftime("%Y%m%d")
    except ValueError:
        # Not a date-formatted key: fall back to the de-spaced string.
        return key_without_spaces
def remove_spaces_from_keys(self, data):
if isinstance(data, dict):
new_data = {}
for key, value in data.items():
new_key = self.transform_key(key=key)
new_data[new_key] = self.remove_spaces_from_keys(value)
return new_data
elif isinstance(data, list):
# Рекурсивно обрабатываем списки
return [self.remove_spaces_from_keys(item) for item in data]
else:
return data
def parsing_financial_table(self, table):
    '''
    For complex financial tables (Income / Balance Sheet / Cash Flow / Ratios).

    Builds a structure of the shape
    {'Info': {'Period Ending': [{<period>: [{<metric>: <value>}, ...]}, ...]}}
    from a <table> WebElement that has a thead/tbody layout.
    '''
    thead = table.find_element(By.TAG_NAME, 'thead')
    tbody = table.find_element(By.TAG_NAME, "tbody")
    rows = tbody.find_elements(By.TAG_NAME, 'tr')
    dict_table = {}
    # Initial [] is replaced by a dict on the first accepted header row.
    dict_table['Info'] = []
    for headers in thead.find_elements(By.TAG_NAME, 'tr'):
        key_name = headers.text.split('\n')[0]
        # NOTE(review): this is a substring test, not equality — it skips
        # any key_name that is a substring of 'Fiscal' (e.g. 'Fiscal'
        # itself). Presumably the intent is to skip the fiscal-year header
        # row; confirm. Later code assumes the surviving header is
        # 'Period Ending'.
        if key_name not in 'Fiscal':
            dict_table['Info'] = {key_name: []}
            for th in headers.find_elements(By.TAG_NAME, 'th'):
                # Skip the header cell that repeats the row title.
                if key_name != th.text:
                    dict_table['Info'][key_name].append({th.text: []})
    for row in rows:
        cols = row.find_elements(By.TAG_NAME, "td")
        cols = [ele.text for ele in cols]
        key_name = cols[0]  # first cell is the metric name
        cols = cols[1::]    # remaining cells are the per-period values
        for i in range(0, len(dict_table['Info']['Period Ending'])):
            data_name = list(dict_table['Info']
                             ['Period Ending'][i].keys())[0]
            try:
                dict_table['Info']['Period Ending'][i][data_name].append(
                    {key_name: cols[i]})
            except IndexError:
                # Row has fewer value cells than there are periods: pad
                # the missing entries with '-'.
                dict_table['Info']['Period Ending'][i][data_name].append({
                    key_name: '-'})
    return dict_table
def parsing_table(self, table):
    '''For the Overview tab: map each row's first cell to its last cell.

    Fix: rows that contain no <td> cells (e.g. header-only rows) used to
    raise IndexError on ``cols[0]``; they are now skipped.
    '''
    dict_table = {}
    for row in table.find_elements(By.TAG_NAME, 'tr'):
        cols = [ele.text for ele in row.find_elements(By.TAG_NAME, "td")]
        if not cols:
            # No data cells in this row — nothing to record.
            continue
        dict_table[cols[0]] = cols[-1]
    return dict_table
def parsing_table_statics(self, table):
    '''For the Statistics/Dividends tabs.

    Returns one {header_text: [column values]} dict per <th> column,
    filled row by row from the table body.
    '''
    thead = table.find_element(By.TAG_NAME, "thead")
    tbody = table.find_element(By.TAG_NAME, "tbody")
    header_cells = thead.find_elements(By.TAG_NAME, 'th')
    data_table = [{cell.text: []} for cell in header_cells]
    for row in tbody.find_elements(By.TAG_NAME, 'tr'):
        cells = [cell.text for cell in row.find_elements(By.TAG_NAME, "td")]
        for i, column in enumerate(data_table):
            try:
                for key in column.keys():
                    column[key].append(cells[i])
            except IndexError:
                # Row is shorter than the header: leave the column as-is.
                pass
    return data_table
def get_stock_data(self, url, timeout=30):
    """GET ``url`` and return the decoded JSON body, or None on failure.

    Fix: requests.get() previously had no timeout, so the call could
    hang forever and the ReadTimeout handler below could never fire.
    ``timeout`` is a new optional parameter — existing callers are
    unaffected.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # raise for HTTP errors (4xx, 5xx)
        return response.json()
    except (RequestException, ReadTimeout) as e:
        logging.error(f"Ошибка запроса: {e}")
        return None
    except json.JSONDecodeError as e:
        logging.error(f"Ошибка декодирования JSON: {e}")
        return None
def rename_data_keys(self, data: dict) -> dict:
    """Expand the chart API's one-letter keys (t/o/h/l/c/v) to full names.

    Keys not present in the mapping are kept unchanged.
    """
    key_mapping = {
        "t": "time",
        "o": "open",
        "h": "high",
        "l": "low",
        "c": "close",
        "v": "vol",
    }
    return {key_mapping.get(key, key): value for key, value in data.items()}
def replace_key_text(self, key: str) -> str:
    """Replace every space in ``key`` with an underscore."""
    return '_'.join(key.split(' '))
# def login(self):
# self.driver = self.get_chromedriver()
# self.driver.get(self.link + '/login/')
# time.sleep(10)
# try:
# self.driver.find_element(
# By.XPATH, "//input[@type='email']").send_keys(os.environ.get("LOGIN"))
# time.sleep(1)
# self.driver.find_element(
# By.XPATH, "//input[@type='password']").send_keys(os.environ.get("PASSWORD"))
# self.driver.find_element(
# By.XPATH, "//button[text()='Log In']").click()
# self.record_cookies()
# self.add_cookies()
# self.driver.get(self.link + '/stocks/screener/')
# time.sleep(10)
# country_list = ['Russia', 'United States', 'US OTC', 'Saudi Arabia', 'United Arab Emirates',
# 'Hong Kong', 'China', 'Brazil', 'United Kingdom', 'Kazakhstan', 'India', 'Japan']
# for country in country_list:
# self.get_country_company_list(country=country)
# except NoSuchElementException as e:
# logging.error(f"Ошибка авторизации: {e}")
# finally:
# self.driver.close()
# self.driver.quit()
def login(self):
    """Log in to stockanalysis.com and scrape the screener, with retries.

    Fixes over the original:
    - The generic ``except Exception`` branch now also increments
      ``attempt``; previously it retried forever on any non-Selenium
      error (an infinite loop).
    - ``restart_driver()`` was called in the except branch only for the
      new driver to be immediately killed by ``finally: driver.quit()``;
      the per-iteration quit alone is enough, since a fresh driver is
      created at the top of each attempt.

    NOTE(review): __init__ already starts a driver that this method
    replaces without quitting — confirm whether the __init__ driver is
    needed at all.
    """
    country_list = ['Russia', 'United States', 'US OTC', 'Saudi Arabia',
                    'United Arab Emirates', 'Hong Kong', 'China', 'Brazil',
                    'United Kingdom', 'Kazakhstan', 'India', 'Japan']
    attempt = 0
    while attempt < 3:  # up to three attempts
        try:
            self.driver = self.get_chromedriver()
            self.driver.get(self.link + '/login/')
            time.sleep(10)
            self.driver.find_element(
                By.XPATH, "//input[@type='email']").send_keys(os.environ.get("LOGIN"))
            time.sleep(1)
            self.driver.find_element(
                By.XPATH, "//input[@type='password']").send_keys(os.environ.get("PASSWORD"))
            self.driver.find_element(
                By.XPATH, "//button[text()='Log In']").click()
            self.record_cookies()
            self.add_cookies()
            self.driver.get(self.link + '/stocks/screener/')
            time.sleep(10)
            for country in country_list:
                self.get_country_company_list(country=country)
            break  # success
        except NoSuchElementException as e:
            logging.error(f"Ошибка авторизации: {e}")
            attempt += 1
        except Exception as e:
            logging.error(f"Произошла ошибка: {e}")
            attempt += 1
        finally:
            # Quit this attempt's driver; the next iteration (if any)
            # starts a fresh one.
            if getattr(self, 'driver', None):
                self.driver.quit()
def get_ticker_data(self, ticker_data: dict, ticker: str, func) -> dict:
    """Run one scraping step ``func`` over ``ticker_data``, logging errors.

    On success the value returned by ``func`` is passed through; on any
    exception the error is logged and the incoming ``ticker_data`` is
    returned instead. ``ticker`` is unused here; kept for interface
    compatibility with callers.
    """
    try:
        return func(ticker_data=ticker_data)
    except Exception as e:
        logging.error(f"Ошибка: {e}")
        return ticker_data
def get_overview_company(self, ticker_data: dict) -> dict:
    """Scrape the two overview tables and store them under 'Overview'."""
    if ticker_data is None:
        ticker_data = {}
    info_table = self.driver.find_element(
        By.XPATH, "//table[@data-test='overview-info']")
    quote_table = self.driver.find_element(
        By.XPATH, "//table[@data-test='overview-quote']")
    # Merge both tables into one flat profile mapping; on a duplicate
    # key the quote table wins (same as the original dict merge order).
    ticker_data['Overview'] = {**self.parsing_table(info_table),
                               **self.parsing_table(quote_table)}
    return ticker_data
def get_financials_company(self, ticker_data: dict) -> dict:
    """Scrape the Financials tab: every statement type x every period filter.

    Navigates Income / Balance Sheet / Cash Flow / Ratios, switches the
    display unit to Billions when that toggle exists, then collects the
    Annual / Quarterly / TTM tables via parsing_financial_table().
    """
    filters_fin = ['Income', 'Balance Sheet', 'Cash Flow', 'Ratios']
    filters = ['Annual', 'Quarterly', 'TTM']
    ticker_data['Financials'] = {}
    for filter_fin in filters_fin:
        print(filter_fin)
        self.driver.find_element(
            By.XPATH, f"//a[text()='{filter_fin}']").click()
        # Open the units dropdown (the element with class "truncate").
        self.driver.find_element(
            By.XPATH, '//span[@class="truncate"]').click()
        time.sleep(1)
        try:
            self.driver.find_element(
                By.XPATH, '//button[contains(text(), "Billions")]').click()
        except NoSuchElementException:
            # Some pages offer no "Billions" option; keep the default unit.
            pass
        time.sleep(5)
        ticker_data['Financials'][filter_fin] = {}
        for filter_ in filters:
            print(filter_)
            self.driver.find_element(
                By.XPATH, f"//button[text()='{filter_}']").click()
            time.sleep(5)
            income = self.driver.find_element(
                By.XPATH, "//table[@data-test='financials']")
            income_dict = self.parsing_financial_table(income)
            ticker_data['Financials'][filter_fin][filter_] = income_dict
    return ticker_data
def get_statistics_company(self, ticker_data: dict) -> dict:
    """Scrape the Statistics tab and its Market Cap / Revenue sub-pages."""
    filters_stat = ['Statistics', 'Market Cap', 'Revenue']
    for filter_stat in filters_stat:
        self.driver.find_element(
            By.XPATH, f"//a[text()='{filter_stat}']").click()
        time.sleep(5)
        if filter_stat == 'Statistics':
            # The Statistics page is a series of <h2> section titles,
            # each followed by a statistics table; pair them by index.
            h2_tags = self.driver.find_elements(By.TAG_NAME, 'h2')
            statistics_table = self.driver.find_elements(
                By.XPATH, "//table[@data-test='statistics-table']")
            ticker_data['Statistics'] = {}
            for index in range(0, len(h2_tags) - 1):
                h2_tags_text = h2_tags[index].text
                try:
                    ticker_data['Statistics'][h2_tags_text] = self.parsing_table(
                        statistics_table[index])
                except IndexError:
                    # More headings than tables: record an empty section.
                    ticker_data['Statistics'][h2_tags_text] = {}
        else:
            ticker_data['Statistics'][filter_stat] = {}
            # NOTE(review): absolute XPath — brittle against any layout
            # change on the site; a data-test attribute would be safer.
            market_cap_infos = self.driver.find_element(
                By.XPATH, "/html/body/div/div[1]/div[2]/main/div[2]/div[1]/div[2]").text
            # The info box renders as alternating "label\nvalue" lines.
            data_list = market_cap_infos.split('\n')
            data_dict = {data_list[i]: data_list[i + 1]
                         for i in range(0, len(data_list), 2)}
            ticker_data['Statistics'][filter_stat]['Info'] = data_dict
            data_table = self.parsing_table_statics(
                self.driver.find_element(By.TAG_NAME, 'table'))
            for data in data_table:
                for key, value in data.items():
                    ticker_data['Statistics'][filter_stat][key] = value
    return ticker_data
def get_dividend_company(self, ticker_data: dict) -> dict:
    """Scrape the Dividends tab: the info box plus the history table."""
    ticker_data['Dividends'] = {}
    time.sleep(5)
    # The info box renders as alternating "label\nvalue" lines.
    info_lines = self.driver.find_element(
        By.XPATH, "//div[@data-test='dividend-infotable']").text.split('\n')
    ticker_data['Dividends']['Info'] = {
        info_lines[i]: info_lines[i + 1]
        for i in range(0, len(info_lines), 2)}
    history = self.parsing_table_statics(
        self.driver.find_element(By.TAG_NAME, 'table'))
    for column in history:
        for key, value in column.items():
            ticker_data['Dividends'][key] = value
    return ticker_data
def get_chart_company(self, ticker_data: dict, ticker: str, link: str) -> dict:
link = link.lower().split('/')
today = date.today().strftime("%Y-%m-%d")
url = f'https://api.stockanalysis.com/api/charts/a/{link[-2]}-{link[-1]}/stream/c?chartiq=true&start=2024-04-09&end={today}&interval=30min'
try:
chart_data = self.get_stock_data(url=url)
if chart_data is not None:
ticker_data['Charts'] = [self.rename_data_keys(
data=data) for data in chart_data['data']]
except Exception as e:
print(f'Error {e}')
return ticker_data
def get_country_company_list(self, country: str):
    """Filter the screener by ``country`` and scrape every listed company.

    Pages through the screener with the "Next" button, collecting all
    links containing '/quote', then calls get_company() for each.
    """
    # Open the country filter (flag icon) and type the country name.
    self.driver.find_element(
        By.XPATH, "//img[contains(@alt, 'flag')]").click()
    time.sleep(10)
    input_country = self.driver.find_element(
        By.XPATH, "//*[@id='main']/div[1]/div[2]/div[1]/div[2]/div/div[1]/input")
    input_country.send_keys(country)
    time.sleep(10)
    self.driver.find_element(
        By.XPATH, f"//img[@alt='{country} flag']").click()
    time.sleep(5)
    href_list = []
    while True:
        button_next = self.driver.find_element(
            By.XPATH, "//span[text()='Next']")
        # Collect every company link on the current page.
        href_list += [href.get_attribute('href') for href in self.driver.find_elements(
            By.XPATH, "//a[contains(@href, 'quote')]")]
        time.sleep(5)
        try:
            # A disabled "Next" button marks the last page.
            self.driver.find_element(
                By.XPATH, "//button[@disabled][./span[contains(text(), 'Next')]]")
            break
        except NoSuchElementException:
            button_next.click()
    print(len(href_list))
    for href in href_list:
        # Drops the last character — presumably a trailing '/', so that
        # get_company() can split the ticker off the URL. TODO confirm.
        href = href[0:-1]
        print(href)
        self.get_company(link=href, country=country)
    print(len(href_list))
def get_company(self, link: str, country: str):
    """Scrape every section for one company and push the result to MongoDB.

    ``link`` is the company quote URL without a trailing slash; the
    ticker is taken from its last path segment.
    """
    self.driver.get(link)
    ticker = link.split('/')[-1]
    print(ticker)
    # Dispatch table: page to open ('href'), scraping method ('func'),
    # and whether the method takes the link directly ('href_in_method').
    datas_func_dict = [{'href': None, 'func': self.get_overview_company},
                       {'href': link + '/financials/',
                        'func': self.get_financials_company},
                       {'href': link + '/statistics/',
                        'func': self.get_statistics_company},
                       {'href': link + '/dividend/',
                        'func': self.get_dividend_company},
                       {'href': link,
                        'func': self.get_chart_company,
                        'href_in_method': True
                        }
                       ]
    # Previously scraped data, if the JSON file exists (note that the
    # save below is commented out, so this is normally None -> []).
    ticker_datas = self.open_json_file(file_path='stockanalysis.json')
    if ticker_datas is None:
        ticker_datas = []
    ticker_data = {'ticker': ticker, 'country': country}
    for data in datas_func_dict:
        if data['href'] is not None:
            self.driver.get(data['href'])
            time.sleep(2)
        if data.get('href_in_method', None):
            ticker_data = data['func'](
                ticker_data=ticker_data, link=link, ticker=ticker)
        else:
            # Errors inside func are swallowed and logged by get_ticker_data.
            ticker_data = self.get_ticker_data(
                ticker_data=ticker_data, func=data['func'], ticker=ticker)
    ticker_datas.append(self.remove_spaces_from_keys(ticker_data))
    try:
        # NOTE(review): the whole accumulated list is inserted, so if
        # stockanalysis.json holds earlier results they are re-inserted
        # (duplicated) on every call — consider sending only ticker_data.
        send_database_data(ticker_datas)
    except Exception as e:
        logging.error(f"Ошибка записи в базу: {e}")
    # self.save_file(data=ticker_datas, file='stockanalysis.json')
    time.sleep(10)
if __name__ == '__main__':
    # Entry point: construct the scraper (starts a headless Chrome)
    # and run the login/scrape loop.
    stock = SeleniumAuth()
    stock.login()
Code: Select all
import os

from pymongo import MongoClient

# Fix for the Docker setup: inside docker-compose, "localhost" in the
# "web" container is the container itself, not the MongoDB container.
# The database must be addressed by its Compose service name ("mongo").
# The host comes from the MONGO_HOST environment variable (set in
# docker-compose.yml) with a localhost fallback for running outside
# Docker — existing non-Docker usage is unchanged.
client = MongoClient(os.environ.get('MONGO_HOST', 'localhost'), 27017)


def send_database_data(data_list: list):
    """Insert ``data_list`` documents into stockanalysis.stockanalysis_data.

    NOTE(review): insert_many() mutates each dict in data_list by adding
    an ``_id`` field; re-sending an already-inserted list will raise a
    duplicate-key error.
    """
    # Database and collection are created lazily on first insert.
    db = client.stockanalysis
    collection = db.stockanalysis_data
    result = collection.insert_many(data_list)
    print(result)