Algorand's data pipeline framework.

  • By Algorand
  • Last update: Aug 10, 2023
  • Comments: 17

Algorand Conduit

Conduit is a framework for ingesting blocks from the Algorand blockchain into external applications. It is designed as a modular plugin system that allows users to configure their own data pipelines for filtering, aggregation, and storage of blockchain data.

For example, use Conduit to:

  • Build a notification system for on-chain events.
  • Power a next generation block explorer.
  • Select app-specific data and write it to a custom database.
  • Build a custom Indexer for a new ARC.
  • Send blockchain data to another streaming data platform for additional processing (e.g. RabbitMQ, Kafka, ZeroMQ).
  • Build an NFT catalog based on different standards.

Getting Started

Installation

Download

The latest conduit binary can be downloaded from the GitHub releases page.

Docker

The latest Docker image is on Docker Hub.
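
A minimal sketch of running the published image, assuming the container uses /data as its data directory (the image's entrypoint copies a default conduit.yml there, per the Docker documentation fix noted in the comments below):

docker pull algorand/conduit
docker run -it --rm -v "$(pwd)/data":/data algorand/conduit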

Install from Source

  1. Check out the repo or download the source: git clone https://github.com/algorand/conduit.git && cd conduit
  2. Run make conduit.
  3. The binary is created at cmd/conduit/conduit.

Usage

Conduit is configured with a YAML file named conduit.yml. This file defines the pipeline behavior by enabling and configuring different plugins.

Create conduit.yml configuration file

Use the conduit init subcommand to create a configuration template, and place it in a new directory. By convention this directory is named data and is referred to as the data directory.

mkdir data
./conduit init > data/conduit.yml

A Conduit pipeline is composed of three component types: Importers, Processors, and Exporters. Every pipeline must define exactly one Importer, exactly one Exporter, and zero or more Processors. See the full list of available plugins with conduit list or on the plugin documentation page.

Here is an example conduit.yml that configures two plugins:

importer:
    name: algod
    config:
        mode: "follower"
        netaddr: "http://your-follower-node:1234"
        token: "your API token"

# no processors defined for this configuration
processors:

exporter:
    name: "file_writer"
    config:
        # the default config writes block data to the data directory.

The conduit init command can also be used to select which plugins to include in the template. The example below uses the standard algod importer and sends the data to PostgreSQL. This example does not use any processor plugins.

./conduit init --importer algod --exporter postgresql > data/conduit.yml

Before running Conduit you need to review and modify conduit.yml according to your environment.
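
For instance, a reviewed version of the algod-to-PostgreSQL template might end up looking roughly like the sketch below. The values are illustrative, and the optional delete-task block (intended to prune old transaction data) is shown for completeness; check the postgresql exporter's plugin documentation for the authoritative option list.

importer:
    name: algod
    config:
        mode: "follower"
        netaddr: "http://your-follower-node:1234"
        token: "your API token"

processors:

exporter:
    name: postgresql
    config:
        connection-string: "host=your-db-host port=5432 user=conduit password=... dbname=conduit"
        # optional: prune old transactions, keeping only recent rounds
        delete-task:
            rounds: 100000
            interval: 1000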

Run Conduit

Once configured, start Conduit with your data directory as an argument:

./conduit -d data

Full Tutorials

External Plugins

Conduit supports external plugins which can be developed by anyone.

For a list of available plugins and instructions on how to use them, see the External Plugins page.

External Plugin Development

See the Plugin Development page for building a plugin.

Contributing

Contributions are welcome! Please refer to our CONTRIBUTING document for general contribution guidelines.

Migrating from Indexer 2.x

Conduit can be used to populate data from an existing Indexer 2.x deployment as part of upgrading to Indexer 3.x.

We will continue to maintain Indexer 2.x for the time being, but encourage users to move to Conduit. It provides cost benefits for most deployments in addition to greater flexibility.

To migrate, follow the Using Conduit to Populate an Indexer Database tutorial. When you get to the step about setting up postgres, substitute your existing database connection string. Conduit will read the database to initialize the next round.
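
Concretely, the relevant change is the exporter's connection string, which should point at the existing Indexer database (a sketch; keep your real host and credentials):

exporter:
    name: postgresql
    config:
        connection-string: "host=your-existing-indexer-db port=5432 user=... password=... dbname=indexer_db"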

Comments(17)

  • 1

    PostgreSQL exporter delete-task does not seem to be working

    We were hoping to use the delete-task option to keep our dev databases a bit smaller. Our conduit.yml file looks like this:

    exporter:
      name: postgresql
      config:
        connection-string: <redacted>
          sslmode=disable
        delete-task:
          rounds: 1000
          interval: 20
    

    However, when running against either a fully-synced PostgreSQL instance or an instance that is starting from 0, it does not appear that the txn table is being culled. Using the above configs, I can see the following in the DB:

    betanet=> select round from txn;
     round 
    -------
       357
     12166
     12178
     12188
     13011
     13018
     13020
     14810
     14817
     14823
     14864
     15655
     15661
     15668
     18075
     36998
    (16 rows)
    

    I'd have expected the early rows to have been thrown out as conduit moves through later blocks.
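
    A quick sanity check for whether any pruning is happening at all (a hypothetical query against the same txn table) would be:

    betanet=> select min(round), max(round), count(*) from txn;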

    Your environment

    • Conduit 1.2.0 (37a13fe)
    • PostgreSQL 13.10
    • Algod 3.16.3.stable [rel/stable] (commit #cbccc6de)
    • Ledger: Betanet

    Steps to reproduce

    1. Bootstrap an empty PostgreSQL DB
    2. Start Conduit using the PostgreSQL exporter from block 0 using the configs above.

    Expected behaviour

    Only transactions within the last 1000 blocks would be present

    Actual behaviour

    All transactions appear to be present. Table continues to get longer as conduit works its way through the ledger.

  • 2

    Conduit will not reconnect/continue if algod has been restarted

    We have noticed that if we restart our algod instances, Conduit gets stuck and will not ingest new blocks once algod has finished restarting and is back online.

    Your environment

    Software Versions:

    • algod 3.16.2

    • conduit 1.2.0

    • Pipeline configuration

    • algod config.json:

    {
      "Archival": false,
      "CatchpointInterval": 0,
      "CatchupParallelBlocks": 32,
      "DNSBootstrapID": "betanet.algodev.network",
      "EnableDeveloperAPI": true,
      "EnableFollowMode": true,
      "EndpointAddress": "0.0.0.0:48081",
      "ForceFetchTransactions": false,
      "IsIndexerActive": false,
      "MaxAcctLookback": 256
    }
    

    conduit.yaml:

    exporter:
      config:
        connection-string: <redacted>
      
        delete-task:
          interval: 1000
          rounds: 10000
      name: postgresql
    
    hide-banner: true
    importer:
      config:
        catchup-config:
          admin-token:  <redacted but confirmed correct>
        mode: follower
        netaddr: http://vira-betanet-blue-algod4conduit:48081
        token: <redacted but confirmed correct>
      name: algod
    log-level: "INFO"
    metrics:
      addr: ":9999"
      mode: "ON"
      prefix: "conduit"
    retry-count: 0
    
    • Operating System details. We are running these in an AWS kubernetes cluster. Both Conduit and algod are your provided containers available on Docker Hub.

    There is one algod instance, fronted by a Kubernetes service object. The network path looks like:

    Conduit ---> k8s service ---> algod

    When the algod pod is killed, it takes about 30-60s before it is re-registered in the k8s service and available. The IP addresses do not change.

    The Algod pod has a PVC (disk) attached and this same disk is re-attached to the recreated algod pod every time. So algod is starting back up with exactly the same state as when it was shut down.

    We have seen this using two ledgers now, Betanet and MainNet.

    Steps to reproduce

    1. Setup Conduit/Algod in k8s as above
    2. Kill the algod pod with kubectl delete pod <algod_pod_name>
    3. Wait for the k8s stateful set to automatically re-create the pod after it has been deleted.
    4. Watch the logs for Conduit. It does not advance, even after waiting 20 minutes.

    Expected behaviour

    Conduit should fetch the next block from its dedicated algod instance as soon as it has restarted.

    Actual behaviour

    Conduit gets stuck trying to retrieve the next block. Restarting conduit or restarting algod a second time does not help.

    We see the following logs in conduit:

    {"__type":"Conduit","_name":"main","level":"info","msg":"Pipeline round: 28060661","time":"2023-07-19T19:47:23Z"}
    {"__type":"importer","_name":"algod","level":"error","msg":"error getting status for round 28060661 (attempt 0): error getting status for round: Get \"http://vira-betanet-blue-algod4conduit:48081/v2/status/wait-for-block-after/28060660\": dial tcp 172.20.14.60:48081: connect: connection refused","time":"2023-07-19T19:47:23Z"}
    {"__type":"importer","_name":"algod","level":"error","msg":"error getting status for round 28060661 (attempt 1): error getting status for round: Get \"http://vira-betanet-blue-algod4conduit:48081/v2/status/wait-for-block-after/28060660\": dial tcp 172.20.14.60:48081: connect: connection refused","time":"2023-07-19T19:47:24Z"}
    {"__type":"importer","_name":"algod","level":"error","msg":"error getting status for round 28060661 (attempt 2): error getting status for round: Get \"http://vira-betanet-blue-algod4conduit:48081/v2/status/wait-for-block-after/28060660\": dial tcp 172.20.14.60:48081: connect: connection refused","time":"2023-07-19T19:47:25Z"}
    {"__type":"importer","_name":"algod","level":"error","msg":"error getting status for round 28060661 (attempt 3): error getting status for round: Get \"http://vira-betanet-blue-algod4conduit:48081/v2/status/wait-for-block-after/28060660\": dial tcp 172.20.14.60:48081: connect: connection refused","time":"2023-07-19T19:47:26Z"}
    {"__type":"importer","_name":"algod","level":"error","msg":"error getting block for round 28060661 (attempt 4): HTTP 404: {\"message\":\"failed to retrieve information from the ledger\"}\n","time":"2023-07-19T19:48:27Z"}
    {"__type":"importer","_name":"algod","level":"error","msg":"failed to get block for round 28060661 after 5 attempts, check node configuration: HTTP 404: {\"message\":\"failed to retrieve information from the ledger\"}\n","time":"2023-07-19T19:48:27Z"}
    {"__type":"Conduit","_name":"main","level":"error","msg":"failed to get block for round 28060661 after 5 attempts, check node configuration: HTTP 404: {\"message\":\"failed to retrieve information from the ledger\"}\n","time":"2023-07-19T19:48:27Z"}
    {"__type":"Conduit","_name":"main","level":"info","msg":"Retry number 27 resuming after a 1s retry delay.","time":"2023-07-19T19:48:27Z"}
    {"__type":"Conduit","_name":"main","level":"info","msg":"Pipeline round: 28060661","time":"2023-07-19T19:48:28Z"}
    {"__type":"importer","_name":"algod","level":"error","msg":"error getting block for round 28060661 (attempt 0): HTTP 404: {\"message\":\"failed to retrieve information from the ledger\"}\n","time":"2023-07-19T19:49:28Z"}
    
    {"__type":"importer","_name":"algod","level":"error","msg":"error getting block for round 28060661 (attempt 1): HTTP 404: {\"message\":\"failed to retrieve information from the ledger\"}\n","time":"2023-07-19T19:50:28Z"}
    {"__type":"importer","_name":"algod","level":"error","msg":"error getting block for round 28060661 (attempt 2): HTTP 404: {\"message\":\"failed to retrieve information from the ledger\"}\n","time":"2023-07-19T19:51:28Z"}
    {"__type":"importer","_name":"algod","level":"error","msg":"error getting block for round 28060661 (attempt 3): HTTP 404: {\"message\":\"failed to retrieve information from the ledger\"}\n","time":"2023-07-19T19:52:28Z"}
    

    In the same time frame, we see the following logs from algod:

    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).periodicSync","level":"info","line":616,"msg":"It's been too long since our ledger advanced; resyncing","name":"","time":"2023-07-19T19:51:22.328185Z"}
    {"Context":"sync","details":{"StartRound":28060660},"file":"telemetry.go","function":"github.com/algorand/go-algorand/logging.(*telemetryState).logTelemetry","instanceName":"aWLoG60wMexN2Akp","level":"info","line":255,"msg":"/ApplicationState/CatchupStart","name":"","session":"","time":"2023-07-19T19:51:22.328292Z","v":""}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060661): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.329278Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060663): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.330118Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060662): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.330821Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060664): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.331494Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060665): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332124Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060666): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332721Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060667): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332760Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060668): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332790Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060670): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332789Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060669): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332819Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060671): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332822Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060672): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332843Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060673): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332856Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060674): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332875Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060675): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332887Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060676): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332899Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060678): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332926Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060677): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332916Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060679): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332952Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060683): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332954Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060680): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332976Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060684): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332981Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060681): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.332998Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060682): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.333018Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060685): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.333029Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060687): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.333058Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060686): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.333058Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060688): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.333083Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060690): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.333084Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060689): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.333108Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060691): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.333109Z"}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).pipelineCallback.func1","level":"info","line":451,"msg":"pipelineCallback(28060692): did not fetch or write the block","name":"","time":"2023-07-19T19:51:22.333138Z"}
    {"Context":"sync","details":{"StartRound":28060660,"EndRound":28060660,"Time":4898458,"InitSync":false},"file":"telemetry.go","function":"github.com/algorand/go-algorand/logging.(*telemetryState).logTelemetry","instanceName":"aWLoG60wMexN2Akp","level":"info","line":255,"msg":"/ApplicationState/CatchupStop","name":"","session":"","time":"2023-07-19T19:51:22.333176Z","v":""}
    {"Context":"sync","file":"service.go","function":"github.com/algorand/go-algorand/catchup.(*Service).sync","level":"info","line":688,"msg":"Catchup Service: finished catching up, now at round 28060660 (previously 28060660). Total time catching up 4.898458ms.","name":"","time":"2023-07-19T19:51:22.333218Z"}
    {"file":"logger.go","function":"github.com/algorand/go-algorand/daemon/algod/api/server/lib/middlewares.(*LoggerMiddleware).handler.func1","level":"info","line":56,"msg":"10.20.4.144:48484 - - [2023-07-19 19:51:23.451617927 +0000 UTC m=+325.403445294] \"GET /metrics HTTP/1.1\" 200 0 \"Prometheus/2.34.0\" 270.898µs","time":"2023-07-19T19:51:23.451920Z"}
    {"file":"logger.go","function":"github.com/algorand/go-algorand/daemon/algod/api/server/lib/middlewares.(*LoggerMiddleware).handler.func1","level":"info","line":56,"msg":"10.20.4.134:45858 - - [2023-07-19 19:51:26.258681648 +0000 UTC m=+328.210509005] \"GET /health HTTP/1.1\" 200 0 \"kube-probe/1.24+\" 34.082µs","time":"2023-07-19T19:51:26.258754Z"}
    {"file":"logger.go","function":"github.com/algorand/go-algorand/daemon/algod/api/server/lib/middlewares.(*LoggerMiddleware).handler.func1","level":"info","line":56,"msg":"10.20.3.176:41880 - - [2023-07-19 19:50:28.340640642 +0000 UTC m=+270.292467999] \"GET /v2/status/wait-for-block-after/28060660 HTTP/1.1\" 200 653 \"Go-http-client/1.1\" 1m0.001869246s","time":"2023-07-19T19:51:28.342549Z"}
    {"file":"utils.go","function":"github.com/algorand/go-algorand/daemon/algod/api/server/v2.returnError","level":"info","line":41,"msg":"ledger does not have entry 28060661 (latest 28060660, committed 28060660)","time":"2023-07-19T19:51:28.343474Z"}
    {"file":"logger.go","function":"github.com/algorand/go-algorand/daemon/algod/api/server/lib/middlewares.(*LoggerMiddleware).handler.func1","level":"info","line":56,"msg":"10.20.3.176:41880 - - [2023-07-19 19:51:28.343426358 +0000 UTC m=+330.295253715] \"GET /v2/blocks/28060661?format=msgpack HTTP/1.1\" 404 61 \"Go-http-client/1.1\" 78.945µs","time":"2023-07-19T19:51:28.343564Z"}
    

    These same log stanzas just repeat over and over.

  • 3

    conduit indexer format

    Hi! We started to use algorand/conduit with algorand/indexer and found undesirable behavior:

    $ curl 0:8980/v2/blocks/28992333
    {"message":"error while looking up block for round '28992333': json decode error [pos 729]: failed to decode address v+gWVeweiq9PYHJc2Wh+C8najmt9AMQQHqsCDEgPKnk= to base 32"}

    It seems this happens because Conduit writes these values to the database in base64 format, whereas Indexer wrote, and expects, this field in base32 format. Are you aware of this problem? Will the next release of Conduit fix this?
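
    For reference, here is a self-contained sketch (ours, not Conduit or Indexer code) of the conversion Indexer appears to expect: decode the stored base64 key and re-encode it as a standard 58-character Algorand address, i.e. base32 (no padding) of the 32-byte public key followed by the last 4 bytes of its SHA-512/256 hash.

    package main

    import (
        "crypto/sha512"
        "encoding/base32"
        "encoding/base64"
        "fmt"
    )

    func main() {
        // The base64 value from the error message above is a raw 32-byte public key.
        raw, err := base64.StdEncoding.DecodeString("v+gWVeweiq9PYHJc2Wh+C8najmt9AMQQHqsCDEgPKnk=")
        if err != nil || len(raw) != 32 {
            panic("expected a 32-byte public key")
        }
        // Algorand address = base32(pubkey || last 4 bytes of SHA-512/256(pubkey)), no padding.
        sum := sha512.Sum512_256(raw)
        addr := base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(append(raw, sum[28:]...))
        fmt.Println(addr) // the 58-character base32 form Indexer expects
    }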

  • 4

    lint: Fix the `lint` github workflow

    Summary

    Fix the reviewdog linter in the GitHub PR workflow.

    Before this, reviewdog silently panicked but the workflow succeeded anyway. This change pins the reviewdog step to explicitly use Go 1.17.13 (it uses the latest 1.20.* version by default, which was causing the panic) and sets the filter mode to nofilter (by default only added lines of code are checked).

    Test Plan

    Locally run (requires act): act pull_request -j lint

    PR lint workflow fails when there is a golangci lint error: https://github.com/algorand/conduit/actions/runs/4984363520/jobs/8922595805?pr=76

    Closes https://github.com/algorand/conduit/issues/75

  • 5

    algod importer: Update sync on WaitForBlock error.

    Summary

    If algod is restarted after it receives a sync round update but before it fetches the new round(s), then the algod follower and conduit will stall. Conduit will keep waiting for algod to reach the new sync round but it never happens.

    This change adds some extra logic to the WaitForBlock call. If there is a timeout or a bad response, a new attempt to set the sync round is made.

    This PR also removes the retry loop from the algod importer. Retry is now managed by the pipeline.
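
    A minimal sketch of the idea (hypothetical client interface, not the actual importer code): if waiting for the next round fails, re-assert the follower's sync round before surfacing the error, and let the pipeline drive the retry.

    package importer

    // followerClient abstracts the two algod follower calls involved.
    type followerClient interface {
        WaitForBlockAfter(round uint64) error // e.g. /v2/status/wait-for-block-after/{round}
        SetSyncRound(round uint64) error      // follower-mode sync round update
    }

    func waitForNextRound(c followerClient, next uint64) error {
        if err := c.WaitForBlockAfter(next - 1); err != nil {
            // algod may have restarted and lost its sync round; setting it again
            // lets the node resume fetching before the pipeline retries.
            if serr := c.SetSyncRound(next); serr != nil {
                return serr
            }
            return err
        }
        return nil
    }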

    Test Plan

    Update existing unit tests.

  • 6

    Fix typo on conduit configuration file

    Summary

    Fixes a typo in the Docker documentation. The entrypoint uses cp /etc/algorand/conduit.yml /data/conduit.yml but the documentation refers to config.yml.

    Test Plan

    I didn't test the changes; I think they can be validated by inspection.

  • 7

    Bug-Fix: don't attempt to get deltas at round 0

    Summary

    Don't attempt to query deltas from a follower node for round 0. Instead, just declare that we need to catch up and log:

    No state deltas are ever available for round 0

    Test Plan

    CI - introduced new unit test cases

  • 8

    refactoring: Cleanup dependencies, port indexer utils, delete dead code.

    Summary

    Cleanup code prior to breaking out plugin dependencies:

    • Port indexer utility functions to avoid dependency.
    • Delete dead code to remove indexer / ValidatedBlock dependency.
    • Move simple type definitions to data package to avoid circular dependencies.

    Test Plan

    N/A - non-functional changes.

  • 9

    docker: Add docker to the release pipeline

    Summary

    Add a docker multi-arch build + deployment configuration.

    ~For now the docker hub deployment is disabled.~ Hyperflow has added credentials to the repo so that the container can be deployed.

    Goreleaser provides binaries from the build step to the Dockerfile to ensure containers have the same files as the archives.

    Test Plan

    Tested with goreleaser release --skip-publish --snapshot --clean

    The files are here:

    ~$ docker images|grep conduit
    algorand/conduit   1.0.1-next-amd64   0863fc121046   About a minute ago   96.9MB
    algorand/conduit   latest-amd64       0863fc121046   About a minute ago   96.9MB
    algorand/conduit   1.0.1-next-arm64   907990ee4310   About a minute ago   90.5MB
    algorand/conduit   latest-arm64       907990ee4310   About a minute ago   90.5MB
    

    The architecture appears as expected:

    ~$ docker image inspect algorand/conduit:latest-arm64 | grep Architecture
            "Architecture": "arm64",
    ~$ docker image inspect algorand/conduit:latest-amd64 | grep Architecture
            "Architecture": "amd64",
    
  • 10

    docs: automation to publish documentation after a release.

    Summary

    Two parts of this PR:

    1. Minor changes to align with the docs repo.
    2. Automation to push changes to the docs repo.

    Automation

    Implemented in a new Doc Repo PR Generator / docs-pr workflow. It is designed to run manually, and to run after the goreleaser action.

    The following are updated:

    • CLI documentation; this is already automated by reformat.py in the docs repo.
    • Copy the manually written docs from Conduit's docs directory into the docs repo docs/get-details/conduit directory.

    A PR is created on the algorand/docs repo with a number of reviewers tagged automatically. I grabbed the list from other GH actions, maybe a group could be created in the future.

    remote repository / security notes

    In order to create a PR on a remote repo, some special access is required. I followed the steps documented here and created a Personal Access Token (PAT). ~The token is a classic token, because I was not able to create a fine-grained token targeting repositories in the algorand org.~ The token is a fine-grained token limited to the algorand/docs repo.

    Test Plan

    In order to gain access to the ${{ secrets.PAT }} I temporarily changed the on condition to pull_request, and pushed a branch directly to algorand/conduit. After getting the action to run I see the PR created: https://github.com/algorand/docs/pull/1064

    Open Issue

    The PR seems to be created by me. I tried to override the committer but I must have missed something. Also the CLA Assistant is not working. It looks like the commit is correctly changed to "algo-devops-service" but other parts are not. Maybe it's due to the personal access token?

  • 11

    docs: Fix dynamically changing logo and convert to PNG

    Summary

    GitHub seems to have issues with SVGs stored in the repository. Changing the assets to PNGs with a hosted URL fixes the issue, and the logo changes dynamically: black for the light theme and white for the dark theme.

    Test Plan

    It's a one-liner in the README - tested manually by changing themes and verifying that the markdown renders.

  • 12

    Prometheus Metrics for Consideration

    Catch-all for Prometheus metrics we may want to add:

    • Plugin OnComplete Callbacks
      • a SummaryVec for durations, so we can see if any callbacks are a bottleneck (rough sketch below)
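
    A rough sketch of that metric (hypothetical; not currently in Conduit), using the Prometheus Go client:

    package metrics

    import (
        "time"

        "github.com/prometheus/client_golang/prometheus"
    )

    // onCompleteSeconds tracks per-plugin OnComplete callback durations.
    // Registration with the pipeline's registry is omitted here.
    var onCompleteSeconds = prometheus.NewSummaryVec(
        prometheus.SummaryOpts{
            Namespace: "conduit",
            Name:      "oncomplete_callback_seconds",
            Help:      "Duration of plugin OnComplete callbacks.",
        },
        []string{"plugin"},
    )

    func observeOnComplete(plugin string, start time.Time) {
        onCompleteSeconds.WithLabelValues(plugin).Observe(time.Since(start).Seconds())
    }
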
  • 13

    Design: Startup an Indexer automatically

    Problem

    When we want to run an Indexer with Conduit, we need to first start Conduit and then the Indexer with separate startup commands. For a simple single-node Indexer deployment, it would be better if the Indexer API were available when running Conduit.

    Solution

    Propose a solution to run the Indexer API as part of Conduit.

  • 14

    Log Rotation: document or provide solution

    Problem

    After a catchup on mainnet with log level INFO, the conduit log is more than 12GB. This is large enough that some sort of log rotation strategy is called for.

    Solution

    We could either provide our own solution using a Go library, or document how this could be done on various systems. For example, on Linux one could use logrotate.
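
    For example, a minimal logrotate entry might look like the following (assuming Conduit's output is redirected to this path; adjust to your setup):

    /var/log/conduit/conduit.log {
        daily
        rotate 7
        compress
        missingok
        notifempty
        copytruncate
    }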

    An argument for providing our own solution is platform independence, but a counter-argument is that this is non-standard.

    Dependencies

    Urgency

    Low - I'm not sure how big of an issue this is in the community.

  • 15

    plugin: Creator account filter.

    Problem

    It is common to have a set of creators who create many assets and/or applications, which are then delegated to other accounts. In this scenario the creator is only relevant at create time. After the assets or applications have been delegated, there are no longer references to the creator accounts.

    Suppose a creator has made 10,000 ASAs and you want to track the usage of each of them; the current filter plugin would not allow it, because the list of ASAs varies over time as the creator continues to add new ones.

    Solution

    A new plugin with the following properties:

    name: creator_filter
    config:
        creators:
          - ADDR1
          - ADDR2
          - ADDR3
        # either algod or indexer can be provided to lookup IDs for assets and applications
        # associated with the addresses.
        algod:
            addr:
            token:
        indexer:
            addr:
            token:
    

    During startup an in-memory list of IDs can be resolved from Indexer or algod. While processing blocks the list of IDs would grow or shrink as the creator creates or deletes things. Transactions associated with any of the IDs (or creators) are selected and all others are removed.

    Note: instead of providing algod or indexer, the list of IDs can be stored in the data directory.
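
    A rough sketch (hypothetical; not an existing plugin) of the in-memory bookkeeping described above:

    package creatorfilter

    // tracker holds the configured creators and the asset/app IDs they have created.
    type tracker struct {
        creators map[string]bool // configured creator addresses
        ids      map[uint64]bool // IDs created by those addresses, seeded at startup
    }

    // observeCreate and observeDelete are called while processing each block.
    func (t *tracker) observeCreate(creator string, id uint64) {
        if t.creators[creator] {
            t.ids[id] = true
        }
    }

    func (t *tracker) observeDelete(id uint64) {
        delete(t.ids, id)
    }

    // selected reports whether a transaction touching the given IDs (or sent by a
    // creator) should be kept.
    func (t *tracker) selected(sender string, referenced []uint64) bool {
        if t.creators[sender] {
            return true
        }
        for _, id := range referenced {
            if t.ids[id] {
                return true
            }
        }
        return false
    }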

  • 16

    Replace `sirupsen/logrus` with a more performant logger

    Problem

    In working on #128, I ran some performance tests and noticed that log level had a significant impact. logrus has been observed to be slow in go-algorand, and there is an internal issue (2479) to replace it as well as a PoC branch using zerolog.

    Problematic experiment

    Using the Justfile command

    ❯ just conduit-bootstrap-and-go 300
    

    to bootstrap testnet and run a postgresql exporter against it for 300 seconds. I ran it a number of times against both the original pipeline and the new one. Here are the experimental results:

    | Log Level | Reps | Original rounds/300 sec (logs/round) | Pipelining rounds/300 sec (logs/round) | Pipelining v Original (%) |
    |-----------|------|--------------------------------------|----------------------------------------|---------------------------|
    | TRACE     | 3    | 3718 (7.0)                           | 3509 (14.0)                            | -5.6%                     |
    | INFO      | 2    | 4578.5 (3.0)                         | 4423.5 (3.0)                           | -3.4%                     |

    So comparing the results within each column we can see:

    • 19% performance decline in the Original algorithm when going from 3 → 7 logs/round
    • 21% performance decline in the Pipelining algorithm when going from 3 → 14 logs/round

    The sample was very noisy, but it looks like each additional log per round costs around a 1-5% performance hit.

    Action Items

    1. A POC PR which lets us validate the performance implications of swapping out logrus for zerolog
    2. Decide if we want to allow "plug and play" logger capability (we have heard one community member make this suggestion)
    3. Switch all logs in Conduit to the new logger (possibly via an interface, sketched below, if we opt for the "plug and play" approach)
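
    A rough sketch of such a logger boundary (hypothetical; not the current Conduit interface). Thin adapters over logrus or zerolog would both satisfy it, so the concrete library could be swapped without touching call sites:

    package logging

    // Fields mirrors the structured-logging key/value style used today.
    type Fields map[string]interface{}

    type Logger interface {
        Tracef(format string, args ...interface{})
        Debugf(format string, args ...interface{})
        Infof(format string, args ...interface{})
        Errorf(format string, args ...interface{})
        WithFields(fields Fields) Logger
    }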

    More links

    Dependencies

    None

    Urgency

    Medium - as we're currently working on improving Conduit's performance, this seems like a useful avenue to pursue.

  • 17

    Pipeline the Pipeline

    Description

    Allow for moderate concurrency in the pipeline without sacrificing its sequential integrity.
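
    The core idea, reduced to a minimal sketch (ours, not the PR's code): each stage runs in its own goroutine and passes blocks downstream over channels, so stages overlap in time while each stage still sees rounds in order.

    package main

    import "fmt"

    type blockData struct{ round uint64 }

    func main() {
        imported := make(chan blockData, 1)
        processed := make(chan blockData, 1)

        // importer goroutine
        go func() {
            for r := uint64(1); r <= 5; r++ {
                imported <- blockData{round: r}
            }
            close(imported)
        }()

        // processor goroutine
        go func() {
            for b := range imported {
                processed <- b // real processing would happen here
            }
            close(processed)
        }()

        // exporter (main goroutine) still receives rounds sequentially
        for b := range processed {
            fmt.Println("exported round", b.round)
        }
    }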

    Summary of Changes

    • conduit/pipeline/common.go: introducing generic retrying of pipeline methods via Retries() and RetriesNoOutput()
    • conduit/pipeline/pipeline.go:
      • error/cancellation handling:
        • modify the pipeline's cancellation function to have a cause, and expose it via the WhyStopped() method
        • joinError() instead of setError() to the pipeline's error property
      • introducing goroutines for the importer, processors, exporters, and round launcher by refactoring Start() and introducing methods ImportHandler(), ProcessorHandler(), and ExporterHandler()
      • E2E end of test signal: modified the info-level log at the end of a round to look like FINISHED Pipeline round: 110. UPDATED Pipeline round: 111 and added WARNING commentary to be careful about changing this format as it would break the E2E test.
    • conduit/pipeline/pipeline_bench_test.go - benchmarker for the pipeline that includes sleeping plugins with an importer, 2 processors, and an exporter.
    • pkg/cli/cli.go: remove a line break from the final error printout

    Issues

    #118

    TODO

    • [x] Cleanup E2E code
    • [x] Merge in #129
    • [x] Don't embed PipelineData inside of BlockData
    • [x] Remove OStart() and any resulting orphans in pipeline.go
    • [x] Merge latest from master which includes 30 second timeout #133
    • [x] ~Should we send a telemetry observation based on WhyStopped() ?~ NOT FOR NOW. Reconsider this for #97
    • [x] ~What should be logged and how should internal-tools/.../logstats.go be modified?~ This is under the umbrella of the "Logging Plugin Performance" thread
    • [x] ~Should we send metrics for the callbacks?~ Not in this PR
    • [ ] Test on EC2 with the final changes in place.

    Testing

    E2E

    pipeline_bench_test.go

    Running a new benchmark test twice on the original code and on the new code, we have the following results. Note that the most pertinent result for the typical Indexer DB population use case is exporter_10ms_while_others_1ms:

    | Benchmark Name | Original rounds/sec | Pipelining rounds/sec | Pipelining v Original (%) |
    |----------------|---------------------|-----------------------|---------------------------|
    | vanilla_2_procs_without_sleep-size-1-8 | 3077 | 3309.5 | +7% |
    | uniform_sleep_of_10ms-size-1-8 | 22.32 | 79.815 | +250% |
    | exporter_10ms_while_others_1ms-size-1-8 | 63.405 | 78.565 | +24% |
    | importer_10ms_while_others_1ms-size-1-8 | 65.535 | 91.255 | +39% |
    | first_processor_10ms_while_others_1ms-size-1-8 | 60.28 | 89.175 | +48% |

    Block Generator Results

    Running the block generator test using SCENARIO = scenarios/config.allmixed.small.yml for 30s, with the original code and the new code, each for 2 experiments, we have:

    | Reset database? | Original rounds/30 sec | Pipelining rounds/30 sec | Pipelining v Original (%) |
    |-----------------|------------------------|--------------------------|---------------------------|
    | Reset           | 301                    | 400                      | +33%                      |
    | No Reset        | 295                    | 418                      | +41%                      |

    Local test network 5 minute sprint

    I used the Justfile command

    ❯ just conduit-bootstrap-and-go 300
    

    to bootstrap testnet and run a postgresql exporter against it for 300 seconds. I ran it a number of times against both the original pipeline and the new one. Here are the experimental results:

    | Log Level | Reps | Original rounds/300 sec (logs/round) | Pipelining rounds/300 sec (logs/round) | Pipelining v Original (%) |
    |-----------|------|--------------------------------------|----------------------------------------|---------------------------|
    | TRACE     | 3    | 3718 (7.0)                           | 3509 (14.0)                            | -5.6% 😢                  |
    | INFO      | 2    | 4578.5 (3.0)                         | 4423.5 (3.0)                           | -3.4% 😢                  |

    On EC2 - CLASSIC vs. PIPELINING

    There are much more detailed results in a Google Sheets document, but we have:

    SUMMARY

    (summary chart image not included)

    #133 aims to reduce the post-catchup fatal errors to 0.