代码水平有限,先看一个生产者-消费者的例子:

参考:<http://www.jianshu.com/p/d85a4329d0c2>
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import Queue
import random
import time


class Producter(threading.Thread):
    """Producer thread: put ten random integers in [1, 99] on the queue, one per second."""

    def __init__(self, t_name, queue):
        self.queue = queue
        super(Producter, self).__init__(name=t_name)

    def run(self):
        remaining = 10
        while remaining > 0:
            number = random.randint(1, 99)
            self.queue.put(number)
            print('put num in Queue %s' % number)
            # Pace production so the consumer visibly interleaves with it.
            time.sleep(1)
            remaining -= 1

        print('put queue done')


class ConsumeEven(threading.Thread):
    """Even-number consumer thread.

    Takes numbers off ``queue`` and prints the even ones. Odd numbers are put
    back on the queue, presumably so a separate odd-number consumer can take
    them. The thread stops once ``get`` has waited ``timeout`` seconds without
    receiving anything.

    NOTE: if no other thread ever removes the odd numbers, this thread keeps
    taking and re-queueing them, ``get`` never times out, and ``run`` spins
    forever on the CPU.

    Args:
        t_name: thread name.
        queue: the shared ``Queue.Queue`` holding integers.
        timeout: seconds to wait for an item before stopping (default 3).
    """
    def __init__(self, t_name, queue, timeout=3):
        self.queue = queue
        self.timeout = timeout
        threading.Thread.__init__(self, name=t_name)

    def run(self):
        while True:
            try:
                # block=True with a timeout: raises Queue.Empty once the
                # queue has stayed empty for self.timeout seconds.
                queue_val = self.queue.get(True, self.timeout)
            except Queue.Empty:
                # Only the timeout means "producer finished"; any other error
                # is a real bug and must not be reported as a normal stop.
                print('queue empty for %s seconds, stopping' % self.timeout)
                break

            if queue_val % 2 == 0:
                print('Get Even Num %s ' % queue_val)
            else:
                self.queue.put(queue_val)


# Demo wiring: one producer and one even-number consumer share a FIFO queue.
q = Queue.Queue()
pt, ce = Producter('producter', q), ConsumeEven('consumeeven', q)
# Start the consumer first so it is already waiting when numbers arrive.
for _worker in (ce, pt):
    _worker.start()
# Wait for the producer, then for the consumer (which stops once its
# get() times out on an empty queue).
for _worker in (pt, ce):
    _worker.join()

照着这个例子写出的转发脚本:

#!/usr/bin/env python
# coding: utf-8

from elasticsearch import Elasticsearch
import requests
from Queue import Queue
import time
import threading
import datetime

import os

# Client for the Elasticsearch cluster holding packetbeat data. Host and
# credentials are read from the environment so real secrets need not live in
# the script; the defaults are the original placeholders.
es = Elasticsearch(
    os.environ.get('ES_HOST', 'xxxxxx:9200'),
    http_auth=(os.environ.get('ES_USER', 'user'),
               os.environ.get('ES_PASSWORD', 'password')),
)


class Producter(threading.Thread):
    """Poll Elasticsearch for newly captured GET requests and queue them.

    ``initsearch`` picks a starting ``@timestamp``. After that the thread polls
    every ``POLL_INTERVAL`` seconds for GET requests strictly newer than the
    newest one already queued, oldest first. For each hit it puts
    ``[path, params, method]`` onto ``queue``.

    Attributes:
        queue: queue the request triples are put on.
        initime: UTC date (``YYYY.MM.DD``) of the daily index that
            ``initsearch`` searches.
    """

    INDEX_PREFIX = 'packetbeat-'
    POLL_INTERVAL = 10  # seconds to wait before each poll
    BATCH_SIZE = 1000   # maximum hits fetched per poll

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        # Beats write @timestamp in UTC, and _index_for derives index names
        # from it, so "today" must be the UTC date too. Outside UTC the local
        # date points at a not-yet-existing index for part of every day.
        self.initime = datetime.datetime.utcnow().strftime('%Y.%m.%d')

    @staticmethod
    def _utc_now_timestamp():
        """Return the current UTC time formatted like '2017-12-11T08:30:00.000Z'."""
        return datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

    @classmethod
    def _index_for(cls, timestamp):
        """Return the daily index name for an ISO-8601 timestamp.

        e.g. '2017-12-11T08:30:00.000Z' -> 'packetbeat-2017.12.11'
        """
        return cls.INDEX_PREFIX + timestamp.split('T')[0].replace('-', '.')

    @classmethod
    def _poll_indices(cls, latest_time):
        """Return the comma-separated indices to search for hits after latest_time.

        Always includes today's (UTC) index as well as latest_time's own.
        Otherwise, once the date rolls over, the poller would keep searching
        yesterday's index and never see new traffic again.
        """
        last_index = cls._index_for(latest_time)
        today_index = cls._index_for(cls._utc_now_timestamp())
        if today_index == last_index:
            return last_index
        return last_index + ',' + today_index

    @staticmethod
    def _extract_request(hit):
        """Return the [path, params, method] list the consumer expects for one hit."""
        source = hit['_source']
        # NOTE(review): assumes packetbeat may omit http.request.params for
        # requests without a query string -- confirm. '' means "no params".
        params = source.get('http', {}).get('request', {}).get('params', '')
        return [source['path'], params, source['method']]

    def initsearch(self):
        """Return the @timestamp to start polling from.

        Uses the newest GET request in today's (UTC) index. If that index is
        missing or holds no GET requests yet, it falls back to the current UTC
        time, so polling starts from "now" instead of failing with IndexError.
        """
        dsl_query = {
            # match_phrase replaces {"match": {..., "type": "phrase"}},
            # which is deprecated since Elasticsearch 5.0.
            "query": {"match_phrase": {"method": "GET"}},
            "size": 1,  # only the newest hit is used
            "sort": [{"@timestamp": {"order": "desc"}}],
        }
        res = es.search(index=self.INDEX_PREFIX + self.initime, body=dsl_query,
                        ignore_unavailable=True)
        hits = res['hits']['hits']
        if not hits:
            return self._utc_now_timestamp()
        return hits[0]['_source']['@timestamp']

    def run(self):
        latest_time = self.initsearch()
        while True:
            time.sleep(self.POLL_INTERVAL)
            dsl_query2 = {
                "query": {
                    "bool": {
                        "must": {"match": {"method": "GET"}},
                        # "gt", not "gte": with "gte" the newest already-queued
                        # request matched again and was re-sent on every poll.
                        "filter": {"range": {"@timestamp": {"gt": latest_time}}},
                    }
                },
                # Oldest first, resuming after the last hit. With "desc" only
                # the newest BATCH_SIZE hits were seen, and any older new hits
                # were skipped for good.
                # NOTE(review): hits sharing the cursor's exact timestamp but
                # cut off by BATCH_SIZE are still skipped by "gt"; a
                # search_after cursor would close that gap.
                "sort": [{"@timestamp": {"order": "asc"}}],
                "size": self.BATCH_SIZE,
            }
            res2 = es.search(index=self._poll_indices(latest_time), body=dsl_query2,
                             ignore_unavailable=True)
            hits = res2['hits']['hits']
            for hit in hits:
                self.queue.put(self._extract_request(hit))
                print("Put %s" % hit['_id'])
                print(hit['_source']['@timestamp'])
            # An empty poll just means no new traffic: keep the old cursor
            # instead of crashing on hits[0].
            if hits:
                latest_time = hits[-1]['_source']['@timestamp']


class Consumer(threading.Thread):
    """Replay queued requests against the WAF-protected target.

    Each queue item is a ``[path, params, method]`` list. GET requests that
    carry a query string are re-sent to ``TARGET`` with ``PAYLOAD`` appended to
    the query string. Every item is logged; skipped items are only logged.
    """

    TARGET = "http://xxxxx:9999"
    PAYLOAD = "union select"
    # requests.get has no timeout by default and could block this thread forever.
    TIMEOUT = 10

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    @classmethod
    def _build_url(cls, path, param, method):
        """Return the URL to replay for one request, or None to skip it.

        Only GET requests with a non-empty query string are replayed.
        TODO: POST bodies are not stored in Elasticsearch, so POST requests
        cannot be replayed yet.
        """
        if method != 'GET' or not param:
            return None
        return cls.TARGET + path + '?' + param + cls.PAYLOAD

    def run(self):
        while True:
            request = self.queue.get()
            path, param, method = request[0], request[1], request[2]
            url = self._build_url(path, param, method)
            rep = None
            if url is not None:
                try:
                    rep = requests.get(url, timeout=self.TIMEOUT)
                except requests.RequestException as e:
                    # One unreachable or slow request must not kill the replay
                    # thread while the producer keeps filling the queue.
                    print("Request failed: %s" % e)
            print("Get %s" % request)
            # Report a status only for a request actually sent in this
            # iteration; the old code raised UnboundLocalError (or printed a
            # stale status) for skipped requests.
            if rep is not None:
                print(rep.status_code)

# Wire the Elasticsearch poller and the replay consumer to one shared queue.
q = Queue()
pt, ce = Producter(q), Consumer(q)
# Start the consumer first so it is already blocked on the queue.
for _worker in (ce, pt):
    _worker.start()
# Both run() loops are `while` loops without a break, so these joins block
# until the threads die (e.g. on an uncaught error) or the process is killed.
for _worker in (pt, ce):
    _worker.join()

代码应该还有些问题,先记录一下大致的流程:

使用场景:packetbeat 在 A 服务器上抓包,格式化后把数据发送到 B 服务器,存储在 ELK 中;B 服务器再通过图表对这些请求进行分析,例如对某个接口设置告警。

此时在 B 服务器上部署一个 naxsi 防火墙代理,把 ES 中存储的请求取出来,再重放一遍给 B。经测试,虽然这样重放的请求大多返回 404,但只要请求中带有恶意 payload,防火墙就会记录日志(因此这里的规则要设置得特别严格,严格到不放过任何一个请求)。

上面的脚本就是 B 服务器防火墙上的转发脚本 demo。目前只测试了 GET 请求,因为 POST 请求的 body 没有存储到 ES 里。脚本的大致思路如下:

ES 中的索引按天划分,命名格式为 packetbeat-[年].[月].[日](例如 packetbeat-2017.12.11)。
所以先取得今天的日期,在当天的索引里按时间倒序查询,记录最新一条的时间戳,初始化就完成了。

之后的查询都基于这个时间戳:每隔 10 秒轮询一次 ES,取出该时间戳之后的新请求,组装好后发送到 B 服务器。
每次查询后记录返回结果中最新的时间戳,并据此推算下一次要查询的索引(索引名格式为 packetbeat-[年].[月].[日])。取最新时间戳的写法如下:

latest_time = res2['hits']['hits'][0]['_source']['@timestamp']

按“晚于某个时间点”筛选 ES 数据时,只会返回该时间点之后的记录,条数有限,所以可以把请求的 size 设置得大一些。

目前存在的问题:

  • requests 是同步请求库,如果追求效率,可以改用异步请求
  • 防火墙的正则规则还需要进一步调整,才能准确地匹配出恶意请求
2017-12-11
Contents

⬆︎TOP