prefect_spark_on_k8s_operator.flows

A module to define flows interacting with spark-on-k8s-operator resources created in Kubernetes.

Functions

run_spark_application async

Flow for running a spark application using spark-on-k8s-operator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| spark_application | SparkApplication | The SparkApplication block that specifies the application run params. | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | A dict of logs from the driver pod, returned after the application reaches a terminal state, which can be COMPLETED, FAILED, or UNKNOWN (for time_out seconds). |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If the created spark application attains a failed/unknown status. |
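
Because the flow raises RuntimeError on a failed or unknown status, callers may want to catch it rather than let it propagate. The following is a minimal sketch of that pattern, not the library's own code: `stand_in_flow` is a hypothetical stand-in that only mimics the documented contract, and the `"driver"` key in the returned dict is an assumption, since the real key names are not documented here. In real use, the callable passed to `run_and_report` would wrap `run_spark_application(app)` instead.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional


async def stand_in_flow(final_state: str) -> Dict[str, Any]:
    """Hypothetical stand-in mimicking the documented contract only."""
    if final_state != "COMPLETED":
        # Mirrors the documented behaviour: failed/unknown status raises.
        raise RuntimeError(f"Spark application ended in state {final_state}")
    # Assumed shape: the real dict's keys are not specified in these docs.
    return {"driver": "log line 1\nlog line 2"}


async def run_and_report(
    flow_call: Callable[[], Awaitable[Dict[str, Any]]],
) -> Optional[Dict[str, Any]]:
    """Await the flow; return its logs, or None if it raised RuntimeError."""
    try:
        return await flow_call()
    except RuntimeError as exc:
        print(f"Spark application did not complete: {exc}")
        return None


if __name__ == "__main__":
    ok = asyncio.run(run_and_report(lambda: stand_in_flow("COMPLETED")))
    failed = asyncio.run(run_and_report(lambda: stand_in_flow("FAILED")))
    print(ok is not None, failed is None)
```

Returning None on failure is only one possible design choice; re-raising after logging would preserve the failure for any caller that needs it.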

Example
import asyncio

from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_spark_on_k8s_operator import (
    SparkApplication,
    run_spark_application,
)

app = SparkApplication.from_yaml_file(
    credentials=KubernetesCredentials.load("k8s-creds"),
    manifest_path="path/to/job.yaml",
)


if __name__ == "__main__":
    # run the flow
    asyncio.run(run_spark_application(app))
Source code in prefect_spark_on_k8s_operator/flows.py
@flow
async def run_spark_application(
    spark_application: SparkApplication,
) -> Dict[str, Any]:
    """Flow for running a spark application using spark-on-k8s-operator.

    Args:
        spark_application: The `SparkApplication` block that specifies the
            application run params.

    Returns:
        A dict of logs from the driver pod after the application reaches a
            terminal state, which can be COMPLETED, FAILED, or UNKNOWN
            (for `time_out seconds`).

    Raises:
        RuntimeError: If the created spark application attains a failed/unknown status.

    Example:

        ```python
        import asyncio

        from prefect_kubernetes.credentials import KubernetesCredentials
        from prefect_spark_on_k8s_operator import (
            SparkApplication,
            run_spark_application,
        )

        app = SparkApplication.from_yaml_file(
            credentials=KubernetesCredentials.load("k8s-creds"),
            manifest_path="path/to/job.yaml",
        )


        if __name__ == "__main__":
            # run the flow
            asyncio.run(run_spark_application(app))
        ```
    """
    spark_application_run = await task(spark_application.trigger.aio)(spark_application)

    await task(spark_application_run.wait_for_completion.aio)(spark_application_run)

    return await task(spark_application_run.fetch_result.aio)(spark_application_run)
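
The flow body above runs three steps in order (trigger, wait for completion, fetch the result), wrapping each method's `.aio` variant in a Prefect task. The sequence can be sketched without Prefect or a cluster as shown below. Everything in this sketch is hypothetical: `StandInApplication` and `StandInRun` are illustrative stand-ins, the `"SUBMITTED"` initial state and the `"driver-pod"` key are made up, and raising RuntimeError inside `wait_for_completion` is an assumption about where the error originates.

```python
import asyncio
from typing import Any, Dict, List

# Terminal states named in the flow's docstring.
TERMINAL_STATES = {"COMPLETED", "FAILED", "UNKNOWN"}


class StandInRun:
    """Hypothetical stand-in for the run object returned by trigger()."""

    def __init__(self, states: List[str]) -> None:
        self._states = list(states)  # states the fake application passes through
        self.state = "SUBMITTED"  # made-up initial state for illustration

    async def wait_for_completion(self) -> None:
        # Step through the scripted states until a terminal one is seen.
        for state in self._states:
            await asyncio.sleep(0)  # yield control, as a real poll loop would
            self.state = state
            if state in TERMINAL_STATES:
                break
        # Anything other than COMPLETED (including running out of scripted
        # states) is treated as a failure in this sketch.
        if self.state != "COMPLETED":
            raise RuntimeError(f"application ended in state {self.state}")

    async def fetch_result(self) -> Dict[str, Any]:
        # Assumed shape: a dict of log text keyed by a pod name.
        return {"driver-pod": f"final state: {self.state}"}


class StandInApplication:
    """Hypothetical stand-in for the SparkApplication block."""

    def __init__(self, states: List[str]) -> None:
        self._states = states

    async def trigger(self) -> StandInRun:
        return StandInRun(self._states)


async def run_lifecycle(app: StandInApplication) -> Dict[str, Any]:
    """Same three-step order as the flow: trigger, wait, fetch."""
    run = await app.trigger()
    await run.wait_for_completion()
    return await run.fetch_result()


if __name__ == "__main__":
    print(asyncio.run(run_lifecycle(StandInApplication(["RUNNING", "COMPLETED"]))))
```

Because the wait step raises before the fetch step runs, a failed application never reaches `fetch_result` in this sketch.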