爬取历史天气网站数据
从天气网站爬取指定城市、指定时间范围内的天气数据,并将数据保存为CSV文件。具体而言,它使用了Selenium库来模拟浏览器行为,以便获取动态加载的页面内容。
主要步骤如下:
- 读取城市信息和代理IP信息,并初始化Chrome浏览器及其设置。
- 提示用户输入爬取的起始年份、结束年份、起始月份和结束月份。
- 启动Chrome浏览器调试服务,并创建Chrome驱动。
- 定义了一个名为crawl_aqi_data()的函数,用于爬取指定城市、指定年份和月份的天气数据,并将数据保存到CSV文件中。在函数内部,它首先构建了目标URL,然后打开URL,等待页面加载完成。接着,创建了保存数据的目录,并打开CSV文件进行写入。在读取数据时,它通过Selenium定位到页面上相应的元素,提取数据,并写入CSV文件。
- 循环遍历所有城市、年份和月份,调用crawl_aqi_data()函数执行爬取任务,并在每次爬取后休眠1秒。
- 最后,关闭浏览器,并输出"数据爬取完成。"
- 需要注意的是,为了避免被网站封禁IP,程序在每次爬取后都会休眠1秒,并且随机选择一个代理IP来访问网站
import os
import time
import subprocess
from selenium import webdriver
from selenium.webdriver.common.by import By
import csv
import pinyin
from fake_useragent import UserAgent
import random# 读取城市信息
with open("city.txt", "r", encoding="utf-8") as file:cities = [line.strip() for line in file if line.strip()]ips =[]
with open('ip.txt', 'r') as f:for line in f:ip = line.strip()ips.append(ip.strip())# 输入爬取的起始年份、结束年份、起始月份和结束月份
start_year = int(input("请输入起始年份: "))
end_year = int(input("请输入结束年份: "))
start_month = int(input("请输入起始月份: "))
end_month = int(input("请输入结束月份: "))# 启动Chrome浏览器调试服务
subprocess.Popen('cmd', shell=True)
subprocess.Popen('"chrome-win64\chrome.exe" --remote-debugging-port=9222', shell=True)# 创建Chrome驱动
chrome_options = webdriver.ChromeOptions()
chrome_options.add_experimental_option("debuggerAddress", "localhost:9222")
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable‐gpu')
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument('--proxy-server=http://' + random.choice(ips))
chrome_options.add_argument(f"user-agent={UserAgent().random}")
driver = webdriver.Chrome(options=chrome_options)def crawl_aqi_data(city, year,month):# 构建URLcity1 = pinyin.get(city, format="strip")url = f"https://m.tianqi.com/lishi/{city1.lower()}/{year}{month:02}.html"# 打开URL https://m.tianqi.com/lishi/shanghai/202401.htmldriver.get(url)time.sleep(3) # 等待页面加载完成# 创建目录folder_path = os.getcwd()+f"\data\天气/{city}/{year}年"if not os.path.exists(folder_path):os.makedirs(folder_path)# 打开CSV文件with open(f"{folder_path}/{year}年{month:02}月.csv", "w", newline='', encoding="utf-8") as csvfile:writer = csv.writer(csvfile)# 写入表头#writer.writerow(["日期", "AQI指数", "质量等级", "PM2.5", "PM10", "SO2", "CO", "NO2", "O3-8h"])# 读取数据并保存到CSV文件rows = driver.find_elements(By.CSS_SELECTOR, "body > div.history_weatherct1 > div.count_temp > table > tbody > tr")for i, row in enumerate(rows):if i == 0: # 只写入第一行数据data = [cell.text for cell in row.find_elements(By.TAG_NAME, "td")]if data:print(data)writer.writerow(["平均高温℃","平均低温℃"])writer.writerow([data[0].split("\n")[-1].replace("℃",""),data[1].split("\n")[-1].replace("℃","")])# 循环爬取指定城市、指定时间段的数据
for city in cities:for year in range(start_year, end_year + 1):for month in range(start_month, end_month + 1):crawl_aqi_data(city, year, month)time.sleep(1)# 关闭浏览器
driver.quit()print("数据爬取完成。")
封禁ip
温度可视化
import os
import pandas as pd
import matplotlib.pyplot as plt# 读取CSV文件并加载数据
def load_data(file_path):if os.path.exists(file_path):if os.stat(file_path).st_size == 0:print(f"File '{file_path}' is empty.")return Nonedf = pd.read_csv(file_path)return dfelse:print(f"File '{file_path}' does not exist.")return None# 可视化数据
def visualize_temperatures(df):# 可视化显示plt.figure(figsize=(10, 6))plt.plot(df['城市'], df['平均高温℃'], marker='o', color='red', linestyle='-', label='平均高温')plt.plot(df['城市'], df['平均低温℃'], marker='o', color='blue', linestyle='-')plt.xlabel('城市')plt.ylabel('温度')plt.title('各城市温度对比')plt.xticks(rotation=90) # 旋转x轴标签,以便更好地显示城市名plt.grid(True) # 显示网格线plt.tight_layout() # 调整布局,防止标签重叠plt.show()# 主函数
def main():# 设置中文显示plt.rcParams['font.sans-serif'] = ['SimHei']plt.rcParams['axes.unicode_minus'] = False# 指定天气目录路径weather_dir = os.path.join(os.getcwd(), 'data', '天气')all_df = []# 遍历天气目录下的城市数据for city_name in os.listdir(weather_dir):city_dir = os.path.join(weather_dir, city_name)if os.path.isdir(city_dir):# 遍历城市文件夹下的所有 CSV 文件for year in os.listdir(city_dir):year_dir = os.path.join(city_dir, year)for csv_file in os.listdir(year_dir):if csv_file.endswith(".csv"):# 构建文件路径file_path = os.path.join(year_dir, csv_file)# 加载数据df = load_data(file_path)# 添加城市列df['城市'] = city_nameall_df.append(df)rged_df = pd.concat(all_df, ignore_index=True)print(rged_df)# 可视化数据visualize_temperatures(rged_df)if __name__ == "__main__":main()