欢迎光临
我们一直在努力

Mybatis-Plus 真好用(乡村爱情加持)

mumupudding阅读(11)

乡村爱情


写在前面

MyBatis的增强方案确实有不少,甚至有种感觉是现在如果只用 “裸MyBatis”,不来点增强插件都不好意思了。这不,在上一篇文章《Spring Boot项目利用MyBatis Generator进行数据层代码自动生成》 中尝试了一下 MyBatis Generator。这次来点更加先进的 Mybatis-Plus,SQL语句都不用写了,分页也是自动完成,嗯,真香!


数据库准备

CREATE TABLE tbl_user( user_id BIGINT(20) NOT NULL COMMENT '主键ID', user_name VARCHAR(30) NULL DEFAULT NULL COMMENT '姓名', user_age INT(11) NULL DEFAULT NULL COMMENT '年龄', PRIMARY KEY (user_id)) charset = utf8;

MyBatis-Plus加持

  • 工程搭建 (不赘述了)

  • 依赖引入

<dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.1.0</version></dependency><dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId></dependency><dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.9</version></dependency><dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> <version>8.0.12</version></dependency>

主要是 Mybatis Plus、Lombok(不知道Lombok干嘛的?可以看这里)、Druid连接池 等依赖。

  • MyBatis Plus配置

项目配置

mybatis-plus:  mapper-locations: classpath:/mapper/*Mapper.xml

新增 MyBatis Plus配置类

@Configuration@MapperScan("cn.codesheep.springbtmybatisplus.mapper")public class MyBatisConfig {}

看到没,几乎零配置啊,下面就可以写业务逻辑了


业务编写

  • 实体类
@Data@TableName("tbl_user")public class User {    @TableId(value = "user_id")    private Long userId;    private String userName;    private Integer userAge;}
  • Mapper类
public interface UserMapper extends BaseMapper<User> {}

这里啥接口方法也不用写,就可以实现增删改查了!

  • Service类

Service接口:

public interface UserService extends IService<User> {    int insertUser( User user );    int updateUser( User user );    int deleteUser( User user );    User findUserByName( String userName );    IPage getUserPage( Page page, User user );}

Service实现:

@Service@AllArgsConstructorpublic class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {    // 增    @Override    public int insertUser(User user) {        return baseMapper.insert( user );    }    // 改    @Override    public int updateUser(User user) {        return baseMapper.updateById( user );    }    // 删    @Override    public int deleteUser(User user) {        return baseMapper.deleteById( user.getUserId() );    }    // 查    @Override    public User findUserByName( String userName ) {        return baseMapper.getUserByName( userName );    }}
  • Controller类
@RestController@RequestMapping("/user")public class UserContorller {    @Autowired    private UserService userService;    // 增    @PostMapping( value = "/insert")    public Object insert( @RequestBody User user ) {        return userService.insertUser( user );    }    // 改    @PostMapping( value = "/update")    public Object update( @RequestBody User user ) {        return userService.updateUser( user );    }    // 删    @PostMapping( value = "/delete")    public Object delete( @RequestBody User user ) {        return userService.deleteUser( user );    }    // 查    @GetMapping( value = "/getUserByName")    public Object getUserByName( @RequestParam String userName ) {        return userService.findUserByName( userName );    }}

通过以上几个简单的步骤,我们就实现了 tbl_user表的增删改查,传统 MyBatis的 XML文件一个都不需要写!


实际实验【《乡爱》加持】

  • 启动项目

很牛批的 logo就会出现

Mybatis Plus Logo

接下来通过 Postman来发送增删改查的请求

  • 插入记录

通过 Postman随便插入几条记录 POST localhost:8089/user/insert

{"userId":3,"userName":"刘能","userAge":"58"}{"userId":4,"userName":"赵四","userAge":"58"}{"userId":5,"userName":"谢广坤","userAge":"58"}{"userId":6,"userName":"刘大脑袋","userAge":"58"}

发送插入请求

插入结果

  • 修改记录

修改记录时需要带用户ID,比如我们修改 赵四 那条记录的名字为 赵四(Zhao Four)

发送修改请求

修改结果

  • 删除记录

修改记录时同样需要带用户ID,比如删除ID=6 那条 刘大脑袋的记录

image.png

  • 查询记录(普通查询,下文讲分页查询)

比如,按照名字来查询:GET localhost:8089/user/getUserByName?userName=刘能


最关心的分页问题

  • 首先装配分页插件
@Beanpublic PaginationInterceptor paginationInterceptor() { return new PaginationInterceptor();}
  • Mapper类
public interface UserMapper extends BaseMapper<User> {    // 普通查询    User getUserByName( String userName );    // 分页查询    IPage<List<User>> getUsersPage( Page page, @Param("query") User user );}
  • Service类
@Service@AllArgsConstructorpublic class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {    // 查:普通查    @Override    public User findUserByName( String userName ) {        return baseMapper.getUserByName( userName );    }    // 分页查    @Override    public IPage getUserPage(Page page, User user) {        return baseMapper.getUsersPage( page, user );    }}
  • Controller类
@GetMapping( value = "/page")public Object getUserPage( Page page, User user ) { return userService.getUserPage( page, user );}

实际实验一下,我们分页查询 年龄 = 58 的多条记录:

分页查询结果

可以看到结果数据中,除了给到当前页数据,还把总记录条数,总页数等一并返回了,很是优雅呢 !


写在最后

由于能力有限,若有错误或者不当之处,还请大家批评指正,一起学习交流!



状态机在马蜂窝机票订单交易系统中的应用与优化实践

mumupudding阅读(13)

在设计交易系统时,稳定性、可扩展性、可维护性都是我们需要关注的重点。本文将对如何通过状态机在交易系统中的应用解决上述问题做出一些探讨。

 

关于马蜂窝机票订单交易系统

交易系统往往存在订单维度多、状态多、交易链路长、流程复杂等特点。以马蜂窝大交通业务中的机票交易为例,用户提交的一个订单除了机票信息之外可能还包含很多信息,比如保险或者其他附加产品。其中保险又分为很多类型,如航意险、航延险、组合险等。

从用户的维度看,一个订单是由购买的主产品机票和附加产品共同构成,支付的时候是作为一个整体去支付,而如果想要退票、退保也是可以部分操作的;从供应商的维度看,一个订单中的每个产品背后都有独立的供应商,机票有机票的供应商,保险有保险的供应商,每个供应商的订单都需要分开出票、独立结算。

用户的购买支付流程、供应商的出票出保流程,构成一个有机的整体穿插在机票交易系统中,密不可分。

 

状态机在机票交易系统中的应用与优化

有限状态机的概念

有限状态机(以下简称状态机)是一种用于对事物或者对象行为进行建模的工具。

状态机将复杂的逻辑简化为有限个稳定状态,构建在这些状态之间的转移和动作等行为的数学模型,在稳定状态中判断事件。

对状态机输入一个事件,状态机会根据当前状态和触发的事件唯一确定一个状态迁移。

图1:FSM工作原理

 

业务系统的本质就是描述真实的世界,因此几乎所有的业务系统中都会有状态机的影子。订单交易流程更是天然适合状态机模型的应用。

以用户支付流程为例,如果不使用状态机,在接收到支付成功回调时则需要执行一系列动作:查询支付流水号、记录支付时间、修改主订单状态为已支付、通知供应商去出票、记录通知出票时间、修改机票子订单状态为出票中…… 逻辑非常繁琐,而且代码耦合严重。

为了使交易系统的订单状态按照设计流程正确向下流转,比如当前用户已支付,不允许再支付;当前订单已经关单,不能再通知出票等等,我们通过应用状态机的方式来优化机票交易系统,将所有的状态、事件、动作都抽离出来,对复杂的状态迁移逻辑进行统一管理,来取代冗长的 if else 判断,使机票交易系统中的复杂问题得以解耦,变得直观、方便操作,使系统更加易于维护和管理。

状态机设计

在数据库设计层面,我们将整个订单整体作为一个主订单,把供应商的订单作为子订单。假设一个用户同时购买了机票和保险,因为机票、保险对应的是不同的供应商,也就是 1 个主订单  order  对应 2 个子订单 sub_order。其中主订单 order 记录用户的信息(UID、联系方式、订单总价格等),子订单 sub_order 记录产品类型、供应商订单号、结算价格等。

同时,我们把正向出票、逆向退票改签分开,抽成不同的子系统。这样每个子系统都是完全独立的,有利于系统的维护和拓展。

对于机票正向子系统而言,有两套状态机:主订单状态机负责管理 order 的状态,包括创单成功、支付成功、交易成功、订单关闭等;子订单状态机负责管理 sub_order 的状态,维护预订成功到出票的流程。同样,对于逆向退票和改签子系统,也会有各自的状态机。

图2:机票主订单状态机状态转移示例

 

框架选型

目前业界常用的状态机引擎框架主要有 Spring Statemachine、Stateless4j、Squirrel-Foundation 等。经过结合实际业务进行横向对比后,最终我们决定使用 Squirrel-Foundation,主要是因为:

  1. 代码量适中,扩展和维护相对而言比较容易;

  2. StateMachine 轻量,实例创建开销小;

  3. 切入点丰富,支持状态进入、状态完成、异常等节点的监听,使转换过程留有足够的切入点;

  4. 支持使用注解定义状态转移,使用方便;

  5. 从设计上不支持单例复用,只能随用随 New,因此状态机的本身的生命流管理很清晰,不会因为状态机单例复用的问题造成麻烦。 

MSM 的设计与实现

结合大交通业务逻辑,我们在 Squirrel-Foundation 的基础之上进行了 Action 概念的抽取和二次封装,将状态迁移、异步消息糅合到一起,封装成为 MSM 框架 (MFW State Machine),用来实现业务订单状态定义、事件定义和状态机定义,并用注解的形式来描述状态迁移。

我们认为一次状态迁移必然会伴随着异步消息,因此把一个流程中必须要成功的数据库操作放到一个事务中,把允许失败重试并且对实时度要求不高的操作放到异步消息消费的流程中。

以机票订单支付成功为例,机票订单支付成功时,会涉及修改订单状态为已支付、更新支付流水号等,这些是在一个事务中;而通知供应商出票,则是放在异步消息消费中处理。异步消息的实现使用的是 RocketMQ,主要考虑到 RocketMQ 支持二阶段提交,消息可靠性有保证,支持重试,支持多个 Consumer 组。

以下具体说明:

1. 对每个状态迁移需要执行的动作,都会抽取出一个Action 类,并且继承 AbstractAction,支持多个不同的状态迁移执行相同的动作。这里主要取决于 public List<ActionCondition> matchConditions() 的实现,因此只需要 matchConditions 返回多个初始状态-事件的匹配条件键值对就可以了。每个 Action 都有一个对应的继承 MFWContext 类的上下文类,用于在 process saveDB 等方法中的通信。

2. 注册所有的 Action,添加每个状态迁移执行完成或者执行失败的监听。

3. 由于依赖 RocketMQ 异步消息,所以需要一个 Spring Bean 去继承 BaseMessageSender,这个类会生成异步消息提供者。如果要使用二阶段提交,则需要一个类继承 BaseMsgTransactionListener,这里可以参考机票的 OrderChangeMessageSender 和 OrderChangeMsgTransactionListener。

4. 最后,实现一个事件触发器类。在这个类里面包含一个 Apply 方法,传入订单 PO 对象、事件、对应的上下文,每次执行都实例化出一个状态机实例,并初始化当前状态,并调用 Fire 方法。

5. 实例化一个状态机对象,设置当前状态为数据库对应的状态,调用 Fire 方法之后,最终会执行到 OrderStateMachine 类里面用注解描述的 callMethod 方法。我们配置的是 callMethod = “action”,它就会反射执行当前类的 Action 方法。

Action 方法我们的实现是通过 super.action(from, to, event, context),就会执行 MFWStateMachine 的 Action 方法,先去根据当前状态和事件获取对应的Action,这里使用到了「工厂模式」,然后执行 Process 方法。如果成功,会执行在 MFWStateMachine 类初始化的 TransitionCompleteListener,执行该 Action的 afterProcess 方法来修改数据库记录以及发送消息;如果失败,会执行TransitionExceptionListener,执行该 Action 的onException 方法来进行相应处理。

综上,MSM 可以根据 Action 类的声明和配置,来动态生成出 Squirrel-Foundation 的状态机定义,而不需要由使用方再去定义一次,使 MSM 的使用更方便。

图3: UML

 

趟过的坑

1. 事务不生效

最初我们使用 Spring 注解方式进行事务管理,即在 Action 类的数据库操作方法上加 @Transactional 注解,却发现在实践中不起作用。经过排查后发现, Spring 的事务注解是靠 AOP 切面实现的。在对象内部的方法中调用该对象其他使用 AOP 注解的方法,被调用方法的 AOP 注解会失效。因为同一个类的内部代码调用中,不会走代理类。后来我们通过手动开启事务的方式来解决此问题。

2. 匹配 Action  

最初我们匹配 Action 有两种方式:精准匹配及非精准匹配。精准匹配是指只有当某个状态迁移的初始状态和触发的事件一致时,才能匹配到 Action;非精准匹配是指只要触发的事件一致,就可以匹配到 Action。后来我们发现非精准匹配在某些情形下会出现问题,于是统一改成了多条件精准匹配。即在执行状态机触发时执行的 Action 方法时,去精准匹配 Action,多个状态迁移执行的方法可以匹配到同一个 Action,这样能够复用 Action 代码而不会出问题。 

3. 异步消息一致性 

有一些情况是绝不能出现的,比如修改数据库没成功即发出了消息;或是修改数据库成功了,而发送消息失败;或是在提交数据库事务之前,消息已经发送成功了。解决这个问题我们用到了 RocketMQ 的事务消息功能,它支持二阶段提交,会先发送一条预处理消息,然后回调执行本地事务,最终提交或者回滚,帮助保证修改数据库的信息和发送异步消息的一致。

4. 同一条订单数据并发执行不同事件 

在某些情况下,同一条订单数据可能会在同一时间(毫秒级)同时触发不同的事件。如机票主订单在待支付状态下,可以接收支付中心的回调,触发支付成功事件;也可以由用户点击取消订单,或者超时未支付定时任务来触发关单事件。如果不做任何控制的话,一个订单将可能出现既支付成功又会被取消。

我们用数据库乐观锁来规避这个问题:在执行修改数据库的事务时,update 订单的语句带有原状态的条件判断,通过判断更新行数是否为 1,来决定是否抛出异常,即生成这样的 SQL 语句:update order where order_id = ‘1234′ and order_status = ‘待支付’。

这样的话,如果两个事件同时触发同时执行,谁先把事务提交成功,谁就能执行成功;事务提交较晚的事件会因为更新行数为 0 而执行失败,最终回滚事务,就仿佛无事发生过一样。

使用悲观锁也可以解决这个问题,这种方式是谁先争抢到锁谁就可以成功执行。但考虑到可能会有脚本对数据库批量修改,悲观锁存在死锁的潜在问题,我们最终还是采用了乐观锁的方式。

 

总结

MSM 状态机的定义和声明在 Squirrel-Foundation 的基础之上,抽取出 Action 概念,并对 Action 类配置起始状态、目标状态、触发的事件、上下文定义等,使 MSM 可以根据 Action 类的声明和配置,来动态生成出 Squirrel-Foundation 的状态机定义,而不需要使用方再去定义一次,操作更简单,维护起来也更容易。 

通过使用状态机,机票订单交易系统的流程复杂性问题迎刃而解,系统在稳定性、可扩展性、可维护性等方面也得到了显著的改善和提升。

状态机的使用场景不仅仅局限于订单交易系统,其他一些涉及到状态变更的复杂流程的系统也同样适用。希望通过本文的介绍,能使有状态机了解和使用需求的读者朋友有所收获。

 

本文作者:董天,马蜂窝大交通研发团队机票交易系统研发工程师。

(马蜂窝技术原创内容,转载务必注明出处保存文末二维码图片,谢谢配合。)

 

关注马蜂窝技术,找到更多你想要的内容

如何画出一张合格的技术架构图?

mumupudding阅读(14)

阿里妹导读:技术传播的价值,不仅仅体现在通过商业化产品和开源项目来缩短我们构建应用的路径,加速业务的上线速率,也体现在优秀工程师在工作效率提升、产品性能优化和用户体验改善等经验方面的分享,以提高我们的专业能力。

接下来,阿里巴巴技术专家三画,将分享自己和团队在画好架构图方面的理念和经验,希望对你有所帮助。

当我们想用一张或几张图来描述我们的系统时,是不是经常遇到以下情况:

  • 对着画布无从下手、删了又来?
  • 如何用一张图描述我的系统,并且让产品、运营、开发都能看明白?
  • 画了一半的图还不清楚受众是谁?
  • 画出来的图到底是产品图功能图还是技术图又或是大杂烩?
  • 图上的框框有点少是不是要找点儿框框加进来?
  • 布局怎么画都不满意……

如果有同样的困惑,本文将介绍一种画图的方法论,来让架构图更清晰。

先厘清一些基础概念

1、什么是架构?

架构就是对系统中的实体以及实体之间的关系所进行的抽象描述,是一系列的决策。

架构是结构和愿景。

系统架构是概念的体现,是对物/信息的功能与形式元素之间的对应情况所做的分配,是对元素之间的关系以及元素同周边环境之间的关系所做的定义。

做好架构是个复杂的任务,也是个很大的话题,本篇就不做深入了。有了架构之后,就需要让干系人理解、遵循相关决策。

2、什么是架构图?

系统架构图是为了抽象地表示软件系统的整体轮廓和各个组件之间的相互关系和约束边界,以及软件系统的物理部署和软件系统的演进方向的整体视图。

3、架构图的作用

一图胜千言。要让干系人理解、遵循架构决策,就需要把架构信息传递出去。架构图就是一个很好的载体。那么,画架构图是为了:

  • 解决沟通障碍
  • 达成共识
  • 减少歧义

4、架构图分类

搜集了很多资料,分类有很多,有一种比较流行的是4+1视图,分别为场景视图、逻辑视图、物理视图、处理流程视图和开发视图。

场景视图

场景视图用于描述系统的参与者与功能用例间的关系,反映系统的最终需求和交互设计,通常由用例图表示。

逻辑视图

逻辑视图用于描述系统软件功能拆解后的组件关系,组件约束和边界,反映系统整体组成与系统如何构建的过程,通常由UML的组件图和类图来表示。

物理视图

物理视图用于描述系统软件到物理硬件的映射关系,反映出系统的组件是如何部署到一组可计算机器节点上,用于指导软件系统的部署实施过程。

处理流程视图

处理流程视图用于描述系统软件组件之间的通信时序,数据的输入输出,反映系统的功能流程与数据流程,通常由时序图和流程图表示。

开发视图

开发视图用于描述系统的模块划分和组成,以及细化到内部包的组成设计,服务于开发人员,反映系统开发实施过程。

以上 5 种架构视图从不同角度表示一个软件系统的不同特征,组合到一起作为架构蓝图描述系统架构。

怎样的架构图是好的架构图

上面的分类是前人的经验总结,图也是从网上摘来的,那么这些图画的好不好呢?是不是我们要依葫芦画瓢去画这样一些图?

先不去管这些图好不好,我们通过对这些图的分类以及作用,思考了一下,总结下来,我们认为,在画出一个好的架构图之前, 首先应该要明确其受众,再想清楚要给他们传递什么信息 ,所以,不要为了画一个物理视图去画物理视图,为了画一个逻辑视图去画逻辑视图,而应该根据受众的不同,传递的信息的不同,用图准确地表达出来,最后的图可能就是在这样一些分类里。那么,画出的图好不好的一个直接标准就是:受众有没有准确接收到想传递的信息。

明确这两点之后,从受众角度来说,一个好的架构图是不需要解释的,它应该是自描述的,并且要具备一致性和足够的准确性,能够与代码相呼应。

画架构图遇到的常见问题

1、方框代表什么?

为什么适用方框而不是圆形,它有什么特殊的含义吗?随意使用方框或者其它形状可能会引起混淆。

2、虚线、实线什么意思?箭头什么意思?颜色什么意思?

随意使用线条或者箭头可能会引起误会。

3、运行时与编译时冲突?层级冲突?

架构是一项复杂的工作,只使用单个图表来表示架构很容易造成莫名其妙的语义混乱。

本文推荐的画图方法

C4 模型使用容器(应用程序、数据存储、微服务等)、组件和代码来描述一个软件系统的静态结构。这几种图比较容易画,也给出了画图要点,但最关键的是,我们认为,它明确指出了每种图可能的受众以及意义。

下面的案例来自C4官网,然后加上了一些我们的理解,来看看如何更好的表达软件架构

1、语境图(System Context Diagram)

这是一个想象的待建设的互联网银行系统,它使用外部的大型机银行系统存取客户账户、交易信息,通过外部电邮系统给客户发邮件。可以看到,非常简单、清晰,相信不需要解释,都看的明白,里面包含了需要建设的系统本身,系统的客户,和这个系统有交互的周边系统。

用途

这样一个简单的图,可以告诉我们,要构建的系统是什么;它的用户是谁,谁会用它,它要如何融入已有的IT环境。这个图的受众可以是开发团队的内部人员、外部的技术或非技术人员。即:

  • 构建的系统是什么
  • 谁会用它
  • 如何融入已有的IT环境

怎么画

中间是自己的系统,周围是用户和其它与之相互作用的系统。这个图的关键就是梳理清楚待建设系统的用户和高层次的依赖,梳理清楚了画下来只需要几分钟时间。

2、容器图(Container Diagram)

容器图是把语境图里待建设的系统做了一个展开。

上图中,除了用户和外围系统,要建设的系统包括一个基于javaspring mvc的web应用提供系统的功能入口,基于xamarin架构的手机app提供手机端的功能入口,一个基于java的api应用提供服务,一个mysql数据库用于存储,各个应用之间的交互都在箭头线上写明了。

看这张图的时候,不会去关注到图中是直角方框还是圆角方框,不会关注是实线箭头还是虚线箭头,甚至箭头的指向也没有引起太多注意。

我们有许多的画图方式,都对框、线的含义做了定义,这就需要画图的人和看图的人都清晰的理解这些定义,才能读全图里的信息,而现实是,这往往是非常高的一个要求,所以,很多图只能看个大概的含义。

用途

这个图的受众可以是团队内部或外部的开发人员,也可以是运维人员。用途可以罗列为:

  • 展现了软件系统的整体形态
  • 体现了高层次的技术决策
  • 系统中的职责是如何分布的,容器间的是如何交互的
  • 告诉开发者在哪里写代码

怎么画

用一个框图来表示,内部可能包括名称、技术选择、职责,以及这些框图之间的交互,如果涉及外部系统,最好明确边界。

3、组件图(Component Diagram)

组件图是把某个容器进行展开,描述其内部的模块。

用途

这个图主要是给内部开发人员看的,怎么去做代码的组织和构建。其用途有:

  • 描述了系统由哪些组件/服务组成
  • 厘清了组件之间的关系和依赖
  • 为软件开发如何分解交付提供了框架

4、类图(Code/Class Diagram)

这个图很显然是给技术人员看的,比较常见,就不详细介绍了。

案例分享

下面是内部的一个实时数据工具的架构图。作为一个应该自描述的架构图,这里不多做解释了。如果有看不明白的,那肯定是还画的不够好。

画好架构图可能有许多方法论,本篇主要介绍了C4这种方法,C4的理论也是不断进化的。但不论是哪种画图方法论,我们回到画图初衷,更好的交流,我们在画的过程中不必被条条框框所限制。简而言之,画之前想好:画图给谁看,看什么,怎么样不解释就看懂。

作者简介:三画,阿里巴巴技术专家,梓敬、鹏升和余乐对此文亦有贡献。三画曾多年从事工作流引擎研发工作,现专注于高并发移动互联网应用的架构和开发,和本文贡献者均来自阿里巴巴零售通部门。目前零售通大量招Java开发,欢迎有志之士投简历到 lst-wireless@alibaba-inc.com,和我们一起共建智能分销网络,让百万小店拥抱DT时代。

作者:三画

原文链接

本文为云栖社区原创内容,未经允许不得转载。

揭秘京东区块链开源项目——JD Chain

mumupudding阅读(11)

导言

近日,京东区块链底层引擎JD Chain正式对外开源并同步上线开源社区,旨在为企业级用户和开发者提供开源服务,帮助他们提高研发效率,加速技术创新。3月30日,国家互联网信息办公室公布了第一批区块链信息服务名称及备案编号,其中京东区块链BaaS平台、京东区块链防伪追溯通用平台等榜上有名。4月9日,京东发布《京东区块链技术实践白皮书(2019)》,总结了京东区块链在五大类应用场景中的技术实践,介绍了一系列落地案例。同时,白皮书还介绍了京东区块链的技术优势、体系架构与未来规划。

白皮书指出,京东区块链的技术架构分为JD Chain和JD BaaS两部分。其中,JD Chain作为核心引擎,聚焦解决区块链底层的关键技术问题,建立拥有中国自主知识产权的技术生态。JD BaaS是企业级服务平台,提供灵活易用和可伸缩的区块链系统管理能力,支持企业级用户在公有云、私有云及混合云环境快速部署,降低企业使用成本,促进应用落地。

JD Chain简介

01 高性能安全,功能“积木化”

区块链是一种新型分布式架构,以密码学和分布式技术为核心,无需借助“第三方” 就能在多个业务方之间进行安全、可信、直接的信息和价值交换。在这种点对点的信息和价值的交换中,区块链起到了“协议”的作用。

JD Chain团队认为区块链的5大核心技术是:密码算法、共识协议、数据账本模型、数据存储、API。JD Chain在这5个方向上重点突破,从企业的实际需求出发,在设计上推进性能优化、操作简化、安全强化和场景适配通用化,形成如图1中的关键技术特性:

   图1  JD Chain关键技术特性

高性能:采用全新的底层架构设计,交易处理达到万级TPS,交易确认缩短至秒级,支持海量存储和高性能密码算法。

积木化定制:共识、账本、合约、存储各自独立,标准接口交互通信,可实现灵活切换不同的密码算法。

强安全和隐私保护:提供多种具有隐私保护能力的算法,支持包括国密算法在内的多套密码体系。

有效数据治理:数据账本采用标准化结构设计,支持业务数据穿透检索、多维分析治理,支持数据的独立备份、归档、监管和审计。

多链协同:支持业务的多链管理,链间数据验证与交易执行,链的拆分与合并,同时可组合轻量公链模式。

低成本易维护:支持轻量网关节点部署,数据可无需开发合约快速上链,合约代码可复用、升级、本地化测试。

02 搭建新环境,重塑主体“关系”

JD Chain为企业提供了一个全新的数据底层,企业可以根据需求配置所需功能组件。万级交易处理速度,秒级交易快速确认,支持多链协同管理等优异性能,能够帮助企业实现更有效的链上数据治理,同时兼容多密码体系,确保数据的安全与隐私。JD Chain为企业业务模式创新提供了一种新的技术支撑,使其能够重塑各参与主体关系,开辟信任经济商业新领地。

具体来说,JD Chain的功能层次分为4个部分:网关服务、共识服务、数据账本和工具包,架构体系如下图2。

 图2  JD Chain架构体系

网关服务:JD Chain的网关服务是应用的接入层,提供终端接入、私钥托管、安全隐私和协议转换等功能。

数据账本:数据账本为各参与方提供区块链底层服务功能,包括区块、账户、配置和存储等。

共识服务:共识服务是JD Chain的核心实现层,包括共识网络、身份管理、安全权限、交易处理、智能合约和数据检索等功能,来保证各节点间账本信息的一致性。

工具包:节点可以使用JD Chain中提供的工具包获取上述三个层级的功能服务,并响应相关应用和业务。工具包贯穿整个区块链系统,使用者只需调用特定的接口即可使用对应工具。工具包包括数据管理、开发包(SDK)、安装部署和服务监控等。

03 多模型选择,简部署“量身打造”

各企业的信息化基础设施、技术能力、应用场景往往千差万别,不同的情况下如何选择适合自身的部署方式,往往是每个企业都会面临的实际问题。

JD Chain从易用性方面考虑到实际应用规模提供了面向中小型企业和大型企业两种不同的部署方案。

中小型企业可以直接采用如下图3、4的最简部署模型(只需一个客户端节点、一个网关节点和多个共识节点即可),它是保障JD Chain可正常运行的最低配置,在硬件条件满足的情况下,可以支持亿级交易,通常用于Demo实验或小型应用。另外,JD Chain的数据服务功能作为可选组件,支持链上数据的检索、汇总等功能(数据服务组件与共识节点部署在相同或不同服务器均可)。

图3  最简部署模型


 图4  加入数据服务的最简部署模型

随着应用级别的提升,数据存储的需求越来越大,每个共识节点可采用数据库集群的方式实现存储的平行化扩展(在这种方式下可支持交易级别达到十亿乃至更多),如图5。在某些中型实际应用中,共识节点会由不同的业务方安装部署,将共识节点集群化提升了系统整体的安全性和可扩展性,如图6。


图5  数据库集群部署模型

图6  数据库、共识节点集群部署模型

面对大型企业应用中极其复杂的业务关系和应用场景,JD Chain提供了对应的部署解决方案。在整个部署模型中涉及到多种类型的参与方、不同类型的终端,这些终端可以从任意授权的网关节点采用不同的接入方式加入区块链网络,如图7。

图7  大型企业应用部署模型

京东区块链技术实践白皮书

01 品质溯源助力食品药品安全和精准扶贫

据不完全统计,全球范围内受假冒伪劣商品影响的市场规模高达3000亿美元,其中有关食品、药品安全事件频发,由此产生的信任危机受到社会高度关注,运用技术手段加以解决,成为了政府和企业关注的重点。

基于区块链技术的去中心化、共识机制、不可篡改、信息可追溯等特点,京东区块链防伪追溯平台推出了消费品解决方案和医药行业解决方案。截至今年2月,平台已经累计有超过700家品牌商和超过5万个SKU入驻,入驻品牌商包括雀巢、惠氏、洋河、伊利等知名企业。平台有逾280万次的售后用户访问,上链数据多达13亿条,产品种类涉及食品、酒类、奶粉、日用品和医药用品,为营造安心可靠的消费体验和医疗服务做出了贡献。

比如,澳大利亚领先肉类产品出口商安格斯通过与京东区块链防伪追溯平台的深度合作,让国内消费者能够通过扫描包装上的二维码,了解到从牛的出生、生长、检疫、屠宰、加工、运输等全部信息,期间每一个环节都有自动记录、每一个环节都不能被人为篡改、每一个环节都能公示给消费者,让每一片牛肉都安全可靠。

与此同时,京东区块链的防伪追溯技术还应用在了精准扶贫领域,京东在国家级贫困县落地的“跑步鸡”、“游水鸭”和“飞翔鸽”等项目,通过计步脚环等物联网设备,结合视频溯源技术,将家禽运动数据、喂食、饮水、除虫等信息进行采集,并记录到区块链网络中,消费者扫码即可了解到所购农产品的养殖过程、生长环境等图文信息,在提升消费体验的同时,也为贫困地区的农民增加了收入。

02 数字存证和信用网络 服务诚信体系建设

在数字存证方面,京东区块链数字存证平台实现了可信存证、自动化取证、一键举证、侵权预警等功能,目前已经应用于电子合同、电子发票、电子证照、电子票据、互联网诉讼、版权保护等场景。

近日,京东集团与广州互联网法院共同签署了可信电子证据平台和司法信用共治平台两方面的合作协议,双方将利用各自的专业经验与技术优势共同确保证据数据过程可溯、记录可查,实现证据数据存储安全、验证便捷,且共同遵循安全、公正、中立、开放原则,妥善保管证据数据。同时,双方还将在依照法律法规和用户授权的前提下,共享司法信用信息,为推动网络空间信用体系建设提供有效支持。

除此之外,“京小租”是业内首家使用区块链技术解决消费租赁市场纠纷取证难问题的信用租赁平台,用户在进行商品租赁时,京小租平台通过自动化流程获取租赁业务中租赁协议、订单数据、租赁流程等数据并完成“上链”操作,保证租赁服务的公开透明。

在信用网络方面,区块链技术的不可篡改性和透明性可以服务于社会信用体系的建设,解决以往信用体系的痛点,辅助监管机构实现对社会主体的信用评价。京东区块链正在运用技术手段在数字身份、企业通用账号、信用租赁、物流征信等方面,在为完善社会信用体系提供助力的同时,也为企业经营和个人生活提供了便利。

开发者社区同步上线

JD Chain已在近日对外开源并同步上线了开源社区(http://ledger.jd.com/)。JD Chain开源对于行业和开发者来说都具有重要意义。

开放JD Chain高质量的技术代码、简明清晰的设计文档和代码示例,将帮助开发者快速建立明确、有效的学习路径,快速进入区块链技术领域;

JD Chain开源能够帮助企业提高研发效率;

JD Chain开源将促进区块链技术应用生态的构建、加速助推我国区块链技术的发展。

 

·END·

经典分布式算法 —— 浅显易懂的 Raft 算法实现

mumupudding阅读(9)


一、Raft概念

copy一下其他小伙伴写的文章: Raft算法详解

不同于Paxos算法直接从分布式一致性问题出发推导出来,Raft算法则是从多副本状态机的角度提出,用于管理多副本状态机的日志复制。Raft实现了和Paxos相同的功能,它将一致性分解为多个子问题:Leader选举(Leader election)、日志同步(Log replication)、安全性(Safety)、日志压缩(Log compaction)、成员变更(Membership change)等。同时,Raft算法使用了更强的假设来减少了需要考虑的状态,使之变的易于理解和实现。

Raft将系统中的角色分为领导者(Leader)、跟从者(Follower)和候选人(Candidate):

  • Leader:接受客户端请求,并向Follower同步请求日志,当日志同步到大多数节点上后告诉Follower提交日志。
  • Follower:接受并持久化Leader同步的日志,在Leader告之日志可以提交之后,提交日志。
  • Candidate:Leader选举过程中的临时角色。

本文不过多赘述 raft 算法是个什么东西… 这里再贴一个十分好理解的文章:The Raft Consensus Algorithm


二、系统初步设计

在对raft有一定理解后,我们简单梳理一下在raft选举过程中,我们需要的一些角色,以及角色的司职。

首先我们需要一个选举控制类,单例实现即可,节点的选举全权交给此选举控制类的实现,我们称其为 ElectOperator。

先讲一个 raft 中重要的概念:世代,也称为 epoch,但在这篇文章,将其称为 generation(不要纠结这个 = =)。 世代可以认为是一个标记当前发送的操作是否有效的标识,如果收到了小于本节点世代的请求,则可无视其内容,如果收到了大于本世代的请求,则需要更新本节点世代,并重置自己的身份,变为 Follower,类似于乐观锁的设计理念。

我们知道,raft中一共有三种角色:Follower、Candidate、Leader

(1)Follower

Follower 需要做什么呢:

  • 接收心跳
  • Follower 在 ELECTION_TIMEOUT_MS 时间内,若没有收到来自 Leader的心跳,则转变为 Candidate
  • 接收拉票请求,并返回自己的投票

好的,Follower非常简单,只需要做三件事即可。

(2)Candidate

Candidate 扮演什么样的职能呢:

  • 接收心跳
  • Candidate 在 ELECTION_TIMEOUT_MS 时间内,若没有收到来自 Leader的心跳,则转变为 Candidate
  • 接收拉票请求,并返回自己的投票
  • 向集群中的其他节点发起拉票请求
  • 当收到的投票大于半数( n/2 + 1, n为集群内的节点数量),转变为 Leader

Candidate 比起 Follower 稍微复杂一些,但前三件事情都是一样的。

(3)Leader

Leader 在选举过程中扮演的角色最为简单:

  • 接收心跳
  • 向集群内所有节点发送心跳

Leader 也是可以接收心跳的,当收到大于当前世代的心跳或请求后,Leader 需要转变为 Follower。Leader 不可能收到同世代的心跳请求,因为 (1) 在 raft 算法中,同一世代中,节点仅对同一个节点进行投票。(2) 需要收到过半投票才可以转变为 Leader。


三、系统初步实现

简单贴一下选举控制器需要的一些属性代码,下面的注释都说的很清楚了,其中需要补充的一点是定时任务使用了时间轮来实现,不理解没有关系…就是个定时任务,定时任务的一个引用放在 Map<TaskEnum, TimedTask> taskMap; 中,便于取消任务。

public class ElectOperator extends ReentrantLocker implements Runnable {    // 成为 Candidate 的退避时间(真实退避时间需要 randomized to be between 150ms and 300ms )    private static final long ELECTION_TIMEOUT_MS = ElectConfigHelper.getElectionTimeoutMs();    // 心跳间隔    private static final long HEART_BEAT_MS = ElectConfigHelper.getHeartBeatMs();    /**     * 该投票箱的世代信息,如果一直进行选举,一直能达到 {@link #ELECTION_TIMEOUT_MS},而选不出 Leader ,也需要15年,generation才会不够用,如果     * generation 的初始值设置为 Long.Min (现在是0,则可以撑30年,所以完全呆胶布)     */    private long generation;    /**     * 当前节点的角色     */    private NodeRole nodeRole;    /**     * 所有正在跑的定时任务     */    private Map<TaskEnum, TimedTask> taskMap;    /**     * 投票箱     */    private Map<String/* serverName */, Boolean> box;    /**     * 投票给了谁的投票记录     */    private Votes voteRecord;    /**     * 缓存一份集群信息,因为集群信息是可能变化的,我们要保证在一次选举中,集群信息是不变的     */    private List<HanabiNode> clusters;    /**     * 心跳内容     */    private HeartBeat heartBeat;    /**     * 现在集群的leader是哪个节点     */    private String leaderServerName;    private volatile static ElectOperator INSTANCE;    public static ElectOperator getInstance() {        if (INSTANCE == null) {            synchronized (ElectOperator.class) {                if (INSTANCE == null) {                    INSTANCE = new ElectOperator();                    ElectControllerPool.execute(INSTANCE);                }            }        }        return INSTANCE;    }

另外,上面罗列的这些值大都是需要在更新世代时重置的,我们先拟定一下更新世代的逻辑,通用的来讲,就是清除投票记录,清除自己的投票箱,更新自己的世代,身份变更为 Follower 等等,我们将这个方法称为 init。

    /**     * 初始化     *     * 1、成为follower     * 2、先取消所有的定时任务     * 3、重置本地变量     * 4、新增成为Candidate的定时任务     */    private boolean init(long generation, String reason) {        return this.lockSupplier(() -> {            if (generation > this.generation) {// 如果有选票的世代已经大于当前世代,那么重置投票箱                logger.debug("初始化投票箱,原因:{}", reason);                // 1、成为follower                this.becomeFollower();                // 2、先取消所有的定时任务                this.cancelAllTask();                // 3、重置本地变量                logger.debug("更新世代:旧世代 {} => 新世代 {}", this.generation, generation);                this.generation = generation;                this.voteRecord = null;                this.box = new HashMap<>();                this.leaderServerName = null;                // 4、新增成为Candidate的定时任务                this.becomeCandidateAndBeginElectTask(this.generation);                return true;            } else {                return false;            }        });    }

(1) Follower的实现

基于上面的分析,我们可以归纳一下 Follower 需要一些什么样的方法:

1、转变为 Candidate 的定时任务

实际上就是 ELECTION_TIMEOUT_MS (randomized to be between 150ms and 300ms) 后,如果没收到 Leader 的心跳,或者自己变为 Candidate 后,在这个时间内没有成功上位,则继续转变为 Candidate。

为什么我们成为 Candidate 的退避时间需要随机 150ms – 300ms呢?这是为了避免所有节点的选举发起发生碰撞,如果说都是相同的退避时间,每个节点又会优先投自己一票,那么这个集群系统就会陷入无限发起投票,但又无法成为 Leader 的局面。

简而言之就是我们需要提供一个可刷新的定时任务,如果在一定时间内没刷新这个任务,则节点转变为 Candidate,并发起选举,代码如下。首先取消之前的 becomeCandidate 定时定时任务,然后设定在 electionTimeout 后调用 beginElect(generation) 方法。

   /**     * 成为候选者的任务,(重复调用则会取消之前的任务,收到来自leader的心跳包,就可以重置一下这个任务)     *     * 没加锁,因为这个任务需要频繁被调用,只要收到leader来的消息就可以调用一下     */    private void becomeCandidateAndBeginElectTask(long generation) {        this.lockSupplier(() -> {            this.cancelCandidateAndBeginElectTask("正在重置发起下一轮选举的退避时间");            // The election timeout is randomized to be between 150ms and 300ms.            long electionTimeout = ELECTION_TIMEOUT_MS + (int) (ELECTION_TIMEOUT_MS * RANDOM.nextFloat());            TimedTask timedTask = new TimedTask(electionTimeout, () -> this.beginElect(generation));            Timer.getInstance()                 .addTask(timedTask);            taskMap.put(TaskEnum.BECOME_CANDIDATE, timedTask);            return null;        });    }
2、接收心跳与心跳回复

接收心跳十分简单,如果当前心跳大于等于当前世代,且还未认定某个节点为 Leader,则取消所有定时任务,成为Follower,并记录心跳包中 Leader 节点的信息,最后重置一下成为候选者的任务。

如果已经成为某个 Leader 的 Follower,则直接成为候选者的任务即可。

另外一个要注意的是,needToSendHeartBeatInfection,是否需要发送心跳感染包,当收到低世代 Leader 的心跳时,如果当前集群已经选出 Leader ,则回复此心跳包,告诉旧 Leader,现在已经是新世代了!(代码中没有展现,其实就是再次封装一个心跳包,带上世代信息和 Leader 节点信息,回复给 Leader 即可)

    public void receiveHeatBeat(String leaderServerName, long generation, String msg) {       return this.lockSupplier(() -> {     boolean needToSendHeartBeatInfection = true;            // 世代大于当前世代            if (generation >= this.generation) {                needToSendHeartBeatInfection = false;                if (this.leaderServerName == null) {                                        logger.info("集群中,节点 {} 已经成功在世代 {} 上位成为 Leader,本节点将成为 Follower,直到与 Leader 的网络通讯出现问题", leaderServerName, generation);                    // 取消所有任务                    this.cancelAllTask();                    // 成为follower                    this.becomeFollower();                    // 将那个节点设为leader节点                    this.leaderServerName = leaderServerName;                }                // 重置成为候选者任务                this.becomeCandidateAndBeginElectTask(this.generation);            }            return null;        });    }
3、接收拉票请求与回复投票

我们知道,raft 在一个世代只能投票给一个节点,且发起投票者会首先投票给自己。所以逻辑就很简单了,只有当世代大于等于当前,且还未投票时,则拉票请求成功,返回true即可,否则都视为失败,返回false。

    /**     * 某个节点来请求本节点给他投票了,只有当世代大于当前世代,才有投票一说,其他情况都是失败的     *     * 返回结果     *     * 为true代表接受投票成功。     * 为false代表已经给其他节点投过票了,     */    public VotesResponse receiveVotes(Votes votes) {        return this.lockSupplier(() -> {            logger.debug("收到节点 {} 的投票请求,其世代为 {}", votes.getServerName(), votes.getGeneration());            String cause = "";            if (votes.getGeneration() < this.generation) {                cause = String.format("投票请求 %s 世代小于当前世代 %s", votes.getGeneration(), this.generation);            } else if (this.voteRecord != null) {                cause = String.format("在世代 %s,本节点已投票给 => %s 节点", this.generation, this.voteRecord.getServerName());            } else {                this.voteRecord = votes; // 代表投票成功了            }            boolean result = votes.equals(this.voteRecord);            if (result) {                logger.debug("投票记录更新成功:在世代 {},本节点投票给 => {} 节点", this.generation, this.voteRecord.getServerName());            } else {                logger.debug("投票记录更新失败:原因:{}", cause);            }            String serverName = InetSocketAddressConfigHelper.getServerName();            return new VotesResponse(this.generation, serverName, result, serverName.equals(this.leaderServerName), votes.getGeneration());        });    }

(2) Candidate的实现

可以看出 Follower 十分简单, Candidate 在 Follower 的基础上增加了发起选举的拉票请求,与接收投票,并上位成为Leader两个功能,实际上也十分简单。

1、发起拉票请求

回顾一下前面的转变成 Candidate 的定时任务,定时任务实际上就是调用一个方法

TimedTask timedTask = new TimedTask(electionTimeout, () -> this.beginElect(generation));

这个 beginElect 就是转变为 Candidate 并发起选举的实现。让我们先想想需要做什么,首先肯定是

  1. 更新一下自己的世代,因为已经长时间没收到 Leader 的心跳包了,我们需要自立门户。
  2. 给自己投一票
  3. 要求其他节点给自己投票

分析到这里就很明了了。下面首先执行 updateGeneration 方法,实际上就是执行前面所说的 init 方法,传入 generation + 1 的世代,重置一下上个世代各种保存的状态;然后调用 becomeCandidate,实际上就是切换一下身份,将 Follower 或者 Candidate 切换为 Candidate;给自己的 voteRecord 投一票,最后带上自己的节点标识和世代信息,去拉票。

    /**     * 开始进行选举     *     * 1、首先更新一下世代信息,重置投票箱和投票记录     * 2、成为候选者     * 3、给自己投一票     * 4、请求其他节点,要求其他节点给自己投票     */    private void beginElect(long generation) {        this.lockSupplier(() -> {            if (this.generation != generation) {// 存在这么一种情况,虽然取消了选举任务,但是选举任务还是被执行了,所以这里要多做一重处理,避免上个周期的任务被执行                return null;            }            logger.info("Election Timeout 到期,可能期间内未收到来自 Leader 的心跳包或上一轮选举没有在期间内选出 Leader,故本节点即将发起选举");            updateGeneration("本节点发起了选举");// this.generation ++            // 成为候选者            logger.info("本节点正式开始世代 {} 的选举", this.generation);            if (this.becomeCandidate()) {                VotesResponse votes = new VotesResponse(this.generation, InetSocketAddressConfigHelper.getServerName(), true, false, this.generation);                // 给自己投票箱投票                this.receiveVotesResponse(votes);                // 记录一下,自己给自己投了票                this.voteRecord = votes;                // 让其他节点给自己投一票                this.askForVoteTask(new Votes(this.generation, InetSocketAddressConfigHelper.getServerName()), 0);            }            return null;        });    }
2、接收投票,并成为 Leader

如果说在 150ms and 300ms 之间,本节点收到了过半投票,则可上位成 Leader,否则定时任务会再次调用 beginElect,再次更新本节点世代,然后发起新一轮选举。

接收投票其实十分简单,回忆一下前面接收拉票请求与回复投票,实际上就是拉票成功,就返回true,否则返回flase。

我们每次都判断一下是否拿到过半的票数,如果拿到,则成为 Leader,另外有一个值得注意的是,为了加快集群恢复可用的进程,类似于心跳感染(如果心跳发到Leader那里去了,Leader会告诉本节点,它才是真正的Leader),投票也存在投票感染,下面的代码由 votesResponse.isFromLeaderNode() 来表示。

投票的记录也是十分简单,就是把每个投票记录扔到 Map<String/* serverName */, Boolean> box; 里,true 表示同意投给本节点,flase 则不同意,如果同意达到半数以上,则调用 becomeLeader 成为本世代 Leader。

    /**     * 给当前节点的投票箱投票     */    public void receiveVotesResponse(VotesResponse votesResponse) {        this.lockSupplier(() -> {            if (votesResponse.isFromLeaderNode()) {                logger.info("来自节点 {} 的投票应答表明其身份为 Leader,本轮拉票结束。", votesResponse.getServerName());                this.receiveHeatBeat(votesResponse.getServerName(), votesResponse.getGeneration(),                    String.format("收到来自 Leader 节点的投票应答,自动将其视为来自 Leader %s 世代 %s 节点的心跳包", heartBeat.getServerName(), votesResponse.getGeneration()));            }            if (this.generation > votesResponse.getAskVoteGeneration()) {// 如果选票的世代小于当前世代,投票无效                logger.info("来自节点 {} 的投票应答世代是以前世代 {} 的选票,选票无效", votesResponse.getServerName(), votesResponse.getAskVoteGeneration());                return null;            }            if (votesResponse.isAgreed()) {                if (!voteSelf) {                    logger.info("来自节点 {} 的投票应答有效,投票箱 + 1", votesResponse.getServerName());                }                // 记录一下投票结果                box.put(votesResponse.getServerName(), votesResponse.isAgreed());                List<HanabiNode> hanabiNodeList = this.clusters;                int clusterSize = hanabiNodeList.size();                int votesNeed = clusterSize / 2 + 1;                long voteCount = box.values()                                    .stream()                                    .filter(aBoolean -> aBoolean)                                    .count();                logger.info("集群中共 {} 个节点,本节点当前投票箱进度 {}/{}", hanabiNodeList.size(), voteCount, votesNeed);                // 如果获得的选票已经大于了集群数量的一半以上,则成为leader                if (voteCount == votesNeed) {                    logger.info("选票过半,准备上位成为 leader 节点", votesResponse.getServerName());                    this.becomeLeader();                }            } else {                logger.info("节点 {} 在世代 {} 的投票应答为:拒绝给本节点在世代 {} 的选举投票(当前世代 {})", votesResponse.getServerName(), votesResponse.getGeneration(), votesResponse.getAskVoteGeneration(), this.generation);                // 记录一下投票结果                box.put(votesResponse.getServerName(), votesResponse.isAgreed());            }            return null;        });    }

(3) Leader 的实现

作为 Leader,在 raft 中的实现却是最简单的,我们只需要给子节点发心跳包即可。然后如果收到大于自己世代的心跳感染,则成为新世代的 Follower,接收心跳的逻辑和 Follower 没有区别。

    /**     * 当选票大于一半以上时调用这个方法,如何去成为一个leader     */    private void becomeLeader() {        this.lockSupplier(() -> {            long becomeLeaderCostTime = TimeUtil.getTime() - this.beginElectTime;            this.beginElectTime = 0L;            logger.info("本节点 {} 在世代 {} 角色由 {} 变更为 {} 选举耗时 {} ms,并开始向其他节点发送心跳包 ......", InetSocketAddressConfigHelper.getServerName(), this.generation, this.nodeRole.name(), NodeRole.Leader.name(),                becomeLeaderCostTime);            this.nodeRole = NodeRole.Leader;            this.cancelAllTask();            this.heartBeatTask();            this.leaderServerName = InetSocketAddressConfigHelper.getServerName();            return null;        });    }

四、运行我们的 raft!

看到这里,不用怀疑.. 一个 raft 算法已经实现了。至于一些细枝末节的东西,我相信大家都能处理好的.. 比如如何给其他节点发送各种包,包怎么去定义之类的,都和 raft 本身没什么关系。

一般来说,在集群可用后,我们就可以让 Follower 连接 Leader 的业务端口,开始真正的业务了。 raft作为一个能快速选主的分布式算法,一次选主基本只需要一次 RTT(Round-Trip Time)时间即可,非常迅速。

运行一下我们的项目,简单测试,我们只用三台机子,想测试多台机子可以自己去玩玩…我们可以看到就像 zookeeper,我们需要配置两个端口,前一个作为选举端口,后一个则作为业务端口。

本文章只讲了怎么选举,后面的端口可以无视,但是必填…

依次启动 hanabi.1,hanabi.2,hanabi.3

很快,我们就能看到 hanabi.1 成为了世代28的 Leader,第一次选举耗时久是因为启动的时候有各种初始化 = =

此时,我们关闭 hanabi.1,因为集群还有2台机器,它们之间完全可以选出新的 Leader,我们关闭 hanabi.1 试试。观察 hanabi.3,我们发现,很快,hanabi.3 就发现 Leader 已经挂掉,并发起了世代 29 的选举。

在世代29中,仅存的 hanabi.2 拒绝为本节点投票,所以在 ELECTION_TIMEOUT_MS 到期后,hanabi.3 再次发起了选举,此次选举成功,因为 hanabi.2 还未到达 ELECTION_TIMEOUT_MS,所以还在世代 28,收到了世代 29 的拉票请求后,hanabi.2 节点将自己的票投给了 hanabi.3,hanabi.3 成功上位。

本项目github地址 : 基于raft算法实现的分布式kv存储框架 (项目实际上还有日志写入,日志提交,日志同步等功能,直接无视它…还没写完 = =)

mysql8+mybatis-plus3.1自动生成带lombok和swagger和增删改查接口

mumupudding阅读(15)

mybatis-dsc-generator

完美集成lombok,swagger的代码生成工具,让你不再为繁琐的注释和简单的接口实现而烦恼:entity集成,格式校验,swagger; dao自动加@ mapper,service自动注释和依赖; 控制器实现单表的增副改查,并实现swaggers的api文档。

源码地址

MAVEN地址

2.1.0版本是未集成Mybatis-plus版本——源码分支master

<dependency>
    <groupId>com.github.flying-cattle</groupId>
    <artifactId>mybatis-dsc-generator</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>

 3.0.0版本是集成了Mybatis-plus版本——源码分支mybatisPlus

<dependency>
    <groupId>com.github.flying-cattle</groupId>
    <artifactId>mybatis-dsc-generator</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>

数据表结构样式

CREATE TABLE `user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `login_name` varchar(40) DEFAULT NULL COMMENT '登录名',
  `password` varchar(100) NOT NULL COMMENT '秘密',
  `nickname` varchar(50) NOT NULL COMMENT '昵称',
  `type` int(10) unsigned DEFAULT NULL COMMENT '类型',
  `state` int(10) unsigned NOT NULL DEFAULT '1' COMMENT '状态:-1失败,0等待,1成功',
  `note` varchar(255) DEFAULT NULL COMMENT '备注',
  `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `update_uid` bigint(20) DEFAULT '0' COMMENT '修改人用户ID',
  `login_ip` varchar(50) DEFAULT NULL COMMENT '登录IP地址',
  `login_addr` varchar(100) DEFAULT NULL COMMENT '登录地址',
  PRIMARY KEY (`id`),
  UNIQUE KEY `login_name` (`login_name`)
) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

要求必须有表注释,要求必须有主键为id,所有字段必须有注释(便于生成java注释swagger等)。

生成的实体类

生成方法参考源码中的:https://github.com/flying-cattle/mybatis-dsc-generator/blob/mybatisPlus/src/main/java/com/github/mybatis/fl/test/TestMain.java

执行结果

实体类

/**
 * @filename:Order 2018年7月5日
 * @project deal-center  V1.0
 * Copyright(c) 2018 BianP Co. Ltd. 
 * All right reserved. 
 */
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import java.util.Date;
import org.springframework.format.annotation.DateTimeFormat;
import java.io.Serializable;

/**   
 * Copyright: Copyright (c) 2019 
 * 
 * <p>说明: 用户实体类</P>
 * @version: V1.0
 * @author: BianPeng
 * 
 * Modification History:
 * Date          Author          Version          Description
 *---------------------------------------------------------------*
 * 2019年4月9日      BianPeng    V1.0         initialize
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class User extends Model<User> {

    private static final long serialVersionUID = 1L;
    @TableId(value = "id", type = IdType.AUTO)
    @ApiModelProperty(name = "id" , value = "用户ID")
    private Long id;

    @ApiModelProperty(name = "loginName" , value = "登录账户")
    private String loginName;

    @ApiModelProperty(name = "password" , value = "登录密码")
    private String password;

    @ApiModelProperty(name = "nickname" , value = "用户昵称")
    private String nickname;

    @ApiModelProperty(name = "type" , value = "用户类型")
    private Integer type;

    @ApiModelProperty(name = "state" , value = "用户状态")
    private Integer state;

    @ApiModelProperty(name = "note" , value = "备注")
    private String note;

    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
    @ApiModelProperty(name = "createTime" , value = "用户创建时间")
    private Date createTime;

    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
    @ApiModelProperty(name = "updateTime" , value = "修改时间")
    private Date updateTime;
 
    @ApiModelProperty(name = "updateUid" , value = "修改人用户ID")
    private Long updateUid;

    @ApiModelProperty(name = "loginIp" , value = "登录IP")
    private String loginIp;

    @ApiModelProperty(name = "loginIp" , value = "登录地址")
    private String loginAddr;
 
    @Override
    protected Serializable pkVal() {
        return this.id;
    }
}

DAO

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import com.xin.usercenter.entity.User;

/**   
 * Copyright: Copyright (c) 2019 
 * 
 * <p>说明: 用户数据访问层</P>
 * @version: V1.0
 * @author: BianPeng
 * 
 * Modification History:
 * Date          Author          Version          Description
 *---------------------------------------------------------------*
 * 2019年4月9日      BianPeng         V1.0            initialize
 */
@Mapper
public interface UserDao extends BaseMapper<User> {
 
}

生成的XML

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.xin.usercenter.dao.UserDao">

 <resultMap id="BaseResultMap" type="com.xin.usercenter.entity.User">
  <id column="id" property="id" />
  <id column="login_name" property="loginName" />
  <id column="password" property="password" />
  <id column="nickname" property="nickname" />
  <id column="type" property="type" />
  <id column="state" property="state" />
  <id column="note" property="note" />
  <id column="create_time" property="createTime" />
  <id column="update_time" property="updateTime" />
  <id column="update_uid" property="updateUid" />
  <id column="login_ip" property="loginIp" />
  <id column="login_addr" property="loginAddr" />
 </resultMap>
 <sql id="Base_Column_List">
  id, login_name, password, nickname, type, state, note, create_time, update_time, update_uid, login_ip, login_addr
 </sql>
</mapper>

生成的SERVICE

import com.xin.usercenter.entity.User;
import com.baomidou.mybatisplus.extension.service.IService;
/**   
 * Copyright: Copyright (c) 2019 
 * 
 * <p>说明: 用户服务层</P>
 * @version: V1.0
 * @author: BianPeng
 * 
 * Modification History:
 * Date          Author          Version        Description
 *------------------------------------------------------------*
 * 2019年4月9日      BianPeng        V1.0           initialize
 */
public interface UserService extends IService<User> {
 
}

生成的SERVICE_IMPL

import com.xin.usercenter.entity.User;
import com.xin.usercenter.dao.UserDao;
import com.xin.usercenter.service.UserService;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

/**   
 * Copyright: Copyright (c) 2019 
 * 
 * <p>说明: 用户服务实现层</P>
 * @version: V1.0
 * @author: BianPeng
 * 
 * Modification History:
 * Date          Author          Version        Description
 *------------------------------------------------------------*
 * 2019年4月9日      BianPeng        V1.0           initialize
 */
@Service
public class UserServiceImpl  extends ServiceImpl<UserDao, User> implements UserService  {
 
}

生成的CONTROLLER

import com.item.util.JsonResult;
import com.xin.usercenter.entity.User;
import com.xin.usercenter.service.UserService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**   
 * Copyright: Copyright (c) 2019 
 * 
 * <p>说明: 用户API接口层</P>
 * @version: V1.0
 * @author: BianPeng
 * 
 * Modification History:
 * Date          Author          Version          Description
 *---------------------------------------------------------------*
 * 2019年4月9日      BianPeng    V1.0           initialize
 */
@Api(description = "用户",value="用户" )
@RestController
@RequestMapping("/user")
public class UserController {

    Logger logger = LoggerFactory.getLogger(this.getClass());
 
    @Autowired
    public UserService userServiceImpl;
 
    /**
    * @explain 查询用户对象  <swagger GET请求>
    * @param   对象参数:id
    * @return  user
    * @author  BianPeng
    * @time    2019年4月9日
    */
    @GetMapping("/getUserById/{id}")
    @ApiOperation(value = "获取用户信息", notes = "获取用户信息[user],作者:BianPeng")
    @ApiImplicitParam(paramType="path", name = "id", value = "用户id", required = true, dataType = "Long")
    public JsonResult<User> getUserById(@PathVariable("id")Long id){
     JsonResult<User> result=new JsonResult<User>();
     try {
      User user=userServiceImpl.getById(id);
      if (user!=null) {
       result.setType("success");
       result.setMessage("成功");
       result.setData(user);
      } else {
       logger.error("获取用户失败ID:"+id);
       result.setType("fail");
       result.setMessage("你获取的用户不存在");
      }
     } catch (Exception e) {
      logger.error("获取用户执行异常:"+e.getMessage());
      result=new JsonResult<User>(e);
     }
     return result;
    }
    /**
     * @explain 添加或者更新用户对象
     * @param   对象参数:user
     * @return  int
     * @author  BianPeng
     * @time    2019年4月9日
     */
    @PostMapping("/insertSelective")
    @ApiOperation(value = "添加用户", notes = "添加用户[user],作者:BianPeng")
    public JsonResult<User> insertSelective(User user){
     JsonResult<User> result=new JsonResult<User>();
     try {
      boolean rg=userServiceImpl.saveOrUpdate(user);
      if (rg) {
       result.setType("success");
       result.setMessage("成功");
       result.setData(user);
      } else {
       logger.error("添加用户执行失败:"+user.toString());
       result.setType("fail");
       result.setMessage("执行失败,请稍后重试");
      }
     } catch (Exception e) {
      logger.error("添加用户执行异常:"+e.getMessage());
      result=new JsonResult<User>(e);
     }
       return result;
    }
 
    /**
     * @explain 删除用户对象
     * @param   对象参数:id
     * @return  int
     * @author  BianPeng
     * @time    2019年4月9日
     */
    @PostMapping("/deleteByPrimaryKey")
    @ApiOperation(value = "删除用户", notes = "删除用户,作者:BianPeng")
    @ApiImplicitParam(paramType="query", name = "id", value = "用户id", required = true, dataType = "Long")
    public JsonResult<Object> deleteByPrimaryKey(Long id){
     JsonResult<Object> result=new JsonResult<Object>();
     try {
      boolean reg=userServiceImpl.removeById(id);
      if (reg) {
       result.setType("success");
       result.setMessage("成功");
       result.setData(id);
      } else {
       logger.error("删除用户失败ID:"+id);
       result.setType("fail");
       result.setMessage("执行错误,请稍后重试");
      }
     } catch (Exception e) {
      logger.error("删除用户执行异常:"+e.getMessage());
      result=new JsonResult<Object>(e);
     }
     return result;
 }
 
 /**
  * @explain 分页条件查询用户   
  * @param   对象参数:AppPage<User>
  * @return  PageInfo<User>
  * @author  BianPeng
  * @time    2019年4月9日
  */
 @GetMapping("/getUserPages")
 @ApiOperation(value = "分页查询", notes = "分页查询返回对象[IPage<User>],作者:边鹏")
 @ApiImplicitParams({
        @ApiImplicitParam(paramType="query", name = "pageNum", value = "当前页", required = true, dataType = "int"),
        @ApiImplicitParam(paramType="query", name = "pageSize", value = "页行数", required = true, dataType = "int")
    })
 public JsonResult<Object> getUserPages(Integer pageNum,Integer pageSize){
 
  JsonResult<Object> result=new JsonResult<Object>();
  Page<User> page=new Page<User>(pageNum,pageSize);
  QueryWrapper<User> queryWrapper =new QueryWrapper<User>();
  //分页数据
  try {
   //List<User> list=userServiceImpl.list(queryWrapper); 
   IPage<User> pageInfo=userServiceImpl.page(page, queryWrapper);
   result.setType("success");
   result.setMessage("成功");
   result.setData(pageInfo);
  } catch (Exception e) {
   logger.error("分页查询用户执行异常:"+e.getMessage());
   result=new JsonResult<Object>(e);
  }
  return result;
 }
}

生成完毕,控制器中的JsonResult

import java.io.Serializable;
import java.net.ConnectException;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**   
 * Copyright: Copyright (c) 2019 
 * 
 * <p>说明: 用户服务层</P>
 * @version: V1.0
 * @author: BianPeng
 * 
 * Modification History:
 * Date         Author         Version         Description
 *---------------------------------------------------------*
 * 2019/4/9  flying-cattle  V1.0            initialize
 */
public class JsonResult<T> implements Serializable{
 
 Logger logger = LoggerFactory.getLogger(this.getClass());
 private static final long serialVersionUID = 1071681926787951549L;

 /**
     * <p>返回状态</p>
     */
    private Boolean isTrue=true;
    /**
     *<p> 状态码</p>
     */
    private String code;
    /**
     * <p>业务码</p>
     */
    private String type;
    /**
     *<p> 状态说明</p>
     */
    private String message;
    /**
     * <p>返回数据</p>
     */
    private T data;
    public Boolean getTrue() {
        return isTrue;
    }
    public void setTrue(Boolean aTrue) {
        isTrue = aTrue;
    }
    public String getCode() {
        return code;
    }
    public void setCode(String code) {
        this.code = code;
    }
    public String getMessage() {
        return message;
    }
    public void setMessage(String message) {
        this.message = message;
    }
    public T getData() {
        return data;
    }
    public void setData(T data) {
        this.data = data;
    }
    public String getType() {
        return type;
    }
    public void setType(String type) {
        this.type = type;
    }
    /**
     * <p>返回成功</p>
     * @param type 业务码
     * @param message 错误说明
     * @param data 数据
     */
    public JsonResult(String type, String message, T data) {
        this.isTrue=true;
        this.code ="0000";
        this.type=type;
        this.message = message;
        this.data=data;
    }
    public JsonResult() {
        this.isTrue=true;
        this.code ="0000";
    }
    public JsonResult(Throwable throwable) {
     logger.error(throwable+"tt");
        this.isTrue=false;
        if(throwable instanceof NullPointerException){
            this.code= "1001";
            this.message="空指针:"+throwable;
        }else if(throwable instanceof ClassCastException ){
            this.code= "1002";
            this.message="类型强制转换异常:"+throwable;
        }else if(throwable instanceof ConnectException){
            this.code= "1003";
            this.message="链接失败:"+throwable;
        }else if(throwable instanceof IllegalArgumentException ){
            this.code= "1004";
            this.message="传递非法参数异常:"+throwable;
        }else if(throwable instanceof NumberFormatException){
            this.code= "1005";
            this.message="数字格式异常:"+throwable;
        }else if(throwable instanceof IndexOutOfBoundsException){
            this.code= "1006";
            this.message="下标越界异常:"+throwable;
        }else if(throwable instanceof SecurityException){
            this.code= "1007";
            this.message="安全异常:"+throwable;
        }else if(throwable instanceof SQLException){
            this.code= "1008";
            this.message="数据库异常:"+throwable;
        }else if(throwable instanceof ArithmeticException){
            this.code= "1009";
            this.message="算术运算异常:"+throwable;
        }else if(throwable instanceof RuntimeException){
            this.code= "1010";
            this.message="运行时异常:"+throwable;
        }else if(throwable instanceof Exception){ 
         logger.error("未知异常:"+throwable);
            this.code= "9999";
            this.message="未知异常"+throwable;
        }
    }
}

如果你生成的分页的方法不能分页:根据官方提升,记得在启动类中加入

@Bean
public PaginationInterceptor paginationInterceptor() {
    return new PaginationInterceptor();
}

 

并行化-你的高并发大杀器

mumupudding阅读(9)

1.前言

想必热爱游戏的同学小时候,都幻想过要是自己要是能像鸣人那样会多重影分身之术,就能一边打游戏一边上课了,可惜漫画就是漫画,现实中并没有这个技术,你要么只有老老实实的上课,要么就只有逃课去打游戏了。虽然在现实中我们无法实现多重影分身这样的技术,但是我们可以在计算机世界中实现我们这样的愿望。

2.计算机中的分身术

计算机中的分身术不是天生就有了。在1971年,1971年,英特尔推出的全球第一颗通用型微处理器4004,由2300个晶体管构成。当时,公司的联合创始人之一戈登摩尔就提出大名鼎鼎的“摩尔定律”——每过18个月,芯片上可以集成的晶体管数目将增加一倍。最初的主频740kHz(每秒运行74万次),现在过了快50年了,大家去买电脑的时候会发现现在的主频都能达到4.0GHZ了(每秒40亿次)。但是主频越高带来的收益却是越来越小:

  • 据测算,主频每增加1G,功耗将上升25瓦,而在芯片功耗超过150瓦后,现有的风冷散热系统将无法满足散热的需要。有部分CPU都可以用来煎鸡蛋了。
  • 流水线过长,使得单位频率效能低下,越大的主频其实整体性能反而不如小的主频。
  • 戈登摩尔认为摩尔定律未来10-20年会失效。

在单核主频遇到瓶颈的情况下,多核CPU应运而生,不仅提升了性能,并且降低了功耗。所以多核CPU逐渐成为现在市场的主流,这样让我们的多线程编程也更加的容易。

说到了多核CPU就一定要说GPU,大家可能对这个比较陌生,但是一说到显卡就肯定不陌生,笔者搞过一段时间的CUDA编程,我才意识到这个才是真正的并行计算,大家都知道图片像素点吧,比如19201080的图片有210万个像素点,如果想要把一张图片的每个像素点都进行转换一下,那在我们java里面可能就要循环遍历210万次。就算我们用多线程8核CPU,那也得循环几十万次。但是如果使用Cuda,最多可以365535*512=100661760(一亿)个线程并行执行,就这种级别的图片那也是马上处理完成。但是Cuda一般适合于图片这种,有大量的像素点需要同时处理,但是其支持指令不多所以逻辑不能太复杂。GPU只是用来扩展介绍,感兴趣可以和笔者交流。

3.应用中的并行

一说起让你的服务高性能的手段,那么异步化,并行化这些肯定会第一时间在你脑海中显现出来,在之前的文章:《异步化,你的高并发大杀器》中已经介绍过了异步化的优化手段,有兴趣的朋友可以看看。并行化可以用来配合异步化,也可以用来单独做优化。

我们可以想想有这么一个需求,在你下外卖订单的时候,这笔订单可能还需要查,用户信息,折扣信息,商家信息,菜品信息等,用同步的方式调用,如下图所示:

设想一下这5个查询服务,平均每次消耗50ms,那么本次调用至少是250ms,我们细想一下,在这个这五个服务其实并没有任何的依赖,谁先获取谁后获取都可以,那么我们可以想想,是否可以用多重影分身之术,同时获取这五个服务的信息呢?优化如下:

将这五个查询服务并行查询,在理想情况下可以优化至50ms。当然说起来简单,我们真正如何落地呢?

3.1 CountDownLatch/Phaser

CountDownLatch和Phaser是JDK提供的同步工具类Phaser是1.7版本之后提供的工具类而CountDownLatch是1.5版本之后提供的工具类。这里简单介绍一下CountDownLatch,可以将其看成是一个计数器,await()方法可以阻塞至超时或者计数器减至0,其他线程当完成自己目标的时候可以减少1,利用这个机制我们可以将其用来做并发。可以用如下的代码实现我们上面的下订单的需求:

public class CountDownTask {    private static final int CORE_POOL_SIZE = 4;    private static final int MAX_POOL_SIZE = 12;    private static final long KEEP_ALIVE_TIME = 5L;    private final static int QUEUE_SIZE = 1600;    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,            KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));    public static void main(String[] args) throws InterruptedException {        // 新建一个为5的计数器        CountDownLatch countDownLatch = new CountDownLatch(5);        OrderInfo orderInfo = new OrderInfo();        THREAD_POOL.execute(() -> {            System.out.println("当前任务Customer,线程名字为:" + Thread.currentThread().getName());            orderInfo.setCustomerInfo(new CustomerInfo());            countDownLatch.countDown();        });        THREAD_POOL.execute(() -> {            System.out.println("当前任务Discount,线程名字为:" + Thread.currentThread().getName());            orderInfo.setDiscountInfo(new DiscountInfo());            countDownLatch.countDown();        });        THREAD_POOL.execute(() -> {            System.out.println("当前任务Food,线程名字为:" + Thread.currentThread().getName());            orderInfo.setFoodListInfo(new FoodListInfo());            countDownLatch.countDown();        });        THREAD_POOL.execute(() -> {            System.out.println("当前任务Tenant,线程名字为:" + Thread.currentThread().getName());            orderInfo.setTenantInfo(new TenantInfo());            countDownLatch.countDown();        });        THREAD_POOL.execute(() -> {            System.out.println("当前任务OtherInfo,线程名字为:" + Thread.currentThread().getName());            orderInfo.setOtherInfo(new OtherInfo());            countDownLatch.countDown();        });        countDownLatch.await(1, TimeUnit.SECONDS);        System.out.println("主线程:"+ Thread.currentThread().getName());    }}

建立一个线程池(具体配置根据具体业务,具体机器配置),进行并发的执行我们的任务(生成用户信息,菜品信息等),最后利用await方法阻塞等待结果成功返回。

3.2CompletableFuture

相信各位同学已经发现,CountDownLatch虽然能实现我们需要满足的功能但是其任然有个问题是,在我们的业务代码需要耦合CountDownLatch的代码,比如在我们获取用户信息之后我们会执行countDownLatch.countDown(),很明显我们的业务代码显然不应该关心这一部分逻辑,并且在开发的过程中万一写漏了,那我们的await方法将只会被各种异常唤醒。

所以在JDK1.8中提供了一个类CompletableFuture,它是一个多功能的非阻塞的Future。(什么是Future:用来代表异步结果,并且提供了检查计算完成,等待完成,检索结果完成等方法。)在我之前的这篇文章中详细介绍了《异步技巧之CompletableFuture》,有兴趣的可以看这篇文章。我们将每个任务的计算完成的结果都用CompletableFuture来表示,利用CompletableFuture.allOf汇聚成一个大的CompletableFuture,那么利用get()方法就可以阻塞。

public class CompletableFutureParallel {    private static final int CORE_POOL_SIZE = 4;    private static final int MAX_POOL_SIZE = 12;    private static final long KEEP_ALIVE_TIME = 5L;    private final static int QUEUE_SIZE = 1600;    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,            KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {        OrderInfo orderInfo = new OrderInfo();        //CompletableFuture 的List        List<CompletableFuture> futures = new ArrayList<>();        futures.add(CompletableFuture.runAsync(() -> {            System.out.println("当前任务Customer,线程名字为:" + Thread.currentThread().getName());            orderInfo.setCustomerInfo(new CustomerInfo());        }, THREAD_POOL));        futures.add(CompletableFuture.runAsync(() -> {            System.out.println("当前任务Discount,线程名字为:" + Thread.currentThread().getName());            orderInfo.setDiscountInfo(new DiscountInfo());        }, THREAD_POOL));        futures.add( CompletableFuture.runAsync(() -> {            System.out.println("当前任务Food,线程名字为:" + Thread.currentThread().getName());            orderInfo.setFoodListInfo(new FoodListInfo());        }, THREAD_POOL));        futures.add(CompletableFuture.runAsync(() -> {            System.out.println("当前任务Other,线程名字为:" + Thread.currentThread().getName());            orderInfo.setOtherInfo(new OtherInfo());        }, THREAD_POOL));        CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));        allDoneFuture.get(10, TimeUnit.SECONDS);        System.out.println(orderInfo);    }}

可以看见我们使用CompletableFuture能很快的完成的需求,当然这还不够。

3.3 Fork/Join

我们上面用CompletableFuture完成了我们对多组任务并行执行,但是其依然是依赖我们的线程池,在我们的线程池中使用的是阻塞队列,也就是当我们某个线程执行完任务的时候需要通过这个阻塞队列进行,那么肯定会发生竞争,所以在JDK1.7中提供了ForkJoinTask和ForkJoinPool。

ForkJoinPool中每个线程都有自己的工作队列,并且采用Work-Steal算法防止线程饥饿。 Worker线程用LIFO的方法取出任务,但是会用FIFO的方法去偷取别人队列的任务,这样就减少了锁的冲突。

网上这个框架的例子很多,我们看看如何使用代码其完成我们上面的下订单需求:

public class OrderTask extends RecursiveTask<OrderInfo> {    @Override    protected OrderInfo compute() {        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());        // 定义其他五种并行TasK        CustomerTask customerTask = new CustomerTask();        TenantTask tenantTask = new TenantTask();        DiscountTask discountTask = new DiscountTask();        FoodTask foodTask = new FoodTask();        OtherTask otherTask = new OtherTask();        invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask);        OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join());        return orderInfo;    }    public static void main(String[] args) {        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() -1 );        System.out.println(forkJoinPool.invoke(new OrderTask()));    }}class CustomerTask extends RecursiveTask<CustomerInfo>{    @Override    protected CustomerInfo compute() {        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());        return new CustomerInfo();    }}class TenantTask extends RecursiveTask<TenantInfo>{    @Override    protected TenantInfo compute() {        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());        return new TenantInfo();    }}class DiscountTask extends RecursiveTask<DiscountInfo>{    @Override    protected DiscountInfo compute() {        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());        return new DiscountInfo();    }}class FoodTask extends RecursiveTask<FoodListInfo>{    @Override    protected FoodListInfo compute() {        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());        return new FoodListInfo();    }}class OtherTask extends RecursiveTask<OtherInfo>{    @Override    protected OtherInfo compute() {        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());        return new OtherInfo();    }}

我们定义一个OrderTask并且定义五个获取信息的任务,在compute中分别fork执行这五个任务,最后在将这五个任务的结果通过Join获得,最后完成我们的并行化的需求。

3.4 parallelStream

在jdk1.8中提供了并行流的API,当我们使用集合的时候能很好的进行并行处理,下面举了一个简单的例子从1加到100:

public class ParallelStream {    public static void main(String[] args) {        ArrayList<Integer> list = new ArrayList<Integer>();        for (int i = 1; i <= 100; i++) {            list.add(i);        }        LongAdder sum = new LongAdder();        list.parallelStream().forEach(integer -> {//            System.out.println("当前线程" + Thread.currentThread().getName());            sum.add(integer);        });        System.out.println(sum);    }}

parallelStream中底层使用的那一套也是Fork/Join的那一套,默认的并发程度是可用CPU数-1。

3.5 分片

可以想象有这么一个需求,每天定时对id在某个范围之间的用户发券,比如这个范围之间的用户有几百万,如果给一台机器发的话,可能全部发完需要很久的时间,所以分布式调度框架比如:elastic-job。都提供了分片的功能,比如你用50台机器,那么id%50=0的在第0台机器上,=1的在第1台机器上发券,那么我们的执行时间其实就分摊到了不同的机器上了。

4.并行化注意事项

  • 线程安全:在parallelStream中我们列举的代码中使用的是LongAdder,并没有直接使用我们的Integer和Long,这个是因为在多线程环境下Integer和Long线程不安全。所以线程安全我们需要特别注意。
  • 合理参数配置:可以看见我们需要配置的参数比较多,比如我们的线程池的大小,等待队列大小,并行度大小以及我们的等待超时时间等等,我们都需要根据自己的业务不断的调优防止出现队列不够用或者超时时间不合理等等。

5.最后

本文介绍了什么是并行化,并行化的各种历史,在Java中如何实现并行化,以及并行化的注意事项。希望大家对并行化有个比较全面的认识。最后给大家提个两个小问题:

  1. 在我们并行化当中有某个任务如果某个任务出现了异常应该怎么办?
  2. 在我们并行化当中有某个任务的信息并不是强依赖,也就是如果出现了问题这部分信息我们也可以不需要,当并行化的时候,这种任务出现了异常应该怎么办?

最后这篇文章被我收录于JGrowing,一个全面,优秀,由社区一起共建的Java学习路线,如果您想参与开源项目的维护,可以一起共建,github地址为:https://github.com/javagrowing/JGrowing麻烦给个小星星哟。

如果你觉得这篇文章对你有文章,可以关注我的技术公众号,你的关注和转发是对我最大的支持,O(∩_∩)O

支撑百万并发的数据库架构如何设计?

mumupudding阅读(7)


前言

        作为一个全球人数最多的国家,一个再怎么凄惨的行业,都能找出很多的人为之付出。而在这个互联网的时代,IT公司绝对比牛毛还多很多。但是大多数都是创业公司,长期存活的真的不多。大多数的IT项目在注册量从0-100万,日活跃1-5万,说实话就这种系统随便找一个有几年工作经验的高级工程师,然后带几个年轻工程师,随便干干都可以做出来。
        因为这样的系统,实际上主要就是在前期快速的进行业务功能的开发,搞一个单块系统部署在一台服务器上,然后连接一个数据库就可以了。接着大家就是不停的在一个工程里填充进去各种业务代码,尽快把公司的业务支撑起来。

        但是如果真的发展的还可以,可能就会遇到如下问题:
        在运行的过程中系统访问数据库的性能越来越差,单表数据量越来越大,一些复杂查询 SQL直接拖垮!
        这种时候就不得不考虑的解决方案:缓存,负载均衡,项目分块(微服务);数据库:读写分离,分库分表等技术

如果说此时你还是一台数据库服务器在支撑每秒上万的请求,负责任的告诉你,每次高峰期会出现下述问题:

  • 数据库服务器的磁盘 IO、网络带宽、CPU 负载、内存消耗,都会达到非常高的情况,数据库所在服务器的整体负载会非常重,甚至都快不堪重负了。
  • 高峰期时,本来你单表数据量就很大,SQL 性能就不太好,这时加上你的数据库服务器负载太高导致性能下降,就会发现你的 SQL 性能更差了。
  • 最明显的一个感觉,就是你的系统在高峰期各个功能都运行的很慢,用户体验很差,点一个按钮可能要几十秒才出来结果。
  • 如果你运气不太好,数据库服务器的配置不是特别的高的话,弄不好你还会经历数据库宕机的情况,因为负载太高对数据库压力太大了。

其实大多数公司的瓶颈都在数据库,其实如果把上面的解决方案,都实现了,基本上就没的什么问题了,举例
        如果订单一年有 1 亿条数据,可以把订单表一共拆分为 1024 张表,分散在5个库中,这样 1 亿数据量的话,分散到每个表里也就才 10 万量级的数据量,然后这上千张表分散在 5 台数据库里就可以了。
        在写入数据的时候,需要做两次路由,先对订单 id hash 后对数据库的数量取模,可以路由到一台数据库上,然后再对那台数据库上的表数量取模,就可以路由到数据库上的一个表里了。
        通过这个步骤,就可以让每个表里的数据量非常小,每年 1 亿数据增长,但是到每个表里才 10 万条数据增长,这个系统运行 10 年,每个表里可能才百万级的数据量。

全局唯一ID

在分库分表之后你必然要面对的一个问题,就是 id 咋生成?因为要是一个表分成多个表之后,每个表的 id 都是从 1 开始累加自增长,那肯定不对啊。

举个例子,你的订单表拆分为了 1024 张订单表,每个表的 id 都从 1 开始累加,这个肯定有问题了!

你的系统就没办法根据表主键来查询订单了,比如 id = 50 这个订单,在每个表里都有!

所以此时就需要分布式架构下的全局唯一 id 生成的方案了,在分库分表之后,对于插入数据库中的核心 id,不能直接简单使用表自增 id,要全局生成唯一 id,然后插入各个表中,保证每个表内的某个 id,全局唯一。

比如说订单表虽然拆分为了 1024 张表,但是 id = 50 这个订单,只会存在于一个表里。

那么如何实现全局唯一 id 呢?有以下几种方案:

方案一:独立数据库自增 id

这个方案就是说你的系统每次要生成一个 id,都是往一个独立库的一个独立表里插入一条没什么业务含义的数据,然后获取一个数据库自增的一个 id。拿到这个 id 之后再往对应的分库分表里去写入。

比如说你有一个 auto_id 库,里面就一个表,叫做 auto_id 表,有一个 id 是自增长的。

那么你每次要获取一个全局唯一 id,直接往这个表里插入一条记录,获取一个全局唯一 id 即可,然后这个全局唯一 id 就可以插入订单的分库分表中。

这个方案的好处就是方便简单,谁都会用。缺点就是单库生成自增 id,要是高并发的话,就会有瓶颈的,因为 auto_id 库要是承载个每秒几万并发,肯定是不现实的了。

方案二:UUID

这个每个人都应该知道吧,就是用 UUID 生成一个全局唯一的 id。

好处就是每个系统本地生成,不要基于数据库来了。不好之处就是,UUID 太长了,作为主键性能太差了,不适合用于主键。

如果你是要随机生成个什么文件名了,编号之类的,你可以用 UUID,但是作为主键是不能用 UUID 的。

方案三:获取系统当前时间

这个方案的意思就是获取当前时间作为全局唯一的 id。但是问题是,并发很高的时候,比如一秒并发几千,会有重复的情况,这个肯定是不合适的。

一般如果用这个方案,是将当前时间跟很多其他的业务字段拼接起来,作为一个 id,如果业务上你觉得可以接受,那么也是可以的。

你可以将别的业务字段值跟当前时间拼接起来,组成一个全局唯一的编号,比如说订单编号:时间戳 + 用户 id + 业务含义编码。

方案四:SnowFlake 算法的思想分析

SnowFlake 算法,是 Twitter 开源的分布式 id 生成算法。其核心思想就是:使用一个 64 bit 的 long 型的数字作为全局唯一 id。这 64 个 bit 中,其中 1 个 bit 是不用的,然后用其中的 41 bit 作为毫秒数,用 10 bit 作为工作机器 id,12 bit 作为序列号。

给大家举个例子吧,比如下面那个 64 bit 的 long 型数字:

  • 第一个部分,是 1 个 bit:0,这个是无意义的。

  • 第二个部分是 41 个 bit:表示的是时间戳。

  • 第三个部分是 5 个 bit:表示的是机房 id,10001。

  • 第四个部分是 5 个 bit:表示的是机器 id,1 1001。

  • 第五个部分是 12 个 bit:表示的序号,就是某个机房某台机器上这一毫秒内同时生成的 id 的序号,0000 00000000。

①1 bit:是不用的,为啥呢?

        因为二进制里第一个 bit 为如果是 1,那么都是负数,但是我们生成的 id 都是正数,所以第一个 bit 统一都是 0。

②41 bit:表示的是时间戳,单位是毫秒。

        41 bit 可以表示的数字多达 2^41 – 1,也就是可以标识 2 ^ 41 – 1 个毫秒值,换算成年就是表示 69 年的时间。

③10 bit:记录工作机器 id,代表的是这个服务最多可以部署在 2^10 台机器上,也就是 1024 台机器。

        但是 10 bit 里 5 个 bit 代表机房 id,5 个 bit 代表机器 id。意思就是最多代表 2 ^ 5 个机房(32 个机房),每个机房里可以代表 2 ^ 5 个机器(32 台机器)。

④12 bit:这个是用来记录同一个毫秒内产生的不同 id。

        12 bit 可以代表的最大正整数是 2 ^ 12 – 1 = 4096,也就是说可以用这个 12 bit 代表的数字来区分同一个毫秒内的 4096 个不同的 id。简单来说,你的某个服务假设要生成一个全局唯一 id,那么就可以发送一个请求给部署了 SnowFlake 算法的系统,由这个 SnowFlake 算法系统来生成唯一 id。

        这个 SnowFlake 算法系统首先肯定是知道自己所在的机房和机器的,比如机房 id = 17,机器 id = 12。

        接着 SnowFlake 算法系统接收到这个请求之后,首先就会用二进制位运算的方式生成一个 64 bit 的 long 型 id,64 个 bit 中的第一个 bit 是无意义的。

        接着 41 个 bit,就可以用当前时间戳(单位到毫秒),然后接着 5 个 bit 设置上这个机房 id,还有 5 个 bit 设置上机器 id。

        最后再判断一下,当前这台机房的这台机器上这一毫秒内,这是第几个请求,给这次生成 id 的请求累加一个序号,作为最后的 12 个 bit。

        最终一个 64 个 bit 的 id 就出来了,类似于:

这个算法可以保证说,一个机房的一台机器上,在同一毫秒内,生成了一个唯一的 id。可能一个毫秒内会生成多个 id,但是有最后 12 个 bit 的序号来区分开来。

下面我们简单看看这个 SnowFlake 算法的一个代码实现,这就是个示例,大家如果理解了这个意思之后,以后可以自己尝试改造这个算法。

总之就是用一个 64 bit 的数字中各个 bit 位来设置不同的标志位,区分每一个 id。

SnowFlake 算法JAVA版(含测试方法):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.ToString;

/**   
* Copyright: Copyright (c) 2019 
* 
* @ClassName: IdWorker.java
* @Description: <p>SnowFlake 算法,是 Twitter 开源的分布式 id 生成算法。
*      其核心思想就是:使用一个 64 bit 的 long 型的数字作为全局唯一 id。
*      这 64 个 bit 中,其中 1 个 bit 是不用的,然后用其中的 41 bit 作为毫秒数,
*      用 10 bit 作为工作机器 id,12 bit 作为序列号
*    </p>
* @version: v1.0.0
* @author: BianPeng
* @date: 2019年4月11日 下午3:13:41 
*
* Modification History:
* Date           Author          Version          Description
*---------------------------------------------------------------*
* 2019年4月11日        BianPeng        v1.0.0           initialize
*/
@ToString
public class SnowflakeIdFactory {
 
 static Logger log = LoggerFactory.getLogger(SnowflakeIdFactory.class);
 
    private final long twepoch = 1288834974657L;
    private final long workerIdBits = 5L;
    private final long datacenterIdBits = 5L;
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
    private final long sequenceBits = 12L;
    private final long workerIdShift = sequenceBits;
    private final long datacenterIdShift = sequenceBits + workerIdBits;
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);
 
    private long workerId;
    private long datacenterId;
    private long sequence = 0L;
    private long lastTimestamp = -1L;
 
 
 
    public SnowflakeIdFactory(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }
 
    public synchronized long nextId() {
        long timestamp = timeGen();
        if (timestamp < lastTimestamp) {
            //服务器时钟被调整了,ID生成器停止服务.
            throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L;
        }
 
        lastTimestamp = timestamp;
        return ((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence;
    }
 
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }
 
    protected long timeGen() {
        return System.currentTimeMillis();
    }
 
    public static void testProductIdByMoreThread(int dataCenterId, int workerId, int n) throws InterruptedException {
        List<Thread> tlist = new ArrayList<>();
        Set<Long> setAll = new HashSet<>();
        CountDownLatch cdLatch = new CountDownLatch(10);
        long start = System.currentTimeMillis();
        int threadNo = dataCenterId;
        Map<String,SnowflakeIdFactory> idFactories = new HashMap<>();
        for(int i=0;i<10;i++){
            //用线程名称做map key.
            idFactories.put("snowflake"+i,new SnowflakeIdFactory(workerId, threadNo++));
        }
        for(int i=0;i<10;i++){
            Thread temp =new Thread(new Runnable() {
                @Override
                public void run() {
                    Set<Long> setId = new HashSet<>();
                    SnowflakeIdFactory idWorker = idFactories.get(Thread.currentThread().getName());
                    for(int j=0;j<n;j++){
                        setId.add(idWorker.nextId());
                    }
                    synchronized (setAll){
                        setAll.addAll(setId);
                        log.info("{}生产了{}个id,并成功加入到setAll中.",Thread.currentThread().getName(),n);
                    }
                    cdLatch.countDown();
                }
            },"snowflake"+i);
            tlist.add(temp);
        }
        for(int j=0;j<10;j++){
            tlist.get(j).start();
        }
        cdLatch.await();
 
        long end1 = System.currentTimeMillis() - start;
 
        log.info("共耗时:{}毫秒,预期应该生产{}个id, 实际合并总计生成ID个数:{}",end1,10*n,setAll.size());
 
    }
 
    public static void testProductId(int dataCenterId, int workerId, int n){
        SnowflakeIdFactory idWorker = new SnowflakeIdFactory(workerId, dataCenterId);
        SnowflakeIdFactory idWorker2 = new SnowflakeIdFactory(workerId+1, dataCenterId);
        Set<Long> setOne = new HashSet<>();
        Set<Long> setTow = new HashSet<>();
        long start = System.currentTimeMillis();
        for (int i = 0; i < n; i++) {
            setOne.add(idWorker.nextId());//加入set
        }
        long end1 = System.currentTimeMillis() - start;
        log.info("第一批ID预计生成{}个,实际生成{}个<<<<*>>>>共耗时:{}",n,setOne.size(),end1);
 
        for (int i = 0; i < n; i++) {
            setTow.add(idWorker2.nextId());//加入set
        }
        long end2 = System.currentTimeMillis() - start;
        log.info("第二批ID预计生成{}个,实际生成{}个<<<<*>>>>共耗时:{}",n,setTow.size(),end2);
 
        setOne.addAll(setTow);
        log.info("合并总计生成ID个数:{}",setOne.size());
 
    }
 
    public static void testPerSecondProductIdNums(){
        SnowflakeIdFactory idWorker = new SnowflakeIdFactory(1, 2);
        long start = System.currentTimeMillis();
        int count = 0;
        for (int i = 0; System.currentTimeMillis()-start<1000; i++,count=i) {
            /**  测试方法一: 此用法纯粹的生产ID,每秒生产ID个数为400w+ */
         //idWorker.nextId();
            /**  测试方法二: 在log中打印,同时获取ID,此用法生产ID的能力受限于log.error()的吞吐能力.
             * 每秒徘徊在10万左右. */
         log.info(""+idWorker.nextId());
        }
        long end = System.currentTimeMillis()-start;
        System.out.println(end);
        System.out.println(count);
    }
 
    public static void main(String[] args) {
        /** case1: 测试每秒生产id个数?
         *   结论: 每秒生产id个数400w+ 
         */
        //testPerSecondProductIdNums();
 
        /** case2: 单线程-测试多个生产者同时生产N个id,验证id是否有重复?
         *   结论: 验证通过,没有重复. 
         */
        //testProductId(1,2,10000);//验证通过!
        //testProductId(1,2,20000);//验证通过!
 
        /** case3: 多线程-测试多个生产者同时生产N个id, 全部id在全局范围内是否会重复?
         *   结论: 验证通过,没有重复.
         */
        try {
            testProductIdByMoreThread(1,2,100000);//单机测试此场景,性能损失至少折半!
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
    }
}

这个算法也叫雪花算法我使用的类源码:https://gitee.com/flying-cattle/earn_knife/blob/master/item-common/src/main/java/com/item/util/SnowflakeIdWorker.java

项目是一个递进的过程,优先考虑缓存,其次读写分离,再分表分库。当然这只是个人想法,各位伙伴还是根据自己的项目和业务来综合考虑实行方案。

防火墙API,全新升级,支持iptables、ipset、pf

mumupudding阅读(7)

  在上一篇“使用redis来调用iptables,封禁恶意IP” 一文中已经讲解RedisPushIptables的用法,在经过几个版本迭代后,该模块功能更强大了,所以需要新开篇幅来讲解下。

   RedisPushIptables-6.1.tar.gz为最新版本,已经支持iptables、ipset、pf防火墙,意味着它已经跨平台支持,Linux、BSD、MacOS。最重要的是添加了动态删除防火墙规则的功能,当然这个功能ipset也有。虽然fail2ban也有此功能,但是极其消耗资源因为它是轮询来获取任务的。

 本篇主要介绍RedisPushIptables模块的实现原理、安装方法、API使用方法以及适用范围。并与Fail2ban作了对比以便读者了解二者的区别,RedisPushIptables不受编程语言的限制。意味着开发者都可以使用它来进行业务防护,接着讲解了怎样重新封装lib库从而支持API调用,最后给出了部分编程语言调用API的示例,供读者参阅。

简介

  RedisPushIptables是Redis的一个模块, 也可以把它理解为防火墙API调用库,该模块可以通过 redis 来操作 iptables 的 filter表INPUT链规则ACCEPT和DROP,而相对于BSD系统该模块则是对应pf防火墙。RedisPushIptables更新防火墙规则以在指定的时间内拒绝IP地址或永远拒绝。比如用来防御攻击。自此普通开发者也可以使用iptables或者PF,而不必再理会复杂的防火墙语法。

与Fail2Ban比较

  主要从两个方面,实现原理和实用性。Fail2Ban倾向于事后分析,需要监控日志,支持的应用也比较多,只需简单配置即可。而RedisPushIptables倾向于实效性,不需要监控日志,但是,需要程序编码时调用API,使用门槛较高,并不适用所有人。

 Fail2Ban

  Fail2Ban是一种入侵防御软件框架,可以保护计算机服务器免受暴力攻击。用Python编程语言编写,它能够在POSIX系统上运行,该系统具有本地安装的数据包控制系统或防火墙的接口,例如iptablesTCP Wrapper

  fail2ban通过监控操作日志文件(如/var/log/auth.log,/var/log/apache/access.log等)选中的条目并运行基于他们的脚本。最常用于阻止可能属于试图破坏系统安全性的主机的所选IP地址。它可以禁止在管理员定义的时间范围内进行过多登录尝试或执行任何其他不需要的操作的任何主机IP地址。包括对IPv4和IPv6的支持。可选择更长时间的禁令可以为不断回来的滥用者进行定制配置。Fail2Ban通常设置为在一定时间内取消阻止被阻止的主机,以便不“锁定”任何可能暂时错误配置的真正连接。但是,几分钟的unban时间通常足以阻止网络连接被恶意连接淹没,并降低字典攻击成功的可能性。

每当检测到滥用的IP地址时,Fail2Ban都可以执行多个操作:更新Netfilter / iptablesPF防火墙规则,TCP Wrapper的hosts.deny表,拒绝滥用者的IP地址; 邮件通知; 或者可以由Python脚本执行的任何用户定义的操作。

标准配置附带ApacheLighttpdsshdvsftpdqmailPostfixCourier Mail Server的过滤器。过滤器是被Python定义的正则表达式,其可以由熟悉正则表达式的管理员可以方便地定制。过滤器和操作的组合称为“jail”,是阻止恶意主机访问指定网络服务的原因。除了随软件一起分发的示例之外,还可以为任何创建访问日志文件的面向网络的进程创建“jail”。

Fail2Ban类似于DenyHosts […],但与专注于SSH的DenyHosts不同,Fail2Ban可以配置为监视将登录尝试写入日志文件的任何服务,而不是仅使用/etc/hosts.deny来阻止IP地址/ hosts,Fail2Ban可以使用Netfilter / iptables和TCP Wrappers /etc/hosts.deny。

 

缺点

  • Fail2Ban无法防范分布式暴力攻击。
  • 没有与特定于应用程序的API的交互。
  • 太过依赖正则表达式,不同的程序需要各自对应的正则。
  • 效率低下,性能受日志数量影响
  • IP列表很多时,内存消耗很高

 RedisPushIptables

  虽然与Fail2Ban比较起来,RedisPushIptables支持还不是很完善,但是,术业有专攻,它的优势在于高性能,用C语言实现,同样支持跨平台LinuxBSDMacOS。可以通过API调用,意味着redis官方支持的编程语言都可以使用,应用范围不受限。Fail2Ban是被动防御的需要根据关键字实时获取应用程序日志,匹配字符串再计算阈值达到就封禁IP地址。而RedisPushIptables业务主动调用,不需要分析日志。同样支持动态删除iptables或者PF规则,比fail2ban更省资源。

 

缺点

  • 需要开发者编码时调用API
  • 无法防范分布式暴力攻击。
  • 目前IPv6在我国还没普及所以不支持

安装

在安装RedisPushIptables之前,需要先安装redis。下面为版本redis-5.0.3.tar.gz

root@debian:~/bookscode# git clone https://github.com/limithit/RedisPushIptables.git
root@debian:~/bookscode# wget http://download.redis.io/releases/redis-5.0.3.tar.gz
root@debian:~/bookscode# tar zxvf redis-5.0.3.tar.gz
root@debian:~/bookscode#cd redis-5.0.3&& make   
root@debian:~/bookscode/redis-5.0.3# make test    
root@debian:~/bookscode/redis-5.0.3/deps/hiredis#make&& make install  
root@debian:~/bookscode/redis-5.0.3/src#cp redis-server redis-sentinel redis-cliredis-benchmark redis-check-rdb redis-check-aof /usr/local/bin/ 
root@debian:~/bookscode/redis-5.0.3/utils# ./install_server.sh    
root@debian:~/bookscode/redis-5.0.3# cd deps/hiredis
root@debian:~/bookscode/redis-5.0.3/deps/hiredis# make && make install
root@debian:~/bookscode/redis-5.0.3# echo /usr/local/lib >> /etc/ld.so.conf  
root@debian:~/bookscode/redis-5.0.3# ldconfig          
root@debian:~/bookscode# cd RedisPushIptables
root@debian:~/bookscode/ RedisPushIptables # make  && make install  

 

注意

编译时有三个选项 make、make CFLAGS=-DWITH_IPSET和make CFLGAS=-DBSD

默认是make选项,make CFLAGS=-DWITH_IPSET则是使用ipset更快地管理规则

make CFLGAS=-DBSD则是在BSD和MacOS系统上编译使用

 

可以使用以下redis.conf配置指令加载模块:

loadmodule /path/to/iptablespush.so

也可以使用以下命令在运行时加载模块:

MODULE LOAD /path/to/iptablespush.so

可以使用以下命令卸载模块:

MODULE unload iptables-input-filter

动态删除配置

默认情况下,禁用键空间事件通知,虽然不太明智,但该功能会使用一些CPU。使用redis.confnotify-keyspace-eventsCONFIG SET启用通知。将参数设置为空字符串会禁用通知。为了启用该功能,使用了一个非空字符串,由多个字符组成,其中每个字符都具有特殊含义,如下所示:

K     Keyspace events, published with __keyspace@<db>__ prefix.
E     Keyevent events, published with __keyevent@<db>__ prefix.
g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$     String commands
l     List commands
s     Set commands
h     Hash commands
z     Sorted set commands
x     Expired events (events generated every time a key expires)
e     Evicted events (events generated when a key is evicted for maxmemory)
A     Alias for g$lshzxe, so that the "AKE" string means all the events.

字符串中至少应存在KE,否则无论字符串的其余部分如何都不会传递任何事件。例如,只为列表启用键空间事件,配置参数必须设置为Kl,依此类推。字符串KEA可用于启用每个可能的事件。

# redis-cli config set notify-keyspace-events Ex

也可以使用以下redis.conf配置指令加载模块:

notify-keyspace-events Ex
#notify-keyspace-events ""  #注释掉这行

使用root用户运行ttl_iptables守护程序

root@debian:~/RedisPushIptables# /etc/init.d/ttl_iptables start

日志在/var/log/ttl_iptables.log中查看

root@debian:~# redis-cli TTL_DROP_INSERT 192.168.18.5 60  
(integer) 12
root@debian:~# date
Fri Mar 15 09:38:49 CST 2019                                 
root@debian:~/RedisPushIptables# tail -f /var/log/ttl_iptables.log 
2019/03/15 09:39:48 pid=5670 iptables -D INPUT -s 192.168.18.5 -j DROP

指令

  RedisPushIptables目前有五个指令,管理filter表中的INPUT链。为了保证规则生效,采用插入规则而不是按序添加规则,这么做的原因是,因为iptables是按顺序执行的。此外加入了自动去重功能(ipset和pfctl自带去重)。使用者不必担心会出现重复的规则,只需要添加即可。

accept_insert

等同iptables -I INPUT -s x.x.x.x -j ACCEPT

  • accept_delete

等同iptables -D INPUT -s x.x.x.x -j ACCEPT

  • drop_insert

等同iptables -I INPUT -s x.x.x.x -j DROP

  • drop_delete

等同iptables -D INPUT -s x.x.x.x -j DROP

  • ttl_drop_insert

例ttl_drop_insert 192.168.18.5 60

等同iptables -I INPUT -s x.x.x.x -j DROP 60秒后ttl_iptables守护进程自动删除iptables -D INPUT -s x.x.x.x -j DROP

客户端API示例

  理论上除了C语言原生支持API调用,其他语言API调用前对应的库都要重新封装,因为第三方模块并不被其他语言支持。这里只示范C、Python、Bash、Lua其他编程语言同理。

 C编程

  C只需要编译安装hiredis即可。步骤如下:

root@debian:~/bookscode/redis-5.0.3/deps/hiredis#make install
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <hiredis.h>
int main(int argc, char **argv) {
    unsigned int j;                
    redisContext *c;
    redisReply *reply;
    const char *hostname = (argc > 1) ? argv[1] : "127.0.0.1";    
    int port = (argc > 2) ? atoi(argv[2]) : 6379;   
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds   
    c = redisConnectWithTimeout(hostname, port, timeout);
    if (c == NULL || c->err) {
        if (c) {
            printf("Connection error: %s\n", c->errstr);    
            redisFree(c);
        } else {
            printf("Connection error: can't allocate redis context\n");
        }
        exit(1);
}
    reply = redisCommand(c,"drop_insert 192.168.18.3");  
    printf("%d\n", reply->integer);
    freeReplyObject(reply);  

    reply = redisCommand(c,"accept_insert 192.168.18.4");
    printf("%d\n", reply->integer);
    freeReplyObject(reply);

    reply = redisCommand(c,"drop_delete 192.168.18.3");
    printf("%d\n", reply->integer);
    freeReplyObject(reply);

    reply = redisCommand(c,"accept_delete 192.168.18.5");
    printf("%d\n", reply->integer);
    freeReplyObject(reply);

    redisFree(c);

    return 0;

}

gcc example.c -I/usr/local/include/hiredis -lhiredis

编译即可

Python编程

root@debian:~/bookscode# git clone https://github.com/andymccurdy/redis-py.git  #下载Python lib库

下载好之后不要急着编译安装,先编辑redis-py/redis/client.py文件,添加代码如下: 

   # COMMAND EXECUTION AND PROTOCOL PARSING
      def execute_command(self, *args, **options):
          "Execute a command and return a parsed response"
           .....
           .....

     def drop_insert(self, name):

         """
         Return the value at key ``name``, or None if the key doesn't exist
          """
         return self.execute_command('drop_insert', name)

     def accept_insert(self, name):
         """
         Return the value at key ``name``, or None if the key doesn't exist
        """
         return self.execute_command('accept_insert', name)

     def drop_delete(self, name):
         """
         Return the value at key ``name``, or None if the key doesn't exist
         """
         return self.execute_command('drop_delete', name)

     def accept_delete(self, name):
         """
         Return the value at key ``name``, or None if the key doesn't exist
         """
         return self.execute_command('accept_delete', name)

     def ttl_drop_insert(self, name, blocktime):

         """
         Return the value at key ``name``, or None if the key doesn't exist
         """
         return self.execute_command('ttl_drop_insert', name, blocktime)


为了不误导读者,上述代码不加注释了,只是在类里添加几个函数而已,不需要解释

root@debian:~/bookscode/redis-py# python setup.py build        
root@debian:~/bookscode/redis-py# python setup.py install       
root@debian:~/bookscode/8/redis-py# python
Python 2.7.3 (default, Nov 19 2017, 01:35:09)
[GCC 4.7.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import redis
>>> r = redis.Redis(host='localhost', port=6379, db=0)
>>> r.drop_insert('192.168.18.7')
12L
>>> r.accept_insert('192.168.18.7')
12L
>>> r.accept_delete('192.168.18.7')
0L
>>> r.drop_delete('192.168.18.7')
0L
>>> r.ttl_drop_insert('192.168.18.7', 600)
12L
>>>

 Bash编程

examples.sh

#!/bin/bash
for ((i=1; i<=254; i++))
 do
redis-cli TTL_DROP_INSERT 192.168.17.$i 60
done 
redis-cli DROP_INSERT 192.168.18.5
redis-cli DROP_DELETE 192.168.18.5
redis-cli ACCEPT_INSERT 192.168.18.5
redis-cli ACCEPT_DELETE 192.168.18.5

 Lua编程

git clone https://github.com/nrk/redis-lua.git    #下载Lua lib库

下载后编辑redis-lua/src/redis.lua 添加以下代码:

redis.commands = {
    .....
    ttl              = command('TTL'),
    drop_insert     = command('drop_insert'),
    drop_delete     = command('drop_delete'),
    accept_insert    = command('accept_insert'),
    accept_delete    = command('accept_delete'),
    ttl_drop_insert  = command('ttl_drop_insert'),
    pttl             = command('PTTL'),         -- >= 2.6
     .....

示例代码examples.lua 

package.path = "../src/?.lua;src/?.lua;" .. package.path
pcall(require, "luarocks.require")             --不要忘记安装Luasocket库
local redis = require 'redis'
local params = {
    host = '127.0.0.1',
    port = 6379,
}
local client = redis.connect(params)                    
client:select(0) -- for testing purposes                   
client:drop_insert('192.168.1.1')          
client:drop_delete('192.168.1.1')          
client:ttl_drop_insert('192.168.1.2', '60')      --加入规则后60秒后自动删除添加的规则
local value = client:get('192.168.1.2')     
print(value)

最后,目前还缺少Java、php常用语言的驱动,由于我不太擅长太多语言,有兴趣的朋友可以提交PR来补充。

程序员随想-快速熟悉业务

mumupudding阅读(2)


前言

作为一名开发,经常面临着主动或被动切换业务做,有些时候切换至有一定相关联的另一个业务,本来做余额宝的被调到做证券。一些情况下是切换至完全相关的业务,如从商品切换到交易,甚至从电商业务切换至金融业务。在经常遇到这种的情况下,建立一个“快速熟悉陌生业务”的方法论就很重要了。下面经过个人思考,提出的一些不完善的想法。

一、划分模块

人的记忆和关系型数据库有些类型,较容易记住的大的关键点,而具体的细节可能想很久,甚至记不住。根据人的这种记忆方式,我们接触一个新的事务最好的方式应该就是分层,按照一定逻辑分几层,每层分多个模块,了解每层各个模块概念,然后再细分下一层子模块概念。这种思维方式有点像数据库的索引,符合人的思维习惯。

划分模块方式

模块是基于一定逻辑组织形式来划分的,只要按照逻辑来划分的话,划分模块的方式肯定有很多种,这里我提两种最容易的方式:业务功能,应用架构。

业务功能

按照业务功能性,对业务进行拆解,把复杂的业务进行拆分成功能单元,各功能单元再根据场景进行更细粒度的拆分。拆分有一个原则,要做到高内聚,低耦合。

image.png

应用架构

基本上应用都是分层的,下图就是最常见的应用的组织形式,当然很多应用组织形式会根据具体情况进行变化

image.png

二、梳理业务领域模型

对于大部分人结束新的业务的时候,相信最麻烦的就是周围人谈论的“专业词汇”,像之前听得什么,直销、代销,转托管,卡账,户帐这类词汇对于第一次听到人肯定蒙蔽。对于这一类词汇其实很多都是特定业务领域形成的简化术语,如果连这些业务领域内的概念的不清楚,那么根本不知道在讨论一个什么事。这类词汇存在于特定业务领域中,了解业务领域是相当重要的,统一大家的概念认知,统一大家在讨论的是一件什么事,在做的是一件什么事!!

所以我说的要了解业务领域,并不是指得DDD,充血、贫血模型。而是对业务进行一定抽象成一个或多个有关联的业务模型,对于这个模型有一定认知是需求讨论和方案设计的前提,很多时候某个业务域用到了其他业务域的模型,那么相对于相关联业务域的模型也要有一定认知。

如何梳理业务领域模型?

业务领域模型最底层的是数据库表极其字段,把表和表之间的关系,字段含义理清楚后,会有大概的业务轮廓。接着通过熟悉模块中核心的POJO的类极其字段能更对业务有更清晰一点的认知。当然这还不够,还需要花时间去看一些集团,网上的文章,关于其他人对业务的理解,其他人在技术如何设计和实现的。这个过程不是一蹴而就的,慢慢的会找到更深的理解,等到了能发现当前这个业务领域模型优点缺点,并构思出改进的方向,基本上就意味已经进入这块业务领域的专家了。

三、梳理业务调用链路

寻找业务入口,然后看代码。一般来说业务代码迭代很快,很多在写代码的时候没有考虑以后,经常被之后的需求给推翻调,亦或者经历过多个团队多人接手过,每个人命名、设计等等习惯不一样,所以这个过程比较痛苦。

如何梳理业务调用链路?

这里必须要提一下,traceId是个神器,自己实际操作一笔拿到traceId,在集团鹰眼或者蚂蚁云图上应用的调用链路基本上呼之欲出,但是具体逻辑细节还是要自己去看代码。这里看代码我理解两种模式,一种是自下而上,另一种是自上而下。我比较喜欢自上而下的看,这样顺着业务链路来理解比较简单,大部分情况下两者结合效果比较好。

自上而下

前后端不分离的PC页面,顺着页面实际操作入口去看,很简单前后端分离的PC页面、Ajax接口、无线端,和前端同学打好关系,多聊聊,喝喝咖啡,让前端同学帮忙去找入口,另外就是通过网络抓包找到调用入口,顺着代码往下看吧,少年。通过日志里面的traceId可以查到调用链路甚至到接口级别,这就更方便找到入口。一般情况下相同功能模块、对外的接口会放在一个大包或者pom模块下面。

自下而上

从数据库表入手,理解表模型,各字段含义,建立各表和字段之间的关系。

四、需求入手

一般情况下,公司不太会给太多空闲时间去看代码,且直接看代码多数情况下也没有那么直观感受,这时候结合一些比较完整的需求去搞,效果反而会更好。所谓完整的需求是指:尽量是完整模块的需求,或者能串联部分或整个链路的需求。切身体验零散的需求没有鸟用。

打开看板娘