"""All the heleper methods to interface Google Earthengine with sepal-ui."""
import asyncio
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from pathlib import Path
from typing import Any, Callable, List, Union
import ee
import ipyvuetify as v
from deprecated.sphinx import deprecated, versionadded
from pysepal.message import ms
[docs]
def get_ee_project() -> str:
"""Get the current Earth Engine project ID.
Returns:
The project ID (e.g., 'ee-username').
"""
if not ee.data.is_initialized():
raise RuntimeError("Earth Engine is not initialized. Call init_ee() first.")
config = ee.data.getProjectConfig()
# Extract project ID from 'projects/PROJECT_ID/config' format
return config["name"].split("/")[1]
[docs]
@versionadded(version="3.0", reason="moved from utils to a dedicated module")
def need_ee(func: Callable) -> Any:
"""Decorator to execute check if the object require EE binding.
Trigger an exception if the connection is not possible.
Args:
func: the object on which the decorator is applied
Returns:
The return statement of the decorated method
"""
@wraps(func)
def wrapper_ee(*args, **kwargs):
# try to connect to ee
try:
init_ee()
except Exception:
raise Exception("This function needs an Earth Engine authentication")
return func(*args, **kwargs)
return wrapper_ee
[docs]
@need_ee
def wait_for_completion(task_descripsion: str, widget_alert: v.Alert = None) -> str:
"""Wait until the selected process is finished. Display some output information.
Args:
task_descripsion: name of the running task
widget_alert: alert to display the output messages
Returns:
the final state of the task
"""
state = "UNSUBMITTED"
while state != "COMPLETED":
# print in a widget
if widget_alert:
widget_alert.add_live_msg(ms.status.format(state))
# wait 5 seconds
time.sleep(5)
# search for the task in task_list
current_task = is_task(task_descripsion)
state = current_task.state
if state == "FAILED":
raise Exception(ms.status.format(state))
# print in a widget
if widget_alert:
widget_alert.add_live_msg(ms.status.format(state), "success")
return state
[docs]
@need_ee
def is_task(task_descripsion: str) -> ee.batch.Task:
"""Search for the described task in the user Task list return None if nothing is found.
Args:
task_descripsion: the task description
Returns:
return the found task else None
"""
current_task = None
for task in ee.batch.Task.list():
if task.config["description"] == task_descripsion:
current_task = task
break
return current_task
[docs]
@need_ee
def is_running(task_descripsion: str) -> ee.batch.Task:
"""Search for the described task in the user Task list return None if nothing is currently running.
Args:
task_descripsion: the task description
Returns:
return the found task else None
"""
current_task = is_task(task_descripsion)
if current_task:
if current_task.state not in ["RUNNING", "READY"]:
current_task = None
return current_task
@deprecated(
reason="This function is no longer required. Use GEEInterface.get_assets() instead for better performance and resource management.",
version="3.2.0",
category=DeprecationWarning,
)
async def _list_assets_concurrent(folders: list, semaphore: asyncio.Semaphore) -> list:
"""List assets concurrently using ThreadPoolExecutor.
.. deprecated:: 3.2.0
This function is deprecated. Use GEEInterface.get_assets() instead.
Args:
folders: list of folders to list assets from
Returns:
list of assets for each folder
"""
async with semaphore:
with ThreadPoolExecutor() as executor:
loop = asyncio.get_running_loop()
tasks = [
loop.run_in_executor(executor, ee.data.listAssets, {"parent": folder})
for folder in folders
]
results = await asyncio.gather(*tasks)
return results
# Deprecated alias - use GEEInterface.get_assets() instead
list_assets_concurrent = _list_assets_concurrent
[docs]
@deprecated(
reason="Use GEEInterface.get_assets() instead for better performance and resource management.",
version="3.2.0",
category=DeprecationWarning,
)
async def get_assets_async_concurrent(folder: str, gee_interface=None) -> List[dict]:
"""Get all the assets from the parameter folder. every nested asset will be displayed.
.. deprecated:: 3.2.0
This function is deprecated. Use GEEInterface.get_assets() instead.
Args:
folder: the initial GEE folder
Returns:
the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id'
"""
if not gee_interface:
from pysepal.scripts.gee_interface import GEEInterface
gee_interface = GEEInterface()
return await gee_interface.get_assets_async(folder)
@deprecated(
reason="Use GEEInterface.get_assets() instead for better performance and resource management.",
version="3.2.0",
category=DeprecationWarning,
)
@need_ee
def _get_assets(folder: Union[str, Path] = "", async_=True) -> List[dict]:
"""Get all the assets from the parameter folder. every nested asset will be displayed.
.. deprecated:: 3.2.0
This function is deprecated. Use GEEInterface.get_assets() instead.
Args:
folder: the initial GEE folder
async_: whether or not the function should be executed asynchronously
Returns:
the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id'
"""
folder = str(folder) if folder else f"projects/{get_ee_project()}/assets/"
return _get_assets_sync(folder)
@need_ee
def _get_assets_sync(folder: Union[str, Path] = "") -> List[dict]:
"""Get all the assets from the parameter folder. every nested asset will be displayed.
Args:
folder: the initial GEE folder
Returns:
the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id'
"""
# set the folder and init the list
asset_list = []
def _recursive_get(folder, asset_list):
# loop in the assets
for asset in ee.data.listAssets({"parent": folder})["assets"]:
asset_list += [asset]
if asset["type"] == "FOLDER":
asset_list = _recursive_get(asset["name"], asset_list)
return asset_list
return _recursive_get(folder, asset_list)
# Deprecated alias - use GEEInterface.get_assets() instead
get_assets = _get_assets
[docs]
@deprecated(
reason="Use GEEInterface.get_asset() with not_exists_ok=True instead. "
"The 'asset_name' and 'folder' parameters are deprecated. Use 'asset_id' directly.",
version="3.2.0",
category=DeprecationWarning,
)
@need_ee
def is_asset(asset_name: str = None, folder: Union[str, Path] = "", asset_id: str = None) -> bool:
"""Check if the asset already exist in the user asset folder.
Args:
asset_name: [DEPRECATED] the name of the asset (use asset_id instead)
folder: [DEPRECATED] the folder of the assets (use full asset_id instead)
asset_id: the complete asset ID (e.g., 'projects/your-project/assets/folder/asset_name')
Returns:
true if asset exists, false otherwise
"""
# Handle backward compatibility
if asset_id is None:
if asset_name is None:
raise ValueError("Either 'asset_id' or 'asset_name' must be provided")
# Check if asset_name already contains the full path
if asset_name.startswith("projects/"):
asset_id = asset_name
else:
# Construct asset_id from legacy parameters
folder = str(folder) if folder else ""
if folder and not folder.startswith("projects/"):
# If folder doesn't start with 'projects/', assume it's a relative path
folder = f"projects/{get_ee_project()}/assets/{folder}"
elif not folder:
# If no folder provided, use default
folder = f"projects/{get_ee_project()}/assets/"
# Ensure folder ends with '/' for proper concatenation
if not folder.endswith("/"):
folder += "/"
asset_id = folder + asset_name
# Use GEEInterface to check if asset exists
from pysepal.scripts.gee_interface import GEEInterface
with GEEInterface() as gee_interface:
result = gee_interface.get_asset(asset_id, not_exists_ok=True)
return result is not None
[docs]
@need_ee
def delete_assets(asset_id: str, dry_run: bool = True) -> None:
"""Delete the selected asset and all its content.
This method will delete all the files and folders existing in an asset folder. By default a dry run will be launched and if you are satisfyed with the displayed names, change the ``dry_run`` variable to ``False``. No other warnng will be displayed.
.. warning::
If this method is used on the root directory you will loose all your data, it's highly recommended to use a dry run first and carefully review the destroyed files.
Args:
asset_id: the Id of the asset or a folder
dry_run: whether or not a dry run should be launched. dry run will only display the files name without deleting them.
"""
# define the action to execute for each asset based on the dry run mode
def delete(id: str) -> None:
if dry_run is True:
print(f"to be deleted: {id}")
else:
print(f"deleting: {id}")
ee.data.deleteAsset(id)
return
# identify the type of asset
asset_info = ee.data.getAsset(asset_id)
if asset_info["type"] == "FOLDER":
# get all the assets
asset_list = get_assets(folder=asset_id)
# split the files by nesting levels
# we will need to delete the more nested files first
assets_ordered = {}
for asset in asset_list:
lvl = len(asset["id"].split("/"))
assets_ordered.setdefault(lvl, [])
assets_ordered[lvl].append(asset)
# delete all items starting from the more nested one but not folders
assets_ordered = dict(sorted(assets_ordered.items(), reverse=True))
for lvl in assets_ordered:
for i in assets_ordered[lvl]:
delete(i["name"])
# delete the initial folder/asset
delete(asset_id)
return
[docs]
@need_ee
def get_task(task_id):
"""Get task."""
tasks_list = ee.batch.Task.list()
for task in tasks_list:
if task.id == task_id:
return task
raise Exception(f"The task id {task_id} doesn't exist in your tasks.")
[docs]
def init_ee() -> None:
r"""Initialize earth engine according using a token.
THe environment used to run the tests need to have a EARTHENGINE_TOKEN variable.
The content of this variable must be the copy of a personal credential file that you can find on your local computer if you already run the earth engine command line tool. See the usage question for a github action example.
- Windows: ``C:\Users\USERNAME\\.config\\earthengine\\credentials``
- Linux: ``/home/USERNAME/.config/earthengine/credentials``
- MacOS: ``/Users/USERNAME/.config/earthengine/credentials``
Note:
As all init method of pytest-gee, this method will fallback to a regular ``ee.Initialize()`` if the environment variable is not found e.g. on your local computer.
"""
if not ee.data.is_initialized():
credential_folder_path = Path.home() / ".config" / "earthengine"
credential_file_path = credential_folder_path / "credentials"
if "EARTHENGINE_TOKEN" in os.environ and not credential_file_path.exists():
# write the token to the appropriate folder
ee_token = os.environ["EARTHENGINE_TOKEN"]
credential_folder_path.mkdir(parents=True, exist_ok=True)
credential_file_path.write_text(ee_token)
# Extract the project name from credentials
_credentials = json.loads(credential_file_path.read_text())
project_id = _credentials.get("project_id", _credentials.get("project", None))
if not project_id:
raise NameError(
"The project name cannot be detected. "
"Please set it using `earthengine set_project project_name`."
)
# Check if we are using a google service account
if _credentials.get("type") == "service_account":
ee_user = _credentials.get("client_email")
credentials = ee.ServiceAccountCredentials(ee_user, str(credential_file_path))
ee.Initialize(credentials=credentials, project=project_id)
return
# if the user is in local development the authentication should
# already be available
ee.Initialize(project=project_id)