Source code for utils.baselines

from __future__ import annotations
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
import numpy as np
import pandas as pd

from rdkit import Chem, DataStructs
from rdkit.Chem import AllChem

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier, MLPRegressor
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import (
    roc_auc_score,
    average_precision_score,
    mean_squared_error,
)
from scipy.stats import pearsonr

try:
    import lightgbm as lgb
    _HAVE_LGBM = True
except Exception:
    _HAVE_LGBM = False


@dataclass
class BaselineParams:
    seed: int = 0
    fp_radius: int = 2
    fp_nbits: int = 2048
    # MLP defaults
    mlp_hidden: tuple = (256, 128)
    mlp_max_iter: int = 300
    # RF defaults
    rf_estimators: int = 500
    # LGBM defaults
    lgbm_estimators: int = 800
    lgbm_lr: float = 0.05
    lgbm_leaves: int = 31
    lgbm_subsample: float = 0.8
    lgbm_colsample: float = 0.8
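
# Example: every field has a default, so callers only override what they need,
# e.g. a different seed and a wider fingerprint:
# >>> p = BaselineParams(seed=42, fp_nbits=4096)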


def _cfg_param(cfg: Dict[str, Any], *path, default=None):
    """Walk a nested dict along ``path``, returning ``default`` on any miss."""
    d = cfg
    for p in path:
        if not isinstance(d, dict) or p not in d:
            return default
        d = d[p]
    return d


def _task_kind(cfg: Dict[str, Any], y: np.ndarray) -> str:
    kind = cfg.get("task")
    if kind in ("classification", "regression"):
        return kind
    # Fallback heuristic: binary {0, 1} labels imply classification.
    y_clean = y[~pd.isna(y)]
    uniq = np.unique(y_clean)
    if len(uniq) <= 10 and set(uniq.tolist()).issubset({0, 1}):
        return "classification"
    return "regression"


def _get_ycol(df: pd.DataFrame, cfg: Dict[str, Any]) -> np.ndarray:
    info = cfg.get("info", {})
    for c in (info.get("label_col"), "label", "label_raw"):
        if c and c in df.columns:
            return df[c].to_numpy()
    raise KeyError("No label column found in dataframe")


def _morgan_fps(smiles: List[Optional[str]], radius: int, nbits: int):
    fps = []
    for s in smiles:
        m = Chem.MolFromSmiles(s) if pd.notna(s) else None
        if m is None:
            fps.append(None)
        else:
            fps.append(AllChem.GetMorganFingerprintAsBitVect(m, radius, nBits=nbits))
    return fps


def _fps_to_numpy(fps):
    if not fps:
        return np.zeros((0, 0), dtype=np.float32)
    # Size the matrix from the first parseable fingerprint (the first entry
    # may be None); an all-None input falls back to 2048 bits.
    first = next((fp for fp in fps if fp is not None), None)
    n_bits = first.GetNumBits() if first is not None else 2048
    arr = np.zeros((len(fps), n_bits), dtype=np.float32)
    for i, fp in enumerate(fps):
        if fp is None:
            continue
        DataStructs.ConvertToNumpyArray(fp, arr[i])
    return arr


def _X_from_df(df: pd.DataFrame, cfg: Dict[str, Any], params: BaselineParams) -> np.ndarray:
    info = cfg.get("info", {})
    smi_col = info.get("smiles_col")
    if smi_col is None:
        # Prefer the cleaned column if present.
        smi_col = "smiles_clean" if "smiles_clean" in df.columns else "smiles"
    if smi_col in df.columns:
        fps = _morgan_fps(
            df[smi_col].astype(str).tolist(),
            radius=int(_cfg_param(cfg, "info", "fp_radius", default=params.fp_radius)),
            nbits=int(_cfg_param(cfg, "info", "fp_nbits", default=params.fp_nbits)),
        )
        return _fps_to_numpy(fps)
    # Tabular fallback: all numeric columns except id/labels/smiles.
    drop = {info.get("id_col", "id"), info.get("label_col", "label"),
            "label_raw", "smiles", "smiles_clean"}
    cols = [c for c in df.columns if c not in drop and np.issubdtype(df[c].dtype, np.number)]
    return df[cols].to_numpy(dtype=np.float32)


def _class_models(p: BaselineParams):
    models = {
        "mlp": make_pipeline(
            StandardScaler(with_mean=False),
            MLPClassifier(hidden_layer_sizes=p.mlp_hidden, max_iter=p.mlp_max_iter,
                          random_state=p.seed),
        ),
        "rf": RandomForestClassifier(n_estimators=p.rf_estimators, random_state=p.seed,
                                     n_jobs=-1),
    }
    if _HAVE_LGBM:
        models["lgbm"] = lgb.LGBMClassifier(
            n_estimators=p.lgbm_estimators,
            learning_rate=p.lgbm_lr,
            num_leaves=p.lgbm_leaves,
            subsample=p.lgbm_subsample,
            colsample_bytree=p.lgbm_colsample,
            random_state=p.seed,
        )
    return models


def _reg_models(p: BaselineParams):
    models = {
        "mlp": make_pipeline(
            StandardScaler(with_mean=False),
            MLPRegressor(hidden_layer_sizes=p.mlp_hidden, max_iter=p.mlp_max_iter,
                         random_state=p.seed),
        ),
        "rf": RandomForestRegressor(n_estimators=p.rf_estimators, random_state=p.seed,
                                    n_jobs=-1),
    }
    if _HAVE_LGBM:
        models["lgbm"] = lgb.LGBMRegressor(
            n_estimators=p.lgbm_estimators,
            learning_rate=p.lgbm_lr,
            num_leaves=p.lgbm_leaves,
            subsample=p.lgbm_subsample,
            colsample_bytree=p.lgbm_colsample,
            random_state=p.seed,
        )
    return models
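
# Example (illustrative): the featurization helpers turn a SMILES list into a
# dense fingerprint matrix; unparseable entries become all-zero rows.
# >>> fps = _morgan_fps(["CCO", "not-a-smiles"], radius=2, nbits=2048)
# >>> X = _fps_to_numpy(fps)   # shape (2, 2048); row 1 is all zeros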


def eval_baselines_generic(cfg: Dict[str, Any], splits: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
    """Train on train only, evaluate on test using standard metrics."""
    p = BaselineParams(
        seed=int(cfg.get("seed", 0)),
        fp_radius=int(_cfg_param(cfg, "info", "fp_radius", default=2)),
        fp_nbits=int(_cfg_param(cfg, "info", "fp_nbits", default=2048)),
    )
    train = splits["train"]
    test = splits["test"]

    X_tr = _X_from_df(train, cfg, p)
    y_tr = _get_ycol(train, cfg)
    if len(y_tr) > 0:
        sample = y_tr[0]
        if isinstance(sample, (list, tuple, np.ndarray)):
            raise NotImplementedError("Multitask baselines are not supported yet.")
    X_te = _X_from_df(test, cfg, p)

    # Test labels may not exist (e.g., Polaris). If present, compute local metrics.
    y_te = None
    try:
        y_te = _get_ycol(test, cfg)
    except Exception:
        pass

    kind = _task_kind(cfg, y_tr)
    results: Dict[str, Any] = {"task": kind, "models": {}}

    if kind == "classification":
        models = _class_models(p)
        for name, model in models.items():
            model.fit(X_tr, y_tr)
            # Use probabilities for metrics.
            if hasattr(model, "predict_proba"):
                prob = model.predict_proba(X_te)
                y_pred = prob[:, 1] if prob.ndim == 2 and prob.shape[1] >= 2 else prob.ravel()
            else:
                # Fall back to a min-max scaled decision function.
                dec = model.decision_function(X_te)
                y_pred = (dec - dec.min()) / (dec.max() - dec.min() + 1e-12)
            metrics = {}
            if y_te is not None:
                try:
                    metrics["roc_auc"] = float(roc_auc_score(y_te, y_pred))
                except Exception:
                    metrics["roc_auc"] = None
                try:
                    metrics["average_precision"] = float(average_precision_score(y_te, y_pred))
                except Exception:
                    metrics["average_precision"] = None
            results["models"][name] = {"metrics": metrics, "predictions": y_pred.tolist()}
    else:
        models = _reg_models(p)
        for name, model in models.items():
            model.fit(X_tr, y_tr)
            y_pred = model.predict(X_te)
            metrics = {}
            if y_te is not None:
                try:
                    metrics["mse"] = float(mean_squared_error(y_te, y_pred))
                except Exception:
                    metrics["mse"] = None
                try:
                    pr = pearsonr(y_te, y_pred)
                    metrics["pearsonr"] = float(getattr(pr, "statistic", pr[0]))
                except Exception:
                    metrics["pearsonr"] = None
            results["models"][name] = {"metrics": metrics,
                                       "predictions": y_pred.astype(float).tolist()}
    return results


def eval_baselines_polaris(cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Train on train only, get predictions for test, and evaluate via Polaris' API."""
    import polaris as po

    p = BaselineParams(seed=int(cfg.get("seed", 0)))
    bench_name = cfg.get("name")
    if not bench_name:
        raise ValueError("Polaris config must include 'name' (e.g., 'tdcommons/pgp-broccatelli').")
    benchmark = po.load_benchmark(bench_name)
    train, test = benchmark.get_train_test_split()

    # Polaris loaders expose .inputs/.targets; fall back to iteration if needed.
    # Note: the fallback cannot live inside getattr(), which would evaluate it
    # eagerly (and fail on non-tuple items) even when the attribute exists.
    if hasattr(train, "inputs"):
        X_tr = np.asarray(train.inputs)
        y_tr = np.asarray(train.targets)
    else:
        pairs = list(train)
        X_tr = np.asarray([x for x, _ in pairs])
        y_tr = np.asarray([y for _, y in pairs])
    X_te = np.asarray(test.inputs if hasattr(test, "inputs") else list(test))

    kind = _task_kind(cfg, y_tr)
    out: Dict[str, Any] = {"task": kind, "models": {}}

    if kind == "classification":
        models = _class_models(p)
        for name, model in models.items():
            model.fit(X_tr, y_tr)
            if hasattr(model, "predict_proba"):
                prob = model.predict_proba(X_te)
                y_pred = prob[:, 1] if prob.ndim == 2 and prob.shape[1] >= 2 else prob.ravel()
            else:
                dec = model.decision_function(X_te)
                y_pred = (dec - dec.min()) / (dec.max() - dec.min() + 1e-12)
            results = benchmark.evaluate(list(map(float, y_pred)))
            try:
                metrics = results.to_dict() if hasattr(results, "to_dict") else dict(results)
            except Exception:
                metrics = {"polaris_evaluate": True}
            out["models"][name] = {"metrics": metrics, "predictions": list(map(float, y_pred))}
    else:
        models = _reg_models(p)
        for name, model in models.items():
            model.fit(X_tr, y_tr)
            y_pred = model.predict(X_te)
            results = benchmark.evaluate(list(map(float, y_pred)))
            try:
                metrics = results.to_dict() if hasattr(results, "to_dict") else dict(results)
            except Exception:
                metrics = {"polaris_evaluate": True}
            out["models"][name] = {"metrics": metrics, "predictions": list(map(float, y_pred))}
    return out


def run_baselines(cfg: Dict[str, Any], splits: Optional[Dict[str, pd.DataFrame]] = None) -> Dict[str, Any]:
    """Public entry point. Uses the Polaris path when cfg['type'] == 'polaris', else the generic path."""
    if cfg.get("type") == "polaris":
        return eval_baselines_polaris(cfg)
    if splits is None:
        raise ValueError("Non-Polaris baselines require preloaded data splits")
    return eval_baselines_generic(cfg, splits)
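
# Usage sketch (hypothetical configs and dataframes):
# >>> run_baselines({"type": "polaris", "name": "tdcommons/pgp-broccatelli", "seed": 0})
# >>> cfg = {"seed": 0, "info": {"smiles_col": "smiles", "label_col": "label"}}
# >>> run_baselines(cfg, splits={"train": train_df, "test": test_df})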