Displaying ECS Fargate logs in Airflow UI

In one of the projects I'm working on, I use AWS Elastic Container Service to run Fargate tasks. Why? I have jobs that need to run, and it's easy to pack them into a Docker container, deploy it on Fargate, and pay only for the resources actually used. I use Airflow to schedule those tasks. Since the jobs run on Fargate, all job logs are created within the Fargate task (the Docker container). Fortunately, Fargate supports streaming logs into CloudWatch, but once you have more than a handful of log lines, reading them in CloudWatch is not very friendly (online reading and pagination, for example). Luckily, the Airflow ECS Operator supports displaying those Fargate task logs within the Airflow UI, which works better for me: everything is in one place, and I don't have to go to CloudWatch to investigate. I had issues configuring Airflow/Fargate to display the logs, so I want to describe my approach and my understanding. When displaying logs from CloudWatch fails, you usually get this exception in the Airflow logs:

 An error occurred (ResourceNotFoundException) when calling the GetLogEvents operation: The specified log group does not exist.

and the Airflow task is marked as failed. My issue was setting the log configuration correctly so Airflow could read the logs (which was probably caused by my lack of understanding of how CloudWatch logs work).

1. Setting Fargate task

When setting up a Fargate task, the Container section has Log Configuration options. I check "Auto-configure CloudWatch Logs", and then everything is predefined; of course, it's possible to set custom values.

 Fargate logs settings

With these settings, a log group in CloudWatch is created automatically afterward.

In my case, both container name and Fargate task name are "fargate_logging".
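
For reference, here is a minimal boto3 sketch of roughly what that auto-configuration produces. This is an illustration under assumptions, not my exact setup: the execution role ARN, image, and region below are placeholders.

import boto3

ecs = boto3.client('ecs')

# Roughly what "Auto-configure CloudWatch Logs" sets up: the awslogs driver
# with a group named /ecs/<task definition family> and the stream prefix "ecs".
# The awslogs driver names each stream <prefix>/<container name>/<task ID>,
# which is where "ecs/fargate_logging/..." comes from.
ecs.register_task_definition(
    family='fargate_logging',
    requiresCompatibilities=['FARGATE'],
    networkMode='awsvpc',
    cpu='256',
    memory='512',
    executionRoleArn='arn:aws:iam::123456789012:role/ecsTaskExecutionRole',  # placeholder
    containerDefinitions=[{
        'name': 'fargate_logging',
        'image': 'my-job-image:latest',  # placeholder
        'logConfiguration': {
            'logDriver': 'awslogs',
            'options': {
                'awslogs-group': '/ecs/fargate_logging',
                'awslogs-region': 'eu-west-1',  # placeholder region
                'awslogs-stream-prefix': 'ecs',
            },
        },
    }],
)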

2. Setting Airflow task

In the Airflow ECSOperator I set the logs with these values:

awslogs_group is "/ecs/fargate_logging"

awslogs_stream_prefix is "ecs/fargate_logging" (without "/" at the start).

The task could look something like this (not the full configuration):

# import path for Airflow 1.10; in Airflow 2 the operator lives in the
# Amazon provider package (airflow.providers.amazon.aws.operators.ecs)
from airflow.contrib.operators.ecs_operator import ECSOperator

task = ECSOperator(
    dag=dag,
    task_id=task_id,
    task_definition=task_definition,
    cluster=cluster,
    region_name=region_name,
    launch_type=launch_type,
    overrides={},
    awslogs_group='/ecs/fargate_logging',
    awslogs_stream_prefix='ecs/fargate_logging',
)


3. How it works (under the hood):

Check in CloudWatch that the log group was created and that you actually have logs there:

CloudWatch Fargate logs


We see that the log stream name consists of "ecs/fargate_logging" followed by an ID.
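
You can also confirm this programmatically. A quick sketch (using the group name from my setup above) lists the exact stream names Airflow will have to match:

import boto3

client = boto3.client('logs')

# List the streams in the log group and print their names.
response = client.describe_log_streams(logGroupName='/ecs/fargate_logging')
for stream in response['logStreams']:
    print(stream['logStreamName'])  # e.g. ecs/fargate_logging/<task ID>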

After the Fargate task is executed, the Airflow ECS Operator reads the CloudWatch log stream and relays it as Airflow logs; this is the core part. We can see that it parses the task_id and joins it with awslogs_stream_prefix to build the stream_name. The task_id is the ID of the Fargate task that was executed:

self.log.info('ECS Task logs output:')
# the Fargate task ID is the last part of the task ARN
task_id = self.arn.split("/")[-1]
stream_name = "{}/{}".format(self.awslogs_stream_prefix, task_id)
# the next three log lines are the ones I added for debugging
self.log.info("aws log group {}".format(self.awslogs_group))
self.log.info("stream name: {}".format(stream_name))
self.log.info("task_id: {}".format(task_id))
for event in self.get_logs_hook().get_log_events(self.awslogs_group, stream_name):
    dt = datetime.fromtimestamp(event['timestamp'] / 1000.0)
    self.log.info("[{}] {}".format(dt.isoformat(), event['message']))

I added log lines to see exactly what ends up as the stream_name after I set my values. This is what I got in the Airflow logs UI:

[2021-03-31 07:23:53,773] {ecs_operator.py:172} INFO - ECS Task logs output:
[2021-03-31 07:23:53,774] {ecs_operator.py:175} INFO - aws log group /ecs/fargate_logging
[2021-03-31 07:23:53,774] {ecs_operator.py:176} INFO - stream name: ecs/fargate_logging/571504e3bf504615b39d2fdbea15a987
[2021-03-31 07:23:53,774] {ecs_operator.py:177} INFO - task_id: 571504e3bf504615b39d2fdbea15a987

The main point is that awslogs_group and stream_name must match what is in CloudWatch.

So, for example, when CloudWatch has a log stream named ecs/fargate_logging/571504e3bf504615b39d2fdbea15a987, awslogs_stream_prefix must be "ecs/fargate_logging".

I was using the following snippet while debugging to read logs directly:

import boto3

client = boto3.client('logs')
response = client.get_log_events(
    logGroupName='/ecs/fargate_logging',
    logStreamName='ecs/fargate_logging/571504e3bf504615b39d2fdbea15a987'
)
print(response)
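
If this call returns your log lines, the names are correct; the response's 'events' list carries the same 'timestamp' and 'message' fields that the operator iterates over above.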


Again: when configuring the Airflow ECS Operator, awslogs_group and awslogs_stream_prefix must match what is already in CloudWatch (not the other way around)!

In Airflow you don't set the values to what you want them to be, but to what is already set in CloudWatch, so the logs can be read correctly. When I was setting up Airflow initially (without much ECS experience), I thought I was configuring how the logs should be written, but it's the other way around, as I emphasized above.

I hope this helps those who run into similar issues.
