diff --git a/cmd/tesseract/aws/main.go b/cmd/tesseract/aws/main.go index 33b3ce5a..38d5baf9 100644 --- a/cmd/tesseract/aws/main.go +++ b/cmd/tesseract/aws/main.go @@ -74,7 +74,7 @@ var ( extKeyUsages = flag.String("ext_key_usages", "", "If set, will restrict the set of such usages that the server will accept. By default only 'ServerAuth' certs are accepted. Set to 'Any' to accept all chain. Accepted values are defined in internal/ct.") rejectExtensions = flag.String("reject_extension", "", "A list of X.509 extension OIDs, in dotted string form (e.g. '2.3.4.5') which, if present, should cause submissions to be rejected.") acceptSHA1 = flag.Bool("accept_sha1_signing_algorithms", true, "If true, accept chains that use SHA-1 based signing algorithms. This flag will eventually be removed, and such algorithms will be rejected.") - enablePublicationAwaiter = flag.Bool("enable_publication_awaiter", true, "If true then the certificate is integrated into log before returning the response.") + enablePublicationAwaiter = flag.Bool("enable_publication_awaiter", true, "If true, waits for the submitted certificate to be covered by a published checkpoint before responding to an add-* request.") witnessPolicyFile = flag.String("witness_policy_file", "", "(Optional) Path to the file containing the witness policy in the format described at https://git.glasklar.is/sigsum/core/sigsum-go/-/blob/main/doc/policy.md") witnessTimeout = flag.Duration("witness_timeout", tessera.DefaultWitnessTimeout, "Maximum time to wait for witness responses.") notBeforeRL = flag.String("rate_limit_old_not_before", "28h:500", "Optionally rate limits submissions with old notBefore dates. Expects a value of with the format: \":\", e.g. \"30d:50\" would impose a limit of 50 certs/s on submissions whose notBefore date is >= 30days old.") @@ -90,6 +90,7 @@ var ( pushbackMaxOutstanding = flag.Uint("pushback_max_outstanding", tessera.DefaultPushbackMaxOutstanding, "Maximum number of in-flight add requests - i.e. the number of entries with sequence numbers assigned, but which are not yet integrated into the log.") pushbackMaxAntispamLag = flag.Uint("pushback_max_antispam_lag", aws_as.DefaultPushbackThreshold, "Maximum permitted lag for antispam follower, before log starts returning pushback.") garbageCollectionInterval = flag.Duration("garbage_collection_interval", 10*time.Second, "Interval between scans to remove obsolete partial tiles and entry bundles. Set to 0 to disable.") + awaiterPollInterval = flag.Duration("awaiter_poll_interval", storage.DefaultAwaiterPollInterval, "Interval between two checkpoint polls by the awaiter. Used for antispam, and if enable_publication_awaiter is set, to block add-* requests responses. Must be strictly positive or defaults to DefaultAwaiterPollInterval.") // Infrastructure setup flags bucket = flag.String("bucket", "", "Name of the S3 bucket to store the log in.") @@ -296,10 +297,11 @@ func newAWSStorageFunc(awsCfg taws.Config) func(ctx context.Context, signer note } sopts := storage.CTStorageOptions{ - Appender: appender, - Reader: reader, - IssuerStorage: issuerStorage, - EnableAwaiter: *enablePublicationAwaiter, + Appender: appender, + Reader: reader, + IssuerStorage: issuerStorage, + AwaiterPollInterval: *awaiterPollInterval, + EnablePubAwaiter: *enablePublicationAwaiter, } return storage.NewCTStorage(ctx, &sopts) diff --git a/cmd/tesseract/gcp/main.go b/cmd/tesseract/gcp/main.go index 5702b66d..9033095c 100644 --- a/cmd/tesseract/gcp/main.go +++ b/cmd/tesseract/gcp/main.go @@ -82,7 +82,7 @@ var ( extKeyUsages = flag.String("ext_key_usages", "", "If set, will restrict the set of such usages that the server will accept. By default all are accepted. The values specified must be ones known to the x509 package.") rejectExtensions = flag.String("reject_extension", "", "A list of X.509 extension OIDs, in dotted string form (e.g. '2.3.4.5') which, if present, should cause submissions to be rejected.") acceptSHA1 = flag.Bool("accept_sha1_signing_algorithms", true, "If true, accept chains that use SHA-1 based signing algorithms. This flag will eventually be removed, and such algorithms will be rejected.") - enablePublicationAwaiter = flag.Bool("enable_publication_awaiter", true, "If true then the certificate is integrated into log before returning the response.") + enablePublicationAwaiter = flag.Bool("enable_publication_awaiter", true, "If true, waits for the submitted certificate to be covered by a published checkpoint before responding to an add-* request.") witnessPolicyFile = flag.String("witness_policy_file", "", "(Optional) Path to the file containing the witness policy in the format described at https://git.glasklar.is/sigsum/core/sigsum-go/-/blob/main/doc/policy.md") witnessTimeout = flag.Duration("witness_timeout", tessera.DefaultWitnessTimeout, "Maximum time to wait for witness responses.") notBeforeRL = flag.String("rate_limit_old_not_before", "28h:500", "Optionally rate limits submissions with old notBefore dates. Expects a value of with the format: \":\", e.g. \"30d:50\" would impose a limit of 50 certs/s on submissions whose notBefore date is >= 30days old.") @@ -101,6 +101,7 @@ var ( clientHTTPMaxIdle = flag.Int("client_http_max_idle", 200, "Maximum number of idle HTTP connections for outgoing requests.") clientHTTPMaxIdlePerHost = flag.Int("client_http_max_idle_per_host", 200, "Maximum number of idle HTTP connections per host for outgoing requests.") garbageCollectionInterval = flag.Duration("garbage_collection_interval", 10*time.Second, "Interval between scans to remove obsolete partial tiles and entry bundles. Set to 0 to disable.") + awaiterPollInterval = flag.Duration("awaiter_poll_interval", storage.DefaultAwaiterPollInterval, "Interval between two checkpoint polls by the awaiter. Used for antispam, and if enable_publication_awaiter is set, to block add-* requests responses. Must be strictly positive or defaults to DefaultAwaiterPollInterval.") // Infrastructure setup flags bucket = flag.String("bucket", "", "Name of the GCS bucket to store the log in.") @@ -329,10 +330,11 @@ func newGCPStorage(gc *gcs.Client, hc *http.Client) func(ctx context.Context, si } sopts := storage.CTStorageOptions{ - Appender: appender, - Reader: reader, - IssuerStorage: issuerStorage, - EnableAwaiter: *enablePublicationAwaiter, + Appender: appender, + Reader: reader, + IssuerStorage: issuerStorage, + AwaiterPollInterval: *awaiterPollInterval, + EnablePubAwaiter: *enablePublicationAwaiter, } return storage.NewCTStorage(ctx, &sopts) diff --git a/cmd/tesseract/posix/main.go b/cmd/tesseract/posix/main.go index 14d3f74c..0951589a 100644 --- a/cmd/tesseract/posix/main.go +++ b/cmd/tesseract/posix/main.go @@ -81,7 +81,7 @@ var ( extKeyUsages = flag.String("ext_key_usages", "", "If set, will restrict the set of such usages that the server will accept. By default only 'ServerAuth' certs are accepted. Set to 'Any' to accept all chain. Accepted values are defined in internal/ct.") rejectExtensions = flag.String("reject_extension", "", "A list of X.509 extension OIDs, in dotted string form (e.g. '2.3.4.5') which, if present, should cause submissions to be rejected.") acceptSHA1 = flag.Bool("accept_sha1_signing_algorithms", true, "If true, accept chains that use SHA-1 based signing algorithms. This flag will eventually be removed, and such algorithms will be rejected.") - enablePublicationAwaiter = flag.Bool("enable_publication_awaiter", true, "If true then the certificate is integrated into log before returning the response.") + enablePublicationAwaiter = flag.Bool("enable_publication_awaiter", true, "If true, waits for the submitted certificate to be covered by a published checkpoint before responding to an add-* request.") witnessPolicyFile = flag.String("witness_policy_file", "", "(Optional) Path to the file containing the witness policy in the format described at https://git.glasklar.is/sigsum/core/sigsum-go/-/blob/main/doc/policy.md") witnessTimeout = flag.Duration("witness_timeout", tessera.DefaultWitnessTimeout, "Maximum time to wait for witness responses.") notBeforeRL = flag.String("rate_limit_old_not_before", "28h:500", "Optionally rate limits submissions with old notBefore dates. Expects a value of with the format: \":\", e.g. \"30d:50\" would impose a limit of 50 certs/s on submissions whose notBefore date is >= 30days old.") @@ -103,6 +103,7 @@ var ( antispamBlockCacheSize = flag.String("antispam_block_cache_size", "768MB", "Amount of RAM to allocate for antispam block cache, set to zero to disable.") antispamIndexCacheSize = flag.String("antispam_index_cache_size", "768MB", "Amount of RAM to allocate for antispam index cache, set to zero for unlimited.") antispamCompactionInterval = flag.Duration("antispam_compaction_interval", tposix_as.DefaultCompactionInterval, "Interval between GC/compaction runs on the antispam index.") + awaiterPollInterval = flag.Duration("awaiter_poll_interval", storage.DefaultAwaiterPollInterval, "Interval between two checkpoint polls by the awaiter. Used for antispam, and if enable_publication_awaiter is set, to block add-* requests responses. Must be strictly positive or defaults to DefaultAwaiterPollInterval.") // Infrastructure setup flags storageDir = flag.String("storage_dir", "", "Path to root of log storage.") @@ -311,10 +312,11 @@ func newStorage(ctx context.Context, signer note.Signer) (st *storage.CTStorage, } sopts := storage.CTStorageOptions{ - Appender: appender, - Reader: reader, - IssuerStorage: issuerStorage, - EnableAwaiter: *enablePublicationAwaiter, + Appender: appender, + Reader: reader, + IssuerStorage: issuerStorage, + AwaiterPollInterval: *awaiterPollInterval, + EnablePubAwaiter: *enablePublicationAwaiter, } return storage.NewCTStorage(ctx, &sopts) } diff --git a/internal/ct/handlers_test.go b/internal/ct/handlers_test.go index 2f0570fb..0d939cb7 100644 --- a/internal/ct/handlers_test.go +++ b/internal/ct/handlers_test.go @@ -192,10 +192,11 @@ func newPOSIXStorageFunc(t *testing.T, root string) storage.CreateStorage { } sopts := storage.CTStorageOptions{ - Appender: appender, - Reader: reader, - IssuerStorage: issuerStorage, - EnableAwaiter: false, + Appender: appender, + Reader: reader, + IssuerStorage: issuerStorage, + AwaiterPollInterval: 20 * time.Millisecond, + EnablePubAwaiter: false, } s, err := storage.NewCTStorage(t.Context(), &sopts) if err != nil { diff --git a/storage/storage.go b/storage/storage.go index 18e07db4..6861476e 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -42,8 +42,9 @@ const ( // Each key is 64 bytes long, so this will take up to 64MB. // A CT log references ~15k unique issuer certifiates in 2024, so this gives plenty of space // if we ever run into this limit, we should re-think how it works. - maxCachedIssuerKeys = 1 << 20 - RootsPrefix = "roots/" + maxCachedIssuerKeys = 1 << 20 + RootsPrefix = "roots/" + DefaultAwaiterPollInterval = 200 * time.Millisecond ) type KV struct { @@ -63,30 +64,35 @@ type RootsStorage interface { } type CTStorageOptions struct { - Appender *tessera.Appender - Reader tessera.LogReader - IssuerStorage IssuerStorage - EnableAwaiter bool + Appender *tessera.Appender + Reader tessera.LogReader + IssuerStorage IssuerStorage + AwaiterPollInterval time.Duration + EnablePubAwaiter bool } // CTStorage implements ct.Storage and tessera.LogReader. type CTStorage struct { - storeData func(context.Context, *ctonly.Entry) tessera.IndexFuture - storeIssuers func(context.Context, []KV) error - reader tessera.LogReader - awaiter *tessera.PublicationAwaiter - enableAwaiter bool + storeData func(context.Context, *ctonly.Entry) tessera.IndexFuture + storeIssuers func(context.Context, []KV) error + reader tessera.LogReader + awaiter *tessera.PublicationAwaiter + enablePubAwaiter bool } // NewCTStorage instantiates a CTStorage object. func NewCTStorage(ctx context.Context, opts *CTStorageOptions) (*CTStorage, error) { - awaiter := tessera.NewPublicationAwaiter(ctx, opts.Reader.ReadCheckpoint, 200*time.Millisecond) + pollInterval := opts.AwaiterPollInterval + if pollInterval <= 0 { + pollInterval = DefaultAwaiterPollInterval + } + awaiter := tessera.NewPublicationAwaiter(ctx, opts.Reader.ReadCheckpoint, pollInterval) ctStorage := &CTStorage{ - storeData: tessera.NewCertificateTransparencyAppender(opts.Appender), - storeIssuers: cachedStoreIssuers(opts.IssuerStorage), - reader: opts.Reader, - awaiter: awaiter, - enableAwaiter: opts.EnableAwaiter, + storeData: tessera.NewCertificateTransparencyAppender(opts.Appender), + storeIssuers: cachedStoreIssuers(opts.IssuerStorage), + reader: opts.Reader, + awaiter: awaiter, + enablePubAwaiter: opts.EnablePubAwaiter, } return ctStorage, nil @@ -137,7 +143,7 @@ func (cts *CTStorage) Add(ctx context.Context, entry *ctonly.Entry) (tessera.Ind return trace1(ctx, "tesseract.storage.Add", func(ctx context.Context) (tessera.IndexFuture, error) { future := cts.storeData(ctx, entry) - if cts.enableAwaiter { + if cts.enablePubAwaiter { _, _, err := cts.awaiter.Await(ctx, future) if err != nil { return future, fmt.Errorf("error waiting for Tessera index future and its integration: %w", err)