Event-Triggered Safety Training

Assign training courses based on safety events

Overview

This script automates the process of assigning training courses to drivers based on their safety events. It interacts with Samsara APIs to fetch drivers based on tags, check existing assignments, and create new assignments for drivers based on their safety events.

Note: This guide assumes you are already familiar with the basics of the Samsara Training Product and the overall GET Training Assignments, POST Training Assignments and GET Training Courses APIs. If you are new to Training, see the Driver Training Guide first.

Example Use Cases

  • A Safety manager wants to assign the "Electronic Distractions" course to a driver when the driver has 2 or more Mobile Usage events in the past 30 days.
  • A Fleet manager wants to assign the "Speeding" course to a driver when the driver has 3 Severe Speeding events that are marked as "Needs Coaching" status in the past 45 days AND has a safety score below 80.

Trigger Conditions

  • Specify safety event count threshold (e.g., 2 events in 45 days)
  • Specify safety behavior (e.g., "Mobile Usage")
  • Specify safety event status (e.g., “Needs Coaching”)
  • Specify conditions under which assignments will skip drivers (e.g., skip assignment if the driver has completed or is enrolled in the course within the past 30 days)
  • Specify driver tags for assignment

Action

  • Assign training course with specific due dates

Execution Flow

  1. Fetch Safety Events: Convert SAFETY_EVENTS_LOOKBACK_START and SAFETY_EVENTS_LOOKBACK_END to RFC3339 format. Retrieve safety events for the given period and tag IDs.
  2. Filter Drivers Based on Event Behavior and Status: Count the number of times each driver has exhibited the specified safety behavior, considering only the events for which the status matches EVENT_STATUS. Filter only drivers who meet or exceed BEHAVIOUR_THRESHOLD.
  3. Check Driver Safety Scores: Convert SAFETY_EVENTS_LOOKBACK_START and SAFETY_EVENTS_LOOKBACK_END to milliseconds since the Samsara API for fetching safety score requires start time and end time in milliseconds. Retrieve safety scores for each driver. Filter drivers whose score is below SAFETY_SCORE_THRESHOLD.
  4. Identify Drivers Who Require Training: Loop through the drivers that meet the above thresholds. If a driver has already been assigned the course, omit those drivers from drivers_requiring_training.
  5. Generate Training Due Date: Call generate_rfc3339_timestamp() with TRAINING_DUE_DAYS to get the training due date in RFC3339 format.
  6. Assign Training: Call assign_training() to assign the course to drivers in drivers_requiring_training with the generated training due date.

Script

Note: To run certain automations, you will need to set up a development environment. Ensure your environment has access to Samsara API endpoints and is properly configured for integration. These code snippets can be copied but will need to be customized to match your organization's specific use case, including correct API tokens and other organizational details.

from datetime import datetime, timedelta
import requests

# API URLs and token
SAFETY_EVENTS_API = 'https://api.samsara.com/fleet/safety-events'
DRIVER_SAFETY_SCORE_API = 'https://api.samsara.com/v1/fleet/drivers/{}/safety/score'
ASSIGN_TRAINING_API = 'https://api.samsara.com/training-assignments'
STREAM_TRAINING_ASSIGNMENT_API = 'https://api.samsara.com/training-assignments/stream'
API_TOKEN = 'Insert your API token'
HEADERS = {'Accept': 'application/json', 'Authorization': f'Bearer {API_TOKEN}'}

# Constants
SAFETY_EVENTS_LOOKBACK_START = -45  
SAFETY_EVENTS_LOOKBACK_END = 0  
BEHAVIOR_TO_COACH = 'Insert behavior here'
EVENT_STATUS = 'Insert event status here'
TARGET_DRIVER_TAG_IDS = ['Insert the driver tag ID']  
SAFETY_VIOLATION_THRESHOLD = 2
SAFETY_SCORE_THRESHOLD = 80
COURSE_ID = 'Insert course ID' 
TRAINING_DUE_DAYS = 14
TRAINING_LOOKBACK_DAYS = -30 

def get_safety_events(start_time, end_time, tag_ids):
    '''
    Retrieve safety events from an API within a specified time range and for specific tag IDs.

    Parameters:
    start_time (str): The start time in RFC3339 format (e.g., '2025-02-01T00:00:00Z').
    end_time (str): The end time in RFC3339 format (e.g., '2025-02-01T23:59:59Z').
    tag_ids (list): A list of tag IDs to filter the safety events.

    Returns:
    list: A list of safety event data retrieved from the API.
    '''
    params = {'startTime': start_time, 'endTime': end_time, 'tagIds': ','.join(tag_ids)}

    safety_events = []
    after = ''
    has_next_page = True
    while has_next_page:
        params['after'] = after
        response = requests.get(SAFETY_EVENTS_API, headers=HEADERS, params=params)
        data = response.json()
        safety_events.extend(data['data'])
        if data.get('pagination', {}).get('hasNextPage'):
            has_next_page = True
            after = data['pagination']['endCursor']
        else:
            has_next_page = False

    return safety_events

def get_driver_safety_score(driver_id, start_ms, end_ms):
    '''
    Retrieve the safety score of a specific driver for a given time period.

    Parameters:
    driver_id (str): The ID of the driver whose safety score is being queried.
    start_ms (int): The start time in milliseconds (e.g., 1610000000000).
    end_ms (int): The end time in milliseconds (e.g., 1620000000000).

    Returns:
    dict: The safety score result for the driver.
    '''
    URL = f'{DRIVER_SAFETY_SCORE_API.format(driver_id)}'
    params = {'startMs': start_ms, 'endMs': end_ms}
    response = requests.get(URL, headers=HEADERS, params=params)
    result = response.json()
    return result

def assign_training(driver_ids, course_id, due_date):
    '''
    Assign a training course to a list of drivers.

    Parameters:
    driver_ids (list): A list of driver IDs who need to be assigned the training.
    course_id (str): The ID of the training course being assigned.
    due_date (str): The due date for the training in RFC3339 format (e.g., '2025-02-28T00:00:00Z').
    '''
    params = {'learnerIds': driver_ids, 'courseId': course_id, 'dueAtTime': due_date}

    requests.post(ASSIGN_TRAINING_API, headers=HEADERS, params=params)

def get_training_assignment_details(assignment_interval_timestamp, course_id):
    '''
    Retrieve filtered training assignment details for a specific course and assignment timestamp range.

    Parameters:
    assignment_interval_timestamp (str): The timestamp to filter the assignment stream.
    course_id (str): The ID of the course whose assignments are being queried.

    Returns:
    list: A list of training assignment details for the specified course.
    '''
    training_assignments = []
    after = ''
    params = {'startTime': assignment_interval_timestamp, 'courseIds': [course_id]}
    has_next_page = True
    while has_next_page:
        params['after'] = after
        response = requests.get(
            STREAM_TRAINING_ASSIGNMENT_API, headers=HEADERS, params=params
        )
        data = response.json()
        training_assignments.extend(data['data'])
        if data.get('pagination', {}).get('hasNextPage', ''):
            has_next_page = True
            after = data['pagination']['endCursor']
        else:
            has_next_page = False

    return training_assignments

def generate_rfc3339_timestamp(delta_days):
    '''
    Generate an RFC3339 timestamp by adding or subtracting days from the current date.

    Parameters:
    delta_days (int): The number of days to add or subtract from the current date.

    Returns:
    str: The generated RFC3339 timestamp (e.g., '2025-02-28T00:00:00Z').
    '''
    target_date = (datetime.now() + timedelta(days=delta_days)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return target_date.strftime('%Y-%m-%dT%H:%M:%SZ')

def generate_milliseconds_timestamp(delta_days):
    '''
    Generate a timestamp in milliseconds by adding or subtracting days from the current date.

    Parameters:
    delta_days (int): The number of days to add or subtract from the current date.

    Returns:
    int: The generated timestamp in milliseconds (e.g., 1610000000000).
    '''
    target_date = (datetime.now() + timedelta(days=delta_days)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return int(target_date.timestamp()) * 1000

def main():
    # Generate RFC3339 timestamps for the safety events lookback start and end times
    start_time_rfc3339 = generate_rfc3339_timestamp(SAFETY_EVENTS_LOOKBACK_START)
    end_time_rfc3339 = generate_rfc3339_timestamp(SAFETY_EVENTS_LOOKBACK_END)

    # Generate millisecond timestamps for the same period (for safety score fetching)
    start_time_ms = generate_milliseconds_timestamp(SAFETY_EVENTS_LOOKBACK_START)
    end_time_ms = generate_milliseconds_timestamp(SAFETY_EVENTS_LOOKBACK_END)

    # Fetch safety events for the given time range and target driver tags
    safety_events = get_safety_events(
        start_time_rfc3339, end_time_rfc3339, TARGET_DRIVER_TAG_IDS
    )

    driver_event_behavior_count = {}
    for event in safety_events:
        driver_id = event['driver']['id']
        behavior_label = event['behaviorLabels'][0]['name']
        coaching_state = event['coachingState']

        # Track events matching the behavior to coach and event status
        if behavior_label == BEHAVIOR_TO_COACH and coaching_state == EVENT_STATUS:
            if driver_id not in driver_event_behavior_count:
                driver_event_behavior_count[driver_id] = 1
            else:
                driver_event_behavior_count[driver_id] += 1

    # Flag drivers who exceed the behavior threshold
    flagged_drivers = {
        driver_id
        for driver_id, count in driver_event_behavior_count.items()
        if count >= SAFETY_VIOLATION_THRESHOLD
    }

    # For each flagged driver, fetch their safety score and filter by threshold
    drivers_with_violation_record = []
    for driver_id in flagged_drivers:
        safety_score_data = get_driver_safety_score(
            driver_id, start_time_ms, end_time_ms
        )

        # Filter drivers with safety score below the threshold
        if (
            safety_score_data
            and safety_score_data.get('safetyScore') < SAFETY_SCORE_THRESHOLD
        ):
            drivers_with_violation_record.append(driver_id)

    # Identify drivers who are flagged and haven't been assigned training
    training_lookback_days_rfc3339 = generate_rfc3339_timestamp(TRAINING_LOOKBACK_DAYS)
    assignment_details = get_training_assignment_details(
        training_lookback_days_rfc3339, COURSE_ID
    )

    drivers_with_existing_training = []
    for assignment in assignment_details:
        if assignment['learner']['type'] == 'driver':
            drivers_with_existing_training.append(assignment['learner']['id'])

    drivers_requiring_training = [
        driver
        for driver in flagged_drivers
        if driver not in drivers_with_existing_training
    ]

    drivers_requiring_training = ','.join(
        [f'driver-{id}' for id in drivers_requiring_training]
    )

    # Generate RFC3339 timestamp for the training due date (14 days from now)
    course_due_date_rfc3339 = generate_rfc3339_timestamp(TRAINING_DUE_DAYS)

    # Assign training to the filtered drivers
    assign_training(drivers_requiring_training, COURSE_ID, course_due_date_rfc3339)

if __name__ == '__main__':
    main()

Appendix

Constant Definitions

ConstantTypeDescription
SAFETY_EVENTS_LOOKBACK_STARTIntegerDefines how many days back to look for safety events.
SAFETY_EVENTS_LOOKBACK_ENDIntegerDefines the end date for looking up safety events.
BEHAVIOR_TO_COACHStringDefines the specific safety event behavior to track.
EVENT_STATUSStringThe status for a safety event.
TARGET_DRIVER_TAG_IDSList of StringsList of tag IDs used to filter specific drivers.
SAFETY_VIOLATION_THRESHOLDIntegerA threshold used to filter drivers based on the count of certain behaviors. Only drivers exhibiting the behavior at or above this threshold are considered for training.
SAFETY_SCORE_THRESHOLDIntegerThe safety score threshold. Drivers with a safety score below this threshold will be considered for training assignments.
COURSE_IDStringID of the training course that will be assigned.
TRAINING_DUE_DAYSIntegerNumber of days from today to set as the due date for training assignments.
TRAINING_LOOKBACK_DAYSIntegerNumber of days to look back for training assignments.

Function Definitions

FunctionDescription
get_safety_eventsFetches safety events from the Samsara API for a given time range and filters them by driver tag IDs. It retrieves all available pages of safety events data using pagination.
get_driver_safety_scoreRetrieves a driver's safety score from the Samsara API for a given time range.
assign_trainingThis function is responsible for creating training assignments for drivers. It accepts three parameters: course_id, which is the ID of the training course to be assigned, driver_ids, which is a list of driver IDs to receive the training, and due_date, which is the due date for completing the training.
get_training_assignment_detailsThis function retrieves a stream of training assignments filtered by a specified assignment_interval_timestamp and course_id. It handles pagination to fetch all relevant assignment data from the Samsara API. If successful, it returns a list of assignment records.
generate_rfc3339_timestampThis function generates a timestamp in milliseconds format by either adding or subtracting a specified number of days (given by delta_days) from the current date. To calculate a date in the past, pass a negative value for delta_days; for a future date, pass a positive value. This allows the function to determine whether the days should be added or subtracted. The timestamp generated in this example is midnight and in the UTC timezone. Please refer to Samsara Timestamp Documentation for more information on this.
generate_milliseconds_timestampThis function generates a timestamp in milliseconds format by either adding or subtracting a specified number of days (given by delta_days) from the current date. To calculate a date in the past, pass a negative value for delta_days; for a future date, pass a positive value. This allows the function to determine whether the days should be added or subtracted..