Airflow를 사용하다 보면, 상황에 맞게 필요한 Custom Operator를 생성하게 된다.
보통 batch job을 돌리며서 logical_date, ds, data_interval_start와 같은 날짜 템플릿 변수를 자주 사용하며,
이는 Custom Operator의 경우에도 마찬가지이다.
이때 Jinja template을 이용하여 Custom Operator를 매개변수화 할 수 있으며,
template_fields에 있는 필드 이름을 Operator를 렌더링 하는 동안 템플릿으로 간주한다.
Templating
아래는 HelloOperator라는 BashOperator를 상속받아 생성한 custom operator의 예시이다.
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",)
def __init__(self, name: str, world: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.world = world
def execute(self, context):
message = f"Hello {self.world} it's {self.name}!"
print(message)
return message
with dag:
hello_task = HelloOperator(
task_id="task_id_1",
name="{{ task_instance.task_id }}",
world="Earth",
)
template_fields에 name이라는 필드가 명시 되어있고, 이를 Jinja가 {{ task_instance.task_id }}를 실제 값으로 치환한다.
위와 같은 예시 이외에도 template_ext를 통해서 sql 파일도 템플릿을 실제 값으로 대체해서 사용할 수 있다.
templated_fields에는 확장자를 제외한 파일이름과 template_ext에는 확장자를 명시해 주면 된다
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("guest_name",)
template_ext = ".sql"
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.guest_name = name
이 경우에 guest_name.sql이라는 파일 속 템플릿 값이 실제 값으로 대체될 것이다.
Notice
template_fields를 사용시에 몇 가지 알고 있어야 하는 점들이 있다.
1. template_fields에 명시된 값들의 할당
template_fields에 명시된 값들은 생성자에서 직접 할당이 되거나 부모의 생성자를 통해서 할당이 되어야 한다.
그리고 할당 시에 이름은 template_fields에 명시한 이름과 동일해야 한다.
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path # template_fields의 path, self.path
class HelloOperator(BaseOperator):
template_fields = ("field_a", "field_b")
def __init__(field_a, field_b) -> None:
self.field_b = field_b
# fields_a도 할당이 필요
# self.field_a = field_a
이 경우에 template_fields에 field_a와 fields_b를 명시했지만, field_b만 할당이 되었기 때문에 오류가 발생한다.
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a) -> None:
self.field_a = field_a
class MyHelloOperator(HelloOperator):
template_fields = ("field_a", "field_b")
def __init__(field_b, **kwargs) -> None: #__init__(field_a, field_bm **kwargs)-> None
super().__init__(**kwargs) # 본인 생성자에서 할당하거나, super.init(field_a=field_a, **kwargs)
self.field_b = field_b
이 경우에는 MyHelloOperator가 HelloOperator를 상속받은 케이스로, MyHelloOperator template_fields로 field_a와 field_b를 명시한 상태이다. 정상적으로 동작하기 위해서는 (1) MyHelloOperaotor의 생성자는 field_a도 전달받아야 하며, (2) 이를 본인의 생성자에서 할당하거나 (self.field_a=field_a) 혹은 부모 생성자에게 넘겨 할당시켜야 한다. (super().init(field_a=field_a, **kwargs))
2. template_fields의 명시한 값들의 치환 시점
템플릿의 치환은 Operator의 pre_execute 함수가 호출되기 직전에 이루어진다.
그렇기 때문에 __init__ 내에서 파라미터에 대한 조작을 하는 것은 옳지 않다.
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a) -> None:
self.field_a = field_a.lower() # X, self.field_a = field_a가 되어야 함
값에 조작은 execute() 내에서 조작하는 것이 옳은 방법이다.
[참고 자료]
Airflow 공식 문서(Operator) https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html#concepts-jinja-templating
Operators — Airflow Documentation
airflow.apache.org
Airflow 공식 문서(Custom Operator)
https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html
Creating a custom Operator — Airflow Documentation
airflow.apache.org