Elasticsearch Notes

how to secure

Tough; put an app proxy in front of it to be sure. For now: local access only. Elasticsearch was not designed with security in mind.

added to the end of /etc/elasticsearch/elasticsearch.yml:

script.disable_dynamic: true


quick stuff

The HEAD plugin is no longer used; instead use Kibana, which is its own service.

elasticsearch-head, an Elasticsearch plugin ( https://github.com/mobz/elasticsearch-head )

_search?search_type=count

{
 "aggs" : {
  "all_users": {
   "terms": {
    "field": "screen_name"
   }
  }
 }
}
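search_type=count was removed in later Elasticsearch versions; "size": 0 gives the same effect. A rough modern equivalent of the count query above (reusing the twitterindex_v2 index from these notes; on newer mappings the field may need to be screen_name.keyword):

GET /twitterindex_v2/_search
{
  "size": 0,
  "aggs": {
    "all_users": {
      "terms": { "field": "screen_name" }
    }
  }
}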

list indexes and summary:

curl 'localhost:9200/_cat/indices?v'

show health

curl 'localhost:9200/_cat/health?v'

list nodes:

curl 'localhost:9200/_cat/nodes?v'

delete an index

curl -XDELETE 'http://localhost:9200/twitterindex_v2/'

create an index with mappings from a file:

curl -XPUT localhost:9200/twitterindex_v2 -T 'mapping.1'

get the mappings for an index:

curl -XGET "http://localhost:9200/test-index/_mapping" | jsonlint > mapping

More advanced delete

Examine the index:

GET _cat/indices/myindex-2021.08.10

Open the index (to make sure it is not stuck in a closed state):

POST /myindex-2021.08.10/_open

Block writes:

PUT /myindex-2021.08.10/_settings
{"settings": {"index.blocks.write": "true"}}

Clone the index to a copy:

POST /myindex-2021.08.10/_clone/myindex-2021.08.10-copy
{"settings": {"index.blocks.write": null}}

Wait for the copy to go green:

GET /_cluster/health/myindex-2021.08.10-copy?wait_for_status=green&timeout=180s

Remove the old index:

DELETE /myindex-2021.08.10

pattern of data import

  1. import data
  2. dump mapping
  3. edit mapping
  4. create new index with new mapping
  5. import data again.


Explicitly mapping date fields

from: http://joelabrahamsson.com/dynamic-mappings-and-dates-in-elasticsearch/

curl -XPUT "http://localhost:9200/myindex" -d'
{
   "mappings": {
      "tweet": {
         "date_detection": false,
         "properties": {
             "postDate": {
                 "type": "date"
             }
         }
      }
   }
}'
curl -XPUT 'https://search-myiotcatcher-eq4tipuq24ltctdtgz5hydwvb4.us-east-1.es.amazonaws.com/iotworld_v4' -H 'Content-Type: application/json' -d'
{
    "container" : {
    "_timestamp" : {"enabled": true, "type":"date", "format": "epoch_second", "store":true, "path" : "timestamp"}
    },
  "mappings": {
    "sensordata": {
        "properties": {
            "temperature": {
                "type": "float"
            },
            "humidity": {
                "type": "float"
            },
            "timestamp": {
                "type": "date"
            }
        }
    }
  }
}
'


curl -XGET 'https://search-myiotcatcher-eq4tipuq24ltctdtgz5hydwvb4.us-east-1.es.amazonaws.com/iotworld_v4/_mapping' | python -m json.tool


curl -XGET 'https://search-myiotcatcher-eq4tipuq24ltctdtgz5hydwvb4.us-east-1.es.amazonaws.com/iotworld_v4/sensordata/_search' | python -m json.tool

Changing mappings

so you don't like the data mapping and you want to change it:

first dump the existing mapping to a file:

curl -XGET 'http://localhost:9200/fitstat_v1/_mapping' | python -m json.tool > fitstat_v1_mapping

then copy that mapping to the new version:

cp fitstat_v1_mapping fitstat_v2_mapping

edit the new mapping, for example adding "type": "nested" to your nested objects.

then create a new index specifying the new mapping:

curl -XPUT 'http://localhost:9200/fitstat_v2' -d @fitstat_v2_mapping

next: extracting from the old index, putting into the new one, and nuking the old one.

... FIXME
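On anything reasonably recent the extract/load step can be done server-side with the _reindex API; a sketch, not what these notes originally used:

POST /_reindex
{
  "source": { "index": "fitstat_v1" },
  "dest":   { "index": "fitstat_v2" }
}

Then check the document count in fitstat_v2 before deleting fitstat_v1.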

backup

from: https://www.elastic.co/guide/en/elasticsearch/guide/current/backing-up-your-cluster.html

add to the end of /etc/elasticsearch/elasticsearch.yml :

path.repo: ["/mnt/freenas/dataset_elasticsearch/backup"]
root@keres /mnt/freenas/dataset_elasticsearch/backup # curl -XPUT "http://localhost:9200/_snapshot/freenas_backup" -d'
{
    "type": "fs",
    "settings": {
        "location": "/mnt/freenas/dataset_elasticsearch/backup"
    }
}'
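Once the repository is registered, taking and restoring a snapshot should look roughly like this (a sketch; snapshot_1 is just an example name):

PUT /_snapshot/freenas_backup/snapshot_1?wait_for_completion=true

GET /_snapshot/freenas_backup/snapshot_1

POST /_snapshot/freenas_backup/snapshot_1/_restore

Restore refuses to overwrite indices that are still open, so close or delete them first.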


https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html

Example Filters

Kibana lets you make simple filters, but you probably want more.

between X and Y

{
    "query": {
        "range" : {
            "price" : {
                "gte" : 1000,
                "lte" : 2000
            }
        }
    }
}

non-zero entries:

{
  "query": {
    "range": {
      "timeTakenInSeconds": {
        "gt": 0
      }
    }
  }
}

Includes any of these (Iranian Airports):

{
  "query": {
    "bool": {
      "should": [
{"match_phrase": {"SearchCriteria.FromCode": "ABD"}},
{"match_phrase": {"SearchCriteria.FromCode": "ACP"}},
...
{"match_phrase": {"SearchCriteria.FromCode": "ZAH"}},
{"match_phrase": {"SearchCriteria.FromCode": "ZBR"}}
      ],
      "minimum_should_match": 1
    }
  }
}
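If SearchCriteria.FromCode is a keyword field (or has a .keyword sub-field), a terms query is a more compact way to say "any of these"; a sketch, not from the original notes:

{
  "query": {
    "terms": {
      "SearchCriteria.FromCode": ["ABD", "ACP", "ZAH", "ZBR"]
    }
  }
}

Unlike match_phrase, terms does not analyze its values, so on an analyzed text field point it at the .keyword sub-field instead.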

Example Queries

search(), in Python:

res   = es.search(index=theindex, size=mysize, query={
    "bool" : { 
        "must" : [ 
        { "exists": { "field": "my.field.name"}},
        { "range" : { "@timestamp" : { "gte" : "2022-04-01T00:00:00"}}},
        { "range" : { "@timestamp" : { "lt"  : "2022-04-30T00:00:00"}}}
        ]   
   }   
 })

alternate:

res   = es.search(index=theindex, size=mysize, query={
    "bool" : { 
        "must" : [ 
        { "exists": { "field": "my.field.name"}},
        { "range" : { "@timestamp" : { 
                                     "gte" : "2022-04-01T00:00:00",
                                     "lt"  : "2022-04-30T00:00:00"
                                     }   
                    }   
        }   
        ]   
   }   
 })

scan, for when there are many hits to scroll over:

if client != None:
    logging.info("Client is not none")

    query = {"query": { 
          "bool" : { 
            "must" : [ 
              { "exists": { "field": "From"}},
              { "exists": { "field": "To"}},
              { "range" : { "@timestamp" : { 
                                            "gte" : gte,
                                            "lt"  : lt
                                            }   
                          }   
              }   
            ]   
          }   
        }
    }

    # call the helpers library's scan() method to scroll
    resp = helpers.scan(
        client,
        query=query,
        # query={"match_all": {}},
        index = 'log-*',
        size = mysize
    )   

    for hit in resp:
      # print("{}".format(hit['_id']))
      print("{} {}".format(hit['_source']['From'], hit['_source']['To']))

example searches

{
  "query": { "match_all": {} }
}


{
  "query": { "match": { "filter_level": "low" } }
}
{
  "query": { "match": { "source": "iPad" } },
   "_source": [ "source" , "text"]
}
{
  "size": 0,
  "aggs": {
    "group_by_state": {
      "terms": {
        "field": "source"
      }
    }
  }
}

"size": 0, - print agg only and not hits. PERFORMANCE!!

{
  "fields": [],
  "sort": [
    {
      "zkb.totalValue": {
        "order": "asc"
      }
    },
    "_score"
  ],
  "query": {
    "range": {
      "zkb.totalValue": {
        "lt": 200000000
      }
    }
  }
}
{
  "fields" : [
    "victim.shipTypeID",
    "victim.corporationName",
    "victim.characterID",
    "victim.characterName"
  ],
  "sort" : [
    { "zkb.totalValue" : { "order" : "asc" } },
    "_score"
  ],
  "query": {
    "range": {
      "zkb.totalValue": {
        "lt": 200000000
      }
    }
  }
}

using a script to build an aggregate, in this case on the dynamically computed length of a field (runtime_mappings needs Elasticsearch 7.11 or newer):

GET /my-index-000001/_search?size=0
{
  "runtime_mappings": {
    "message.length": {
      "type": "long",
      "script": "emit(doc['message.keyword'].value.length())"
    }
  },
  "aggs": {
    "message_length": {
      "histogram": {
        "interval": 10,
        "field": "message.length"
      }
    }
  }
}

changing mapping with zero downtime

https://www.elastic.co/blog/changing-mapping-with-zero-downtime

aggregates

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-filters-aggregation.html

https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html

what are the possible values in this field?

GET /myindex-pattern-*/_search
{
  "aggs": {
    "keys": {
      "terms": {
        "field": "my.field"
      }
    }
  },
  "size": 0
}
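The terms aggregation only returns the top 10 buckets by default; bump size if the field has more distinct values (100 here is an arbitrary example):

GET /myindex-pattern-*/_search
{
  "aggs": {
    "keys": {
      "terms": {
        "field": "my.field",
        "size": 100
      }
    }
  },
  "size": 0
}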

Wildcard queries in field name

ref: https://stackoverflow.com/questions/31433032/wildcard-queries-in-field-name

{
  "query": {
    "query_string": {
      "fields": [
        "fieldname*"
      ],
      "query": "value"
    }
  }
}

moving data between indexes

Use ElasticDump ( https://www.npmjs.com/package/elasticdump )

1) yum install epel-release
2) yum install nodejs
3) yum install nodejs npm
4) npm install elasticdump
5) cd node_modules/elasticdump/bin
6)

./elasticdump \
  --input=http://192.168.1.1:9200/original \
  --output=http://192.168.1.2:9200/newCopy \
  --type=data

elasticdump \
  --input=http://localhost:9700/.kibana \
  --output=http://localhost:9700/.kibana_read_only \
  --type=mapping

elasticdump \
  --input=http://localhost:9700/.kibana \
  --output=http://localhost:9700/.kibana_read_only \
  --type=data

Dumping to a file

In this example I dump my AWS Elasticsearch cluster to a file.

It's one index with 20k records, not huge.

time /home/david/node_modules/.bin/elasticdump \
  --input=https://search-myiotcatcher-eq4tipuq24ltctdtgz5hydwvb4.us-east-1.es.amazonaws.com/iotworld_v5 \
  --output=/mnt/freenas/dataset_elasticsearch/iotworld_v5/iotworld_v5_mapping.json \
  --type=mapping
time /home/david/node_modules/.bin/elasticdump \
  --input=https://search-myiotcatcher-eq4tipuq24ltctdtgz5hydwvb4.us-east-1.es.amazonaws.com/iotworld_v5 \
  --output=/mnt/freenas/dataset_elasticsearch/iotworld_v5/iotworld_v5.json \
  --type=data
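Loading the dump back into a cluster should just be the same flags reversed, mapping first and then data (a sketch, not verified against the original run):

time /home/david/node_modules/.bin/elasticdump \
  --input=/mnt/freenas/dataset_elasticsearch/iotworld_v5/iotworld_v5_mapping.json \
  --output=https://search-myiotcatcher-eq4tipuq24ltctdtgz5hydwvb4.us-east-1.es.amazonaws.com/iotworld_v5 \
  --type=mapping
time /home/david/node_modules/.bin/elasticdump \
  --input=/mnt/freenas/dataset_elasticsearch/iotworld_v5/iotworld_v5.json \
  --output=https://search-myiotcatcher-eq4tipuq24ltctdtgz5hydwvb4.us-east-1.es.amazonaws.com/iotworld_v5 \
  --type=data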

Disk full -> readonly lock

If the disk fills up, the indexes will go into "read-only" mode.

reset it like this:

curl -X PUT http://${HOST}:9200/.kibana/_settings -d '
{
  "index": {
    "blocks": {
      "read_only_allow_delete": "false"
    }
  }
}' -H'Content-Type: application/json'

and if it worked you will get back:

{"acknowledged":true}

Trim data

#!/bin/sh

export HOST=es.staging.thecarrotlab.com
for i in `curl -s -XGET "http://es.staging.thecarrotlab.com:9200/_cat/indices?v" | grep logsta | sort -k 3 -n -r | awk '{print $3}' | tail -n +32`
do
echo $i

curl -XDELETE "http://es.staging.thecarrotlab.com:9200/$i"
echo

done

clean up logstash indexes older than 32 days (dry run: the delete is only echoed; drop the echo to actually run it)

#!/bin/sh
export HOST=servername
for i in `curl -s -XGET "http://$HOST:9200/_cat/indices?v" | grep logsta | sort -k 3 -n -r | awk '{print $3}' | tail -n +32`
do
echo $i
echo curl -XDELETE "http://$HOST:9200/$i"
done

serverconfig notes

stuff I've added to my default config:

# for backups
path.repo: ["/mnt/freenas/dataset_elasticsearch/backup"]
# to disallow remote code execution
script.disable_dynamic: true


/etc/sysconfig/elasticsearch ( grep -v ^# )

DATA_DIR=/data/elasticsearch/data
LOG_DIR=/data/elasticsearch/log
WORK_DIR=/data/elasticsearch/tmp
ES_HEAP_SIZE=2g
ES_GC_LOG_FILE=/data/elasticsearch/log/gc.log

Performance

https://www.elastic.co/blog/advanced-tuning-finding-and-fixing-slow-elasticsearch-queries

GET /_cat/thread_pool/search?v&h=node_name,name,active,rejected,completed
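Two more endpoints that are handy when hunting a slow cluster (the columns are standard _cat/nodes headers; pick whichever you need):

GET /_nodes/hot_threads

GET /_cat/nodes?v&h=name,heap.percent,cpu,load_1m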

Network

/etc/services updated:

$ grep 9200 /etc/services
elasticsearch-rest 9200/tcp             # elasticsearch-restful api
#wap-wsp         9200/tcp                # WAP connectionless session service
wap-wsp         9200/udp                # WAP connectionless session service
$ grep 9300 /etc/services
elasticsearch-transport 9300/tcp        # elasticsearch-transport
# vrace           9300/tcp                # Virtual Racing Service
vrace           9300/udp                # Virtual Racing Service

python es

sudo pip install elasticsearch
sudo pip install certifi

last 5 minute period

from datetime import datetime

# assumes an es client and mysize are already defined
now = datetime.now()
now_epoch = now.timestamp()
mymod = now_epoch % 300 

nearest5min = now_epoch - mymod
prior5min = nearest5min - 300 

nearest5min_date = datetime.fromtimestamp( nearest5min )
prior5min_date   = datetime.fromtimestamp( prior5min )

nearest5minstr = nearest5min_date.strftime("%Y-%m-%dT%H:%M:%S")
prior5minstr   = prior5min_date.strftime("%Y-%m-%dT%H:%M:%S")

res   = es.search(index="filebeat-*", size=mysize, query={
    "bool" : { 
        "must" : [ 
        { "match":  { "metrics_of_note" : "0"}},
        { "range" : { "@timestamp" : { 
                                     "gte" : prior5minstr,
                                     "lt"  : nearest5min_date,
                                     }   
                    }   
        }   
        ],  
   }   
 })

last 24h:
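One way to express this with Elasticsearch date math (a sketch):

GET /filebeat-*/_search
{
  "query": { "range": { "@timestamp": { "gte": "now-24h" } } }
}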

yesterday:
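And rounded to whole calendar days (again a date-math sketch):

GET /filebeat-*/_search
{
  "query": { "range": { "@timestamp": { "gte": "now-1d/d", "lt": "now/d" } } }
}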

script collection of aggregates

from elasticsearch import Elasticsearch, helpers, exceptions
from elasticsearch import logger as elasticsearch_logger
import pprint
import json
import logging
from datetime import datetime

# logging.info(Elasticsearch.__version__)

pp = pprint.PrettyPrinter(indent=4,compact=True)
elasticsearch_logger.setLevel(logging.DEBUG)
client = Elasticsearch( "XXX" )
mysize = 5 
bucketsizeminutes = 1

try:
    client_info = client.info()
    # pp.pprint(client_info)

except exceptions.ConnectionError as err:
    logging.error('Elasticsearch client error: {}'.format(err))
    client = None

# "gte" : "2022-06-27T23:38:00",
# "lt"  : "2022-06-27T23:52:00"

now = datetime.now()
now_epoch = now.timestamp()
mymod = now_epoch % 300

nearest5min = now_epoch - mymod
prior5min = nearest5min - 300

#print("Now is           {}".format(now))
#print("Nearest 5 min is {}".format(datetime.fromtimestamp( nearest5min ) ))
#print("Priot   5 min is {}".format(datetime.fromtimestamp( prior5min ) ))

gte = datetime.fromtimestamp( prior5min )
lt  = datetime.fromtimestamp( nearest5min )

summary = {}
froms = {}
tos = {}

print ("gte {} lt {}".format(gte,lt))

if client != None:
    logging.info("Client is not none")
    mybody = {
        "aggs": {
            "timeouts_over_time": {
                "date_histogram": {
                    "field": "@timestamp",
                    "calendar_interval": "1m"
                }
            }
        },
        "query": {
            "bool": {
                "must": [
                    { "range":        { "@timestamp" : { "gte" : gte, "lt" : lt } } },
                    { "match_phrase": { "Exception.FullName": "System.TimeoutException"} }
                ]
            }
        }
    }

    # use helpers.scan() when you want the search results (hits); aggregations need search().
    # resp = helpers.scan(
    resp = client.search(
        body=mybody,
        # query={"match_all": {}},
        index = 'efoe-logs-*',
        # size = 0
    )

    for agg in resp['aggregations']:
        for bucket in resp['aggregations'][agg]['buckets']:
            # pp.pprint(bucket)
            print("{} {} {}".format(bucket['key'],bucket['key_as_string'],bucket['doc_count']))

to read

https://www.elastic.co/blog/elasticsearch-storage-the-true-story

Also See

Logstash Notes
Kibana Notes