"""Functions to read the standard output file of CP2K."""
# Internal library imports
from aim2dat.io.base_parser import transform_str_value, parse_block_function, _BaseDataBlock
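# Each block class below parses one section of the CP2K output file. In short
# (see ``_BaseDataBlock`` for the exact semantics): ``start_str`` and the
# optional ``end_str`` delimit the section, ``use_once`` restricts parsing to
# the first occurrence, ``_parse_line`` fills ``self.current_data`` line by
# line, and ``_process_output`` may post-process ``self.all_data`` and
# ``self.line_indices`` before ``parse_block_function`` merges all results.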
class CP2KBlock(_BaseDataBlock):
"""CP2K| data block."""
start_str = "CP2K|"
use_once = True
def _parse_line(self, line):
if "version string:" in line:
self.current_data["cp2k_version"] = " ".join(line.split()[5:])
class GlobalBlock(_BaseDataBlock):
"""GLOBAL| data block."""
start_str = "GLOBAL|"
use_once = True
def _parse_line(self, line):
if "Run type" in line:
self.current_data["run_type"] = " ".join(line.split()[3:])
class BrillouinBlock(_BaseDataBlock):
"""BRILLOUIN| data block."""
start_str = "BRILLOUIN|"
end_str = "***"
use_once = True
def __init__(self):
"""Initialize class."""
_BaseDataBlock.__init__(self)
self.in_kpoint_block = False
def _parse_line(self, line):
if "K-point scheme" in line:
self.current_data["kpoint_scheme"] = line.split()[-1]
elif "K-Point grid" in line:
line_sp = line.split()
self.current_data["kpoint_grid"] = [
int(line_sp[-3]),
int(line_sp[-2]),
int(line_sp[-1]),
]
elif "Number" in line and "Weight" in line:
self.in_kpoint_block = True
self.current_data["kpoints"] = []
elif self.in_kpoint_block:
if self.end_str in line:
self.in_kpoint_block = False
return None
line_sp = line.split()
self.current_data["kpoints"].append(
{
"idx": int(line_sp[1]),
"weight": float(line_sp[2]),
"kpoint": [float(line_sp[3]), float(line_sp[4]), float(line_sp[5])],
}
)
class DFTBlock(_BaseDataBlock):
"""DFT| data block."""
start_str = "DFT|"
use_once = True
def _parse_line(self, line):
if "Kohn-Sham" in line:
self.current_data["dft_type"] = line.split()[-1]
elif "Multiplicity" in line:
self.current_data["multiplicity"] = int(line.split()[-1])
elif "Charge" in line:
self.current_data["charge"] = float(line.split()[-1])
class FunctionalBlock(_BaseDataBlock):
"""FUNCTIONAL| data block."""
start_str = "FUNCTIONAL|"
use_once = True
def _parse_line(self, line):
xc_info = self.current_data.setdefault("xc", {})
if line.endswith(":\n"):
new_value = line[13:-2]
fct = xc_info.pop("functional", None)
if fct is None:
fct = new_value
elif isinstance(fct, str):
fct = [fct, new_value]
else:
fct.append(new_value)
xc_info["functional"] = fct
class VdWBlock(_BaseDataBlock):
"""vdW POTENTIAL| data block."""
start_str = "vdW POTENTIAL|"
use_once = True
def _parse_line(self, line):
        # Only the first and second line of the block are parsed:
xc_info = self.current_data.setdefault("xc", {})
if len(xc_info) == 0:
xc_info["vdw_type"] = " ".join(line.split()[2:])
elif len(xc_info) == 1:
xc_info["vdw_method"] = line.split()[2]
class MDParBlock(_BaseDataBlock):
"""MD_PAR| data block."""
start_str = "MD_PAR|"
use_once = True
def _parse_line(self, line):
if "Ensemble type" in line:
self.current_data["md_ensemble"] = line.split()[-1]
class MDIniBlock(_BaseDataBlock):
"""MD_INI| data block."""
_pattern_mapping = [
("Potential energy", "potential_energy"),
("Kinetic energy", "kinetic_energy"),
("Temperature", "temperature"),
]
start_str = "MD_INI|"
use_once = True
def _parse_line(self, line):
md_ini = self.current_data.setdefault(
"md_ini", {"step_nr": 0, "time_fs": 0.0, "index": self.line_indices[0]}
)
for pattern, key in self._pattern_mapping:
if pattern in line:
md_ini[key] = float(line.split()[-1])
class MDBlock(_BaseDataBlock):
"""MD| data block."""
_pattern_mapping = [
("Energy drift per atom", "energy_drift_p_atom"),
("Potential energy", "potential_energy"),
("Kinetic energy", "kinetic_energy"),
("Temperature", "temperature"),
]
start_str = "MD|"
def _parse_line(self, line):
if "Step number" in line:
self.current_data["step_nr"] = int(line.split()[-1])
elif "Time [fs]" in line:
self.current_data["time_fs"] = float(line.split()[-1])
for pattern, key in self._pattern_mapping:
if pattern in line:
self.current_data[key] = float(line.split()[-2])
def _process_output(self):
if len(self.line_indices) > 0:
return {"md_steps": self.all_data, "motion_indices": self.line_indices}
class NumbersBlock(_BaseDataBlock):
"""Numbers data block."""
start_str = "TOTAL NUMBERS AND MAXIMUM NUMBERS"
end_str = "SCF PARAMETERS"
use_once = True
def _parse_line(self, line):
if "Atoms:" in line:
self.current_data["natoms"] = int(line.split()[-1])
class KindInfoBlock(_BaseDataBlock):
"""Kind info data block."""
start_str = "ATOMIC COORDINATES IN"
end_str = "SCF PARAMETERS"
use_once = True
def _parse_line(self, line):
line = line.strip()
if line == "" or line.startswith(("Atom", "SCF", "MODULE")):
return None
ki_list = self.current_data.setdefault("kind_info", [])
line_sp = line.split()
ki_list.append(
{
"kind": int(line_sp[1]),
"element": line_sp[2],
"atomic_nr": int(line_sp[3]),
"core_electrons": int(int(line_sp[3]) - float(line_sp[7])),
"valence_electrons": int(float(line_sp[7])),
"mass": float(line_sp[8]),
}
)
class SCFParametersBlock(_BaseDataBlock):
"""SCF parameters data block."""
start_str = "SCF PARAMETERS"
def _parse_line(self, line):
if "added MOs" in line:
line_sp = line.split()
if line_sp[-1] != "0":
self.current_data["nr_unocc_orbitals"] = [int(line_sp[-2]), int(line_sp[-1])]
else:
self.current_data["nr_unocc_orbitals"] = int(line_sp[-2])
class SPGRBlock(_BaseDataBlock):
"""SPGR| data block."""
_pattern_mapping = [
("SPACE GROUP NUMBER:", "sg_number"),
("INTERNATIONAL SYMBOL:", "int_symbol"),
("POINT GROUP SYMBOL:", "point_group_symbol"),
("SCHOENFLIES SYMBOL:", "schoenflies_symbol"),
]
start_str = "SPGR|"
def _parse_line(self, line):
sg_info = self.current_data.setdefault("spgr_info", {})
for pattern, key in self._pattern_mapping:
if pattern in line:
sg_info[key] = line.split()[-1]
if "sg_number" in sg_info:
sg_info["sg_number"] = int(sg_info["sg_number"])
class SCFBlock(_BaseDataBlock):
"""SCF cycles data block."""
_pattern_mapping = [
("Number of electrons:", "nelectrons"),
("Number of occupied orbitals:", "nr_occ_orbitals"),
]
_all_keys = [
"nelectrons",
"nr_occ_orbitals",
"scf_converged",
"energy_scf",
"energy",
"energy_units",
] # , "nr_scf_steps"
start_str = "Number of electrons:"
end_str = ["ENERGY| Total FORCE_EVAL", "BSSE"]
def _parse_line(self, line):
for pattern, key in self._pattern_mapping:
if pattern in line:
self.current_data.setdefault(key, []).append(int(line.split()[-1]))
return None
if "SCF run converged in" in line:
self.current_data["nr_scf_steps"] = int(line.split()[-3])
self.current_data["scf_converged"] = True
elif "Leaving inner SCF loop after" in line:
self.current_data["nr_scf_steps"] = int(line.split()[-2])
self.current_data["scf_converged"] = False
elif "outer SCF loop converged in" in line:
self.current_data["nr_scf_steps"] = int(line.split()[-2])
self.current_data["scf_converged"] = True
elif "outer SCF loop FAILED to converge after" in line:
self.current_data["nr_scf_steps"] = int(line.split()[-2])
self.current_data["scf_converged"] = False
elif "Overlap energy of the core charge distribution:" in line:
self.current_data["energy_overlap_ccd"] = float(line.split()[-1])
elif "Self energy of the core charge distribution:" in line:
self.current_data["energy_self_ccd"] = float(line.split()[-1])
elif "Core Hamiltonian energy:" in line:
self.current_data["energy_core_hamiltonian"] = float(line.split()[-1])
elif "Hartree energy:" in line:
self.current_data["energy_hartree"] = float(line.split()[-1])
elif "Exchange-correlation energy:" in line:
self.current_data["energy_xc"] = float(line.split()[-1])
elif "Dispersion energy:" in line:
self.current_data["energy_dispersion"] = float(line.split()[-1])
elif "Total energy:" in line:
self.current_data["energy_scf"] = float(line.split()[-1])
elif "ENERGY| Total FORCE_EVAL" in line:
self.current_data["energy"] = float(line.split()[-1])
line = line.replace("]", "[")
self.current_data["energy_units"] = line.split("[")[1]
def _process_output(self):
for scf_step in self.all_data:
for _, key in self._pattern_mapping:
val = scf_step.get(key, [])
if len(val) == 1:
scf_step[key] = val[0]
output = {"scf_steps": self.all_data, "scf_indices": self.line_indices}
        # Copy the values of the last complete SCF step to the top-level output:
c0 = len(self.all_data) - 1
while c0 > -1:
if all(key in self.all_data[c0] for key in self._all_keys):
for key in self._all_keys:
output[key] = self.all_data[c0][key]
break
c0 -= 1
return output
class MullikenBlock(_BaseDataBlock):
"""Mulliken charges data block."""
start_str = "Mulliken Population Analysis"
end_str = "# Total charge"
current_data_type = list
def _parse_line(self, line):
line = line.strip()
if line == "" or line.startswith("#") or line.startswith("Mulliken"):
return None
line_sp = line.split()
pc_dict = {
"kind": int(line_sp[2]),
"element": line_sp[1],
}
if len(line_sp) == 7:
pc_dict["population"] = [float(line_sp[3]), float(line_sp[4])]
pc_dict["charge"] = float(line_sp[5])
else:
pc_dict["population"] = float(line_sp[3])
pc_dict["charge"] = float(line_sp[4])
self.current_data.append(pc_dict)
def _process_output(self):
return {"mulliken": self.all_data}
class HirshfeldBlock(_BaseDataBlock):
"""Hirshfeld charges data block."""
start_str = "Hirshfeld Charges"
end_str = "Total Charge"
current_data_type = list
def _parse_line(self, line):
line = line.strip()
if (
line == ""
or line.startswith("#")
or line.startswith("Hirshfeld")
or line.startswith("Total")
):
return None
line_sp = line.split()
pc_dict = {
"kind": int(line_sp[2]),
"element": line_sp[1],
}
if len(line_sp) == 8:
pc_dict["population"] = [float(line_sp[4]), float(line_sp[5])]
pc_dict["charge"] = float(line_sp[7])
else:
pc_dict["population"] = float(line_sp[4])
pc_dict["charge"] = float(line_sp[5])
self.current_data.append(pc_dict)
def _process_output(self):
return {"hirshfeld": self.all_data}
class OptStepBlock(_BaseDataBlock):
"""Optimization step data block."""
start_str = "OPTIMIZATION STEP:"
end_str = "--------------------------"
current_data_type = list
def _parse_line(self, line):
pass
def _process_output(self):
if len(self.line_indices) > 0:
return {"motion_indices": self.line_indices}
class OptSuccessBlock(_BaseDataBlock):
"""Opt success data block."""
start_str = "GEOMETRY OPTIMIZATION COMPLETED"
end_str = "***"
use_once = True
def _parse_line(self, line):
if self.start_str in line:
self.current_data["geo_converged"] = self.line_indices[-1]
class MaxOptStepsBlock(_BaseDataBlock):
"""Opt max steps reched data block."""
start_str = "MAXIMUM NUMBER OF OPTIMIZATION STEPS REACHED"
end_str = "EXITING GEOMETRY OPTIMIZATION"
use_once = True
def _parse_line(self, line):
if self.start_str in line:
self.current_data["geo_not_converged"] = True
class WalltimeBlock(_BaseDataBlock):
"""Walltime exceeded data block."""
start_str = "GEO run terminated - exceeded requested execution time:"
use_once = True
def _parse_line(self, line):
if self.start_str in line:
self.current_data["exceeded_walltime"] = True
class BandsBlock(_BaseDataBlock):
"""Band structure data block."""
start_str = "Band Structure Calculation"
end_str = "ENERGY|"
def _parse_line(self, line):
line_sp = line.split()
if "KPOINTS|" in line:
if "Special point" in line:
if "not specifi" in line:
self.current_data.setdefault("labels", []).append(None)
else:
self.current_data.setdefault("labels", []).append(line_sp[4])
elif "Number of k-points in set" in line:
self.current_data.setdefault("label_pos", []).append(int(line_sp[-1]))
elif line.startswith("#"):
if line_sp[2] == "Energy":
self.current_data["bands_unit"] = line_sp[3][1:-1]
elif line_sp[4] == "1:":
self.current_data.setdefault("kpoints", []).append(
[float(line_sp[5]), float(line_sp[6]), float(line_sp[7])]
)
for idx in range(2):
self.current_data.setdefault("bands", [[], []])[idx].append([])
self.current_data.setdefault("occupations", [[], []])[idx].append([])
self.current_data["spin"] = 0
elif line_sp[4] == "2:":
self.current_data["spin"] = 1
elif len(line_sp) < 3:
return None
elif line_sp[0].isdigit():
self.current_data["bands"][self.current_data["spin"]][-1].append(float(line_sp[1]))
self.current_data["occupations"][self.current_data["spin"]][-1].append(
float(line_sp[2])
)
def _process_output(self):
if len(self.all_data) > 0 and len(self.all_data[0]) > 0:
kpt_data = self.all_data[0]
if kpt_data.pop("spin") == 0:
kpt_data["bands"] = kpt_data["bands"][0]
kpt_data["occupations"] = kpt_data["occupations"][0]
label_pos = kpt_data.pop("label_pos")
labels = kpt_data.pop("labels")
last_pos = -1
labels_w_pos = []
for idx, pos in enumerate(label_pos):
labels_w_pos.append([last_pos + 1, labels[idx * 2]])
last_pos += pos
labels_w_pos.append([last_pos, labels[idx * 2 + 1]])
kpt_data["labels"] = labels_w_pos
return {"kpoint_data": kpt_data}
class EigenvaluesBlock(_BaseDataBlock):
"""Eigenvalues data block."""
start_str = "EIGENVALUES AND"
end_str = ["Fermi energy:", "E(Fermi):"]
def _parse_line(self, line):
line_sp = line.split()
if len(line_sp) < 3:
return None
if "EIGENVALUES" in line:
self.current_data["idx"] = int(line_sp[-1]) if "POINT" in line else 0
self.current_data["alpha"] = "ALPHA" in line
self.current_data["beta"] = "BETA" in line
elif line_sp[1].isdigit() or line_sp[0].isdigit():
energies = self.current_data.setdefault("energies", [])
occupations = self.current_data.setdefault("occupations", [])
if "MO|" in line:
energy = float(line_sp[-3])
else:
energy = float(line_sp[-2])
occ = float(line_sp[-1])
if occ >= 0.5:
self.current_data["vbm"] = energy
elif "cbm" not in self.current_data:
self.current_data["cbm"] = energy
energies.append(energy)
occupations.append(occ)
elif "E(Fermi):" in line or "Fermi energy:" in line:
self.current_data["fermi_energy"] = float(line_sp[2])
def _process_output(self):
ev_output = {"eigenvalues": []}
gap_keys = ["vbm", "cbm", "gap"]
gap_info = {key: [[], []] for key in gap_keys}
fermi_energy = None
for ev_data in self.all_data:
ev_data["gap"] = ev_data["cbm"] - ev_data["vbm"]
gap_info0 = {key: ev_data[key] for key in gap_keys}
del ev_data["vbm"]
del ev_data["cbm"]
idx = 0
if ev_data.pop("beta", False):
                # Double-check that the k-points are in the right order:
if ev_data["idx"] != ev_output["eigenvalues"][-1]["idx"]:
raise ValueError("Wrong order of k-points.")
idx = 1
for key in ["energies", "occupations", "gap"]:
ev_output["eigenvalues"][-1][key].append(ev_data[key])
else:
if ev_data.pop("alpha", False):
for key in ["energies", "occupations", "gap"]:
ev_data[key] = [ev_data[key]]
ev_output["eigenvalues"].append(ev_data)
for key, val in gap_info0.items():
gap_info[key][idx].append(val)
fermi_energy = ev_data.pop("fermi_energy")
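        # Gap: smallest CBM minus largest VBM (clamped at zero), minimized over
        # the available spin channels; direct gap: smallest per-k-point gap.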
if len(gap_info["cbm"][0]) > 0:
if len(gap_info["vbm"][1]) > 0:
ev_output["gap"] = min(
[
max(min(gap_info["cbm"][idx]) - max(gap_info["vbm"][idx]), 0.0)
for idx in range(2)
]
)
ev_output["direct_gap"] = min(
[max(min(gap_info["gap"][idx]), 0.0) for idx in range(2)]
)
else:
ev_output["gap"] = max(min(gap_info["cbm"][0]) - max(gap_info["vbm"][0]), 0.0)
ev_output["direct_gap"] = max(min(gap_info["gap"][0]), 0.0)
if len(ev_output["eigenvalues"]) > 0:
return {"fermi_energy": fermi_energy, "eigenvalues_info": ev_output}
class BSSECalculationBlock(_BaseDataBlock):
"""BSSE calculation block."""
start_str = "BSSE CALCULATION"
end_str = "-------------------------------------------------------------------------------"
def _parse_line(self, line):
line_sp = line.split()
if len(line_sp) < 3:
return None
elif self.start_str in line:
self.current_data["fragment_conf"] = line_sp[5]
self.current_data["fragment_subconf"] = line_sp[8]
elif "MULTIPLICITY" in line:
self.current_data["charge"] = int(line_sp[3])
self.current_data["multiplicity"] = int(line_sp[-2])
elif all(pattern not in line for pattern in ["----------", "ATOM INDEX"]):
self.current_data.setdefault("site_indices", []).append(int(line_sp[1]))
self.current_data.setdefault("site_labels", []).append(line_sp[2])
def _process_output(self):
if len(self.line_indices) > 0:
return {"bsse_indices": self.line_indices, "bsse_steps": self.all_data}
class BSSEResultsBlock(_BaseDataBlock):
"""BSSE results block."""
start_str = "BSSE RESULTS"
end_str = "BSSE-free interaction energy:"
use_once = True
def _parse_line(self, line):
line_sp = line.split()
if len(line_sp) < 3 or self.start_str in line:
return None
if "CP-corrected Total energy:" in line:
self.current_data["energy"] = float(line_sp[-2])
elif "BSSE-free interaction energy:" in line:
self.current_data["interaction_energy"] = float(line_sp[-2])
else:
key = "_".join([v.lower() for v in line_sp[1:-2]])
self.current_data.setdefault(key, []).append(float(line_sp[-2]))
def _process_output(self):
if len(self.all_data) > 0:
self.all_data[-1]["bsse_last_index"] = self.line_indices[-1]
return self.all_data[-1]
class RuntimeBlock(_BaseDataBlock):
"""Runtime data block."""
start_str = "T I M I N G"
end_str = "The number of warnings for this run is"
use_once = True
def _parse_line(self, line):
if "CP2K" in line:
self.current_data["runtime"] = float(line.split()[-1])
class WarningBlock(_BaseDataBlock):
"""Warning data block."""
start_str = "*** WARNING"
def _parse_line(self, line):
line_sp = line.split()
if self.start_str in line:
file_name, line_number = line_sp[3].split(":")
self.current_data["file_name"] = file_name
self.current_data["line_number"] = line_number
self.current_data["message"] = " ".join(line_sp[5:-1])
elif len(line_sp) > 1:
self.current_data["message"] += " " + " ".join(line_sp[1:-1])
def _process_output(self):
output = {"warnings": self.all_data}
if len(self.all_data) > 0:
output["nwarnings"] = len(self.all_data)
return output
class ErrorBlock(_BaseDataBlock):
"""Error data block."""
start_str = "[ABORT]"
end_str = "* / \\"
use_once = True
def _parse_line(self, line):
if self.start_str in line:
self.current_data["message"] = ""
elif self.end_str in line:
file_name, line_number = line.split()[3].split(":")
self.current_data["file_name"] = file_name
self.current_data["line_number"] = line_number
else:
message = " ".join(line[12:78].split())
if self.current_data["message"] != "" and len(message) > 0:
self.current_data["message"] += " "
self.current_data["message"] += message
def _process_output(self):
if len(self.all_data) > 0:
return {"errors": self.all_data, "aborted": True}
_BLOCKS = {
"standard": [
CP2KBlock,
GlobalBlock,
BrillouinBlock,
DFTBlock,
FunctionalBlock,
VdWBlock,
MDParBlock,
NumbersBlock,
SCFParametersBlock,
SPGRBlock,
RuntimeBlock,
SCFBlock,
StepInformationBlock,
MaxOptStepsBlock,
BandsBlock,
EigenvaluesBlock,
BSSECalculationBlock,
BSSEResultsBlock,
WalltimeBlock,
WarningBlock,
ErrorBlock,
],
"trajectory": [
CP2KBlock,
GlobalBlock,
BrillouinBlock,
DFTBlock,
FunctionalBlock,
VdWBlock,
MDParBlock,
NumbersBlock,
KindInfoBlock,
SCFParametersBlock,
SPGRBlock,
RuntimeBlock,
SCFBlock,
MullikenBlock,
HirshfeldBlock,
OptStepBlock,
StepInformationBlock,
OptSuccessBlock,
MaxOptStepsBlock,
MDIniBlock,
MDBlock,
WalltimeBlock,
WarningBlock,
ErrorBlock,
],
"partial_charges": [
CP2KBlock,
GlobalBlock,
BrillouinBlock,
DFTBlock,
FunctionalBlock,
VdWBlock,
MDParBlock,
NumbersBlock,
KindInfoBlock,
SCFParametersBlock,
SPGRBlock,
RuntimeBlock,
SCFBlock,
MullikenBlock,
HirshfeldBlock,
StepInformationBlock,
MaxOptStepsBlock,
WalltimeBlock,
WarningBlock,
ErrorBlock,
],
}
_WARNING_MAPPING = [
("Using a non-square number of", "Using a non-square number of MPI ranks."),
("SCF run NOT converged", "One or more SCF run did not converge."),
("Specific L-BFGS convergence criteria", "LBFGS converged with specific criteria."),
("Add more MOs for proper smearing", "Add more MOs for proper smearing."),
]
_ERROR_MAPPING = [
("exceeded requested execution time", "exceeded_walltime"),
("Use the LSD option for an odd number of electrons", "odd_nr_electrons"),
("Extra MOs (ADDED_MOS) are required for smearing", "need_added_mos"),
("exceeded requested execution time", "exceeded_walltime"),
("Cholesky decompose failed", "cholesky_decompose_failed"),
("Bad condition number R_COND", "bad_condition_number"),
]
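# Map the per-step keys collected by the motion blocks onto the keys exposed
# in ``motion_step_info``.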
_MOTION_STEP_MAPPING = {
"max._step_size": "max_step",
"maximum_step_size": "max_step",
"rms_step_size": "rms_step",
"max._gradient": "max_grad",
"maximum_gradient": "max_grad",
"rms_gradient": "rms_grad",
"internal_pressure_[bar]": "pressure",
"potential_energy": "potential_energy",
"kinetic_energy": "kinetic_energy",
"temperature": "temperature",
"energy_drift_p_atom": "energy_drift_p_atom",
"step_nr": "step_nr",
"time_fs": "time_fs",
}
_MOTION_SCF_KEYS = ["nr_scf_steps", "scf_converged", "energy"]
_BSSE_STEP_MAPPING = {
"fragment_conf": "fragment_conf",
"fragment_subconf": "fragment_subconf",
"charge": "charge",
"multiplicity": "multiplicity",
"site_indices": "site_indices",
"site_labels": "site_labels",
}
_BSSE_SCF_KEYS = [
"nr_scf_steps",
"scf_converged",
"energy_overlap_ccd",
"energy_self_ccd",
"energy_core_hamiltonian",
"energy_hartree",
"energy_xc",
"energy_dispersion",
"energy_scf",
]
def _create_step_info_dict(
steps: list,
step_indices: list,
    step_mapping: dict,
scf_steps: list,
scf_indices: list,
scf_keys: list,
pc: dict,
):
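    """
    Combine the per-step motion/BSSE data with the SCF steps (and partial
    charges, if parsed) that were printed up to the corresponding step, using
    the stored line indices to match them. A step with exactly one SCF step
    gets that step's values merged into its own dictionary.
    """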
outp_list = []
c_scf_steps = 0
for step_idx, line_idx in enumerate(step_indices):
m_step = {"scf_steps": []}
if step_idx < len(steps):
for key, val in steps[step_idx].items():
if key in step_mapping:
m_step[step_mapping[key]] = val
while c_scf_steps < len(scf_steps):
if scf_indices[c_scf_steps] > line_idx:
break
scf_step = {}
for key in scf_keys:
if key in scf_steps[c_scf_steps]:
scf_step[key] = scf_steps[c_scf_steps][key]
for pc_type, values in pc.items():
if c_scf_steps < len(values):
scf_step[pc_type] = values[c_scf_steps]
m_step["scf_steps"].append(scf_step)
c_scf_steps += 1
if len(m_step["scf_steps"]) == 1:
m_step.update(m_step.pop("scf_steps")[0])
elif len(m_step["scf_steps"]) == 0:
del m_step["scf_steps"]
outp_list.append(m_step)
return outp_list
def read_stdout(file_name: str, parser_type: str = "standard") -> dict:
"""
    Read the standard output file of CP2K.
Parameters
----------
file_name : str
Path to the output file.
parser_type : str
Defines the quantities that are being parsed. Supported options are ``'standard'``,
``'partial_charges'`` and ``'trajectory'``.
Returns
-------
dict
Dictionary containing the parsed values.
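
    Examples
    --------
    A minimal usage sketch (the file name is only a placeholder)::

        results = read_stdout("cp2k-project.out", parser_type="trajectory")
        print(results["cp2k_version"], results["run_type"])
        for step in results.get("motion_step_info", []):
            print(step.get("potential_energy"), step.get("max_grad"))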
"""
output, n_lines = parse_block_function(file_name, _BLOCKS[parser_type])
output["cp2k_version"] = float(output["cp2k_version"])
if "exceeded_walltime" not in output:
output["exceeded_walltime"] = False
if "md_ensemble" in output:
output["run_type"] += "-" + output.pop("md_ensemble")
if "runtime" not in output:
output.pop("nwarnings", None)
output["energy_units"] = "a.u." if output["cp2k_version"] < 2025.0 else "hartree"
output["interrupted"] = True
warnings = []
for warn0 in output.get("warnings", []):
for warn1 in _WARNING_MAPPING:
if warn1[0] in warn0["message"]:
warnings.append(warn1[1])
output["warnings"] = warnings
errors = output.pop("errors", [])
for err0 in errors:
if "SCF run NOT converged." in err0["message"]:
output["scf_converged"] = False
for err1 in _ERROR_MAPPING:
if err1[0] in err0["message"]:
output[err1[1]] = True
kpoints = output.pop("kpoints", [])
if "eigenvalues_info" in output:
for ev in output["eigenvalues_info"]["eigenvalues"]:
if len(kpoints) > 0:
for idx, kpt in enumerate(kpoints):
if kpt["idx"] == ev["idx"]:
break
ev["weight"] = kpt["weight"]
ev["kpoint"] = kpt["kpoint"]
del kpoints[idx]
del ev["idx"]
motion_steps = output.pop("motion_steps", [])
motion_indices = output.pop("motion_indices", [])
if "geo_converged" in output:
motion_indices.append(output.pop("geo_converged"))
motion_indices.append(n_lines)
if "md_ini" in output:
md_ini = output.pop("md_ini")
motion_indices = [md_ini.pop("index")] + motion_indices[:-1]
motion_steps = [md_ini] + output.pop("md_steps", [])
scf_steps = output.pop("scf_steps", [])
scf_indices = output.pop("scf_indices", [])
pc = {}
for pc_type in ["mulliken", "hirshfeld"]:
if pc_type in output and len(output[pc_type]) > 0:
pc[pc_type] = output.pop(pc_type)
if parser_type == "partial_charges":
output[pc_type] = pc[pc_type][-1]
if output["run_type"] == "BSSE":
bsse_indices = output.pop("bsse_indices", [])[1:]
if "bsse_last_index" in output:
bsse_indices.append(output.pop("bsse_last_index"))
bsse_steps = output.pop("bsse_steps", [])
output["bsse_step_info"] = _create_step_info_dict(
bsse_steps,
bsse_indices,
_BSSE_STEP_MAPPING,
scf_steps,
scf_indices,
_BSSE_SCF_KEYS,
pc,
)
if parser_type == "trajectory":
output["motion_step_info"] = _create_step_info_dict(
motion_steps,
motion_indices,
_MOTION_STEP_MAPPING,
scf_steps,
scf_indices,
_MOTION_SCF_KEYS,
pc,
)
else:
output.pop("motion_step_info", None)
return output