Skip to content

Commit

Permalink
hopefully fixes #107; revamped update handlers, (client.On(...)), add…
Browse files Browse the repository at this point in the history
…ed nointent mode for client.JSON, and Marshal(), added example for how to use latest stars, v2.3.15
  • Loading branch information
AmarnathCJD committed Jul 10, 2024
1 parent 5864450 commit 62c5210
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 168 deletions.
14 changes: 7 additions & 7 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,12 +781,12 @@ func (*errorSessionConfigsChanged) CRC() uint32 {
return 0x00000000
}

//type unexpectedEOFError struct{}
type errorReconnectRequired struct{}

//func (*unexpectedEOFError) Error() string {
// return "unexpected error: unexpected EOF"
//}
func (*errorReconnectRequired) Error() string {
return "session configuration was changed, need to repeat request"
}

//func (*unexpectedEOFError) CRC() uint32 {
// return 0x00000000
//}
func (*errorReconnectRequired) CRC() uint32 {
return 0x00000000
}
76 changes: 76 additions & 0 deletions examples/stars/stars.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"fmt"

"github.com/amarnathcjd/gogram/telegram"
)

var paidUsers = make(map[int64]int32)

func main() {
bot, _ := telegram.NewClient(telegram.ClientConfig{
AppID: 123456,
AppHash: "YOUR_APP_HASH",
})

bot.Conn()
bot.LoginBot("YOUR_BOT_TOKEN")

bot.AddRawHandler(&telegram.UpdateBotPrecheckoutQuery{}, preCheckoutQueryUpd)
bot.On("edit", func(m *telegram.NewMessage) error {
paidUsers[m.SenderID()] = m.Invoice().ReceiptMsgID
return bot.E(m.Respond("Payment Successful!"))
}, telegram.FilterFunc(func(upd *telegram.NewMessage) bool {
return upd.Invoice() != nil && upd.Invoice().ReceiptMsgID != 0
}))

bot.On("message:pay", payCmd)
bot.On("message:status", statusCmd)
bot.On("message:refund", refundCmd) // new type of message handlers, eg<>
}

func payCmd(m *telegram.NewMessage) error {
invoice := telegram.InputMediaInvoice{
Title: "Test Product",
Description: "Test Description",
Payload: []byte(""),
Invoice: &telegram.Invoice{
Test: true,
Currency: "USD",
Prices: []*telegram.LabeledPrice{
{
Amount: 1,
Label: "1 USD",
},
},
},
ProviderData: &telegram.DataJson{},
}

m.ReplyMedia(&invoice)
return nil
}

func preCheckoutQueryUpd(upd telegram.Update, c *telegram.Client) error {
_upd := upd.(*telegram.UpdateBotPrecheckoutQuery)
c.MessagesSetBotPrecheckoutResults(true, _upd.QueryID, "Success")
return nil
}

func statusCmd(m *telegram.NewMessage) error {
if _, ok := paidUsers[m.SenderID()]; ok {
return m.E(m.Respond("You have paid!"))
}
return m.E(m.Respond("You have not paid!"))
}

func refundCmd(m *telegram.NewMessage) error {
if recpt_id, ok := paidUsers[m.SenderID()]; ok {
delete(paidUsers, m.SenderID())
u, _ := m.Client.GetSendableUser(m.SenderID())
m.Client.PaymentsRefundStarsCharge(u, fmt.Sprintf("%d", recpt_id))
return m.E(m.Respond("Refund Successful!"))
}
return m.E(m.Respond("You have not paid!"))
}
79 changes: 57 additions & 22 deletions mtproto.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/pkg/errors"
)

const defaultTimeout = 30 * time.Second
const defaultTimeout = 60 * time.Second // after 60 sec without any read/write, lib will try to reconnect

type MTProto struct {
Addr string
Expand Down Expand Up @@ -67,7 +67,8 @@ type MTProto struct {
serviceChannel chan tl.Object
serviceModeActivated bool

authKey404 []int64
authKey404 []int64
shouldTransfer bool

Logger *utils.Logger

Expand Down Expand Up @@ -133,6 +134,8 @@ func NewMTProto(c Config) (*MTProto, error) {
}

//mtproto.offsetTime()
go mtproto.checkBreaking()

return mtproto, nil
}

Expand Down Expand Up @@ -189,7 +192,7 @@ func (m *MTProto) ImportAuth(stringSession string) (bool, error) {
return false, err
}
m.authKey, m.authKeyHash, m.Addr, m.appID = sessionString.AuthKey(), sessionString.AuthKeyHash(), sessionString.IpAddr(), sessionString.AppID()
m.Logger.Debug("importing Auth from stringSession...")
m.Logger.Debug("importing - auth from stringSession...")
if !m.memorySession {
if err := m.SaveSession(); err != nil {
return false, fmt.Errorf("saving session: %w", err)
Expand All @@ -198,6 +201,10 @@ func (m *MTProto) ImportAuth(stringSession string) (bool, error) {
return true, nil
}

func (m *MTProto) SetTransfer(transfer bool) {
m.shouldTransfer = transfer
}

func (m *MTProto) GetDC() int {
return utils.SearchAddr(m.Addr)
}
Expand Down Expand Up @@ -271,9 +278,9 @@ func (m *MTProto) CreateConnection(withLog bool) error {
ctx, cancelfunc := context.WithCancel(context.Background())
m.stopRoutines = cancelfunc
if withLog {
m.Logger.Info("Connecting to [" + m.Addr + "] - <Tcp> ...")
m.Logger.Info(fmt.Sprintf("connecting to [%s] - <Tcp> ...", m.Addr))
} else {
m.Logger.Debug("Connecting to [" + m.Addr + "] - <Tcp> ...")
m.Logger.Debug("connecting to [" + m.Addr + "] - <Tcp> ...")
}
err := m.connect(ctx)
if err != nil {
Expand All @@ -282,15 +289,15 @@ func (m *MTProto) CreateConnection(withLog bool) error {
m.tcpActive = true
if withLog {
if m.proxy != nil && m.proxy.Host != "" {
m.Logger.Info("Connection to (~" + m.proxy.Host + ")[" + m.Addr + "] - <Tcp> established")
m.Logger.Info(fmt.Sprintf("connection to (~%s)[%s] - <Tcp> established", m.proxy.Host, m.Addr))
} else {
m.Logger.Info("Connection to [" + m.Addr + "] - <Tcp> established")
m.Logger.Info(fmt.Sprintf("connection to [%s] - <Tcp> established", m.Addr))
}
} else {
if m.proxy != nil && m.proxy.Host != "" {
m.Logger.Debug("Connection to (~" + m.proxy.Host + ")[" + m.Addr + "] - <Tcp> established")
m.Logger.Debug("connection to (~" + m.proxy.Host + ")[" + m.Addr + "] - <Tcp> established")
} else {
m.Logger.Debug("Connection to [" + m.Addr + "] - <Tcp> established")
m.Logger.Debug("connection to [" + m.Addr + "] - <Tcp> established")
}
}

Expand Down Expand Up @@ -323,13 +330,15 @@ func (m *MTProto) connect(ctx context.Context) error {
return fmt.Errorf("creating transport: %w", err)
}

m.SetTransfer(true)

go closeOnCancel(ctx, m.transport)
return nil
}

func (m *MTProto) makeRequest(data tl.Object, expectedTypes ...reflect.Type) (any, error) {
if !m.TcpActive() {
return nil, errors.New("can't make request. connection is not established")
_ = m.CreateConnection(false)
}
resp, err := m.sendPacket(data, expectedTypes...)
if err != nil {
Expand All @@ -346,16 +355,20 @@ func (m *MTProto) makeRequest(data tl.Object, expectedTypes ...reflect.Type) (an
response := <-resp
switch r := response.(type) {
case *objects.RpcError:
//if err := RpcErrorToNative(r).(*ErrResponseCode); strings.Contains(err.Message, "FLOOD_WAIT_") {
//m.Logger.Info("flood wait detected on '" + strings.ReplaceAll(reflect.TypeOf(data).Elem().Name(), "Params", "") + fmt.Sprintf("' request. sleeping for %s", (time.Duration(realErr.AdditionalInfo.(int))*time.Second).String()))
//time.Sleep(time.Duration(realErr.AdditionalInfo.(int)) * time.Second)
//return m.makeRequest(data, expectedTypes...) TODO: implement flood wait correctly
//}
// if err := RpcErrorToNative(r).(*ErrResponseCode); strings.Contains(err.Message, "FLOOD_WAIT_") {
// m.Logger.Info("flood wait detected on '" + strings.ReplaceAll(reflect.TypeOf(data).Elem().Name(), "Params", "") + fmt.Sprintf("' request. sleeping for %s", (time.Duration(realErr.AdditionalInfo.(int))*time.Second).String()))
// time.Sleep(time.Duration(realErr.AdditionalInfo.(int)) * time.Second)
// return m.makeRequest(data, expectedTypes...) TODO: implement flood wait correctly
// }
return nil, RpcErrorToNative(r)

case *errorSessionConfigsChanged:
m.Logger.Debug("session configs changed, resending request")
return m.makeRequest(data, expectedTypes...)

case *errorReconnectRequired:
m.Logger.Info("req info: " + fmt.Sprintf("%T", data))
return nil, errors.New("required to reconnect!")
}

return tl.UnwrapNativeTypes(response), nil
Expand All @@ -376,6 +389,12 @@ func (m *MTProto) TcpActive() bool {
func (m *MTProto) Disconnect() error {
m.stopRoutines()
m.tcpActive = false

for _, v := range m.responseChannels.Keys() {
ch, _ := m.responseChannels.Get(v)
ch <- &errorReconnectRequired{}
}

// m.responseChannels.Close()
return nil
}
Expand All @@ -394,18 +413,18 @@ func (m *MTProto) Reconnect(WithLogs bool) error {
return errors.Wrap(err, "disconnecting")
}
if WithLogs {
m.Logger.Info("Reconnecting to [" + m.Addr + "] - <Tcp> ...")
m.Logger.Info(fmt.Sprintf("reconnecting to [%s] - <Tcp> ...", m.Addr))
}

err = m.CreateConnection(WithLogs)
if err == nil && WithLogs {
m.Logger.Info("Reconnected to [" + m.Addr + "] - <Tcp> ...")
m.Logger.Info(fmt.Sprintf("reconnected to [%s] - <Tcp>", m.Addr))
}
m.InvokeRequestWithoutUpdate(&utils.PingParams{
PingID: 123456789,
})

m.MakeRequest(&utils.UpdatesGetStateParams{}) // to ask the server to send the updates
//m.MakeRequest(&utils.UpdatesGetStateParams{}) // to ask the server to send the updates
return errors.Wrap(err, "recreating connection")
}

Expand Down Expand Up @@ -567,7 +586,6 @@ messageTypeSwitching:
respChannelsBackup = m.responseChannels

m.responseChannels = utils.NewSyncIntObjectChan()

m.Reconnect(false)

for _, k := range respChannelsBackup.Keys() {
Expand All @@ -593,7 +611,7 @@ messageTypeSwitching:
if badMsg.Code == 16 || badMsg.Code == 17 {
m.offsetTime()
}
m.Logger.Debug("badMsgNotification: " + badMsg.Error())
m.Logger.Debug("bad-msg-notification: " + badMsg.Error())
return badMsg
case *objects.RpcResult:
obj := message.Obj
Expand All @@ -606,7 +624,7 @@ messageTypeSwitching:
if strings.Contains(err.Error(), "no response channel found") {
m.Logger.Error(errors.Wrap(err, "writing rpc response"))
} else {
return errors.Wrap(err, "writing RPC response")
return errors.Wrap(err, "writing rpc response")
}
}

Expand All @@ -625,7 +643,7 @@ messageTypeSwitching:
}
}
if !processed {
m.Logger.Debug("~ unhandled update: " + fmt.Sprintf("%T", message))
m.Logger.Debug("unhandled update: " + fmt.Sprintf("%T", message))
}
}

Expand Down Expand Up @@ -682,3 +700,20 @@ func closeOnCancel(ctx context.Context, c io.Closer) {
c.Close()
}()
}

func (m *MTProto) checkBreaking() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

for range ticker.C {
if m.shouldTransfer && !m.TcpActive() {
m.CreateConnection(false)
for i := 0; i < 2; i++ {
_, err := m.MakeRequest(&utils.UpdatesGetStateParams{})
if err == nil {
break
}
}
}
}
}
8 changes: 4 additions & 4 deletions telegram/callbackquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ func (b *CallbackQuery) ForwardTo(ChatID int64, options ...*ForwardOptions) (*Ne
return &m[0], nil
}

func (b *CallbackQuery) Marshal() string {
return b.Client.JSON(b.OriginalUpdate)
func (b *CallbackQuery) Marshal(nointent ...bool) string {
return b.Client.JSON(b.OriginalUpdate, nointent)
}

type InlineCallbackQuery struct {
Expand Down Expand Up @@ -255,6 +255,6 @@ func (b *InlineCallbackQuery) IsChannel() bool {
return b.ChatType() == EntityChannel
}

func (b *InlineCallbackQuery) Marshal() string {
return b.Client.JSON(b.OriginalUpdate)
func (b *InlineCallbackQuery) Marshal(nointent ...bool) string {
return b.Client.JSON(b.OriginalUpdate, nointent)
}
17 changes: 12 additions & 5 deletions telegram/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,6 @@ func NewClient(config ClientConfig) (*Client, error) {
client.Log.Debug("client is running in no updates mode, no updates will be handled")
} else {
client.setupDispatcher()
if client.IsConnected() {
// TODO: Implement same for manual connect Call.
// client.UpdatesGetState()
}
}
if err := client.clientWarnings(config); err != nil {
return nil, err
Expand Down Expand Up @@ -204,7 +200,7 @@ func (c *Client) clientWarnings(config ClientConfig) error {
}

func (c *Client) setupDispatcher() {
c.dispatcher = &UpdateDispatcher{}
c.NewUpdateDispatcher()
handleUpdaterWrapper := func(u any) bool {
return HandleIncomingUpdates(u, c)
}
Expand Down Expand Up @@ -549,6 +545,7 @@ func (c *Client) Idle() {
func (c *Client) Stop() error {
close(c.stopCh)
go c.cleanExportedSenders()
c.MTProto.SetTransfer(false) // to stop connection break check.
return c.MTProto.Terminate()
}

Expand All @@ -572,3 +569,13 @@ func (c *Client) WrapError(err error) error {
}
return err
}

// return only the object, omitting the error
func (c *Client) W(obj any, err error) any {
return obj
}

// return only the error, omitting the object
func (c *Client) E(obj any, err error) error {
return err
}
2 changes: 1 addition & 1 deletion telegram/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "regexp"

const (
ApiVersion = 184
Version = "v2.3.14"
Version = "v2.3.15"

LogDebug = "debug"
LogInfo = "info"
Expand Down
Loading

0 comments on commit 62c5210

Please sign in to comment.