airflow安装及使用入门(linux) 您所在的位置:网站首页 apache-airflow airflow安装及使用入门(linux)

airflow安装及使用入门(linux)

2023-12-23 03:00| 来源: 网络整理| 查看: 265

目录

airflow

概述

安装

安装python环境

安装Airflow

修改数据库为MySQL

修改执行器

配置邮件服务器

常用命令

airflow 概述

Airflow是一个以编程方式编写,安排和监视工作流的平台

主要用于任务调度的安排;使用Airflow将工作流编写任务的有向无环图(DAG)。Airflow计划程序在遵循指定的依赖项,同时在一组工作线程上执行任务

安装

官网:Apache Airflow

安装python环境

1.首先安装anaconda/miniconda

下载地址:

可以从官网下载 :Anaconda | Anaconda Distribution

下载完成后在命令行中安装即可

 bash Anaconda3-2022.10-Linux-x86_64.sh

二者的区别:

conda是一种通用包管理系统,旨在构建和管理任何语言和任何类型的软件。举个例子:包管理与pip的使用类似,环境管理则允许用户方便地安装不同版本的python并可以快速切换。

Anaconda则是一个打包的集合,里面预装好了conda、某个版本的python、众多packages、科学计算工具等等,就是把很多常用的不常用的库都给你装好了。

Miniconda,顾名思义,它只包含最基本的内容——python与conda,以及相关的必须依赖项,对于空间要求严格的用户,Miniconda是一种选择。就只包含最基本的东西,其他的库得自己装。

随后加载环境变量配置文件,使其生效:

source ~/.bashrc

2.创建python3.8环境

首先配置国内镜像:

conda config --add channels Index of /anaconda/pkgs/free/ | 北京外国语大学开源软件镜像站 | BFSU Open Source Mirror

conda config --add channels Index of /anaconda/pkgs/main/ | 北京外国语大学开源软件镜像站 | BFSU Open Source Mirror

conda config --set show_channel_urls yes

创建python环境:

conda create --name airflow python=3.8

查看python版本:python -V

常用conda命令

禁止激活默认base环境:

conda config --set auto_activate_base false

创建环境:conda create -n env_name

查看所有环境:conda info --envs

删除一个环境:conda remove -n env_name --all

激活环境:conda activate airflow

退出当前环境:conda deactivate

安装Airflow

首先修改pip的源,下载速度更快:

pip install numpy -i Simple Index

然后创建pip的配置文件,使得修改的pip源生效:

sudo mkdir ~/.pip

sudo vim ~/.pip/pip.conf

添加以下内容:

[global] index-url = https://pypi.tuna.tsinghua.edu.cn/simple [install] trusted-host = https://pypi.tuna.tsinghua.edu.cn

安装airflow:pip install "apache-airflow==2.4.3"

出现上图所示标识,则说明安装的pip源已经生效;

初始化airflow:airflow db init

创建账号:

airflow users create \ --username admin \ --firstname why \ --lastname why \ --role Admin \ --email [email protected]

密码:123456

查看版本:airflow version

启动airflow web服务:airflow webserver -p 8089 -D(为防止8080端口被占用,选用不常用的端口)4ddbf71e2f309bb860aab8e64b207a0a.png

启动airflow调度:airflow scheduler -D

启动之后可以看到许多默认的调度示例(如上);

启停脚本:

#!/bin/bash case $1 in "start"){ echo " --------启动 airflow-------" ssh 你的服务器名称 "conda activate airflow;airflow webserver -p 8089 -D;airflow scheduler -D; conda deactivate" };; "stop"){ echo " --------关闭 airflow-------" ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -15 };; esac 修改数据库为MySQL

点击查看相关指导操作;

首先在MySQL中建库:

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

由于版本不匹配,可能出现ssl证书的问题,可以选择关闭:

查看ssl是否开启:SHOW VARIABLES LIKE '%ssl%';

如何关闭?

sudo vim /etc/my.cnf

加入以下内容:skip_ssl

除此之外还可能出现时间戳问题:1067 - Invalid default value for ‘update_at’

原因:字段 'update_at' 为 timestamp类型,取值范围是:1970-01-01 00:00:00 到 2037-12-31 23:59:59(UTC +8 北京时间从1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。

解决方法:在配置文件中写入:sql_mode = STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

或者在命令行:set session sql_mode='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';

然后添加python连接的依赖:

先安装驱动:pip install mysql-connector-python

再修改配置文件:vim airflow/airflow.cfg

添加以下内容:sql_alchemy_conn = mysql+mysqlconnector://root:hadoop@服务器名称:3306/airflow_db

操作完成后重启数据库:sudo systemctl restart mysqld

注意:数据库必须重启,否则操作不生效

然后停止当前airflow服务,重新初始化后启动;创建账号进行登录

修改执行器

官网不推荐在开发中使用顺序执行器,会造成任务调度阻塞

可以在配置文件中修改对应的调度器:vim airflow/airflow.cfg

选择LocalExecutor,KubernetesExecutor等均可;

然后重启airflow

eae72cb2b6d0456b2f98f66b81e45bbf.png

配置邮件服务器

修改airflow.cfg文件的内容:

主要参数:

smtp_host:SMTP服务器地址(在邮箱客户端可以查看)

port:SMTP服务器端口,163邮箱为25,QQ邮箱为465或587

修改代码,添加发送邮件的任务:

#!/usr/bin/python from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.email_operator import EmailOperator from datetime import datetime, timedelta default_args = { # 用户 'owner': 'why', # 是否开启任务依赖 'depends_on_past': True, # 邮箱 'email': ['[email protected]'], # 启动时间 'start_date':datetime(2023,1,19), # 出错是否发邮件报警 'email_on_failure': False, # 重试是否发邮件报警 'email_on_retry': False, # 重试次数 'retries': 1, # 重试时间间隔 'retry_delay': timedelta(minutes=5), } # 声明任务图 dag = DAG('test_email', default_args=default_args, schedule_interval=timedelta(days=1)) # 创建单个任务 t1 = BashOperator( # 任务id task_id='t1', # 任务命令 bash_command='ssh hadoop102 "echo "task1" >> /home/why/data/test_airflow.txt"', # 重试次数 retries=3, # 把任务添加进图中 dag=dag) t2 = BashOperator( task_id='t2', bash_command='ssh hadoop102 "echo "task2" >> /home/why/data/test_airflow.txt"', retries=3, dag=dag) t3 = BashOperator( task_id='t3', bash_command='ssh hadoop102 "echo "task3" >> /home/why/data/test_airflow.txt"', retries=3, dag=dag) email=EmailOperator( task_id="email", to="[email protected]", subject="test-subject", html_content="test-content", cc="[email protected]", dag=dag) t2.set_upstream(t1) t3.set_upstream(t2) email.set_upstream(t3)

参数含义:

to:收件人地址

subject:邮件主题

html_content:邮件内容(html格式)

cc:抄送人地址

然后重启airflow即可;

常用命令

首先需要切入到airflow的conda环境下;

airflow --help:查看帮助信息;

airflow users -help:查看具体命令(users)的帮助信息:

airflow users list:查看所有用户

airflow webserver -p 8089 -D:启动web端

airflow scheduler -D:启动调度器;

airflow dags list:查看所有任务

airflow tasks list test --tree:查看单个任务(test)(以树形结构打印)



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有