_

基于Java、Kafka、ElasticSearch的搜索框架的设计与实现

 

时间:2017-09-10 来源:码农网


 

Jkes是一个基于Java、Kafka、ElasticSearch的搜索框架。Jkes提供了注解驱动的JPA风格的对象/文档映射,使用REST API用于文档搜索。

项目主页:https://github.com/chaokunyang/jkes

安装

可以参考jkes-integration-test项目快速掌握jkes框架的使用方法。jkes-integration-test是我们用来测试功能完整性的一个Spring Boot Application。

sudo bin/elasticsearch-plugin install analysis-smartcn

配置

@EnableAspectJAutoProxy@EnableJkes@Configurationpublic class JkesConfig {  @Bean
  public PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) {    return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport);
  }
}

提供JkesProperties Bean

@Component@Configurationpublic class JkesConf extends DefaultJkesPropertiesImpl {    @PostConstruct
    public void setUp() {
        Config.setJkesProperties(this);
    }    @Override
    public String getKafkaBootstrapServers() {        return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292";
    }    @Override
    public String getKafkaConnectServers() {        return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084";
    }    @Override
    public String getEsBootstrapServers() {        return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200";
    }    @Override
    public String getDocumentBasePackage() {        return "com.timeyang.jkes.integration_test.domain";
    }    @Override
    public String getClientId() {        return "integration_test";
    }

}

这里可以很灵活,如果使用Spring Boot,可以使用@ConfigurationProperties提供配置

增加索引管理端点 因为我们不知道客户端使用的哪种web技术,所以索引端点需要在客户端添加。比如在Spring MVC中,可以按照如下方式添加索引端点

@RestController@RequestMapping("/api/search")public class SearchEndpoint {    private Indexer indexer;    @Autowired
    public SearchEndpoint(Indexer indexer) {        this.indexer = indexer;
    }    @RequestMapping(value = "/start_all", method = RequestMethod.POST)    public void startAll() {
        indexer.startAll();
    }    @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST)    public void start(@PathVariable("entityClassName") String entityClassName) {
        indexer.start(entityClassName);
    }    @RequestMapping(value = "/stop_all", method = RequestMethod.PUT)    public Map<String, Boolean> stopAll() {        return indexer.stopAll();
    }    @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT)    public Boolean stop(@PathVariable("entityClassName") String entityClassName) {        return indexer.stop(entityClassName);
    }    @RequestMapping(value = "/progress", method = RequestMethod.GET)    public Map<String, IndexProgress> getProgress() {        return indexer.getProgress();
    }

}

快速开始

索引API

使用com.timeyang.jkes.core.annotation包下相关注解标记实体

@lombok.Data@Entity@Documentpublic class Person extends AuditedEntity {    // @Id will be identified automatically
    // @Field(type = FieldType.Long)
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)    private Long id;    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {                    @InnerField(suffix = "raw", type = FieldType.Keyword),                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )    private String name;    @Field(type = FieldType.Keyword)    private String gender;    @Field(type = FieldType.Integer)    private Integer age;    // don't add @Field to test whether ignored
    // @Field(type = FieldType.Text)
    private String description;    @Field(type = FieldType.Object)    @ManyToOne(fetch = FetchType.EAGER)    @JoinColumn(name = "group_id")    private PersonGroup personGroup;

}
@lombok.Data@Entity@Document(type = "person_group", alias = "person_group_alias")public class PersonGroup extends AuditedEntity {    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)    private Long id;    private String name;    private String interests;    @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true)    private List<Person> persons;    private String description;    @DocumentId
    @Field(type = FieldType.Long)    public Long getId() {        return id;
    }    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {                    @InnerField(suffix = "raw", type = FieldType.Keyword),                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )    public String getName() {        return name;
    }    @Field(type = FieldType.Text)    public String getInterests() {        return interests;
    }    @Field(type = FieldType.Nested)    public List<Person> getPersons() {        return persons;
    }    /**
     * 不加Field注解,测试序列化时是否忽略
     */
    public String getDescription() {        return description;
    }
}

当更新实体时,文档会被自动索引到ElasticSearch;删除实体时,文档会自动从ElasticSearch删除。

搜索API

启动搜索服务jkes-search-service,搜索服务是一个Spring Boot Application,提供rest搜索api,默认运行在9000端口。

URI query

curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10

Nested query

integration_test_person_group/person_group/_search?from=0&size=10{  "query": {    "nested": {      "path": "persons",      "score_mode": "avg",      "query": {        "bool": {          "must": [
            {              "range": {                "persons.age": {                  "gt": 5
                }
              }
            }
          ]
        }
      }
    }
  }
}

match query

integration_test_person_group/person_group/_search?from=0&size=10{  "query": {      "match": {        "interests": "Hadoop"
      }
    }
}

bool query

{  "query": {    "bool" : {      "must" : {        "match" : { "interests" : "Hadoop" }
      },      "filter": {        "term" : { "name.raw" : "name0" }
      },      "should" : [
        { "match" : { "interests" : "Flink" } },
        {            "nested" : {                "path" : "persons",                "score_mode" : "avg",                "query" : {                    "bool" : {                        "must" : [
                        { "match" : {"persons.name" : "name40"} },
                        { "match" : {"persons.interests" : "interests"} }
                        ],                        "must_not" : {                            "range" : {                              "age" : { "gte" : 50, "lte" : 60 }
                            }
                          }
                    }
                }
            }
        }

      ],      "minimum_should_match" : 1,      "boost" : 1.0
    }

  }

}

Source filtering

integration_test_person_group/person_group/_search
{    "_source": false,    "query" : {        "match" : { "name" : "name17" }
    }
}
integration_test_person_group/person_group/_search
{    "_source": {            "includes": [ "name", "persons.*" ],            "excludes": [ "date*", "version", "persons.age" ]
        },    "query" : {        "match" : { "name" : "name17" }
    }
}

prefix

integration_test_person_group/person_group/_search
{ 
  "query": {    "prefix" : { "name" : "name" }
  }
}

wildcard

integration_test_person_group/person_group/_search
{    "query": {        "wildcard" : { "name" : "name*" }
    }
}

regexp

integration_test_person_group/person_group/_search
{    "query": {        "regexp":{            "name": "na.*17"
        }
    }
}

Jkes工作原理

索引工作原理:

查询工作原理:

流程图

模块介绍

jkes-core

jkes-core是整个jkes的核心部分。主要包括以下功能:

jkes-boot

jkes-boot用于与一些第三方开源框架进行集成。

当前,我们通过jkes-spring-data-jpa,提供了与spring data jpa的集成。通过使用Spring的AOP机制,对Repository方法进行拦截,生成SaveEvent/DeleteEvent/DeleteAllEvent保存到EventContainer。通过使用我们提供的SearchPlatformTransactionManager,对常用的事务管理器(如JpaTransactionManager)进行包装,提供事务拦截功能。

在后续版本,我们会提供与更多框架的集成。

jkes-spring-data-jpa说明:

jkes-services

jkes-services主要用来提供一些服务。 目前,jkes-services提供了以下服务:

后续,我们将会基于zookeeper构建索引集群,提供集群索引管理功能

jkes-integration-test

jkes-integration-test是一个基于Spring Boot集成测试项目,用于进行功能测试。同时测量一些常见操作的吞吐率

开发

To build a development version you’ll need a recent version of Kafka. You can build jkes with Maven using the standard lifecycle phases.

Contribute

LICENSE

This project is licensed under Apache License 2.0.



赞助打赏

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,您说多少就多少

打开微信扫一扫,即可进行扫码打赏哦

您的支持,鼓励我们做得更好!

「人云亦云」


标签: 涨知识 | 如有转载,请注明出处 |
本文链接:
http://blog.hellotom.top/articles/813/


亲,看完记得留下足迹哦


更多有料好玩的内容 尽在人云亦云公众号
360网站安全检测平台