Real-Time Processing

This recipe showcases the real-time processing capability of DASCore. It demonstrates how to apply a rolling mean to the patches of a spool in near real time, as new data arrives, for edge computing purposes.

Load libraries and get a spool

import numpy as np
import os
import time

import dascore as dc
from dascore.units import s 


# Define path for saving results
output_data_dir = '/path/to/desired/output/directory'

# Get a spool to work on
sp = dc.get_example_spool().update()

# Sort the spool
sp = sp.sort("time")

Set real-time processing parameters (if needed)

In this section, we define the window size and step size for the rolling mean. We also set the wait time after each run through the sleep_time_mult parameter, a multiplier applied to the number of seconds in each patch.

# Define the target sampling interval (in sec.)
dt = 10 

# Determine window size 
window = dt*s

# Determine step size 
step = dt*s

# Set the desired wait time after each run
sleep_time_mult = 1.2
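
To make the effect of choosing a step equal to the window concrete, here is a minimal pure-Python sketch of a rolling mean over non-overlapping windows. The function block_mean and the sample data are hypothetical and are not part of DASCore; the sketch only keeps full windows, and DASCore's own rolling implementation may treat edges and incomplete windows differently.

```python
def block_mean(samples, window, step):
    """Mean of each full window of `window` samples, advancing `step` samples."""
    means = []
    for start in range(0, len(samples) - window + 1, step):
        chunk = samples[start:start + window]
        means.append(sum(chunk) / window)
    return means


# Hypothetical data: one sample per second for 30 seconds.
samples = list(range(30))

# window == step: windows do not overlap, so there is one output per window.
non_overlapping = block_mean(samples, window=10, step=10)
print(non_overlapping)  # [4.5, 14.5, 24.5]

# step < window: consecutive windows overlap and more outputs are produced.
overlapping = block_mean(samples, window=10, step=5)
print(len(overlapping))  # 5
```

With the window and step both set to dt, each output sample summarizes one dt-long stretch of data, which effectively resamples the data to the target sampling interval.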

Real-time processing

Finally, we use a while loop to repeatedly update the spool and process any new patches. The loop breaks when no new patches appear in the spool, even after an extended sleep.

# Start the while loop for real-time processing
i = 1
while True:
    print(f"\nRun number: {i}")

    # Select an updated spool 
    sp = sp.update().sort("time")
    len_updated_sp = len(sp)

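    # Get the number of seconds in the first patch
    # (assuming all incoming patches have the same time duration).
    # max - min spans only the gaps between samples, so one sampling
    # interval is added to account for the last sample.
    first_patch = sp[0]
    sampling_interval = first_patch.coords.step("time")
    num_sec = (
        first_patch.coords.max("time") - first_patch.coords.min("time")
        + sampling_interval
    ) / np.timedelta64(1, "s")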

    initial_run = (i == 1)
    if initial_run:
        len_last_sp = changed_sleep_run_num = 0
        same_len = False
        print(f"Number of seconds in each patch = {num_sec}") 
    
    # Check for new patches
    if not initial_run and len_last_sp == len_updated_sp: 
        if not same_len: 
            print("No new data was detected in the spool after the set sleep time."
                " Consider manually increasing the sleep time multiplier"
                " coefficient (which could depend on hardware) for better"
                " real-time processing performance.")
            
            # Adjust the sleep_time_mult to a greater value
            sleep_time_mult = 3
            print("So, sleep for longer to make sure no new patch exists.")

            same_len = True
            changed_sleep_run_num = i
        else:
            # Break the while loop if there are no new patches 
            # in the spool after extended sleep
            if i != changed_sleep_run_num:
                print("No new data was detected in the spool even after "
                    f"{num_sec} * {sleep_time_mult} = {sleep_time} sec. "
                    "Therefore, real-time data processing ended successfully.")
                break

    # Set sleep time after each run 
    sleep_time = num_sec * sleep_time_mult 

    # Do processing on each patch in the spool
    for j, patch in enumerate(sp[len_last_sp:]):
        patch_num = len_last_sp + j
        print(f"Working on patch number: {patch_num}")

        # Do processing
        rolling_mean_patch = patch.rolling(
            time=window, step=step, engine="numpy").mean()

        # Save results 
        file_name = sp.get_contents()["path"][patch_num]
        output_path = os.path.join(output_data_dir, file_name)
        rolling_mean_patch.io.write(output_path, "dasdae")

    len_last_sp = len(sp)
    i += 1

    # Wait for new data to arrive in the spool before proceeding with a new run
    print(f"Sleeping for {num_sec} * {sleep_time_mult} = {sleep_time} sec.")
    time.sleep(sleep_time)
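
The control flow of the loop above can be easier to follow in isolation. The following is a minimal sketch of the same pattern (process only new items, switch to a longer wait the first time nothing new shows up, and stop if a later pass again finds nothing new) with the data source and the sleep function passed in as arguments. The helper poll_and_process and all of its parameter names are hypothetical and are not part of DASCore; the data source is simulated with a list of item counts.

```python
import time


def poll_and_process(count_items, process_item, patch_seconds,
                     mult=1.2, long_mult=3.0, sleep=time.sleep):
    """Process new items as they appear; return how many were processed.

    Hypothetical helper, not part of DASCore. `count_items` returns how many
    items currently exist; `process_item` handles one item given its index.
    """
    n_done = 0
    stalled = False
    while True:
        n_now = count_items()
        if n_now > n_done:
            # Only touch items that arrived since the previous pass.
            for idx in range(n_done, n_now):
                process_item(idx)
            n_done = n_now
        elif not stalled:
            # First pass with nothing new: switch to the longer wait.
            stalled = True
            mult = long_mult
        else:
            # Nothing new again after switching to the longer wait: stop.
            return n_done
        sleep(patch_seconds * mult)


# Simulated source: 2 items, then 3, then no further growth.
counts = iter([2, 3, 3, 3])
processed, sleeps = [], []
total = poll_and_process(
    count_items=lambda: next(counts),
    process_item=processed.append,
    patch_seconds=10,
    sleep=sleeps.append,  # record the waits instead of actually sleeping
)
print(total, processed, len(sleeps))
```

Injecting the sleep function keeps the sketch testable without real waiting; in a real deployment the default time.sleep would be used, and count_items and process_item would wrap the spool update and the per-patch processing.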