A look at cheddar's tx

This article examines the tx module of cheddar.

MessageAction

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessageAction.java

public class MessageAction {

    private final TypedMessage typedMessage;
    private final int delaySeconds;

    public MessageAction(final TypedMessage typedMessage, final int delaySeconds) {
        this.typedMessage = typedMessage;
        this.delaySeconds = delaySeconds;
    }

    public TypedMessage message() {
        return typedMessage;
    }

    public int delay() {
        return delaySeconds;
    }

    public void apply(final MessageSender<TypedMessage> messageSender) {
        if (delay() > 0) {
            messageSender.sendDelayedMessage(typedMessage, delaySeconds);
        } else {
            messageSender.send(typedMessage);
        }
    }

    public void apply(final MessagePublisher<TypedMessage> messagePublisher) {
        messagePublisher.publish(typedMessage);
    }

}

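MessageAction holds two fields, typedMessage and delaySeconds, and provides two apply methods. The overload that takes a MessageSender calls messageSender.sendDelayedMessage when delay() is greater than 0 and messageSender.send(typedMessage) otherwise. The overload that takes a MessagePublisher calls messagePublisher.publish and ignores the delay.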
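To make the branching concrete, here is a minimal sketch of the same dispatch rule. It does not use the cheddar classes: Sender, Action and RecordingSender are hypothetical stand-ins, and a plain String replaces TypedMessage, so treat it as an illustration of the rule rather than cheddar's own code.

```java
import java.util.ArrayList;
import java.util.List;

public class MessageActionSketch {

    // Hypothetical stand-in for MessageSender; String replaces TypedMessage
    interface Sender {
        void send(String message);

        void sendDelayedMessage(String message, int delaySeconds);
    }

    // Mirrors the branching of MessageAction.apply(MessageSender)
    static final class Action {
        private final String message;
        private final int delaySeconds;

        Action(final String message, final int delaySeconds) {
            this.message = message;
            this.delaySeconds = delaySeconds;
        }

        void apply(final Sender sender) {
            if (delaySeconds > 0) {
                sender.sendDelayedMessage(message, delaySeconds);
            } else {
                // Zero and negative delays both fall through to a plain send
                sender.send(message);
            }
        }
    }

    // Hypothetical test double that records which method was called
    static final class RecordingSender implements Sender {
        final List<String> calls = new ArrayList<>();

        @Override
        public void send(final String message) {
            calls.add("send:" + message);
        }

        @Override
        public void sendDelayedMessage(final String message, final int delaySeconds) {
            calls.add("delayed(" + delaySeconds + "):" + message);
        }
    }

    public static List<String> demo() {
        final RecordingSender sender = new RecordingSender();
        new Action("a", 0).apply(sender);
        new Action("b", 30).apply(sender);
        new Action("c", -5).apply(sender);
        return sender.calls;
    }

    public static void main(final String[] args) {
        // prints: [send:a, delayed(30):b, send:c]
        System.out.println(demo());
    }
}
```

Note that a negative delay is treated the same as no delay, because the check is delay() > 0.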

MessageSender

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java

public interface MessageSender<T extends Message> {

    /**
     * Send a message
     * @param message Message to send
     * @throws MessageSendException
     */
    void send(T message) throws MessageSendException;

    /**
     * Send a message, where the message is not visible to receivers for the specified delay duration
     * @param message Message to send
     * @param delaySeconds Duration for which sent message is invisible to receivers
     * @throws MessageSendException
     */
    void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;
}

The MessageSender interface defines the send and sendDelayedMessage methods; both declare MessageSendException.

TransactionalResource

Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/TransactionalResource.java

public interface TransactionalResource {

    void begin() throws TransactionException;

    void commit() throws TransactionException;

    void abort() throws TransactionException;
}

The TransactionalResource interface defines the begin, commit, and abort methods; each declares TransactionException.

TransactionalMessageSender

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessageSender.java

public class TransactionalMessageSender implements MessageSender<TypedMessage>, TransactionalResource {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final MessageSender<TypedMessage> messageSender;
    private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>();

    public TransactionalMessageSender(final MessageSender<TypedMessage> messageSender) {
        this.messageSender = messageSender;
    }

    private MessagingTransaction getCurrentTransaction() {
        if (currentTransaction.get() == null) {
            throw new NonExistentTransactionException();
        }
        return currentTransaction.get();
    }

    @Override
    public void begin() throws TransactionException {
        if (currentTransaction.get() != null) {
            throw new NestedTransactionException(currentTransaction.get());
        }
        currentTransaction.set(new MessagingTransaction());
        logger.trace("Beginning transaction: " + currentTransaction.get().transactionId());
    }

    @Override
    public void commit() throws TransactionException {
        final MessagingTransaction transaction = getCurrentTransaction();
        logger.trace("Committing transaction: " + transaction.transactionId());
        transaction.applyActions(messageSender);
        currentTransaction.remove();
        logger.trace("Transaction successfully committed: " + transaction.transactionId());
    }

    @Override
    public void send(final TypedMessage typedMessage) throws MessageSendException {
        final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addMessage(typedMessage);
    }

    @Override
    public void sendDelayedMessage(final TypedMessage typedMessage, final int delay) throws MessageSendException {
        final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addDelayedMessage(typedMessage, delay);
    }

    @Override
    public void abort() throws TransactionException {
        currentTransaction.remove();
    }
}

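TransactionalMessageSender implements both MessageSender and TransactionalResource. begin binds a new MessagingTransaction to the thread-local currentTransaction and throws NestedTransactionException if one is already bound. commit fetches the current MessagingTransaction, calls applyActions with the wrapped messageSender, and then calls currentTransaction.remove(). abort only calls currentTransaction.remove(), which discards any buffered messages. send and sendDelayedMessage do not send anything immediately: they call transaction.addMessage and transaction.addDelayedMessage respectively, and throw NonExistentTransactionException when no transaction has been started.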
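The buffer-until-commit behaviour can be sketched without the cheddar dependencies. In the sketch below, BufferedSenderSketch is a hypothetical class, a String replaces TypedMessage, the delivered list stands in for the wrapped MessageSender, and IllegalStateException stands in for cheddar's transaction exceptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BufferedSenderSketch {

    // Hypothetical stand-in for the wrapped, non-transactional sender
    private final List<String> delivered = new ArrayList<>();

    // One pending queue per thread, playing the role of currentTransaction
    private final ThreadLocal<Queue<String>> current = new ThreadLocal<>();

    public void begin() {
        if (current.get() != null) {
            throw new IllegalStateException("nested transaction");
        }
        current.set(new ArrayDeque<>());
    }

    // Buffers the message; nothing is delivered yet
    public void send(final String message) {
        pending().add(message);
    }

    // Flushes buffered messages in FIFO order, then unbinds the transaction
    public void commit() {
        final Queue<String> queue = pending();
        while (!queue.isEmpty()) {
            delivered.add(queue.remove());
        }
        current.remove();
    }

    // Drops buffered messages without delivering them
    public void abort() {
        current.remove();
    }

    public List<String> delivered() {
        return delivered;
    }

    private Queue<String> pending() {
        final Queue<String> queue = current.get();
        if (queue == null) {
            throw new IllegalStateException("no transaction");
        }
        return queue;
    }

    public static void main(final String[] args) {
        final BufferedSenderSketch sender = new BufferedSenderSketch();
        sender.begin();
        sender.send("x");
        System.out.println(sender.delivered()); // prints: []
        sender.commit();
        System.out.println(sender.delivered()); // prints: [x]
        sender.begin();
        sender.send("y");
        sender.abort();
        System.out.println(sender.delivered()); // prints: [x]
    }
}
```

The point of the design is that a caller can abandon a unit of work after calling send and no message leaves the process, since delivery only happens in commit.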

MessagePublisher

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessagePublisher.java

public interface MessagePublisher<T extends Message> {

    /**
     * Forward a message for publication
     * @param message
     * @throws MessagePublishException
     */
    void publish(T message) throws MessagePublishException;

}

The MessagePublisher interface defines a single publish method, which declares MessagePublishException.

TransactionalMessagePublisher

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessagePublisher.java

public class TransactionalMessagePublisher implements MessagePublisher<TypedMessage>, TransactionalResource {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final MessagePublisher<TypedMessage> messagePublisher;
    private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>();

    public TransactionalMessagePublisher(final MessagePublisher<TypedMessage> messagePublisher) {
        this.messagePublisher = messagePublisher;
    }

    private MessagingTransaction getCurrentTransaction() {
        if (currentTransaction.get() == null) {
            throw new NonExistentTransactionException();
        }
        return currentTransaction.get();
    }

    @Override
    public void begin() throws TransactionException {
        if (currentTransaction.get() != null) {
            throw new NestedTransactionException(currentTransaction.get());
        }
        currentTransaction.set(new MessagingTransaction());
        logger.trace("Beginning transaction: " + currentTransaction.get().transactionId());
    }

    @Override
    public void commit() throws TransactionException {
        final MessagingTransaction transaction = getCurrentTransaction();
        logger.trace("Committing transaction: " + transaction.transactionId());
        transaction.applyActions(messagePublisher);
        currentTransaction.remove();
        logger.trace("Transaction successfully committed: " + transaction.transactionId());
    }

    @Override
    public void publish(final TypedMessage typedMessage) throws MessagePublishException {
        final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addMessage(typedMessage);
    }

    @Override
    public void abort() throws TransactionException {
        currentTransaction.remove();
    }
}

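TransactionalMessagePublisher implements both MessagePublisher and TransactionalResource, and mirrors TransactionalMessageSender. begin binds a new MessagingTransaction to the thread-local currentTransaction; commit fetches the current MessagingTransaction, calls applyActions with the wrapped messagePublisher, and then calls currentTransaction.remove(); abort only calls currentTransaction.remove(). publish does not publish immediately: it calls transaction.addMessage, so the message is buffered until commit.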
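Both wrappers keep their transaction in a ThreadLocal, so a transaction begun on one thread is not visible from another. The sketch below shows only that ThreadLocal property; ThreadLocalTxSketch is a hypothetical class and a String stands in for MessagingTransaction.

```java
public class ThreadLocalTxSketch {

    // Plays the role of currentTransaction; String replaces MessagingTransaction
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Binds a value on the calling thread and reports whether a second thread can see it
    public static boolean visibleOnOtherThread() {
        CURRENT_TX.set("tx-on-caller");
        final boolean[] seen = new boolean[1];
        final Thread other = new Thread(() -> seen[0] = CURRENT_TX.get() != null);
        other.start();
        try {
            other.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // Unbind on the calling thread, as commit and abort do
            CURRENT_TX.remove();
        }
        return seen[0];
    }

    public static void main(final String[] args) {
        System.out.println(visibleOnOtherThread()); // prints: false
    }
}
```

In practice this means begin, publish and commit have to run on the same thread; work handed to another thread would hit the "no current transaction" path.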

Transaction

Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/Transaction.java

public interface Transaction {

    String transactionId();

}

The Transaction interface defines a single transactionId method.

MessagingTransaction

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessagingTransaction.java

public class MessagingTransaction implements Transaction {

    private final Queue<MessageAction> messageActions;

    private final String transactionId;

    public MessagingTransaction() {
        messageActions = new LinkedList<>();
        transactionId = UUID.randomUUID().toString();
    }

    @Override
    public String transactionId() {
        return transactionId;
    }

    public void applyActions(final MessagePublisher<TypedMessage> messagePublisher) {
        while (!messageActions.isEmpty()) {
            final MessageAction messageAction = messageActions.remove();
            messagePublisher.publish(messageAction.message());
        }
    }

    public void applyActions(final MessageSender<TypedMessage> messageSender) {
        while (!messageActions.isEmpty()) {
            final MessageAction messageAction = messageActions.remove();
            messageAction.apply(messageSender);
        }
    }

    public void addMessage(final TypedMessage typedMessage) {
        messageActions.add(new MessageAction(typedMessage, 0));
    }

    public void addDelayedMessage(final TypedMessage typedMessage, final int delay) {
        messageActions.add(new MessageAction(typedMessage, delay));
    }

}

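The MessagingTransaction class implements the Transaction interface. Its transactionId method returns the UUID generated in the constructor. The two applyActions overloads drain the messageActions queue in FIFO order: the MessagePublisher overload calls messagePublisher.publish(messageAction.message()) for each action, and the MessageSender overload calls messageAction.apply(messageSender). addMessage enqueues a MessageAction with a delay of 0, and addDelayedMessage enqueues one with the given delay.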
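One consequence of removing each action from the queue before applying it is worth spelling out: if the target throws part-way through, the failing action has already left the queue. The sketch below reproduces only the loop shape of applyActions; DrainQueueSketch, the String actions and the Consumer target are hypothetical stand-ins, and the failure is simulated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class DrainQueueSketch {

    // Same loop shape as applyActions: remove first, then apply
    static void applyActions(final Queue<String> actions, final Consumer<String> target) {
        while (!actions.isEmpty()) {
            final String action = actions.remove();
            target.accept(action);
        }
    }

    public static String demo() {
        final Queue<String> actions = new LinkedList<>(Arrays.asList("a", "b", "c"));
        final List<String> delivered = new ArrayList<>();
        try {
            applyActions(actions, message -> {
                if (message.equals("b")) {
                    throw new IllegalStateException("simulated failure");
                }
                delivered.add(message);
            });
        } catch (final IllegalStateException e) {
            // Expected: the loop stops at "b"
        }
        return "delivered=" + delivered + " remaining=" + actions;
    }

    public static void main(final String[] args) {
        // prints: delivered=[a] remaining=[c]
        System.out.println(demo());
    }
}
```

Here "a" was delivered, "b" was removed but never delivered, and "c" is still queued. In the cheddar listings above, an exception from applyActions would also skip the currentTransaction.remove() call in commit, so the transaction would stay bound to the thread until abort is called.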

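Summary

cheddar's tx module provides TransactionalMessagePublisher and TransactionalMessageSender, both of which implement the TransactionalResource interface. They buffer messages in a thread-local MessagingTransaction, and their commit methods call transaction.applyActions to deliver them. MessageAction provides two apply methods: the overload that takes a MessageSender calls messageSender.sendDelayedMessage when the delay is greater than 0 and messageSender.send(typedMessage) otherwise; the overload that takes a MessagePublisher calls messagePublisher.publish.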

doc

https://segmentfault.com/a/1190000039722603