Airflow - Custom Mail Scheduler

How to set up an Airflow DAG scheduler to trigger a custom email.

Introduction

We will set up an Airflow scheduler that triggers at set intervals and sends a custom email to a list of recipients. To learn what Airflow is and how it is used, please visit the official website - Apache-Airflow

This post is organized as follows.

  1. Airflow Setup on local System using Docker

  2. Airflow DAG python file configuration setup.

  3. Python SMTP code script

  4. Airflow DAG on UI setup.

I. Airflow Setup on the local system using Docker

I'm using a Windows system, but since the installation runs in Docker, the OS you are using shouldn't make much difference and you can still follow along.

To know more about Docker Container and how it is used to create a specific virtual environment, visit their official website - why-docker.

To start off with the Airflow setup, we need to follow the below steps.

  1. Download and Install Docker desktop - official website.

  2. After installing Docker Desktop, we need to create a docker-compose.yaml file. Paste the content here into that file. Please make sure the file name and extension are correct.

  3. Now we need to create an airflow directory. Go to the following path: C:/Users/<your_user>/. Inside that directory, create a folder called docker, and inside docker create another folder called airflow. Please make sure the folder names are exactly as given.

  4. After creating docker/airflow, we need to create four folders inside the airflow folder.

    1. Create four folders called dags, plugins, logs and config.

    2. Move your docker-compose.yaml into the airflow folder.

  5. We also need airflow.cfg for the mail setup. You can copy the text from here, airflow.cfg. Place this file in the airflow/config folder. If this looks like a lot of unknown text, below are the only lines we will be using from the file.

     [email]
     email_backend = airflow.utils.email.send_email_smtp
     email_conn_id = smtp_default
     default_email_on_retry = True
     default_email_on_failure = True
    
     [smtp]
     # If you want airflow to send emails on retries, failure, and you want to use 
     # the airflow.utils.email.send_email_smtp function, you have to configure an 
     # smtp server here 
     smtp_host = smtp.gmail.com 
     # Port 587 on smtp.gmail.com expects STARTTLS, so this must be True
     smtp_starttls = True
     smtp_ssl = False 
     # Example: smtp_user = airflow 
     smtp_user = YOUR_EMAIL_ID
     # Example: smtp_password = airflow 
     smtp_password = YOUR_APPLICATION_PWD
     smtp_port = 587
     smtp_mail_from = YOUR_EMAIL_ID
    
  6. If all the file structures are done correctly, it should look like the below.

     docker/
     └─airflow/
          ├─dags/
          │ └─(All the Python DAG files are stored here)
          ├─logs/
          │ └─(All the autogenerated DAG logs are stored here)
          ├─config/
          │ └─airflow.cfg
          ├─plugins/
          └─docker-compose.yaml
    
  7. Now we are ready to start our instance of Airflow in Docker. Open a PowerShell window and go to the airflow directory shown above (docker/airflow), where docker-compose.yaml is located. Make sure Docker is running, then run the following commands:

     # one-time initialisation: sets up the metadata database and the default "airflow" user
     $ docker-compose up airflow-init
    
     # start all the Airflow services defined in the compose file
     $ docker-compose up
    
  8. After running the commands, you can see in your Docker Desktop app that a container group named airflow was created in the Containers/Apps section. It will contain 7 containers.

  9. Our Apache Airflow installation is now complete, and we are ready to start developing DAGs. Go to localhost:8080, log in with user “airflow” and password “airflow” and start coding.
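
If you would rather script the folder layout from steps 3 and 4 than create it by hand, a minimal Python sketch could look like the one below. The function names create_layout and missing_files are my own (they are not part of Airflow or Docker), and the list of expected files simply mirrors the tree shown in step 6.

```python
from pathlib import Path

# Folder names taken from step 4 above.
EXPECTED_DIRS = ("dags", "logs", "plugins", "config")
# Files that steps 2, 4 and 5 ask you to place by hand.
EXPECTED_FILES = ("docker-compose.yaml", "config/airflow.cfg")


def create_layout(base: Path) -> Path:
    """Create docker/airflow and its sub-folders under `base`; return the airflow folder."""
    airflow_dir = base / "docker" / "airflow"
    for name in EXPECTED_DIRS:
        # exist_ok=True makes the call safe to repeat
        (airflow_dir / name).mkdir(parents=True, exist_ok=True)
    return airflow_dir


def missing_files(airflow_dir: Path) -> list[str]:
    """List the expected files that have not been copied into place yet."""
    return [name for name in EXPECTED_FILES if not (airflow_dir / name).is_file()]
```

Calling create_layout(Path.home()) would build the folders under C:/Users/<your_user>/, and missing_files(...) on the returned path tells you whether docker-compose.yaml and airflow.cfg still need to be copied in before running docker-compose.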

II. Airflow DAG python file configuration setup

Now that the Airflow instance is set up, we can start on the DAG code with default and custom configurations.

Note - All DAG Python files are placed in the airflow/dags folder.

This is straightforward template code from Airflow's official website. You can refer to the DAG Python code from here. Below are a few customizations made to the code.

  1. default_args can be customised with the start_date and end_date of the trigger.

    Note - if you want your trigger to work from the current date, always give a start_date in the past.

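     from datetime import datetime, timedelta

     # DAG structure
     default_args = {
         'owner': 'nischay',
         'depends_on_past': False,
         # first and last dates between which the DAG is scheduled
         'start_date': datetime(2023, 3, 26),
         'end_date': datetime(2023, 3, 31),
         # address(es) that receive Airflow's own failure/retry alert mails
         'email': ['sender@mail.com'],
         'retries': 1,
         # 'retry_delay': timedelta(minutes=1)
     }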
    
  2. DAG configuration - changes to how it appears in UI

    • schedule_interval - this is the most important setting in this code; it sets the trigger schedule. You can use standard trigger intervals or a cron expression.

     from airflow import DAG

     dag = DAG(
         # DAG name
         dag_id='Your_DAG_name',
         default_args=default_args,
         description='DAG description',
         # 3am, 11am, and 7pm daily
         # Airflow interprets the schedule in UTC by default.
         schedule_interval='0 3,11,19 * * *',
         catchup=False,
         params={"description": ""},
         # similar to hashtags, used to filter DAGs in the UI.
         tags=["Email"],
     )

    • A few standard examples can be found here.

    • You can build your custom schedule on this website - crontab

  3. Specific task to run. With the DAG configuration done, we can specify the particular Python function to run in the lines of code below.

    send_email - the Python function that sends the mail, which will be explained in the next section.

     from airflow.operators.python import PythonOperator

     with dag:
         # Task 1 - sending mail.
         send_email_task = PythonOperator(
             task_id='Send_Email',
             python_callable=send_email,
             dag=dag,
         )
         # DAG run order (a single task here, so there are no dependencies to declare).
         send_email_task
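
To get a feel for when the cron expression '0 3,11,19 * * *' fires, here is a small standard-library sketch that lists the next trigger times. It is only an illustration, not how Airflow computes its schedule internally: the helper next_runs and the constants RUN_HOURS and UTC_MINUS_8 are my own names, the hours are hard-coded rather than parsed from the cron string, and start_date/end_date are ignored. It assumes Airflow's default timezone setting (UTC) has not been changed.

```python
from datetime import datetime, timedelta, timezone

# Hours taken from the cron expression '0 3,11,19 * * *' (minute 0 of hours 3, 11 and 19).
RUN_HOURS = (3, 11, 19)

# Fixed UTC-8 offset, used only to show how a UTC trigger time looks on a local clock.
UTC_MINUS_8 = timezone(timedelta(hours=-8))


def next_runs(after: datetime, count: int) -> list[datetime]:
    """Return the next `count` trigger times strictly after `after`, expressed in UTC.

    `after` must be timezone-aware.
    """
    after = after.astimezone(timezone.utc)
    day = after.replace(hour=0, minute=0, second=0, microsecond=0)
    runs: list[datetime] = []
    while len(runs) < count:
        for hour in RUN_HOURS:
            candidate = day.replace(hour=hour)
            if candidate > after and len(runs) < count:
                runs.append(candidate)
        day += timedelta(days=1)
    return runs
```

For example, next_runs(datetime.now(timezone.utc), 3) gives the next three trigger times in UTC, and calling .astimezone(UTC_MINUS_8) on any of them shows the same instant on a UTC-8 clock. If the mails need to arrive at 3 am, 11 am and 7 pm local time, the hours in the cron expression have to be shifted accordingly.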
    

III. Python SMTP code script

This is the final piece of our code. We now need to write a function that sends the mail, and there is one step to complete before writing it. We will be using Gmail SMTP to send mail from a Gmail account.

  1. Create an application password for your Gmail account; follow these steps.

  2. Below is the Python code using the smtplib library.

     def send_email():
         import smtplib
         from email.mime.multipart import MIMEMultipart
         from email.mime.text import MIMEText
    
         # plain-text body of the mail
         body = (
             "Hi Team,\n"
             "Please find the link attached below.\n\n"
             "Body Text\n\n"
             "Regards and Thank you,\n"
             "Your Name\n"
         )
         msg = MIMEMultipart()
    
         msg['Subject'] = 'Airflow Alert - Dashboard Link'
         msg['From'] = "sender@gmail.com"
         # add multiple emails in this way - "email1, email2"
         recipients = "receiver1@gmail.com, receiver2@gmail.com"
         # normalise the spacing around the commas before building the header
         msg['To'] = ', '.join(r.strip() for r in recipients.split(','))
         msg.attach(MIMEText(body, 'plain'))

         # connect to Gmail, upgrade the connection to TLS, log in and send;
         # the with-block closes the connection when it exits
         with smtplib.SMTP('smtp.gmail.com', 587) as server:
             server.starttls()
             server.login("sender@gmail.com", 'Application_password')
             server.send_message(msg)
    

    The full DAG code can be found here.
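
One possible variation on the function above, sketched here under a few assumptions, is to split building the message from sending it, so the message can be checked without a network connection, and to read the application password from an environment variable instead of hard-coding it. The names build_message, send_message and GMAIL_APP_PASSWORD are hypothetical; the variable would also have to be made available inside the Airflow containers, which is not covered in this post.

```python
import os
import smtplib
from email.message import EmailMessage


def build_message(sender: str, recipients: list[str], subject: str, body: str) -> EmailMessage:
    """Build a plain-text mail; no network access happens here."""
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = ", ".join(recipients)
    msg.set_content(body)
    return msg


def send_message(msg: EmailMessage, sender: str, password: str) -> None:
    """Send the mail through Gmail SMTP on port 587 using STARTTLS."""
    with smtplib.SMTP("smtp.gmail.com", 587) as server:
        server.starttls()
        server.login(sender, password)
        server.send_message(msg)


def send_email() -> None:
    """Callable for the PythonOperator; addresses are placeholders."""
    sender = "sender@gmail.com"
    # GMAIL_APP_PASSWORD is a hypothetical variable name chosen for this sketch.
    password = os.environ["GMAIL_APP_PASSWORD"]
    msg = build_message(
        sender,
        ["receiver1@gmail.com", "receiver2@gmail.com"],
        "Airflow Alert - Dashboard Link",
        "Hi Team,\nPlease find the link attached below.\n",
    )
    send_message(msg, sender, password)
```

With this split, build_message can be exercised in a plain Python session to confirm the headers and body look right before the DAG ever runs.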

IV. Airflow UI

Now that everything is set up, we can activate our DAG in the Airflow UI. Go to localhost:8080. Below is an example snapshot of a DAG named "Automated_Email_Test".

  • Tags - "Email"

  • Schedule - '0 3,11,19 * * *', this will trigger at 3 am, 11 am, and 7 pm daily.

You can manually trigger the DAG using the play button on the right side. That's all there is to it; your mail scheduler using Airflow is ready.
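
Besides the play button, a DAG run can also be requested over HTTP. The sketch below only builds such a request and assumes several things that this post does not set up explicitly: an Airflow 2.x instance whose stable REST API lives under /api/v1, basic authentication enabled for the API, and the default airflow/airflow login. The helper name build_trigger_request is my own.

```python
import base64
import json
import urllib.request


def build_trigger_request(
    dag_id: str,
    base_url: str = "http://localhost:8080",
    user: str = "airflow",
    password: str = "airflow",
) -> urllib.request.Request:
    """Build (but do not send) a POST request that asks Airflow to start a DAG run."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        # an empty "conf" means the run gets no extra parameters
        data=json.dumps({"conf": {}}).encode(),
        headers={
            "Authorization": f"Basic {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the result to urllib.request.urlopen(...) would send it, for example urllib.request.urlopen(build_trigger_request("Your_DAG_name")). The DAG has to be unpaused in the UI for the requested run to actually start.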

You can find all the code snippets and files in this repository. Please comment below, if you have any questions.

