Customize

Plugin dispatcher

Specify the dispatcher used inside the plugin. The default is "akka.actor.default-dispatcher".

Journal

j5ik2o.dynamo-db-journal {
  plugin-dispatcher = "akka.actor.default-dispatcher"
}

Snapshot

j5ik2o.dynamo-db-snapshot {
  plugin-dispatcher = "akka.actor.default-dispatcher"
}

State

j5ik2o.dynamo-db-state {
  plugin-dispatcher = "akka.actor.default-dispatcher"
}

Table names

Specify the table names. The defaults are as follows.

Journal

j5ik2o.dynamo-db-journal {
  table-name = "Journal"
}

Snapshot

j5ik2o.dynamo-db-snapshot {
  table-name = "Snapshot"
}

State

j5ik2o.dynamo-db-state {
  table-name = "State"
}

Table column names

Journal

j5ik2o.dynamo-db-journal {
  columns-def {
    partition-key-column-name = "pkey"
    sort-key-column-name = "skey"
    
    persistence-id-column-name = "persistence-id"
    sequence-nr-column-name = "sequence-nr"
    
    deleted-column-name = "deleted"
    message-column-name = "message"
    ordering-column-name = "ordering"
    tags-column-name = "tags"
  }
}

Snapshot

j5ik2o.dynamo-db-snapshot {
  columns-def {
    partition-key-column-name = "pkey"
    sort-key-column-name = "skey"
    
    persistence-id-column-name = "persistence-id"
    sequence-nr-column-name = "sequence-nr"
    
    snapshot-column-name = "snapshot"
    created-column-name = "created"
  }
}

State

j5ik2o.dynamo-db-state {
  columns-def {
    partition-key-column-name = "pkey"
    sort-key-column-name = "skey"
    
    persistence-id-column-name = "persistence-id"
    sequence-nr-column-name = "sequence-nr"
    
    deleted-column-name = "deleted"
    payload-column-name = "payload"
    serializer-id-column-name = "serializer-id"
    serializer-manifest-column-name = "serializer-manifest"
    ordering-column-name = "ordering" 
    tags-column-name = "tags"
  }
}

Index names

Journal

j5ik2o.dynamo-db-journal {
  get-journal-rows-index-name = "GetJournalRowsIndex"
}

Snapshot

j5ik2o.dynamo-db-snapshot {
  get-snapshot-rows-index-name = "GetSnapshotRowsIndex"
}

State

The state plugin has no secondary index.

Write-sharding

Journal

j5ik2o.dynamo-db-journal {
  shard-count = 64
  partition-key-resolver-class-name = "com.github.j5ik2o.akka.persistence.dynamodb.journal.PartitionKeyResolver$PersistenceIdBased"
  sort-key-resolver-class-name = "com.github.j5ik2o.akka.persistence.dynamodb.journal.SortKeyResolver$PersistenceIdWithSeqNr" 
}

Shard Count

shard-count is the logical number of shards. The default value is 64.

Partition Key Resolver Class Name

partition-key-resolver-class-name specifies the implementation class that generates pkey from PersistenceId and Sequence Number. Two built-in implementations are available.

  • com.github.j5ik2o.akka.persistence.dynamodb.journal.PartitionKeyResolver$SequenceNumberBased (Default)

    • pkey = ${persistenceId}-${sequenceNumber % shardCount}

    • The same PersistenceId will be assigned to a different shard if the Sequence Number is different. This is a write-specific sharding.

  • com.github.j5ik2o.akka.persistence.dynamodb.journal.PartitionKeyResolver$PersistenceIdBased

    • pkey = ${persistenceId.prefix}-${md5(persistenceId.reverse) % shardCount}

    • e.g. counter-875e6ce0425e4d2b8203f3b44b9b531a, persistenceId.prefix is counter.

    • If you choose this option, the same shard will be assigned if the PersistenceId is the same, so be sure to select this option if you are using DynamoDB Streams or KDS for DynamoDB.
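
The two pkey formulas above can be sketched in plain Scala. This is an illustrative sketch, not the plugin's actual implementation: the object and method names are hypothetical, taking everything before the first "-" as persistenceId.prefix is an assumption based on the example above, and reading the MD5 digest as an unsigned integer before applying the modulus is also an assumption. It does not try to reproduce the exact string formatting the plugin uses.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical helper; it only illustrates the documented formulas.
object PartitionKeySketch {

  // pkey = ${persistenceId}-${sequenceNumber % shardCount}
  def sequenceNumberBased(persistenceId: String, sequenceNumber: Long, shardCount: Int): String =
    s"$persistenceId-${sequenceNumber % shardCount}"

  // Assumption: the prefix is everything before the first '-'.
  def prefixOf(persistenceId: String): String = {
    val i = persistenceId.indexOf('-')
    if (i < 0) persistenceId else persistenceId.substring(0, i)
  }

  // pkey = ${persistenceId.prefix}-${md5(persistenceId.reverse) % shardCount}
  // Assumption: the 16-byte MD5 digest is interpreted as an unsigned integer.
  def persistenceIdBased(persistenceId: String, shardCount: Int): String = {
    val digest = MessageDigest
      .getInstance("MD5")
      .digest(persistenceId.reverse.getBytes(StandardCharsets.UTF_8))
    val shard = BigInt(1, digest) % BigInt(shardCount)
    s"${prefixOf(persistenceId)}-$shard"
  }
}
```

With a shard count of 64, sequenceNumberBased returns the same pkey for sequence numbers 1 and 65, whereas persistenceIdBased ignores the sequence number entirely and always returns the same pkey for a given PersistenceId.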

Sort Key Resolver Class Name

sort-key-resolver-class-name specifies the implementation class that generates skey from PersistenceId and Sequence Number. Two built-in implementations are available.

  • com.github.j5ik2o.akka.persistence.dynamodb.journal.SortKeyResolver$SeqNr

    • skey = $sequenceNumber

    • An implementation in which skey is the Sequence Number.

  • com.github.j5ik2o.akka.persistence.dynamodb.journal.SortKeyResolver$PersistenceIdWithSeqNr

    • skey = ${persistenceId.body}-${sequenceNumber}

    • e.g. counter-875e6ce0425e4d2b8203f3b44b9b531a, persistenceId.body is 875e6ce0425e4d2b8203f3b44b9b531a.

    • persistenceId.body is used as the prefix because, depending on shard-count, events of multiple persistenceIds may be stored in the same shard.
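
The skey formats above can be sketched as follows. This is a minimal sketch rather than the plugin's code: the object and method names are hypothetical, the 19-digit zero padding is inferred from the sample data in this document, and persistenceId.body is assumed to be everything after the first "-".

```scala
// Hypothetical helper; it only illustrates the documented skey formats.
object SortKeySketch {

  // Assumption: sequence numbers are left-padded with zeros to 19 digits,
  // so that string ordering matches numeric ordering.
  def seqNr(sequenceNumber: Long): String = f"$sequenceNumber%019d"

  // Assumption: the body is everything after the first '-'.
  def bodyOf(persistenceId: String): String = {
    val i = persistenceId.indexOf('-')
    if (i < 0) persistenceId else persistenceId.substring(i + 1)
  }

  // skey = ${persistenceId.body}-${sequenceNumber}
  def persistenceIdWithSeqNr(persistenceId: String, sequenceNumber: Long): String =
    s"${bodyOf(persistenceId)}-${seqNr(sequenceNumber)}"
}
```

Two PersistenceIds that land in the same shard, such as counter-875e6ce0425e4d2b8203f3b44b9b531a and counter-a8d46579bc2f4caf8c3b8dc2db984227, produce skeys that start with different bodies, so their events remain distinguishable even when the pkey is identical.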

Configure your own implementation

If needed, configure your own implementation.

j5ik2o.dynamo-db-journal {
  partition-key-resolver-class-name = "your class name(fqcn)"
  sort-key-resolver-class-name = "your class name(fqcn)"
}

Snapshot

j5ik2o.dynamo-db-snapshot {
  shard-count = 64
  partition-key-resolver-class-name = "com.github.j5ik2o.akka.persistence.dynamodb.snapshot.PartitionKeyResolver$PersistenceIdBased"
  sort-key-resolver-class-name = "com.github.j5ik2o.akka.persistence.dynamodb.snapshot.SortKeyResolver$PersistenceIdWithSeqNr" 
}

Shard Count

shard-count is the logical number of shards. The default value is 64.

Partition Key Resolver Class Name

partition-key-resolver-class-name specifies the implementation class that generates pkey from PersistenceId and Sequence Number. Two built-in implementations are available.

  • com.github.j5ik2o.akka.persistence.dynamodb.snapshot.PartitionKeyResolver$SequenceNumberBased (Default)

    • pkey = ${persistenceId}-${sequenceNumber % shardCount}

    • The same PersistenceId will be assigned to a different shard if the Sequence Number is different. This is a write-specific sharding.

  • com.github.j5ik2o.akka.persistence.dynamodb.snapshot.PartitionKeyResolver$PersistenceIdBased

    • pkey = ${persistenceId.prefix}-${md5(persistenceId.reverse) % shardCount}

    • e.g. counter-875e6ce0425e4d2b8203f3b44b9b531a, persistenceId.prefix is counter.

    • If you choose this option, the same shard will be assigned if the PersistenceId is the same, so be sure to select this option if you are using DynamoDB Streams or KDS for DynamoDB.

Sort Key Resolver Class Name

sort-key-resolver-class-name specifies the implementation class that generates skey from PersistenceId and Sequence Number. Two built-in implementations are available.

  • com.github.j5ik2o.akka.persistence.dynamodb.snapshot.SortKeyResolver$SeqNr

    • skey = $sequenceNumber

    • An implementation in which skey is the Sequence Number.

  • com.github.j5ik2o.akka.persistence.dynamodb.snapshot.SortKeyResolver$PersistenceIdWithSeqNr

    • skey = ${persistenceId.body}-${sequenceNumber}

    • e.g. counter-875e6ce0425e4d2b8203f3b44b9b531a, persistenceId.body is 875e6ce0425e4d2b8203f3b44b9b531a.

    • persistenceId.body is used as the prefix because, depending on shard-count, events of multiple persistenceIds may be stored in the same shard.

Configure your own implementation

If needed, configure your own implementation.

j5ik2o.dynamo-db-snapshot {
  partition-key-resolver-class-name = "your class name(fqcn)"
  sort-key-resolver-class-name = "your class name(fqcn)"
}

Persistent data images

| persistenceId | sequence-nr | pkey(SequenceNumberBased) | skey(SeqNr) |
| --- | --- | --- | --- |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 1 | counter-875e6ce0425e4d2b8203f3b44b9b531a-1 | 0000000000000000001 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 2 | counter-875e6ce0425e4d2b8203f3b44b9b531a-2 | 0000000000000000002 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 3 | counter-875e6ce0425e4d2b8203f3b44b9b531a-3 | 0000000000000000003 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 4 | counter-875e6ce0425e4d2b8203f3b44b9b531a-4 | 0000000000000000004 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 5 | counter-875e6ce0425e4d2b8203f3b44b9b531a-5 | 0000000000000000005 |

| persistenceId | sequence-nr | pkey(PersistenceIdBased) | skey(PersistenceIdWithSeqNr) |
| --- | --- | --- | --- |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 1 | counter-0000000000000000000000000000000000000803 | 875e6ce0425e4d2b8203f3b44b9b531a-0000000000000000001 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 2 | counter-0000000000000000000000000000000000000803 | 875e6ce0425e4d2b8203f3b44b9b531a-0000000000000000002 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 3 | counter-0000000000000000000000000000000000000803 | 875e6ce0425e4d2b8203f3b44b9b531a-0000000000000000003 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 4 | counter-0000000000000000000000000000000000000803 | 875e6ce0425e4d2b8203f3b44b9b531a-0000000000000000004 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 5 | counter-0000000000000000000000000000000000000803 | 875e6ce0425e4d2b8203f3b44b9b531a-0000000000000000005 |
| counter-a8d46579bc2f4caf8c3b8dc2db984227 | 1 | counter-0000000000000000000000000000000000000803 | a8d46579bc2f4caf8c3b8dc2db984227-0000000000000000001 |
| counter-a8d46579bc2f4caf8c3b8dc2db984227 | 2 | counter-0000000000000000000000000000000000000803 | a8d46579bc2f4caf8c3b8dc2db984227-0000000000000000002 |
| counter-a8d46579bc2f4caf8c3b8dc2db984227 | 3 | counter-0000000000000000000000000000000000000803 | a8d46579bc2f4caf8c3b8dc2db984227-0000000000000000003 |
| counter-a8d46579bc2f4caf8c3b8dc2db984227 | 4 | counter-0000000000000000000000000000000000000803 | a8d46579bc2f4caf8c3b8dc2db984227-0000000000000000004 |
| counter-a8d46579bc2f4caf8c3b8dc2db984227 | 5 | counter-0000000000000000000000000000000000000803 | a8d46579bc2f4caf8c3b8dc2db984227-0000000000000000005 |

State

The state plugin has neither a sort key nor a SortKeyResolver.

j5ik2o.dynamo-db-state {
  shard-count = 64
  partition-key-resolver-class-name = "com.github.j5ik2o.akka.persistence.dynamodb.state.PartitionKeyResolver$PersistenceIdBased"
}

Shard Count

shard-count is the logical number of shards. The default value is 64.

Partition Key Resolver Class Name

There is one standard implementation:

  • com.github.j5ik2o.akka.persistence.dynamodb.state.PartitionKeyResolver$PersistenceIdBased

    • This is the same implementation as Journal or Snapshot.

Configure your own implementation

If needed, configure your own implementation.

j5ik2o.dynamo-db-state {
  partition-key-resolver-class-name = "your class name(fqcn)"
}

AWS Client

AWS Client Factory

The following factories are used to create AWS Clients inside the plugins.

| key | version | async/sync | default value |
| --- | --- | --- | --- |
| j5ik2o.dynamo-db-?????.v2-async-client-factory-class-name | v2 | async | com.github.j5ik2o.akka.persistence.dynamodb.utils.V2AsyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v2-sync-client-factory-class-name | v2 | sync | com.github.j5ik2o.akka.persistence.dynamodb.utils.V2SyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v2-dax-async-client-factory-class-name | v2-dax | async | com.github.j5ik2o.akka.persistence.dynamodb.utils.V2DaxAsyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v2-dax-sync-client-factory-class-name | v2-dax | sync | com.github.j5ik2o.akka.persistence.dynamodb.utils.V2DaxSyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v1-async-client-factory-class-name | v1 | async | com.github.j5ik2o.akka.persistence.dynamodb.utils.V1AsyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v1-sync-client-factory-class-name | v1 | sync | com.github.j5ik2o.akka.persistence.dynamodb.utils.V1SyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v1-dax-async-client-factory-class-name | v1-dax | async | com.github.j5ik2o.akka.persistence.dynamodb.utils.V1DaxAsyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v1-dax-sync-client-factory-class-name | v1-dax | sync | com.github.j5ik2o.akka.persistence.dynamodb.utils.V1DaxSyncClientFactory$Default |

The AWS clients are created automatically from the configured values. To apply your own creation logic, implement a custom factory as follows.

This is useful for setting client options that the plugin's configuration does not support, or for setting options dynamically within the program.

package example

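import akka.actor.DynamicAccess
import com.github.j5ik2o.akka.persistence.dynamodb.config.PluginConfig
import com.github.j5ik2o.akka.persistence.dynamodb.utils.V2AsyncClientFactory
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
// V2ClientBuilderUtils is provided by the plugin; import it from the package used by your plugin version.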

class MyV2AsyncClientFactory extends V2AsyncClientFactory {
  override def create(
      dynamicAccess: DynamicAccess,
      pluginConfig: PluginConfig
  ): DynamoDbAsyncClient = {
    // Default Implementation is the following.
    // V2ClientUtils.createV2AsyncClient(dynamicAccess, pluginConfig)
    
    // e.g. Custom AWS Client Initialization
    V2ClientBuilderUtils
      .setupAsync(
        dynamicAccess,
        pluginConfig
      )
      .defaultsMode(DefaultsMode.AUTO) // This is not a configuration item in the plugin.
      .build()
  }
}

Specify your custom implementation class in v2-async-client-factory-class-name.

j5ik2o.dynamo-db-journal.v2-async-client-factory-class-name = "example.MyV2AsyncClientFactory"

AWS Client Config

The configuration for the AWS clients is specified in the following section (j5ik2o.dynamo-db-?????.dynamo-db-client).

j5ik2o.dynamo-db-journal {
  dynamo-db-client {
    access-key-id = "x"
    secret-access-key = "x"
    endpoint = "http://localhost:8000/"
  }
}

  • access-key-id is the AWS access key ID.

  • secret-access-key is the AWS secret access key.

  • endpoint is the DynamoDB endpoint URL.

AWS CredentialsProvider

You can specify your own AwsCredentialsProvider.

aws-credentials-provider-provider-class-name specifies the provider class that creates the AwsCredentialsProvider. aws-credentials-provider-class-name specifies the implementation class of AwsCredentialsProvider.

j5ik2o.dynamo-db-????? {
  dynamo-db-client {
    v2 {
      aws-credentials-provider-provider-class-name = "com.github.j5ik2o.akka.persistence.dynamodb.client.v2.AwsCredentialsProviderProvider$Default"
      aws-credentials-provider-class-name = "software.amazon.awssdk.auth.credentials.AwsCredentialsProvider"
    }
  }
}

Custom AWSCredentialsProviderProvider by using aws-credentials-provider-provider-class-name

package example

import akka.actor.DynamicAccess
import com.github.j5ik2o.akka.persistence.dynamodb.client.v2.AwsCredentialsProviderProvider
import com.github.j5ik2o.akka.persistence.dynamodb.config.PluginConfig
import software.amazon.awssdk.auth.credentials.{ AwsCredentialsProvider, WebIdentityTokenFileCredentialsProvider }

import scala.annotation.unused

final class MyAwsCredentialsProviderProvider(@unused dynamicAccess: DynamicAccess, @unused pluginConfig: PluginConfig)
  extends AwsCredentialsProviderProvider {

  override def create: Option[AwsCredentialsProvider] = {
    if (sys.env.contains("AWS_ROLE_ARN"))
      Some(WebIdentityTokenFileCredentialsProvider.create())
    else
      None
  }

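}

Specify your custom implementation class in aws-credentials-provider-provider-class-name.

j5ik2o.dynamo-db-????? {
  dynamo-db-client {
    v2 {
      aws-credentials-provider-provider-class-name = "example.MyAwsCredentialsProviderProvider"
    }
  }
}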

Custom AWSCredentialsProvider by using aws-credentials-provider-class-name

package example

import akka.actor.DynamicAccess
import com.github.j5ik2o.akka.persistence.dynamodb.config.PluginConfig
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, AwsCredentials, AwsCredentialsProvider }

class MyAwsCredentialsProvider(dynamicAccess: DynamicAccess, pluginConfig: PluginConfig) extends AwsCredentialsProvider {
  override def resolveCredentials(): AwsCredentials = {
    // Return your custom AWS credentials here.
    // The literal values below are placeholders, not real credentials.
    AwsBasicCredentials.create("my-access-key-id", "my-secret-access-key")
  }
}

Specify your custom implementation class in aws-credentials-provider-class-name.

j5ik2o.dynamo-db-????? {
  dynamo-db-client {
    v2 {
      aws-credentials-provider-class-name = "example.MyAwsCredentialsProvider"
    }
  }
}

MetricPublisher

With the MetricPublisher of the AWS SDK v2, SDK-level metrics can be sent to services such as Datadog and New Relic via Kamon.

package example

import akka.actor.DynamicAccess
import com.github.j5ik2o.akka.persistence.dynamodb.config.PluginConfig
import kamon.Kamon
import kamon.metric.{ Counter, Histogram, MeasurementUnit }
import software.amazon.awssdk.core.metrics.CoreMetric
import software.amazon.awssdk.metrics.{ MetricCollection, MetricPublisher }

import scala.annotation.unused
import scala.jdk.StreamConverters.StreamHasToScala

class MyMetricPublisher(
    @unused dynamicAccess: DynamicAccess,
    pluginConfig: PluginConfig
) extends MetricPublisher {

  override def publish(metricCollection: MetricCollection): Unit = {
    val metricsMap = metricCollection.stream().toScala(Vector).map { mr => (mr.metric().name(), mr) }.toMap
    metricsMap(CoreMetric.OPERATION_NAME.name()).value() match {
      case "PutItem" =>
        if (metricsMap(CoreMetric.API_CALL_SUCCESSFUL.name()).value().asInstanceOf[java.lang.Boolean]) {
          val apiCallDuration = metricsMap(CoreMetric.API_CALL_DURATION.name()).value().asInstanceOf[java.time.Duration]
          val credentialsFetchDuration =
            metricsMap(CoreMetric.CREDENTIALS_FETCH_DURATION.name()).value().asInstanceOf[java.time.Duration]
          val marshallingDuration =
            metricsMap(CoreMetric.MARSHALLING_DURATION.name()).value().asInstanceOf[java.time.Duration]
          val retryCount = metricsMap(CoreMetric.RETRY_COUNT.name()).value().asInstanceOf[java.lang.Integer]
          putItemApiCallDurationHistogram.record(apiCallDuration.toMillis)
          putItemCredentialsFetchDurationHistogram.record(credentialsFetchDuration.toMillis)
          putItemMarshallingDurationHistogram.record(marshallingDuration.toMillis)
          putItemRetryCounter.increment(retryCount.toLong)
        } else {
          putItemErrorCounter.increment()
        }
      case _ =>
    }

  }

  override def close(): Unit = {}

  private val prefix                            = "app"
  private val apiCallDurationHistogram          = histogram(s"$prefix.aws.api-call-duration")
  private val credentialsFetchDurationHistogram = histogram(s"$prefix.aws.credentials-fetch-duration")
  private val marshallingDurationHistogram      = histogram(s"$prefix.aws.marshalling-duration")
  private val retryCounter                      = counter(s"$prefix.aws.retry-count")
  private val errorCounter                      = counter(s"$prefix.error-count")

  private val putItemApiCallDurationHistogram = apiCallDurationHistogram.withTag("Operation", "PutItem")
  private val putItemCredentialsFetchDurationHistogram = credentialsFetchDurationHistogram
    .withTag("Operation", "PutItem")
  private val putItemMarshallingDurationHistogram = marshallingDurationHistogram
    .withTag("Operation", "PutItem")
  private val putItemRetryCounter = retryCounter
    .withTag("Operation", "PutItem")
  private val putItemErrorCounter = errorCounter
    .withTag("Operation", "PutItem")

  private def histogram(metricName: String): Histogram =
    Kamon
      .histogram(metricName, MeasurementUnit.time.milliseconds)
      .withTag("TableName", pluginConfig.tableName)
      .withTag("sdk", s"java-${pluginConfig.clientConfig.clientVersion}")

  private def counter(metricName: String): Counter =
    Kamon
      .counter(metricName)
      .withTag("TableName", pluginConfig.tableName)
      .withTag("sdk", s"java-${pluginConfig.clientConfig.clientVersion}")
}

Specify your custom implementation classes in metric-publisher-class-names.

j5ik2o.dynamo-db-????? {
  dynamo-db-client {
    v2 {
      metric-publisher-class-names = ["example.MyMetricPublisher"]
    }
  }
}