Intrusion detection and prevention with AWS Lambda and DynamoDB streams

April 5, 2017 by Paulina Budzoń

Intrusion detection systems (IDS) and intrusion prevention systems (IPS) tend to be expensive and complicated. In AWS, you can go for a much simpler solution: WAF. But that requires you to use an Application Load Balancer or CloudFront, and even with WAF you have to manage a list of attacker IP addresses that should be blocked. If you only ever need to block single IPs for short periods of time, NACLs may be a much easier option! Here’s a walkthrough on how you can implement a terribly simple (yet very powerful) intrusion detection and prevention setup in AWS with Lambda and DynamoDB Streams for a web application.

First things first: I’m going to show you how to block IPs which send “naughty” requests to your server using NACLs in a VPC. Note that, by default, you can only have 20 rules in one NACL, and that includes the default deny rules. Assuming you only have IPv4 support enabled in your VPC, the default DENY rule plus one ALLOW rule leave only 18 rules to use. With IPv6 support enabled, 16 are left. That means, if you’re going to be blocking single IPs, you can block at most 16 (or 18 without IPv6) at a time. You can potentially extend the code below to deny access to whole CIDR blocks, but if you have that kind of need, I’d advise you to use WAF and use the Lambdas below to curate your IP lists for WAF. I’m using NACLs because the server I wrote this code for used a Classic Load Balancer and there was no appetite to use CloudFront or an Application Load Balancer - plus it only ever needed to block 2-3 IPs at one time.

What does the setup below accomplish? It detects repeated requests blocked by Mod_Security. Loads of stuff is logged to Datadog Events. Each offending IP is stored in Dynamo with a 24-hour time-to-live (TTL). If the IP causes another error within the next 24 hours, the counter next to it is increased and the TTL is extended for another 24 hours (from the time of the latest blocked request). If the counter reaches 10, the IP is blocked by the NACL. If the IP is “quiet” for 24 hours, it will be deleted by Dynamo (the TTL expires) and removed from the NACL (if it was added before). Obviously, IPs blocked by the NACL will become “quiet”, so they will be removed from the NACL after the 24-hour ban. This means we are capable of blocking not only IPs committing offences within a short period of time, but also those waiting up to 24 hours between attacks.
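To make the counting and expiry rules above concrete, here is a minimal in-memory sketch of the same logic. It is only an illustration: the `OffenderTable` class is a hypothetical stand-in for the DynamoDB table, and in the real setup the expiry is done by Dynamo's TTL and the blocking by the second Lambda.

```python
BAN_THRESHOLD = 10          # blocked requests before the IP goes into the NACL
TTL_SECONDS = 24 * 3600     # quiet period after which the record expires


class OffenderTable(object):
    """In-memory stand-in for the DynamoDB table (hypothetical helper)."""

    def __init__(self):
        # ip -> {"event_count": int, "last_event": expiry timestamp}
        self.items = {}

    def expire(self, now):
        """Drop records whose TTL has passed; returns the IPs to unblock."""
        expired = sorted(
            ip for ip, item in self.items.items() if item["last_event"] <= now
        )
        for ip in expired:
            del self.items[ip]
        return expired

    def record(self, ip, now):
        """Count one blocked request and push the expiry 24 hours ahead.

        Returns True once the IP has reached the ban threshold.
        """
        self.expire(now)
        item = self.items.setdefault(ip, {"event_count": 0, "last_event": 0})
        item["event_count"] += 1
        item["last_event"] = now + TTL_SECONDS
        return item["event_count"] >= BAN_THRESHOLD
```

Calling `record()` ten times for the same IP within the TTL window returns `True` on the tenth call, and a later `expire()` call returns that IP once 24 hours have passed since its last recorded request.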

OK, let’s get to it. First of all, you’re going to need a way to find the “bad” IPs. You can send your access/error logs to CloudWatch and use a Lambda to analyze each row. The webserver I was working with already had Mod_Security installed as a module for Apache. You can use the capability of Mod_Security to block the IPs, but: a) there’s a list of IPs that Mod_Security has to manage, which will be tricky if you’re running on more than one EC2 instance (how would you share this database?), and b) the block would come from the server, so the request would still generate load on your machine. Still, use Mod_Security to block any attack attempts that reach your server.

If you block the request before it even reaches your server, the attacker cannot launch a DoS attack (DDoS is still possible, I hope you understand the difference ;) ).

So, here’s what we did: Apache error logs are pushed to CloudWatch, which streams them to a Lambda function. The Lambda recognizes errors and performs various actions.

An example of a Mod_Security error log (about a request that was blocked) (modified to obscure time and addresses):

 [Mon Jan 01 01:01:01.000000 2017] [:error] [pid 1234:tid 123456789012345] [client 192.0.2.1:1234] [client 192.0.2.1] ModSecurity: Access denied with code 403 (phase 2). Operator EQ matched 0 at REQUEST_HEADERS. [file "/etc/httpd/modsecurity.d/activated_rules/modsecurity_crs_21_protocol_anomalies.conf"] [line "11"] [id "960009"] [rev "1"] [msg "Request Missing a User Agent Header"] [severity "NOTICE"] [ver "OWASP_CRS/2.2.9"] [maturity "9"] [accuracy "9"] [tag "OWASP_CRS/PROTOCOL_VIOLATION/MISSING_HEADER_UA"] [tag "WASCTC/WASC-21"] [tag "OWASP_TOP_10/A7"] [tag "PCI/6.5.10"] [hostname "192.0.2.2"] [uri "/"] [unique_id "abc"]
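To see which parts of such a line matter, here is a small standalone sketch that pulls out the client IP, the rule id and the message. The function name `parse_modsec_line` is hypothetical and the sample is a shortened copy of the line above; the patterns follow the same idea as the Lambda code, written as raw strings.

```python
import re

# Apache error-log prefix, up to two [client ...] groups, then the message.
LINE_RE = re.compile(
    r"^\[([^\]]+)\] \[([^\]]+)\] \[pid ([^\]]+):tid ([^\]]+)\] "
    r"(\[client ([^\]]+)\] )?(\[client (?P<client>[^\]]+)\] )?(?P<msg>.*)$"
)
ID_RE = re.compile(r'\[id "(?P<id>[^"\]]+)"\]')
MSG_RE = re.compile(r'\[msg "(?P<msg>[^"\]]+)"\]')

SAMPLE = (
    '[Mon Jan 01 01:01:01.000000 2017] [:error] [pid 1234:tid 123456789012345] '
    '[client 192.0.2.1:1234] [client 192.0.2.1] ModSecurity: Access denied with '
    'code 403 (phase 2). Operator EQ matched 0 at REQUEST_HEADERS. '
    '[line "11"] [id "960009"] [rev "1"] '
    '[msg "Request Missing a User Agent Header"] [severity "NOTICE"] '
    '[hostname "192.0.2.2"] [uri "/"] [unique_id "abc"]'
)


def parse_modsec_line(line):
    """Return client, rule id and message for a ModSecurity line, else None."""
    match = LINE_RE.match(line)
    if match is None or not match.group("msg").startswith("ModSecurity:"):
        return None
    body = match.group("msg")
    rule = ID_RE.search(body)
    text = MSG_RE.search(body)
    return {
        # The second [client ...] group carries the IP without the port.
        "client": match.group("client"),
        "rule_id": rule.group("id") if rule else None,
        "message": text.group("msg") if text else None,
    }


parsed = parse_modsec_line(SAMPLE)
```

For the sample line, `parsed["client"]` is the bare IP from the second `[client ...]` group, which is the value that ends up as the key in Dynamo.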

The Lambda below analyses each error. For errors that are fairly common (empty User-Agent, numeric Host header, etc.) it simply logs the attempt to Dynamo. For other errors, it also sends an event to Datadog (you can modify this to send the info to SNS or anywhere else).

 from __future__ import print_function
 
 import base64
 import gzip
 import json
 import os
 import re
 import time
 import urllib2
 
 import boto3
 
 IGNORE_IDS = ['960009', '960017'] # put any Mod_sec IDs that you don't want to be notified about
 dynamodb = boto3.client('dynamodb')
 
 
 def lambda_handler(event, context):
     based = base64.b64decode(event['awslogs']['data'])
     f = open("/tmp/temp.log", 'w')
     f.write(based)
     f.close()
     with gzip.open("/tmp/temp.log", "r") as f:
         content = json.loads(f.read())
 
     for log in content['logEvents']:
         matches = re.match(
             "^\[([^\]]+)] \[([^\]]+)] \[pid ([^\]]+)\:tid ([^\]]+)] (\[client ([^\]]+)] )?(\[client (?P<client>[^\]]+)] )?(?P<msg>.*)$",
             log['message'])
         if matches is None:
             print("Could not match string: " + log['message'])
             send_to_datadog("CloudWatch Logs: Could not match error string ", log['message'], "warning")
             continue
 
         error_message = matches.group('msg')
 
         if error_message.startswith("ModSecurity:"):
             # find modsecurity error id
             log_id = re.search("(\[id \"(?P<id>[^]]+)\"])", error_message)
             if log_id is None:
                 print("Could not match ModSecurity error id: " + error_message)
                 send_to_datadog("CloudWatch Logs: Could not match ModSecurity error id", error_message, "warning")
                 continue
 
             # record the ip of the client to dynamo
             record_to_dynamo(matches.group('client'))
 
             # whether to send a notification to datadog - only if not a common error
             if log_id.group('id') not in IGNORE_IDS:
                 # try to find the error message
                 log_msg = re.search("(\[msg \"(?P<msg>[^]]+)\"])", error_message)
                 if log_msg is None:
                     print("Could not match ModSecurity error message: " + error_message)
                     send_to_datadog("CloudWatch Logs: Could not match ModSecurity error message", error_message,
                                     "warning")
                     continue
 
                 # send error message to datadog
                 print("modsec error, send to datadog")
                 send_to_datadog("ModSecurity: " + log_msg.group('msg'), error_message)
 
             else:
                 print("Ignored known event, not send to datadog")
 
         else:
             # apache error, not from modsecurity - send to datadog as urgent error
             print("httpd error, send to datadog")
             send_to_datadog("CloudWatch Logs: httpd error", error_message)
 
 
 def record_to_dynamo(client):
     if client is not None:
         time_expires = int(time.time() + 24 * 3600)
         dynamodb.update_item(
             TableName='YOUR_DYNAMODB_TABLE',
             Key={
                 'client': {
                     'S': str(client)
                 }
             },
             ReturnValues='NONE',
             ReturnConsumedCapacity='NONE',
             ReturnItemCollectionMetrics='NONE',
             UpdateExpression='ADD event_count :c SET last_event = :last',
             ExpressionAttributeValues={
                 ':c': {
                     'N': '1'
                 },
                 ':last': {
                     'N': str(time_expires)
                 }
             }
         )
         print("Inserted to Dynamo: " + str(client))
 
 
 def send_to_datadog(title, text, type="error"):
     data = {
         "title": title,
         "text": text,
         "priority": "normal",
         "alert_type": type
 
     }
     url = "https://app.datadoghq.com/api/v1/events?api_key=" + os.environ['api_key']
     request = urllib2.Request(
         url,
         headers={'Content-type': 'application/json'},
         data=json.dumps(data),
 
     )
     request.get_method = lambda: 'POST'
     urllib2.urlopen(request)

Set the environment variable api_key for the Lambda to your Datadog API key.
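If you want to try the handler locally before wiring anything up, you need an event in the shape CloudWatch Logs sends: a base64-encoded, gzip-compressed JSON document under `awslogs.data`. The sketch below builds such an event and decodes it again in memory. Both function names and the stream name are hypothetical, and the decoder is an alternative to the temporary file in /tmp, not what the Lambda above does.

```python
import base64
import gzip
import io
import json


def make_awslogs_event(messages):
    """Build a fake CloudWatch Logs subscription event for local testing."""
    payload = {
        "messageType": "DATA_MESSAGE",
        "logGroup": "LOG_GROUP",        # placeholder, as in the CLI commands
        "logStream": "example-stream",  # hypothetical name
        "logEvents": [
            {"id": str(index), "timestamp": 0, "message": message}
            for index, message in enumerate(messages)
        ],
    }
    buffer = io.BytesIO()
    with gzip.GzipFile(fileobj=buffer, mode="wb") as gz:
        gz.write(json.dumps(payload).encode("utf-8"))
    data = base64.b64encode(buffer.getvalue()).decode("ascii")
    return {"awslogs": {"data": data}}


def decode_awslogs_event(event):
    """Decode the payload in memory, without a temporary file."""
    compressed = base64.b64decode(event["awslogs"]["data"])
    with gzip.GzipFile(fileobj=io.BytesIO(compressed)) as gz:
        return json.loads(gz.read().decode("utf-8"))
```

Passing the result of `make_awslogs_event([...])` to `lambda_handler` (with `None` as the context) exercises the parsing path, although the Dynamo and Datadog calls still need credentials or stubs.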

Once you have your logs in CloudWatch and the Lambda above, you can create a subscription for CloudWatch to Lambda. You can do it from the AWS Console (choose a log stream and “Stream to AWS Lambda”, use empty log format to stream the data as it is), or from CLI:

Give CloudWatch Logs permission to invoke Lambda:

aws lambda add-permission --function-name YOUR_LAMBDA_NAME --statement-id logs-httpd --principal "logs.REGION.amazonaws.com" --action "lambda:InvokeFunction" --source-arn "arn:aws:logs:REGION:ACCOUNT_ID:log-group:LOG_GROUP:*" --source-account ACCOUNT_ID

Create a subscription for logs to Lambda:

aws logs put-subscription-filter --log-group-name "LOG_GROUP" --filter-name LambdaStream_logs_http --filter-pattern "" --destination-arn "arn:aws:lambda:REGION:ACCOUNT_ID:function:YOUR_LAMBDA_NAME"

You’ll also need:

  1. DynamoDB table (put its name in YOUR_DYNAMODB_TABLE in the Lambda code) with client primary key (String).
  2. Role for Lambda to allow it to write to logs (usual role for Lambda) and perform dynamodb:UpdateItem on the Dynamo table.

Now, this Lambda will only log the IPs to Dynamo and send various notifications to Datadog. That’s the IDS part of it.

Here’s where the “magic” happens:

  1. Set the last_event key as TTL on the DynamoDB table.
  2. Create another Lambda (code to follow) and create a DynamoDB Stream (you can do it under Triggers in AWS Console for Dynamo) to stream each event from Dynamo to that new Lambda.

This will trigger your new Lambda whenever an action is performed against your DynamoDB table (an item is added, modified or removed). last_event is actually the time of the last event + 24 hours. It is set as the TTL attribute for the table and is extended every time this IP makes another naughty request. If the TTL expires, Dynamo will remove the record.
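If you prefer to script the table setup instead of clicking through the Console, something along these lines should work with boto3. Treat it as a sketch under assumptions: the on-demand billing mode is my choice (provisioned capacity works too), and the helper names are made up. The stream view type is set to include both images because the second Lambda reads NewImage on MODIFY and OldImage on REMOVE.

```python
TABLE_NAME = "YOUR_DYNAMODB_TABLE"


def table_definition(table_name=TABLE_NAME):
    """Arguments for dynamodb.create_table: 'client' string key plus a stream."""
    return {
        "TableName": table_name,
        "AttributeDefinitions": [
            {"AttributeName": "client", "AttributeType": "S"}
        ],
        "KeySchema": [{"AttributeName": "client", "KeyType": "HASH"}],
        "BillingMode": "PAY_PER_REQUEST",  # assumption, not from the original setup
        "StreamSpecification": {
            "StreamEnabled": True,
            "StreamViewType": "NEW_AND_OLD_IMAGES",
        },
    }


def ttl_definition(table_name=TABLE_NAME):
    """Arguments for dynamodb.update_time_to_live: expire items on last_event."""
    return {
        "TableName": table_name,
        "TimeToLiveSpecification": {
            "Enabled": True,
            "AttributeName": "last_event",
        },
    }


def apply(region_name):
    """Create the table and enable TTL. Needs AWS credentials; not run on import."""
    import boto3  # imported lazily so the definitions can be inspected offline

    dynamodb = boto3.client("dynamodb", region_name=region_name)
    dynamodb.create_table(**table_definition())
    dynamodb.get_waiter("table_exists").wait(TableName=TABLE_NAME)
    dynamodb.update_time_to_live(**ttl_definition())
```

Connecting the stream to the Lambda (the trigger itself) is still a separate step, done either under Triggers in the Console or with an event source mapping.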

Here’s the second Lambda:

import boto3

NACL_ID = 'acl-abc123'
MAX_RULE = 100 # assuming you have an ALLOW rule for the internet, this should be its number
client = boto3.client('ec2')


def lambda_handler(event, context):
  next_rule = 0
  for event in event['Records']:
      # block IP after 10 bad requests
      if event['eventName'] == 'MODIFY' and int(event['dynamodb']['NewImage']['event_count']['N']) >= 10:
          nacls = client.describe_network_acls(
              NetworkAclIds=[NACL_ID]
          )
          if len(nacls['NetworkAcls']) == 0:
              raise Exception("No NACLs found!")

          # find next available rule number
          for entry in nacls['NetworkAcls'][0]['Entries']:
              if entry['Egress'] == False and entry['RuleAction'] == 'deny':
                  if entry['RuleNumber'] >= MAX_RULE:
                      continue

                  if entry['RuleNumber'] > next_rule and entry['RuleNumber'] < MAX_RULE:
                      next_rule = entry['RuleNumber']

          next_rule += 1
          print("Blocking " + event['dynamodb']['NewImage']['client']['S'] + "/32 with rule " + str(next_rule))
          res = client.create_network_acl_entry(
              NetworkAclId=NACL_ID,
              RuleNumber=next_rule,
              Protocol="-1",
              RuleAction="deny",
              Egress=False,
              CidrBlock=event['dynamodb']['NewImage']['client']['S'] + "/32"
          )
      # remove IP from NACL when item is removed from Dynamo
      elif event['eventName'] == 'REMOVE':
          client_ip = event['dynamodb']['OldImage']['client']['S'] + "/32"
          print("Checking for removal: " + client_ip)
          nacls = client.describe_network_acls(
              NetworkAclIds=[NACL_ID]
          )
          if len(nacls['NetworkAcls']) == 0:
              raise Exception("No NACLs found!")

          for entry in nacls['NetworkAcls'][0]['Entries']:
              if 'CidrBlock' in entry and entry['Egress'] == False and entry['RuleAction'] == 'deny' \
                      and entry['CidrBlock'] == client_ip:
                  print("Removing rule " + str(entry['RuleNumber']))
                  response = client.delete_network_acl_entry(
                      NetworkAclId=NACL_ID,
                      RuleNumber=entry['RuleNumber'],
                      Egress=False
                  )

This, as we said, will be triggered every time there’s a change in the Dynamo table. What this code does:

  1. If the row is modified (it should only ever happen from the first Lambda) and event_count is at least 10, the IP address will be added to NACL (defined at the top of the code).
  2. If the row is deleted (either manually or by Dynamo TTL), the Lambda will make sure the NACL entry for this IP is removed (if it existed).

Tip from #2 above: you can un-block an IP by simply deleting it from Dynamo.
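The two cases above boil down to a small decision per stream record. Here is that decision pulled out into a pure function that can be tested without touching EC2. `classify_record` is a hypothetical helper, not part of the Lambda above, and the sample records only contain the fields the Lambda actually reads.

```python
BAN_THRESHOLD = 10  # same threshold as in the second Lambda


def classify_record(record):
    """Return (action, cidr) where action is "block", "unblock" or "ignore"."""
    name = record["eventName"]
    images = record["dynamodb"]
    if name == "MODIFY":
        item = images["NewImage"]
        if int(item["event_count"]["N"]) >= BAN_THRESHOLD:
            return "block", item["client"]["S"] + "/32"
    elif name == "REMOVE":
        # Fired for manual deletes and for TTL expiry alike.
        return "unblock", images["OldImage"]["client"]["S"] + "/32"
    return "ignore", None


def make_record(event_name, client, event_count):
    """Build a minimal stream record for experiments (hypothetical helper)."""
    image = {
        "client": {"S": client},
        "event_count": {"N": str(event_count)},
    }
    key = "OldImage" if event_name == "REMOVE" else "NewImage"
    return {"eventName": event_name, "dynamodb": {key: image}}
```

A record built with `make_record("MODIFY", "192.0.2.1", 10)` is classified as a block for `192.0.2.1/32`, while the same IP with a count of 9, or a fresh INSERT, is ignored.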

This Lambda requires the following permissions on its role:

 dynamodb:DescribeStream
 dynamodb:GetRecords
 dynamodb:GetShardIterator
 dynamodb:ListStreams
 ec2:DescribeNetworkAcls
 ec2:CreateNetworkAclEntry
 ec2:DeleteNetworkAclEntry

Note: those EC2 permissions cannot be restricted to specific NACLs (sic!), so you need to set the resource to “*”.
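Put together as a policy document, the permissions above could look roughly like this. It is a sketch: `stream_lambda_policy` is a made-up helper, scoping the DynamoDB actions to the table's stream ARN is my assumption (use “*” there too if in doubt), and the usual CloudWatch Logs permissions for Lambda are left out.

```python
import json


def stream_lambda_policy(stream_arn):
    """Build an IAM policy document for the second Lambda as a dict."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "dynamodb:DescribeStream",
                    "dynamodb:GetRecords",
                    "dynamodb:GetShardIterator",
                    "dynamodb:ListStreams",
                ],
                "Resource": stream_arn,
            },
            {
                "Effect": "Allow",
                "Action": [
                    "ec2:DescribeNetworkAcls",
                    "ec2:CreateNetworkAclEntry",
                    "ec2:DeleteNetworkAclEntry",
                ],
                # As noted above, these are not restricted to a specific NACL.
                "Resource": "*",
            },
        ],
    }


policy_json = json.dumps(
    stream_lambda_policy(
        "arn:aws:dynamodb:REGION:ACCOUNT_ID:table/YOUR_DYNAMODB_TABLE/stream/*"
    ),
    indent=2,
)
```

`policy_json` can then be pasted into the role's inline policy once the REGION, ACCOUNT_ID and table name placeholders are filled in.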

Possible improvements that you can make:

  1. Extend the ban by performing an update on dynamo in the second Lambda - if you want to block the IP for 48, 72, … hours (this will re-trigger second Lambda and be detected as MODIFY event, so modify the code accordingly).
  2. You can obviously tweak the numbers - 24 hours and 10 attempts can be changed to whatever you want.
  3. Use WAF update-ip-set instead of NACLs for larger sets of IPs.
  4. Items with TTL in DynamoDB are not guaranteed to be deleted at the exact time TTL expires. In this use-case this is not a problem (the IP will simply be blocked for longer), but take it into account if you’re using the TTL for other things.
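One more possible tweak, in the same spirit as the list above: the second Lambda always takes the highest existing deny rule number plus one, without checking that the result stays below MAX_RULE. A variation is to pick the lowest free number instead and give up when there is none. The sketch below assumes entries shaped like those returned by describe_network_acls; `next_free_rule_number` is a hypothetical helper.

```python
MAX_RULE = 100  # number of the ALLOW rule; deny rules must come before it


def next_free_rule_number(entries, max_rule=MAX_RULE):
    """Return the lowest unused ingress rule number below max_rule, or None."""
    used = set(
        entry["RuleNumber"] for entry in entries if not entry["Egress"]
    )
    for number in range(1, max_rule):
        if number not in used:
            return number
    return None  # no room left before the ALLOW rule


example_entries = [
    {"RuleNumber": 1, "Egress": False, "RuleAction": "deny"},
    {"RuleNumber": 3, "Egress": False, "RuleAction": "deny"},
    {"RuleNumber": 100, "Egress": False, "RuleAction": "allow"},
    {"RuleNumber": 100, "Egress": True, "RuleAction": "allow"},
]
free_number = next_free_rule_number(example_entries)
```

With this, a slot freed by an expired ban is reused, and a `None` result is the signal to alert instead of creating a deny rule that would land after the ALLOW rule.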