Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
## 2024-03-25 - Pre-compiling Regex in performance-critical loops
**Learning:** Calling `re` functions with an inline pattern inside a hot loop adds significant overhead compared with using a pre-compiled pattern. Profiling `parse_charge_mult` over 100k invocations showed inline matching to be roughly 1.5x-2x slower than matching with a pattern created once via `re.compile()` at the module level.
**Action:** Always extract regex expressions into pre-compiled module-level constants (e.g., `RE_CHARGE`, `RE_XYZ`) instead of defining them inline, especially in frequently called parsing loops.

## 2024-05-18 - Replacing `json` with `orjson` for large datasets
**Learning:** In pipelines handling large datasets via dictionaries containing metadata (e.g. millions of prefixes), `json.dump` and `json.load` can become significant bottlenecks, adding seconds or even minutes to startup and checkpointing phases. `orjson` provides a near drop-in replacement that is 4-10x faster for such operations.
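**Action:** When working with large JSON files, especially in a framework that checkpoints to disk frequently, replace Python's built-in `json` module with thin wrappers around `orjson.loads`/`orjson.dumps`. This preserves API compatibility while delivering a large speed-up. Note that `orjson.dumps` returns `bytes` rather than `str`, so the wrapper must decode the result or write it in binary mode.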
## 2024-03-29 - ASE Custom JSON encoding vs standard JSON
**Learning:** ASE's custom JSON encoder (`ase.io.jsonio.encode`) emits dicts with special keys such as `__ndarray__` or `__complex__` (e.g. `{"__ndarray__": [[5], "int64", ...]}`). When speeding up JSON deserialization with a faster alternative such as `orjson`, keep in mind that a plain `json.loads` or `orjson.loads` returns these as ordinary Python dictionaries, whereas ASE's custom `decode` reconstructs the underlying numpy array. Bypassing ASE's decoder without checking for these keys leads to downstream errors (e.g. `KeyError: '__ndarray__'`).
**Action:** When replacing or wrapping ASE's jsonio with `orjson`, always fall back to ASE's `decode` if the payload string contains `__ndarray__` or `__complex__` markers, to ensure custom objects are correctly reconstructed.
## 2025-03-02 - Safe Fast-path Optimization for Parsing
**Learning:** When using substring pre-checks (`in`) to short-circuit expensive regex compilation/searching in parsing loops, the substring condition must conservatively encompass all possible valid targets. For example, `RE_QUAD` matches lines containing `"Buckingham"` *or* `"a.u."` alongside `"TOT"`. Filtering only for `"Buckingham"` creates a subtle functional regression by prematurely skipping valid rows.
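**Action:** When adding a fast-path check to a text-extraction routine, always derive the early-return substring from the shared static prefix or another non-optional component of the regular expression (e.g., `"TOT"`), never from an optional branch. If the regex is compiled with `re.I`, the substring check must be case-insensitive as well; otherwise mixed-case input that the regex would match is skipped.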
126 changes: 88 additions & 38 deletions src/lavello_mlips/process_omol25.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import ase.parallel
from ase.parallel import DummyMPI

ase.parallel.world = DummyMPI()

from .s3_processor import S3DataProcessor
Expand Down Expand Up @@ -61,9 +62,16 @@
)


def parse_quadrupole(txt: str) -> Optional[Tuple[float, float, float, float, float, float, float]]:
def parse_quadrupole(
txt: str,
) -> Optional[Tuple[float, float, float, float, float, float, float]]:
if not txt:
return None
# ⚡ Bolt Optimization: Fast-path string literal check
# Avoids running the expensive RE_QUAD regex if the text definitively lacks the target pattern.
if "TOT" not in txt:
return None

match = RE_QUAD.search(txt)

if match:
Expand All @@ -78,6 +86,11 @@ def parse_quadrupole(txt: str) -> Optional[Tuple[float, float, float, float, flo
def parse_dipole(txt: str) -> Optional[Tuple[float, float, float, float]]:
if not txt:
return None
# ⚡ Bolt Optimization: Fast-path string literal check
# Skips executing the complex RE_DIP expression on large string blocks when 'ipole' or 'IPOLE' is absent.
if "ipole" not in txt and "IPOLE" not in txt:
return None

best = None
for m in RE_DIP.finditer(txt):
unit = (m.group(1) or "").lower()
Expand All @@ -97,39 +110,60 @@ def parse_dipole(txt: str) -> Optional[Tuple[float, float, float, float]]:
# ---------- charge/multiplicity ----------
# Matches either a "Total Charge" / "Overall charge of the system" assignment
# (group 1) or a "Multiplicity" assignment (group 2), case-insensitively.
RE_CHARGE_MULT = re.compile(
    r"(?:Total\s+Charge|Overall\s+charge\s+of\s+the\s+system)\s*[:=]\s*(-?\d+)|"
    r"Multiplicity\s*[:=]\s*(\d+)",
    re.I,
)
# Matches a "* xyz <charge> <mult>" or "* xyzfile <charge> <mult>" header line.
RE_XYZ = re.compile(r"^\s*\*\s*xyz(?:file)?\s+(-?\d+)\s+(\d+)\b.*$", flags=re.I | re.M)


def parse_charge_mult(txt: str) -> Tuple[Optional[int], Optional[int]]:
    """Extract the charge and multiplicity from a block of text.

    Keyword assignments matched by ``RE_CHARGE_MULT`` are scanned first, with
    the last occurrence of each winning. If a ``* xyz`` / ``* xyzfile`` header
    matched by ``RE_XYZ`` is present, its two integers then override both
    values.

    Args:
        txt: Text to scan. May be empty or ``None``.

    Returns:
        ``(charge, multiplicity)``; either element is ``None`` when it was
        not found.
    """
    Q = None
    M = None

    if not txt:
        return Q, M

    # Fast path: cheap substring checks avoid running the regexes on text that
    # cannot match. Both regexes are compiled with re.I, so the checks run on a
    # lowercased copy; testing only all-lower / all-upper spellings would
    # wrongly skip mixed-case input such as "cHaRgE" or "* Xyz 0 1".
    lowered = txt.lower()

    if "charge" in lowered or "multiplicity" in lowered:
        for m in RE_CHARGE_MULT.finditer(txt):
            q_match = m.group(1)
            if q_match is not None:
                try:
                    Q = int(q_match)
                except ValueError:
                    # Ignore unparsable charge value; leave Q as-is (None or previous match).
                    logger.debug(
                        "Failed to parse charge value from match %r in text; ignoring.",
                        q_match,
                    )
            else:
                m_match = m.group(2)
                if m_match is not None:
                    try:
                        M = int(m_match)
                    except ValueError:
                        # Ignore unparsable multiplicity value; leave M as-is (None or previous match).
                        logger.debug(
                            "Failed to parse multiplicity value from match %r in text; ignoring.",
                            m_match,
                        )

    if "xyz" in lowered:
        m = RE_XYZ.search(txt)
        if m:
            try:
                # The XYZ header takes precedence over keyword matches above.
                Q = int(m.group(1))
                M = int(m.group(2))
            except ValueError:
                # Ignore unparsable XYZ header values; leave Q/M as determined above.
                logger.debug(
                    "Failed to parse charge/multiplicity from XYZ header match %r; ignoring.",
                    m.groups(),
                )
    return Q, M


Expand All @@ -145,12 +179,14 @@ def cnc(Z, coords):


def geom_sha1(elems, coords, ndp: int = 6) -> Optional[str]:
    """Return a SHA-1 hex digest fingerprinting a geometry.

    Each (element, coordinate) pair is serialized as ``"<e>:<x>:<y>:<z>;"``
    and the concatenation of all pairs is hashed.

    Args:
        elems: Iterable of element labels; each is rendered with ``str``
            formatting.
        coords: Iterable of ``(x, y, z)`` triples, paired positionally with
            ``elems``. ``zip`` stops at the shorter of the two.
        ndp: Number of decimal places each coordinate is rounded to before
            formatting. Coordinates are always rendered with six decimals,
            so values above 6 do not add resolution.

    Returns:
        The 40-character hexadecimal SHA-1 digest of the serialized geometry.
    """
    # Build the full payload with one join and hash it once, rather than
    # calling .update() per atom; the bytes hashed are identical either way.
    s = "".join(
        f"{e}:{round(x, ndp):.6f}:{round(y, ndp):.6f}:{round(z, ndp):.6f};"
        for e, (x, y, z) in zip(elems, coords)
    )
    return hashlib.sha1(s.encode()).hexdigest()


# ---------- eigenvalues ----------
Expand All @@ -169,13 +205,18 @@ def homo_lumo(evals, occs, thr=1e-3):
return None, (evals[virt_idx[0]] if virt_idx else None)
h = max(occ_idx)
virt_above = [i for i in virt_idx if i > h] or virt_idx
l = min(virt_above) if virt_above else None
return evals[h], (evals[l] if l is not None else None)
l_idx = min(virt_above) if virt_above else None
return evals[h], (evals[l_idx] if l_idx is not None else None)


def parse_eigens(txt: str) -> Optional[Dict[str, Any]]:
if not txt:
return None
# ⚡ Bolt Optimization: Fast-path string literal check
# Avoids expensive splitlines and block scanning when no eigenvalue markers are present.
if "OCC" not in txt and "occ" not in txt and "Occ" not in txt:
return None

lines = txt.splitlines()
blocks = []
i = 0
Expand Down Expand Up @@ -265,7 +306,9 @@ class OmolDataProcessor(S3DataProcessor):
Derived processor for Omol data, handling MPI orchestration and Orca parsing.
"""

def __init__(self, args: argparse.Namespace, rank: int, size: int, comm: Any) -> None:
def __init__(
self, args: argparse.Namespace, rank: int, size: int, comm: Any
) -> None:
super().__init__(args.login_file, args.bucket, args.local_dir)
self.args = args
self.rank = rank
Expand Down Expand Up @@ -404,7 +447,9 @@ def handle_signal(signum, frame):
if self.rank == 0:
logger.info(f"Using flush batch size of {self.batch_size}")

def flush_recs(self, recs: List[Dict[str, Any]], all_atoms: Optional[List[Any]] = None) -> None:
def flush_recs(
self, recs: List[Dict[str, Any]], all_atoms: Optional[List[Any]] = None
) -> None:
"""Flush a batch of records to Parquet, and optionally atoms to ExtXYZ."""
if not recs:
return
Expand All @@ -422,7 +467,9 @@ def flush_recs(self, recs: List[Dict[str, Any]], all_atoms: Optional[List[Any]]
write(str(xyz_path), all_atoms, format="extxyz")
self.chunk_idx += 1

def _process_buffer(self, buffer: BytesIO, x: str) -> Optional[Tuple[Dict[str, Any], Any]]:
def _process_buffer(
self, buffer: BytesIO, x: str
) -> Optional[Tuple[Dict[str, Any], Any]]:
"""Parse a .tar.zst buffer; returns (rec dict, ASE Atoms) or None."""
rec: Dict[str, Any] = {}
try:
Expand Down Expand Up @@ -532,7 +579,9 @@ def _process_buffer(self, buffer: BytesIO, x: str) -> Optional[Tuple[Dict[str, A
logger.error(f"Error parsing buffer for {x}: {e}")
return None

def process_single(self, idx: int, s3_client: Any = None) -> Optional[Tuple[Dict[str, Any], Any, str]]:
def process_single(
self, idx: int, s3_client: Any = None
) -> Optional[Tuple[Dict[str, Any], Any, str]]:
"""Processes a single task synchronously. Returns (rec, atoms, x) or None."""
start_time = time.time()
x = self.prefixes[idx]
Expand Down Expand Up @@ -788,4 +837,5 @@ def run_serial(self) -> None:
finally:
pass


logger = logging.getLogger(__name__)
Loading