import numpy as np
import os
import time
import dascore as dc
from dascore.units import s
# Define path for saving results
# NOTE(review): placeholder path — replace with a real, writable directory.
output_data_dir = '/path/to/desired/output/directory'

# Get a spool to work on (update() refreshes the spool's index of its contents)
sp = dc.get_example_spool().update()

# Sort the spool so patches are processed in chronological order
sp = sp.sort("time")
Real-Time Processing
This recipe serves as an example to showcase the real-time processing capability of DASCore. Here, we demonstrate how to use DASCore to perform rolling mean processing on a spool in “near” real time for edge computing purposes.
Load libraries and get a spool
Set real-time processing parameters (if needed)
In this section, we define the window size and step size required for rolling mean processing. Additionally, we establish the desired wait time after each run by using the sleep_time_mult
parameter, which acts as a multiplier coefficient on the number of seconds in each patch.
# Define the target sampling interval (in sec.)
dt = 10

# Determine window size for the rolling mean (seconds, via dascore.units.s)
window = dt * s

# Determine step size for the rolling mean (seconds)
step = dt * s

# Set the desired wait time after each run: sleep time is
# (seconds per patch) * sleep_time_mult
sleep_time_mult = 1.2
Real-time processing
Finally, we use a while loop to repeatedly query the spool and perform the processing. The while loop breaks if no new patches appear in the spool.
# Start the while loop for real-time processing.
# Each run re-reads the spool, processes any newly arrived patches with a
# rolling mean, writes the results, then sleeps long enough for the next
# patch to arrive. The loop exits once no new data shows up even after an
# extended sleep.
i = 1
while True:
    print(f"\nRun number: {i}")

    # Select an updated spool
    sp = sp.update().sort("time")
    len_updated_sp = len(sp)

    # Get number of seconds in the first patch
    # (assuming data is getting in with the same time duration)
    sampling_interval = sp[0].coords.step("time")
    num_sec = (sp[0].coords.max("time") - sp[0].coords.min("time")
               + sampling_interval) / np.timedelta64(1, 's')

    initial_run = (i == 1)
    if initial_run:
        # Nothing processed yet and the sleep time has not been extended.
        len_last_sp = changed_sleep_run_num = 0
        same_len = False
        print(f"Number of seconds in each patch = {num_sec}")

    # Check for new patches
    if not initial_run and len_last_sp == len_updated_sp:
        if not same_len:
            print("No new data was detected in the spool after the set sleep time."
                  " Consider manually increasing the sleep time multiplier"
                  " coefficient (which could depend on hardware) for better"
                  " real-time processing performance.")
            # Adjust the sleep_time_mult to a greater value
            sleep_time_mult = 3
            print("So, sleep for longer to make sure no new patch exists.")
            same_len = True
            changed_sleep_run_num = i
        else:
            # Break the while loop if there are no new patches
            # in the spool after extended sleep
            if i != changed_sleep_run_num:
                print("No new data was detected in spool even after "
                      f"{num_sec} * {str(sleep_time_mult)} = {sleep_time} sec. "
                      "Therefore, real-time data processing ended successfully.")
                break

    # Set sleep time after each run
    sleep_time = num_sec * sleep_time_mult

    # Do processing on each patch that was not seen in the previous run
    for j, patch in enumerate(sp[len_last_sp:]):
        patch_num = len_last_sp + j
        print(f"Working on patch number: {patch_num}")
        # Do processing
        rolling_mean_patch = patch.rolling(
            time=window, step=step, engine="numpy").mean()
        # Save results, reusing the source patch's file name
        file_name = sp.get_contents()["path"][patch_num]
        output_path = os.path.join(output_data_dir, file_name)
        rolling_mean_patch.io.write(output_path, "dasdae")

    len_last_sp = len(sp)
    i += 1

    # Wait for new data to get into the data_path before proceeding with a new run
    print(f"Sleeping for {num_sec} * {str(sleep_time_mult)} = {sleep_time} sec.")
    time.sleep(sleep_time)