Skip to content

Commit

Permalink
Merge pull request #233 from crawlab-team/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
wo10378931 authored Sep 30, 2019
2 parents c16254a + 2199765 commit aeeab51
Show file tree
Hide file tree
Showing 42 changed files with 1,012 additions and 733 deletions.
9 changes: 9 additions & 0 deletions backend/constants/channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package constants

const (
ChannelAllNode = "nodes:public"

ChannelWorkerNode = "nodes:"

ChannelMasterNode = "nodes:master"
)
1 change: 1 addition & 0 deletions backend/constants/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ const (
MsgTypeGetSystemInfo = "get-sys-info"
MsgTypeCancelTask = "cancel-task"
MsgTypeRemoveLog = "remove-log"
MsgTypeRemoveSpider = "remove-spider"
)
15 changes: 11 additions & 4 deletions backend/constants/task.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package constants

const (
StatusPending string = "pending"
StatusRunning string = "running"
StatusFinished string = "finished"
StatusError string = "error"
// 调度中
StatusPending string = "pending"
// 运行中
StatusRunning string = "running"
// 已完成
StatusFinished string = "finished"
// 错误
StatusError string = "error"
// 取消
StatusCancelled string = "cancelled"
// 节点重启导致的异常终止
StatusAbnormal string = "abnormal"
)

const (
Expand Down
1 change: 0 additions & 1 deletion backend/database/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s
done <- fmt.Errorf("redis pubsub receive err: %v", msg)
return
case redis.Message:
fmt.Println(msg)
if err := consume(msg); err != nil {
fmt.Printf("redis pubsub consume message err: %v", err)
continue
Expand Down
17 changes: 17 additions & 0 deletions backend/entity/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package entity

import "strconv"

type Page struct {
Skip int
Limit int
PageNum int
PageSize int
}

func (p *Page)GetPage(pageNum string, pageSize string) {
p.PageNum, _ = strconv.Atoi(pageNum)
p.PageSize, _ = strconv.Atoi(pageSize)
p.Skip = p.PageSize * (p.PageNum - 1)
p.Limit = p.PageSize
}
25 changes: 25 additions & 0 deletions backend/entity/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package entity

type NodeMessage struct {
// 通信类别
Type string `json:"type"`

// 任务相关
TaskId string `json:"task_id"` // 任务ID

// 节点相关
NodeId string `json:"node_id"` // 节点ID

// 日志相关
LogPath string `json:"log_path"` // 日志路径
Log string `json:"log"` // 日志

// 系统信息
SysInfo SystemInfo `json:"sys_info"`

// 爬虫相关
SpiderId string `json:"spider_id"` //爬虫ID

// 错误相关
Error string `json:"error"`
}
6 changes: 6 additions & 0 deletions backend/entity/spider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package entity

type SpiderType struct {
Type string `json:"type" bson:"_id"`
Count int `json:"count" bson:"count"`
}
15 changes: 15 additions & 0 deletions backend/entity/system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package entity

type SystemInfo struct {
ARCH string `json:"arch"`
OS string `json:"os"`
Hostname string `json:"host_name"`
NumCpu int `json:"num_cpu"`
Executables []Executable `json:"executables"`
}

type Executable struct {
Path string `json:"path"`
FileName string `json:"file_name"`
DisplayName string `json:"display_name"`
}
6 changes: 4 additions & 2 deletions backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crawlab/database"
"crawlab/lib/validate_bridge"
"crawlab/middlewares"
"crawlab/model"
"crawlab/routes"
"crawlab/services"
"github.com/apex/log"
Expand Down Expand Up @@ -57,7 +58,7 @@ func main() {
}
log.Info("初始化Redis数据库成功")

if services.IsMaster() {
if model.IsMaster() {
// 初始化定时任务
if err := services.InitScheduler(); err != nil {
log.Error("init scheduler error:" + err.Error())
Expand Down Expand Up @@ -99,7 +100,7 @@ func main() {
log.Info("初始化用户服务成功")

// 以下为主节点服务
if services.IsMaster() {
if model.IsMaster() {
// 中间件
app.Use(middlewares.CORSMiddleware())
//app.Use(middlewares.AuthorizationMiddleware())
Expand Down Expand Up @@ -131,6 +132,7 @@ func main() {
authGroup.POST("/spiders/:id/file", routes.PostSpiderFile) // 爬虫目录写入
authGroup.GET("/spiders/:id/dir", routes.GetSpiderDir) // 爬虫目录
authGroup.GET("/spiders/:id/stats", routes.GetSpiderStats) // 爬虫统计数据
authGroup.GET("/spider/types", routes.GetSpiderTypes) // 爬虫类型
// 任务
authGroup.GET("/tasks", routes.GetTaskList) // 任务列表
authGroup.GET("/tasks/:id", routes.GetTask) // 任务详情
Expand Down
52 changes: 51 additions & 1 deletion backend/model/file.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,74 @@
package model

import (
"crawlab/database"
"crawlab/utils"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"os"
"runtime/debug"
"time"
)

type GridFs struct {
Id bson.ObjectId `json:"_id" bson:"_id"`
ChunkSize int32 `json:"chunk_size" bson:"chunkSize"`
UploadDate time.Time `json:"upload_date" bson:"uploadDate"`
Length int32 `json:"length" bson:"length"`
Md5 string `json:"md_5" bson:"md5"`
Filename string `json:"filename" bson:"filename"`
}

type File struct {
Name string `json:"name"`
Path string `json:"path"`
IsDir bool `json:"is_dir"`
Size int64 `json:"size"`
}

func (f *GridFs) Remove() {
s, gf := database.GetGridFs("files")
defer s.Close()
if err := gf.RemoveId(f.Id); err != nil {
log.Errorf("remove file id error: %s, id: %s", err.Error(), f.Id.Hex())
debug.PrintStack()
}
}

func GetAllGridFs() []*GridFs {
s, gf := database.GetGridFs("files")
defer s.Close()

var files []*GridFs
if err := gf.Find(nil).All(&files); err != nil {
log.Errorf("get all files error: {}", err.Error())
debug.PrintStack()
return nil
}
return files
}

func GetGridFs(id bson.ObjectId) *GridFs {
s, gf := database.GetGridFs("files")
defer s.Close()

var gfFile GridFs
err := gf.Find(bson.M{"_id": id}).One(&gfFile)
if err != nil {
log.Errorf("get gf file error: %s, file_id: %s", err.Error(), id.Hex())
debug.PrintStack()
return nil
}
return &gfFile
}

func RemoveFile(path string) error {
if !utils.Exists(path) {
log.Info("file not found: " + path)
debug.PrintStack()
return nil
}
if err := os.Remove(path); err != nil {
if err := os.RemoveAll(path); err != nil {
return err
}
return nil
Expand Down
68 changes: 68 additions & 0 deletions backend/model/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
"runtime/debug"
"time"
)
Expand All @@ -30,6 +31,73 @@ type Node struct {
UpdateTsUnix int64 `json:"update_ts_unix" bson:"update_ts_unix"`
}

const (
Yes = "Y"
No = "N"
)

// 当前节点是否为主节点
func IsMaster() bool {
return viper.GetString("server.master") == Yes
}

// 获取本机节点
func GetCurrentNode() (Node, error) {
// 获得注册的key值
key, err := register.GetRegister().GetKey()
if err != nil {
return Node{}, err
}

// 从数据库中获取当前节点
var node Node
errNum := 0
for {
// 如果错误次数超过10次
if errNum >= 10 {
panic("cannot get current node")
}

// 尝试获取节点
node, err = GetNodeByKey(key)
// 如果获取失败
if err != nil {
// 如果为主节点,表示为第一次注册,插入节点信息
if IsMaster() {
// 获取本机信息
ip, mac, key, err := GetNodeBaseInfo()
if err != nil {
debug.PrintStack()
return node, err
}

// 生成节点
node = Node{
Key: key,
Id: bson.NewObjectId(),
Ip: ip,
Name: ip,
Mac: mac,
IsMaster: true,
}
if err := node.Add(); err != nil {
return node, err
}
return node, nil
}
// 增加错误次数
errNum++

// 5秒后重试
time.Sleep(5 * time.Second)
continue
}
// 跳出循环
break
}
return node, nil
}

func (n *Node) Save() error {
s, c := database.GetCol("nodes")
defer s.Close()
Expand Down
3 changes: 2 additions & 1 deletion backend/model/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) {
// 获取爬虫名称
spider, err := GetSpider(schedule.SpiderId)
if err != nil {
log.Errorf(err.Error())
log.Errorf("get spider by id: %s, error: %s", schedule.SpiderId.Hex(), err.Error())
debug.PrintStack()
continue
}
schedules[i].SpiderName = spider.Name
Expand Down
Loading

0 comments on commit aeeab51

Please sign in to comment.