您现在的位置是:亿华云 > 人工智能

使用 AirFlow 调度 MaxCompute

亿华云2025-10-03 08:33:56【人工智能】9人已围观

简介背景airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种O

背景

airflow是使用Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),调度airflow可以定义一组有依赖的使用任务,按照依赖依次执行,调度通过python代码定义子任务,使用并支持各种Operate操作器,亿华云调度灵活性大,使用能满足用户的调度各种需求。本文主要介绍使用Airflow的使用python Operator调度MaxCompute 任务

一、环境准备

Python 2.7.5 PyODPS支持Python2.6以上版本

Airflow apache-airflow-1.10.7

1.安装MaxCompute需要的调度包

pip install setuptools>=3.0

pip install requests>=2.4.0

pip install greenlet>=0.4.10 # 可选,安装后能加速Tunnel上传。使用

pip install cython>=0.19.0 # 可选,云南idc服务商调度不建议Windows用户安装。使用

pip install pyodps

注意:如果requests包冲突,调度先卸载再安装对应的使用版本

2.执行如下命令检查安装是否成功

python -c "from odps import ODPS"

二、开发步骤

1.在Airflow家目录编写python调度脚本Airiflow_MC.py

# -*- coding: UTF-8 -*- import sys import os from odps import ODPS from odps import options from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime, timedelta from configparser import ConfigParser import time reload(sys) sys.setdefaultencoding(utf8) #修改系统默认编码。 # MaxCompute参数设置 options.sql.settings = { options.tunnel.limit_instance_tunnel: False, odps.sql.allow.fullscan: True} cfg = ConfigParser() cfg.read("odps.ini") print(cfg.items()) odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint")) 

default_args = {  owner: airflow, depends_on_past: False, retry_delay: timedelta(minutes=5), start_date:datetime(2020,1,15) # email: [airflow@example.com], # email_on_failure: False, # email_on_retry: False, # retries: 1, # queue: bash_queue, # pool: backfill, # priority_weight: 10, # end_date: datetime(2016, 1, 1), } dag = DAG( Airiflow_MC, default_args=default_args, schedule_interval=timedelta(seconds=30)) def read_sql(sqlfile): with io.open(sqlfile, encoding=utf-8, mode=r) as f: sql=f.read() f.closed return sql def get_time(): print 当前时间是{ }.format(time.time()) return time.time() def mc_job (): project = odps.get_project() # 取到默认项目。 instance=odps.run_sql("select * from long_chinese;") print(instance.get_logview_address()) instance.wait_for_success() with instance.open_reader() as reader: count = reader.count print("查询表数据条数:{ }".format(count)) for record in reader: print record return count t1 = PythonOperator ( task_id = get_time , provide_context = False , python_callable = get_time, dag = dag ) t2 = PythonOperator ( task_id = mc_job , provide_context = False , python_callable = mc_job , dag = dag ) t2.set_upstream(t1) 

2.提交

python Airiflow_MC.py 

3.进行测试

# print the list of active DAGs airflow list_dags # prints the list of tasks the "tutorial" dag_id airflow list_tasks Airiflow_MC # prints the hierarchy of tasks in the tutorial DAG airflow list_tasks Airiflow_MC --tree #测试task airflow test Airiflow_MC get_time 2010-01-16 airflow test Airiflow_MC mc_job 2010-01-16 

4.运行调度任务

登录到web界面点击按钮运行

5.查看任务运行结果

1.点击view log

2.查看结果

云服务器

很赞哦!(595)