最近需要写爬虫,在使用 docker 版 standalone-chrome 时发现同一时间只能有一个 Chrome 会话被执行,所以写了这个多线程并发控制类来管理浏览器的使用,在此当作模板记录一下。
#! /usr/bin/env python3
import json
import threading
import time
import traceback

from loguru import logger
from selenium import webdriver
from selenium.common import WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from utils.configs import ScreenshotDir, ChromeUserDataDir, REMOTE_DRIVER


def options():
    """Build the default ``ChromeOptions`` used when the caller supplies none."""
    opts = webdriver.ChromeOptions()
    opts.add_argument(f"user-data-dir={ChromeUserDataDir}")
    opts.page_load_strategy = 'eager'
    # Drop the "enable-automation" switch from Chrome's default switches.
    opts.add_experimental_option('excludeSwitches', ['enable-automation'])
    opts.add_argument("--disable-popup-blocking")  # do not block pop-ups
    opts.add_argument("--blink-settings=imagesEnabled=false")  # disable images
    opts.add_argument("--window-size=1920,1080")  # window size
    opts.add_argument("--no-sandbox")  # run without the sandbox
    opts.add_argument("--disable-gpu")
    # Optional alternatives, disabled by default:
    # opts.add_argument('--headless')  # headless mode
    # opts.add_experimental_option('debuggerAddress', '127.0.0.1:9222')  # attach to a given server
    return opts


class Browser:
    """Thin wrapper around a remote Chrome WebDriver session.

    Instances are NOT thread-safe; use ``BrowserManager`` to serialize access.
    """

    def __init__(self, opts=None):
        """Open a remote session against ``REMOTE_DRIVER``.

        Args:
            opts: ``ChromeOptions`` to use; falls back to ``options()`` when
                falsy.
        """
        opts = opts or options()
        self.driver = webdriver.Remote(command_executor=REMOTE_DRIVER, options=opts)
        # Local alternative:
        # self.driver = webdriver.Chrome(options=opts)

    def quit(self):
        """Close the session; errors are logged, never raised."""
        driver = getattr(self, 'driver', None)
        if not hasattr(driver, 'quit'):
            # Never created, or already closed (driver is None).
            return
        try:
            driver.quit()
        except Exception as e:
            logger.error(f"browser quit err:{e}\ntraceback:{traceback.format_exc()}")
        finally:
            # Drop the reference even when quit() failed, so a second call
            # does not retry against a dead session.
            self.driver = None

    def send(self, cmd, params):
        """Send a command through the chromium ``send_command_and_get_result`` endpoint.

        Args:
            cmd: Command name, sent as the ``cmd`` field of the JSON body.
            params: Command parameters, sent as the ``params`` field.

        Returns:
            The ``value`` entry of the response.
        """
        # NOTE(review): relies on the private ``_url`` / ``_request`` members
        # of the command executor -- confirm they exist in the installed
        # selenium version.
        executor = self.driver.command_executor
        resource = "/session/%s/chromium/send_command_and_get_result" % self.driver.session_id
        url = executor._url + resource
        body = json.dumps({'cmd': cmd, 'params': params})
        response = executor._request('POST', url, body)
        return response.get('value')

    def get(self, url) -> None:
        """Navigate to ``url``, logging before and after the load."""
        logger.info(f'browser starting open url={url}')
        self.driver.get(url)
        logger.info(f'browser open suc url={url}')

    def wait_by_xpath(self, path, timeout=10) -> None:
        """Wait up to ``timeout`` seconds for the XPath ``path`` to be clickable.

        A failed wait is logged and otherwise ignored (best effort).
        """
        try:
            WebDriverWait(
                self.driver, timeout, 1, ignored_exceptions=(WebDriverException,)
            ).until(EC.element_to_be_clickable((By.XPATH, path)))
        except Exception:
            logger.error('browser wait xpath not find')

    def wait_by_id(self, tid, timeout=10) -> None:
        """Wait up to ``timeout`` seconds for the element with id ``tid`` to be present.

        A failed wait is logged and otherwise ignored (best effort).
        """
        try:
            WebDriverWait(
                self.driver, timeout, 1, ignored_exceptions=(WebDriverException,)
            ).until(EC.presence_of_element_located((By.ID, tid)))
        except Exception:
            logger.error('browser wait id not find')

    def save(self, path):
        """Write the current page source to ``path`` as UTF-8 text."""
        with open(path, 'w', encoding='utf-8') as fw:
            fw.write(self.get_page_source())

    def get_page_source(self) -> str:
        """Return the driver's current page source."""
        return self.driver.page_source

    def click(self, path) -> str:
        """Click the element at XPath ``path`` and follow it to the newest window.

        Returns:
            The current URL after switching to the last window handle, or
            ``""`` when the element is missing or any step fails (the error
            is logged).
        """
        try:
            button = self.driver.find_element(By.XPATH, path)
            if not button:
                return ""
            button.click()
            time.sleep(2)  # give a newly opened tab time to appear
            # Switch to the most recently opened tab/window.
            windows = self.driver.window_handles
            self.driver.switch_to.window(windows[-1])
            return self.driver.current_url
        except Exception as e:
            logger.error(f'browser click err={e}')
            # Same falsy result as the "no element" path instead of None.
            return ""

    def screenshot(self, name=None):
        """Save a screenshot under ``ScreenshotDir``.

        Args:
            name: File name to use; defaults to ``full_page_screenshot.png``.
        """
        full_name = name or 'full_page_screenshot.png'
        screenshot_path = f'{ScreenshotDir}/{full_name}'
        self.driver.save_screenshot(screenshot_path)


class BrowserManager:
    """Context manager that lets only one thread hold a ``Browser`` at a time.

    State is kept on the class, so every ``BrowserManager`` instance shares
    the same slot.

    Usage:
        with BrowserManager() as browser:
            browser.get(url)
    """

    _browser = None  # the Browser currently handed out, if any
    _is_used = False  # True while some thread is inside the ``with`` body
    _condition = threading.Condition()  # guards the two attributes above

    def __init__(self, opts=None):
        """Remember the options to pass to ``Browser`` on entry."""
        self.opts = opts

    def __enter__(self):
        """Block until the slot is free, then create and return a ``Browser``.

        Raises:
            Exception: whatever ``Browser(...)`` raises; the slot is released
                before the exception propagates.
        """
        with BrowserManager._condition:
            while BrowserManager._is_used:
                # Re-check the flag after every wake-up or 10 s timeout.
                BrowserManager._condition.wait(timeout=10)
            BrowserManager._is_used = True
            try:
                BrowserManager._browser = Browser(self.opts)
            except Exception:
                # Creation failed: free the slot and wake a waiter so nobody
                # has to sit out the wait timeout.
                BrowserManager._is_used = False
                BrowserManager._condition.notify()
                raise
            return BrowserManager._browser

    def __exit__(self, exc_type, exc_value, traceback):
        """Quit the browser and release the slot; exceptions are not suppressed."""
        with BrowserManager._condition:
            try:
                if BrowserManager._browser is not None:
                    BrowserManager._browser.quit()
            finally:
                # Always release the slot, otherwise every other thread would
                # wait forever.
                BrowserManager._browser = None
                BrowserManager._is_used = False
                BrowserManager._condition.notify()
用法很简单,例如:
def fetch_page_source(url):
    """Usage example: open ``url`` in the shared browser and return its HTML.

    Args:
        url: Address to load.

    Returns:
        The page source, or ``None`` when loading fails (the error is logged).
    """
    with BrowserManager() as browser:
        try:
            browser.get(url)
            return browser.get_page_source()
        except Exception as e:
            logger.error(e)
    return None