【转】ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别

 
转自:http://dongxuan.iteye.com/blog/901689

工作中多处接触到了ThreadPoolExecutor。趁着现在还算空,学习总结一下。

前记:

  1. jdk官方文档(javadoc)是学习的最好,最权威的参考。
  2. 文章分上中下。上篇中主要介绍ThreadPoolExecutor接受任务相关的两方面入参的意义和区别,池大小参数corePoolSize和maximumPoolSize,BlockingQueue选型(SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue);中篇中主要聊聊与keepAliveTime这个参数相关的话题;下片中介绍一下一些比较少用的该类的API,及他的近亲:ScheduledThreadPoolExecutor
  3. 如果理解错误,请直接指出。

查看JDK帮助文档,可以发现该类比较简单,继承自AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口。

ThreadPoolExecutor的完整构造方法的签名是:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

先记着,后面慢慢解释。

===============================神奇分割线==================================

其实对于ThreadPoolExecutor的构造函数网上有N多的解释的,大多讲得都很好,不过我想先换个方式,从Executors这个类入手。因为他的几个构造工厂构造方法名字取得令人很容易了解有什么特点。但是其实Executors类的底层实现便是ThreadPoolExecutor!

ThreadPoolExecutor是Executors类的底层实现。

在JDK帮助文档中,有如此一段话:

“强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。”

可以推断出ThreadPoolExecutor与Executors类必然关系密切。

===============================神奇分割线==================================

OK,那就来看看源码吧,从newFixedThreadPool开始。

ExecutorService newFixedThreadPool(int nThreads):固定大小线程池。

可以看到,corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用无界queue的话maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表名什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的。

Java代码 收藏代码

  1. public static ExecutorService newFixedThreadPool(int nThreads) { 
  2. return new ThreadPoolExecutor(nThreads, nThreads, 
  3.                                       0L, TimeUnit.MILLISECONDS, 
  4. new LinkedBlockingQueue<Runnable>()); 
  5.     } 

ExecutorService newSingleThreadExecutor():单线程。

可以看到,与fixedThreadPool很像,只不过fixedThreadPool中的入参直接退化为1

Java代码 收藏代码

  1. public static ExecutorService newSingleThreadExecutor() { 
  2. return new FinalizableDelegatedExecutorService 
  3.             (new ThreadPoolExecutor(1, 1, 
  4.                                     0L, TimeUnit.MILLISECONDS, 
  5. new LinkedBlockingQueue<Runnable>())); 
  6.     } 

ExecutorService newCachedThreadPool():无界线程池,可以进行自动线程回收。

这个实现就有意思了。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。可能对于该BlockingQueue有些陌生,简单说:该QUEUE中,每个插入操作必须等待另一个

线程的对应移除操作。比如,我先添加一个元素,接下来如果继续想尝试添加则会阻塞,直到另一个线程取走一个元素,反之亦然。(想到什么?就是缓冲区为1的生产者消费者模式^_^)

注意到介绍中的自动回收线程的特性吗,为什么呢?先不说,但注意到该实现中corePoolSize和maximumPoolSize的大小不同。

Java代码 收藏代码

  1. public static ExecutorService newCachedThreadPool() { 
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
  3.                                       60L, TimeUnit.SECONDS, 
  4. new SynchronousQueue<Runnable>()); 
  5.     } 

===============================神奇分割线==================================

到此如果有很多疑问,那是必然了(除非你也很了解了)

先从BlockingQueue<Runnable> workQueue这个入参开始说起。在JDK中,其实已经说得很清楚了,一共有三种类型的queue。以下为引用:(我会稍微修改一下,并用红色突出显示)

所有 BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
  • 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(什么意思?如果当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接抄家伙(thread)开始运行)
  • 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程
  • 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

先不着急举例子,因为首先需要知道queue上的三种类型。

排队有三种通用策略:

  1. 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
  2. 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
  3. 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。  

===============================神奇分割线==================================

到这里,该了解的理论已经够多了,可以调节的就是corePoolSize和maximumPoolSizes 这对参数还有就是BlockingQueue的选择。

例子一:使用直接提交策略,也即SynchronousQueue。

首先SynchronousQueue是无界的,也就是说他存数任务的能力是没有限制的,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加。在这里不是核心线程便是新创建的线程,但是我们试想一样下,下面的场景。

我们使用一下参数构造ThreadPoolExecutor:

Java代码 收藏代码

  1. new ThreadPoolExecutor( 
  2. 2, 3, 30, TimeUnit.SECONDS,  
  3. new <span style="white-space: normal;">SynchronousQueue</span><Runnable>(),  
  4. new RecorderThreadFactory("CookieRecorderPool"),  
  5. new ThreadPoolExecutor.CallerRunsPolicy()); 

当核心线程已经有2个正在运行.

  1. 此时继续来了一个任务(A),根据前面介绍的“如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。”,所以A被添加到queue中。
  2. 又来了一个任务(B),且核心2个线程还没有忙完,OK,接下来首先尝试1中描述,但是由于使用的SynchronousQueue,所以一定无法加入进去。
  3. 此时便满足了上面提到的“如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。”,所以必然会新建一个线程来运行这个任务。
  4. 暂时还可以,但是如果这三个任务都还没完成,连续来了两个任务,第一个添加入queue中,后一个呢?queue中无法插入,而线程数达到了maximumPoolSize,所以只好执行异常策略了。

所以在使用SynchronousQueue通常要求maximumPoolSize是无界的,这样就可以避免上述情况发生(如果希望限制就直接使用有界队列)。对于使用SynchronousQueue的作用jdk中写的很清楚:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。

什么意思?如果你的任务A1,A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1么有被执行前,A2不可能添加入queue中

例子二:使用无界队列策略,即LinkedBlockingQueue

这个就拿newFixedThreadPool来说,根据前文提到的规则:

写道

如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。

那么当任务继续增加,会发生什么呢?

写道

如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。

OK,此时任务变加入队列之中了,那什么时候才会添加新线程呢?

写道

如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

这里就很有意思了,可能会出现无法加入队列吗?不像SynchronousQueue那样有其自身的特点,对于无界队列来说,总是可以加入的(资源耗尽,当然另当别论)。换句说,永远也不会触发产生新的线程!corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行。所以要防止任务疯长,比如任务运行的实行比较长,而添加任务的速度远远超过处理任务的时间,而且还不断增加,如果任务内存大一些,不一会儿就爆了,呵呵。

可以仔细想想哈。

例子三:有界队列,使用ArrayBlockingQueue。

这个是最为复杂的使用,所以JDK不推荐使用也有些道理。与上面的相比,最大的特点便是可以防止资源耗尽的情况发生。

举例来说,请看如下构造方法:

Java代码 收藏代码

  1. new ThreadPoolExecutor( 
  2. 2, 4, 30, TimeUnit.SECONDS,  
  3. new ArrayBlockingQueue<Runnable>(2),  
  4. new RecorderThreadFactory("CookieRecorderPool"),  
  5. new ThreadPoolExecutor.CallerRunsPolicy()); 

假设,所有的任务都永远无法执行完。

对于首先来的A,B来说直接运行,接下来,如果来了C,D,他们会被放到queu中,如果接下来再来E,F,则增加线程运行E,F。但是如果再来任务,队列无法再接受了,线程数也到达最大的限制了,所以就会使用拒绝策略来处理。

总结:

  1. ThreadPoolExecutor的使用还是很有技巧的。
  2. 使用无界queue可能会耗尽系统资源。
  3. 使用有界queue可能不能很好的满足性能,需要调节线程数和queue大小
  4. 线程数自然也有开销,所以需要根据不同应用进行调节。

通常来说对于静态任务可以归为:

  1. 数量大,但是执行时间很短
  2. 数量小,但是执行时间较长
  3. 数量又大执行时间又长
  4. 除了以上特点外,任务间还有些内在关系

看完这篇问文章后,希望能够可以选择合适的类型了

Continue reading 【转】ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别

DSSA(Domain Specific Software Architecture) 特定领域软件架构

与软件体系结构风格的区别

DSSA与软件体系结构风格是从不同角度出发研究问题的两种结果,前者从问题域出发,后者从解决域出发。

DSSA只对某个领域进行专家知识的提取、存储、组织,但可以同时使用多种软件体系结构风格;而在某个软件体系结构风格中进行专家知识组织时,可以将提取的公共结构和设计方法扩展到多个应用领域。

DSSA通常选用一个或多个适合所研究的领域的体系结构风格,并设计一个该领域专用的体系结构分析设计工具。但该方法提取的专家知识只能应用于一个较小的范围--所在领域。一个领域的DSSA及其工具在另一个领域中是不适应的,或不可以重用的。

体系结构风格避免设计到特定的应用领域或背景,所提取的知识比DSSA提取的知识应用范围要广。但在一个特定领域中,正是由于体系结构风格对领域知识、领域背景的忽略,使其在一个具体领域的开发中的作用并不比DSSA要大。

DSSA与体系结构风格是互为补充的两种技术。

http://iknow.seforge.org/sewiki/DSSA_e6_96_b9_e6_b3_95

作为DARPA的DSSA计划的一部分,Will Tracz在DSSA-ADAGE项目中提出了DSSA领域工程方法,与基于构架的系统开发过程相配合,应用于航空电子设备自动导航领域。

在DSSA方法中,进行领域工程的主要方式是领域工程师与领域专家的会谈,其中领域专家要就领域工程师提出的一系列问题进行报告,领域工程师对这些报告进行综合和整理,然后与领域专家一起对结果进行复审。

DSSA的领域工程过程是并发的 (concurrent)、递归的(recursive)和迭代的(iterative)。或者可以说,它是螺旋型的(spiral)。完成这个过程可能需要对每个阶段都经历几遍,每次增加更多的细节。对每个阶段的描述中包括一组需要回答的问题,一组需要的输入,一组将产生的输出和验证准则。

该方法分为五个主要阶段,在此之前还需要进行一些准备工作。其中前三个阶段集中于领域分析,即显式地把握领域特定的知识,这些知识时常被领域专家看作普通的知识。后两个阶段处理DSSA的设计、分析以及实现。

准备工作

在与领域专家会谈之前,领域工程师应尽可能熟悉该领域。理想情况下,领域工程师应对该领域具有一定经验。领域工程师应对DSSA方法指南中的问题回答有一定的想法,以便在会谈过程中诱导领域专家的答案,或者由领域专家对这些想法进行确认。领域工程师要确定DSSA方法的指南文档中哪些问题是与本次领域工程相关的,并对指南中的领域工程过程进行相应的裁剪。领域工程师还要理解本次领域工程所要达到的目标和实施这项工作的原因。

第一个阶段――定义领域分析的范围

本阶段集中于在感兴趣的领域之内有哪些事物。本步骤的输出包括感兴趣的领域与其它领域间的关系图、领域中应用的用户的需求列表、作为信息源的人的列表、作为信息源的项目的列表。

第二个阶段――定义特定于领域的元素并限制范围

本阶段的目标是编辑特定于领域的术语字典和同义词词典。在高级别块图的基础上,增加更多的细节,其中重点是在领域中的应用之间识别共同性,分离差异性。应特别重视对领域中的基本元素进行标准化和分类。

第三个阶段――定义和精化特定于领域的设计和实现约束

DSSA方法的一个特色是区别“需求”和“约束”。在DSSA方法中,“需求”描述一组在问题空间中的特性。“约束”描述一组在解空间中的特性。本阶段的目标是描述解空间中的特征。不但要识别约束,而且要记录它们对设计和实现决定的影响,以及对处理它们时产生的问题的讨论。

第四个阶段――开发领域构架/模型 领域设计

本阶段的目标是提出一般的构架,并说明构成构架的模块或构件的语法和语义。为满足前面识别的需求和约束,在一个应用领域中可能需要设计几个DSSA。

第五个阶段――产生或搜集复用产品 领域实现

本阶段的目标是为DSSA充实构件使得它可以被用来产生问题域中的新应用。参与这个阶段的领域专家是开发过这个领域中应用的软件工程师。他们最适合识别现有的可复用构件,或可以作为产生可复用构件的基础的构件。

Snap1

参见:

http://read.chaoxing.com/ebook/read_11401987.html P93

Continue reading DSSA(Domain Specific Software Architecture) 特定领域软件架构

正交软件体系结构

正交软件体系结构由组织层和线索的构件构成。层是由一组具有相同抽象级别的构件构成。线索是子系统的特例,它是由完成不同层次功能的构件组成(通过相互调用来关联),每一条线索完成整个系统中相对独立的一部分功能。每一条线索的实现与其他线索的实现无关或关联很少,在同一层中的构件之间是不存在相互调用的。

如果线索是相互独立的,即不同线索中的构件之间没有相互调用,那么这个结构就是完全正交的。从以上定义,我们可以看出,正交软件体系结构是一种以垂直线索构件族为基础的层次化结构,其基本思想是把应用系统的结构按功能的正交相关性,垂直分割为若干个线索(子系统),线索又分为几个层次,每个线索由多个具有不同层次功能和不同抽象级别的构件构成。各线索的相同层次的构件具有相同的抽象级别。因此,我们可以归纳正交软件体系结构的主要特征如下:

(1)正交软件体系结构由完成不同功能的n(n > 1)个线索(子系统)组成;

(2)系统具有m(m > 1)个不同抽象级别的层;

(3)线索之间是相互独立的(正交的);

(4)系统有一个公共驱动层(一般为最高层,下图的第一层)和公共数据结构(一般为最低层,下图第五层)。

对于大型的和复杂的软件系统,其子线索(一级子线索)还可以划分为更低一级的子线索(二级子线索),形成多级正交结构。正交软件体系结构的框架如图1所示。

xjj

图1是一个三级线索、五层结构的正交软件体系结构框架图,在该图中,ABDFK组成了一条线索,ACEJK也是一条线索。因为B、C处于同一层次中,所以不允许进行互相调用;H、J处于同一层次中,也不允许进行互相调用。一般来讲,第五层是一个物理数据库连接构件或设备构件,供整个系统公用。

在软件进化过程中,系统需求会不断发生变化。在正交软件体系结构中,因线索的正交性,每一个需求变动仅影响某一条线索,而不会涉及到其他线索。这样,就把软件需求的变动局部化了,产生的影响也被限制在一定范围内,因此实现容易。

正交软件体系结构具有以下优点:

(1)结构清晰,易于理解。正交软件体系结构的形式有利于理解。由于线索功能相互独立,不进行互相调用,结构简单、清晰,构件在结构图中的位置已经说明它所实现的是哪一级抽象,担负的是什么功能。

(2)易修改,可维护性强。由于线索之间是相互独立的,所以对一个线索的修改不会影响到其他线索。因此,当软件需求发生变化时,可以将新需求分解为独立的子需求,然后以线索和其中的构件为主要对象分别对各个子需求进行处理,这样软件修改就很容易实现。系统功能的增加或减少,只需相应的增删线索构件族,而不影响整个正交体系结构,因此能方便地实现结构调整。

(3)可移植性强,重用粒度大。因为正交结构可以为一个领域内的所有应用程序所共享,这些软件有着相同或类似的层次和线索,可以实现体系结构级的重用。

 

正交软件体系结构设计过程
正交软件体系结构设计过程分为2 个阶段:原型阶段和演化阶段。开发原型的主要目的是为了确认需求,设计体系结构基线。软件开发小组需要建立一系列原型,与用户一起讨论和评审。原型有助于需求的清晰化,同时也有助于进一步分析系统的可行性,更清楚地认识系统,完善对系统的理解。
原型开发的第1 个迭代周期的目标并不明确具体。为了提高开发效率,缩短开发周期,可将开发人员分为2 组,一组负责界面原型;一组负责业务模型。界面原型是界面层次的水平模型, 没有真正地实现系统功能。业务模型可用UML(统一建模语言)图表述,最终生成系统的SRS(软件需求说明书)。原型开发的第2 个迭代周期的任务是设计和建立正交软件体系结构。本次迭代大致分为6 个阶段[5]:
(1)标识构件:为系统生成逻辑结构,比如生成类图、包、构件等。

(2)提出软件体系结构模型:选择合适的软件体系结构风格是必要的,在此基础上,开发人员通过软件体系结构模型,熟悉了软件体系结构属性等方面的要求,虽然这个模型可能存在错误,但它为整个系统的演化确立了目标。
(3)把已标识的构件映射到软件体系结构中:把第(1)阶段标识的构件映射到体系结构中。
(4)分析构件,建立连接件:为了把已标识的构件集成到体系结构中,必须分析所有构件的关系,可结合UML 图表述,比如活动图、时序图等。
(5)生成软件体系结构。关键构件决定软件体系结构,主要是在第(4)阶段的基础上做精化。
(6)正交化。通过以上几个阶段产生的软件体系结构不一定满足正交性(同一层次的构件之间可能存在相互调)。通过从左至右、自顶向下地增加、删除、拆分合并构件,把不满足正交性的线索和构件正交化。

正交软件体系结构的演化控制

正交软件体系结构的重用粒度大。在演化控制和新系统开发过程中,逐渐建立线索库和构件库,如果有新需求,可以重用库中的线索和构件,以提高开发效率。因此,对新需求可以通过在原软件体系结构基础之上新
增、修改、删除线索来完成。对线索中的构件进行修改和添加时按照从左到右、自顶向下的原则,先修改高级构件,然后根据需要修改被其调用的构件。基于正交软件体系结构开发的软件,必须按照正交的演化步骤修改。其实演化过程也是正交软件体系结构的验证和完善过程。

(1)变动需求归类。将变动对应到相应的线索和构件上,对于新需求需要建立的新的线索和构件,都先作好标记。
(2)制定演进计划。在对原有模型改动前,需要制定一个周密的演进计划,作为后续开发工作的指南。

(3)增加、删除和修改构件。在第(1)步的基础上,开发人员依据演进计划对线索和构件进行增删改。
(4)更新构件关系。参照正交化简易算法对构件关系做必要调整,使线索和构件满足正交性。
(5)产生演化后的新软件体系结构。在原有系统上的修改必须映射到原有体系结构上,作为后续开发的基础。
(6)对以上修改作阶段性技术评审。
(7)迭代演进。迭代地进行第(3)步~第(6)步,直到新体系结构足够详细。

 

参考:

http://se.csai.cn/NewTech/No046.htm 张友生

http://read.chaoxing.com/ebook/read_11401987.html 张友生 p72

http://ishare.iask.sina.com.cn/f/8438626.html 基于正交软件体系结构的CRM 系统 汪保杰,王如龙 (湖南大学软件学院,长沙 410082)

Continue reading 正交软件体系结构

过程控制(Process Control),开环,闭环

适用情况

对被控制目标的检测、计算和控制的不断循环执行,以实时地对环境的变化做出反应。

目的

将系统的指定输出值保持在set points(设定点)上

概念

过程变量(Process variable):反映系统运行状态
输入变量(Input variable):系统输入的变量
控制变量(Controlled variable):系统输出的变量
设定点(Set point):系统运行所设定的目标。

Open-loop system 开环

输出的控制只考虑目标设定,既不考虑输入,也不考虑输出变化的控制

Snap3

这个开环加热系统会使温度不断升高

Closed-loop system 闭环(control loop控制环路)

把输出执行的状况经检测后,作为输入,经过与设定目标的组合,将其值经过计算后产生输出控制信息的系统;

Snap4

这个闭环加热系统会使温度最终保持到一定温度。

 

闭环系统分为反馈控制和前馈控制:

反馈控制:feedback

负反馈:目标与被测输入的值作为控制依据,使得输出稳定在设定目标上;
正反馈:目标与被测输入的作为控制依据,无法使输出稳定在设定目标上;

Snap5>负反馈:反馈信息与控制信息的作用方向相反,因而可以纠正控制信息的效应。

Continue reading 过程控制(Process Control),开环,闭环

黑板系统Blackboard

适用问题:

没有直接的算法,多种方法都可能解决问题;
找不到确定的求解策略(先做什么?后做什么?);
问题没有唯一的解答;
每个求解步骤中都可能产生多个可能的解,需要寻求最佳或可接受解。
需要多个领域的专门知识协作解决。
例如:自然语言处理、语音处理、模式识别、图像处理等

 

基本思想

独立程序(知识源)集合
对公共数据结构进行协同操作;
每个程序专门解决一个子问题;
不存在互相调用;
不存在事先确定的操作顺序;
中心控制部件
反映整体问题求解过程的状态;
状态变化时,中心控制部件对信息进行评价,协调各程序工作;
可以试探调用各个可能的求解算法;

 

结构:

Snap1

Snap2

黑板部件

中心共享数据;

存储系统的初始输入、问题求解的局部和中间结果,以及反映问题求解状态的数据;

为知识源访问这些元素提供读和写的操作;

知识源:独立的求解程序

独立, 依赖于应用的问题求解子系统;

它们之间的交互仅能通过读写黑板的数据和状态来实现(不直接交互);

包含条件和操作两个部分:条件部分评估求解过程的当前状态,即黑板上的当前内容。当条件满足时,执行相应的操作;

操作引起黑板数据状态的变化;

控制组件

监视黑板上信息和状态的变化,并根据变化决定采取的行动。

根据知识调度策略,调度知识源对条件的匹配评价,并采取相应的操作。

 

应用:

起源于人工智能(Artificial Intelligence, AI)领域;
典型应用领域:自然语言处理、语音处理、模式识别、图像处理等;
HEARSAY-II (自然语言处理系统,系统输入是自然语言的语音信号,经过语音音节、词汇、句法和语义分析后,获得用户对数据库的查询请求)
HASP/SIAP(在特定海域根据声纳阵列信号探测敌方潜艇出没的系统)
CRYALIS(根据X射线探测数据推测蛋白质分子三维结构的系统)
TRICERO(在分布环境下监视飞机活动的系统)

 

优点:

应用于没有确定解方法的问题:用不同的算法进行试验;
支持可更改性和可维护性(知识源、控制算法、中心数据分离);
知识可重用:知识来自某类任务的独立专家;
支持容错性和健壮性:中间结论只是下一步的假设条件;

缺点:

测试困难(没有一个确定的算法);
难以保证最优的解决方案;
效率低;
开发工作昂贵;
缺少对并行机制的支持(黑板上数据的访问必须同步);

Continue reading 黑板系统Blackboard

各Nosql数据库特点

下面从http://www.cnw.com.cn/software-database/htm2011/20110808_230968.shtml 摘抄。自己补充了些!

Cassandra

Facebook需要一个更快、更廉价的方式处理数以亿计的状态更新。因此,他们启动了这一项目,并最终将其移植到了Apache上,这就是 Cassandra。在Apache上,它能够得到许多社区的帮助。目前Cassandra已经不再仅仅用于Facebook,许多为该项目工作的程序员来自其他公司。如今DataStax.com公司正致力于为Cassandra提供商业支持。

Cassandra是一个跟踪如Facebook上的状态更新等大量数据的优秀工具。这一工具可以帮助创建一个计算机网络,网络上的所有计算都拥有相同的数据。这意味着每台机器都可以被相互替代。一旦数据通过P2P网络节点,它们的一致性就会丧失。关键是“最终一致”,而不是“时刻一致”。如果你发现你的状态更新在Facebook消失一下,然后又重新出现,你就明白这意味着什么。

CouchDB

CouchDB被用于存储文档,最大的变化在于查询。取代一些基本查询结构的是,CouchDB通过两种功能来搜索文档,以导航并减少数据。一个编排文档格式,另一个确定包括哪些文档。一名清楚存储程序的、熟练的甲骨文数据库操作人员会做同样的事情。但是,导航和减少结构将让基层程序员大开眼界。目前,AJAX开发人员已经能够编写出相当复杂的搜索程序,这些程序可以写入一些更为复杂的逻辑。

CouchDB的核心由Erlang编写。但是API和界面却是JavaScript或是JSON。

JavaScript API仅仅是加强CouchDB对普通Web开发人员的吸引力。这些开发人员可以将文档,甚至整个网站存储在数据库中。

Restful的方式确实很有特点,可以直接使用javascript调用了。但不知会不会有人这样用?注意json解析的开销。

参见:

http://www.oschina.net/question/5189_3939 

MongoDB

MongoDB是一个关于JavaScript如何掌握世界的典型。程序获取格式化为JSON格式的数据,然后将它们存储起来。查询是 JavaScript的基础功能,这与使用浏览器控制台没有太大的差别,只是简化了一些东西。最大的区别是,MongoDB会为你的数据库创建索引,如果索引被正确地创建,那么反馈查询结果的速度将会很快。另外,该数据库可以与大量的其他工具协同工作。

Redis

与CouchDB和MongoDB一样,Redis用于存储文档和由键值对组织的文件。与其他的NoSQL数据库不同的是,其存储的不仅仅是字符串或是数字,其中还包括分类和未分类的字符串集合作为与键关联的值。这一特点使其可以为用户提供更为复杂的集合操作。用户不再需要下载数据计算交集,因为 Redis能够在服务器上做这一工作。

Redis催生了一些没有过多编码的简单结构。Luke Melia通过创建一个全新集合追踪其网站上的访问者。最后五个集合的并集可确定那些当时在线的访问者。这一带有好友列表的并集的交集可以生成在线好友列表。这类集操作拥有许多应用。Redis集群为我们揭示了其强大的功能。

Redis将数据存储在内存中,仅记录下每次变化的列表。由于其具有功能强大,可写入硬盘中写入的缓存,许多人甚至并不将Redis称之为数据库。由于Redis只需要在数据读入内存之前进行等待,因此速度要比传统数据库快很多,但是不适时机的断电导致其存在潜在的应用风险。

Redis 对内存很依赖,但也提供dump操作,然而使用要小心。

参见:

http://gly199.iteye.com/blog/1050606

http://zhaohaolin.iteye.com/blog/1122876

http://www.iteye.com/topic/808293

 

Riak

Riak是设计最为精巧的数据存储,其拥有其他产品的绝大多数功能,并且对副本有着更多的控制。尽管基本结构存储着多对键值,但是恢复它们和确保它们的一致性的选项很多。比如写入操作包括了要求Riak确认数据何时被成功传输到集群其他机器上的参数。如果你不希望仅信任一台机器,在发送确认信息前,你可以要求其等待,直至两台、三台或是54台机器写入了数据。这也是该团队能够打出“最终一致性不是数据遗失的借口”这一口号的原因。

数据自身并不仅仅写在硬盘中。这只是其中的一个选项,但是并不是主要的。Riak使用的是插件式存储引擎(默认为Bitcask)。该引擎用它们自己的内部格式将数据写入硬盘中。此外,它还有多种选项,包括MySQL使用的InnoDB版本。Riak的集群能力可以确保所有一切都万无一失。

在抓取数据时,Riak会消除任何可能出现的错误。如果在两个节点的目标版本不同,那么Riak会选择最新升级版本,或是将两个目标版本都反馈回来,交由你的客户端代码做决定。对于发现数据中存在的潜在错误,这是一个非常有用的选项。

Neo4J

在我们所提到的几个应用之中,Neo4J是最具特色一个。它可以用于存储图而不是数据。它对图数据是以节点和边(关系)模式进行存储。社交网络应用是它的强项。Neo4J非常的新,开发人员仍然在寻找更好的算法。在新版本中,开发人员开始尝试新的缓存策略:由于Neo4J能够缓存节点信息,因此搜索算法运行速度很快。开发人员还为其增加了类似XSL模式匹配的新查询语言。

Neo4J受到了Neo Technology公司的支持。该公司推出的商业用版本数据库拥有备份、故障恢复和复杂监视功能。

数据以图结构存储在硬盘中,而不是表的形式。图是指数学图论中的那个图,不是指多媒体中的图形图像。

它是支持事务的!

 

FlockDB

有些人抱怨代码过于复杂,他们认为Neo4J过于复杂,超出了他们的需求。那么他们不妨尝试一下FlockDB。FlockDB是一个实时的、分布式的数据库,是Twitter网站基础设施的核心组件。Twitter在一年多以前推出了基于Apache许可证的开源项目FlockDB。如果你想建立起自己的Twitter,那么你需要下载Gizzard工具,其作用是分割跨多个Flock的数据。由于FlockDB存储两个节点之间的关联,我们中的许多人将FlockDB称为“图数据库”。不过,也有人认为这一称呼仅适用于像Neo4J这类复杂的工具。

如何选择NoSQL数据库?

关于如何选择NoSQL数据库这一问题并不好回答。许多IT部门会随便选一个,有时候他们选择的数据库并不能满足他们的需求。由于优秀的开发人员希望能够平衡项目的优势、商业支持的可获得性以及文档质量,因此选择一个最佳数据库是十分困难的。

这些数据库都存储了大堆的键和值,但是真正的问题是如何在服务器中合理分配负载,如何将变更传递给它们。另一个问题是托管。云服务能够替你完成所有的维护,这一点非常具有吸引力。与SQL数据库相比,NoSQL数据库的数据交换更为困难。目前全球还没有一个标准的查询语言,也没有一个像JDBC一样的大型虚拟层。尽管如此,NoSQL数据库已经对我们具备了足够的吸引力。

 

各nosql比较:

http://www.distream.org/?p=10

Continue reading 各Nosql数据库特点

kanban与敏捷

看了半天,先来来句精髓的:

准时与保证供应质量是不容置疑的.

精益管理

引言:

Kanban一词的原意是指日本小酒馆服务员(酒保,多为女性)应该在最合适的时候把热酒送上,也就是在前一壶酒刚刚喝完的时候即时将新的一壶热酒送上,送早了酒就放凉了,送晚了使酒兴正浓的客人扫兴。因此,它指的是准时提供服务。这种服务是在顾客需要的时候服务员为顾客即时提供质量合格的商品与服务,不早也不迟,正好是顾客需要的时刻与质量。因此,可以肯定地讲把Kanban译成“看板”是错误的,英文采取音译或译成“库存控制”就是佐证。其错误的原因是错误地把Kanban理解为表示或可视化的库存台帐,或者进度表的“看板”。在实践中可以看到,国内有的企业不惜动用财政在许多工位放上黑板,或做了许多由箭头标示的进度表或表达库存的彩色牌子,问他们为什么这样做,回答说:“这就是‘看板’管理”,令人哭笑不得。
所以,应该恢复Kanban提供准时合格产品与服务的本意——准时的库存控制。在丰田生产体系中它是可视化了的准时工作指令系统和可视化的生产记录与控制系统,而非具体特指“工票”、“进度表”、或“库存台帐”,其载体也可以是电子Kanban表或物件,如零件的容器等。其本质的涵义——准时与保证供应质量是不容置疑的。所以,2001年R.B.Chase就曾经明确指出,Kanban系统中的图卡表达了可视的制作与处理产品的拉式工作指令。从理论上讲,美国运用的Kanban与日本的是相同的。在实践上,差别是美国企业把它修改成适应美国式的“顶层一底层”管理方式、工人个性化和缺乏就近供应商等特定的方式。特别是在美国并不实行终身雇佣制,对工人必须有比日本更多的激励措施。

再来看实际的软件开发问题:http://www.cnblogs.com/caicainiao/archive/2011/08/11/2135144.html

然后看看这些:

将看板应用于软件开发:从敏捷到精益
http://www.infoq.com/cn/articles/hiranabe-lean-agile-kanban

用“看板图”实现敏捷项目的可视化
http://www.infoq.com/cn/articles/agile-kanban-boards

精益管理在开发项目上三大精髓
http://news.csdn.net/n/20080519/116069.html

软件开发中的准时化生产
http://news.csdn.net/n/20080519/116067.html

再来看看这篇总结 http://blog.vsharing.com/sharptoolbox/A720863.html

 

我来总结一下:

要让开发流程高效,要掌握好任务粒度,每个任务要保证质量和准时完成。

任务粒度掌握不好,就会造成浪费。

 

说白了,这些道理很简单,不需要发明这样那样的术语,方法学。再好的方法,只不过是炒现饭。使用的人不行,那还是一团糟。

Continue reading kanban与敏捷

【转】[翻译] [RabbitMQ+Python入门经典] 兔子和兔子窝

转自 http://blog.ftofficer.com/2010/03/translation-rabbitmq-python-rabbits-and-warrens/

 

2010年3月14日 Zhang Cong 发表评论 阅读评论

RabbitMQ作为一个工业级的消息队列服务器,在其客户端手册列表的Python段当中推荐了一篇blog,作为RabbitMQ+Python的入门手册再合适不过了。不过,正如其标题Rabbit and Warrens(兔子和养兔场)一样,这篇英文写的相当俏皮,以至于对于我等非英文读者来说不像一般的技术文档那么好懂,所以,翻译一下吧。翻译过了,希望其他人可以少用一些时间。翻译水平有限,不可能像原文一样俏皮,部分地方可能就意译了,希望以容易懂为准。想看看老外的幽默的,推荐去看原文,其实,也不是那么难理解……

原文:http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/

兔子和兔子窝

当时我们的动机很简单:从生产环境的电子邮件处理流程当中分支出一个特定的离线分析流程。我们开始用的MySQL,将要处理的东西放在表里面,另一个程序从中取。不过很快,这种设计的丑陋之处就显现出来了…… 你想要多个程序从一个队列当中取数据来处理?没问题,我们硬编码程序的个数好了……什么?还要能够允许程序动态地增加和减少的时候动态进行压力分配?

是的,当年我们想的简单的东西(做一个分支处理)逐渐变成了一个棘手的问题。以前拿着锤子(MySQL)看所有东西都是钉子(表)的年代是多么美好……

在搜索了一下之后,我们走进了消息队列(message queue)的大门。不不,我们当然知道消息队列是什么,我们可是以做电子邮件程序谋生的。我们实现过各种各样的专业的,高速的内存队列用来做电子邮件处理。我们不知道的是那一大类现成的、通用的消息队列(MQ)服务器——无论是用什么语言写出的,不需要复杂的装配的,可以自然的在网络上的应用程序之间传送数据的一类程序。不用我们自己写?看看再说。

让大家看看你们的Queue吧……

过去的4年里,人们写了有好多好多的开源的MQ服务器啊。其中大多数都是某公司例如LiveJournal写出来用来解决特定问题的。它们的确不关心上面跑的是什么类型的消息,不过他们的设计思想通常是和创建者息息相关的(消息的持久化,崩溃恢复等通常不在他们考虑范围内)。不过,有三个专门设计用来做及其灵活的消息队列的程序值得关注:

Apache ActiveMQ 曝光率最高,不过看起来它有些问题,可能会造成丢消息。不可接受,下一个。

ZeroMQ 和 RabbitMQ 都支持一个开源的消息协议,成为AMQP。AMQP的一个优点是它是一个灵活和开放的协议,以便和另外两个商业化的Message Queue (IBM和Tibco)竞争,很好。不过ZeroMQ不支持消息持久化和崩溃恢复,不太好。剩下的只有RabbitMQ了。如果你不在意消息持久化和崩溃恢复,试试ZeroMQ吧,延迟很低,而且支持灵活的拓扑。[特点:支持消息持久化的开源项目]

剩下的只有这个吃胡萝卜的家伙了……

当我读到它是用Erlang写的时候,RabbitMQ震了我一下。Erlang 是爱立信开发的高度并行的语言,用来跑在电话交换机上。是的,那些要求6个9的在线时间的东西。在Erlang当中,充斥着大量轻量进程,它们之间用消息传递来通信。听起来思路和我们用消息队列的思路是一样的,不是么?

而且,RabbitMQ支持持久化。是的,如果RabbitMQ死掉了,消息并不会丢失,当队列重启,一切都会回来。而且,正如在DigiTar(注:原文作者的公司)做事情期望的那样,它可以和Python无缝结合。除此之外,RabbitMQ的文档相当的……恐怖。如果你懂AMQP,这些文档还好,但是有多少人懂AMQP?这些文档就像MySQL的文档假设你已经懂了SQL一样……不过没关系啦。

好了,废话少说。这里是花了一周时间阅读关于AMQP和关于它如何在RabbitMQ上工作的文档之后的一个总结,还有,怎么在Python当中使用。

开始吧

AMQP当中有四个概念非常重要:虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)。一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。如果这就够了,那现在就可以开始了。

交换机,队列,还有绑定……天哪!

刚开始我思维的列车就是在这里脱轨的…… 这些鬼东西怎么结合起来的?

队列(Queues)是你的消息(messages)的终点,可以理解成装消息的容器。消息就一直在里面,直到有客户端(也就是消费者,Consumer)连接到这个队列并且将其取走为止。不过。你可以将一个队列配置成这样的:一旦消息进入这个队列,biu~,它就烟消云散了。这个有点跑题了……

需要记住的是,队列是由消费者(Consumer)通过程序建立的,不是通过配置文件或者命令行工具。这没什么问题,如果一个消费者试图创建一个已经存在的队列,RabbitMQ就会起来拍拍他的脑袋,笑一笑,然后忽略这个请求。因此你可以将消息队列的配置写在应用程序的代码里面。这个概念不错。

OK,你已经创建并且连接到了你的队列,你的消费者程序正在百无聊赖的敲着手指等待消息的到来,敲啊,敲啊…… 没有消息。发生了什么?你当然需要先把一个消息放进队列才行。不过要做这个,你需要一个交换机(Exchange)……

交换机可以理解成具有路由表的路由程序,仅此而已。每个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。交换机当中有一系列的绑定(binding),即路由规则(routes),例如,指明具有路由键 “X” 的消息要到名为timbuku的队列当中去。先不讨论这个,我们有点超前了。

你的消费者程序要负责创建你的交换机(复数)。啥?你是说你可以有多个交换机?是的,这个可以有,不过为啥?很简单,每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程,可以充分利用服务器上的CPU核以便达到更高的效率。例如,在一个8 核的服务器上,可以创建5个交换机来用5个核,另外3个核留下来做消息处理。类似的,在RabbitMQ的集群当中,你可以用类似的思路来扩展交换机一边获取更高的吞吐量。

OK,你已经创建了一个交换机。但是他并不知道要把消息送到哪个队列。你需要路由规则,即绑定(binding)。一个绑定就是一个类似这样的规则:将交换机“desert(沙漠)”当中具有路由键“阿里巴巴”的消息送到队列“hideout(山洞)”里面去。换句话说,一个绑定就是一个基于路由键将交换机和队列连接起来的路由规则。例如,具有路由键“audit”的消息需要被送到两个队列,“log-forever”和“alert-the- big-dude”。要做到这个,就需要创建两个绑定,每个都连接一个交换机和一个队列,两者都是由“audit”路由键触发。在这种情况下,交换机会复制一份消息并且把它们分别发送到两个队列当中。交换机不过就是一个由绑定构成的路由表。

现在复杂的东西来了:交换机有多种类型。他们都是做路由的,不过接受不同类型的绑定。为什么不创建一种交换机来处理所有类型的路由规则呢?因为每种规则用来做匹配分子的CPU开销是不同的。例如,一个“topic”类型的交换机试图将消息的路由键与类似“dogs.*”的模式进行匹配。匹配这种末端的通配符比直接将路由键与“dogs”比较(“direct”类型的交换机)要消耗更多的CPU。如果你不需要“topic”类型的交换机带来的灵活性,你可以通过使用“direct”类型的交换机获取更高的处理效率。那么有哪些类型,他们又是怎么处理的呢?

Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog

Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。我在RedHat的朋友做了一张不错的图,来表明topic交换机是如何工作的:

Source: Red Hat Messaging Tutorial: 1.3 Topic Exchange

持久化这些小东西们

你花了大量的时间来创建队列、交换机和绑定,然后,砰~服务器程序挂了。你的队列、交换机和绑定怎么样了?还有,放在队列里面但是尚未处理的消息们呢?

放松~如果你是用默认参数构造的这一切的话,那么,他们,都,biu~,灰飞烟灭了。是的,RabbitMQ重启之后会干净的像个新生儿。你必须重做所有的一切,亡羊补牢,如何避免将来再度发生此类杯具?

队列和交换机有一个创建时候指定的标志durable,直译叫做坚固的。durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。那么如何才能做到不只是队列和交换机,还有消息都是持久的呢?

但是首先一个问题是,你真的需要消息是持久的吗?对于一个需要在重启之后回复的消息来说,它需要被写入到磁盘上,而即使是最简单的磁盘操作也是要消耗时间的。如果和消息的内容相比,你更看重的是消息处理的速度,那么不要使用持久化的消息。不过对于我们@DigiTar来说,持久化很重要。

当你将消息发布到交换机的时候,可以指定一个标志“Delivery Mode”(投递模式)。根据你使用的AMQP的库不同,指定这个标志的方法可能不太一样(我们后面会讨论如何用Python搞定)。简单的说,就是将 Delivery Mode设置成2,也就是持久的(persistent)即可。一般的AMQP库都是将Delivery Mode设置成1,也就是非持久的。所以要持久化消息的步骤如下:

  1. 将交换机设成 durable。
  2. 将队列设成 durable。
  3. 将消息的 Delivery Mode 设置成2 。

就这样,不是很复杂,起码没有造火箭复杂,不过也有可能犯点小错误。

下面还要罗嗦一个东西……绑定(Bindings)怎么办?我们无法在创建绑定的时候设置成durable。没问题,如果你绑定了一个 durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。类似的,如果删除了某个队列或交换机(无论是不是 durable),依赖它的绑定都会自动删除。

注意两点:

  • RabbitMQ 不允许你绑定一个非坚固(non-durable)的交换机和一个durable的队列。反之亦然。要想成功必须队列和交换机都是durable的。
  • 一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。因此,最好仔细检查创建的标志。
开始喂蛇了~

【译注】说喂蛇是因为Python的图标是条蛇。

AMQP的一个空白地带是如何在Python当中使用。对于其他语言有一大坨材料。

但是对Python老兄来说,你需要花点时间来挖掘一下。所以我写了这个,这样别的家伙们就不需要经历我这种抓狂的过程了。

首先,我们需要一个Python的AMQP库。有两个可选:

根据你的需求,py-amqplib或者txAMQP都是可以的。因为是基于Twisted的,txAMQP可以保证用异步IO构建超高性能的AMQP程序。但是Twisted编程本身就是一个很大的主题……因此清晰起见,我们打算用 py-amqplib。更新:请参见Esteve Fernandez关于txAMQP的使用和代码样例的回复

AMQP支持在一个TCP连接上启用多个MQ通信channel,每个channel都可以被应用作为通信流。每个AMQP程序至少要有一个连接和一个channel。

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()

每个channel都被分配了一个整数标识,自动由Connection()类的.channel()方法维护。或者,你可以使用.channel(x)来指定channel标识,其中x是你想要使用的channel标识。通常情况下,推荐使用.channel()方法来自动分配 channel标识,以便防止冲突。

现在我们已经有了一个可以用的连接和channel。现在,我们的代码将分成两个应用,生产者(producer)和消费者(consumer)。我们先创建一个消费者程序,他会创建一个叫做“po_box”的队列和一个叫“sorting_room”的交换机:

chan.queue_declare(queue="po_box", durable=True,
exclusive=False, auto_delete=False)
chan.exchange_declare(exchange="sorting_room", type="direct", durable=True,
auto_delete=False,)

这段代码干了啥?首先,它创建了一个名叫“po_box”的队列,它是durable的(重启之后会重新建立),并且最后一个消费者断开的时候不会自动删除(auto_delete=False)。在创建durable的队列(或者交换机)的时候,将auto_delete设置成false是很重要的,否则队列将会在最后一个消费者断开的时候消失,与durable与否无关。如果将durable和auto_delete都设置成True,只有尚有消费者活动的队列可以在RabbitMQ意外崩溃的时候自动恢复。

(你可以注意到了另一个标志,称为“exclusive”。如果设置成True,只有创建这个队列的消费者程序才允许连接到该队列。这种队列对于这个消费者程序是私有的)。

还有另一个交换机声明,创建了一个名字叫“sorting_room”的交换机。auto_delete和durable的含义和队列是一样的。但是,.excange_declare() 还有另外一个参数叫做type,用来指定要创建的交换机的类型(如前面列出的): fanout, directtopic.

到此为止,你已经有了一个可以接收消息的队列和一个可以发送消息的交换机。不过我们需要创建一个绑定,把它们连接起来。

chan.queue_bind(queue=”po_box”, exchange=”sorting_room”,

routing_key=”jason”)

这个绑定的过程非常直接。任何送到交换机“sorting_room”的具有路由键“jason” 的消息都被路由到名为“po_box” 的队列。

现在,你有两种方法从队列当中取出消息。第一个是调用chan.basic_get(),主动从队列当中拉出下一个消息(如果队列当中没有消息,chan.basic_get()会返回None, 因此下面代码当中print msg.body 会在没有消息的时候崩掉):

msg = chan.basic_get("po_box")
print msg.body
chan.basic_ack(msg.delivery_tag)

但是如果你想要应用程序在消息到达的时候立即得到通知怎么办?这种情况下不能使用chan.basic_get(),你需要用chan.basic_consume()注册一个新消息到达的回调。

def recv_callback(msg):     print 'Received: ' + msg.body
chan.basic_consume(queue='po_box', no_ack=True,
callback=recv_callback, consumer_tag="testtag")
while True:
    chan.wait()
chan.basic_cancel("testtag")

chan.wait() 放在一个无限循环里面,这个函数会等待在队列上,直到下一个消息到达队列。chan.basic_cancel() 用来注销该回调函数。参数consumer_tag 当中指定的字符串和chan.basic_consume() 注册的一直。在这个例子当中chan.basic_cancel() 不会被调用到,因为上面是个无限循环…… 不过你需要知道这个调用,所以我把它放在了代码里。

需要注意的另一个东西是no_ack参数。这个参数可以传给chan.basic_get()chan.basic_consume(),默认是false。当从队列当中取出一个消息的时候,RabbitMQ需要应用显式地回馈说已经获取到了该消息。如果一段时间内不回馈,RabbitMQ 会将该消息重新分配给另外一个绑定在该队列上的消费者。另一种情况是消费者断开连接,但是获取到的消息没有回馈,则RabbitMQ同样重新分配。如果将no_ack 参数设置为true,则py-amqplib会为下一个AMQP请求添加一个no_ack属性,告诉AMQP服务器不需要等待回馈。但是,大多数时候,你也许想要自己手工发送回馈,例如,需要在回馈之前将消息存入数据库。回馈通常是通过调用chan.basic_ack()方法,使用消息的delivery_tag属性作为参数。参见chan.basic_get() 的实例代码。

好了,这就是消费者的全部代码。(下载:amqp_consumer.py

不过没有人发送消息的话,要消费者何用?所以需要一个生产者。下面的代码示例表明如何将一个简单消息发送到交换区“sorting_room”,并且标记为路由键“jason” :

msg = amqp.Message("Test message!")
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")

你也许注意到我们设置消息的delivery_mode属性为2,因为队列和交换机都设置为durable的,这个设置将保证消息能够持久化,也就是说,当它还没有送达消费者之前如果RabbitMQ重启则它能够被恢复。

剩下的最后一件事情(生产者和消费者都需要调用的)是关闭channel和连接:

chan.close()
conn.close()

很简单吧。(下载:amqp_publisher.py

来真实地跑一下吧……

现在我们已经写好了生产者和消费者,让他们跑起来吧。假设你的RabbitMQ在localhost上安装并且运行。

打开一个终端,执行python ./amqp_consumer.py让消费者运行,并且创建队列、交换机和绑定。

然后在另一个终端运行python ./amqp_publisher.py “AMQP rocks.” 。如果一切良好,你应该能够在第一个终端看到输出的消息。

付诸使用吧

我知道这个教程是非常粗浅的关于AMQP/RabbitMQ和如何使用Python访问的教程。希望这个可以说明所有的概念如何在Python当中被组合起来。如果你发现任何错误,请联系原作者([email protected]) 【译注:如果是翻译问题请联系译者】。同时,我很高兴回答我知道的问题。【译注:译者也是一样的】。接下来是,集群化(clustering)!不过我需要先把它弄懂再说。

注:关于RabbitMQ的知识我主要来自这些来源,推荐阅读:

–完–

Continue reading 【转】[翻译] [RabbitMQ+Python入门经典] 兔子和兔子窝

Java Sound API,格式转换

转换音频格式本以为java很难实现,但是发现Java sound Api还是比较强大的,它可以读取音频设备输入,播放音频,还能进行音频格式转换,当然,这种转换不是所有的格式都支持,下面代码将音频格式从8bit 8k pcm wav转换为16bit 8k pcm wav。

this code show how to convert audio from 8bit 8k pcm to 16 bit 8k pcm in pure java:

> public static void convert(InputStream in, OutputStream out) throws IOException, UnsupportedAudioFileException {

Continue reading Java Sound API,格式转换

Pagination


Total views.

© 2013 - 2019. All rights reserved.

Powered by Hydejack v6.6.1