嗨,我是Dmitry Logvinenko-Lucky Group of Companies分析部门的数据工程师。
我将向您介绍一款开发ETL流程的绝佳工具——Apache Airflow。而且Airflow的用途非常广泛,即使您不处理数据流,只是需要定期启动某些进程并监视其执行情况,也值得仔细研究一下。
是的,我不仅会讲,还会演示:文中有大量代码、屏幕截图和建议。
当您用Google搜索Airflow / Wikimedia Commons一词时,通常会看到什么
目录
介绍
Apache Airflow就像Django:
- 用Python编写,
- 有一个出色的管理面板,
- 可无限扩展
- 只是更好,而且是为完全不同的目的而设计的,即(正如文章开头所写):
- ( Celery/Kubernetes )
- workflow Python-
- API , ( ).
Apache Airflow :
- ( SQL Server PostgreSQL, API , 1) DWH ODS ( Vertica Clickhouse).
-
cron
, ODS, .
32 50 GB . Airflow :
- 200 ( workflows, ),
- 70 ,
- ( ) .
, , , über-, :
SQL Server’, 50 — , , ( , --), Orders ( ). , (-, -, ETL-) , , Vertica.
!
, ( )
( )
, SQL
- , ETL- aka :
Informatica Power Center — , , , . 1% . ? , -, - . -, , ---. , Airbus A380/, .
, 30
SQL Server Integration Server — . : SQL Server , ETL- - . : , … , .
dtsx
( XML ) , ? , ? , , . , , :
. SSIS-...
… . Apache Airflow.
, ETL- — Python-, . , Python- - 13” .
, , Airflow, , Celery , .
, docker-compose.yml
:
- Airflow: Scheduler, Webserver. Flower Celery- (
apache/airflow:1.10.10-python3.7
, ); - PostgreSQL, Airflow ( , . .), Celery — ;
- Redis, Celery;
- Celery worker, .
-
./dags
. , .
- ( ), - . https://github.com/dm-logv/airflow-tutorial.
version: '3.4'
# Airflow settings shared by every Airflow container; merged into each
# service's `environment` through the `airflow-config` anchor.
x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  # NOTE(review): key and DB credentials are committed in plain text --
  # presumably acceptable for a local demo only; confirm before reuse.
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  # Booleans are passed as quoted strings so they reach Airflow verbatim.
  AIRFLOW__CORE__LOAD_EXAMPLES: "False"
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: "False"

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: "False"
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: "False"

  # Celery talks to the `broker` service and stores results in `airflow-db`.
  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
# Service template shared by the Airflow containers (merged via `<<`).
x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  # The entrypoint is replaced by bash; each service supplies its own
  # `-c "..."` script as `command`.
  entrypoint: /bin/bash
  restart: always
  volumes:
    # DAG definitions and extra Python requirements come from the host.
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt
services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base
    environment:
      <<: *airflow-config
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8
      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
    depends_on:
      - airflow-db
      - broker
    # Folded scalar: all lines sit at the same indent so they fold into one
    # command line. `depends_on` only orders start-up, hence the fixed
    # `sleep 10` before the metadata DB is used.
    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint initdb &&
      (/entrypoint webserver &) &&
      (/entrypoint flower &) &&
      /entrypoint scheduler"
    ports:
      # Port mappings are quoted: HOST:CONTAINER pairs are strings, and
      # unquoted digit:digit scalars can be mis-typed by YAML 1.1 parsers.
      # Celery Flower
      - "5555:5555"
      # Airflow Webserver
      - "8080:8080"

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base
    environment:
      <<: *airflow-config
    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint worker"
    depends_on:
      - airflow
      - airflow-db
      - broker
:
- puckel/docker-airflow – . , .
- Airflow
airflow.cfg
, ( ), . - , production-ready: heartbeats , . , .
- , :
- , .
- — .
:
$ docker-compose up --scale worker=3
, , -:
- Airflow: http://127.0.0.1:8080/admin/
- Flower: http://127.0.0.1:5555/dashboard
«», :
Scheduler — Airflow, , , : , , .
, , (, , ) -
run_duration
— . .
DAG ( «») — « », , (. ) Package SSIS Workflow Informatica.
, .
DAG Run — ,
execution_date
. ( , , ).
Operator — , - . :
- action,
PythonOperator
, () Python-; - transfer, , ,
MsSqlToHiveTransfer
; - sensor - .
HttpSensor
, ,GoogleCloudStorageToS3Operator
. : «? !» , . , .
- action,
Task — .
Task instance — - , - ( ,
LocalExecutor
CeleryExecutor
), (. . — ), .
, , .
, :
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
# DAG object picked up from module globals: runs every six hours,
# counting from 2020-07-08 00:00.
dag = DAG(
    dag_id='orders',
    start_date=datetime(2020, 7, 8, 0),
    schedule_interval=timedelta(hours=6),
)


def workflow(**context):
    """Placeholder task body: print the keyword arguments it was given."""
    print(context)


# One task per entry of sql_server_ds; the schema name doubles as task_id.
for conn_id, schema in sql_server_ds:
    PythonOperator(
        dag=dag,
        task_id=schema,
        provide_context=True,
        python_callable=workflow,
    )
:
- ;
sql_server_ds
—List[namedtuple[str, str]]
Airflow Connections ;dag
— ,globals()
, Airflow . :
-
orders
— -, - , ,
- , 6 (
timedelta()
cron
-0 0 0/6 ? * * *
, —@daily
);
-
workflow()
, . .- :
- ;
-
PythonOperator
,workflow()
. ( ) .provide_context
,**context
.
. :
- -,
- , ( Airflow, Celery ).
, .
?
docker-compose.yml
requirements.txt
.
:
— task instances, .
, :
, , — . — .
,./dags
, —git
Gitlab, Gitlab CImaster
.
Flower
-, , - — Flower.
-:
, :
:
— :
, , .
— . Airflow , .
task instances.
, :
, Clear . , , - , .
, — Airflow. , : Browse/Task Instances
:
( , ):
,
DAG, update_reports.py
:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
# Daily DAG starting 2020-06-07 06:00. The default_args are handed to the
# tasks: three retries, ten seconds apart.
dag = DAG(
    dag_id='update_reports',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2020, 6, 7, 6),
    default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)},
)

# Every report table is refilled from a view named "<table>_view".
Report = namedtuple('Report', 'source target')
reports = [
    Report(source=f'{table}_view', target=table)
    for table in (
        'reports.city_orders',
        'reports.client_calls',
        'reports.client_rates',
        'reports.daily_orders',
        'reports.order_duration',
    )
]

# Notification task with the ALL_SUCCESS trigger rule; the recipient list
# is rendered from an Airflow Variable.
email = EmailOperator(
    dag=dag,
    task_id='email_success',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent(""" , """),
)

# Notification task with the ONE_FAILED trigger rule; the chat id is
# rendered from an Airflow Variable.
tg = TelegramBotSendMessage(
    dag=dag,
    task_id='telegram_fail',
    trigger_rule=TriggerRule.ONE_FAILED,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""\
        , , {{ dag.dag_id }}
        """),
)

# One Vertica task per report: truncate the table, then reload it from its
# view. Both notification tasks are downstream of every report task.
for report in reports:
    refresh = VerticaOperator(
        dag=dag,
        task_id=report.target.replace('reports.', ''),
        vertica_conn_id='dwh',
        sql=[f"TRUNCATE TABLE {report.target}",
             f"INSERT INTO {report.target} SELECT * FROM {report.source}"],
        task_concurrency=1,
    )
    refresh >> [email, tg]
- ? : , ; , ; , ( , ).
:
from commons.operators import TelegramBotSendMessage
— , , . ( );default_args={}
— ;to='{{ var.value.all_the_kings_men }}'
—to
, Jinja email-,Admin/Variables
;trigger_rule=TriggerRule.ALL_SUCCESS
— . , ;tg_bot_conn_id='tg_main'
—conn_id
,Admin/Connections
;trigger_rule=TriggerRule.ONE_FAILED
— Telegram ;task_concurrency=1
— task instances . ,VerticaOperator
( );report_update >> [email, tg]
—VerticaOperator
, :
- , . Tree View :
— .
— Jinja-, . , :
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }}
execution_date
YYYY-MM-DD
: 2020-07-14
. , ( Tree View), .
Rendered -. :
:
, , , .
, ( ). Admin/Variables
:
, :
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
, JSON. JSON-:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}
: {{ var.json.bot_config.bot.token }}
.
. : Admin/Connections
, / . :
( , ), ( tg_main
) — , Airflow ( - — ), .
: BaseHook.get_connection()
, , ( Round Robin, Airflow).
Variables Connections, , , : , — Airflow. C , , , UI. — - , () .
— . Airflow — . , JiraHook
Jira ( -), SambaHook
smb
-.
, , TelegramBotSendMessage
commons/operators.py
:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
    """Send a message to ``chat_id`` using ``TelegramBotHook``.

    ``chat_id`` and ``message`` are template fields, so both may contain
    Jinja expressions.

    The hook (and with it the connection lookup) is created lazily on first
    use of ``client`` rather than in ``__init__``: operator constructors run
    every time the DAG file is parsed, so building the hook there would hit
    the connection store on every parse.

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)

    Args:
        chat_id: Target chat identifier (templated).
        message: Text to send (templated).
        tg_bot_conn_id: Connection id handed to ``TelegramBotHook``.
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.tg_bot_conn_id = tg_bot_conn_id
        # Created on demand by the `client` property.
        self._hook = None
        self.chat_id = chat_id
        self.message = message

    @property
    def client(self) -> TelegramBot:
        """Bot client of the hook; the hook is built on first access."""
        if self._hook is None:
            self._hook = TelegramBotHook(self.tg_bot_conn_id)
        return self._hook.client

    def execute(self, context):
        """Print the outgoing message, then send it through the bot client."""
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)
, Airflow, :
-
BaseOperator
, Airflow- ( ) -
template_fields
, Jinja . -
__init__()
, , . - .
-
TelegramBotHook
, -. - ()
BaseOperator.execute()
, Airfow , — , . (, ,stdout
stderr
— Airflow , , , .)
, commons/hooks.py
. , :
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
    """Hook that builds a ``TelegramBot`` client from an Airflow connection.

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}

    The client is created right away in ``__init__`` and is available both
    as the ``client`` attribute and as the return value of ``get_conn()``.
    """

    def __init__(self, tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)
        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        # Populate tg_bot_token and client immediately.
        self.get_conn()

    def get_conn(self):
        """Read ``bot_token`` from the connection's Extra and build the client.

        Returns:
            The ``TelegramBot`` instance, also stored on ``self.client``.

        Raises:
            KeyError: if the Extra JSON has no ``bot_token`` key.
        """
        connection = self.get_connection(self.tg_bot_conn_id)
        self.tg_bot_token = connection.extra_dejson['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client
, , :
- , — :
conn_id
; - :
get_conn()
, -extra
( JSON), ( !) Telegram-:{"bot_token": "YOuRAwEsomeBOtToKen"}
. -
TelegramBot
, .
. c TelegramBotHook().client
TelegramBotHook().get_conn()
.
, Telegram REST API, python-telegram-bot
sendMessage
.
class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        """Create a session rooted at the bot's API URL.

        Args:
            tg_bot_token: Bot token, substituted into ``API_ENDPOINT``.
            chat_id: Optional default chat used when ``send_message`` is
                called without one.
        """
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None,
                     parse_mode: str = 'MarkdownV2'):
        """POST ``message`` to the ``sendMessage`` method.

        Args:
            message: Text of the message.
            chat_id: Target chat; falls back to the instance default when
                falsy.
            parse_mode: Value sent as ``parse_mode``. Defaults to
                ``'MarkdownV2'`` (the previous hard-coded value). Pass
                ``None`` to omit the field, e.g. for plain text containing
                characters that MarkdownV2 requires to be escaped.

        Raises:
            TelegramBotException: if the decoded response has no truthy
                ``ok`` field; the whole response is attached.
        """
        method = 'sendMessage'
        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message}
        if parse_mode is not None:
            payload['parse_mode'] = parse_mode

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)
class TelegramBotException(Exception):
    """Exception type for Telegram bot errors.

    Whatever positional and keyword arguments are given are packed into one
    ``(args, kwargs)`` tuple, which becomes the exception's only argument.
    """

    def __init__(self, *args, **kwargs):
        packed = (args, kwargs)
        super().__init__(packed)
— :TelegramBotSendMessage
,TelegramBotHook
,TelegramBot
— , , Open Source.
, . , ...
- ! ? !
- ?
, - ? SQL Server Vertica , , !
, - . .
:
,- SQL Server
- Vertica
, , docker-compose.yml
:
version: '3.4'
# Template shared by the three SQL Server source containers.
x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    # Quoted: a bare Y is a boolean literal in YAML 1.1, and the container
    # must receive the literal string "Y".
    ACCEPT_EULA: 'Y'
    MSSQL_PID: Express
    # NOTE(review): plain-text password, duplicated in `mssql_init` --
    # presumably demo-only; confirm before reuse.
    SA_PASSWORD: SayThanksToSatiaAt2020
    # Environment values are strings; quoted to avoid implicit typing.
    MSSQL_MEMORY_LIMIT_MB: '1024'
services:
  # Vertica container; named `dwh` like the connection id used by the DAGs.
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  # Three SQL Server instances built from the shared template.
  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  # Runs mssql_init.py against the SQL Server containers.
  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      # The datasource list is shared with the DAGs.
      - ./dags/commons/datasources.py:/commons/datasources.py
:
- Vertica
dwh
, - SQL Server,
- - (
mssql_init.py
!)
, , :
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
, , Data Profiling/Ad Hoc Query
:
,
ETL- , : , , , :
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
from sys import stderr
class Session:
    """ETL workflow session, recorded as one row of the ``sessions`` table.

    Used as a context manager: entering inserts a row and stores its id,
    leaving updates that row with the outcome. If the body raises, the
    session is marked unsuccessful, the error is written to ``comment``,
    and the exception still propagates.

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    # Width of the ``comment`` column declared in ``_create``.
    _COMMENT_MAX_LEN = 500

    def __init__(self, connection, task_name):
        """Bind the session to a DB connection and switch it to autocommit.

        Args:
            connection: DB-API style connection; must provide ``cursor()``
                usable as a context manager and an ``autocommit`` attribute.
            task_name: Name stored in the ``task_name`` column.
        """
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is None exactly when the body finished without raising.
        # (The previous `any(exc_type, exc_val, exc_tb)` raised TypeError,
        # because any() accepts a single iterable.)
        if exc_type is not None:
            self.successful = False
            comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            # Keep within the column width so the UPDATE in close() cannot
            # fail on length and mask the original error.
            self.comment = comment[:self._COMMENT_MAX_LEN]
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        """Run ``query`` and return the first column of the first row.

        Returns ``None`` for statements that produce no result set
        (``cursor.description`` is ``None``, e.g. DDL), instead of failing
        on ``fetchone()``. A statement that has a result set but no rows
        still raises, as before.
        """
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]

    def _create(self):
        """Create the ``sessions`` table if missing.

        Not invoked by any other method of this class.
        """
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,

                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        """Insert the session row (``finished`` = NULL) and remember its id.

        Returns:
            This session.
        """
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        """Write the outcome to the session row.

        ``finished`` is set to the column default; ``successful``,
        ``loaded_rows`` and ``comment`` take the current attribute values.

        Raises:
            SessionClosedError: if the session has no id (never opened).
        """
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)
class SessionError(Exception):
    """Base class for the session exceptions defined here."""


class SessionClosedError(SessionError):
    """A session error; subclass of SessionError."""
. :
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
- Airflow
pymssql
- - — .
-
pandas
,DataFrame
— .
{dt}
%s
, ,pandas
pymssql
params: List
,tuple
.
,pymssql
,pyodbc
.
, Airflow :
, . . . --, ?! :
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException
Airflow, , , . , pink.
:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
:
- , ,
- ( ),
- — ( ) .
: Vertica. , , — CSV!
# Export data to CSV buffer: pipe-delimited, no header, missing values
# written as the NUL marker that the COPY statement below maps to NULL.
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# Column list as comma-separated names. Interpolating
# `df.columns.to_list()` directly would put a Python list repr
# (`['id', 'start_time', ...]`) into the statement, which is not valid SQL.
copy_columns = ', '.join(str(column) for column in df.columns)

copy_stmt = f"""
    COPY {target_table}({copy_columns})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
-
StringIO
. pandas
DataFrame
CSV
-.- Vertica .
-
copy()
!
, , , :
session.loaded_rows = cursor.rowcount
session.successful = True
.
. :
# DDL for the target schema and table; both statements are IF NOT EXISTS.
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
# Source columns plus the ETL columns (etl_source, etl_id, hash_id).
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
# Single task that receives both statements as a list, schema first.
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
VerticaOperator()
( , ). , :
# One load task per SQL Server source, each downstream of `create_table`.
for conn_id, schema in sql_server_ds:
    # Keyword arguments forwarded to workflow(); 'dt' is a Jinja template.
    load_kwargs = {
        'src_conn_id': conn_id,
        'src_schema': schema,
        'dt': '{{ ds }}',
        'target_conn_id': target_conn_id,
        'target_table': f'{target_schema}.{target_table}',
    }
    load = PythonOperator(
        dag=dag,
        task_id=schema,
        python_callable=workflow,
        op_kwargs=load_kwargs,
    )
    create_table >> load
— , — , — ,
, ?
, «»
, : ETL-: SSIS Airflow… … , , , !
- , Apache Airflow — — .
: , — Airflow : , , ( , ).
, -
,
start_date
. , .start_date
. ,start_date
,schedule_interval
— , DAG .
start_date = datetime(2020, 7, 7, 0, 1, 2)
.
:
Task is missing the start_date parameter
, , .
. , ( Airflow ), -, , . . , PostgreSQL 20 5 , .
LocalExecutor. , , . LocalExecutor’ , , , CeleryExecutor. , , Celery , «, , !»
:
- Connections ,
- SLA Misses , ,
- XCom ( !) .
. ? . Gmail >90k Airflow, - 100 .
: Apache Airflow Pitfails
, , Airflow :
REST API — Experimental, . , / , DAG Run .
CLI — , WebUI, . :
backfill
.
, , : « , , 1 13 ! ---!». :
airflow backfill -s '2020-01-01' -e '2020-01-13' orders
- :
initdb
,resetdb
,upgradedb
,checkdb
. run
, , . ,LocalExecutor
, Celery-.-
test
, . connections
.
Python API — , , .
/home/airflow/dags
,ipython
? , , :
from airflow import settings
from airflow.models import Connection

# Attributes of each Connection to include in the dump.
fields = 'conn_id conn_type host port schema login password extra'.split()

# Print every stored connection, ordered by conn_id, as `conn_id = {...}`.
session = settings.Session()
connections = session.query(Connection).order_by(Connection.conn_id)
for conn in connections:
    d = {field: getattr(conn, field) for field in fields}
    print(conn.conn_id, '=', d)
Airflow. , , API.
, , . — , .
, SQL!
WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
             THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq) AS unsuccessful,
    count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed,
    count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY task_id, dag_id
HAVING count(last_fail_seq) > 0
Airflow .
- Apache Airflow Documentation — , . , ?
- Best Practices — .
- The Airflow UI — :
- Understanding Apache Airflow’s key concepts — , (!) - .
- Tianlong's Blog — A Guide On How To Build An Airflow Server/Cluster — Airflow-.
- Running Apache Airflow At Lyft — , , .
- How Apache Airflow Distributes Jobs on Celery workers — Celery.
- DAG Writing Best Practices in Apache Airflow — , ID , , .
- Managing Dependencies in Apache Airflow — Trigger Rule, .
- Airflow: When Your DAG is Far Behind The Schedule — «, » , .
- Useful SQL queries for Apache Airflow — SQL- Airflow.
- Get started developing workflows with Apache Airflow — .
- Building the Fetchr Data Science Infra on AWS with Presto and Airflow — AWS Data Science.
- 7 Common Errors to Check when Debugging Airflow DAGs — ( - - ).
- Store and access password using Apache Airflow — , , Connections.
- The Zen of Python and Apache Airflow — DAG, , , .
- Airflow: Lesser Known Tips, Tricks, and Best Practises —
default arguments
params
, Variables Connections. - Profiling the Airflow Scheduler — , Airflow 2.0.
- Apache Airflow with 3 Celery workers in docker-compose —
docker-compose
. - 4 Templating Tasks Using the Airflow Context — .
- Error Notifications in Airflow — Slack.
- Airflow Workshop: DAG’ — , XCom.
, :
- Macros reference — .
- Common Pitfalls — Airflow — .
- puckel/docker-airflow:Docker Apache Airflow ——
docker-compose
进行实验,调试等等。 - python-telegram-bot / python-telegram-bot:我们为您提供了一个您不能拒绝的包装器-Telegram REST API的Python包装器。