一、Java API操作
Elasticsearch的Java客户端非常强大;它可以建立一个嵌入式实例并在必要时运行管理任务
运行一个Java应用程序和Elasticsearch时,有两种操作模式可供使用。该应用程序可在Elasticsearch集群中扮演更加主动或更加被动的角色。在更加主动的情况下(称为Node Client),应用程序实例将从集群接收请求,确定哪个节点应处理该请求,就像正常节点所做的一样。(应用程序甚至可以托管索引和处理请求。)另一种模式称为Transport Client,它将所有请求都转发到另一个Elasticsearch节点,由后者来确定最终目标
1. API基本操作
1.1 操作环境准备
1)创建maven工程
2)添加pom文件
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.1.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>
3)等待依赖的jar包下载完成
当直接在ElasticSearch 建立文档对象时,如果索引不存在的,默认会自动创建,映射采用默认方式
1.2 获取Transport Client
(1)ElasticSearch服务默认端口9300
(2)Web管理平台端口9200
private TransportClient client;
@SuppressWarnings("unchecked")
@Before
public void getClient() throws Exception {
// 1 设置连接的集群名称
Settings settings = Settings.builder().put("cluster.name", "my-application").build();
// 2 连接集群
client = new PreBuiltTransportClient(settings);
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("hsiehchou121"), 9300));
// 3 打印集群名称
System.out.println(client.toString());
}
(3)显示log4j2报错,在resource目录下创建一个文件命名为log4j2.xml并添加如下内容
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
1.3 创建索引
源代码
@Test
public void createIndex_blog(){
// 1 创建索引
client.admin().indices().prepareCreate("blog2").get();
// 2 关闭连接
client.close();
}
1.4 删除索引
源代码
@Test
public void deleteIndex(){
// 1 删除索引
client.admin().indices().prepareDelete("blog2").get();
// 2 关闭连接
client.close();
}
1.5 新建文档(源数据json串)
当直接在ElasticSearch建立文档对象时,如果索引不存在的,默认会自动创建,映射采用默认方式
源代码
@Test
public void createIndexByJson() throws UnknownHostException {
// 1 文档数据准备
String json = "{" + "\"id\":\"1\"," + "\"title\":\"基于Lucene的搜索服务器\","
+ "\"content\":\"它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口\"" + "}";
// 2 创建文档
IndexResponse indexResponse = client.prepareIndex("blog", "article", "1").setSource(json).execute().actionGet();
// 3 打印返回的结果
System.out.println("index:" + indexResponse.getIndex());
System.out.println("type:" + indexResponse.getType());
System.out.println("id:" + indexResponse.getId());
System.out.println("version:" + indexResponse.getVersion());
System.out.println("result:" + indexResponse.getResult());
// 4 关闭连接
client.close();
}
1.6 新建文档(源数据map方式添加json)
源代码
@Test
public void createIndexByMap() {
// 1 文档数据准备
Map<String, Object> json = new HashMap<String, Object>();
json.put("id", "2");
json.put("title", "基于Lucene的搜索服务器");
json.put("content", "它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口");
// 2 创建文档
IndexResponse indexResponse = client.prepareIndex("blog", "article", "2").setSource(json).execute().actionGet();
// 3 打印返回的结果
System.out.println("index:" + indexResponse.getIndex());
System.out.println("type:" + indexResponse.getType());
System.out.println("id:" + indexResponse.getId());
System.out.println("version:" + indexResponse.getVersion());
System.out.println("result:" + indexResponse.getResult());
// 4 关闭连接
client.close();
}
1.7 新建文档(源数据es构建器添加json)
源代码
@Test
public void createIndex() throws Exception {
// 1 通过es自带的帮助类,构建json数据
XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("id", 3).field("title", "基于Lucene的搜索服务器").field("content", "它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。")
.endObject();
// 2 创建文档
IndexResponse indexResponse = client.prepareIndex("blog", "article", "3").setSource(builder).get();
// 3 打印返回的结果
System.out.println("index:" + indexResponse.getIndex());
System.out.println("type:" + indexResponse.getType());
System.out.println("id:" + indexResponse.getId());
System.out.println("version:" + indexResponse.getVersion());
System.out.println("result:" + indexResponse.getResult());
// 4 关闭连接
client.close();
}
1.8 搜索文档数据(单个索引)
源代码
@Test
public void getData() throws Exception {
// 1 查询文档
GetResponse response = client.prepareGet("blog", "article", "1").get();
// 2 打印搜索的结果
System.out.println(response.getSourceAsString());
// 3 关闭连接
client.close();
}
1.9 搜索文档数据(多个索引)
源代码
@Test
public void getMultiData() {
// 1 查询多个文档
MultiGetResponse response = client.prepareMultiGet().add("blog", "article", "1").add("blog", "article", "2", "3").add("blog", "article", "2").get();
// 2 遍历返回的结果
for(MultiGetItemResponse itemResponse:response){
GetResponse getResponse = itemResponse.getResponse();
// 如果获取到查询结果
if (getResponse.isExists()) {
String sourceAsString = getResponse.getSourceAsString();
System.out.println(sourceAsString);
}
}
// 3 关闭资源
client.close();
}
1.10 更新文档数据(update)
源代码
@Test
public void updateData() throws Throwable {
// 1 创建更新数据的请求对象
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("blog");
updateRequest.type("article");
updateRequest.id("3");
updateRequest.doc(XContentFactory.jsonBuilder().startObject()
// 对没有的字段添加, 对已有的字段替换
.field("title", "基于Lucene的搜索服务器")
.field("content","它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。大数据前景无限")
.field("createDate", "2017-8-22").endObject());
// 2 获取更新后的值
UpdateResponse indexResponse = client.update(updateRequest).get();
// 3 打印返回的结果
System.out.println("index:" + indexResponse.getIndex());
System.out.println("type:" + indexResponse.getType());
System.out.println("id:" + indexResponse.getId());
System.out.println("version:" + indexResponse.getVersion());
System.out.println("create:" + indexResponse.getResult());
// 4 关闭连接
client.close();
}
1.11 更新文档数据(upsert)
设置查询条件, 查找不到则添加IndexRequest内容,查找到则按照UpdateRequest更新
@Test
public void testUpsert() throws Exception {
// 设置查询条件, 查找不到则添加
IndexRequest indexRequest = new IndexRequest("blog", "article", "5")
.source(XContentFactory.jsonBuilder().startObject().field("title", "搜索服务器").field("content","它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。").endObject());
// 设置更新, 查找到更新下面的设置
UpdateRequest upsert = new UpdateRequest("blog", "article", "5")
.doc(XContentFactory.jsonBuilder().startObject().field("user", "李四").endObject()).upsert(indexRequest);
client.update(upsert).get();
client.close();
}
1.12 删除文档数据(prepareDelete)
源代码
@Test
public void deleteData() {
// 1 删除文档数据
DeleteResponse indexResponse = client.prepareDelete("blog", "article", "5").get();
// 2 打印返回的结果
System.out.println("index:" + indexResponse.getIndex());
System.out.println("type:" + indexResponse.getType());
System.out.println("id:" + indexResponse.getId());
System.out.println("version:" + indexResponse.getVersion());
System.out.println("found:" + indexResponse.getResult());
// 3 关闭连接
client.close();
}
2. 条件查询QueryBuilder
2.1 查询所有(matchAllQuery)
源代码
@Test
public void matchAllQuery() {
// 1 执行查询
SearchResponse searchResponse = client.prepareSearch("blog").setTypes("article")
.setQuery(QueryBuilders.matchAllQuery()).get();
// 2 打印查询结果
SearchHits hits = searchResponse.getHits(); // 获取命中次数,查询结果有多少对象
System.out.println("查询结果有:" + hits.getTotalHits() + "条");
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());//打印出每条结果
}
// 3 关闭连接
client.close();
}
2.2 对所有字段分词查询(queryStringQuery)
源代码
@Test
public void query() {
// 1 条件查询
SearchResponse searchResponse = client.prepareSearch("blog").setTypes("article")
.setQuery(QueryBuilders.queryStringQuery("全文")).get();
// 2 打印查询结果
SearchHits hits = searchResponse.getHits(); // 获取命中次数,查询结果有多少对象
System.out.println("查询结果有:" + hits.getTotalHits() + "条");
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());//打印出每条结果
}
// 3 关闭连接
client.close();
}
2.3 通配符查询(wildcardQuery)
:表示多个字符(0个或多个字符)
?:表示单个字符
源代码
@Test
public void wildcardQuery() {
// 1 通配符查询
SearchResponse searchResponse = client.prepareSearch("blog").setTypes("article")
.setQuery(QueryBuilders.wildcardQuery("content", "*全*")).get();
// 2 打印查询结果
SearchHits hits = searchResponse.getHits(); // 获取命中次数,查询结果有多少对象
System.out.println("查询结果有:" + hits.getTotalHits() + "条");
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());//打印出每条结果
}
// 3 关闭连接
client.close();
}
2.4 词条查询(TermQuery)
源代码
@Test
public void termQuery() {
// 1 第一field查询
SearchResponse searchResponse = client.prepareSearch("blog").setTypes("article")
.setQuery(QueryBuilders.termQuery("content", "全文")).get();
// 2 打印查询结果
SearchHits hits = searchResponse.getHits(); // 获取命中次数,查询结果有多少对象
System.out.println("查询结果有:" + hits.getTotalHits() + "条");
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());//打印出每条结果
}
// 3 关闭连接
client.close();
}
2.5 模糊查询(fuzzy)
源代码
@Test
public void fuzzy() {
// 1 模糊查询
SearchResponse searchResponse = client.prepareSearch("blog").setTypes("article")
.setQuery(QueryBuilders.fuzzyQuery("title", "lucene")).get();
// 2 打印查询结果
SearchHits hits = searchResponse.getHits(); // 获取命中次数,查询结果有多少对象
System.out.println("查询结果有:" + hits.getTotalHits() + "条");
Iterator<SearchHit> iterator = hits.iterator();
while (iterator.hasNext()) {
SearchHit searchHit = iterator.next(); // 每个查询对象
System.out.println(searchHit.getSourceAsString()); // 获取字符串格式打印
}
// 3 关闭连接
client.close();
}
3. 映射相关操作
源代码
@Test
public void createMapping() throws Exception {
// 1设置mapping
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.startObject("article")
.startObject("properties")
.startObject("id1")
.field("type", "string")
.field("store", "yes")
.endObject()
.startObject("title2")
.field("type", "string")
.field("store", "no")
.endObject()
.startObject("content")
.field("type", "string")
.field("store", "yes")
.endObject()
.endObject()
.endObject()
.endObject();
// 2 添加mapping
PutMappingRequest mapping = Requests.putMappingRequest("blog4").type("article").source(builder);
client.admin().indices().putMapping(mapping).get();
// 3 关闭资源
client.close();
}
二、IK分词器
针对词条查询(TermQuery),查看默认中文分词器的效果:
curl -XGET ‘http://hsiehchou:9200/_analyze?pretty&analyzer=standard’ -d ‘中华人民共和国’
{
“tokens” : [
{
“token” : “中”,
“start_offset” : 0,
“end_offset” : 1,
“type” : “”,
“position” : 0
},
{
“token” : “华”,
“start_offset” : 1,
“end_offset” : 2,
“type” : “”,
“position” : 1
},
{
“token” : “人”,
“start_offset” : 2,
“end_offset” : 3,
“type” : “”,
“position” : 2
},
{
“token” : “民”,
“start_offset” : 3,
“end_offset” : 4,
“type” : “”,
“position” : 3
},
{
“token” : “共”,
“start_offset” : 4,
“end_offset” : 5,
“type” : “”,
“position” : 4
},
{
“token” : “和”,
“start_offset” : 5,
“end_offset” : 6,
“type” : “”,
“position” : 5
},
{
“token” : “国”,
“start_offset” : 6,
“end_offset” : 7,
“type” : “”,
“position” : 6
}
]
}
1. IK分词器的安装
1.1 前期准备工作
1)CentOS联网
配置CentOS能连接外网。Linux虚拟机ping www.baidu.com 是畅通的
2)jar包准备
(1)elasticsearch-analysis-ik-master.zip
(下载地址:https://github.com/medcl/elasticsearch-analysis-ik)
(2)apache-maven-3.6.0-bin.tar.gz
1.2 jar包安装
1)Maven解压、配置 MAVEN_HOME和PATH。
tar -zxvf apache-maven-3.6.0-bin.tar.gz -C /opt/module/
sudo vi /etc/profile
#MAVEN_HOME
export MAVEN_HOME=/opt/module/apache-maven-3.6.0
export PATH=$PATH:$MAVEN_HOME/bin
source /etc/profile
验证命令:mvn -version
2)Ik分词器解压、打包与配置
ik分词器解压
unzip elasticsearch-analysis-ik-master.zip -d ./
进入ik分词器所在目录
cd elasticsearch-analysis-ik-master
使用maven进行打包
mvn package -Pdist,native -DskipTests -Dtar
打包完成之后,会出现 target/releases/elasticsearch-analysis-ik-{version}.zip
pwd /opt/software/elasticsearch-analysis-ik-master/target/releases
对zip文件进行解压,并将解压完成之后的文件拷贝到es所在目录下的/plugins/
unzip elasticsearch-analysis-ik-6.0.0.zip
cp -r elasticsearch /opt/module/elasticsearch-5.6.1/plugins/
需要修改plugin-descriptor.properties文件,将其中的es版本号改为你所使用的版本号,即完成ik分词器的安装
vi plugin-descriptor.properties
修改为
elasticsearch.version=6.1.1
至此,安装完成,重启ES!
注意:需选择与es相同版本的ik分词器。
安装方法(2种):
./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.1.1/elasticsearch-analysis-ik-6.1.1.zip
cp elasticsearch-analysis-ik-6.1.1.zip ./elasticsearch-6.1.1/plugins/
unzip elasticsearch-analysis-ik-6.1.1.zip -d ik-analyzer
elasticsearch-plugin install -f file:///usr/local/elasticsearch-analysis-ik-6.1.1.zip
2. IK分词器的使用
2.1 命令行查看结果
ik_smart模式
curl -XGET ‘http://hsiehchou121:9200/_analyze?pretty&analyzer=ik_smart’ -d ‘中华人民共和国’
curl -H “Content-Type:application/json” -XGET ‘http://192.168.116.121:9200/_analyze?pretty’ -d ‘{“analyzer”:”ik_smasysctl -prt”,”text”:”中华人民共和国”}’
{
“tokens” : [
{
“token” : “中华人民共和国”,
“start_offset” : 0,
“end_offset” : 7,
“type” : “CN_WORD”,
“position” : 0
}
]
}
ik_max_word模式
curl -XGET ‘http://hadoop121:9200/_analyze?pretty&analyzer=ik_max_word’ -d ‘中华人民共和国’
curl -H “Content-Type:application/json” -XGET ‘http://192.168.116.124:9200/_analyze?pretty’ -d ‘{“analyzer”:”ik_max_word”,”text”:”中华人民共和国”}’
{
“tokens” : [
{
“token” : “中华人民共和国”,
“start_offset” : 0,
“end_offset” : 7,
“type” : “CN_WORD”,
“position” : 0
},
{
“token” : “中华人民”,
“start_offset” : 0,
“end_offset” : 4,
“type” : “CN_WORD”,
“position” : 1
},
{
“token” : “中华”,
“start_offset” : 0,
“end_offset” : 2,
“type” : “CN_WORD”,
“position” : 2
},
{
“token” : “华人”,
“start_offset” : 1,
“end_offset” : 3,
“type” : “CN_WORD”,
“position” : 3
},
{
“token” : “人民共和国”,
“start_offset” : 2,
“end_offset” : 7,
“type” : “CN_WORD”,
“position” : 4
},
{
“token” : “人民”,
“start_offset” : 2,
“end_offset” : 4,
“type” : “CN_WORD”,
“position” : 5
},
{
“token” : “共和国”,
“start_offset” : 4,
“end_offset” : 7,
“type” : “CN_WORD”,
“position” : 6
},
{
“token” : “共和”,
“start_offset” : 4,
“end_offset” : 6,
“type” : “CN_WORD”,
“position” : 7
},
{
“token” : “国”,
“start_offset” : 6,
“end_offset” : 7,
“type” : “CN_CHAR”,
“position” : 8
}
]
}
2.2 JavaAPI操作
1)创建索引
//创建索引(数据库)
@Test
public void createIndex() {
//创建索引
client.admin().indices().prepareCreate("blog4").get();
//关闭资源
client.close();
}
2)创建mapping
//创建使用ik分词器的mapping
@Test
public void createMapping() throws Exception {
// 1设置mapping
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.startObject("article")
.startObject("properties")
.startObject("id1")
.field("type", "string")
.field("store", "yes")
.field("analyzer","ik_smart")
.endObject()
.startObject("title2")
.field("type", "string")
.field("store", "no")
.field("analyzer","ik_smart")
.endObject()
.startObject("content")
.field("type", "string")
.field("store", "yes")
.field("analyzer","ik_smart")
.endObject()
.endObject()
.endObject()
.endObject();
// 2 添加mapping
PutMappingRequest mapping = Requests.putMappingRequest("blog4").type("article").source(builder);
client.admin().indices().putMapping(mapping).get();
// 3 关闭资源
client.close();
}
3)插入数据
//创建文档,以map形式
@Test
public void createDocumentByMap() {
HashMap<String, String> map = new HashMap<>();
map.put("id1", "2");
map.put("title2", "Lucene");
map.put("content", "它提供了一个分布式的web接口");
IndexResponse response = client.prepareIndex("blog4", "article", "3").setSource(map).execute().actionGet();
//打印返回的结果
System.out.println("结果:" + response.getResult());
System.out.println("id:" + response.getId());
System.out.println("index:" + response.getIndex());
System.out.println("type:" + response.getType());
System.out.println("版本:" + response.getVersion());
//关闭资源
client.close();
}
4) 词条查询
//词条查询
@Test
public void queryTerm() {
SearchResponse response = client.prepareSearch("blog4").setTypes("article").setQuery(QueryBuilders.termQuery("content","提供")).get();
//获取查询命中结果
SearchHits hits = response.getHits();
System.out.println("结果条数:" + hits.getTotalHits());
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());
}
}
Store 的解释
官方文档说 store 默认是 no ,想当然的理解为也就是说这个 field 是不会 store 的,但是查询的时候也能查询出来
经过查找资料了解到原来 store 的意思是,是否在 _source 之外在独立存储一份。这里要说一下 _source 这是源文档,当索引数据的时候, elasticsearch 会保存一份源文档到 _source 。如果文档的某一字段设置了 store 为 yes (默认为 no),这时候会在 _source 存储之外再为这个字段独立进行存储,这么做的目的主要是针对内容比较多的字段
如果放到 _source 返回的话,因为_source 是把所有字段保存为一份文档,命中后读取只需要一次 IO,包含内容特别多的字段会很占带宽影响性能。通常我们也不需要完整的内容返回(可能只关心摘要),这时候就没必要放到 _source 里一起返回了(当然也可以在查询时指定返回字段)
三、Logstash
1. Logstash简介
Logstash is a tool for managing events and logs. You can use it to collect logs, parse them, and store them for later use (like, for searching).
logstash是一个数据分析软件,主要目的是分析log日志。整一套软件可以当作一个MVC模型,logstash是controller层,Elasticsearch是一个model层,kibana是view层
首先将数据传给logstash,它将数据进行过滤和格式化(转成JSON格式),然后传给Elasticsearch进行存储、建搜索的索引,kibana提供前端的页面再进行搜索和图表可视化,它是调用Elasticsearch的接口返回的数据进行可视化。logstash和Elasticsearch是用Java写的,kibana使用node.js框架
这个软件官网有很详细的使用说明,https://www.elastic.co/,除了docs之外,还有视频教程。这篇博客集合了docs和视频里面一些比较重要的设置和使用
2. Logstash 安装
直接下载官方发布的二进制包的,可以访问 https://www.elastic.co/downloads/logstash 页面找对应操作系统和版本,点击下载即可
在终端中,像下面这样运行命令来启动 Logstash 进程:
输入(读取数据):file、es。 输出:file、es、kafka
bin/logstash -e ‘input{stdin{}}output{stdout{codec=>rubydebug}}’
-f文件 -e命令 标准输入、输出(命令行)
注意:如果出现如下报错,请调高虚拟机内存容量
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c5330000, 986513408, 0) failed; error=’Cannot allocate memory’ (errno=12)
然后你会发现终端在等待你的输入。没问题,敲入 Hello World,回车,
{
“@version” => “1”,
“host” => “*“,
“message” => “hello world”,
“@timestamp” => 2019-03-18T02:51:18.578Z
}
每位系统管理员都肯定写过很多类似这样的命令
cat randdata | awk ‘{print $2}’ | sort | uniq -c | tee sortdata
Logstash 就像管道符一样!
你输入(就像命令行的 cat )数据,然后处理过滤(就像 awk 或者 uniq 之类)数据,最后输出(就像 tee )到其他地方
3. Logstash 配置
3.1 input配置
读取文件(File)
input {
file {
path => ["/var/log/*.log", "/var/log/message"]
type => "system"
start_position => "beginning"
}
}
output{stdout{codec=>rubydebug}}
有一些比较有用的配置项,可以用来指定 FileWatch 库的行为
discover_interval
logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒
exclude
不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开
close_older
一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是 3600 秒,即一小时
ignore_older
在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是 86400 秒,即一天
sincedb_path
如果你不想用默认的 $HOME/.sincedb(Windows 平台上在 C:\Windows\System32\config\systemprofile.sincedb),可以通过这个配置定义 sincedb 文件到其他位置
sincedb_write_interval
logstash 每隔多久写一次 sincedb 文件,默认是 15 秒
stat_interval
logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒
start_position
logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成 “beginning”,logstash 进程就从头开始读取,类似 less +F 的形式运行
启动命令:../bin/logstash -f ./input_file.conf
测试命令:echo ‘hehe’ >> test.log
echo ‘hehe2’ >> message
标准输入(Stdin)
我们已经见过好几个示例使用 stdin 了。这也应该是 logstash 里最简单和基础的插件了
input {
stdin {
add_field => {"key" => "value"}
codec => "plain"
tags => ["add"]
type => "std"
}
}
output{stdout{codec=>rubydebug}}
用上面的新 stdin 设置重新运行一次最开始的 hello world 示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:../bin/logstash -f ./input_stdin.conf。输入 “hello world” 并回车后,你会在终端看到如下输出
{
"message" => "hello world",
"@version" => "1",
"@timestamp" => "2014-08-08T06:48:47.789Z",
"type" => "std",
"tags" => [
[0] "add"
],
"key" => "value",
"host" => "raochenlindeMacBook-Air.local"
}
解释
type 和 tags 是 logstash 事件中两个特殊的字段。通常来说我们会在输入区段中通过 type 来标记事件类型。而 tags 则是在数据处理过程中,由具体的插件来添加或者删除的
最常见的用法是像下面这样
input {
stdin {
type => "web"
}
}
filter {
if [type] == "web" {
grok {
match => ["message", %{COMBINEDAPACHELOG}]
}
}
}
output {
if "_grokparsefailure" in [tags] {
nagios_nsca {
nagios_status => "1"
}
} else {
elasticsearch {
}
}
}
3.2 codec配置
Codec 是 logstash 从 1.3.0 版开始新引入的概念(Codec 来自 Coder/decoder 两个单词的首字母缩写)
在此之前,logstash 只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入期处理不同类型的数据,这全是因为有了 codec 设置
所以,这里需要纠正之前的一个概念。Logstash 不只是一个input | filter | output 的数据流,而是一个 input | decode | filter | encode | output 的数据流!codec 就是用来 decode、encode 事件的
codec 的引入,使得 logstash 可以更好更方便的与其他有自定义数据格式的运维产品共存,比如 graphite、fluent、netflow、collectd,以及使用 msgpack、json、edn 等通用数据格式的其他产品等
事实上,我们在第一个 “hello world” 用例中就已经用过 codec 了 —— rubydebug 就是一种 codec!虽然它一般只会用在 stdout 插件中,作为配置测试或者调试的工具
采用 JSON 编码
在早期的版本中,有一种降低 logstash 过滤器的 CPU 负载消耗的做法盛行于社区(在当时的 cookbook 上有专门的一节介绍):直接输入预定义好的 JSON 数据,这样就可以省略掉 filter/grok 配置!
这个建议依然有效,不过在当前版本中需要稍微做一点配置变动 —— 因为现在有专门的 codec 设置
配置示例
input {
stdin {
add_field => {"key" => "value"}
codec => "json"
type => "std"
}
}
output{stdout{codec=>rubydebug}}
输入:
{“simCar”:18074045598,”validityPeriod”:”1996-12-06”,”unitPrice”:9,”quantity”:19,”amount”:35,”imei”:887540376467915,”user”:”test”}
运行结果:
{
“imei” => 887540376467915,
“unitPrice” => 9,
“user” => “test”,
“@timestamp” => 2019-03-19T05:01:53.451Z,
“simCar” => 18074045598,
“host” => “zzc-203”,
“amount” => 35,
“@version” => “1”,
“key” => “value”,
“type” => “std”,
“validityPeriod” => “1996-12-06”,
“quantity” => 19
}
3.3 filter配置
Grok插件
logstash拥有丰富的filter插件,它们扩展了进入过滤器的原始数据,进行复杂的逻辑处理,甚至可以无中生有的添加新的 logstash 事件到后续的流程中去!Grok 是 Logstash 最重要的插件之一。也是迄今为止使蹩脚的、无结构的日志结构化和可查询的最好方式。Grok在解析 syslog logs、apache and other webserver logs、mysql logs等任意格式的文件上表现完美
这个工具非常适用于系统日志,Apache和其他网络服务器日志,MySQL日志等
配置
input {
stdin {
type => "std"
}
}
filter {
grok {
match=>{"message"=> "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
output{stdout{codec=>rubydebug}}
输入:55.3.244.1 GET /index.html 15824 0.043
输出:
{
“@version” => “1”,
“host” => “zzc-203”,
“request” => “/index.html”,
“bytes” => “15824”,
“duration” => “0.043”,
“method” => “GET”,
“@timestamp” => 2019-03-19T05:09:55.777Z,
“message” => “55.3.244.1 GET /index.html 15824 0.043”,
“type” => “std”,
“client” => “55.3.244.1”
}
grok模式的语法如下:
%{SYNTAX:SEMANTIC}
SYNTAX:代表匹配值的类型,例如3.44可以用NUMBER类型所匹配,127.0.0.1可以使用IP类型匹配。
SEMANTIC:代表存储该值的一个变量名称,例如 3.44 可能是一个事件的持续时间,127.0.0.1可能是请求的client地址。所以这两个值可以用 %{NUMBER:duration} %{IP:client} 来匹配
你也可以选择将数据类型转换添加到Grok模式。默认情况下,所有语义都保存为字符串。如果您希望转换语义的数据类型,例如将字符串更改为整数,则将其后缀为目标数据类型。例如%{NUMBER:num:int}将num语义从一个字符串转换为一个整数。目前唯一支持的转换是int和float
Logstash附带约120个模式。你可以在这里找到它们https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
自定义类型
更多时候logstash grok没办法提供你所需要的匹配类型,这个时候我们可以使用自定义
创建自定义 patterns 文件
①创建一个名为patterns其中创建一个文件postfix (文件名无关紧要,随便起),在该文件中,将需要的模式写为模式名称,空格,然后是该模式的正则表达式。例如:
POSTFIX_QUEUEID [0-9A-F]{10,11}
②然后使用这个插件中的patterns_dir设置告诉logstash目录是你的自定义模式。
配置
input {
stdin {
type => "std"
}
}
filter {
grok {
patterns_dir => ["./patterns"]
match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
}
}
output{stdout{codec=>rubydebug}}
输入:
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver1
输出:
{
“queue_id” => “BEF25A72965”,
“message” => “Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver1”,
“pid” => “21403”,
“program” => “postfix/cleanup”,
“@version” => “1”,
“type” => “std”,
“logsource” => “mailserver14”,
“host” => “zzc-203”,
“timestamp” => “Jan 1 06:25:43”,
“syslog_message” => “message-id=<20130101142543.5828399CCAF@mailserver1”,
“@timestamp” => 2019-03-19T05:31:37.405Z
}
GeoIP 地址查询归类
GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。
配置
input {
stdin {
type => "std"
}
}
filter {
geoip {
source => "message"
}
}
output{stdout{codec=>rubydebug}}
输入:183.60.92.253
输出:
{
“type” => “std”,
“@version” => “1”,
“@timestamp” => 2019-03-19T05:39:26.714Z,
“host” => “zzc-203”,
“message” => “183.60.92.253”,
“geoip” => {
“country_code3” => “CN”,
“latitude” => 23.1167,
“region_code” => “44”,
“region_name” => “Guangdong”,
“location” => {
“lon” => 113.25,
“lat” => 23.1167
},
“city_name” => “Guangzhou”,
“country_name” => “China”,
“continent_code” => “AS”,
“country_code2” => “CN”,
“timezone” => “Asia/Shanghai”,
“ip” => “183.60.92.253”,
“longitude” => 113.25
}
}
3.4 output配置
标准输出(Stdout)
保存成文件(File)
通过日志收集系统将分散在数百台服务器上的数据集中存储在某中心服务器上,这是运维最原始的需求。Logstash 当然也能做到这点
和 LogStash::Inputs::File 不同, LogStash::Outputs::File 里可以使用 sprintf format 格式来自动定义输出到带日期命名的路径
配置
input {
stdin {
type => "std"
}
}
output {
file {
path => "../data_test/%{+yyyy}/%{+MM}/%{+dd}/%{host}.log"
codec => line { format => "custom format: %{message}"}
}
}
启动后输入,可看到文件
服务器间传输文件(File)
配置
接收日志服务器配置
input {
tcp {
mode => "server"
port => 9600
ssl_enable => false
}
}
filter {
json {
source => "message"
}
}
output {
file {
path => "/usr/local/logstash-6.6.2/data_test/%{+YYYY-MM-dd}/%{servip}-%{filename}"
codec => line { format => "%{message}"}
}
}
发送日志服务器配置
input{
file {
path => ["/usr/local/logstash-6.6.2/data_test/send.log"]
type => "ecolog"
start_position => "beginning"
}
}
filter {
if [type] =~ /^ecolog/ {
ruby {
code => "file_name = event.get('path').split('/')[-1]
event.set('file_name',file_name)
event.set('servip','接收方ip')"
}
mutate {
rename => {"file_name" => "filename"}
}
}
}
output {
tcp {
host => "接收方ip"
port => 9600
codec => json_lines
}
}
从发送方发送message,接收方可以看到写出文件
写入到ES
配置
input {
stdin {
type => "log2es"
}
}
output {
elasticsearch {
hosts => ["192.168.109.133:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
document_type => "%{type}"
sniffing => true
template_overwrite => true
}
}
在head插件中可以看到数据
sniffing : 寻找其他es节点
实战举例:将错误日志写入es
配置
input {
file {
path => ["/usr/local/logstash-6.6.2/data_test/run_error.log"]
type => "error"
start_position => "beginning"
}
}
output {
elasticsearch {
hosts => ["192.168.109.133:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
document_type => "%{type}"
sniffing => true
template_overwrite => true
}
}
四、Kibana
Kibana是一个开源的分析和可视化平台,设计用于和Elasticsearch一起工作
你用Kibana来搜索,查看,并和存储在Elasticsearch索引中的数据进行交互
你可以轻松地执行高级数据分析,并且以各种图标、表格和地图的形式可视化数据
Kibana使得理解大量数据变得很容易。它简单的、基于浏览器的界面使你能够快速创建和共享动态仪表板,实时显示Elasticsearch查询的变化
安装步骤
解压:tar -zxvf kibana-6.6.2-linux-x86_64.tar.gz
修改 kibana.yml 配置文件:
server.port: 5601
server.host: “192.168.116.121” ———-部署kinana服务器的ip
elasticsearch.hosts: [“http://192.168.116.121:9200“]
kibana.index: “.kibana”
启动kibana,报错:
./bin/kibana[error][status]
[plugin:remote_clusters@6.6.2] Status changed from red to red - X-Pack plugin is not installed on the [data] Elasticsearch cluster.
解决,卸载x-pack插件
elasticsearch-plugin remove x-pack
kibana-plugin remove x-pack
安装好后启动即可。页面操作