Elasticsearch Notes: Difference between revisions
(10 intermediate revisions by the same user not shown) | |||
Line 181: | Line 181: | ||
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html | https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html | ||
== | == Example Filters == | ||
kibana can let you make simple filters , but you probably want more. | kibana can let you make simple filters , but you probably want more. | ||
Line 234: | Line 234: | ||
search(): | |||
python | |||
<pre> | <pre> | ||
res = es.search(index=theindex, size=mysize, query={ | |||
"bool" : { | |||
"must" : [ | |||
{ "exists": { "field": "my.field.name"}}, | |||
{ "range" : { "@timestamp" : { "gte" : "2022-04-01T00:00:00"}}}, | |||
{ "range" : { "@timestamp" : { "lt" : "2022-04-30T00:00:00"}}} | |||
] | |||
} | |||
}) | |||
</pre> | |||
alternate: | |||
<pre> | |||
res = es.search(index=theindex, size=mysize, query={ | |||
"bool" : { | |||
"must" : [ | |||
{ "exists": { "field": "my.field.name"}}, | |||
{ "range" : { "@timestamp" : { | |||
"gte" : "2022-04-01T00:00:00", | |||
"lt" : "2022-04-30T00:00:00" | |||
} | |||
} | |||
} | |||
] | |||
} | |||
}) | |||
</pre> | |||
scan , for when there are many hits to scroll over. | |||
<pre> | |||
if client != None: | |||
logging.info("Client is not none") | |||
query = {"query": { | |||
"bool" : { | |||
"must" : [ | |||
{ "exists": { "field": "From"}}, | |||
{ "exists": { "field": "To"}}, | |||
{ "range" : { "@timestamp" : { | |||
"gte" : gte, | |||
"lt" : lt | |||
} | |||
} | |||
} | |||
] | |||
} | |||
} | |||
} | |||
# call the helpers library's scan() method to scroll | |||
resp = helpers.scan( | |||
client, | |||
query=query, | |||
# query={"match_all": {}}, | |||
index = 'log-*', | |||
size = mysize | |||
) | |||
for hit in resp: | |||
# print("{}".format(hit['_id'])) | |||
print("{} {}".format(hit['_source']['From'], hit['_source']['To'])) | |||
</pre> | </pre> | ||
Line 310: | Line 374: | ||
"zkb.totalValue": { | "zkb.totalValue": { | ||
"lt": 200000000 | "lt": 200000000 | ||
} | |||
} | |||
} | |||
} | |||
</pre> | |||
using a script to make an aggregate, in this case dynamic length of field: | |||
<pre> | |||
GET /my-index-000001/_search?size=0 | |||
{ | |||
"runtime_mappings": { | |||
"message.length": { | |||
"type": "long", | |||
"script": "emit(doc['message.keyword'].value.length())" | |||
} | |||
}, | |||
"aggs": { | |||
"message_length": { | |||
"histogram": { | |||
"interval": 10, | |||
"field": "message.length" | |||
} | } | ||
} | } | ||
Line 341: | Line 427: | ||
} | } | ||
</pre> | |||
== Wildcard queries in field name == | |||
ref: https://stackoverflow.com/questions/31433032/wildcard-queries-in-field-name | |||
<pre> | |||
{ | |||
"query": { | |||
"query_string": { | |||
"fields": [ | |||
"fieldname*" | |||
], | |||
"query": "value" | |||
} | |||
} | |||
} | |||
</pre> | </pre> | ||
Line 465: | Line 568: | ||
ES_GC_LOG_FILE=/data/elasticsearch/log/gc.log | ES_GC_LOG_FILE=/data/elasticsearch/log/gc.log | ||
</pre> | </pre> | ||
== Performance == | |||
https://www.elastic.co/blog/advanced-tuning-finding-and-fixing-slow-elasticsearch-queries | |||
GET /_cat/thread_pool/search?v&h=node_name,name,active,rejected,completed | |||
== Network == | == Network == | ||
Line 485: | Line 594: | ||
sudo pip install elasticsearch | sudo pip install elasticsearch | ||
sudo pip install certifi | sudo pip install certifi | ||
=== last 5 minute period === | |||
<pre> | |||
now = datetime.now() | |||
now_epoch = now.timestamp() | |||
mymod = now_epoch % 300 | |||
nearest5min = now_epoch - mymod | |||
prior5min = nearest5min - 300 | |||
nearest5min_date = datetime.fromtimestamp( nearest5min ) | |||
prior5min_date = datetime.fromtimestamp( prior5min ) | |||
nearest5minstr = nearest5min_date.strftime("%Y-%m-%dT%H:%M:%S") | |||
prior5minstr = prior5min_date.strftime("%Y-%m-%dT%H:%M:%S") | |||
res = es.search(index="filebeat-*", size=mysize, query={ | |||
"bool" : { | |||
"must" : [ | |||
{ "match": { "metrics_of_note" : "0"}}, | |||
{ "range" : { "@timestamp" : { | |||
"gte" : prior5minstr, | |||
"lt" : nearest5min_date, | |||
} | |||
} | |||
} | |||
], | |||
} | |||
}) | |||
</pre> | |||
last 24h: | |||
yesterday: | |||
=== script collection of aggregats === | |||
<pre> | |||
from elasticsearch import Elasticsearch, helpers, exceptions | |||
from elasticsearch import logger as elasticsearch_logger | |||
import pprint | |||
import json | |||
import logging | |||
from datetime import datetime | |||
# logging.info(Elasticsearch.__version__) | |||
pp = pprint.PrettyPrinter(indent=4,compact=True) | |||
elasticsearch_logger.setLevel(logging.DEBUG) | |||
client = Elasticsearch( "XXX" ) | |||
mysize = 5 | |||
bucketsizeminutes = 1 | |||
try: | |||
client_info = Elasticsearch.info(client) | |||
# pp.pprint(client_info) | |||
except exceptions.ConnectionError as err: | |||
logging.error(pp.pformat(client_info)) | |||
logging.error('Elasticsearch client error: {}'.format(err)) | |||
client = None | |||
# "gte" : "2022-06-27T23:38:00", | |||
# "lt" : "2022-06-27T23:52:00" | |||
now = datetime.now() | |||
now_epoch = now.timestamp() | |||
mymod = now_epoch % 300 | |||
nearest5min = now_epoch - mymod | |||
prior5min = nearest5min - 300 | |||
#print("Now is {}".format(now)) | |||
#print("Nearest 5 min is {}".format(datetime.fromtimestamp( nearest5min ) )) | |||
#print("Priot 5 min is {}".format(datetime.fromtimestamp( prior5min ) )) | |||
gte = datetime.fromtimestamp( prior5min ) | |||
lt = datetime.fromtimestamp( nearest5min ) | |||
summary = {} | |||
froms = {} | |||
tos = {} | |||
print ("gte {} lt {}".format(gte,lt)) | |||
if client != None: | |||
logging.info("Client is not none") | |||
mybody = { | |||
"aggs": { | |||
"timeouts_over_time": { | |||
"date_histogram": { | |||
"field": "@timestamp", | |||
"calendar_interval": "1m" | |||
} | |||
} | |||
}, | |||
"query": { | |||
"bool": { | |||
"must": [ | |||
{ "range" : { "@timestamp" : { "gte" : gte, "lt" : lt } } }, | |||
{ "match_phrase": { "Exception.FullName": "System.TimeoutException"} } | |||
] | |||
} | |||
} | |||
} | |||
# use scan when you want the searches results , or hits, must use "search" for aggs. | |||
# resp = helpers.scan( | |||
resp = client.search ( | |||
body=mybody, | |||
# query={"match_all": {}}, | |||
index = 'efoe-logs-*', | |||
# size = 0 | |||
) | |||
for agg in resp['aggregations']: | |||
for bucket in resp['aggregations'][agg]['buckets']: | |||
# pp.pprint(bucket) | |||
print("{} {} {}".format(bucket['key'],bucket['key_as_string'],bucket['doc_count'])) | |||
</pre> | |||
== to read == | == to read == |
Latest revision as of 02:04, 14 October 2024
how to secure
Tough, use an app proxy to be sure. For now: local access only. Not designed with security in mind.
to file /etc/elasticsearch/elasticsearch.yml added to the end
script.disable_dynamic: true
quick stuff
HEAD is no longer used, instead use kibana, which is it's own service.
elasticsearch-head and elastic search plugin ( https://github.com/mobz/elasticsearch-head )
_search?search_type=count
{ "aggs" : { "all_users": { "terms": { "field": "screen_name" } } } }
list indexes and summary:
curl 'localhost:9200/_cat/indices?v'
show health
curl 'localhost:9200/_cat/health?v'
list nodes:
curl 'localhost:9200/_cat/nodes?v'
delete an index
curl -XDELETE 'http://localhost:9200/twitterindex_v2/'
created an index with mappings from a file:
curl -XPUT localhost:9200/twitterindex_v2 -T 'mapping.1'
get the mappoings for an index
curl -XGET "http://localhost:9200/test-index/_mapping" | jsonlint > mapping
More advanced delete
Examine the index:
GET _cat/indices/myindex-2021.08.10
open the index ( to ensure that any clsoed ness is removecd )
POST /myindex-2021.08.10/_open
Block writes:
PUT /myindex-2021.08.10/_settings {"settings": {"index.blocks.write": "true"}}
Clone the index to a copy:
POST /myindex-2021.08.10/_clone/myindex-2021.08.10-copy{"settings": {"index.blocks.write": null}}
Wait for the copy to go green:
GET /_cluster/health/myindex-2021.08.10-copy?wait_for_status=green&timeout=180s
Remove the old index:
DELETE /myindex-2021.08.10
pattern of data import
- import data
- dump mapping
- edit mapping
- create new index with new mapping
- import data again.
Explicitly mapping date fields
from: http://joelabrahamsson.com/dynamic-mappings-and-dates-in-elasticsearch/
curl -XPUT "http://localhost:9200/myindex" -d' { "mappings": { "tweet": { "date_detection": false, "properties": { "postDate": { "type": "date" } } } } }'
curl -XPUT 'https://search-myiotcatcher-eq4tipuq24ltctdtgz5hydwvb4.us-east-1.es.amazonaws.com/iotworld_v4' -H 'Content-Type: application/json' -d' { "container" : { "_timestamp" : {"enabled": true, "type":"date", "format": "epoch_second", "store":true, "path" : "timestamp"} }, "mappings": { "sensordata": { "properties": { "temperature": { "type": "float" }, "humidity": { "type": "float" }, "timestamp": { "type": "date" } } } } } ' curl -XGET 'https://search-myiotcatcher-eq4tipuq24ltctdtgz5hydwvb4.us-east-1.es.amazonaws.com/iotworld_v4/_mapping' | python -m json.tool curl -XGET 'https://search-myiotcatcher-eq4tipuq24ltctdtgz5hydwvb4.us-east-1.es.amazonaws.com/iotworld_v4/sensordata/_search' | python -m json.tool
Changing mappings
so you don't like the data mapping and you want to change it:
first dump the existing mapping to a file:
curl -XGET 'http://localhost:9200/fitstat_v1/_mapping' | python -m json.tool > fitstat_v1_mapping
then copy that mapping to the new version:
cp fitstat_v1_mapping fitstat_v2_mapping
edit the new mapping, for example adding "type": "nested", to you nested objects.
then create a new index specifying the new mapping:
curl -XPUT 'http://localhost:9200/fitstat_v2' -d @fitstat_v2_mapping
next: extractin from old, puting into new and nuking old.
... FIXME
backup
from: https://www.elastic.co/guide/en/elasticsearch/guide/current/backing-up-your-cluster.html
add to the end of /etc/elasticsearch/elasticsearch.yml :
path.repo: ["/mnt/freenas/dataset_elasticsearch/backup"]
root@keres /mnt/freenas/dataset_elasticsearch/backup # curl -XPUT "http://localhost:9200/_snapshot/freenas_backup" -d' { "type": "fs", "settings": { "location": "/mnt/freenas/dataset_elasticsearch/backup" } }'
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
Example Filters
kibana can let you make simple filters , but you probably want more.
between X and Y
{ "query": { "range" : { "price" : { "gte" : 1000, "lte" : 2000 } } } }
none zero entries:
{ "query": { "range": { "timeTakenInSeconds": { "gt": 0 } } } }
Includes any of these (Iranian Airports):
{ "query": { "bool": { "should": [ {"match_phrase": {"SearchCriteria.FromCode": "ABD"}}, {"match_phrase": {"SearchCriteria.FromCode": "ACP"}}, ... {"match_phrase": {"SearchCriteria.FromCode": "ZAH"}}, {"match_phrase": {"SearchCriteria.FromCode": "ZBR"}} ], "minimum_should_match": 1 } } }
Example Queries
search():
python
res = es.search(index=theindex, size=mysize, query={ "bool" : { "must" : [ { "exists": { "field": "my.field.name"}}, { "range" : { "@timestamp" : { "gte" : "2022-04-01T00:00:00"}}}, { "range" : { "@timestamp" : { "lt" : "2022-04-30T00:00:00"}}} ] } })
alternate:
res = es.search(index=theindex, size=mysize, query={ "bool" : { "must" : [ { "exists": { "field": "my.field.name"}}, { "range" : { "@timestamp" : { "gte" : "2022-04-01T00:00:00", "lt" : "2022-04-30T00:00:00" } } } ] } })
scan , for when there are many hits to scroll over.
if client != None: logging.info("Client is not none") query = {"query": { "bool" : { "must" : [ { "exists": { "field": "From"}}, { "exists": { "field": "To"}}, { "range" : { "@timestamp" : { "gte" : gte, "lt" : lt } } } ] } } } # call the helpers library's scan() method to scroll resp = helpers.scan( client, query=query, # query={"match_all": {}}, index = 'log-*', size = mysize ) for hit in resp: # print("{}".format(hit['_id'])) print("{} {}".format(hit['_source']['From'], hit['_source']['To']))
example searches
{ "query": { "match_all": {} } }
{ "query": { "match": { "filter_level": "low" } } }
{ "query": { "match": { "source": "iPad" } }, "_source": [ "source" , "text"] }
{ "size": 0, "aggs": { "group_by_state": { "terms": { "field": "source" } } } }
"size": 0, - print agg only and not hits. PERFORMANCE!!
{ "fields": [], "sort": [ { "zkb.totalValue": { "order": "asc" } }, "_score" ], "query": { "range": { "zkb.totalValue": { "lt": 200000000 } } } }
{ "fields" : [ "victim.shipTypeID" , "victim.corporationName", "victim.characterID" , "victim.characterName"], "sort" : [ { "zkb.totalValue" : {"order" : "asc"}}, "_score" ], "query": { "range": { "zkb.totalValue": { "lt": 200000000 } } } }
using a script to make an aggregate, in this case dynamic length of field:
GET /my-index-000001/_search?size=0 { "runtime_mappings": { "message.length": { "type": "long", "script": "emit(doc['message.keyword'].value.length())" } }, "aggs": { "message_length": { "histogram": { "interval": 10, "field": "message.length" } } } }
changing-mapping-with-zero-downtime
https://www.elastic.co/blog/changing-mapping-with-zero-downtime
aggregates
https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html
what are the possible values in this field?
GET /myindex-pattern-*/_search { "aggs": { "keys": { "terms": { "field": "my.field" } } }, "size": 0 }
Wildcard queries in field name
ref: https://stackoverflow.com/questions/31433032/wildcard-queries-in-field-name
{ "query": { "query_string": { "fields": [ "fieldname*" ], "query": "value" } } }
moving data between indexes
Use ElasticDump ( https://www.npmjs.com/package/elasticdump )
1) yum install epel-release
2) yum install nodejs
3) yum install nodejs npm
4) npm install elasticdump
5) cd node_modules/elasticdump/bin
6)
./elasticdump \ --input=http://192.168.1.1:9200/original \ --output=http://192.168.1.2:9200/newCopy \ --type=data
elasticdump \ --input=http://localhost:9700/.kibana \ --output=http://localhost:9700/.kibana_read_only \ --type=mapping elasticdump \ --input=http://localhost:9700/.kibana \ --output=http://localhost:9700/.kibana_read_only \ --type=data
Dumping to a file
In this example I dump my AWS Elasticsearch cluster to a file.
it's one index with 20k records, not huge.
time /home/david/node_modules/.bin/elasticdump \ --input=https://search-myiotcatcher-eq4tipuq24ltctdtgz5hydwvb4.us-east-1.es.amazonaws.com/iotworld_v5 \ --output=/mnt/freenas/dataset_elasticsearch/iotworld_v5/iotworld_v5_mapping.json \ --type=mapping time /home/david/node_modules/.bin/elasticdump \ --input=https://search-myiotcatcher-eq4tipuq24ltctdtgz5hydwvb4.us-east-1.es.amazonaws.com/iotworld_v5 \ --output=/mnt/freenas/dataset_elasticsearch/iotworld_v5/iotworld_v5.json \ --type=data
Disk full -> readonly lock
If the disk fills up the indexes will got into "read-only" mode.
reset it like this:
curl -X PUT http://${HOST}:9200/.kibana/_settings -d ' { "index": { "blocks": { "read_only_allow_delete": "false" } } }' -H'Content-Type: application/json'
and you will get back if it worked:
{"acknowledged":true}
Trim data
#!/bin/sh export HOST=es.staging.thecarrotlab.com for i in `curl -s -XGET "http://es.staging.thecarrotlab.com:9200/_cat/indices?v" | grep logsta | sort -k 3 -n -r | awk '{print $3}' | tail -n +32` do echo $i curl -XDELETE "http://es.staging.thecarrotlab.com:9200/$i" echo done
clean up old logstash indexes 32 days old +
#!/bin/sh export HOST=servername for i in `curl -s -XGET "http://$HOST:9200/_cat/indices?v" | grep logsta | sort -k 3 -n -r | awk '{print $3}' | tail -n +32` do echo $i echo curl -XDELETE "http://$HOST:9200/$i" done
serverconfig notes
stuff I've added to my default config:
# for backups path.repo: ["/mnt/freenas/dataset_elasticsearch/backup"] # to disallow remote code execution script.disable_dynamic: true
/etc/sysconfig/sysconfig/elasticsearch ( grep -v ^# )
DATA_DIR=/data/elasticsearch/data LOG_DIR=/data/elasticsearch/log WORK_DIR=data/elasticsearch/tmp ES_HEAP_SIZE=2g ES_GC_LOG_FILE=/data/elasticsearch/log/gc.log
Performance
https://www.elastic.co/blog/advanced-tuning-finding-and-fixing-slow-elasticsearch-queries
GET /_cat/thread_pool/search?v&h=node_name,name,active,rejected,completed
Network
/etc/services updated:
$ grep 9200 /etc/services elasticsearch-rest 9200/tcp # elasticsearch-restful api #wap-wsp 9200/tcp # WAP connectionless session service wap-wsp 9200/udp # WAP connectionless session service $ grep 9300 /etc/services elasticsearch-transport 9300/tcp # elasticsearch-transpost # vrace 9300/tcp # Virtual Racing Service vrace 9300/udp # Virtual Racing Service
python es
sudo pip install elasticsearch sudo pip install certifi
last 5 minute period
now = datetime.now() now_epoch = now.timestamp() mymod = now_epoch % 300 nearest5min = now_epoch - mymod prior5min = nearest5min - 300 nearest5min_date = datetime.fromtimestamp( nearest5min ) prior5min_date = datetime.fromtimestamp( prior5min ) nearest5minstr = nearest5min_date.strftime("%Y-%m-%dT%H:%M:%S") prior5minstr = prior5min_date.strftime("%Y-%m-%dT%H:%M:%S") res = es.search(index="filebeat-*", size=mysize, query={ "bool" : { "must" : [ { "match": { "metrics_of_note" : "0"}}, { "range" : { "@timestamp" : { "gte" : prior5minstr, "lt" : nearest5min_date, } } } ], } })
last 24h:
yesterday:
script collection of aggregats
from elasticsearch import Elasticsearch, helpers, exceptions from elasticsearch import logger as elasticsearch_logger import pprint import json import logging from datetime import datetime # logging.info(Elasticsearch.__version__) pp = pprint.PrettyPrinter(indent=4,compact=True) elasticsearch_logger.setLevel(logging.DEBUG) client = Elasticsearch( "XXX" ) mysize = 5 bucketsizeminutes = 1 try: client_info = Elasticsearch.info(client) # pp.pprint(client_info) except exceptions.ConnectionError as err: logging.error(pp.pformat(client_info)) logging.error('Elasticsearch client error: {}'.format(err)) client = None # "gte" : "2022-06-27T23:38:00", # "lt" : "2022-06-27T23:52:00" now = datetime.now() now_epoch = now.timestamp() mymod = now_epoch % 300 nearest5min = now_epoch - mymod prior5min = nearest5min - 300 #print("Now is {}".format(now)) #print("Nearest 5 min is {}".format(datetime.fromtimestamp( nearest5min ) )) #print("Priot 5 min is {}".format(datetime.fromtimestamp( prior5min ) )) gte = datetime.fromtimestamp( prior5min ) lt = datetime.fromtimestamp( nearest5min ) summary = {} froms = {} tos = {} print ("gte {} lt {}".format(gte,lt)) if client != None: logging.info("Client is not none") mybody = { "aggs": { "timeouts_over_time": { "date_histogram": { "field": "@timestamp", "calendar_interval": "1m" } } }, "query": { "bool": { "must": [ { "range" : { "@timestamp" : { "gte" : gte, "lt" : lt } } }, { "match_phrase": { "Exception.FullName": "System.TimeoutException"} } ] } } } # use scan when you want the searches results , or hits, must use "search" for aggs. # resp = helpers.scan( resp = client.search ( body=mybody, # query={"match_all": {}}, index = 'efoe-logs-*', # size = 0 ) for agg in resp['aggregations']: for bucket in resp['aggregations'][agg]['buckets']: # pp.pprint(bucket) print("{} {} {}".format(bucket['key'],bucket['key_as_string'],bucket['doc_count']))
to read
- https://www.elastic.co/blog/data-visualization-with-elasticsearch-and-protovis
- https://greg.blog/2012/08/20/quickly-build-faceted-search-with-elasticsearch-and-backbone-js/
- https://www.elastic.co/blog/elasticsearch-storage-the-true-story