使用 Python 定期执行抓取脚本

数据挖掘 Python 数据集
2021-09-15 22:43:44

这是我的想法和我早期的工作。

我的目标

  • 连续从中国政府获取 1 小时分辨率的空气污染数据。
  • 从全国各地的监测网站收集的网站数据每小时更新一次。

我的代码

现在,我可以在一个小时内获取有用的信息。这是我的代码:

  1. 输入不同污染的网站链接(co、no2、pm10 等)

    html_co = urllib.urlopen("http://www.pm25.in/api/querys/co.json?city=beijing&token=5j1znBVAsnSf5xQyNQyq").read().decode('utf-8')
    html_no2 = urllib.urlopen("http://www.pm25.in/api/querys/no2.json?city=beijing&token=5j1znBVAsnSf5xQyNQyq").read().decode('utf-8')
    html_pm10 = urllib.urlopen("http://www.pm25.in/api/querys/pm10.json?city=beijing&token=5j1znBVAsnSf5xQyNQyq").read().decode('utf-8')
    
  2. 获取 html 文档的内容。

    soup_co = BeautifulSoup(html_co)
    soup_no2 = BeautifulSoup(html_no2)
    soup_pm10 = BeautifulSoup(html_pm10)
    
  3. 从整个内容中提取有用的信息。

    l = soup_co.p.get_text() 
    co= json.loads(l) 
    l = soup_no2.p.get_text() 
    no2= json.loads(l) 
    l = soup_pm10.p.get_text() 
    pm10= json.loads(l)       
    
  4. 将原始数据压缩成整洁的 Pandas.Dataframe。

    data = {"time":[],"station":[],"code":[],"co":[],"no2":[],"pm10":[]}
    for i in range(0,len(pm10)-1,1):
        ## 'station' is the monitor station's name in Chinese
        data["station"].append(co[i]["position_name"])
        data["time"].append(co[i]["time_point"])
        data["co"].append(co[i]["co"])
        ## 'code' is the monitor station's index
        data["code"].append(co[i]["station_code"])
        data["no2"].append(no2[i]["no2"])
        data["pm10"].append(pm10[i]["pm10"])
    

我的结果

一些预先说明

  • 忽略表中的汉字。
  • 我只抓取一个城市(这里是北京)的数据,从0-11的索引通知北京有12个监控站点。
  • “co”/“NO2”/“PM10”列代表这些空气污染物的浓度。

    在此处输入图像描述

我的问题

现在,我可以根据上面的代码手动获取网络数据。但是,我想自动实现每小时以下的工作流程

小时我

  • 执行代码

  • (1) 从网站上获取 Hour i 的空气污染物数据;

  • (2)根据真实日期将数据保存为.csv(如20160101.csv)

一个小时以后。

  • 执行代码

  • (1)从网站上获取Hour i+1的空气污染物数据;

  • (2) 根据真实日期将数据保存为.csv。
    如果它是同一天,比作小时 i --> 相同
    的 .csv(如 2016-01-01.csv)如果现在已经过去 -> 创建一个新的 .csv(如 2016-01-02.csv)

我以前没有做过这种事情。有人可以给我一些建议吗?
所以,我可以在后台运行一个有用的数据抓取工具,我不必担心它。

3个回答

如果您使用的是基于 Unix 的操作系统,请使用像 cron这样的调度程序来定期运行您的脚本。cron 是最常见的一种,你可以找到很多关于它的教程。我想这是你不熟悉的部分。

当它执行你的脚本时,让它像这样检查时间:) fname=datetime.datetime.now().strftime('%Y-%m-%d-%H.csv'以附加模式打开相应的文件 ( with open(fname, 'a') as csv_file) ; 如果它不存在,它将被创建。这就是它的全部!

正如 Emre 所说的,你可以使用 cron 来完成这项工作。

话虽如此,我建议您使用以下任何一种:

  • 路易吉
  • 空气流动

这些是 Python 的依赖管理、调度程序库,它们非常易于使用并且比 vanilla cron 好得多。

当我每天在生产中触发数百个 Airflow 脚本时,我也可以保证它的易用性和实用性。

所以,这是我电脑上的截图,运行了一个非常基本的 Airflow 脚本:

在此处输入图像描述

所以,这里的任务是相互依赖的。例如,第二个任务依赖于第一个任务。因此,如果第一个任务失败,第二个任务会自动失败,这样您就不需要每次都与代码/数据库交火。

因此,如果您需要分析任务取决于脚本任务,它将确保两者都按预期发生。

此外,您可以这样做:

  1. 将 Airflow 脚本设置为每小时自动为您运行一次
  2. 每当发生错误时触发电子邮件(这在您不监视它们时运行任务时非常有用)
  3. 失败、正在运行和已完成任务的整洁可视化。

有用的阅读:

我对带有 GUI 的基于 DAG 的脚本执行调度程序/计划程序的回答;类似于 airbnb 的气流或 BODS 或 Informatica

我会将您当前拥有的内容包装在一个函数中。例如:

def write_excel():
     #your existing code

然后您可以导入 BlockingScheduler,这是一个可以连续运行脚本的库。如果您将扩展名从 .py 更改为 .pyw 并运行 .pyw,它将在后台运行,直到您使用任务管理器将其关闭。

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()
scheduler.add_job(write_excel, 'interval', hours=1)
scheduler.start()