一、概述

Apache Azkaban 是一个开源的批处理工作流调度系统,用于办理和调度Hadoop生态系统中的使命和作业。

  • Azkaban 供给了一个直观的Web界面,让用户能够轻松地界说、调度和监控作业流。它支撑工作流的可视化编辑,能够将使命以依靠联系的方式组织起来,然后完结复杂的工作流程。用户能够界说作业的依靠联系、时刻调度、重试机制等,并监控作业的履行情况和日志。

  • Azkaban 能够与Hadoop生态系统中的各种东西和框架集成,如Hadoop、Hive、Pig、Spark等。它经过与这些东西的集成,能够方便地调度和履行这些东西生成的作业。

  • Azkaban 还供给了一套安全机制,能够操控用户对作业流的访问权限,确保敏感数据和使命的安全性。

总的来说,Apache Azkaban是一个功能强大的批处理工作流调度系统,能够协助用户办理和调度Hadoop生态系统中的作业和使命,进步工作效率和数据处理的可靠性。

通过 docker-compose 快速部署 Azkaban 保姆级教程
这儿只是解说容器化快速布置进程,想了解更多关于 Azkaban 的知识点可重视我以下文章:

  • 大数据Hadoop之——使命调度器Azkaban(Azkaban环境布置)
  • 大数据Hadoop之——Azkaban API详解
  • 【云原生】Azkaban on k8s 解说与实战操作

二、Azkaban 的调度流程

通过 docker-compose 快速部署 Azkaban 保姆级教程
Apache Azkaban的调度流程能够概括为以下几个步骤:

  1. 界说作业流:运用Azkaban的Web界面或Azkaban的DSL言语,用户界说作业流并指定使命之间的依靠联系。作业流由一系列使命组成,能够按照次序或并行方式履行。

  2. 作业提交:当作业流需要履行时,Azkaban会将使命提交到履行环境中(如Hadoop集群)。这能够经过调用相应的履行引擎(如Azkaban Executor)来完结。使命提交时,Azkaban会将使命的相关信息和依靠联系传递给履行引擎。

  3. 依靠联系解析:履行引擎接纳到使命后,会解析使命之间的依靠联系。它会查看每个使命所依靠的其他使命是否已经完结。假如有未满意的依靠联系,使命将等候依靠使命完结后再履行。

  4. 使命履行:一旦使命的依靠联系满意,履行引擎会开端履行使命。使命能够是各种类型,如Hadoop作业、Shell脚本、Spark作业等。履行引擎会调用相应的履行器来履行使命,并供给所需的参数和装备。

  5. 使命监控和日志:在使命履行期间,Azkaban会实时监控使命的履行状况,并记载使命的日志输出。用户能够经过Azkaban的Web界面查看使命的履行进度、日志和错误信息。这有助于及时发现和排查履行问题。

  6. 依靠联系查看:在使命履行完结后,履行引擎会查看使命的输出和后续使命的依靠联系。假如有后续使命依靠当前使命的输出,履行引擎会传递相应的输出给后续使命,并继续履行后续使命。

  7. 完结和陈述:当作业流中的所有使命都履行完结后,Azkaban会将作业流的履行状况标记为完结,并生成履行陈述。陈述能够包括使命的履行成果、履行时刻、日志等信息,以便用户进行查看和剖析。

总的来说,Apache Azkaban的调度流程包括界说作业流、使命提交、依靠联系解析、使命履行、监控和日志、依靠联系查看以及完结和陈述等步骤。这些步骤确保了作业流的正确履行和办理,并供给了实时的监控和日志记载功能。

三、前期准备

1)布置 docker

# 装置yum-config-manager装备东西
yum -y install yum-utils
# 主张运用阿里云yum源:(引荐)
#yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 装置docker-ce版别
yum install -y docker-ce
# 发动并开机发动
systemctl enable --now docker
docker --version

2)布置 docker-compose

curl -SL https://github.com/docker/compose/releases/download/v2.16.0/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
docker-compose --version

四、创立网络

# 创立,留意不能运用hadoop_network,要不然发动hs2服务的时候会有问题!!!
docker network create hadoop-network
# 查看
docker network ls

五、Azkaban 编列布置

1)装置 MySQL

如想快速布置MySQL,可参考我这篇文章:经过 docker-compose 快速布置 MySQL保姆级教程

2)下载 Azkaban 编译

git clone https://github.com/azkaban/azkaban.git
# 编译
cd azkaban; ./gradlew build installDist

这儿也供给一下我编译的装置包,下载地址如下:

链接:pan.baidu.com/s/1zvUyfXg3… 提取码:6666

3)初始化 azkaban 用户和表

创立 azkaban 用户

#【温馨提示】一般公司制止mysql -u root -p123456这种方式衔接,在history里有记载,存在安全隐患,小伙伴不要被公司安全审计哦,牢记!!!
mysql -u root -p
# 输入暗码:123456
CREATE DATABASE azkaban;
CREATE USER 'azkaban'@'%' IDENTIFIED BY 'azkaban';
GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;

开端初始化表数据

cd ${AZKABAN_HOME}/azkaban-db
# 衔接mysql
mysql -u root -p
#暗码:123456
use azkaban
# 可能版别不一样,sql文件也不太一样,create-all-sql-*.sql
source create-all-sql-3.91.0-313-gadb56414.sql

4)装备

  • conf/exec/azkaban.properties
azkaban.name=Test
azkaban.label=My Local Azkaban
azkaban.default.servlet.path=/index
web.resource.dir=web/
default.timezone.id=Asia/Shanghai
# default.timezone.id=America/Los_Angeles
user.manager.class=azkaban.user.XmlUserManager
user.manager.xml.file=conf/azkaban-users.xml
executor.global.properties=conf/global.properties
azkaban.project.dir=projects
velocity.dev.mode=false
jetty.use.ssl=false
jetty.maxThreads=25
jetty.port=8081
# azkaban.webserver.url=http://localhost:8081
azkaban.webserver.url=https://azkaban-web:8081
mail.sender=
mail.host=
job.failure.email=
job.success.email=
lockdown.create.projects=false
cache.directory=cache
jetty.connector.stats=true
executor.connector.stats=true
azkaban.jobtype.plugin.dir=plugins/jobtypes
database.type=mysql
mysql.port=3306
mysql.host=mysql
# mysql.host=localhost
mysql.database=azkaban
mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100
executor.maxThreads=50
executor.flow.threads=30
azkaban.executor.runtimeProps.override.eager=false
executor.port=12321

参数阐明:

executor.port:不设置就是随机值了,不方便办理,所以这儿还是固定一个端口号,看材料大部分都是运用12321这个端口,这儿也随大流
  • conf/web/azkaban.properties
azkaban.name=Test
azkaban.label=My Local Azkaban
azkaban.default.servlet.path=/index
web.resource.dir=web/
default.timezone.id=Asia/Shanghai
# default.timezone.id=America/Los_Angeles
user.manager.class=azkaban.user.XmlUserManager
user.manager.xml.file=conf/azkaban-users.xml
executor.global.properties=conf/global.properties
azkaban.project.dir=projects
velocity.dev.mode=false
jetty.use.ssl=false
jetty.maxThreads=25
jetty.port=8081
mail.sender=
mail.host=
job.failure.email=
job.success.email=
lockdown.create.projects=false
cache.directory=cache
jetty.connector.stats=true
executor.connector.stats=true
database.type=mysql
mysql.port=3306
mysql.host=mysql
# mysql.host=localhost
mysql.database=azkaban
mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100
azkaban.use.multiple.executors=true
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
# azkaban.executorselector.filters=StaticRemainingFlowSize,MinimumFreeMemory,CpuStatus
azkaban.executorselector.comparator.NumberOfAssignedFlowComparator=1
azkaban.executorselector.comparator.Memory=1
azkaban.executorselector.comparator.LastDispatched=1
azkaban.executorselector.comparator.CpuUsage=1

参数阐明:

azkaban.executorselector.filters:调度策略
# 把MinimumFreeMemory去掉,由于MinimumFreeMemory是6G,自己电脑资源有限,假如小伙伴的机器资源雄厚,能够保留
# StaticRemainingFlowSize:依据排队的使命数来调度使命到哪台executor机器
# CpuStatus:跟据cpu空闲状况来调度使命到哪台executor机器

5)发动脚本 bootstrap.sh

#!/usr/bin/env sh
wait_for() {
    echo Waiting for $1 to listen on $2...
    while ! nc -z $1 $2; do echo waiting...; sleep 1s; done
}
startAzkaban() {
   node_type=$1
   if [ "$node_type" = "exec" ];then
      {
        activateAzkabanExec
      }&
      # 【留意】需要切到exec目录下发动,由于装备文件装备的是相对途径
      cd ${AZKABAN_HOME}/azkaban-exec-server
      ./bin/internal/internal-start-executor.sh 2>&1 |tee -a executorServerLog__`date +%F+%T`.out
      # 激活 exec
      activateAzkabanExec
   elif [ "$node_type" = "web" ];then
      first_exec=$2
      exec_port=$3
      wait_for $first_exec $exec_port
      # 【留意】需要切到web目录下发动,由于装备文件装备的是相对途径
      cd ${AZKABAN_HOME}/azkaban-web-server
      ./bin/internal/internal-start-web.sh 2>&1 |tee -a webServerLog_`date +%F+%T`.out
   fi
}
# 激活 exec
activateAzkabanExec(){
  until netstat -ntlp|grep -q :12321; do echo waiting for azkaban-exec; sleep 1; done
  curl -G "`hostname`:12321/executor?action=activate" && echo
}
startAzkaban $@

留意先发动exec,再发动web,否则web会报No active executors found的异常信息。

6)构建镜像 Dockerfile

FROM registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/centos-jdk:7.7.1908
# 添加 azkaban 包
ENV AZKABAN_VERSION 3.91.0-313
RUN mkdir /opt/apache/azkaban-${AZKABAN_VERSION}
ADD ./azkaban-${AZKABAN_VERSION}/azkaban-db.tar.gz /opt/apache/azkaban-${AZKABAN_VERSION}/
ADD ./azkaban-${AZKABAN_VERSION}/azkaban-exec-server.tar.gz /opt/apache/azkaban-${AZKABAN_VERSION}/
ADD ./azkaban-${AZKABAN_VERSION}/azkaban-web-server.tar.gz /opt/apache/azkaban-${AZKABAN_VERSION}/
ENV AZKABAN_HOME /opt/apache/azkaban
RUN ln -s /opt/apache/azkaban-${AZKABAN_VERSION} $AZKABAN_HOME
# copy bootstrap.sh
COPY bootstrap.sh /opt/apache/
RUN chmod +x /opt/apache/bootstrap.sh
RUN chown -R hadoop:hadoop /opt/apache
WORKDIR $AZKABAN_HOME

开端构建镜像

docker build -t registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/azkaban:3.91.0-313 . --no-cache
# 为了方便小伙伴下载即可运用,我这儿将镜像文件推送到阿里云的镜像仓库
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/azkaban:3.91.0-313
### 参数解释
# -t:指定镜像称号
# . :当前目录Dockerfile
# -f:指定Dockerfile途径
#  --no-cache:不缓存

7)编列 docker-compose.yaml

version: '3'
services:
  azkaban-web:
    image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/azkaban:3.91.0-313
    user: "hadoop:hadoop"
    container_name: azkaban-web
    hostname: azkaban-web
    restart: always
    privileged: true
    depends_on:
      - azkaban-exec
    env_file:
      - .env
    volumes:
      - ./conf/web/azkaban.properties:${AZKABAN_HOME}/azkaban-web-server/conf/azkaban.properties
    ports:
      - "${AZKABAN_WEB_PORT}"
    command: ["sh","-c","/opt/apache/bootstrap.sh web azkaban-azkaban-exec-1 ${AZKABAN_EXEC_PORT}"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :${AZKABAN_WEB_PORT} || exit 1"]
      interval: 10s
      timeout: 20s
      retries: 3
  azkaban-exec:
    image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/azkaban:3.91.0-313
    user: "hadoop:hadoop"
    restart: always
    privileged: true
    deploy:
      replicas: ${AZKABAN_EXEC_REPLICAS}
    env_file:
      - .env
    volumes:
      - ./conf/exec/azkaban.properties:${AZKABAN_HOME}/azkaban-exec-server/conf/azkaban.properties
    expose:
      - "${AZKABAN_EXEC_PORT}"
    command: ["sh","-c","/opt/apache/bootstrap.sh exec"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :${AZKABAN_EXEC_PORT} || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 3
# 衔接外部网络
networks:
  hadoop-network:
    external: true

.env 文件内容

AZKABAN_HOME=/opt/apache/azkaban
AZKABAN_EXEC_REPLICAS=2
AZKABAN_WEB_PORT=8081
AZKABAN_EXEC_PORT=12321

8)开端布置

# --project-name指定项目称号,默认是当前目录称号
docker-compose --project-name=azkaban -f docker-compose.yaml up -d
# 查看
docker-compose --project-name=azkaban -f docker-compose.yaml ps
# 卸载
docker-compose --project-name=azkaban -f docker-compose.yaml down

六、简略测试验证

# 查看对外端口
docker-compose -p=azkaban ps

web 访问:http://ip:port 账号/暗码:azkaban/azkaban

通过 docker-compose 快速部署 Azkaban 保姆级教程

七、常用的 Azkaban 客户端指令

1)服务启停

### 1、azkaban-exec-server
# azkaban-exec-server 发动(前台)
cd ${AZKABAN_HOME}/azkaban-exec-server
./bin/internal/internal-start-executor.sh
# 后台发动
cd ${AZKABAN_HOME}/azkaban-exec-server
./bin/start-exec.sh
# 重启
cd ${AZKABAN_HOME}/azkaban-exec-server
./bin/shutdown-exec.sh && ./bin/start-exec.sh
### 2、azkaban-web-server
# azkaban-web-server 发动(前台)
cd ${AZKABAN_HOME}/azkaban-web-server
./bin/internal/internal-start-web.sh
# 后台发动
cd ${AZKABAN_HOME}/azkaban-web-server
./bin/start-web.sh
# 重启
cd ${AZKABAN_HOME}/azkaban-web-server
./bin/shutdown-web.sh && ./bin/start-web.sh

2)azkaban exec 节点激活

curl -G "${exec_ip}:12321/executor?action=activate" && echo

3)其它常用api接口

以下是一些Apache Azkaban中常用的API接口:

1、创立项目

URL:/manager?action=create
办法:POST
参数:name(项目称号)、description(项目描绘)

2、上传项目ZIP文件

URL:/manager?action=upload
办法:POST
参数:file(ZIP文件)

3、获取项目流列表

URL:/manager?action=fetchprojectflows
办法:GET
参数:project(项目称号)

4、履行项目流

URL:/executor
办法:POST
参数:ajax(固定值executeFlow)、project(项目称号)、flow(流称号)

5、取消履行项目流

URL:/executor
办法:POST
参数:ajax(固定值cancelFlow)、execid(履行ID)

6、获取履行状况

URL:/executor
办法:GET
参数:ajax(固定值fetchexecflow)、execid(履行ID)

7、获取履行日志

URL:/executor
办法:GET
参数:ajax(固定值fetchExecJobLogs)、execid(履行ID)、jobId(使命ID)、offset(偏移量)、length(日志长度)

8、获取履行陈述

URL:/executor
办法:GET
参数:ajax(固定值fetchexecflow)&execid(履行ID)

这些API接口能够经过发送HTTP请求与Azkaban的Web服务器进行交互,履行项目的创立、上传、履行以及获取履行状况、日志和陈述等操作。能够依据具体需求运用适当的API接口来办理和调度Azkaban中的作业流。

以下是运用Python示例代码来演示怎么运用Azkaban的API接口:

import requests
# 创立项目
def create_project(project_name, description):
    url = "http://localhost:8081/manager"
    params = {
        "action": "create",
        "name": project_name,
        "description": description
    }
    response = requests.post(url, params=params)
    print(response.text)
# 上传项目ZIP文件
def upload_project_zip(project_name, zip_file_path):
    url = "http://localhost:8081/manager"
    files = {
        "file": open(zip_file_path, "rb")
    }
    params = {
        "action": "upload",
        "project": project_name
    }
    response = requests.post(url, params=params, files=files)
    print(response.text)
# 履行项目流
def execute_flow(project_name, flow_name):
    url = "http://localhost:8081/executor"
    params = {
        "ajax": "executeFlow",
        "project": project_name,
        "flow": flow_name
    }
    response = requests.post(url, params=params)
    print(response.text)
# 获取履行状况
def get_execution_status(execution_id):
    url = f"http://localhost:8081/executor?ajax=fetchexecflow&execid={execution_id}"
    response = requests.get(url)
    print(response.text)
# 获取履行日志
def get_execution_logs(execution_id, job_id, offset, length):
    url = f"http://localhost:8081/executor?ajax=fetchExecJobLogs&execid={execution_id}&jobId={job_id}&offset={offset}&length={length}"
    response = requests.get(url)
    print(response.text)
# 获取履行陈述
def get_execution_report(execution_id):
    url = f"http://localhost:8081/executor?ajax=fetchexecflow&execid={execution_id}"
    response = requests.get(url)
    print(response.text)
# 示例用法
create_project("my_project", "My Azkaban project")
upload_project_zip("my_project", "path/to/project.zip")
execute_flow("my_project", "my_flow")
get_execution_status("12345")
get_execution_logs("12345", "my_job", 0, 100)
get_execution_report("12345")

到此 经过 docker-compose 快速布置 Azkaban 保姆级教程就完结了,有任何疑问欢迎重视我大众号【大数据与云原生技能共享】加群交流或私信交流,如本篇文章对您有所协助,费事帮忙一键三连(点赞、转发、收藏)~~

通过 docker-compose 快速部署 Azkaban 保姆级教程