Skip to content

Procodile API

The Procodile Python API is provided by the procodile package.

procodile.ProcessRegistry

Bases: Mapping[str, Process]

A registry for processes.

Processes are Python functions with extra metadata and can be extended to create workflows using the `steps` decorator.

A Workflow consists of one or more Python functions with metadata, designed to execute sequentially by resolving dependencies and passing outputs to downstream steps.

This class provides a read-only mapping from unique identifiers to facade-like Process instances. While the user interacts with these processes, the registry internally manages full [Workflow][procodile.workflow.Workflow] instances.

The internal Workflow objects hold the source-of-truth metadata required for dependency resolution and execution, while the exposed Process objects serve as the public interface for client interaction.

Source code in procodile\src\procodile\registry.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class ProcessRegistry(Mapping[str, Process]):
    """
    A read-only registry of processes.

    A *process* is a plain Python function enriched with metadata; it can
    be extended into a workflow of several steps via the `steps` decorator.
    A workflow is one or more such functions executed sequentially, with
    dependencies resolved and outputs forwarded to downstream steps.

    The registry maps unique identifiers to facade-like
    [Process][procodile.process.Process] objects, which form the public
    client interface. Internally it manages full
    [Workflow][procodile.workflow.Workflow] instances, which hold the
    source-of-truth metadata needed for dependency resolution and
    execution.
    """

    def __init__(self):
        # Source of truth: workflow id -> Workflow instance.
        self._workflows: dict[str, Workflow] = {}

    # --- Mapping interface ---

    def __getitem__(self, workflow_id: str) -> Process:
        # Unknown ids raise KeyError, as expected of a Mapping.
        return self._as_process(self._workflows[workflow_id])

    def __iter__(self):
        yield from self._workflows

    def __len__(self) -> int:
        return len(self._workflows)

    @staticmethod
    @functools.lru_cache
    def _as_process(workflow: Workflow) -> Process:
        """Build (and memoize) the facade `Process` for `workflow`.

        The facade is what clients see when browsing available processes,
        and it is also the object used to run the actual workflow.
        """
        main_process = next(iter(workflow.registry.main.values()))
        facade: Process = deepcopy(main_process)

        if workflow.registry.steps:
            # With steps present, the last user-defined step determines the
            # visible outputs. The final entry of the execution order is not
            # that step, hence the index -2.
            order, _ = workflow.execution_order
            last_user_step = workflow.registry.steps[order[-2]]["step"]
            facade.description.outputs = last_user_step.description.outputs

        # Invoking the facade must execute main and then the steps in
        # order, so point its function at `workflow.run`.
        facade.function = workflow.run
        return facade

    # --- Public API ---

    # noinspection PyShadowingBuiltins
    def main(
        self,
        function: Callable | None = None,
        /,
        *,
        id: Optional[str] = None,
        version: Optional[str] = None,
        title: Optional[str] = None,
        description: Optional[str] = None,
        inputs: Optional[dict[str, FieldInfo | InputDescription]] = None,
        outputs: Optional[dict[str, FieldInfo | OutputDescription]] = None,
        inputs_arg: str | bool = False,
    ) -> Callable[[Callable], Workflow] | Callable:
        """
        Decorator that registers a user function as a process in this
        registry. It may be applied with or without parameters.

        Note:

            - Use `main` for a process comprising multiple steps that need
              a reference to the main entry point.

            - Use `process` for a step-less process that needs no such
              reference.

        Args:
            function: The decorated function, supplied automatically when
                the decorator is used without parentheses.
            id: Optional process identifier, unique within the registry.
                Defaults to the fully qualified function name.
            version: Optional version identifier. Defaults to `"0.0.0"`.
            title: Optional, short process title.
            description: Optional, detailed description of the process.
                Defaults to the function's docstring, if any.
            inputs: Optional mapping from function argument names to
                [`pydantic.Field`](https://docs.pydantic.dev/latest/concepts/fields/)
                or [`InputDescription`][gavicore.models.InputDescription]
                instances. Annotating the arguments directly, as described in
                [The Annotated Pattern](https://docs.pydantic.dev/latest/concepts/fields/#the-annotated-pattern),
                is preferred. Use `InputDescription` instances for extra
                information a `pydantic.Field` cannot express, e.g.,
                `additionalParameters` or `keywords`.
            outputs: Mapping from output names to
                [`pydantic.Field`](https://docs.pydantic.dev/latest/concepts/fields/)
                or [`OutputDescription`][gavicore.models.OutputDescription]
                instances. Required if multiple outputs are returned as a
                dictionary; the function must then return a typed `tuple`
                whose items correspond to the output names in order.
            inputs_arg: Specifies the use of an _inputs argument_, i.e., a
                container for the actual process inputs. If specified, it
                must be the only function argument (besides an optional job
                context argument) and a subclass of `pydantic.BaseModel`.
                Pass `True` to make the single argument the inputs argument,
                or a `str` naming that single argument.
        """

        def decorate(fn: Callable) -> Workflow:
            # noinspection PyUnresolvedReferences
            qualified_name = f"{fn.__module__}:{fn.__qualname__}"
            workflow_id = id if id else qualified_name
            workflow = Workflow(
                fn,
                workflow_id=workflow_id,
                id=id,
                version=version,
                title=title,
                description=description,
                inputs=inputs,
                outputs=outputs,
                inputs_arg=inputs_arg,
            )
            self._workflows[workflow_id] = workflow
            return workflow

        # Bare usage (`@registry.main`) passes the function directly;
        # parameterized usage returns the inner decorator.
        return decorate if function is None else decorate(function)

    # Alias for defining a plain process without any steps.
    process = main

    # --- Internal API ---

    def get_workflow(self, workflow_id: str) -> Workflow:
        return self._workflows[workflow_id]

    def workflows(self) -> dict[str, Workflow]:
        return self._workflows

main(function=None, /, *, id=None, version=None, title=None, description=None, inputs=None, outputs=None, inputs_arg=False)

A decorator that can be applied to a user function in order to register it as a process in this registry.

Note:

- Use `main` decorator to express a process that comprises multiple steps
  that require a reference to the main entry point.

- Use `process` decorator to express a process that has no steps,
  hence requires no reference to a main step.

The decorator can be used with or without parameters.

Parameters:

Name Type Description Default
function Callable | None

The decorated function that is passed automatically since process() is a decorator function.

None
id Optional[str]

Optional process identifier. Must be unique within the registry. If not provided, the fully qualified function name will be used.

None
version Optional[str]

Optional version identifier. If not provided, "0.0.0" will be used.

None
title Optional[str]

Optional, short process title.

None
description Optional[str]

Optional, detailed description of the process. If not provided, the function's docstring, if any, will be used.

None
inputs Optional[dict[str, FieldInfo | InputDescription]]

Optional mapping from function argument names to pydantic.Field or [InputDescription][gavicore.models.InputDescription] instances. The preferred way is to annotate the arguments directly as described in The Annotated Pattern. Use InputDescription instances to pass extra information that cannot be represented by a pydantic.Field, e.g., additionalParameters or keywords.

None
outputs Optional[dict[str, FieldInfo | OutputDescription]]

Mapping from output names to pydantic.Field or [OutputDescription][gavicore.models.OutputDescription] instances. Required, if you have multiple outputs returned as a dictionary. In this case, the function must return a typed tuple and output names refer to the items of the tuple in given order.

None
inputs_arg str | bool

Specifies the use of an inputs argument. An inputs argument is a container for the actual process inputs. If specified, it must be the only function argument (besides an optional job context argument) and must be a subclass of pydantic.BaseModel. If inputs_arg is True the only argument will be the input argument, if inputs_arg is a str it must be the name of the only argument.

False
Source code in procodile\src\procodile\registry.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def main(
    self,
    function: Callable | None = None,
    /,
    *,
    id: Optional[str] = None,
    version: Optional[str] = None,
    title: Optional[str] = None,
    description: Optional[str] = None,
    inputs: Optional[dict[str, FieldInfo | InputDescription]] = None,
    outputs: Optional[dict[str, FieldInfo | OutputDescription]] = None,
    inputs_arg: str | bool = False,
) -> Callable[[Callable], Workflow] | Callable:
    """
    Decorator that registers a user function as a process in this
    registry. It can be used with or without parameters.

    Note:

        - Use `main` for a process comprising multiple steps that need
          a reference to the main entry point.

        - Use `process` for a step-less process that needs no such
          reference.

    Args:
        function: The decorated function, supplied automatically when the
            decorator is used without parentheses.
        id: Optional process identifier, unique within the registry.
            Defaults to the fully qualified function name.
        version: Optional version identifier. Defaults to `"0.0.0"`.
        title: Optional, short process title.
        description: Optional, detailed description of the process.
            Defaults to the function's docstring, if any.
        inputs: Optional mapping from function argument names to
            [`pydantic.Field`](https://docs.pydantic.dev/latest/concepts/fields/)
            or [`InputDescription`][gavicore.models.InputDescription]
            instances. Annotating the arguments directly, as described in
            [The Annotated Pattern](https://docs.pydantic.dev/latest/concepts/fields/#the-annotated-pattern),
            is preferred. Use `InputDescription` instances for extra
            information a `pydantic.Field` cannot express, e.g.,
            `additionalParameters` or `keywords`.
        outputs: Mapping from output names to
            [`pydantic.Field`](https://docs.pydantic.dev/latest/concepts/fields/)
            or [`OutputDescription`][gavicore.models.OutputDescription]
            instances. Required if multiple outputs are returned as a
            dictionary; the function must then return a typed `tuple`
            whose items correspond to the output names in order.
        inputs_arg: Specifies the use of an _inputs argument_, i.e., a
            container for the actual process inputs. If specified, it must
            be the only function argument (besides an optional job context
            argument) and a subclass of `pydantic.BaseModel`. Pass `True`
            to make the single argument the inputs argument, or a `str`
            naming that single argument.
    """

    def make_workflow(fn: Callable) -> Workflow:
        # noinspection PyUnresolvedReferences
        fq_name = f"{fn.__module__}:{fn.__qualname__}"
        wf_id = id if id else fq_name
        wf = Workflow(
            fn,
            workflow_id=wf_id,
            id=id,
            version=version,
            title=title,
            description=description,
            inputs=inputs,
            outputs=outputs,
            inputs_arg=inputs_arg,
        )
        self._workflows[wf_id] = wf
        return wf

    # Bare usage passes the function directly; parameterized usage
    # returns the inner decorator.
    return make_workflow if function is None else make_workflow(function)

options: show_source: false heading_level: 3

procodile.Process dataclass

A process comprises a process description and executable code in the form of a Python function.

Instances of this class are managed by the ProcessRegistry.

Attributes:

Name Type Description
function Callable

The user's Python function.

signature Signature

The signature of function.

job_ctx_arg str | None

Names of function arguments of type JobContext.

model_class type[BaseModel]

Pydantic model class for the arguments of function.

description ProcessDescription

Process description modeled after OGC API - Processes - Part 1: Core.

Source code in procodile\src\procodile\process.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@dataclass
class Process:
    """
    A process comprises a process description and executable code
    in the form of a Python function.

    Instances of this class are managed by the
    [ProcessRegistry][procodile.ProcessRegistry].

    Attributes:
        function: The user's Python function.
        signature: The signature of `function`.
        model_class: Pydantic model class for the arguments of `function`.
        description: Process description modeled after
            [OGC API - Processes - Part 1: Core](https://docs.ogc.org/is/18-062r2/18-062r2.html#toc37).
        inputs_arg: Name of the `function` argument serving as the inputs
            container, or `None` if none was declared.
        job_ctx_arg: Name of the `function` argument of type `JobContext`,
            or `None` if none was declared.
    """

    function: Callable
    signature: inspect.Signature
    model_class: type[BaseModel]
    description: ProcessDescription
    # names of special arguments
    inputs_arg: str | None
    job_ctx_arg: str | None

    # noinspection PyShadowingBuiltins
    @classmethod
    def create(
        cls,
        function: Callable,
        id: Optional[str] = None,
        version: Optional[str] = None,
        title: Optional[str] = None,
        description: Optional[str] = None,
        inputs: Optional[dict[str, FieldInfo | InputDescription]] = None,
        outputs: Optional[dict[str, FieldInfo | OutputDescription]] = None,
        inputs_arg: str | bool = False,
    ) -> "Process":
        """Create a new instance of this dataclass.

        Called by the `process_registry.main()` and by the `your_function.step()`
        decorator, where `your_function` is the function decorated with `main()`.

        Not intended to be used by clients.

        Args:
            function: The decorated function that is passed automatically since
                `process()` is a decorator function.
            id: Optional process identifier. Must be unique within the registry.
                If not provided, the fully qualified function name will be used.
            version: Optional version identifier. If not provided, `"0.0.0"`
                will be used.
            title: Optional, short process title.
            description: Optional, detailed description of the process. If not
                provided, the function's docstring, if any, will be used.
            inputs: Optional mapping from function argument names
                to [`pydantic.Field`](https://docs.pydantic.dev/latest/concepts/fields/)
                or [`InputDescription`][gavicore.models.InputDescription] instances.
                The preferred way is to annotate the arguments directly
                as described in [The Annotated Pattern](https://docs.pydantic.dev/latest/concepts/fields/#the-annotated-pattern).
                Use `InputDescription` instances to pass extra information that cannot
                be represented by a `pydantic.Field`, e.g., `additionalParameters` or `keywords`.
            outputs: Mapping from output names to
                [`pydantic.Field`](https://docs.pydantic.dev/latest/concepts/fields/)
                or [`OutputDescription`][gavicore.models.OutputDescription] instances.
                Required, if you have multiple outputs returned as a
                dictionary. In this case, the function must return a typed `tuple` and
                output names refer to the items of the tuple in given order.
            inputs_arg: Specifies the use of an _inputs argument_. An inputs argument
                is a container for the actual process inputs. If specified, it must
                be the only function argument (besides an optional job context
                argument) and must be a subclass of `pydantic.BaseModel`.
                If `inputs_arg` is `True` the only argument will be the input argument,
                if `inputs_arg` is a `str` it must be the name of the only argument.

        Raises:
            TypeError: If `function` is not a plain Python function.
        """
        # `inspect.isfunction` deliberately rejects builtins, partials and
        # other callables; the error message must say so rather than claim
        # any callable is accepted.
        if not inspect.isfunction(function):
            raise TypeError(
                "function argument must be a Python function, "
                f"got {type(function).__name__}"
            )
        fn_name = f"{function.__module__}:{function.__qualname__}"
        # Fill in documented defaults for omitted metadata.
        id = id or fn_name
        version = version or "0.0.0"
        description = description or inspect.getdoc(function)
        signature = inspect.signature(function)
        input_descriptions, model_class, input_arg_, job_ctx_arg = _parse_inputs(
            fn_name, signature, inputs, inputs_arg
        )
        output_descriptions = _parse_outputs(
            fn_name, signature.return_annotation, outputs
        )
        return Process(
            function=function,
            signature=signature,
            model_class=model_class,
            description=ProcessDescription(
                id=id,
                version=version,
                title=title,
                description=description,
                inputs=input_descriptions,
                outputs=output_descriptions,
                # Note, we may later add the following:
                # metadata=metadata,
                # keywords=keywords,
                # links=links,
                # outputTransmission=output_transmission,
                # jobControlOptions=job_control_options,
            ),
            inputs_arg=input_arg_,
            job_ctx_arg=job_ctx_arg,
        )

create(function, id=None, version=None, title=None, description=None, inputs=None, outputs=None, inputs_arg=False) classmethod

Create a new instance of this dataclass.

Called by the process_registry.main() and by the your_function.step() decorator, where your_function is the function decorated with main().

Not intended to be used by clients.

Parameters:

Name Type Description Default
function Callable

The decorated function that is passed automatically since process() is a decorator function.

required
id Optional[str]

Optional process identifier. Must be unique within the registry. If not provided, the fully qualified function name will be used.

None
version Optional[str]

Optional version identifier. If not provided, "0.0.0" will be used.

None
title Optional[str]

Optional, short process title.

None
description Optional[str]

Optional, detailed description of the process. If not provided, the function's docstring, if any, will be used.

None
inputs Optional[dict[str, FieldInfo | InputDescription]]

Optional mapping from function argument names to pydantic.Field or [InputDescription][gavicore.models.InputDescription] instances. The preferred way is to annotate the arguments directly as described in The Annotated Pattern. Use InputDescription instances to pass extra information that cannot be represented by a pydantic.Field, e.g., additionalParameters or keywords.

None
outputs Optional[dict[str, FieldInfo | OutputDescription]]

Mapping from output names to pydantic.Field or [OutputDescription][gavicore.models.OutputDescription] instances. Required, if you have multiple outputs returned as a dictionary. In this case, the function must return a typed tuple and output names refer to the items of the tuple in given order.

None
inputs_arg str | bool

Specifies the use of an inputs argument. An inputs argument is a container for the actual process inputs. If specified, it must be the only function argument (besides an optional job context argument) and must be a subclass of pydantic.BaseModel. If inputs_arg is True the only argument will be the input argument, if inputs_arg is a str it must be the name of the only argument.

False
Source code in procodile\src\procodile\process.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@classmethod
def create(
    cls,
    function: Callable,
    id: Optional[str] = None,
    version: Optional[str] = None,
    title: Optional[str] = None,
    description: Optional[str] = None,
    inputs: Optional[dict[str, FieldInfo | InputDescription]] = None,
    outputs: Optional[dict[str, FieldInfo | OutputDescription]] = None,
    inputs_arg: str | bool = False,
) -> "Process":
    """Build a `Process` from a user function plus optional metadata.

    Invoked by the `process_registry.main()` and `your_function.step()`
    decorators (where `your_function` is the function decorated with
    `main()`). Not intended for direct client use.

    Args:
        function: The decorated function, passed automatically because
            `process()` is a decorator function.
        id: Optional process identifier, unique within the registry.
            Defaults to the fully qualified function name.
        version: Optional version identifier. Defaults to `"0.0.0"`.
        title: Optional, short process title.
        description: Optional, detailed description of the process.
            Defaults to the function's docstring, if any.
        inputs: Optional mapping from function argument names to
            [`pydantic.Field`](https://docs.pydantic.dev/latest/concepts/fields/)
            or [`InputDescription`][gavicore.models.InputDescription]
            instances. Annotating the arguments directly, as described in
            [The Annotated Pattern](https://docs.pydantic.dev/latest/concepts/fields/#the-annotated-pattern),
            is preferred. Use `InputDescription` instances for extra
            information a `pydantic.Field` cannot express, e.g.,
            `additionalParameters` or `keywords`.
        outputs: Mapping from output names to
            [`pydantic.Field`](https://docs.pydantic.dev/latest/concepts/fields/)
            or [`OutputDescription`][gavicore.models.OutputDescription]
            instances. Required if multiple outputs are returned as a
            dictionary; the function must then return a typed `tuple`
            whose items correspond to the output names in order.
        inputs_arg: Specifies the use of an _inputs argument_, i.e., a
            container for the actual process inputs. If specified, it must
            be the only function argument (besides an optional job context
            argument) and a subclass of `pydantic.BaseModel`. Pass `True`
            to make the single argument the inputs argument, or a `str`
            naming that single argument.

    Raises:
        TypeError: If `function` is not a plain Python function.
    """
    if not inspect.isfunction(function):
        raise TypeError("function argument must be callable")

    qualified_name = f"{function.__module__}:{function.__qualname__}"
    signature = inspect.signature(function)

    # Derive input/output descriptions plus the names of the special
    # arguments from the signature and the user-supplied metadata.
    input_descriptions, model_class, resolved_inputs_arg, job_ctx_arg = _parse_inputs(
        qualified_name, signature, inputs, inputs_arg
    )
    output_descriptions = _parse_outputs(
        qualified_name, signature.return_annotation, outputs
    )

    process_description = ProcessDescription(
        id=id or qualified_name,
        version=version or "0.0.0",
        title=title,
        description=description or inspect.getdoc(function),
        inputs=input_descriptions,
        outputs=output_descriptions,
    )
    return Process(
        function=function,
        signature=signature,
        model_class=model_class,
        description=process_description,
        inputs_arg=resolved_inputs_arg,
        job_ctx_arg=job_ctx_arg,
    )

options: show_source: false heading_level: 3

gavicore.models.ProcessRequest

Bases: BaseModel

Source code in gavicore\src\gavicore\models.py
284
285
286
287
288
class ProcessRequest(BaseModel):
    # NOTE(review): appears to model an OGC API - Processes execute-request
    # body (see ExecutionRequest, which extends it) — confirm against spec.
    # Process inputs keyed by input name; values may be any JSON-serializable data.
    inputs: Optional[dict[str, Any]] = None
    # Requested outputs keyed by output name; per-output options are in `Output`.
    outputs: Optional[dict[str, Output]] = None
    # Desired response type; defaults to the raw representation.
    response: Optional[ResponseType] = ResponseType.raw
    # Optional callback URIs for job status notifications; see `Subscriber`.
    subscriber: Optional[Subscriber] = None

options: show_source: false heading_level: 3

gavicore.models.Subscriber

Bases: BaseModel

Optional URIs for callbacks for this job.

Support for this parameter is not required and the parameter may be removed from the API definition, if conformance class 'callback' is not listed in the conformance declaration under /conformance.

Source code in gavicore\src\gavicore\models.py
 99
100
101
102
103
104
105
106
107
108
109
110
class Subscriber(BaseModel):
    """
    Optional URIs for callbacks for this job.

    Support for this parameter is not required and the parameter may be
    removed from the API definition, if conformance class **'callback'**
    is not listed in the conformance declaration under `/conformance`.
    """

    # Callback URI notified on successful completion.
    successUri: Optional[AnyUrl] = None
    # Callback URI notified while the job is in progress.
    inProgressUri: Optional[AnyUrl] = None
    # Callback URI notified on failure.
    failedUri: Optional[AnyUrl] = None

options: show_source: false heading_level: 3

gavicore.models.Output

Bases: BaseModel

Source code in gavicore\src\gavicore\models.py
301
302
303
class Output(BaseModel):
    # Requested format for this output, if any.
    format: Optional[Format] = None
    # How the output is delivered; defaults to transmission by value.
    transmissionMode: Optional[TransmissionMode] = TransmissionMode.value

options: show_source: false heading_level: 3

gavicore.util.request.ExecutionRequest

Bases: ProcessRequest

Process execution request. Extends ProcessRequest

  • to allow the process identifier being part of the request,
  • to allow creating nested object values for input names with dots.

Parameters:

Name Type Description Default
process_id

Process identifier

required
dotpath

Whether dots in input names should be used to create nested object values. Defaults to False.

required
inputs

Optional process inputs given as key-value mapping. Values may be of any JSON-serializable type accepted by the given process.

required
outputs

Optional process outputs given as key-value mapping. Values are of type Output supported by the given process.

required
subscriber

Optional subscriber of type Subscriber comprising callback URLs that are informed about process status changes while the processing takes place.

required
Source code in gavicore\src\gavicore\util\request.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class ExecutionRequest(ProcessRequest):
    """
    Process execution request.
    Extends [ProcessRequest][gavicore.models.ProcessRequest]

    - to allow the process identifier being part of the request,
    - to allow creating nested object values for input names with dots.

    Args:
        process_id: Process identifier
        dotpath: Whether dots in input names should be used to create
            nested object values. Defaults to `False`.
        inputs: Optional process inputs given as key-value mapping.
            Values may be of any JSON-serializable type accepted by
            the given process.
        outputs: Optional process outputs given as key-value mapping.
            Values are of type [Output][gavicore.models.Output]
            supported by the given process.
        subscriber: Optional subscriber of type
            [Subscriber][gavicore.models.Subscriber] comprising callback
            URLs that are informed about process status changes
            while the processing takes place.
    """

    process_id: Annotated[str, Field(title="Process identifier", min_length=1)]
    dotpath: Annotated[
        bool, Field(title="Whether to encode nested input values using dots ('.').")
    ] = False

    def to_process_request(self) -> ProcessRequest:
        """
        Convert this execution request into a process request as used by the
        `execute-process` operation.
        """
        inputs = self.inputs
        if inputs and self.dotpath:
            # Expand dot-separated input names into nested objects.
            inputs = nest_dict(inputs)
        return ProcessRequest(
            inputs=inputs,
            outputs=self.outputs,
            response=self.response,
            subscriber=self.subscriber,
        )

    @classmethod
    def create(
        cls,
        process_id: str | None = None,
        dotpath: bool = False,
        request_path: str | None = None,
        inputs: list[str] | None = None,
        subscribers: list[str] | None = None,
    ) -> "ExecutionRequest":
        """
        A factory method to create an execution request.

        The method is intended to support CLI implementations parsing user inputs
        and creating validated execution requests.

        Args:
            process_id: Process identifier
            dotpath: Whether dots in input names should be used to create
                nested object values. Defaults to `False`.
            request_path: Local path to a file that contains an execution request
                in YAML or JSON format.
            inputs: Optional process inputs given as a list of "<key>=<value>" strings.
            subscribers: Optional subscribers given as a list of
                "<event>=<url>" strings.

        Returns:
            A validated execution request of type `ExecutionRequest`.

        Raises:
            ValueError: if a validation error occurs.
        """
        request_dict, _ = _read_execution_request(request_path)
        # CLI arguments take precedence over values from the request file.
        if process_id:
            request_dict["process_id"] = process_id
        if dotpath:
            request_dict["dotpath"] = dotpath
        inputs_dict = _parse_inputs(inputs)
        if inputs_dict:
            # Copy before updating so the parsed file dict is not mutated.
            request_dict["inputs"] = dict(request_dict.get("inputs") or {})
            request_dict["inputs"].update(inputs_dict)
        subscriber_dict = _parse_subscribers(subscribers)
        if subscriber_dict:
            request_dict["subscriber"] = dict(request_dict.get("subscriber") or {})
            request_dict["subscriber"].update(subscriber_dict)
        try:
            return ExecutionRequest(**request_dict)
        except pydantic.ValidationError as e:
            raise ValueError(f"Execution request is invalid: {e}") from e

    @classmethod
    def from_process_description(
        cls,
        process_description: ProcessDescription,
        dotpath: bool = False,
    ) -> "ExecutionRequest":
        """
        Create an execution request from the given process description.

        Args:
            process_description: The process description
            dotpath: Whether to allow for dot-separated input
                names for nested object values

        Returns:
            The execution requests populated with default values.
        """
        return _from_process_description(process_description, dotpath)

to_process_request()

Convert this execution request into a process request as used by the execute-process operation.

Source code in gavicore\src\gavicore\util\request.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def to_process_request(self) -> ProcessRequest:
    """
    Build the plain process request submitted by the `execute-process`
    operation from this execution request.
    """
    raw_inputs = self.inputs
    resolved_inputs = (
        nest_dict(raw_inputs) if raw_inputs and self.dotpath else raw_inputs
    )
    return ProcessRequest(
        inputs=resolved_inputs,
        outputs=self.outputs,
        response=self.response,
        subscriber=self.subscriber,
    )

create(process_id=None, dotpath=False, request_path=None, inputs=None, subscribers=None) classmethod

A factory method to create an execution request.

The method is intended to support CLI implementations parsing user inputs and creating validated execution requests.

Parameters:

Name Type Description Default
process_id str | None

Process identifier

None
dotpath bool

Whether dots in input names should be used to create nested object values. Defaults to False.

False
request_path str | None

Local path to a file that contains an execution request in YAML or JSON format.

None
inputs list[str] | None

Optional process inputs given as a list of "<key>=<value>" strings.

None
subscribers list[str] | None

Optional subscribers given as a list of "<event>=<url>" strings.

None
Returns

A validated execution request of type ExecutionRequest.

Raises

ValueError: if a validation error occurs.

Source code in gavicore\src\gavicore\util\request.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
@classmethod
def create(
    cls,
    process_id: str | None = None,
    dotpath: bool = False,
    request_path: str | None = None,
    inputs: list[str] | None = None,
    subscribers: list[str] | None = None,
) -> "ExecutionRequest":
    """
    A factory method to create an execution request.

    The method is intended to support CLI implementations parsing user inputs
    and creating validated execution requests.

    Args:
        process_id: Process identifier
        dotpath: Whether dots in input names should be used to create
            nested object values. Defaults to `False`.
        request_path: Local path to a file that contains an execution request
            in YAML or JSON format.
        inputs: Optional process inputs given as a list of "<key>=<value>" strings.
        subscribers: Optional subscribers given as a list of
            "<event>=<url>" strings.

    Returns:
        A validated execution request of type `ExecutionRequest`.

    Raises:
        ValueError: if a validation error occurs.
    """
    request_dict, _ = _read_execution_request(request_path)
    # Explicit CLI arguments override values read from the request file.
    if process_id:
        request_dict["process_id"] = process_id
    if dotpath:
        request_dict["dotpath"] = dotpath
    inputs_dict = _parse_inputs(inputs)
    if inputs_dict:
        # Copy before updating so the parsed file dict is not mutated.
        request_dict["inputs"] = dict(request_dict.get("inputs") or {})
        request_dict["inputs"].update(inputs_dict)
    subscriber_dict = _parse_subscribers(subscribers)
    if subscriber_dict:
        request_dict["subscriber"] = dict(request_dict.get("subscriber") or {})
        request_dict["subscriber"].update(subscriber_dict)
    try:
        return ExecutionRequest(**request_dict)
    except pydantic.ValidationError as e:
        raise ValueError(f"Execution request is invalid: {e}") from e

from_process_description(process_description, dotpath=False) classmethod

Create an execution request from the given process description.

Parameters:

Name Type Description Default
process_description ProcessDescription

The process description

required
dotpath bool

Whether to allow for dot-separated input names for nested object values

False

Returns:

Type Description
ExecutionRequest

The execution requests populated with default values.

Source code in gavicore\src\gavicore\util\request.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
@classmethod
def from_process_description(
    cls,
    process_description: ProcessDescription,
    dotpath: bool = False,
) -> "ExecutionRequest":
    """
    Build an execution request pre-populated with default values taken
    from the given process description.

    Args:
        process_description: The process description to derive
            the request from.
        dotpath: Whether dot-separated input names may be used
            for nested object values.

    Returns:
        The execution request populated with default values.
    """
    request = _from_process_description(process_description, dotpath)
    return request

options: show_source: false heading_level: 3

procodile.JobContext

Bases: ABC

Report process progress and check for task cancellation.

A process function can retrieve the current job context

  1. via JobContext.get() from within a process function, or
  2. as a function argument of type JobContext.
Source code in procodile\src\procodile\job.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class JobContext(ABC):
    """
    Report process progress and check for task cancellation.

    A process function can retrieve the current job context

    1. via [JobContext.get()][procodile.JobContext.get] from
       within a process function, or
    2. as a function argument of type [JobContext][procodile.JobContext].
    """

    @classmethod
    def get(cls) -> "JobContext":
        """
        Get the current job context.

        Returns the current job context that can be used by
        process functions to report job progress in percent
        or via messages and to check whether cancellation
        has been requested.
        This function is intended to be called from within
        a process function executed as a job. If called as a usual
        Python function (without a job serving as context), the
        returned context will have no-op methods only.

        Returns:
            An instance of the current job context.
        """
        # Walk up the call stack looking for a local named
        # "__job_context__" — presumably injected by the job runner
        # into the frame invoking the process function (TODO confirm).
        frame = inspect.currentframe()
        try:
            while frame:
                job_context = frame.f_locals.get("__job_context__")
                if isinstance(job_context, JobContext):
                    return job_context
                frame = frame.f_back
        finally:
            # Always free alive frame-references
            del frame
        # noinspection PyUnreachableCode
        warnings.warn(
            "cannot determine current job context; using non-functional dummy",
            stacklevel=2,
        )
        return NullJobContext()

    @abstractmethod
    def report_progress(
        self,
        progress: Optional[int] = None,
        message: Optional[str] = None,
    ) -> None:
        """Report task progress.

        Args:
            progress: Progress in percent.
            message: Detail progress message.

        Raises:
            JobCancelledException: if an attempt has been made
                to cancel this job.
        """

    @abstractmethod
    def is_cancelled(self) -> bool:
        """Test whether an attempt has been made to cancel this job.
        It may still be running though.

        Returns:
            `True` if so, `False` otherwise.
        """

    @abstractmethod
    def check_cancelled(self) -> None:
        """Raise a `JobCancelledException`, if
        an attempt has been made to cancel this job.
        """

get() classmethod

Get the current job context.

Returns the current job context that can be used by process functions to report job progress in percent or via messages and to check whether cancellation has been requested. This function is intended to be called from within a process function executed as a job. If called as a usual Python function (without a job serving as context), the returned context will have no-op methods only.

Returns:

Type Description
JobContext

An instance of the current job context.

Source code in procodile\src\procodile\job.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@classmethod
def get(cls) -> "JobContext":
    """
    Get the current job context.

    Returns the current job context that can be used by
    process functions to report job progress in percent
    or via messages and to check whether cancellation
    has been requested.
    This function is intended to be called from within
    a process function executed as a job. If called as a usual
    Python function (without a job serving as context), the
    returned context will have no-op methods only.

    Returns:
        An instance of the current job context.
    """
    # Walk up the caller's stack looking for a local variable named
    # "__job_context__" — presumably injected by the job runner into
    # the frame that invokes the process function (TODO confirm).
    frame = inspect.currentframe()
    try:
        while frame:
            job_context = frame.f_locals.get("__job_context__")
            if isinstance(job_context, JobContext):
                return job_context
            frame = frame.f_back
    finally:
        # Always free alive frame-references
        # (avoids reference cycles that would keep frames alive,
        # as recommended by the `inspect` module docs).
        del frame
    # No context found anywhere on the stack: warn and fall back to a
    # no-op implementation instead of failing.
    # noinspection PyUnreachableCode
    warnings.warn(
        "cannot determine current job context; using non-functional dummy",
        stacklevel=2,
    )
    return NullJobContext()

report_progress(progress=None, message=None) abstractmethod

Report task progress.

Parameters:

Name Type Description Default
progress Optional[int]

Progress in percent.

None
message Optional[str]

Detail progress message.

None

Raises:

Type Description
JobCancellationException

if an attempt has been made to cancel this job.

Source code in procodile\src\procodile\job.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@abstractmethod
def report_progress(
    self,
    progress: Optional[int] = None,
    message: Optional[str] = None,
) -> None:
    """Report task progress.

    Args:
        progress: Progress in percent.
        message: Detail progress message.

    Raises:
        JobCancelledException: if an attempt has been made
            to cancel this job.
    """

is_cancelled() abstractmethod

Test whether an attempt has been made to cancel this job. It may still be running though.

Returns:

Type Description
bool

True if so, False otherwise.

Source code in procodile\src\procodile\job.py
 96
 97
 98
 99
100
101
102
103
@abstractmethod
def is_cancelled(self) -> bool:
    """Check whether cancellation of this job has been requested.

    Note that the job may still be running even when this
    returns `True`.

    Returns:
        `True` if cancellation was requested, `False` otherwise.
    """

check_cancelled() abstractmethod

Raise a JobCancellationException, if an attempt has been made to cancel this job.

Source code in procodile\src\procodile\job.py
105
106
107
108
109
@abstractmethod
def check_cancelled(self) -> None:
    """Raise a `JobCancelledException`, if
    an attempt has been made to cancel this job.
    """

options: show_source: false heading_level: 3

procodile.JobCancelledException

Bases: Exception

Raised if a job's cancellation has been requested.

Source code in procodile\src\procodile\job.py
30
31
class JobCancelledException(Exception):
    """Signals that cancellation of the running job has been requested."""

options: show_source: false heading_level: 3

procodile.cli.new_cli(registry, name, version, help=None, summary=None, context=None)

Get the CLI instance configured to use the process registry that is given either by

  • a reference of the form "path.to.module:attribute",
  • or a process registry instance,
  • or as a no-arg process registry getter function.

The process registry is usually a singleton in your application.

The context object obj of the returned CLI object will be of type dict and will contain a process registry getter function using the key get_process_registry.

The function must be called before any CLI command or callback has been invoked. Otherwise, the provided get_process_registry getter will not be recognized and all commands that require the process registry will fail with an AssertionError.

Parameters:

Name Type Description Default
name str

The name of the CLI application.

required
registry Union[str, ProcessRegistry, Callable[[], ProcessRegistry]]

A registry reference string, or a registry instance, or a no-arg function that returns a registry instance.

required
help str | None

Optional CLI application help text. If not provided, the default cuiman help text will be used.

None
summary str | None

A one-sentence human-readable description of the tool that will be used by the default help text. Hence, used only, if help is not provided. Should end with a dot '.'.

None
version str

Optional version string. If not provided, the cuiman version will be used.

required
context dict[str, Any] | None

Additional context values that will be registered with the CLI and can be accessed by commands that you add to the returned typer.Typer instance.

None
Returns

a typer.Typer instance

Source code in procodile\src\procodile\cli\cli.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def new_cli(
    registry: Union[
        str,
        "ProcessRegistry",
        Callable[[], "ProcessRegistry"],
    ],
    name: str,
    version: str,
    help: str | None = None,
    summary: str | None = None,
    context: dict[str, Any] | None = None,
) -> typer.Typer:
    """
    Get the CLI instance configured to use the process registry
    that is given either by

    - a reference of the form "path.to.module:attribute",
    - or a process registry instance,
    - or a no-arg process registry getter function.

    The process registry is usually a singleton in your application.

    The context object `obj` of the returned CLI object
    will be of type `dict` and will contain
    a process registry getter function using the key
    `get_process_registry`.

    The function must be called before any CLI command or
    callback has been invoked. Otherwise, the provided
    `get_process_registry` getter will not be recognized and
    all commands that require the process registry will
    fail with an `AssertionError`.

    Args:
        registry: A registry reference string,
            or a registry instance, or a no-arg
            function that returns a registry instance.
        name: The name of the CLI application.
        version: The version string of the CLI application.
        help: Optional CLI application help text. If not provided, a
            default help text will be used.
        summary: A one-sentence human-readable description of the tool that
            will be used by the default help text. Hence, used only,
            if `help` is not provided. Should end with a dot '.'.
        context: Additional context values that will be registered with the CLI
            and can be accessed by commands that you add to the
            returned `typer.Typer` instance.

    Returns:
        a `typer.Typer` instance
    """
    assert bool(registry), "registry argument must be provided"
    assert bool(name), "name argument must be provided"
    assert bool(version), "version argument must be provided"

    t = typer.Typer(
        cls=AliasedGroup,
        name=name,
        help=(
            help
            or DEFAULT_HELP.format(summary=summary or DEFAULT_SUMMARY.format(name=name))
        ),
        add_completion=False,
        invoke_without_command=True,
        context_settings={
            "obj": {
                PROCESS_REGISTRY_GETTER_KEY: _parse_process_registry_getter(registry),
                **(context or {}),
            },
        },
    )

    @t.callback()
    def main(
        version_: Annotated[
            bool, typer.Option("--version", help="Show version and exit.")
        ] = False,
    ):
        if version_:
            from procodile import __version__ as procodile_version

            typer.echo(f"{version} (procodile {procodile_version})")
            return

    @t.command("execute-process")
    def execute_process(
        ctx: typer.Context,
        process_id: Annotated[Optional[str], PROCESS_ID_ARGUMENT] = None,
        dotpath: Annotated[bool, DOT_PATH_OPTION] = False,
        request_inputs: Annotated[Optional[list[str]], REQUEST_INPUT_OPTION] = None,
        request_subscribers: Annotated[
            Optional[list[str]], REQUEST_SUBSCRIBER_OPTION
        ] = None,
        request_file: Annotated[Optional[str], REQUEST_FILE_OPTION] = None,
    ):
        """
        Execute a process.

        The process request to be submitted may be read from a file given
        by `--request`, or from `stdin`, or from the `process_id` argument
        with zero, one, or more `--input` (or `-i`) options.

        The `process_id` argument and any given `--input` options will override
        settings with the same name found in the given request file or `stdin`,
        if any.
        """
        from procodile import ExecutionRequest, Job

        registry = _get_process_registry(ctx)
        execution_request = ExecutionRequest.create(
            process_id=process_id,
            dotpath=dotpath,
            inputs=request_inputs,
            subscribers=request_subscribers,
            request_path=request_file,
        )
        process_id_ = execution_request.process_id
        process = registry.get(process_id_)
        if process is None:
            raise click.ClickException(f"Process {process_id_!r} not found.")

        job = Job.create(process, request=execution_request.to_process_request())
        job_results = job.run()
        if job_results is not None:
            typer.echo(job_results.model_dump_json(indent=2))
        else:
            typer.echo(job.job_info.model_dump_json(indent=2))

    @t.command("list-processes", help="List all processes.")
    def list_processes(ctx: typer.Context):
        registry = _get_process_registry(ctx)
        typer.echo(
            json.dumps(
                {
                    k: v.description.model_dump(
                        mode="json",
                        by_alias=True,
                        exclude_none=True,
                        exclude_defaults=True,
                        exclude_unset=True,
                        exclude={"inputs", "outputs"},
                    )
                    for k, v in registry.items()
                },
                indent=2,
            )
        )

    @t.command("get-process", help="Get details of a process.")
    def get_process(
        ctx: typer.Context,
        process_id: Annotated[str, PROCESS_ID_ARGUMENT],
    ):
        # NOTE: no local `import json` needed — the module-level `json`
        # import is already used by `list_processes` above.
        registry = _get_process_registry(ctx)
        process = registry.get(process_id)
        if process is None:
            raise click.ClickException(f"Process {process_id!r} not found.")

        typer.echo(
            json.dumps(
                process.description.model_dump(
                    mode="json",
                    by_alias=True,
                    exclude_defaults=True,
                    exclude_none=True,
                    exclude_unset=True,
                ),
                indent=2,
            )
        )

    return t

options: show_source: false heading_level: 3