2018年5月26日 星期六

網路爬蟲開發 - 氣象局自動雨量站-雨量觀測 in Python



  • 前言

近期因為工作上需要氣象局的雨量資料,所以開始找相關的介接程式,從政府資料開放平台 (https://data.gov.tw/) 上面共有2個可以介接的資料來源

  1. 氣象局API: https://data.gov.tw/dataset/9177 
  2. 環保署Opendata: https://data.gov.tw/dataset/7879 

實際上來源都是氣象局,但環保署Opendata的資料集已經整理過是平面的資料表,直接用python pandas 可以直接 read_csv or read_json方式直接讀取,但也許是介接的人太多,效能很差,常常要等很久且抓不到資料,如果要作認真的應用(真的要拿來每小時分析或計算)會很困難,因此這一篇是以氣象局的API方式介接並寫入資料庫。


  • 介接方式
透過氣象局Open Data API取得資料,資料經過整理後,再寫入MS SQL Server

  • 前提
必須在氣象局 Open Data 網站 (http://opendata.cwb.gov.tw) 註冊帳號並取得授權碼,如果有很多資料集的應用,需要詳細研讀解氣象局呼叫方式。

    • Step 1 Import 與定義變數
# coding: utf-8

# 
# Purpose: 自中華民國氣象局 Open Data 取得自動雨量站 每10分鐘雨量資料 
# Remark: 解析json資料有參考 https://github.com/leafwind/cwb-cache 非常感謝
#
import logging
import pandas as pd
import requests
from datetime import datetime
import sqlalchemy

#氣象局API網址
CWB_URL = 'http://opendata.cwb.gov.tw/api/v1/rest/datastore/'
#自動雨量站 資料集代號
DATA_ID = 'O-A0002-001'
#氣象局 OPEN DATA 授權碼,需要自行修改
AUTH_KEY = 'CWB-OOOOOOOO-OOOO-OOOO-OOOO-OOOOOOOOOOOO'
#目的地 SQL Server 的連結字串,需要自行修改
SQL_CONNECTION_STRING = "mssql+pyodbc://[account]:[password]@[SqlServer]:1433/[Database]?driver=SQL+Server+Native+Client+11.0"
#目的地 SQL Server Table名稱,需要自行修改
DEST_TABLE = 'tb_Rainfall_API'
    • Step 2 取得API的資料集
這段用function方式模組化程式,感謝 leafwind的大作 https://github.com/leafwind/cwb-cache

def get_data_from_cwb(data_id, auth_key, params={}):
   '''limit, offset, format, locationName, elementName, sort'''
   logging.info('getting data from CWB...')

   dest_url = CWB_URL + '{}'.format(data_id)
   r = requests.get(dest_url, headers={'Authorization': auth_key})
   params_list = ['{}={}'.format(key, params[key]) for key in params]
   params_str = '?' + '&'.join(params_list)
   dest_url += params_str
   logging.debug('dest_url: {}'.format(dest_url))
   
   if r.status_code != 200:
       logging.error('r.status_code: {}'.format(r.status_code))
       return None

   data = r.json()
   
   if data.get('success') != 'true':
       return None
   return data
    • Step 3 解析API資料成為Dataframe
雖然概念上很簡單,但是因為解析那段有點麻煩,所以沒辦法很直覺的爬回來就準備寫入資料庫,所以需要轉換,可以直接呼叫這個網址當範例 (只查兩站,速度比較快),我們需要的資料是藍色框框,records\location 向下的資料,其中 weatherElement 是雨量的值。呼叫的function 如下。


def parse_json_to_dataframe(data):
   columns = ['stationId','locationName','lat','lon', 'obstime','ELEV','RAIN','MIN_10','HOUR_3','HOUR_6','HOUR_12','HOUR_24','NOW']
   df = pd.DataFrame(columns=columns)
   dataDic = {}
   locations = data['records']['location']
   row = -1
   for l in locations:
       row = row + 1
       dataDic['stationId'] = l['stationId']
       dataDic['locationName'] = l['locationName']
       dataDic['obstime'] = l['time']['obsTime']
       dataDic['lat'] = l['lat']
       dataDic['lon'] = l['lon']
       factors = l['weatherElement']
       for f in factors:
           factor_name = f['elementName']
           dataDic[factor_name] = f['elementValue']
       for key in dataDic.keys():
           df.loc[row,key] = dataDic[key]
       

   return df 


    • Step4 主程式:取得資料、解析成為資料表的格式、寫入資料庫
if __name__ == '__main__':
    json_data = get_data_from_cwb(DATA_ID, AUTH_KEY, {})
    df = parse_json_to_dataframe(json_data)
    df['InsertDatetime']=datetime.now()

# In[9]:

#寫入資料庫

engine = sqlalchemy.create_engine(SQL_CONNECTION_STRING)
conn = engine.connect()

df.to_sql(DEST_TABLE,engine,if_exists='append')

#確認資料庫結果
rs = conn.execute('SELECT TOP 10 * FROM ' + DEST_TABLE +' with (nolock) ORDER BY InsertDatetime desc ')
_result = pd.DataFrame(rs.fetchall())
_result.columns = rs.keys()
conn.close()

_result


  • Step5 進入資料庫的資料處理
對於這個資料集,會有一些額外的資料面要處理,如雨量的監測值-998轉成0等等,小弟認為這些是資料庫端處理會比較方便,譬如寫一個 stored procedure 或 trigger處理資料,這些就給各位依照自己的需求再調整了。



進入資料庫後,就容易進行後續的應用、計算與分析了!
如果朋友們有更簡單的方式也歡迎跟小弟交流切磋~~

#Python
#Crawler
#氣象局
#環保署
#雨量
#Sky is limit