Skip to content

Commit 617c8c7

Browse files
author
twitter-team
committed
Open-sourcing Unified User Actions
Unified User Action (UUA) is a centralized, real-time stream of user actions on Twitter, consumed by various product, ML, and marketing teams. UUA makes sure all internal teams consume the uniformed user actions data in an accurate and fast way.
1 parent f1b5c32 commit 617c8c7

File tree

250 files changed

+25277
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

250 files changed

+25277
-0
lines changed

unified_user_actions/.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
.DS_Store
2+
CONFIG.ini
3+
PROJECT
4+
docs

unified_user_actions/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# This prevents SQ query from grabbing //:all since it traverses up once to find a BUILD

unified_user_actions/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Unified User Actions (UUA)
2+
3+
**Unified User Actions** (UUA) is a centralized, real-time stream of user actions on Twitter, consumed by various product, ML, and marketing teams. UUA reads client-side and server-side event streams that contain the user's actions and generates a unified real-time user actions Kafka stream. The Kafka stream is replicated to HDFS, GCP Pubsub, GCP GCS, GCP BigQuery. The user actions include public actions such as favorites, retweets, replies and implicit actions like bookmark, impression, video view.
4+
5+
## Components
6+
7+
- adapter: transform the raw inputs to UUA Thrift output
8+
- client: Kafka client related utils
9+
- kafka: more specific Kafka utils like customized serde
10+
- service: deployment, modules and services
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.twitter.unified_user_actions.adapter
2+
3+
import com.twitter.finagle.stats.NullStatsReceiver
4+
import com.twitter.finagle.stats.StatsReceiver
5+
6+
trait AbstractAdapter[INPUT, OUTK, OUTV] extends Serializable {
7+
8+
/**
9+
* The basic input -> seq[output] adapter which concrete adapters should extend from
10+
* @param input a single INPUT
11+
* @return A list of (OUTK, OUTV) tuple. The OUTK is the output key mainly for publishing to Kafka (or Pubsub).
12+
* If other processing, e.g. offline batch processing, doesn't require the output key then it can drop it
13+
* like source.adaptOneToKeyedMany.map(_._2)
14+
*/
15+
def adaptOneToKeyedMany(
16+
input: INPUT,
17+
statsReceiver: StatsReceiver = NullStatsReceiver
18+
): Seq[(OUTK, OUTV)]
19+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
scala_library(
2+
name = "base",
3+
sources = [
4+
"AbstractAdapter.scala",
5+
],
6+
compiler_option_sets = ["fatal_warnings"],
7+
tags = ["bazel-compatible"],
8+
dependencies = [
9+
"util/util-stats/src/main/scala/com/twitter/finagle/stats",
10+
],
11+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package com.twitter.unified_user_actions.adapter.ads_callback_engagements
2+
3+
import com.twitter.ads.spendserver.thriftscala.SpendServerEvent
4+
import com.twitter.unified_user_actions.thriftscala._
5+
6+
object AdsCallbackEngagement {
7+
object PromotedTweetFav extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetFav)
8+
9+
object PromotedTweetUnfav extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetUnfav)
10+
11+
object PromotedTweetReply extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetReply)
12+
13+
object PromotedTweetRetweet
14+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetRetweet)
15+
16+
object PromotedTweetBlockAuthor
17+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetBlockAuthor)
18+
19+
object PromotedTweetUnblockAuthor
20+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetUnblockAuthor)
21+
22+
object PromotedTweetComposeTweet
23+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetComposeTweet)
24+
25+
object PromotedTweetClick extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetClick)
26+
27+
object PromotedTweetReport extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetReport)
28+
29+
object PromotedProfileFollow
30+
extends ProfileAdsCallbackEngagement(ActionType.ServerPromotedProfileFollow)
31+
32+
object PromotedProfileUnfollow
33+
extends ProfileAdsCallbackEngagement(ActionType.ServerPromotedProfileUnfollow)
34+
35+
object PromotedTweetMuteAuthor
36+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetMuteAuthor)
37+
38+
object PromotedTweetClickProfile
39+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetClickProfile)
40+
41+
object PromotedTweetClickHashtag
42+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetClickHashtag)
43+
44+
object PromotedTweetOpenLink
45+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetOpenLink) {
46+
override def getItem(input: SpendServerEvent): Option[Item] = {
47+
input.engagementEvent.flatMap { e =>
48+
e.impressionData.flatMap { i =>
49+
getPromotedTweetInfo(
50+
i.promotedTweetId,
51+
i.advertiserId,
52+
tweetActionInfoOpt = Some(
53+
TweetActionInfo.ServerPromotedTweetOpenLink(
54+
ServerPromotedTweetOpenLink(url = e.url))))
55+
}
56+
}
57+
}
58+
}
59+
60+
object PromotedTweetCarouselSwipeNext
61+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetCarouselSwipeNext)
62+
63+
object PromotedTweetCarouselSwipePrevious
64+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetCarouselSwipePrevious)
65+
66+
object PromotedTweetLingerImpressionShort
67+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetLingerImpressionShort)
68+
69+
object PromotedTweetLingerImpressionMedium
70+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetLingerImpressionMedium)
71+
72+
object PromotedTweetLingerImpressionLong
73+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetLingerImpressionLong)
74+
75+
object PromotedTweetClickSpotlight
76+
extends BaseTrendAdsCallbackEngagement(ActionType.ServerPromotedTweetClickSpotlight)
77+
78+
object PromotedTweetViewSpotlight
79+
extends BaseTrendAdsCallbackEngagement(ActionType.ServerPromotedTweetViewSpotlight)
80+
81+
object PromotedTrendView
82+
extends BaseTrendAdsCallbackEngagement(ActionType.ServerPromotedTrendView)
83+
84+
object PromotedTrendClick
85+
extends BaseTrendAdsCallbackEngagement(ActionType.ServerPromotedTrendClick)
86+
87+
object PromotedTweetVideoPlayback25
88+
extends BaseVideoAdsCallbackEngagement(ActionType.ServerPromotedTweetVideoPlayback25)
89+
90+
object PromotedTweetVideoPlayback50
91+
extends BaseVideoAdsCallbackEngagement(ActionType.ServerPromotedTweetVideoPlayback50)
92+
93+
object PromotedTweetVideoPlayback75
94+
extends BaseVideoAdsCallbackEngagement(ActionType.ServerPromotedTweetVideoPlayback75)
95+
96+
object PromotedTweetVideoAdPlayback25
97+
extends BaseVideoAdsCallbackEngagement(ActionType.ServerPromotedTweetVideoAdPlayback25)
98+
99+
object PromotedTweetVideoAdPlayback50
100+
extends BaseVideoAdsCallbackEngagement(ActionType.ServerPromotedTweetVideoAdPlayback50)
101+
102+
object PromotedTweetVideoAdPlayback75
103+
extends BaseVideoAdsCallbackEngagement(ActionType.ServerPromotedTweetVideoAdPlayback75)
104+
105+
object TweetVideoAdPlayback25
106+
extends BaseVideoAdsCallbackEngagement(ActionType.ServerTweetVideoAdPlayback25)
107+
108+
object TweetVideoAdPlayback50
109+
extends BaseVideoAdsCallbackEngagement(ActionType.ServerTweetVideoAdPlayback50)
110+
111+
object TweetVideoAdPlayback75
112+
extends BaseVideoAdsCallbackEngagement(ActionType.ServerTweetVideoAdPlayback75)
113+
114+
object PromotedTweetDismissWithoutReason
115+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetDismissWithoutReason)
116+
117+
object PromotedTweetDismissUninteresting
118+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetDismissUninteresting)
119+
120+
object PromotedTweetDismissRepetitive
121+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetDismissRepetitive)
122+
123+
object PromotedTweetDismissSpam
124+
extends BaseAdsCallbackEngagement(ActionType.ServerPromotedTweetDismissSpam)
125+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.twitter.unified_user_actions.adapter.ads_callback_engagements
2+
3+
import com.twitter.finagle.stats.NullStatsReceiver
4+
import com.twitter.finagle.stats.StatsReceiver
5+
import com.twitter.finatra.kafka.serde.UnKeyed
6+
import com.twitter.unified_user_actions.adapter.AbstractAdapter
7+
import com.twitter.ads.spendserver.thriftscala.SpendServerEvent
8+
import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction
9+
10+
class AdsCallbackEngagementsAdapter
11+
extends AbstractAdapter[SpendServerEvent, UnKeyed, UnifiedUserAction] {
12+
13+
import AdsCallbackEngagementsAdapter._
14+
15+
override def adaptOneToKeyedMany(
16+
input: SpendServerEvent,
17+
statsReceiver: StatsReceiver = NullStatsReceiver
18+
): Seq[(UnKeyed, UnifiedUserAction)] =
19+
adaptEvent(input).map { e => (UnKeyed, e) }
20+
}
21+
22+
object AdsCallbackEngagementsAdapter {
23+
def adaptEvent(input: SpendServerEvent): Seq[UnifiedUserAction] = {
24+
val baseEngagements: Seq[BaseAdsCallbackEngagement] =
25+
EngagementTypeMappings.getEngagementMappings(Option(input).flatMap(_.engagementEvent))
26+
baseEngagements.flatMap(_.getUUA(input))
27+
}
28+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
scala_library(
2+
sources = [
3+
"*.scala",
4+
],
5+
compiler_option_sets = ["fatal_warnings"],
6+
tags = [
7+
"bazel-compatible",
8+
"bazel-only",
9+
],
10+
dependencies = [
11+
"kafka/finagle-kafka/finatra-kafka/src/main/scala",
12+
"src/thrift/com/twitter/ads/billing/spendserver:spendserver_thrift-scala",
13+
"src/thrift/com/twitter/ads/eventstream:eventstream-scala",
14+
"unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter:base",
15+
"unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/common",
16+
"unified_user_actions/thrift/src/main/thrift/com/twitter/unified_user_actions:unified_user_actions-scala",
17+
],
18+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package com.twitter.unified_user_actions.adapter.ads_callback_engagements
2+
3+
import com.twitter.ads.spendserver.thriftscala.SpendServerEvent
4+
import com.twitter.unified_user_actions.adapter.common.AdapterUtils
5+
import com.twitter.unified_user_actions.thriftscala.ActionType
6+
import com.twitter.unified_user_actions.thriftscala.AuthorInfo
7+
import com.twitter.unified_user_actions.thriftscala.EventMetadata
8+
import com.twitter.unified_user_actions.thriftscala.Item
9+
import com.twitter.unified_user_actions.thriftscala.SourceLineage
10+
import com.twitter.unified_user_actions.thriftscala.TweetInfo
11+
import com.twitter.unified_user_actions.thriftscala.TweetActionInfo
12+
import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction
13+
import com.twitter.unified_user_actions.thriftscala.UserIdentifier
14+
15+
abstract class BaseAdsCallbackEngagement(actionType: ActionType) {
16+
17+
protected def getItem(input: SpendServerEvent): Option[Item] = {
18+
input.engagementEvent.flatMap { e =>
19+
e.impressionData.flatMap { i =>
20+
getPromotedTweetInfo(i.promotedTweetId, i.advertiserId)
21+
}
22+
}
23+
}
24+
25+
protected def getPromotedTweetInfo(
26+
promotedTweetIdOpt: Option[Long],
27+
advertiserId: Long,
28+
tweetActionInfoOpt: Option[TweetActionInfo] = None
29+
): Option[Item] = {
30+
promotedTweetIdOpt.map { promotedTweetId =>
31+
Item.TweetInfo(
32+
TweetInfo(
33+
actionTweetId = promotedTweetId,
34+
actionTweetAuthorInfo = Some(AuthorInfo(authorId = Some(advertiserId))),
35+
tweetActionInfo = tweetActionInfoOpt)
36+
)
37+
}
38+
}
39+
40+
def getUUA(input: SpendServerEvent): Option[UnifiedUserAction] = {
41+
val userIdentifier: UserIdentifier =
42+
UserIdentifier(
43+
userId = input.engagementEvent.flatMap(e => e.clientInfo.flatMap(_.userId64)),
44+
guestIdMarketing = input.engagementEvent.flatMap(e => e.clientInfo.flatMap(_.guestId)),
45+
)
46+
47+
getItem(input).map { item =>
48+
UnifiedUserAction(
49+
userIdentifier = userIdentifier,
50+
item = item,
51+
actionType = actionType,
52+
eventMetadata = getEventMetadata(input),
53+
)
54+
}
55+
}
56+
57+
protected def getEventMetadata(input: SpendServerEvent): EventMetadata =
58+
EventMetadata(
59+
sourceTimestampMs = input.engagementEvent
60+
.map { e => e.engagementEpochTimeMilliSec }.getOrElse(AdapterUtils.currentTimestampMs),
61+
receivedTimestampMs = AdapterUtils.currentTimestampMs,
62+
sourceLineage = SourceLineage.ServerAdsCallbackEngagements,
63+
language = input.engagementEvent.flatMap { e => e.clientInfo.flatMap(_.languageCode) },
64+
countryCode = input.engagementEvent.flatMap { e => e.clientInfo.flatMap(_.countryCode) },
65+
clientAppId =
66+
input.engagementEvent.flatMap { e => e.clientInfo.flatMap(_.clientId) }.map { _.toLong },
67+
)
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.twitter.unified_user_actions.adapter.ads_callback_engagements
2+
3+
import com.twitter.ads.spendserver.thriftscala.SpendServerEvent
4+
import com.twitter.unified_user_actions.thriftscala._
5+
6+
abstract class BaseTrendAdsCallbackEngagement(actionType: ActionType)
7+
extends BaseAdsCallbackEngagement(actionType = actionType) {
8+
9+
override protected def getItem(input: SpendServerEvent): Option[Item] = {
10+
input.engagementEvent.flatMap { e =>
11+
e.impressionData.flatMap { i =>
12+
i.promotedTrendId.map { promotedTrendId =>
13+
Item.TrendInfo(TrendInfo(actionTrendId = promotedTrendId))
14+
}
15+
}
16+
}
17+
}
18+
}

0 commit comments

Comments
 (0)