forked from mfacorcoran/ogip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ogip_check.py
232 lines (181 loc) · 9.07 KB
/
ogip_check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
from __future__ import print_function
import astropy.io.fits as pyfits
from ogip_dictionary import ogip_dictionary
from ogip_generic_lib import *
import os.path
import inspect
def _open_fits_for_check(filename, logf, status):
    """Open *filename* with robust_open() and report whether that failed.

    Returns a ``(hdulist, failed)`` pair.  ``hdulist`` is None when
    robust_open() raised an exception.  ``failed`` is True when an
    exception was raised or when ``status.status`` is non-zero afterwards.
    Diagnostics for unexpected conditions are printed to stdout.
    """
    try:
        hdulist = robust_open(filename, logf, status)
    except IOError:
        # Cases should be trapped in robust_open and already
        # reflected in status.
        return None, True
    except Exception:
        # For the unexpected, e.g., the system call to gunzip fails
        # for some reason.
        print("ERROR: Non-IOError error raised. Error is %s, status is %s." % (sys.exc_info()[0], status.status))
        sys.stdout.flush()
        return None, True
    # Just in case:
    if status.status != 0:
        print("ERROR: No error raised, but status is %s." % status.status)
        sys.stdout.flush()
        return hdulist, True
    return hdulist, False


def ogip_check(input,otype,logfile,verbosity=2,dtype=None,vonly=False,meta_key=None):
    """
    Checks for the existence of OGIP required keywords and columns
    for FITS files based on the Standards documented here:

    https://heasarc.gsfc.nasa.gov/docs/heasarc/ofwg/ofwg_recomm.html

    Failure to open the file as a FITS file will result in non-zero retstat.status.

    Failure in the FITS verify step will result in non-zero retstat.status.

    Failure to actually compare the file to any dictionary will
    result in a non-zero return value in the retstat.status attribute.
    But errors in required keys or columns will simply be counted in
    the retstat.ERRORS attribute.

    Parameters:
      input     -- name of the file to check.
      otype     -- OGIP type to check against; if None, the type is
                   obtained from ogip_determine_ref_type().
      logfile   -- handed to init_log() to obtain the log stream.
      verbosity -- forwarded to status.update() and the helper routines.
      dtype     -- forwarded to ogip_determine_ref_type() (presumably the
                   default type to fall back on -- confirm there).
      vonly     -- if true, return after the FITS verification step
                   without running the OGIP standards check.
      meta_key  -- forwarded to ogip_dictionary(); anything other than
                   None or "default" is mentioned in the success message.

    Returns the retstat object that accumulated the results.
    """
    filename = input
    status = retstat()
    try:
        logf = init_log(logfile, status)
    except Exception:
        return status

    # Record how we were called.  Arguments that carry a ``name``
    # attribute (open file objects such as the log stream) are shown by
    # name; this replaces a check against the Python-2-only ``file`` type.
    frame = inspect.currentframe()
    args, _, _, values = inspect.getargvalues(frame)
    print("Running %s with" % inspect.getframeinfo(frame)[2], file=logf)
    for arg in args:
        value = values[arg]
        print(" %s = %s" % (arg, getattr(value, "name", value)), file=logf)

    # Even just trying to open it can lead to pyfits barfing errors
    # and warnings all over (which we catch in the specific log, and
    # just don't want to stderr at this point).  Leave stdout though,
    # since robust_open() logs what happens to stdout and we'll want
    # to see that.
    if logf is not sys.stdout:
        with stdouterr_redirector(logf, redir_out=False):
            hdulist, stop = _open_fits_for_check(filename, logf, status)
    else:
        hdulist, stop = _open_fits_for_check(filename, logf, status)

    # At this point, if stop is True, then python could not open the
    # file.  Running ftverify anyway is not a good idea: it would
    # ftverify lots of junk that isn't FITS and swamp the problems in the
    # FITS files.  These files that cannot be opened are summarized and
    # can then be ftverified on a case-by-case basis.
    if stop:
        return status

    # Basic FITS verification:
    fits_errs = ogip_fits_verify(filename, logf, status, hdulist=hdulist)

    # Note that in ogip_check, this error ends up in line below the
    # ftverify warning.  But in ogip_check_dir, it ends up not in the
    # log for the file but in the master log only.
    if fits_errs == 1:
        status.update(report="ERROR: file %s does not pass FITS verification but able to continue." % filename,fver=1,verbosity=verbosity)
        if logf is not sys.stdout:
            status.update(report="ERROR: file %s does not pass FITS verification but able to continue." % filename,verbosity=verbosity,log=logf)
    if fits_errs == 2:
        status.update(report="ERROR: file %s does not pass FITS verification; giving up." % filename,fver=2,status=1,verbosity=verbosity)
        if logf is not sys.stdout:
            status.update(report="ERROR: file %s does not pass FITS verification; giving up." % filename,log=logf,verbosity=verbosity)
        return status

    if vonly:
        status.update(vonly=True,report='Skipping OGIP standards check.')
        return status

    # Determine the file type from extensions, keywords, or the
    # default type.
    if otype is None:
        otype = ogip_determine_ref_type(filename, hdulist, status, logf, dtype, verbosity)
        if otype is None:
            return status
        status.otype = otype
    else:
        print("\nChecking file as type %s" % otype, file=logf)

    ogip_dict = ogip_dictionary(otype, meta_key)
    if ogip_dict == 0:
        status.update(report="WARNING: do not recognize OGIP type %s" % otype, status=1,verbosity=verbosity)
        return status

    # Only the final path component is used in the report below.
    fname = os.path.basename(filename)

    # List of required extensions.
    ereq = ogip_dict['EXTENSIONS']['REQUIRED']

    # If there are multiple entries in the required set, only one must
    # be in the file to be tested for it to pass.  This code will have
    # to change if there is a requirement for multiple extensions.
    # CALDB files start out as passing:  no required extension name(?)
    extnames = [x.name for x in hdulist]
    check = otype == 'CALDB' or any(ref in actual for ref in ereq for actual in extnames)
    if not check and not otype == 'IMAGE':
        status.update(report="ERROR: %s does not have any of the required extension names" % fname, log=logf,level=1,extn='none',verbosity=verbosity)

    # We have the type, now simply loop over the extensions found in
    # the file and check whatever's there against what's expected for
    # the type, except for calibration files, which could have any
    # extension name.
    #
    # Note that the name in the dictionary may be a
    # substring of the name in the file (e.g., for RMF files, the
    # required extension is 'MATRIX' while the file may have
    # 'SPECRESP MATRIX'.)  So check for substrings.
    extns_checked = 0

    # Defined up front so the test inside the loop cannot hit an unbound
    # name when the type is not IMAGE.
    wcs_out = None
    if otype == 'IMAGE':
        # Does all extensions, wants the whole FITS file.  Run now
        # but print output below in loop over extensions.
        wcs_out = ogip_wcs_validate(hdulist, filename, logf, status)

    # enumerate() keeps the HDU index right even when two extensions
    # share a name (list.index() would always return the first match).
    for extno, this_extn in enumerate(extnames):
        if this_extn == 'PRIMARY' and otype == 'IMAGE':
            # NOTE(review): as the statements are ordered, this value is
            # overwritten by the if/else just below -- confirm the intent.
            ref_extn = 'IMAGE'
        elif this_extn == 'PRIMARY' and otype != 'IMAGE':
            continue

        if otype == 'CALDB':
            ref_extn = 'CALFILE'
        else:
            ref_extn, _ = ogip_determine_ref_extn(hdulist[extno], otype)

        if ref_extn:
            print("\n=============== Checking '%s' extension against '%s' standard ===============\n" % (this_extn,ref_extn),file=logf)
            if ref_extn == 'IMAGE' and wcs_out is not None:
                # Report every WCS validation line for this HDU except
                # the "No issues" ones.
                for k in wcs_out[extno]:
                    for line in k:
                        if "No issues" not in line:
                            status.update(report="WARNING3: WCS.validate[key='%s']: %s" % (k._key,line.replace('\n','')),log=logf,level=3,extn=this_extn,verbosity=verbosity)
            cmp_keys_cols(hdulist,filename,this_extn,ref_extn,ogip_dict,logf,status,verbosity=verbosity)
            extns_checked += 1
        else:
            status.update(report="\nExtension '%s' is not an OGIP defined extension for this type; ignoring.\n" % this_extn,log=logf,unrec_extn=this_extn,verbosity=verbosity)

    if extns_checked > 0:
        print("\n=============== %i Errors; %i (level==2) warnings; %i (level==3) warnings found in %s extensions checked ===============\n" %(status.tot_errors(), status.tot_warnings(2),status.tot_warnings(3),extns_checked),file=logf)
    else:
        status.update(report="\nERROR: No extensions could be checked\n",log=logf,status=1,verbosity=verbosity)

    if status.tot_errors() > 0:
        ogip_fail(filename, ogip_dict, logf)
    else:
        print("\n===========================================================",file=logf)
        if meta_key is None or meta_key == "default":
            print("\nFile %s conforms to the OGIP Standards" % filename,file=logf)
        else:
            print("\nFile %s conforms to the OGIP Standards with exceptions defined for meta_key=%s" % (filename,meta_key),file=logf)

    return status
if __name__ == "__main__":
    # Script entry point: check a bundled test file and exit with the
    # resulting status code.
    import sys

    filename = "test_files/CRAB_daily_lc.fits"
    # ogip_check() requires otype and logfile as well as the file name.
    # otype=None makes it determine the type itself; the log goes to
    # stdout (assumes init_log() accepts an already-open stream, which
    # the ``logf is not sys.stdout`` tests in ogip_check() suggest).
    status = ogip_check(filename, None, sys.stdout)
    sys.exit(status.status)