# Visualising unique transactions in queue
## Context
I was investigating an issue in Iroha: [BUG] observing peer can't change a role · Issue #5330 · hyperledger-iroha/iroha. You can find the load testing scripts and the notebook itself in the 0x009922/iroha-load-testing repo.
In short: under high load, certain peers in the network started to fall behind in block synchronisation. It turned out to be somehow connected to the role of the peers in consensus[1] and to the size of the transaction queue[2].
I had a hypothesis I wanted to prove: perhaps `ObservingPeer` overflows its queue because the load is too high for it to manage gossiping transactions to other peers, and `ObservingPeer` never becomes a `Leader` that "flushes" its local queue of transactions by creating a block out of them (thus other peers, which rotate roles after each block, do not have this problem). In the meantime, `ObservingPeer` grows its queue with unique transactions, i.e. transactions whose existence other peers never get to know about.
## How to prove the hypothesis
I wanted to build a visualisation that clearly shows the dynamics of unique transactions in each peer's queue over time, to see whether my hypothesis was correct.
I had logs from the peers in JSON format. They were very detailed: capturing just 2 minutes of load testing produced around a million lines per peer (around 100~200 MB each). They contained events like `Transaction enqueued` and `Transaction removed from the queue`, with the unique transaction hash attached. Using this, it is possible to keep track of which transactions each peer has in its queue at each point in time, and to calculate which of them are unique to each peer.
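For reference, each log line is a JSON object. Judging by the fields the notebook reads later (`timestamp`, `level`, `fields.message`, `fields.tx`), a parsed queue event looks roughly like the sketch below; the concrete values are invented, and I assume the common Jason library for decoding:

```elixir
# A minimal sketch of one parsed queue event (values invented for illustration)
line =
  ~s({"timestamp":"2024-01-01T00:00:00.123","level":"DEBUG","fields":{"message":"Transaction enqueued","tx":"9f3a04…"}})

event = Jason.decode!(line)

event["fields"]["message"]
#=> "Transaction enqueued"

event["fields"]["tx"]
#=> "9f3a04…"
```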
## First steps
I used an Elixir notebook (Livebook). First, I read the logs line by line, parsing JSON into maps, using `Stream`s as much as possible to read/filter/map/reduce things lazily. I built a naive map-reduce algorithm that literally kept `MapSet`s (plain sets) of transaction hashes for each peer and computed their differences to find the number of unique elements at each point in time.
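In spirit, the naive version looked something like this (a reconstruction, not the notebook code; `events` and `peers` are assumed inputs): fold over the sorted event stream, updating per-peer `MapSet`s and re-deriving the unique count after every event.

```elixir
# A sketch of the naive approach: one MapSet of in-queue tx hashes per peer;
# after every event, a peer's unique count is the size of its set minus
# everything any other peer currently has.
initial_sets = Map.new(peers, &{&1, MapSet.new()})

{points, _final_sets} =
  Enum.map_reduce(events, initial_sets, fn %{peer: peer, tx: tx, op: op}, sets ->
    sets =
      Map.update!(sets, peer, fn set ->
        case op do
          "push" -> MapSet.put(set, tx)
          "pop" -> MapSet.delete(set, tx)
        end
      end)

    # union of all other peers' queues, recomputed on every event;
    # merging all these sets per event is exactly what made this so slow
    others =
      sets
      |> Map.delete(peer)
      |> Map.values()
      |> Enum.reduce(MapSet.new(), &MapSet.union/2)

    unique_count = MapSet.size(MapSet.difference(sets[peer], others))
    {{peer, unique_count}, sets}
  end)
```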
It was very, very slow. I didn't get results within 5 or even 10 minutes.
Then I tried to optimise what I thought was the bottleneck: the `MapSet`s. I wrote a simple wrapper around ETS[3] that implemented the necessary set semantics.
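The wrapper was along these lines (a minimal sketch under the assumption that plain insert/delete/membership semantics were all that was needed; not the original code):

```elixir
# A minimal set on top of ETS; an :ets table gives cheap in-place
# insert/lookup without copying the structure on every update, unlike MapSet.
defmodule EtsSet do
  def new(), do: :ets.new(:ets_set, [:set, :public])

  def put(table, key) do
    :ets.insert(table, {key})
    table
  end

  def delete(table, key) do
    :ets.delete(table, key)
    table
  end

  def member?(table, key), do: :ets.member(table, key)

  def size(table), do: :ets.info(table, :size)
end
```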
It was faster, but still very slow. I don't remember the exact numbers, but it was something like 110 seconds.
Then I thought: could I use data frames to work with the logs and extract the data I need from them?
## Data Frames
Elixir has the fantastic `Explorer` library, a wrapper around Polars. First, I read the logs themselves with it:
```elixir
logs =
  run["peers"]
  |> Task.async_stream(
    fn peer ->
      label = peer["label"]
      log_path = Path.join(dir, "#{label}_stdout.json")

      # read the peer's NDJSON log and normalise the columns we care about
      {label,
       DF.from_ndjson!(log_path, infer_schema_length: 2_000_000)
       |> DF.mutate(timestamp: Series.cast(timestamp, {:naive_datetime, :millisecond}))
       |> DF.mutate(level: cast(level, :category))
       |> DF.mutate(msg: field(fields, :message))}
    end,
    timeout: 10_000
  )
  |> Stream.map(fn {:ok, data} -> data end)
  |> Enum.into(%{})
```
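The result is a map from peer label to its log frame, so a single peer can be inspected directly (the `"peer_0"` label and the elided sizes here are just illustrative):

```elixir
logs["peer_0"]
#=> #Explorer.DataFrame<
#     Polars[… x …]
#     timestamp naive_datetime[ms] […]
#     level category […]
#     msg string […]
#     …
#   >
```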
It did so within a second. But this is a map of per-peer log data frames; I needed to refine it:
```elixir
df =
  logs
  |> Task.async_stream(fn {peer, df} ->
    df
    # extract nested values from the `fields` struct column
    |> DF.mutate(
      msg: Explorer.Series.field(fields, "message"),
      tx: Explorer.Series.field(fields, "tx")
    )
    # classify queue events as push/pop; everything else becomes nil
    |> DF.mutate(
      tx_queue:
        cond do
          msg == "Transaction enqueued" -> "push"
          msg == "Removed transaction from the queue" -> "pop"
          msg == "Remove transaction from queue" -> "pop"
          true -> nil
        end
    )
    |> DF.filter(not is_nil(tx_queue))
    |> DF.select(["timestamp", "tx", "tx_queue"])
    |> DF.mutate(peer: ^peer)
    |> DF.mutate(peer: cast(peer, :category))
  end)
  |> Stream.map(fn {:ok, data} -> data end)
  |> Enum.reduce(&DF.concat_rows/2)
  |> DF.sort_by(timestamp)
```
This produced a more specific, single sorted data frame with `timestamp`, `tx` (hash), `tx_queue` (`push` or `pop`), and `peer` (a unique name like `peer_0`). It represents events of transactions entering/leaving the queue on each individual peer.
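A few rows of it might look like this (hashes and timestamps invented for illustration):

```elixir
# timestamp                 tx          tx_queue   peer
# 2024-01-01 00:00:00.102   "9f3a04…"   "push"     "peer_0"
# 2024-01-01 00:00:00.105   "9f3a04…"   "push"     "peer_1"
# 2024-01-01 00:00:00.131   "9f3a04…"   "pop"      "peer_1"
# 2024-01-01 00:00:00.140   "c81d2e…"   "push"     "peer_2"
```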
Then I thought: how do I track a cumulative set of unique transactions for each peer in the paradigm of data frames? That was not trivial:
1. For each peer, assign a unique bit mask: 1, 2, 4, 8, 16, etc.
2. Group the data frame by transaction hash (`tx`).
3. Compute a new field for each row: `vis` (visibility). When a transaction is pushed by a peer, increase `vis` by that peer's mask; when it is popped, decrease `vis` by that mask.
4. This way, if `vis` equals the mask of some peer (e.g. 1 or 2 or 4), it undoubtedly means that the transaction is visible to only that peer.
5. Then, we produce a `vis_prev` series by shifting `vis` forward by 1. Thus, on each row we can see what `vis` was just before (remember, we grouped by `tx`).
6. Then, we produce a `diff` series: it is `vis` if `vis` is some mask, it is negated `vis_prev` if `vis_prev` is some mask, and otherwise it is `null`. Why? `vis` and `vis_prev` cannot be equal by the nature of our events (see the traced example after this list):
   - if `vis` equals the mask of some peer, it means that the transaction is now visible only to that peer;
   - if `vis_prev` equals the mask of some peer, it means that the transaction was uniquely visible to that peer just before, and it is no longer the case.
7. We also filter out all rows that have `diff = null`.
8. What we get: a data frame with series `timestamp`, `tx`, `diff`, grouped by `tx`. `diff` equals a positive/negative mask of a peer (`1` or `-1`, `4` or `-4`, etc.) and indicates that at this `timestamp`, this `tx` enters/leaves the set of transactions unique to that peer. Not bad, huh?
9. Then, produce series `diff_step: diff / abs(diff)` (to normalise them all to `1`/`-1`) and `diff_peer` (the string that uniquely identifies a peer).
10. Then, ungroup by `tx`, group by `diff_peer`, and finally compute the `unique` series as a cumulative sum of `diff_step`. (The meaning of `diff_step` is exactly to increase/decrease a cumulative counter of unique transactions for a particular peer.)
11. ?????
12. PROFIT!!! Now we have a data frame with series `timestamp`, `peer`, and `unique`. Exactly the data I needed.
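To see the masking trick in action, here is a hand-traced sequence for a single `tx` and two peers, `peer_0` (mask `1`) and `peer_1` (mask `2`); the event order is invented for illustration:

```elixir
# event             vis   vis_prev   diff   meaning
# push by peer_0     1      nil        1    tx becomes unique to peer_0
# push by peer_1     3       1        -1    tx no longer unique to peer_0
# pop  by peer_0     2       3         2    tx becomes unique to peer_1
# pop  by peer_1     0       2        -2    tx no longer unique to peer_1
```

Dividing each `diff` by its absolute value turns these into `+1`/`-1` steps, and their cumulative sum per peer is exactly the running count of transactions unique to that peer.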
All together:
```elixir
# 1.
peers = for x <- run["peers"], do: x["label"]

# 1.
masks_df =
  DF.new(
    peer: peers,
    mask:
      Enum.with_index(peers)
      |> Enum.map(fn {_peer, i} -> :math.pow(2, i) |> trunc() end)
  )
  |> DF.mutate(peer: cast(peer, :category))

df =
  df
  # 1.
  |> DF.join(masks_df, on: :peer)
  # 2.
  |> DF.group_by(["tx"])
  # 3. 4.
  |> DF.mutate(
    vis:
      cumulative_sum(
        if tx_queue == "push" do
          mask
        else
          -mask
        end
      )
  )
  # 5.
  |> DF.mutate(vis_prev: shift(vis, 1))
  # 6.
  |> DF.mutate(
    diff:
      cond do
        vis in ^masks_df["mask"] -> vis
        vis_prev in ^masks_df["mask"] -> -vis_prev
        true -> nil
      end
  )
  # 7. 8.
  |> DF.filter(not is_nil(diff))
  # 9.
  |> DF.mutate(
    diff_mask: abs(diff),
    diff_step: diff / abs(diff)
  )
  |> DF.join(
    masks_df |> DF.rename(peer: "diff_peer"),
    on: [diff_mask: :mask]
  )
  # 10.
  |> DF.ungroup(["tx"])
  |> DF.group_by(["diff_peer"])
  |> DF.mutate(unique: cumulative_sum(diff_step))
  # 12.
  |> DF.select(["timestamp", "diff_peer", "unique"])
  |> DF.rename(diff_peer: "peer")
```
## Visualisation
The bold line indicates the queue size, while the area below shows how much of it is unique to the peer.
It proved my hypothesis: indeed, the queue overflow happens because it almost entirely consists of unique transactions.
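The chart itself lives in the notebook, but as a rough sketch, something like this `VegaLite` snippet (a common pairing with Livebook; not the notebook's actual plotting code, which also overlays the total queue size) could render the `unique` series per peer:

```elixir
# Draw `unique` over time as an area, one colour per peer.
alias VegaLite, as: Vl

Vl.new(width: 700, height: 300)
|> Vl.data_from_values(DF.to_rows(df))
|> Vl.mark(:area, opacity: 0.6)
|> Vl.encode_field(:x, "timestamp", type: :temporal)
|> Vl.encode_field(:y, "unique", type: :quantitative)
|> Vl.encode_field(:color, "peer", type: :nominal)
```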
## Conclusion
I can't say it helped me solve the original issue all that much!
However, it was quite fun and interesting (and stressful, because I probably should have been focusing on something else), and I am amazed by the power and speed of data frames. I had to stretch my mind quite a bit.