Kafka入门

基础知识

介绍

Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独特的设计。
首先让我们看几个基本的消息系统术语:
Kafka将消息以topic为单位进行归纳。
将向Kafka topic发布消息的程序成为producers.
将预订topics并消费消息的程序成为consumer.
Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.

分布式

每个分区在Kafka集群的若干服务中都有副本,这样这些持有副本的服务可以共同处理数据和请求,副本数量是可以配置的。副本使Kafka具备了容错能力。
每个分区都由一个服务器作为“leader”,零或若干服务器作为“followers”,leader负责处理消息的读和写,followers则去复制leader.如果leader down了,
followers中的一台则会自动成为leader。集群中的每个服务都会同时扮演两个角色:
作为它所持有的一部分分区的leader,同时作为其他分区的followers,这样集群就会据有较好的负载均衡。

Producers

Producer将消息发布到它指定的topic中,并负责决定发布到哪个分区。通常简单的由负载均衡机制随机选择分区,但也可以通过特定的分区函数选择分区。
使用的更多的是第二种。

Consumers

发布消息通常有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers可以同时从服务端读取
消息,每个消息只被其中一个consumer读到;发布-订阅模式中消息被广播到所有的consumer中。Consumers可以加入一个consumer
组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer可以在不同的程序中,也可以在不同的
机器上。如果所有的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。如果所有的consumer
不在不同的组中,这就成为了发布-订阅模式,所有的消息都被分发到所有的consumer中。更常见的是,每个topic都有若干数量
的consumer组,每个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每个组由若干consumer组成。这其实就是一个
发布-订阅模式,只不过订阅者是个组而不是单个consumer。

由两个机器组成的集群拥有4个分区 (P0-P3) 2个consumer组. A组有两个consumerB组有4个
相比传统的消息系统,Kafka可以很好的保证有序性。
传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向
consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了
原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用“专用consumer”的概念,其实就是
只允许一个消费者消费消息,当然这就意味着失去了并发性。
在这方面Kafka做的更好,通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。
将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。
为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少
分区就允许多少并发消费。
Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。
如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个consumer组消费它。

环境搭建

下载并解压

官网选择下载最新的二进制文件
解压

1
2
tar -vxzf kafka_2.11-0.10.0.1.tgz
cd kafka_2.11-0.10.0.1

启动

1.启动zookeeper
2.启动kafka

1
bin/kafka-server-start.sh config/server.properties

3.创建topic

1
2
3
4
5
使用kafka-topics.sh 创建单分区单副本的topic test:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看
bin/kafka-topics.sh --list --zookeeper localhost:2181
test

4.产生消息
使用kafka-console-producer.sh 发送消息:

1
2
3
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Hello world!
Hello Kafka!

按Ctrl+C退出
5.消费消息
使用kafka-console-consumer.sh 接收消息并在终端打印:

1
2
3
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Hello world!
Hello Kafka!

Read More

Sporing Boot中使用log4j记录日志

引入log4j依赖

在创建Spring Boot工程时,我们引入了spring-boot-starter,其中包含了spring-boot-starter-logging,该依赖内容就是
Spring Boot默认的日志框架Logback,所以我们在引入log4j之前,需要先排除该包的依赖,再引入log4j的依赖,就像下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j</artifactId>
</dependency>

配置log4j.properties

在引入了log4j依赖之后,只需要在src/main/resources目录下加入log4j.properties配置文件,就可以开始对应用的日志进行配置使用。

控制台输出

通过如下配置,设定root日志的输出级别为INFO,appender为控制台输出stdout

1
2
3
4
5
6
7
# LOG4J配置
log4j.rootCategory=INFO, stdout
# 控制台输出
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n

输出到文件

在开发环境,我们只是输出到控制台没有问题,但是到了生产或测试环境,或许持久化日志内容,方便追溯问题原因。
可以通过添加如下的appender内容,按天输出到不同的文件中去,同时还需要为log4j.rootCategory添加名为file的appender,
这样root日志就可以输出到logs/all.log文件中了。

1
2
3
4
5
6
7
8
9
#
log4j.rootCategory=INFO, stdout, file
# root日志输出
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.file=logs/all.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n

分类输出

1.可以按不同package进行输出。通过定义输出到logs/my.log的appender,并对com.didispace包下的日志级别设定
为DEBUG级别、appender设置为输出到logs/my.log的名为didifile的appender。

1
2
3
4
5
6
7
8
9
# com.juzi包下的日志配置
log4j.category.com.juzi=DEBUG, didifile
# com.didispace下的日志输出
log4j.appender.didifile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.didifile.file=logs/my.log
log4j.appender.didifile.DatePattern='.'yyyy-MM-dd
log4j.appender.didifile.layout=org.apache.log4j.PatternLayout
log4j.appender.didifile.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L ---- %m%n

可以对不同级别进行分类,比如对ERROR级别输出到特定的日志文件中,具体配置可以如下。

1
2
3
4
5
6
7
8
og4j.logger.error=errorfile
# error日志输出
log4j.appender.errorfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.errorfile.file=logs/error.log
log4j.appender.errorfile.DatePattern='.'yyyy-MM-dd
log4j.appender.errorfile.Threshold = ERROR
log4j.appender.errorfile.layout=org.apache.log4j.PatternLayout
log4j.appender.errorfile.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n

Spring Boot日志管理

Spring Boot在所有内部日志中使用Commons Logging,但是默认配置也提供了对常用日志的支持,
如:Java Util Logging,Log4J, Log4J2和Logback。每种Logger都可以通过配置使用控制台或者文件输出日志内容。

日志输出格式

1
2016-08-19 10:22:04.233 INFO 7368 --- [ main] com.juzi.AsyncTest : Started AsyncTest in 10.084 seconds (JVM running for 12.545)

输出内容元素具体如下:

时间日期 — 精确到毫秒
日志级别 — ERROR, WARN, INFO, DEBUG or TRACE
进程ID
分隔符 — — 标识实际日志的开始
线程名 — 方括号括起来(可能会截断控制台输出)
Logger名 — 通常使用源代码的类名
日志内容

控制台输出

在Spring Boot中默认配置了ERROR、WARN和INFO级别的日志输出到控制台。

我们可以通过两种方式切换至DEBUG级别:
1.在运行命令后加入–debug标志,如:$ java -jar myapp.jar –debug
2.在application.properties中配置debug=true,该属性置为true的时候,核心Logger(包含嵌入式容器、hibernate、spring)
会输出更多内容,但是你自己应用的日志并不会输出为DEBUG级别。

多彩输出

如果你的终端支持ANSI,设置彩色输出会让日志更具可读性。通过在application.properties中设置spring.output.ansi.enabled参数来支持。

1.NEVER:禁用ANSI-colored输出(默认项)
2.DETECT:会检查终端是否支持ANSI,是的话就采用彩色输出(推荐项)
3.ALWAYS:总是使用ANSI-colored格式输出,若终端不支持的时候,会有很多干扰信息,不推荐使用

文件输出

Spring Boot默认配置只会输出到控制台,并不会记录到文件中,但是我们通常生产环境使用时都需要以文件方式记录。

若要增加文件输出,需要在application.properties中配置logging.file或logging.path属性。

1.logging.file,设置文件,可以是绝对路径,也可以是相对路径。如:logging.file=my.log
2.logging.path,设置目录,会在该目录下创建spring.log文件,并写入日志内容,如:logging.path=/var/log

日志文件会在10Mb大小的时候被截断,产生新的日志文件,默认级别为:ERROR、WARN、INFO *

级别控制

在Spring Boot中只需要在application.properties中进行配置完成日志记录的级别控制。

配置格式:logging.level.*=LEVEL

1.logging.level:日志级别控制前缀,*为包名或Logger名
2.LEVEL:选项TRACE, DEBUG, INFO, WARN, ERROR, FATAL, OFF
举例:

logging.level.com.juzi=DEBUG com.juzi包下所有class以DEBUG级别输出
logging.level.root=WARN root日志以WARN级别输出

自定义日志配置

由于日志服务一般都在ApplicationContext创建前就初始化了,它并不是必须通过Spring的配置文件控制。
因此通过系统属性和传统的Spring Boot外部配置文件依然可以很好的支持日志控制和管理。

根据不同的日志系统,你可以按如下规则组织配置文件名,就能被正确加载:

1.Logback:logback-spring.xml, logback-spring.groovy, logback.xml, logback.groovy logback日志配置
2.Log4j:log4j-spring.properties, log4j-spring.xml, log4j.properties, log4j.xml
3.Log4j2:log4j2-spring.xml, log4j2.xml
4.JDK (Java Util Logging):logging.properties

Spring Boot官方推荐优先使用带有-spring的文件名作为你的日志配置(如使用logback-spring.xml,而不是logback.xml)

自定义输出格式

在Spring Boot中可以通过在application.properties配置如下参数控制输出格式:

1.logging.pattern.console:定义输出到控制台的样式(不支持JDK Logger)
2.logging.pattern.file:定义输出到文件的样式(不支持JDK Logger)

@EnableJpaRepositories配置详解

1,简单配置

@EnableJpaRepositories(“com.juzi.repository”)
或者 @EnableJpaRepositories({“com.juzi.repository”, “com.cshtong.second.repository”})

2,完整的@EnableJpaRepositories注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@EnableJpaRepositories(
basePackages = {},
basePackageClasses = {},
includeFilters = {},
excludeFilters = {},
repositoryImplementationPostfix = "Impl",
namedQueriesLocation = "",//META-INF/jpa-named-queries.properties
queryLookupStrategy=QueryLookupStrategy.Key.CREATE_IF_NOT_FOUND, //QueryLookupStrategy.Key.x
repositoryFactoryBeanClass=JpaRepositoryFactoryBean.class, //class
entityManagerFactoryRef="entityManagerFactory",
transactionManagerRef="transactionManager",
considerNestedRepositories=false,
enableDefaultTransactions=true
)

各个配置项的作用
1)basePackage
用于配置扫描Repositories所在的package及子package。简单配置中的配置则等同于此项配置值,
basePackages可以配置为单个字符串,也可以配置为字符串数组形式。

1
2
3
4
5
@EnableJpaRepositories(
basePackages = "com.juzi")
多个包路径
@EnableJpaRepositories(
basePackages = {"com.juzi.repository", "com.juzi.second.repository"})

2)basePackageClasses
指定 Repository 类

1
2
3
4
@EnableJpaRepositories(basePackageClasses = BookRepository.class)
@EnableJpaRepositories(
basePackageClasses = {ShopRepository.class, OrganizationRepository.class})

3)includeFilters
过滤器,该过滤区采用ComponentScan的过滤器类
@EnableJpaRepositories(
includeFilters={@ComponentScan.Filter(type=FilterType.ANNOTATION, value=Repository.class)})
4)excludeFilters
不包含过滤器

@EnableJpaRepositories(
excludeFilters={
@ComponentScan.Filter(type=FilterType.ANNOTATION, value=Service.class),
@ComponentScan.Filter(type=FilterType.ANNOTATION, value=Controller.class)})

5)repositoryImplementationPostfix
实现类追加的尾部,比如ShopRepository,对应的为ShopRepositoryImpl

6)namedQueriesLocation
named SQL存放的位置,默认为META-INF/jpa-named-queries.properties

7)queryLookupStrategy
构建条件查询的策略,包含三种方式CREATE,USE_DECLARED_QUERY,CREATE_IF_NOT_FOUND

CREATE:按照接口名称自动构建查询
USE_DECLARED_QUERY:用户声明查询
CREATE_IF_NOT_FOUND:先搜索用户声明的,不存在则自动构建

8)repositoryFactoryBeanClass
指定Repository的工厂类

9)entityManagerFactoryRef
实体管理工厂引用名称,对应到@Bean注解对应的方法

10)transactionManagerRef

事务管理工厂引用名称,对应到@Bean注解对应的方法