In der klassischen on-prem IT Welt kommt es immer noch sehr häufig vor, dass cron-jobs dafür verantwortlich sind Geschäftsrelevante Aufgaben zu erfüllen. Während die Menge an Aufgaben und Systeme wächst, schrumpf die Möglichkeite all dies im Auge zu behalten. Abhilfe schaft hier Airflow, mit dessen Hilfe Dags [Direct Acyclic Graphs] erstellt werden können. DAGs können als Verkettungen von Aufgaben gesehen werden, die man mit Python beschreibt.

Ein Beispiel DAG, der Daten auf ein Samba-Share kopiert kann zum Beispiel so aussehen:

from airflow.models import DAG
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from datetime import datetime, timedelta

args = {
'owner': 'airflow',
'start_date': datetime(2018, 7, 21),
'email': ['jon.doe@doe.com'],
'email_on_failure': True,
'email_on_retry': False
}

dag = DAG(
dag_id='files2smb',
catchup=False,
default_args=args,
schedule_interval='9 11 * * *',
dagrun_timeout=timedelta(minutes=60))

ssh_conn_id = "SSH-DEST-CONN-ID"

sshHook = SSHHook(ssh_conn_id=ssh_conn_id)

srcPATH = "/your/path/to/files/"
smbauth = "-A /home/user/smb_auth"
smbdest = "//10.0.0.1/FOLDER"
xml_archive = srcPATH + "/archive/"

cmd_find = "cd " + srcPATH + " && find . -name '*.xml' -mmin -1445 -type f -printf '%f\n'"
cmd_smbclient_command = " -c 'put {} {}'"
cmd_smbclient = "smbclient " + smbauth + " " + smbdest

transfer = SSHOperator(
task_id="transfer",
command=cmd_find + " | xargs -i " + cmd_smbclient + cmd_smbclient_command,
ssh_hook=sshHook,
dag=dag)

cmd_files_copied = cmd_smbclient + " -c 'ls' 2>/dev/null|grep xml|awk '{print $1}'"
cmd_move_files = " mv {} " + xml_archive

move = SSHOperator(
task_id="move",
command="cd " + srcPATH + " && " + cmd_files_copied + "|xargs -i " + cmd_move_files + " 2&>/dev/null;exit 0",
ssh_hook=sshHook,
dag=dag)

cleanup = SSHOperator(
task_id="cleanup",
command="find " + xml_archive + " -name '*.xml' -mtime +30 -exec rm {} +",
ssh_hook=sshHook,
dag=dag)

move.set_upstream(transfer)
cleanup.set_upstream(move)

if __name__ == "__main__":
dag.cli()