Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,6 @@ local.properties
# TeXlipse plugin
.texlipse


# Python virtual env
venv
26 changes: 25 additions & 1 deletion docs/free/python.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,26 @@ Example for retrieving all state vectors currently received by your receivers (n

from opensky_api import OpenSkyApi

api = OpenSkyApi(USERNAME, PASSWORD)
api = OpenSkyApi(client_json_path="path/to/oauth/credentials.json")
states = api.get_my_states()
print(states)
for s in states.states:
print(s.sensors)

There are 3 ways to authenticate.

You can use OAuth, by pointing to the `.json` file downloaded from the `OpenSky account page <https://opensky-network.org/my-opensky/account>`.

api = OpenSkyApi(client_json_path="path/to/oauth/credentials.json")

You can use OAuth by providing an ID and Secret explicitly:

api = OpenSkyApi(client_id=id, client-secret=secret)

You can use Basic Authentication (deprecated) by providing a username and password:

api = OpenSkyApi(username=username, password=password)

It is also possible to retrieve state vectors for a certain area. For this purpose, you need to provide a bounding box.
It is defined by lower and upper bounds for longitude and latitude. The following example shows how to retrieve data
for a bounding box which encompasses Switzerland::
Expand Down Expand Up @@ -115,3 +129,13 @@ how to get the live track for aircraft with transponder address 3c4b26 (D-ABYF):
track = api.get_track_by_aircraft("3c4b26")
print(track)

Logging
-------

The client logs to a logger called `opensky_api`.
You can display logs with:

import logging
logger = logging.getLogger("opensky_api")
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
4 changes: 3 additions & 1 deletion docs/free/rest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,9 @@ These are the required request parameters:
| | | as Unix time (seconds since epoch) |
+----------------+-----------+------------------------------------------------+

The given time interval must cover more than two days (UTC)!
The given time interval must cover no more than two days (UTC)!
e.g. 1/1/2025 23:00 UTC to 3/1/2025 01:00 UTC is a period of 26 hours, but covers 3 UTC dates,
so will be rejected with a 400 error.

Response
^^^^^^^^
Expand Down
135 changes: 104 additions & 31 deletions python/opensky_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import time
from collections import defaultdict
from datetime import datetime
import json
import datetime as dt

import requests

Expand Down Expand Up @@ -262,43 +264,106 @@ class OpenSkyApi(object):
Main class of the OpenSky Network API. Instances retrieve data from OpenSky via HTTP.
"""

def __init__(self, username=None, password=None):
def __init__(self, username=None, password=None, client_json_path=None, client_id=None, client_secret=None):
"""Create an instance of the API client. If you do not provide username and password requests will be
anonymous which imposes some limitations.

:param str username: an OpenSky username (optional).
:param str password: an OpenSky password for the given username (optional).
Either pass no arguments for anonymous, limited access,
or pass client_json_path for OAuth authentication from a json file,
or pass client_id and client_secret for OAuth authentication with separate ID and key,
or pass username and password for deprecated basic authentication.

:param str client_json_path: a file path to a local json file downloaded from OpenSky,
with keys clientId and clientSecret.
:param str client_id: an OpenSky API client ID for OAuth (optional)
:param str client_secret: an OpenSky API client secret for OAuth (optional)
:param str username: an OpenSky username for basic auth (optional, deprecated).
:param str password: an OpenSky password for basic auth (optional, deprecated).
"""
if username is not None:
self._auth = (username, password)
self._session = requests.Session()


oauth_url = "https://auth.opensky-network.org/auth/realms/opensky-network/protocol/openid-connect/token"

if client_json_path:
# read the secret from a file
if client_id or client_secret or username or password:
raise ValueError("If providing client_json_path, provide no other secret or ID argument")
with open(client_json_path, 'r') as f:
secret = json.load(f)
client_id = secret['clientId']
client_secret = secret['clientSecret']

if client_id:
if username or password:
raise ValueError("Must provide either (client_id and client_secret) or (username and password), not a mixture.")
elif client_secret is None:
raise ValueError("Must provide a client_secret if providing a client_id (OAuth)")

logger.debug("Requesting OAuth token")
r = self._session.post(
oauth_url,
data={
'grant_type': 'client_credentials',
'client_id': client_id,
'client_secret': client_secret
},
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)

if r.status_code != 200:
logger.error("Request to {} returned status {}".format(oauth_url, r.status_code))
raise ValueError("Authentication Failure (OAuth token generation)") from ex

self._oauth_token = r.json()['access_token']

self._session.headers.update({
'Authorization': "Bearer " + self._oauth_token
})
logger.debug("OAuth token obtained successfully")
self._anonymous = False

# Basic Auth (deprecated)
elif username:
if password:
raise ValueError("Must provide a password if providing a username (basic auth)")
self._session.auth = (username, password) = (username, password)
self._anonymous = False
else:
self._auth = ()
self._anonymous = True

self._api_url = "https://opensky-network.org/api"
self._last_requests = defaultdict(lambda: 0)


def _get_json(self, url_post, callee, params=None):
def _get_json(self, path, callee, params=None, _404_as_empty=False):
"""
Sends HTTP request to the given endpoint and returns the response as a json.
Raises an exception if the HTTP status was an error code.

:param str url_post: endpoint to which the request will be sent.
:param str path: The part of the URL after the API base url to which the request will be sent.
:param Callable callee: method that calls _get_json().
:param dict params: request parameters.
:rtype: dict|None
:param bool _404_as_empty: If True, a 404 error will result in an empty list being returned.
:rtype: dict

"""
r = requests.get(
"{0:s}{1:s}".format(self._api_url, url_post),
auth=self._auth,
r = self._session.get(
"{0:s}{1:s}".format(self._api_url, path),
params=params,
timeout=15.00,
)
if r.status_code == 200:
if _404_as_empty and (r.status_code == 404):
# Some API paths are defined to return a 404 when the result set is empty
logger.debug("404 response turned into empty list")
self._last_requests[callee] = time.time()
return r.json()
else:
logger.debug(
"Response not OK. Status {0:d} - {1:s}".format(r.status_code, r.reason)
)
return None
return []
elif r.status_code != 200:
logger.error("Error response with status {}, body: {}".format(r.status_code, repr(r.text)))
r.raise_for_status()

self._last_requests[callee] = time.time()
return r.json()

def _check_rate_limit(self, time_diff_noauth, time_diff_auth, func):
"""
Expand All @@ -309,7 +374,7 @@ def _check_rate_limit(self, time_diff_noauth, time_diff_auth, func):
:param callable func: the API function to evaluate.
:rtype: bool
"""
if len(self._auth) < 2:
if self._anonymous:
return abs(time.time() - self._last_requests[func]) >= time_diff_noauth
else:
return abs(time.time() - self._last_requests[func]) >= time_diff_auth
Expand Down Expand Up @@ -384,8 +449,8 @@ def get_my_states(self, time_secs=0, icao24=None, serials=None):
:return: OpenSkyStates if request was successful, None otherwise.
:rtype: OpenSkyStates | None
"""
if len(self._auth) < 2:
raise Exception("No username and password provided for get_my_states!")
if self._anonymous:
raise Exception("No authentication provided for get_my_states!")
if not self._check_rate_limit(0, 1, self.get_my_states):
logger.debug("Blocking request due to rate limit.")
return None
Expand Down Expand Up @@ -415,12 +480,12 @@ def get_flights_from_interval(self, begin, end):
"""
if begin >= end:
raise ValueError("The end parameter must be greater than begin.")
if end - begin > 7200:
if end - begin > 2 * 60 * 60:
raise ValueError("The time interval must be smaller than 2 hours.")

params = {"begin": begin, "end": end}
states_json = self._get_json(
"/flights/all", self.get_flights_from_interval, params=params
"/flights/all", self.get_flights_from_interval, params=params, _404_as_empty=True
)

if states_json is not None:
Expand All @@ -446,7 +511,7 @@ def get_flights_by_aircraft(self, icao24, begin, end):

params = {"icao24": icao24, "begin": begin, "end": end}
states_json = self._get_json(
"/flights/aircraft", self.get_flights_by_aircraft, params=params
"/flights/aircraft", self.get_flights_by_aircraft, params=params, _404_as_empty=True
)

if states_json is not None:
Expand All @@ -465,12 +530,12 @@ def get_arrivals_by_airport(self, airport, begin, end):
"""
if begin >= end:
raise ValueError("The end parameter must be greater than begin.")
if end - begin > 604800:
raise ValueError("The time interval must be smaller than 7 days.")
if _count_utc_dates(begin, end) > 2:
raise ValueError("The time interval must span no more than 2 dates in UTC")

params = {"airport": airport, "begin": begin, "end": end}
states_json = self._get_json(
"/flights/arrival", self.get_arrivals_by_airport, params=params
"/flights/arrival", self.get_arrivals_by_airport, params=params, _404_as_empty=True
)

if states_json is not None:
Expand All @@ -489,12 +554,12 @@ def get_departures_by_airport(self, airport, begin, end):
"""
if begin >= end:
raise ValueError("The end parameter must be greater than begin.")
if end - begin > 604800:
raise ValueError("The time interval must be smaller than 7 days.")
if _count_utc_dates(begin, end) > 2:
raise ValueError("The time interval must span no more than 2 dates in UTC")

params = {"airport": airport, "begin": begin, "end": end}
states_json = self._get_json(
"/flights/departure", self.get_departures_by_airport, params=params
"/flights/departure", self.get_departures_by_airport, params=params, _404_as_empty=True
)

if states_json is not None:
Expand Down Expand Up @@ -526,3 +591,11 @@ def get_track_by_aircraft(self, icao24, t=0):
if states_json is not None:
return FlightTrack(states_json)
return None


def _get_utc_date_from_epoch(e):
return dt.datetime.fromtimestamp(e, tz=dt.timezone.utc).date()

# count how many calendar days in UTC are spanned by two timestamps
def _count_utc_dates(begin, end):
return (_get_utc_date_from_epoch(end) - _get_utc_date_from_epoch(begin)) / dt.timedelta(days=1)
21 changes: 15 additions & 6 deletions python/test_opensky_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import time
from datetime import datetime
from unittest import TestCase, skipIf
from datetime import datetime, timezone
from unittest import TestCase, skipIf, main

from opensky_api import FlightData, FlightTrack, OpenSkyApi, StateVector, Waypoint

Expand All @@ -32,11 +32,14 @@
TEST_USERNAME = ""
TEST_PASSWORD = ""
TEST_SERIAL = []
TEST_SECRET_PATH = "/home/matthew/.local/share/credentials/opensky_api_key.json"


class TestOpenSkyApi(TestCase):
def setUp(self):
if len(TEST_USERNAME) > 0:
if TEST_SECRET_PATH:
self.api = OpenSkyApi(client_json_path=TEST_SECRET_PATH)
elif len(TEST_USERNAME) > 0:
self.api = OpenSkyApi(TEST_USERNAME, TEST_PASSWORD)
else:
self.api = OpenSkyApi()
Expand Down Expand Up @@ -193,7 +196,7 @@ def test_flight_track_parsing(self):
def test_get_my_states_no_auth(self):
a = OpenSkyApi()
with self.assertRaisesRegex(
Exception, "No username and password provided for get_my_states!"
Exception, "No authentication provided for get_my_states!"
):
a.get_my_states()

Expand Down Expand Up @@ -283,7 +286,13 @@ def test_get_departures_by_airport_reversed_timestamps(self):
r = self.api.get_departures_by_airport("EDDF", 1517230800, 1517227200)

def test_get_departures_by_airport_too_long_time_interval(self):
begin = datetime(2025, 1, 1, 23, 0, 0, tzinfo=timezone.utc).timestamp()
end = datetime(2025, 1, 3, 1, 0, 0, tzinfo=timezone.utc).timestamp()

with self.assertRaisesRegex(
Exception, "The time interval must be smaller than 7 days"
Exception, "The time interval must span no more than 2 dates in UTC"
):
r = self.api.get_departures_by_airport("EDDF", 1517227200, 1517832001)
r = self.api.get_departures_by_airport("EDDF", begin, end)

if __name__ == '__main__':
main()