From aa56335bf6708de68aaefb326db81821c39fd1ac Mon Sep 17 00:00:00 2001 From: ahxbcn Date: Wed, 11 Feb 2026 15:53:01 +0800 Subject: [PATCH 1/2] refactor: Use new AbacusSTRU in charge density difference tool --- src/abacusagent/modules/cube.py | 1 + src/abacusagent/modules/submodules/cube.py | 78 +++------------------- 2 files changed, 12 insertions(+), 67 deletions(-) diff --git a/src/abacusagent/modules/cube.py b/src/abacusagent/modules/cube.py index 7850610..4a4bb3d 100644 --- a/src/abacusagent/modules/cube.py +++ b/src/abacusagent/modules/cube.py @@ -10,6 +10,7 @@ from abacusagent.modules.submodules.cube import abacus_cal_elf as _abacus_cal_elf from abacusagent.modules.submodules.cube import abacus_cal_charge_density_difference as _abacus_cal_charge_density_difference from abacusagent.modules.submodules.cube import abacus_cal_spin_density as _abacus_cal_spin_density + @mcp.tool() def abacus_cal_elf(abacus_inputs_dir: Path): """ diff --git a/src/abacusagent/modules/submodules/cube.py b/src/abacusagent/modules/submodules/cube.py index e4522b7..8e39ab0 100644 --- a/src/abacusagent/modules/submodules/cube.py +++ b/src/abacusagent/modules/submodules/cube.py @@ -5,15 +5,14 @@ """ import os from pathlib import Path -from typing import Literal, Optional, TypedDict, Dict, Any, List, Tuple, Union -from itertools import groupby +from typing import Optional, Dict, Any, List -from ase.data import chemical_symbols -from abacustest.lib_prepare.abacus import AbacusStru, ReadInput, WriteInput +from abacustest import AbacusSTRU +from abacustest.lib_prepare.abacus import ReadInput, WriteInput from abacustest.lib_model.comm import check_abacus_inputs -from abacusagent.modules.util.comm import run_abacus, generate_work_path, link_abacusjob, collect_metrics -from abacusagent.modules.util.cube_manipulator import read_gaussian_cube, axpy, write_gaussian_cube, profile1d +from abacusagent.modules.util.comm import run_abacus, generate_work_path, link_abacusjob +from abacusagent.modules.util.cube_manipulator import read_gaussian_cube, axpy, write_gaussian_cube def abacus_cal_elf(abacus_inputs_dir: Path): """ @@ -61,37 +60,6 @@ def abacus_cal_elf(abacus_inputs_dir: Path): except Exception as e: return {'message': f"Calculating electron localization function failed: {e}"} -def get_subsys_pp_orb(stru: AbacusStru, - subsys_atom_index: List[int] - ) -> Tuple[str, str]: - """ - Get the pseudopotential and orbital files for a subsystem. - - Args: - stru (AbacusStru): The structure of the full system. - subsys_atom_index (List[int]): Atom indices of the subsystem. - - Returns: - Tuple[str, str, str]: Paths to the pseudopotential and orbital files, and labels of different kinds for the subsystem. - """ - pp_list, orb_list = stru.get_pp(), stru.get_orb() - element_indices = [key for key, _ in groupby(stru.get_element())] - elements = [chemical_symbols[i] for i in element_indices] - pp_dict, orb_dict = dict(zip(elements, pp_list)), dict(zip(elements, orb_list)) - - subsys_elements = [chemical_symbols[stru.get_element()[i]] for i in subsys_atom_index] - subsys_pp, subsys_orb = [], [] - for element in subsys_elements: - if pp_dict[element] not in subsys_pp: - subsys_pp.append(pp_dict[element]) - if orb_dict[element] not in subsys_orb: - subsys_orb.append(orb_dict[element]) - - label_list = stru.get_label() - subsys_label = [label_list[idx] for idx in subsys_atom_index] - - return subsys_pp, subsys_orb, subsys_label - def get_total_charge_density(abacus_inputs_dir: Path): """ Get charge density for non spin-polarized case and total charge density for spin-polarized case. @@ -150,10 +118,10 @@ def abacus_cal_charge_density_difference( input_params = ReadInput(os.path.join(full_system_jobpath, "INPUT")) full_system_stru_file = os.path.join(full_system_jobpath, input_params.get('stru_file', 'STRU')) - full_system_stru = AbacusStru.ReadStru(full_system_stru_file) + full_system_stru = AbacusSTRU.read(full_system_stru_file) # Prepare labels, coordinates, pp and orbital settings needed to generate STRU file for every subsystems - subsys2_atom_index = [i for i in range(full_system_stru.get_natoms()) if i not in subsys1_atom_index] + subsys2_atom_index = [i for i in range(full_system_stru.natoms) if i not in subsys1_atom_index] if len(subsys1_atom_index) is None: raise ValueError("Subsystem 1 have no atoms! Aborting calculating charge density difference") if len(subsys2_atom_index) is None: @@ -161,35 +129,9 @@ def abacus_cal_charge_density_difference( subsys1_stru_file = os.path.join(work_path, f"subsys1/{input_params.get('stru_file', 'STRU')}") subsys2_stru_file = os.path.join(work_path, f"subsys2/{input_params.get('stru_file', 'STRU')}") - subsys1_pp, subsys1_orb, subsys1_label = get_subsys_pp_orb(full_system_stru, subsys1_atom_index) - subsys2_pp, subsys2_orb, subsys2_label = get_subsys_pp_orb(full_system_stru, subsys2_atom_index) - - subsys1_coord, subsys2_coord = [], [] - full_system_stru_coord = full_system_stru.get_coord() - for i in range(full_system_stru.get_natoms()): - if i in subsys1_atom_index: - subsys1_coord.append(full_system_stru_coord[i]) - elif i in subsys2_atom_index: - subsys2_coord.append(full_system_stru_coord[i]) - else: - raise ValueError(f"Atom {i} does not belong to neither subsystem1 nor subsystem2") - - subsys1_stru = AbacusStru(label=subsys1_label, - cell=full_system_stru.get_cell(), - coord=subsys1_coord, - lattice_constant=full_system_stru.get_stru()['lat'], - pp=subsys1_pp, - orb=subsys1_orb, - cartesian=True) + subsys1_stru = full_system_stru.create_subset(subsys1_atom_index) + subsys2_stru = full_system_stru.create_subset(subsys2_atom_index) subsys1_stru.write(subsys1_stru_file) - - subsys2_stru = AbacusStru(label=subsys2_label, - cell=full_system_stru.get_cell(), - coord=subsys2_coord, - lattice_constant=full_system_stru.get_stru()['lat'], - pp=subsys2_pp, - orb=subsys2_orb, - cartesian=True) subsys2_stru.write(subsys2_stru_file) # Modify INPUT file to output cube file needed for calculating charge density difference @@ -219,6 +161,8 @@ def abacus_cal_charge_density_difference( return {'charge_density_diff_work_path': Path(work_path).absolute(), 'charge_density_difference_cube_file': chg_dens_diff_cube_file} except Exception as e: + import traceback + traceback.print_exc() return {'message': f'Calculaing charge density difference failed: {e}'} def abacus_cal_spin_density( From 086d522705e06adef55822bdfdbf812027efcf2e Mon Sep 17 00:00:00 2001 From: ahxbcn Date: Wed, 11 Feb 2026 15:53:29 +0800 Subject: [PATCH 2/2] feature: integrate test for ELF and charge density difference tool --- tests/integrate_test/test_cube.py | 95 +++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 tests/integrate_test/test_cube.py diff --git a/tests/integrate_test/test_cube.py b/tests/integrate_test/test_cube.py new file mode 100644 index 0000000..79b1f98 --- /dev/null +++ b/tests/integrate_test/test_cube.py @@ -0,0 +1,95 @@ +import os +import shutil +from pathlib import Path +import unittest +import tempfile +import pytest +import inspect +from abacusagent.modules.cube import ( + abacus_cal_elf, + abacus_cal_charge_density_difference, +) +from utils import initilize_test_env, load_test_ref_result + +initilize_test_env() + + +class TestCubeCalculation(unittest.TestCase): + def setUp(self): + self.test_dir = tempfile.TemporaryDirectory() + self.addCleanup(self.test_dir.cleanup) + self.test_path = Path(self.test_dir.name) + self.abacus_inputs_dir_si_prim = ( + Path(__file__).parent / "abacus_inputs_dirs/Si-prim/" + ) + self.abacus_inputs_dir_h2 = Path(__file__).parent / "abacus_inputs_dirs/H2/" + self.stru_scf = self.abacus_inputs_dir_si_prim / "STRU_scf" + + self.original_cwd = os.getcwd() + os.chdir(self.test_path) + + def tearDown(self): + os.chdir(self.original_cwd) + + @pytest.mark.smoke + def test_abacus_cal_elf_si_prim(self): + """ + Test the abacus_cal_elf function with Si primitive cell. + """ + test_func_name = inspect.currentframe().f_code.co_name + + test_work_dir = self.test_path / test_func_name + shutil.copytree(self.abacus_inputs_dir_si_prim, test_work_dir) + shutil.copy2(self.abacus_inputs_dir_si_prim / "STRU_scf", test_work_dir / "STRU") + + outputs = abacus_cal_elf(test_work_dir) + + elf_file = outputs["elf_file"] + self.assertIsInstance(elf_file, Path) + self.assertTrue(elf_file.exists()) + self.assertTrue(outputs["elf_work_path"].exists()) + + # Test that the ELF calculation ran successfully + self.assertTrue(outputs["elf_work_path"].joinpath("OUT.ABACUS", "ELF.cube").exists()) + + def test_abacus_cal_charge_density_difference_si_prim(self): + """ + Test the abacus_cal_charge_density_difference function with Si primitive cell. + """ + test_func_name = inspect.currentframe().f_code.co_name + + test_work_dir = self.test_path / test_func_name + shutil.copytree(self.abacus_inputs_dir_si_prim, test_work_dir) + shutil.copy2(self.abacus_inputs_dir_si_prim / "STRU_scf", test_work_dir / "STRU") + + outputs = abacus_cal_charge_density_difference(test_work_dir, subsys1_atom_index=[1]) + print(outputs) + + charge_density_diff_file = outputs["charge_density_difference_cube_file"] + self.assertIsInstance(charge_density_diff_file, Path) + self.assertTrue(charge_density_diff_file.exists()) + self.assertTrue(outputs["charge_density_diff_work_path"].exists()) + + # Test that the charge density difference calculation ran successfully + self.assertTrue(outputs["charge_density_diff_work_path"].joinpath("chg_density_diff.cube").exists()) + + def test_abacus_cal_charge_density_difference_h2(self): + """ + Test the abacus_cal_charge_density_difference function with H2 molecule. + """ + test_func_name = inspect.currentframe().f_code.co_name + + test_work_dir = self.test_path / test_func_name + shutil.copytree(self.abacus_inputs_dir_h2, test_work_dir) + shutil.copy2(self.abacus_inputs_dir_h2 / "STRU_relaxed", test_work_dir / "STRU") + + outputs = abacus_cal_charge_density_difference(test_work_dir, subsys1_atom_index=[1]) + print(outputs) + + charge_density_diff_file = outputs["charge_density_difference_cube_file"] + self.assertIsInstance(charge_density_diff_file, Path) + self.assertTrue(charge_density_diff_file.exists()) + self.assertTrue(outputs["charge_density_diff_work_path"].exists()) + + # Test that the charge density difference calculation ran successfully + self.assertTrue(outputs["charge_density_diff_work_path"].joinpath("chg_density_diff.cube").exists())