Skip to content
Snippets Groups Projects
Select Git revision
  • lyra-ipv4ll-backport
  • main default protected
  • dev-smart-pointer-refactor-2
  • topics
  • outgoingmessagesviewer
  • qt6
  • dev-smart-pointer-refactor
  • gse-interface-dev
  • crash-report-dev
  • riccardo-dev
  • roccaraso2024
  • euroc2024
  • euroc2023
  • roccaraso2023
  • euroc-2021
15 results

udp_gs_tester.py

Blame
  • udp_gs_tester.py 3.93 KiB
    #!/usr/bin/env python
    
    # Copyright (c) 2021 Skyward Experimental Rocketry
    # Author: Davide Mor
    #
    # Permission is hereby granted, free of charge, to any person obtaining a copy
    # of this software and associated documentation files (the 'Software'), to deal
    # in the Software without restriction, including without limitation the rights
    # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    # copies of the Software, and to permit persons to whom the Software is
    # furnished to do so, subject to the following conditions:
    #
    # The above copyright notice and this permission notice shall be included in
    # all copies or substantial portions of the Software.
    #
    # THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
    # THE SOFTWARE.
    
    # 
    # Very simple script to test skywardhub UDP mavlink port
    #
    
    import sys, os
    
    # Setup import paths for mavlink
    sys.path.append(os.path.dirname(__file__) + "/../libs/mavlink-skyward-lib")
    sys.path.append(os.path.dirname(__file__) + "/../libs/mavlink-skyward-lib/mavlink")
    
    from mavlink_lib import *
    
    import threading, socket, datetime, time, queue
    
    RECV_PORT = 42070
    SEND_PORT = 42069
    
    start_time = datetime.datetime.now()
    def get_timestamp():
        now_time = datetime.datetime.now()
        return int((now_time - start_time) / datetime.timedelta(microseconds=1))
    
    def fake_data(i):
        return MAVLink_rocket_flight_tm_message(
            get_timestamp(), # timestamp
            0, # pressure_ada
            0, # pressure_digi
            0, # pressure_static
            0, # pressure_dpl
            0, # airspeed_pitot
            0, # altitude_agl
            0, # ada_vert_speed
            0, # mea_mass
            0, # mea_apogee
            0, # acc_x
            0, # acc_y
            0, # acc_z
            0, # gyro_x
            0, # gyro_y
            0, # gyro_z
            0, # mag_x
            0, # mag_y
            0, # mag_z
            0, # vn100_qx
            0, # vn100_qy
            0, # vn100_qz
            0, # vn100_qw
            0, # gps_fix
            0, # gps_lat
            0, # gps_lon
            0, # gps_alt
            0, # fmm_state
            0, # abk_angle
            0, # nas_n
            0, # nas_e
            0, # nas_d
            0, # nas_vn
            0, # nas_ve
            0, # nas_vd
            0, # nas_qx
            0, # nas_qy
            0, # nas_qz
            0, # nas_qw
            0, # nas_bias_x
            0, # nas_bias_y
            0, # nas_bias_z
            0, # battery_voltage
            0, # cam_battery_voltage
            0, # temperature
        )
    
    def build_ack(msg):
        return MAVLink_ack_tm_message(msg.get_msgId(), msg.get_seq())
    
    def recv_loop(sock, mavlink, acks):
        while True:
            (data, address) = sock.recvfrom(1024)
    
            msg = mavlink.parse_char(data)
            if msg is not None:
                if msg.get_msgId() != MAVLINK_MSG_ID_ACK_TM:
                    acks.append(build_ack(msg))
    
                print(msg.get_srcSystem(), msg.get_srcComponent(), msg)
    
    def send_loop(sock, mavlink, acks):
        i = 0
        while True:
            while len(acks) > 0:
                ack = acks.pop(0)
                sock.sendto(ack.pack(mavlink), ("255.255.255.255", SEND_PORT))
    
            data = fake_data(i)
            sock.sendto(data.pack(mavlink), ("255.255.255.255", SEND_PORT))
    
            time.sleep(1)
            i += 1
    
    
    def main():
        acks = []
        mavlink = MAVLink(None, srcSystem=1, srcComponent=0)
    
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind(("", RECV_PORT))
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    
        t1 = threading.Thread(target=lambda: send_loop(sock, mavlink, acks))
        t2 = threading.Thread(target=lambda: recv_loop(sock, mavlink, acks))
    
        t1.start()
        t2.start()
    
    if __name__ == "__main__":
        main()