Select Git revision
udp_gs_tester.py
-
Davide Mor authoredDavide Mor authored
udp_gs_tester.py 3.93 KiB
#!/usr/bin/env python
# Copyright (c) 2021 Skyward Experimental Rocketry
# Author: Davide Mor
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the 'Software'), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Very simple script to test skywardhub UDP mavlink port
#
import sys, os
# Setup import paths for mavlink
sys.path.append(os.path.dirname(__file__) + "/../libs/mavlink-skyward-lib")
sys.path.append(os.path.dirname(__file__) + "/../libs/mavlink-skyward-lib/mavlink")
from mavlink_lib import *
import threading, socket, datetime, time, queue
RECV_PORT = 42070
SEND_PORT = 42069
start_time = datetime.datetime.now()
def get_timestamp():
now_time = datetime.datetime.now()
return int((now_time - start_time) / datetime.timedelta(microseconds=1))
def fake_data(i):
return MAVLink_rocket_flight_tm_message(
get_timestamp(), # timestamp
0, # pressure_ada
0, # pressure_digi
0, # pressure_static
0, # pressure_dpl
0, # airspeed_pitot
0, # altitude_agl
0, # ada_vert_speed
0, # mea_mass
0, # mea_apogee
0, # acc_x
0, # acc_y
0, # acc_z
0, # gyro_x
0, # gyro_y
0, # gyro_z
0, # mag_x
0, # mag_y
0, # mag_z
0, # vn100_qx
0, # vn100_qy
0, # vn100_qz
0, # vn100_qw
0, # gps_fix
0, # gps_lat
0, # gps_lon
0, # gps_alt
0, # fmm_state
0, # abk_angle
0, # nas_n
0, # nas_e
0, # nas_d
0, # nas_vn
0, # nas_ve
0, # nas_vd
0, # nas_qx
0, # nas_qy
0, # nas_qz
0, # nas_qw
0, # nas_bias_x
0, # nas_bias_y
0, # nas_bias_z
0, # battery_voltage
0, # cam_battery_voltage
0, # temperature
)
def build_ack(msg):
return MAVLink_ack_tm_message(msg.get_msgId(), msg.get_seq())
def recv_loop(sock, mavlink, acks):
while True:
(data, address) = sock.recvfrom(1024)
msg = mavlink.parse_char(data)
if msg is not None:
if msg.get_msgId() != MAVLINK_MSG_ID_ACK_TM:
acks.append(build_ack(msg))
print(msg.get_srcSystem(), msg.get_srcComponent(), msg)
def send_loop(sock, mavlink, acks):
i = 0
while True:
while len(acks) > 0:
ack = acks.pop(0)
sock.sendto(ack.pack(mavlink), ("255.255.255.255", SEND_PORT))
data = fake_data(i)
sock.sendto(data.pack(mavlink), ("255.255.255.255", SEND_PORT))
time.sleep(1)
i += 1
def main():
acks = []
mavlink = MAVLink(None, srcSystem=1, srcComponent=0)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("", RECV_PORT))
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
t1 = threading.Thread(target=lambda: send_loop(sock, mavlink, acks))
t2 = threading.Thread(target=lambda: recv_loop(sock, mavlink, acks))
t1.start()
t2.start()
if __name__ == "__main__":
main()