- 前言
近期因為工作上需要氣象局的雨量資料,所以開始找相關的介接程式,從政府資料開放平台 (https://data.gov.tw/) 上面共有2個可以介接的資料來源
- 氣象局API: https://data.gov.tw/dataset/9177
- 環保署Opendata: https://data.gov.tw/dataset/7879
實際上來源都是氣象局,但環保署Opendata的資料集已經整理過是平面的資料表,直接用python pandas 可以直接 read_csv or read_json方式直接讀取,但也許是介接的人太多,效能很差,常常要等很久且抓不到資料,如果要作認真的應用(真的要拿來每小時分析或計算)會很困難,因此這一篇是以氣象局的API方式介接並寫入資料庫。
- 介接方式
透過氣象局Open Data API取得資料,資料經過整理後,再寫入MS SQL Server
- 前提
必須在氣象局 Open Data 網站 (http://opendata.cwb.gov.tw) 註冊帳號並取得授權碼,如果有很多資料集的應用,需要詳細研讀解氣象局呼叫方式。
- 運作方式: 以下以步驟的方式說明程式運作的方式,如果想直接看完整的程式,請直接到: https://github.com/stacepsho/TW_CWB_Rainfall_Crawler
- Step 1 Import 與定義變數
# coding: utf-8 # # Purpose: 自中華民國氣象局 Open Data 取得自動雨量站 每10分鐘雨量資料 # Remark: 解析json資料有參考 https://github.com/leafwind/cwb-cache 非常感謝 # import logging import pandas as pd import requests from datetime import datetime import sqlalchemy #氣象局API網址 CWB_URL = 'http://opendata.cwb.gov.tw/api/v1/rest/datastore/' #自動雨量站 資料集代號 DATA_ID = 'O-A0002-001' #氣象局 OPEN DATA 授權碼,需要自行修改 AUTH_KEY = 'CWB-OOOOOOOO-OOOO-OOOO-OOOO-OOOOOOOOOOOO' #目的地 SQL Server 的連結字串,需要自行修改 SQL_CONNECTION_STRING = "mssql+pyodbc://[account]:[password]@[SqlServer]:1433/[Database]?driver=SQL+Server+Native+Client+11.0" #目的地 SQL Server Table名稱,需要自行修改 DEST_TABLE = 'tb_Rainfall_API'
- Step 2 取得API的資料集
這段用function方式模組化程式,感謝 leafwind的大作 https://github.com/leafwind/cwb-cache
def get_data_from_cwb(data_id, auth_key, params={}): '''limit, offset, format, locationName, elementName, sort''' logging.info('getting data from CWB...') dest_url = CWB_URL + '{}'.format(data_id) r = requests.get(dest_url, headers={'Authorization': auth_key}) params_list = ['{}={}'.format(key, params[key]) for key in params] params_str = '?' + '&'.join(params_list) dest_url += params_str logging.debug('dest_url: {}'.format(dest_url)) if r.status_code != 200: logging.error('r.status_code: {}'.format(r.status_code)) return None data = r.json() if data.get('success') != 'true': return None return data
- Step 3 解析API資料成為Dataframe
雖然概念上很簡單,但是因為解析那段有點麻煩,所以沒辦法很直覺的爬回來就準備寫入資料庫,所以需要轉換,可以直接呼叫這個網址當範例 (只查兩站,速度比較快),我們需要的資料是藍色框框,records\location 向下的資料,其中 weatherElement 是雨量的值。呼叫的function 如下。
def parse_json_to_dataframe(data): columns = ['stationId','locationName','lat','lon', 'obstime','ELEV','RAIN','MIN_10','HOUR_3','HOUR_6','HOUR_12','HOUR_24','NOW'] df = pd.DataFrame(columns=columns) dataDic = {} locations = data['records']['location'] row = -1 for l in locations: row = row + 1 dataDic['stationId'] = l['stationId'] dataDic['locationName'] = l['locationName'] dataDic['obstime'] = l['time']['obsTime'] dataDic['lat'] = l['lat'] dataDic['lon'] = l['lon'] factors = l['weatherElement'] for f in factors: factor_name = f['elementName'] dataDic[factor_name] = f['elementValue'] for key in dataDic.keys(): df.loc[row,key] = dataDic[key] return df
- Step4 主程式:取得資料、解析成為資料表的格式、寫入資料庫
if __name__ == '__main__': json_data = get_data_from_cwb(DATA_ID, AUTH_KEY, {}) df = parse_json_to_dataframe(json_data) df['InsertDatetime']=datetime.now() # In[9]: #寫入資料庫 engine = sqlalchemy.create_engine(SQL_CONNECTION_STRING) conn = engine.connect() df.to_sql(DEST_TABLE,engine,if_exists='append') #確認資料庫結果 rs = conn.execute('SELECT TOP 10 * FROM ' + DEST_TABLE +' with (nolock) ORDER BY InsertDatetime desc ') _result = pd.DataFrame(rs.fetchall()) _result.columns = rs.keys() conn.close() _result
- Step5 進入資料庫的資料處理
進入資料庫後,就容易進行後續的應用、計算與分析了!
如果朋友們有更簡單的方式也歡迎跟小弟交流切磋~~
#Python
#Crawler
#氣象局
#環保署
#雨量
#Sky is limit