2020年1月14日

php通过sql调用elasticsearch

作者 codecafe

需求背景

  • 如何在不改动原来业务代码的情况下,将原来请求mysql数据库的请求,替换为请求es?
  • 另外,还可减少工程师学习dsl查询语言的时间,或者说根本不需要工程师改动什么,框架层面支持便可实现es的查询应用。
  • es 提供了 SQL 查询能力(6.3 及以上版本由 X-Pack 内置 Elasticsearch SQL,旧版本可安装 elasticsearch-sql 插件),启用后便可以通过 sql 的形式查询 es。

注意点:

  1. es sql 目前不支持 OFFSET(也不支持 MySQL 的 LIMIT offset, count 写法),只支持 LIMIT
  2. es 每次默认最多返回一千条(fetch_size 默认为 1000),超过一千条的数据需要通过游标(cursor)继续获取

php通过sql请求es的示例代码:

<?php
/**
 * Runs SQL statements against Elasticsearch through its `_sql` REST endpoint.
 *
 * Rows are returned as associative arrays keyed by column name, so code that
 * used to read MySQL result sets can consume them unchanged. MySQL-style
 * "LIMIT offset, count" is emulated, because ES SQL has no OFFSET support.
 *
 * @see https://www.elastic.co/what-is/elasticsearch-sql
 */
class ElasticsearchSqlSelect
{
    // Base URL (scheme, host, port) of the Elasticsearch node; 9200 is the ES default HTTP port.
    private static $url = 'http://127.0.0.1:9200';
    // Basic-auth credentials in "username:password" form.
    private static $auth = 'your username:your password';
    private static $header = ['Content-type:application/json;charset=utf-8'];

    private static function uri()
    {
        return '/_sql?format=json';
    }

    /**
     * Executes an SQL query (or continues a cursor) and returns every row.
     *
     * @param string $sql  SQL text when $type is 'q'; otherwise the value sent under $type (e.g. a cursor token)
     * @param string $type 'q' for a new SQL query, 'cursor' to continue a previous result
     * @param array  $key  column names used to key each row; taken from the response's
     *                     "columns" when empty (cursor pages carry no column list)
     * @return array list of rows, each an associative array column => value
     * @throws \Exception when the HTTP call fails or Elasticsearch reports an error
     */
    public static function sql($sql, $type = 'q', array $key = [])
    {
        $match = [];
        if ('q' === $type) {
            // ES SQL quotes string literals with single quotes; double quotes denote identifiers.
            $sql   = str_replace('"', "'", $sql);
            $match = self::splitRequest($sql);
        }

        // The _sql endpoint reads new statements from "query" and continuations from "cursor".
        $field    = ('q' === $type) ? 'query' : $type;
        $response = self::request([$field => $sql]);

        if (empty($key) && isset($response['columns'])) {
            $key = array_column($response['columns'], 'name');
        }
        $rows = self::combineRows($response, $key);

        // A response holds at most one page (fetch_size rows, 1000 by default). While ES
        // returns a cursor more pages remain, so keep following it until it is exhausted.
        while (!empty($response['cursor'])) {
            $response = self::request(['cursor' => $response['cursor']]);
            foreach (self::combineRows($response, $key) as $row) {
                $rows[] = $row;
            }
        }

        // Emulate "LIMIT offset, count": ES was asked for offset+count rows, drop the first offset.
        if (!empty($match['offset'])) {
            return array_slice($rows, (int) $match['offset'], (int) $match['limit']);
        }
        return $rows;
    }

    /**
     * Rewrites MySQL-style "LIMIT offset, count" into "limit offset+count", since ES SQL
     * does not support OFFSET; the caller slices the offset away afterwards.
     *
     * @param string $sql SQL text, modified in place when a "LIMIT offset, count" clause is found
     * @return array the regex match (with 'offset' and 'limit' keys), or [] when no such clause exists
     */
    private static function splitRequest(&$sql)
    {
        $pattern = '/LIMIT\s+(?<offset>\d+)\s*,\s*(?<limit>\d+)/i';
        if (!preg_match($pattern, $sql, $match)) {
            return [];
        }
        $limit = ' limit ' . ((int) $match['offset'] + (int) $match['limit']);
        $sql   = preg_replace($pattern, $limit, $sql);
        return $match;
    }

    // Sends one request to the _sql endpoint and throws if the response carries an error.
    private static function request(array $params)
    {
        $response = self::post(self::$url . self::uri(), $params);
        self::errCheck($response);
        return $response;
    }

    // Turns the positional "rows" of one response into column-keyed rows ([] when absent).
    private static function combineRows(array $response, array $key)
    {
        $rows = isset($response['rows']) ? $response['rows'] : [];
        foreach ($rows as $i => $row) {
            $rows[$i] = array_combine($key, $row);
        }
        return $rows;
    }

    /**
     * Performs a JSON POST with basic auth and returns the decoded response body.
     *
     * @throws \Exception when encoding, the transfer, or decoding fails
     */
    private static function post($url, array $params)
    {
        $body = json_encode($params);
        if (false === $body) {
            throw new \Exception('Failed to encode Elasticsearch request: ' . json_last_error_msg());
        }

        $ch = curl_init($url);
        curl_setopt_array($ch, [
            // NOTE(review): certificate verification is disabled; enable both options
            // when talking to a remote cluster over HTTPS.
            CURLOPT_SSL_VERIFYPEER => false,
            CURLOPT_SSL_VERIFYHOST => false,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_POST           => true,
            CURLOPT_HEADER         => false,
            CURLOPT_POSTFIELDS     => $body,
            CURLOPT_USERPWD        => self::$auth,
            CURLOPT_HTTPHEADER     => self::$header,
            CURLOPT_HTTPAUTH       => CURLAUTH_BASIC,
            CURLOPT_PROTOCOLS      => CURLPROTO_HTTP | CURLPROTO_HTTPS,
        ]);
        $res = curl_exec($ch);
        if (false === $res) {
            $error = curl_error($ch);
            curl_close($ch);
            throw new \Exception('Elasticsearch request failed: ' . $error);
        }
        curl_close($ch);

        $decoded = json_decode($res, true);
        if (!is_array($decoded)) {
            throw new \Exception('Elasticsearch returned a non-JSON response');
        }
        return $decoded;
    }

    /**
     * Throws when the decoded response contains an "error" entry.
     *
     * The exception message is the error's "reason" (falling back to the raw error),
     * and its code is the response's HTTP "status" (0 when absent).
     *
     * @throws \Exception
     */
    private static function errCheck(array $response)
    {
        if (!isset($response['error'])) {
            return;
        }
        $error = $response['error'];
        if (is_array($error) && isset($error['reason'])) {
            $reason = (string) $error['reason'];
        } elseif (is_string($error)) {
            $reason = $error;
        } else {
            $reason = (string) json_encode($error);
        }
        $status = isset($response['status']) ? (int) $response['status'] : 0;
        throw new \Exception($reason, $status);
    }
}
// Usage example:
//$sql = "SELECT count(*),field1,field2 FROM your_index WHERE create_date = '2020-01-02' group by field1,field2";
//$res = ElasticsearchSqlSelect::sql($sql);