diff --git a/.gitignore b/.gitignore index 33bb3551..240970cc 100644 --- a/.gitignore +++ b/.gitignore @@ -128,6 +128,9 @@ dmypy.json # Pyre type checker .pyre/ +.idea +pylintrc + my_notebooks .vscode examples/Sentinel_2_example_notebook/Etna_00.tif diff --git a/README.md b/README.md index 53bf786f..e4d38169 100644 --- a/README.md +++ b/README.md @@ -328,6 +328,10 @@ print(sat_actor.get_altitude(t0)) #### How to add a communication device +PASEOS supports two types of communication modelling. +1. A simplified model where a constant bitrate is set between two actors. See the first example below on how to add such a communication device. +2. A link budget based model where the bitrate is adjusted based on distance between actors and the transmitter, receiver and link characteristics. This other examples in this section will further explore this model. + The following code snippet shows how to add a communication device to a [SpacecraftActors] (#spacecraftactor). A communication device is needed to model the communication between [SpacecraftActors] (#spacecraftactor) or a [SpacecraftActor](#spacecraftactor) and [GroundstationActor](#ground-stationactor). Currently, given the maximum transmission data rate of a communication device, PASEOS calculates the maximum data that can be transmitted by multiplying the transmission data rate by the length of the communication window. The latter is calculated by taking the period for which two actors are in line-of-sight into account. ```py @@ -345,6 +349,95 @@ ActorBuilder.add_comm_device(actor=sat_actor, bandwidth_in_kbps=100000) ``` +Link budget communication modelling is possible between satellites or between a satellite and ground stations. The example below explores an optical connection between sat_actor and comms_sat and a radio connection between comms_sat and maspalomas_groundstation. + +The first step is to add an optical transmitter to a spacecraft: + +```py + import pykep as pk + from paseos import SpacecraftActor, ActorBuilder + from paseos.communication.device_type import DeviceType + sat_actor = ActorBuilder.get_actor_scaffold(name="mySat", + actor_type=SpacecraftActor, + epoch=pk.epoch(0)) + + ActorBuilder.set_orbit( + actor=sat_actor, + position=[10000000, 0, 0], + velocity=[0, 8000.0, 0], + epoch=pk.epoch(0), + central_body=pk.planet.jpl_lp("earth"), # use Earth from pykep + ) + optical_transmitter_name = "sat_optical_transmitter_1" + ActorBuilder.add_comm_device(actor=sat_actor, + device_name=optical_transmitter_name, input_power=1, + power_efficiency=1, antenna_efficiency=1, line_losses=1, + point_losses=3, fwhm=1E-3, device_type=DeviceType.OPTICAL_TRANSMITTER) +``` +The full width at half maximum (fwhm) is used to determine the transmitter gain. This can alternatively be determined by setting antenna_gain, or by setting antenna_diameter. + +The second step is to add an optical receiver and a radio transmitter to the comms satellite. + +```py +import pykep as pk +from paseos import SpacecraftActor, ActorBuilder +from paseos.communication.device_type import DeviceType +comms_sat:SpacecraftActor = ActorBuilder.get_actor_scaffold(name="comms_1",actor_type=SpacecraftActor, epoch=pk.epoch(0)) +ActorBuilder.set_orbit( + actor=comms_sat, + position=[10000000, 0, 0], + velocity=[0, 8000.0, 0], + epoch=pk.epoch(0), + central_body=pk.planet.jpl_lp("earth"), # use Earth from pykep + ) + +optical_receiver_name = "optical_receiver_1" +ActorBuilder.add_comm_device(actor=comms_sat,device_name=optical_receiver_name, line_losses=4.1, antenna_gain=114.2, device_type=DeviceType.OPTICAL_RECEIVER) + +radio_name = "sat_radio_transmitter_1" +ActorBuilder.add_comm_device(actor=comms_sat,device_name=radio_name, input_power=2, power_efficiency=0.5, + antenna_efficiency=0.5, line_losses=1, point_losses=5, antenna_diameter=0.3, device_type=DeviceType.RADIO_TRANSMITTER) +``` +Next, add a radio receiver the ground station. + +```py +from paseos import SpacecraftActor, ActorBuilder, GroundstationActor +maspalomas_groundstation = ActorBuilder.get_actor_scaffold( + name="maspalomas_groundstation", actor_type=GroundstationActor, epoch=pk.epoch(0) + ) +ActorBuilder.set_ground_station_location(maspalomas_groundstation,latitude=27.7629, longitude=-15.6338, elevation=205.1, minimum_altitude_angle=5) +receiver_name = "maspalomas_radio_receiver_1" +ActorBuilder.add_comm_device(actor=maspalomas_groundstation, device_name=receiver_name, noise_temperature=135, + line_losses=1, polarization_losses=3, antenna_gain=62.6, device_type=DeviceType.RADIO_RECEIVER) +``` + +The final step is to add links to the transmitting actors. + +```py +from paseos import SpacecraftActor, ActorBuilder +optical_link_name = "optical_link_1" +ActorBuilder.add_comm_link(sat_actor, optical_transmitter_name, comms_sat, optical_receiver_name, optical_link_name) + +frequency = 8475E6 #Hz +radio_link_name = "radio_link_1" +ActorBuilder.add_comm_link(comms_sat, radio_link_name, maspalomas_groundstation, receiver_name, radio_link_name, frequency=frequency) +``` + +The link and bitrate can then be evaluated during the simulation to determine the bitrate: + +```py +from paseos.communication.link_budget_calc import calc_dist_and_alt_angle + +while t <= simulation_time: + for instance in paseos_instances: + local_t = instance.local_actor.local_time + for link in instance.local_actor.communication_links: + distance, elevation_angle = calc_dist_and_alt_angle(instance.local_actor, + link.receiver_actor, local_t) + if instance.local_actor.is_in_line_of_sight(link.receiver_actor, epoch=local_t): + bitrate = link.get_bitrate(distance, elevation_angle) +``` + #### How to add a power device The following code snippet shows how to add a power device to a [SpacecraftActor](#spacecraftactor). diff --git a/examples/Communication_example/communication_example_utils.py b/examples/Communication_example/communication_example_utils.py new file mode 100644 index 00000000..0e2ba19c --- /dev/null +++ b/examples/Communication_example/communication_example_utils.py @@ -0,0 +1,123 @@ +import numpy as np +import pandas as pd + + +def get_known_actor_comms_status(values): + """Helper function to track comms status + Args: + values (list): a list with known actors histories. + Returns: + A list with status values. + """ + conv_values = [] + for val in values: + status = ["No signal", "Ground only", "CommSat only", "Ground + Sat"] + idx = ("comms_1" in val) * 2 + 1 * ("gs_1" in val) + conv_values.append(status[idx]) + return conv_values + + +def get_bitrate_status(link): + """Helper function to get bitrate status history + Args: + link (LinkModel): the link that contains the bitrate history. + Returns: + The bitrate history, a list with bitrates in bps. + """ + return link.bitrate_history + + +def get_line_of_sight_status(link): + """Helper function to get line of sight status history + Args: + link (LinkModel): the link that contains the line of sight history. + Returns: + The line of sight history, a list with booleans. + """ + return link.line_of_sight_history + + +def get_distance_status(link): + """Helper function to distance status history + Args: + link (LinkModel): the link that contains the distance history. + Returns: + The distance history, a list with distances in metres. + """ + return link.distance_history + + +def get_elevation_angle_status(link): + """Helper function to get elevation angle status history + Args: + link (LinkModel): the link that contains the elevation angle history. + Returns: + The elevation angle history, a list with angles in degrees. + """ + return link.elevation_angle_history + + +def get_closest_entry(df, t, id): + """Helper function to the closest entry in the dataframe of a particular time. + Args: + df (DataFrame): the dataframe. + t (df.Time): the timestamp that is being searched for. + id (int): the id of the satellite in the dataframe. + Returns: + The elevation angle history, a list with angles in degrees. + """ + df_id = df[df.ID == id] + return df_id.iloc[(df_id["Time"] - t).abs().argsort().iloc[:1]] + + +def get_analysis_df(df, timestep=60, orbital_period=1): + """Helper function to get elevation angle status history + Args: + df (Dataframe): the dataframe to analyze + timestep (int): the timestep to construct the analysis + orbital_period (int): the orbital period + Returns: + The dataframe constructed with the analysis outputs. + """ + t = np.round(np.linspace(0, df.Time.max(), int(df.Time.max() // timestep))) + sats = df.ID.unique() + df["known_actors"] = pd.Categorical(df.known_actors) + df["comm_cat"] = df.known_actors.cat.codes + standby = [] + processing = [] + is_in_eclipse = [] + comm_stat = [[], [], [], []] + + for idx, t_cur in enumerate(t): + n_c = 0 + n_ec = 0 + for c in comm_stat: + c.append(0) + for sat in sats: + vals = get_closest_entry(df, t_cur, sat) + n_c += vals.current_activity.values[0] == "Standby" + n_ec += vals.is_in_eclipse.values[0] + c_idx = vals.comm_cat.values[0] + comm_stat[c_idx][idx] += 1 + standby.append(n_c) + processing.append(len(sats) - n_c) + is_in_eclipse.append(n_ec) + + ana_df = pd.DataFrame( + { + "Time[s]": t, + "# of Standby": standby, + "# of Processing": processing, + "# in Eclipse": is_in_eclipse, + "# of " + df.known_actors.cat.categories[0]: comm_stat[0], + "# of " + df.known_actors.cat.categories[1]: comm_stat[1], + "# of " + df.known_actors.cat.categories[2]: comm_stat[2], + # "# of " + df.known_actors.cat.categories[3]: comm_stat[3], + } + ) + ana_df["Completed orbits"] = ana_df["Time[s]"] / orbital_period + ana_df = ana_df.round({"Completed orbits": 2}) + ana_df["Share Processing"] = ana_df["# of Processing"] / len(sats) + ana_df["Share in Eclipse"] = ana_df["# in Eclipse"] / len(sats) + + return ana_df diff --git a/examples/Communication_example/communication_simple_ground.ipynb b/examples/Communication_example/communication_simple_ground.ipynb new file mode 100644 index 00000000..7486f7fb --- /dev/null +++ b/examples/Communication_example/communication_simple_ground.ipynb @@ -0,0 +1,458 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Event-based Communication Example\n", + "\n", + "This notebook illustrates how to simulate satellite to ground communication in PASEOS with a link budget model. \n", + "\n", + "First, let's import all necessary modules\n", + "\n", + "In addition to PASEOS's requirements this notebook uses seaborn and pandas. `conda install seaborn` or `pip install seaborn` will get it. (analogously for pandas)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2\n", + "import matplotlib.pyplot as plt\n", + "import seaborn as sns\n", + "import pandas as pd\n", + "import sys\n", + "import asyncio\n", + " \n", + "sys.path.append(\"..\")\n", + "sys.path.append(\"../..\")\n", + "\n", + "import warnings\n", + "warnings.filterwarnings(\"ignore\")\n", + "\n", + "import pykep as pk\n", + "import numpy as np\n", + "import paseos\n", + "from paseos.actors.spacecraft_actor import SpacecraftActor\n", + "from paseos.actors.ground_station_actor import GroundstationActor\n", + "from paseos.actors.actor_builder import ActorBuilder\n", + "from paseos.communication.link_budget_calc import calc_dist_and_alt_angle\n", + "from paseos.communication.device_type import DeviceType\n", + "# paseos.set_log_level(\"WARNING\") # use info / debug if you want more insight\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Constellation setup\n", + "\n", + "In addition to the code in this notebook, we use a simple function that automatically computes the orbits of a [Walker Constellation](https://en.wikipedia.org/wiki/Satellite_constellation#Walker_Constellation) for us. It allows us to only specify a few parameters to get started.\n", + "\n", + "For illustrative purposes our constellation will consist of 1 satellite in LEO and one ground station at Maspalamos." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from utils.get_constellation import get_constellation\n", + "altitude = 550 * 1000 # constellation altitude above the Earth's ground [m]\n", + "inclination = 10.0 # inclination of each orbital plane\n", + "nPlanes = 1 # the number of orbital planes (see linked wiki article)\n", + "nSats = 1 # the number of satellites per orbital plane\n", + "t0 = pk.epoch_from_string(\"2023-Jan-05 02:00:00\") # the starting date of our simulation\n", + "\n", + "maspalomas_groundstation = ActorBuilder.get_actor_scaffold(\n", + " name=\"maspalomas_groundstation\", actor_type=GroundstationActor, epoch=t0\n", + " )\n", + "ActorBuilder.set_ground_station_location(maspalomas_groundstation,latitude=27.7629, longitude=-15.6338, elevation=205.1, minimum_altitude_angle=5)\n", + "\n", + "# Compute orbits of LEO satellites\n", + "planet_list,sats_pos_and_v,orbital_period = get_constellation(altitude,inclination,nSats,nPlanes,t0)\n", + "\n", + "# This line to ensure the satellite is in line of sight with the ground station during simulation.\n", + "sats_pos_and_v[0] = [[ 373533.80545186, -6910598.7742078 , 65864.08810787], [7462.47263817, 415.90492902, 1315.83526891]]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The ground station needs a receiver to receive the radio signal." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "receiver_name = \"maspalomas_radio_receiver_1\"\n", + "ActorBuilder.add_comm_device(actor=maspalomas_groundstation, device_name=receiver_name, noise_temperature=135, \n", + " line_losses=1, polarization_losses=3, antenna_gain=62.6, device_type=DeviceType.RADIO_RECEIVER)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's briefly plot our satellite orbits for illustrative purposes. We use [pykep's plotting features](https://esa.github.io/pykep/documentation/orbitplots.html) for that.\n", + "\n", + "Note that angles aren't to scale in pykep plots." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "fig = plt.figure(figsize=(6,6),dpi=100)\n", + "ax = plt.axes(projection='3d')\n", + "colors = sns.color_palette(\"Paired\")\n", + "for i in range (nPlanes*nSats):\n", + " color_idx = i // nPlanes\n", + " pk.orbit_plots.plot_planet(planet_list[i],axes=ax,s=64,color=colors[color_idx])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### PASEOS Setup\n", + "\n", + "Now we will create a PASEOS instance for the LEO satellite. Further, we add a communication device, power device and a thermal model.\n", + "\n", + "Pleae have a look at the [readme sections on physical models](https://github.com/aidotse/PASEOS#physical-models) for additional details." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "paseos_instances = [] # this will store paseos instances\n", + "earth = pk.planet.jpl_lp(\"earth\") # define our central body\n", + "\n", + "for idx,sat_pos_v in enumerate(sats_pos_and_v):\n", + " pos,v = sat_pos_v\n", + " sat_actor:SpacecraftActor = ActorBuilder.get_actor_scaffold(name=\"Sat_\"+str(idx),\n", + " actor_type=SpacecraftActor,\n", + " epoch=t0)\n", + " ActorBuilder.set_orbit(actor=sat_actor,position=pos,velocity=v,epoch=t0,central_body=earth)\n", + " radio_name = \"sat_radio_transmitter_1\"\n", + " ActorBuilder.add_comm_device(actor=sat_actor,device_name=radio_name, input_power=2, power_efficiency=0.5,\n", + " antenna_efficiency=0.5, line_losses=1, point_losses=5, antenna_diameter=0.3, device_type=DeviceType.RADIO_TRANSMITTER)\n", + "\n", + " ActorBuilder.set_power_devices(actor=sat_actor,battery_level_in_Ws=10000+np.random.rand()*90000,\n", + " max_battery_level_in_Ws=100000,charging_rate_in_W=50)\n", + " ActorBuilder.set_thermal_model(\n", + " actor=sat_actor,\n", + " actor_mass=50.0,\n", + " actor_initial_temperature_in_K=273.15,\n", + " actor_sun_absorptance=1.0,\n", + " actor_infrared_absorptance=1.0,\n", + " actor_sun_facing_area=2.0,\n", + " actor_central_body_facing_area=2.0,\n", + " actor_emissive_area=4.0,\n", + " actor_thermal_capacity=1000\n", + " )\n", + "\n", + " frequency = 8475E6 #Hz\n", + " radio_link_name = \"radio_link_1\"\n", + " ActorBuilder.add_comm_link(sat_actor, radio_name, maspalomas_groundstation, receiver_name, radio_link_name, frequency=frequency)\n", + " \n", + " instance = paseos.init_sim(local_actor=sat_actor)\n", + " paseos_instances.append(instance)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can use PASEOS internal plotting to have a look at our setup now." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "paseos_instances[0].empty_known_actors()\n", + "for instance in paseos_instances[1:]:\n", + " paseos_instances[0].add_known_actor(instance.local_actor)\n", + "plotter = paseos.plot(paseos_instances[0], paseos.PlotType.SpacePlot)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now, we define some operational constraints for our constellation. Let's say our LEO satellites have two tasks:\n", + "\n", + "* Processing - Consumes 100W and can only be performed at < 56.85° C and if our battery is at least 20% charged.\n", + "* Standby - What we do if the above constraint is violated (costs 2W)\n", + "\n", + "To this end, we define a function that returns `True` if the constraint is met." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_consumption_and_activity(actor):\n", + " \"\"\"Determine power consumption and activity for an actor.\"\"\"\n", + " if operational_constraint(actor):\n", + " value = 100\n", + " action = \"Processing\"\n", + " transmitters = actor.get_all_transmitters()\n", + " for transmitterName in transmitters:\n", + " transmitter = actor.get_transmitter(transmitterName)\n", + " if transmitter.active:\n", + " value += transmitter.input_power\n", + " action += \"Communicating\"\n", + "\n", + " # Power consumption for processing is 100W\n", + " return value,action\n", + " else: \n", + " # Power consumption in standby is 2W\n", + " return 2.0, \"Standby\"\n", + "\n", + "def operational_constraint(actor):\n", + " \"\"\"Determine if constraint is met for an actor\"\"\"\n", + " if (actor.state_of_charge > 0.2 \n", + " and actor.temperature_in_K < 330):\n", + " return True\n", + " else:\n", + " return False" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Running the simulation\n", + "\n", + "Now, we are ready to run the main simulation. We will simulate the operations of this constellation for 8 hours. Every 600 seconds (or once the constraint is no longer valid) we will redecide whether a satellite is going to \"charge\" or \"process\".\n", + "\n", + "Whenever a satellite starts violating the operational constraint, it will stop the activity and continue to standby. This is marked by an `INFO` output from PASEOS.\n", + "\n", + "If the activity is interrupted due to overheating or lack of battery charge, we continue charging in standby for the remaining time of the 600 second window." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "simulation_time = 24.0 * 3600 # eight hours in seconds\n", + "t = 0 # starting time in seconds\n", + "timestep = 1200 # how often we decide what each sat should do [s]\n", + "\n", + "# Run until end of simulation\n", + "while t <= simulation_time: \n", + " N_standby = 0 # track in standby\n", + " N_processing = 0 # track processors\n", + " N_interruped = 0 # track which switched due to constraint\n", + " # For each satellite, we perform the following steps\n", + " for instance in paseos_instances:\n", + " local_t = instance.local_actor.local_time\n", + " # Update known actors, i.e. for each sat if they can\n", + " # see the ground station and the comms satellite\n", + " for link in instance.local_actor.communication_links:\n", + " distance, elevation_angle = calc_dist_and_alt_angle(instance.local_actor, link.receiver_actor, local_t)\n", + " if instance.local_actor.is_in_line_of_sight(link.receiver_actor, epoch=local_t):\n", + " link.set_line_of_sight(True)\n", + " bitrate = link.get_bitrate(distance, elevation_angle)\n", + " link.set_bitrate(bitrate)\n", + " else:\n", + " link.set_bitrate(0)\n", + " link.set_line_of_sight(False)\n", + " link.set_distance(distance)\n", + " link.set_elevation_angle(elevation_angle)\n", + " \n", + " instance.empty_known_actors()\n", + "\n", + " # Determine whether satellite is processing or in standby\n", + " power_consumption, activity = get_consumption_and_activity(instance.local_actor)\n", + " if \"Processing\" in activity:\n", + " N_processing += 1\n", + " eval_constraint = lambda: operational_constraint(instance.local_actor)\n", + " else:\n", + " N_standby += 1\n", + " eval_constraint = None\n", + " \n", + " # This call is only necessary to display running activity \n", + " # in PASEOS internal monitoring (which we use below)\n", + " instance.local_actor._current_activity = activity\n", + "\n", + " # Advance the simulation state of this satellite\n", + " # Note how we pass the \"eval_constraint\" to tell paseos to evaluate if the constraint\n", + " # for running the \"Processing\" is still satisfied\n", + " time_remaining_to_advance = instance.advance_time(\n", + " time_to_advance=timestep,\n", + " current_power_consumption_in_W=power_consumption,\n", + " constraint_function=eval_constraint\n", + " )\n", + "\n", + " # If activity was interrupted by constraint violation, proceed in standby for remainder\n", + " if time_remaining_to_advance > 0:\n", + " N_interruped += 1\n", + " instance.advance_time(\n", + " time_to_advance=time_remaining_to_advance,\n", + " current_power_consumption_in_W=2.0,\n", + " constraint_function=None\n", + " )\n", + " \n", + " print(f\"Time: {t}s - # of Processing = {N_processing} ({N_interruped} interrupted), # of Standby = {N_standby}\")\n", + " t += timestep" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Plotting and Analysis\n", + "\n", + "Alright! Now, let's have a look at what actually happened during our simulation.\n", + "\n", + "For this, we will make use of [PASEOS' monitor](https://github.com/aidotse/PASEOS#monitor).\n", + "\n", + "The below code uses it to extract a few quantities for all our sats and plots them over the course of the simulation for each satellite (each color is one).\n", + "\n", + "This will give us a good impression of whether each satellite's operations are sensible.\n", + "\n", + "We will also accumulate all the data into a [pandas dataframe](https://pandas.pydata.org/docs/user_guide/10min.html) for convenience." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from communication_example_utils import get_known_actor_comms_status, get_bitrate_status, get_line_of_sight_status, get_distance_status, get_elevation_angle_status\n", + "\n", + "# Quantities we want to track\n", + "quantities = [\"temperature\",\"state_of_charge\",\"current_activity\",\"known_actors\",\"is_in_eclipse\", \"bitrate\", \"line_of_sight\", \"distance\", \"elevation_angle\"]\n", + "\n", + "# Setup data frame to collect all data\n", + "df = pd.DataFrame(columns=(\"Time\",\"ID\"))\n", + "for idx,item in enumerate(quantities):\n", + " names = []\n", + "\n", + " # Dataframe for this quantity\n", + " small_df = pd.DataFrame(columns=(\"Time\",\"ID\"))\n", + " \n", + " plt.figure(figsize=(8, 2),dpi=150)\n", + "\n", + " # Get data from all satellites and plot it\n", + " for instance in paseos_instances:\n", + "\n", + " # Get time of each data point\n", + " timesteps = instance.monitor[\"timesteps\"]\n", + "\n", + " # Get data\n", + " communication_histories = []\n", + "\n", + " if item == \"known_actors\":\n", + " values = get_known_actor_comms_status(instance.monitor[item])\n", + " elif item == \"bitrate\":\n", + " for link in instance.local_actor.communication_links:\n", + " communication_histories.append(get_bitrate_status(link))\n", + " elif item == \"line_of_sight\":\n", + " for link in instance.local_actor.communication_links:\n", + " communication_histories.append(get_line_of_sight_status(link))\n", + " elif item == \"distance\":\n", + " for link in instance.local_actor.communication_links:\n", + " communication_histories.append(get_distance_status(link))\n", + " elif item == \"elevation_angle\":\n", + " for link in instance.local_actor.communication_links:\n", + " communication_histories.append(get_elevation_angle_status(link))\n", + " else:\n", + " values = instance.monitor[item]\n", + " names.append(instance.local_actor.name)\n", + "\n", + " if item == \"bitrate\" or item == \"line_of_sight\" or item == \"distance\" or item == \"elevation_angle\":\n", + " # Collect data from this sat into big dataframe\n", + " for history in communication_histories:\n", + " plt.figure(figsize=(8, 2),dpi=150)\n", + " smaller_df = pd.DataFrame({\"Time\": timesteps,\"ID\": len(timesteps)*[instance.local_actor.name],item: history})\n", + " small_df = pd.concat([small_df,smaller_df])\n", + " if item == \"line_of_sight\":\n", + " smaller_df[item] = smaller_df[item].astype(\"boolean\")\n", + " plt.plot(timesteps,history)\n", + " plt.xlabel(\"Time [s]\")\n", + " plt.ylabel(item)\n", + " else:\n", + " smaller_df = pd.DataFrame({\"Time\": timesteps,\"ID\": len(timesteps)*[instance.local_actor.name],item: values})\n", + " if item == \"is_in_eclipse\": #pandas things...\n", + " smaller_df[\"is_in_eclipse\"] = smaller_df[\"is_in_eclipse\"].astype(\"boolean\")\n", + " small_df = pd.concat([small_df,smaller_df])\n", + " \n", + " # Plot it :)\n", + " plt.plot(timesteps,values)\n", + " plt.xlabel(\"Time [s]\")\n", + " plt.ylabel(item.replace(\"_\", \" \"))\n", + "\n", + " # Add a legend showing which satellite is which\n", + " # plt.legend(\n", + " # names,\n", + " # fontsize = 8,\n", + " # bbox_to_anchor=(0.5, 1.4),\n", + " # ncol=10,\n", + " # loc=\"upper center\",\n", + " # )\n", + " \n", + " df = df.merge(small_df,how=\"right\")" + ] + }, + { + "cell_type": "code", + "outputs": [], + "source": [], + "metadata": { + "collapsed": false + } + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + }, + "vscode": { + "interpreter": { + "hash": "457d73575bcd07fce3b582d06bfae9446395c25f1ea618852d5f58e28a465e48" + } + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/Communication_example/communication_simple_satellite.ipynb b/examples/Communication_example/communication_simple_satellite.ipynb new file mode 100644 index 00000000..175cc8c6 --- /dev/null +++ b/examples/Communication_example/communication_simple_satellite.ipynb @@ -0,0 +1,456 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Event-based Constellation Example\n", + "\n", + "This notebook illustrates how to run many PASEOS instances in parellel to model a constellation in low-Earth orbit (LEO). \n", + "\n", + "First, let's import all necessary modules\n", + "\n", + "In addition to PASEOS's requirements this notebook uses seaborn and pandas. `conda install seaborn` or `pip install seaborn` will get it. (analogously for pandas)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2\n", + "import matplotlib.pyplot as plt\n", + "import seaborn as sns\n", + "import pandas as pd\n", + "import sys \n", + "sys.path.append(\"..\")\n", + "sys.path.append(\"../..\")\n", + "\n", + "import warnings\n", + "warnings.filterwarnings(\"ignore\")\n", + "\n", + "import pykep as pk\n", + "import numpy as np\n", + "import paseos\n", + "from paseos.actors.spacecraft_actor import SpacecraftActor\n", + "from paseos.actors.actor_builder import ActorBuilder\n", + "from paseos.communication.link_budget_calc import calc_dist_and_alt_angle\n", + "from paseos.communication.device_type import DeviceType\n", + "\n", + "# paseos.set_log_level(\"WARNING\") # use info / debug if you want more insight\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Constellation setup\n", + "\n", + "In addition to the code in this notebook, we use a simple function that automatically computes the orbits of a [Walker Constellation](https://en.wikipedia.org/wiki/Satellite_constellation#Walker_Constellation) for us. It allows us to only specify a few parameters to get started.\n", + "\n", + "For illustrative purposes our constellation will consist of 16 satellites in LEO and one communication satellite in GEO. Feel free to change the numbers to try out different setups." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from utils.get_constellation import get_constellation\n", + "altitude = 550 * 1000 # constellation altitude above the Earth's ground [m]\n", + "inclination = 10.0 # inclination of each orbital plane\n", + "nPlanes = 1 # the number of orbital planes (see linked wiki article)\n", + "nSats = 1 # the number of satellites per orbital plane\n", + "t0 = pk.epoch_from_string(\"2023-Jan-04 20:00:00\") # the starting date of our simulation\n", + "\n", + "# Compute orbits of LEO satellites\n", + "planet_list,sats_pos_and_v,orbital_period = get_constellation(altitude,inclination,nSats,nPlanes,t0)\n", + "\n", + "# Compute orbit of GEO communications satellite\n", + "altitude_geo = 35786*1000\n", + "inclination_geo,n_Planes_geo,nSats_geo = 0,1,1\n", + "comms_sat,comm_sat_pos_and_v,_ = get_constellation(altitude_geo,inclination_geo,n_Planes_geo,nSats_geo,t0)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's briefly plot our satellite orbits for illustrative purposes. We use [pykep's plotting features](https://esa.github.io/pykep/documentation/orbitplots.html) for that.\n", + "\n", + "Note that angles aren't to scale in pykep plots." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "fig = plt.figure(figsize=(6,6),dpi=100)\n", + "ax = plt.axes(projection='3d')\n", + "colors = sns.color_palette(\"Paired\")\n", + "for i in range (nPlanes*nSats):\n", + " color_idx = i // nPlanes\n", + " pk.orbit_plots.plot_planet(planet_list[i],axes=ax,s=64,color=colors[color_idx])\n", + "# pk.orbit_plots.plot_planet(comms_sat[0],axes=ax,s=64)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### PASEOS Setup\n", + "\n", + "Now we will create a PASEOS instance for each LEO satellite. Further, we add a communication device, power device and a thermal model.\n", + "\n", + "Pleae have a look at the [readme sections on physical models](https://github.com/aidotse/PASEOS#physical-models) for additional details." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "paseos_instances = [] # this will store paseos instances\n", + "earth = pk.planet.jpl_lp(\"earth\") # define our central body\n", + "\n", + "comms_instances = []\n", + "\n", + "pos,v = comm_sat_pos_and_v[0]\n", + "comms_sat:SpacecraftActor = ActorBuilder.get_actor_scaffold(name=\"comms_1\",actor_type=SpacecraftActor, epoch=t0)\n", + "ActorBuilder.set_orbit(actor=comms_sat,position=pos,velocity=v,epoch=t0,central_body=earth)\n", + "\n", + "# Optical receiver input based on \n", + "# https://www.eoportal.org/satellite-missions/edrs#lct-laser-communication-terminal\n", + "optical_receiver_name = \"optical_receiver_1\"\n", + "ActorBuilder.add_comm_device(actor=comms_sat,device_name=optical_receiver_name, line_losses=4.1, antenna_diameter=135E-3, device_type=DeviceType.OPTICAL_RECEIVER) \n", + "\n", + "instance = paseos.init_sim(local_actor=comms_sat)\n", + "comms_instances.append(instance)\n", + "\n", + "for idx,sat_pos_v in enumerate(sats_pos_and_v):\n", + " pos,v = sat_pos_v\n", + " sat_actor:SpacecraftActor = ActorBuilder.get_actor_scaffold(name=\"Sat_\"+str(idx),\n", + " actor_type=SpacecraftActor,\n", + " epoch=t0)\n", + " ActorBuilder.set_orbit(actor=sat_actor,position=pos,velocity=v,epoch=t0,central_body=earth)\n", + " optical_transmitter_name = \"sat_optical_transmitter_1\"\n", + "\n", + " # Optical transmitter input based on Giggenbach, D., Knopp, M. T., & Fuchs, C. (2023).\n", + " # Link budget calculation in optical LEO satellite downlinks with on/off‐keying and\n", + " # large signal divergence: A simplified methodology. International Journal of Satellite\n", + " # Communications and Networking.\n", + " ActorBuilder.add_comm_device(actor=sat_actor,device_name=optical_transmitter_name, input_power=1, power_efficiency=1, antenna_efficiency=1, line_losses=1, point_losses=3, fwhm=1E-3, device_type=DeviceType.OPTICAL_TRANSMITTER)\n", + " \n", + " ActorBuilder.set_power_devices(actor=sat_actor,battery_level_in_Ws=10000+np.random.rand()*90000,\n", + " max_battery_level_in_Ws=100000,charging_rate_in_W=50)\n", + " ActorBuilder.set_thermal_model(\n", + " actor=sat_actor,\n", + " actor_mass=50.0,\n", + " actor_initial_temperature_in_K=273.15,\n", + " actor_sun_absorptance=1.0,\n", + " actor_infrared_absorptance=1.0,\n", + " actor_sun_facing_area=2.0,\n", + " actor_central_body_facing_area=2.0,\n", + " actor_emissive_area=4.0,\n", + " actor_thermal_capacity=1000\n", + " )\n", + " optical_link_name = \"optical_link_1\"\n", + " ActorBuilder.add_comm_link(sat_actor, optical_transmitter_name, comms_sat, optical_receiver_name, optical_link_name)\n", + " \n", + " instance = paseos.init_sim(local_actor=sat_actor)\n", + " paseos_instances.append(instance)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can use PASEOS internal plotting to have a look at our setup now. As of writing the plotting isn't ideal for larger constellations ¯\\_(ツ)_/¯" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "paseos_instances[0].empty_known_actors()\n", + "for instance in paseos_instances[1:]:\n", + " paseos_instances[0].add_known_actor(instance.local_actor)\n", + "plotter = paseos.plot(paseos_instances[0], paseos.PlotType.SpacePlot)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now, we define some operational constraints for our constellation. Let's say our LEO satellites have two tasks:\n", + "\n", + "* Processing - Consumes 100W and can only be performed at < 56.85° C and if our battery is at least 20% charged.\n", + "* Standby - What we do if the above constraint is violated (costs 2W)\n", + "\n", + "To this end, we define a function that returns `True` if the constraint is met." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_consumption_and_activity(actor):\n", + " \"\"\"Determine power consumption and activity for an actor.\"\"\"\n", + " if operational_constraint(actor):\n", + " value = 100\n", + " action = \"Processing\"\n", + " transmitters = actor.get_all_transmitters()\n", + " for transmitterName in transmitters:\n", + " transmitter = actor.get_transmitter(transmitterName)\n", + " if transmitter.active:\n", + " value += transmitter.input_power\n", + " action += \"Communicating\"\n", + "\n", + " # Power consumption for processing is 100W\n", + " return value,action\n", + " else: \n", + " # Power consumption in standby is 2W\n", + " return 2.0, \"Standby\"\n", + "\n", + "def operational_constraint(actor):\n", + " \"\"\"Determine if constraint is met for an actor\"\"\"\n", + " if (actor.state_of_charge > 0.2 \n", + " and actor.temperature_in_K < 330):\n", + " return True\n", + " else:\n", + " return False" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Running the simulation\n", + "\n", + "Now, we are ready to run the main simulation. We will simulate the operations of this constellation for 8 hours. Every 600 seconds (or once the constraint is no longer valid) we will redecide whether a satellite is going to \"charge\" or \"process\".\n", + "\n", + "Whenever a satellite starts violating the operational constraint, it will stop the activity and continue to standby. This is marked by an `INFO` output from PASEOS.\n", + "\n", + "If the activity is interrupted due to overheating or lack of battery charge, we continue charging in standby for the remaining time of the 600 second window." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "simulation_time = 6.0 * 3600 # eight hours in seconds\n", + "t = 0 # starting time in seconds\n", + "timestep = 1200 # how often we decide what each sat should do [s]\n", + "\n", + "# Run until end of simulation\n", + "while t <= simulation_time: \n", + " N_standby = 0 # track in standby\n", + " N_processing = 0 # track processors\n", + " N_interruped = 0 # track which switched due to constraint\n", + " # For each satellite, we perform the following steps\n", + " for instance in paseos_instances:\n", + " local_t = instance.local_actor.local_time\n", + " # Update known actors, i.e. for each sat if they can\n", + " # see the ground station and the comms satellite\n", + " for link in instance.local_actor.communication_links:\n", + " distance, elevation_angle = calc_dist_and_alt_angle(instance.local_actor, link.receiver_actor, local_t)\n", + " if instance.local_actor.is_in_line_of_sight(link.receiver_actor, epoch=local_t):\n", + " link.set_line_of_sight(True)\n", + " bitrate = link.get_bitrate(distance, elevation_angle)\n", + " link.set_bitrate(bitrate)\n", + " else:\n", + " link.set_bitrate(0)\n", + " link.set_line_of_sight(False)\n", + " link.set_distance(distance)\n", + " link.set_elevation_angle(elevation_angle)\n", + " \n", + " instance.empty_known_actors()\n", + "\n", + " # Determine whether satellite is processing or in standby\n", + " power_consumption, activity = get_consumption_and_activity(instance.local_actor)\n", + " if \"Processing\" in activity:\n", + " N_processing += 1\n", + " eval_constraint = lambda: operational_constraint(instance.local_actor)\n", + " else:\n", + " N_standby += 1\n", + " eval_constraint = None\n", + " \n", + " # This call is only necessary to display running activity \n", + " # in PASEOS internal monitoring (which we use below)\n", + " instance.local_actor._current_activity = activity\n", + "\n", + " # Advance the simulation state of this satellite\n", + " # Note how we pass the \"eval_constraint\" to tell paseos to evaluate if the constraint\n", + " # for running the \"Processing\" is still satisfied\n", + " time_remaining_to_advance = instance.advance_time(\n", + " time_to_advance=timestep,\n", + " current_power_consumption_in_W=power_consumption,\n", + " constraint_function=eval_constraint\n", + " )\n", + "\n", + " # If activity was interrupted by constraint violation, proceed in standby for remainder\n", + " if time_remaining_to_advance > 0:\n", + " N_interruped += 1\n", + " instance.advance_time(\n", + " time_to_advance=time_remaining_to_advance,\n", + " current_power_consumption_in_W=2.0,\n", + " constraint_function=None\n", + " )\n", + " # Advance the comm satellite and ground station\n", + " for instance in comms_instances:\n", + " instance.advance_time(time_to_advance=timestep,current_power_consumption_in_W=0.0)\n", + " \n", + " print(f\"Time: {t}s - # of Processing = {N_processing} ({N_interruped} interrupted), # of Standby = {N_standby}\")\n", + " t += timestep" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Plotting and Analysis\n", + "\n", + "Alright! Now, let's have a look at what actually happened during our simulation.\n", + "\n", + "For this, we will make use of [PASEOS' monitor](https://github.com/aidotse/PASEOS#monitor).\n", + "\n", + "The below code uses it to extract a few quantities for all our sats and plots them over the course of the simulation for each satellite (each color is one).\n", + "\n", + "This will give us a good impression of whether each satellite's operations are sensible.\n", + "\n", + "We will also accumulate all the data into a [pandas dataframe](https://pandas.pydata.org/docs/user_guide/10min.html) for convenience." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from communication_example_utils import get_known_actor_comms_status, get_bitrate_status, get_line_of_sight_status, get_distance_status, get_elevation_angle_status\n", + "\n", + "# Quantities we want to track\n", + "quantities = [\"temperature\",\"state_of_charge\",\"current_activity\",\"known_actors\",\"is_in_eclipse\", \"bitrate\", \"line_of_sight\", \"distance\", \"elevation_angle\"]\n", + "\n", + "# Setup data frame to collect all data\n", + "df = pd.DataFrame(columns=(\"Time\",\"ID\"))\n", + "for idx,item in enumerate(quantities):\n", + " names = []\n", + "\n", + " # Dataframe for this quantity\n", + " small_df = pd.DataFrame(columns=(\"Time\",\"ID\"))\n", + " \n", + " plt.figure(figsize=(8, 2),dpi=150)\n", + "\n", + " # Get data from all satellites and plot it\n", + " for instance in paseos_instances:\n", + "\n", + " # Get time of each data point\n", + " timesteps = instance.monitor[\"timesteps\"]\n", + "\n", + " # Get data\n", + " communication_histories = []\n", + "\n", + " if item == \"known_actors\":\n", + " values = get_known_actor_comms_status(instance.monitor[item])\n", + " elif item == \"bitrate\":\n", + " for link in instance.local_actor.communication_links:\n", + " communication_histories.append(get_bitrate_status(link))\n", + " elif item == \"line_of_sight\":\n", + " for link in instance.local_actor.communication_links:\n", + " communication_histories.append(get_line_of_sight_status(link))\n", + " elif item == \"distance\":\n", + " for link in instance.local_actor.communication_links:\n", + " communication_histories.append(get_distance_status(link))\n", + " elif item == \"elevation_angle\":\n", + " for link in instance.local_actor.communication_links:\n", + " communication_histories.append(get_elevation_angle_status(link))\n", + " else:\n", + " values = instance.monitor[item]\n", + " names.append(instance.local_actor.name)\n", + "\n", + " if item == \"bitrate\" or item == \"line_of_sight\" or item == \"distance\" or item == \"elevation_angle\":\n", + " # Collect data from this sat into big dataframe\n", + " for history in communication_histories:\n", + " plt.figure(figsize=(8, 2),dpi=150)\n", + " smaller_df = pd.DataFrame({\"Time\": timesteps,\"ID\": len(timesteps)*[instance.local_actor.name],item: history})\n", + " small_df = pd.concat([small_df,smaller_df])\n", + " if item == \"line_of_sight\":\n", + " smaller_df[item] = smaller_df[item].astype(\"boolean\")\n", + " plt.plot(timesteps,history)\n", + " plt.xlabel(\"Time [s]\")\n", + " plt.ylabel(item)\n", + " else:\n", + " smaller_df = pd.DataFrame({\"Time\": timesteps,\"ID\": len(timesteps)*[instance.local_actor.name],item: values})\n", + " if item == \"is_in_eclipse\": #pandas things...\n", + " smaller_df[\"is_in_eclipse\"] = smaller_df[\"is_in_eclipse\"].astype(\"boolean\")\n", + " small_df = pd.concat([small_df,smaller_df])\n", + " \n", + " # Plot it :)\n", + " plt.plot(timesteps,values)\n", + " plt.xlabel(\"Time [s]\")\n", + " plt.ylabel(item.replace(\"_\", \" \"))\n", + "\n", + " # Add a legend showing which satellite is which\n", + " # plt.legend(\n", + " # names,\n", + " # fontsize = 8,\n", + " # bbox_to_anchor=(0.5, 1.4),\n", + " # ncol=10,\n", + " # loc=\"upper center\",\n", + " # )\n", + " \n", + " df = df.merge(small_df,how=\"right\")" + ] + }, + { + "cell_type": "code", + "outputs": [], + "source": [], + "metadata": { + "collapsed": false + }, + "execution_count": null + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + }, + "vscode": { + "interpreter": { + "hash": "457d73575bcd07fce3b582d06bfae9446395c25f1ea618852d5f58e28a465e48" + } + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/Constellation_example/constellation.ipynb b/examples/Constellation_example/constellation.ipynb index 9f4ab89c..c621cafd 100644 --- a/examples/Constellation_example/constellation.ipynb +++ b/examples/Constellation_example/constellation.ipynb @@ -849,7 +849,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.8 | packaged by conda-forge | (main, Nov 22 2022, 08:16:33) [MSC v.1929 64 bit (AMD64)]" + "version": "3.11.6" }, "vscode": { "interpreter": { diff --git a/paseos/__init__.py b/paseos/__init__.py index e9025da1..91917679 100644 --- a/paseos/__init__.py +++ b/paseos/__init__.py @@ -1,23 +1,22 @@ -from loguru import logger -from dotmap import DotMap import pykep as pk +from dotmap import DotMap +from loguru import logger -from .utils.load_default_cfg import load_default_cfg -from .utils.check_cfg import check_cfg -from .paseos import PASEOS -from .actors.base_actor import BaseActor from .actors.actor_builder import ActorBuilder +from .actors.base_actor import BaseActor from .actors.ground_station_actor import GroundstationActor from .actors.spacecraft_actor import SpacecraftActor from .central_body.central_body import CentralBody -from .communication.get_communication_window import get_communication_window from .communication.find_next_window import find_next_window +from .communication.get_communication_window import get_communication_window +from .paseos import PASEOS from .power.power_device_type import PowerDeviceType +from .utils.check_cfg import check_cfg +from .utils.load_default_cfg import load_default_cfg from .utils.reference_frame import ReferenceFrame from .utils.set_log_level import set_log_level from .visualization.plot import plot, PlotType - set_log_level("WARNING") logger.debug("Loaded module.") @@ -27,9 +26,12 @@ def init_sim(local_actor: BaseActor, cfg: DotMap = None, starting_epoch: pk.epoc """Initializes PASEOS Args: - local_actor (BaseActor): The actor linked to the local device which is required to model anything. - cfg (DotMap, optional): Configuration file. If None, default configuration will be used. Defaults to None. - starting_epoch(pk.epoch): Starting epoch of the simulation. Will override cfg and local actor one. + local_actor (BaseActor): The actor linked to the local device which is required to model + anything. + cfg (DotMap, optional): Configuration file. If None, default configuration will be used. + Defaults to None. + starting_epoch(pk.epoch): Starting epoch of the simulation. Will override cfg and local + actor one. Returns: PASEOS: Instance of the simulation (only one can exist, singleton) """ @@ -47,8 +49,8 @@ def init_sim(local_actor: BaseActor, cfg: DotMap = None, starting_epoch: pk.epoc if local_actor.local_time.mjd2000 * pk.DAY2SEC != cfg.sim.start_time: logger.warning( - "You provided a different starting epoch for PASEOS than the local time of the local actor." - + "starting_epoch will be used." + "You provided a different starting epoch for PASEOS than the local time of the local " + "actor." + "starting_epoch will be used." ) sim = PASEOS(local_actor=local_actor, cfg=cfg) diff --git a/paseos/actors/actor_builder.py b/paseos/actors/actor_builder.py index 17d022a3..5e43fadb 100644 --- a/paseos/actors/actor_builder.py +++ b/paseos/actors/actor_builder.py @@ -1,19 +1,23 @@ from typing import Callable, Any -from loguru import logger import numpy as np -from dotmap import DotMap import pykep as pk +from dotmap import DotMap +from loguru import logger from skyfield.api import wgs84 +from paseos.geometric_model.geometric_model import GeometricModel from .base_actor import BaseActor -from .spacecraft_actor import SpacecraftActor from .ground_station_actor import GroundstationActor +from .spacecraft_actor import SpacecraftActor from ..central_body.central_body import CentralBody -from ..thermal.thermal_model import ThermalModel +from ..communication.device_type import DeviceType +from ..communication.transmitter_model import TransmitterModel +from ..communication.receiver_model import ReceiverModel +from ..communication.link_model import LinkModel from ..power.power_device_type import PowerDeviceType from ..radiation.radiation_model import RadiationModel -from paseos.geometric_model.geometric_model import GeometricModel +from ..thermal.thermal_model import ThermalModel class ActorBuilder: @@ -46,9 +50,6 @@ def get_actor_scaffold(name: str, actor_type: object, epoch: pk.epoch): assert ( actor_type != BaseActor ), "BaseActor cannot be initiated. Please use SpacecraftActor or GroundstationActor" - assert ( - actor_type == SpacecraftActor or actor_type == GroundstationActor - ), f"Unsupported actor_type {actor_type}, Please use SpacecraftActor or GroundstationActor." logger.trace(f"Creating an actor blueprint with name {name}") @@ -74,11 +75,9 @@ def set_ground_station_location( minimum_altitude_angle (float): Minimum angle above the horizon that this station can communicate with. """ - assert latitude >= -90 and latitude <= 90, "Latitude is -90 <= lat <= 90" - assert longitude >= -180 and longitude <= 180, "Longitude is -180 <= lat <= 180" - assert ( - minimum_altitude_angle >= 0 and minimum_altitude_angle <= 90 - ), "0 <= minimum_altitude_angle <= 90." + assert -90 <= latitude <= 90, "Latitude is -90 <= lat <= 90" + assert -180 <= longitude <= 180, "Longitude is -180 <= lat <= 180" + assert 0 <= minimum_altitude_angle <= 90, "0 <= minimum_altitude_angle <= 90." actor._skyfield_position = wgs84.latlon( latitude_degrees=latitude, longitude_degrees=longitude, @@ -99,7 +98,8 @@ def set_central_body( """Define the central body of the actor. This is the body the actor is orbiting around. If a mesh is provided, it will be used to compute visibility and eclipse checks. - Otherwise, a sphere with the provided radius will be used. One of the two has to be provided. + Otherwise, a sphere with the provided radius will be used. One of the two has to be + provided. Note the specification here will not affect the actor orbit. For that, use set_orbit, set_TLE or set_custom_orbit. @@ -112,8 +112,10 @@ def set_central_body( rotation_declination (float): Declination of the rotation axis in degrees in the central body's inertial frame. Rotation at current actor local time is presumed to be 0. rotation_right_ascension (float): Right ascension of the rotation axis in degrees in - the central body's inertial frame. Rotation at current actor local time is presumed to be 0. - rotation_period (float): Rotation period in seconds. Rotation at current actor local time is presumed to be 0. + the central body's inertial frame. Rotation at current actor local time is presumed + to be 0. + rotation_period (float): Rotation period in seconds. Rotation at current actor local + time is presumed to be 0. """ assert isinstance( actor, SpacecraftActor @@ -171,7 +173,8 @@ def set_central_body( # Check if the actor already had a central body if actor.has_central_body: logger.warning( - "The actor already had a central body. Only one central body is supported. Overriding old body." + "The actor already had a central body. Only one central body is supported. " + "Overriding old body." ) # Set central body @@ -234,7 +237,8 @@ def set_TLE( """Define the orbit of the actor using a TLE. For more information on TLEs see https://en.wikipedia.org/wiki/Two-line_element_set . - TLEs can be obtained from https://www.space-track.org/ or https://celestrak.com/NORAD/elements/ + TLEs can be obtained from https://www.space-track.org/ or + https://celestrak.com/NORAD/elements/ Args: actor (SpacecraftActor): Actor to update. @@ -269,7 +273,8 @@ def set_orbit( position (list of floats): [x,y,z]. velocity (list of floats): [vx,vy,vz]. epoch (pk.epoch): Time of position / velocity. - central_body (pk.planet): Central body around which the actor is orbiting as a pykep planet. + central_body (pk.planet): Central body around which the actor is orbiting as a pykep + planet. """ assert isinstance(actor, SpacecraftActor), "Orbit only supported for SpacecraftActors" @@ -289,7 +294,8 @@ def set_orbit( @staticmethod def set_position(actor: BaseActor, position: list): - """Sets the actors position. Use this if you do *not* want the actor to have a keplerian orbit around a central body. + """Sets the actors position. Use this if you do *not* want the actor to have a keplerian + orbit around a central body. Args: actor (BaseActor): Actor set the position on. @@ -310,18 +316,24 @@ def set_position(actor: BaseActor, position: list): def set_geometric_model( actor: SpacecraftActor, mass: float, vertices=None, faces=None, scale: float = 1 ): - """Define geometry of the spacecraft actor. This is done in the spacecraft body reference frame, and can be - transformed to the inertial/PASEOS reference frame using the reference frane transformations in the attitude + """Define geometry of the spacecraft actor. This is done in the spacecraft body reference + frame, and can be + transformed to the inertial/PASEOS reference frame using the reference frane + transformations in the attitude model. When used in the attitude model, the geometric model is in the body reference frame. Args: actor (SpacecraftActor): Actor to update. mass (float): Mass of the spacecraft in kg. - vertices (list): List of all vertices of the mesh in terms of distance (in m) from origin of body frame. - Coordinates of the corners of the object. If not selected, it will default to a cube that can be scaled. + vertices (list): List of all vertices of the mesh in terms of distance (in m) from + origin of body frame. + Coordinates of the corners of the object. If not selected, it will default to a + cube that can be scaled. by the scale. Uses Trimesh to create the mesh from this and the list of faces. - faces (list): List of the indexes of the vertices of a face. This builds the faces of the satellite by - defining the three vertices to form a triangular face. For a cuboid each face is split into two + faces (list): List of the indexes of the vertices of a face. This builds the faces of + the satellite by + defining the three vertices to form a triangular face. For a cuboid each face is + split into two triangles. Uses Trimesh to create the mesh from this and the list of vertices. scale (float): Parameter to scale the cuboid by, defaults to 1. """ @@ -329,7 +341,11 @@ def set_geometric_model( actor._mass = mass geometric_model = GeometricModel( - local_actor=actor, actor_mass=mass, vertices=vertices, faces=faces, scale=scale + local_actor=actor, + actor_mass=mass, + vertices=vertices, + faces=faces, + scale=scale, ) actor._mesh = geometric_model.set_mesh() actor._moment_of_inertia = geometric_model.find_moment_of_inertia @@ -367,7 +383,8 @@ def set_power_devices( # Check if the actor already had a power device if actor.has_power_model: logger.warning( - "The actor already had a power device. Currently only one device is supported. Overriding old device." + "The actor already had a power device. Currently only one device is supported. " + "Overriding old device." ) logger.trace("Checking battery values for sensibility.") @@ -402,10 +419,12 @@ def set_radiation_model( Args: actor (SpacecraftActor): The actor to add to. - data_corruption_events_per_s (float): Single bit of data being corrupted, events per second, + data_corruption_events_per_s (float): Single bit of data being corrupted, events per + second, i.e. a Single Event Upset (SEU). restart_events_per_s (float): Device restart being triggered, events per second. - failure_events_per_s (float): Complete device failure, events per second, i.e. a Single Event Latch-Up (SEL). + failure_events_per_s (float): Complete device failure, events per second, + i.e. a Single Event Latch-Up (SEL). """ # check for spacecraft actor assert isinstance( @@ -445,7 +464,8 @@ def set_thermal_model( emission and due to actor activities. For the moment, it is a slightly simplified version of the single node model from "Spacecraft Thermal Control" by Prof. Isidoro Martínez - available at http://imartinez.etsiae.upm.es/~isidoro/tc3/Spacecraft%20Thermal%20Modelling%20and%20Testing.pdf + available at http://imartinez.etsiae.upm.es/~isidoro/tc3/Spacecraft%20Thermal%20Modelling + %20and%20Testing.pdf Args: actor (SpacecraftActor): Actor to model. @@ -458,9 +478,11 @@ def set_thermal_model( actor_emissive_area (float): Actor area emitting (radiating) heat. actor_thermal_capacity (float): Actor's thermal capacity in J / (kg * K). body_solar_irradiance (float, optional): Irradiance from the sun in W. Defaults to 1360. - body_surface_temperature_in_K (float, optional): Central body surface temperature. Defaults to 288. + body_surface_temperature_in_K (float, optional): Central body surface temperature. + Defaults to 288. body_emissivity (float, optional): Centrla body emissivity [0,1] in IR. Defaults to 0.6. - body_reflectance (float, optional): Central body reflectance of sun light. Defaults to 0.3. + body_reflectance (float, optional): Central body reflectance of sun light. Defaults + to 0.3. power_consumption_to_heat_ratio (float, optional): Conversion ratio for activities. 0 leads to know heat-up due to activity. Defaults to 0.5. """ @@ -472,7 +494,8 @@ def set_thermal_model( # Check if the actor already had a thermal model if actor.has_thermal_model: logger.warning( - "The actor already had a thermal model. Currently only one model is supported. Overriding old model." + "The actor already had a thermal model. Currently only one model is supported. " + "Overriding old model." ) assert actor_mass > 0, "Actor mass has to be positive." @@ -520,26 +543,175 @@ def set_thermal_model( ) @staticmethod - def add_comm_device(actor: BaseActor, device_name: str, bandwidth_in_kbps: float): - """Creates a communication device. + def add_comm_link( + transmitter_actor: BaseActor, + transmitter_name: str, + receiver_actor: BaseActor, + receiver_name: str, + link_name: str, + frequency: float = None, + ) -> None: + """Adds a link to an actor, used with link budget modelling. + Args: + transmitter_actor (BaseActor): actor that the communication link will be + added to. + transmitter_name (str): the name of the transmitter device. + receiver_actor (BaseActor): the receiving actor. + receiver_name (str): the name of the receiver device. + link_name (str): the name of the link. + frequency (float, optional): the frequency of the radio link. Cannot be used for + optical where wavelength of 1550 is required. + """ + transmitter_model = transmitter_actor.get_transmitter(transmitter_name) + receiver_model = receiver_actor.get_receiver(receiver_name) + + if frequency is not None: + assert frequency > 0, "Frequency needs to be higher than 0 Hz." + link = LinkModel( + transmitter_actor=transmitter_actor, + transmitter_model=transmitter_model, + receiver_actor=receiver_actor, + receiver_model=receiver_model, + frequency=frequency, + ) + transmitter_actor.add_comm_link(link_name, link) + + @staticmethod + def add_comm_device( + actor: BaseActor, + device_name: str, + bandwidth_in_kbps: float = 0, + input_power: float = 0, + power_efficiency: float = 0, + antenna_efficiency: float = 0, + antenna_diameter: float = None, + antenna_gain: float = None, + point_losses: float = 0, + line_losses: float = 0, + polarization_losses: float = 0, + noise_temperature: float = 0, + fwhm: float = None, + device_type: DeviceType = None, + ): + """Creates a communication device. Can be used by just setting bandwidth_in_kbps, + or can be used with link budget modelling. Args: + actor (BaseActor): actor that the communication device will be added to. device_name (str): device_name of the communication device. bandwidth_in_kbps (float): device bandwidth in kbps. + input_power (float): input power of this device. + power_efficiency (float): power efficiency of the device, between 0 and 1. + antenna_efficiency (float): efficiency of the antenna, between 0 and 1. + antenna_diameter (float): diameter of the antenna, in m. Either set antenna_gain, + antenna_diameter or fwhm (only for optical). + antenna_gain (float): gain of the antenna, in dB. Either set antenna_gain, + or antenna_diameter or fwhm (fwhm only for optical). + point_losses(float): pointing losses, in dB. + line_losses (float): line losses, in dB. + polarization_losses (float): polarization_losses, in dB. + noise_temperature (float): noise temperature, in K. + fwhm (float): full width at half maximum, in rad. + device_type (DeviceType): type of the device. """ - if device_name in actor.communication_devices: - raise ValueError( - "Trying to add already existing communication device with device_name: " - + device_name + # If link budget modelling is used. + if device_type is not None: + assert input_power >= 0, "Input power needs to be higher than 0." + assert 0 <= power_efficiency <= 1, "Power efficiency should be between 0 and 1." + assert 0 <= antenna_efficiency <= 1, "Antenna efficiency should be between 0 and 1." + assert line_losses >= 0, "Line losses needs to be 0 or higher." + assert point_losses >= 0, "Pointing losses needs to be 0 or higher." + assert line_losses >= 0, "Line losses needs to be 0 or higher." + assert polarization_losses >= 0, "Polarization losses needs to be 0 or higher." + assert noise_temperature >= 0, "Noise temperature needs to be 0 or higher." + if fwhm is not None: + assert fwhm >= 0, "full width at half maximum needs to be 0 or higher." + if antenna_gain is not None: + assert antenna_gain >= 0, "Antenna gain needs to be 0 or higher." + if antenna_diameter is not None: + assert antenna_diameter >= 0, "Antenna diameter needs to be 0 or higher" + assert ( + antenna_gain is not None or antenna_diameter is not None or fwhm is not None + ), "Antenna gain or antenna diameter or fwhm needs to be set." + assert not ( + antenna_diameter is not None + and antenna_gain is not None + or antenna_diameter is not None + and fwhm is not None + or antenna_gain is not None + and fwhm is not None + ), "Only set one of antenna gain, antenna diameter and fwhm not multiple." + + if device_type == DeviceType.RADIO_TRANSMITTER: + assert isinstance( + actor, SpacecraftActor + ), "Only a spacecraft can contain a radio transmitter." + radio_transmitter = TransmitterModel( + input_power=input_power, + power_efficiency=power_efficiency, + antenna_efficiency=antenna_efficiency, + line_losses=line_losses, + point_losses=point_losses, + antenna_gain=antenna_gain, + antenna_diameter=antenna_diameter, + device_type=device_type, ) + actor.add_transmitter(device_name, radio_transmitter) + elif device_type == DeviceType.RADIO_RECEIVER: + radio_receiver = ReceiverModel( + line_losses=line_losses, + polarization_losses=polarization_losses, + noise_temperature=noise_temperature, + antenna_diameter=antenna_diameter, + antenna_gain=antenna_gain, + device_type=device_type, + ) + actor.add_receiver(device_name, radio_receiver) + elif device_type == DeviceType.OPTICAL_TRANSMITTER: + assert isinstance( + actor, SpacecraftActor + ), "Only a spacecraft can contain an optical transmitter." + optical_transmitter = TransmitterModel( + input_power=input_power, + power_efficiency=power_efficiency, + antenna_efficiency=antenna_efficiency, + line_losses=line_losses, + point_losses=point_losses, + antenna_gain=antenna_gain, + antenna_diameter=antenna_diameter, + fwhm=fwhm, + device_type=device_type, + ) + actor.add_transmitter(device_name, optical_transmitter) + elif device_type == DeviceType.OPTICAL_RECEIVER: + assert isinstance( + actor, SpacecraftActor + ), "Only a spacecraft can contain an optical receiver." + optical_receiver = ReceiverModel( + line_losses=line_losses, + antenna_diameter=antenna_diameter, + antenna_gain=antenna_gain, + device_type=device_type, + ) + actor.add_receiver(device_name, optical_receiver) + else: + if device_name in actor.communication_devices: + raise ValueError( + "Trying to add already existing communication device with device_name: " + + device_name + ) + actor._communication_devices[device_name] = DotMap(bandwidth_in_kbps=bandwidth_in_kbps) - actor._communication_devices[device_name] = DotMap(bandwidth_in_kbps=bandwidth_in_kbps) - - logger.debug(f"Added comm device with bandwith={bandwidth_in_kbps} kbps to actor {actor}.") + logger.debug( + f"Added comm device with bandwidth={bandwidth_in_kbps} kbps to actor {actor}." + ) @staticmethod def add_custom_property( - actor: BaseActor, property_name: str, initial_value: Any, update_function: Callable + actor: BaseActor, + property_name: str, + initial_value: Any, + update_function: Callable, ): """Adds a custom property to the actor. This e.g. allows tracking any physical the user would like to track. @@ -573,7 +745,8 @@ def add_custom_property( # remove property if this failed del actor._custom_properties[property_name] raise TypeError( - "Update function must accept three parameters: actor, time_to_advance, current_power_consumption_in_W." + "Update function must accept three parameters: actor, time_to_advance, " + "current_power_consumption_in_W." ) # Check that the update function returns a value of the same type as the initial value @@ -581,15 +754,18 @@ def add_custom_property( # remove property if this failed del actor._custom_properties[property_name] raise TypeError( - f"Update function must return a value of type {type(initial_value)} matching initial vaue." + f"Update function must return a value of type {type(initial_value)} matching " + f"initial vaue." ) - # Check that the initial value is the same as the value returned by the update function with time 0 + # Check that the initial value is the same as the value returned by the update function + # with time 0 if new_value != initial_value: # remove property if this failed del actor._custom_properties[property_name] raise ValueError( - "Update function must return the existing value when called with unchanged time (dt = 0)." + "Update function must return the existing value when called with unchanged time (" + "dt = 0)." ) actor._custom_properties_update_function[property_name] = update_function diff --git a/paseos/actors/base_actor.py b/paseos/actors/base_actor.py index bae022cd..f9a2b6a7 100644 --- a/paseos/actors/base_actor.py +++ b/paseos/actors/base_actor.py @@ -1,13 +1,15 @@ from abc import ABC from typing import Callable, Any -from loguru import logger -import pykep as pk import numpy as np +import pykep as pk from dotmap import DotMap +from loguru import logger -from ..central_body.is_in_line_of_sight import is_in_line_of_sight from ..central_body.central_body import CentralBody +from ..central_body.is_in_line_of_sight import is_in_line_of_sight +from ..communication.receiver_model import ReceiverModel +from ..communication.transmitter_model import TransmitterModel class BaseActor(ABC): @@ -34,8 +36,17 @@ class BaseActor(ABC): # Central body this actor is orbiting _central_body = None - # Communication links dictionary - _communication_devices = DotMap(_dynamic=False) + # Transmitters for the communication solution where link budget modelling is used + _transmitters: DotMap(_dynamic=False) + + # Receivers for the communication solution where link budget modelling is used + _receivers: DotMap(_dynamic=False) + + # List of communication links for the communication solution where link budget modelling is used + _communication_links: DotMap(_dynamic=False) + + # Communication devices for the simple communication solution where a constant bitrate is set + _communication_devices: DotMap(_dynamic=False) # Tracks user-defined custom properties _custom_properties = DotMap(_dynamic=False) @@ -56,10 +67,10 @@ class BaseActor(ABC): _previous_altitude = None def __init__(self, name: str, epoch: pk.epoch) -> None: - """Constructor for a base actor + """Constructor for a base actor. Args: - name (str): Name of this actor + name (str): Name of this actor. epoch (pykep.epoch): Current local time of the actor. """ logger.trace("Instantiating Actor.") @@ -69,6 +80,10 @@ def __init__(self, name: str, epoch: pk.epoch) -> None: self._communication_devices = DotMap(_dynamic=False) + self._receivers = DotMap(_dynamic=False) + self._transmitters = DotMap(_dynamic=False) + self._communication_links = DotMap(_dynamic=False) + def get_custom_property(self, property_name: str) -> Any: """Returns the value of the specified custom property. @@ -83,6 +98,89 @@ def get_custom_property(self, property_name: str) -> Any: return self._custom_properties[property_name] + def get_transmitter(self, name: str): + """Get the transmitter model with the specified name. + + Args: + name (str): the name of the transmitter. + Returns: + Transmitter: the transmitter with the specified name. + """ + return self._transmitters[name].model + + def get_all_transmitters(self) -> [str]: + """Get a list with the names of the all the transmitters. + + Returns: + [str]: Get a list with the names of the all the transmitters. + """ + return list(self._transmitters.keys()) + + def get_receiver(self, name: str): + """Get the receiver model with the specified name. + + Args: + name (str): the name of the receiver. + + Returns: + Receiver: the receiver with the specified name. + """ + return self._receivers[name].model + + def get_all_receivers(self) -> [str]: + """Get a list with the names of the all the receivers. + + Returns: + [str]: Get a list with the names of the all the receivers. + """ + return list(self._receivers.keys()) + + def add_transmitter(self, device_name, model: TransmitterModel): + """Adds a transmitter to the actor. + + Args: + device_name (str): The name of the transmitter. + model (TransmitterModel): The transmitter model. + """ + if device_name in self._transmitters.keys(): + raise ValueError( + f"Transmitter with name '{device_name}' already exists for actor {self}." + ) + self._transmitters[device_name] = DotMap(model=model) + + def add_receiver(self, device_name: str, model: ReceiverModel): + """Adds a receiver to the actor. + + Args: + device_name (str): The name of the receiver. + model (ReceiverModel): The receiver model. + """ + if device_name in self._receivers.keys(): + raise ValueError(f"Receiver with name '{device_name}' already exists for actor {self}.") + self._receivers[device_name] = DotMap(model=model) + + def add_comm_link(self, link_name: str, model): + """Adds a communication link to the actor. + + Args: + link_name (str): The name of the link. + model (LinkModel): The link model. + """ + if link_name in self._communication_links.keys(): + raise ValueError(f"Link with name '{link_name}' already exists for actor {self}.") + self._communication_links[link_name] = DotMap(model=model) + + def get_comm_link(self, link_name: str): + """Get the link model with the specified name. + + Args: + link_name (str): the name of the link. + + Returns: + Receiver: the link with the specified name. + """ + return self._communication_links[link_name].model + @property def custom_properties(self): """Returns a dictionary of custom properties for this actor.""" @@ -177,7 +275,8 @@ def current_activity(self) -> str: @property def local_time(self) -> pk.epoch: - """Returns local time of the actor as pykep epoch. Use e.g. epoch.mjd2000 to get time in days. + """Returns local time of the actor as pykep epoch. Use e.g. epoch.mjd2000 to get time in + days. Returns: pk.epoch: local time of the actor @@ -208,6 +307,15 @@ def _check_init_value_sensibility( assert len(position) == 3, "Position has to have 3 elements (x,y,z)" assert len(velocity) == 3, "Velocity has to have 3 elements (vx,vy,vz)" + @property + def communication_links(self): + """Returns communication links. + + Returns: + list: List of communication links. + """ + return [link.model for link in self._communication_links.values()] + def __str__(self): return self.name @@ -259,7 +367,8 @@ def get_altitude( return self._previous_altitude def get_position(self, epoch: pk.epoch): - """Compute the position of this actor at a specific time. Requires orbital parameters or position set. + """Compute the position of this actor at a specific time. Requires orbital parameters or + position set. Args: epoch (pk.epoch): Time as pykep epoch @@ -290,11 +399,13 @@ def get_position(self, epoch: pk.epoch): return self._orbital_parameters.eph(epoch)[0] raise NotImplementedError( - "No suitable way added to determine actor position. Either set an orbit or position with ActorBuilder." + "No suitable way added to determine actor position. Either set an orbit or position " + "with ActorBuilder." ) def get_position_velocity(self, epoch: pk.epoch): - """Compute the position / velocity of this actor at a specific time. Requires orbital parameters set. + """Compute the position / velocity of this actor at a specific time. Requires orbital + parameters set. Args: epoch (pk.epoch): Time as pykep epoch. @@ -341,7 +452,8 @@ def is_in_line_of_sight( other_actor (BaseActor): The actor to check line of sight with epoch (pk,.epoch): Epoch at which to check the line of sight minimum_altitude_angle(float): The altitude angle (in degree) at which the actor has - to be in relation to the ground station position to be visible. It has to be between 0 and 90. + to be in relation to the ground station position to be visible. It has to be between + 0 and 90. Only relevant if one of the actors is a ground station. plot (bool): Whether to plot a diagram illustrating the positions. diff --git a/paseos/actors/ground_station_actor.py b/paseos/actors/ground_station_actor.py index 6055f02c..58ad5d82 100644 --- a/paseos/actors/ground_station_actor.py +++ b/paseos/actors/ground_station_actor.py @@ -2,7 +2,7 @@ import pykep as pk from skyfield.api import load -from paseos.actors.base_actor import BaseActor +from .base_actor import BaseActor class GroundstationActor(BaseActor): diff --git a/paseos/actors/spacecraft_actor.py b/paseos/actors/spacecraft_actor.py index 30f75272..aa5a53ed 100644 --- a/paseos/actors/spacecraft_actor.py +++ b/paseos/actors/spacecraft_actor.py @@ -1,9 +1,9 @@ -from loguru import logger import pykep as pk +from loguru import logger from paseos.actors.base_actor import BaseActor -from paseos.power import discharge_model from paseos.power import charge_model +from paseos.power import discharge_model class SpacecraftActor(BaseActor): @@ -47,7 +47,8 @@ def set_is_dead(self): self._is_dead = True def set_was_interrupted(self): - """Sets this device to "was_interrupted=True" indicating current activities were interrupted.""" + """Sets this device to "was_interrupted=True" indicating current activities were + interrupted.""" self._was_interrupted = True @property diff --git a/paseos/central_body/is_in_line_of_sight.py b/paseos/central_body/is_in_line_of_sight.py index e66230bd..a04b02da 100644 --- a/paseos/central_body/is_in_line_of_sight.py +++ b/paseos/central_body/is_in_line_of_sight.py @@ -1,36 +1,9 @@ -from loguru import logger -import pykep as pk -import os import numpy as np -from skyfield.units import AU_M -from skyfield.api import load -from skyfield.vectorlib import VectorFunction - -_SKYFIELD_EARTH_PATH = os.path.join(os.path.dirname(__file__) + "/../resources/", "de421.bsp") -# Skyfield Earth, in the future we may not always want to load this. -_SKYFIELD_EARTH = load(_SKYFIELD_EARTH_PATH)["earth"] - - -class SkyfieldSkyCoordinate(VectorFunction): - """Small helper class to compute altitude angle""" - - def __init__(self, r_in_m, earth_pos_in_au): - # Value 0 corresponds to the solar system barycenter according to - # this: https://rhodesmill.org/skyfield/api-vectors.html#skyfield.vectorlib.VectorFunction.center - self.center = 0 - # Position vector - self.r = earth_pos_in_au + r_in_m / AU_M - - @property - def target(self): - # This is a property to avoid circular references as described - # here: https://github.com/skyfielders/python-skyfield/blob/master/skyfield/planetarylib.py#L222 - return self +import pykep as pk +from loguru import logger - def _at(self, t): - # Velocity vector - v = [0.0, 0.0, 0.0] - return self.r, v, self.center, "SkyfieldSkyCoordinate" +from ..utils.sky_field_sky_coordinate import SkyfieldSkyCoordinate +from ..resources.constants import SKYFIELD_EARTH def _is_in_line_of_sight_spacecraft_to_spacecraft(actor, other_actor, epoch: pk.epoch, plot=False): @@ -85,12 +58,12 @@ def _is_in_line_of_sight_ground_station_to_spacecraft( t_skyfield = ground_station._skyfield_timescale.tt_jd(epoch.jd) # Ground station location in barycentric - gs_position = (_SKYFIELD_EARTH + ground_station._skyfield_position).at(t_skyfield) + gs_position = (SKYFIELD_EARTH + ground_station._skyfield_position).at(t_skyfield) # Actor position in barycentric other_actor_pos = SkyfieldSkyCoordinate( r_in_m=np.array(spacecraft.get_position(epoch)), - earth_pos_in_au=_SKYFIELD_EARTH.at(t_skyfield).position.au, + earth_pos_in_au=SKYFIELD_EARTH.at(t_skyfield).position.au, ) # Trigger observation calculation @@ -108,8 +81,8 @@ def _is_in_line_of_sight_ground_station_to_spacecraft( def plot(gs_pos_t, sat_pos_t, t): # Converting to geocentric - r1 = gs_pos_t.position.m - _SKYFIELD_EARTH.at(t).position.m - r2 = sat_pos_t.position.m - _SKYFIELD_EARTH.at(t).position.m + r1 = gs_pos_t.position.m - SKYFIELD_EARTH.at(t).position.m + r2 = sat_pos_t.position.m - SKYFIELD_EARTH.at(t).position.m gs_point = Point(r1) sat_point = Point(r2) line = Line(r1, r2 - r1) @@ -165,9 +138,10 @@ def is_in_line_of_sight( ): if minimum_altitude_angle is None: minimum_altitude_angle = actor._minimum_altitude_angle - assert ( - other_actor.central_body.planet.name.lower() == "earth" - ), f"Ground stations can only be used with Earth for now (not {other_actor.central_body.planet.name})." + assert other_actor.central_body.planet.name.lower() == "earth", ( + f"Ground stations can only be used with Earth for now (not " + f"{other_actor.central_body.planet.name})." + ) return _is_in_line_of_sight_ground_station_to_spacecraft( actor, other_actor, epoch, minimum_altitude_angle, plot ) @@ -185,5 +159,6 @@ def is_in_line_of_sight( ) else: raise NotImplementedError( - f"Cannot compute line of sight between {type(actor).__name__} and {type(other_actor).__name__}." + f"Cannot compute line of sight between {type(actor).__name__} " + f"and {type(other_actor).__name__}." ) diff --git a/paseos/communication/__init__.py b/paseos/communication/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/paseos/communication/device_type.py b/paseos/communication/device_type.py new file mode 100644 index 00000000..412ba933 --- /dev/null +++ b/paseos/communication/device_type.py @@ -0,0 +1,15 @@ +from enum import Enum + + +class DeviceType(Enum): + """Describes the different communication device types + 1 - A radio transmitter + 2 - A radio receiver + 3 - An optical transmitter + 4 - An optical receiver + """ + + RADIO_TRANSMITTER = 1 + RADIO_RECEIVER = 2 + OPTICAL_TRANSMITTER = 3 + OPTICAL_RECEIVER = 4 diff --git a/paseos/communication/find_next_window.py b/paseos/communication/find_next_window.py index 1a5ed688..8d58774f 100644 --- a/paseos/communication/find_next_window.py +++ b/paseos/communication/find_next_window.py @@ -34,7 +34,7 @@ def find_next_window( ) local_actor_comm_link = local_actor.communication_devices[local_actor_communication_link_name] - assert local_actor_comm_link.bandwidth_in_kbps > 0, "Bandiwidth has to be positive." + assert local_actor_comm_link.bandwidth_in_kbps > 0, "Bandwidth has to be positive." assert search_step_size > 0, "dt has to be positive." # Compute start and end of the search window diff --git a/paseos/communication/gain_calc.py b/paseos/communication/gain_calc.py new file mode 100644 index 00000000..e23e38a8 --- /dev/null +++ b/paseos/communication/gain_calc.py @@ -0,0 +1,58 @@ +import math + + +def calc_radio_gain_from_wavelength_diameter_db( + wavelength: float, antenna_diameter: float, antenna_efficiency: float +) -> float: + """Calculates antenna gain (directivity) based on wavelength and diameter, valid for + parabolic antennas. + + Args: + wavelength (float): The wavelength of the signal, in meters. + antenna_diameter (int): The diameter of the antenna, in meters. + antenna_efficiency (float): The antenna efficiency. + + Returns: + The antenna gain (directivity) in dB. + """ + assert wavelength > 0, "Wavelength needs to be larger than 0." + assert antenna_diameter > 0, "Antenna diameter needs to be larger than 0." + assert 0 < antenna_efficiency <= 1, "Antenna efficiency should be between 0 and 1." + return 10 * math.log10(antenna_efficiency * (math.pi * antenna_diameter / wavelength) ** 2) + + +def calc_optical_gain_from_wavelength_diameter_db( + wavelength: float, antenna_diameter: float, antenna_efficiency: float +) -> float: + """Calculates antenna gain (directivity) based on wavelength and diameter, valid for + parabolic antennas. + + Args: + wavelength (float): The wavelength of the signal, in meters. + antenna_diameter (float): The diameter of the antenna, in meters. + antenna_efficiency (float): The antenna efficiency. + + Returns: + The antenna gain (directivity) in dB. + """ + assert wavelength > 0, "Wavelength needs to be larger than 0." + assert antenna_diameter > 0, "Antenna diameter needs to be larger than 0." + assert 0 < antenna_efficiency <= 1, "Antenna efficiency should be between 0 and 1." + antenna_radius = antenna_diameter / 2 + aperture_area = math.pi * antenna_radius**2 + return 10 * math.log10(antenna_efficiency * (4 * math.pi * aperture_area / wavelength**2)) + + +def calc_gain_from_fwhm_db(fwhm: float) -> float: + """Calculates gain based on full width at half maximum. + + Args: + fwhm (float): the full width at half maximum, in rad. + + Returns: + The gain in dB. + """ + assert fwhm > 0, "FWHM needs to be larger than 0." + + result = 10 * math.log10((4 * math.sqrt(math.log(2)) / fwhm) ** 2) + return result diff --git a/paseos/communication/get_communication_window.py b/paseos/communication/get_communication_window.py index dc7ed768..57d836e1 100644 --- a/paseos/communication/get_communication_window.py +++ b/paseos/communication/get_communication_window.py @@ -17,16 +17,19 @@ def get_communication_window( data_to_send_in_b: int = sys.maxsize, window_timeout_value_in_s=7200, ): - """Returning the communication window and the amount of data that can be transmitted from the local to the target actor. + """Returning the communication window and the amount of data that can be transmitted from the + local to the target actor. Args: local_actor (BaseActor): Local actor. - local_actor_communication_link_name (str): Name of the local_actor's communication link to use. + local_actor_communication_link_name (str): Name of the local_actor's communication link + to use. target_actor (BaseActor): other actor. dt (float): Simulation timestep [s]. Defaults to 10. t0 (pk.epoch): Current simulation time. Defaults to local time. data_to_send_in_b (int): Amount of data to transmit [b]. Defaults to maxint. - window_timeout_value_in_s (float, optional): Timeout for estimating the communication window. Defaults to 7200.0. + window_timeout_value_in_s (float, optional): Timeout for estimating the communication + window. Defaults to 7200.0. Returns: pk.epoch: Communication window start time. pk.epoch: Communication window end time. @@ -43,7 +46,7 @@ def get_communication_window( ) local_actor_comm_link = local_actor.communication_devices[local_actor_communication_link_name] - assert local_actor_comm_link.bandwidth_in_kbps > 0, "Bandiwidth has to be positive." + assert local_actor_comm_link.bandwidth_in_kbps > 0, "Bandwidth has to be positive." assert dt > 0, "dt has to be positive." assert data_to_send_in_b > 0, "data_to_send_in_b has to be positive." diff --git a/paseos/communication/link_budget_calc.py b/paseos/communication/link_budget_calc.py new file mode 100644 index 00000000..241787e2 --- /dev/null +++ b/paseos/communication/link_budget_calc.py @@ -0,0 +1,90 @@ +import numpy as np +import pykep as pk + +from paseos.utils.sky_field_sky_coordinate import SkyfieldSkyCoordinate +from paseos.actors.ground_station_actor import GroundstationActor +from paseos.actors.spacecraft_actor import SpacecraftActor +from paseos.resources.constants import SKYFIELD_EARTH + + +def calc_dist_and_alt_angle_spacecraft_ground( + spacecraft_actor, ground_station_actor, epoch: pk.epoch +) -> (float, float): + """Calculates distance and elevation angle based on spacecraft and ground station positions. + + Args: + spacecraft_actor (SpacecraftActor): the spacecraft actor model. + ground_station_actor (GroundStationActor): the ground station actor model. + epoch (pk.epoch): the current epoch. + + Returns: + distance, elevation (float, float): the distance in m and the elevation angle in degrees. + """ + # Converting time to skyfield to use its API + t_skyfield = ground_station_actor._skyfield_timescale.tt_jd(epoch.jd) + + # Ground station location in barycentric + gs_position = (SKYFIELD_EARTH + ground_station_actor._skyfield_position).at(t_skyfield) + + # Actor position in barycentric + other_actor_pos = SkyfieldSkyCoordinate( + r_in_m=np.array(spacecraft_actor.get_position(epoch)), + earth_pos_in_au=SKYFIELD_EARTH.at(t_skyfield).position.au, + ) + + # Trigger observation calculation + observation = gs_position.observe(other_actor_pos).apparent() + + # Compute angle + altitude_angle = observation.altaz()[0].degrees + distance = observation.distance().m + + return distance, altitude_angle + + +def calc_dist_and_alt_angle_spacecraft_spacecraft( + local_actor, known_actor, epoch: pk.epoch +) -> (float, float): + """Calculates distance and elevation angle between two spacecraft. + + Args: + local_actor (SpacecraftActor): the local spacecraft actor model. + known_actor (SpacecraftActor): the other spacecraft actor model. + epoch (pk.epoch): the current epoch. + + Returns: + distance, elevation (float, float): the distance in m and 0 for the elevation angle. + """ + + local_actor_pos = np.array(local_actor.get_position(epoch)) + other_actor_pos = np.array(known_actor.get_position(epoch)) + + distance = np.sqrt( + (local_actor_pos[0] - other_actor_pos[0]) ** 2 + + (local_actor_pos[1] - other_actor_pos[1]) ** 2 + + (local_actor_pos[2] - other_actor_pos[2]) ** 2 + ) + altitude_angle = 0 + + return distance, altitude_angle + + +def calc_dist_and_alt_angle(local_actor, known_actor, epoch: pk.epoch): + """Calculates distance and elevation angle between two actors. + + Args: + local_actor (BaseActor): the local actor model. + known_actor (BaseActor): the other actor model. + epoch (pk.epoch): the current epoch. + + Returns: + distance, elevation (float, float): the distance in m and the elevation angle in degrees. + """ + + if isinstance(local_actor, SpacecraftActor) and isinstance(known_actor, GroundstationActor): + return calc_dist_and_alt_angle_spacecraft_ground(local_actor, known_actor, epoch) + elif isinstance(local_actor, SpacecraftActor) and isinstance(known_actor, SpacecraftActor): + return calc_dist_and_alt_angle_spacecraft_spacecraft(local_actor, known_actor, epoch) + else: + print("No suitable types for distance calculations.") + return None diff --git a/paseos/communication/link_model.py b/paseos/communication/link_model.py new file mode 100644 index 00000000..fbb8280c --- /dev/null +++ b/paseos/communication/link_model.py @@ -0,0 +1,233 @@ +import math + +from loguru import logger + +from .receiver_model import ReceiverModel +from .transmitter_model import TransmitterModel +from ..actors.base_actor import BaseActor +from ..resources.constants import ( + C, + WAVELENGTH_OPTICAL, + REQUIRED_S_N_MARGIN_OPTICAL_DB, + PHOTONS_PER_BIT, + PLANCK_CONSTANT, + BOLTZMANN_DB, + REQUIRED_S_N_MARGIN_RADIO_DB, + REQUIRED_S_N_RATIO_RADIO_DB, +) +from .device_type import DeviceType + + +class LinkModel: + """This class defines a link model, containing one transmitter and one receiver.""" + + # Keep track of bitrate throughout the simulation + _bitrate_history = [] + + # Keep track of line of sight throughout the simulation + _line_of_sight_history = [] + + # Keep track of distance throughout the simulation + _distance_history = [] + + # Keep track of elevation angle throughout the simulation + _elevation_angle_history = [] + + # Keep track of current bitrate throughout the simulation + _current_bitrate = 0 + + # Keep track of current line of sight throughout the simulation + _current_line_of_sight = False + + # Keep track of current distance throughout the simulation + _current_distance = 0 + + # Keep track of current elevation angle throughout the simulation + _current_elevation_angle = 0 + + def __init__( + self, + transmitter_actor: BaseActor, + transmitter_model: TransmitterModel, + receiver_actor: BaseActor, + receiver_model: ReceiverModel, + frequency: float = 0, + ) -> None: + """Initializes the model. + + Args: + transmitter_actor (BaseActor): the transmitter actor in this link. + transmitter_model (TransmitterModel): the transmitter device model in this link. + receiver_actor (BaseActor): the receiver actor in this link. + receiver_model (ReceiverModel): the receiver device model. + """ + + logger.debug("Initializing link model.") + if transmitter_model.device_type == DeviceType.RADIO_TRANSMITTER: + self.wavelength = C / frequency # in m + elif transmitter_model.device_type == DeviceType.OPTICAL_TRANSMITTER: + self.wavelength = WAVELENGTH_OPTICAL + + self.transmitter_actor = transmitter_actor + self.transmitter = transmitter_model + self.receiver_actor = receiver_actor + self.receiver = receiver_model + self._bitrate_history = [] + self._line_of_sight_history = [] + self._distance_history = [] + self._elevation_angle_history = [] + + self.transmitter.set_gain(self.wavelength) + self.receiver.set_gain_db(self.wavelength) + + def get_path_loss_db(self, slant_range: float) -> float: + """Gets the path loss (free space loss) for a link. + + Args: + slant_range (float): The slant range of the link, in meters. + + Returns: + The path loss (free space loss) in dB. + """ + assert slant_range > 0, "Slant range needs to be higher than 0 meters" + + return 20 * math.log10(4 * math.pi * slant_range / self.wavelength) + + def get_bitrate(self, slant_range: float, min_elevation_angle: float) -> float: + """Gets the bitrate for a link based on current slant range and minimum elevation angle. + + Args: + slant_range (float): The slant range of the link, in meters. + min_elevation_angle (float): The minimum elevation angle for this receiver, in degrees. + + Returns: + The bitrate in bps. + """ + assert slant_range > 0, "Slant range needs to be higher than 0 meters" + assert ( + 90 >= min_elevation_angle >= 0 + ), "Elevation angle needs to be between 0 and 90 degrees" + bitrate = 0 + + self.total_channel_loss_db = self.get_path_loss_db(slant_range) + + if self.transmitter.device_type == DeviceType.OPTICAL_TRANSMITTER: + # Based on Giggenbach, D., Knopp, M. T., & Fuchs, C. (2023). + # Link budget calculation in optical LEO satellite downlinks with on/off‐keying and + # large signal divergence: A simplified methodology. International Journal of Satellite + # Communications and Networking. + self.signal_at_receiver_db = ( + self.transmitter.effective_isotropic_radiated_power_db - self.total_channel_loss_db + ) + + self.received_signal_power_with_gain_db = ( + self.signal_at_receiver_db + + self.receiver.antenna_gain_db + - self.receiver.line_losses_db + ) + + self.received_signal_power_with_margin_db = ( + self.received_signal_power_with_gain_db - REQUIRED_S_N_MARGIN_OPTICAL_DB + ) # dBm + + self.received_signal_power_with_margin_nw = ( + 10 ** (self.received_signal_power_with_margin_db / 10) * 1e-3 + ) # nW + + bitrate = ( + self.received_signal_power_with_margin_nw + / PHOTONS_PER_BIT + * WAVELENGTH_OPTICAL + / PLANCK_CONSTANT + / C + ) + elif self.transmitter.device_type == DeviceType.RADIO_TRANSMITTER: + # Based on Kirkpatrick, D. (1999). Space mission analysis and design (Vol. 8). + # J. R. Wertz, W. J. Larson, & D. Klungle (Eds.). Torrance: Microcosm. + + self.signal_at_receiver_db = ( + self.transmitter.effective_isotropic_radiated_power_db - self.total_channel_loss_db + ) + + self.s_n_power_density_db = ( + self.signal_at_receiver_db + + self.receiver.antenna_gain_db + - self.receiver.polarization_losses_db + - self.receiver.line_losses_db + - self.receiver.noise_temperature_db + - BOLTZMANN_DB + ) + self.s_n_power_density_including_margin_db = ( + self.s_n_power_density_db - REQUIRED_S_N_MARGIN_RADIO_DB + ) + + bitrate = 10 ** ( + -0.1 * (REQUIRED_S_N_RATIO_RADIO_DB - self.s_n_power_density_including_margin_db) + ) + + if bitrate < 0: + bitrate = 0 + + return bitrate + + def set_bitrate(self, bitrate: float) -> None: + """Sets the bitrate of this link for a certain epoch. + + Args: + bitrate (float): The bitrate of this link, in bps. + """ + self._current_bitrate = bitrate + + def set_line_of_sight(self, state: bool) -> None: + """Sets the line of sight of this link for a certain epoch, + if there is a line of sight, the transmitter and receiver are set to active. + + Args: + state (bool): The current line of sight state. + """ + self._current_line_of_sight = state + if state: + self.transmitter.set_active_state(True) + self.receiver.set_active_state(True) + else: + self.transmitter.set_active_state(False) + self.receiver.set_active_state(False) + + def set_distance(self, distance: float) -> None: + """Sets the distance of this link for a certain epoch. + + Args: + distance (float): The slant range of the link, in meters. + """ + self._current_distance = distance + + def set_elevation_angle(self, angle: float) -> None: + """Sets the elevation angle of this link for a certain epoch. + + Args: + angle (float): The elevation angle, in degrees. + """ + self._current_elevation_angle = angle + + def save_state(self) -> None: + """Saves the state of this link.""" + self._bitrate_history.append(self._current_bitrate) + self._line_of_sight_history.append(self._current_line_of_sight) + self._distance_history.append(self._current_distance) + self._elevation_angle_history.append(self._current_elevation_angle) + + @property + def bitrate_history(self): + return self._bitrate_history + + @property + def line_of_sight_history(self): + return self._line_of_sight_history + + @property + def distance_history(self): + return self._distance_history + + @property + def elevation_angle_history(self): + return self._elevation_angle_history diff --git a/paseos/communication/receiver_model.py b/paseos/communication/receiver_model.py new file mode 100644 index 00000000..fd78fd78 --- /dev/null +++ b/paseos/communication/receiver_model.py @@ -0,0 +1,73 @@ +import math +from loguru import logger + +from .gain_calc import ( + calc_optical_gain_from_wavelength_diameter_db, + calc_radio_gain_from_wavelength_diameter_db, +) +from .device_type import DeviceType + + +class ReceiverModel: + """This class defines a receiver model.""" + + def __init__( + self, + device_type: DeviceType, + line_losses: float, + antenna_diameter: float = 0, + antenna_gain: float = 0, + polarization_losses: float = 0, + noise_temperature: float = 0, + ) -> None: + """Initializes the model. + + Args: + line_losses (float): The line losses of the receiver, in dB. + antenna_diameter (float): The diameter of the antenna, in m. + Either this or the gain needs to be given. + antenna_gain (float): The gain of the antenna, either this or the diameter + needs to be given so that gain can be determined. + + """ + + logger.debug("Initializing receiver model.") + assert ( + device_type is DeviceType.RADIO_RECEIVER or device_type is DeviceType.OPTICAL_RECEIVER + ), "Device type must be RADIO_RECEIVER or OPTICAL_RECEIVER" + self.device_type = device_type + self.line_losses_db = line_losses # in dB + self.antenna_diameter = antenna_diameter # in m + self.antenna_gain_db = antenna_gain # in dB + + if device_type == DeviceType.RADIO_RECEIVER: + self.polarization_losses_db = polarization_losses # in dB + self.noise_temperature_db = 10 * math.log10(noise_temperature) # in dBK + + self.active = False + + def set_gain_db(self, wavelength: float = 0) -> None: + """Sets the gain of the receiver. + + Args: + wavelength (float): the wavelength of the receiver, in m. + """ + + # If no constant gain value was passed and set in the constructor, it needs to be calculated + if self.antenna_gain_db is None: + if self.device_type == DeviceType.OPTICAL_RECEIVER: + self.antenna_gain_db = calc_optical_gain_from_wavelength_diameter_db( + wavelength, self.antenna_diameter, 1 + ) + elif self.device_type == DeviceType.RADIO_RECEIVER: + self.antenna_gain_db = calc_radio_gain_from_wavelength_diameter_db( + wavelength, self.antenna_diameter, 1 + ) + + def set_active_state(self, is_active: bool) -> None: + """Sets the active state of the transmitter. + + Args: + is_active (bool): whether the transmitter is active. + """ + self.active = is_active diff --git a/paseos/communication/transmitter_model.py b/paseos/communication/transmitter_model.py new file mode 100644 index 00000000..9442447b --- /dev/null +++ b/paseos/communication/transmitter_model.py @@ -0,0 +1,105 @@ +import math +from loguru import logger + +from .device_type import DeviceType +from .gain_calc import ( + calc_radio_gain_from_wavelength_diameter_db, + calc_optical_gain_from_wavelength_diameter_db, + calc_gain_from_fwhm_db, +) + + +class TransmitterModel: + """This class defines a simple transmitter model.""" + + def __init__( + self, + input_power: float, + power_efficiency: float, + antenna_efficiency: float, + line_losses: float, + point_losses: float, + device_type: DeviceType, + antenna_gain: float = None, + antenna_diameter: float = None, + fwhm: float = None, + ) -> None: + """Initializes the model. + + Args: + input_power (float): Input power into the signal amplifier, in W. + power_efficiency (float): The power efficiency of the signal amplifier, determines + the output power. + antenna_efficiency (float): The efficiency of the antenna. + line_losses (float): The line losses of the transmitter, in dB. + point_losses (float): The pointing losses of the transmitter, in dB. + antenna_gain (float): The gain of the antenna, either this or the diameter needs to + be given so that gain + can be determined. + antenna_diameter (float): The diameter of the antenna, in m. Either this or the gain + needs to be given. + """ + + logger.debug("Initializing general transmitter model.") + + assert ( + device_type is DeviceType.RADIO_TRANSMITTER + or device_type is DeviceType.OPTICAL_TRANSMITTER + ), "Device type must be RADIO_TRANSMITTER or OPTICAL_TRANSMITTER" + + self.device_type = device_type + self.input_power = input_power + self.antenna_efficiency = antenna_efficiency + self.antenna_gain_db = antenna_gain + self.antenna_diameter = antenna_diameter + self.line_losses_db = line_losses + self.pointing_losses_db = point_losses + self.active = False + self.power_efficiency = power_efficiency + self.full_width_half_maximum = fwhm + self.effective_isotropic_radiated_power_db = 0 + + if device_type == DeviceType.RADIO_TRANSMITTER: + self.output_power_db = 10 * math.log10(input_power * power_efficiency) # dBW + elif device_type == DeviceType.OPTICAL_TRANSMITTER: + self.output_power_db = 10 * math.log10(input_power * power_efficiency * 1000) # dBm + + def set_effective_isotropic_radiated_power_db(self) -> None: + """Sets the Effective Isotropic Radiated Power (EIRP) for a transmitter.""" + self.effective_isotropic_radiated_power_db = ( + self.output_power_db + - self.line_losses_db + - self.pointing_losses_db + + self.antenna_gain_db + ) + + def set_gain(self, wavelength: float) -> None: + """Sets the gain of the transmitter. + + Args: + wavelength (float): the wavelength of the transmitter, in m. + """ + + # If no constant gain value was passed and set in the constructor, it needs to be calculated + if self.antenna_gain_db is None: + if self.device_type == DeviceType.RADIO_TRANSMITTER: + self.antenna_gain_db = calc_radio_gain_from_wavelength_diameter_db( + wavelength, self.antenna_diameter, self.antenna_efficiency + ) + elif self.device_type == DeviceType.OPTICAL_TRANSMITTER: + if self.antenna_diameter is not None: + self.antenna_gain_db = calc_optical_gain_from_wavelength_diameter_db( + wavelength, self.antenna_diameter, self.antenna_efficiency + ) + else: + self.antenna_gain_db = calc_gain_from_fwhm_db(self.full_width_half_maximum) + + self.set_effective_isotropic_radiated_power_db() + + def set_active_state(self, is_active: bool) -> None: + """Sets the active state of the transmitter. + + Args: + is_active (bool): whether the transmitter is active. + """ + self.active = is_active diff --git a/paseos/paseos.py b/paseos/paseos.py index 94661f00..d010f2de 100644 --- a/paseos/paseos.py +++ b/paseos/paseos.py @@ -1,13 +1,13 @@ -import types import asyncio import sys +import types +import pykep as pk from dotmap import DotMap from loguru import logger -import pykep as pk -from paseos.actors.base_actor import BaseActor from paseos.activities.activity_manager import ActivityManager +from paseos.actors.base_actor import BaseActor from paseos.utils.operations_monitor import OperationsMonitor @@ -47,6 +47,7 @@ def __init__(self, local_actor: BaseActor, cfg): Args: local_actor (BaseActor): local actor. cfg (DotMap): simulation configuration. + communication_links ([LinkModel]): list of communication links """ logger.trace("Initializing PASEOS") self._cfg = cfg @@ -98,9 +99,10 @@ def advance_time( float: Time remaining to advance (or 0 if done) """ - assert ( - not self._is_advancing_time - ), "advance_time is already running. This function is not thread-safe. Avoid mixing (async) activities and calling it." + assert not self._is_advancing_time, ( + "advance_time is already running. This function is not thread-safe. Avoid mixing (" + "async) activities and calling it." + ) self._is_advancing_time = True assert time_to_advance > 0, "Time to advance has to be positive." @@ -237,7 +239,8 @@ def simulation_time(self) -> float: @property def local_time(self) -> pk.epoch: - """Returns local time of the actor as pykep epoch. Use e.g. epoch.mjd2000 to get time in days. + """Returns local time of the actor as pykep epoch. Use e.g. epoch.mjd2000 to get time in + days. Returns: pk.epoch: local time of the actor @@ -246,7 +249,8 @@ def local_time(self) -> pk.epoch: @property def monitor(self): - """Access paseos operations monitor which tracks local actor attributes such as temperature or state of charge. + """Access paseos operations monitor which tracks local actor attributes such as + temperature or state of charge. Returns: OperationsMonitor: Monitor object. @@ -327,10 +331,12 @@ def register_activity( activity_function (types.CoroutineType): Function to execute during the activity. Needs to be async. Can accept a list of arguments to be specified later. power_consumption_in_watt (float): Power consumption of the activity in W (per second). - on_termination_function (types.CoroutineType): Function to execute when the activities stops + on_termination_function (types.CoroutineType): Function to execute when the + activities stops (either due to completion or constraint not being satisfied anymore). Needs to be async. Can accept a list of arguments to be specified later. - constraint_function (types.CoroutineType): Function to evaluate if constraints are still valid. + constraint_function (types.CoroutineType): Function to evaluate if constraints are + still valid. Should return True if constraints are valid, False if they aren't. Needs to be async. Can accept a list of arguments to be specified later. """ @@ -371,7 +377,8 @@ def perform_activity( termination_func_args: list = None, constraint_func_args: list = None, ): - """Perform the specified activity. Will advance the simulation if automatic clock is not disabled. + """Perform the specified activity. Will advance the simulation if automatic clock is not + disabled. Args: name (str): Name of the activity diff --git a/paseos/resources/constants.py b/paseos/resources/constants.py new file mode 100644 index 00000000..1e03f16e --- /dev/null +++ b/paseos/resources/constants.py @@ -0,0 +1,21 @@ +import os +from skyfield.api import load + +BOLTZMANN_DB = -228.6 # in dB +C = 299792458 # in m/s +PLANCK_CONSTANT = 6.62607015e-34 # in m^2 kg / s + +REQUIRED_BER_RADIO = 10e-5 +REQUIRED_S_N_RATIO_RADIO_DB = 9.6 # in dB +REQUIRED_S_N_MARGIN_RADIO_DB = 4.3 # in dB + + +WAVELENGTH_OPTICAL = 1550e-9 # in m +REQUIRED_BER_OPTICAL = 10e-3 +REQUIRED_S_N_MARGIN_OPTICAL_DB = 3 # in dB +PHOTONS_PER_BIT = 250 + + +_SKYFIELD_EARTH_PATH = os.path.join(os.path.dirname(__file__) + "/../resources/", "de421.bsp") +# Skyfield Earth, in the future we may not always want to load this. +SKYFIELD_EARTH = load(_SKYFIELD_EARTH_PATH)["earth"] diff --git a/paseos/tests/communication_test.py b/paseos/tests/communication_test.py new file mode 100644 index 00000000..41d6936c --- /dev/null +++ b/paseos/tests/communication_test.py @@ -0,0 +1,654 @@ +"""Tests to check the communication models creation and calculation function(s)""" +import numpy as np +import pykep as pk +import pytest + +from paseos.actors.ground_station_actor import GroundstationActor +from paseos.communication.device_type import DeviceType +from paseos.communication.receiver_model import ReceiverModel +from paseos.communication.transmitter_model import TransmitterModel +from paseos.communication.link_model import LinkModel +from paseos import ActorBuilder, SpacecraftActor +from paseos.communication.link_budget_calc import calc_dist_and_alt_angle + + +def test_link_creation(): + """ + Tests whether the creation of a link is successful and raises the correct exceptions in case of invalid parameters. + """ + t0 = pk.epoch_from_string("2023-Jan-04 20:00:00") + maspalomas_groundstation = ActorBuilder.get_actor_scaffold( + name="maspalomas_groundstation", actor_type=GroundstationActor, epoch=t0 + ) + receiver_name = "maspalomas_radio_receiver_1" + + ActorBuilder.add_comm_device( + actor=maspalomas_groundstation, + device_name=receiver_name, + noise_temperature=135, + line_losses=1, + polarization_losses=3, + antenna_gain=62.6, + device_type=DeviceType.RADIO_RECEIVER, + ) + + sat_actor: SpacecraftActor = ActorBuilder.get_actor_scaffold( + name="Sat", actor_type=SpacecraftActor, epoch=t0 + ) + radio_name = "sat_radio_transmitter_1" + optical_transmitter_name = "sat_optical_transmitter_1" + + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=optical_transmitter_name, + input_power=1, + power_efficiency=1, + antenna_efficiency=1, + line_losses=1, + point_losses=3, + fwhm=1e-3, + device_type=DeviceType.OPTICAL_TRANSMITTER, + ) + + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=radio_name, + input_power=2, + power_efficiency=0.5, + antenna_efficiency=0.5, + line_losses=1, + point_losses=5, + antenna_diameter=0.3, + device_type=DeviceType.RADIO_TRANSMITTER, + ) + + comms_sat: SpacecraftActor = ActorBuilder.get_actor_scaffold( + name="Comms", actor_type=SpacecraftActor, epoch=t0 + ) + optical_receiver_name = "optical_receiver_1" + ActorBuilder.add_comm_device( + actor=comms_sat, + device_name=optical_receiver_name, + line_losses=4.1, + antenna_gain=114.2, + device_type=DeviceType.OPTICAL_RECEIVER, + ) + + with pytest.raises(AssertionError, match="Frequency needs to be higher than 0 Hz."): + ActorBuilder.add_comm_link( + sat_actor, + radio_name, + maspalomas_groundstation, + receiver_name, + "link_1", + frequency=-8675e6, + ) + + link_name = "link_1" + ActorBuilder.add_comm_link( + sat_actor, radio_name, maspalomas_groundstation, receiver_name, link_name, frequency=8675e6 + ) + + link = sat_actor.get_comm_link(link_name) + + assert isinstance(link, LinkModel) + + +def test_receiver_creation(): + """ + Tests whether the creation of a receiver is successful and raises the correct exceptions in case of invalid parameters. + """ + t0 = pk.epoch_from_string("2023-Jan-04 20:00:00") + maspalomas_groundstation = ActorBuilder.get_actor_scaffold( + name="maspalomas_groundstation", actor_type=GroundstationActor, epoch=t0 + ) + ActorBuilder.set_ground_station_location( + maspalomas_groundstation, + latitude=27.7629, + longitude=-15.6338, + elevation=205.1, + minimum_altitude_angle=5, + ) + + receiver_name = "maspalomas_radio_receiver_1" + + with pytest.raises(AssertionError, match="Line losses needs to be 0 or higher."): + ActorBuilder.add_comm_device( + actor=maspalomas_groundstation, + device_name=receiver_name, + noise_temperature=135, + line_losses=-1, + polarization_losses=3, + antenna_gain=62.6, + device_type=DeviceType.RADIO_RECEIVER, + ) + with pytest.raises( + AssertionError, + match="Antenna gain or antenna diameter or fwhm needs to be set.", + ): + ActorBuilder.add_comm_device( + actor=maspalomas_groundstation, + device_name=receiver_name, + noise_temperature=135, + line_losses=1, + polarization_losses=3, + device_type=DeviceType.RADIO_RECEIVER, + ) + with pytest.raises( + AssertionError, + match="Only set one of antenna gain, antenna diameter and fwhm not multiple.", + ): + ActorBuilder.add_comm_device( + actor=maspalomas_groundstation, + device_name=receiver_name, + noise_temperature=135, + line_losses=1, + polarization_losses=3, + antenna_gain=62.6, + antenna_diameter=2, + device_type=DeviceType.RADIO_RECEIVER, + ) + + ActorBuilder.add_comm_device( + actor=maspalomas_groundstation, + device_name=receiver_name, + noise_temperature=135, + line_losses=1, + polarization_losses=3, + antenna_gain=62.6, + device_type=DeviceType.RADIO_RECEIVER, + ) + + receiver = maspalomas_groundstation.get_receiver(receiver_name) + assert isinstance(receiver, ReceiverModel) + + +def test_radio_receiver_creation(): + """ + Tests whether the creation of a radio receiver is successful and raises the correct exceptions in case of invalid + parameters. + """ + t0 = pk.epoch_from_string("2023-Jan-04 20:00:00") + maspalomas_groundstation = ActorBuilder.get_actor_scaffold( + name="maspalomas_groundstation", actor_type=GroundstationActor, epoch=t0 + ) + ActorBuilder.set_ground_station_location( + maspalomas_groundstation, + latitude=27.7629, + longitude=-15.6338, + elevation=205.1, + minimum_altitude_angle=5, + ) + + receiver_name = "maspalomas_radio_receiver_1" + + with pytest.raises(AssertionError, match="Polarization losses needs to be 0 or higher."): + ActorBuilder.add_comm_device( + actor=maspalomas_groundstation, + device_name=receiver_name, + noise_temperature=135, + line_losses=1, + polarization_losses=-3, + antenna_gain=62.6, + device_type=DeviceType.RADIO_RECEIVER, + ) + with pytest.raises(AssertionError, match="Noise temperature needs to be 0 or higher."): + ActorBuilder.add_comm_device( + actor=maspalomas_groundstation, + device_name=receiver_name, + noise_temperature=-135, + line_losses=1, + polarization_losses=3, + antenna_gain=62.6, + device_type=DeviceType.RADIO_RECEIVER, + ) + + ActorBuilder.add_comm_device( + actor=maspalomas_groundstation, + device_name=receiver_name, + noise_temperature=135, + line_losses=1, + polarization_losses=3, + antenna_gain=62.6, + device_type=DeviceType.RADIO_RECEIVER, + ) + + receiver = maspalomas_groundstation.get_receiver(receiver_name) + assert isinstance(receiver, ReceiverModel) + + +def test_optical_receiver_creation(): + """ + Tests whether the creation of an optical receiver is successful and raises the correct exceptions in case of invalid + parameters. + """ + t0 = pk.epoch_from_string("2023-Jan-04 20:00:00") + comms_sat: SpacecraftActor = ActorBuilder.get_actor_scaffold( + name="Sat", actor_type=SpacecraftActor, epoch=t0 + ) + optical_receiver_name = "optical_receiver_1" + ActorBuilder.add_comm_device( + actor=comms_sat, + device_name=optical_receiver_name, + line_losses=4.1, + antenna_gain=114.2, + device_type=DeviceType.OPTICAL_RECEIVER, + ) + + receiver = comms_sat.get_receiver(optical_receiver_name) + assert isinstance(receiver, ReceiverModel) + + +def test_transmitter_creation(): + """ + Tests whether the creation of a transmitter is successful and raises the correct exceptions in case of invalid + parameters. + """ + t0 = pk.epoch_from_string("2023-Jan-04 20:00:00") + sat_actor: SpacecraftActor = ActorBuilder.get_actor_scaffold( + name="Sat", actor_type=SpacecraftActor, epoch=t0 + ) + radio_name = "sat_radio_transmitter_1" + + with pytest.raises(AssertionError, match="Input power needs to be higher than 0."): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=radio_name, + input_power=-2, + power_efficiency=0.5, + antenna_efficiency=0.5, + line_losses=1, + point_losses=5, + antenna_diameter=0.3, + device_type=DeviceType.RADIO_TRANSMITTER, + ) + + with pytest.raises(AssertionError, match="Power efficiency should be between 0 and 1."): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=radio_name, + input_power=2, + power_efficiency=1.5, + antenna_efficiency=0.5, + line_losses=1, + point_losses=5, + antenna_diameter=0.3, + device_type=DeviceType.RADIO_TRANSMITTER, + ) + + with pytest.raises(AssertionError, match="Power efficiency should be between 0 and 1."): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=radio_name, + input_power=2, + power_efficiency=-0.5, + antenna_efficiency=0.5, + line_losses=1, + point_losses=5, + antenna_diameter=0.3, + device_type=DeviceType.RADIO_TRANSMITTER, + ) + + with pytest.raises(AssertionError, match="Antenna efficiency should be between 0 and 1."): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=radio_name, + input_power=2, + power_efficiency=0.5, + antenna_efficiency=1.5, + line_losses=1, + point_losses=5, + antenna_diameter=0.3, + device_type=DeviceType.RADIO_TRANSMITTER, + ) + + with pytest.raises(AssertionError, match="Antenna efficiency should be between 0 and 1."): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=radio_name, + input_power=2, + power_efficiency=0.5, + antenna_efficiency=-0.5, + line_losses=1, + point_losses=5, + antenna_diameter=0.3, + device_type=DeviceType.RADIO_TRANSMITTER, + ) + + with pytest.raises(AssertionError, match="Line losses needs to be 0 or higher."): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=radio_name, + input_power=2, + power_efficiency=0.5, + antenna_efficiency=0.5, + line_losses=-1, + point_losses=5, + antenna_diameter=0.3, + device_type=DeviceType.RADIO_TRANSMITTER, + ) + + with pytest.raises(AssertionError, match="Pointing losses needs to be 0 or higher."): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=radio_name, + input_power=2, + power_efficiency=0.5, + antenna_efficiency=0.5, + line_losses=1, + point_losses=-5, + antenna_diameter=0.3, + device_type=DeviceType.RADIO_TRANSMITTER, + ) + + +def test_radio_transmitter_creation(): + """ + Tests whether the creation of a radio transmitter is successful and raises the correct exceptions in case of invalid + parameters. + """ + t0 = pk.epoch_from_string("2023-Jan-04 20:00:00") + sat_actor: SpacecraftActor = ActorBuilder.get_actor_scaffold( + name="Sat", actor_type=SpacecraftActor, epoch=t0 + ) + radio_name = "sat_radio_transmitter_1" + + with pytest.raises( + AssertionError, + match="Antenna gain or antenna diameter or fwhm needs to be set.", + ): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=radio_name, + input_power=2, + power_efficiency=0.5, + antenna_efficiency=0.5, + line_losses=1, + point_losses=5, + device_type=DeviceType.RADIO_TRANSMITTER, + ) + + with pytest.raises( + AssertionError, + match="Only set one of antenna gain, antenna diameter and fwhm not multiple.", + ): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=radio_name, + input_power=2, + power_efficiency=0.5, + antenna_efficiency=0.5, + antenna_diameter=1, + antenna_gain=100, + line_losses=1, + point_losses=5, + device_type=DeviceType.RADIO_TRANSMITTER, + ) + + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=radio_name, + input_power=2, + power_efficiency=0.5, + antenna_efficiency=0.5, + line_losses=1, + point_losses=5, + antenna_diameter=0.3, + device_type=DeviceType.RADIO_TRANSMITTER, + ) + transmitter = sat_actor.get_transmitter(radio_name) + assert isinstance(transmitter, TransmitterModel) + + +def test_optical_transmitter_creation(): + """ + Tests whether the creation of an optical transmitter is successful and raises the correct exceptions in case of invalid + parameters. + """ + t0 = pk.epoch_from_string("2023-Jan-04 20:00:00") + sat_actor: SpacecraftActor = ActorBuilder.get_actor_scaffold( + name="Sat", actor_type=SpacecraftActor, epoch=t0 + ) + + with pytest.raises( + AssertionError, + match="Antenna gain or antenna diameter or fwhm needs to be set.", + ): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name="optical_transmitter_name", + input_power=1, + power_efficiency=1, + antenna_efficiency=1, + line_losses=1, + point_losses=3, + device_type=DeviceType.OPTICAL_TRANSMITTER, + ) + + with pytest.raises( + AssertionError, + match="Only set one of antenna gain, antenna diameter and fwhm not multiple.", + ): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name="optical_transmitter_name", + input_power=1, + power_efficiency=1, + antenna_efficiency=1, + line_losses=1, + point_losses=3, + antenna_gain=100, + fwhm=1e-3, + device_type=DeviceType.OPTICAL_TRANSMITTER, + ) + with pytest.raises( + AssertionError, + match="Only set one of antenna gain, antenna diameter and fwhm not multiple.", + ): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name="optical_transmitter_name", + input_power=1, + power_efficiency=1, + antenna_efficiency=1, + line_losses=1, + point_losses=3, + antenna_gain=100, + antenna_diameter=1, + device_type=DeviceType.OPTICAL_TRANSMITTER, + ) + with pytest.raises( + AssertionError, + match="Only set one of antenna gain, antenna diameter and fwhm not multiple.", + ): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name="optical_transmitter_name", + input_power=1, + power_efficiency=1, + antenna_efficiency=1, + line_losses=1, + point_losses=3, + fwhm=1e-3, + antenna_diameter=1, + device_type=DeviceType.OPTICAL_TRANSMITTER, + ) + with pytest.raises( + AssertionError, + match="Only set one of antenna gain, antenna diameter and fwhm not multiple.", + ): + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name="optical_transmitter_name", + input_power=1, + power_efficiency=1, + antenna_efficiency=1, + line_losses=1, + point_losses=3, + fwhm=1e-3, + antenna_diameter=1, + antenna_gain=100, + device_type=DeviceType.OPTICAL_TRANSMITTER, + ) + + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name="optical_transmitter_name", + input_power=1, + power_efficiency=1, + antenna_efficiency=1, + line_losses=1, + point_losses=3, + fwhm=1e-3, + device_type=DeviceType.OPTICAL_TRANSMITTER, + ) + transmitter = sat_actor.get_transmitter("optical_transmitter_name") + assert isinstance(transmitter, TransmitterModel) + + +def test_bitrate_calculation(): + """ + Tests whether the bitrate is calculated correctly for radio and optical links. + """ + # Radio input based on Kirkpatrick, D. (1999). Space mission analysis and design (Vol. 8). + # J. R. Wertz, W. J. Larson, & D. Klungle (Eds.). Torrance: Microcosm. + + t0 = pk.epoch_from_string("2023-Jan-04 20:00:00") + maspalomas_groundstation = ActorBuilder.get_actor_scaffold( + name="maspalomas_groundstation", actor_type=GroundstationActor, epoch=t0 + ) + receiver_name = "maspalomas_radio_receiver_1" + + ActorBuilder.add_comm_device( + actor=maspalomas_groundstation, + device_name=receiver_name, + noise_temperature=135, + line_losses=1, + polarization_losses=0.3, + antenna_gain=39.1, + device_type=DeviceType.RADIO_RECEIVER, + ) + + sat_actor: SpacecraftActor = ActorBuilder.get_actor_scaffold( + name="Sat", actor_type=SpacecraftActor, epoch=t0 + ) + radio_name = "sat_radio_transmitter_1" + + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=radio_name, + input_power=40, + power_efficiency=0.5, + antenna_efficiency=1, + line_losses=1, + point_losses=8.5, + antenna_gain=14.2, + device_type=DeviceType.RADIO_TRANSMITTER, + ) + + optical_transmitter_name = "sat_optical_transmitter_1" + + # Optical transmitter input based on Giggenbach, D., Knopp, M. T., & Fuchs, C. (2023). + # Link budget calculation in optical LEO satellite downlinks with on/off‐keying and + # large signal divergence: A simplified methodology. International Journal of Satellite + # Communications and Networking. + ActorBuilder.add_comm_device( + actor=sat_actor, + device_name=optical_transmitter_name, + input_power=1, + power_efficiency=1, + antenna_efficiency=1, + line_losses=1, + point_losses=3, + fwhm=1e-3, + device_type=DeviceType.OPTICAL_TRANSMITTER, + ) + + comms_sat: SpacecraftActor = ActorBuilder.get_actor_scaffold( + name="Comms", actor_type=SpacecraftActor, epoch=t0 + ) + optical_receiver_name = "optical_receiver_1" + + # Optical receiver input based on + # https://www.eoportal.org/satellite-missions/edrs#lct-laser-communication-terminal + ActorBuilder.add_comm_device( + actor=comms_sat, + device_name=optical_receiver_name, + line_losses=4.1, + antenna_diameter=135e-3, + device_type=DeviceType.OPTICAL_RECEIVER, + ) + + ActorBuilder.add_comm_link( + sat_actor, radio_name, maspalomas_groundstation, receiver_name, "link_1", frequency=2.2e9 + ) + + # Based on hand calculations + link_model = sat_actor.get_comm_link("link_1") + radio_bitrate = link_model.get_bitrate(2831e3, 0) + assert np.isclose(radio_bitrate, 114e6, 0.01, 1000) + + ActorBuilder.add_comm_link( + sat_actor, optical_transmitter_name, comms_sat, optical_receiver_name, "optical_link_1" + ) + + # Based on hand calculations + optical_link_model = sat_actor.get_comm_link("optical_link_1") + optical_bitrate = optical_link_model.get_bitrate(595e3, 0) + assert np.isclose(optical_bitrate, 86.5e6, 0.01, 1000) + + +def test_communication_link_sat_to_sat(): + """ + Tests whether the distance calculations between satellites is correct. + """ + earth = pk.planet.jpl_lp("earth") + sat1 = ActorBuilder.get_actor_scaffold("sat1", SpacecraftActor, pk.epoch(0)) + sat2 = ActorBuilder.get_actor_scaffold("sat2", SpacecraftActor, pk.epoch(0)) + + altitude = 500e3 + inclination = 0 + W_1 = 0 + W_2 = 30 * pk.DEG2RAD + argPeriapsis = 0 + + a = altitude + 6371000 # in [m], earth radius included + e = 0 + i = inclination * pk.DEG2RAD + w = argPeriapsis * pk.DEG2RAD + E = 0 + + r_1, v_1 = pk.core.par2ic([a, e, i, W_1, w, E], 1) + r_2, v_2 = pk.core.par2ic([a, e, i, W_2, w, E], 1) + + ActorBuilder.set_orbit( + sat1, + position=r_1, + velocity=v_1, + epoch=pk.epoch(0), + central_body=earth, + ) + ActorBuilder.set_orbit( + sat2, + position=r_2, + velocity=v_2, + epoch=pk.epoch(0), + central_body=earth, + ) + + dist, elevation_angle = calc_dist_and_alt_angle(sat1, sat2, sat1.local_time) + + # Based on hand calculations + assert np.isclose(dist, 3556691, 0.01, 100) + + +if __name__ == "__main__": + test_bitrate_calculation() + test_communication_link_sat_to_sat() + test_link_creation() + test_optical_receiver_creation() + test_optical_transmitter_creation() + test_radio_receiver_creation() + test_radio_transmitter_creation() + test_receiver_creation() + test_transmitter_creation() diff --git a/paseos/tests/gain_calc_test.py b/paseos/tests/gain_calc_test.py new file mode 100644 index 00000000..45b4190d --- /dev/null +++ b/paseos/tests/gain_calc_test.py @@ -0,0 +1,30 @@ +"""Test to check the gain_calc function(s)""" +import numpy as np +import pytest + +from paseos.communication.gain_calc import ( + calc_radio_gain_from_wavelength_diameter_db, + calc_gain_from_fwhm_db, +) + + +def test_gain_calculation(): + with pytest.raises(AssertionError, match="Wavelength needs to be larger than 0."): + calc_radio_gain_from_wavelength_diameter_db(-5, 0, 0.5) + + with pytest.raises(AssertionError, match="Antenna diameter needs to be larger than 0."): + calc_radio_gain_from_wavelength_diameter_db(10, -2, 0.5) + with pytest.raises(AssertionError, match="Antenna efficiency should be between 0 and 1."): + calc_radio_gain_from_wavelength_diameter_db(10, 2, -0.5) + with pytest.raises(AssertionError, match="Antenna efficiency should be between 0 and 1."): + calc_radio_gain_from_wavelength_diameter_db(10, 2, 1.5) + + gain_radio = calc_radio_gain_from_wavelength_diameter_db(0.035, 15, 0.5) + assert np.isclose(gain_radio, 59.6, rtol=0.01, atol=1.0) + + gain_optical = calc_gain_from_fwhm_db(1e-3) + assert np.isclose(gain_optical, 70.45, rtol=0.01, atol=1) + + +if __name__ == "__main__": + test_gain_calculation() diff --git a/paseos/utils/operations_monitor.py b/paseos/utils/operations_monitor.py index 51850e47..87927f88 100644 --- a/paseos/utils/operations_monitor.py +++ b/paseos/utils/operations_monitor.py @@ -1,9 +1,9 @@ import csv -from loguru import logger -from dotmap import DotMap -import pykep as pk import matplotlib.pyplot as plt +import pykep as pk +from dotmap import DotMap +from loguru import logger from paseos.actors.base_actor import BaseActor @@ -34,20 +34,23 @@ def __getitem__(self, item): """Get a logged attributes values. Args: - item (str): Name of item. Available are "timesteps","current_activity","state_of_charge", + item (str): Name of item. Available are "timesteps","current_activity", + "state_of_charge", "is_in_eclipse","known_actors","position","velocity","temperature" """ - assert item in ( - list(self._log.keys()) + list(self._log.custom_properties.keys()) - ), f"Untracked quantity. Available are {self._log.keys() + self._log.custom_properties.keys()}" + assert item in (list(self._log.keys()) + list(self._log.custom_properties.keys())), ( + f"Untracked quantity. Available are " + f"{self._log.keys() + self._log.custom_properties.keys()}" + ) if item in self._log.custom_properties.keys(): return self._log.custom_properties[item] return self._log[item] def plot(self, item): - assert item in ( - list(self._log.keys()) + list(self._log.custom_properties.keys()) - ), f"Untracked quantity. Available are {self._log.keys() + self._log.custom_properties.keys()}" + assert item in (list(self._log.keys()) + list(self._log.custom_properties.keys())), ( + f"Untracked quantity. Available " + f"are {self._log.keys() + self._log.custom_properties.keys()}" + ) if item in self._log.custom_properties.keys(): values = self._log.custom_properties[item] else: @@ -58,16 +61,13 @@ def plot(self, item): plt.xlabel("Time [s]") plt.ylabel(item.replace("_", " ")) - def log( - self, - local_actor: BaseActor, - known_actors: list, - ): + def log(self, local_actor: BaseActor, known_actors: list, communication_links=None): """Log the current time step. Args: local_actor (BaseActor): The local actors whose status we are monitoring. known_actors (list): List of names of the known actors. + communication_links: List of communication links. """ logger.trace("Logging iteration") assert local_actor.name == self._actor_name, "Expected actor's name was" + self._actor_name @@ -90,6 +90,10 @@ def log( else: self._log.is_in_eclipse.append(local_actor._previous_eclipse_status) + if local_actor.communication_links is not None: + for link in local_actor.communication_links: + link.save_state() + # Track all custom properties for key, value in local_actor.custom_properties.items(): if key not in self._log.custom_properties.keys(): diff --git a/paseos/utils/sky_field_sky_coordinate.py b/paseos/utils/sky_field_sky_coordinate.py new file mode 100644 index 00000000..bd35bd42 --- /dev/null +++ b/paseos/utils/sky_field_sky_coordinate.py @@ -0,0 +1,24 @@ +from skyfield.units import AU_M +from skyfield.vectorlib import VectorFunction + + +class SkyfieldSkyCoordinate(VectorFunction): + """Small helper class to compute altitude angle""" + + def __init__(self, r_in_m, earth_pos_in_au): + # Value 0 corresponds to the solar system barycenter according to + # this: https://rhodesmill.org/skyfield/api-vectors.html#skyfield.vectorlib.VectorFunction.center + self.center = 0 + # Position vector + self.r = earth_pos_in_au + r_in_m / AU_M + + @property + def target(self): + # This is a property to avoid circular references as described + # here: https://github.com/skyfielders/python-skyfield/blob/master/skyfield/planetarylib.py#L222 + return self + + def _at(self, t): + # Velocity vector + v = [0.0, 0.0, 0.0] + return self.r, v, self.center, "SkyfieldSkyCoordinate"