airflow安装及使用入门(linux) | 您所在的位置:网站首页 › apache-airflow › airflow安装及使用入门(linux) |
目录 airflow 概述 安装 安装python环境 安装Airflow 修改数据库为MySQL 修改执行器 配置邮件服务器 常用命令 airflow 概述Airflow是一个以编程方式编写,安排和监视工作流的平台 主要用于任务调度的安排;使用Airflow将工作流编写任务的有向无环图(DAG)。Airflow计划程序在遵循指定的依赖项,同时在一组工作线程上执行任务 安装官网:Apache Airflow 安装python环境1.首先安装anaconda/miniconda 下载地址: 可以从官网下载 :Anaconda | Anaconda Distribution 下载完成后在命令行中安装即可 bash Anaconda3-2022.10-Linux-x86_64.sh 二者的区别: conda是一种通用包管理系统,旨在构建和管理任何语言和任何类型的软件。举个例子:包管理与pip的使用类似,环境管理则允许用户方便地安装不同版本的python并可以快速切换。 Anaconda则是一个打包的集合,里面预装好了conda、某个版本的python、众多packages、科学计算工具等等,就是把很多常用的不常用的库都给你装好了。 Miniconda,顾名思义,它只包含最基本的内容——python与conda,以及相关的必须依赖项,对于空间要求严格的用户,Miniconda是一种选择。就只包含最基本的东西,其他的库得自己装。 随后加载环境变量配置文件,使其生效: source ~/.bashrc 2.创建python3.8环境 首先配置国内镜像: conda config --add channels Index of /anaconda/pkgs/free/ | 北京外国语大学开源软件镜像站 | BFSU Open Source Mirror conda config --add channels Index of /anaconda/pkgs/main/ | 北京外国语大学开源软件镜像站 | BFSU Open Source Mirror conda config --set show_channel_urls yes 创建python环境: conda create --name airflow python=3.8 查看python版本:python -V 常用conda命令 禁止激活默认base环境: conda config --set auto_activate_base false 创建环境:conda create -n env_name 查看所有环境:conda info --envs 删除一个环境:conda remove -n env_name --all 激活环境:conda activate airflow 退出当前环境:conda deactivate 安装Airflow首先修改pip的源,下载速度更快: pip install numpy -i Simple Index 然后创建pip的配置文件,使得修改的pip源生效: sudo mkdir ~/.pip sudo vim ~/.pip/pip.conf 添加以下内容: [global] index-url = https://pypi.tuna.tsinghua.edu.cn/simple [install] trusted-host = https://pypi.tuna.tsinghua.edu.cn安装airflow:pip install "apache-airflow==2.4.3" 出现上图所示标识,则说明安装的pip源已经生效; 初始化airflow:airflow db init 创建账号: airflow users create \ --username admin \ --firstname why \ --lastname why \ --role Admin \ --email [email protected]密码:123456 查看版本:airflow version 启动airflow web服务:airflow webserver -p 8089 -D(为防止8080端口被占用,选用不常用的端口) 启动airflow调度:airflow scheduler -D 启动之后可以看到许多默认的调度示例(如上); 启停脚本: #!/bin/bash case $1 in "start"){ echo " --------启动 airflow-------" ssh 你的服务器名称 "conda activate airflow;airflow webserver -p 8089 -D;airflow scheduler -D; conda deactivate" };; "stop"){ echo " --------关闭 airflow-------" ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -15 };; esac 修改数据库为MySQL点击查看相关指导操作; 首先在MySQL中建库: CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; 由于版本不匹配,可能出现ssl证书的问题,可以选择关闭: 查看ssl是否开启:SHOW VARIABLES LIKE '%ssl%'; 如何关闭? sudo vim /etc/my.cnf 加入以下内容:skip_ssl 除此之外还可能出现时间戳问题:1067 - Invalid default value for ‘update_at’ 原因:字段 'update_at' 为 timestamp类型,取值范围是:1970-01-01 00:00:00 到 2037-12-31 23:59:59(UTC +8 北京时间从1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。 解决方法:在配置文件中写入:sql_mode = STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION 或者在命令行:set session sql_mode='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'; 然后添加python连接的依赖: 先安装驱动:pip install mysql-connector-python 再修改配置文件:vim airflow/airflow.cfg 添加以下内容:sql_alchemy_conn = mysql+mysqlconnector://root:hadoop@服务器名称:3306/airflow_db 操作完成后重启数据库:sudo systemctl restart mysqld 注意:数据库必须重启,否则操作不生效 然后停止当前airflow服务,重新初始化后启动;创建账号进行登录 修改执行器官网不推荐在开发中使用顺序执行器,会造成任务调度阻塞 可以在配置文件中修改对应的调度器:vim airflow/airflow.cfg 选择LocalExecutor,KubernetesExecutor等均可; 然后重启airflow 修改airflow.cfg文件的内容: 主要参数: smtp_host:SMTP服务器地址(在邮箱客户端可以查看) port:SMTP服务器端口,163邮箱为25,QQ邮箱为465或587 修改代码,添加发送邮件的任务: #!/usr/bin/python from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.email_operator import EmailOperator from datetime import datetime, timedelta default_args = { # 用户 'owner': 'why', # 是否开启任务依赖 'depends_on_past': True, # 邮箱 'email': ['[email protected]'], # 启动时间 'start_date':datetime(2023,1,19), # 出错是否发邮件报警 'email_on_failure': False, # 重试是否发邮件报警 'email_on_retry': False, # 重试次数 'retries': 1, # 重试时间间隔 'retry_delay': timedelta(minutes=5), } # 声明任务图 dag = DAG('test_email', default_args=default_args, schedule_interval=timedelta(days=1)) # 创建单个任务 t1 = BashOperator( # 任务id task_id='t1', # 任务命令 bash_command='ssh hadoop102 "echo "task1" >> /home/why/data/test_airflow.txt"', # 重试次数 retries=3, # 把任务添加进图中 dag=dag) t2 = BashOperator( task_id='t2', bash_command='ssh hadoop102 "echo "task2" >> /home/why/data/test_airflow.txt"', retries=3, dag=dag) t3 = BashOperator( task_id='t3', bash_command='ssh hadoop102 "echo "task3" >> /home/why/data/test_airflow.txt"', retries=3, dag=dag) email=EmailOperator( task_id="email", to="[email protected]", subject="test-subject", html_content="test-content", cc="[email protected]", dag=dag) t2.set_upstream(t1) t3.set_upstream(t2) email.set_upstream(t3)参数含义: to:收件人地址 subject:邮件主题 html_content:邮件内容(html格式) cc:抄送人地址 然后重启airflow即可; 常用命令首先需要切入到airflow的conda环境下; airflow --help:查看帮助信息; airflow users -help:查看具体命令(users)的帮助信息: airflow users list:查看所有用户 airflow webserver -p 8089 -D:启动web端 airflow scheduler -D:启动调度器; airflow dags list:查看所有任务 airflow tasks list test --tree:查看单个任务(test)(以树形结构打印) |
CopyRight 2018-2019 实验室设备网 版权所有 |