Skip to content

Commit d07c6fb

Browse files
committed
Add osiris_log_manifest behaviour for hooks into log writers
`osiris_log_manifest` adds a few callbacks which are executed during initialization of writers and at events like rolling segments and writing chunks.
1 parent ed51a6b commit d07c6fb

2 files changed

Lines changed: 150 additions & 28 deletions

File tree

src/osiris_log.erl

Lines changed: 108 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@
6060
generate_log/4,
6161
parse_header/2]).
6262

63+
-behaviour(osiris_log_manifest).
64+
%% osiris_log_manifest implementations
65+
-export([init_manifest/2,
66+
handle_event/2,
67+
close_manifest/1,
68+
delete/1]).
69+
6370
-export([dump_init/1,
6471
dump_init_idx/1,
6572
dump_chunk/1,
@@ -364,6 +371,7 @@
364371
-type offset() :: osiris:offset().
365372
-type epoch() :: osiris:epoch().
366373
-type range() :: empty | {From :: offset(), To :: offset()}.
374+
-type epoch_offsets() :: [{epoch(), offset()}].
367375
-type counter_spec() :: {Tag :: term(), Fields :: seshat:fields_spec()}.
368376
-type chunk_type() ::
369377
?CHNK_USER |
@@ -424,7 +432,8 @@
424432
%% that will be included in snapshots written to new segments
425433
readers_counter_fun = fun(_) -> ok end :: function(),
426434
shared :: atomics:atomics_ref(),
427-
filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size()
435+
filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size(),
436+
manifest_module = ?MODULE :: module()
428437
}).
429438
-record(ra,
430439
{size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(),
@@ -446,7 +455,8 @@
446455
{type = writer :: writer | acceptor,
447456
segment_size = {?LOG_HEADER_SIZE, 0} :: {non_neg_integer(), non_neg_integer()},
448457
current_epoch :: non_neg_integer(),
449-
tail_info = {0, empty} :: osiris:tail_info()
458+
tail_info = {0, empty} :: osiris:tail_info(),
459+
manifest :: osiris_log_manifest:state()
450460
}).
451461
-record(?MODULE,
452462
{cfg :: #cfg{},
@@ -458,7 +468,7 @@
458468
%% record chunk_info does not map exactly to an index record (field 'num' differs)
459469
-record(chunk_info,
460470
{id :: offset(),
461-
timestamp :: non_neg_integer(),
471+
timestamp :: osiris:timestamp(),
462472
epoch :: epoch(),
463473
num :: non_neg_integer(),
464474
type :: chunk_type(),
@@ -477,8 +487,10 @@
477487
-opaque state() :: #?MODULE{}.
478488

479489
-export_type([state/0,
490+
chunk_type/0,
480491
chunk_iterator/0,
481492
range/0,
493+
epoch_offsets/0,
482494
config/0,
483495
counter_spec/0,
484496
transport/0,
@@ -501,20 +513,11 @@ init(Config) ->
501513
-spec init(config(), writer | acceptor) -> state().
502514
init(#{dir := Dir,
503515
name := Name,
504-
epoch := Epoch} = Config,
516+
epoch := Epoch} = Config0,
505517
WriterType) ->
506518
%% scan directory for segments if in write mode
507-
MaxSizeBytes = maps:get(max_segment_size_bytes, Config,
508-
?DEFAULT_MAX_SEGMENT_SIZE_B),
509-
MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks,
510-
?DEFAULT_MAX_SEGMENT_SIZE_C),
511-
Retention = maps:get(retention, Config, []),
512-
FilterSize = maps:get(filter_size, Config, ?DEFAULT_FILTER_SIZE),
513519
?INFO("Stream: ~ts will use ~ts for osiris log data directory",
514520
[Name, Dir]),
515-
?DEBUG_(Name, "max_segment_size_bytes: ~b,
516-
max_segment_size_chunks ~b, retention ~w, filter size ~b",
517-
[MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]),
518521
ok = filelib:ensure_dir(Dir),
519522
case file:make_dir(Dir) of
520523
ok ->
@@ -524,7 +527,25 @@ init(#{dir := Dir,
524527
Err ->
525528
throw(Err)
526529
end,
530+
ok = maybe_fix_corrupted_files(Config0),
531+
532+
ManifestMod = manifest_module(),
533+
{Config, Manifest0} = case Config0 of
534+
#{manifest := M} ->
535+
{Config0, M};
536+
_ ->
537+
ManifestMod:init_manifest(Config0, writer)
538+
end,
527539

540+
MaxSizeBytes = maps:get(max_segment_size_bytes, Config,
541+
?DEFAULT_MAX_SEGMENT_SIZE_B),
542+
MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks,
543+
?DEFAULT_MAX_SEGMENT_SIZE_C),
544+
Retention = maps:get(retention, Config, []),
545+
FilterSize = maps:get(filter_size, Config, ?DEFAULT_FILTER_SIZE),
546+
?DEBUG_(Name, "max_segment_size_bytes: ~b,
547+
max_segment_size_chunks ~b, retention ~w, filter size ~b",
548+
[MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]),
528549
Cnt = make_counter(Config),
529550
%% initialise offset counter to -1 as 0 is the first offset in the log and
530551
%% it hasn't necessarily been written yet, for an empty log the first offset
@@ -546,8 +567,8 @@ init(#{dir := Dir,
546567
counter = Cnt,
547568
counter_id = counter_id(Config),
548569
shared = Shared,
549-
filter_size = FilterSize},
550-
ok = maybe_fix_corrupted_files(Config),
570+
filter_size = FilterSize,
571+
manifest_module = ManifestMod},
551572
DefaultNextOffset = case Config of
552573
#{initial_offset := IO}
553574
when WriterType == acceptor ->
@@ -564,6 +585,7 @@ init(#{dir := Dir,
564585
#write{type = WriterType,
565586
tail_info = {DefaultNextOffset,
566587
empty},
588+
manifest = Manifest0,
567589
current_epoch = Epoch}});
568590
{NumSegments,
569591
#seg_info{first = #chunk_info{id = FstChId,
@@ -605,11 +627,14 @@ init(#{dir := Dir,
605627
{ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE),
606628
{ok, IdxEof} = file:position(IdxFd, eof),
607629
NumChunks = (IdxEof - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B,
630+
Event = {segment_opened, undefined, filename:basename(Filename)},
631+
Manifest = ManifestMod:handle_event(Event, Manifest0),
608632
#?MODULE{cfg = Cfg,
609633
mode =
610634
#write{type = WriterType,
611635
tail_info = TailInfo,
612636
segment_size = {Size, NumChunks},
637+
manifest = Manifest,
613638
current_epoch = Epoch},
614639
current_file = filename:basename(Filename),
615640
fd = SegFd,
@@ -629,10 +654,13 @@ init(#{dir := Dir,
629654
{ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE),
630655
osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1),
631656
osiris_log_shared:set_last_chunk_id(Shared, DefaultNextOffset - 1),
657+
Event = {segment_opened, undefined, filename:basename(Filename)},
658+
Manifest = ManifestMod:handle_event(Event, Manifest0),
632659
#?MODULE{cfg = Cfg,
633660
mode =
634661
#write{type = WriterType,
635662
tail_info = {DefaultNextOffset, empty},
663+
manifest = Manifest,
636664
current_epoch = Epoch},
637665
current_file = filename:basename(Filename),
638666
fd = SegFd,
@@ -869,7 +897,7 @@ evaluate_tracking_snapshot(#?MODULE{mode = #write{type = writer}} = State0, Trk0
869897
-spec init_acceptor(range(), list(), config()) ->
870898
state().
871899
init_acceptor(Range, EpochOffsets0,
872-
#{name := Name, dir := Dir} = Conf) ->
900+
#{name := Name, dir := Dir} = Conf0) ->
873901
%% truncate to first common last epoch offset
874902
%% * if the last local chunk offset has the same epoch but is lower
875903
%% than the last chunk offset then just attach at next offset.
@@ -881,6 +909,8 @@ init_acceptor(Range, EpochOffsets0,
881909
lists:reverse(
882910
lists:sort(EpochOffsets0)),
883911

912+
{Conf, Manifest} = (manifest_module()):init_manifest(Conf0, acceptor),
913+
884914
%% then truncate to
885915
IdxFiles = sorted_index_files(Dir),
886916
?DEBUG_(Name, "from epoch offsets: ~w range ~w", [EpochOffsets, Range]),
@@ -891,7 +921,8 @@ init_acceptor(Range, EpochOffsets0,
891921
{O, _} -> O
892922
end,
893923
init(Conf#{initial_offset => InitOffset,
894-
index_files => RemIdxFiles}, acceptor).
924+
index_files => RemIdxFiles,
925+
manifest => Manifest}, acceptor).
895926

896927
chunk_id_index_scan(IdxFile, ChunkId)
897928
when ?IS_STRING(IdxFile) ->
@@ -1830,11 +1861,19 @@ needs_handling(_, _, _) ->
18301861

18311862
-spec close(state()) -> ok.
18321863
close(#?MODULE{cfg = #cfg{counter_id = CntId,
1864+
manifest_module = ManifestMod,
18331865
readers_counter_fun = Fun},
18341866
fd = SegFd,
1835-
index_fd = IdxFd}) ->
1867+
index_fd = IdxFd,
1868+
mode = Mode}) ->
18361869
close_fd(IdxFd),
18371870
close_fd(SegFd),
1871+
case Mode of
1872+
#write{manifest = Manifest} ->
1873+
ok = ManifestMod:close_manifest(Manifest);
1874+
_ ->
1875+
ok
1876+
end,
18381877
Fun(-1),
18391878
case CntId of
18401879
undefined ->
@@ -1843,14 +1882,17 @@ close(#?MODULE{cfg = #cfg{counter_id = CntId,
18431882
osiris_counters:delete(CntId)
18441883
end.
18451884

1846-
delete_directory(#{name := Name,
1847-
dir := _} = Config) ->
1885+
delete_directory(Config) ->
1886+
(manifest_module()):delete(Config).
1887+
1888+
delete(#{name := Name,
1889+
dir := _} = Config) ->
18481890
Dir = directory(Config),
18491891
?DEBUG_(Name, " deleting directory ~ts", [Dir]),
18501892
delete_dir(Dir);
1851-
delete_directory(#{name := Name}) ->
1852-
delete_directory(Name);
1853-
delete_directory(Name) when ?IS_STRING(Name) ->
1893+
delete(#{name := Name}) ->
1894+
delete(Name);
1895+
delete(Name) when ?IS_STRING(Name) ->
18541896
Dir = directory(Name),
18551897
?DEBUG_(Name, " deleting directory ~ts", [Dir]),
18561898
delete_dir(Dir).
@@ -2131,7 +2173,7 @@ build_segment_info(SegFile, LastChunkPos, IdxFile) ->
21312173
end.
21322174

21332175
-spec overview(file:filename_all()) ->
2134-
{range(), [{epoch(), offset()}]}.
2176+
{range(), epoch_offsets()}.
21352177
overview(Dir) ->
21362178
Files = list_dir(Dir),
21372179
%% index files with matching segment
@@ -2195,11 +2237,19 @@ format_status(#?MODULE{cfg = #cfg{directory = Dir,
21952237
-spec update_retention([retention_spec()], state()) -> state().
21962238
update_retention(Retention,
21972239
#?MODULE{cfg = #cfg{name = Name,
2240+
manifest_module = ?MODULE,
21982241
retention = Retention0} = Cfg} = State0)
21992242
when is_list(Retention) ->
22002243
?DEBUG_(Name, " from: ~w to ~w", [Retention0, Retention]),
22012244
State = State0#?MODULE{cfg = Cfg#cfg{retention = Retention}},
2202-
trigger_retention_eval(State).
2245+
trigger_retention_eval(State);
2246+
update_retention(Retention,
2247+
#?MODULE{cfg = #cfg{manifest_module = ManifestMod},
2248+
mode = #write{manifest = Manifest0} = Write0} =
2249+
State0) ->
2250+
Event = {retention_updated, Retention},
2251+
Manifest = ManifestMod:handle_event(Event, Manifest0),
2252+
State0#?MODULE{mode = Write0#write{manifest = Manifest}}.
22032253

22042254
-spec evaluate_retention(file:filename_all(), [retention_spec()]) ->
22052255
{range(), FirstTimestamp :: osiris:timestamp(),
@@ -2495,11 +2545,13 @@ write_chunk(Chunk,
24952545
Epoch,
24962546
NumRecords,
24972547
#?MODULE{cfg = #cfg{counter = CntRef,
2548+
manifest_module = ManifestMod,
24982549
shared = Shared} = Cfg,
24992550
fd = Fd,
25002551
index_fd = IdxFd,
25012552
mode =
25022553
#write{segment_size = {SegSizeBytes, SegSizeChunks},
2554+
manifest = Manifest0,
25032555
tail_info = {Next, _}} =
25042556
Write} =
25052557
State) ->
@@ -2529,9 +2581,19 @@ write_chunk(Chunk,
25292581
counters:put(CntRef, ?C_OFFSET, NextOffset - 1),
25302582
counters:add(CntRef, ?C_CHUNKS, 1),
25312583
maybe_set_first_offset(Next, Cfg),
2584+
ChunkInfo = #{id => Next,
2585+
timestamp => Timestamp,
2586+
epoch => Epoch,
2587+
num => NumRecords,
2588+
type => ChType,
2589+
size => Size,
2590+
pos => Cur},
2591+
Event = {chunk_written, ChunkInfo, Chunk},
2592+
Manifest = ManifestMod:handle_event(Event, Manifest0),
25322593
State#?MODULE{mode =
25332594
Write#write{tail_info = {NextOffset,
25342595
{Epoch, Next, Timestamp}},
2596+
manifest = Manifest,
25352597
segment_size = {SegSizeBytes + Size,
25362598
SegSizeChunks + 1}}}
25372599
end.
@@ -2740,10 +2802,13 @@ make_file_name(N, Suff) ->
27402802

27412803
open_new_segment(#?MODULE{cfg = #cfg{name = Name,
27422804
directory = Dir,
2743-
counter = Cnt},
2805+
counter = Cnt,
2806+
manifest_module = ManifestMod},
27442807
fd = OldFd,
27452808
index_fd = OldIdxFd,
2809+
current_file = OldFilename,
27462810
mode = #write{type = _WriterType,
2811+
manifest = Manifest0,
27472812
tail_info = {NextOffset, _}} = Write} =
27482813
State0) ->
27492814
_ = close_fd(OldFd),
@@ -2764,11 +2829,15 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name,
27642829
{ok, _} = file:position(IdxFd, eof),
27652830
counters:add(Cnt, ?C_SEGMENTS, 1),
27662831

2832+
Event = {segment_opened, OldFilename, Filename},
2833+
Manifest = ManifestMod:handle_event(Event, Manifest0),
2834+
27672835
State0#?MODULE{current_file = Filename,
27682836
fd = Fd,
2769-
%% reset segment_size counter
27702837
index_fd = IdxFd,
2771-
mode = Write#write{segment_size = {?LOG_HEADER_SIZE, 0}}}.
2838+
mode = Write#write{manifest = Manifest,
2839+
%% reset segment_size counter
2840+
segment_size = {?LOG_HEADER_SIZE, 0}}}.
27722841

27732842
open_index_read(File) ->
27742843
{ok, Fd} = open(File, [read, raw, binary, read_ahead]),
@@ -3455,6 +3524,9 @@ ra(#{options := #{read_ahead := Limit}}) when is_integer(Limit) ->
34553524
ra(_) ->
34563525
#ra{}.
34573526

3527+
manifest_module() ->
3528+
application:get_env(osiris, log_manifest, ?MODULE).
3529+
34583530
generate_log(Msg, MsgsPerChunk, NumMessages, Directory) ->
34593531
Name = filename:basename(Directory),
34603532

@@ -3487,6 +3559,14 @@ write_in_chunks(ToWrite, MsgsPerChunk, Msg, W0) when ToWrite > 0 ->
34873559
write_in_chunks(_, _, _, W) ->
34883560
W.
34893561

3562+
%% Default implementation of osiris_log_manifest:
3563+
init_manifest(Config, _WriterType) ->
3564+
{Config, undefined}.
3565+
handle_event(_Event, undefined) ->
3566+
undefined.
3567+
close_manifest(undefined) ->
3568+
ok.
3569+
34903570
-ifdef(TEST).
34913571
-include_lib("eunit/include/eunit.hrl").
34923572

src/osiris_log_manifest.erl

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
6+
%%
7+
8+
-module(osiris_log_manifest).
9+
10+
-type state() :: term().
11+
12+
-type chunk_info() ::
13+
#{id := osiris:offset(),
14+
timestamp := osiris:timestamp(),
15+
epoch := osiris:epoch(),
16+
num := non_neg_integer(),
17+
type := osiris_log:chunk_type(),
18+
%% size of data + filter + trailer
19+
size := non_neg_integer(),
20+
pos := integer()}.
21+
22+
-type event() :: {segment_opened,
23+
OldSegment :: file:filename_all() | undefined,
24+
NewSegment :: file:filename_all()} |
25+
{chunk_written, chunk_info(), iodata()} |
26+
{retention_updated, [osiris:retention_spec()]}.
27+
28+
-export_type([state/0,
29+
chunk_info/0,
30+
event/0]).
31+
32+
-callback overview(Dir :: file:filename_all()) ->
33+
{osiris_log:range(), osiris_log:epoch_offsets()}.
34+
35+
-callback init_manifest(osiris_log:config(), writer | acceptor) ->
36+
{osiris_log:config(), state()}.
37+
38+
-callback handle_event(event(), state()) -> state().
39+
40+
-callback close_manifest(state()) -> ok.
41+
42+
-callback delete(osiris_log:config()) -> ok.

0 commit comments

Comments
 (0)