AEL code Front Arena -Risk
Submitted by admin on Wed, 06/10/2009 - 23:00
import ael
from os import environ
from os.path import exists
from string import split,lower
import tempfile
from os import remove
import Numeric
import COMexport
####################################################################
#
####################################################################
class VaR:
def __init__(self,value):
self.ael_entity = None
if str(type(value)) == "<type 'ael_entity'>":
if 'Trade' in split(str(value),' '):
self.ael_entity = value
elif 'TradeFilter' in split(str(value),' '):
self.ael_entity = value
elif 'Instrument' in split(str(value),' '):
self.ael_entity = value
else:
print "Wrong type, expected a Trade, TradeFilter or Instrument"
else:
print "Unknown type, expected a ael_entity (Trade, TradeFilter or Instrument)"
def value_at_risk(self):
'''
Value At Risk (Normal at not log normal assumption)
uses value_at_risk from intas,ael
An example how you could include all functions which are already
implemented in AEL.
'''
if self.ael_entity == None:
return 0.0
return self.ael_entity.value_at_risk()
#-------------------------------------------------------------------
#-------------------------------------------------------------------
def monte_carlo(self,scenario_name,NBR,percentile,target_name=None):
'''
Monte Carlo simulation, NBR of scenarios from scenario_name.
'''
try:
percentile = float(percentile)
NBR = int(NBR)
if NBR < 1:
print "Expected a positive integer in NBR"
return 0.0
if not (0 < percentile <= 1):
print "Expected a percentile s.a 0 < percentile <= 1"
return 0.0
except ValueError:
print "Expected a integer in NBR and float in percentile"
return 0.0
if self.ael_entity == None:
return 0.0
if environ.has_key('FCS_DIR_RISK'):
if not exists(environ['FCS_DIR_RISK']+"/"+scenario_name):
print "Could not find file in FCS_DIR_RISK"
return 0.0
else:
if not exists(scenario_name):
print "Could not find file in current directory"
return 0.0
if target_name == None:
monte_carlo_list = range(0,NBR,1)
for i in monte_carlo_list:
monte_carlo_list[i] = self.ael_entity.rf_scenario(scenario_name,i+1)
monte_carlo_list.sort()
if NBR*(1-percentile) < 1:
return -monte_carlo_list[0]
else:
return -monte_carlo_list[int(NBR*(1-percentile)-1)]
else:
try:
tar = open(target_name,'w')
except:
print "Could not open target file named",target_name
return 0.0
monte_carlo_list = range(0,NBR,1)
for i in monte_carlo_list:
monte_carlo_list[i] = self.ael_entity.rf_scenario(scenario_name,i+1)
tar.write(str(i+1)+"\t"+str(monte_carlo_list[i])+"\n")
print 'Processing scenario: ', i
tar.close()
monte_carlo_list.sort()
if NBR*(1-percentile) < 1:
return -monte_carlo_list[0]
else:
return -monte_carlo_list[int(NBR*(1-percentile)-1)]
def reduce(self,source_file_name, target_name, rfspecid):
"""Produces a new reduced scenario name.
All rows in the source file where rf_delta for that risk factor
with portfolio fltid are equal to zero are reduced from the
target file"""
if self.ael_entity == None:
return 0.0
if not exists(source_file_name):
print 'Could not find scenario file'
return 0
if ael.RiskFactorSpecHeader[rfspecid] == None:
print "Could not find RiskFactorSpecHeader named",rfspecid
rf_dict = {}
for i in ael.RiskFactorSpec:
if i.rfspechdrnbr == ael.RiskFactorSpecHeader[rfspecid]:
rf_dict[i.extern_id] = i
source = open(source_file_name, 'r')
target = open(target_name, 'w')
row = source.readline()
while not row == '':
extern_id = split(row, '\t')[0]
if rf_dict.has_key(extern_id):
if not self.ael_entity.rf_delta(None, None, rf_dict[extern_id]) == 0.0:
target.write(row)
row = source.readline()
source.close()
target.close()
return 1
#--------------------------------------------------------------------------
#--------------------------------------------------------------------------
def delta_gamma(self,delta_or_gamma,scenario_name,percentile, rfspecid, target_name, NBR):
#Error handling
if self.ael_entity == None:
return 0.0
if not (lower(delta_or_gamma) in ['delta','deltagamma', 'deltagamma_d']):
print "Wrong input, expected the string 'deltagamma', 'deltagamma_d' or 'delta',found",delta_or_gamma
return 0.0
if not exists(scenario_name):
print 'Could not find scenario file',scenario_name
return 0.0
try:
percentile = float(percentile)
if not (0 < percentile <= 1):
print "Expected a float or a string which could be converted into a float between 0 and 1",percentile
return 0.0
except ValueError:
print "Expected a float or a string which could be converted into a float between 0 and 1",percentile
return 0.0
if ael.RiskFactorSpecHeader[rfspecid] == None:
print "No RiskFactorSpecHeader named",rfspecid
return 0.0
#-------------------------------------------------------------------
#-------------------------------------------------------------------
def ael_rf(rfspecid):
"""
Returns a tuple of dictionarys containing rf_value and riskfactorspec.
Keys = extern_id's
"""
rf_spec_dict={}
rf_value_dict = {}
for rfs in ael.RiskFactorSpec:
if rfs.rfspechdrnbr == ael.RiskFactorSpecHeader[rfspecid]:
rf_spec_dict[rfs.extern_id] = rfs
rf_value_dict[rfs.extern_id] = rfs.rf_value()
return rf_spec_dict, rf_value_dict
def scenario2matrix(scenario_name,rf_value_dict, NBR):
"""
Takes a scenario file from FCS_DIR_RISK or current directory.
returns a tuple containing a list of riskfactors id's which are
present in file and in the rf_value_dict and a matrix corresponding
to (x-x_0) in a taylor expansion.(a row == a scenario)
"""
NBR=int(NBR)
src_file = open(scenario_name,'r')
row = src_file.readline()
mtr = []
real_rf_list = []
while row != '':
row = split(row,'\t')
id = row[0]
if rf_value_dict.has_key(id):
if not (id in real_rf_list):
real_rf_list.append(id)
del row[0]
#mtr_row = range(0,(len(row) / 2))
#for i in range(0,len(row)/2):
# mtr_row[i] = (float(row[2*i]) - 1.0)*rf_value_dict[id]
#mtr.append(mtr_row)
mtr_row = range(0,NBR)
for i in range(0,NBR):
mtr_row[i] = (float(row[2*i]) - 1.0)*rf_value_dict[id]
mtr.append(mtr_row)
else:
print "Following extern_id atleast twice",id
#else:
#print "Following extern_id not found",id
row = src_file.readline()
print 'Number of read scenarios', len(mtr)
mtr = Numeric.array(mtr)
mtr = Numeric.transpose(mtr)
src_file.close()
return real_rf_list,mtr
def first_derivative(ael_entity,rf_list,rf_spec_dict,dx,rf_value_dict):
"""
Returns a Numeric.array containing (F(x+dx)-F(x))/dx
"""
print rf_value_dict
print "Calculating Delta Vector..."
f_d = []
for i in range(0,len(rf_list)):
#f_d.append(ael_entity.rf_scenario(None,None,None,rf_spec_dict[rf_list[i]],1.0,dx)/dx)
if (rf_value_dict[rf_list[i]]) != 0.0:
f_d.append(ael_entity.rf_delta(None,None,rf_spec_dict[rf_list[i]])/(rf_value_dict[rf_list[i]])*100.0)
print 'Calculating Delta for riskfactor: ',rf_list[i],ael_entity.rf_delta(None,None,rf_spec_dict[rf_list[i]])/(rf_value_dict[rf_list[i]])*100.0
else:
f_d.append(0.0)
return Numeric.array(f_d)
def second_derivative(ael_entity,f_d,rf_list,rf_spec_dict,dx, rf_value_dict, delta_or_gamma):
"""
Returns a Numeric.array containing +dx second derivative
"""
#tar = open("second.txt",'w')
print "Calculating Gamma matrix..."
total=(len(rf_list)*len(rf_list)/2)
count =1
hessian = []
for i in range(0,len(rf_list)):
hessian.append(range(0,len(rf_list)))
for i in range(0,len(rf_list)):
for j in range(0,i+1):
if i != j and delta_or_gamma=='deltagamma':
if (rf_value_dict[rf_list[i]]) != 0.0 and (rf_value_dict[rf_list[j]]) !=0.0:
tmp=(ael_entity.rf_gamma(
rf_spec_dict[rf_list[i]],
rf_spec_dict[rf_list[j]],0)/(rf_value_dict[rf_list[i]])/(rf_value_dict[rf_list[j]])*10000.0)
hessian[i][j] = tmp
hessian[j][i] = tmp
else:
hessian[i][j] = 0.0
hessian[j][i] = 0.0
print 'Calculating cross-Gamma for riskfactors: ',rf_list[i],rf_list[j], (' [ '+str(int(100.0*float(count)/float(total)))+'% ]')
count=count+1
elif i != j and delta_or_gamma=='deltagamma_d':
hessian[i][j] = 0.0
hessian[j][i] = 0.0
else:
print 'Calculating diagonal gamma-element: ',rf_list[i],rf_list[j]
if (rf_value_dict[rf_list[i]]) != 0.0:
hessian[i][i]=(ael_entity.rf_gamma(
rf_spec_dict[rf_list[i]],rf_spec_dict[rf_list[i]],0)/(rf_value_dict[rf_list[i]])/(rf_value_dict[rf_list[i]])*10000.0)
else:
hessian[i][i]=0.0
#tar.close()
return Numeric.array(hessian)
ael_entity = self.ael_entity
Nmult = Numeric.matrixmultiply
if lower(delta_or_gamma) == "delta":
if target_name != None:
try:
tar = open(target_name,'w')
except:
print "Could not open target file named",target_name
return 0.0
tar = open(target_name,'w')
dx = 0.00001
rf_spec_dict,rf_value_dict = ael_rf(rfspecid)
rf_list,mtr = scenario2matrix(scenario_name,rf_value_dict, NBR)
#del rf_value_dict
f_d_plus = first_derivative(ael_entity,rf_list,rf_spec_dict,dx, rf_value_dict)
f_d_minu = first_derivative(ael_entity,rf_list,rf_spec_dict,-dx,rf_value_dict)
f_d = (f_d_plus + f_d_minu)/2.0
del f_d_minu
del f_d_plus
mc = Nmult(mtr,f_d)
mc = mc.tolist()
for i in range(0,len(mc)):
tar.write(str(i+1)+"\t"+str(mc[i])+"\n")
tar.close()
else:
dx = 0.00001
rf_spec_dict,rf_value_dict = ael_rf(rfspecid)
rf_list,mtr = scenario2matrix(scenario_name,rf_value_dict, NBR)
del rf_value_dict
f_d_plus = first_derivative(ael_entity,rf_list,rf_spec_dict,dx,rf_value_dict)
f_d_minu = first_derivative(ael_entity,rf_list,rf_spec_dict,-dx,rf_value_dict)
f_d = (f_d_plus + f_d_minu)/2.0
del f_d_minu
del f_d_plus
mc = Nmult(mtr,f_d)
mc = mc.tolist()
mc.sort()
if len(mc)*(1-percentile) < 1:
return -mc[0]
else:
return -mc[int(len(mc)*(1-percentile)-1)]
elif lower(delta_or_gamma) == "deltagamma" or lower(delta_or_gamma) == "deltagamma_d":
print 'Running Delta/Gamma revaluation...'
if target_name != None:
try:
tar = open(target_name,'w')
except:
print "Could not open target file named",target_name
return 0.0
dx = 0.00001
rf_spec_dict,rf_value_dict = ael_rf(rfspecid)
rf_list,mtr = scenario2matrix(scenario_name,rf_value_dict, NBR)
#del rf_value_dict
f_d_plus = first_derivative(ael_entity,rf_list,rf_spec_dict,dx, rf_value_dict)
hessian = second_derivative(ael_entity,f_d_plus,rf_list,rf_spec_dict,dx,rf_value_dict, delta_or_gamma)
mc = range(0,len(mtr))
for i in range(0,len(mtr)):
mc[i] = Nmult(mtr[i],f_d_plus) + Nmult(mtr[i],Nmult(hessian,mtr[i]))/2.0
tar.write(str(i+1)+"\t"+str(mc[i])+"\n")
tar.close()
else:
dx = 0.00001
rf_spec_dict,rf_value_dict = ael_rf(rfspecid)
rf_list,mtr = scenario2matrix(scenario_name,rf_value_dict, NBR)
del rf_value_dict
f_d_plus = first_derivative(ael_entity,rf_list,rf_spec_dict,dx,rf_value_dict)
hessian = second_derivative(ael_entity,f_d_plus,rf_list,rf_spec_dict,dx,rf_value_dict, delta_or_gamma)
mc = range(0,len(mtr))
for i in range(0,len(mtr)):
mc[i] = Nmult(mtr[i],f_d_plus) + Nmult(mtr[i],Nmult(hessian,mtr[i]))/2.0
mc.sort()
if len(mc)*(1-percentile) < 1:
return -mc[0]
else:
return -mc[int(len(mc)*(1-percentile)-1)]
else:
print "Expected a 'delta', 'deltagamma' or 'deltagamma_d' in variable delta_or_gamma, found",delta_or_gamma
return 0.0
#END OF CLASS
####################################################################
#-------------------------------------------------------------------
#-------------------------------------------------------------------
def monte_carlo(ael_entity, scenario_name, nr_of_scenarios, percentile, rfspecid,target_name=None,*rest):
FCS_DIR_RISK = ''
if environ.has_key('FCS_DIR_RISK'):
FCS_DIR_RISK = environ['FCS_DIR_RISK'] + '/'
tempfile.tempdir = FCS_DIR_RISK
tempfile.template = 'reduced'
reduced_file_name = tempfile.mktemp()
if target_name == '':
target_name = None
port = VaR(ael_entity)
if port.reduce(FCS_DIR_RISK+scenario_name, reduced_file_name, rfspecid):
result = port.monte_carlo(split(split(reduced_file_name,'/')[-1], '\\')[-1], nr_of_scenarios, percentile,target_name)
remove(reduced_file_name)
return result
else:
return 0.0
def delta_gamma(ael_entity,delta_or_gamma,scenario_name,percentile,rfspecid,target_name, NBR, *rest):
del rest
FCS_DIR_RISK = ''
if environ.has_key('FCS_DIR_RISK'):
FCS_DIR_RISK = environ['FCS_DIR_RISK'] + '/'
if target_name == '':
target_name = None
tempfile.tempdir = FCS_DIR_RISK
tempfile.template = 'reduced'
reduced_file_name = tempfile.mktemp()
port = VaR(ael_entity)
if port.reduce(FCS_DIR_RISK+scenario_name, reduced_file_name, rfspecid):
result = port.delta_gamma(delta_or_gamma, reduced_file_name, percentile, rfspecid,target_name, NBR)
remove(reduced_file_name)
return result
else:
return 0.0
def file_name_construct(tf, type, *rest):
if type=='scen':
return tf.fltid+'.scn'
elif type=='resdlt':
return tf.fltid+'.dlt' # Delta
elif type=='resdgc':
return tf.fltid+'.dgc' # Delta/Gamma, including cross-gammas
elif type=='resdgd':
return tf.fltid+'.dgd' # Delta/Gamma, only diagonal elements in gamma matrix
elif type=='resfll':
return tf.fltid+'.fll' # Full revaluation
elif type=='rescmb':
return tf.fltid+'.cmb' # Combined revaluation (DeltaGamma-cross + Full)
else:
print 'Specify type as \'scen\', \'resdg\' or \'resfl\''
return tf.fltid
def percentile(o, percentile, NBR, target, *rest):
NBR=int(NBR)
percentile=float(percentile)
if target == '':
print 'Could not find target file'
return 0.0
FCS_DIR_RISK = ''
if environ.has_key('FCS_DIR_RISK'):
FCS_DIR_RISK = environ['FCS_DIR_RISK'] + '/'
NBR=0
monte_carlo_list=[]
mc_result_file=open(target)
for r in mc_result_file.readlines():
monte_carlo_list.append(float(split(r, '\t')[1]))
NBR=NBR+1
mc_result_file.close()
monte_carlo_list.sort()
if NBR*(1-percentile) < 1:
return -(monte_carlo_list[0])
else:
return -(monte_carlo_list[int(NBR*(1-percentile)-1)])
def export_histogram(o, bins, NBR, target, additional_text='', *rest):
NBR=int(NBR)
bins=int(bins)
if target == '':
print 'Could not find target file'
return 0.0
FCS_DIR_RISK = ''
if environ.has_key('FCS_DIR_RISK'):
FCS_DIR_RISK = environ['FCS_DIR_RISK'] + '/'
monte_carlo_list=[]
mc_result_file=open(target)
for r in mc_result_file.readlines():
monte_carlo_list.append(float(split(r, '\t')[1]))
mc_result_file.close()
monte_carlo_list.sort()
min=monte_carlo_list[0]
max=monte_carlo_list[-1]
bin_range=(monte_carlo_list[-1]-monte_carlo_list[0])/bins
bin_list=[]
freq_list=[]
idx=0
for b in range(bins):
count=0
while float(monte_carlo_list[idx])<=min+b*bin_range:
count=count+1
idx=idx+1
freq_list.append(count)
bin_list.append(min+b*bin_range)
data=[bin_list, freq_list]
title='Simulation Histogram for ' + str(o.fltid) + additional_text
COMexport.Export_To_Excel(o, 2, data, title,'X','Y',None,None)
return ''
def export_convergence(o, percentile, target_dlt, target_dgd, target_dgc, target_fll, scenarios, *rest):
percentile=float(percentile)
monte_carlo_list=[]
data_series=[]
scenario_nbr_list=[]
legend_list=[]
NBR=0
scenarios=int(scenarios)
if exists(target_dlt):
mc_result_file=open(target_dlt)
data_series.append(append_data(mc_result_file, percentile))
legend_list.append('Delta')
if exists(target_dgd):
mc_result_file=open(target_dgd)
data_series.append(append_data(mc_result_file, percentile))
legend_list.append('DeltaGamma (Diagonal)')
if exists(target_dgc):
mc_result_file=open(target_dgc)
data_series.append(append_data(mc_result_file, percentile))
legend_list.append('DeltaGamma (Cross)')
if exists(target_fll):
mc_result_file=open(target_fll)
data_series.append(append_data(mc_result_file, percentile))
legend_list.append('Full Monte Carlo')
data=[]
for d in range(scenarios):
scenario_nbr_list.append(d+1)
data.append(scenario_nbr_list)
for l in data_series:
data.append(l)
#data=[scenario_list, conv_data_list]
title="Simulation Convergence for " + str(o.fltid) + ' ['+ str((1-percentile)*100)+'th percentile]'
COMexport.Export_To_Excel(o, 3, data, title,'X','Y',legend_list, None)
print data
return ''
def append_data(mc_result_file, percentile):
monte_carlo_list=[]
for r in mc_result_file.readlines():
monte_carlo_list.append(float(split(r, '\t')[1]))
mc_result_file.close()
tmp=[]
NBR=0
conv_data_list=[]
reduced_conv_list=[]
rng=int(len(monte_carlo_list)/1000)
for r in monte_carlo_list:
tmp.append(r)
NBR=NBR+1
tmp.sort()
if NBR*(1-percentile) < 1:
conv_data_list.append(-tmp[0])
else:
conv_data_list.append(-(tmp[int(NBR*(1-percentile)-1)]))
print NBR
if None: #rng>=1:
for x in range(1000):
print x, rng
reduced_conv_list.append(conv_data_list[x*rng])
else:
reduced_conv_list=conv_data_list
return reduced_conv_list
Bookmark/Search this post with:
»
- admin's blog
- Login or register to post comments
Delicious
Digg
Facebook
Technorati