Source code for dpgen.tools.auto_gen_param

#%%
import os
import argparse
import json
from collections import defaultdict
from itertools import tee

[docs]class System(object): current_num_of_system = 0 current_num_of_sub_systems = 0 @property def index_system(self): return self._index_system @index_system.setter def index_system(self,value): self._index_system = value
[docs] @classmethod def register_system(cls): cls.current_num_of_system+=1 return cls.current_num_of_system-1
[docs] @classmethod def register_sub_system(cls): cls.current_num_of_sub_systems+=1 return cls.current_num_of_sub_systems-1
def __init__(self, system_prefix=""): # print(files_list) # if sum(map_relations)>len(files_list): # raise RuntimeError( # "files_list not enough;sum(map_relations):%s>len(files_list):%s, %s" # % (sum(map_relations),len(files_list),files_list,)) self.index_system = self.register_system() self.sub_system_list = [] self.system_prefix = system_prefix self.current_idx2 = 0
[docs] def add_sub_system(self,idx2, files_list): idx1 = self.register_sub_system() idx2 = self.current_idx2 self.sub_system_list.append((idx1, self.index_system, idx2, files_list)) self.current_idx2 += 1
[docs] def get_sub_system(self): return self.sub_system_list
[docs]class Iteration(object): current_num_of_itearation = 0 current_num_of_sub_itearation = 0 @property def index_iteration(self): return self._index_iteration # pylint: disable=no-member @index_iteration.setter def index_iteration(self, value): self._index_sub_iteration = value
[docs] @classmethod def register_iteration(cls): cls.current_num_of_itearation+=1 return cls.current_num_of_itearation-1
[docs] @classmethod def register_sub_iteartion(cls): cls.current_num_of_sub_itearation +=1 return cls.current_num_of_sub_itearation-1
def __init__(self, temps, nsteps_list=[500, 500, 1000, 1000, 3000, 3000, 6000, 6000], sub_iteration_num=8, ensemble='npt', press=[1.0, 10.0, 100.0, 1000.0, 5000.0, 10000.0, 20000.0, 50000.0], trj_freq=10): if len(nsteps_list) != sub_iteration_num: raise RuntimeError(f'{nsteps_list}, {sub_iteration_num}; length does not match') self.temps = temps self.index_iteration = self.register_iteration() self.nsteps_list=nsteps_list self.sub_iteration_num=sub_iteration_num self.ensemble=ensemble self.press = press self.trj_freq = trj_freq
[docs] def gen_sub_iter(self, system_list): sub_iter_list = [] for idx2 in range(self.sub_iteration_num): iter_dict = {} iter_dict['_idx'] = self.register_sub_iteartion() iter_dict['ensemble'] = self.ensemble iter_dict['nsteps'] = self.nsteps_list[idx2] iter_dict['press'] = self.press iter_dict['sys_idx'] = [ii[0] for ii in system_list if ii[2]==idx2] iter_dict['temps'] = self.temps iter_dict['trj_freq'] = self.trj_freq sub_iter_list.append(iter_dict) return sub_iter_list
[docs]def default_map_generator(map_list=[1,1,2,2,2,4,4,4], data_list=None): num = 0 # if len(data_list) < sum(map_list): # raise RuntimeError(f'{data_list} < {map_list};not enough structure to expore, data_list_too_short!') if (data_list is None) and ( all(el%10==0 for el in map_list) ): for ii in map_list: yield [f"{jj:0<5}?" for jj in range(num, num+ii//10)] num+=(ii//10) elif data_list: for ii in map_list: yield [data_list[jj] for jj in range(num, num+ii)] num += ii raise RuntimeError(f"{map_list} length is not enough")
# while True: # yield [data_list[jj] for jj in range(num, num+ii)] # num += ii
[docs]def get_system_list(system_dict, map_list=[1,1,2,2,2,4,4,4], meta_iter_num=4, sub_iteration_num=8, map_iterator=None, file_name="POSCAR"): """ :type map_iterator: Iterable use to generate sys_configs :Exmaple [['000000', '000001',], ['00000[2-9]',], ['00001?', '000020',],] """ if sub_iteration_num != len(map_list): raise RuntimeError(f"{sub_iteration_num},{map_list};sub_iteration_num does not match the length of map_list") system_list = [] for system_prefix,data_list in system_dict.items(): if map_iterator is None: print('12', data_list) new_map_iterator = default_map_generator(map_list=map_list, data_list=data_list) else: origin_one, new_map_iterator = tee(map_iterator) # pylint: disable=unused-variable # tee means copy;new_map_generator will become a copy of map_iterator system = System(system_prefix) for idx2 in range(sub_iteration_num): files_list = [os.path.join(system_prefix, jj) for jj in next(new_map_iterator)] system.add_sub_system(idx2=idx2, files_list=files_list) system_list.extend(system.get_sub_system()) return system_list
[docs]def scan_files(scan_dir="./" ,file_name="POSCAR", min_allow_files_num=20): # will return # files_list=[] system_dict = defaultdict(list) for ii in os.walk(scan_dir): if file_name in ii[2]: system_prefix = os.path.dirname(ii[0]) system_suffix = os.path.basename(ii[0]) system_dict[system_prefix].append(os.path.join(system_suffix, file_name)) for k,v in list(system_dict.items()): if len(v) < min_allow_files_num: del system_dict[k] return system_dict
# def gen_
[docs]def default_temps_generator(melt_point, temps_intervel=0.1, num_temps=5): temps = [50, ] last_temp = 0 for ii in range(num_temps-1): # pylint: disable=unused-variable last_temp = last_temp + temps_intervel*melt_point temps.append(last_temp) yield temps while True: temps = [] for ii in range(num_temps): last_temp = last_temp + temps_intervel*melt_point temps.append(last_temp) yield temps
[docs]def get_model_devi_jobs(melt_point, system_list, nsteps_list=[500, 500, 1000, 1000, 3000, 3000, 6000, 6000], press=[1.0, 10.0, 100.0, 1000.0, 5000.0, 10000.0, 20000.0, 50000.0], meta_iter_num=4, sub_iteration_num=8, temps_iterator=None, ensemble="npt", trj_freq=10, temps_intervel=0.1, num_temps=5): if temps_iterator is None: temps_iterator = default_temps_generator(melt_point=melt_point, temps_intervel=temps_intervel, num_temps=num_temps) if len(nsteps_list) != sub_iteration_num: raise RuntimeError(f"{nsteps_list}, {sub_iteration_num};length do not match!") model_devi_jobs =[] for ii in range(meta_iter_num): # pylint: disable=unused-variable temps = next(temps_iterator) meta_iter = Iteration(temps=temps, nsteps_list=nsteps_list, sub_iteration_num=sub_iteration_num, ensemble=ensemble, press=press, trj_freq=trj_freq) model_devi_jobs.extend(meta_iter.gen_sub_iter(system_list)) return model_devi_jobs
[docs]def get_sys_configs(system_list): sys_configs=[[] for ii in system_list] for t in system_list: sys_configs[t[0]]=t[3] return sys_configs
[docs]def get_init_data_sys(scan_dir='./', init_file_name='type.raw'): init_data_sys = [] for t in os.walk(scan_dir): if init_file_name in t[2]: init_data_sys.append(t[0]) else: pass return init_data_sys
[docs]def get_basic_param_json(melt_point, out_param_filename='param_basic.json', scan_dir="./", file_name='POSCAR', init_file_name='type.raw', min_allow_files_num=16, map_list=[1,1,2,2,2,4,4,4], meta_iter_num=4, sub_iteration_num=8, map_iterator=None, nsteps_list=[500, 500, 1000, 1000, 3000, 3000, 6000, 6000], press=[1.0, 10.0, 100.0, 1000.0, 5000.0, 10000.0, 20000.0, 50000.0], temps_iterator=None, ensemble="npt", trj_freq=10, temps_intervel=0.1, num_temps=5,): init_data_sys = get_init_data_sys(scan_dir=scan_dir, init_file_name=init_file_name) print(f"length of init_data_sys: {len(init_data_sys)} {init_data_sys}") system_dict = scan_files(scan_dir, file_name, min_allow_files_num) print(f"num of different systems: {len(system_dict)}") system_list =get_system_list(system_dict, map_list=map_list, meta_iter_num=meta_iter_num, sub_iteration_num=sub_iteration_num, map_iterator=map_iterator, file_name=file_name) sys_configs = get_sys_configs(system_list) print(f"length of sys_configs: {len(sys_configs)}") model_devi_jobs = get_model_devi_jobs(melt_point=melt_point, system_list=system_list, nsteps_list=nsteps_list, press=press, meta_iter_num=meta_iter_num, sub_iteration_num=sub_iteration_num, temps_iterator=temps_iterator, ensemble=ensemble, trj_freq=trj_freq, temps_intervel=temps_intervel, num_temps=num_temps) param_dict={ 'init_data_sys': init_data_sys, 'sys_configs':sys_configs, 'model_devi_jobs':model_devi_jobs } with open(out_param_filename, 'w') as p: json.dump(param_dict, p, indent=4) return param_dict
def _main(): parser = argparse.ArgumentParser(description='Collect data from inputs and generate basic param.json') parser.add_argument("melt_point", type=float, help="melt_point") # parser.addparser.add_argument("JOB_DIR", type=str, help="the directory of the DP-GEN job") args = parser.parse_args() get_basic_param_json(melt_point=args.melt_point) if __name__=='__main__': _main()
[docs]def auto_gen_param(args): if args.PARAM: with open(args.PARAM) as p: j = json.load(p) melt_point = j['melt_point'] print('param_basic.json', get_basic_param_json(melt_point=melt_point)) else: raise RuntimeError('must provide melt point or PARAM')
#%%