记录一下新学习的技术:k8s machine learning pipeline

k8s全名Kubenetes, 用于解决 ml 中过程模糊的问题

  • pipeline 顾名思义,多个任务运行相互独立,相互流通必要的输入输出
  • 现在传输类型局限为str
  • 搭建管道用k8s装饰器
  • 指定每个阶段在什么容器环境下运行
  • 每个容器不互通,复杂数据(如dataframe)需要从外界获取,如k8s提供的轻量化存储Minio
  • [ extra ]为解决国内docker连接问题和防止公司内部泄密问题,容器储存使用内网搭建的容器管理平台Harbor

pipeline 生成方法

  • 每一个方框代表一个python方程,这个方程需要指定输入输出,以NamedTuple来体现
  • 由于python在制定类型上模糊,NamedTuple 本身在python中起到监督python方程写法作用
from typing import NamedTuple

def function_a() -> NamedTuple('tuple_name', [('outputname_in_func', 'outputtype')]):
#复杂类型可直接用tuple
# e.g.  
def function_b() -> NamedTuple('tuple_name', [('outputname_in_func', 'outputname_outside_func')]):

在这里会确定这个方程的输入输出: 这个tuple在degbug中的名称叫‘tuple_name’, 标出你想让这个方程output出来的variable名称, 在return时标明tuple

return tuple_name(output_name)

写好方程后,需要告诉pipeline你这个方程在什么环境下运行:

from kfp.components import func_to_container_op
xx_op = func_to_container_op(function_a, base_image='python@3.8')

连接管道

from xxx import your container function
import kfp
import kfp.dsl as dsl

@dsl.pipeline(name='pipeline name',
              description='your pipeline description')
def pipeline_main_function(args):
  """
  连接所有的container
  """
  container1 = op1(op1_parm1, ...)
  op1_res = container1.outputs['outputname_outside_func']
  # 然后在把这个结果作为参数传到下一个container


if __name__ == '__main__':
  kfp.compiler.Compiler().compile(
      pipeline_main_function,
      package_path='yaml location you want to store'
  )