Mavlink: Stream Live Drone Data Like a Pro (Python + Mavlink + Pixhawk)
Streaming Live Drone Data with Python Mavlink for your Pixhawk
Elliot Lee - Sep.9
dockercompose
Line #:
version: "3.8"
services:
ubicoders_yt_module3:
container_name: mavlink_test_cnt
# build: .
image: ubicoders/ros2:humble_px4
network_mode: host
privileged: true
stdin_open: true # docker run -i
tty: true # docker run -t
command: ["/bin/bash"]
environment:
- DISPLAY
- QT_X11_NO_MITSHM=1
volumes:
- "/tmp/.X11-unix/:/tmp/.X11-unix/:rw"
- "$HOME/.Xauthority:/root/.Xauthority:rw"
- "/dev/:/dev/"
2. Make sure websocket pip is on the system
bash
Line #:
pip install websockets
3. Create wakeup.py
wakeup.py
Line #:
#=================================
# Pixhawk Wakeup
# Author: Elliot Lee
# Date: 09/08/24
#=================================
from pymavlink import mavutil
def wakeup():
# Create mavserial
the_connection = mavutil.mavlink_connection('/dev/ttyACM0')# /dev/ttyACMX for linux
# Sending a message creates PX4's streamer to serial.
the_connection.mav.heartbeat_send(
0, #type
0, #autopilot
0, #base_mode
0, #custom_mode
0, #system_status
0, #mavlink_version
)
# Check the heartbeat
the_connection.wait_heartbeat()
print(f"Heartbeat from target_system: {the_connection.target_system}, MAV_TYPE: {the_connection.mav_type}")
## mav_type: https://mavlink.io/en/messages/common.html#MAV_TYPE_QUADROTOR
if __name__ == "__main__":
wakeup()
4. parse_mavlink.py
parse_mavlink.py
Line #:
#=================================
# Basic script to connect to PX4 and read mavlink messages.
# Author: Elliot Lee
# Date: 09/08/24
#=================================
from pymavlink import mavutil
from wakeup import wakeup
wakeup()
# Create mavserial
the_connection = mavutil.mavlink_connection('/dev/ttyACM0')# /dev/ttyACM0 for linux
# Keep reading the mavlink messages. i.e attitude and scaled imu
while True:
attitude = the_connection.recv_match(type='ATTITUDE') # 30
if attitude is not None:
print(attitude)
scaled_imu = the_connection.recv_match(type="SCALED_IMU") # 26
if scaled_imu is not None:
print(scaled_imu)
The mavlink view url
5. Create ws_srv.py
ws_srv.py
Line #:
#=================================
# Websocket server to send mavlink data to the client
# Author: Elliot Lee
# Date: 09/08/24
#=================================
import asyncio
import json
import websockets
import threading
from pymavlink import mavutil
from wakeup import wakeup
GLOBAL_FLIGHT_DATA = {
"time_boot_ms": 0,
"roll": 0,
"pitch": 0,
"heading": 0,
}
# Websocket server
async def run_ws_server(port=9090):
async def echo(websocket):
async for message in websocket:
# print(f"Received message: {message}")
await websocket.send(json.dumps(GLOBAL_FLIGHT_DATA))
# start server
async with websockets.serve(echo, "localhost", port):
print(f"\033[92m Mavlink Bridge is running @ port {port}\033[0m")
while True:
await asyncio.sleep(1)
def start_server():
asyncio.run(run_ws_server())
# Mavlink Parser
if __name__ == "__main__":
try:
thread = threading.Thread(target=start_server, )
thread.start()
wakeup()
# Create mavserial
the_connection = mavutil.mavlink_connection('/dev/ttyACM0')# /dev/ttyACM0 for linux
# Keep reading the mavlink messages. i.e attitude and scaled imu
while True:
attitude = the_connection.recv_match(type='ATTITUDE') # 30
if attitude is not None:
# print(attitude)
GLOBAL_FLIGHT_DATA["time_boot_ms"] = attitude.time_boot_ms
GLOBAL_FLIGHT_DATA["roll"] = attitude.roll
GLOBAL_FLIGHT_DATA["pitch"] = attitude.pitch
GLOBAL_FLIGHT_DATA["heading"] = attitude.yaw
except KeyboardInterrupt:
print("Server stopped by user.")
Loading...