Skip to main content

Pipelines Operations

Initiate Kubeflow client

Create an instance of the Kubeflow client.

from katonic.pipeline.pipeline import Client
client = Client()

Start new Experiment

Start a new experiment using the Kubeflow client instance.

from katonic.pipeline.pipeline import Client
client = Client()
client.create_experiment('your experiment name')

Create pipeline component

Convert your functions into pipeline components.

from katonic.pipeline.pipeline import create_component_from_func


@create_component_from_func
def your_function(variable: type) -> type:
.
.write your function
.
return yourfunction output

Create a pipeline function

Create a pipeline by defining tasks inside a pipeline function.

from katonic.pipeline.pipeline import dsl

@dsl.pipeline(
name='your pipeline name',
description='your pipeline description',
)
def pipeline_function(variable: type) -> type:
task = your_function(variable)

Create a pipeline run

Create a run of your pipeline.

from katonic.pipeline.pipeline import dsl, Client

client = Client()
client.create_run_from_pipeline_func(
pipeline_function,
arguments=dictionary of arguments,
mode=dsl.PipelineExecutionMode.V2_COMPATIBLE,
experiment_name='experiment name'
)

List pipeline experiments

List all experiments.

from katonic.pipeline.pipeline import Client

client = Client()
client.list_experiments()

Delete Experiment

Delete the pipeline experiment with the given experiment id.

from katonic.pipeline.pipeline import Client

client = Client()
client.delete_experiment(experiment_id)

List all pipeline runs

List all pipeline runs.

from katonic.pipeline.pipeline import Client

client = Client()
client.list_runs(sort_by='created_at desc', page_size=100000).runs

Delete Run

Delete the pipeline run with the given run id.

from katonic.pipeline.pipeline import Client

client = Client()
client.runs.delete_run(run_id)

Archive Run

Archive the pipeline run with the given run id.

from katonic.pipeline.pipeline import Client

client = Client()
client.runs.archive_run(run_id)

Terminate active Run

Terminate the active pipeline run with the given run id.

from katonic.pipeline.pipeline import Client

client = Client()
client.runs.terminate_run(run_id)

List recurring run jobs

List all recurring jobs of pipeline runs.

from katonic.pipeline.pipeline import Client

client = Client()
client.list_recurring_runs().jobs

Delete recurring run job

Delete the recurring run job with the given job id.

from katonic.pipeline.pipeline import Client

client = Client()
client.jobs.delete_job(job_id)

List Pipelines

List all pipelines.

from katonic.pipeline.pipeline import Client

client = Client()
client.list_pipelines().pipelines

Delete pipeline

Delete the pipeline with the given pipeline id.

from katonic.pipeline.pipeline import Client

client = Client()
client.delete_pipeline(pipeline_id)

Display DataFrame Artifacts

Display a DataFrame artifact in the pipeline UI.

from katonic.pipeline.display import Display

show = Display()
show.dataframe_display(data=artifact_object, desc='artifact name')
show.save_metadata()

Display Artifacts (str, tuple, dictionary)

Display an artifact of any of these types in the pipeline UI.

from katonic.pipeline.display import Display

show = Display()
show.basic_display(artifact, 'Missing value count')
show.save_metadata()

Display plots

Display plots in the pipeline UI.

from katonic.pipeline.display import Display
import seaborn as sns

show = Display()
sns.histplot(list_values)
show.plot_display(title='plot_name')
show.save_metadata()