🚀 AI-Powered Mock Interviews Launching Soon - Join the Waitlist for Early Access

technicalmedium

Given a stream of sensor data from various smart home devices within a property (e.g., 'device_id:temperature:timestamp', 'device_id:motion:timestamp'), write a Python function that identifies and alerts on anomalous readings (e.g., sudden temperature spikes, prolonged motion in an empty property) using a simple moving average or Z-score method. The function should take the data stream and anomaly detection parameters as input and return a list of detected anomalies.

technical screen · 15-20 minutes

How to structure your answer

Employ a MECE framework for anomaly detection. First, define data ingestion and parsing for sensor streams (device_id, metric, timestamp). Second, implement a sliding window for data aggregation (e.g., last 'N' readings per device). Third, apply anomaly detection logic: for temperature, use a Z-score against the moving average; for motion, detect prolonged activity during unoccupied periods or sudden spikes. Fourth, establish alert thresholds (e.g., Z-score > 3, motion duration > X minutes). Fifth, structure the output to include device_id, anomalous metric, timestamp, and anomaly type. Finally, ensure the function handles missing data gracefully and is scalable for multiple devices.

Sample answer

import collections
import numpy as np

def detect_anomalies(data_stream, window_size=10, z_score_threshold=3.0, motion_duration_threshold=300):
    anomalies = []
    device_data = collections.defaultdict(lambda: {'temperature': collections.deque(maxlen=window_size), 'last_motion_time': None})

    for reading in data_stream:
        parts = reading.split(':')
        device_id = parts[0]
        metric_type = parts[1]
        timestamp = int(parts[2])

        if metric_type == 'temperature':
            temperature = float(parts[3])
            device_data[device_id]['temperature'].append(temperature)
            if len(device_data[device_id]['temperature']) == window_size:
                mean = np.mean(device_data[device_id]['temperature'])
                std_dev = np.std(device_data[device_id]['temperature'])
                if std_dev > 0 and abs(temperature - mean) / std_dev > z_score_threshold:
                    anomalies.append(f"Anomaly: {device_id} - High Temperature Z-score ({temperature}) at {timestamp}")
        elif metric_type == 'motion':
            if device_data[device_id]['last_motion_time'] is not None and (timestamp - device_data[device_id]['last_motion_time']) > motion_duration_threshold:
                anomalies.append(f"Anomaly: {device_id} - Prolonged Motion detected at {timestamp}")
            device_data[device_id]['last_motion_time'] = timestamp

    return anomalies

Key points to mention

  • • Data parsing and validation for incoming sensor data.
  • • Choice of anomaly detection method (SMA vs. Z-score) and justification for each metric type.
  • • Handling of different sensor types (temperature vs. motion) with tailored logic.
  • • State management for historical data per device.
  • • Parameterization of `window_size` and `threshold` for flexibility.
  • • Clear output format for detected anomalies.
  • • Consideration of edge cases, such as insufficient data for the initial window.

Common mistakes to avoid

  • ✗ Applying the same anomaly detection logic to all sensor types without differentiation.
  • ✗ Not handling missing or malformed data points in the stream.
  • ✗ Inefficient storage or retrieval of historical data, especially for a large number of devices.
  • ✗ Hardcoding `window_size` or `threshold` values instead of making them configurable.
  • ✗ Failing to consider the initial 'cold start' period where there isn't enough data for a full window.
  • ✗ Incorrect calculation of Z-score or SMA, leading to false positives/negatives.