꾸준히 오래오래

데이터 엔지니어의 공부 곳간✏️

Data/Airflow

[Airflow] Custom Operator 생성 시, template_fields 알아보기

zzi_yun 2024. 3. 2. 23:32

Airflow를 사용하다 보면, 상황에 맞게 필요한 Custom Operator를 생성하게 된다.

보통 batch job을 돌리며서 logical_date, ds, data_interval_start와 같은 날짜 템플릿 변수를 자주 사용하며,

이는 Custom Operator의 경우에도 마찬가지이다.

 

이때 Jinja template을 이용하여 Custom Operator를 매개변수화 할 수 있으며, 

template_fields에 있는 필드 이름을 Operator를 렌더링 하는 동안 템플릿으로 간주한다.

 

Templating

아래는 HelloOperator라는 BashOperator를 상속받아 생성한 custom operator의 예시이다.

class HelloOperator(BaseOperator):
    template_fields: Sequence[str] = ("name",)

    def __init__(self, name: str, world: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.world = world

    def execute(self, context):
        message = f"Hello {self.world} it's {self.name}!"
        print(message)
        return message

 

with dag:
    hello_task = HelloOperator(
        task_id="task_id_1",
        name="{{ task_instance.task_id }}",
        world="Earth",
    )

 

template_fields에 name이라는 필드가 명시 되어있고, 이를 Jinja가 {{ task_instance.task_id }}를 실제 값으로 치환한다.

 

위와 같은 예시 이외에도 template_ext를 통해서 sql 파일도 템플릿을 실제 값으로 대체해서 사용할 수 있다.

templated_fields에는 확장자를 제외한 파일이름과 template_ext에는 확장자를 명시해 주면 된다

class HelloOperator(BaseOperator):
    template_fields: Sequence[str] = ("guest_name",)
    template_ext = ".sql"

    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.guest_name = name

이 경우에 guest_name.sql이라는 파일 속 템플릿 값이 실제 값으로 대체될 것이다.

 

 

Notice

template_fields를 사용시에 몇 가지 알고 있어야 하는 점들이 있다. 

1. template_fields에 명시된 값들의 할당

template_fields에 명시된 값들은 생성자에서 직접 할당이 되거나 부모의 생성자를 통해서 할당이 되어야 한다.

그리고 할당 시에 이름은 template_fields에 명시한 이름과 동일해야 한다. 

class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path # template_fields의 path, self.path

 

class HelloOperator(BaseOperator):
    template_fields = ("field_a", "field_b")

    def __init__(field_a, field_b) -> None:
        self.field_b = field_b
        # fields_a도 할당이 필요
        # self.field_a = field_a

 

이 경우에 template_fields에 field_a와 fields_b를 명시했지만, field_b만 할당이 되었기 때문에 오류가 발생한다.

 

class HelloOperator(BaseOperator):
    template_fields = "field_a"

    def __init__(field_a) -> None:
        self.field_a = field_a


class MyHelloOperator(HelloOperator):
    template_fields = ("field_a", "field_b")

    def __init__(field_b, **kwargs) -> None: #__init__(field_a, field_bm **kwargs)-> None
        super().__init__(**kwargs) # 본인 생성자에서 할당하거나, super.init(field_a=field_a, **kwargs)
        self.field_b = field_b

 

이 경우에는 MyHelloOperator가 HelloOperator를 상속받은 케이스로, MyHelloOperator template_fields로 field_a와 field_b를 명시한 상태이다. 정상적으로 동작하기 위해서는 (1) MyHelloOperaotor의 생성자는 field_a도 전달받아야 하며, (2) 이를 본인의 생성자에서 할당하거나 (self.field_a=field_a) 혹은 부모 생성자에게 넘겨 할당시켜야 한다. (super().init(field_a=field_a, **kwargs))

 

2. template_fields의 명시한 값들의 치환 시점

템플릿의 치환은 Operator의 pre_execute 함수가 호출되기 직전에 이루어진다.

그렇기 때문에  __init__ 내에서 파라미터에 대한 조작을 하는 것은 옳지 않다.

class HelloOperator(BaseOperator):
    template_fields = "field_a"

    def __init__(field_a) -> None:
        self.field_a = field_a.lower()  # X, self.field_a = field_a가 되어야 함

 

값에 조작은 execute() 내에서 조작하는 것이 옳은 방법이다.

 

 

[참고 자료]

Airflow 공식 문서(Operator) https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html#concepts-jinja-templating

 

Operators — Airflow Documentation

 

airflow.apache.org

Airflow 공식 문서(Custom Operator)

https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html

 

Creating a custom Operator — Airflow Documentation

 

airflow.apache.org