Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ API Reference
fetchers/portugal
fetchers/slovenia
fetchers/southafrica
fetchers/southkorea
fetchers/spain
fetchers/uk_ea
fetchers/uk_nrfa
Expand Down
5 changes: 5 additions & 0 deletions docs/fetchers/southkorea.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
South Korea Fetcher
===================

.. automodule:: rivretrieve.southkorea
:members:
30 changes: 30 additions & 0 deletions examples/test_southkorea_fetcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Example: fetch one week of WAMIS daily discharge and plot it.

Downloads daily mean discharge for a single South Korean gauge via
``rivretrieve.SouthKoreaFetcher`` and saves a simple time-series plot to
``southkorea_discharge_plot.png``.
"""

import logging

import matplotlib.pyplot as plt

from rivretrieve import SouthKoreaFetcher, constants


def main() -> None:
    """Fetch the sample data and render/save the plot."""
    logging.basicConfig(level=logging.INFO)

    gauge_id = "1001602"
    variable = constants.DISCHARGE_DAILY_MEAN
    start_date = "2023-01-01"
    end_date = "2023-01-07"

    fetcher = SouthKoreaFetcher()
    data = fetcher.get_data(gauge_id=gauge_id, variable=variable, start_date=start_date, end_date=end_date)

    if data.empty:
        print(f"No data found for {gauge_id}")
        return

    print(data.head())
    plt.figure(figsize=(12, 6))
    plt.plot(data.index, data[variable], label=gauge_id)
    plt.xlabel(constants.TIME_INDEX)
    plt.ylabel(f"{variable} (m3/s)")
    plt.title(f"South Korea River Discharge ({gauge_id})")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig("southkorea_discharge_plot.png")
    print("Plot saved to southkorea_discharge_plot.png")


# Guard the entry point so importing this module (e.g. from docs tooling or
# tests) does not trigger a network fetch and file write as a side effect.
if __name__ == "__main__":
    main()
1 change: 1 addition & 0 deletions rivretrieve/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .portugal import PortugalFetcher
from .slovenia import SloveniaFetcher
from .southafrica import SouthAfricaFetcher
from .southkorea import SouthKoreaFetcher
from .spain import SpainFetcher
from .uk_ea import UKEAFetcher
from .uk_nrfa import UKNRFAFetcher
Expand Down
1,311 changes: 1,311 additions & 0 deletions rivretrieve/cached_site_data/korea_sites.csv

Large diffs are not rendered by default.

270 changes: 270 additions & 0 deletions rivretrieve/southkorea.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
"""Fetcher for South Korea (WAMIS) river gauge data."""

import logging
from typing import Optional

import numpy as np
import pandas as pd
from tqdm import tqdm

from . import base, constants, utils

logger = logging.getLogger(__name__)


class SouthKoreaFetcher(base.RiverDataFetcher):
    """Fetches river gauge data from South Korea's WAMIS Open API.

    Data source:
        https://www.wamis.go.kr/

    Supported variables:
        - 'discharge_daily_mean' (m³/s)
        - 'stage_daily_mean' (m)
        - 'stage_hourly_mean' (m)

    Data description and API:
        - see http://wamis.go.kr:8080/wamisweb/flw/w15.do

    Terms of use:
        - see https://www.hrfco.go.kr/web/openapi/policy.do
    """

    # --- API endpoints ---
    URL_DISCHARGE = "http://www.wamis.go.kr:8080/wamis/openapi/wkw/flw_dtdata"
    URL_STAGE_DAILY = "http://www.wamis.go.kr:8080/wamis/openapi/wkw/wl_dtdata"
    URL_STAGE_HOURLY = "http://www.wamis.go.kr:8080/wamis/openapi/wkw/wl_hrdata"
    URL_STATION_LIST = "http://www.wamis.go.kr:8080/wamis/openapi/wkw/wl_dubwlobs"
    URL_STATION_INFO = "http://www.wamis.go.kr:8080/wamis/openapi/wkw/wl_obsinfo"

    @staticmethod
    def _empty_result(variable: str) -> pd.DataFrame:
        """Returns a standardized empty time series result (UTC datetime index)."""
        return pd.DataFrame(columns=[variable], index=pd.DatetimeIndex([], name=constants.TIME_INDEX, tz="UTC"))

    @staticmethod
    def _empty_metadata_frame() -> pd.DataFrame:
        """Returns a standardized empty metadata result indexed by gauge id."""
        return pd.DataFrame(columns=[constants.GAUGE_ID]).set_index(constants.GAUGE_ID)

    # --- Public API methods ---
    @staticmethod
    def get_cached_metadata() -> pd.DataFrame:
        """Retrieves cached metadata for South Korea (if available)."""
        return utils.load_cached_metadata_csv("korea")

    def get_metadata(self) -> pd.DataFrame:
        """Downloads and parses site metadata from WAMIS.

        Keeps provider-specific columns where available, while mapping standard
        RivRetrieve metadata fields and returning a DataFrame indexed by
        ``constants.GAUGE_ID``.

        Returns:
            DataFrame indexed by gauge id; empty (but correctly shaped) on
            any failure to reach or parse the WAMIS endpoints.
        """
        session = utils.requests_retry_session()
        try:
            resp = session.get(self.URL_STATION_LIST, params={"output": "json"}, timeout=30)
            resp.raise_for_status()
            payload = resp.json()
            stations = payload.get("list", []) if isinstance(payload, dict) else []
            if not stations:
                logger.warning("No stations found in WAMIS wl_dubwlobs.")
                return self._empty_metadata_frame()
            station_ids = [s["obscd"] for s in stations if "obscd" in s]
        except Exception as e:
            logger.error(f"Failed to fetch WAMIS station list: {e}")
            return self._empty_metadata_frame()

        # One detail request per station; failures are skipped (best-effort).
        df_all = pd.DataFrame()
        for sid in tqdm(station_ids, desc="Fetching WAMIS metadata"):
            try:
                r = session.get(self.URL_STATION_INFO, params={"obscd": sid, "output": "json"}, timeout=10)
                r.raise_for_status()
                data = r.json()
                if data.get("result", {}).get("code") == "success" and "list" in data:
                    df = pd.json_normalize(data["list"])
                    df_all = pd.concat([df_all, df], ignore_index=True)
            except Exception:
                continue

        if df_all.empty:
            logger.warning("No metadata records retrieved from WAMIS.")
            return self._empty_metadata_frame()

        # --- Rename + convert ---
        df_all = df_all.rename(
            columns={
                "wlobscd": constants.GAUGE_ID,
                "obsnmeng": constants.STATION_NAME,
                "rivnm": constants.RIVER,
                "gdt": constants.ALTITUDE,
                "bsnara": constants.AREA,
                "lon": "longitude_dms",
                "lat": "latitude_dms",
            }
        )

        # Fix: guarantee every column read below exists *before* it is used.
        # Previously the missing-column fill ran only after the numeric
        # conversions, so a sparse API payload (e.g. no 'lon'/'lat'/'gdt')
        # raised KeyError instead of yielding NaN values.
        for column in (
            constants.GAUGE_ID,
            constants.STATION_NAME,
            constants.RIVER,
            constants.ALTITUDE,
            constants.AREA,
            "longitude_dms",
            "latitude_dms",
        ):
            if column not in df_all.columns:
                df_all[column] = np.nan

        def dms_to_decimal(dms: str) -> float:
            """Converts a 'D-M-S' coordinate string to decimal degrees (NaN on failure)."""
            if not isinstance(dms, str) or not dms.strip():
                return np.nan
            try:
                d, m, s = map(float, dms.split("-"))
                return d + m / 60 + s / 3600
            except Exception:
                return np.nan

        df_all[constants.LONGITUDE] = df_all["longitude_dms"].apply(dms_to_decimal)
        df_all[constants.LATITUDE] = df_all["latitude_dms"].apply(dms_to_decimal)
        df_all[constants.ALTITUDE] = pd.to_numeric(df_all[constants.ALTITUDE], errors="coerce")
        df_all[constants.AREA] = pd.to_numeric(df_all[constants.AREA], errors="coerce")
        df_all[constants.COUNTRY] = "South Korea"
        df_all[constants.SOURCE] = "WAMIS Open API"

        # Fix: drop missing gauge ids *before* the string cast — casting NaN
        # to str would produce the literal "nan" and defeat the dropna.
        df_all = df_all.dropna(subset=[constants.GAUGE_ID])
        df_all[constants.GAUGE_ID] = df_all[constants.GAUGE_ID].astype(str).str.strip()
        df_final = df_all.drop_duplicates(subset=[constants.GAUGE_ID])
        return df_final.set_index(constants.GAUGE_ID).sort_index()

    # --- Variable support ---
    @staticmethod
    def get_available_variables() -> tuple[str, ...]:
        """Returns the variables this fetcher can download."""
        return (
            constants.DISCHARGE_DAILY_MEAN,
            constants.STAGE_DAILY_MEAN,
            constants.STAGE_HOURLY_MEAN,
        )

    # --- Internal helpers ---
    def _get_endpoint(self, variable: str) -> tuple[str, str, str]:
        """Map variable to (endpoint URL, JSON date field, JSON value field).

        Raises:
            ValueError: if ``variable`` is not supported.
        """
        if variable == constants.DISCHARGE_DAILY_MEAN:
            return self.URL_DISCHARGE, "ymd", "fw"
        elif variable == constants.STAGE_DAILY_MEAN:
            return self.URL_STAGE_DAILY, "ymd", "wl"
        elif variable == constants.STAGE_HOURLY_MEAN:
            return self.URL_STAGE_HOURLY, "ymdh", "wl"
        else:
            raise ValueError(f"Unsupported variable: {variable}")

    def _download_data(self, gauge_id: str, variable: str, start_date: str, end_date: str) -> pd.DataFrame:
        """Downloads raw WAMIS data year by year (handles partial years correctly).

        Returns a DataFrame with 'time' and ``variable`` columns, sorted and
        de-duplicated on 'time'; empty (with those columns) if nothing was
        retrieved.
        """
        s = utils.requests_retry_session()
        all_data = []

        start_dt = pd.to_datetime(start_date)
        end_dt = pd.to_datetime(end_date)
        years = range(start_dt.year, end_dt.year + 1)

        url, date_field, value_field = self._get_endpoint(variable)

        for year in years:
            # Clamp each yearly chunk to the requested window.
            start_chunk = max(start_dt, pd.Timestamp(year=year, month=1, day=1))
            end_chunk = min(end_dt, pd.Timestamp(year=year, month=12, day=31))

            params = {
                "obscd": gauge_id,
                "startdt": start_chunk.strftime("%Y%m%d"),
                "enddt": end_chunk.strftime("%Y%m%d"),
                "output": "json",
            }

            try:
                r = s.get(url, params=params, timeout=30)
                r.raise_for_status()
                js = r.json()

                if not isinstance(js, dict) or "list" not in js:
                    continue

                df = pd.DataFrame(js["list"])
                if df.empty or date_field not in df.columns or value_field not in df.columns:
                    continue

                df = df.rename(columns={date_field: "time", value_field: variable})
                # Hourly payloads carry a trailing hour ("YYYYMMDDHH").
                df["time"] = pd.to_datetime(
                    df["time"],
                    format="%Y%m%d%H" if variable == constants.STAGE_HOURLY_MEAN else "%Y%m%d",
                    errors="coerce",
                )
                df[variable] = pd.to_numeric(df[variable], errors="coerce")
                # Values <= -777 are provider sentinels for missing data.
                df.loc[df[variable] <= -777, variable] = np.nan
                all_data.append(df)

            except Exception as e:
                logger.warning(f"Failed {variable} fetch for {gauge_id} ({year}): {e}")
                continue

        if not all_data:
            return pd.DataFrame(columns=["time", variable])

        df_all = pd.concat(all_data, ignore_index=True)
        df_all = df_all.dropna(subset=["time", variable])
        if constants.HOURLY in variable or constants.INSTANTANEOUS in variable:
            # Sub-daily: include the whole of the end date (exclusive bound
            # at midnight of the following day).
            end_dt = end_dt + pd.Timedelta(days=1)
            df_all = df_all[(df_all["time"] >= start_dt) & (df_all["time"] < end_dt)]
        else:
            df_all = df_all[(df_all["time"] >= start_dt) & (df_all["time"] <= end_dt)]
        df_all = df_all.drop_duplicates(subset="time", keep="first")
        df_all = df_all.sort_values("time").reset_index(drop=True)
        return df_all

    def _parse_data(self, gauge_id: str, raw_data: pd.DataFrame, variable: str) -> pd.DataFrame:
        """Ensures consistent time index and format.

        Applies unit conversions (discharge /1000, stage /100 — presumably the
        provider reports L/s and cm respectively; TODO confirm against WAMIS
        field docs) and returns a UTC-indexed single-column DataFrame.
        """
        if raw_data.empty:
            return self._empty_result(variable)

        df = raw_data.copy()
        df = df.dropna(subset=["time", variable])
        df = df.sort_values("time")
        if constants.DISCHARGE in variable:
            df[variable] = df[variable] / 1000.0
        elif constants.STAGE in variable:
            df[variable] = df[variable] / 100.0
        df = df.rename(columns={"time": constants.TIME_INDEX}).set_index(constants.TIME_INDEX)
        df.index = df.index.tz_localize("UTC", nonexistent="NaT", ambiguous="NaT")
        return df[[variable]]

    def get_data(
        self,
        gauge_id: str,
        variable: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> pd.DataFrame:
        """Downloads, parses, and date-filters data for one gauge and variable.

        Args:
            gauge_id: WAMIS observation station code.
            variable: one of :meth:`get_available_variables`.
            start_date: inclusive start ('YYYY-MM-DD'); defaulted by ``utils``.
            end_date: inclusive end ('YYYY-MM-DD'); defaulted by ``utils``.

        Returns:
            UTC-indexed DataFrame with one column named ``variable``; empty on
            download/parse failure (errors are logged, not raised).

        Raises:
            ValueError: if ``variable`` is unsupported.
        """
        start_date = utils.format_start_date(start_date)
        end_date = utils.format_end_date(end_date)

        if variable not in self.get_available_variables():
            raise ValueError(f"Unsupported variable: {variable}")

        try:
            raw_data = self._download_data(gauge_id, variable, start_date, end_date)
            df = self._parse_data(gauge_id, raw_data, variable)

            if df.empty:
                logger.debug(f"No {variable} data returned for gauge {gauge_id}.")
                return self._empty_result(variable)

            # Final date filter (mirrors the window logic in _download_data).
            start_dt = pd.to_datetime(start_date).tz_localize("UTC")
            end_dt = pd.to_datetime(end_date).tz_localize("UTC")
            if constants.HOURLY in variable or constants.INSTANTANEOUS in variable:
                end_dt = end_dt + pd.Timedelta(days=1)
                return df[(df.index >= start_dt) & (df.index < end_dt)]

            df = df[(df.index >= start_dt) & (df.index <= end_dt)]
            return df

        except Exception as e:
            logger.error(f"Failed to get data for site {gauge_id}, variable {variable}: {e}")
            return self._empty_result(variable)
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"result": {
"code": "success"
},
"list": [
{
"ymd": "20230101",
"fw": "1500"
},
{
"ymd": "20230102",
"fw": "1600"
},
{
"ymd": "20230103",
"fw": "1700"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"result": {
"code": "success"
},
"list": [
{
"ymd": "20230101",
"wl": "250"
},
{
"ymd": "20230102",
"wl": "255"
},
{
"ymd": "20230103",
"wl": "260"
}
]
}
19 changes: 19 additions & 0 deletions tests/test_data/southkorea_stage_hourly_1001602_20230101.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"result": {
"code": "success"
},
"list": [
{
"ymdh": "2023010100",
"wl": "250"
},
{
"ymdh": "2023010101",
"wl": "255"
},
{
"ymdh": "2023010123",
"wl": "300"
}
]
}
Loading
Loading