es查詢 scroll scan用法

2021-08-14 02:37:50 字數 4038 閱讀 8469

需求大概是:需要實時地推送日誌系統的日誌訊息,提供給其他同事查詢使用。當前時間查詢前一分鐘的資料,因為資料量大,考慮用es的scroll_scan方法。

**:

#!/usr/bin/env python

# -*- coding:utf-8 -*-

"""查詢es資料 demo

由於es資料存在一定延遲, 所以統計一分鐘前的資料.

"""統一的request.gets函式藉口,加上異常處理.

note: 適用於返回資料為json字串的cgi介面.

"""try:

r = requests.get(url=url, params=params, timeout=timeout)

if r.ok:

ret = r.json()

return ret

else:

logger.error(' faild, code: , cause: '\

.format(url, r.status_code, r.text[:200]))

except requests.exceptions.connectionerror:

logger.exception('connection error: %s' % (url, ))

except requests.exceptions.requestexception:

logger.exception('request error'.format(url))

return {}

def requests_post(url, data, timeout=2):

"""統一的requests.post函式介面,加上異常處理.

note: 適用於返回資料為json字串的cgi介面.

"""try:

r = requests.post(url=url, data=data, timeout=timeout)

if r.ok:

ret = r.json()

return ret

else:

logger.error(' faild, code: , cause: '\

.format(url, r.status_code, r.text))

except requests.exceptions.connectionerror:

logger.exception('connection error: %s' % (url, ))

except exception:

logger.exception('request error'.format(url))

return {}

def gen_index(date, name="logstash"):

return '-'.format(name, date.strftime('%y.%m.%d'))

def get_exact_index_name(from_time, to_time, name="logstash"):

"""獲取精確的index名稱"""

from_time -= datetime.timedelta(hours=8)

to_time -= datetime.timedelta(hours=8)

day = to_time.day - from_time.day

if day >= 1:

indexs =

for idx in range(day + 1):

index_name = ",".join(indexs)

else:

index_name = gen_index(to_time, name)

return index_name

def get_query_data(from_time, to_time, should_terms):

should =

for item in should_terms:

query_template = ,

},"query": }}

}}

}return json.dumps(query_template)

def get_type_data(from_time, to_time, type_name, size=500):

index_name = get_exact_index_name(from_time, to_time)

initial_url = es_url_pre + "//_search/?scroll=2m&size=&search_type=scan".format(index_name, type_name, size)

messages, counts = , 0

should_terms = [, ]

data = get_query_data(from_time.strftime("%y-%m-%dt%h:%m:%s"),

to_time.strftime("%y-%m-%dt%h:%m:%s"),

should_terms)

rets = requests_post(initial_url, data, timeout=2)

if not rets:

return messages, counts

scroll_id, counts = rets.get("_scroll_id", ""), rets.get("hits", ).get("total", 0)

if not counts:

return messages, counts

scroll_url = es_url_pre + "_search/scroll?"

while true:

params =

res = request_get(scroll_url, params=params, timeout=1)

hits = res.get("hits", {}).get("hits", )

if not hits:

break

for hit in hits:

scroll_id = res.get("_scroll_id", "")

return messages, counts

def main(from_time, to_time):

type_name = "bilog"

size = 1000

messages, counts = get_type_data(from_time, to_time, type_name, size=size)

return messages, counts

if __name__ == "__main__":

start_time = datetime.datetime.now()

to_time = start_time.replace(second=0, microsecond=0) \

- datetime.timedelta(minutes=1)

from_time = (to_time - datetime.timedelta(minutes=1))

messages, counts = main(from_time, to_time)

end_time = datetime.datetime.now()

print end_time-start_time

es 父子查詢 es父子文件建立查詢

一 準備 1,elasticsearch 5.6.9 2,kibana 5.6.9 3,jdk1.8 二 建立索引,文件 1建立資料庫put database?pretty station stationname 三 填充資料 插入父文件一條記錄 插入id 1的6路post database lin...

ES查詢語句

1.萬用字元查詢keyword欄位 不會建分詞索引,會建索引 2.刪除並釋放磁碟空間 post monitor delete by query 1.查詢你要刪除的doc資料 以2019 5 18 00 00 00時間節點和time欄位為例 具體 如下 monitor search post 2.手動...

ES 常用查詢

1.term精確查詢,實際上是包含的意思 用法一 與bool,filter使用 get zf en search 用法二 直接term查詢 get zf en search 2.bulk 批量寫入,注意,必須指定 id,須換行 如果 id存在,執行的是update操作 3.組合查詢,bool 布林 ...