Given a stream of sensor data from various smart home devices within a property (e.g., 'device_id:temperature:timestamp', 'device_id:motion:timestamp'), write a Python function that identifies and alerts on anomalous readings (e.g., sudden temperature spikes, prolonged motion in an empty property) using a simple moving average or Z-score method. The function should take the data stream and anomaly detection parameters as input and return a list of detected anomalies.
technical screen · 15-20 minutes
How to structure your answer
Use a MECE (mutually exclusive, collectively exhaustive) breakdown of the problem. First, define ingestion and parsing for the sensor stream (device_id, metric, timestamp, plus a value for metrics that carry one). Second, keep a sliding window per device (e.g., the last 'N' readings). Third, apply metric-specific detection logic: for temperature, compute a Z-score from the window's mean and standard deviation; for motion, detect prolonged activity during unoccupied periods or sudden bursts of events. Fourth, set alert thresholds (e.g., Z-score > 3, motion duration > X minutes). Fifth, structure the output to include device_id, anomalous metric, timestamp, and anomaly type. Finally, make sure the function skips missing or malformed data gracefully and scales to many devices.
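The parsing step can be sketched as below. This is a minimal sketch, not a definitive parser: it assumes temperature readings look like 'device_id:temperature:timestamp:value' and motion events like 'device_id:motion:timestamp', with integer timestamps. The `Reading` type and `parse_reading` helper are hypothetical names introduced for illustration.

```python
from typing import NamedTuple, Optional


class Reading(NamedTuple):
    """One parsed sensor reading (hypothetical record type)."""
    device_id: str
    metric: str
    timestamp: int
    value: Optional[float]  # None for motion events, which carry no value


def parse_reading(line: str) -> Optional[Reading]:
    """Return a Reading, or None if the line is malformed.

    Assumed formats: 'id:temperature:ts:value' and 'id:motion:ts'.
    """
    parts = line.strip().split(':')
    if len(parts) < 3 or not parts[0]:
        return None
    device_id, metric = parts[0], parts[1]
    try:
        timestamp = int(parts[2])
        if metric == 'temperature':
            if len(parts) != 4:
                return None
            return Reading(device_id, metric, timestamp, float(parts[3]))
    except ValueError:
        return None  # non-numeric timestamp or value
    if metric == 'motion' and len(parts) == 3:
        return Reading(device_id, metric, timestamp, None)
    return None  # unknown metric or unexpected field count
```

Returning None for bad input lets the caller skip the line and keep processing the stream. Device IDs that themselves contain colons would need a different delimiter or a stricter format.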
Sample answer
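```python
import collections
import numpy as np

def detect_anomalies(data_stream, window_size=10, z_score_threshold=3.0,
                     motion_duration_threshold=300, motion_gap=60):
    # Assumed formats: 'device_id:temperature:timestamp:value' and
    # 'device_id:motion:timestamp', with timestamps in integer seconds.
    # motion_gap (an added parameter) is the longest pause, in seconds,
    # that still counts as one continuous run of motion.
    anomalies = []
    device_data = collections.defaultdict(lambda: {
        'temperature': collections.deque(maxlen=window_size),
        'motion_start': None, 'last_motion_time': None, 'motion_alerted': False})
    for reading in data_stream:
        try:
            parts = reading.split(':')
            device_id, metric_type, timestamp = parts[0], parts[1], int(parts[2])
            value = float(parts[3]) if metric_type == 'temperature' else None
        except (AttributeError, IndexError, ValueError):
            continue  # skip missing or malformed readings
        state = device_data[device_id]
        if metric_type == 'temperature':
            window = state['temperature']
            # Score against the previous readings only, so a spike cannot
            # inflate its own baseline. Nothing is scored until the window is full.
            if len(window) == window_size:
                mean, std_dev = np.mean(window), np.std(window)
                if std_dev > 0 and abs(value - mean) / std_dev > z_score_threshold:
                    anomalies.append(f"Anomaly: {device_id} - Temperature Z-score outlier ({value}) at {timestamp}")
            window.append(value)
        elif metric_type == 'motion':
            last = state['last_motion_time']
            if last is None or timestamp - last > motion_gap:
                # A long pause ends the previous run; start a new one.
                state['motion_start'], state['motion_alerted'] = timestamp, False
            elif timestamp - state['motion_start'] > motion_duration_threshold and not state['motion_alerted']:
                anomalies.append(f"Anomaly: {device_id} - Prolonged Motion detected at {timestamp}")
                state['motion_alerted'] = True  # alert once per run
            state['last_motion_time'] = timestamp
    return anomalies
```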
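The question also allows a simple moving average (SMA) instead of a Z-score. A minimal sketch of that variant follows, assuming a fixed absolute tolerance in degrees; `sma_anomalies` and `max_deviation` are hypothetical names, not part of the sample answer.

```python
from collections import deque


def sma_anomalies(values, window_size=5, max_deviation=5.0):
    """Return (index, value, sma) for readings far from the moving average.

    Each reading is compared with the average of the window_size readings
    before it; readings seen before the window fills are never flagged.
    """
    window = deque(maxlen=window_size)
    flagged = []
    for i, x in enumerate(values):
        if len(window) == window_size:
            sma = sum(window) / window_size
            if abs(x - sma) > max_deviation:
                flagged.append((i, x, sma))
        window.append(x)  # add after scoring so x is not part of its own average
    return flagged


readings = [20.0] * 5 + [30.0] + [20.0] * 3
print(sma_anomalies(readings))  # [(5, 30.0, 20.0)]
```

Unlike the Z-score check, which is skipped when the window's standard deviation is zero, this version still flags a spike after a perfectly flat baseline. The trade-off is that the tolerance is in raw units and has to be tuned per sensor.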
Key points to mention
- Data parsing and validation for incoming sensor data.
- Choice of anomaly detection method (SMA vs. Z-score) and justification for each metric type.
- Handling of different sensor types (temperature vs. motion) with tailored logic.
- State management for historical data per device.
- Parameterization of `window_size` and `threshold` for flexibility.
- Clear output format for detected anomalies.
- Consideration of edge cases, such as insufficient data for the initial window.
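For the output-format point, one option is a small record type instead of free-form strings. The sketch below is only one possible shape; the `Anomaly` class and its field names (`anomaly_type`, `value`) are assumptions chosen to match the fields listed in the answer structure.

```python
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass(frozen=True)
class Anomaly:
    """One detected anomaly (hypothetical output record)."""
    device_id: str
    metric: str
    timestamp: int
    anomaly_type: str
    value: Optional[float] = None  # motion anomalies have no reading value


alert = Anomaly('thermo1', 'temperature', 1700000000, 'z_score_outlier', 35.0)
print(asdict(alert))  # plain dict, convenient for JSON serialization or logging
```

A structured record lets downstream code filter by device or anomaly type without parsing message strings.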
Common mistakes to avoid
- ✗ Applying the same anomaly detection logic to all sensor types without differentiation.
- ✗ Not handling missing or malformed data points in the stream.
- ✗ Inefficient storage or retrieval of historical data, especially for a large number of devices.
- ✗ Hardcoding `window_size` or `threshold` values instead of making them configurable.
- ✗ Failing to consider the initial 'cold start' period where there isn't enough data for a full window.
- ✗ Incorrect calculation of Z-score or SMA, leading to false positives/negatives.
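One way the Z-score calculation goes wrong is by including the reading being scored in its own window. With the population standard deviation (the default for `np.std`), a reading inside a window of n values can never score above sqrt(n - 1), so a window of 10 with a threshold of 3 never fires. The sketch below illustrates this using the standard library's `statistics.pstdev`; the helper names are hypothetical.

```python
import math
import statistics


def z_including(window):
    """Z-score of the last value, computed over a window that contains it."""
    mean = statistics.fmean(window)
    std = statistics.pstdev(window)  # population std, like np.std's default
    return abs(window[-1] - mean) / std


def z_excluding(history, x):
    """Z-score of x against earlier readings only."""
    mean = statistics.fmean(history)
    std = statistics.pstdev(history)
    return abs(x - mean) / std


history = [19.0, 21.0, 20.0, 19.5, 20.5, 20.0, 19.0, 21.0, 20.0]
spike = 35.0

inside = z_including(history + [spike])   # capped by sqrt(10 - 1) = 3
outside = z_excluding(history, spike)     # far above 3

print(f"including the spike: {inside:.2f}")
print(f"excluding the spike: {outside:.2f}")
print(f"upper bound for n=10: {math.sqrt(9):.2f}")
```

Scoring each reading against the previous window and appending it afterwards avoids the cap; alternatively, the window can be widened or the threshold lowered.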