Source code for sepal_ui.scripts.gee

"""All the heleper methods to interface Google Earthengine with sepal-ui."""

import time
from pathlib import Path
from typing import List, Union

import ee
import ipyvuetify as v

from sepal_ui.message import ms
from sepal_ui.scripts import decorator as sd


[docs] @sd.need_ee def wait_for_completion(task_descripsion: str, widget_alert: v.Alert = None) -> str: """Wait until the selected process is finished. Display some output information. Args: task_descripsion: name of the running task widget_alert: alert to display the output messages Returns: the final state of the task """ state = "UNSUBMITTED" while state != "COMPLETED": # print in a widget if widget_alert: widget_alert.add_live_msg(ms.status.format(state)) # wait 5 seconds time.sleep(5) # search for the task in task_list current_task = is_task(task_descripsion) state = current_task.state if state == "FAILED": raise Exception(ms.status.format(state)) # print in a widget if widget_alert: widget_alert.add_live_msg(ms.status.format(state), "success") return state
[docs] @sd.need_ee def is_task(task_descripsion: str) -> ee.batch.Task: """Search for the described task in the user Task list return None if nothing is found. Args: task_descripsion: the task description Returns: return the found task else None """ current_task = None for task in ee.batch.Task.list(): if task.config["description"] == task_descripsion: current_task = task break return current_task
[docs] @sd.need_ee def is_running(task_descripsion: str) -> ee.batch.Task: """Search for the described task in the user Task list return None if nothing is currently running. Args: task_descripsion: the task description Returns: return the found task else None """ current_task = is_task(task_descripsion) if current_task: if current_task.state not in ["RUNNING", "READY"]: current_task = None return current_task
[docs] @sd.need_ee def get_assets(folder: Union[str, Path] = "") -> List[dict]: """Get all the assets from the parameter folder. every nested asset will be displayed. Args: folder: the initial GEE folder Returns: the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id' """ # set the folder and init the list asset_list = [] folder = str(folder) or ee.data.getAssetRoots()[0]["id"] def _recursive_get(folder, asset_list): # loop in the assets for asset in ee.data.listAssets({"parent": folder})["assets"]: asset_list += [asset] if asset["type"] == "FOLDER": asset_list = _recursive_get(asset["name"], asset_list) return asset_list return _recursive_get(folder, asset_list)
[docs] @sd.need_ee def is_asset(asset_name: str, folder: Union[str, Path] = "") -> bool: """Check if the asset already exist in the user asset folder. Args: asset_descripsion: the descripsion of the asset folder: the folder of the glad assets Returns: true if already in folder """ # get the folder folder = str(folder) or ee.data.getAssetRoots()[0]["id"] # get all the assets asset_list = get_assets(folder) # search for asset existence exist = False for asset in asset_list: if asset_name == asset["name"]: exist = True break return exist
[docs] @sd.need_ee def delete_assets(asset_id: str, dry_run: bool = True) -> None: """Delete the selected asset and all its content. This method will delete all the files and folders existing in an asset folder. By default a dry run will be launched and if you are satisfyed with the displayed names, change the ``dry_run`` variable to ``False``. No other warnng will be displayed. .. warning:: If this method is used on the root directory you will loose all your data, it's highly recommended to use a dry run first and carefully review the destroyed files. Args: asset_id: the Id of the asset or a folder dry_run: whether or not a dry run should be launched. dry run will only display the files name without deleting them. """ # define the action to execute for each asset based on the dry run mode def delete(id: str) -> None: if dry_run is True: print(f"to be deleted: {id}") else: print(f"deleting: {id}") ee.data.deleteAsset(id) return # identify the type of asset asset_info = ee.data.getAsset(asset_id) if asset_info["type"] == "FOLDER": # get all the assets asset_list = get_assets(folder=asset_id) # split the files by nesting levels # we will need to delete the more nested files first assets_ordered = {} for asset in asset_list: lvl = len(asset["id"].split("/")) assets_ordered.setdefault(lvl, []) assets_ordered[lvl].append(asset) # delete all items starting from the more nested one but not folders assets_ordered = dict(sorted(assets_ordered.items(), reverse=True)) for lvl in assets_ordered: for i in assets_ordered[lvl]: delete(i["name"]) # delete the initial folder/asset delete(asset_id) return