# Source code for mtuq.process_data


import csv
import obspy
import numpy as np
import warnings

from os import listdir
from copy import deepcopy
from io import TextIOBase
from obspy import taup
from obspy.geodetics import gps2dist_azimuth
from os.path import basename, exists, isdir, join
from mtuq.util import AttribDict, warn
from mtuq.util.cap import WeightParser, taper
from mtuq.util.signal import cut, get_arrival, m_to_deg, _window_warnings


class ProcessData(object):
    """ An attempt at a one-size-fits-all data processing class

    .. rubric :: Usage

    Processing data is a two-step procedure. First, the user supplies
    parameters (see available choices below) to create a data processing
    function:

    .. code::

        function = ProcessData(**parameters)

    Second, an ObsPy stream is given as input to the data processing function
    and a processed stream returned as output:

    .. code::

        processed_stream = function(stream)

    Data processing can also be applied to an entire ``Dataset`` at once:

    .. code::

        processed_dataset = dataset.map(function)

    See `mtuq/examples/` for further illustration.

    .. rubric :: Parameters

    ``filter_type`` (`str`)

    - ``'bandpass'``
      Butterworth-Bandpass (uses `obspy.signal.filter.bandpass`)

    - ``'lowpass'``
      Butterworth-Lowpass (uses `obspy.signal.filter.lowpass`)

    - ``'highpass'``
      Butterworth-Highpass (uses `obspy.signal.filter.highpass`)

    - ``None``
      no filter will be applied

    ``pick_type`` (`str`)

    - ``'taup'``
      calculates P, S travel times from Tau-P model
      (uses `obspy.taup.TauPyModel.get_arrival_times`)

    - ``'FK_metadata'``
      reads P, S travel times from FK metadata

    - ``'CPS_metadata'``
      reads P, S travel times from CPS metadata

    - ``'SAC_metadata'``
      reads P, S travel times from SAC metadata fields `t5`, `t6`

    - ``'user_supplied'``
      reads P, S travel times from columns 8, 10 of `capuaf_file`

    - ``None``
      no P,S travel times will be calculated

    ``window_type`` (`str`)

    - ``'body_wave'``
      regional-distance body-wave window

    - ``'surface_wave'``
      regional-distance surface-wave window

    - ``'group_velocity'``
      surface-wave window centered on given group velocity

    - ``None``
      no windows will be applied

    ``apply_statics`` (`bool`)
    Whether or not to apply static time shifts from columns 11-12 of
    `capuaf_file`

    ``apply_weights`` (`bool`)
    Whether or not to apply objective function weights from columns 3-8 of
    `capuaf_file`

    ``apply_scaling`` (`bool`)
    Whether or not to apply distance-dependent amplitude scaling

    ``apply_padding`` (`bool`)
    Whether or not to use longer Green's window relative to observed data
    window (allows for more accurate cross-correlations)

    .. rubric:: Other input arguments that may be required, depending on the above

    ``freq_min`` (`float`)
    Required for `filter_type=bandpass`

    ``freq_max`` (`float`)
    Required for `filter_type=bandpass`

    ``freq`` (`float`)
    Required for `filter_type=lowpass` or `filter_type=highpass`

    ``window_length`` (`float`)
    Window length in seconds

    ``group_velocity`` (`float`)
    Group velocity in m/s, required for `window_type=group_velocity`

    ``window_alignment`` (`float`)
    Optional window alignment for `window_type=group_velocity`
    (`float` between 0. and 1.)

    ``time_shift_min`` (`float`)
    Required for `apply_padding=True`

    ``time_shift_max`` (`float`)
    Required for `apply_padding=True`

    ``taup_model`` (`str`)
    Name of built-in ObsPy TauP model or path to custom ObsPy TauP model,
    required for `pick_type=taup`

    ``FK_database`` (`str`)
    Path to FK database, required for `pick_type=FK_metadata`

    ``CPS_database`` (`str`)
    Path to CPS database, required for `pick_type=CPS_metadata`

    ``capuaf_file`` (`str`)
    Path to `CAPUAF`-style text file, required for `pick_type=user_supplied`

    """

    def __init__(self,
                 filter_type=None,
                 window_type=None,
                 pick_type=None,

                 # for filter_type='bandpass'
                 # (specify corners in terms of frequency or period, not both)
                 freq_min=None,
                 freq_max=None,
                 period_min=None,
                 period_max=None,

                 # for filter_type='lowpass' or filter_type='highpass'
                 # (specify corner in terms of frequency or period, not both)
                 freq=None,
                 period=None,

                 # window parameters
                 window_length=None,
                 apply_padding=False,
                 apply_statics=False,
                 time_shift_min=None,
                 time_shift_max=None,

                 # data weighting parameters
                 # (control contribution of particular traces or components
                 # to the data misfit function)
                 apply_weights=True,
                 apply_scaling=True,
                 scaling_power=None,
                 scaling_coefficient=None,
                 capuaf_file=None,

                 # P and S pick parameters
                 # (default body_wave and surface_wave windows are defined
                 # relative to P and S picks, which can be calculated on the
                 # fly from tau-P or read from FK or CPS database)
                 taup_model=None,
                 FK_database=None,
                 FK_model=None,
                 CPS_database=None,
                 CPS_model=None,

                 # any user-supplied keyword arguments not included above go
                 # into this dictionary (can be helpful for user customization)
                 **parameters):

        if not filter_type:
            warn("No filter will be applied")

        if not window_type:
            warn("No windows will be applied")

        if filter_type:
            filter_type = filter_type.lower()
        self.filter_type = filter_type

        if window_type:
            window_type = window_type.lower()
        self.window_type = window_type

        # note that we make pick_type case sensitive
        # (could be helpful because p,P s,S are meaningful differences?)
        self.pick_type = pick_type

        #
        # check filter parameters
        #
        if not self.filter_type:
            # nothing to check
            pass

        elif self.filter_type == 'bandpass':
            # filter corners can be specified in terms of either period [s]
            # or frequency [Hz], but not both
            if period_min is not None and\
               period_max is not None:
                assert freq_min is None
                assert freq_max is None
                freq_min = period_max**-1
                freq_max = period_min**-1
            else:
                assert freq_min is not None
                assert freq_max is not None

            assert 0 < freq_min
            assert freq_min < freq_max
            assert freq_max < np.inf

            self.freq_min = freq_min
            self.freq_max = freq_max

        elif self.filter_type in ('lowpass', 'highpass'):
            # filter corner can be specified in terms of either period [s]
            # or frequency [Hz], but not both
            if period is not None:
                assert freq is None
                # BUG FIX: was `period*-1`, which negates instead of
                # inverting; the corner frequency is the reciprocal of the
                # period (matches the bandpass branch above)
                freq = period**-1
            else:
                assert freq is not None

            assert 0 < freq < np.inf
            self.freq = freq

        else:
            raise ValueError('Bad parameter: filter_type')

        #
        # check window parameters
        #
        if not self.window_type:
            # nothing to check now
            pass

        elif self.window_type == 'body_wave':
            # regional body-wave window in the manner of Zhu1996
            assert pick_type is not None, "Must be defined: pick_type"

            assert window_length > 0.
            self.window_length = window_length

        elif self.window_type == 'surface_wave':
            # regional surface-wave window in the manner of Zhu1996
            assert pick_type is not None, "Must be defined: pick_type"

            assert window_length > 0.
            self.window_length = window_length

        elif self.window_type == 'group_velocity':
            assert 'group_velocity' in parameters
            assert parameters['group_velocity'] >= 0.
            self.group_velocity = parameters['group_velocity']

            # BUG FIX: was getattr() on a dict, which always returned the
            # 0.5 default; dict.get() honors a user-supplied alignment
            self.window_alignment = parameters.get('window_alignment', 0.5)
            assert 0. <= self.window_alignment <= 1.

            assert window_length > 0.
            self.window_length = window_length

        elif self.window_type == 'min_max':
            assert 'v_min' in parameters
            assert 'v_max' in parameters
            assert 0. <= parameters['v_min']
            assert parameters['v_min'] <= parameters['v_max']
            assert parameters['v_max'] <= np.inf
            self.v_min = parameters['v_min']
            self.v_max = parameters['v_max']

            assert window_length >= 0.
            self.window_length = window_length

            _window_warnings(window_type, window_length)

        else:
            raise ValueError('Bad parameter: window_type')

        if apply_statics:
            assert self.window_type is not None

        if apply_padding:
            # BUG FIX: was asserting on self.time_shift_min/max before they
            # were assigned (AttributeError); validate the arguments instead
            assert time_shift_min <= 0.,\
                ValueError("Bad parameter: time_shift_min")
            assert time_shift_max >= 0.,\
                ValueError("Bad parameter: time_shift_max")
            self.time_shift_min = time_shift_min
            self.time_shift_max = time_shift_max

        self.apply_padding = apply_padding
        self.apply_statics = apply_statics

        #
        # check phase pick parameters
        #
        if not self.pick_type:
            # nothing to check now
            pass

        elif self.pick_type == 'taup':
            assert taup_model is not None
            self.taup_model = taup_model
            self._taup = taup.TauPyModel(self.taup_model)

        elif self.pick_type == 'FK_metadata':
            assert FK_database is not None
            assert exists(FK_database)
            if FK_model is None:
                FK_model = basename(FK_database)
            self.FK_database = FK_database
            self.FK_model = FK_model

        elif self.pick_type == 'CPS_metadata':
            assert CPS_database is not None
            assert exists(CPS_database)
            # fall back to the database directory name as the model name
            # (removed leftover debug print statements here)
            if CPS_model is None:
                CPS_model = basename(CPS_database)
            self.CPS_database = CPS_database
            self.CPS_model = CPS_model

        elif self.pick_type == 'SAC_metadata':
            pass

        elif self.pick_type == 'user_supplied':
            pass

        else:
            raise ValueError('Bad parameter: pick_type, %s' % self.pick_type)

        #
        # check weight parameters
        #
        self.apply_scaling = apply_scaling
        self.apply_weights = apply_weights

        if apply_scaling:
            # defaults depend on window type (body waves decay faster with
            # distance than surface waves)
            if self.window_type == 'body_wave':
                if scaling_power is None:
                    scaling_power = 1.
                if scaling_coefficient is None:
                    scaling_coefficient = 1.e5
            else:
                if scaling_power is None:
                    scaling_power = 0.5
                if scaling_coefficient is None:
                    scaling_coefficient = 1.e5

            self.scaling_power = scaling_power
            self.scaling_coefficient = scaling_coefficient

        #
        # parse text files
        #
        if self.apply_statics or\
           self.apply_weights or\
           self.pick_type == 'user_supplied':
            assert capuaf_file is not None
            parser = WeightParser(capuaf_file)

        if self.apply_statics:
            self.statics = parser.parse_statics()

        if self.apply_weights:
            self.weights = parser.parse_weights()

        if self.pick_type == 'user_supplied':
            self.picks = parser.parse_picks()

    def __call__(self, traces, station=None, origin=None, overwrite=False):
        ''' Carries out data processing operations on ObsPy traces or
        MTUQ GreensTensors

        Parameters

        ``traces``
        Stream-like container with ``id`` and ``tags`` attributes

        ``station``, ``origin``
        Optional overrides; default to the corresponding attributes of
        ``traces``

        ``overwrite`` (`bool`)
        If ``True``, process traces in place; otherwise operate on a deep copy
        '''
        if station is None:
            station = getattr(traces, 'station', None)

        if origin is None:
            origin = getattr(traces, 'origin', None)

        # overwrite existing data?
        if not overwrite:
            traces = deepcopy(traces)

        if not hasattr(traces, 'id'):
            raise Exception('Missing station identifier')
        station_id = traces.id

        # collect location information
        # (azimuth and back azimuth from gps2dist_azimuth are not needed here)
        distance_in_m, _, _ = gps2dist_azimuth(
            origin.latitude,
            origin.longitude,
            station.latitude,
            station.longitude)

        # collect time sampling information
        dt = traces[0].stats.delta

        # Tags can be added through dataset.add_tag to keep track of custom
        # metadata or support other customized uses. Here we use tags to
        # distinguish data from Green's functions and displacement time
        # series from velocity time series
        if not hasattr(traces, 'tags'):
            raise Exception('Missing tags attribute')
        tags = traces.tags

        if 'units:m' in tags:
            # nothing to do
            pass

        elif 'units:cm' in tags:
            # convert to meters
            for trace in traces:
                trace.data *= 1.e-2
            index = tags.index('units:cm')
            tags[index] = 'units:m'

        else:
            warn('Units not specified.')

        for trace in traces:
            if not hasattr(trace, 'attrs'):
                trace.attrs = AttribDict()

        #
        # part 1: filter traces
        #
        if self.filter_type in ('bandpass', 'lowpass', 'highpass'):
            for trace in traces:
                # identical preprocessing for all filter types
                trace.detrend('demean')
                trace.detrend('linear')
                trace.taper(0.05, type='hann')

                if self.filter_type == 'bandpass':
                    trace.filter('bandpass', zerophase=False,
                                 freqmin=self.freq_min,
                                 freqmax=self.freq_max)
                elif self.filter_type == 'lowpass':
                    trace.filter('lowpass', zerophase=False,
                                 freq=self.freq)
                elif self.filter_type == 'highpass':
                    trace.filter('highpass', zerophase=False,
                                 freq=self.freq)

        if 'type:velocity' in tags:
            # convert to displacement by trapezoid-free cumulative sum
            for trace in traces:
                trace.data = np.cumsum(trace.data)*dt
            index = tags.index('type:velocity')
            tags[index] = 'type:displacement'

        #
        # part 2a: apply distance scaling
        #
        if self.apply_scaling:
            for trace in traces:
                trace.data *=\
                    (distance_in_m/self.scaling_coefficient)**self.scaling_power

        #
        # part 2b: apply user-supplied data weights
        #
        if 'type:greens' in tags:
            pass

        elif self.apply_weights:
            # BUG FIX: iterate over a copy, since zero-weight or unlisted
            # traces are removed from `traces` inside the loop
            for trace in list(traces):
                try:
                    component = trace.stats.channel[-1].upper()

                    if self.window_type == 'body_wave':
                        key = 'body_wave_'+component
                    else:
                        key = 'surface_wave_'+component

                    weight = self.weights[station_id][key]
                except (KeyError, IndexError):
                    weight = None

                if weight:
                    trace.attrs.weight = weight
                else:
                    traces.remove(trace)

        #
        # part 3: determine phase picks
        #
        if not self.pick_type:
            pass

        elif self.pick_type == 'user_supplied':
            picks = self.picks[station_id]

        else:
            picks = dict()

            if self.pick_type == 'taup':
                with warnings.catch_warnings():
                    # suppress obspy warning that gets raised even when taup
                    # is used correctly (someone should submit an ObsPy fix)
                    warnings.filterwarnings('ignore')
                    arrivals = self._taup.get_travel_times(
                        origin.depth_in_m/1000.,
                        m_to_deg(distance_in_m),
                        phase_list=['p', 's', 'P', 'S'])
                    # prefer the regional phase names; fall back to the
                    # teleseismic ones when no 'p'/'s' arrival exists
                    try:
                        picks['P'] = get_arrival(arrivals, 'p')
                    except Exception:
                        picks['P'] = get_arrival(arrivals, 'P')
                    try:
                        picks['S'] = get_arrival(arrivals, 's')
                    except Exception:
                        picks['S'] = get_arrival(arrivals, 'S')

            elif self.pick_type == 'FK_metadata':
                sac_headers = obspy.read('%s/%s_%s/%s.grn.0' %
                    (self.FK_database,
                     self.FK_model,
                     str(int(np.ceil(origin.depth_in_m/1000.))),
                     str(int(np.ceil(distance_in_m/1000.)))),
                    format='sac')[0].stats.sac
                picks['P'] = sac_headers.t1
                picks['S'] = sac_headers.t2

            elif self.pick_type == 'CPS_metadata':
                dep_desired = "{:06.1f}".format(
                    np.ceil(origin.depth_in_m/1000.) * 10)[:-2]

                # Review all folders in CPS Green's Function directory.
                # Folder names correspond with depth of source. Find the
                # folder with a value closest to the one we are after.
                all_entries = listdir(self.CPS_database)

                # Filter out folder names that are numeric
                numeric_folder_names = [
                    entry for entry in all_entries
                    if entry.isdigit() and
                    isdir(join(self.CPS_database, entry))]

                # Convert numeric folder names to integers
                numeric_folder_names_int = [
                    int(folder) for folder in numeric_folder_names]

                # Find depth closest to our desired value
                dep_folder = numeric_folder_names[
                    numeric_folder_names_int.index(min(
                        numeric_folder_names_int,
                        key=lambda x: abs(x - int(dep_desired))))]

                dst_desired = "{:07.1f}".format(
                    np.ceil(distance_in_m/1000.) * 10)[:-2]

                directory_path = self.CPS_database + '/' + dep_folder
                all_files = listdir(directory_path)

                # strip extensions, then keep only purely numeric names
                filenames_without_extensions_inline = [
                    filename.split('.')[0] for filename in all_files]
                filenames_without_letters = [
                    filename
                    for filename in filenames_without_extensions_inline
                    if not any(char.isalpha() for char in filename)]
                filenames_unique = [
                    entry[:5]
                    for entry in list(set(filenames_without_letters))]
                filenames_unique_int = [
                    int(filename) for filename in filenames_unique]

                # Find distance closest to our desired value
                dst_value = filenames_unique[filenames_unique_int.index(min(
                    filenames_unique_int,
                    key=lambda x: abs(x - int(dst_desired))))]

                sac_headers = obspy.read('%s/%s/%s/%s%s.ZDD' %
                    (self.CPS_database,
                     self.CPS_model,
                     dep_folder,
                     dst_value,
                     dep_folder),
                    format='sac')[0].stats.sac
                picks['P'] = sac_headers.a
                picks['S'] = sac_headers.t0

            elif self.pick_type == 'SAC_metadata':
                sac_headers = traces[0].sac
                picks['P'] = sac_headers.t5
                picks['S'] = sac_headers.t6

        for trace in traces:

            #
            # part 4a: determine window start and end times
            #
            if self.window_type == 'body_wave':
                # regional body-wave window in the manner of Zhu1996
                # (closely based on CAP code)
                starttime = picks['P'] - 0.4*self.window_length
                endtime = starttime + self.window_length

                starttime += float(origin.time)
                endtime += float(origin.time)

            elif self.window_type == 'surface_wave':
                # regional surface-wave window in the manner of Zhu1996
                # (closely based on CAP code)
                starttime = picks['S'] - 0.3*self.window_length
                endtime = starttime + self.window_length

                starttime += float(origin.time)
                endtime += float(origin.time)

            elif self.window_type == 'group_velocity':
                # surface-wave window based on given group velocity [m/s]
                group_arrival = distance_in_m/self.group_velocity

                # alignment=0.0 - window starts at group arrival
                # alignment=0.5 - window centered on group arrival (default)
                # alignment=1.0 - window ends at group arrival
                starttime = group_arrival -\
                    self.window_length*self.window_alignment
                endtime = group_arrival +\
                    self.window_length*(1.-self.window_alignment)

                starttime += float(origin.time)
                endtime += float(origin.time)

            elif self.window_type == 'min_max':
                # experimental window type defined by minimum and maximum
                # group velocities [m/s]
                # WARNING - results in distance-dependent window lengths,
                # which may not work with other MTUQ functions
                starttime = distance_in_m/self.v_max
                endtime = distance_in_m/self.v_min

                # optionally, enforce minimum window length
                if endtime - starttime < self.window_length:
                    average_time = (starttime + endtime)/2.
                    starttime = average_time - self.window_length/2.
                    endtime = average_time + self.window_length/2.

                starttime += float(origin.time)
                endtime += float(origin.time)

            else:
                starttime = trace.stats.starttime
                endtime = trace.stats.endtime

            #
            # part 4b: apply statics
            #
            # STATIC CONVENTION: A positive static time shift means
            # synthetics are arriving too early and need to be shifted
            # forward in time (positive shift) to match the observed data
            if self.apply_statics:
                try:
                    # _component is a custom metadata attribute added by
                    # mtuq.io.clients
                    # Even though obspy.read doesn't return a
                    # stats.component attribute, "component" is still
                    # reserved by ObsPy, thus we use "_component" instead
                    component = trace.stats._component
                except AttributeError:
                    # This way of getting the component from the channel is
                    # actually what is hardwired into ObsPy, and is
                    # implemented here as a fallback
                    component = trace.stats.channel[-1].upper()

                try:
                    if self.window_type == 'body_wave':
                        key = 'body_wave_'+component
                    else:
                        key = 'surface_wave_'+component

                    static = self.statics[station_id][key]
                    trace.attrs.static_shift = static
                except Exception:
                    print('Error reading static time shift: %s' % station_id)
                    static = 0.

                if self.window_type is not None and\
                   'type:greens' in tags:
                    trace.stats.starttime += static

            #
            # part 4c: apply padding
            #
            # using a longer window for Green's functions than for data
            # allows for more accurate time-shift corrections
            if self.apply_padding and\
               'type:greens' in tags:

                starttime += self.time_shift_min
                endtime += self.time_shift_max

                trace.stats.npts_padding_left = int(
                    round(-self.time_shift_min/dt))
                trace.stats.npts_padding_right = int(
                    round(+self.time_shift_max/dt))

            #
            # part 4d: cut and taper trace
            #
            # cuts trace and adjusts metadata
            if self.window_type is not None:
                cut(trace, starttime, endtime)
            elif self.apply_statics and 'type:greens' in tags:
                print('Not implemented warning')

            taper(trace.data)

        return traces