
prefect_spark_on_k8s_operator.app

Module to define SparkApplication and monitor its Run

Classes

SparkApplication

Bases: JobBlock

A block representing a spark application configuration. The object instance can be created with the from_yaml_file classmethod. The additional attributes below can be passed to it to change the SparkApplicationRun behaviour.

Attributes:

Name Type Description
manifest Dict[str, Any]

The spark application manifest (spark-on-k8s-operator v1Beta2 API spec) to run. This dictionary can be produced using yaml.safe_load.

credentials KubernetesCredentials

The credentials to configure a client from.

delete_after_completion bool

Whether to delete the application after it has completed. Defaults to True.

interval_seconds int

The number of seconds to wait between application status checks. Defaults to 5 seconds.

namespace str

The namespace to create and run the application in. Defaults to default.

timeout_seconds Optional[int]

The number of seconds to wait for the application in UNKNOWN state before timing out. Defaults to 600 (10 minutes).

collect_driver_logs Optional[bool]

Whether to collect driver logs after completion. By default this is done only upon failures. Logs for successful runs will be collected by setting this option to True. Defaults to False.

api_kwargs Dict[str, Any]

Additional arguments to include in Kubernetes API calls.
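
Example: a minimal sketch of configuring and running the block inside a Prefect flow. The manifest path, the namespace, and the default-constructed KubernetesCredentials block are illustrative assumptions, not part of this module.

from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_spark_on_k8s_operator.app import SparkApplication


@flow
def run_spark_pi():
    app = SparkApplication.from_yaml_file(
        manifest_path="spark-pi.yaml",        # hypothetical SparkApplication manifest
        credentials=KubernetesCredentials(),  # assumes default kubeconfig / in-cluster config
        namespace="spark-jobs",               # hypothetical namespace
        collect_driver_logs=True,             # also keep logs for successful runs
    )
    run = app.trigger()        # apply the manifest; returns a SparkApplicationRun
    run.wait_for_completion()  # poll until COMPLETED, FAILED, or UNKNOWN timeout
    return run.fetch_result()  # driver logs keyed by pod name


if __name__ == "__main__":
    run_spark_pi()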

Source code in prefect_spark_on_k8s_operator/app.py
class SparkApplication(JobBlock):
    """A block representing a spark application configuration.
    The object instance can be created with the `from_yaml_file` classmethod.
    The additional attributes below can be passed to it to change the
    `SparkApplicationRun` behaviour.

    Attributes:
        manifest:
            The spark application manifest (spark-on-k8s-operator v1Beta2 API spec)
            to run. This dictionary can be produced
            using `yaml.safe_load`.
        credentials:
            The credentials to configure a client from.
        delete_after_completion:
            Whether to delete the application after it has completed.
            Defaults to `True`.
        interval_seconds:
            The number of seconds to wait between application status checks.
            Defaults to `5` seconds.
        namespace:
            The namespace to create and run the application in. Defaults to `default`.
        timeout_seconds:
            The number of seconds to wait for the application in UNKNOWN
            state before timing out.
            Defaults to `600` (10 minutes).
        collect_driver_logs:
            Whether to collect driver logs after completion.
            By default this is done only upon failures. Logs for successful runs will
            be collected by setting this option to True.
            Defaults to `False`.
        api_kwargs:
            Additional arguments to include in Kubernetes API calls.
    """

    # Duplicated description until griffe supports pydantic Fields.
    manifest: Dict[str, Any] = Field(
        default=...,
        title="SparkApplication Manifest",
        description=(
            "The spark application manifest(as per spark-on-k8s-operator API spec)"
            " to run. This dictionary can be produced "
            "using `yaml.safe_load`."
        ),
    )
    api_kwargs: Dict[str, Any] = Field(
        default_factory=dict,
        title="Additional API Arguments",
        description="Additional arguments to include in Kubernetes API calls.",
        example={"pretty": "true"},
    )
    credentials: KubernetesCredentials = Field(
        default=..., description="The credentials to configure a client from."
    )
    delete_after_completion: bool = Field(
        default=True,
        description="Whether to delete the application after it has completed.",
    )
    interval_seconds: int = Field(
        default=5,
        description="The number of seconds to wait between application status checks.",
    )
    namespace: str = Field(
        default="default",
        description="The namespace to create and run the application in.",
    )
    timeout_seconds: Optional[int] = Field(
        default=600,
        description="The number of seconds to wait for the application in "
        "UNKNOWN state before timing out.",
    )
    collect_driver_logs: Optional[bool] = Field(
        default=False,
        description=(
            "Whether to collect driver logs after completion."
            " By default this is done only upon failures."
            " Logs for successful runs will be collected by setting this option to True"
        ),
    )

    _block_type_name = "Spark On K8s Operator"
    _block_type_slug = "spark-on-k8s-operator"
    _logo_url = "https://docs.prefect.io/img/collections/spark-on-kubernetes.png?h=250"  # noqa: E501
    _documentation_url = "https://tardunge.github.io/prefect-spark-on-k8s-operator/app/#prefect_spark_on_k8s_operator.app.SparkApplication"  # noqa
    name: str = ""

    @sync_compatible
    async def trigger(self) -> "SparkApplicationRun":
        """Apply the spark application and return a `SparkApplicationRun` object.

        Returns:
            SparkApplicationRun object.
        """

        # randomize the application run instance name.
        name = (
            self.manifest.get(constants.METADATA).get(constants.NAME)
            + "-"
            + "".join(random.choices(string.ascii_lowercase + string.digits, k=4))
        )
        self.manifest.get(constants.METADATA)[constants.NAME] = name

        manifest = await create_namespaced_custom_object.fn(
            kubernetes_credentials=self.credentials,
            group=constants.GROUP,
            version=constants.VERSION,
            plural=constants.PLURAL,
            body=self.manifest,
            namespace=self.namespace,
            **self.api_kwargs,
        )
        self.logger.info(
            "Created spark application: "
            f"{manifest.get(constants.METADATA).get(constants.NAME)}"
        )

        self.manifest = manifest
        self.name = manifest.get(constants.METADATA).get(constants.NAME)
        return SparkApplicationRun(spark_application=self)

    @classmethod
    def from_yaml_file(
        cls: Type[Self], manifest_path: Union[Path, str], **kwargs
    ) -> Self:
        """Create a `SparkApplication` from a YAML file.
        Supports manifests of SparkApplication or ScheduledSparkApplication Kind only.
        If a `ScheduledSparkApplication` is provided, it is converted to a
        `SparkApplication` Kind. It forcefully sets the restartPolicy of the application
        to 'Never'.

        Args:
            manifest_path: The YAML file to create the `SparkApplication` from.

        Returns:
            A SparkApplication object.
        """
        with open(manifest_path, "r") as yaml_stream:
            yaml_dict = yaml.safe_load(yaml_stream)

        # convert ScheduledSparkApplication to SparkApplication
        # as the schedules are handled by prefect.
        if yaml_dict.get(constants.KIND) == constants.SCHEDULED_SPARK_APPLICATION_KIND:
            yaml_dict[constants.KIND] = constants.SPARK_APPLICATION_KIND
            yaml_dict[constants.SPEC] = yaml_dict[constants.SPEC][constants.TEMPLATE]

        if (
            yaml_dict.get(constants.KIND) not in constants.SPARK_APPLICATION_KINDS
            or yaml_dict.get(constants.SPEC).get(constants.TYPE)
            not in constants.SPARK_APPLICATION_TYPES
        ):
            raise TypeError(
                "The provided manifest has either unsupport kind or spec.type"
            )

        # Forcefully set restartPolicy to Never. Schedules and retries
        # should be dictated by prefect flow.
        yaml_dict.get(constants.SPEC).update(constants.RESTART_POLICY)
        return cls(manifest=yaml_dict, **kwargs)

Functions

from_yaml_file classmethod

Create a SparkApplication from a YAML file. Supports manifests of SparkApplication or ScheduledSparkApplication Kind only. If a ScheduledSparkApplication is provided, it is converted to a SparkApplication Kind. It forcefully sets the restartPolicy of the application to 'Never'.

Parameters:

Name Type Description Default
manifest_path Union[Path, str]

The YAML file to create the SparkApplication from.

required

Returns:

Type Description
Self

A SparkApplication object.
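
The extra block attributes documented above can be overridden as keyword arguments alongside the manifest path. A minimal sketch, assuming a hypothetical scheduled-spark-pi.yaml manifest of Kind ScheduledSparkApplication and a previously saved credentials block named "k8s-creds":

from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_spark_on_k8s_operator.app import SparkApplication

app = SparkApplication.from_yaml_file(
    manifest_path="scheduled-spark-pi.yaml",              # hypothetical manifest path
    credentials=KubernetesCredentials.load("k8s-creds"),  # hypothetical saved block name
    delete_after_completion=False,  # keep the custom resource around for inspection
    timeout_seconds=1200,           # tolerate up to 20 minutes in UNKNOWN state
)
# The Kind is rewritten to SparkApplication, the spec is taken from spec.template,
# and spec.restartPolicy is forced to Never; scheduling and retries are left to Prefect.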

Source code in prefect_spark_on_k8s_operator/app.py
@classmethod
def from_yaml_file(
    cls: Type[Self], manifest_path: Union[Path, str], **kwargs
) -> Self:
    """Create a `SparkApplication` from a YAML file.
    Supports manifests of SparkApplication or ScheduledSparkApplication Kind only.
    If a `ScheduledSparkApplication` is provided, it is converted to a
    `SparkApplication` Kind. It forcefully sets the restartPolicy of the application
    to 'Never'.

    Args:
        manifest_path: The YAML file to create the `SparkApplication` from.

    Returns:
        A SparkApplication object.
    """
    with open(manifest_path, "r") as yaml_stream:
        yaml_dict = yaml.safe_load(yaml_stream)

    # convert ScheduledSparkApplication to SparkApplication
    # as the schedules are handled by prefect.
    if yaml_dict.get(constants.KIND) == constants.SCHEDULED_SPARK_APPLICATION_KIND:
        yaml_dict[constants.KIND] = constants.SPARK_APPLICATION_KIND
        yaml_dict[constants.SPEC] = yaml_dict[constants.SPEC][constants.TEMPLATE]

    if (
        yaml_dict.get(constants.KIND) not in constants.SPARK_APPLICATION_KINDS
        or yaml_dict.get(constants.SPEC).get(constants.TYPE)
        not in constants.SPARK_APPLICATION_TYPES
    ):
        raise TypeError(
            "The provided manifest has either unsupport kind or spec.type"
        )

    # Forcefully set restartPolicy to Never. Schedules and retries
    # should be dictated by prefect flow.
    yaml_dict.get(constants.SPEC).update(constants.RESTART_POLICY)
    return cls(manifest=yaml_dict, **kwargs)
trigger async

Apply the spark application and return a SparkApplicationRun object.

Returns:

Type Description
SparkApplicationRun

SparkApplicationRun object.
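
Because trigger (like wait_for_completion and fetch_result) is decorated with sync_compatible, it can be awaited from async code or called directly from sync code. A minimal async sketch, assuming app is an already configured SparkApplication block:

from prefect import flow

@flow
async def run_app(app):
    run = await app.trigger()        # submits the manifest with a randomized name suffix
    await run.wait_for_completion()  # poll until a terminal state is reached
    return await run.fetch_result()  # raises RuntimeError if the run did not complete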

Source code in prefect_spark_on_k8s_operator/app.py
@sync_compatible
async def trigger(self) -> "SparkApplicationRun":
    """Apply the spark application and return a `SparkApplicationRun` object.

    Returns:
        SparkApplicationRun object.
    """

    # randomize the application run instance name.
    name = (
        self.manifest.get(constants.METADATA).get(constants.NAME)
        + "-"
        + "".join(random.choices(string.ascii_lowercase + string.digits, k=4))
    )
    self.manifest.get(constants.METADATA)[constants.NAME] = name

    manifest = await create_namespaced_custom_object.fn(
        kubernetes_credentials=self.credentials,
        group=constants.GROUP,
        version=constants.VERSION,
        plural=constants.PLURAL,
        body=self.manifest,
        namespace=self.namespace,
        **self.api_kwargs,
    )
    self.logger.info(
        "Created spark application: "
        f"{manifest.get(constants.METADATA).get(constants.NAME)}"
    )

    self.manifest = manifest
    self.name = manifest.get(constants.METADATA).get(constants.NAME)
    return SparkApplicationRun(spark_application=self)

SparkApplicationRun

Bases: JobRun[Dict[str, Any]]

A container representing a run of a spark application.

Source code in prefect_spark_on_k8s_operator/app.py
class SparkApplicationRun(JobRun[Dict[str, Any]]):
    """A container representing a run of a spark application."""

    def __init__(
        self,
        spark_application: "SparkApplication",
    ):
        self.application_logs = None

        self._completed = False
        self._timed_out = False
        self._terminal_state = ""
        self._error_msg = "The Run has not encounterend any errors."
        self._spark_application = spark_application
        self._status = None
        self._cleanup_status = False

    async def _cleanup(self) -> bool:
        """Deletes the resources created by the spark application.
        Produces a resource-leak warning if the delete was unsuccessful.
        """
        deleted = await delete_namespaced_custom_object.fn(
            kubernetes_credentials=self._spark_application.credentials,
            group=constants.GROUP,
            version=constants.VERSION,
            plural=constants.PLURAL,
            name=self._spark_application.name,
            namespace=self._spark_application.namespace,
            **self._spark_application.api_kwargs,
        )
        status = deleted.get(constants.STATUS) == constants.SUCCESS
        if status:
            self.logger.info(
                "cleaned up resources for app: "
                f"{deleted.get(constants.DETAILS).get(constants.NAME)}"
            )
        else:
            self.logger.warning(
                f"Resource leak: failed to clean up, {self._spark_application.name}",
            )

        return status

    async def _fetch_status(self) -> Dict[str, Any]:
        """Reads the runtime status of the spark application."""
        self._status = await get_namespaced_custom_object_status.fn(
            kubernetes_credentials=self._spark_application.credentials,
            group=constants.GROUP,
            version=constants.VERSION,
            plural=constants.PLURAL,
            name=self._spark_application.name,
            namespace=self._spark_application.namespace,
            **self._spark_application.api_kwargs,
        )
        return self._status

    @sync_compatible
    async def wait_for_completion(self):
        """Waits for the application to reach a terminal state.
        The terminal states currently used are:
        COMPLETED:
            The application has run successfully.
        FAILED:
            The application has encountered an error.
        UNKNOWN:
            The application is in UNKNOWN state. This happens if the
            Kubernetes node hosting the driver pod crashes. If the
            state doesn't change for `timeout_seconds`, then the application
            is terminated.
        For more information on the spark-on-k8s-operator state-machine please
        refer [here](https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
        blob/master/pkg/controller/sparkapplication/controller.go#L485)
        """
        self.application_logs = {}

        # wait for the status to change from ""(empty string) to something.
        while True:
            status = await self._fetch_status()
            if constants.STATUS not in status:
                await sleep(1)
                continue
            break

        # wait for the application to reach a terminal_state
        # which is either COMPLETED or FAILED
        # or UNKNOWN(for timeout_seconds)
        while not self._completed:
            status = await self._fetch_status()
            app_state = (
                status.get(constants.STATUS)
                .get(constants.APPLICATION_STATE)
                .get(constants.STATE)
            )
            self.logger.info(f"Last obeserved heartbeat: {app_state}")
            if app_state in [constants.COMPLETED, constants.FAILED]:
                self._completed = True
                self._terminal_state = app_state
                self.logger.info(f"{self._completed}")
                self.logger.info(f"{self._terminal_state}")

            # happens when node/kubelet crashes.
            # Stop the application if this state doesn't change until timeout_seconds.
            elif app_state == constants.UNKNOWN:
                timer_start = int(perf_counter())
                while not self._timed_out:
                    await sleep(self._spark_application.interval_seconds)
                    status = await self._fetch_status()
                    app_state = (
                        status.get(constants.STATUS)
                        .get(constants.APPLICATION_STATE)
                        .get(constants.STATE)
                    )
                    if app_state != constants.UNKNOWN:
                        timer_start = 0
                        break
                    if (
                        int(perf_counter()) - timer_start
                        > self._spark_application.timeout_seconds
                    ):
                        self._completed = True
                        self._timed_out = True
                        self._terminal_state = app_state
            else:
                await sleep(self._spark_application.interval_seconds)

        # restore the value after getting rid of loops.
        if self._terminal_state != constants.COMPLETED:
            self._completed = False

        _tail_logs = False
        if self._terminal_state == constants.FAILED:
            _tail_logs = True
        if self._spark_application.collect_driver_logs:
            _tail_logs = True
        if self._terminal_state == constants.UNKNOWN:
            _tail_logs = False

        if _tail_logs:
            self._error_msg = (
                self._status.get(constants.STATUS)
                .get(constants.APPLICATION_STATE)
                .get(constants.ERROR_MESSAGE, self._error_msg)
            )
            app_id = self._status.get(constants.STATUS).get(
                constants.SPARK_APPLICATION_ID
            )
            app_submission_id = self._status.get(constants.STATUS).get(
                constants.SUBMISSION_ID
            )

            v1_pod_list = await list_namespaced_pod.fn(
                kubernetes_credentials=self._spark_application.credentials,
                namespace=self._spark_application.namespace,
                label_selector=generate_pod_selectors(
                    self._spark_application.name,
                    app_id,
                    app_submission_id,
                ),
                **self._spark_application.api_kwargs,
            )
            self.logger.info(f"pod_list: {len(v1_pod_list.items)}")
            for pod in v1_pod_list.items:
                pod_name = pod.metadata.name
                self.logger.info(f"Capturing logs for pod {pod_name!r}.")
                self.application_logs[pod_name] = await read_namespaced_pod_log.fn(
                    kubernetes_credentials=self._spark_application.credentials,
                    namespace=self._spark_application.namespace,
                    pod_name=pod_name,
                    container=constants.SPARK_DRIVER_CONAINER_NAME,
                    **self._spark_application.api_kwargs,
                )

        if self._spark_application.delete_after_completion or self._timed_out:
            self._cleanup_status = await self._cleanup()

    @sync_compatible
    async def fetch_result(self) -> Dict[str, Any]:
        """Returns the logs from driver pod when:
        `collect_driver_logs` is set to true
        or the application is not in COMPLETED state.

        Returns:
            A dict containing the driver pod name and its main container logs.

        Raises:
            RuntimeError: If the application fails or remains in an UNKNOWN state
              for timeout_seconds.
        """
        for pod in self.application_logs:
            self.logger.info(f"logs for pod {pod}")
            self.logger.info(f"{self.application_logs[pod]}")
        # self.logger.info(f"logs: {self.application_logs}")
        if self._terminal_state != constants.COMPLETED:
            raise RuntimeError(
                "The SparkApplication run is not in a completed state,"
                f" last observed state was {self._terminal_state} - "
                f"possible errors: {self._error_msg}"
            )
        return self.application_logs

Functions

fetch_result async

Returns the logs from the driver pod when collect_driver_logs is set to True or the application is not in a COMPLETED state.

Returns:

Type Description
Dict[str, Any]

A dict containing the driver pod name and its main container logs.

Raises:

Type Description
RuntimeError

If the application fails or remains in an UNKNOWN state for timeout_seconds.
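
Since fetch_result raises when the run did not reach COMPLETED, a failed Spark application surfaces as a flow failure. A minimal sketch of catching the error to add context before re-raising (names are illustrative):

from prefect import flow, get_run_logger

@flow
def guarded_run(app):
    run = app.trigger()
    run.wait_for_completion()
    try:
        return run.fetch_result()
    except RuntimeError as exc:
        get_run_logger().error(f"Spark application did not complete: {exc}")
        raise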

Source code in prefect_spark_on_k8s_operator/app.py
@sync_compatible
async def fetch_result(self) -> Dict[str, Any]:
    """Returns the logs from driver pod when:
    `collect_driver_logs` is set to true
    or the application is not in COMPLETED state.

    Returns:
        A dict containing the driver pod name and its main container logs.

    Raises:
        RuntimeError: If the application fails or remains in an UNKNOWN state
          for timeout_seconds.
    """
    for pod in self.application_logs:
        self.logger.info(f"logs for pod {pod}")
        self.logger.info(f"{self.application_logs[pod]}")
    # self.logger.info(f"logs: {self.application_logs}")
    if self._terminal_state != constants.COMPLETED:
        raise RuntimeError(
            "The SparkApplication run is not in a completed state,"
            f" last observed state was {self._terminal_state} - "
            f"possible errors: {self._error_msg}"
        )
    return self.application_logs
wait_for_completion async

Waits for the application to reach a terminal state. The terminal states currently used are:

COMPLETED

The application has run successfully.

FAILED

The application has encountered an error.

UNKNOWN

The application is in UNKNOWN state. This happens if the Kubernetes node hosting the driver pod crashes. If the state doesn't change for timeout_seconds, then the application is terminated.

For more information on the spark-on-k8s-operator state machine, see https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/pkg/controller/sparkapplication/controller.go#L485
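
The polling cadence and the UNKNOWN-state timeout come from the block's interval_seconds and timeout_seconds attributes. A minimal sketch of tuning both before waiting (the manifest path, values, and default-constructed credentials are illustrative assumptions):

from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_spark_on_k8s_operator.app import SparkApplication

app = SparkApplication.from_yaml_file(
    manifest_path="spark-pi.yaml",        # hypothetical manifest path
    credentials=KubernetesCredentials(),  # assumes default kubeconfig / in-cluster config
    interval_seconds=10,                  # check the application status every 10 seconds
    timeout_seconds=300,                  # give up after 5 minutes stuck in UNKNOWN
)
run = app.trigger()
run.wait_for_completion()  # returns once COMPLETED, FAILED, or timed out in UNKNOWN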

Source code in prefect_spark_on_k8s_operator/app.py
@sync_compatible
async def wait_for_completion(self):
    """Waits for the application to reach a terminal state.
    The terminal states currently used are:
    COMPLETED:
        The application has run successfully.
    FAILED:
        The application has encountered an error.
    UNKNOWN:
        The application is in UNKNOWN state. This happens if the
        Kubernetes node hosting the driver pod crashes. If the
        state doesn't change for `timeout_seconds`, then the application
        is terminated.
    For more information on the spark-on-k8s-operator state-machine please
    refer [here](https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
    blob/master/pkg/controller/sparkapplication/controller.go#L485)
    """
    self.application_logs = {}

    # wait for the status to change from ""(empty string) to something.
    while True:
        status = await self._fetch_status()
        if constants.STATUS not in status:
            await sleep(1)
            continue
        break

    # wait for the application to reach a terminal_state
    # which is either COMPLETED or FAILED
    # or UNKNOWN(for timeout_seconds)
    while not self._completed:
        status = await self._fetch_status()
        app_state = (
            status.get(constants.STATUS)
            .get(constants.APPLICATION_STATE)
            .get(constants.STATE)
        )
        self.logger.info(f"Last obeserved heartbeat: {app_state}")
        if app_state in [constants.COMPLETED, constants.FAILED]:
            self._completed = True
            self._terminal_state = app_state
            self.logger.info(f"{self._completed}")
            self.logger.info(f"{self._terminal_state}")

        # happens when node/kubelet crashes.
        # Stop the application if this state doesn't change until timeout_seconds.
        elif app_state == constants.UNKNOWN:
            timer_start = int(perf_counter())
            while not self._timed_out:
                await sleep(self._spark_application.interval_seconds)
                status = await self._fetch_status()
                app_state = (
                    status.get(constants.STATUS)
                    .get(constants.APPLICATION_STATE)
                    .get(constants.STATE)
                )
                if app_state != constants.UNKNOWN:
                    timer_start = 0
                    break
                if (
                    int(perf_counter()) - timer_start
                    > self._spark_application.timeout_seconds
                ):
                    self._completed = True
                    self._timed_out = True
                    self._terminal_state = app_state
        else:
            await sleep(self._spark_application.interval_seconds)

    # restore the value after getting rid of loops.
    if self._terminal_state != constants.COMPLETED:
        self._completed = False

    _tail_logs = False
    if self._terminal_state == constants.FAILED:
        _tail_logs = True
    if self._spark_application.collect_driver_logs:
        _tail_logs = True
    if self._terminal_state == constants.UNKNOWN:
        _tail_logs = False

    if _tail_logs:
        self._error_msg = (
            self._status.get(constants.STATUS)
            .get(constants.APPLICATION_STATE)
            .get(constants.ERROR_MESSAGE, self._error_msg)
        )
        app_id = self._status.get(constants.STATUS).get(
            constants.SPARK_APPLICATION_ID
        )
        app_submission_id = self._status.get(constants.STATUS).get(
            constants.SUBMISSION_ID
        )

        v1_pod_list = await list_namespaced_pod.fn(
            kubernetes_credentials=self._spark_application.credentials,
            namespace=self._spark_application.namespace,
            label_selector=generate_pod_selectors(
                self._spark_application.name,
                app_id,
                app_submission_id,
            ),
            **self._spark_application.api_kwargs,
        )
        self.logger.info(f"pod_list: {len(v1_pod_list.items)}")
        for pod in v1_pod_list.items:
            pod_name = pod.metadata.name
            self.logger.info(f"Capturing logs for pod {pod_name!r}.")
            self.application_logs[pod_name] = await read_namespaced_pod_log.fn(
                kubernetes_credentials=self._spark_application.credentials,
                namespace=self._spark_application.namespace,
                pod_name=pod_name,
                container=constants.SPARK_DRIVER_CONAINER_NAME,
                **self._spark_application.api_kwargs,
            )

    if self._spark_application.delete_after_completion or self._timed_out:
        self._cleanup_status = await self._cleanup()

Functions

generate_pod_selectors

Generates pod selector labels given the name, app_id and submission_id of the spark application.
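
A minimal usage sketch; the argument values are illustrative, and the exact label template comes from constants.LABELS_TEMPLATE (not reproduced here). The resulting selector is passed as label_selector to list_namespaced_pod when collecting driver logs:

from prefect_spark_on_k8s_operator.app import generate_pod_selectors

selector = generate_pod_selectors(
    name="spark-pi-ab1c",                # the randomized application name set by trigger()
    app_id="spark-0123456789abcdef",     # hypothetical spark application id from the status
    submission_id="11111111-2222-3333",  # hypothetical submission id from the status
)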

Source code in prefect_spark_on_k8s_operator/app.py
def generate_pod_selectors(name: str, app_id: str, submission_id: str) -> List[str]:
    """Generates pod selector labels given the
    name, app_id and submission_id of the spark application.
    """
    template = string.Template(constants.LABELS_TEMPLATE)
    labels = template.safe_substitute(
        name=name, app_id=app_id, submission_id=submission_id
    )
    return labels