Source code for aim2dat.aiida_workflows.cp2k.combined_work_chains

"""
Aiida workchains for cp2k to run advanced tasks combining different basic workchains.
"""

# Third party library imports
import aiida.orm as aiida_orm
from aiida.engine import WorkChain, ToContext, if_, while_
from aiida.plugins import CalculationFactory, WorkflowFactory, DataFactory

# Internal library imports
from aim2dat.aiida_workflows.cp2k.work_chain_specs import (
    structural_p_specs,
    numerical_p_specs,
)
from aim2dat.aiida_workflows.cp2k.auxiliary_functions import (
    return_work_chain_info,
)
from aim2dat.aiida_workflows.cp2k.surface_opt_utils import (
    surfopt_setup,
    surfopt_should_run_add_calc,
    surfopt_should_run_slab_conv,
)
from aim2dat.aiida_workflows.cp2k.el_properties_utils import elprop_setup

SurfaceData = DataFactory("aim2dat.surface")
StructureData = DataFactory("core.structure")
Cp2kCalculation = CalculationFactory("aim2dat.cp2k")
FindSCFParametersWC = WorkflowFactory("aim2dat.cp2k.find_scf_p")
CellOptWC = WorkflowFactory("aim2dat.cp2k.cell_opt")
GeoOptWC = WorkflowFactory("aim2dat.cp2k.geo_opt")
EigenvaluesWC = WorkflowFactory("aim2dat.cp2k.eigenvalues")
BandStructureWC = WorkflowFactory("aim2dat.cp2k.band_structure")
PDOSWC = WorkflowFactory("aim2dat.cp2k.pdos")
PartialChargesWC = WorkflowFactory("aim2dat.cp2k.partial_charges")


def _validate_bulk_reference(ref, _):
    """Validate bulk reference values."""
    return None


[docs]class SurfaceOptWorkChain(WorkChain): """ Work chain to converge the slab size and optimize the atomic positions of the converged slab. """ @classmethod def define(cls, spec): """Specify inputs and outputs.""" super().define(spec) spec = numerical_p_specs(spec, required=True) spec.input( "structural_p.surface", valid_type=SurfaceData, required=True, help="Vacuum at the bottom and top of the slab.", ) spec.input( "structural_p.periodic", valid_type=aiida_orm.Bool, required=True, help="Whether the cell direction normal to the surface plan is periodic or not.", ) spec.input( "structural_p.vacuum", valid_type=aiida_orm.Float, required=True, help="Vacuum at the bottom and top of the slab.", ) spec.input( "structural_p.vacuum_factor", valid_type=aiida_orm.Float, default=lambda: aiida_orm.Float(0.0), required=True, help="Vacuum at the bottom and top of the slab.", ) spec.input( "structural_p.minimum_slab_size", valid_type=aiida_orm.Float, default=lambda: aiida_orm.Float(15.0), required=True, help="Vacuum at the bottom and top of the slab.", ) spec.input( "structural_p.maximum_slab_size", valid_type=aiida_orm.Float, default=lambda: aiida_orm.Float(50.0), required=True, help="Vacuum at the bottom and top of the slab.", ) spec.expose_inputs(GeoOptWC, exclude="structural_p") spec.input_namespace("preopt_numerical_p", required=True, dynamic=False, help=".") spec.input( "preopt_numerical_p.basis_sets", valid_type=(aiida_orm.Str, aiida_orm.Dict), required=False, help="Basis sets used for the different species for the pre-optimization steps.", ) spec.input( "preopt_numerical_p.cutoff_values", valid_type=aiida_orm.Dict, required=False, help="Cut-off values for the grids for the pre-optimization steps.", ) spec.input( "preopt_numerical_p.basis_file", valid_type=DataFactory("core.singlefile"), help="File containing the used basis sets for the pre-optimization steps.", required=False, ) spec.input_namespace("preopt_optimization_p", required=True, dynamic=False, help=".") spec.input( "preopt_optimization_p.max_force", valid_type=aiida_orm.Float, required=False, help="Convergence criterion for the maximum force component.", ) spec.input( "preopt_optimization_p.rms_force", valid_type=aiida_orm.Float, required=False, help="Convergence criterion for the root mean square (RMS) force.", ) spec.input( "preopt_optimization_p.max_dr", valid_type=aiida_orm.Float, required=False, help="Convergence criterion for the maximum geometry change.", ) spec.input( "preopt_optimization_p.rms_dr", valid_type=aiida_orm.Float, required=False, help="Convergence criterion for the root mean square (RMS) geometry change.", ) spec.input_namespace("slab_conv", required=True, dynamic=False, help=".") spec.input( "slab_conv.criteria", valid_type=aiida_orm.Str, default=lambda: aiida_orm.Str("surface_energy"), required=True, help="Convergence criteria to choose the slab size.", ) spec.input( "slab_conv.threshold", valid_type=aiida_orm.Float, default=lambda: aiida_orm.Float(0.1), required=True, help="Convergence threshold for the surface or formation energy.", ) spec.input_namespace( "bulk_reference", validator=_validate_bulk_reference, required=True, dynamic=True, help="Reference total energies of bulk systems.", ) spec.output( "scf_parameters", valid_type=aiida_orm.Dict, required=True, help="Information on the SCF-Parameters that converge the Kohn-Sham equations.", ) spec.output( "primitive_slab", valid_type=StructureData, required=False, help="Primitive unrelaxed surface slab.", ) spec.output( "path_parameters", valid_type=aiida_orm.Dict, required=False, help="K-path parameters for the slab.", ) spec.output_namespace( "cp2k", required=True, dynamic=True, help="Output data of the last CP2K calculation.", ) spec.exit_code( 600, "ERROR_INPUT_WRONG_VALUE", message='Input parameter "{parameter}" contains an unsupported value.', ) spec.exit_code( 700, "ERROR_SCF_PARAMETERS", message="SCF-parameters could not be retrieved.", ) spec.exit_code( 701, "ERROR_GEO_OPT", message="Surface slab could not be converged.", ) spec.outline( cls.setup, while_(cls.should_run_slab_conv)( cls.find_scf_p, cls.inspect_find_scf_p_results, cls.geo_preopt, cls.inspect_geo_opt_results, cls.geo_opt, cls.inspect_geo_opt_results, ), if_(cls.should_run_add_calc)( cls.find_scf_p, cls.inspect_find_scf_p_results, cls.geo_preopt, cls.inspect_geo_opt_results, cls.geo_opt, cls.inspect_geo_opt_results, ), cls.post_processing, ) def setup(self): """Define initial parameters.""" exit_code = surfopt_setup(self.ctx, self.inputs) if exit_code is not None: return self.exit_codes.ERROR_INPUT_WRONG_VALUE.format(parameter="bulk_reference") def should_run_slab_conv(self): """ Check whether the convergence criteria is fulfilled and the slab size is not exceeding the maximum slab size. """ cond, reports = surfopt_should_run_slab_conv(self.ctx, self.inputs) for report in reports: self.report(report) return cond def should_run_add_calc(self): """ Check whether additional calculations are run after the slab size is converged. """ return surfopt_should_run_add_calc(self.ctx, self.inputs) def find_scf_p(self): """ Run the FindSCFParameters work chain. """ builder = FindSCFParametersWC.get_builder() for p_label in self.ctx.base_input_p: if p_label in self.inputs: builder[p_label] = self.inputs[p_label] builder.numerical_p = self.inputs.numerical_p builder.structural_p.structure = self.ctx.surface_slab if self.ctx.scf_p is not None: builder.structural_p.scf_parameters = self.ctx.scf_p running = self.submit(builder) self.report(f"Launching FindSCFParametersWorkChain <{running.pk}>.") return ToContext(find_scf_p=running) def geo_preopt(self): """ Run the GeoOpt work chain. """ builder = GeoOptWC.get_builder() for p_label in self.ctx.base_input_p: if p_label in self.inputs: builder[p_label] = self.inputs[p_label] builder.adjust_scf_parameters = self.inputs.adjust_scf_parameters builder.numerical_p = self.inputs.preopt_numerical_p builder.numerical_p.xc_functional = self.inputs.numerical_p.xc_functional builder.numerical_p.kpoints_ref_dist = self.inputs.numerical_p.kpoints_ref_dist if "pseudo_file" in self.inputs.numerical_p: builder.numerical_p.pseudo_file = self.inputs.numerical_p.pseudo_file builder.optimization_p = self.inputs.preopt_optimization_p builder.structural_p.structure = self.ctx.surface_slab if self.ctx.scf_p is not None: builder.structural_p.scf_parameters = self.ctx.scf_p if self.ctx.fix_atoms: p_dict = builder["cp2k"]["parameters"].get_dict() p_dict.update( { "MOTION": { "CONSTRAINT": {"FIXED_ATOMS": {"LIST": " ".join(self.ctx.fixed_atoms)}} } } ) builder["cp2k"]["parameters"] = aiida_orm.Dict(dict=p_dict) running = self.submit(builder) self.report(f"Launching GeoOptWorkChain <{running.pk}>.") return ToContext(geo_opt=running) def geo_opt(self): """ Run the GeoOpt work chain. """ builder = GeoOptWC.get_builder() for p_label in self.ctx.base_input_p: if p_label in self.inputs: builder[p_label] = self.inputs[p_label] builder.initial_opt_parameters = self.ctx.initial_opt_parameters builder.adjust_scf_parameters = self.inputs.adjust_scf_parameters builder.numerical_p = self.inputs.numerical_p builder.optimization_p = self.inputs.optimization_p builder.structural_p.structure = self.ctx.surface_slab if self.ctx.scf_p is not None: builder.structural_p.scf_parameters = self.ctx.scf_p if self.ctx.fix_atoms: p_dict = builder["cp2k"]["parameters"].get_dict() p_dict.update( { "MOTION": { "CONSTRAINT": {"FIXED_ATOMS": {"LIST": " ".join(self.ctx.fixed_atoms)}} } } ) builder["cp2k"]["parameters"] = aiida_orm.Dict(dict=p_dict) running = self.submit(builder) self.report(f"Launching GeoOptWorkChain <{running.pk}>.") return ToContext(geo_opt=running) def inspect_find_scf_p_results(self): """ Check if the previous work chain finished successful. """ if not self.ctx.find_scf_p.is_finished_ok: return self.exit_codes.ERROR_SCF_PARAMETERS self.ctx.scf_p = self.ctx.find_scf_p.outputs.scf_parameters self.ctx.parent_calc_folder = self.ctx.find_scf_p.outputs["cp2k"]["remote_folder"] def inspect_geo_opt_results(self): """ Check if the previous work chain finished successful. """ if "geo_opt" in self.ctx and not self.ctx.geo_opt.is_finished_ok: return self.exit_codes.ERROR_GEO_OPT self.ctx.parent_calc_folder = None self.ctx.surface_slab = self.ctx.geo_opt.outputs["cp2k"]["output_structure"] def post_processing(self): """ Define outputs. """ self.out("scf_parameters", self.ctx.geo_opt.outputs["scf_parameters"]) if "k_path_parameters" in self.ctx: self.out("path_parameters", self.ctx.k_path_parameters) if "prim_slab" in self.ctx: self.out("primitive_slab", self.ctx.prim_slab) for label, val in self.ctx.geo_opt.outputs["cp2k"].items(): self.out("cp2k." + label, val)
[docs]class ElectronicPropertiesWorkChain(WorkChain): """ Work chain to optimize the unit cell and calculate different electronic properties. """ @classmethod def define(cls, spec): """Specify inputs and outputs.""" super().define(spec) spec = structural_p_specs(spec) spec = numerical_p_specs(spec, required=True) spec.input_namespace("cp2k", required=True, help="Details about the used CP2K code.") spec.input( "cp2k.code", valid_type=DataFactory("core.code"), help="CP2K code to be used for the calculations.", ) spec.input( "cp2k.metadata", valid_type=aiida_orm.Dict, help="Forwards extra information to the calculation job (e.g. for the scheduler).", required=False, ) spec.input_namespace( "critic2", required=False, help="Details about the used critic2 code.", populate_defaults=False, ) spec.input( "critic2.code", valid_type=DataFactory("core.code"), help="Critic2-code to be used for the calculations.", required=False, ) spec.input( "critic2.metadata", valid_type=aiida_orm.Dict, help="Forwards extra information to the calculation job (e.g. for the scheduler).", required=False, ) spec.input_namespace( "chargemol", required=False, help="Details about the used chargemol code.", populate_defaults=False, ) spec.input( "chargemol.code", valid_type=DataFactory("core.code"), help="Chargemol-code to be used for the calculations.", required=False, ) spec.input( "chargemol.metadata", valid_type=aiida_orm.Dict, help="Forwards extra information to the calculation job (e.g. for the scheduler).", required=False, ) spec.input( "chargemol.path_atomic_densities", valid_type=aiida_orm.Str, required=False, help="Absolte path to the atomic densities needed for the chargemol calculation.", ) spec.input( "structural_p.system_character", valid_type=aiida_orm.Str, required=False, help="Electronic character of the system, possible options are 'metallic' or " "'insulator'. In case this parameter is set to 'metallic' ('insulator') electronic " "smearing is always (never) applied.", ) spec.input_namespace( "workflow", required=True, help="Parameters that define the workflow." ) spec.input( "workflow.run_cell_optimization", valid_type=aiida_orm.Bool, default=lambda: aiida_orm.Bool(True), help="Whether a unit cell optimization should be performed.", ) spec.input( "workflow.calc_band_structure", valid_type=aiida_orm.Bool, default=lambda: aiida_orm.Bool(False), help="Whether the band structure should be calculated.", ) spec.input( "workflow.calc_eigenvalues", valid_type=aiida_orm.Bool, default=lambda: aiida_orm.Bool(False), help="Whether the eigenvalues should be calculated.", ) spec.input( "workflow.calc_pdos", valid_type=aiida_orm.Bool, default=lambda: aiida_orm.Bool(False), help="Whether the projected density of states (pDOS) should be calculated.", ) spec.input( "workflow.calc_partial_charges", valid_type=aiida_orm.Bool, default=lambda: aiida_orm.Bool(False), help="Whether the partial charges should be calculated.", ) spec.input( "workflow.protocol", valid_type=aiida_orm.Str, default=lambda: aiida_orm.Str("cp2k-crystal-standard"), help="Protocol type to be used for the calculations. The default value is 'standard'.", ) spec.input( "workflow.custom_protocol", valid_type=aiida_orm.Dict, help="Use a custom protocol to set the parameters of the work chain.", required=False, ) spec.input( "clean_workdir", valid_type=aiida_orm.Bool, help="Clean woring directory after calculation.", required=False, ) spec.outline( cls.setup, cls.find_scf_parameters, if_(cls.should_run_cell_opt)(cls.dft_cell_opt), cls.electronic_structure, cls.post_processing, ) spec.output( "general_info", valid_type=aiida_orm.Dict, help="General properties.", ) spec.output( "optimized_structure", valid_type=DataFactory("core.structure"), help="Optimized unit cell of the crystal.", required=False, ) spec.output( "general_info", valid_type=aiida_orm.Dict, help="Information on the constrained space group.", required=False, ) spec.output( "eigenvalue_info", valid_type=aiida_orm.Dict, help="Contains all calculated eigenvalues for all k-points.", required=False, ) spec.output( "band_structure", valid_type=DataFactory("core.array.bands"), help="The electronic band structure of the crysal.", required=False, ) spec.output( "pdos", valid_type=DataFactory("core.array.xy"), help="The projected density of states of the crystal.", required=False, ) spec.output( "mulliken_populations", valid_type=aiida_orm.List, help="The Mulliken populations of the system.", required=False, ) spec.output( "hirshfeld_populations", valid_type=aiida_orm.List, help="The Hirshfeld populations of the system.", required=False, ) spec.output( "bader_populations", valid_type=aiida_orm.List, help="The Bader populations of the system.", required=False, ) spec.output( "ddec6_populations", valid_type=aiida_orm.List, help="The DDEC6 populations of the system.", required=False, ) spec.exit_code( 700, "ERROR_SCF_PARAMETERS", message="SCF-parameters could not be retrieved.", ) spec.exit_code( 701, "ERROR_CELL_OPT", message="Crystal structure could not be converged.", ) spec.exit_code( 702, "ERROR_BAND_STRUCTURE", message="Band structure could not be calculated.", ) spec.exit_code( 703, "ERROR_EIGENVALUES", message="Eigenvalues could not be calculated.", ) spec.exit_code( 704, "ERROR_PDOS", message="PDOS could not be calculated.", ) spec.exit_code( 705, "ERROR_PARTIAL_CHARGES", message="Partial charges could not be calculated.", ) def setup(self): """Set up calculation parameters.""" elprop_setup(self.ctx, self.inputs) def find_scf_parameters(self): """ Find mixing parameters that converge the Kohn-Sham equations. """ builder = FindSCFParametersWC.get_builder() for p_label, p_value in self.ctx.find_scf_parameters.items(): self.set_input_parameter(builder, p_label, p_value) builder.structural_p.structure = self.inputs.structural_p.structure if "system_character" in self.inputs.structural_p: builder.structural_p.system_character = self.inputs.structural_p.system_character running = self.submit(builder) self.report(f"Launching FindSCFMixingParametersWorkChain <{running.pk}>.") return ToContext(find_scf_p=running) def should_run_cell_opt(self): """Whether to run a cell optimization.""" return self.inputs.workflow.run_cell_optimization.value def dft_cell_opt(self): """ Perform the cell relaxation. """ if not self.ctx.find_scf_p.is_finished_ok: return self.exit_codes.ERROR_SCF_PARAMETERS builder = CellOptWC.get_builder() for p_label, p_value in self.ctx.unit_cell_opt.items(): self.set_input_parameter(builder, p_label, p_value) builder.structural_p.structure = self.ctx.find_scf_p.outputs.seekpath.primitive_structure builder.structural_p.scf_parameters = self.ctx.find_scf_p.outputs.scf_parameters running = self.submit(builder) self.report(f"Launching CellOptWorkChain <{running.pk}>.") return ToContext(optimized=running) def electronic_structure(self): """ Calculate the electronic properties of the crystal. """ if self.inputs.workflow.run_cell_optimization.value: if not self.ctx.optimized.is_finished_ok: return self.exit_codes.ERROR_CELL_OPT structure = self.ctx.optimized.outputs.cp2k["output_structure"] remote_folder = self.ctx.optimized.outputs.cp2k["remote_folder"] scf_parameters = self.ctx.optimized.outputs.scf_parameters else: if not self.ctx.find_scf_p.is_finished_ok: return self.exit_codes.ERROR_SCF_PARAMETERS structure = self.ctx.find_scf_p.outputs.seekpath.primitive_structure remote_folder = self.ctx.find_scf_p.outputs.cp2k["remote_folder"] scf_parameters = self.ctx.find_scf_p.outputs.scf_parameters if self.inputs.workflow.calc_band_structure.value: running_bs = self.run_el_prop_wc( "band_structure", BandStructureWC, structure, remote_folder, scf_parameters, [("path_parameters", self.ctx.find_scf_p.outputs.seekpath.path_parameters)], ) self.report(f"Launching BandStructureWorkChain <{running_bs.pk}>.") self.to_context(band_structure=running_bs) if self.inputs.workflow.calc_eigenvalues.value: running_ev = self.run_el_prop_wc( "eigenvalues", EigenvaluesWC, structure, remote_folder, scf_parameters, [] ) self.report(f"Launching EigenValuesWorkChain <{running_ev.pk}>.") self.to_context(eigenvalues=running_ev) if self.inputs.workflow.calc_pdos.value: running_pdos = self.run_el_prop_wc("pdos", PDOSWC, structure, None, scf_parameters, []) self.report(f"Launching PDOSWorkChain <{running_pdos.pk}>.") self.to_context(pdos=running_pdos) if self.inputs.workflow.calc_partial_charges.value: running_pc = self.run_el_prop_wc( "partial_charges", PartialChargesWC, structure, remote_folder, scf_parameters, [] ) self.report(f"Launching PartialChargesWorkChain <{running_pc.pk}>.") self.to_context(partial_charges=running_pc) def run_el_prop_wc( self, task_label, work_chain, structure, remote_folder, scf_parameters, extra_input ): """Run electronic properties calculation.""" builder = work_chain.get_builder() input_parameters = getattr(self.ctx, task_label) for p_label, p_value in input_parameters.items(): self.set_input_parameter(builder, p_label, p_value) builder.structural_p.structure = structure builder.structural_p.scf_parameters = scf_parameters if remote_folder is not None: builder.cp2k.parent_calc_folder = remote_folder for inp in extra_input: setattr(builder, inp[0], inp[1]) return self.submit(builder) def post_processing(self): """ Post-processing routine. """ if self.inputs.workflow.calc_band_structure and not self.ctx.band_structure.is_finished_ok: return self.exit_codes.ERROR_BAND_STRUCTURE if self.inputs.workflow.calc_eigenvalues and not self.ctx.eigenvalues.is_finished_ok: return self.exit_codes.ERROR_EIGENVALUES if self.inputs.workflow.calc_pdos and not self.ctx.pdos.is_finished_ok: return self.exit_codes.ERROR_PDOS if ( self.inputs.workflow.calc_partial_charges and not self.ctx.partial_charges.is_finished_ok ): return self.exit_codes.ERROR_PARTIAL_CHARGES # Create output # TO-DO include space group self.out( "general_info", return_work_chain_info( self.ctx.find_scf_p.outputs.cp2k["output_parameters"], self.ctx.find_scf_p.outputs.seekpath.primitive_structure, ), ) if self.inputs.workflow.run_cell_optimization.value: self.out("optimized_structure", self.ctx.optimized.outputs.cp2k["output_structure"]) if self.inputs.workflow.calc_band_structure.value: self.out("band_structure", self.ctx.band_structure.outputs.cp2k["output_bands"]) if self.inputs.workflow.calc_pdos.value: self.out("pdos", self.ctx.pdos.outputs.cp2k["output_pdos"]) if self.inputs.workflow.calc_eigenvalues.value: self.out("eigenvalue_info", self.ctx.eigenvalues.outputs.cp2k["output_eigenvalues"]) if self.inputs.workflow.calc_partial_charges.value: self.out( "mulliken_populations", self.ctx.partial_charges.outputs.cp2k["output_mulliken_populations"], ) self.out( "hirshfeld_populations", self.ctx.partial_charges.outputs.cp2k["output_hirshfeld_populations"], ) if "critic2" in self.inputs: self.out( "bader_populations", self.ctx.partial_charges.outputs.critic2["output_bader_populations"], ) if "chargemol" in self.inputs: self.out( "ddec6_populations", self.ctx.partial_charges.outputs.chargemol["output_ddec6_populations"], ) @staticmethod def set_input_parameter(work_chain_builder, input_key, value): """Set input parameter for a child work chain.""" input_setter = work_chain_builder input_path = input_key.split(".") for keyword in input_path[:-1]: input_setter = input_setter[keyword] input_setter[input_path[-1]] = value