diff --git a/CHANGELOG.md b/CHANGELOG.md index 95ef9cd76..6dd14e2c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,23 @@ +# 0.4.1 (2019-12-13) +### Features / Enhancement +- **Spiderfile Optimization**. Stages changed from dictionary to array. [#358](https://github.com/crawlab-team/crawlab/issues/358) +- **Baidu Tongji Update**. + +### Bug Fixes +- **Unable to display schedule tasks**. [#353](https://github.com/crawlab-team/crawlab/issues/353) +- **Duplicate node registration**. [#334](https://github.com/crawlab-team/crawlab/issues/334) + +# 0.4.0 (2019-12-06) +### Features / Enhancement +- **Configurable Spider**. Allow users to add spiders using *Spiderfile* to configure crawling rules. +- **Execution Mode**. Allow users to select 3 modes for task execution: *All Nodes*, *Selected Nodes* and *Random*. + +### Bug Fixes +- **Task accidentally killed**. [#306](https://github.com/crawlab-team/crawlab/issues/306) +- **Documentation fix**. [#301](https://github.com/crawlab-team/crawlab/issues/258) [#301](https://github.com/crawlab-team/crawlab/issues/258) +- **Direct deploy incompatible with Windows**. [#288](https://github.com/crawlab-team/crawlab/issues/288) +- **Log files lost**. [#269](https://github.com/crawlab-team/crawlab/issues/269) + # 0.3.5 (2019-10-28) ### Features / Enhancement - **Graceful Showdown**. [detail](https://github.com/crawlab-team/crawlab/commit/63fab3917b5a29fd9770f9f51f1572b9f0420385) diff --git a/backend/constants/schedule.go b/backend/constants/schedule.go new file mode 100644 index 000000000..c31046011 --- /dev/null +++ b/backend/constants/schedule.go @@ -0,0 +1,10 @@ +package constants + +const ( + ScheduleStatusStop = "stop" + ScheduleStatusRunning = "running" + ScheduleStatusError = "error" + + ScheduleStatusErrorNotFoundNode = "Not Found Node" + ScheduleStatusErrorNotFoundSpider = "Not Found Spider" +) diff --git a/backend/database/redis.go b/backend/database/redis.go index bffc40be0..b165aaa36 100644 --- a/backend/database/redis.go +++ b/backend/database/redis.go @@ -4,10 +4,12 @@ import ( "context" "crawlab/entity" "crawlab/utils" + "errors" "github.com/apex/log" "github.com/gomodule/redigo/redis" "github.com/spf13/viper" "runtime/debug" + "strings" "time" ) @@ -17,9 +19,18 @@ type Redis struct { pool *redis.Pool } +type Mutex struct { + Name string + expiry time.Duration + tries int + delay time.Duration + value string +} + func NewRedisClient() *Redis { return &Redis{pool: NewRedisPool()} } + func (r *Redis) RPush(collection string, value interface{}) error { c := r.pool.Get() defer utils.Close(c) @@ -143,3 +154,59 @@ func Sub(channel string, consume ConsumeFunc) error { } return nil } + +// 构建同步锁key +func (r *Redis) getLockKey(lockKey string) string { + lockKey = strings.ReplaceAll(lockKey, ":", "-") + return "nodes:lock:" + lockKey +} + +// 获得锁 +func (r *Redis) Lock(lockKey string) (int64, error) { + c := r.pool.Get() + defer utils.Close(c) + lockKey = r.getLockKey(lockKey) + + ts := time.Now().Unix() + ok, err := c.Do("SET", lockKey, ts, "NX", "PX", 30000) + if err != nil { + log.Errorf("get lock fail with error: %s", err.Error()) + debug.PrintStack() + return 0, err + } + if err == nil && ok == nil { + log.Errorf("the lockKey is locked: key=%s", lockKey) + return 0, errors.New("the lockKey is locked") + } + return ts, nil +} + +func (r *Redis) UnLock(lockKey string, value int64) { + c := r.pool.Get() + defer utils.Close(c) + lockKey = r.getLockKey(lockKey) + + getValue, err := redis.Int64(c.Do("GET", lockKey)) + if err != nil { + log.Errorf("get lockKey error: %s", err.Error()) + debug.PrintStack() + return + } + + if getValue != value { + log.Errorf("the lockKey value diff: %d, %d", value, getValue) + return + } + + v, err := redis.Int64(c.Do("DEL", lockKey)) + if err != nil { + log.Errorf("unlock failed, error: %s", err.Error()) + debug.PrintStack() + return + } + + if v == 0 { + log.Errorf("unlock failed: key=%s", lockKey) + return + } +} diff --git a/backend/entity/config_spider.go b/backend/entity/config_spider.go index 3fe28bc95..d9e085d2e 100644 --- a/backend/entity/config_spider.go +++ b/backend/entity/config_spider.go @@ -5,7 +5,7 @@ type ConfigSpiderData struct { Engine string `yaml:"engine" json:"engine"` StartUrl string `yaml:"start_url" json:"start_url"` StartStage string `yaml:"start_stage" json:"start_stage"` - Stages map[string]Stage `yaml:"stages" json:"stages"` + Stages []Stage `yaml:"stages" json:"stages"` Settings map[string]string `yaml:"settings" json:"settings"` } diff --git a/backend/main.go b/backend/main.go index 3f87125de..0d7b7cc1c 100644 --- a/backend/main.go +++ b/backend/main.go @@ -154,17 +154,20 @@ func main() { authGroup.GET("/tasks/:id", routes.GetTask) // 任务详情 authGroup.PUT("/tasks", routes.PutTask) // 派发任务 authGroup.DELETE("/tasks/:id", routes.DeleteTask) // 删除任务 + authGroup.DELETE("/tasks_multiple", routes.DeleteMultipleTask) // 删除多个任务 authGroup.DELETE("/tasks_by_status", routes.DeleteTaskByStatus) //删除指定状态的任务 authGroup.POST("/tasks/:id/cancel", routes.CancelTask) // 取消任务 authGroup.GET("/tasks/:id/log", routes.GetTaskLog) // 任务日志 authGroup.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果 authGroup.GET("/tasks/:id/results/download", routes.DownloadTaskResultsCsv) // 下载任务结果 // 定时任务 - authGroup.GET("/schedules", routes.GetScheduleList) // 定时任务列表 - authGroup.GET("/schedules/:id", routes.GetSchedule) // 定时任务详情 - authGroup.PUT("/schedules", routes.PutSchedule) // 创建定时任务 - authGroup.POST("/schedules/:id", routes.PostSchedule) // 修改定时任务 - authGroup.DELETE("/schedules/:id", routes.DeleteSchedule) // 删除定时任务 + authGroup.GET("/schedules", routes.GetScheduleList) // 定时任务列表 + authGroup.GET("/schedules/:id", routes.GetSchedule) // 定时任务详情 + authGroup.PUT("/schedules", routes.PutSchedule) // 创建定时任务 + authGroup.POST("/schedules/:id", routes.PostSchedule) // 修改定时任务 + authGroup.DELETE("/schedules/:id", routes.DeleteSchedule) // 删除定时任务 + authGroup.POST("/schedules/:id/stop", routes.StopSchedule) // 停止定时任务 + authGroup.POST("/schedules/:id/run", routes.RunSchedule) // 运行定时任务 // 统计数据 authGroup.GET("/stats/home", routes.GetHomeStats) // 首页统计数据 // 用户 diff --git a/backend/model/config_spider/common.go b/backend/model/config_spider/common.go index c803755ac..4d244fe19 100644 --- a/backend/model/config_spider/common.go +++ b/backend/model/config_spider/common.go @@ -15,16 +15,12 @@ func GetAllFields(data entity.ConfigSpiderData) []entity.Field { func GetStartStageName(data entity.ConfigSpiderData) string { // 如果 start_stage 设置了且在 stages 里,则返回 if data.StartStage != "" { - for stageName := range data.Stages { - if stageName == data.StartStage { - return data.StartStage - } - } + return data.StartStage } // 否则返回第一个 stage - for stageName := range data.Stages { - return stageName + for _, stage := range data.Stages { + return stage.Name } return "" } diff --git a/backend/model/config_spider/scrapy.go b/backend/model/config_spider/scrapy.go index 6fcb77f02..ee24a3e78 100644 --- a/backend/model/config_spider/scrapy.go +++ b/backend/model/config_spider/scrapy.go @@ -83,7 +83,8 @@ func (g ScrapyGenerator) ProcessSpider() error { // 替换 parsers strParser := "" - for stageName, stage := range g.ConfigData.Stages { + for _, stage := range g.ConfigData.Stages { + stageName := stage.Name stageStr := g.GetParserString(stageName, stage) strParser += stageStr } diff --git a/backend/model/node.go b/backend/model/node.go index 1c63fc3e9..a24b36e33 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -169,7 +169,7 @@ func GetNode(id bson.ObjectId) (Node, error) { defer s.Close() if err := c.FindId(id).One(&node); err != nil { - log.Errorf(err.Error()) + log.Errorf("get node error: %s, id: %s", err.Error(), id.Hex()) debug.PrintStack() return node, err } diff --git a/backend/model/schedule.go b/backend/model/schedule.go index ef758fb64..c1923885f 100644 --- a/backend/model/schedule.go +++ b/backend/model/schedule.go @@ -12,19 +12,25 @@ import ( ) type Schedule struct { - Id bson.ObjectId `json:"_id" bson:"_id"` - Name string `json:"name" bson:"name"` - Description string `json:"description" bson:"description"` - SpiderId bson.ObjectId `json:"spider_id" bson:"spider_id"` - NodeId bson.ObjectId `json:"node_id" bson:"node_id"` - NodeKey string `json:"node_key" bson:"node_key"` - Cron string `json:"cron" bson:"cron"` - EntryId cron.EntryID `json:"entry_id" bson:"entry_id"` - Param string `json:"param" bson:"param"` + Id bson.ObjectId `json:"_id" bson:"_id"` + Name string `json:"name" bson:"name"` + Description string `json:"description" bson:"description"` + SpiderId bson.ObjectId `json:"spider_id" bson:"spider_id"` + //NodeId bson.ObjectId `json:"node_id" bson:"node_id"` + //NodeKey string `json:"node_key" bson:"node_key"` + Cron string `json:"cron" bson:"cron"` + EntryId cron.EntryID `json:"entry_id" bson:"entry_id"` + Param string `json:"param" bson:"param"` + RunType string `json:"run_type" bson:"run_type"` + NodeIds []bson.ObjectId `json:"node_ids" bson:"node_ids"` + + // 状态 + Status string `json:"status" bson:"status"` // 前端展示 SpiderName string `json:"spider_name" bson:"spider_name"` NodeName string `json:"node_name" bson:"node_name"` + Message string `json:"message" bson:"message"` CreateTs time.Time `json:"create_ts" bson:"create_ts"` UpdateTs time.Time `json:"update_ts" bson:"update_ts"` @@ -46,26 +52,26 @@ func (sch *Schedule) Delete() error { return c.RemoveId(sch.Id) } -func (sch *Schedule) SyncNodeIdAndSpiderId(node Node, spider Spider) { - sch.syncNodeId(node) - sch.syncSpiderId(spider) -} - -func (sch *Schedule) syncNodeId(node Node) { - if node.Id.Hex() == sch.NodeId.Hex() { - return - } - sch.NodeId = node.Id - _ = sch.Save() -} - -func (sch *Schedule) syncSpiderId(spider Spider) { - if spider.Id.Hex() == sch.SpiderId.Hex() { - return - } - sch.SpiderId = spider.Id - _ = sch.Save() -} +//func (sch *Schedule) SyncNodeIdAndSpiderId(node Node, spider Spider) { +// sch.syncNodeId(node) +// sch.syncSpiderId(spider) +//} + +//func (sch *Schedule) syncNodeId(node Node) { +// if node.Id.Hex() == sch.NodeId.Hex() { +// return +// } +// sch.NodeId = node.Id +// _ = sch.Save() +//} + +//func (sch *Schedule) syncSpiderId(spider Spider) { +// if spider.Id.Hex() == sch.SpiderId.Hex() { +// return +// } +// sch.SpiderId = spider.Id +// _ = sch.Save() +//} func GetScheduleList(filter interface{}) ([]Schedule, error) { s, c := database.GetCol("schedules") @@ -78,29 +84,31 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) { var schs []Schedule for _, schedule := range schedules { - // 获取节点名称 - if schedule.NodeId == bson.ObjectIdHex(constants.ObjectIdNull) { - // 选择所有节点 - schedule.NodeName = "All Nodes" - } else { - // 选择单一节点 - node, err := GetNode(schedule.NodeId) - if err != nil { - log.Errorf(err.Error()) - continue - } - schedule.NodeName = node.Name - } + // TODO: 获取节点名称 + //if schedule.NodeId == bson.ObjectIdHex(constants.ObjectIdNull) { + // // 选择所有节点 + // schedule.NodeName = "All Nodes" + //} else { + // // 选择单一节点 + // node, err := GetNode(schedule.NodeId) + // if err != nil { + // schedule.Status = constants.ScheduleStatusError + // schedule.Message = constants.ScheduleStatusErrorNotFoundNode + // } else { + // schedule.NodeName = node.Name + // } + //} // 获取爬虫名称 spider, err := GetSpider(schedule.SpiderId) if err != nil && err == mgo.ErrNotFound { log.Errorf("get spider by id: %s, error: %s", schedule.SpiderId.Hex(), err.Error()) - debug.PrintStack() - _ = schedule.Delete() - continue + schedule.Status = constants.ScheduleStatusError + schedule.Message = constants.ScheduleStatusErrorNotFoundSpider + } else { + schedule.SpiderName = spider.Name } - schedule.SpiderName = spider.Name + schs = append(schs, schedule) } return schs, nil @@ -125,12 +133,13 @@ func UpdateSchedule(id bson.ObjectId, item Schedule) error { if err := c.FindId(id).One(&result); err != nil { return err } - node, err := GetNode(item.NodeId) - if err != nil { - return err - } + //node, err := GetNode(item.NodeId) + //if err != nil { + // return err + //} - item.NodeKey = node.Key + item.UpdateTs = time.Now() + //item.NodeKey = node.Key if err := item.Save(); err != nil { return err } @@ -141,15 +150,15 @@ func AddSchedule(item Schedule) error { s, c := database.GetCol("schedules") defer s.Close() - node, err := GetNode(item.NodeId) - if err != nil { - return err - } + //node, err := GetNode(item.NodeId) + //if err != nil { + // return err + //} item.Id = bson.NewObjectId() item.CreateTs = time.Now() item.UpdateTs = time.Now() - item.NodeKey = node.Key + //item.NodeKey = node.Key if err := c.Insert(&item); err != nil { debug.PrintStack() diff --git a/backend/model/spider.go b/backend/model/spider.go index a0d72c1cd..78adc4d0c 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -319,11 +319,5 @@ func GetConfigSpiderData(spider Spider) (entity.ConfigSpiderData, error) { return configData, err } - // 赋值 stage_name - for stageName, stage := range configData.Stages { - stage.Name = stageName - configData.Stages[stageName] = stage - } - return configData, nil } diff --git a/backend/model/task.go b/backend/model/task.go index 588db6b38..299661edc 100644 --- a/backend/model/task.go +++ b/backend/model/task.go @@ -61,6 +61,7 @@ func (t *Task) Save() error { defer s.Close() t.UpdateTs = time.Now() if err := c.UpdateId(t.Id, t); err != nil { + log.Errorf("update task error: %s", err.Error()) debug.PrintStack() return err } @@ -152,14 +153,13 @@ func GetTask(id string) (Task, error) { var task Task if err := c.FindId(id).One(&task); err != nil { + log.Infof("get task error: %s, id: %s", err.Error(), id) debug.PrintStack() return task, err } return task, nil } - - func AddTask(item Task) error { s, c := database.GetCol("tasks") defer s.Close() diff --git a/backend/routes/schedule.go b/backend/routes/schedule.go index 73b753236..e54c49a3c 100644 --- a/backend/routes/schedule.go +++ b/backend/routes/schedule.go @@ -14,11 +14,7 @@ func GetScheduleList(c *gin.Context) { HandleError(http.StatusInternalServerError, c, err) return } - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - Data: results, - }) + HandleSuccessData(c, results) } func GetSchedule(c *gin.Context) { @@ -29,11 +25,8 @@ func GetSchedule(c *gin.Context) { HandleError(http.StatusInternalServerError, c, err) return } - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - Data: result, - }) + + HandleSuccessData(c, result) } func PostSchedule(c *gin.Context) { @@ -48,7 +41,7 @@ func PostSchedule(c *gin.Context) { // 验证cron表达式 if err := services.ParserCron(newItem.Cron); err != nil { - HandleError(http.StatusOK, c, err) + HandleError(http.StatusInternalServerError, c, err) return } @@ -65,10 +58,7 @@ func PostSchedule(c *gin.Context) { return } - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - }) + HandleSuccess(c) } func PutSchedule(c *gin.Context) { @@ -82,7 +72,7 @@ func PutSchedule(c *gin.Context) { // 验证cron表达式 if err := services.ParserCron(item.Cron); err != nil { - HandleError(http.StatusOK, c, err) + HandleError(http.StatusInternalServerError, c, err) return } @@ -98,10 +88,7 @@ func PutSchedule(c *gin.Context) { return } - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - }) + HandleSuccess(c) } func DeleteSchedule(c *gin.Context) { @@ -119,8 +106,25 @@ func DeleteSchedule(c *gin.Context) { return } - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - }) + HandleSuccess(c) +} + +// 停止定时任务 +func StopSchedule(c *gin.Context) { + id := c.Param("id") + if err := services.Sched.Stop(bson.ObjectIdHex(id)); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + HandleSuccess(c) +} + +// 运行定时任务 +func RunSchedule(c *gin.Context) { + id := c.Param("id") + if err := services.Sched.Run(bson.ObjectIdHex(id)); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + HandleSuccess(c) } diff --git a/backend/routes/task.go b/backend/routes/task.go index 3d6edece1..d5e3caccf 100644 --- a/backend/routes/task.go +++ b/backend/routes/task.go @@ -29,7 +29,7 @@ func GetTaskList(c *gin.Context) { // 绑定数据 data := TaskListRequestData{} if err := c.ShouldBindQuery(&data); err != nil { - HandleError(http.StatusBadRequest, c, err) + HandleError(http.StatusInternalServerError, c, err) return } if data.PageNum == 0 { @@ -82,11 +82,7 @@ func GetTask(c *gin.Context) { HandleError(http.StatusInternalServerError, c, err) return } - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - Data: result, - }) + HandleSuccessData(c, result) } func PutTask(c *gin.Context) { @@ -100,7 +96,7 @@ func PutTask(c *gin.Context) { // 绑定数据 var reqBody TaskRequestBody if err := c.ShouldBindJSON(&reqBody); err != nil { - HandleError(http.StatusBadRequest, c, err) + HandleError(http.StatusInternalServerError, c, err) return } @@ -123,7 +119,6 @@ func PutTask(c *gin.Context) { return } } - } else if reqBody.RunType == constants.RunTypeRandom { // 随机 t := model.Task{ @@ -134,7 +129,6 @@ func PutTask(c *gin.Context) { HandleError(http.StatusInternalServerError, c, err) return } - } else if reqBody.RunType == constants.RunTypeSelectedNodes { // 指定节点 for _, nodeId := range reqBody.NodeIds { @@ -149,16 +143,11 @@ func PutTask(c *gin.Context) { return } } - } else { - HandleErrorF(http.StatusBadRequest, c, "invalid run_type") + HandleErrorF(http.StatusInternalServerError, c, "invalid run_type") return } - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - }) + HandleSuccess(c) } func DeleteTaskByStatus(c *gin.Context) { @@ -176,12 +165,31 @@ func DeleteTaskByStatus(c *gin.Context) { return } - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - }) + HandleSuccess(c) +} + +// 删除多个任务 +func DeleteMultipleTask(c *gin.Context) { + ids := make(map[string][]string) + if err := c.ShouldBindJSON(&ids); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + list := ids["ids"] + for _, id := range list { + if err := services.RemoveLogByTaskId(id); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + if err := model.RemoveTask(id); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + } + HandleSuccess(c) } +// 删除单个任务 func DeleteTask(c *gin.Context) { id := c.Param("id") @@ -190,33 +198,22 @@ func DeleteTask(c *gin.Context) { HandleError(http.StatusInternalServerError, c, err) return } - // 删除task if err := model.RemoveTask(id); err != nil { HandleError(http.StatusInternalServerError, c, err) return } - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - }) + HandleSuccess(c) } func GetTaskLog(c *gin.Context) { id := c.Param("id") - logStr, err := services.GetTaskLog(id) if err != nil { HandleError(http.StatusInternalServerError, c, err) return } - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - Data: logStr, - }) + HandleSuccessData(c, logStr) } func GetTaskResults(c *gin.Context) { @@ -225,7 +222,7 @@ func GetTaskResults(c *gin.Context) { // 绑定数据 data := TaskResultsRequestData{} if err := c.ShouldBindQuery(&data); err != nil { - HandleError(http.StatusBadRequest, c, err) + HandleError(http.StatusInternalServerError, c, err) return } @@ -327,9 +324,5 @@ func CancelTask(c *gin.Context) { HandleError(http.StatusInternalServerError, c, err) return } - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - }) + HandleSuccess(c) } diff --git a/backend/routes/utils.go b/backend/routes/utils.go index 38ca35bb1..dfa5420eb 100644 --- a/backend/routes/utils.go +++ b/backend/routes/utils.go @@ -1,17 +1,15 @@ package routes import ( - "github.com/apex/log" "github.com/gin-gonic/gin" + "net/http" "runtime/debug" ) func HandleError(statusCode int, c *gin.Context, err error) { - log.Errorf("handle error:" + err.Error()) - debug.PrintStack() c.AbortWithStatusJSON(statusCode, Response{ - Status: "ok", - Message: "error", + Status: "error", + Message: "failure", Error: err.Error(), }) } @@ -24,3 +22,18 @@ func HandleErrorF(statusCode int, c *gin.Context, err string) { Error: err, }) } + +func HandleSuccess(c *gin.Context) { + c.AbortWithStatusJSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + }) +} + +func HandleSuccessData(c *gin.Context, data interface{}) { + c.AbortWithStatusJSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + Data: data, + }) +} diff --git a/backend/services/config_spider.go b/backend/services/config_spider.go index 7c736cc71..fe0a3da14 100644 --- a/backend/services/config_spider.go +++ b/backend/services/config_spider.go @@ -61,7 +61,9 @@ func ValidateSpiderfile(configData entity.ConfigSpiderData) error { // 校验stages dict := map[string]int{} - for stageName, stage := range configData.Stages { + for _, stage := range configData.Stages { + stageName := stage.Name + // stage 名称不能为空 if stageName == "" { return errors.New("spiderfile invalid: stage name is empty") @@ -152,12 +154,6 @@ func IsUniqueConfigSpiderFields(fields []entity.Field) bool { func ProcessSpiderFilesFromConfigData(spider model.Spider, configData entity.ConfigSpiderData) error { spiderDir := spider.Src - // 赋值 stage_name - for stageName, stage := range configData.Stages { - stage.Name = stageName - configData.Stages[stageName] = stage - } - // 删除已有的爬虫文件 for _, fInfo := range utils.ListDir(spiderDir) { // 不删除Spiderfile diff --git a/backend/services/node.go b/backend/services/node.go index be916f104..369347468 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -95,19 +95,17 @@ func UpdateNodeStatus() { } func handleNodeInfo(key string, data Data) { + // 添加同步锁 + v, err := database.RedisClient.Lock(key) + if err != nil { + return + } + defer database.RedisClient.UnLock(key, v) + // 更新节点信息到数据库 s, c := database.GetCol("nodes") defer s.Close() - // 同个key可能因为并发,被注册多次 - var nodes []model.Node - _ = c.Find(bson.M{"key": key}).All(&nodes) - if len(nodes) > 1 { - for _, node := range nodes { - _ = c.RemoveId(node.Id) - } - } - var node model.Node if err := c.Find(bson.M{"key": key}).One(&node); err != nil { // 数据库不存在该节点 diff --git a/backend/services/schedule.go b/backend/services/schedule.go index 52a6492e8..53938aea3 100644 --- a/backend/services/schedule.go +++ b/backend/services/schedule.go @@ -4,8 +4,10 @@ import ( "crawlab/constants" "crawlab/lib/cron" "crawlab/model" + "errors" "github.com/apex/log" - "github.com/satori/go.uuid" + "github.com/globalsign/mgo/bson" + uuid "github.com/satori/go.uuid" "runtime/debug" ) @@ -17,48 +19,87 @@ type Scheduler struct { func AddScheduleTask(s model.Schedule) func() { return func() { - node, err := model.GetNodeByKey(s.NodeKey) - if err != nil || node.Id.Hex() == "" { - log.Errorf("get node by key error: %s", err.Error()) - debug.PrintStack() - return - } - - spider := model.GetSpiderByName(s.SpiderName) - if spider == nil || spider.Id.Hex() == "" { - log.Errorf("get spider by name error: %s", err.Error()) - debug.PrintStack() - return - } - - // 同步ID到定时任务 - s.SyncNodeIdAndSpiderId(node, *spider) - // 生成任务ID id := uuid.NewV4() - // 生成任务模型 - t := model.Task{ - Id: id.String(), - SpiderId: spider.Id, - NodeId: node.Id, - Status: constants.StatusPending, - Param: s.Param, - } - - // 将任务存入数据库 - if err := model.AddTask(t); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() + if s.RunType == constants.RunTypeAllNodes { + // 所有节点 + nodes, err := model.GetNodeList(nil) + if err != nil { + return + } + for _, node := range nodes { + t := model.Task{ + Id: id.String(), + SpiderId: s.SpiderId, + NodeId: node.Id, + Param: s.Param, + } + + if err := AddTask(t); err != nil { + return + } + if err := AssignTask(t); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } + } + } else if s.RunType == constants.RunTypeRandom { + // 随机 + t := model.Task{ + Id: id.String(), + SpiderId: s.SpiderId, + Param: s.Param, + } + if err := AddTask(t); err != nil { + return + } + if err := AssignTask(t); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } + } else if s.RunType == constants.RunTypeSelectedNodes { + // 指定节点 + for _, nodeId := range s.NodeIds { + t := model.Task{ + Id: id.String(), + SpiderId: s.SpiderId, + NodeId: nodeId, + Param: s.Param, + } + + if err := AddTask(t); err != nil { + return + } + + if err := AssignTask(t); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } + } + } else { return } - // 加入任务队列 - if err := AssignTask(t); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return - } + //node, err := model.GetNodeByKey(s.NodeKey) + //if err != nil || node.Id.Hex() == "" { + // log.Errorf("get node by key error: %s", err.Error()) + // debug.PrintStack() + // return + //} + // + //spider := model.GetSpiderByName(s.SpiderName) + //if spider == nil || spider.Id.Hex() == "" { + // log.Errorf("get spider by name error: %s", err.Error()) + // debug.PrintStack() + // return + //} + // + //// 同步ID到定时任务 + //s.SyncNodeIdAndSpiderId(node, *spider) } } @@ -106,6 +147,7 @@ func (s *Scheduler) AddJob(job model.Schedule) error { // 更新EntryID job.EntryId = eid + job.Status = constants.ScheduleStatusRunning if err := job.Save(); err != nil { log.Errorf("job save error: %s", err.Error()) debug.PrintStack() @@ -134,6 +176,36 @@ func ParserCron(spec string) error { return nil } +// 停止定时任务 +func (s *Scheduler) Stop(id bson.ObjectId) error { + schedule, err := model.GetSchedule(id) + if err != nil { + return err + } + if schedule.EntryId == 0 { + return errors.New("entry id not found") + } + s.cron.Remove(schedule.EntryId) + // 更新状态 + schedule.Status = constants.ScheduleStatusStop + if err = schedule.Save(); err != nil { + return err + } + return nil +} + +// 运行任务 +func (s *Scheduler) Run(id bson.ObjectId) error { + schedule, err := model.GetSchedule(id) + if err != nil { + return err + } + if err := s.AddJob(schedule); err != nil { + return err + } + return nil +} + func (s *Scheduler) Update() error { // 删除所有定时任务 s.RemoveAll() @@ -151,6 +223,10 @@ func (s *Scheduler) Update() error { // 单个任务 job := sList[i] + if job.Status == constants.ScheduleStatusStop { + continue + } + // 添加到定时任务 if err := s.AddJob(job); err != nil { log.Errorf("add job error: %s, job: %s, cron: %s", err.Error(), job.Name, job.Cron) diff --git a/backend/services/task.go b/backend/services/task.go index 859a24f05..9e6fdbc8d 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -418,15 +418,23 @@ func ExecuteTask(id int) { t.Status = constants.StatusRunning // 任务状态 t.WaitDuration = t.StartTs.Sub(t.CreateTs).Seconds() // 等待时长 + // 判断爬虫文件是否存在 + gfFile := model.GetGridFs(spider.FileId) + if gfFile == nil { + t.Error = "找不到爬虫文件,请重新上传" + t.Status = constants.StatusError + t.FinishTs = time.Now() // 结束时间 + t.RuntimeDuration = t.FinishTs.Sub(t.StartTs).Seconds() // 运行时长 + t.TotalDuration = t.FinishTs.Sub(t.CreateTs).Seconds() // 总时长 + _ = t.Save() + return + } + // 开始执行任务 log.Infof(GetWorkerPrefix(id) + "开始执行任务(ID:" + t.Id + ")") // 储存任务 - if err := t.Save(); err != nil { - log.Errorf(err.Error()) - HandleTaskError(t, err) - return - } + _ = t.Save() // 起一个cron执行器来统计任务结果数 if spider.Col != "" { diff --git a/backend/template/spiderfile/Spiderfile.163_news b/backend/template/spiderfile/Spiderfile.163_news index 29d58279a..c2a73be7c 100644 --- a/backend/template/spiderfile/Spiderfile.163_news +++ b/backend/template/spiderfile/Spiderfile.163_news @@ -4,17 +4,17 @@ start_url: "http://news.163.com/special/0001386F/rank_news.html" start_stage: "list" engine: "scrapy" stages: - list: - is_list: true - list_css: "table tr:not(:first-child)" - fields: - - name: "title" - css: "td:nth-child(1) > a" - - name: "url" - css: "td:nth-child(1) > a" - attr: "href" - - name: "clicks" - css: "td.cBlue" +- name: list + is_list: true + list_css: "table tr:not(:first-child)" + fields: + - name: "title" + css: "td:nth-child(1) > a" + - name: "url" + css: "td:nth-child(1) > a" + attr: "href" + - name: "clicks" + css: "td.cBlue" settings: ROBOTSTXT_OBEY: false USER_AGENT: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36 diff --git a/backend/template/spiderfile/Spiderfile.baidu b/backend/template/spiderfile/Spiderfile.baidu index fbf720e4b..5643c9801 100644 --- a/backend/template/spiderfile/Spiderfile.baidu +++ b/backend/template/spiderfile/Spiderfile.baidu @@ -4,19 +4,19 @@ start_url: http://www.baidu.com/s?wd=crawlab start_stage: list engine: scrapy stages: - list: - is_list: true - list_xpath: //*[contains(@class, "c-container")] - page_xpath: //*[@id="page"]//a[@class="n"][last()] - page_attr: href - fields: - - name: title - xpath: .//h3/a - - name: url - xpath: .//h3/a - attr: href - - name: abstract - xpath: .//*[@class="c-abstract"] +- name: list + is_list: true + list_xpath: //*[contains(@class, "c-container")] + page_xpath: //*[@id="page"]//a[@class="n"][last()] + page_attr: href + fields: + - name: title + xpath: .//h3/a + - name: url + xpath: .//h3/a + attr: href + - name: abstract + xpath: .//*[@class="c-abstract"] settings: ROBOTSTXT_OBEY: false USER_AGENT: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36 diff --git a/backend/template/spiderfile/Spiderfile.toscrapy_books b/backend/template/spiderfile/Spiderfile.toscrapy_books index 4bf18f617..247b4f40a 100644 --- a/backend/template/spiderfile/Spiderfile.toscrapy_books +++ b/backend/template/spiderfile/Spiderfile.toscrapy_books @@ -4,25 +4,25 @@ start_url: "http://books.toscrape.com" start_stage: "list" engine: "scrapy" stages: - list: - is_list: true - list_css: "section article.product_pod" - page_css: "ul.pager li.next a" - page_attr: "href" - fields: - - name: "title" - css: "h3 > a" - - name: "url" - css: "h3 > a" - attr: "href" - next_stage: "detail" - - name: "price" - css: ".product_price > .price_color" - detail: - is_list: false - fields: - - name: "description" - css: "#product_description + p" +- name: list + is_list: true + list_css: "section article.product_pod" + page_css: "ul.pager li.next a" + page_attr: "href" + fields: + - name: "title" + css: "h3 > a" + - name: "url" + css: "h3 > a" + attr: "href" + next_stage: "detail" + - name: "price" + css: ".product_price > .price_color" +- name: detail + is_list: false + fields: + - name: "description" + css: "#product_description + p" settings: ROBOTSTXT_OBEY: true AUTOTHROTTLE_ENABLED: true diff --git a/frontend/package.json b/frontend/package.json index 724b5e365..c01ad67e9 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "crawlab", - "version": "0.3.5", + "version": "0.4.1", "private": true, "scripts": { "serve": "vue-cli-service serve --ip=0.0.0.0 --mode=development", diff --git a/frontend/src/App.vue b/frontend/src/App.vue index 06a41cce6..2a91e61a2 100644 --- a/frontend/src/App.vue +++ b/frontend/src/App.vue @@ -30,10 +30,10 @@ export default { document.querySelector('.el-message__closeBtn').click() if (value === 1) { this.$st.sendPv('/allow_stats') - this.$st.sendEv('全局', '允许/禁止统计', 'value', 'allow') + this.$st.sendEv('全局', '允许/禁止统计', '允许') } else { this.$st.sendPv('/disallow_stats') - this.$st.sendEv('全局', '允许/禁止统计', 'value', 'disallow') + this.$st.sendEv('全局', '允许/禁止统计', '禁止') } } diff --git a/frontend/src/api/request.js b/frontend/src/api/request.js index 22707159c..5f91f2acc 100644 --- a/frontend/src/api/request.js +++ b/frontend/src/api/request.js @@ -1,53 +1,39 @@ import axios from 'axios' import router from '../router' +import { Message } from 'element-ui' let baseUrl = process.env.VUE_APP_BASE_URL ? process.env.VUE_APP_BASE_URL : 'http://localhost:8000' -const request = async (method, path, params, data, others = {}) => { - try { - const url = baseUrl + path - const headers = { - 'Authorization': window.localStorage.getItem('token') +const request = (method, path, params, data, others = {}) => { + const url = baseUrl + path + const headers = { + 'Authorization': window.localStorage.getItem('token') + } + return axios({ + method, + url, + params, + data, + headers, + ...others + }).then((response) => { + if (response.status === 200) { + return Promise.resolve(response) + } + return Promise.reject(response) + }).catch((e) => { + let response = e.response + if (response.status === 400) { + Message.error(response.data.error) } - const response = await axios({ - method, - url, - params, - data, - headers, - ...others - }) - // console.log(response) - return response - } catch (e) { - if (e.response.status === 401 && router.currentRoute.path !== '/login') { + if (response.status === 401 && router.currentRoute.path !== '/login') { + console.log('login') router.push('/login') } - await Promise.reject(e) - } - - // return new Promise((resolve, reject) => { - // const url = baseUrl + path - // const headers = { - // 'Authorization': window.localStorage.getItem('token') - // } - // axios({ - // method, - // url, - // params, - // data, - // headers, - // ...others - // }) - // .then(resolve) - // .catch(error => { - // console.log(error) - // if (error.response.status === 401) { - // router.push('/login') - // } - // reject(error) - // }) - // }) + if (response.status === 500) { + Message.error(response.data.error) + } + }) } const get = (path, params) => { @@ -63,7 +49,7 @@ const put = (path, data) => { } const del = (path, data) => { - return request('DELETE', path) + return request('DELETE', path, {}, data) } export default { diff --git a/frontend/src/components/Common/CrawlConfirmDialog.vue b/frontend/src/components/Common/CrawlConfirmDialog.vue index f2ad70c22..42fc2b6f9 100644 --- a/frontend/src/components/Common/CrawlConfirmDialog.vue +++ b/frontend/src/components/Common/CrawlConfirmDialog.vue @@ -80,7 +80,7 @@ export default { this.$message.success(this.$t('A task has been scheduled successfully')) }) this.$emit('close') - this.$st.sendEv('爬虫', '运行') + this.$st.sendEv('爬虫确认', '确认运行', this.form.runType) }) } }, diff --git a/frontend/src/components/Config/ConfigList.vue b/frontend/src/components/Config/ConfigList.vue index 5c7a9dc21..9b274b3b6 100644 --- a/frontend/src/components/Config/ConfigList.vue +++ b/frontend/src/components/Config/ConfigList.vue @@ -68,9 +68,10 @@ v-model="spiderForm.config.start_stage" :placeholder="$t('Start Stage')" :class="startStageClass" + @change="$st.sendEv('爬虫详情', '配置', '改变起始阶段')" >