diff --git a/actions/app.go b/actions/app.go index 6ee52c59..e901bfb1 100644 --- a/actions/app.go +++ b/actions/app.go @@ -1,9 +1,10 @@ package actions import ( - "github.com/oysterprotocol/brokernode/utils" "os" + oyster_utils "github.com/oysterprotocol/brokernode/utils" + raven "github.com/getsentry/raven-go" "github.com/gobuffalo/buffalo" "github.com/gobuffalo/buffalo/middleware" @@ -103,6 +104,14 @@ func App() *buffalo.App { // Status statusResource := StatusResource{} apiV2.GET("status", statusResource.CheckStatus) + + apiV3 := app.Group("/api/v3") + uploadSessionV3Resource := UploadSessionResourceV3{} + // apiV2.Resource("/upload-sessions", &UploadSessionResource{&buffalo.BaseResource{}}) + apiV3.POST("upload-sessions", uploadSessionResource.Create) + apiV3.PUT("upload-sessions/{id}", uploadSessionV3Resource.Update) + apiV3.POST("upload-sessions/beta", uploadSessionResource.CreateBeta) + apiV3.GET("upload-sessions/{id}", uploadSessionResource.GetPaymentStatus) } oyster_utils.StartProfile() diff --git a/actions/test_helpers.go b/actions/test_helpers.go new file mode 100644 index 00000000..bd222aea --- /dev/null +++ b/actions/test_helpers.go @@ -0,0 +1,45 @@ +package actions + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/oysterprotocol/brokernode/services" +) + +type mockWaitForTransfer struct { + hasCalled bool + input_brokerAddr common.Address + output_int *big.Int + output_error error +} + +type mockSendPrl struct { + hasCalled bool + input_msg services.OysterCallMsg + output_bool bool +} + +type mockCheckPRLBalance struct { + hasCalled bool + input_addr common.Address + output_int *big.Int +} + +func (v *mockWaitForTransfer) waitForTransfer(brokerAddr common.Address, transferType string) (*big.Int, error) { + v.hasCalled = true + v.input_brokerAddr = brokerAddr + return v.output_int, v.output_error +} + +func (v *mockSendPrl) sendPrl(msg services.OysterCallMsg) bool { + v.hasCalled = true + v.input_msg = msg + return v.output_bool +} + +func (v *mockCheckPRLBalance) checkPRLBalance(addr common.Address) *big.Int { + v.hasCalled = true + v.input_addr = addr + return v.output_int +} diff --git a/actions/upload_sessions.go b/actions/upload_sessions.go index af121296..995cb8a4 100644 --- a/actions/upload_sessions.go +++ b/actions/upload_sessions.go @@ -13,9 +13,9 @@ import ( "github.com/gobuffalo/pop/nulls" "github.com/oysterprotocol/brokernode/models" "github.com/oysterprotocol/brokernode/services" - "github.com/oysterprotocol/brokernode/utils" + oyster_utils "github.com/oysterprotocol/brokernode/utils" "github.com/pkg/errors" - "gopkg.in/segmentio/analytics-go.v3" + analytics "gopkg.in/segmentio/analytics-go.v3" ) type UploadSessionResource struct { diff --git a/actions/upload_sessions_test.go b/actions/upload_sessions_test.go index 98f58f50..4210dbf2 100644 --- a/actions/upload_sessions_test.go +++ b/actions/upload_sessions_test.go @@ -3,36 +3,17 @@ package actions import ( "encoding/json" "fmt" - "github.com/oysterprotocol/brokernode/utils" "io/ioutil" "math/big" "time" - "github.com/ethereum/go-ethereum/common" + oyster_utils "github.com/oysterprotocol/brokernode/utils" + "github.com/gobuffalo/pop/nulls" "github.com/oysterprotocol/brokernode/models" "github.com/oysterprotocol/brokernode/services" ) -type mockWaitForTransfer struct { - hasCalled bool - input_brokerAddr common.Address - output_int *big.Int - output_error error -} - -type mockSendPrl struct { - hasCalled bool - input_msg services.OysterCallMsg - output_bool bool -} - -type mockCheckPRLBalance struct { - hasCalled bool - input_addr common.Address - output_int *big.Int -} - func (suite *ActionSuite) Test_UploadSessionsCreate() { mockWaitForTransfer := mockWaitForTransfer{ output_error: nil, @@ -311,21 +292,3 @@ func verifyPaymentConfirmation(sessionId string, suite *ActionSuite) { suite.Nil(err) suite.Equal(models.PaymentStatusConfirmed, session.PaymentStatus) } - -func (v *mockWaitForTransfer) waitForTransfer(brokerAddr common.Address, transferType string) (*big.Int, error) { - v.hasCalled = true - v.input_brokerAddr = brokerAddr - return v.output_int, v.output_error -} - -func (v *mockSendPrl) sendPrl(msg services.OysterCallMsg) bool { - v.hasCalled = true - v.input_msg = msg - return v.output_bool -} - -func (v *mockCheckPRLBalance) checkPRLBalance(addr common.Address) *big.Int { - v.hasCalled = true - v.input_addr = addr - return v.output_int -} diff --git a/actions/upload_sessions_v3.go b/actions/upload_sessions_v3.go new file mode 100644 index 00000000..931cad20 --- /dev/null +++ b/actions/upload_sessions_v3.go @@ -0,0 +1,91 @@ +package actions + +import ( + "fmt" + "time" + + "github.com/gobuffalo/buffalo" + "github.com/oysterprotocol/brokernode/models" + oyster_utils "github.com/oysterprotocol/brokernode/utils" + "github.com/pkg/errors" + analytics "gopkg.in/segmentio/analytics-go.v3" +) + +type UploadSessionResourceV3 struct { + buffalo.Resource +} + +// Request Response structs + +type UploadSessionUpdateReqV3 struct { + Chunks []models.ChunkReq `json:"chunks"` +} + +func init() { + +} + +// Update uploads a chunk associated with an upload session. +func (usr *UploadSessionResourceV3) Update(c buffalo.Context) error { + start := PrometheusWrapper.TimeNow() + defer PrometheusWrapper.HistogramSeconds(PrometheusWrapper.HistogramUploadSessionResourceUpdate, start) + + req := UploadSessionUpdateReqV3{} + if err := oyster_utils.ParseReqBody(c.Request(), &req); err != nil { + err = fmt.Errorf("Invalid request, unable to parse request body %v", err) + c.Error(400, err) + return err + } + + // Get session + uploadSession := &models.UploadSession{} + err := models.DB.Find(uploadSession, c.Param("id")) + + defer oyster_utils.TimeTrack(time.Now(), "actions/upload_sessions: updating_session", analytics.NewProperties(). + Set("id", uploadSession.ID). + Set("genesis_hash", uploadSession.GenesisHash). + Set("file_size_byes", uploadSession.FileSizeBytes). + Set("num_chunks", uploadSession.NumChunks). + Set("storage_years", uploadSession.StorageLengthInYears)) + + if err != nil { + oyster_utils.LogIfError(err, nil) + c.Error(400, err) + return err + } + if uploadSession == nil { + err := errors.New("Error finding sessions") + oyster_utils.LogIfError(err, nil) + c.Error(400, err) + return err + } + + treasureIdxMap, err := uploadSession.GetTreasureIndexes() + + if oyster_utils.DataMapStorageMode == oyster_utils.DataMapsInBadger { + dbID := []string{oyster_utils.InProgressDir, uploadSession.GenesisHash, oyster_utils.MessageDir} + + db := oyster_utils.GetOrInitUniqueBadgerDB(dbID) + if db == nil { + err := errors.New("error creating unique badger DB for messages") + oyster_utils.LogIfError(err, nil) + c.Error(400, err) + return err + } + } + + // Update dMaps to have chunks async + go func() { + defer oyster_utils.TimeTrack(time.Now(), "actions/upload_sessions: async_datamap_updates", analytics.NewProperties(). + Set("id", uploadSession.ID). + Set("genesis_hash", uploadSession.GenesisHash). + Set("file_size_byes", uploadSession.FileSizeBytes). + Set("num_chunks", uploadSession.NumChunks). + Set("storage_years", uploadSession.StorageLengthInYears)) + + models.ProcessAndStoreChunkData(req.Chunks, uploadSession.GenesisHash, treasureIdxMap, + models.DataMapsTimeToLive) + }() + + return c.Render(202, r.JSON(map[string]bool{"success": true})) +} diff --git a/actions/upload_sessions_v3_test.go b/actions/upload_sessions_v3_test.go new file mode 100644 index 00000000..111e3fe1 --- /dev/null +++ b/actions/upload_sessions_v3_test.go @@ -0,0 +1,161 @@ +package actions + +import ( + "encoding/json" + "io/ioutil" + "math/big" + "time" + + oyster_utils "github.com/oysterprotocol/brokernode/utils" + + "github.com/oysterprotocol/brokernode/models" + "github.com/oysterprotocol/brokernode/services" +) + +func (suite *ActionSuite) Test_UploadSessionsV3Create() { + mockWaitForTransfer := mockWaitForTransfer{ + output_error: nil, + output_int: big.NewInt(100), + } + mockSendPrl := mockSendPrl{ + output_bool: true, + } + EthWrapper = services.Eth{ + WaitForTransfer: mockWaitForTransfer.waitForTransfer, + SendPRL: mockSendPrl.sendPrl, + GenerateEthAddr: services.EthWrapper.GenerateEthAddr, + GenerateKeys: services.EthWrapper.GenerateKeys, + } + + genHash := oyster_utils.RandSeq(8, []rune("abcdef0123456789")) + + res := suite.JSON("/api/v3/upload-sessions").Post(map[string]interface{}{ + "genesisHash": genHash, + "fileSizeBytes": 123, + "numChunks": 2, + "storageLengthInYears": 1, + }) + + // Parse response + resParsed := uploadSessionCreateRes{} + bodyBytes, err := ioutil.ReadAll(res.Body) + suite.Nil(err) + err = json.Unmarshal(bodyBytes, &resParsed) + suite.Nil(err) + + suite.Equal(200, res.Code) + suite.Equal(genHash, resParsed.UploadSession.GenesisHash) + suite.Equal(uint64(123), resParsed.UploadSession.FileSizeBytes) + suite.Equal(models.SessionTypeAlpha, resParsed.UploadSession.Type) + suite.NotEqual(0, resParsed.Invoice.Cost) + suite.NotEqual("", resParsed.Invoice.EthAddress) + + time.Sleep(50 * time.Millisecond) // Force it to wait for goroutine to excute. + + // TODO: fix waitForTransfer and uncomment it out in + // actions/upload_sessions.go then uncomment out these tests. + //suite.True(mockWaitForTransfer.hasCalled) + //suite.Equal(services.StringToAddress(resParsed.UploadSession.ETHAddrAlpha.String), mockWaitForTransfer.input_brokerAddr) + + // mockCheckPRLBalance will result a positive value, and Alpha knows that beta has such balance, it won't send + // it again. + suite.False(mockSendPrl.hasCalled) + + // TODO: fix waitForTransfer and uncomment it out in + // actions/upload_sessions.go then uncomment out these tests. + // verifyPaymentConfirmation(as, resParsed.ID) + + chunkData := models.GetSingleChunkData(oyster_utils.InProgressDir, genHash, int64(0)) + + suite.Equal(genHash, chunkData.Hash) + + brokerTx := []models.BrokerBrokerTransaction{} + + suite.DB.Where("genesis_hash = ?", genHash).All(&brokerTx) + + suite.Equal(1, len(brokerTx)) +} + +func (suite *ActionSuite) Test_UploadSessionsV3CreateBeta() { + mockWaitForTransfer := mockWaitForTransfer{ + output_error: nil, + output_int: big.NewInt(100), + } + mockSendPrl := mockSendPrl{} + + EthWrapper = services.Eth{ + WaitForTransfer: mockWaitForTransfer.waitForTransfer, + SendPRL: mockSendPrl.sendPrl, + GenerateEthAddr: services.EthWrapper.GenerateEthAddr, + GenerateKeys: services.EthWrapper.GenerateKeys, + } + + genHash := oyster_utils.RandSeq(8, []rune("abcdef0123456789")) + + res := suite.JSON("/api/v3/upload-sessions/beta").Post(map[string]interface{}{ + "genesisHash": genHash, + "fileSizeBytes": 123, + "numChunks": 2, + "storageLengthInYears": 1, + "alphaTreasureIndexes": []int{1}, + }) + + // Parse response + resParsed := uploadSessionCreateBetaRes{} + bodyBytes, err := ioutil.ReadAll(res.Body) + suite.Nil(err) + err = json.Unmarshal(bodyBytes, &resParsed) + suite.Nil(err) + + suite.Equal(200, res.Code) + suite.Equal(genHash, resParsed.UploadSession.GenesisHash) + suite.Equal(uint64(123), resParsed.UploadSession.FileSizeBytes) + suite.Equal(models.SessionTypeBeta, resParsed.UploadSession.Type) + suite.Equal(1, len(resParsed.BetaTreasureIndexes)) + suite.NotEqual(0, resParsed.Invoice.Cost) + suite.NotEqual("", resParsed.Invoice.EthAddress) + + time.Sleep(50 * time.Millisecond) // Force it to wait for goroutine to excute. + + // TODO: fix waitForTransfer and uncomment it out in + // actions/upload_sessions.go then uncomment out this test. + //suite.True(mockWaitForTransfer.hasCalled) + suite.Equal(services.StringToAddress(resParsed.UploadSession.ETHAddrAlpha.String), mockWaitForTransfer.input_brokerAddr) + suite.False(mockSendPrl.hasCalled) + + // TODO: fix waitForTransfer and uncomment it out in + // actions/upload_sessions.go then uncomment out these tests. + // verifyPaymentConfirmation(as, resParsed.ID) + + chunkData := models.GetSingleChunkData(oyster_utils.InProgressDir, genHash, int64(0)) + + suite.Equal(genHash, chunkData.Hash) + + brokerTx := []models.BrokerBrokerTransaction{} + + suite.DB.Where("genesis_hash = ?", genHash).All(&brokerTx) + + suite.Equal(1, len(brokerTx)) +} + +func (suite *ActionSuite) Test_UploadSessionsV3GetPaymentStatus_Paid() { + //setup + mockCheckPRLBalance := mockCheckPRLBalance{} + EthWrapper = services.Eth{ + CheckPRLBalance: mockCheckPRLBalance.checkPRLBalance, + } + + genHash := oyster_utils.RandSeq(8, []rune("abcdef0123456789")) + + uploadSession1 := models.UploadSession{ + GenesisHash: genHash, + FileSizeBytes: 123, + NumChunks: 2, + PaymentStatus: models.PaymentStatusConfirmed, + } + + resParsed := getPaymentStatus(uploadSession1, suite) + + suite.Equal("confirmed", resParsed.PaymentStatus) + suite.False(mockCheckPRLBalance.hasCalled) +} diff --git a/main.go b/main.go index 19a12e29..5cefba0b 100644 --- a/main.go +++ b/main.go @@ -2,15 +2,16 @@ package main import ( "fmt" - "github.com/fatih/color" - "github.com/oysterprotocol/brokernode/actions" - "github.com/oysterprotocol/brokernode/utils" "log" "math/rand" "os" "runtime" "time" + "github.com/fatih/color" + "github.com/oysterprotocol/brokernode/actions" + oyster_utils "github.com/oysterprotocol/brokernode/utils" + "github.com/gobuffalo/pop" "github.com/gobuffalo/pop/logging" ) diff --git a/utils/mode.go b/utils/mode.go index 63c99e46..d26edbf5 100644 --- a/utils/mode.go +++ b/utils/mode.go @@ -48,6 +48,8 @@ const ( DataMapsInSQL DataMapsStorageStatus = iota + 1 /*DataMapsInBadger is when the data maps are stored in badger*/ DataMapsInBadger + /*DataMapsInS3 is when the data maps are stored in S3*/ + DataMapsInS3 ) /*BrokerMode - the mode the broker is in (prod, dummy treasure, etc.)*/