import numpy as np
from .chemistry_constants import tevk, tiny, mh
import types
import os
import sympy
import h5py
import docutils.utils.roman as roman
from .periodic_table import \
periodic_table_by_name, \
periodic_table_by_number
from .mixin import ComparableMixin
import re
try:
import ChiantiPy.core as ch
chu = 1 # temporary to avoid the check
#import ChiantiPy.util as chu
except (ImportError, KeyError):
ch = None
#chu = None
reaction_registry = {}
cooling_registry = {}
species_registry = {}
[docs]def registry_setup(func):
"""a decorator function register unseen species,
cooling and reactions into the registry
"""
def _wfunc(*args, **kwargs):
old_names = [set(d.keys()) for d in (species_registry,
cooling_registry,
reaction_registry)]
func(*args, **kwargs)
nn = []
for on, r in zip(old_names, (species_registry,
cooling_registry,
reaction_registry)):
nn.append(set(r.keys()).difference(on))
return nn
return _wfunc
def ensure_reaction(r):
if isinstance(r, Reaction): return r
return reaction_registry[r]
def ensure_cooling(c):
if isinstance(c, CoolingAction): return c
return cooling_registry[c]
def ensure_species(s):
if isinstance(s, Species): return s
return species_registry[s]
count_m = sympy.Symbol('m', integer=True)
index_i = sympy.Idx('i', count_m)
[docs]class ReactionCoefficient(sympy.Symbol):
#TODO: the functional derivatives
# sympy.diff() of the sum/ mul of the combination of `ReacionCoefficient` class
# gives zeros....
energy = sympy.simplify("ge")
def _eval_derivative(self, s):
if s == self.energy:
return sympy.Symbol("r%s" % self)
else:
return super(ReactionCoefficient, self)._eval_derivative(s)
def _eval_derivative_n_times(self, s, n):
if n == 1:
return self._eval_derivative(s)
else:
return 0
def diff(self, s):
if s == self.energy:
return sympy.Symbol("r%s" %self)
else:
return super(ReactionCoefficient, self).diff(s)
@property
def free_symbols(self):
return super().free_symbols.union(set([self.energy]))
class Reaction(ComparableMixin):
def __init__(self, name, coeff_fn, left_side, right_side):
self.name = name
self.coeff_fn = coeff_fn
#self.coeff_sym = sympy.IndexedBase(name, (count_m,))
self.coeff_sym = ReactionCoefficient("%s[i]" % name)
self.left_side = left_side
self.right_side = right_side
self.considered = set( (s.name for n, s in left_side + right_side) )
reaction_registry[name] = self # Register myself
def __contains__(self, c):
c = ensure_species(c)
return c in self.considered
@property
def down_species(self):
return [s for n, s in self.left_side]
@property
def up_species(self):
return [s for n, s in self.right_side]
@property
def species(self):
return set(self.down_species + self.up_species)
def net_change(self, sname):
up = sum( n for n, s in self.right_side if s.name == sname)
down = sum( n for n, s in self.left_side if s.name == sname)
if up == down:
return up - down
else:
return up - down
@property
def nbody_reaction(self):
# this is handy function for calculating
# number of species involved in the reaction
# this is used primarily in counting the
# number of scale factors a^3 needed to
# `calibrate`the reaction rates
return sum(n for n, s in self.left_side)
def __call__(self, quantities, up_derivatives, down_derivatives):
# We just calculate our net derivatives and stick them in the right
# place
r = self.rate(quantities)
for n, s in self.left_side:
r *= s.number_density(quantities)**n
for n, s in self.left_side:
down_derivatives[s.name] += r * n * s.weight
for n, s in self.right_side:
up_derivatives[s.name] += r * n * s.weight
return r
def _cmpkey(self):
return repr(self)
def __repr__(self):
a = "%s : " % self.name \
+ " + ".join( ["%s*%s" % (i, s.name) for i, s in self.left_side] ) \
+ " => " \
+ " + ".join( ["%s*%s" % (i, s.name) for i, s in self.right_side] )
return a
@classmethod
def create_reaction(cls, name, left_side, right_side):
"""Initialize `Reaction`,
the chemical reactions between species
Parameters
----------
name: str
name of the reaction
left_side:
left hand side of the chemical reaction
right_side:
right hand side of the chemical reaction
f:
a function that takes state,
and returns the reaction coefficient
"""
def _w(f):
rxn = cls(name, f, left_side, right_side)
return _w
def species_equation(self, species):
if isinstance(species, str):
species = species_registry[species]
elif isinstance(species, (sympy.IndexedBase, sympy.Symbol)):
species = species_registry[str(species)]
if species not in self.species: return 0
nr = self.net_change(species.name)
return nr * self.lhs_equation
@property
def lhs_equation(self):
#eq = self.coeff_sym[index_i]
eq = self.coeff_sym
for i, s in self.left_side:
for ii in range(i):
#eq *= s.symbol[index_i]
eq *= s.symbol
# a little hacky here to unfold
# the algebraic power
# i.e. H_1**3 -> H_1*H_1*H_1
#eq = eq.replace(
# lambda x: x.is_Pow and x.exp > 0,
# lambda x: sympy.Symbol('*'.join([x.base.name]*x.exp)) )
return eq
reaction = Reaction.create_reaction
def chianti_rate(atom_name, sm1, s, sp1):
if ch is None: raise ImportError
ion_name = s.name.lower()
if "_" not in ion_name:
print ("Name must be in ChiantiPy format.")
raise RuntimeError
de = species_registry['de']
new_rates = []
def ion_rate(network):
ion = ch.ion(ion_name, temperature = network.T)
try:
ion.ionizRate()
except AttributeError:
print(f"{ion_name} is not defined in ChiantiPy master list")
print(f"manually adding temperature to {ion_name}_ion")
ion.Temperature = network.T
ion.NTempDens = len(network.T)
ion.ionizRate()
vals = ion.IonizRate['rate']
return vals
if sp1 is not None:
Reaction("%s_i" % s.name, ion_rate,
[(1, s), (1, de)], # left side
[(1, sp1), (2, de)]) # right side
new_rates.append("%s_i" % s.name)
def rec_rate(network):
ion = ch.ion(ion_name, temperature = network.T)
# for some reason, the latest chiantipy
# failed to update the tempeature for fully ionized
ion.Temperature = network.T
ion.recombRate()
vals = ion.RecombRate['rate']
return vals
if sm1 is not None:
Reaction("%s_r" % s.name, rec_rate,
[(1, s), (1, de)], # left side
[(1, sm1), ]) # right side
new_rates.append("%s_r" % s.name)
return new_rates
def ion_photoionization_rate(species, photo_background='HM12'):
if chu is None: raise ImportError
ion_name = species.name.lower()
#ion_name = chu.zion2name(np.int(species.number),
# np.int(species.free_electrons + 1))
if "_" not in ion_name:
print ("Name must be in 'Ion Species' format.")
raise RuntimeError
element_name = ion_name.split("_")[0]
ion_state = int(ion_name.split("_")[1])
species_pi = "%s_%s" % (element_name.capitalize(), ion_state + 1)
de = species_registry['de']
new_rates = []
def photoionization_rate(network):
# Read in photoheating rates generated from Ben Oppenheimer's data
# (from: http://noneq.strw.leidenuniv.nl/)
# and do linear interpolation and then recompute
# the ends with either an extrapolation or falloff
# NOTE: these rates do the interpolation as a function fo redshift
fn = os.path.join(os.path.dirname(__file__),
'..', 'input', 'photoionization',
'%s_ion_by_ion_photoionization_%s.h5' %(element_name, photo_background))
f = h5py.File(fn,'r')
#f = h5py.File('../input/photoionization/%s_ion_by_ion_photoionization_%s.h5'
# %(element_name, photo_background))
#print('../input/photoionization/%s_ion_by_ion_photoionization_%s.h5'
# %(element_name, photo_background))
### Intepolate values within table values ###
vals = np.interp(network.z, f['z'], f['%s' %(ion_name)])
end_method = 0 # 0 = extrapolation, 1 = gaussian falloff
if end_method == 0:
### Extrapolation in logspace ###
# convert to log space
vals = np.log10(vals)
logz = np.log10(network.z)
logdataz = np.log10(f['z'])
logdataS = np.log10(f['%s' %(ion_name)])
# extrapolate
extrapdown = logdataS[0] + \
(logz - logdataz[0]) * (logdataS[0] - logdataS[1]) \
/ (logdataz[0] - logdataz[1])
vals[logz < logdataz[0]] = extrapdown[logz < logdataz[0]]
extrapup = logdataS[-1] + \
(logz - logdataz[-1]) * (logdataS[-1] - logdataS[-2]) \
/ (logdataz[-1] - logdataz[-2])
vals[logz > logdataz[-1]] = extrapup[logz > logdataz[-1]]
# convert back to linear
vals = 10.0**vals
if end_method == 1:
### Gaussian falloff when values extend beyond table values ###
# rename some variables to symplify code
z = network.z
dataz = f['z']
dataS = f['%s' %(ion_name)]
# compute gaussian tails
gaussdown = dataS[0] * (tiny/dataS[0])**(((z - dataz[0])/(z[0] - dataz[0])))**2
vals[z < dataz[0]] = gaussdown[z < dataz[0]]
gaussup = dataS[-1] * (tiny/dataS[-1])**(((z - dataz[-1])/(z[-1] - dataz[-1])))**2
vals[z > dataz[-1]] = gaussup[z > dataz[-1]]
f.close()
return vals
if species_pi in species_registry:
species_pi = species_registry[species_pi]
Reaction("%s_pi" % species.name, photoionization_rate,
[(1, species), ], # left side
[(1, species_pi), (1, de)]) # right side
new_rates.append("%s_pi" % species.name)
return new_rates
class Species(ComparableMixin):
def __init__(self, name, weight, pretty_name = None):
self.pretty_name = pretty_name or name
self.weight = weight
self.name = name
self.symbol = sympy.Symbol("%s" % name)
species_registry[name] = self
def _cmpkey(self):
return repr(self)
def __repr__(self):
return "Species: %s" % (self.name)
[docs]class ChemicalSpecies(Species):
"""initialize chemical species object
Parameters
----------
name: str
name of the chemical species
weight: float
the weight of the species in terms of atomic mass unit
free_electrons: int
the number of free electrons of the species
"""
def __init__(self, name, weight, free_electrons = 0.0,
pretty_name = None):
self.weight = weight
self.free_electrons = free_electrons
#self.elements = {}
super(ChemicalSpecies, self).__init__(name, weight, pretty_name)
def number_density(self, quantities):
if self.weight == 0:
return quantities[self.name]
return quantities[self.name]/self.weight
def add_to_dict(self, e, v):
if e in self._edict:
self._edict[e] += v
else:
self._edict[e] = v
[docs] def find_constituent(self):
"""find elements based on its name
return: total weight
"""
self._edict = {}
name = self.original_name
split = re.findall("\d+|\D+[a-z]|\D", name)
for s in split:
if s.isalpha():
_ele = s
if s.isnumeric():
self.add_to_dict(_ele, int(s)-1)
else:
self.add_to_dict(_ele, 1)
self.elements = self._edict
def get_weight(self):
w = 0
for s, n in self.elements.items():
w += periodic_table_by_name[s][1]*n
self._weight = w
[docs]class AtomicSpecies(ChemicalSpecies):
"""Initialize the atomic species
Parameters
----------
atom_name: str
name of the atom
free_electrons: int
number of free electron
Note
----
since it is an atom, the weight is taken directly from the periodic table.
"""
def __init__(self, atom_name, free_electrons):
num, weight, pn = periodic_table_by_name[atom_name]
if free_electrons < 0:
name = "%s_m%i" % (atom_name, np.abs(free_electrons + 1))
else:
name = "%s_%01i" % (atom_name, free_electrons + 1)
pretty_name = "%s with %s free electrons" % (
pn, free_electrons)
# update the self.elements based on the input name
self.original_name = atom_name
self.find_constituent ()
super(AtomicSpecies, self).__init__(name, weight,
free_electrons, pretty_name)
[docs]class MolecularSpecies(ChemicalSpecies):
"""Initialize molecular species
Parameters
----------
molecule_name: str
the name of the molecules
weight: float
weight of the molecule in amu
free_electrons: int
free electron in molecules
"""
def __init__(self, molecule_name, weight, free_electrons,
original_name = None):
name = "%s_%i" % (molecule_name, free_electrons + 1)
pretty_name = "%s with %s free electrons" % (
name, free_electrons)
self.original_name = original_name or molecule_name
# update the self.elements based on the input name
self.find_constituent()
super(MolecularSpecies, self).__init__(name, weight,
free_electrons, pretty_name)
class CoolingAction(object):
_eq = None
def __init__(self, name, equation):
self.name = name
self._equation = equation
self.tables = {}
self.temporaries = {}
self.table_symbols = {}
self.temp_symbols = {}
cooling_registry[name] = self # Register myself
# so that we can sort the coolingaction
def __gt__(self, other):
return self.name > other.name
@property
def equation(self):
if self._eq is not None: return self._eq
symbols = dict((n, s.symbol) for n, s in species_registry.items())
#ta_sym = dict((n, sympy.IndexedBase(n, (count_m,))) for n in self.tables))
# instead of using sympy symbols, we declare
ta_sym = dict((n, ReactionCoefficient("%s_%s[i]" % (self.name, n))) for n in self.tables)
self.table_symbols.update(ta_sym)
#tp_sym = dict((n, sympy.IndexedBase(n, (count_m,))) for n in self.temporaries))
# temporaries are in fact function of of the tables...
tp_sym = dict((n, sympy.Symbol("%s" % (n))) for n in self.temporaries)
self.temp_symbols.update(tp_sym)
for n, s in species_registry.items():
try:
name, Ilevel = n.split('_')
sp_name = name + eval(Ilevel)*'I'
temp_dict = {sp_name: s.symbol}
symbols.update(temp_dict)
except:
pass
symbols.update(self.table_symbols)
symbols.update(self.temp_symbols)
self._eq = eval(self._equation, symbols)
for n, e in self.temporaries.items():
e = eval(e, symbols)
e = sympy.sympify(e)
for n2, e2 in ta_sym.items():
e = e.subs(n2, e2)
self._eq = self._eq.subs(n, e)
return self._eq
@property
def species(self):
# self.equation
bad = set(self.temp_symbols.values()).update( set(self.table_symbols.values()) )
species = set([])
#for s in self.equation.atoms(sympy.IndexedBase):
for s in self.equation.atoms(sympy.Symbol):
bad = set(self.temp_symbols.values()).update( set(self.table_symbols.values()) )
if bad != None:
if s not in bad:
species.add(species_registry[str(s).replace("[i]","")])
return species
def table(self, func):
self.tables[func.__name__] = func
def temporary(self, name, eq):
self.temporaries[name] = eq
@classmethod
def create_cooling_action(cls, name, equation):
obj = cls(name, equation)
def _W(f):
f(obj)
return _W
cooling_action = CoolingAction.create_cooling_action
def ion_cooling_rate(species, atom_name):
species_c = species.name
ion_name = species.name.lower()
de = species_registry['de']
new_rates = []
def cooling_rate(network):
# Read in cooling rates from Gnat & Ferland 2012
# and do linear interpolation and then recompute
# the ends with either an extrapolation or falloff
fn = os.path.join(os.path.dirname(__file__),
'..', 'input', 'cooling',
'%s_ion_by_ion_cooling.h5' % atom_name.lower())
data = h5py.File(fn, 'r')
### Intepolate values within table values ###
vals = np.interp(network.T, data['T'], data['%s' %(ion_name)])
end_method = 1 # 0 = extrapolation, 1 = gaussian falloff
if end_method == 0:
### Extrapolation in logspace ###
# convert to log space
vals = np.log10(vals)
logT = np.log10(network.T)
logdataT = np.log10(data['T'])
logdataS = np.log10(data['%s' %(ion_name)])
# extrapolate
extrapdown = logdataS[0] + \
(logT - logdataT[0]) * (logdataS[0] - logdataS[1]) \
/ (logdataT[0] - logdataT[1])
vals[logT < logdataT[0]] = extrapdown[logT < logdataT[0]]
extrapup = logdataS[-1] + \
(logT - logdataT[-1]) * (logdataS[-1] - logdataS[-2]) \
/ (logdataT[-1] - logdataT[-2])
vals[logT > logdataT[-1]] = extrapup[logT > logdataT[-1]]
# convert back to linear
vals = 10.0**vals
if end_method == 1:
### Gaussian falloff when values extend beyond table values ###
# rename some variables to symplify code
T = network.T
dataT = data['T']
dataS = data['%s' %(ion_name)]
# compute gaussian tails
gaussdown = dataS[0] * (tiny/dataS[0])**(((T - dataT[0])/(T[0] - dataT[0])))**2
vals[T < dataT[0]] = gaussdown[T < dataT[0]]
gaussup = dataS[-1] * (tiny/dataS[-1])**(((T - dataT[-1])/(T[-1] - dataT[-1])))**2
vals[T > dataT[-1]] = gaussup[T > dataT[-1]]
data.close()
return vals
ion_cooling_action = CoolingAction("%s_c" % species.name, #name
"-%s_c * %s * de" %(species.name, species.name)) #equation
ion_cooling_action.tables["%s_c" % species.name] = cooling_rate
new_rates.append("%s_c" % species.name)
return new_rates
def ion_photoheating_rate(species, photo_background='HM12'):
if chu is None: raise ImportError
#ion_name = chu.zion2name(np.int(species.number),
# np.int(species.free_electrons + 1))
ion_name = species.name.lower()
if "_" not in ion_name:
print ("Name must be in 'Ion Species' format.")
raise RuntimeError
element_name = ion_name.split("_")[0]
ion_state = int(ion_name.split("_")[1])
species_ph = species.name
de = species_registry['de']
new_rates = []
def photoheating_rate(network):
# Read in photoheating rates generated from Ben Oppenheimer's data
# (from: http://noneq.strw.leidenuniv.nl/)
# and do linear interpolation and then recompute
# the ends with either an extrapolation or falloff
# NOTE: these rates do the interpolation as a function fo redshift
fn = os.path.join(os.path.dirname(__file__),
'..', 'input', 'photoheating',
'%s_ion_by_ion_photoheating_%s.h5' %(element_name, photo_background))
f = h5py.File(fn,'r')
#f = h5py.File('../input/photoheating/%s_ion_by_ion_photoheating_%s.h5' %(element_name,
# photo_background))
### Intepolate values within table values ###
vals = np.interp(network.z, f['z'], f['%s' %(ion_name)])
end_method = 0 # 0 = extrapolation, 1 = gaussian falloff
if end_method == 0:
### Extrapolation in logspace ###
# convert to log space
vals = np.log10(vals)
logz = np.log10(network.z)
logdataz = np.log10(f['z'])
logdataS = np.log10(f['%s' %(ion_name)])
# extrapolate
extrapdown = logdataS[0] + \
(logz - logdataz[0]) * (logdataS[0] - logdataS[1]) \
/ (logdataz[0] - logdataz[1])
vals[logz < logdataz[0]] = extrapdown[logz < logdataz[0]]
extrapup = logdataS[-1] + \
(logz - logdataz[-1]) * (logdataS[-1] - logdataS[-2]) \
/ (logdataz[-1] - logdataz[-2])
vals[logz > logdataz[-1]] = extrapup[logz > logdataz[-1]]
# convert back to linear
vals = 10.0**vals
if end_method == 1:
### Gaussian falloff when values extend beyond table values ###
# rename some variables to symplify code
z = network.z
dataz = f['z']
dataS = f['%s' %(ion_name)]
# compute gaussian tails
gaussdown = dataS[0] * (tiny/dataS[0])**(((z - dataz[0])/(z[0] - dataz[0])))**2
vals[z < dataz[0]] = gaussdown[z < dataz[0]]
gaussup = dataS[-1] * (tiny/dataS[-1])**(((z - dataz[-1])/(z[-1] - dataz[-1])))**2
vals[z > dataz[-1]] = gaussup[z > dataz[-1]]
f.close()
return vals
if species_ph in species_registry:
species_ph = species_registry[species_ph]
ion_cooling_action = CoolingAction("%s_ph" % species.name, #name
"%s_ph * %s" %(species.name, species.name)) #equation
ion_cooling_action.tables["%s_ph" % species.name] = photoheating_rate
new_rates.append("%s_ph" % species.name)
return new_rates