数据专栏

智能大数据搬运工,你想要的我们都有

数据资讯

数据学院

数据百科

matplotlib是常用的绘图库,支持python及在Jupyter Notebook下使用,也支持最新的JupyterLab环境。这里介绍matplotlib的字体设置方法以及绘图线型、符号和颜色的设置。 1、中文字体 关于中文乱码问题, https://www.linuxidc.com/Linux/2019-03/157632.htm Linux下查找中文字体,使用命令: fc-list :lang=zh 绘图函数参考下面: def draw(dfx): myfont = FontProperties(fname='/usr/share/fonts/truetype/arphic/ukai.ttc',size=24) fig=plt.figure(figsize=(48,12), dpi=250) p1=fig.add_subplot(1,1,1) p1.set_xticklabels(dfx['日期'], rotation=15, fontsize='small',fontproperties=myfont) #显示数据。 p1.plot(dfx['日期'],dfx['新增确诊'],color='red',linewidth=3,label='新增确诊') p1.bar(dfx['日期'],dfx['新增死亡'],color='black',label='新增死亡') plt.title(u'全国新增病例数量(NCP)-2020年01-02月',fontproperties=myfont) plt.legend(loc=0,ncol=1,prop=myfont) plt.grid(True) plt.gcf().autofmt_xdate() plt.show() 需要字体设置的主要有标题、图例和轴上的标签。先创建一个myfont对象,然后在title、legend、set_xticklabel放进去即可,注意legend的参数是prop。 2、图形绘制 这里给出个简单的使用matplotlib绘图例子,使用了颜色、线宽、线型符号等风格样式。 import matplotlib.pyplot as plt from matplotlib.font_manager import * x= range(100) y= [i**2 for i in x] plt.subplots(1, 1) plt.plot(x, y, linewidth = '1', label = 'Example', color='coral', linestyle=':', marker='|') plt.legend(loc='upper left') plt.show() 3、线型表 linestyle可选参数: '-' solid line style '--' dashed line style '-.' dash-dot line style ':' dotted line style 4、符号表 marker可选参数: '.' point marker ',' pixel marker 'o' circle marker 'v' triangle_down marker '^' triangle_up marker '<' triangle_left marker '>' triangle_right marker '1' tri_down marker '2' tri_up marker '3' tri_left marker '4' tri_right marker 's' square marker 'p' pentagon marker '*' star marker 'h' hexagon1 marker 'H' hexagon2 marker '+' plus marker 'x' x marker 'D' diamond marker 'd' thin_diamond marker '|' vline marker '_' hline marker 5、颜色表 color可用的颜色: cnames = { 'aliceblue': '#F0F8FF', 'antiquewhite': '#FAEBD7', 'aqua': '#00FFFF', 'aquamarine': '#7FFFD4', 'azure': '#F0FFFF', 'beige': '#F5F5DC', 'bisque': '#FFE4C4', 'black': '#000000', 'blanchedalmond': '#FFEBCD', 'blue': '#0000FF', 'blueviolet': '#8A2BE2', 'brown': '#A52A2A', 'burlywood': '#DEB887', 'cadetblue': '#5F9EA0', 'chartreuse': '#7FFF00', 'chocolate': '#D2691E', 'coral': '#FF7F50', 'cornflowerblue': '#6495ED', 'cornsilk': '#FFF8DC', 'crimson': '#DC143C', 'cyan': '#00FFFF', 'darkblue': '#00008B', 'darkcyan': '#008B8B', 'darkgoldenrod': '#B8860B', 'darkgray': '#A9A9A9', 'darkgreen': '#006400', 'darkkhaki': '#BDB76B', 'darkmagenta': '#8B008B', 'darkolivegreen': '#556B2F', 'darkorange': '#FF8C00', 'darkorchid': '#9932CC', 'darkred': '#8B0000', 'darksalmon': '#E9967A', 'darkseagreen': '#8FBC8F', 'darkslateblue': '#483D8B', 'darkslategray': '#2F4F4F', 'darkturquoise': '#00CED1', 'darkviolet': '#9400D3', 'deeppink': '#FF1493', 'deepskyblue': '#00BFFF', 'dimgray': '#696969', 'dodgerblue': '#1E90FF', 'firebrick': '#B22222', 'floralwhite': '#FFFAF0', 'forestgreen': '#228B22', 'fuchsia': '#FF00FF', 'gainsboro': '#DCDCDC', 'ghostwhite': '#F8F8FF', 'gold': '#FFD700', 'goldenrod': '#DAA520', 'gray': '#808080', 'green': '#008000', 'greenyellow': '#ADFF2F', 'honeydew': '#F0FFF0', 'hotpink': '#FF69B4', 'indianred': '#CD5C5C', 'indigo': '#4B0082', 'ivory': '#FFFFF0', 'khaki': '#F0E68C', 'lavender': '#E6E6FA', 'lavenderblush': '#FFF0F5', 'lawngreen': '#7CFC00', 'lemonchiffon': '#FFFACD', 'lightblue': '#ADD8E6', 'lightcoral': '#F08080', 'lightcyan': '#E0FFFF', 'lightgoldenrodyellow': '#FAFAD2', 'lightgreen': '#90EE90', 'lightgray': '#D3D3D3', 'lightpink': '#FFB6C1', 'lightsalmon': '#FFA07A', 'lightseagreen': '#20B2AA', 'lightskyblue': '#87CEFA', 'lightslategray': '#778899', 'lightsteelblue': '#B0C4DE', 'lightyellow': '#FFFFE0', 'lime': '#00FF00', 'limegreen': '#32CD32', 'linen': '#FAF0E6', 'magenta': '#FF00FF', 'maroon': '#800000', 'mediumaquamarine': '#66CDAA', 'mediumblue': '#0000CD', 'mediumorchid': '#BA55D3', 'mediumpurple': '#9370DB', 'mediumseagreen': '#3CB371', 'mediumslateblue': '#7B68EE', 'mediumspringgreen': '#00FA9A', 'mediumturquoise': '#48D1CC', 'mediumvioletred': '#C71585', 'midnightblue': '#191970', 'mintcream': '#F5FFFA', 'mistyrose': '#FFE4E1', 'moccasin': '#FFE4B5', 'navajowhite': '#FFDEAD', 'navy': '#000080', 'oldlace': '#FDF5E6', 'olive': '#808000', 'olivedrab': '#6B8E23', 'orange': '#FFA500', 'orangered': '#FF4500', 'orchid': '#DA70D6', 'palegoldenrod': '#EEE8AA', 'palegreen': '#98FB98', 'paleturquoise': '#AFEEEE', 'palevioletred': '#DB7093', 'papayawhip': '#FFEFD5', 'peachpuff': '#FFDAB9', 'peru': '#CD853F', 'pink': '#FFC0CB', 'plum': '#DDA0DD', 'powderblue': '#B0E0E6', 'purple': '#800080', 'red': '#FF0000', 'rosybrown': '#BC8F8F', 'royalblue': '#4169E1', 'saddlebrown': '#8B4513', 'salmon': '#FA8072', 'sandybrown': '#FAA460', 'seagreen': '#2E8B57', 'seashell': '#FFF5EE', 'sienna': '#A0522D', 'silver': '#C0C0C0', 'skyblue': '#87CEEB', 'slateblue': '#6A5ACD', 'slategray': '#708090', 'snow': '#FFFAFA', 'springgreen': '#00FF7F', 'steelblue': '#4682B4', 'tan': '#D2B48C', 'teal': '#008080', 'thistle': '#D8BFD8', 'tomato': '#FF6347', 'turquoise': '#40E0D0', 'violet': '#EE82EE', 'wheat': '#F5DEB3', 'white': '#FFFFFF', 'whitesmoke': '#F5F5F5', 'yellow': '#FFFF00', 'yellowgreen': '#9ACD32'} 颜色样本如下: 另外: 如果安装了seaborn扩展的话,在字典seaborn.xkcd_rgb中包含所有的xkcd crowdsourced color names。使用方法如下: plt.plot([1,2], lw=4, c=seaborn.xkcd_rgb['baby poop green']) 所有颜色表如下:
来源:OSCHINA
发布时间:2020-02-10 15:57:00
前言 首先介绍下在本文出现的几个比较重要的概念: 函数计算(Function Compute) : 函数计算 是一个事件驱动的服务,通过函数计算,用户无需管理服务器等运行情况,只需编写代码并上传。函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而用户只需根据实际代码运行所消耗的资源进行付费。函数计算更多信息 参考 。 Fun : Fun 是一个用于支持 Serverless 应用部署的工具,能帮助您便捷地管理函数计算、API 网关、日志服务等资源。它通过一个资源配置文件(template.yml),协助您进行开发、构建、部署操作。Fun 的更多文档 参考 。 备注: 本文介绍的技巧需要 Fun 版本大于等于 3.5.0。 依赖工具 本项目是在 MacOS 下开发的,涉及到的工具是平台无关的,对于 Linux 和 Windows 桌面系统应该也同样适用。在开始本例之前请确保如下工具已经正确的安装,更新到最新版本,并进行正确的配置。 Docker Fun Fun 工具依赖于 docker 来模拟本地环境。 对于 MacOS 用户可以使用 homebrew 进行安装: brew cask install docker brew tap vangie/formula brew install fun Windows 和 Linux 用户安装请参考: https://github.com/aliyun/fun/blob/master/docs/usage/installation.md https://github.com/aliyun/fcli/releases 安装好后,记得先执行 fun config 初始化一下配置。 初始化 使用 fun init 命令可以快捷的将本模板项目初始化到本地。 fun init vangie/puppeteer-example 安装依赖 fun install fun install 会执行 Funfile 文件里的指令,依次执行如下任务: 安装 chrome headless 二进制文件; 安装 puppeteer 依赖的 apt 包; 安装 npm 依赖。 部署 同步大文件到 nas 盘: fun nas sync 部署代码: $ fun deploy using template: template.yml using region: cn-hangzhou using accountId: *********** 3743 using accessKeyId: ***********Ptgk using timeout: 600 Waiting for service puppeteer to be deployed... make sure role 'aliyunfcgeneratedrole-cn-hangzhou-puppeteer' is exist role 'aliyunfcgeneratedrole-cn-hangzhou-puppeteer' is already exist attaching police 'AliyunECSNetworkInterfaceManagementAccess' to role: aliyunfcgeneratedrole-cn-hangzhou-puppeteer attached police 'AliyunECSNetworkInterfaceManagementAccess' to role: aliyunfcgeneratedrole-cn-hangzhou-puppeteer using 'VpcConfig: Auto' , Fun will try to generate related vpc resources automatically vpc already generated, vpcId is : vpc-bp1wv9al02opqahkizmvr vswitch already generated, vswitchId is : vsw-bp1kablus0jrcdeth8v35 security group already generated, security group is : sg-bp1h2swzeb5vgjfu6gpo generated auto VpcConfig done: { "vpcId" : "vpc-bp1wv9al02opqahkizmvr" , "vswitchIds" :[ "vsw-bp1kablus0jrcdeth8v35" ], "securityGroupId" : "sg-bp1h2swzeb5vgjfu6gpo" } using 'NasConfig: Auto' , Fun will try to generate related nas file system automatically nas file system already generated, fileSystemId is : 0825 a4a395 nas file system mount target is already created, mountTargetDomain is : 0825 a4a395-rrf16.cn-hangzhou.nas.aliyuncs.com generated auto NasConfig done: { "UserId" : 10003 , "GroupId" : 10003 , "MountPoints" :[{ "ServerAddr" : "0825a4a395-rrf16.cn-hangzhou.nas.aliyuncs.com:/puppeteer" , "MountDir" : "/mnt/auto" }]} Checking if nas directories /puppeteer exists, if not, it will be created automatically Checking nas directories done [ "/puppeteer" ] Waiting for function html2png to be deployed... Waiting for packaging function html2png code... The function html2png has been packaged. A total of 7 files files were compressed and the final size was 2.56 KB Waiting for HTTP trigger httpTrigger to be deployed... triggerName: httpTrigger methods: [ 'GET' ] url: https: //xxxxxx.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/puppeteer/html2png/ Http Trigger will forcefully add a 'Content-Disposition: attachment' field to the response header, which cannot be overwritten and will cause the response to be downloaded as an attachment in the browser. This issue can be avoided by using CustomDomain. trigger httpTrigger deploy success function html2png deploy success service puppeteer deploy success ===================================== Tips for nas resources ================================================== Fun has detected the .nas.yml file in your working directory, which contains the local directory: /Users/vangie/Workspace/puppeteer-example/{{ projectName }}/.fun/root /Users/vangie/Workspace/puppeteer-example/{{ projectName }}/node_modules The above directories will be automatically ignored when 'fun deploy' . Any content of the above directories changes,you need to use 'fun nas sync' to sync local resources to remote. =============================================================================================================== 验证 curl https://xxxxxx.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/puppeteer/html2png/ > screenshot.png 如果不传递查询参数,默认会截取阿里云的首页。 如果想换一个网址,可以使用如下命令格式: curl https://xxxxxx.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/puppeteer/html2png/?url=http://www.alibaba.com > screenshot.png 调试 如果需要在本地调试代码,可以使用如下命令: $ fun local start using template : template.yml HttpTrigger httpTrigger of puppeteer/html2png was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/puppeteer/html2png methods: [ 'GET' ] authType: ANONYMOUS function compute app listening on port 8000 ! 浏览器打开 http://localhost:8000/2016-08-15/proxy/puppeteer/html2png 即可。 查看更多:https://yq.aliyun.com/articles/743644?utm_content=g_1000103099 上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/
来源:OSCHINA
发布时间:2020-02-10 15:55:00
这是一个包含了 函数计算 每种 Runtime 结合 HTTP Trigger 实现文件上传和文件下载的示例集。每个示例包括: 一个公共 HTML 页面,该页面有一个文件选择框和上传按钮,会列出已经上传的文件,点击某个已上传的文件可以把文件下载下来; 支持文件上传、下载和列举的函数。 我们知道不同语言在处理 HTTP 协议上传下载时都有很多中方法和社区库,特别是结合函数计算的场景,开发人员往往需要耗费不少精力去学习和尝试。本示例集编撰的目的就是节省开发者甄别的精力和时间,为每种语言提供一种有效且符合社区最佳实践的方法,可以拿来即用。 当前已支持的 Runtime 包括: nodejs python php java 计划支持的 Runtime 包括: dotnetcore 不打算支持的 Runtime 包括: custom 使用限制 由于函数计算对于 HTTP 的 Request 和 Response 的 Body 大小限制均为 6M,所以该示例集只适用于借助函数计算上传和下载文件小于 6M 的场景。对于大于 6M 的情况,可以考虑如下方法: 分片上传 ,把文件切分成小块,上传以后再拼接起来; 借助于 OSS ,将文件先上传 OSS,函数从 OSS 上下载文件,处理完以后回传 OSS; 借助于 NAS ,将大文件放在 NAS 网盘上,函数可以像读写普通文件系统一样访问 NAS 网盘的文件。 快速开始 安装依赖 在开始之前请确保开发环境已经安装了如下工具: docker funcraft git make 构建并启动函数 克隆代码: git clone https://github.com/vangie/ fc -file-transfer 本地启动函数: $ make start ... HttpTrigger httpTrigger of file -transfer/nodejs was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/ file -transfer/nodejs methods: [ 'GET' , 'POST' ] authType: ANONYMOUS HttpTrigger httpTrigger of file -transfer/python was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/ file -transfer/python methods: [ 'GET' , 'POST' ] authType: ANONYMOUS HttpTrigger httpTrigger of file -transfer/ java was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/ file -transfer/ java methods: [ 'GET' , 'POST' ] authType: ANONYMOUS HttpTrigger httpTrigger of file -transfer/php was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/ file -transfer/php methods: [ 'GET' , 'POST' ] authType: ANONYMOUS function compute app listening on port 8000 ! make start 命令会调用 Makefile 文件中的指令,通过 fun local 在本地的 8000 端口开放 HTTP 服务,控制台会打印出每个 HTTP Trigger 的 URL 、支持的 HTTP 方法,以及认证方式。 效果演示 上面四个 URL 地址随便选一个在浏览器中打开示例页面。 接口说明 所有示例都实现了下述四个 HTTP 接口: GET / 返回文件上传 Form 的 HTML 页面 GET /list 以 JSON 数组形式返回文件列表 POST /upload 以 multipart/form-data 格式上传文件 fileContent 作为文件字段 fileName 作为文件名字段 GET /download?filename=xxx 以 application/octet-stream 格式返回文件内容。 此外为了能正确的计算相对路径,在访问根路径时如果不是以 / 结尾,都会触发一个 301 跳转,在 URL 末尾加上一个 / 。 不同语言的示例代码 nodejs python php java 已知问题 文件大小 限制 fun local 实现存在已知问题,上传过大的文件会自动退出,未来的版本会修复。 部署到线上需要绑定 自定义域名 才能使用,否则 HTML 文件在浏览器中会被 强制下载 而不是直接渲染。 查看更多:https://yq.aliyun.com/articles/743642?utm_content=g_1000103098 上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/
来源:OSCHINA
发布时间:2020-02-10 15:42:00
导读: 容器存储是 Kubernetes 系统中提供数据持久化的基础组件,是实现有状态服务的重要保证。Kubernetes 默认提供了主流的存储卷接入方案(In-Tree),同时也提供了插件机制(Out-Of-Tree),允许其他类型的存储服务接入 Kubernetes 系统服务。本文将从 Kubernetes 存储架构、存储插件原理、实现等方面进行讲解,希望大家有所收获。 一、Kubernetes 存储体系架构 引例: 在 Kubernetes 中挂载一个 Volume 首先以一个 Volume 的挂载例子来作为引入。 如下图所示,左边的 YAML 模板定义了一个 StatefulSet 的一个应用,其中定义了一个名为 disk-pvc 的 volume,挂载到 Pod 内部的目录是 /data。disk-pvc 是一个 PVC 类型的数据卷,其中定义了一个 storageClassName。 因此这个模板是一个典型的动态存储的模板。右图是数据卷挂载的过程,主要分为 6 步: 第一步 :用户创建一个包含 PVC的 Pod; 第二步 :PV Controller 会不断观察 ApiServer,如果它发现一个 PVC 已经创建完毕但仍然是未绑定的状态,它就会试图把一个 PV 和 PVC 绑定; PV Controller 首先会在集群内部找到一个适合的 PV 进行绑定,如果未找到相应的 PV,就调用 Volume Plugin 去做 Provision。Provision 就是从远端上一个具体的存储介质创建一个 Volume,并且在集群中创建一个 PV 对象,然后将此 PV 和 PVC 进行绑定; 第三步 :通过 Scheduler 完成一个调度功能; 我们知道,当一个 Pod 运行的时候,需要选择一个 Node,这个节点的选择就是由 Scheduler 来完成的。Scheduler 进行调度的时候会有多个参考量,比如 Pod 内部所定义的 nodeSelector、nodeAffinity 这些定义以及 Volume 中所定义的一些标签等。 我们可以在数据卷中添加一些标签,这样使用这个 pv 的 Pod 就会由于标签的限制,被调度器调度到期望的节点上。 第四步 :如果有一个 Pod 调度到某个节点之后,它所定义的 PV 还没有被挂载(Attach),此时 AD Controller 就会调用 VolumePlugin,把远端的 Volume 挂载到目标节点中的设备上(如:/dev/vdb); 第五步: 当 Volum Manager 发现一个 Pod 调度到自己的节点上并且 Volume 已经完成了挂载,它就会执行 mount 操作,将本地设备(也就是刚才得到的 /dev/vdb)挂载到 Pod 在节点上的一个子目录中。同时它也可能会做一些像格式化、是否挂载到 GlobalPath 等这样的附加操作。 第六步 :绑定操作,就是将已经挂载到本地的 Volume 映射到容器中。 Kubernetes 的存储架构 接下来,我们一起看一下 Kubernetes 的存储架构。 PV Controller : 负责 PV/PVC 的绑定、生命周期管理,并根据需求进行数据卷的 Provision/Delete 操作; AD Controller :负责存储设备的 Attach/Detach 操作,将设备挂载到目标节点; Volume Manager :管理卷的 Mount/Unmount 操作、卷设备的格式化以及挂载到一些公用目录上的操作; Volume Plugins :它主要是对上面所有挂载功能的实现; PV Controller、AD Controller、Volume Manager 主要是进行操作的调用,而具体操作则是由 Volume Plugins 实现的。 Scheduler :实现对 Pod 的调度能力,会根据一些存储相关的的定义去做一些存储相关的调度; 接下来,我们分别介绍上面这几部分的功能。 PV Controller 首先我们先来回顾一下几个基本概念: Persistent Volume (PV) : 持久化存储卷,详细定义了预挂载存储空间的各项参数; 例如,我们去挂载一个远端的 NAS 的时候,这个 NAS 的具体参数就要定义在 PV 中。一个 PV 是没有 NameSpace 限制的,它一般由 Admin 来创建与维护; Persistent Volume Claim (PVC) :持久化存储声明; 它是用户所使用的存储接口,对存储细节无感知,主要是定义一些基本存储的 Size、AccessMode 参数在里面,并且它是属于某个 NameSpace 内部的。 StorageClass :存储类; 一个动态存储卷会按照 StorageClass 所定义的模板来创建一个 PV,其中定义了创建模板所需要的一些参数和创建 PV 的一个 Provisioner(就是由谁去创建的)。 PV Controller 的主要任务就是完成 PV、PVC 的生命周期管理,比如创建、删除 PV 对象,负责 PV、PVC 的状态迁移;另一个任务就是绑定 PVC 与 PV 对象,一个 PVC 必须和一个 PV 绑定后才能被应用使用,它们是一一绑定的,一个 PV 只能被一个 PVC 绑定,反之亦然。 接下来,我们看一下一个 PV 的状态迁移图。 创建好一个 PV 以后,我们就处于一个 Available 的状态,当一个 PVC 和一个 PV 绑定的时候,这个 PV 就进入了 Bound 的状态,此时如果我们把 PVC 删掉,Bound 状态的 PV 就会进入 Released 的状态。 一个 Released 状态的 PV 会根据自己定义的 ReclaimPolicy 字段来决定自己是进入一个 Available 的状态还是进入一个 Deleted 的状态。如果 ReclaimPolicy 定义的是 "recycle" 类型,它会进入一个 Available 状态,如果转变失败,就会进入 Failed 的状态。 相对而言,PVC 的状态迁移图就比较简单。 一个创建好的 PVC 会处于 Pending 状态,当一个 PVC 与 PV 绑定之后,PVC 就会进入 Bound 的状态,当一个 Bound 状态的 PVC 的 PV 被删掉之后,该 PVC 就会进入一个 Lost 的状态。对于一个 Lost 状态的 PVC,它的 PV 如果又被重新创建,并且重新与该 PVC 绑定之后,该 PVC 就会重新回到 Bound 状态。 下图是一个 PVC 去绑定 PV 时对 PV 筛选的一个流程图。就是说一个 PVC 去绑定一个 PV 的时候,应该选择一个什么样的 PV 进行绑定。 首先 它会检查 VolumeMode 这个标签,PV 与 PVC 的 VolumeMode 标签必须相匹配。VolumeMode 主要定义的是我们这个数据卷是文件系统 (FileSystem) 类型还是一个块 (Block) 类型; 第二个部分 是 LabelSelector。当 PVC 中定义了 LabelSelector 之后,我们就会选择那些有 Label 并且与 PVC 的 LabelSelector 相匹配的 PV 进行绑定; 第三个部分 是 StorageClassName 的检查。如果 PVC 中定义了一个 StorageClassName,则必须有此相同类名的 PV 才可以被筛选中。 这里再具体解释一下 StorageClassName 这个标签,该标签的目的就是说,当一个 PVC 找不到相应的 PV 时,我们就会用该标签所指定的 StorageClass 去做一个动态创建 PV 的操作,同时它也是一个绑定条件,当存在一个满足该条件的 PV 时,就会直接使用现有的 PV,而不再去动态创建。 第四个部分 是 AccessMode 检查。 AccessMode 就是平时我们在 PVC 中定义的如 "ReadWriteOnce"、"RearWriteMany" 这样的标签。该绑定条件就是要求 PVC 和 PV 必须有匹配的 AccessMode,即 PVC 所需求的 AccessMode 类型,PV 必须具有。 最后 一个部分是 Size 的检查。 一个 PVC 的 Size 必须小于等于 PV 的 Size,这是因为 PVC 是一个声明的 Volume,实际的 Volume 必须要大于等于声明的 Volume,才能进行绑定。 接下来,我们看一个 PV Controller 的一个实现。 PV Controller 中主要有两个实现逻辑:一个是 ClaimWorker;一个是 VolumeWorker。 ClaimWorker 实现的是 PVC 的状态迁移。 通过系统标签 "pv.kubernetes.io/bind-completed" 来标识一个 PVC 的状态。 如果该标签为 True,说明我们的 PVC 已经绑定完成,此时我们只需要去同步一些内部的状态; 如果该标签为 False,就说明我们的 PVC 处于未绑定状态。 这个时候就需要检查整个集群中的 PV 去进行筛选。通过 findBestMatch 就可以去筛选所有的 PV,也就是按照之前提到的五个绑定条件来进行筛选。如果筛选到 PV,就执行一个 Bound 操作,否则就去做一个 Provision 的操作,自己去创建一个 PV。 再看 VolumeWorker 的操作。它实现的则是 PV 的状态迁移。 通过 PV 中的 ClaimRef 标签来进行判断,如果该标签为空,就说明该 PV 是一个 Available 的状态,此时只需要做一个同步就可以了;如果该标签非空,这个值是 PVC 的一个值,我们就会去集群中查找对应的 PVC。如果存在该 PVC,就说明该 PV 处于一个 Bound 的状态,此时会做一些相应的状态同步;如果找不到该 PVC,就说明该 PV 处于一个绑定过的状态,相应的 PVC 已经被删掉了,这时 PV 就处于一个 Released 的状态。此时再根据 ReclaimPolicy 是否是 Delete 来决定是删掉还是只做一些状态的同步。 以上就是 PV Controller 的简要实现逻辑。 AD Controller AD Controller 是 Attach/Detach Controller 的一个简称。 它有两个核心对象,即 DesiredStateofWorld 和 ActualStateOfWorld。 DesiredStateofWorld 是集群中预期要达到的数据卷的挂载状态; ActualStateOfWorld 则是集群内部实际存在的数据卷挂载状态。 它有两个核心逻辑,desiredStateOfWorldPopulator 和 Reconcile。 desiredStateOfWorldPopulator 主要是用来同步集群的一些数据以及 DSW、ASW 数据的更新,它会把集群里面,比如说我们创建一个新的 PVC、创建一个新的 Pod 的时候,我们会把这些数据的状态同步到 DSW 中; Reconcile 则会根据 DSW 和 ASW 对象的状态做状态同步。它会把 ASW 状态变成 DSW 状态,在这个状态的转变过程中,它会去执行 Attach、Detach 等操作。 下面这个表分别给出了 desiredStateOfWorld 以及 actualStateOfWorld 对象的一个具体例子。 desiredStateOfWorld 会对每一个 Worker 进行定义,包括 Worker 所包含的 Volume 以及一些试图挂载的信息; actualStateOfWorl 会把所有的 Volume 进行一次定义,包括每一个 Volume 期望挂载到哪个节点上、挂载的状态是什么样子的等等。 下图是 AD Controller 实现的逻辑框图。 从中我们可以看到,AD Controller 中有很多 Informer,Informer 会把集群中的 Pod 状态、PV 状态、Node 状态、PVC 状态同步到本地。 在初始化的时候会调用 populateDesireStateofWorld 以及 populateActualStateofWorld 将 desireStateofWorld、actualStateofWorld 两个对象进行初始化。 在执行的时候,通过 desiredStateOfWorldPopulator 进行数据同步,即把集群中的数据状态同步到 desireStateofWorld 中。reconciler 则通过轮询的方式把 actualStateofWorld 和 desireStateofWorld 这两个对象进行数据同步,在同步的时候,会通过调用 Volume Plugin 进行 attach 和 detach 操作,同时它也会调用 nodeStatusUpdater 对 Node 的状态进行更新。 以上就是 AD Controller 的简要实现逻辑。 Volume Manager Volume Manager 实际上是 Kubelet 中一部分,是 Kubelet 中众多 Manager 的一个。它主要是用来做本节点 Volume 的 Attach/Detach/Mount/Unmount 操作。 它和 AD Controller 一样包含有 desireStateofWorld 以及 actualStateofWorld,同时还有一个 volumePluginManager 对象,主要进行节点上插件的管理。在核心逻辑上和 AD Controller 也类似,通过 desiredStateOfWorldPopulator 进行数据的同步以及通过 Reconciler 进行接口的调用。 这里我们需要讲一下 Attach/Detach 这两个操作: 之前我们提到 AD Controller 也会做 Attach/Detach 操作,所以到底是由谁来做呢?我们可以通过 "--enable-controller-attach-detach" 标签进行定义,如果它为 True,则由 AD Controller 来控制;若为 False,就由 Volume Manager 来做。 它是 Kubelet 的一个标签,只能定义某个节点的行为,所以如果假设一个有 10 个节点的集群,它有 5 个节点定义该标签为 False,说明这 5 个节点是由节点上的 Kubelet 来做挂载,而其它 5 个节点是由 AD Controller 来做挂载。 下图是 Volume Manager 实现逻辑图。 我们可以看到,最外层是一个循环,内部则是根据不同的对象,包括 desireStateofWorld, actualStateofWorld 的不同对象做一个轮询。 例如,对 actualStateofWorld 中的 MountedVolumes 对象做轮询,对其中的某一个 Volume,如果它同时存在于 desireStateofWorld,这就说明实际的和期望的 Volume 均是处于挂载状态,因此我们不会做任何处理。如果它不存在于 desireStateofWorld,说明期望状态中该 Volume 应该处于 Umounted 状态,就执行 UnmountVolume,将其状态转变为 desireStateofWorld 中相同的状态。 所以我们可以看到:实际上,该过程就是根据 desireStateofWorld 和 actualStateofWorld 的对比,再调用底层的接口来执行相应的操作,下面的 desireStateofWorld.UnmountVolumes 和 actualStateofWorld.AttachedVolumes 的操作也是同样的道理。 Volume Plugins 我们之前提到的 PV Controller、AD Controller 以及 Volume Manager 其实都是通过调用 Volume Plugin 提供的接口,比如 Provision、Delete、Attach、Detach 等去做一些 PV、PVC 的管理。而这些接口的具体实现逻辑是放在 VolumePlugin 中的 根据源码的位置可将 Volume Plugins 分为 In-Tree 和 Out-of-Tree 两类: In-Tree 表示源码是放在 Kubernetes 内部的,和 Kubernetes 一起发布、管理与迭代,缺点及时迭代速度慢、灵活性差; Out-of-Tree 类的 Volume Plugins 的代码独立于 Kubernetes,它是由存储商提供实现的,目前主要有 Flexvolume 和 CSI 两种实现机制,可以根据存储类型实现不同的存储插件。所以我们比较推崇 Out-of-Tree 这种实现逻辑。 从位置上我们可以看到,Volume Plugins 实际上就是 PV Controller、AD Controller 以及 Volume Manager 所调用的一个库,分为 In-Tree 和 Out-of-Tree 两类 Plugins。它通过这些实现来调用远端的存储,比如说挂载一个 NAS 的操作 "mount -t nfs * ",该命令其实就是在 Volume Plugins 中实现的,它会去调用远程的一个存储挂载到本地。 从类型上来看,Volume Plugins 可以分为很多种。In-Tree 中就包含了 几十种常见的存储实现,但一些公司的自己定义私有类型,有自己的 API 和参数,公共存储插件是无法支持的,这时就需要 Out-of-Tree 类的存储实现,比如 CSI、FlexVolume。 Volume Plugins 的具体实现会放到后面去讲。这里主要看一下 Volume Plugins 的插件管理。 Kubernetes会在 PV Controller、AD Controller 以及 Volume Manager 中来做插件管理。通过 VolumePlguinMg 对象进行管理。主要包含 Plugins 和 Prober 两个数据结构。 Plugins 主要是用来保存 Plugins 列表的一个对象,而 Prober 是一个探针,用于发现新的 Plugin,比如 FlexVolume、CSI 是扩展的一种插件,它们是动态创建和生成的,所以一开始我们是无法预知的,因此需要一个探针来发现新的 Plugin。 下图是插件管理的整个过程。 PV Controller、AD Controller 以及 Volume Manager 在启动的时候会执行一个 InitPlugins 方法来对 VolumePluginsMgr 做一些初始化。 它首先会将所有 In-Tree 的 Plugins 加入到我们的插件列表中。同时会调用 Prober 的 init 方法,该方法会首先调用一个 InitWatcher,它会时刻观察着某一个目录 (比如图中的 /usr/libexec/kubernetes/kubelet-plugins/volume/exec/),当这个目录每生成一个新文件的时候,也就是创建了一个新的 Plugins,此时就会生成一个新的 FsNotify.Create 事件,并将其加入到 EventsMap 中;同理,如果删除了一个文件,就生成一个 FsNotify.Remove 事件加入到 EventsMap 中。 当上层调用 refreshProbedPlugins 时,Prober 就会把这些事件进行一个更新,如果是 Create,就将其添加到插件列表;如果是 Remove,就从插件列表中删除一个插件。 以上就是 Volume Plugins 的插件管理机制。 Kubernetes 存储卷调度 我们之前说到 Pod 必须被调度到某个 Worker 上才能去运行。在调度 Pod 时,我们会使用不同的调度器来进行筛选,其中有一些与 Volume 相关的调度器。例如 VolumeZonePredicate、VolumeBindingPredicate、CSIMaxVolumLimitPredicate 等。 VolumeZonePredicate 会检查 PV 中的 Label,比如 failure-domain.beta.kubernetes.io/zone 标签,如果该标签定义了 zone 的信息,VolumeZonePredicate 就会做相应的判断,即必须符合相应的 zone 的节点才能被调度。 比如下图左侧的例子,定义了一个 label 的 zone 为 cn-shenzhen-a。右侧的 PV 则定义了一个 nodeAffinity,其中定义了 PV 所期望的节点的 Label,该 Label 是通过 VolumeBindingPredicate 进行筛选的。 存储卷具体调度信息的实现可以参考《 从零开始入门 K8s | 应用存储和持久化数据卷:存储快照与拓扑调度 》,这里会有一个更加详细的介绍。 二、Flexvolume 介绍及使用 Flexvolume 是 Volume Plugins 的一个扩展,主要实现 Attach/Detach/Mount/Unmount 这些接口。我们知道这些功能本是由 Volume Plugins 实现的,但是对于某些存储类型,我们需要将其扩展到 Volume Plugins 以外,所以我们需要把接口的具体实现放到外面。 在下图中我们可以看到,Volume Plugins 其实包含了一部分 Flexvolume 的实现代码,但这部分代码其实只有一个 “Proxy”的功能。 比如当 AD Controller 调用插件的一个 Attach 时,它首先会调用 Volume Plugins 中 Flexvolume 的 Attach 接口,但这个接口只是把调用转到相应的 Flexvolume 的Out-Of-Tree实现上。 Flexvolume是可被 Kubelet 驱动的可执行文件,每一次调用相当于执行一次 shell 的 ls 这样的脚本,都是可执行文件的命令行调用,因此它不是一个常驻内存的守护进程。 Flexvolume 的 Stdout 作为 Kubelet 调用的返回结果,这个结果需要是 JSON 格式。 Flexvolume默认的存放地址为 "/usr/libexec/kubernetes/kubelet-plugins/volume/exec/alicloud~disk/disk"。 下面是一个命令格式和调用的实例。 Flexvolume 的接口介绍 Flexvolum 包含以下接口: init : 主要做一些初始化的操作,比如部署插件、更新插件的时候做 init 操作,返回的时候会返回刚才我们所说的 DriveCapabilities 类型的数据结构,用来说明我们的 Flexvolume 插件有哪些功能; GetVolumeName : 返回插件名; Attach : 挂载功能的实现。根据 --enable-controller-attach-detach 标签来决定是由 AD Controller 还是 Kubelet 来发起挂载操作; WaitforAttach : Attach 经常是异步操作,因此需要等待挂载完成,才能需要进行下面的操作; MountDevice:它是 mount 的一部分。这里我们将 mount 分为 MountDevice 和 SetUp 两部分,MountDevice 主要做一些简单的预处理工作,比如将设备格式化、挂载到 GlobalMount 目录中等; GetPath :获取每个 Pod 对应的本地挂载目录; Setup :使用 Bind 方式将 GlobalPath 中的设备挂载到 Pod 的本地目录; TearDown 、 UnmountDevice 、 Detach 实现的是上面一些借口的逆过程; ExpandVolumeDevice :扩容存储卷,由 Expand Controller 发起调用; NodeExpand : 扩容文件系统,由 Kubelet 发起调用。 上面这些接口不一定需要全部实现,如果某个接口没有实现的话,可以将返回结果定义成: { "status" : "Not supported" , "message" : "error message" } 告诉调用者没有实现这个接口。此外,Volume Plugins 中的 Flexvolume 接口除了作为一个 Proxy 外,它也提供了一些默认实现,比如 Mount 操作。所以如果你的 Flexvolume 中没有定义该接口,该默认实现就会被调用。 在定义 PV 时可以通过 secretRef 字段来定义一些 secret 的功能。比如挂载时所需的用户名和密码,就可以通过 secretRef 传入。 Flexvolume 的挂载分析 从挂载流程和卸载流程两个方向来分析 Flexvolume 的挂载过程。 我们首先看 Attach 操作,它调用了一个远端的 API 把我们的 Storage 挂载到目标节点中的某个设备上去。然后通过 MountDevice 将本地设备挂载到 GlobalPath 中,同时也会做一些格式化这样的操作。Mount 操作(SetUp),它会把 GlobalPath 挂载 PodPath 中,PodPath 就是 Pod 启动时所映射的一个目录。 下图给出了一个例子,比如我们一个云盘,其 Volume ID 为 d-8vb4fflsonz21h31cmss,在执行完 Attach 和 WaitForAttach 操作之后,就会将其挂载到目标节点上的 /dec/vdc 设备中。执行 MountDevice 之后,就会把上述设备格式化,挂载到一个本地的 GlobalPath 中。而执行完 Mount 之后,就会将 GlobalPath 映射到 Pod 相关的一个子目录中。最后执行 Bind 操作,将我们的本地目录映射到容器中。这样完成一次挂载过程。 卸载流程就是一个逆过程。上述过程描述的是一个块设备的挂载过程,对于文件存储类型,就无需 Attach、MountDevice操作,只需要 Mount 操作,因此文件系统的 Flexvolume 实现较为简单,只需要 Mount 和 Unmount 过程即可。 Flexvolume 的代码示例 其中主要实现的是 init()、doMount()、doUnmount() 方法。在执行该脚本的时候对传入的参数进行判断来决定执行哪一个命令。 在 Github 上还有很多 Flexvolume 的示例,大家可以自行参考查阅。阿里云提供了一个 Flexvolume 的实现 ,有兴趣的可以参考一下。 Flexvolume 的使用 下图给出了一个 Flexvolume 类型的 PV 模板。它和其它模板实际上没有什么区别,只不过类型被定义为 flexVolume 类型。flexVolume 中定义了 driver、fsType、options。 driver 定义的是我们实现的某种驱动,比如图中的是 aliclound/disk,也可以是 aliclound/nas 等; fsType 定义的是文件系统类型,比如 "ext4"; options 包含了一些具体的参数,比如定义云盘的 id 等。 我们也可以像其它类型一样,通过 selector 中的 matchLabels 定义一些筛选条件。同样也可以定义一些相应的调度信息,比如定义 zone 为 cn-shenzhen-a。 下面是一个具体的运行结果。在 Pod 内部我们挂载了一个云盘,其所在本地设备为 /dev/vdb。通过 mount | grep disk 我们可以看到相应的挂载目录,首先它会将 /dev/vdb 挂载到 GlobalPath 中;其次会将 GlobalPath 通过 mount 命令挂载到一个 Pod 所定义的本地子目录中去;最后会把该本地子目录映射到 /data 上。 三、CSI 介绍及使用 和 Flexvolume 类似,CSI 也是为第三方存储提供数据卷实现的抽象接口。 有了 Flexvolume,为何还要 CSI 呢? Flexvolume 只是给 kubernetes 这一个编排系统来使用的,而 CSI 可以满足不同编排系统的需求,比如 Mesos,Swarm。 其次 CSI 是容器化部署,可以减少环境依赖,增强安全性,丰富插件的功能。我们知道,Flexvolume 是在 host 空间一个二进制文件,执行 Flexvolum 时相当于执行了本地的一个 shell 命令,这使得我们在安装 Flexvolume 的时候需要同时安装某些依赖,而这些依赖可能会对客户的应用产生一些影响。因此在安全性上、环境依赖上,就会有一个不好的影响。 同时对于丰富插件功能这一点,我们在 Kubernetes 生态中实现 operator 的时候,经常会通过 RBAC 这种方式去调用 Kubernetes 的一些接口来实现某些功能,而这些功能必须要在容器内部实现,因此像 Flexvolume 这种环境,由于它是 host 空间中的二进制程序,就没法实现这些功能。而 CSI 这种容器化部署的方式,可以通过 RBAC 的方式来实现这些功能。 CSI 主要包含两个部分:CSI Controller Server 与 CSI Node Server。 Controller Server 是控制端的功能,主要实现创建、删除、挂载、卸载等功能; Node Server 主要实现的是节点上的 mount、Unmount 功能。 下图给出了 CSI 接口通信的描述。CSI Controller Server 和 External CSI SideCar 是通过 Unix Socket 来进行通信的,CSI Node Server 和 Kubelet 也是通过 Unix Socket 来通信,之后我们会讲一下 External CSI SiderCar 的具体概念。 下图给出了 CSI 的接口。主要分为三类:通用管控接口、节点管控接口、中心管控接口。 通用管控接口主要返回 CSI 的一些通用信息,像插件的名字、Driver 的身份信息、插件所提供的能力等; 节点管控接口的 NodeStageVolume 和 NodeUnstageVolume 就相当于 Flexvolume 中的 MountDevice 和 UnmountDevice。NodePublishVolume 和 NodeUnpublishVolume 就相当于 SetUp 和 TearDown 接口; 中心管控接口的 CreateVolume 和 DeleteVolume 就是我们的 Provision 和 Delete 存储卷的一个接口,ControllerPublishVolume 和 ControllerUnPublishVolume 则分别是 Attach 和 Detach 的接口。 CSI 的系统结构 CSI 是通过 CRD 的形式实现的,所以 CSI 引入了这么几个对象类型:VolumeAttachment、CSINode、CSIDriver 以及 CSI Controller Server 与 CSI Node Server 的一个实现。 在 CSI Controller Server 中,有传统的类似 Kubernetes 中的 AD Controller 和 Volume Plugins,VolumeAttachment 对象就是由它们所创建的。 此外,还包含多个 External Plugin组件,每个组件和 CSI Plugin 组合的时候会完成某种功能。比如: External Provisioner 和 Controller Server 组合的时候就会完成数据卷的创建与删除功能; External Attacher 和 Controller Server 组合起来可以执行数据卷的挂载和操作; External Resizer 和 Controller Server 组合起来可以执行数据卷的扩容操作; External Snapshotter 和 Controller Server 组合则可以完成快照的创建和删除。 CSI Node Server 中主要包含 Kubelet 组件,包括 VolumeManager 和 VolumePlugin,它们会去调用 CSI Plugin 去做 mount 和 unmount 操作;另外一个组件 Driver Registrar 主要实现的是 CSI Plugin 注册的功能。 以上就是 CSI 的整个拓扑结构,接下来我们将分别介绍不同的对象和组件。 CSI 对象 我们将介绍 3 种对象:VolumeAttachment,CSIDriver,CSINode。 VolumeAttachment 描述一个 Volume 卷在一个 Pod 使用中挂载、卸载的相关信息。例如,对一个卷在某个节点上的挂载,我们通过 VolumeAttachment 对该挂载进行跟踪。AD Controller 创建一个 VolumeAttachment,而 External-attacher 则通过观察该 VolumeAttachment,根据其状态来进行挂载和卸载操作。 下图就是一个 VolumeAttachment 的例子,其类别 (kind) 为 VolumeAttachment,spec 中指定了 attacher 为 ossplugin.csi.alibabacloud.com,即指定挂载是由谁操作的;指定了 nodeName 为 cn-zhangjiakou.192.168.1.53,即该挂载是发生在哪个节点上的;指定了 source 为 persistentVolumeName 为 oss-csi-pv,即指定了哪一个数据卷进行挂载和卸载。 status 中 attached 指示了挂载的状态,如果是 False, External-attacher 就会执行一个挂载操作。 第二个对象是 CSIDriver,它描述了集群中所部署的 CSI Plugin 列表,需要管理员根据插件类型进行创建。 例如下图中创建了一些 CSI Driver,通过 kuberctl get csidriver 我们可以看到集群里面创建的 3 种类型的 CSI Driver:一个是云盘;一个是 NAS;一个是 OSS。 在 CSI Driver 中,我们定义了它的名字,在 spec 中还定义了 attachRequired 和 podInfoOnMount 两个标签。 attachRequired 定义一个 Plugin 是否支持 Attach 功能,主要是为了对块存储和文件存储做区分。比如文件存储不需要 Attach 操作,因此我们将该标签定义为 False; podInfoOnMount 则是定义 Kubernetes 在调用 Mount 接口时是否带上 Pod 信息。 第三个对象是 CSINode,它是集群中的节点信息,由 node-driver-registrar 在启动时创建。它的作用是每一个新的 CSI Plugin 注册后,都会在 CSINode 列表里添加一个 CSINode 信息。 例如下图,定义了 CSINode 列表,每一个 CSINode 都有一个具体的信息(左侧的 YAML)。以 一 cn-zhangjiakou.192.168.1.49 为例,它包含一个云盘的 CSI Driver,还包含一个 NAS 的 CSI Driver。每个 Driver 都有自己的 nodeID 和它的拓扑信息 topologyKeys。如果没有拓扑信息,可以将 topologyKeys 设置为 "null"。也就是说,假如有一个有 10 个节点的集群,我们可以只定义一部分节点拥有 CSINode。 CSI 组件之 Node-Driver-Registrar Node-Driver-Registrar 主要实现了 CSI Plugin 注册的一个机制。我们来看一下下图中的流程图。 第 1 步 ,在启动的时候有一个约定,比如说在 /var/lib/kuberlet/plugins_registry 这个目录每新加一个文件,就相当于每新加了一个 Plugin; 启动 Node-Driver-Registrar,它首先会向 CSI-Plugin 发起一个接口调用 GetPluginInfo,这个接口会返回 CSI 所监听的地址以及 CSI-Plugin 的一个 Driver name; 第 2 步 ,Node-Driver-Registrar 会监听 GetInfo 和 NotifyRegistrationStatus 两个接口; 第 3 步 ,会在 /var/lib/kuberlet/plugins_registry 这个目录下启动一个 Socket,生成一个 Socket 文件 ,例如:"diskplugin.csi.alibabacloud.com-reg.sock",此时 Kubelet 通过 Watcher 发现这个 Socket 后,它会通过该 Socket 向 Node-Driver-Registrar 的 GetInfo 接口进行调用。GetInfo 会把刚才我们所获得的的 CSI-Plugin 的信息返回给 Kubelet,该信息包含了 CSI-Plugin 的监听地址以及它的 Driver name; 第 4 步 ,Kubelet 通过得到的监听地址对 CSI-Plugin 的 NodeGetInfo 接口进行调用; 第 5 步 ,调用成功之后,Kubelet 会去更新一些状态信息,比如节点的 Annotations、Labels、status.allocatable 等信息,同时会创建一个 CSINode 对象; 第 6 步 ,通过对 Node-Driver-Registrar 的 NotifyRegistrationStatus 接口的调用告诉它我们已经把 CSI-Plugin 注册成功了。 通过以上 6 步就实现了 CSI Plugin 注册机制。 CSI 组件之 External-Attacher External-Attacher 主要是通过 CSI Plugin 的接口来实现数据卷的挂载与卸载功能。它通过观察 VolumeAttachment 对象来实现状态的判断。VolumeAttachment 对象则是通过 AD Controller 来调用 Volume Plugin 中的 CSI Attacher 来创建的。CSI Attacher 是一个 In-Tree 类,也就是说这部分是 Kubernetes 完成的。 当 VolumeAttachment 的状态是 False 时,External-Attacher 就去调用底层的一个 Attach 功能;若期望值为 False,就通过底层的 ControllerPublishVolume 接口实现 Detach 功能。同时,External-Attacher 也会同步一些 PV 的信息在里面。 CSI 部署 我们现在来看一下块存储的部署情况。 之前提到 CSI 的 Controller 分为两部分,一个是 Controller Server Pod,一个是 Node Server Pod。 我们只需要部署一个 Controller Server,如果是多备份的,可以部署两个。Controller Server 主要是通过多个外部插件来实现的,比如说一个 Pod 中可以定义多个 External 的 Container 和一个包含 CSI Controller Server 的 Container,这时候不同的 External 组件会和 Controller Server 组成不同的功能。 而 Node Server Pod 是个 DaemonSet,它会在每个节点上进行注册。Kubelet 会直接通过 Socket 的方式直接和 CSI Node Server 进行通信、调用 Attach/Detach/Mount/Unmount 等。 Driver Registrar 只是做一个注册的功能,会在每个节点上进行部署。 文件存储和块存储的部署情况是类似的。只不过它会把 Attacher 去掉,也没有 VolumeAttachment 对象。 CSI 使用示例 和 Flexvolume 一样,我们看一下它的定义模板。 可以看到,它和其它的定义并没什么区别。主要的区别在于类型为 CSI,里面会定义 driver,volumeHandle,volumeAttribute,nodeAffinity 等。 driver 就是定义是由哪一个插件来去实现挂载; volumeHandle 主要是指示 PV 的唯一标签; volumeAttribute 用于附加参数,比如 PV 如果定义的是 OSS,那么就可以在 volumeAttribute 定义 bucket、访问的地址等信息在里面; nodeAffinity 则可以定义一些调度信息。与 Flexvolume 类似,还可以通过 selector 和 Label 定义一些绑定条件。 中间的图给出了一个动态调度的例子,它和其它类型的动态调度是一样的。只不过在定义 provisioner 的时候指定了一个 CSI 的 provisioner。 下面给出了一个具体的挂载例子。 Pod 启动之后,我们可以看到 Pod 已经把一个 /dev/vdb 挂载到 /data 上了。同理,它有一个 GlobalPath 和一个 PodPath 的集群在里面。我们可以把一个 /dev/vdb 挂载到一个 GlobalPath 里面,它就是一个 CSI 的一个 PV 在本节点上唯一确定的目录。一个 PodPath 就是一个 Pod 所确定的一个本地节点的目录,它会把 Pod 所对应的目录映射到我们的容器中去。 CSI 的其它功能 除了挂载、卸载之外,CSI 化提供了一些附加的功能。例如,在定义模板的时候往往需要一些用户名和密码信息,此时我们就可通过 Secret 来进行定义。之前我们所讲的 Flexvolume 也支持这个功能,只不过 CSI 可以根据不同的阶段定义不同的 Secret 类型,比如挂载阶段的 Secret、Mount 阶段的 Secret、Provision 阶段的 Secret。 Topology 是一个拓扑感知的功能。当我们定义一个数据卷的时候,集群中并不是所有节点都能满足该数据卷的需求,比如我们需要挂载不同的 zone 的信息在里面,这就是一个拓扑感知的功能。这部分在第 10 讲已有详细的介绍,大家可以进行参考。 Block Volume 就是 volumeMode 的一个定义,它可以定义成 Block 类型,也可以定义成文件系统类型,CSI 支持 Block 类型的 Volume,就是说挂载到 Pod 内部时,它是一个块设备,而不是一个目录。 Skip Attach 和 PodInfo On Mount 是刚才我们所讲过的 CSI Driver 中的两个功能。 CSI 的近期 Features CSI 还是一个比较新的实现方式。近期也有了很多更新,比如 ExpandCSIVolumes 可以实现文件系统扩容的功能;VolumeSnapshotDataSource 可以实现数据卷的快照功能;VolumePVCDataSource 实现的是可以定义 PVC 的数据源;我们以前在使用 CSI 的时候只能通过 PVC、PV 的方式定义,而不能直接在 Pod 里面定义 Volume,CSIInlineVolume 则可以让我们可以直接在 Volume 中定义一些 CSI 的驱动。 阿里云在 GitHub 上开源了 CSI 的实现 ,大家有兴趣的可以看一下,做一些参考。 四、本文总结 本文主要介绍了 Kubernetes 集群中存储卷相关的知识,主要有以下三点内容: 第一部分讲述了 Kubernetes 存储架构,主要包括存储卷概念、挂载流程、系统组件等相关知识; 第二部分讲述了 Flexvolume 插件的实现原理、部署架构、使用示例等; 第三部分讲述了 CSI 插件的实现原理、资源对象、功能组件、使用示例等; 希望上述知识点能让各位同学有所收获,特别是在处理存储卷相关的设计、开发、故障处理等方面有所帮助。 查看更多:https://yq.aliyun.com/articles/743613?utm_content=g_1000103097 上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/
来源:OSCHINA
发布时间:2020-02-10 15:36:00
# default_exp digdata # 上面一行用于nbdev中声明本模块的名称。必须是notebook的第一个Cell的第一行。 digdata 描述:抗击新冠病毒(3)-探索在线数据资源 功能:本页面用于交互式地探索数据。通过访问网络获取数据,分析和理解网页数据结构,转换为列表格式,用于后续的分析和绘图输出。 模块:使用JupyterLab、Python、nbdev等完成。用到的Python模块包括: requests,访问web服务网站。 re,正则表达式解析。 json,JSON格式解析。 BeautifulSoup,HTML格式解析。 pprint,格式化输出。 pandas,数据表格分析。 源码-https://github.com/openthings/anti2020ncov 参考: JupyterLab-数据实验室 文学式编程-nbdev入门教程 抗击新冠病毒(1)-开源软件与数据项目 抗击新冠病毒(2)-基于Jupyter+nbdev的数据分析 #hide from nbdev.showdoc import * #export from bs4 import BeautifulSoup from parser import * #regex_parser import re import json import time import logging import datetime import requests import pprint 获取网页数据 #export #url = "https://3g.dxy.cn/newh5/view/pneumonia" url = "https://ncov.dxy.cn/ncovh5/view/pneumonia?from=singlemessage&isappinstalled=0" headers = { 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.117 Safari/537.36' } #export session = requests.session() session.headers.update(headers) r = session.get(url) #export #pprint.pprint(r.text) #export #soup = BeautifulSoup(r.content, 'lxml') #soup 提取特定的数据域 # export # 分为总体情况、分省情况、省内各市情况、新闻四大类。 overall_information = re.search(r'\{("id".*?)\}', str(soup.find('script', attrs={'id': 'getStatisticsService'}))) province_information = re.search(r'\[(.*?)\]', str(soup.find('script', attrs={'id': 'getListByCountryTypeService1'}))) area_information = re.search(r'\[(.*)\]', str(soup.find('script', attrs={'id': 'getAreaStat'}))) news_information = re.search(r'\[(.*?)\]', str(soup.find('script', attrs={'id': 'getTimelineService'}))) 1、总体情况 #pprint.pprint(overall_information.string) #overall_information.group(0) #jsall = json.loads(overall_information.group(0)) def overall_parser(overall_information): overall_information = json.loads(overall_information.group(0)) overall_information.pop('id') overall_information.pop('createTime') overall_information.pop('modifyTime') overall_information.pop('imgUrl') overall_information.pop('deleted') overall_information['countRemark'] = overall_information['countRemark'].replace(' 疑似', ',疑似').replace(' 治愈', ',治愈').replace(' 死亡', ',死亡').replace(' ', '') #overall_information = json.loads(overall_information.group(0)) 2、分省情况 #provinces = json.loads(province_information.group(0)) #provinces def province_parser(province_information): provinces = json.loads(province_information.group(0)) crawl_timestamp = "" for province in provinces: province.pop('id') province['comment'] = province['comment'].replace(' ', '') province['crawlTime'] = crawl_timestamp #province['country'] = country_type.get(province['countryType']) province['tags'] = province['tags'].replace(' ', '') province = regex_parser(content=province, key='tags') #for province in provinces: # print(province['id'],'\t',province['provinceShortName'],'\t',province['tags']) 3、省内各市县情况 #area_information.string area = json.loads(area_information.group(0)) print("省份\t确诊\t疑似\t治愈\t死亡") for a in area: print(a['provinceName'],'\t',a['confirmedCount'],'\t',a['suspectedCount'],'\t',a['curedCount'],'\t',a['deadCount']) 按省提取城市情况 cities = area[0]['cities'] #cities print("城市\t确诊\t疑似\t治愈\t死亡") for p in area: cities = p['cities'] print("===================================") print(p['provinceName'],'\t',p['confirmedCount'],'\t',p['suspectedCount'],'\t',p['curedCount'],'\t',p['deadCount']) print("-----------------------------------") for c in cities: print(c['cityName'],'\t',c['confirmedCount'],'\t',c['suspectedCount'],'\t',c['curedCount'],'\t',c['deadCount']) 4、新闻列表 news = json.loads(news_information.group(0)) #news for n in news: print(n['id'],'\t',n['infoSource'].strip(),'\t',n['title'].strip())#,n['summary'].strip()) nbdev 适用工具 # 将notebook转化为python的*.py代码,保存到项目名称的子目录中。 from nbdev.export import * notebook2script() Converted 00_digdata.ipynb. Converted 01_getdata.ipynb. Converted 10_charts.ipynb. Converted 10_china.ipynb. Converted index.ipynb. help(notebook2script) Help on function notebook2script in module nbdev.export: notebook2script(fname=None, silent=False, to_dict=False) Convert notebooks matching `fname` to modules
来源:OSCHINA
发布时间:2020-02-10 12:39:00
直播主题: 智能测温及社区防疫监控解决方案 直播时间: 2月10日 10:00-10:30 讲师: 岑参,阿里云智能IoT解决方案架构师七年医疗行业数字化咨询经验,曾任职于国内头部IT公司,现负责IOT医疗行业 适合观众: 政务大厅管理人员、车站交通枢纽管理人员、医院管理人员、一般居民用户 内容简介: 热成像人体测温方案适用于人群聚集区域检测疫情防控,利用红外非接触式体温检测,可实现快速体温筛查,远距离、大面积检测,自动预警;智能体温远程监控方案可部署于医院新开设的隔离病区或居民家庭,能很大程度减少医患间因测量基础生命体征而发生的接触,提高效率降低被感染风险。 直播主题: 金融行业钉钉组织健康守护方案 直播时间: 2月10日 14:00-14:50 讲师: 七玉,钉钉金融行业运营专家十年企业数字化咨询、运营经验,曾任职于国际、国内头部IT公司,现负责钉钉金融行业 适合观众: 金融行业(银行、保险、证券等)IT负责人、人事经理、办公室主任等 内容简介: 应对疫情期间,各组织启用远程办公的需求,阿里巴巴钉钉紧急推出组织健康方案,免费推出员工健康打卡服务以及异地办公工具,帮助金融机构守护组织成员健康的同时实现数智化远程协同。本次课程分为两期,第一期:精准通知、智能守护。包括钉钉健康打卡、紧急通知、视频会议三个核心场景。 在线看大会,就来云栖号! 每天都有行业专家分享!请访问: https://yqh.aliyun.com/zhibo
来源:OSCHINA
发布时间:2020-02-10 10:33:00
一、安装准备 本次安装的版本是截止2020.1.30最新的版本0.17.0 软件要求 需要**Java 8(8u92 +)**以上的版本,否则会有问题 Linux,Mac OS X或其他类似Unix的操作系统(不支持Windows) 硬件要求 Druid包括一组参考配置和用于单机部署的启动脚本: nano-quickstart micro-quickstart small medium large xlarge 单服务器参考配置 Nano-Quickstart:1个CPU,4GB RAM 启动命令: bin/start-nano-quickstart 配置目录: conf/druid/single-server/nano-quickstart 微型快速入门:4个CPU,16GB RAM 启动命令: bin/start-micro-quickstart 配置目录: conf/druid/single-server/micro-quickstart 小型:8 CPU,64GB RAM(〜i3.2xlarge) 启动命令: bin/start-small 配置目录: conf/druid/single-server/small 中:16 CPU,128GB RAM(〜i3.4xlarge) 启动命令: bin/start-medium 配置目录: conf/druid/single-server/medium 大型:32 CPU,256GB RAM(〜i3.8xlarge) 启动命令: bin/start-large 配置目录: conf/druid/single-server/large 大型X:64 CPU,512GB RAM(〜i3.16xlarge) 启动命令: bin/start-xlarge 配置目录: conf/druid/single-server/xlarge 我们这里做测试使用选择最低配置即可 nano-quickstart 二、下载安装包 访问官网: http://druid.io/现在也会跳转https://druid.apache.org/ 或者直接访问 https://druid.apache.org/ 点击download进入下载页面: 选择最新版本: apache-druid-0.17.0-bin.tar.gz 进行下载 200多M 也可以选择下载源码包 用maven进行编译 三、安装 上传安装包 在终端中运行以下命令来安装Druid: tar -xzf apache-druid-0.17.0-bin.tar.gz cd apache-druid-0.17.0 安装包里有这几个目录: LICENSE 和 NOTICE 文件 bin/* -脚本 conf/* -单服务器和集群设置的示例配置 extensions/* -扩展 hadoop-dependencies/* -Druid Hadoop依赖 lib/* -Druid库 quickstart/* -快速入门教程的配置文件,样本数据和其他文件 配置文件 #进入我们要启动的配置文件位置: cd conf/druid/single-server/nano-quickstart/ _common 公共配置 是druid一些基本的配置,比如元数据库地址 各种路径等等 其他的是各个节点的配置 比较类似,比如broker cd broker/ jvm配置 main配置 runtime运行时相关的配置 回到主目录 启动的conf在 cd conf/supervise/single-server 里面是不同配置启动不同的脚本 四、启动 回到主目录 ./bin/start-nano-quickstart 启动成功: 访问 localhost:8888 看到管理页面 如果要修改端口,需要修改配置的端口和主目录下的 vi bin/verify-default-ports 五、加载数据 Druid提供了一个示例数据文件,其中包含2015年9月12日发生的Wiki的示例数据。 此样本数据位于 quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz 示例数据大概是这样: { "timestamp":"2015-09-12T20:03:45.018Z", "channel":"#en.wikipedia", "namespace":"Main", "page":"Spider-Man's powers and equipment", "user":"foobar", "comment":"/* Artificial web-shooters */", "cityName":"New York", "regionName":"New York", "regionIsoCode":"NY", "countryName":"United States", "countryIsoCode":"US", "isAnonymous":false, "isNew":false, "isMinor":false, "isRobot":false, "isUnpatrolled":false, "added":99, "delta":99, "deleted":0, } Druid加载数据分为以下几种: 加载文件 从kafka中加载数据 从hadoop中加载数据 自定义加载方式 我们这样演示一下加载示例文件数据 1、进入localhost:8888 点击load data 2、选择local disk 3、选择Connect data 4、预览数据 Base directory输入quickstart/tutorial/ File filter输入 wikiticker-2015-09-12-sampled.json.gz 然后点击apply预览 就可以看见数据了 点击Next:parse data解析数据 5、解析数据 可以看到json数据已经被解析了 继续解析时间 6、解析时间 解析时间成功 之后两步是transform和filter 这里不做演示了 直接next 7、确认Schema 这一步会让我们确认Schema 可以做一些修改 由于数据量较小 我们直接关掉Rollup 直接下一步 8、设置分段 这里可以设置数据分段 我们选择hour next 9、确认发布 10、发布成功 开始解析数据 等待任务成功 11、查看数据 选择datasources 可以看到我们加载的数据 可以看到数据源名称 Fully是完全可用 还有大小等各种信息 12、查询数据 点击query按钮 我们可以写sql查询数据了 还可以将数据下载 Druid相关博文 什么是Druid 静下心来,努力的提升自己,永远都没有错。更多实时计算相关博文,欢迎关注实时流式计算
来源:OSCHINA
发布时间:2020-02-10 09:06:00
Apache Flink社区宣布Flink 1.10.0正式发布! 本次Release版本修复1.2K个问题,对Flink作业的整体性能和稳定性做了重大改进,同时增加了对K8S,Python的支持。 这个版本标志着与Blink集成的完成,并且强化了流式SQL与Hive的集成,本文将详细介绍新功能和主要的改进。 一、内存管理优化 原有TaskExecutor 有一些缺点: 流处理和批处理用了不同的配置模型; 流处理的堆外配置RocksDB复杂,需要用户配置; 为了使内存管理更明确直观,Flink 1.10对TaskExecutor内存模型和配置做了重大改进,这个更改使FLink更适合于各种部署环境:K8S,Yarn,Mesos。 这种更改统一了入口点,使得下游框架比如zeppelin的编程更加容易。 二、集成Kubernetes 这对于想要在容器中使用Flink的用户是一个非常好的消息。 在Flink1.10中推出了 Active Kubernetes集成 Flink的ResourceManager( K8sResMngr )与Kubernetes进行本地通信以按需分配新的Pod,类似于Flink的Yarn和Mesos集成。用户还可以利用命名空间为聚合资源消耗有限的多租户环境启动Flink集群。事先配置具有足够权限的RBAC角色和服务帐户。 用户可以简单地参考Kubernetes配置选项,然后使用以下命令在CLI中将作业提交到Kubernetes上的现有Flink会话: ./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id= examples/streaming/WindowJoin.jar 三、集成Hive Flink 1.10通过开发将Hive集成到Flink,可用于生产环境。 并且支持大部分Hive版本,Flink支持Hive版本列表: 1.0 1.0.0 1.0.1 1.1 1.1.0 1.1.1 1.2 1.2.0 1.2.1 1.2.2 2.0 2.0.0 2.0.1 2.1 2.1.0 2.1.1 2.2 2.2.0 2.3 2.3.0 2.3.1 2.3.2 2.3.3 2.3.4 2.3.5 2.3.6 3.1 3.1.0 3.1.1 3.1.2 需要引入依赖 org.apache.flink flink-connector-hive_2.11 1.10.0 provided org.apache.flink flink-table-api-java-bridge_2.11 1.10.0 provided org.apache.hive hive-exec ${hive.version} provided 连接Hive代码 val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() val tableEnv = TableEnvironment.create(settings) val name = "myhive" val defaultDatabase = "mydatabase" val hiveConfDir = "/opt/hive-conf" // a local path val version = "2.3.4" val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version) tableEnv.registerCatalog("myhive", hive) // set the HiveCatalog as the current catalog of the session tableEnv.useCatalog("myhive") 四、PyFlink:支持UDF 从Flink 1.10开始,PyFlink开始支持UDF函数。 用户还可以 pip 使用以下方法轻松安装PyFlink : pip install apache-flink 五、其他重要变化 Flink现在可以编译并在Java 11上运行。 一个新的Elasticsearch sink,完全支持Elasticsearch 7.x版本。 Kafka 0.8 和 0.9 版本已经被废,不再支持。 删除了非认证网络流量配置选项taskmanager.network.credit.model。 删除了旧版Web UI。 六、贡献者名单 最后我们看一下贡献者的名单,有很多国内大神的身影 Achyuth Samudrala, Aitozi, Alberto Romero, Alec.Ch, Aleksey Pak, Alexander Fedulov, Alice Yan, Aljoscha Krettek, Aloys, Andrey Zagrebin, Arvid Heise, Benchao Li, Benoit Hanotte, Benoît Paris, Bhagavan Das, Biao Liu, Chesnay Schepler, Congxian Qiu, Cyrille Chépélov, César Soto Valero, David Anderson, David Hrbacek, David Moravek, Dawid Wysakowicz, Dezhi Cai, Dian Fu, Dyana Rose, Eamon Taaffe, Fabian Hueske, Fawad Halim, Fokko Driesprong, Frey Gao, Gabor Gevay, Gao Yun, Gary Yao, GatsbyNewton, GitHub, Grebennikov Roman, GuoWei Ma, Gyula Fora, Haibo Sun, Hao Dang, Henvealf, Hongtao Zhang, HuangXingBo, Hwanju Kim, Igal Shilman, Jacob Sevart, Jark Wu, Jeff Martin, Jeff Yang, Jeff Zhang, Jiangjie (Becket) Qin, Jiayi, Jiayi Liao, Jincheng Sun, Jing Zhang, Jingsong Lee, JingsongLi, Joao Boto, John Lonergan, Kaibo Zhou, Konstantin Knauf, Kostas Kloudas, Kurt Young, Leonard Xu, Ling Wang, Lining Jing, Liupengcheng, LouisXu, Mads Chr. Olesen, Marco Zühlke, Marcos Klein, Matyas Orhidi, Maximilian Bode, Maximilian Michels, Nick Pavlakis, Nico Kruber, Nicolas Deslandes, Pablo Valtuille, Paul Lam, Paul Lin, PengFei Li, Piotr Nowojski, Piotr Przybylski, Piyush Narang, Ricco Chen, Richard Deurwaarder, Robert Metzger, Roman, Roman Grebennikov, Roman Khachatryan, Rong Rong, Rui Li, Ryan Tao, Scott Kidder, Seth Wiesman, Shannon Carey, Shaobin.Ou, Shuo Cheng, Stefan Richter, Stephan Ewen, Steve OU, Steven Wu, Terry Wang, Thesharing, Thomas Weise, Till Rohrmann, Timo Walther, Tony Wei, TsReaper, Tzu-Li (Gordon) Tai, Victor Wong, WangHengwei, Wei Zhong, WeiZhong94, Wind (Jiayi Liao), Xintong Song, XuQianJin-Stars, Xuefu Zhang, Xupingyong, Yadong Xie, Yang Wang, Yangze Guo, Yikun Jiang, Ying, YngwieWang, Yu Li, Yuan Mei, Yun Gao, Yun Tang, Zhanchun Zhang, Zhenghua Gao, Zhijiang, Zhu Zhu, a-suiniaev, azagrebin, beyond1920, biao.liub, blueszheng, bowen.li, caoyingjie, catkint, chendonglin, chenqi, chunpinghe, cyq89051127, danrtsey.wy, dengziming, dianfu, eskabetxe, fanrui, forideal, gentlewang, godfrey he, godfreyhe, haodang, hehuiyuan, hequn8128, hpeter, huangxingbo, huzheng, ifndef-SleePy, jiemotongxue, joe, jrthe42, kevin.cyj, klion26, lamber-ken, libenchao, liketic, lincoln-lil, lining, liuyongvs, liyafan82, lz, mans2singh, mojo, openinx, ouyangwulin, shining-huang, shuai-xu, shuo.cs, stayhsfLee, sunhaibotb, sunjincheng121, tianboxiu, tianchen, tianchen92, tison, tszkitlo40, unknown, vinoyang, vthinkxie, wangpeibin, wangxiaowei, wangxiyuan, wangxlong, wangyang0918, whlwanghailong, xuchao0903, xuyang1706, yanghua, yangjf2019, yongqiang chai, yuzhao.cyz, zentol, zhangzhanchum, zhengcanbin, zhijiang, zhongyong jin, zhuzhu.zz, zjuwangg, zoudaokoulife, 砚田, 谢磊, 张志豪, 曹建华 Flink系列文章: Flink入门(一)——Apache Flink介绍 Flink入门(二)——Flink架构介绍 Flink入门(三)——环境与部署 Flink入门(四)——编程模型 更多实时计算,Flink,Kafka等相关技术博文,欢迎关注实时流式计算
来源:OSCHINA
发布时间:2020-02-13 09:39:00
2020年到了,祝大家新年快乐! 2020年是一个闰年(Leap Year),闰年是会出故障的。八年前,2012年2月29日,我在Azure的时候我们就出了一个大故障: https://azure.microsoft.com/en-us/blog/summary-of-windows-azure-service-disruption-on-feb-29th-2012/ 常见的错误认知 1、 一年总是365天 2、2月总是28天 3、闰年是每四年一次 其实,闰年并不是每四年一次。2000是闰年,但1900年和2100都不是闰年。 哪里容易出闰年相关的Bug 1、在一个日期值上加或减时间的代码。尤其是加减1年或1个月的代码 2、各种根据数据库查询结果生成的报表和图标,月度和年度统计可能会少算1天 3、证书/密码/密钥/缓存 等的过期时间,可能会比预期的早了一天,或者可能设定了一个非法的过期时间 4、固定长度的数组。例如,一个长度为365的数组遇到闰年可能就不够了,可能会数组越界。 5、UI组件,例如日历、日期选择组件,以及客户端输入校验相关的代码。 闰年的哪些日子要特别注意 2019年12月31日:这是闰年前一年的最后一天。2019年的最后一天加365天,并不是2020年的最后一天,而会是2020年的倒数第二天(即2020年12月30日)。 2020年1月1日:闰年的第一天。闰年的第一天加365天,并不是下一年的1月1日,而是今年的12月31日。 2020年1月31日:这一天加28天,并不是下个月(2月)的最后一天。 2020年2月1日:这一天加28天,并不是下个月(3月)的第一天。 2020年2月28日:这是2月29日的前一天。有问题的代码可能会错误的把这天当成2月的最后一天,试图加1天得到3月1日。但实际上这一天加1天是2月29日。 2020年2月29日:这是闰年多出来的一天。如果代码以为2月总是只有28天,那代码可能出现各种问题,例如: 入参校验会认为一个合法输入(2020/2/29)是非法的,用 { year+1 , month , day } 的方式来加减1年的话会产生一个非法日期。 2020年3月1日:2月29日后面的那天。代码如果在3月1日上减28天,会得到2月2日(而不是预期中的2月1日);减365天的话会得到2019年3月2日(而不是预期中的3月1日)。 2020年12月31日:一年的第366天。 代码如果不能正确处理一年的第366天,可能也会导致问题。例如,2008年12月31日,第三方软件中的问题导致了所有Microsoft Zune设备无法使用,详情参考: http://www.theguardian.com/technology/blog/2009/jan/01/zune-firmware-mistake 代码如果假设1年永远是365天,声明了一个固定大小为365的数组,那在一年的第366天可能会发生数组越界。 数组越界如果发生在 C/C++ 语言编写的代码里,可能导致内存溢出攻击漏洞。 查看更多:https://yq.aliyun.com/articles/742802?utm_content=g_1000103478 上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/
来源:OSCHINA
发布时间:2020-02-12 15:13:00
2020.01.02 新年伊始,Nacos Star 突破 10000,从此迈上了一个新的里程碑。感谢大家的一路支持、信任和帮助!!! Nacos 开源 17 个月以来,发布了 22 个版本,成功切入 Dubbo/Spring-Cloud/ 云原生三个核心生态。吸引了 88 位优秀贡献者,积累了 110 家企业案例,官网累计获取 20w+ 用户浏览, 2000 UV ,借此机会,我们代表 Nacos 社区一起回顾 Nacos 来时的路,和未来的发展方向。 项目起源 Nacos 在阿里巴巴起源于 2008 年五彩石项目(完成微服务拆分和业务中台建设),成长于十年双十一的洪峰考验,沉淀了简单易用、稳定可靠、性能卓越的核心竞争力。随着云计算兴起,2018 年我们深刻感受到开源软件行业的影响,因此决定将 Nacos(阿里内部 Configserver/Diamond/Vipserver 内核) 开源,输出阿里十年的沉淀,推动微服务行业发展,加速企业数字化转型! 开源后的工作 开源很重要的是生态,而且开发者往往是先选服务框架,再选注册中心和配置中心,因此在 1.0 之前 Nacos 首先支持了国内人气最高的 Dubbo/Spring-Cloud 两个主流服务框架,又在 1.X 版本之后支持了云原生的服务框架。至此 Nacos 目前已经能够支持所有主流服务框架,并且为用户未来平滑迁移云原生服务框架做好了准备! 虽然我们期望通过云原生的方式支持多语言,但是也为 Java/Golang/NodeJs/Cpp/Python 等提供了语言级支持,以便给大家更好的编程体验! 后续规划 2020 年,我们将聚焦 Nacos 内核构建,打造一个更稳定、更安全、更高效的微服务引擎! 目前最核心的工作如下: 建立访问控制体系,提升安全水准 升级连接通道,提升推送效率 解耦Mysql,降低部署运维成本 查看更多:https://yq.aliyun.com/articles/742637?utm_content=g_1000103477 上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/
来源:OSCHINA
发布时间:2020-02-12 15:10:00
消息队列常见面试问题小集合 一、为什么使用消息队列?消息队列有什么优点和缺点?Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么区别,以及适合哪些场景? 面试官心理分析 其实面试官主要是想看看: 第一 ,你知不知道你们系统里为什么要用消息队列这个东西? 不少候选人,说自己项目里用了 Redis、MQ,但是其实他并不知道自己为什么要用这个东西。其实说白了,就是为了用而用,或者是别人设计的架构,他从头到尾都没思考过。 没有对自己的架构问过为什么的人,一定是平时没有思考的人,面试官对这类候选人印象通常很不好。因为面试官担心你进了团队之后只会木头木脑的干呆活儿,不会自己思考。 第二 ,你既然用了消息队列这个东西,你知不知道用了有什么好处&坏处? 你要是没考虑过这个,那你盲目弄个 MQ 进系统里,后面出了问题你是不是就自己溜了给公司留坑?你要是没考虑过引入一个技术可能存在的弊端和风险,面试官把这类候选人招进来了,基本可能就是挖坑型选手。就怕你干 1 年挖一堆坑,自己跳槽了,给公司留下无穷后患。 第三 ,既然你用了 MQ,可能是某一种 MQ,那么你当时做没做过调研? 你别傻乎乎的自己拍脑袋看个人喜好就瞎用了一个 MQ,比如 Kafka,甚至都从没调研过业界流行的 MQ 到底有哪几种。每一个 MQ 的优点和缺点是什么。每一个 MQ 没有绝对的好坏 ,但是就是看用在哪个场景可以 扬长避短,利用其优势,规避其劣势 。 如果是一个不考虑技术选型的候选人招进了团队,leader 交给他一个任务,去设计个什么系统,他在里面用一些技术,可能都没考虑过选型,最后选的技术可能并不一定合适,一样是留坑。 面试题剖析 为什么使用消息队列 其实就是问问你消息队列都有哪些使用场景,然后你项目里具体是什么场景,说说你在这个场景里用消息队列是什么? 面试官问你这个问题, 期望的一个回答 是说,你们公司有个什么 业务场景 ,这个业务场景有个什么技术挑战,如果不用 MQ 可能会很麻烦,但是你现在用了 MQ 之后带给了你很多的好处。 先说一下消息队列常见的使用场景吧,其实场景有很多,但是比较核心的有 3 个: 解耦 、 异步 、 削峰 。 解耦 看这么个场景。A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃...... 在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊! 如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。 总结 :通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。 面试技巧 :你需要去考虑一下你负责的系统中是否有类似的场景,就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦,也是可以的,你就需要去考虑在你的项目里,是不是可以运用这个 MQ 去进行系统的解耦。在简历中体现出来这块东西,用 MQ 作解耦。 异步 再来看一个场景,A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求,等待个 1s,这几乎是不可接受的。 一般互联网类的企业,对于用户直接的操作,一般要求是每个请求都必须在 200 ms 以内完成,对用户几乎是无感知的。 如果 使用 MQ ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms 以后就直接返回了,爽!网站做得真好,真快! 削峰 每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL的,大量的请求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。 一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。 但是高峰期一过,到了下午的时候,就成了低峰期,可能也就 1w 的用户同时在网站上操作,每秒中的请求数量可能也就 50 个请求,对整个系统几乎没有任何的压力。 如果使用 MQ,每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。而 MQ 每秒钟 5k 个请求进来,就 2k 个请求出去,结果就导致在中午高峰期(1 个小时),可能有几十万甚至几百万的请求积压在 MQ 中。 这个短暂的高峰期积压是 ok 的,因为高峰期过了之后,每秒钟就 50 个请求进 MQ,但是 A 系统依然会按照每秒 2k 个请求的速度在处理。所以说,只要高峰期一过,A 系统就会快速将积压的消息给解决掉。 消息队列有什么优缺点 优点上面已经说了,就是 在特殊场景下有其对应的好处 , 解耦 、 异步 、 削峰 。 缺点有以下几个: 1.系统可用性降低 系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,人 ABCD 四个系统好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的,你不就完了? 2.系统复杂度提高 硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。 3.一致性问题 A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。 所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。 Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点? 一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;综上,各种对比之后,有如下建议: 后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高; 不过现在确实越来越多的公司,会去用 RocketMQ,确实很不错(阿里出品),但社区可能有突然黄掉的风险,对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。 所以 中小型公司 ,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择; 大型公司 ,基础架构研发实力较强,用 RocketMQ 是很好的选择。 如果是 大数据领域 的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。 二、如何保证消息队列的高可用? 面试官心理分析 如果有人问到你 MQ 的知识,高可用是必问的。 上一讲提到,MQ 会导致系统可用性降低。 所以只要你用了 MQ,接下来问的一些要点肯定就是围绕着 MQ 的那些缺点怎么来解决了。 要是你傻乎乎的就干用了一个 MQ,各种问题从来没考虑过,那你就杯具了,面试官对你的感觉就是,只会简单使用一些技术,没任何思考,马上对你的印象就不太好了。 这样的同学招进来要是做个 20k 薪资以内的普通小弟还凑合,要是做薪资 20k+ 的高工,那就惨了,让你设计个系统,里面肯定一堆坑,出了事故公司受损失,团队一起背锅。 面试题剖析 这个问题这么问是很好的,因为不能问你 Kafka 的高可用性怎么保证? ActiveMQ 的高可用性怎么保证? 一个面试官要是这么问就显得很没水平,人家可能用的就是 RabbitMQ,没用过 Kafka,你上来问人家 Kafka 干什么? 这不是摆明了刁难人么。 所以有水平的面试官,问的是 MQ 的高可用性怎么保证? 这样就是你用过哪个 MQ,你就说说你对那个 MQ 的高可用性的理解。 RabbitMQ 的高可用性 RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。 RabbitMQ 有三种模式: 单机模式、普通集群模式、镜像集群模式。 单机模式 单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的😄,没人生产用单机模式。 普通集群模式(无高可用性) 普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。 你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。 你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。 这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。 因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。 而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。 所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。 镜像集群模式(高可用性) 这种模式,才是所谓的 RabbitMQ 的高可用模式。 跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。 然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。 那么如何开启这个镜像集群模式呢? 其实很简单,RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。 这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。 坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重! 第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。 你想,如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢? Kafka 的高可用性 Kafka 一个最基本的架构认识: 由多个 broker 组成,每个 broker 是一个节点; 你创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。 这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。 实际上 RabbmitMQ 之类的,并不是分布式消息队列,它就是传统的消息队列,只不过提供了一些集群、HA(High Availability, 高可用性) 的机制而已,因为无论怎么玩儿,RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。 Kafka 0.8 以前,是没有 HA 机制的,就是任何一个 broker 宕机了,那个 broker 上的 partition 就废了,没法写也没法读,没有什么高可用性可言。 比如说,我们假设创建了一个 topic,指定其 partition 数量是 3 个,分别在三台机器上。 但是,如果第二台机器宕机了,会导致这个 topic 的 1/3 的数据就丢了,因此这个是做不到高可用的。 Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。 每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。 所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。 写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。 只能读写 leader? 很简单,要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度太高,很容易出问题。 Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。 这么搞,就有所谓的高可用性了,因为如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的。 如果这个宕机的 broker 上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。 这就有所谓的高可用性了。 写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。 一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。 (当然,这只是其中一种模式,还可以适当调整这个行为) 消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。 看到这里,相信你大致明白了 Kafka 是如何保证高可用机制的了,对吧? 不至于一无所知,现场还能给面试官画画图。 要是遇上面试官确实是 Kafka 高手,深挖了问,那你只能说不好意思,太深入的你没研究过。 三、如何保证消息不被重复消费? 或者说,如何保证消息消费的幂等性? 面试官心理分析 其实这是很常见的一个问题,这俩问题基本可以连起来问。 既然是消费消息,那肯定要考虑会不会重复消费? 能不能避免重复消费? 或者重复消费了也别造成系统异常可以吗? 这个是 MQ 领域的基本问题,其实本质上还是问你使用消息队列如何保证幂等性,这个是你架构里要考虑的一个问题。 面试题剖析 回答这个问题,首先你别听到重复消息这个事儿,就一无所知吧,你先大概说一说可能会有哪些重复消费的问题。 首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。 因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。 挑一个 Kafka 来举个例子,说说怎么重复消费吧。 Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。 但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。 这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。 重启之后,少数消息会再次消费一次。 举个栗子。 有这么个场景。 数据 1/2/3 依次进入 kafka,kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,我们就假设分配的 offset 依次是 152/153/154。 消费者从 kafka 去消费的时候,也是按照这个顺序去消费。 假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 zookeeper,此时消费者进程被重启了。 那么此时消费过的数据 1/2 的 offset 并没有提交,kafka 也就不知道你已经消费了 offset=153 这条数据。 那么重启之后,消费者会找 kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。 由于之前的 offset 没有提交成功,那么数据 1/2 会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。 如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据 1/2 在数据库里插入了 2 次,那么数据就错啦。 其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。 举个例子吧。 假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了? 但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。 一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。 幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。 所以第二个问题来了,怎么保证消息队列消费的幂等性? 其实还是得结合业务来思考,我这里给几个思路: 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗? 如果没有消费过,你就处理,然后这个 id 写 Redis。 如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。 比如基于数据库的唯一键来保证重复数据不会重复插入多条。 因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。 当然,如何保证 MQ 的消费是幂等性的,需要结合具体的业务来看。 四、如何保证消息的可靠性传输? 或者说,如何处理消息丢失的问题? 面试官心理分析 这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条,不能多,就是前面说的重复消费和幂等性问题。 不能少,就是说这数据别搞丢了。 那这个问题你必须得考虑一下。 如果说你这个是用 MQ 来传递非常核心的消息,比如说计费、扣费的一些消息,那必须确保这个 MQ 传递过程中绝对不会把计费消息给弄丢。 面试题剖析 数据的丢失问题,可能出现在生产者、MQ、消费者中,咱们从 RabbitMQ 和 Kafka 分别来分析一下吧。 RabbitMQ 生产者弄丢了数据 生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息; 如果收到了消息,那么可以提交事务channel.txCommit。 // 开启事务 channel.txSelect try { // 这里发送消息 } catch ( Exception e) { channel.txRollback // 这里再次重发这条消息 } // 提交事务 channel.txCommitCopy to clipboardErrorCopied 但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能。 所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。 如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。 而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。 事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。 所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。 RabbitMQ 弄丢了数据 就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。 除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。 设置持久化有两个步骤: 创建 queue 的时候将其设置为持久化 这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。 第二个是发送消息的时候将消息的 deliveryMode 设置为 2 就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。 必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。 注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。 所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。 消费端弄丢了数据 RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。 这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。 这样的话,如果你还没处理完,不就没有 ack 了? 那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。 Kafka 消费端弄丢了数据 唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。 这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。 但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。 生产环境碰到的一个问题,就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存 queue,然后消费者会自动提交 offset。 然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了。 Kafka 弄丢了数据 这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。 大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据? 这就丢了一些数据啊。 生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。 所以此时一般是要求起码设置如下 4 个参数: 给 topic 设置 replication.factor 参数: 这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。 在 Kafka 服务端设置 min.insync.replicas 参数: 这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。 在 producer 端设置 acks=all: 这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。 在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思): 这个是要求一旦写入失败,就无限重试,卡在这里了。 我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。 生产者会不会弄丢数据? 如果按照上述的思路设置了 acks=all,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。 如果没满足这个条件,生产者会自动不断的重试,重试无限次。 五、如何保证消息的顺序性? 面试官心理分析 其实这个也是用 MQ 的时候必问的话题,第一看看你了不了解顺序这个事儿? 第二看看你有没有办法保证消息是有顺序的? 这是生产系统中常见的问题。 面试题剖析 我举个例子,我们以前做过一个 mysql binlog 同步的系统,压力还是非常大的,日同步数据要达到上亿,就是说数据从一个 mysql 库原封不动地同步到另一个 mysql 库里面去(mysql -> mysql)。 常见的一点在于说比如大数据 team,就需要同步一个 mysql 库过来,对公司的业务系统的数据做各种复杂的操作。 你在 mysql 里增删改一条数据,对应出来了增删改 3 条 binlog 日志,接着这三条 binlog 发送到 MQ 里面,再消费出来依次执行,起码得保证人家是按照顺序来的吧? 不然本来是: 增加、修改、删除; 你愣是换了顺序给执行成删除、修改、增加,不全错了么。 本来这个数据同步过来,应该最后这个数据被删除了; 结果你搞错了这个顺序,最后这个数据保留下来了,数据同步就出错了。 先看看顺序会错乱的俩场景: RabbitMQ: 一个 queue,多个 consumer。 比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。 有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者2先执行完操作,把 data2 存入数据库,然后是 data1/data3。 这不明显乱了。 Kafka: 比如说我们建了一个 topic,有三个 partition。 生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。 消费者从 partition 中取出来数据的时候,也一定是有顺序的。 到这里,顺序还是 ok 的,没有错乱。 接着,我们在消费者里可能会搞多个线程来并发处理消息。 因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。 而多个线程并发跑的话,顺序可能就乱掉了。 解决方案 RabbitMQ 拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点; 或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。 Kafka 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue; 然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。 六、如何解决消息队列的延时以及过期失效问题? 消息队列满了以后该怎么处理? 有几百万消息持续积压几小时,说说怎么解决? 面试官心理分析 你看这问法,其实本质针对的场景,都是说,可能你的消费端出了问题,不消费了; 或者消费的速度极其慢。 接着就坑爹了,可能你的消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办? 或者是这整个就积压了几个小时,你这个时候怎么办? 或者是你积压的时间太长了,导致比如 RabbitMQ 设置了消息过期时间后就没了怎么办? 所以就这事儿,其实线上挺常见的,一般不出,一出就是大 case。 一般常见于,举个例子,消费端每次消费之后要写 mysql,结果 mysql 挂了,消费端 hang 那儿了,不动了; 或者是消费端出了个什么岔子,导致消费速度极其慢。 面试题剖析 关于这个事儿,我们一个一个来梳理吧,先假设一个场景,我们现在消费端出故障了,然后大量消息在 mq 里积压,现在出事故了,慌了。 大量消息在 mq 里积压了几个小时了还没解决 几千万条数据在 MQ 里积压了七八个小时,从下午 4 点多,积压到了晚上 11 点多。 这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复 consumer 的问题,让它恢复消费速度,然后傻傻的等待几个小时消费完毕。 这个肯定不能在面试的时候说吧。 一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。 所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。 一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下: 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。 这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。 mq 中的消息过期失效了 假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。 如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。 那这就是第二个坑了。 这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。 这个情况下,就不是说要增加 consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。 我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。 就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。 这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。 也只能是这样了。 假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。 mq 都快写满了 如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办? 这个还有别的办法吗? 没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。 然后走第二个方案,到了晚上再补数据吧。 七、如果让你写一个消息队列,该如何进行架构设计? 说一下你的思路。 面试官心理分析 其实聊到这个问题,一般面试官要考察两块: 你有没有对某一个消息队列做过较为深入的原理的了解,或者从整体了解把握住一个消息队列的架构原理。 看看你的设计能力,给你一个常见的系统,就是消息队列系统,看看你能不能从全局把握一下整体架构设计,给出一些关键点出来。 说实话,问类似问题的时候,大部分人基本都会蒙,因为平时从来没有思考过类似的问题,大多数人就是平时埋头用,从来不去思考背后的一些东西。 类似的问题,比如,如果让你来设计一个 Spring 框架你会怎么做? 如果让你来设计一个 Dubbo 框架你会怎么做? 如果让你来设计一个 MyBatis 框架你会怎么做? 面试题剖析 其实回答这类问题,说白了,不求你看过那技术的源码,起码你要大概知道那个技术的基本原理、核心组成部分、基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好。 比如说这个消息队列系统,我们从以下几个角度来考虑一下: 首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞? 设计个分布式的系统呗,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。 如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了? 其次你得考虑一下这个 mq 的数据要不要落地磁盘吧? 那肯定要了,落磁盘才能保证别进程挂了数据就丢了。 那落磁盘的时候怎么落啊? 顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。 其次你考虑一下你的 mq 的可用性啊? 这个事儿,具体参考之前可用性那个环节讲解的 kafka 的高可用保障机制。 多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。 能不能支持数据 0 丢失啊? 可以的,参考我们之前说的那个 kafka 数据零丢失方案。 mq 肯定是很复杂的,面试官问你这个问题,其实是个开放题,他就是看看你有没有从架构角度整体构思和设计的思维以及能力。 确实这个问题可以刷掉一大批人,因为大部分人平时不思考这些东西。 参考文章: https://mp.weixin.qq.com/s/3GMs3ae7ffDFgia9VSDMEg https://mp.weixin.qq.com/s/hAw2KEnZJNIq_qVw8H1UMg
来源:OSCHINA
发布时间:2020-02-12 10:40:00
本文首发于公众号「Python知识圈」,如需转载,请在公众号联系作者授权。 前言 上一篇文章整理了的公众号所有文章的导航链接,其实如果手动整理起来的话,是一件很费力的事情,因为公众号里添加文章的时候只能一篇篇的选择,是个单选框。 面对几百篇的文章,这样一个个选择的话,是一件苦差事。 pk哥作为一个 Pythoner,当然不能这么低效,我们用爬虫把文章的标题和链接等信息提取出来。 抓包 我们需要通过抓包提取公众号文章的请求的 URL,参考之前写过的一篇抓包的文章 Python爬虫APP前的准备 ,pk哥这次直接抓取 PC 端微信的公众号文章列表信息,更简单。 我以抓包工具 Charles 为例,勾选容许抓取电脑的请求,一般是默认就勾选的。 为了过滤掉其他无关请求,我们在左下方设置下我们要抓取的域名。 打开 PC 端微信,打开 「Python知识圈」公众号文章列表后,Charles 就会抓取到大量的请求,找到我们需要的请求,返回的 JSON 信息里包含了文章的标题、摘要、链接等信息,都在 comm_msg_info 下面。 这些都是请求链接后的返回,请求链接 url 我们可以在 Overview 中查看。 通过抓包获取了这么多信息后,我们可以写爬虫爬取所有文章的信息并保存了。 初始化函数 公众号历史文章列表向上滑动,加载更多文章后发现链接中变化的只有 offset 这个参数,我们创建一个初始化函数,加入代理 IP,请求头和信息,请求头包含了 User-Agent、Cookie、Referer。 这些信息都在抓包工具可以看到。 请求数据 通过抓包分析出来了请求链接,我们就可以用 requests 库来请求了,用返回码是否为 200 做一个判断,200 的话说明返回信息正常,我们再构建一个函数 parse_data() 来解析提取我们需要的返回信息。 def request_data(self): try: response = requests.get(self.base_url.format(self.offset), headers=self.headers, proxies=self.proxy) print(self.base_url.format(self.offset)) if 200 == response.status_code: self.parse_data(response.text) except Exception as e: print(e) time.sleep(2) pass 提取数据 通过分析返回的 Json 数据,我们可以看到,我们需要的数据都在 app_msg_ext_info 下面。 我们用 json.loads 解析返回的 Json 信息,把我们需要的列保存在 csv 文件中,有标题、摘要、文章链接三列信息,其他信息也可以自己加。 def parse_data(self, responseData): all_datas = json.loads(responseData) if 0 == all_datas['ret'] and all_datas['msg_count']>0: summy_datas = all_datas['general_msg_list'] datas = json.loads(summy_datas)['list'] a = [] for data in datas: try: title = data['app_msg_ext_info']['title'] title_child = data['app_msg_ext_info']['digest'] article_url = data['app_msg_ext_info']['content_url'] info = {} info['标题'] = title info['小标题'] = title_child info['文章链接'] = article_url a.append(info) except Exception as e: print(e) continue print('正在写入文件') with open('Python公众号文章合集1.csv', 'a', newline='', encoding='utf-8') as f: fieldnames = ['标题', '小标题', '文章链接'] # 控制列的顺序 writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(a) print("写入成功") print('----------------------------------------') time.sleep(int(format(random.randint(2, 5)))) self.offset = self.offset+10 self.request_data() else: print('抓取数据完毕!') 这样,爬取的结果就会以 csv 格式保存起来。 运行代码时,可能会遇到 SSLError 的报错,最快的解决办法就是 base_url 前面的 https 去掉 s 再运行。 保存markdown格式的链接 经常写文章的人应该都知道,一般写文字都会用 Markdown 的格式来写文章,这样的话,不管放在哪个平台,文章的格式都不会变化。 在 Markdown 格式里,用 [文章标题](文章url链接) 表示,所以我们保存信息时再加一列信息就行,标题和文章链接都获取了,Markdown 格式的 url 也就简单了。 md_url = '[{}]'.format(title) + '({})'.format(article_url) 爬取完成后,效果如下。 我们把 md链接这一列全部粘贴到 Markdown 格式的笔记里就行了,大部分的笔记软件都知道新建 Markdown 格式的文件的。 这样,这些导航文章链接整理起来就是分类的事情了。 你用 Python 解决过生活中的小问题吗?欢迎留言讨论。 欢迎关注公众号「Python知识圈」,公众号后台回复关键字,获取更多干货。 回复「英语」:送你英语 7000 单词速记法,亲测非常有效。 回复「编程」:免费获赠2019最新编程资料,认真学完BAT offer 拿到手软。 回复「赚钱」:领取简单可实操的 36 个赚钱的小项目,每天多赚100块零花钱。 回复「电子书」:免费送你10本Python电子书。
来源:OSCHINA
发布时间:2020-02-11 22:45:02
随着数据中台的概念愈发火热,越来越多的技术公司开始慢慢驶入中台的赛道,无论是数据中台、技术中台还是业务中台等等,只要与中台沾上边儿的,大家理解的概念与期待产品应该有的样子都各有不同又自成体系。也正因为此,被“中台的风”吹着跑的各个企业决策者们,也对这个概念愈发好奇与着迷,似乎只要快速搭上“中台”的这趟车,就能让企业摇身一变,迅速转型,进而成为行业佼佼者。而现实中,中台真的是企业的灵丹妙药吗?而数澜的数栖平台又与数据中台有什么关系呢? 先让我们来回顾一下,中台的概念是如何在国内突然爆红的。 一、中台,风从哪儿来? 「中台」,原本是一种美军作战概念,即通过高效、统一的后方系统,来支持前端的机动部队,提高作战效率,减少冗余投入。 而这一概念在国内的开端,依据现在普遍流行的说法,则源于马云 2015 年带领团队对开发出《部落冲突》、《皇室战争》等手游的芬兰公司 Supercell 的一次拜访。这家仅有 200 人不到的小公司,在 2015 财年已创造出 23.3 亿美元的营收。而彼时的阿里,员工人数 3.4W,2015 财年的营收为 122.93 亿美元。 能够支撑 Supercell 公司这种高效散兵作战模式的基础,是他们经过 6 年时间沉淀下来的游戏中台。中台将游戏开发过程中公共、通用的游戏素材和算法整理起来,可以同时支持几个小团队在几周时间内研发出一款新游戏,并能鼓励员工充分试错。但在愈发庞大的阿里生态体系内部,则因为业务的快速扩张和增长,出现了不同业务线之间“烟囱林立”、资源利用率低的问题。同时,部门之间常常因为所谈合作不能立即产生收益,基于 KPI 的问题,最终都会被废止掉。长此以往,公司的创新力实际上也会逐渐下降。 因此,马云回国后,便开始全面推广「中台」战略。并于同年底基于「大中台,小前台」的战略,对组织架构进行了全面彻底的调整。 在阿里已经成功实施内部「中台」战略的两年后,也就是从 2018 年底到 2019 年初这短短半年内,各个大型互联网企业开始进行大规模组织架构调整。 2018 年,商业大环境开始发生变化。To C 业务开始逐渐进入瓶颈期,以往靠着流量红利疯狂扩张业务的大型互联网企业,已经无法找到当年百试百灵的路数了。面对整个互联网行业「水温」的变化,企业意识到降本增效势在必行,转型同样势在必行。于是,各大型互联网企业开始向 To B 业务模式转型。同时,企业内部管理也开始走向精益化,以往的疯狂扩张导致的各种「业务烟囱」、「部门墙」都在推倒名单中。 当云计算、大数据、人工智能、支付能力,成为企业转型中业务较量的关键点时,数据能力、算法能力、调度能力的沉淀,则成为了考验企业内部是否能够快速支撑前台业务,实现企业转型的重要环节。而这些能力的背后,中台的重要性不言而喻。 因此,「旁观者们」从行动上,正式入局。 腾讯的「930 变革」、京东的中台战略、美团的数据全量打通、字节跳动的「直播大中台」、百度的 All in AI 战略,都让中台概念持续升温,也让整个行业热度上涨不断。 二、数据中台,到底是什么? 数澜科技铁教授曾在《 数据中台系列(一):你的企业真的需要「数据中台」吗? 》一文中提出: 数据中台,它不仅仅是我们平时会提到的任何一种工具,也不仅仅是一种企业协同工作的方法,更不能把它当做是一个简单的组织架构。 「 数据中台,包括平台、工具、数据、组织、流程、规范等一切与企业数据资产如何用起来所相关的。 企业所属行业不同,经营策略不同,从而数据场景也千差万别。再加上企业人员运用数据的能力参差不齐,这就导致了 每一家企业的数据中台都是独一无二的,不是购买一个所谓的数据中台工具就能解决的。 当然合适的工具是可以降低企业应用数据难度的,这是强调的是「合适的」,而不是「高级的」。」 数澜数据中台理念 而在数澜看来,数据中台是一种战略选择和组织形式,通过有型的产品支撑和实施方法论,解决大企业面临的数据孤岛、数据维护混乱、数据价值利用低的问题,依据企业特有的业务和架构,构建一套源源不断地把数据变成资产并服务于业务的,可持续让企业数据用起来的机制,让数据可见、可懂、可用、可运营。 它的出现,基于以下两个大前提: 1)丰富的数据维度; 以阿里巴巴为例,TCIF & IDMAPPING,淘宝消费者信息工厂和用户识别,打通了阿里集团所有相关业务域,建立了几千个标签来刻画用户画像。比如:你的真实性别、购物性别、音乐风格偏爱是「R&B」、你的线上购物行为特征是「爱薅羊毛还是财大气粗」等等。如果没有这些用户数据维度,标签的建立无法做到大而全,也就无法提升用户画像的精准度。 2)多个大数据场景。 同样,数据服务支撑了阿里妈妈、淘宝、天猫、支付宝等多个业务板块的场景,每天都有上亿的调用次数。通过业务效果反馈,进而不断优化调整数据和模型。现在许多企业想要建设数据中台,却发现没有实际的数据应用场景,无法进行切入。 三、数栖平台,就是数澜的数据中台吗? 正如上文提到的,数据中台是各个企业独有的一种战略选择和组织形式,市面上不可能存在数据中台这样的一个产品。 数澜 CEO 风剑在面对记者采访时曾说: 「在我看来,但凡说销售数据中台产品的人,都是在忽悠。我对数据中台的定义是,数据中台绝对是不可复制的;数据中台是企业管理与运营的一套机制,一套让数据可用起来的可持续的机制。 企业需要一套自己的数据资产管理体系与框架,但不可能存在一个所有企业和组织都适用的、通用的数据中台框架。 」 也因此,数澜的数栖平台,仅仅是一个一站式数据应用基础设施,它是在帮助用户搭建自有数据中台过程中,必不可少的一套工具。通过数栖平台,企业可以让自己业务沉淀多年的数据融汇打通,同时,通过开发平台可以帮助企业内部的开发同学快速的进行对数据的处理,将数据成体系有逻辑的被管理起来,并且开发出更多可被业务使用的标签,向上层提供更多的弹药。 数栖平台架构 而仅仅通过数栖平台,企业很难快速的构建起完整的数据中台。在这个过程中,就需要客户对于自身业务及数据的全流程完整梳理,同时也需要数澜将沉淀多年的中台建设方法论融入到帮助客户共同建设数据中台的每一个细节中。只有科学的工具+先进的方法论,二者结合在一起,才能够实现最终的目标。 扫 它 ↓ 深 入 了 解 数 栖 呀!
来源:OSCHINA
发布时间:2019-11-29 10:15:00
描述:今天早上到公司,发现测试集群中的一台机器的磁盘使用率100%,而其他节点的磁盘使用率只有30%左右,检查磁盘的使用情况后,使用率饱满的机器上,90%的数据都是/dfs目录下的,因为只是昨天项目测试刚跑进来的数据,删是不可能的,所以只能想办法对集群中的数据进行平衡。 引起这种情况的方式很多: 1. 添加新的Datanode节点 2. 人为干预将数据的副本数降低或者增加 我们都知道当HDFS出现数据不平衡的时候,就会造成MapReduce或Spark等应用程序无法很好的利用本地计算的优势,而且Datanode节点之间也没有更好的网络带宽利用率,某些Datanode节点的磁盘无法使用等等问题。 在Hadoop中,提供了hdfs balancer程序用来保证HDFS的数据平衡,我们先看一下这个程序的参数: hdfs balancer --help Usage: hdfs balancer [-policy ] the balancing policy: datanode or blockpool [-threshold ] Percentage of disk capacity [-exclude [-f | ]] Excludes the specified datanodes. [-include [-f | ]] Includes only the specified datanodes. [-idleiterations ] Number of consecutive idle iterations (-1 for Infinite) before exit. [-runDuringUpgrade] Whether to run the balancer during an ongoing HDFS upgrade.This is usually not desired since it will not affect used space on over-utilized machines. Generic options supported are -conf specify an application configuration file -D use value for given property -fs specify a namenode -jt specify a ResourceManager -files specify comma separated files to be copied to the map reduce cluster -libjars specify comma separated jar files to include in the classpath. -archives specify comma separated archives to be unarchived on the compute machines. The general command line syntax is bin/hadoop command [genericOptions] [commandOptions] 选项的含义根据描述应该很好理解,其中-threshold参数是用来判断数据平衡的依据,值范围为0-100。默认值为10,表示HDFS达到平衡状态的磁盘使用率偏差值为10%,如果机器与机器之间磁盘使用率偏差小于10%,那么我们就认为HDFS集群已经达到了平衡的状态。 我们可以从CDH平台的CM上看到该参数是默认值和含义: 该参数具体含义为:判断集群是否平衡的目标参数,每一个 Datanode 存储使用率和集群总存储使用率的差值都应该小于这个阀值,理论上,该参数设置的越小,整个集群就越平衡,但是在线上环境中,Hadoop集群在进行balance时,还在并发的进行数据的写入和删除,所以有可能无法到达设定的平衡参数值。 参数-policy表示的平衡策略,默认为DataNode。 该参数的具体含义为:应用于重新平衡 HDFS 存储的策略。默认DataNode策略平衡了 DataNode 级别的存储。这类似于之前发行版的平衡策略。BlockPool 策略平衡了块池级别和 DataNode 级别的存储。BlockPool 策略仅适用于 Federated HDFS 服务。 参数-exclude和-include是用来选择balancer时,可以指定哪几个DataNode之间重分布,也可以从HDFS集群中排除哪几个节点不需要重分布,比如: hdfs balancer -include CDHD,CDHA,CDHM,CDHT,CDHO 除了上面的参数会影响HDFS数据重分布,还有如下的参数也会影响重分布, dfs.datanode.balance.bandwidthPerSec, dfs.balance.bandwidthPerSec 该默认设置:1048576(1M/s),个人建议如果机器的网卡和 交换机 的带宽有限,可以适当降低该速度,一般默认就可以了。 该参数含义如下: HDFS平衡器检测集群中使用过度或者使用不足的DataNode,并在这些DataNode之间移动数据块来保证负载均衡。如果不对平衡操作进行带宽限制,那么它会很快就会抢占所有的网络资源,不会为Mapreduce作业或者数据输入预留资源。参数dfs.balance.bandwidthPerSec定义了每个DataNode平衡操作所允许的最大使用带宽,这个值的单位是byte,这是很不直观的,因为网络带宽一般都是用bit来描述的。因此,在设置的时候,要先计算好。DataNode使用这个参数来控制网络带宽的使用,但不幸的是,这个参数在守护进程启动的时候就读入,导致管理员没办法在平衡运行时来修改这个值,如果需要调整就要重启集群。 下面简单介绍一下balancer的原理: Rebalance程序作为一个独立的进程与NameNode进行分开执行。 步骤1: Rebalance Server从NameNode中获取所有的DataNode情况:每一个DataNode磁盘使用情况。 步骤2: Rebalance Server计算哪些机器需要将数据移动,哪些机器可以接受移动的数据。并且从NameNode中获取需要移动的数据分布情况。 步骤3: Rebalance Server计算出来可以将哪一台机器的block移动到另一台机器中去。 步骤4,5,6: 需要移动block的机器将数据移动的目的机器上去,同时删除自己机器上的block数据。 步骤7: Rebalance Server获取到本次数据移动的执行结果,并继续执行这个过程,一直没有数据可以移动或者HDFS集群以及达到了平衡的标准为止。 实战: 找一个比较空闲的的Datanode执行,建议不要在NameNode执行: hdfs balancer -include CDHD,CDHA,CDHM,CDHT,CDHO 执行过程如下(部分),大家可以对照上面的流程看日志,可能会更清楚一点: 16/07/11 09:35:12 INFO balancer.Balancer: namenodes = [hdfs://CDHB:8022] 16/07/11 09:35:12 INFO balancer.Balancer: parameters = Balancer.Parameters [BalancingPolicy.Node, threshold = 10.0, max idle iteration = 5, number of nodes to be excluded = 0, number of nodes to be included = 5, run during upgrade = false] Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved 16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.130:50010 16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.131:50010 16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.135:50010 16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.138:50010 16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.139:50010 16/07/11 09:35:14 INFO balancer.Balancer: 2 over-utilized: [192.168.1.130:50010:DISK, 192.168.1.135:50010:DISK] 16/07/11 09:35:14 INFO balancer.Balancer: 1 underutilized: [192.168.1.131:50010:DISK] 16/07/11 09:35:14 INFO balancer.Balancer: Need to move 203.48 GB to make the cluster balanced. 16/07/11 09:35:14 INFO balancer.Balancer: Decided to move 10 GB bytes from 192.168.1.130:50010:DISK to 192.168.1.131:50010:DISK 16/07/11 09:35:14 INFO balancer.Balancer: Decided to move 10 GB bytes from 192.168.1.135:50010:DISK to 192.168.1.138:50010:DISK 16/07/11 09:35:14 INFO balancer.Balancer: Will move 20 GB in this iteration 16/07/11 09:36:00 INFO balancer.Dispatcher: Successfully moved blk_1074048042_307309 with size=134217728 from 192.168.1.130:50010:DISK to 192.168.1.131:50010:DISK through 192.168.1.130:50010 16/07/11 09:36:07 INFO balancer.Dispatcher: Successfully moved blk_1074049886_309153 with size=134217728 from 192.168.1.135:50010:DISK to 192.168.1.138:50010:DISK through 192.168.1.135:50010 16/07/11 09:36:09 INFO balancer.Dispatcher: Successfully moved blk_1074048046_307313 with size=134217728 from 192.168.1.130:50010:DISK to 192.168.1.131:50010:DISK through 192.168.1.130:50010 16/07/11 09:36:10 INFO balancer.Dispatcher: Successfully moved blk_1074049900_309167 with size=134217728 from 192.168.1.135:50010:DISK to 192.168.1.138:50010:DISK through 192.168.1.135:50010 16/07/11 09:36:16 INFO balancer.Dispatcher: Successfully moved blk_1074048061_307328 with size=134217728 from 192.168.1.130:50010:DISK to 192.168.1.131:50010:DISK through 192.168.1.130:50010 16/07/11 09:36:17 INFO balancer.Dispatcher: Successfully moved blk_1074049877_309144 with size=134217728 from 192.168.1.135:50010:DISK to 192.168.1.138:50010:DISK through 192.168.1.135:50010 如果你使用的是CDH集成平台,也可以通过CM来执行数据重分布: 步骤1:先选择HDFS 组件 的页面,如下: 步骤2:找到页面右侧的操作选择,从下拉框中选择数据“重新平衡”选项 步骤3:确定“重新平衡”就开始安装默认的设置规则重新分布DataNode的Block数据了,可以用CM的日志中查看具体的执行过程。 参考博客: https://www.2cto.com/net/201607/525222.html
来源:OSCHINA
发布时间:2019-11-29 09:28:00
经过一段时间的演化,spark-binlog,delta-plus慢慢进入正轨。spark-binlog可以将MySQL binlog作为标准的Spark数据源来使用,目前支持insert/update/delete 三种事件的捕捉。 delta-plus则是对Delta Lake的一个增强库,譬如在Delta Plus里实现了将binlog replay进Detla表,从而保证Delta表和数据库表接近实时同步。除此之外,detla-plus还集成了譬如布隆过滤器等来尽快数据更新更新速度。更多特性可参考我写的专栏。 数据湖Delta Lake 深入解析 ​ zhuanlan.zhihu.com 图标 有了这两个库,加上Spark,我们就能通过两行代码完成库表的同步。 以前如果要做数据增量同步,大概需要这么个流程: 问题很明显,Pipeline长,涉及到技术多,中间转存其实也挺麻烦的,难做到实时。我们希望可以更简单些,比如最好是这样: 然后我可能只要写如下代码就可以搞定: val spark: SparkSession = ??? val df = spark.readStream. format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource"). option("host","127.0.0.1"). option("port","3306"). option("userName","xxxxx"). option("password","xxxxx"). option("databaseNamePattern","mlsql_console"). option("tableNamePattern","script_file"). optioin("binlogIndex","4"). optioin("binlogFileOffset","4"). load() df.writeStream. format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource"). option(" path ","/tmp/sync/tables"). option("mode","Append"). option("idCols","id"). option("duration","5"). option("syncType","binlog"). checkpointLocation("/tmp/cpl-binlog2") .mode(OutputMode.Append).save("{db}/{table}") 读和写,非常简单。读你需要提供MySQL binlog信息,写的时候指定主键,以及表的存储路径。 如果使用MLSQL则更简单,下面是一个完整的流式同步脚本: set streamName="binlog"; load binlog.`` where host="127.0.0.1" and port="3306" and userName="xxxx" and password="xxxxxx" and bingLogNamePrefix="mysql-bin" and binlogIndex="4" and binlogFileOffset="4" and databaseNamePattern="mlsql_console" and tableNamePattern="script_file" as table1; save append table1 as rate. mysql_{db}.{table} options mode="Append" and idCols="id" and duration="5" and syncType="binlog" and checkpointLocation="/tmp/cpl-binlog2"; 因为是增量同步,所以第一次需要先全量同步一次,用MLSQL也很简单: connect jdbc where url="jdbc: mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false " and driver="com.mysql.jdbc.Driver" and user="xxxxx" and password="xxxx" as db_cool; load jdbc. db_cool.script_file as script_file; save overwrite script_file as delta. mysql_mlsql_console.script_file ; load delta. mysql_mlsql_console.script_file as output; 如果你使用了Console则可在编辑器里直接运行: 如果你安装了binlog2delta插件, 则可享受向导便利:
来源:OSCHINA
发布时间:2019-11-28 21:45:00
软件发展到今,企业业务系统日趋复杂,开发一个业务系统需要掌握和关注的知识点越来越多。除实现业务逻辑本身,还需考虑很多非业务的基础技术系统:如分布式cache和队列、基础服务能力集成、容量规划、弹性伸缩等。这种情况下,研发门槛逐渐上升,效率逐渐下降。企业很难做到低成本创新、试错和快速扩展业务。 阿里云Serverless应用引擎(简称SAE)产品的出现,很好地解决了这类问题。帮助 PaaS 层用户免运维IaaS,按需使用,按量计费,提供了一系列通用能力,实现低门槛微服务/Web/多语言应用上云,有效解决成本及效率问题。 免运维、省成本是所有Serverless产品的核心优势之一,SAE除了免运维底层IaaS外,还能让用户免部署和运维微服务注册中心等组件,提供生产级别稳定可靠的微服务托管能力;免部署和运维K8s集群,零容器基础的用户也能拥抱K8s带来的技术红利。 很多企业在云上都会部署多套环境,存在很大的闲置浪费。使用SAE的“一键启停开发测试环境”,按需释放闲置资源,节省成本,需要使用时一键秒级拉起。后续SAE考虑基于K8s强大的编排能力,编排应用所需的DB、应用和应用的依赖,一键初始化拉起一套全新环境,以及多环境的克隆复制等。 云时代下弹性已成为新常态,很多业务场景无法提前预知,如天猫双11、突发事件导致社交网站瞬时过载。和传统弹性方案相比,SAE在成本和效率上都能做到极致。基于监控触发按需弹,不会出现资源浪费/不足,在效率上免去ECS扩容和ECS启动的时间,能做到秒级弹性。 SAE三个主要指标数据:端到端启动时长20s,满足突发场景快速扩容的需要。支持0.5core的最小规格,进一步降低用户使用成本。部署一套日常环境成本节省47%~57%。 据Serverless应用引擎(SAE)产品经理黛忻介绍,SAE继续探索弹性效率和用户成本的优化方案,继续将一些基础技术归纳抽象下沉到平台,让创新业务成为企业的唯一关注点。 据悉,阿里云是国内率先提供了面向应用的Serverless产品的云计算公司。截止目前,已有上百家企业通过 SAE 构建应用,实现业务的快速交付和IT成本优化。 原文链接 本文为云栖社区原创内容,未经允许不得转载。
来源:OSCHINA
发布时间:2019-11-28 17:30:00
流媒体必定是5G市场上必不可少的一把利器,在云服务终端之下,流媒体的展现形式是多样化的,我们再4G的的时代已经感受到了他无穷的魅力,我们如何看云服务的市场呢,云服务市场下的流媒体的未来就是本文主要阐述的内容   如果把云端比作一座城镇,先后搬迁进来的居民们是形形色色的:IT企业率先发现了这片沃土开始挖掘建设,随后金融、营销咨询、零售、医疗等等领域的企业也开始纷纷搬迁,不久后看这个城镇景色繁华又生活便利,一些“家大业大”的企业也开始举家迁移,例如工业、制造业、交通等等。   在这一过程中,有一位看似不起眼、却又十分重要居民,就是流媒体。   流媒体,包括但不限于游戏、直播、VR、视频等等大量内容行业。相比体量庞大的工业或制造业,这些领域的产业往往量级更轻,而且这些行业本身大多也是建设在完备的数字化基础之上。这就导致,这些行业能否进行产业升级,很可能需要数据传输技术的支持。而当云计算真的着手对于流媒体领域进行改造时,迸发出的能量,却又往往会超出人们的想象。   很多人认为,近年来云游戏的异军突起崛起源于5G的抬头,更快的数据传输速度,意味着终端与云端间更低的延迟,也意味着可以传输更庞大的数据体量,因此来支持游戏的高质量画面,和对于玩家指令的响应。实际这种说法略显片面,因为游戏发展的过程本身就是云化的过程。从PC游戏再到下载客户端就能进行网游,再到“贪玩蓝月”式的页游,就连云游戏这一概念的首次提出都在2009年。降低硬件要求,随时随地进行游戏,本身就是游戏发展的一条路径。但由于受网络技术的限制,游戏在云化的过程中不得不割舍掉诸如画质、即时反应等的优势。   但随着近年以来虚拟机技术和容器技术的提升,云计算厂商对于GPU虚拟化的能力不断提高,可以给予云游戏更优质的计算资源,同时云计算厂商的网络覆盖能力加上不断增强的边缘计算能力,可以很好地解决网络延迟问题。再结合音视频编解码技术、客户端与服务器端的同步算法、网络传输优化等等方面的提升,今天的云游戏平台已经可以运行像《巫师》系列、《只狼》等原本对终端硬件有着很高要求的大作。   虽然今天的云游戏运行起来还常常会出现画面质量不稳定、延迟卡顿等问题,但是不得不说,随着云计算厂商能力的提升,游戏产业正在发生变化。同样,直播、视频、VR产业也以同样的方式受到云计算发展的影响。视频内容的分发、直播中的实时安全审核等等,实际也都是随着云计算技术能力而得到的提升。
来源:OSCHINA
发布时间:2019-11-28 13:39:00
> Github原文链接 1 OOP-Klass(Ordinary Object Pointer)模型 OOP-Klass模型用来描述class的属性和行为 设计为OOP和Klass两部分是因为不希望每个对象都有一个C ++ vtbl指针, 因此,普通的oops没有任何虚拟功能。 相反,他们将所有“虚拟”函数转发到它们的klass,它具有vtbl并根据对象的实际类型执行C ++调度。 1.1 OOP oopDesc是对象类的最高父类。 {name}Desc类描述了Java对象的格式,可从C++访问这些字段 路径: /hotspot/share/oops/oop.hpp 完整的类层次结构,请阅读 src/hotspot/share/oops/oopsHierarchy.hpp OOP体系 1.2 Klass Klass体系 Klass对象提供 语言级别的类对象(方法字典等) 为对象提供虚拟机调度行为 class Klass : public Metadata { friend class VMStructs; friend class JVMCIVMStructs; protected: // 如果添加指向任何元数据对象的新字段,则必须将此字段添加到Klass :: metaspace_pointers_do() // 注意:在klass结构的起始处将常用字段放在一起,以获得更好的缓存行为(虽然可能不会有太大的区别,但可以肯定不会造成伤害) enum { _primary_super_limit = 8 }; // The "layout helper" is a combined descriptor of object layout. // For klasses which are neither instance nor array, the value is zero. // // For instances, layout helper is a positive number, the instance size. // This size is already passed through align_object_size and scaled to bytes. // The low order bit is set if instances of this class cannot be // allocated using the fastpath. // // For arrays, layout helper is a negative number, containing four // distinct bytes, as follows: // MSB:[tag, hsz, ebt, log2(esz)]:LSB // where: // tag is 0x80 if the elements are oops, 0xC0 if non-oops // hsz is array header size in bytes (i.e., offset of first element) // ebt is the BasicType of the elements // esz is the element size in bytes // This packed word is arranged so as to be quickly unpacked by the // various fast paths that use the various subfields. // // The esz bits can be used directly by a SLL instruction, without masking. // // Note that the array-kind tag looks like 0x00 for instance klasses, // since their length in bytes is always less than 24Mb. // // Final note: This comes first, immediately after C++ vtable, // because it is frequently queried. jint _layout_helper; // Klass identifier used to implement devirtualized oop closure dispatching. const KlassID _id; // The fields _super_check_offset, _secondary_super_cache, _secondary_supers // and _primary_supers all help make fast subtype checks. See big discussion // in doc/server_compiler/checktype.txt // // Where to look to observe a supertype (it is &_secondary_super_cache for // secondary supers, else is &_primary_supers[depth()]. juint _super_check_offset; // 类名. Instance classes: java/lang/String, etc. Array classes: [I, // [Ljava/lang/String;, etc. Set to zero for all other kinds of classes. Symbol* _name; // Cache of last observed secondary supertype Klass* _secondary_super_cache; // Array of all secondary supertypes Array* _secondary_supers; // Ordered list of all primary supertypes Klass* _primary_supers[_primary_super_limit]; // java/lang/Class instance mirroring this class OopHandle _java_mirror; // Superclass Klass* _super; // First subclass (NULL if none); _subklass->next_sibling() is next one Klass* volatile _subklass; // Sibling link (or NULL); links all subklasses of a klass Klass* volatile _next_sibling; // All klasses loaded by a class loader are chained through these links Klass* _next_link; // 用于加载此类的VM对类加载器的表示。 //提供访问相应的java.lang.ClassLoader实例 ClassLoaderData* _class_loader_data; jint _modifier_flags; // 处理的访问标志,由Class.getModifiers使用 AccessFlags _access_flags; // 访问标志。 类/接口的区别就存储在这里 JFR_ONLY(DEFINE_TRACE_ID_FIELD;) // 偏向锁定实现和统计 // 64位块优先,以避免碎片 jlong _last_biased_lock_bulk_revocation_time; markOop _prototype_header; // Used when biased locking is both enabled and disabled for this type jint _biased_lock_revocation_count; // 虚表长度 int _vtable_len; ... > 本文由博客一文多发平台 OpenWrite 发布!
来源:OSCHINA
发布时间:2019-11-28 01:25:00
跨域支持 import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.cors.CorsConfiguration; import org.springframework.web.cors.UrlBasedCorsConfigurationSource; import org.springframework.web.filter.CorsFilter; @Configuration public class CorsConfig { /** * 跨域支持 * * @return / @Bean public CorsFilter corsFilter() { final UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource(); final CorsConfiguration config = new CorsConfiguration(); config.setAllowCredentials(true); // 允许cookies跨域 config.addAllowedOrigin(" ");// #允许向该服务器提交请求的URI, 表示全部允许 config.addAllowedHeader(" ");// #允许访问的头信息, 表示全部 config.setMaxAge(18000L);// 预检请求的缓存时间(秒),即在这个时间段里,对于相同的跨域请求不会再预检了 config.addAllowedMethod(" ");// 允许提交请求的方法,*表示全部允许 source.registerCorsConfiguration("/**", config); return new CorsFilter(source); } } RestTemplate高并发下异常与配置 import org.apache.http.client.HttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.springframework.web.client.DefaultResponseErrorHandler; import org.springframework.web.client.RestTemplate; /** * RestTemplate高并发下异常与配置说明 1、java.util.ConcurrentModificationException 2、java.net.SocketTimeoutException Connection timed out */ @Configuration public class RestTemplateConfig { @Bean @LoadBalanced public RestTemplate restTemplate() { // 长连接 PoolingHttpClientConnectionManager pollingConnectionManager = new PoolingHttpClientConnectionManager(); // 总连接数 pollingConnectionManager.setMaxTotal(1000); // 同路由的并发数 pollingConnectionManager.setDefaultMaxPerRoute(1000); HttpClientBuilder httpClientBuilder = HttpClients.custom(); httpClientBuilder.setConnectionManager(pollingConnectionManager); // 重试次数,默认是3次,没有开启 // httpClientBuilder.setRetryHandler(new DefaultHttpRequestRetryHandler(3, true)); HttpClient httpClient = httpClientBuilder.build(); HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory( httpClient); // 连接超时 ms clientHttpRequestFactory.setConnectTimeout(12000); // 数据读取超时时间,即SocketTimeout ms clientHttpRequestFactory.setReadTimeout(12000); // 连接不够用的等待时间,不宜过长,必须设置,比如连接不够用时,时间过长将是灾难性的 clientHttpRequestFactory.setConnectionRequestTimeout(200); // 缓冲请求数据,默认值是true。通过POST或者PUT大量发送数据时,建议将此属性更改为false,以免耗尽内存。 // clientHttpRequestFactory.setBufferRequestBody(false); RestTemplate restTemplate = new RestTemplate(); restTemplate.setRequestFactory(clientHttpRequestFactory); restTemplate.setErrorHandler(new DefaultResponseErrorHandler()); return restTemplate; } } json数据Long为String /** * @description: 返回json是转换long为string @create: 2019-08-02 17:49 **/ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; import org.springframework.context.annotation.Configuration; import org.springframework.http.converter.HttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.web.servlet.config.annotation.EnableWebMvc; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import java.util.List; @EnableWebMvc @Configuration public class WebDataConvertConfig implements WebMvcConfigurer { @Override public void configureMessageConverters(List> converters) { MappingJackson2HttpMessageConverter jackson2HttpMessageConverter = new MappingJackson2HttpMessageConverter(); ObjectMapper objectMapper = new ObjectMapper(); /** * 序列换成json时,将所有的long变成string * 因为js中得数字类型不能包含所有的java long值 */ SimpleModule simpleModule = new SimpleModule(); simpleModule.addSerializer(Long.class, ToStringSerializer.instance); simpleModule.addSerializer(Long.TYPE, ToStringSerializer.instance); objectMapper.registerModule(simpleModule); jackson2HttpMessageConverter.setObjectMapper(objectMapper); converters.add(jackson2HttpMessageConverter); } } > 本文由作者pm1024:JAVA实验手册 发布,交流:583284584!
来源:OSCHINA
发布时间:2019-11-27 16:48:00
作者 | 易立 阿里云资深技术专家 containerd 是一个开源的行业标准容器运行时,关注于简单、稳定和可移植,同时支持 Linux 和 Windows。 2016 年 12 月 14 日,Docker 公司宣布将 Docker Engine 的核心组件 containerd 捐赠到一个新的开源社区独立发展和运营。阿里云、AWS、 Google、IBM 和 Microsoft 作为初始成员,共同建设 containerd 社区; 2017 年 3 月,Docker 将 containerd 捐献给 CNCF(云原生计算基金会)。containerd 得到了快速的发展和广泛的支持; Docker 引擎已经将 containerd 作为容器生命周期管理的基础,Kubernetes 也在 2018 年 5 月,正式支持 containerd 作为容器运行时管理器; 2019 年 2 月,CNCF 宣布 containerd 毕业,成为生产可用的项目。 containerd 从 1.1 版本开始就已经内置了 Container Runtime Interface (CRI) 支持,进一步简化了对 Kubernetes 的支持。其架构图如下: 在 Kubernetes 场景下,containerd 与完整 Docker Engine 相比,具有更少的资源占用和更快的启动速度。 图片来源: containerd 红帽主导的 cri-o 是与 containerd 竞争的容器运行时管理项目。containerd 与 cri-o 项目相比,在性能上具备优势,在社区支持上也更加广泛。 图片来源: ebay 的分享 更重要的是 containerd 提供了灵活的扩展机制,支持各种符合 OCI(Open Container Initiative)的容器运行时实现,比如 runc 容器(也是熟知的 Docker 容器)、KataContainer、gVisor 和 Firecraker 等安全沙箱容器。 在 Kubernetes 环境中,可以用不同的 API 和命令行工具来管理容器 / Pod、镜像等概念。为了便于大家理解,我们可以用下图说明如何利用不同层次的 API 和 CLI 管理容器生命周期管理。 Kubectl:是集群层面的命令行工具,支持 Kubernetes 的基本概念 crictl :是针对节点上 CRI 的命令行工具 ctr :是针对 containerd 的命令行工具 体验 Minikube 是体验 containerd 作为 Kubernetes 容器运行时的最简单方式,我们下面将其作为 Kubernetes 容器运行时,并支持 runc 和 gvisor 两种不同的实现。 早期由于网络访问原因,很多朋友无法直接使用官方 Minikube 进行实验。在最新的 Minikube 1.5 版本中,已经提供了完善的配置化方式,可以帮助大家利用阿里云的镜像地址来获取所需 Docker 镜像和配置,同时支持 Docker/Containerd 等不同容器运行时。我们 创建 一个 Minikube 虚拟机环境,注意需要指明 --container-runtime=containerd 参数设置 containerd 作为容器运行时。同时 registry-mirror 也要替换成自己的阿里云镜像加速地址。 $ minikube start --image-mirror-country cn \ --iso-url=https://kubernetes.oss-cn-hangzhou.aliyuncs.com/minikube/iso/minikube-v1.5.0.iso \ --registry-mirror=https://XXX.mirror.aliyuncs.com \ --container-runtime=containerd Darwin 10.14.6 上的 minikube v1.5.0 Automatically selected the 'hyperkit' driver (alternates: [virtualbox]) ️ 您所在位置的已知存储库都无法访问。正在将 registry.cn-hangzhou.aliyuncs.com/google_containers 用作后备存储库。 正在创建 hyperkit 虚拟机(CPUs=2,Memory=2000MB, Disk=20000MB)... ️ VM is unable to connect to the selected image repository: command failed: curl -sS https://k8s.gcr.io/ stdout: stderr: curl: (7) Failed to connect to k8s.gcr.io port 443: Connection timed out : Process exited with status 7 正在 containerd 1.2.8 中准备 Kubernetes v1.16.2… 拉取镜像 ... 正在启动 Kubernetes ... ⌛ Waiting for: apiserver etcd scheduler controller 完成!kubectl 已经配置至 "minikube" $ minikube dashboard Verifying dashboard health ... Launching proxy ... Verifying proxy health ... Opening http://127.0.0.1:54438/api/v1/namespaces/kubernetes-dashboard/services/http:kubernetes-dashboard:/proxy/ in your default browser... 部署测试应用 我们通过 Pod 部署一个 nginx 应用: $ cat nginx.yaml apiVersion: v1 kind: Pod metadata: name: nginx spec: containers: - name: nginx image: nginx $ kubectl apply -f nginx.yaml pod/nginx created $ kubectl exec nginx -- uname -a Linux nginx 4.19.76 #1 SMP Fri Oct 25 16:07:41 PDT 2019 x86_64 GNU/Linux 然后,我们开启 minikube 对 gvisor 支持: $ minikube addons enable gvisor gvisor was successfully enabled $ kubectl get pod,runtimeclass gvisor -n kube-system NAME READY STATUS RESTARTS AGE pod/gvisor 1/1 Running 0 60m NAME CREATED AT runtimeclass.node.k8s.io/gvisor 2019-10-27T01:40:45Z $ kubectl get runtimeClass NAME CREATED AT gvisor 2019-10-27T01:40:45Z 当 gvisor pod 进入 Running 状态的时候,可以部署 gvisor 测试应用。 我们可以看到 K8s 集群中已经注册了一个 gvisor 的“runtimeClassName”。之后,开发者可以通过在 Pod 声明中的 “runtimeClassName” 来选择不同类型的容器运行时实现。比如,如下我们创建一个运行在 gvisor 沙箱容器中的 nginx 应用。 $ cat nginx-untrusted.yaml apiVersion: v1 kind: Pod metadata: name: nginx-untrusted spec: runtimeClassName: gvisor containers: - name: nginx image: nginx $ kubectl apply -f nginx-untrusted.yaml pod/nginx-untrusted created $ kubectl exec nginx-untrusted -- uname -a Linux nginx-untrusted 4.4 #1 SMP Sun Jan 10 15:06:54 PST 2016 x86_64 GNU/Linux 我们可以清楚地发现:由于基于 runc 的容器与宿主机共享操作系统内核,runc 容器中查看到的 OS 内核版本与 Minikube 宿主机 OS 内核版本相同;而 gvisor 的 runsc 容器采用了独立内核,它和 Minikube 宿主机 OS 内核版本不同。 正是因为每个沙箱容器拥有独立的内核,减小了安全攻击面,具备更好的安全隔离特性。适合隔离不可信的应用,或者多租户场景。注意:gvisor 在 minikube 中,通过 ptrace 对内核调用进行拦截,其性能损耗较大,此外 gvisor 的兼容性还有待增强。 使用 ctl 和 crictl 工具 我们现在可以进入进入 Minikube 虚拟机: $ minikube ssh containerd 支持通过名空间对容器资源进行隔离,查看现有 containerd 名空间: $ sudo ctr namespaces ls NAME LABELS k8s.io # 列出所有容器镜像 $ sudo ctr --namespace=k8s.io images ls ... # 列出所有容器列表 $ sudo ctr --namespace=k8s.io containers ls 在 Kubernetes 环境更加简单的方式是利用 crictl 对 pods 进行操作。 # 查看pod列表 $ sudo crictl pods POD ID CREATED STATE NAME NAMESPACE ATTEMPT 78bd560a70327 3 hours ago Ready nginx-untrusted default 0 94817393744fd 3 hours ago Ready nginx default 0 ... # 查看名称包含nginx的pod的详细信息 $ sudo crictl pods --name nginx -v ID: 78bd560a70327f14077c441aa40da7e7ad52835100795a0fa9e5668f41760288 Name: nginx-untrusted UID: dda218b1-d72e-4028-909d-55674fd99ea0 Namespace: default Status: Ready Created: 2019-10-27 02:40:02.660884453 +0000 UTC Labels: io.kubernetes.pod.name -> nginx-untrusted io.kubernetes.pod.namespace -> default io.kubernetes.pod.uid -> dda218b1-d72e-4028-909d-55674fd99ea0 Annotations: kubectl.kubernetes.io/last-applied-configuration -> {"apiVersion":"v1","kind":"Pod","metadata":{"annotations":{},"name":"nginx-untrusted","namespace":"default"},"spec":{"containers":[{"image":"nginx","name":"nginx"}],"runtimeClassName":"gvisor"}} kubernetes.io/config.seen -> 2019-10-27T02:40:00.675588392Z kubernetes.io/config.source -> api ID: 94817393744fd18b72212a00132a61c6cc08e031afe7b5295edafd3518032f9f Name: nginx UID: bfcf51de-c921-4a9a-a60a-09faab1906c4 Namespace: default Status: Ready Created: 2019-10-27 02:38:19.724289298 +0000 UTC Labels: io.kubernetes.pod.name -> nginx io.kubernetes.pod.namespace -> default io.kubernetes.pod.uid -> bfcf51de-c921-4a9a-a60a-09faab1906c4 Annotations: kubectl.kubernetes.io/last-applied-configuration -> {"apiVersion":"v1","kind":"Pod","metadata":{"annotations":{},"name":"nginx","namespace":"default"},"spec":{"containers":[{"image":"nginx","name":"nginx"}]}} kubernetes.io/config.seen -> 2019-10-27T02:38:18.206096389Z kubernetes.io/config.source -> api containerd 与 Docker 的关系 很多同学都关心 containerd 与 Docker 的关系,以及是否 containerd 可以取代 Docker? containerd 已经成为容器运行时的主流实现,也得到了 Docker 社区和 Kubernetes 社区的大力支持。Docker Engine 底层的容器生命周期管理也是基于 containerd 实现。 但是 Docker Engine 包含了更多的开发者工具链,比如镜像构建。也包含了 Docker 自己的日志、存储、网络、Swarm 编排等能力。此外,绝大多数容器生态厂商,如安全、监控、开发等对 Docker Engine 的支持比较完善,对 containerd 的支持也在逐渐补齐。 所以在 Kubernetes 运行时环境,对安全和效率和定制化更加关注的用户可以选择 containerd 作为容器运行时环境;对于大多数开发者,继续使用 Docker Engine 作为容器运行时也是一个不错的选择。 阿里云容器服务对 containerd 的支持 在阿里云 Kubernetes 服务 ACK,我们已经采用 containerd 作为容器运行时管理,来支撑安全沙箱容器和 runc 容器的混合部署。在现有产品中,我们和阿里云操作系统团队、蚂蚁金服一起支持了基于轻量虚拟化的 runV 沙箱容器,4Q 也将和操作系统团队、安全团队合作发布基于 Intel SGX 的可信加密沙箱容器。 具体产品信息可以参考 该文档 。 Serverless Kubernetes(ASK)中,我们也利用 containerd 灵活的插件机制定制和剪裁了面向 nodeless 环境的容器运行时实现。 原文链接 containerd 与安全沙箱的 Kubernetes 初体验 原文链接 本文为云栖社区原创内容,未经允许不得转载。
来源:OSCHINA
发布时间:2019-11-27 15:58:00
前言 Apache Dubbo 是由阿里开源的一个RPC框架,除了基本的 RPC 功能以外,还提供了一整套的服务治理相关功能。目前它已经是 Apache 基金会下的顶级项目。 而 dubbo-go 则是 Dubbo 的 Go 语言实现。 最近在 dubbo-go 的 todo list 上发现,它还没有实现 TPS Limit 的模块,于是就抽空实现了这个部分。 TPS limit 实际上就是限流,比如说限制一分钟内某个接口只能访问 200 次,超过这个次数,则会被拒绝服务。在 Dubbo 的 Java 版本上,只有一个实现,就是 DefaultTPSLimiter 。 DefaultTPSLimiter 是在服务级别上进行限流。虽然 Dubbo 的官方文档里面声称可以在 method 级别上进行限流,但是我看了一下它的源码,实际上这个是做不到的。当然,如果自己通过实现 Filter 接口来实现 method 级别的限流,那么自然是可以的——这样暴露了 Dubbo Java 版本实现的另外一个问题,就是 Dubbo 的 TpsLimitFilter 实现,是不允许接入自己 TpsLimiter 的实现的。这从它的源码也可以看出来: 它直接写死了 TpsLimiter 的实现。 这个实现的目前只是合并到了 develop 上,等下次发布正式版本的时候才会发布出来。 GitHub: https://github.com/apache/dubbo-go/pull/237 设计思路 于是我大概参考了一下 Dubbo 已有的实现,做了一点改进。 Dubbo 里面的核心抽象是 TpsLimiter 接口。 TpsLimitFilter 只是简单调用了一下这个接口的方法而已: 这个抽象是很棒的。但是还欠缺了一些抽象。 实际上,一个 TPS Limit 就要解决三个问题: 对什么东西进行 limit 。比如说,对服务进行限流,或者对某个方法进行限流,或者对IP进行限流,或者对用户进行限流; 如何判断已经 over limitation 。这是从算法层面上考虑,即用什么算法来判断某个调用进来的时候,已经超过配置的上限了; 被拒绝之后该如何处理。如果一个请求被断定为已经 over limititation 了,那么该怎么处理; 所以在 TpsLimiter 接口的基础上,我再加了两个抽象: TpsLimiter TpsLimitStrategy RejectedExecutionHandler TpsLimiter 对应到 Java 的 TpsLimiter ,两者是差不多。在我的设想里面,它既是顶级入口,还需要承担解决第一个问题的职责。 而 TpsLimitStrategy 则是第二个问题的抽象的接口定义。它代表的是纯粹的算法。该接口完全没有参数,实际上,所有的实现需要维护自身的状态——对于大部分实现而言,它大概只需要获取一下系统时间戳,所以不需要参数。 最后一个接口 RejectedExecutionHandler 代表的是拒绝策略。在 TpsLimitFilter 里面,如果它调用 TpsLimiter 的实现,发现该请求被拒绝,那么就会使用该接口的实现来获取一个返回值,返回给客户端。 实现 其实实现没太多好谈的。不过有一些微妙的地方,我虽然在代码里面注释了,但是我觉得在这里再多说一点也是可以的。 首先提及的就是拒绝策略 RejectedExecutionHandler ,我就是提供了一种实现,就是随便 log 了一下,什么都没做。因为这个东西是强业务相关的,我也不能提供更加多的通用的实现。 方法与服务双重支持的 TpsLimiter TpsLimiter 我只有一个实现,那就是 MethodServiceTpsLimiterImpl 。它就是根据配置,如果方法级别配置了参数,那么会在方法级别上进行限流。否则,如果在服务级别( ServiceKey )上有配置,那么会在服务级别进行限流。 举个最复杂的例子:服务 A 限制 100 ,有四个方法,方法 M1 配置限制 40 ,方法 M2 和方法 M3 无配置,方法M4配置限制 -1 :那么方法 M1 会单独限流 40 ; M2 和 M3 合并统计,被限制在 100 ;方法 M4 则会被忽略。 用户可以配置具体的算法。比如说使用我接下来说的,我已经实现的三种实现。 FixedWindow 和 ThreadSafeFixedWindow FixedWindow 直接对应到 Java 的 DefaultTpsLimiter 。它采用的是 fixed-window 算法:比如说配置了一分钟内只能调用 100 次。假如从 00:00 开始计时,那么 00:00-01:00 内,只能调用 100 次。只有到达 01:00 ,才会开启新的窗口 01:00-02:00 。如图: Fixed-Window图示 Fixed-Window实现 这里有一个很有意思的地方。就是这个实现,是一个几乎线程安全但是其实并不是线程安全的实现。 在所有的实现里面,它是最为简单,而且性能最高的。我在衡量了一番之后,还是没把它做成线程安全的。事实上, Java 版本的也不是线程安全的。 它只会在多个线程通过第 67 行的检测之后,才会出现并发问题,这个时候就不是线程安全了。但是在最后的 return 语句中,那一整个是线程安全的。它因为不断计数往上加,所以多个线程同时跑到这里,其实不会有什么问题。 现在我要揭露一个最为奇诡的特性了:并发越高,那么这个 race condition 就越严重,也就是说越不安全。 但是从实际使用角度而言,有极端 TPS 的还是比较少的。对于那些 TPS 只有几百每秒的,是没什么问题的。 为了保持和 Dubbo 一致的特性,我把它作为默认的实现。 此外,我还为它搞了一个线程安全版本,也就是 ThreadSafeFixedWindowTpsLimitStrategyImpl ,只是简单的用 sync 封装了一下,可以看做是一个 Decorator 模式的应用。 如果强求线程安全,可以考虑使用这个。 SlidingWindow 这是我比较喜欢的实现。它跟网络协议里面的滑动窗口算法在理念上是比较接近的。 具体来说,假如我设置的同样是一分钟 1000 次,它统计的永远是从当前时间点往前回溯一分钟内,已经被调用了多少次。如果这一分钟内,调用次数没超过 1000 ,请求会被处理,如果已经超过,那么就会拒绝。 我再来描述一下, SldingWindow 和 FixedWindow 两种算法的区别。这两者很多人会搞混。假如当前的时间戳是 00:00 ,两个算法同时收到了第一个请求,开启第一个时间窗口。 那么 FixedWindow 就是 00:00-01:00 是第一个窗口,接下来依次是 01:00-02:00 , 02:00-03:00 , ...。当然假如说 01:00 之后的三十秒内都没有请求,在 01:31 又来了一个请求,那么时间窗口就是 01:31-02:31 。 而 SildingWindow 则没有这种概念。假如在 01:30 收到一个请求,那么 SlidingWindow 统计的则是 00:30-01:30 内有没有达到 1000 次。它永远计算的都是接收到请求的那一刻往前回溯一分钟的请求数量。 如果还是觉得有困难,那么简单来说就是 FixedWindow 往后看一分钟, SlidingWindow 回溯一分钟。 这个说法并不严谨,只是为了方便理解。 在真正写这个实现的时候,我稍微改了一点点: 我用了一个队列来保存每次访问的时间戳。一般的写法,都是请求进来,先把已经不在窗口时间内的时间戳删掉,然后统计剩下的数量,也就是后面的 slow path 的那一堆逻辑。 但是我改了的一点是,我进来直接统计队列里面的数量——也就是请求数量,如果都小于上限,那么我可以直接返回 true ,即 quick path 。 这种改进的核心就是:我只有在检测到当前队列里面有超过上限数量的请求数量时候,才会尝试删除已经不在窗口内的时间戳。 这其实就是,是每个请求过来,我都清理一下队列呢?还是只有队列元素超出数量了,我才清理呢?我选择的是后者。 我认为这是一种改进……当然从本质上来说,整体开销是没有减少的——因为 golang 语言里面 List 的实现,一次多删除几个,和每次删除一个,多删几次,并没有多大的区别。 算法总结 无论是 FixedWindow 算法还是 SlidingWindow 算法都有一个固有的缺陷,就是这个时间窗口难控制。 我们设想一下,假如说我们把时间窗口设置为一分钟,允许 1000 次调用。然而,在前十秒的时候就调用了 1000 次。在后面的五十秒,服务器虽然将所有的请求都处理完了,然是因为窗口还没到新窗口,所以这个时间段过来的请求,全部会被拒绝。 解决的方案就是调小时间窗口,比如调整到一秒。但是时间窗口的缩小,会导致 FixedWindow 算法的 race condition 情况加剧。 那些没有实现的 基于特定业务对象的限流 举例来说,某些特殊业务用的针对用户 ID 进行限流和针对 IP 进行限流,我就没有在 dubbo-go 里面实现。有需要的可以通过实现 TpsLimiter 接口来完成。 全局 TPS limit 这篇文章之前讨论的都是单机限流。如果全局限流,比如说针对某个客户,它购买的服务是每分钟调用 100 次,那么就需要全局限流——虽然这种 case 都不会用 Filter 方案,而是另外做一个 API 接入控制。 比如说,很常用的使用 Redis 进行限流的。针对某个客户,一分钟只能访问 100 次,那我就用客户 ID 做 key , value 设置成 List ,每次调用过来,随便塞一个值进去,设置过期时间一分钟。那么每次统计只需要统计当前 key 的存活的值的数量就可以了。 这种我也没实现,因为好像没什么需求。国内讨论 TPS limit 都是讨论单机 TPS limit 比较多。 这个同样可以通过实现 TpsLimiter 接口来实现。 Leaky Bucket 算法 这个本来可以是 TpsLimitStrategy 的一种实现的。后来我觉得,它其实并没有特别大的优势——虽然号称可以做到均匀,但是其实并做不到真正的均匀。通过调整 SlidingWindow 的窗口大小,是可以接近它宣称的均匀消费的效果的。比如说调整到一秒,那其实就已经很均匀了。而这并不会带来多少额外的开销。 作者信息: 邓明,毕业于南京大学,就职于eBay Payment部门,负责退款业务开发 原文链接 本文为云栖社区原创内容,未经允许不得转载。
来源:OSCHINA
发布时间:2019-11-27 15:49:00
通过Eigen的矩阵运算,将点云进行Z轴旋转45°,再沿X轴平移2.5. http://pointclouds.org/documentation/tutorials/matrix_transform.php#matrix-transform #include #include pcl::PointCloud::Ptr source_cloud(new pcl::PointCloud()); pcl::PointCloud::Ptr transformed_cloud(new pcl::PointCloud()); float theta = M_PI / 4; // The angle of rotation in radians Eigen::Affine3f transform_2 = Eigen::Affine3f::Identity(); // Define a translation of 2.5 meters on the x axis. transform_2.translation() << 2.5, 0.0, 0.0; // The same rotation matrix as before; theta radians around Z axis transform_2.rotate(Eigen::AngleAxisf(theta, Eigen::Vector3f::UnitZ())); // Print the transformation printf("\nMethod #2: using an Affine3f\n"); std::cout << transform_2.matrix() << std::endl; // Executing the transformation // You can either apply transform_1 or transform_2; they are the same pcl::transformPointCloud(*source_cloud, *transformed_cloud, transform_2);
来源:OSCHINA
发布时间:2019-11-27 15:29:00
HTTP Proxy Demo 代码 1、Python #! -- encoding:utf-8 -- import requests # 要访问的目标页面 targetUrl = "http://ip.hahado.cn/ip" # 代理服务器 proxyHost = "ip.hahado.cn" proxyPort = "39010" # 代理隧道验证信息 proxyUser = "username" proxyPass = "password" proxyMeta = "http://%(user)s:%(pass)s@%(host)s:%(port)s" % { "host" : proxyHost, "port" : proxyPort, "user" : proxyUser, "pass" : proxyPass, } proxies = { "http" : proxyMeta, "https" : proxyMeta, } resp = requests.get(targetUrl, proxies=proxies) print resp.status_code print resp.text 2、C Sharp HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://ip.hahado.cn/ip"); WebProxy myProxy = new WebProxy(); Uri newUri = new Uri("http://ip.hahado.cn:39010"); myProxy.Address = newUri; myProxy.Credentials = new NetworkCredential("username", "password"); request.Proxy = myProxy; 3、PHP // 要访问的目标页面 $targetUrl = "http://ip.hahado.cn/ip"; //$targetUrl = "http://ip.hahado.cn/switch-ip"; //$targetUrl = "http://ip.hahado.cn/current-ip"; // 代理服务器 define("PROXY_SERVER", "ip.hahado.cn:39010"); // 隧道身份信息 define("PROXY_USER", "username"); define("PROXY_PASS", "password"); $proxyAuth = base64_encode(PROXY_USER . ":" . PROXY_PASS); $headers = implode("\r\n", [ "Proxy-Authorization: Basic {$proxyAuth}", "Proxy-Switch-Ip: yes", ]); $options = [ "http" => [ "proxy" => $proxyServer, "header" => $headers, "method" => "GET", ], ]; $context = stream_context_create($options); $result = file_get_contents($url, false, $context); var_dump($result); 4、JAVA import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.net.Authenticator; import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.PasswordAuthentication; import java.net.Proxy; import java.net.URL; class ProxyAuthenticator extends Authenticator { private String user, password; public ProxyAuthenticator(String user, String password) { this.user = user; this.password = password; } protected PasswordAuthentication getPasswordAuthentication() { return new PasswordAuthentication(user, password.toCharArray()); } } /** * 注意:下面代码仅仅实现HTTP请求链接,每一次请求都是无状态保留的,仅仅是这次请求是更换IP的,如果下次请求的IP地址会改变 * 如果是多线程访问的话,只要将下面的代码嵌入到你自己的业务逻辑里面,那么每次都会用新的IP进行访问,如果担心IP有重复, * 自己可以维护IP的使用情况,并做校验。 */ public class ProxyDemo { public static void main(String args[]) throws Exception { // 要访问的目标页面 String targetUrl = "http://ip.hahado.cn/ip"; //String targetUrl = "http://ip.hahado.cn/switch-ip"; //String targetUrl = "http://ip.hahado.cn/current-ip"; // 代理服务器 String proxyServer = "ip.hahado.cn"; int proxyPort = 39010; // 代理隧道验证信息 String proxyUser = "username"; String proxyPass = "password"; try { URL url = new URL(targetUrl); Authenticator.setDefault(new ProxyAuthenticator(proxyUser, proxyPass)); // 创建代理服务器地址对象 InetSocketAddress addr = new InetSocketAddress(proxyServer, proxyPort); // 创建HTTP类型代理对象 Proxy proxy = new Proxy(Proxy.Type.HTTP, addr); // 设置通过代理访问目标页面 HttpURLConnection connection = (HttpURLConnection) url.openConnection(proxy); // 设置IP切换头 connection.setRequestProperty("Proxy-Switch-Ip","yes"); // 解析返回数据 byte[] response = readStream(connection.getInputStream()); System.out.println(new String(response)); } catch (Exception e) { System.out.println(e.getLocalizedMessage()); } } /** * 将输入流转换成字符串 * * @param inStream * @return * @throws Exception */ public static byte[] readStream(InputStream inStream) throws Exception { ByteArrayOutputStream outSteam = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; int len = -1; while ((len = inStream.read(buffer)) != -1) { outSteam.write(buffer, 0, len); } outSteam.close(); inStream.close(); return outSteam.toByteArray(); } } 5、golang package main import ( "net/url" "net/http" "bytes" "fmt" "io/ioutil" ) const ProxyServer = "ip.hahado.cn:39010" type ProxyAuth struct { License string SecretKey string } func (p ProxyAuth) ProxyClient() http.Client { proxyURL, _ := url.Parse("http://" + p.License + ":" + p.SecretKey + "@" + ProxyServer) return http.Client{Transport: &http.Transport{Proxy:http.ProxyURL(proxyURL)}} } func main() { targetURI := "http://ip.hahaod.cn/ip" //targetURI := "http://ip.hahaod.cn/switch-ip" //targetURI := "http://ip.hahaod.cn/current-ip" // 初始化 proxy http client client := ProxyAuth{License: "username", SecretKey: "password"}.ProxyClient() request, _ := http.NewRequest("GET", targetURI, bytes.NewBuffer([] byte(``))) // 切换IP (只支持 HTTP) request.Header.Set("Proxy-Switch-Ip", "yes") response, err := client.Do(request) if err != nil { panic("failed to connect: " + err.Error()) } else { bodyByte, err := ioutil.ReadAll(response.Body) if err != nil { fmt.Println("读取 Body 时出错", err) return } response.Body.Close() body := string(bodyByte) fmt.Println("Response Status:", response.Status) fmt.Println("Response Header:", response.Header) fmt.Println("Response Body:\n", body) } } 提取代理IP连接: https://v.duoip.cn/customer/signup/?sale=xujinyang1991
来源:OSCHINA
发布时间:2019-11-27 11:52:00
极简教程-Python的容器部署 场景描述:我们使用一个简单的python项目,本项目是中文分词的算法。如何实现Docker安装部署。 第一步: Win10下创建目录文本。选择在D盘下创建docker目录,分别新建三个文件:Dockerfile,app.py,equirements.txt Dockerfile(没有后缀):一个文本文件,包含了一条条的指令(Instruction),每一条指令构建一层,因此每一条指令的内容,就是描述该层应当如何构建。创建镜像必须文件。 # 基于镜像基础 FROM python:3.7 # 设置代码文件夹工作目录 /app WORKDIR /app # 复制当前代码文件到容器中 /app ADD . /app # 安装所需的包 RUN pip install -r requirements.txt # Run app.py when the container launches CMD ["python", "app.py"] app.py:python项目的源代码,这里测试的单个python文件,如果是一个完整项目,可以将整个文件夹拷贝到这里。 # coding:utf8 ​ """ DESC: Python数据预处理之第一个分词程序范例 Author:伏草惟存 Prompt: code in Python3 env """ ​ import jieba ​ str = "道路千万条,安全第一条;行车不规范,亲人两行泪。" print("原句: \n" + str) ​ seg_list = jieba.cut(str) print("分词: \n" + " / ".join(seg_list)) equirements.txt :所需要的插件,以python为例,其获取方法是cmd命令,进入到【D:\docker】目录,执行命令:pip freeze > requirements.txt 第二步:生成镜像。本文采用的windows环境。docker build -t friendlyhello .命令中最后的点不要忘记,这里表示当前目录 第三步:查看镜像是否生成 第四步:运行镜像程序,这里可以看到分词效果
来源:OSCHINA
发布时间:2019-11-27 09:43:00
服务器风扇的作用是加快散热片表面空气的流动速度,以提高散热片和空气的热交换速度。风扇作为风冷散热器的两大重要部件之一,它的性能的好坏往往对服务器散热器效果和使用寿命起着一定的决定性作用。在选购服务器风扇的时候,考虑风扇的基本指标有以下几点:      1、风扇功率      功率越大,风扇风力越强劲,散热效果也就越好。而风扇的功率与风扇的转速又是有直接联系的,也就是说风扇的转速越高,风扇也就越强劲有力。      2、风扇转速      风扇的转速与风扇的功率是密不可分的,转速的大小直接影响到风扇功率的大小。风扇的转速越高,向CPU传送的进风量就越大,CPU获得的冷却效果就会越好。但是一旦风扇的转速超过它的额定值,那么风扇在长时间超负荷运作之下,本身产生热量也会增高,而且时间越长产生的热量也就越大,此时风扇不但不能起到很好的冷却效果,反而会“火上浇油”。      另外,风扇在高速动转过程中,可能会产生很强的噪音,时间长了可能会缩短风扇寿命;还有,较高的运转速度需要较大的功率来提供“动力源”,而高动力源又是从主板和电源中的功率中获得的,一旦超出主板的负荷就会引起系统的不稳定。因此,我们在选择风扇的,同时应该平衡风扇的转速和发热量之间的关系,最好选择转速在3500转至5200转之间的风扇。      3、风扇材质      CPU发出热量首先传导到散热片,再由风扇带来的冷空气吹拂而把散热片的热量带走,而风扇所能传导的热量快慢是由组成风扇的导热片的材质决定的,因此风扇的材料质量对热量的传导性能具有很大的作用,为此我们在选择风扇时一定要注意风扇导热片的热传导性能是否良好。      4、风扇噪声      太大的噪音将会影响我们操作电脑的心情。噪音太小通常与风扇的功率有关,功率越大、转速也就越快,此时一个负影响也就表现出来了,那就是噪声。我们在购买风扇时,一定要先试听一下风扇的噪音,如果太大,那么最好是不要买。如今风扇为了减轻噪声都投入了一些设计,例如改变扇叶的角度,增加扇轴的润滑度和稳定度等。      现在有很多便宜的风扇用的轴承都是油封的,由铜质外套和钢制轴芯组成,长时间工作之后扇轴润滑度不够,风扇噪音增大、转速减低,这很容易导致机器过热而出现死机现象,严重的时候还有可能把机芯烧坏。      现在有许多知名品牌的风扇开始使用滚珠轴承,这种轴承就是利用许多钢珠来作为减少摩擦的介质。这种滚珠风扇的特点就是风力大,寿命长、噪音小,但成本比较高,只有高档风扇才可能使用到它。      5、风扇排风量      风扇排风量可以说是一个比较综合的指标,因此我们可以这么说排风量是衡量一个风扇性能的最直接因素。如果一个风扇可以达到5000转/分,不过如果扇叶是扁平的话,那是不会形成任何气流的,所以关系到散热风扇的排风量的时候,扇叶的角度也是很重要的一个因素。测试一个风扇排风量的方法很容易,只要将手放在散热片附近感受一下吹出的风的强度即可,通常质量好的风扇,即使我们在离它很远的位置,也仍然可以感到风流,这就是散热效果上佳的表现。      6、风扇叶片      同一风扇如果其他部分保持不变,只将叶片由五扇叶改为七扇叶,风量变化可能不会增加多少。但是就风扇的转速而言,七扇叶的转速会低于五扇叶(通风量相同的情况下),相对的如果采用七扇叶风扇,轴承的磨损,漏油情况较少,风扇的寿命较长。如果五扇叶和七扇叶的转速相同,七扇叶的通风量会更大。风扇的转速越高,相应的寿命就越短,噪音也越大。另外,风扇的扇叶越厚,叶片斜角越大,则风压也越大。扇叶的入口角(以45度为最大)也是决定风扇通风量的重要因素之一。      我们知道,服务器AMD CPU的发热量比INTEL大,但是AMD CPU所能承受的最高温度也比INTEL高。正由于AMD CPU发热量大,相对与AMD CPU来说,风扇散热片底部的厚度越厚越好,而INTEL的发热量小,散热片的厚度可以小一些。由于散热片的厚度要求不同,最终对风扇的要求也不同。      对于底部较厚的散热片,它可以很快吸收到CPU的热量,存储的热量也更多。为了不使CPU长期工作在高温环境下。除了要求散热片本身的导热性较好以外,还需要更大的风流来吹散CPU热量。如果要把底部的热量吹走,就需要风扇产生足够的风压,能将风流吹到散热片的底部,对流方式的散热才能从底部开始进行。
来源:OSCHINA
发布时间:2019-11-26 14:23:00
前面文章介绍过 Hadoop分布式的配置 ,但是设计到高可用,这次使用zookeeper配置Hadoop高可用。 1.环境准备 1)修改IP 2)修改主机名及主机名和IP地址的映射 3)关闭防火墙 4)ssh免密登录 5)创建hadoop用户和用户组 6)安装更新安装源、JDK、配置环境变量等 2.服务器规划 Node1 Node2 Node3 NameNode NameNode JournalNode JournalNode JournalNode DataNode DataNode DataNode ZK ZK ZK ResourceManager NodeManager NodeManager ResourceManager NodeManager 3.配置Zookeeper集群 参考我的之前的文章 Zookeeper安装和配置说明 4.安装Hadoop 1)官方下载地址:http://hadoop.apache.org/ 2)解压hadoop2.7.2至/usr/local/hadoop2.7 3)修改hadoop2.7的所属组和所属者为hadoop chown -R hadoop:hadoop /usr/local/hadoop2.7 4)配置HADOOP_HOME vim /etc/profile #HADOOP_HOME export HADOOP_HOME=/usr/local/hadoop2.7 export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop export HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_HOME}/lib/native export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH 5.配置Hadoop集群高可用 5.1配置HDFS集群 hadoop-env.sh export JAVA_HOME=/usr/local/jdk1.8.0_221 hadoop-site.xml dfs.nameservices hadoopCluster dfs.ha.namenodes.hadoopCluster nn1,nn2 dfs.namenode.rpc-address.hadoopCluster.nn1 node1:9000 dfs.namenode.rpc-address.hadoopCluster.nn2 node2:9000 dfs.namenode.http-address.hadoopCluster.nn1 node1:50070 dfs.namenode.http-address.hadoopCluster.nn2 node2:50070 dfs.namenode.shared.edits.dir qjournal://node1:8485;node2:8485;node3:8485/hadoopCluster dfs.ha.fencing.methods sshfence dfs.ha.fencing.ssh.private-key-files /home/hadoop/.ssh/id_rsa dfs.journalnode.edits.dir /data_disk/hadoop/jn dfs.permissions.enable false dfs.client.failover.proxy.provider.hadoopCluster org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider dfs.namenode.name.dir file:///data_disk/hadoop/name 为了保证元数据的安全一般配置多个不同目录 dfs.datanode.data.dir file:///data_disk/hadoop/data datanode 的数据存储目录 dfs.replication 3 HDFS的数据块的副本存储个数,默认是3 core-site.xml fs.defaultFS hdfs://hadoopCluster hadoop.tmp.dir file:///data_disk/hadoop/tmp 启动hadoop集群 (1)在各个JournalNode节点上,输入以下命令启动journalnode服务 sbin/hadoop-daemon.sh start journalnode (2)在[nn1]上,对其进行格式化,并启动 bin/hdfs namenode -format sbin/hadoop-daemon.sh start namenode (3)在[nn2]上,同步nn1的元数据信息 bin/hdfs namenode -bootstrapStandby (4)启动[nn2] sbin/hadoop-daemon.sh start namenode (5)在[nn1]上,启动所有datanode sbin/hadoop-daemons.sh start datanode (6)将[nn1]切换为Active bin/hdfs haadmin -transitionToActive nn1 (7)查看是否Active bin/hdfs haadmin -getServiceState nn1 打开浏览器查看namenode的状态 5.2配置HDFS自动故障转移 在hdfs-site.xml中增加 dfs.ha.automatic-failover.enabled true 在core-site.xml文件中增加 ha.zookeeper.quorum node1:2181,node2:2181,node3:2181 5.2.1启动 (1)关闭所有HDFS服务: sbin/stop-dfs.sh (2)启动Zookeeper集群: bin/zkServer.sh start (3)初始化HA在Zookeeper中状态: bin/hdfs zkfc -formatZK (4)启动HDFS服务: sbin/start-dfs.sh (5)在各个NameNode节点上启动DFSZK Failover Controller,先在哪台机器启动,哪个机器的NameNode就是Active NameNode sbin/hadoop-daemon.sh start zkfc 5.2.2验证 (1)将Active NameNode进程kill kill -9 namenode的进程id (2)将Active NameNode机器断开网络 service network stop 如果kill nn1后nn2没有变成active,可能有以下原因 (1)ssh免密登录没配置好 (2)未找到fuster程序,导致无法进行fence,参考 博文 5.3YARN-HA配置 yarn-site.xml yarn.nodemanager.aux-services mapreduce_shuffle yarn.resourcemanager.ha.enabled true yarn.resourcemanager.cluster-id cluster-yarn1 yarn.resourcemanager.ha.rm-ids rm1,rm2 yarn.resourcemanager.hostname.rm1 node1 yarn.resourcemanager.hostname.rm2 node3 yarn.resourcemanager.zk-address node1:2181,node2:2181,node3:2181 yarn.resourcemanager.recovery.enabled true yarn.resourcemanager.store.class org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore 5.3.1启动HDFS (1)在各个JournalNode节点上,输入以下命令启动journalnode服务: sbin/hadoop-daemon.sh start journalnode (2)在[nn1]上,对其进行格式化,并启动: bin/hdfs namenode -format sbin/hadoop-daemon.sh start namenode (3)在[nn2]上,同步nn1的元数据信息: bin/hdfs namenode -bootstrapStandby (4)启动[nn2]: sbin/hadoop-daemon.sh start namenode (5)启动所有DataNode sbin/hadoop-daemons.sh start datanode (6)将[nn1]切换为Active bin/hdfs haadmin -transitionToActive nn1 5.3.2启动YARN (1)在node1中执行: sbin/start-yarn.sh (2)在node3中执行: sbin/yarn-daemon.sh start resourcemanager (3)查看服务状态 bin/yarn rmadmin -getServiceState rm1
来源:OSCHINA
发布时间:2019-11-26 10:12:00
漏洞描述 Apache Flink是一个用于分布式流和批处理数据的开放源码平台。Flink的核心是一个流数据流引擎,它为数据流上的分布式计算提供数据分发、通信和容错功能。Flink在流引擎之上构建批处理,覆盖本地迭代支持、托管内存和程序优化。近日有安全研究人员发现apache flink允许上传任意的jar包从而导致远程代码执行。 漏洞级别 高危 影响范围 Apache Flink <=1.9.1 漏洞复现 首先下载Apache Flink 1.9.1安装包并进行解压,之后进入bin文件夹内运行./start-cluster.sh启动环境,浏览器访问 http://ip:8081验证是否成功,如下图所示: 接着使用生成jar的木马文件并进行上传,如下图所示: 开启msf进行监听并点击提交,可看到成功返回一个shell。如下图所示: 修复建议 建议用户关注Apache Flink官网,及时获取该漏洞最新补丁。 临时解决建议 设置IP白名单只允许信任的IP访问控制台并添加访问认证。 漏洞检测方法 目前github已有相应公开的检测poc,如下图所示: 链接: https://github.com/LandGrey/flink-unauth-rce ​ 更多Flink相关博文欢迎关注实时流式计算 本文由博客一文多发平台 OpenWrite 发布!
来源:OSCHINA
发布时间:2019-11-26 09:25:00
集群容错中的第二个关键词Router,中文意思就是路由 前端的路由和后端的路由他们是不同的,但是思想是基本一致的. 鉴于很多技术文章都有一个诟病,就是只讲概念,却不讲应用场景,其实Router在应用隔离,读写分离,灰度发布中都有它的影子.因此本篇用灰度发布的例子来做前期的铺垫 灰度发布 百度百科 你发布应用的时候,不停止对外的服务,也就是让用户感觉不到你在发布 那么下面演示一下灰度发布 1.首先在192.168.56.2和192.168.56.3两台机器上启动Provider,然后启动Consumer,如下图 2.假设我们要升级192.168.56.2服务器上的服务,接着我们去dubbo的控制台配置路由,切断192.168.56.2的流量,配置完成并且启动之后,就看到此时只调用192.168.56.3的服务 3.假设此时你在192.168.56.2服务器升级服务,升级完成后再次将启动服务. 4.由于服务已经升级完成,那么我们此时我们要把刚才的禁用路由取消点,于是点了禁用,但是此时dubbo的这个管理平台就出现了bug,如下图所示 惊奇的发现点了禁用,数据就变两条了,继续点禁用,还是两条,而且删除还删除不了,这样就很蛋疼了...但是一直删不了也不是办法,解决办法也是有的,那就是去zookeeper上删除节点 Mac上好像没有特别好用的zookeeper可视化客户端工具,于是我就用了这个idea的zookeeper插件 只要将这个zookeeper节点删除 然后刷新控制台的界面,如下图那么就只剩下一条了 6.那么此时我们再看控制台的输出,已经恢复正常,整个灰度发布流程结束 Router的继承体系图 从图中可以看出,他有四个实现类 MockInvokersSelector在 Dubbo 源码解析(一) - 集群架构的设计 中提到这里 ScriptRouter在dubbo的测试用例中就有用到,这个类的源码不多,也就124行.引用官网的描述 > 脚本路由规则 支持 JDK 脚本引擎的所有脚本,比如:javascript, jruby, groovy 等,通过 type=javascript 参数设置脚本类型,缺省为 javascript。 当然看到这里可能你可能还是没有感觉出这个类有什么不可替代的作用,你注意一下这个类中有个ScriptEngine的属性 那么我可以举一个应用场景给你 假如有这么个表达式如下: double d = (1+1-(2-4)*2)/24; //没有问题 // 但是假如这个表达式是这样的字符串格式,或者更复杂的运算,那么你就不好处理了 // 然后这个ScriptEngine类的eval方法就能很好处理这类字符串表达式的问题 "(1+1-(2-4)*2)/24" 本篇主要讲讲 ConditionRouter(条件路由) 条件路由主要就是根据dubbo管理控制台配置的路由规则来过滤相关的invoker,当我们对路由规则点击启用的时候,就会触发RegistryDirectory类的notify方法 @Override public synchronized void notify(List urls) { List invokerUrls = new ArrayList(); List routerUrls = new ArrayList(); List configuratorUrls = new ArrayList(); for (URL url : urls) { String protocol = url.getProtocol(); String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) { routerUrls.add(url); } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { configuratorUrls.add(url); } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { invokerUrls.add(url); } else { logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()); } } // configurators if (configuratorUrls != null && !configuratorUrls.isEmpty()) { this.configurators = toConfigurators(configuratorUrls); } // routers if (routerUrls != null && !routerUrls.isEmpty()) { List routers = toRouters(routerUrls); if (routers != null) { // null - do nothing setRouters(routers); } } List localConfigurators = this.configurators; // local reference // merge override parameters this.overrideDirectoryUrl = directoryUrl; if (localConfigurators != null && !localConfigurators.isEmpty()) { for (Configurator configurator : localConfigurators) { this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); } } // providers refreshInvoker(invokerUrls); } 为什么这个notify方法传入的是List呢? 引用一段官网文档的描述 > 所有配置最终都将转换为 URL 表示,并由服务提供方生成,经注册中心传递给消费方,各属性对应 URL 的参数,参见配置项一览表中的 "对应URL参数" 列 其实对于 Router 来说,我们最关心的就是他是怎么过滤的.所以下面这些流程代码我们先走一遍 /** * 将 invokerURL 列表转换为Invoker Map。 转换规则如下: * 1. 如果已将URL转换为invoker,则不再将重新引用该URL且直接从缓存中获取它,并且请注意,URL中的任何参数更改都将被重新引用。 * 2. 如果传入的invoker列表不为空,则表示它是最新的invoker列表 * 3. 如果传入的invokerUrl列表为空,则表示该规则只是覆盖规则或路由规则,需要重新进行比较以决定是否重新引用。 * * @参数 invokerUrls 此参数不能为空 */ // TODO: 2017/8/31 FIXME 应使用线程池刷新地址,否则可能会积累任务。 private void refreshInvoker(List invokerUrls) { if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // 禁止访问 this.methodInvokerMap = null; // 将方法invoker map设置为null destroyAllInvokers(); //关闭所有invoker } else { this.forbidden = false; // 允许访问 Map> oldUrlInvokerMap = this.urlInvokerMap; // 本地引用 if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet(); this.cachedInvokerUrls.addAll(invokerUrls);// 缓存的invoker网址,便于比较 } if (invokerUrls.isEmpty()) { return; } Map> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map Map>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 更改方法名称以映射Invoker Map // state change // If the calculation is wrong, it is not processed. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString())); return; } this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } } /** * 使用方法将invokers列表转换为映射关系 * * @param invokersMap Invoker Map * @return Mapping relation between Invoker and method */ private Map>> toMethodInvokers(Map> invokersMap) { Map>> newMethodInvokerMap = new HashMap<>(); // 根据provider URL声明的方法分类,这些方法与注册表兼容以执行过滤的方法 List> invokersList = new ArrayList>(); if (invokersMap != null && invokersMap.size() > 0) { for (Invoker invoker : invokersMap.values()) { String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY); if (parameter != null && parameter.length() > 0) { String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter); if (methods != null && methods.length > 0) { for (String method : methods) { if (method != null && method.length() > 0 && !Constants.ANY_VALUE.equals(method)) { List> methodInvokers = newMethodInvokerMap.get(method); if (methodInvokers == null) { methodInvokers = new ArrayList>(); newMethodInvokerMap.put(method, methodInvokers); } methodInvokers.add(invoker); } } } } invokersList.add(invoker); } } List> newInvokersList = route(invokersList, null); newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList); if (serviceMethods != null && serviceMethods.length > 0) { for (String method : serviceMethods) { List> methodInvokers = newMethodInvokerMap.get(method); if (methodInvokers == null || methodInvokers.isEmpty()) { methodInvokers = newInvokersList; } newMethodInvokerMap.put(method, route(methodInvokers, method)); } } // 排序且不可修改 for (String method : new HashSet(newMethodInvokerMap.keySet())) { List> methodInvokers = newMethodInvokerMap.get(method); Collections.sort(methodInvokers, InvokerComparator.getComparator()); newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers)); } return Collections.unmodifiableMap(newMethodInvokerMap); } 这个条件路由有一个特点,就是他的getUrl是有值的 从这里我们看到,此时实现类是ConditionRouter,由于接下来的逻辑如果直接让大家看源码图可能不够清晰,所以我又把这个核心的筛选过程用了一个高清无码图,并且用序号标注 最后的筛选结果如下,因为我们在管理后台配置了禁用192.168.56.2,所以最后添加进invokers的就只有192.168.56.3 参考 dubbo源码解析-router > 本文由博客一文多发平台 OpenWrite 发布!
来源:OSCHINA
发布时间:2019-11-25 23:43:00
1、hdfs是通过分布式集群来存储文件,为客户端提供了一个便捷的访问方式,就是一个虚拟的目录结构 2、文件存储到hdfs集群中去的时候是被切分成block的 3、文件的block存放在若干台datanode节点上 4、hdfs文件系统中的文件与真实的block之间有映射关系,由namenode管理 5、每一个block在集群中会存储多个副本,好处是可以提高数据的可靠性,还可以提高访问的吞吐量
来源:OSCHINA
发布时间:2019-11-25 19:28:00
您是否想加入Apache社区并成为某个项目的Committer或PPMC,拥有一个apache邮箱呢? 你是否知道apache社区的Committer也可以是非代码贡献者? 本联合meetup旨在让对开源有兴趣的伙伴们有机会加入到社区中来,成为一份子,让自己的青春热血留下永久痕迹,让自己的代码(或者文档、或者issue等)才华绽放出璀璨的光芒! 活动介绍 如今,开源在中国遍地开花,开源之势不可挡,Apache社区已经有10多个来自咱们中国本土的开源项目,本次联合两个Apache社区项目的用户以及技术爱好者欢聚一堂,一起分享开源技术,一起为中国本土开源献力! Apache ShardingSphere(Incubator)是一套开源的分布式数据库中间件,旨在充分合理地在分布式的场景下利用关系型数据库的计算和存储能力,提供标准化的数据分片、分布式事务和数据库治理功能,可适用于如Java同构、异构语言、云原生等各种多样化的应用场景。 Apache DolphinScheduler(Incubator)是一个分布式去中心化,易扩展的可视化DAG工作流任务调度系统。致力于解决数据处理流程中错综复杂的依赖关系,具有高可靠性(HA)、易扩展、支持绝大多数任务场景、简单易用(可视化拖拽)等特性,已在数十家公司使用。 特邀请到两个社区的使用伙伴和Committer等理论+实践进行干货分享以及现场交流,活动的最后,会有如何加入Apache社区并成为Committer或PPMC的精彩讨论,引导大家成为开源社区的贡献者,一起为开源献一份力! 时间地点 沙龙时间:2019-12-08 14:00 沙龙地点: 北京市海淀区海淀大街34号海置创投大厦7层 创业邦 面向人群:对开源技术感兴趣的小伙伴均可参与 日程安排 14:00 - 14:40 The integration of DolphinScheduler and containerization (Xiaochun Liu). 《DolphinScheduler与容器化的融合》 趣加游戏 & Committer 刘小春 14:40 - 15:20 Analyzing of Sharding-Proxy principle (Yonglun Zhang). 《Sharding-Proxy原理解析》 京东数科 & PPMC 张永伦 15:20 - 16:00 Migration and application of DolphinScheduler in Baiwang (Shuang Yang). 《DolphinScheduler在百望云的迁移和应用》 大数据平台部总监 杨爽 16:00-16:10 Break 16:10 - 16:40 The Architecture of ShardingSphere and Roadmap (Juan Pan). 《ShardingSphere的架构及未来规划》 京东数科高级DBA && PPMC 潘娟 16:40 - 17:20 Roundtable Discussion - How to join the Apache community and to be a committer. 《圆桌讨论 - 如何加入Apache社区并且成为Committer》 Free discussion 本次分享的伙伴都是对参与开源社区很有经验的伙伴,圆桌讨论环节更是精心为大家准备,还在犹豫什么呢,赶快扫下图中的二维码来现场交流吧
来源:OSCHINA
发布时间:2019-11-25 18:47:00
BI报表工具很有意思的一点是:不管是老员工还是新入职的,一眼就能发现数据中的问题,进而针对这个问题从不同角度进行深入的分析挖掘,从问题产生的原因(包括时间、地点、经手人员等明细)到问题造成的影响等,都能在几个点击中快速挖掘出来。可以说,用BI报表工具分析数据,就算是新入职的都能将问题的产生、发展过程一一找出来。 觉得不可能?太夸张了?不如看看以下两张报表感受一下BI报表带来的直观形象的数据分析效果。 以上便是采用奥威BI报表工具制作的BI报表效果。图1为医院管理驾驶舱,图二为奥威小镇分析报表,不管是那一个都能让浏览者对数据有较为清晰直观的感受。当然以上为截图,具体的自助式数据分析效果大家感受不到。一般来说,即便是在BI报表中没采用智能钻取、高效联动等可视化分析功能,浏览者也可通过点击左上角的小图标快速调出数据集构建器,自助式分析挖掘数据。如更改字段与维度组合,如修改数据排序等。又或者通过右上角调出筛选按钮,自行筛选数据来实现自助式数据分析挖掘。 SpeedBI数据分析云就是奥威BI报表工具系列中支持用户自由选择云端、私有部署,人人都能轻松上手的智能可视化分析报表。在该BI报表工具中,用户只需上传数据即可在前端快速完成BI报表,复杂多变的数据运算分析也好,细致的UI调整也好,都能通过SpeedBI数据分析云平台快速完成。如向来以核算科目多变、运算组合多变、数量多而阻碍分析的财务运算,也能借助奥威BI报表工具独有行计算模型,在前端轻松实现。 不仅是行计算模型,SpeedBI数据分析云同样提供无缝对接主流ERP的奥威BI标准解决方案、针对不同行业共性而量身定制的奥威BI行业解决方案,支持报表语言多样化,支持AI取数,支持不同终端(大屏、电脑、手机)等,支持用户随时随地打开BI报表,获取关键数据,更及时发现并解决问题。 http://www.powerbi.com.cn/
来源:OSCHINA
发布时间:2019-11-25 11:23:00
分析思维这种事,即便是面对同一份数据、同一份报表,不同的人看到的、接下去想要看到的都很可能完全不一样,但一般分析报表也就只有一个分析视角,不管是你在这张报表中发现什么问题,想继续分析研究那些数据都只能另外再做一张报表,不仅跟不上你的分析思维,还可能打断你的分析。但是,如果是BI报表,它能自动跟上你的分析思维,你想分析研究那些数据,想看哪些方面的数据分析,BI报表下一秒就能呈现出来。 怎么确保BI 报表准确跟上浏览者的分析思维? 构建强大数据中心,确保数据随传随到 BI报表要跟上浏览者的分析思维,前提之一是想要什么数据就能秒速调取什么数据,这就要求BI报表功能拥有功能强大、反应灵敏的数据中心,将多个业务系统的主数据和交易数据全部打通,消除信息孤岛,统一数据分析口径。 简单来说这个数据中心就像一个数据中转站,各种各样的数据汇聚到这里,并进行统一整理清洗,当前端传来数据调取分析指令时就能立即投入数据调取分析中。 高效智能的可视化分析系统,确保数据可视化分析的高效、高质 数据中心将数据统一整理清洗,可视化分析系统则负责智能分析挖掘,并通过直观易懂的方式呈现数据。就如在奥威BI报表工具上,当用户将数据上传后,只需在前台进行简单的操作(通常为点击、拖拉拽),下达数据分析指令后,系统将自主完成数据抽取、分析、挖掘的整个过程,并且仅需一两秒就能以直观易懂的图像化分析图表呈现在电脑屏幕上。 落地多维动态分析功能,确保浏览者随时能自主分析挖掘数据 在BI报表跟上浏览者分析思维,随时呈现浏览者想要的分析角度、分析内容效果上,多维动态分析功能是一个不可忽视的关键功能,正是因为有了多维动态分析功能,浏览者才能随时自定义字段与维度组合,随时以浏览者的身份筛选数据、层层钻取BI报表或数据明细等。 奥威BI报表工具所制作的BI报表,不仅支持大屏、平板、电脑、手机,自动适应不同屏幕大小,以最佳方式展现数据,呈现数据,更重要的是,不管在那一个终端,用户都能实现自助式数据可视化分析。同一张报表,在不同人手上都能按照浏览者的要求快速呈现浏览者所要的数据可视化分析,一张BI报表能呈现多少内容,能从多少个角度对数据进行多方面的分析挖掘都取决于浏览者自身。 如果说以前的分析报表,是人在适应分析报表的话,那么在奥威 BI 报表工具上,就是BI 报表在随时随地适应人,跟随人的分析思维变化而改变。 奥威BI报表工具不仅围绕用户实际分析需求自主开发了多项高效、智能的可视化分析功能,提供丰富直观的可视化分析图表,更可实现 “ BI+ ”模式 ,也就是在奥威BI报表工具的基础上,落地奥威BI独有的BI解决方案。无缝对接主流ERP,预设分析模型,预设前端BI报表样式,仅需做必要个性化设计,仅需针对来源业务系统修改部分ETL脚本(基本基本的SQL能力即可)。甚至对于金蝶、用友标准解决方案来说,1天就能出方案,真正的0开发。 奥威BI报表工具具体有哪些,服务范围有什么区别?哪款更适合我呢?有没有相关的可视化分析功能体验页面?……登录奥威BI官方网站的相关页面,了解一下哪款BI报表工具更适合自己吧! http://www.powerbi.com.cn/
来源:OSCHINA
发布时间:2019-11-29 10:18:00
svg水球图演示效果 如上述的动态水球图效果,替换不同的svg图片即可实现不同的动态水球效果。 svg格式说明(百度百科) SVG是一种图像文件格式,它的英文全称为Scalable Vector Graphics,意思为可缩放的矢量图形。它是基于XML(Extensible Markup Language),由World Wide Web Consortium(W3C)联盟进行开发的。严格来说应该是一种开放标准的矢量图形语言,可让你设计激动人心的、高分辨率的Web图形页面。用户可以直接用代码来描绘图像,可以用任何文字处理工具打开SVG图像,通过改变部分代码来使图像具有交互功能,并可以随时插入到HTML中通过浏览器来观看。 svg图片的制作或获取 1.创建path形状的方法,利用Illustartior定制你自己的个性化图标(需要有一定的设计基础); 2.素材网下载svg格式,如阿里巴巴旗下iconfont;或下载ai格式,而后直接打开到Illustartor编辑; 3.右键查看路径,把所有可释放的复合路径全部释放,只保留要一个路径,也只能用一个路径; 4.全部选中右键菜单里选择→建立复合路径( 复制 path 标签内的 d 属性的值,如果有多个,则拼接到一起,然后粘贴替换 symbols 里面的路径,也就); 5.文件-导出-导出为 -保存类型【选择SVG】-点击按钮导出; 6.SVG文件右键菜单选择记事本打开,效果如下: 7.svg代码将显示出来,找到path标签 d=“复制这里的代码”; svg水球图的Echarts代码 option = { backgroundColor : "#000" , title : { text : 'Mouse Beautiful' , textStyle : { fontWeight : 'normal' , fontSize : 25 , color : '#fff' } } , series : [ { type : 'liquidFill' , data : [ 0.7 ] , radius : '90%' , waveLength : '30%' , waveHeight : '10' , amplitude : 20 , outline : { show : false } , backgroundStyle : { color : '#333' , borderColor : '#000' , borderWidth : 2 , shadowColor : 'rgba(0, 0, 0, 0.4)' , shadowBlur : 20 } , //path代码粘贴到此处,代码的多少取决于图形的复杂度 shape : 'path://M1185 32L1182 34L1181 40L1197 56L1198 73L1196 75L1191 91L1186 100L1185 106L1174 113L1176 119L1174 125L1179 133L1185 134L1185 143L1192 147L1195 154L1194 160L1196 162L1197 175L1202 183L1204 193L1211 198L1211 200L1197 209L1191 210L1189 214L1185 217L1179 228L1185 240L1181 241L1182 248L1180 250L1178 249L1178 246L1175 247L1162 234L1158 234L1157 236L1159 243L1156 247L1165 259L1167 259L1168 262L1174 267L1173 269L1171 268L1168 274L1178 279L1180 286L1182 288L1180 292L1183 294L1183 297L1186 301L1183 303L1181 302L1177 295L1169 299L1166 296L1163 297L1165 299L1165 304L1161 306L1158 305L1152 312L1144 310L1140 307L1136 308L1132 305L1127 307L1129 311L1128 314L1125 310L1124 305L1119 306L1114 312L1111 312L1109 308L1107 308L1103 317L1092 328L1094 333L1090 336L1087 335L1085 337L1085 343L1087 346L1091 349L1092 347L1094 348L1090 353L1086 355L1083 354L1082 351L1082 354L1080 356L1073 356L1063 363L1045 370L1042 370L1035 363L1024 365L1021 367L1012 367L1012 383L1016 392L1019 394L1024 390L1027 390L1034 396L1031 413L1024 423L1024 425L1029 422L1031 422L1031 424L1029 429L1019 430L1015 434L1003 470L999 472L992 472L995 463L999 460L998 455L990 456L984 468L981 471L977 471L965 467L959 470L959 468L956 467L957 460L955 454L958 452L958 449L956 447L953 446L947 449L939 457L928 453L927 446L923 444L920 439L921 428L925 418L924 413L922 413L922 411L912 411L909 397L905 395L892 399L886 398L896 393L898 390L896 385L890 385L885 387L880 382L865 382L861 379L856 380L845 388L842 382L835 377L830 369L826 366L823 355L818 350L815 350L807 354L786 369L780 368L772 357L775 354L775 350L779 340L777 336L772 334L771 331L775 324L771 322L767 317L768 311L764 304L766 299L755 298L754 295L757 294L757 292L752 288L741 283L732 281L729 278L720 278L719 273L708 260L702 256L701 253L703 238L699 230L698 222L705 205L702 199L695 193L693 178L706 168L713 155L721 155L726 152L727 145L729 143L733 141L744 142L746 138L755 133L753 131L753 121L756 111L757 99L753 92L743 88L740 88L731 93L724 92L730 77L735 72L739 71L744 67L749 60L756 59L762 44L754 32L750 30L739 30L736 27L743 14L747 11L760 9L762 7L767 -8L775 -14L786 -19L800 -33L804 -31L806 -16L816 -5L818 14L822 16L835 12L848 17L853 24L861 44L866 50L877 56L886 56L891 54L899 47L899 39L890 23L891 16L903 2L926 -11L936 -10L939 -6L944 10L947 11L949 7L965 -2L972 -12L980 -37L987 -43L992 -44L995 -43L1000 -36L1000 -25L1002 -19L1010 -17L1024 -20L1029 -11L1030 -5L1034 1L1040 9L1048 15L1056 15L1057 9L1051 -6L1052 -9L1059 -12L1071 -10L1095 -1L1100 5L1102 17L1110 29L1139 23L1148 30L1156 32L1167 29L1169 23L1175 17L1177 17L1186 23L1183 30L1184 31z' , color : [ 'rgba(255,255,0,0.3)' ] , //水波的颜色 对应的是data里面值 label : { normal : { formatter : '70%' , } } } ] } ; Done!
来源:OSCHINA
发布时间:2020-07-18 09:29:00
一、创建数组: var dataName = [ "A" , "B" , "C" , "D" , "E" ] ; var datalabel = [ 100 , 2 , 3 , 12 , 13 ] ; var data = [ 18203 , 23489 , 29034 , 104970 , 131744 ] ; 二、设置option var option = { tooltip : { trigger : 'axis' , axisPointer : { type : 'shadow' } } , grid : { left : '3%' , right : '4%' , bottom : '3%' , containLabel : true } , xAxis : { type : 'value' , boundaryGap : [ 0 , 0.01 ] } , yAxis : { type : 'category' , data : dataName , axisLabel : { interval : 0 , color : '#666' , align : 'right' , fontSize : 13 , } } , series : [ { name : '漏刻有时' , type : 'bar' , itemStyle : { normal : { barBorderRadius : 5 , } , } , label : { show : true , position : "right" , formatter : function ( params ) { console . log ( params . dataIndex ) ; return '总金额:' + data [ params . dataIndex ] + '元\n\n总数量:' + datalabel [ params . dataIndex ] + '个' } } , data : data } , ] } ; 三、重点解读: label : { show : true , position : "right" , formatter : function ( params ) { //console.log(params.dataIndex); return '总金额:' + data [ params . dataIndex ] + '元\n\n总数量:' + datalabel [ params . dataIndex ] + '个' } } Done!
来源:OSCHINA
发布时间:2020-07-18 09:29:00
python脚本方式执行spark程序,好处是不用编译,写完就走! 示例脚本如下: from pyspark import SparkConf, SparkContext conf = SparkConf().setAppName("myTest").setMaster("local") sc = SparkContext(conf=conf) x = [1,2,3] rdd = sc.parallelize(x) count=rdd.count() print("len=",count) # read textfile rdd2=sc.textFile("c:\\spark\\doc\\word.txt") def f(x):return print(x) rdd2.foreach(f) print("rdd2:",rdd2.count()) 保存为"test1.py"文件。然后执行spark-submit test1.py提交执行即可。 pyspark比scala方式要方便多了。 word.txt内容: hello world 1 执行结果: len= 3 hello workd 1 rdd2: 3
来源:OSCHINA
发布时间:2020-07-17 17:39:00
随着科技的发展,大数据已变成信息通信业的主要资源及主要运用。物联网技术、云计算技术、移动互联、车联网平台、手机、平板、PC及遍及地球每个角落里的传感器,无一不是数据来源或者承载方,“大数据”被视作云计算技术之后的又一高新科技网络热点,大伙儿都在讲:智慧旅游的发展趋势离不了大数据,借助大数据提供充足有益的资源,智慧旅游才可以得以“智慧”发展趋势,那么大数据如何助推智慧旅游呢?大数据打造智慧旅游有什么益处? 大数据打造智慧旅游的4点益处 1、大数据打造智慧旅游,工作人员应用管理系统大数据集成化技术,剖析旅客总流量、商家经营、公共文化服务、咨询举报等旅游综合信息,对景区展开实时监测,对出现异常指标值展开预警信息,尽快为旅客、商家等出示服务项目。持续加速智慧旅游大城市建设脚步,以智慧旅游建设和大数据运用为主线,积极主动推动信息科技和旅游产业融合发展趋势,积极开展智慧旅游尤其是景区智慧化提高工作能力。 旅游局(旅游管委会)或景区创建旅游网络舆情监测系统软件,对主要新闻媒体、社区论坛、博客、新浪微博等方式舆情信息展开动态性监管,将海量数据依照信息的正负面信息、知名度、信息内容特性及时间等展开归类,获取基本信息,按时自动生成相对汇报,依照预订对策对潜在性的危机事件及时预警和处理。 数据信息综合分析展现服务平台根据运用旅游局、景区等多年经营及从第三方选购的大数据基础上,打造实时数据统计分析的信息化管理展现服务平台,根据对领域、旅客等信息内容展开多层次的精确剖析和合理预测分析,能够 为客户出示经营管理决策、舆情分析、恶性事件预警信息,另外能够 根据合理融合旅游管控数据信息、旅游行业大数据,为政府部门,旅游公司制订宣传策划营销战略出示合理的数据信息支撑点,真实完成“智慧旅游”。 2、大数据打造智慧旅游,在我国公布的《“十三五”全国旅游信息化规划》明确提出,要“推动旅游大数据应用,推动新驱动”,要“用大数据对旅客信息内容展开相关性分析,提升旅游公共文化服务资源配备”,要“数据共享,注重产业生态圈旅游的共享发展”,这说明旅游大数据的时期已到来。在大众旅游时期,产业生态圈旅游发展趋势不能再借助理性经验,而必须借助大数据助推管理决策。 以便提高景区的管理力度和现代化管理能力,并打造智慧旅游景区的总体目标,提出了景区员工管理与客流分析系统软件,选用视频数据分析系统的方法完成人流量的数据分析。另外选用面部识别技术和智能视频无损检测技术完成景区进出口人脸识别入园和景区风险地区警报提示等。 3、大数据打造智慧旅游,将为智慧旅游服务保障,为智慧旅游发展趋势引入新的魅力驱动力。借助大数据出示的有益资源,推动智慧旅游完成稳步发展。大数据将以更科学、更简易、更智慧的方法促进政府部门监管、企业经营和旅客消费管理决策。中国移动大数据将助推智慧旅游发展趋势完成质的提升,助推智慧旅游腾飞。 4、大数据打造智慧旅游,微信客户端顾客有其庞大的顾客规模和网络通信的实用性,不但确保了数据信息出示的实用性和可持续性,还铸就了大量和多样性的数据信息。这种数据信息和景区视频监控系统数据信息、金融大数据、实时路况数据信息、景区运作数据信息等旅游数据信息紧密结合,可精确清楚地剖析出人流量来源、旅游运动轨迹、热力地图、旅游喜好等内容。根据数据整理解决和深层发掘技术,就能掌握旅游领域的行业动态、旅客消费行为、旅游公司运行情况,进而正确引导旅游市场健康有序发展。 根据旅游基础数据库查询的建设,考虑旅游大数据解决的运用要求,对景区各软件系统的数据信息展开统一监管,提升信息内容发掘与运用,并产生有关的信息化管理建设的规范与标准。以互联网技术为基础,融合多样信息科技方式,创建旅游基础信息共享及互换平台,产生统一的旅游基础数据库查询及旅游资源共享互换管理中心,支撑主题风格数据库查询运用和部门协作旅游信息内容资源聚集、互换和共享。 现如今,智慧旅游大数据让传统式旅游更为聪慧,将景区通过剖析大数据,发觉每一个时间点的主要旅客人群,根据合理的方法推送给潜在旅客,吸引他们的前来。这不仅精准营销,减少无用的营销推广成本费,让景区的运营管理更为简单轻松。
来源:OSCHINA
发布时间:2020-07-17 14:04:00
https://docs.cloudera.com/documentation/enterprise/5-8-x/topics/impala_resource_management.html https://blog.csdn.net/silentwolfyh/article/details/83549202 0440-如何启用Impala的动态资源池:https://blog.csdn.net/Hadoop_SC/article/details/104350431 0441-Impala动态资源池及放置规则使用: https://blog.csdn.net/Hadoop_SC/article/details/104350416?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-1 Cloudera Management 的Impala Admission Control impala开启资源管理,这里不依赖YARN的资源管理。 需要开启Impala的Admisson Control 保存配置后重启Impala服务,以上就完成了Impala动态资源池的启用。 3.进入Impala动态资源池管理界面 4.点击”Impala Admission Control”,进入资源池配置界面 1.Impala的Admission Control功能主要是为了限制用户提交SQL的并发数,以避免集群繁忙内存不足的情况。当集群的查询太多或查询需要的总内存太多,达到一个阈值时,提交的SQL将进入等待状态,当集群资源可用时才会开始查询。 2.Impala的动态资源池与Yarn动态资源池一致,可用创建多个不同的资源池、创建不同的执行计划以及设置放置规则。 3.Impala中的资源池的层级只支持两级,父级资源池均为root https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/impala_howto_rm.html#enable_admission_control 此功能仅在“启用 ResourceManager ACL” 设置为 true 且“管理 ACL” 未设置为 * 时相关。(请参见顶级页面中的“访问控制设置”。) 关于Impala 动态资源池 放置规则类型的解释说明: root.[pool name]:该规则始终满足,在其它规则不匹配的情况下使用,因此该规则默认要放置在所有匹配规则之后。 root.[primary group]:该放规则使用与该用户主要组匹配的资源池。Linux中用户默认的主要组与用户名一致,匹配时会通过用户的主要组与资源池名称比对。 root.[secondarygroup]:该放置规则用于匹配用户的次要组,使用与次要组之一匹配的资源池。 root.[username]:该放置规则用于匹配与用户名一致的资源池。(不推荐使用) 已在运行时指定:该放置规则主要使用在运行时指定的资源池。 放置规则的判断方式,根据放置规则的顺序1、2、3…进行判断,判断到满足条件的放置规则后,后续的规则不再进行匹配。
来源:OSCHINA
发布时间:2020-07-16 16:04:00
网罗数据集,不定期更新! 数据集链接:https://pan.baidu.com/s/1RgmRv80zQB71HSze8bQvwA 提取码:ih2c 酒品数据集( wine.csv) 数据格式:wine.csv 标签:有 语言: 英文 Wine Alcohol Malic.acid Ash Acl Mg Phenols Flavanoids Nonflavanoid.phenols Proanth Color.int Hue OD Proline X: [1:] 13个feature y: [0] 3分类(1 2 3) 数据大小:10.8k,178条数据 数据用途:多分类任务 预处理代码: 构造Dataset & Dataloader : https://my.oschina.net/u/4228078/blog/4320363 纯英文预料数据集(text8.zip) 数据格式:text8.train.txt text8.dev.txt text8.test.txt 英文数据集,无标点无换行 标签:无 语言: 英文 数据大小:95M 数据用途:文本分析 预处理代码: https://my.oschina.net/u/4228078/blog/4405730 项目:语言模型实现: https://my.oschina.net/u/4228078/blog/4462382 名字-国家数据集(names.csv.gz.zip) 数据格式:names_train.csv.gz names_test.csv.gz csv的压缩gz格式文件 第一列[0]:人名 第二列[1]:人名对应的国家 标签:有 语言: 英文 数据大小:train:13374条数据 test:6700条数据 数据用途:根据人名预测国籍 预处理代码: https://my.oschina.net/u/4228078/blog/4415324 青云数据集(qingyun.tsv) 数据格式:qingyun.tsv 第一列[0]:问题 第二列[1]:回答 标签:有 语言: 中文 数据大小:105914条对话 数据用途:开放式聊天机器人 预处理代码: 英文-中文翻译数据集(translate_en2cn) 数据格式:英文+'\t' + 中文 标签:有 语言: 中英文 数据大小:1.1M 数据用途:机器翻译 预处理代码: https://my.oschina.net/u/4228078/blog/4471073 图片数据集(ants, bees) 数据格式:图片 标签:有 语言: 数据大小:400张图片 数据用途:图片分类 预处理代码:
来源:OSCHINA
发布时间:2020-07-16 15:22:00
本文首发于 vivo互联网技术 微信公众号 链接: https://mp.weixin.qq.com/s/qayKiwk5QAIWI7-nyD3FVA 作者:DuZhimin 随着互联网、尤其是物联网的发展,我们需要把各种类型的终端实时监测、检查与分析设备所采集、产生的数据记录下来,在有时间的坐标中将这些数据连点成线,往过去看可以做成多纬度报表,揭示其趋势性、规律性、异常性;往未来看可以做大数据分析,机器学习,实现预测和预警。 这些数据的典型特点是: 产生频率快 (每一个监测点一秒钟内可产生多条数据)、 严重依赖于采集时间 (每一条数据均要求对应唯一的时间)、 测点多信息量大 (实时监测系统均有成千上万的监测点,监测点每秒钟都产生数据,每天产生几十GB的数据量)。 基于时间序列数据的特点,关系型数据库无法满足对时间序列数据的有效存储与处理,因此迫切需要一种专门针对时间序列数据来做优化处理的数据库系统。 一、简介 1、时序数据 时序数据是基于时间的一系列的数据。 2、时序数据库 时序数据库就是存放时序数据的数据库,并且需要支持时序数据的快速写入、持久化、多纬度的聚合查询等基本功能。 对比传统数据库仅仅记录了数据的当前值,时序数据库则记录了所有的历史数据。同时时序数据的查询也总是会带上时间作为过滤条件。 3、OpenTSDB 毫无遗漏的接收并存储大量的时间序列数据。 3.1、存储 无需转换,写的是什么数据存的就是什么数据 时序数据以毫秒的精度保存 永久保留原始数据 3.2、扩展性 运行在Hadoop 和 HBase之上 可扩展到每秒数百万次写入 可以通过添加节点扩容 3.3、读能力 直接通过内置的GUI来生成图表 还可以通过HTTP API查询数据 另外还可以使用开源的前端与其交互 4、OpenTSDB核心概念 我们来看一下这样一段信息:2019-12-5 22:31:21版本号为‘3.2.1’的某产品客户端的首页PV是1000W Metric: 指标,即平时我们所说的监控项。譬如上面的PV Tags: 维度,也即标签,在OpenTSDB里面,Tags由tagk和tagv组成的键值对,即tagk=takv。标签是用来描述Metric的,比如上面的 某产品 客户端的版本号 version=‘3.2.1’ Value :一个Value表示一个metric的实际数值,比如:1000W Timestamp: 即时间戳,用来描述Value是什么时候发生的:比如:2019-12-5 22:31:21 Data Point: 即某个Metric在某个时间点的数值,Data Point包括以下部分:Metric、Tags、Value、Timestamp 保存到OpenTSDB的数据就是无数个DataPoint 上面描述2019-12-5 22:31:21版本号为‘3.2.1’的 某产品 客户端的首页PV是1000W,就是1个DataPoint。 二、OpenTSDB的部署架构 1、架构图 2、说明OpenTSDB底层是使用HBase来存储数据的,也就是说搭建OpenTSDB之前,必须先搭建好HBase环境。 OpenTSDB是由一系列的TSD和实用的命令行工具组成。 应用通过运行一个或多个tsd(Time Series Daemon, OpenTSDB的节点)来与OpenTSDB的交互。 每个TSD是独立的,没有master,没有共享状态,所以你可以运行尽可能多的 TSD 来处理工作负载。 三、HBase简介 从OpenTSDB的部署架构中我们看到OpenTSDB是建立在HBase之上的,那么HBase又是啥呢?为了更好的剖析OpenTSDB,这里我们简要介绍一下HBase。 1、HBase是一个高可靠性、强一致性、高性能、面向列、可伸缩、实时读写的分布式开源NoSQL数据库。 2、HBase是无模式数据库,只需要提前定义列簇,并不需要指定列限定符。同时它也是无类型数据库,所有数据都是按二进制字节方式存储的。 3、它把数据存储在表中,表按“行键,列簇,列限定符和时间版本”的四维坐标系来组织,也就是说如果要唯一定位一个值,需要四个都唯一才行。下面参考Excel来说明一下: 4、对 HBase 的操作和访问有 5 个基本方式,即 Get、Put、Delete 和 Scan 以及 Increment,HBase 基于非行键值查询的唯一途径是通过带过滤器的扫描。 5、数据在HBase中的存储(物理上): 6、数据在HBase中的存储(逻辑上): 四、 支撑OpenTSDB运行的HBase表 如果你第一次用你的HBase实例运行OpenTSDB,需要创建必要的HBase表,OpenTSDB 运行仅仅需要四张表:tsdb, tsdb-uid, tsdb-tree 和 tsdb-meta,所有的DataPoint 数据都保存在这四张表中,建表语句如下: 1、tsdb-uidcreate 'tsdb-uid', {NAME => 'id', COMPRESSION => 'NONE', BLOOMFILTER => 'ROW', DATA_BLOCK_ENCODING => 'PREFIX_TREE'}, {NAME => 'name', COMPRESSION => 'NONE', BLOOMFILTER => 'ROW', DATA_BLOCK_ENCODING => 'PREFIX_TREE'} 2、tsdbcreate 'tsdb', {NAME => 't', VERSIONS => 1, COMPRESSION => 'NONE', BLOOMFILTER => 'ROW', DATA_BLOCK_ENCODING => 'PREFIX_TREE'} 3、tsdb-treecreate 'tsdb-tree', {NAME => 't', VERSIONS => 1, COMPRESSION => 'NONE', BLOOMFILTER => 'ROW', DATA_BLOCK_ENCODING => 'PREFIX_TREE'} 4、tsdb-metacreate 'tsdb-meta', {NAME => 'name', COMPRESSION => 'NONE', BLOOMFILTER => 'ROW', DATA_BLOCK_ENCODING => 'PREFIX_TREE'} 后面将对照实际数据来专门讲解这四张表分别存储的内容。 五、 OpenTSDB是如何把一个数据点保存到HBase中的呢? 1、首先检查一下四个表里面的数据 从上面看,四个表里面的数据都是空的 2、然后我们往OpenTSDB写一个数据点@Test public void addData() { String metricName = "metric"; long value = 1; Map tags = new HashMap(); tags.put("tagk", "tagv"); long timestamp = System.currentTimeMillis(); tsdb.addPoint(metricName, timestamp, value, tags); System.out.println("------------"); } 3、插入数据之后我们再来查看一下四个表数据 发现HBase里面有数据,在tsdb-uid、tsdb、和 tsdb-meta 表里面有数据,而tsdb-tree 表里面没任何数据,下面我们针对这些数据做一下具体分析。 4、tsdb-tree表 它是一张索引表,用于展示树状结构的,类似于文件系统,以方便其他系统使用,这里我们不做深入的分析。 通过配置项tsd.core.tree.enable_processing来打开是否需要往此表里面写入数据。 5、tsdb-meta表 这个表是OpenTSDB中不同时间序列的一个索引,可以用来存储一些额外的信息,该表只有一个列族name,两个列,分别为ts_meta、ts_ctr。这个表里面的数据是可以根据配置项配置来控制是否生成与否,生成几个列,具体的配置项有: tsd.core.meta.enable_realtime_ts tsd.core.meta.enable_tsuid_incrementing tsd.core.meta.enable_tsuid_tracking Row Key 和tsdb表一样,其中不包含时间戳,[...] ts_meta Column 和UIDMeta相似,其为UTF-8编码的JSON格式字符串 ts_ctr Column 计数器,用来记录一个时间序列中存储的数据个数,其列名为ts_ctr,为8位有符号的整数。 6、tsdb-uid表数据分析 tsdb-uid用来存储UID映射,包括正向的和反向的。存在两列族,一列族叫做name用来将一个UID映射到一个字符串,另一个列族叫做id,用来将字符串映射到UID。列族的每一行都至少有以下三列中的一个: metrics 将metric的名称映射到UID tagk 将tag名称映射到UID tagv 将tag的值映射到UID 如果配置了metadata,则name列族还可以包括额外的metatata列。 6.1、id 列族 Row Key: 实际的指标名称或者tagK或者tagV Column Qualifiers: metrics、tagk、tagv三种列类型中一种 Column Value : 一个无符号的整数,默认是被编码为3个byte,自增的数字,其值为UID 6.2、name 列族 Row Key : UID,就是ID列簇的值 Column Qualifiers: metrics、tagk、tagv、metrics_meta、tagk_meta、tagv_meta六种列类型中一种,*_meta是需要开启tsd.core.meta.enable_realtime_uid才会生成 Column Value: 与UID对应的字符串,对于一个*_meta列,其值将会是一个UTF-8编码的JSON格式字符串。不要在OpenTSDB外部去修改该值,其中的字段顺序会影响CAS调用。 7、tsdb表: 时间点数据就保存在此表中,只有一个列簇t: 7.1、RowKey格式 UID: 默认编码为3 Bytes,而时间戳会编码为4 Bytes salt: 打散同一metric不同时间线的热点 metric, tagK, tagV: 实际存储的是字符串对应的UID(在tsdb-uid表中) timestamp: 每小时数据存在一行,记录的是每小时整点秒级时间戳 7.2、Column格式 column qualifier 占用2 Bytes或者4 Bytes, 占用2 Bytes时表示以秒为单位的偏移,格式为: 12 bits:相对row表示的小时的delta, 最多2^ 12 = 4096 > 3600因此没有问题 1 bit: an integer or floating point 3 bits: 标明数据的长度,其长度必须是1、2、4、8。000表示1个byte,010表示2byte,011表示4byte,100表示8byte 占用4 Bytes时表示以毫秒为单位的偏移,格式为: 4 bits:十六进制的1或者F 22 bits:毫秒偏移 2 bit:保留 1 bit: an integer or floating point,0表示整数,1表示浮点数 3 bits: 标明数据的长度,其长度必须是1、2、4、8。000表示1个byte,010表示2byte,011表示4byte,100表示8byte 7.3、value value 使用8 Bytes存储,既可以存储long,也可以存储double。 7.4、tsdb表设计的特点: metric和tag映射成UID,不存储实际字符串,以节约空间。 每条时间线每小时的数据点归在一行,每列是一个数据点,这样每列只需要记录与这行起始时间偏移,以节省空间。 每列就是一个KeyValue。 六、 写在最后 1、应用场景作为时序数据库,OpenTSDB 不仅仅可以提供原始数据的查询,并且还支持对原始数据的聚合能力,支持过滤、过滤之后的聚合计算 。 支持降采样查询,比如原始数据是1分钟一个数据点,如果我想1个小时一个数据点进行展示,也能支持。 支持根据维度分组查询,比如我有一个中国地市的数据,现在我想根据省份进行分组之后查询,也能支持。 2、使用注意事项OpenTSDB 默认情况下的字符集是ISO-8859-1,为什么会使用这个字符集呢,是因为它的编码是单字节编码,编码后的长度是固定的,如果要支持中文,需要对源码进行编译,修改为UTF-8即可。 默认提供的HBase建表语句是没有预分区的,这样会导致大批量数据写入的时候有热点问题,建议进行预分区。 OpenTSDB不适合超大数据量,在千万级、亿级中提取几万条数据,比如某个指标半年内的5分钟级别的数据,还是很快响应的。但如果再提取多点数据,几十万,百万这样的量级,又或者提取后再做个聚合运算,OpenTSDB 就勉为其难,实际使用的时候用作服务端机器的监控无任何问题,如果作为客户端APP监控,响应就比较迟缓。 OpenTSDB 只有4 张HBase 表,所有的数据都存放在一张表,这就意味在OpenTSDB 这个层级上是无法更小的粒度来区别对待不同业务,比如不同的业务建不同的表存储数据。 OpenTSDB 支持实时聚合计算功能,但是基于单点,所以运算能力有限。 3、展望 如果需要支持特大批量时序数据,建议使用Druid或InfluxDB,其中InfluxDB是最易用的时序数据库。 更多内容敬请关注 vivo 互联网技术 微信公众号 注:转载文章请先与微信号: Labs2020 联系。
来源:OSCHINA
发布时间:2020-07-16 10:12:00
1. 业务需求 接收实时数据流数据,实时更新状态,并且每隔一定的时间,将所有状态数据输出。 实时数据类型:("张", 1) 状态更新:第一个元素为key,将第二个元素全部缓存起来,放到list中,最后将key和其对应的list全部输出。 2. 实现方案 使用processFunction算子,在processElement函数中仅注册一次定时器,然后在onTimer函数中处理定时器任务,并且重新注册定时器。 3. 实现代码 3.1 source /** * 每隔1秒发送一个tuple2类型的数据,第一个字段值为随机的一个姓氏,第二个字段为自增的数字 **/ class MySourceTuple2 extends SourceFunction[(String, Long)] { var isRunning: Boolean = true val names: List[String] = List("张", "王", "李", "赵") private val random = new Random() var number: Long = 1 override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = { while (true) { val index: Int = random.nextInt(4) ctx.collect((names(index), number)) number += 1 Thread.sleep(1000) } } override def cancel(): Unit = { isRunning = false } } 3.2 流处理 object TimerMain2 { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env .addSource(new MySourceTuple2) .keyBy(_._1) .process(new KeyedProcessFunction[String, (String, Long), String] { //缓存流数据 private val cache: mutable.Map[String, ListBuffer[Long]] = mutable.Map[String, ListBuffer[Long]]() private var first: Boolean = true /** * 定时器触发时回调该函数 * * @param timestamp 定时器触发时间 */ override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, Long), String]#OnTimerContext, out: Collector[String]): Unit = { println("定时器触发:" + timestamp) //将缓存中的数据组织成需要的格式 val builder = new StringBuilder() for (entry: (String, ListBuffer[Long]) <- cache) { builder.append(entry._1).append(":") for (ele <- entry._2) { builder.append(ele).append(",") } builder.delete(builder.size - 1, builder.size).append(";") cache(entry._1).clear() } println("定时器注册:" + timestamp) //该定时器执行完任务之后,重新注册一个定时器 ctx.timerService().registerProcessingTimeTimer(timestamp + 5000) out.collect(builder.toString()) } /** * 处理每一个流数据 */ override def processElement(value: (String, Long), ctx: KeyedProcessFunction[String, (String, Long), String]#Context, out: Collector[String]): Unit = { //仅在该算子接收到第一个数据时,注册一个定时器 if (first) { first = false val time: Long = System.currentTimeMillis() println("定时器第一次注册:" + time) ctx.timerService().registerProcessingTimeTimer(time + 5000) } //将流数据更新到缓存中 if (cache.contains(value._1)) { cache(value._1).append(value._2) } else { cache.put(value._1, ListBuffer[Long](value._2)) } } } ) .print("处理结果:") env.execute() } } 所有代码解释均在注释中。 4. 运行结果 可以看到,定时器注册之后,过5秒就会被触发,同时注册下个5秒的注册器,然后将数据发送到下个算子打印出来。 注意:该实例中算子并行度为1,所以“定时器第一次注册”只会触发一次,如果是多个并行度的话,则会在每个并行度里面进行“定时器第一次注册”,并且每个算子维护自己的定时器,算子之间互不影响。
来源:OSCHINA
发布时间:2020-07-15 17:55:00