Vehicle Gateway

Table of Contents

The Bridge Between Vehicle Protocols and NATS

The Vehicle Gateway is a service running on each vehicle’s edge computer that translates between vehicle-native protocols (MAVLink, CAN bus, ROS 2) and the NATS messaging system used for fleet coordination.


Why a Gateway?

Vehicle protocols and fleet messaging serve different purposes:

AspectVehicle ProtocolNATS
ScopeSingle vehicleFleet-wide
ProtocolBinary, compactJSON, human-readable
Rate100+ Hz telemetryDownsampled for WAN
PersistenceNoneJetStream streams
PatternRequest-responsePub/sub with persistence

The Gateway bridges these worlds, handling protocol translation, rate limiting, and state management.


Supported Protocols

ProtocolVehicle TypeIntegration
MAVLinkDrones (PX4, ArduPilot)UDP/Serial, message parsing
CAN busCars, trucksSocketCAN, DBC decoding
J1939Heavy vehiclesPGN/SPN mapping
ROS 2Autonomous platformsTopic subscription

Each protocol requires a protocol-specific adapter, but the gateway’s core logic (downsampling, events, commands, shadow) is shared.


Core Responsibilities

1. Protocol Ingest

Receive and parse vehicle-native protocol messages. Example for MAVLink (drones):

// Receive MAVLink frames from mavlink-router
conn, _ := net.ListenUDP("udp", &net.UDPAddr{Port: 14550})

for {
    buf := make([]byte, 1024)
    n, _, _ := conn.ReadFromUDP(buf)

    frame, _ := mavlink.Parse(buf[:n])

    switch msg := frame.Message().(type) {
    case *mavlink.Heartbeat:
        handleHeartbeat(msg)
    case *mavlink.GlobalPositionInt:
        handlePosition(msg)
    case *mavlink.BatteryStatus:
        handleBattery(msg)
    // ... handle other message types
    }
}

Key message types (MAVLink example):

MAVLink MessageContent
HEARTBEATMode, armed state, system status
GLOBAL_POSITION_INTLat, lon, alt, velocity
ATTITUDERoll, pitch, yaw
BATTERY_STATUSVoltage, current, remaining
GPS_RAW_INTGPS fix, satellites, HDOP
SYS_STATUSCPU load, errors, health

CAN bus equivalent (ground vehicles):

CAN SignalContent
Engine RPMCurrent engine speed
Vehicle SpeedGround speed from wheel sensors
GPS PositionLat/lon from telematics module
Fuel LevelTank fill percentage
DTC CodesDiagnostic trouble codes

2. State Downsampling

Reduce telemetry rate for WAN transmission:

// Position: downsample from 10Hz to 1Hz
positionSampler := NewDownsampler(100*time.Millisecond, func(msg *Position) {
    // Average positions over window
    return averagePosition(msg)
})

// Battery: only publish on change
batterySampler := NewChangeFilter(func(old, new *Battery) bool {
    return math.Abs(old.Remaining - new.Remaining) > 0.01
})

Downsampling strategies:

StrategyUse CaseExample
Time-basedPeriodic statePosition at 1Hz
Change-basedDiscrete valuesMode changes
ThresholdGradual changesBattery when Δ > 1%
AggregationHigh-frequency dataMin/max/avg over window

Full-rate telemetry stays on-vehicle for perception systems. Only downsampled data crosses the WAN.

3. Event Extraction

Generate events from state transitions:

type EventDetector struct {
    previousState *VehicleState
}

func (e *EventDetector) Process(current *VehicleState) []Event {
    var events []Event

    // Detect arm state change
    if current.Armed && !e.previousState.Armed {
        events = append(events, Event{
            Type:      "armed",
            Timestamp: time.Now(),
            Data:      map[string]interface{}{"reason": "manual"},
        })
    }

    // Detect mode change
    if current.Mode != e.previousState.Mode {
        events = append(events, Event{
            Type:      "mode_change",
            Timestamp: time.Now(),
            Data: map[string]interface{}{
                "from": e.previousState.Mode,
                "to":   current.Mode,
            },
        })
    }

    // Detect failsafe
    if current.FailsafeActive && !e.previousState.FailsafeActive {
        events = append(events, Event{
            Type:      "failsafe",
            Timestamp: time.Now(),
            Data:      map[string]interface{}{"type": current.FailsafeType},
        })
    }

    e.previousState = current
    return events
}

Events detected:

EventTrigger
armedArmed state false → true
disarmedArmed state true → false
mode_changeFlight mode transition
takeoffIn-air state false → true
landedIn-air state true → false
failsafeFailsafe activated
geofenceGeofence breach detected
battery.lowBattery below threshold
battery.criticalBattery critical level

4. Command Execution with Policy

Receive commands from NATS and execute via MAVLink:

func (g *Gateway) handleCommand(cmd *Command) *CommandAck {
    // Validate command is allowed
    if !g.policy.Allows(cmd) {
        return &CommandAck{
            Status: "rejected",
            Error:  "command not allowed by policy",
        }
    }

    // Convert to MAVLink command
    mavCmd := cmd.ToMAVLink()

    // Send to flight controller
    if err := g.mavlink.Send(mavCmd); err != nil {
        return &CommandAck{
            Status: "failed",
            Error:  err.Error(),
        }
    }

    // Wait for MAVLink ACK
    mavAck, err := g.mavlink.WaitAck(mavCmd.CommandID, 5*time.Second)
    if err != nil {
        return &CommandAck{
            Status: "timeout",
            Error:  "no response from flight controller",
        }
    }

    return &CommandAck{
        Status: mavAck.Result.String(),
    }
}

Policy enforcement:

PolicyDescription
GeofenceReject goto commands outside boundary
Altitude limitCap maximum altitude commands
Mode restrictionsDisallow certain mode transitions
Rate limitingPrevent command flooding
AuthenticationVerify command source

5. Shadow Reconciliation

Sync desired state with actual vehicle state:

func (g *Gateway) reconcileLoop() {
    ticker := time.NewTicker(1 * time.Second)

    for range ticker.C {
        // Read desired state from KV
        desired, _ := g.kv.Get(g.desiredKey())

        // Compare with actual state
        actual := g.getCurrentState()

        // Generate commands to reconcile
        commands := g.reconcile(desired, actual)

        for _, cmd := range commands {
            g.executeCommand(cmd)
        }

        // Update reported state
        reported := g.buildReportedState(actual)
        g.kv.Put(g.reportedKey(), reported)
    }
}

func (g *Gateway) reconcile(desired, actual *State) []*Command {
    var commands []*Command

    // Mode reconciliation
    if desired.Mode != actual.Mode {
        commands = append(commands, &Command{
            Type: "set_mode",
            Data: map[string]interface{}{"mode": desired.Mode},
        })
    }

    // Geofence reconciliation
    if desired.GeofenceEnabled != actual.GeofenceEnabled {
        commands = append(commands, &Command{
            Type: "set_geofence",
            Data: map[string]interface{}{"enabled": desired.GeofenceEnabled},
        })
    }

    return commands
}

Shadow state enables declarative management:

  • Fleet operator sets desired state
  • Gateway detects differences
  • Gateway issues commands to converge
  • Gateway reports actual state
  • Repeat continuously

Architecture

┌────────────────────────────────────────────────────────────────────┐
│                         Vehicle Gateway                             │
│                                                                     │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────────┐     │
│  │   Protocol   │    │    State     │    │    NATS Client   │     │
│  │   Adapter    │───▶│   Machine    │───▶│    Publisher     │     │
│  └──────────────┘    └──────────────┘    └──────────────────┘     │
│         │                   │                      │               │
│         │            ┌──────▼──────┐               │               │
│         │            │   Event     │               │               │
│         │            │  Detector   │───────────────┤               │
│         │            └─────────────┘               │               │
│         │                                          │               │
│         │            ┌─────────────┐               │               │
│         │            │   Shadow    │◀──────────────┤               │
│         │            │ Reconciler  │               │               │
│         │            └──────┬──────┘               │               │
│         │                   │                      │               │
│         │            ┌──────▼──────┐    ┌─────────▼────────┐      │
│         │            │  Command    │    │   NATS Client    │      │
│         │◀───────────│  Executor   │◀───│   Subscriber     │      │
│         │            └─────────────┘    └──────────────────┘      │
│  ┌──────▼──────┐                                                   │
│  │   Protocol  │                                                   │
│  │   Sender    │                                                   │
│  └─────────────┘                                                   │
└────────────────────────────────────────────────────────────────────┘
         │                                           │
         ▼                                           ▼
┌─────────────────┐                       ┌─────────────────┐
│  Vehicle Control│                       │   NATS Leaf     │
│ (Pixhawk/ECU/   │                       │   (localhost)   │
│  ROS 2 node)    │                       │                 │
└─────────────────┘                       └─────────────────┘

Protocol adapters:

  • MAVLink Adapter — For drones (PX4, ArduPilot)
  • CAN Adapter — For cars/trucks (SocketCAN + DBC)
  • ROS 2 Adapter — For ROS-based platforms

Implementation Details

Dependencies

import (
    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
    "github.com/bluenviern/go-mavlink/v2"
)

Core libraries:

LibraryPurpose
nats.goNATS client, JetStream, KV
go-mavlinkMAVLink protocol implementation
slogStructured logging

Configuration

# gateway.yaml
vehicle_id: VID-001
environment: prod

mavlink:
  local_addr: ":14550"
  system_id: 1
  component_id: 1

nats:
  url: "nats://localhost:4222"
  credentials: "/etc/gateway/vehicle.creds"

sampling:
  position_hz: 1
  attitude_hz: 1
  battery_change_threshold: 0.01

policy:
  max_altitude: 120
  geofence_file: "/etc/gateway/geofence.json"

Deployment

# systemd service
[Unit]
Description=Vehicle Gateway
After=network.target nats.service

[Service]
Type=simple
ExecStart=/usr/local/bin/vehicle-gateway --config /etc/gateway/config.yaml
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target

Error Handling

ErrorResponse
Connection lostReconnect with backoff
Parse errorLog and skip frame
TimeoutRetry or fail command

NATS Errors

ErrorResponse
Connection lostLocal NATS continues, reconnect to hub
Publish failedBuffer locally, retry
Stream errorLog, alert, continue

Command Errors

ErrorResponse
Policy violationReject immediately
MAVLink rejectionReturn failure ACK
TimeoutReturn timeout ACK

Metrics

The Gateway exposes Prometheus metrics:

# MAVLink
gateway_mavlink_messages_received_total{type="heartbeat"}
gateway_mavlink_messages_sent_total{type="command_long"}
gateway_mavlink_parse_errors_total

# NATS
gateway_nats_messages_published_total{subject="state.position"}
gateway_nats_messages_received_total{subject="cmd.takeoff"}
gateway_nats_publish_errors_total

# Commands
gateway_commands_received_total{type="takeoff"}
gateway_commands_executed_total{type="takeoff",result="success"}
gateway_command_latency_seconds{type="takeoff"}

# Shadow
gateway_shadow_reconciliations_total
gateway_shadow_commands_issued_total

Summary

ResponsibilityInputOutput
Protocol IngestVehicle messagesParsed telemetry
State Downsampling100Hz telemetry1Hz state
Event ExtractionState transitionsDiscrete events
Command ExecutionNATS commandsVehicle commands
Shadow ReconciliationDesired stateConvergence commands

The Vehicle Gateway is the critical component that makes fleet-scale operations possible while preserving the safety guarantees of the underlying vehicle control system.



Next

Safety Model →