2020年1月14日
php通过sql调用elasticsearch
需求背景
- 如何在不改动原来业务代码的情况下,将原来请求mysql数据库的请求,替换为请求es?
- 另外,还可减少工程师学习dsl查询语言的时间,或者说根本不需要工程师改动什么,框架层面支持便可实现es的查询应用。
- es内部提供了elasticsearch-sql的插件,安装插件后,便可以通过sql的形式查询es。
注意点:
- es 目前不支持offset,但支持limit
- es 每次返回最多一千条,超过一千条数据需要通过游标获取
php通过sql请求es的示例代码:
<?php
/**
 * Runs SQL statements against Elasticsearch through its SQL REST endpoint and
 * returns the result rows as associative arrays keyed by column name.
 *
 * Elasticsearch SQL has no OFFSET clause, so MySQL-style "LIMIT offset, count"
 * is emulated: the query is rewritten to "limit offset+count" and the leading
 * rows are dropped client-side. Results larger than one page are collected by
 * following the cursor returned with each page.
 *
 * @see https://www.elastic.co/what-is/elasticsearch-sql
 */
class ElasticsearchSqlSelect
{
    /** @var string Base address of the cluster (placeholder; add scheme/port as your deployment requires). */
    private static $url = '127.0.0.1';

    /** @var string HTTP basic-auth credentials in "user:password" form (placeholder). */
    private static $auth = 'your username:your password';

    /** @var string[] HTTP headers sent with every request. */
    private static $header = ['Content-type:application/json;charset=utf-8'];

    /**
     * Path and query string of the SQL endpoint, asking for JSON output.
     *
     * @return string
     */
    private static function uri()
    {
        return '/_sql?format=json';
    }

    /**
     * Execute a SQL statement (or continue a cursor) and return all rows.
     *
     * @param string   $sql  SQL text when $type is 'q'; the cursor string when $type is 'cursor'.
     * @param string   $type Key under which $sql is sent in the JSON request body.
     *                       NOTE(review): the official Elasticsearch SQL API documents the
     *                       body field as "query"; confirm that 'q' is accepted by the
     *                       endpoint/plugin in use before relying on the default.
     * @param string[] $key  Column names used to key each row. Left empty on the first
     *                       call (names are read from the response's "columns"); passed
     *                       along on cursor calls because follow-up pages omit "columns".
     *
     * @return array[] List of rows, each an array of column name => value.
     *
     * @throws \Exception When the request fails or Elasticsearch reports an error.
     */
    public static function sql($sql, $type = 'q', array $key = [])
    {
        $match = [];
        if ('q' === $type) {
            // Double-quoted literals are turned into single-quoted ones before sending.
            $sql = str_replace('"', "'", $sql);
            // Rewrites "LIMIT offset, count" in place; $match keeps the original numbers.
            $match = self::splitRequest($sql);
        }

        $response = self::post(self::$url . self::uri(), [$type => $sql]);
        self::errCheck($response);

        if (!$key) {
            $columns = isset($response['columns']) ? $response['columns'] : [];
            $key = array_column($columns, 'name');
        }

        $rows = isset($response['rows']) ? $response['rows'] : [];
        $rows = array_map(function ($row) use ($key) {
            return array_combine($key, $row);
        }, $rows);

        // A cursor in the response means more pages exist; keep fetching and
        // append them so the caller receives the complete result set.
        if (!empty($response['cursor'])) {
            $rows = array_merge($rows, self::sql($response['cursor'], 'cursor', $key));
        }

        // todo: optimise offset/limit handling (currently fetched in full and sliced here).
        if ('q' === $type && !empty($match['offset'])) {
            return array_slice($rows, (int) $match['offset'], (int) $match['limit']);
        }

        return $rows;
    }

    /**
     * Rewrite a MySQL-style "LIMIT offset, count" clause into " limit offset+count".
     *
     * @param string $sql SQL text, modified in place when the clause is found.
     *
     * @return array The preg_match result (including 'offset' and 'limit' as numeric
     *               strings) when the clause was found, otherwise an empty array.
     */
    private static function splitRequest(&$sql)
    {
        $pattern = '/LIMIT\s+(?<offset>\d+)\s*,\s*(?<limit>\d+)/i';
        if (preg_match($pattern, $sql, $match)) {
            $limit = ' limit ' . ((int) $match['offset'] + (int) $match['limit']);
            $sql = preg_replace($pattern, $limit, $sql);

            return $match;
        }

        return [];
    }

    /**
     * Send a JSON-encoded POST request with cURL and decode the JSON response.
     *
     * @param string $url    Full request URL.
     * @param array  $params Request body, JSON-encoded before sending.
     *
     * @return array Decoded response body.
     *
     * @throws \RuntimeException When the transfer fails or the body is not a JSON object/array.
     */
    private static function post($url, $params)
    {
        $body = json_encode($params);
        $ch = curl_init($url);
        $option = [
            // NOTE(review): TLS peer/host verification is disabled, which exposes the
            // basic-auth credentials to interception over HTTPS; enable for production.
            CURLOPT_SSL_VERIFYPEER => false,
            CURLOPT_SSL_VERIFYHOST => false,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_POST => true,
            CURLOPT_HEADER => false,
            CURLOPT_POSTFIELDS => $body,
            CURLOPT_USERPWD => self::$auth,
            CURLOPT_HTTPHEADER => self::$header,
            CURLOPT_HTTPAUTH => CURLAUTH_BASIC,
            CURLOPT_PROTOCOLS => CURLPROTO_HTTP | CURLPROTO_HTTPS,
        ];
        curl_setopt_array($ch, $option);

        $res = curl_exec($ch);
        // Read the error text before the handle is closed.
        $error = false === $res ? curl_error($ch) : '';
        curl_close($ch);

        if (false === $res) {
            throw new \RuntimeException('Elasticsearch request failed: ' . $error);
        }

        $decoded = json_decode($res, true);
        if (!is_array($decoded)) {
            throw new \RuntimeException('Elasticsearch returned a response that is not valid JSON');
        }

        return $decoded;
    }

    /**
     * Throw if the decoded response carries an "error" entry.
     *
     * @param array $response Decoded Elasticsearch response.
     *
     * @throws \Exception With the error's "reason" as message and the response "status" as code.
     */
    private static function errCheck(array $response)
    {
        if (!isset($response['error'])) {
            return;
        }

        $error = $response['error'];
        if (is_array($error) && isset($error['reason'])) {
            $reason = (string) $error['reason'];
        } elseif (is_string($error)) {
            $reason = $error;
        } else {
            // Unknown error shape: keep whatever was sent so it is not lost.
            $reason = (string) json_encode($error);
        }

        $status = isset($response['status']) ? (int) $response['status'] : 0;

        throw new \Exception($reason, $status);
    }
}
//调用demo:
//$sql = "SELECT count(*),field1,field2 FROM you_index WHERE create_date = '2020-01-02' group by field1,field2";
//$res = ElasticsearchSqlSelect::sql($sql);