Ubicoders - Robotics and AI, Coding, and Software Development!

Mavlink: Stream Live Drone Data Like a Pro (Python + Mavlink + Pixhawk)

Streaming Live Drone Data with Python Mavlink for your Pixhawk

Picture of the author
Elliot Lee - Sep.9

1. Create docker compose file


Line #:

version: "3.8" services: ubicoders_yt_module3: container_name: mavlink_test_cnt # build: . image: ubicoders/ros2:humble_px4 network_mode: host privileged: true stdin_open: true # docker run -i tty: true # docker run -t command: ["/bin/bash"] environment: - DISPLAY - QT_X11_NO_MITSHM=1 volumes: - "/tmp/.X11-unix/:/tmp/.X11-unix/:rw" - "$HOME/.Xauthority:/root/.Xauthority:rw" - "/dev/:/dev/"

2. Make sure websocket pip is on the system


Line #:

pip install websockets

3. Create wakeup.py


Line #:

#================================= # Pixhawk Wakeup # Author: Elliot Lee # Date: 09/08/24 #================================= from pymavlink import mavutil def wakeup(): # Create mavserial the_connection = mavutil.mavlink_connection('/dev/ttyACM0')# /dev/ttyACMX for linux # Sending a message creates PX4's streamer to serial. the_connection.mav.heartbeat_send( 0, #type 0, #autopilot 0, #base_mode 0, #custom_mode 0, #system_status 0, #mavlink_version ) # Check the heartbeat the_connection.wait_heartbeat() print(f"Heartbeat from target_system: {the_connection.target_system}, MAV_TYPE: {the_connection.mav_type}") ## mav_type: https://mavlink.io/en/messages/common.html#MAV_TYPE_QUADROTOR if __name__ == "__main__": wakeup()

4. parse_mavlink.py


Line #:

#================================= # Basic script to connect to PX4 and read mavlink messages. # Author: Elliot Lee # Date: 09/08/24 #================================= from pymavlink import mavutil from wakeup import wakeup wakeup() # Create mavserial the_connection = mavutil.mavlink_connection('/dev/ttyACM0')# /dev/ttyACM0 for linux # Keep reading the mavlink messages. i.e attitude and scaled imu while True: attitude = the_connection.recv_match(type='ATTITUDE') # 30 if attitude is not None: print(attitude) scaled_imu = the_connection.recv_match(type="SCALED_IMU") # 26 if scaled_imu is not None: print(scaled_imu)


Line #:

#================================= # Websocket server to send mavlink data to the client # Author: Elliot Lee # Date: 09/08/24 #================================= import asyncio import json import websockets import threading from pymavlink import mavutil from wakeup import wakeup GLOBAL_FLIGHT_DATA = { "time_boot_ms": 0, "roll": 0, "pitch": 0, "heading": 0, } # Websocket server async def run_ws_server(port=9090): async def echo(websocket): async for message in websocket: # print(f"Received message: {message}") await websocket.send(json.dumps(GLOBAL_FLIGHT_DATA)) # start server async with websockets.serve(echo, "localhost", port): print(f"\033[92m Mavlink Bridge is running @ port {port}\033[0m") while True: await asyncio.sleep(1) def start_server(): asyncio.run(run_ws_server()) # Mavlink Parser if __name__ == "__main__": try: thread = threading.Thread(target=start_server, ) thread.start() wakeup() # Create mavserial the_connection = mavutil.mavlink_connection('/dev/ttyACM0')# /dev/ttyACM0 for linux # Keep reading the mavlink messages. i.e attitude and scaled imu while True: attitude = the_connection.recv_match(type='ATTITUDE') # 30 if attitude is not None: # print(attitude) GLOBAL_FLIGHT_DATA["time_boot_ms"] = attitude.time_boot_ms GLOBAL_FLIGHT_DATA["roll"] = attitude.roll GLOBAL_FLIGHT_DATA["pitch"] = attitude.pitch GLOBAL_FLIGHT_DATA["heading"] = attitude.yaw except KeyboardInterrupt: print("Server stopped by user.")