Job Result Openers¶
This section demonstrates how to use client.open_job_result() and how to enhance
the method's capabilities by adding a custom JobResultOpener to the client.
We use a test server here:
wraptile run -- wraptile.services.local.testing:service
In [1]:
from cuiman import Client
In [2]:
client = Client()
# client
In [3]:
# client.get_processes()
We'd like to open the output of a process that writes something to the filesystem;
we use "simulate_scene" here. It has an output_path parameter for that purpose.
In [4]:
client.get_process(process_id="simulate_scene")
Out[4]:
{
"description": "Simulate a set scene images slices for testing. Creates an xarray dataset with `periodicity` time slices and writes it as Zarr into a temporary location. Requires installed `dask`, `xarray`, and `zarr` packages.",
"id": "simulate_scene",
"inputs": {
"bbox": {
"description": "Bounding box in geographical coordinates.",
"minOccurs": 0,
"schema": {
"default": [
-180,
-90,
180,
90
],
"format": "bbox",
"items": {
"type": "number"
},
"maxItems": 4,
"minItems": 4,
"type": "array"
},
"title": "Bounding box"
},
"end_date": {
"minOccurs": 0,
"schema": {
"default": "2025-02-01",
"format": "date",
"type": "string"
},
"title": "End date"
},
"output_path": {
"description": "Local output path or URI.",
"schema": {
"minLength": 1,
"nullable": true,
"type": "string"
},
"title": "Output path",
"x-ui:advanced": true
},
"periodicity": {
"description": "Size of time steps in days.",
"minOccurs": 0,
"schema": {
"default": 1,
"maximum": 10,
"minimum": 1,
"type": "integer"
},
"title": "Periodicity"
},
"resolution": {
"description": "Spatial resolution in degree.",
"minOccurs": 0,
"schema": {
"default": 0.5,
"maximum": 1,
"minimum": 0.01,
"type": "number"
},
"title": "Spatial resolution"
},
"start_date": {
"minOccurs": 0,
"schema": {
"default": "2025-01-01",
"format": "date",
"type": "string"
},
"title": "Start date"
},
"var_names": {
"description": "Comma-separated list of variable names.",
"schema": {
"default": "a, b, c",
"type": "string"
},
"title": "Variable names",
"x-ui:advanced": true
}
},
"outputs": {
"return_value": {
"description": "A link.",
"schema": {
"properties": {
"href": {
"title": "Href",
"type": "string"
},
"hreflang": {
"examples": [
"en"
],
"nullable": true,
"title": "Hreflang",
"type": "string"
},
"rel": {
"examples": [
"service"
],
"nullable": true,
"title": "Rel",
"type": "string"
},
"title": {
"nullable": true,
"title": "Title",
"type": "string"
},
"type": {
"examples": [
"application/json"
],
"nullable": true,
"title": "Type",
"type": "string"
}
},
"required": [
"href"
],
"type": "object"
},
"title": "Link"
}
},
"title": "Generate scene for testing",
"version": "0.0.0"
}
Execute the process to receive a job ID (and other job state information):
In [5]:
job_info = client.execute_process(process_id="simulate_scene", request={"inputs": {"output_path": "test.zarr"}})
job_info
Out[5]:
{
"created": "2026-05-08T12:44:06.019130Z",
"jobID": "job_2",
"processID": "simulate_scene",
"status": "accepted"
}
In [8]:
job_id = job_info.jobID
Getting the job results returns an object that just points to the output, but does not actually open it:
In [9]:
client.get_job_results(job_id)
Out[9]:
{
"return_value": {
"href": "file:///C:/Users/norma/Projects/eozilla/test.zarr",
"type": "application/zarr"
}
}
But if we try to open the job's result, we expect this to fail, because no openers are registered yet:
In [10]:
try:
    dataset = client.open_job_result(job_id, timeout=5)
except Exception as e:
    print(f"error: {e}")
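Under the hood, the client presumably asks each registered opener whether it accepts the result's media type; with no openers registered, nothing matches and the call fails. A minimal, library-free sketch of that dispatch pattern (all names here are hypothetical stand-ins, not the actual cuiman API):

```python
import asyncio
from dataclasses import dataclass, field

# Hypothetical stand-ins for the client's link/context objects.
@dataclass
class Link:
    href: str
    type: str

@dataclass
class OpenContext:
    output_link: Link
    options: dict = field(default_factory=dict)

class FakeZarrOpener:
    async def accept_job_result(self, ctx):
        # Only claim results whose media type is Zarr.
        return ctx.output_link.type == "application/zarr"

    async def open_job_result(self, ctx):
        return f"opened {ctx.output_link.href}"

async def open_result(openers, ctx):
    # Ask each registered opener in turn; the first to accept wins.
    for opener in openers:
        if await opener.accept_job_result(ctx):
            return await opener.open_job_result(ctx)
    raise RuntimeError(f"no opener accepts {ctx.output_link.type!r}")

ctx = OpenContext(Link("file:///tmp/test.zarr", "application/zarr"))

# With no openers registered, opening fails -- as in the cell above.
try:
    asyncio.run(open_result([], ctx))
    failed = False
except RuntimeError:
    failed = True

# With a matching opener registered, opening succeeds.
opened = asyncio.run(open_result([FakeZarrOpener()], ctx))
```

This is only a sketch of the assumed accept-then-open protocol; the real registry may differ in ordering and error handling.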
Therefore we create a new job result opener type:
In [11]:
from cuiman.api.opener import JobResultOpener, JobResultOpenContext

class MyZarrOpener(JobResultOpener):
    """Open Zarr datasets from link results."""

    @classmethod
    def is_usable(cls):
        """Check whether xarray is installed."""
        try:
            import xarray as xr
            return True
        except ImportError:
            return False

    async def accept_job_result(self, ctx: JobResultOpenContext):
        """Check if we can open the given job result."""
        return ctx.output_link.type == "application/zarr"

    async def open_job_result(self, ctx):
        """Open the given job result."""
        import xarray as xr
        return xr.open_zarr(ctx.output_link.href, **ctx.options)
... and register it:
In [12]:
client.config.register_job_result_opener(MyZarrOpener)
Out[12]:
<function cuiman.api.opener.registry.JobResultOpenerRegistry.register.<locals>.unregister()>
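The return value shown in Out[12] suggests that registering an opener hands back a zero-argument function that undoes the registration. A minimal sketch of that pattern (hypothetical, not the actual cuiman implementation):

```python
# Hypothetical sketch: a registry whose register() returns an unregister
# closure, mirroring the <function ...unregister()> value shown above.
class OpenerRegistry:
    def __init__(self):
        self._opener_classes = []

    def register(self, opener_cls):
        self._opener_classes.append(opener_cls)

        def unregister():
            # Remove exactly the class that this call registered.
            self._opener_classes.remove(opener_cls)

        return unregister

registry = OpenerRegistry()
unregister = registry.register(object)
count_after_register = len(registry._opener_classes)
unregister()
count_after_unregister = len(registry._opener_classes)
```

Keeping a reference to the returned function lets you cleanly remove the opener again, e.g. in test teardown.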
Opening the job result should now work as expected:
In [13]:
dataset = client.open_job_result(job_id)
dataset
Out[13]:
<xarray.Dataset> Size: 193MB
Dimensions: (time: 31, lat: 360, lon: 720)
Coordinates:
* time (time) datetime64[ns] 248B 2025-01-01 2025-01-02 ... 2025-01-31
* lat (lat) float64 3kB -89.75 -89.25 -88.75 -88.25 ... 88.75 89.25 89.75
* lon (lon) float64 6kB -179.8 -179.2 -178.8 -178.2 ... 178.8 179.2 179.8
Data variables:
a (time, lat, lon) float64 64MB dask.array<chunksize=(31, 360, 720), meta=np.ndarray>
b (time, lat, lon) float64 64MB dask.array<chunksize=(31, 360, 720), meta=np.ndarray>
c (time, lat, lon) float64 64MB dask.array<chunksize=(31, 360, 720), meta=np.ndarray>