Why are distributed transactions icebergs? It’s not because they’re cool and beautiful and you have to look under the surface to comprehend them.
Distributed transactions are icebergs because (1) it’s easy to not see them, even when they’re right in front of you, and (2) if you run into one, it’s got a great potential to sink your ship.
What is a distributed transaction?
I’m not talking exclusively about explicit distributed transactions, such as those of your old XA standard. My definition of a distributed transaction is simple:
distributed transaction (n.) any situation where a single event results in the mutation of two separate sources of data which cannot be committed atomically
In microservices, the temptation to do this pops up all the time. Like, right under your nose, and you may not even see it. That’s the first reason why they’re icebergs.
What’s wrong with distributed transactions in microservices?
The problem with distributed transactions is that they’re a time bomb for the optimist… but, without the user-friendly countdown. For those who think mostly about the happy paths in their system, distributed transactions will lie in wait, showing no harm day by day, until a little glitch happens in your network and suddenly all hell breaks loose.
Here’s the thing: a distributed transaction is about having one event trying to commit changes to two or more sources of data. So whenever you find yourself in this situation of wanting to commit data to two places, you have to ask:
“Would it be okay if the first commit succeeds and the second fails?”
And I propose that about 98% of the time, the answer will be “Hell, no!”
But failure will happen. If you’re familiar with the fallacies of distributed computing, you know it will happen. (If you’re doing microservices and you’re not familiar with the fallacies – stop everything and go learn them now! I’ve posted them on the walls at work to make sure people don’t forget.)
If you have distributed transactions, then when those fallacies come home to roost, you’ll end up committing data to only one of your two sources. Depending on how you’ve written your system, that could be a very, very bad situation.
Distributed Transaction Examples Hiding in Plain Sight
So I said they might be right under your nose. Let’s have a look at some examples.
Here’s a simple microservice system for buying a book online using the book store’s proprietary payment credits.
We have a Webapp, a Payment service and a Shipping service. The Webapp does the coordination: it asks the Payment service to debit the account, and when it’s heard back about that being successful, it tells the Shipping service to ship the book. What happens if the payment succeeds but the Shipping service isn’t up? Bzzzzzzz. You’re out of luck.
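To make the failure concrete, here is a minimal sketch of that coordination. The class and function names (`PaymentService`, `ShippingService`, `buy_book`) are hypothetical, and in-memory lists stand in for each service's own database; it is an illustration of the shape of the problem, not a real implementation.

```python
class ServiceDown(Exception):
    """Raised when a (simulated) remote service cannot be reached."""


class PaymentService:
    def __init__(self):
        self.debits = []  # stands in for the Payment service's own database

    def debit(self, order_id, amount):
        self.debits.append((order_id, amount))  # first commit


class ShippingService:
    def __init__(self, up=True):
        self.up = up
        self.shipments = []  # stands in for the Shipping service's own database

    def ship(self, order_id):
        if not self.up:
            raise ServiceDown("shipping unavailable")
        self.shipments.append(order_id)  # second commit


def buy_book(payment, shipping, order_id, amount):
    """Webapp-style coordination: two commits, with nothing making them atomic."""
    payment.debit(order_id, amount)
    shipping.ship(order_id)  # if this raises, the debit above has already happened


payment = PaymentService()
shipping = ShippingService(up=False)  # simulate the Shipping service being down
try:
    buy_book(payment, shipping, "order-1", 20)
except ServiceDown:
    pass

print(payment.debits)      # [('order-1', 20)]
print(shipping.shipments)  # []
```

The customer has been charged and nothing records that a shipment is still owed.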
Can’t we do something about that? Maybe we can reverse the payment? Well, we could try that, but can we guarantee that we can reverse the payment? Maybe we can give the Webapp a database and store in there the fact that there’s a pending shipment that’s paid for? Well, we could try that, but can we guarantee that the Webapp’s storage is available after the payment’s completed? We can put layers and layers of error handling in here, and we could narrow the window for error to be smaller and smaller, but we’ll never get to a solution that’s robust.
Maybe the architecture’s wrong? What if we put the Shipping service in charge of things? Like this…
Looks like it might be better, but it’s still got the same problem: the Payment service may complete successfully but the Shipping service can still fail. It might fail to receive the response because the network flakes, fail to still be running when the Payment service tries to respond, or fail to contact its database to commit the shipment. It’s the same iceberg viewed from a different angle: it’s a distributed transaction.
Distributed Transactions: Not Just About Databases
Here’s another example at a somewhat lower level. Let’s say we’re using a message broker to communicate between services. An event occurs prompting a call to some service and we want to change some data in the local service and send a message to one or more remote services:
The important thing to realise here is that the message broker is also a data source. See that “Commit to queue” bit? That’s the message broker changing its data source. Hence, this is another example of a distributed transaction.
Again, let’s ask ourselves the important question: If we saved the data to the database but then couldn’t send the message, would we be happy about that? Conversely, if we sent the message but then failed to save to the database, would we be happy about that?
Distributed Transactions: The Solution
The solution to distributed transactions in microservices is simply to avoid them like the plague.
Just agree as a team to not go down this road.
How do you avoid them and still get your microservices coordinating? It’s actually quite simple: every business event needs to result in a single synchronous commit. It’s perfectly okay to have other data sources updated, but that needs to happen asynchronously, using patterns that guarantee delivery of the other effects to the other data sources.
How do you implement this? Quite simply, you have to pick one of the services to be the primary handler for the event. It will handle the original event with a single commit, and then take responsibility for asynchronously communicating the secondary effects to other services.
When you find yourself in a real-life version of this situation, which service to choose to be the primary handler will not always be obvious. There will be trade-offs. In our example, you could do it like this, where the Shipping service is responsible for executing the payment asynchronously:
Or you could do it like this, where the Payment service is responsible for notifying the Shipping service asynchronously (and before you spend too much time reading this one, yes, it is a straight swap of the words “Payment” and “Shipping” in the above diagram):
The key, if you didn’t pick it up, is that things happen asynchronously – on another thread, in another transaction. Notice that in the original transaction the Payment service is saving, in the same data store as its business data, the fact that it needs to notify the Shipping service. Another thread then picks the item off the queue and contacts the Shipping service. Once the Shipping service responds with success, we remove the item from the queue in the Payment service.
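As a rough sketch of this idea, assuming the Payment service's data store is a relational database (SQLite is used here purely so the example runs on its own): the table names `payments` and `pending_notifications`, and the `notify_shipping` callable, are all made up for illustration.

```python
import sqlite3


def open_payment_db():
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE payments (order_id TEXT PRIMARY KEY, amount INTEGER)")
    db.execute(
        "CREATE TABLE pending_notifications ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, order_id TEXT NOT NULL)"
    )
    db.commit()
    return db


def process_payment(db, order_id, amount):
    """The single synchronous commit: business data plus the queued instruction."""
    with db:  # one local transaction: both rows are committed, or both rolled back
        db.execute("INSERT INTO payments VALUES (?, ?)", (order_id, amount))
        db.execute(
            "INSERT INTO pending_notifications (order_id) VALUES (?)", (order_id,)
        )


def deliver_pending(db, notify_shipping):
    """Run later, on another thread or schedule, in its own transactions."""
    rows = db.execute(
        "SELECT id, order_id FROM pending_notifications ORDER BY id"
    ).fetchall()
    for row_id, order_id in rows:
        try:
            notify_shipping(order_id)  # contact the Shipping service
        except Exception:
            continue  # leave the row in place so a later pass retries it
        with db:
            db.execute("DELETE FROM pending_notifications WHERE id = ?", (row_id,))


db = open_payment_db()
process_payment(db, "order-1", 20)
shipped = []
deliver_pending(db, shipped.append)  # a list's append stands in for the remote call
```

If `notify_shipping` succeeds but the `DELETE` then fails, the row stays queued and the notification will be sent again on a later pass.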
Hold your horses! Did you just say that after one database commits a transaction, we’ll commit something to the other database?
Yes I did.
But isn’t that a distributed transaction?
Why, yes it is. Which leads us to…
The Distributed Transaction That’s Okay
Let’s go back to the core problem with distributed transactions, which is that the answer to the question – “Would it be okay if the first commit succeeds and the second fails?” – is almost always “No!”. Well, in order to make this whole thing work, we have to create situations where the answer to that question is, “Yes, everything will be fine.”
The way to achieve this is with the combination of a commander, retries and idempotence.
Safe Distributed Transactions: Commander, Retries and Idempotence
The commander is in charge of the distributed transaction. It knows the instructions that have to be executed and will coordinate executing them. In most scenarios, there will just be two instructions:
- Effect the remote update.
- Remove the item from the commander’s persistent queue.
The pattern can be extrapolated to work for any number of instructions, though, for example in the case of a multi-step orchestration component.
Secondly, the commander needs to take responsibility for guaranteeing the execution of all instructions, and that means: retries. In the case that the commander tries to effect the remote update and doesn’t get a response, it has to retry. If the commander gets a successful response but fails to update its own queue, then when its queue becomes available again it needs to retry. There are other failure scenarios, too. The answer is retry.
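A minimal sketch of a commander's loop might look like the following. Everything here is an assumption made for illustration: a plain list stands in for the persistent queue, `send` is a hypothetical callable that performs the remote update and raises when no successful response arrives, and a real commander would wait between passes rather than retrying immediately.

```python
def run_commander(queue, send, max_passes=10):
    """Keep working through the queue until it is empty or passes run out.

    Returns the number of failed attempts, which is worth monitoring.
    """
    failures = 0
    for _ in range(max_passes):
        if not queue:
            break
        instruction = queue[0]
        try:
            send(instruction)  # instruction 1: effect the remote update
        except Exception:
            failures += 1      # no confirmed success: leave it queued and retry
            continue
        queue.pop(0)           # instruction 2: remove the item from the queue
    return failures


received = []
calls = {"n": 0}


def lossy_send(instruction):
    """Simulated remote: does the work, but the first response gets lost."""
    calls["n"] += 1
    received.append(instruction)
    if calls["n"] == 1:
        raise TimeoutError("response lost")


queue = ["ship order-1"]
failures = run_commander(queue, lossy_send)
print(failures)  # 1
print(received)  # ['ship order-1', 'ship order-1']
```

The queue ends up empty, but the remote side saw the instruction twice.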
Thirdly, because we have retries, we now need idempotence. This is a big word for a very simple thing. It’s just the property of being able to do something twice and have the end result be the same as if it had been done once. We need idempotence at the remote service or data source so that, in the cases where it receives the instruction more than once, it only processes it once. We don’t want to ship the book twice just because the Payment service timed out before receiving our first response! This is typically achieved with some simple duplicate detection based off a unique identifier in the instruction.
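Duplicate detection can be sketched in a few lines. The names below (`ShippingService`, `ship`, `instruction_id`) are hypothetical, and an in-memory set stands in for what would need to be durable storage, ideally written in the same local transaction as the shipment itself.

```python
class ShippingService:
    def __init__(self):
        self.seen_ids = set()  # in a real service this would be stored durably
        self.shipments = []

    def ship(self, instruction_id, book):
        """Process each instruction_id at most once; always report success."""
        if instruction_id in self.seen_ids:
            return "ok"  # duplicate: acknowledge again, but do nothing
        self.seen_ids.add(instruction_id)
        self.shipments.append(book)
        return "ok"


service = ShippingService()
service.ship("instr-1", "A Book")
service.ship("instr-1", "A Book")  # a retry of the same instruction
print(service.shipments)  # ['A Book']
```

Note that the duplicate still gets a success response, so the sender can go ahead and remove the item from its queue.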
Once we’ve done all this, we have a robust distributed transaction through guaranteed delivery. However, it’s important to realise that it’s not an atomic transaction. We’re completing one unit of work across two data sources, so when there are failures in the system, the work may have been completed while the record that it’s been completed (the pending queue item) is out of sync.
At Tyro, where we currently do a lot of communication through RabbitMQ, we have a pattern we call “Async Rabbit”. The “Async” part is there not because of the asynchrony introduced by using a message broker, but because we are submitting messages into the message broker asynchronously within the application, so as not to create a bad distributed transaction. The pattern looks like this:
The Big Trade-Off
Let’s go back to the core of this solution: a single synchronous commit, with asynchronous commits for the secondary effects. It’s super important to realise that, by employing this method, we’re introducing eventual consistency into the system.
The Payment service does not know whether the Shipping service is up when it processes the payment. It does the payment, commits to guaranteeing the secondary effect of contacting the Shipping service will happen, and returns “Success” to the user via the Webapp. Now, we hope that most of the time the Shipping service will be up and running and that the shipment will be processed milliseconds after the payment, but if the Shipping service is down for a minute, or an hour, or a day, or longer, that just has to be okay.
This is eventual consistency: eventually the effects of that primary commitment will permeate through the system, but it will take an amount of time that is non-zero. If you’re going to go down this road, you have to be cognisant that you’re introducing this characteristic to your system and be aware of the constraints it places on your software, in particular what you’re going to do when you have decisions that do require atomicity.
Two Consequences of Eventual Consistency
Let’s talk briefly about two consequences that this approach leaves you with.
Number one: We’re going to retry when we can’t get through. How many times, or for how long, do you retry? Answer: Depends on the context. One thing’s for sure, though – if you’re doing retries, it’s because something’s wrong in your system, so you’ll want to be monitoring that as it’s quite possibly a canary in the coal mine. The point of retries is to enable self-healing, but you’ll want to be aware that healing was required.
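One common way to space out retries, which is my illustration rather than anything prescribed above, is a capped exponential backoff with a hook for monitoring. The function names, default values and the `on_retry` hook in this sketch are all hypothetical; the right numbers depend on your context.

```python
import time


def backoff_delays(base=1.0, factor=2.0, cap=60.0, attempts=8):
    """Return the wait in seconds before each retry: exponential, capped."""
    return [min(cap, base * factor ** i) for i in range(attempts)]


def call_with_retries(action, delays, sleep=time.sleep,
                      on_retry=lambda attempt, error: None):
    """Call `action` until it succeeds or the delays run out.

    `on_retry` is a hook for monitoring: it is called on every failure,
    so you find out that healing was required.
    """
    for attempt, delay in enumerate(delays, start=1):
        try:
            return action()
        except Exception as error:
            on_retry(attempt, error)
            sleep(delay)
    return action()  # final attempt: let any exception propagate


print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

In practice `on_retry` might increment a metric or write a log line that an alert is built on.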
Number two: Because consistency is eventual, you can end up with what we might call business consistency conflicts. For example, let’s say a book was available when the “Purchase” page was shown to the user, but by the time the payment was processed and the shipping request sent to the Shipping service, there are no copies left to ship. What to do? We’re asynchronous now, so there’s no reporting back to the user. What we need to do is recognise conflicts like this in the system and raise them up to be dealt with. This can be done either automatically, for example by refunding the user and sending an apology email, or manually, for example by queueing the conflicts as something to be dealt with by an employee.
Are there alternatives for resolving this situation? Absolutely.
The first alternative is to avoid needing distributed transactions. When you find you need to update data in two places as a result of one event, you can consider refactoring your architecture to move some of the data so that you can update it all in one place, in one transaction. This may be an appropriate approach every now and then to reduce the complexity of specific system journeys, but I doubt it’s one that you can employ regularly in a microservices architecture. If you take this step over and over again, every time you hit a distributed transaction, you’ll eventually coalesce everything into a single monolithic database! But you do have the option of using it where it has the most benefit.
The second alternative is to actually just do the unsafe distributed transaction and be prepared to handle error scenarios. To do that and not sink your ship, you need two things: (1) some detection in place, in the form of a data reconciliation, and (2) assurance that you’ve got all the data. That means it’s only okay for the 2nd and following legs of a distributed transaction to fail if the 1st leg contains all the data needed to execute the following legs manually once a mismatch has been found. This is mostly true of the async pattern advocated above anyway. The only real difference is that with async+retries, you get the probability of a self-healing system. With distributed transactions+detection of half-commits, you’ll need to handle each failure manually.
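The detection half of that can be as simple as comparing identifiers from the two data sources. This is a minimal sketch with a made-up function name, assuming both sides record a shared order id; a real reconciliation would also need to allow for work that is legitimately still in flight.

```python
def find_half_commits(paid_order_ids, shipped_order_ids):
    """Compare the two data sources and report orders present in only one."""
    paid = set(paid_order_ids)
    shipped = set(shipped_order_ids)
    return {
        "paid_not_shipped": sorted(paid - shipped),
        "shipped_not_paid": sorted(shipped - paid),
    }


report = find_half_commits(["a", "b", "c"], ["a", "c", "d"])
print(report)  # {'paid_not_shipped': ['b'], 'shipped_not_paid': ['d']}
```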
Finally, the third alternative is just to be an optimist and not worry about this stuff. Maybe you work on a system where it’s no biggie for an event to be half committed and not actually finish getting processed. At Tyro, we’re building a nextgen bank, so we think about guaranteeing things a lot, because that’s the expectation of our customers. When considering the impact that our context has on the way we design for this kind of stuff, we often use the motto, “No one cares if you lose one of their Tweets, but they care if you lose one of their dollars.”
Thanks to Chris Howard and Carl Francia for giving feedback on the draft version of this article.
Image credits: ‘Nature Antarctica 18‘ by Georges Nijs
‘Suitcase Bomb prop‘ by Rafael Mizrahi
‘Road Closed‘ – public domain
‘Sign for the roundabout between U.S. Route 50 Alternate, U.S. Route 95 Alternate and Nevada State Route 828 at the west end of Nevada State Route 828 (Farm District Road) in Fernley, Nevada‘ by ‘Famartin’
Is this not simply a saga that you are describing? There are a number of frameworks for distributed transactions.
Of course, implementing idempotency and compensating transactions is the hard part of distributed transactions.
Yes, I think the saga pattern is very similar to what I’ve written here. I think one difference is that the saga concept reifies the process as a stateful entity in its own right, whereas my approaches have generally just been about guaranteed message delivery and/or compensating operations – the state is simply in the message queue.
Here is my Kafka solution diagrammed, in case you missed my tweet.
I know it looks naive but it relies on at-least-once delivery which in turn relies on distributed consensus within Kafka. The assumptions are the same as yours i.e. no compensating transactions, db transactions are idempotent and always succeed, eventual consistency.
Whichever way you cut it, there must be eventual consensus between the two services in this problem. If it is provided by a bespoke distributed algorithm, as in your solution, I want to know what assumptions make it simpler than, say, RAFT.
Sometimes the source of consensus is remote from the problem itself. In the Kafka solution it is ultimately a PAXOS implementation in Zookeeper that makes at-least-once delivery possible which in turn makes consensus between the two services possible.
Disclaimer: I reserve the right to be embarrassingly wrong and/or talk out of my arse. 🙂
A similar technique can be used at the level of process flow-charts – see the section “Automated activities” in http://improving-bpm-systems.blogspot.ch/2014/08/bpm-for-digital-age-shifting.html
Agree, I had a similar view at https://dzone.com/articles/transactions-in-microservices
I thought it was very well written with good examples! I had some questions on this and it cleared them for me.
Very interesting article. I think eventual consistency is the key to the concept: once you get your head around it and build your systems with that in mind, it makes things way easier. (Well, as easy as it can be to build distributed systems…)
Is there any framework which helped you guys implement an eventual consistency pattern (message persistence, automatic retry…)? This makes me think of Akka Persist (at-least-once delivery semantics, and you get the automatic retry and persistence for free), but then you are coupled to the JVM (not a problem for us personally, but it might be for others).
Speculatively, I’d think that Kafka Streams as it stands now (not necessarily as it was when you raised the issue) would work well, with exactly-once semantics among the Kafka services (Streams apps) and guaranteed delivery. Michael Noll also chimed in just below, and he is the product manager of that platform.
Thanks for sharing this, Graham — I enjoyed the read. It reminded me of the recent talk by Bobby Calderwood (of Capital One, thus also in a finance context) given at Abstractions Con 2016:
“Commander: Decoupled, Immutable REST APIs with Kafka Streams [and Kafka]”
It covers CQRS (http://martinfowler.com/bliki/CQRS.html) and applies it to a microservices architecture that provides transactional services. The diagrams look quite similar to yours. 😉
Great write up. I have a few questions I would love to get clarified. Is the “queue” used for the asynchronous flow by a different thread held in memory? What happens if the “commander” is down and the queued event is lost?
If it’s important for the message to not be lost, then the message queue needs to be backed by persistent storage. In our world, there are very few scenarios where we’re okay with dropping a message, so queues are always database tables or disk-backed RabbitMQ queues.
These kinds of scenarios are exactly why we implemented a “good” distributed transaction protocol called Try-Confirm/Cancel (TCC).
It is ideal for microservices and described here: https://www.atomikos.com/Blog/TCCForTransactionManagementAcrossMicroservices
Event Sourcing is a perfect solution for distributed transactions.
You said “you have to pick one of the services to be the primary handler for the event.” Can I use MongoDB for this one? Or do I have to use an RDBMS that guarantees the commit of the event?
Many storage technologies could be used. The key requirement is for atomicity, as the pattern is to store the instructions for completing the secondary effects along with the primary effect.
This does not seem to be the responsibility of an arbitrarily picked service; it should be handled by something dedicated to coordinating the work (especially as it becomes more complex). Normally I would expect to see the process manager pattern (POI, Hohpe) in such a case.
I agree, achieving the required level of coordination is not trivial. I wrote a partial rebuttal last year where I argued that the task should be delegated to a dedicated service: http://jbossts.blogspot.co.uk/2016/10/achieving-consistency-in-microservices.html
As you’ve alluded to, I think it depends on the complexity of what you’re doing. If you’re doing something relatively simple, then it would be more complex to move the responsibility off to somewhere else. Other trade-offs to consider are the cost of having the logic spread out across even more components, and the danger of heading towards an ESB-like setup if you congregate all process management in a single process or small number of processes.
This is good, but I wonder if it is only solving the easy half of the problem? Transactions are about logical consistency. These kinds of activities, just like our old friends XA and 2-PC only partially handle physical consistency.
For logical consistency you have to consider external versus internal viewpoints, and potentially extended errors which are outside your control. This is hard, and outside the scope of this article, but for what it is worth, the key concepts to add would be fine-grained timestamps, reversible delta operations and global transaction identifiers. If we also include nesting and relativistic issues where you have parallelism then it gets even harder. These require extended consideration of compensation as Taliesin hinted at in the first response, as well as business level decisions beyond technology. Now there’s a challenge!
Thank you for sharing your insights on distributed transactions. We are faced with a very similar problem where message publishing is not bound to the primary database transaction. With the “Async Rabbit” pattern you described:
1) How does the background thread become aware of new messages in the table? Does it poll the table? If so, does that not add considerable load on the database?
2) Is there only one background thread? Has the volume of messages ever required more than one thread to keep up with the throughput? Multiple threads may make reading the table even more contentious.
I have to be honest and say that I don’t know how many threads are used to read the database and send messages to Rabbit – it’s all abstracted from the areas I tend to work in via a library that we use across most services. I don’t believe this causes much contention on the database, as the query is relatively simple, i.e. give me the first 100 rows in this table, if there are any. I haven’t heard of any issues with the message throughput being too high for us to handle, though we did recently deal with an issue where a batch process that was trying to insert lots of messages took out a lock on the table that resulted in messages already in the queue not being able to be sent for a minute or two. That was easily solved by batching the inserts, however.
Instead of saying “Bzzzzzzz” you should say “Pzzzzuuuuuuu” like a laser sound or the sound of a machine having its power cut off suddenly. That usually gets the point across a little better. Otherwise, great article.
Thanks for the feedback!
Amazing article. Well articulated! Kudos!
On the Async Rabbit solution, what happens if the machine crashes or loses power? What is the fail-safe in that situation, while the message is queued trying to go to the message broker?
Or, in the case of Kafka, where the producers are unable to send the message because there are not enough in-sync partitions?
The messages queued to go to the message broker are stored in the database, in the same transaction that committed the primary change to the database. So we get atomicity: either the domain update is saved and the secondary messages are persisted and guaranteed to be sent, or nothing gets saved and no messages get sent.
I think 90 to 95% of the time we are talking about the happy path. So we can let the system work synchronously and only look for errors, which are then retried or pushed onto the message queue.
There are many payment-service-level errors that need to be propagated to the end customer, such as an invalid card or insufficient funds. It’s not a good user experience to send an email afterwards telling a customer to please retry because their card was not valid.
I think we can still use a distributed transaction if the collection of microservices is trying to create or update data sources (the same DB, different DBs, or NoSQL).
I am concerned about this as well. Let’s consider a system which serves as a storage facade in front of several microservices. When you submit data to the facade, it should persist the data in those microservices. You can’t do it asynchronously, because the user needs a confirmation and a response with the established ids for all objects in order to proceed with further operations on the data.
Graham Lea, very good article!
I just see a few things missing here. Introducing a broker into the story (RabbitMQ, for example) and writing about guaranteed message delivery, but not mentioning the delivery guarantees offered by RabbitMQ and how they impact the speed/performance of the broker itself, leaves out crucial information in my opinion.
If one is not using any kind of delivery guarantee offered by RabbitMQ, you pretty much have a blind publisher that will never know what happened to the message, which is the default behaviour for RabbitMQ, and the broker will silently drop that message.
There are many mechanisms by which this can be tackled, such as publishing messages with the mandatory flag at the least, using publisher confirms, using message persistence and so on. In a nutshell, the more guarantees you want, the less performance you will get from the broker.
Have you ever implemented a distributed system in the banking core of a financial company? And have you ever implemented XA in the banking core of a financial institution? Are you an XA specialist? I ask because IBM DB2 for iSeries, Oracle DB, MySQL and PostgreSQL all include global transactions in their databases, which would be strange if, as you say, the approach were not successful. Most likely you do not have experience implementing XA. A compensation scheme in a financial company can be disastrous. When you want to choreograph many transactional services, that is where you will realise why XA seeks atomicity: XA has already worked out how to commit and roll back one or more participants. If the problem is the network, there are many controls to avoid it, in the coordinators, the participants and the data repository (RDBMS). In the compensating case, you have to program the commit and the rollback for a whole series of combinations. If that fails in the end, the cost will be a programmer analyst correcting the transaction by hand with SQL.
On the other hand, I am trying to choose a standard for distributed transactions with an API, without using application servers. Among the options I found Atomikos and Bitronix. Does anyone know of another API that is free?
You got it. This article did not solve the atomic transaction conundrum. It just introduced a complex, failure-prone async commit that relies on queues. He never mentioned that queues do fail and lose data. His assumption is that queues managed by quality brokers are 100% reliable. It’s better to go for an immediate all-or-nothing commit rather than leaving a dangling transaction that may never succeed. XA and newer protocols leave the system information consistent by doing all or nothing. In the case of nothing committed, it’s better for the client to retry the business process than to give the client the impression that everything went well. The client doesn’t know that the business case has not completed and there’s an async piece of the process still not done. A time bomb. Visit Atomikos.com to see how the real pros do it, including TCC.
It does solve the atomic transaction problem. First data is saved to the database in an atomic transaction. Job done. The async message is then read from the database and sent to the message broker and, only on confirmation from the broker, it is deleted from the database. This (send to broker + delete from db) action doesn’t need to be atomic. Idempotence in message handling at the receiver/s ensures that, if the database delete fails, the message can be re-sent later without causing misbehaviour.
We didn’t assume that brokers are 100% reliable. As well as running a HA clustered broker architecture, we wrote logs of all outgoing messages on separate infrastructure and kept them for a sufficient amount of time to allow recovery from a broker failure.
I would be surprised if your comment that “XA and newer protocols leave the system information consistent by doing all or nothing” is true. I mean, if your network is magically 100% available, sure. But it sounds like the kind of assertion that ignores inevitable network failures and the CAP theorem. In the case of a network partition, these protocols would be temporarily unable to determine whether all or nothing has been completed by one of the parties. At that point, do they toss availability out the window and lock the whole system down until they find out the truth, or do they throw consistency out the window with a “tried my best” approach to maintain availability?
Really? Read that aloud to yourself and see what it sounds like.
Instead of polling the payment database for new events, can all the in-flight events be written to a temporary database, with a transaction across the payment and temporary databases ensuring atomicity? If the payment and temporary databases are replicated (for high availability), then by having such a transaction across databases am I introducing a distributed transaction back into the solution?
I have no expertise in databases and hence my question.
I think this is a really great article. Thanks again. Really cool.