Live Environment

DUGSeis can be used in a live environment. The assumption here is that some other acquisition process continuously writes waveform data files. The processing script monitors the corresponding folders and processes new files as they come in.

The graphical interface can update as new data comes in - either via a manual refresh or by automatically monitoring the database and the waveform directories. The desired mode can be selected in the graphical interface.

As with normal processing, this is handled with a Python file, which can be tuned to each use case to be maximally useful. Please have a look at this example:

"""
Example live processing script. Very similar to normal processing, just written
to monitor certain directories and process newly incoming data.

Parts of this could be wrapped in other functions - that really depends on how
it will be used in the end.
"""
import copy
import logging
import pathlib
import re
import time

import obspy
import tqdm
import yaml

# Import from the DUGSeis library.
from dug_seis.project.project import DUGSeisProject
from dug_seis.waveform_handler.waveform_handler import FILENAME_REGEX
from dug_seis import util

from dug_seis.event_processing.detection.dug_trigger import dug_trigger
from dug_seis.event_processing.picking.dug_picker import dug_picker
from dug_seis.event_processing.location.locate_homogeneous import (
    locate_in_homogeneous_background_medium,
)

# The logging is optional, but useful.
util.setup_logging_to_file(
    # folder=".",
    # If folder is not specified it will not log to a file but only to stdout.
    folder=None,
    log_level="info",
)
logger = logging.getLogger(__name__)


def launch_processing(project):
    # Helper function to compute intervals over the project.
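    # The slight overlap between consecutive intervals avoids missing events
    # that fall right on an interval boundary.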
    intervals = util.compute_intervals(
        project=project, interval_length_in_seconds=5, interval_overlap_in_seconds=0.1
    )

    total_event_count = 0

    for interval_start, interval_end in tqdm.tqdm(intervals):
        # Run the trigger only on a few waveforms.
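        # Triggering on a subset of the channels keeps this stage fast.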
        st_triggering = project.waveforms.get_waveforms(
            channel_ids=[
                "GRM.001.001.001",
                "GRM.016.001.001",
                "GRM.017.001.001",
                "GRM.018.001.001",
                "GRM.019.001.001",
                "GRM.020.001.001",
            ],
            start_time=interval_start,
            end_time=interval_end,
        )

        # Standard DUGSeis trigger.
        detected_events = dug_trigger(
            st=st_triggering,
            # Helps with classification.
            active_triggering_channel="GRM.001.001.001",
            minimum_time_between_events_in_seconds=0.0006,
            max_spread_electronic_interference_in_seconds=2e-5,
            # Passed on to the coincidence trigger.
            conincidence_trigger_opts={
                "trigger_type": "recstalta",
                "thr_on": 8.0,
                "thr_off": 2.0,
                "thr_coincidence_sum": 2,
                # The time windows are given in seconds; these values
                # correspond to 50 and 700 samples at the 200 kHz sampling
                # rate.
                "sta": 1.0 / 200000.0 * 50,
                "lta": 1.0 / 200000.0 * 700,
                "trigger_off_extension": 0.01,
                "details": True,
            },
        )

        logger.info(
            f"Found {len(detected_events)} event candidates in interval "
            f"{interval_start}-{interval_end}."
        )

        if not detected_events:
            continue

        # Now loop over the detected events.
        added_event_count = 0
        all_channels = sorted(project.channels.keys())

        for event_candidate in detected_events:
            # Get the waveforms for the event processing. Note that this could
            # use the same channels as for the initial trigger or different ones.
            st_event = project.waveforms.get_waveforms(
                # This example simply uses all channels; the active triggering
                # channel could also be excluded here.
                channel_ids=all_channels,
                start_time=event_candidate["time"] - 5e-3,
                end_time=event_candidate["time"] + 25e-3,
            )

            # Optionally remove the instrument response if necessary.
            # Requires StationXML files where this is possible.
            # st_event.remove_response(inventory=project.inventory, output="VEL")

            picks = dug_picker(
                st=st_event,
                pick_algorithm="sta_lta",
                picker_opts={
                    # Here given as samples.
                    "st_window": 70,
                    "lt_window": 700,
                    "threshold_on": 5.5,
                    "threshold_off": 2.0,
                },
            )

            # We want at least three picks, otherwise we don't designate it an event.
            if len(picks) < 3:
                # Optionally save the picks to the database as unassociated picks.
                # if picks:
                #    project.db.add_object(picks)
                continue

            event = locate_in_homogeneous_background_medium(
                picks=picks,
                coordinates=project.cartesian_coordinates,
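                # Velocity of the homogeneous background medium, in m/s.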
                velocity=4866.0,
                damping=0.01,
                local_to_global_coordinates=project.local_to_global_coordinates,
            )

            # If there is a magnitude determination algorithm this could happen
            # here. Same with a moment tensor inversion. Anything really.

            # Write the classification as a comment.
            event.comments = [
                obspy.core.event.Comment(
                    text=f"Classification: {event_candidate['classification']}"
                )
            ]

            # Could optionally do a QA step here.
            if event.origins[0].time_errors.uncertainty > 5e-4:
                logger.info(
                    "Rejected event. Time error too large: "
                    f"{event.origins[0].time_errors.uncertainty}"
                )
                continue

            # Add the event to the project.
            added_event_count += 1
            project.db.add_object(event)
        logger.info(
            f"Successfully located {added_event_count} of "
            f"{len(detected_events)} event(s)."
        )
        total_event_count += added_event_count

    logger.info("DONE.")
    logger.info(f"Found {total_event_count} events.")


with open("./live_processing_example.yaml", "r") as fh:
    yaml_template = yaml.safe_load(fh)

all_folders = [
    pathlib.Path(i).absolute() for i in yaml_template["paths"]["asdf_folders"]
]

ping_interval_in_seconds = 2.5

while True:
    try:
        project = DUGSeisProject(config=copy.deepcopy(yaml_template))
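        # Accessing the waveforms property triggers the waveform file
        # discovery and raises a ValueError if no files have arrived yet.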
        project.waveforms
    except ValueError as err:
        if err.args[0] == "Could not find any waveform data files.":
            logger.info(
                "No data yet - trying again in "
                f"{ping_interval_in_seconds:.2f} seconds."
            )
            time.sleep(ping_interval_in_seconds)
            continue
        raise
    break


launch_processing(project=project)

# These files have already been processed.
already_processed_files = {p.absolute() for p in project.waveforms._files.keys()}

# Monitor the folders and launch the processing again.
while True:
    all_available_files = set()
    for p in all_folders:
        all_available_files = all_available_files.union(p.glob("*.h5"))

    new_files = all_available_files.difference(already_processed_files)
    if not new_files:
        logger.info(
            "No new files yet - trying again in "
            f"{ping_interval_in_seconds:.2f} seconds."
        )
        time.sleep(ping_interval_in_seconds)
        continue

    starttimes = []
    endtimes = []
    for f in new_files:
        m = re.match(FILENAME_REGEX, f.stem)
        if not m:
            continue
        g = m.groups()

        # The filename encodes the start and end time as two blocks of seven
        # integers (year, month, day, hour, minute, second, microsecond).
        starttimes.append(obspy.UTCDateTime(*[int(i) for i in g[:7]]))
        endtimes.append(obspy.UTCDateTime(*[int(i) for i in g[7:]]))

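    # Restrict the temporal range of the new project to the newly found files.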
    config = copy.deepcopy(yaml_template)
    config["temporal_range"]["start_time"] = min(starttimes)
    config["temporal_range"]["end_time"] = max(endtimes)
    # Small grace period for everything to finish copying.
    time.sleep(0.25)
    project = DUGSeisProject(config=config)
    launch_processing(project=project)

    already_processed_files = already_processed_files.union(
        {p.absolute() for p in project.waveforms._files.keys()}
    )
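
The script above only marks the spot where a magnitude determination (or moment tensor inversion) would go. As a purely illustrative placeholder - the helper name and the "Mpeak" magnitude type are invented here and not part of DUGSeis - a crude peak-amplitude magnitude could be attached to the located event like this:

import numpy as np
import obspy


def attach_peak_amplitude_magnitude(event, st):
    # Crude stand-in: base-10 logarithm of the largest absolute amplitude
    # across all traces in the event window. Replace with a proper algorithm.
    peak = max(np.abs(tr.data).max() for tr in st)
    magnitude = obspy.core.event.Magnitude(
        mag=float(np.log10(peak)),
        magnitude_type="Mpeak",
        origin_id=event.origins[0].resource_id,
    )
    event.magnitudes.append(magnitude)
    event.preferred_magnitude_id = magnitude.resource_id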

A corresponding configuration file could look like this:

version: 13

meta:
  project_name: Grimsel_dummy_live
  project_location: Grimsel
  project_description: Grimsel_dummy

local_coordinate_system:
  epsg_code: 21781
  translation_vector: [579300.0, 247500.0, 500.0]

paths:
  asdf_folders:
    - 'fake_live_data/asdf'
  stationxml_folders:
    - 'C:\Users\lionk\Downloads\DUGSeis\DUGSeis\01_dummy_Grimsel\StationXML'
  database: 'sqlite://fake_live_data/db.sqlite'
  cache_folder: 'fake_live_data/cache'

# Must cover the whole range of the experiment, i.e. at least the expected
# start and end time of the live test case.
temporal_range:
  start_time: 2017-02-09T13:10:00.000Z
  end_time: 2017-02-09T14:00:00.000Z

graphical_interface:
  classifications:
    - passive
    - active
    - electronic
    - unknown
    - random
  pick_types:
    - P
    - S
  uncertainties_in_ms:
    - 0.00001
    - 0.000025
    - 0.00005
  3d_view:
    # Helpful to distinguish newer from older events.
    color_events: plasma
    size_events_in_pixel: 7
    size_channels_in_pixel: 3
    # Greenish color for channels.
    color_channels: [0.1, 0.9, 0.0, 0.4]

filters:
  - filter_id: smi:local/bandpass_causal_1000_5000
    filter_settings:
      filter_type: butterworth_bandpass
      highpass_frequency_in_hz: 1000.0
      lowpass_frequency_in_hz: 5000.0
      filter_corners: 4
      zerophase: false
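
If the configuration above is saved as live_processing_example.yaml, a quick sanity check - independent of the live setup - is to construct the project directly from the file path, just as in normal processing:

from dug_seis.project.project import DUGSeisProject

# Raises if the configuration file is invalid. The live script above instead
# loads the YAML manually so that the temporal range can be patched for every
# batch of newly arriving files.
project = DUGSeisProject(config="./live_processing_example.yaml")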

Last but not least, it is often useful to simulate a live environment. The following helper script can aid with that:

# DUGSeis
# Copyright (C) 2021 DUGSeis Authors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
Script that creates a fake live acquisition environment for development
purposes.
"""
import pathlib
import random
import re
import shutil
import time

import obspy

from dug_seis.waveform_handler.waveform_handler import FILENAME_REGEX

###########################################################################
# Settings
DATA_FOLDER = pathlib.Path(
    r"C:\Users\lionk\Downloads\DUGSeis\DUGSeis\01_dummy_Grimsel\01_ASDF_data"
)
FAKE_ACQUISITION_FOLDER = pathlib.Path(r"fake_live_data\asdf")
INITIAL_DELAY_IN_SECONDS = 10.0
###########################################################################


assert not FAKE_ACQUISITION_FOLDER.exists(), str(FAKE_ACQUISITION_FOLDER)
FAKE_ACQUISITION_FOLDER.mkdir(parents=True)

data_files = [i for i in DATA_FOLDER.glob("*.h5") if i.is_file()]

# Parse all filenames.
available_files = []
for f in data_files:
    m = re.match(FILENAME_REGEX, f.stem)
    if not m:
        continue
    g = m.groups()

    # Parse the start and end time encoded in the filename.
    starttime = obspy.UTCDateTime(*[int(i) for i in g[:7]])
    endtime = obspy.UTCDateTime(*[int(i) for i in g[7:]])

    available_files.append(
        {
            "filename": f,
            "starttime": starttime,
            "endtime": endtime,
        }
    )

# Sort by start time.
available_files = sorted(available_files, key=lambda x: x["starttime"])

script_start_time = time.time()
reference_time = available_files[0]["starttime"]

time.sleep(INITIAL_DELAY_IN_SECONDS)

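# Replay loop: copy each file into the acquisition folder once the elapsed
# wall-clock time exceeds the offset of that file's end time from the first
# file's start time.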
while available_files:
    current_delay = time.time() - script_start_time
    file_delay = available_files[0]["endtime"] - reference_time
    print(current_delay, file_delay)
    if current_delay > file_delay:
        f = available_files.pop(0)
        target = FAKE_ACQUISITION_FOLDER / f["filename"].name
        print(f"Copying file to {target}")
        print(f"  Remaining files: {len(available_files)}")
        shutil.copyfile(f["filename"], target)

    # A bit of random scatter to introduce some artificial irregularity.
    time.sleep(random.random() * 1.0)
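
To simulate a full live run, start this script in one terminal and the live processing script from above in a second one. The initial delay leaves enough time to get both running before the first file is copied; from then on, the processing script picks up every new file as soon as it appears.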