Visualising unique transactions in queue

Blockchain · Elixir · VegaLite · Polars

Context

I was investigating an issue in Iroha: [BUG] observing peer can't change a role · Issue #5330 · hyperledger-iroha/iroha. You can find the load testing scripts and the notebook itself in the 0x009922/iroha-load-testing repo.

In short: under high load, certain peers in the network started to fall behind in block synchronisation. It turned out to be somehow connected to the role of the peers in consensus[1] and to the size of the transactions queue[2]:

Blocks and queue growth in the network (queue size capacity is set to default 65,536)

I had a hypothesis I wanted to prove: perhaps ObservingPeer overflows its queue because the load is too high for it to manage to gossip transactions to the other peers, and ObservingPeer never becomes a Leader, which "flushes" its local queue by creating a block out of it (the other peers, which rotate roles after each block, do not have this problem). In the meantime, ObservingPeer's queue keeps growing with unique transactions, i.e. transactions whose existence the other peers never learn about.

How to prove the hypothesis

I wanted to build a visualisation that clearly shows the dynamics of unique transactions in each peer's queue over time, to see whether my hypothesis is correct.

I had logs from the peers in JSON format. They were very detailed: capturing just 2 minutes of load testing produced around a million lines per peer (roughly 100–200 MB each). They contained events like Transaction enqueued and Transaction removed from the queue, with the unique transaction hash attached. Using this, it is possible to keep track of which transactions each peer has in its queue at each point in time, and to calculate which of them are unique to each peer.
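
For orientation, a parsed log line has roughly this shape as an Elixir map. The field names are inferred from the parsing code further below; the exact layout of Iroha's JSON logs and the values shown here are assumptions, not a verbatim log line:

%{
  "timestamp" => "2025-…",                 # ISO-8601-like timestamp (placeholder)
  "level" => "DEBUG",                      # log level (placeholder)
  "fields" => %{
    "message" => "Transaction enqueued",   # or a "removed from the queue" message
    "tx" => "…transaction hash…"           # placeholder hash
  }
}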

First steps

I used an Elixir notebook (Livebook). First, I read the logs line by line, parsing the JSON into maps, using Streams as much as possible to read/filter/map/reduce things lazily. I built a naive map_reduce algorithm that literally kept a MapSet (just a set) of transaction hashes for each peer and computed their differences to get the number of unique elements at each point in time.
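
Roughly, the naive version looked like this (a minimal sketch, not the original notebook code; the event shape is an assumption): walk the merged event stream in timestamp order, keep a MapSet per peer, and after every event count how many of that peer's hashes no other peer currently has.

defmodule NaiveUnique do
  # events :: list of %{peer: String.t(), tx: binary(), op: :push | :pop, timestamp: any()}
  # returns [{timestamp, peer, unique_count}] in event order
  def run(events) do
    {points, _sets} =
      Enum.map_reduce(events, %{}, fn %{peer: peer, tx: tx, op: op} = ev, sets ->
        set = Map.get(sets, peer, MapSet.new())

        set =
          case op do
            :push -> MapSet.put(set, tx)
            :pop -> MapSet.delete(set, tx)
          end

        sets = Map.put(sets, peer, set)

        # transactions currently known to any other peer
        others =
          sets
          |> Map.delete(peer)
          |> Map.values()
          |> Enum.reduce(MapSet.new(), &MapSet.union/2)

        unique = set |> MapSet.difference(others) |> MapSet.size()
        {{ev.timestamp, peer, unique}, sets}
      end)

    points
  end
end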

It was very, very slow. I didn't get results within 5 or even 10 minutes.

Then I tried to optimise what I thought was the bottleneck: the MapSet. I wrote a simple wrapper around ETS[3] implementing the necessary set semantics.
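
Such a wrapper could look roughly like this (a sketch, assuming one ETS table per peer with plain set semantics; this is not the original module):

defmodule EtsSet do
  # one :set table per peer; the transaction hash is the key
  def new(name), do: :ets.new(name, [:set, :public])

  def put(table, hash), do: :ets.insert(table, {hash})
  def delete(table, hash), do: :ets.delete(table, hash)
  def member?(table, hash), do: :ets.member(table, hash)
  def size(table), do: :ets.info(table, :size)
end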

It was faster, but still very slow. I don't remember the numbers, but something like 110 seconds.

Then I thought: can I maybe use dataframes to work with the logs and extract data from them?

Data Frames

Elixir has a fantastic Explorer library, a wrapper around Polars.

First, I read the logs themselves with it:

logs =
  run["peers"]
  # read and parse each peer's NDJSON log concurrently
  |> Task.async_stream(
    fn peer ->
      label = peer["label"]
      log_path = Path.join(dir, "#{label}_stdout.json")

      {label,
       DF.from_ndjson!(log_path, infer_schema_length: 2_000_000)
       # cast the raw timestamp column to a millisecond datetime
       |> DF.mutate(
          timestamp: Series.cast(
            timestamp,
            {:naive_datetime, :millisecond}
          )
       )
       |> DF.mutate(level: cast(level, :category))
       |> DF.mutate(msg: field(fields, :message))}
    end,
    timeout: 10_000
  )
  |> Stream.map(fn {:ok, data} -> data end)
  # collect into a %{label => dataframe} map
  |> Enum.into(%{})

It did that within a second. But this is a map of per-peer log dataframes; I needed to refine it:

df =
  logs
  |> Task.async_stream(fn {peer, df} ->
    df
    # extract the message text and transaction hash from the nested `fields`
    |> DF.mutate(
      msg: Explorer.Series.field(fields, "message"),
      tx: Explorer.Series.field(fields, "tx")
    )
    # classify queue events: "push" when enqueued, "pop" when removed
    |> DF.mutate(
      tx_queue:
        cond do
          msg == "Transaction enqueued" -> "push"
          msg == "Removed transaction from the queue" -> "pop"
          msg == "Remove transaction from queue" -> "pop"
          true -> nil
        end
    )
    |> DF.filter(not is_nil(tx_queue))
    |> DF.select(["timestamp", "tx", "tx_queue"])
    # tag each row with the peer it came from
    |> DF.mutate(peer: ^peer)
    |> DF.mutate(peer: cast(peer, :category))
  end)
  |> Stream.map(fn {:ok, data} -> data end)
  # merge all peers into a single frame, ordered by time
  |> Enum.reduce(&DF.concat_rows/2)
  |> DF.sort_by(timestamp)

This produced a single, sorted data frame with timestamp, tx (hash), tx_queue (push or pop), and peer (a unique name like peer_0). It represents the events of transactions entering/leaving the queue on each individual peer.

Then I thought: how do I maintain a cumulative set of unique transactions for each peer in the paradigm of data frames?

That was not trivial (a small worked example of the mask arithmetic follows right after the list):

  1. For each peer, assign a unique bit mask: 1, 2, 4, 8, 16, etc.
  2. Group the data frame by transaction hash (tx).
  3. Compute a new field for each row: vis (visibility). When a transaction is pushed by a peer, increase vis by that peer's mask; when it is popped, decrease vis by the mask.
  4. This way, if vis equals the mask of some peer (e.g. 1 or 2 or 4), it undoubtedly means that the transaction is visible to only that peer.
  5. Then we produce a vis_prev series by shifting vis forward by 1. Thus, on each row we can see what vis was just before (remember, we grouped by tx).
  6. Then we produce a diff series: it is vis if vis is some mask, and it is negated vis_prev if vis_prev is some mask, otherwise null. Why?
    • vis and vis_prev cannot be equal, by the nature of our events
    • if vis equals the mask of some peer, the transaction has just become visible only to that peer
    • if vis_prev equals the mask of some peer, the transaction was uniquely visible to that peer just before, and that is no longer the case
  7. We also filter out all rows where diff is null.
  8. What we get: a data frame with series timestamp, tx, diff, grouped by tx. diff equals a positive/negative mask of a peer (1 or -1, 4 or -4, etc.) and indicates that at this timestamp, this tx enters/leaves the set of transactions unique to that peer. Not bad, huh?
  9. Then produce the series diff_step: diff / abs(diff) (to normalise them all to 1/-1) and diff_peer (the string that uniquely identifies a peer).
  10. Then ungroup by tx, group by diff_peer, and finally compute the unique series as a cumulative sum of diff_step. (The meaning of diff_step is exactly to increase/decrease a cumulative counter of unique transactions for a particular peer.)
  11. ?????
  12. PROFIT!!! Now we have a data frame with series timestamp, peer, and unique. Exactly the data I needed.
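
To make the mask arithmetic concrete, here is the promised worked example in plain Elixir (hypothetical: two peers with masks 1 and 2, a single transaction; the point is the numbers, not the code):

# Events for one tx, in time order: {op, mask of the peer that did it}
events = [push: 1, push: 2, pop: 1, pop: 2]

# vis = cumulative sum of +mask on push, -mask on pop
vis =
  events
  |> Enum.map(fn
    {:push, mask} -> mask
    {:pop, mask} -> -mask
  end)
  |> Enum.scan(fn x, acc -> x + acc end)

# vis      => [1, 3, 2, 0]
# vis_prev => [nil, 1, 3, 2]   (vis shifted by one row)
#
# Row 1: vis = 1 is a mask             -> diff = +1 (tx enters the mask-1 peer's unique set)
# Row 2: vis = 3, vis_prev = 1 is a mask -> diff = -1 (tx leaves the mask-1 peer's unique set)
# Row 3: vis = 2 is a mask             -> diff = +2 (tx enters the mask-2 peer's unique set)
# Row 4: vis = 0, vis_prev = 2 is a mask -> diff = -2 (tx leaves the mask-2 peer's unique set)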

All together:

# 1.
peers =
  for x <- run["peers"],
      do: x["label"]

# 1.
masks_df =
  DF.new(
    peer: peers,
    mask:
      Enum.with_index(peers)
      |> Enum.map(fn {_peer, i} ->
        :math.pow(2, i) |> trunc()
      end)
  )
  |> DF.mutate(peer: cast(peer, :category))

df =
  df
  # 1.
  |> DF.join(masks_df, on: :peer)
  # 2.
  |> DF.group_by(["tx"])
  # 3. 4.
  |> DF.mutate(
    vis:
      cumulative_sum(
        if tx_queue == "push" do
          mask
        else
          -mask
        end
      )
  )
  # 5.
  |> DF.mutate(vis_prev: shift(vis, 1))
  # 6.
  |> DF.mutate(
    diff:
      cond do
        vis in ^masks_df["mask"] -> vis
        vis_prev in ^masks_df["mask"] -> -vis_prev
        true -> nil
      end
  )
  # 7. 8.
  |> DF.filter(not is_nil(diff))
  # 9.
  |> DF.mutate(
    diff_mask: abs(diff),
    diff_step: diff / abs(diff)
  )
  |> DF.join(
    masks_df
    |> DF.rename(peer: "diff_peer"),
    on: [diff_mask: :mask]
  )
  # 10.
  |> DF.ungroup(["tx"])
  |> DF.group_by(["diff_peer"])
  |> DF.mutate(unique: cumulative_sum(diff_step))
  # 12.
  |> DF.select(["timestamp", "diff_peer", "unique"])
  |> DF.rename(diff_peer: "peer")
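
As a quick sanity check (hypothetical, not part of the original notebook), the peak number of unique transactions each peer held can be read off the result:

df
|> DF.ungroup()
|> DF.group_by("peer")
|> DF.summarise(peak_unique: max(unique))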

Visualisation

Note: this visualisation is made on a different dataset (set of logs) from the previous chart

The bold line indicates the queue size, while the area below shows how much of it is unique to the peer.
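
The chart was rendered with VegaLite. A minimal sketch of just the "unique" area, assuming the Elixir VegaLite bindings with the usual Vl alias (the bold queue-size line comes from a separate series of queue metrics not derived here):

alias VegaLite, as: Vl

Vl.new(width: 700, height: 300)
# feed the data frame rows (timestamp, peer, unique) into the spec
|> Vl.data_from_values(DF.to_rows(df))
|> Vl.mark(:area, opacity: 0.5)
|> Vl.encode_field(:x, "timestamp", type: :temporal)
|> Vl.encode_field(:y, "unique", type: :quantitative)
|> Vl.encode_field(:color, "peer", type: :nominal)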

It proved my hypothesis: indeed, the overflow happens because the queue almost entirely consists of unique transactions.

Conclusion

I can't say it helped me solve the original issue all that much!

However, it was quite fun and interesting (and stressful, because I probably should have been focusing on something else), and I am amazed by the power and speed of data frames. I had to stretch my mind quite a bit.


  1. There are Leader, ValidatingPeer, ProxyTail, and ObservingPeer. ↩︎

  2. The transactions queue is where each peer stores accepted transactions until they eventually get committed as part of some block. ↩︎

  3. Erlang Term Storage, a very fast built-in in-memory database, simply speaking. ↩︎