"""
Scraper for NZ MEDSAFE (Suspected Medicine Adverse Reaction Search (SMARS)) using SeleniumBase.
"""
import os
import time
import warnings
from collections.abc import Callable
from typing import Any
import pandas as pd
from bs4 import BeautifulSoup
from seleniumbase import SB
# NOTE: module-level side effect - importing this module installs an "ignore"
# filter with no category/module restriction, i.e. it suppresses every Python
# warning for the rest of the process, not only those raised by this scraper.
warnings.filterwarnings("ignore")
# (Sphinx "[docs]" source-link marker left over from HTML extraction; not part of the code.)
def scrape_medsafe_sb(
searching_for: str,
drug_vaccine: str,
output_dir: str = "data/nzmedsafe",
callback: Callable[[dict], None] | None = None,
headless: bool = True,
num_retries: int = 5,
) -> pd.DataFrame:
"""
Scrapes NZ Medsafe database for a given medicine or vaccine, for System Organ Class (SOC), PTs and associated count.
Parameters
-----------
searching_for : str
Either "medicine" or "vaccine".
drug_vaccine : str
The drug or vaccine name to search for.
output_dir : str
Directory to save CSV (default "data/nzmedsafe").
callback : callable
Callable to receive UI/status events, called with a dict.
This is essential to show progress to user.
headless : bool
Run the browser headless (default True).
num_retries: int
Number of retries for data scraping after which error is thrown (default 5).
Returns
--------
A dataframe with columns ["SOC", "PT", "Count"].
"""
def _emit(event_type: str, **kw: Any) -> None: # pragma: no cover
if callback:
try:
callback({"type": event_type, **kw})
except Exception: # pragma: no cover
raise # pragma: no cover
def parse_table(html: str):
soup = BeautifulSoup(html, "html.parser")
headers = [th.get_text(strip=True) for th in soup.find_all("th")]
data, current_soc = [], None
for row in soup.find_all("tr")[1:]: # skip header
cols = row.find_all("td")
if len(cols) == 3:
current_soc = cols[0].get_text(strip=True)
reaction = cols[1].get_text(strip=True)
reports = cols[2].get_text(strip=True)
elif len(cols) == 2:
reaction = cols[0].get_text(strip=True)
reports = cols[1].get_text(strip=True)
else:
continue
data.append(
{headers[0]: current_soc, headers[1]: reaction, headers[2]: reports}
)
return data, headers
os.makedirs(output_dir, exist_ok=True)
exceptions = []
for attempt in range(num_retries):
try:
if attempt > 0:
_emit("log", message=f"Retrying... ({attempt + 1}/{num_retries})\n")
url = "https://www.medsafe.govt.nz/Projects/B1/ADRSearch.asp"
with SB(uc=True, headless=headless) as sb:
_emit(
"log", message=f"Parsing medsafe.govt.nz (Attempt {attempt + 1})\n"
)
sb.activate_cdp_mode(url)
sb.wait_for_ready_state_complete()
try:
if sb.cdp.is_element_present('//*[@id="Accept"]'):
sb.cdp.click('//*[@id="Accept"]')
sb.sleep(2)
sb.wait_for_ready_state_complete()
except Exception as e: # pragma: no cover
_emit("log", message=f"Cookie/terms click skipped or failed: {e}")
# Select medicine type
try:
search_type = (searching_for or "medicine").strip().lower()
if search_type == "vaccine":
sb.cdp.select_if_unselected(
'//*[@id="MainContent_MedicineType_0"]'
)
sb.wait_for_ready_state_complete()
else:
sb.cdp.select_if_unselected(
'//*[@id="MainContent_MedicineType_1"]'
)
sb.wait_for_ready_state_complete()
sb.sleep(5)
except Exception as e: # pragma: no cover
_emit("log", message=f"Failed setting medicine type: {e}")
raise # pragma: no cover
# Search text
try:
if sb.cdp.is_element_visible('//*[@id="MainContent_TextToFind"]'):
sb.cdp.type(
'//*[@id="MainContent_TextToFind"]', str(drug_vaccine)
)
sb.sleep(1.5)
sb.cdp.click('//*[@id="MainContent_ButtonFind"]')
sb.sleep(2.0)
sb.wait_for_ready_state_complete()
except Exception as e: # pragma: no cover
_emit("error", message=f"Failed typing/searching for term: {e}")
raise # pragma: no cover
# Check for site error message in case of no ingredient match
try:
if sb.cdp.is_element_present('//*[@id="MainContent_LabelErrors"]'):
msg = sb.cdp.get_text('//*[@id="MainContent_LabelErrors"]')
if (msg or "").strip():
_emit("error", message=msg)
raise RuntimeError(msg) # pragma: no cover
except Exception: # pragma: no cover
raise # pragma: no cover
# Choose summary report type
try:
sb.cdp.select_if_unselected('//*[@id="MainContent_ReportType_1"]')
sb.cdp.click('//*[@id="MainContent_ButtonSearch"]')
sb.sleep(1)
sb.wait_for_ready_state_complete()
except Exception as e: # pragma: no cover
_emit("error", message=f"Failed to initiate results search: {e}")
raise # pragma: no cover
# Find number of pages
num_pages = 1
try:
pager_table_xpath = (
'//*[@id="MainContent_GridSummary"]/tbody/tr[last()]/td/table'
)
if sb.cdp.is_element_present(pager_table_xpath):
table_el = sb.cdp.find_element(pager_table_xpath)
row_el = table_el.query_selector(
"tbody > tr"
) or table_el.query_selector("tr")
num_pages = max(1, len(row_el.query_selector_all("td")))
except Exception: # pragma: no cover
num_pages = 1
_emit("log", message=f"Pages detected: {num_pages}")
data_rows = []
def scrape_current_page():
table = sb.cdp.find_element(
'//*[@id="MainContent_GridSummary"]/tbody'
)
r = table.get_attribute("outerHTML")
page_rows, _headers = parse_table(r)
return page_rows
# Progress per page
delta = 100.0 / float(max(1, num_pages))
for page in range(1, num_pages + 1):
try:
if page > 1:
sb.cdp.click(
f'//*[@id="MainContent_GridSummary"]/tbody/tr[last()]/td/table/tbody/tr/td[{page}]/a'
)
sb.sleep(0.8)
sb.wait_for_ready_state_complete()
data_rows.extend(scrape_current_page())
_emit("progress", delta=delta)
except Exception as e: # pragma: no cover
_emit(
"log", message=f"Page {page}: failed to collect rows: {e}"
)
df = pd.DataFrame(data_rows)
if not df.empty:
cols = list(df.columns)
rename_map = {}
if len(cols) >= 1:
rename_map[cols[0]] = "SOC"
if len(cols) >= 2:
rename_map[cols[1]] = "PT"
if len(cols) >= 3:
rename_map[cols[2]] = "Count"
df = df.rename(columns=rename_map)
# Coerce Count to int if possible
if "Count" in df.columns:
try:
df["Count"] = (
df["Count"]
.astype(str)
.str.replace(",", "", regex=False)
.astype(int)
)
except Exception: # pragma: no cover
pass
out_path = os.path.join(output_dir, f"{drug_vaccine}_nzsmars_adrs.csv")
try:
df.to_csv(out_path, index=False)
_emit("log", message=f"Data saved to: {os.path.abspath(out_path)}")
except Exception as e: # pragma: no cover
_emit("error", message=f"Failed to save CSV: {e}")
_emit("done")
return df
except Exception as e: # pragma: no cover
exceptions.append(e)
_emit("log", message=f"Attempt {attempt + 1} failed.\n")
time.sleep(20)
continue
_emit(
"error",
message=(
f"All {num_retries} attempt(s) to scrape data for {drug_vaccine} failed. "
"Please check the following:\n"
"1. Ensure you have a stable internet connection.\n"
"2. Verify that 'https://www.medsafe.govt.nz/Projects/B1/ADRSearch.asp' opens correctly in your Chrome browser.\n"
"3. If these steps do not resolve the issue, please wait a while and retry. \n"
"If problems persist, contact the developer at https://github.com/rmj3197/SurVigilance/issues "
"for assistance.\n\n"
),
)
raise RuntimeError(
f"All {num_retries} attempt(s) to scrape data for {drug_vaccine} failed. "
"Please check the following:\n"
"1. Ensure you have a stable internet connection.\n"
"2. Verify that 'https://www.medsafe.govt.nz/Projects/B1/ADRSearch.asp' opens correctly in your Chrome browser.\n"
"3. If these steps do not resolve the issue, please wait a while and retry. \n"
"If problems persist, contact the developer at https://github.com/rmj3197/SurVigilance/issues "
"for assistance.\n\n"
)