"""Model object dedicated to Planet interface."""
import asyncio
from datetime import datetime
from typing import Dict, List, Optional, Union
import nest_asyncio
import planet.data_filter as filters
import traitlets as t
from deprecated.sphinx import deprecated
from planet import DataClient
from planet.auth import Auth
from planet.exceptions import NoPermission
from planet.http import Session
from sepal_ui.message import ms
from sepal_ui.model import Model
# known problem https://github.com/jupyter/notebook/issues/3397
nest_asyncio.apply()
[docs]
class PlanetModel(Model):
SUBS_URL: str = (
"https://api.planet.com/auth/v1/experimental/public/my/subscriptions"
)
"The url of the planet API subscription"
credentials: List[str] = []
"list containing [api_key] or pair of [username, password] to log in"
session: Optional[Session] = None
"planet.http.session: planet session."
subscriptions: t.Dict = t.Dict({}).tag(sync=True)
"All the dictionary info from the available subscriptions"
active = t.Bool(False).tag(sync=True)
"Value to determine if at least one subscription has the active true state"
[docs]
def __init__(self, credentials: Union[str, List[str]] = "") -> None:
"""Planet model helper to connect planet API client and perform requests.
It can be
instantiated whether itself or linked with a PlanetView input helper. All the methods
are aimed to be used without the need of a view.
Args:
credentials: planet API key or tuple of username and password of planet explorer.
"""
self.subscriptions = {}
self.session = None
self.active = False
if credentials:
self.init_session(credentials)
[docs]
@deprecated(
version="3.0",
reason="credentials member is deprecated, use self.auth._key instead",
)
def init_session(
self, credentials: Union[str, List[str]], write_secrets: bool = False
) -> None:
"""Initialize planet client with api key or credentials. It will handle errors.
Args:
credentials: planet API key, username and password pair or a secrets planet.json file.
write_secrets: either to write the credentials in the secret file or not. Defaults to True.
"""
if isinstance(credentials, str):
credentials = [credentials]
if not all(credentials):
raise ValueError(ms.planet.exception.empty)
if len(credentials) == 2:
self.auth = Auth.from_login(*credentials)
# Check if the str is a path to a secret file
elif len(credentials) == 1 and credentials[0].endswith(".json"):
self.auth = Auth.from_file(credentials[0])
else:
self.auth = Auth.from_key(credentials[0])
self.credentials = self.auth._key
self.session = Session(auth=self.auth)
self._is_active()
if self.active and write_secrets:
self.auth.store()
return
def _is_active(self) -> None:
"""Check if the key has an associated active subscription and change the state button accordingly."""
# As there is not any key that identify the nicfi contract,
# let's find though all the subscriptions a representative name
wildcards = ["Level_0", "Level_1", "Level2"]
# get the subs from the api key and save them in the model. It will be useful
# to avoid doing more calls.
tmp_subscriptions: dict[str, list] = {"nicfi": [], "others": []}
for sub in self.get_subscriptions():
for w in wildcards:
if w in str(sub):
tmp_subscriptions["nicfi"].append(sub)
break
if sub not in tmp_subscriptions["nicfi"]:
tmp_subscriptions["others"].append(sub)
self.subscriptions = tmp_subscriptions
states = self.search_status(self.subscriptions)
self.active = any([next(iter(d.values())) for d in states])
return
[docs]
def get_subscriptions(self) -> dict:
"""Load the user subscriptions.
Returns:
the dictionary of user subscription or empty list if nothing found
"""
req = self.session.request("GET", self.SUBS_URL)
try:
response = asyncio.run(req)
except NoPermission:
raise Exception(
"You don't have permission to access to this resource. Check your input data."
)
except Exception as e:
raise e
return response.json() if response.status_code == 200 else {}
[docs]
def get_items(
self,
aoi: dict,
start: Union[str, datetime],
end: Union[str, datetime],
cloud_cover: float,
limit: int = 0,
) -> list:
"""Request imagery items from the planet API for the requested dates.
Args:
aoi: geojson clipping geometry
start: the start of the request (YYYY-mm-dd))
end: the end of the request (YYYY-mm-dd))
cloud_cover: maximum cloud coverage.
limit: number of items to constrain the search. Defaults to 0 to use all of them.
Returns:
items found using the search query
"""
# cast start and end to str
start = (
datetime.strptime(start, "%Y-%m-%d") if isinstance(start, str) else start
)
end = datetime.strptime(end, "%Y-%m-%d") if isinstance(end, str) else end
and_filter = filters.and_filter(
[
filters.geometry_filter(aoi),
filters.range_filter("cloud_cover", lte=cloud_cover),
filters.date_range_filter("acquired", gt=start),
filters.date_range_filter("acquired", lt=end),
]
)
# PSScene3Band and PSScene4Band item type and assets are deprecated
# since January 2023 so we are now only looking at PSScene
item_types = ["PSScene"]
async def _main():
"""Create an asyncrhonous function here to avoid making the main get_items as async.
So we can keep calling get_items without any change.
"""
client = DataClient(self.session)
items = client.search(
item_types, and_filter, name="quick_search", limit=limit
)
items_list = [item async for item in items]
return items_list
return asyncio.run(_main())
[docs]
def get_mosaics(self) -> dict:
"""Get all the mosaics available in a client without pagination limitations.
Returns:
The mosaics contained in the API request.
Note:
The output format is the following:
.. code-block:: json
{
"_links": {
"_self": "<mosaic_url>",
"quads": "<quad_url>",
"tiles": "<xyz_tiles_url>"
},
"bbox": ["<4_corners_coordinates>"],
"coordinate_system": "<projection_system>",
"datatype": "uint16",
"first_acquired": "<date_of_aquisition>",
"grid": {
"quad_size": 4096,
"resolution": 4.77731426716
},
"id": "<mposaic_hashed_id>",
"interval": "<acquisition_interval>",
"item_types": ["<types_of_imagery"],
"last_acquired": "<last_image_timestamp>",
"level": "<max_zoom_level>",
"name": "<mosaic_name>",
"product_type": "timelapse",
"quad_download": true
}
"""
mosaics_url = "https://api.planet.com/basemaps/v1/mosaics"
request = self.session.request("GET", mosaics_url)
response = asyncio.run(request)
return response.json().get("mosaics", [])
[docs]
def get_quad(self, mosaic: dict, quad_id: str) -> dict:
"""Get a quad response for a specific mosaic and quad.
Args:
mosaic: A dict representing a mosaic in the format of list_mosaic
quad_id: A quad id (typically <xcoord>-<ycoord>)
Returns:
The quad information as a dict.
Note:
The output format is the following:
.. code-block:: json
{
"_links": {
"_self": "<quad_url>",
"download": "<download_url>",
"items": "<items_url>",
"thumbnail": "<thumbnail_url>"
},
"bbox": ["<corner_coordinates>"],
"id": "<quad_id>",
"percent_covered": 100
}
"""
quads_url = "https://api.planet.com/basemaps/v1/mosaics/{}/quads/{}"
quads_url = quads_url.format(mosaic["id"], quad_id)
request = self.session.request("GET", quads_url)
response = asyncio.run(request)
return response.json() or {}
[docs]
@staticmethod
def search_status(d: dict) -> List[Dict[str, bool]]:
"""Get the status of a specific subscription.
Args:
d: dictionary of subscription object
Returns:
the (sub.name: status) pairs
"""
states = []
for v in d.values():
for subs in v:
if "plan" in subs:
plan = subs.get("plan")
state = True if plan.get("state") == "active" else False
states.append({plan.get("name"): state})
return states