-
Notifications
You must be signed in to change notification settings - Fork 154
Adding a New Java Client Library: Publisher
Each client library should get its own directory under java/com/google/pubsub/clients/. If adding a new client library for Cloud Pub/Sub, it should be named CPSPublisherTask, and KafkaPublisherTask for a Kafka client library.
Let us then look through an simplified and annotated version of com.google.pubsub.clients.gcloud.CPSPublisherTask:
class CPSPublisherTask extends Task {The class must extend from Task.
private CPSPublisherTask(StartRequest request) {
super(request, "gcloud", MetricsHandler.MetricName.PUBLISH_ACK_LATENCY);The constructor should take the start request, pass it to Task, and use any other information to initialize.
this.pubSub = PubSubOptions.builder()
.projectId(request.getProject())
.build().service();
this.topic = Preconditions.checkNotNull(request.getTopic());
this.payload = LoadTestRunner.createMessage(request.getMessageSize());
this.batchSize = request.getPublishBatchSize();We now initialize the client library and store some state for later use.
this.id = (new Random()).nextInt();
}We must create a random id for use in deduplication and completeness checking. There is no restriction on this id other than two clients must not share the same one, so using a random id is sufficient since we do not expect to have more than O(10s) of clients.
public static void main(String[] args) throws Exception {
LoadTestRunner.Options options = new LoadTestRunner.Options();
new JCommander(options, args);
LoadTestRunner.run(options, CPSPublisherTask::new);
}Every new client library task must have a main function like this.
@Override
public ListenableFuture<RunResult> doRun() {This is the core of any task. doRun should publish batchSize messages, and report the latency it took to do so.
try {
List<Message> messages = new ArrayList<>(batchSize);We construct the list of messages.
String sendTime = String.valueOf(System.currentTimeMillis());All published messages should have the same sendTime
for (int i = 0; i < batchSize; i++) {
messages.add(
Message.builder(payload)Contsruct batchSize messages.
.addAttribute("sendTime", sendTime)We set the sendTime for proper end to end latency reporting
.addAttribute("clientId", id.toString())We set the clientId
.addAttribute("sequenceNumber", Integer.toString(sequenceNumber.getAndIncrement()))The sequence number must atomically increase, and the first message must start at 0.
.build());
}
pubSub.publish(topic, messages);Publish the messages.
return Futures.immediateFuture(RunResult.fromBatchSize(batchSize));Return a listenable future. If this client library returned futures, we could use Futures.transform to do this, but for a simple library like this we can return an immediate future.
} catch (PubSubException e) {
return Futures.immediateFailedFuture(e);Do not throw an exception if it can possibly be recovered from, instead report it to the Task.
}
}
}